#!/usr/bin/env python3
"""AirQ (temp & CO2) python check"""

"""dependencies:
- pip3 install nagiosplugin
- pip3 install argparse
- pip3 install paho-mqtt
- pip3 install git+https://github.com/heinemml/CO2Meter
or
- pip3 install git+https://git.ao-it.net/python/CO2Meter
"""

__version__ = '0.5.0'
__author__ = 'anima'

# imports
import logging
import argparse
import json
import os
from time import sleep
from os import path
import re
import subprocess
import threading
import nagiosplugin

# log settings
logging.basicConfig(format='[%(asctime)s] %(levelname)s %(message)s', level=logging.INFO)


class AirQBase:
    """Common state of every AirQ backend: the last temperature and CO2 value."""

    def __init__(self):
        # both stay None until a backend has obtained a reading
        self.temp = None
        self.co2 = None


class AirQLocal(AirQBase):
    """Read temperature and CO2 from a locally attached USB sensor via CO2Meter."""

    # vendor:product id searched for in the kernel log
    # alternative marker '[Holtek USB-zyTemp]' - unknown if all systems print this in the log
    USB_ID = '04D9:A052'
    _HIDRAW_PATTERN = re.compile(r'hidraw\d{1,2}')

    def __init__(self, device: str = None):
        """Locate the device and, if found, read one data set.

        Args:
            device: path to the hidraw device; searched for in the kernel log when None
                or when the path does not exist
        """
        super().__init__()
        self.device = device
        # CO2Meter instance, created lazily on first read and then reused
        self._sensor = None
        logging.debug('start AirQ in local mode')
        if self.get_usb_dev():
            self.get_local_data()

    def get_usb_dev(self) -> bool:
        """search the airq usb device

        An already known and existing ``self.device`` is accepted as is. Otherwise
        the kernel log is scanned for the USB id and the hidraw name found there is
        stored in ``self.device``.

        Returns:
            bool: True if device path found
        """
        if self.device is not None and path.exists(self.device):
            logging.debug(f'device path allready kown: {self.device=}')
            return True

        command = ["journalctl", "-k", "--no-pager"]  # alternative: dmesg # maybe usefull lsusb

        ## get kernel logs
        try:
            result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
        except OSError as err:
            # e.g. journalctl is not installed; treat like an unsuccessful search
            logging.debug(f'search for usb device was not successfull, see {err=}')
            return False
        if result.returncode != 0:
            logging.debug(f'search for usb device was not successfull, see {result.returncode=} - {result.stderr=}')
            return False

        ## search in log for usb device
        # newest entry first: after a re-plug the hidraw number may differ from older entries
        matches = []
        for line in reversed(result.stdout.splitlines()):
            if self.USB_ID not in line:
                continue
            matches = self._HIDRAW_PATTERN.findall(line)
            if matches:
                logging.debug(f'found usb device mapping in {line=}')
                break

        ## search in found log line the usb name and verify path exists
        if matches:
            dev_path = f'/dev/{matches[0]}'
            if path.exists(dev_path):
                logging.debug(f'usb device is under path {dev_path}')
                self.device = dev_path
                return True

        logging.error('usb device not found!')
        return False

    def get_local_data(self) -> bool:
        """Poll the sensor until it delivers both 'co2' and 'temperature'.

        Blocks (polling once per second) until both keys are present, then stores
        them in ``self.co2`` and ``self.temp``.

        Returns:
            bool: False if no device path is known, True once data was stored
        """
        if self.device is None:
            return False

        # load only if needed (in client mode not needed)
        from CO2Meter import CO2Meter

        # reuse one sensor object; the server loop calls this method over and over
        if getattr(self, '_sensor', None) is None:
            self._sensor = CO2Meter(self.device)

        while True:
            logging.debug('collect data ...')
            resp = self._sensor.get_data()
            logging.debug(f'data collected {resp=}')
            if all(key in resp for key in ('co2', 'temperature')):
                break
            sleep(1)

        self.co2 = resp['co2']
        self.temp = resp['temperature']
        return True


class AirQServer(AirQLocal):
    """Read the local sensor continuously and publish the values via MQTT."""

    def __init__(self, port: int = 4554, device: str = None, ):
        """Start the reader and the publisher thread if the device is found.

        Args:
            port: port of the MQTT broker on localhost
            device: path to the hidraw device, see AirQLocal
        """
        super().__init__(device=device)
        self.threads = list()
        self.port = port
        # set by stop_treads() to let both worker loops finish
        self._stop_event = threading.Event()
        logging.debug('start AirQ in server mode')
        thread_data = threading.Thread(target=self.get_local_data_loop, group=None)
        thread_server = threading.Thread(target=self.run_server, group=None)
        if self.get_usb_dev():
            try:
                # only started threads are tracked, join() on an unstarted thread raises
                thread_data.start()
                self.threads.append(thread_data)
                sleep(5)
                thread_server.start()
                self.threads.append(thread_server)
            except BaseException:
                # e.g. Ctrl-C during start-up: do not leave a worker running behind
                self.stop_treads()
                raise

    def stop_treads(self):
        """Signal both worker loops to end and wait for the started threads."""
        self._stop_event.set()
        for thread in self.threads:
            logging.debug('stop threads')
            thread.join()

    def join_threads(self):
        """Block until all started worker threads have ended."""
        for thread in self.threads:
            thread.join()

    def get_local_data_loop(self):
        """Refresh ``self.co2`` / ``self.temp`` once per second until stopped."""
        while not self._stop_event.is_set():
            self.get_local_data()
            self._stop_event.wait(1)

    def run_server(self):
        """Publish the current values as retained messages every 10 seconds."""
        import paho.mqtt.client as mqtt
        client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="AirQServer")
        client.connect("localhost", self.port)
        # network loop in the background handles keepalive and reconnects
        client.loop_start()
        logging.debug(f'run AirQ server on port {self.port}')
        try:
            # wait() returns True as soon as a stop was requested
            while not self._stop_event.wait(10):
                logging.debug(f'set {self.temp=}, {self.co2=}')
                client.publish("airq/temp", self.temp, retain=True)
                client.publish("airq/co2", self.co2, retain=True)
        finally:
            client.disconnect()
            client.loop_stop()


class AirQClient(AirQBase):
    """Fetch one value (co2 or temp) from the MQTT broker fed by AirQServer."""

    # seconds to wait for the (retained) message after subscribing
    RESPONSE_TIMEOUT = 1

    def __init__(self, server: str = 'localhost', port: int = 4554, mode=None):
        """Connect, subscribe to ``airq/<mode>`` and store the first answer.

        Args:
            server: ip or hostname of the MQTT broker
            port: port of the MQTT broker
            mode: 'co2' or 'temp', selects the topic and the attribute to fill
        """
        super().__init__()
        self.server = server
        self.port = port
        logging.debug('start AirQ in client mode')
        self.received = {}
        self.mode = mode
        # set by on_message so run_client can return as soon as the answer is there
        self._message_seen = threading.Event()
        self.run_client()

    def on_message(self, client, userdata, msg):
        """paho callback: store the decoded payload by topic and disconnect."""
        self.received[msg.topic] = msg.payload.decode()
        logging.debug(f"get {msg.topic=} with {self.received[msg.topic]=}")
        self._message_seen.set()
        client.disconnect()  # end with first response

    def run_client(self):
        """Subscribe to the topic of ``self.mode`` and take over the received value.

        If nothing arrives within RESPONSE_TIMEOUT the value stays None; the
        resource classes turn that into an UNKNOWN check result.
        """
        import paho.mqtt.client as mqtt
        topic = f"airq/{self.mode}"
        # unique id per mode and process: brokers drop the older session when two
        # clients use the same id, which would hit parallel co2 / temp checks
        client_id = f"AirQClient-{self.mode}-{os.getpid()}"
        client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id=client_id)
        client.on_message = self.on_message
        client.connect(self.server, self.port)
        client.subscribe(topic)
        client.loop_start()
        self._message_seen.wait(self.RESPONSE_TIMEOUT)
        client.loop_stop()

        value = self.received.get(topic)
        if value is None:
            logging.error(f'no message received for topic {topic}')
        if self.mode == 'co2':
            self.co2 = value
        elif self.mode == 'temp':
            self.temp = value


def _to_number(value, cast, label: str):
    """Convert a raw reading with ``cast`` or raise CheckError (reported as UNKNOWN)."""
    try:
        return cast(value)
    except (TypeError, ValueError) as err:
        raise nagiosplugin.CheckError(f'no valid {label} value available: {value!r}') from err


class AirQCO2Resource(nagiosplugin.Resource):
    """nagiosplugin resource delivering the CO2 value of an AirQ object."""

    def __init__(self, airq) -> None:
        self.airq = airq

    def probe(self) -> nagiosplugin.Metric:
        """read from airq the co2 value

        Returns:
            nagiosplugin.Metric: metric element

        Raises:
            nagiosplugin.CheckError: if no usable co2 value is available
        """
        return nagiosplugin.Metric(name='airq_co2',
                                   value=_to_number(self.airq.co2, int, 'co2'),
                                   uom='ppm',
                                   context='scalar_context')


class AirQTempResource(nagiosplugin.Resource):
    """nagiosplugin resource delivering the temperature value of an AirQ object."""

    def __init__(self, airq) -> None:
        self.airq = airq

    def probe(self) -> nagiosplugin.Metric:
        """read from airq the temperature value

        Returns:
            nagiosplugin.Metric: metric element

        Raises:
            nagiosplugin.CheckError: if no usable temperature value is available
        """
        return nagiosplugin.Metric(name='airq_temp',
                                   value=_to_number(self.airq.temp, float, 'temperature'),
                                   uom='°C',
                                   context='scalar_context')


def parse_args() -> argparse.Namespace:
    """evaluates given arguments

    Returns:
        argparse.Namespace: Namespace object with all arguments
            (use: args.long_name_of_argument)
    """
    argp = argparse.ArgumentParser(description=__doc__)

    # Default args
    argp.add_argument('-v', '--verbose', action='count', default=0,
                      help='increase output verbosity (use up to 3 times)')
    argp.add_argument('-H', '--hostname', default='localhost',
                      help='IP address or hostname of device to query')
    argp.add_argument('-t', '--timeout', type=int, default=30,
                      help='abort execution after TIMEOUT seconds')
    # NOTE(review): the default 'local' and the choice 'client' are not handled as
    # check modes by main(); kept unchanged to not alter the command line interface
    argp.add_argument('-m', '--check_mode',
                      choices=[
                          'co2',
                          'temp',
                          'server',
                          'client',
                      ],
                      default='local',
                      help='check mode to run')

    # Nagios args / see https://nagios-plugins.org/doc/guidelines.html#THRESHOLDFORMAT
    argp.add_argument('-w', '--warning', default=':80',
                      help='warning threshold')
    argp.add_argument('-c', '--critical', default=':90',
                      help='critical threshold')

    ## AirQ args
    argp.add_argument('-A', '--airq_mode',
                      choices=[
                          'local',
                          'client',
                      ],
                      default='local',
                      help='set mode for airq class')
    argp.add_argument('-S', '--airq_server', default='localhost',
                      help='ip or hostname of airq server')
    argp.add_argument('-P', '--airq_port', type=int, default=4554,
                      help='port of airq server')
    argp.add_argument('-D', '--airq_device', default=None,
                      help='path to AirQ device')

    args = argp.parse_args()
    return args


def _run_server(args: argparse.Namespace) -> None:
    """Run the AirQ server until its threads end or Ctrl-C is pressed."""
    airq = None
    try:
        airq = AirQServer(port=int(args.airq_port), device=args.airq_device)
        # keep the main thread here so Ctrl-C reaches the handler below
        airq.join_threads()
    except KeyboardInterrupt:
        # airq is still None if the interrupt hit the constructor,
        # which cleans up after itself in that case
        if airq is not None:
            airq.stop_treads()


# @nagiosplugin.guarded(verbose=0)
def main():
    """Entry point: run the server or execute the selected Nagios check.

    Raises:
        nagiosplugin.CheckError: if the check mode is none of co2, temp, server
    """
    args = parse_args()
    if args.verbose >= 3:
        logging.getLogger().setLevel(logging.DEBUG)

    if args.check_mode == 'server':
        _run_server(args)
        return

    # check mode -> (resource class, check name)
    checks = {
        'co2': (AirQCO2Resource, "CO2"),
        'temp': (AirQTempResource, "Temperature"),
    }
    # validate before the sensor or the broker is touched
    if args.check_mode not in checks:
        raise nagiosplugin.CheckError(f'Unknown check mode: {args.check_mode}')

    # decide which data source is used based on airq_mode
    if args.airq_mode == 'local':
        airq = AirQLocal(device=args.airq_device)
    elif args.airq_mode == 'client':
        airq = AirQClient(server=args.airq_server, port=int(args.airq_port), mode=args.check_mode)
    else:
        logging.error('Unknown airq mode')
        # 3 = UNKNOWN in the Nagios plugin exit code convention (0 would mean OK)
        raise SystemExit(3)

    resource_cls, check_name = checks[args.check_mode]
    check = nagiosplugin.Check(
        resource_cls(airq=airq),
        nagiosplugin.ScalarContext(name='scalar_context', warning=args.warning, critical=args.critical),
        nagiosplugin.Summary())
    check.name = check_name
    check.main(args.verbose, args.timeout)


if __name__ == '__main__':
    main()