#!/usr/bin/env python3
"""AirQ (temp & CO2) python check"""

# Dependencies:
#   - pip3 install nagiosplugin
#   - pip3 install argparse
#   - pip3 install git+https://github.com/heinemml/CO2Meter
#     or
#   - pip3 install git+https://git.ao-it.net/python/CO2Meter

__version__ = '0.4.0'
__author__ = 'anima'

# imports
import logging
import argparse
import json
from time import sleep
from os import path
import re
import subprocess
import socket
import threading

import nagiosplugin

# log settings
logging.basicConfig(format='[%(asctime)s] %(levelname)s %(message)s', level=logging.INFO)


class AirQBase:
    """Common state shared by every AirQ data source."""

    def __init__(self):
        # latest readings; both stay None until a source has delivered data
        self.temp = None
        self.co2 = None


class AirQLocal(AirQBase):
    """Read CO2 and temperature from a locally attached AirQ USB device."""

    # USB vendor:product id as it is searched for in the kernel log
    _USB_ID = '04D9:A052'
    # alternative search string '[Holtek USB-zyTemp]' - unknown if all systems print this in the log
    # name of the hidraw node mentioned in the kernel log line
    _HIDRAW_PATTERN = re.compile(r'hidraw\d+')

    def __init__(self, device: str | None = None):
        """Locate the device and, if one is found, take an initial reading.

        Args:
            device: path of the hidraw device; searched in the kernel log when
                None or when the given path does not exist.
        """
        super().__init__()
        self.device = device
        # sensor handle is created lazily and reused, see _get_sensor()
        self._sensor = None
        self._sensor_device = None
        logging.debug('start AirQ in local mode')
        if self.get_usb_dev():
            self.get_local_data()

    def get_usb_dev(self) -> bool:
        """search the airq usb device

        Keeps an already known, existing device path. Otherwise the kernel log
        is scanned from newest to oldest entry so that the mapping of the most
        recent plug-in wins, and the first mentioned hidraw node that exists
        under /dev is stored in ``self.device``.

        Returns:
            bool: True if device path found
        """
        if self.device is not None and path.exists(self.device):
            logging.debug(f'device path allready kown: {self.device=}')
            return True

        command = ["journalctl", "-k", "--no-pager"]  # alternative: dmesg, maybe useful: lsusb

        ## get kernel logs
        try:
            result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
        except OSError as err:
            # journalctl is missing or not executable on this system
            logging.debug(f'search for usb device was not successfull, see {err=}')
            return False
        if result.returncode != 0:
            logging.debug(f'search for usb device was not successfull, see {result.returncode=} - {result.stderr=}')
            return False

        ## search in log for usb device, newest entry first
        for line in reversed(result.stdout.splitlines()):
            if self._USB_ID not in line:
                continue
            # a line may mention the usb id without naming a hidraw node;
            # such lines yield no candidates and the search simply continues
            for node in self._HIDRAW_PATTERN.findall(line):
                dev_path = f'/dev/{node}'
                if path.exists(dev_path):
                    logging.debug(f'found usb device mapping in {line=}')
                    logging.debug(f'usb device is under path {dev_path}')
                    self.device = dev_path
                    return True

        logging.error('usb device not found!')
        return False

    def _get_sensor(self):
        """Return the CO2Meter bound to ``self.device``, creating it on first use.

        The handle is reused between calls so that polling in a loop does not
        open the device again on every iteration.
        """
        # load only if needed (client mode works without the CO2Meter package)
        from CO2Meter import CO2Meter

        if self._sensor is None or self._sensor_device != self.device:
            self._sensor = CO2Meter(self.device)
            self._sensor_device = self.device
        return self._sensor

    def get_local_data(self) -> bool:
        """Poll the sensor until it reports both CO2 and temperature.

        Blocks (polling once per second) until the sensor response contains the
        keys 'co2' and 'temperature', then stores them in ``self.co2`` and
        ``self.temp``.

        Returns:
            bool: False if no device is known, True once both values are stored
        """
        if self.device is None:
            return False

        sensor = self._get_sensor()
        resp = {}
        while not all(key in resp for key in ('co2', 'temperature')):
            logging.debug('collect data ...')
            resp = sensor.get_data()
            logging.debug(f'data collected {resp=}')
            sleep(1)

        self.co2 = resp['co2']
        self.temp = resp['temperature']
        return True


class AirQServer(AirQLocal):
    """Serve the latest local readings as JSON over a plain TCP socket."""

    # seconds accept() waits before the stop request is checked again
    _ACCEPT_TIMEOUT = 1.0
    # seconds stop_treads() waits for each thread to finish
    _JOIN_TIMEOUT = 5.0

    def __init__(self, port: int = 4554, device: str | None = None, ):
        """Take an initial reading and start the polling and server threads.

        No thread is started when the USB device cannot be found.

        Args:
            port: TCP port to listen on (all interfaces).
            device: path of the hidraw device, see AirQLocal.
        """
        super().__init__(device=device)
        self.threads = list()
        self.port = int(port)  # tolerate a port given as string
        self._stop_event = threading.Event()
        logging.debug('start AirQ in server mode')

        if not self.get_usb_dev():
            return

        # the poller may block inside the sensor read, so it must never keep
        # the process alive on its own; the server thread stays non-daemon
        thread_data = threading.Thread(target=self.get_local_data_loop, daemon=True)
        thread_server = threading.Thread(target=self.run_server)
        for thread in (thread_data, thread_server):
            thread.start()
            self.threads.append(thread)

    def stop_treads(self):
        """Ask both threads to stop and wait a bounded time for each of them."""
        self._stop_event.set()
        for thread in self.threads:
            logging.debug('stop threads')
            thread.join(timeout=self._JOIN_TIMEOUT)

    # correctly spelled alias; the original name is kept for existing callers
    stop_threads = stop_treads

    def get_local_data_loop(self):
        """Refresh the readings until a stop is requested."""
        while not self._stop_event.is_set():
            if not self.get_local_data():
                # no device known: wait instead of spinning at full speed
                self._stop_event.wait(1)

    def run_server(self):
        """Answer every incoming connection with the current readings as JSON.

        Each client receives one JSON object with the keys 'co2' and 'temp',
        after which the connection is closed. The loop ends once
        stop_treads() has been called.
        """
        with socket.socket() as listener:
            # allow an immediate restart on the same port
            listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            listener.bind(('', self.port))
            listener.listen(1)
            # KeyboardInterrupt is only raised in the main thread, so this
            # thread polls the stop event between accept() attempts instead
            listener.settimeout(self._ACCEPT_TIMEOUT)
            logging.debug(f'run AirQ server on port {self.port}')

            while not self._stop_event.is_set():
                try:
                    conn, addr = listener.accept()
                except socket.timeout:
                    continue
                with conn:
                    data = {"co2": self.co2, "temp": self.temp}
                    logging.debug(f'connection from {addr}, send {data=}')
                    try:
                        conn.sendall(json.dumps(data).encode())
                    except OSError as err:
                        # one broken client must not take the server down
                        logging.warning(f'could not answer {addr}: {err}')

        logging.debug('stop AirQ server')


class AirQClient(AirQBase):
    """Fetch the readings from a running AirQServer."""

    def __init__(self, server: str = 'localhost', port: int = 4554, timeout: float | None = None):
        """Connect to the server once and store the readings it returns.

        Args:
            server: ip or hostname of the AirQ server.
            port: TCP port of the AirQ server.
            timeout: seconds to wait for each socket operation; None or 0
                waits without limit.
        """
        super().__init__()
        self.server = server
        self.port = int(port)  # tolerate a port given as string
        self.timeout = timeout if timeout else None
        logging.debug('start AirQ in client mode')
        self.run_client()

    def run_client(self):
        """Request the current readings and store them on this instance.

        Raises:
            OSError: connection failed or timed out.
            ValueError: the response is not valid JSON.
        """
        with socket.socket() as sock:
            sock.settimeout(self.timeout)
            sock.connect((self.server, self.port))
            logging.debug(f'connected to {self.server=} on {self.port=}')
            # the server closes the connection after sending, so read to EOF
            # instead of trusting a single recv() to deliver everything
            chunks = []
            while chunk := sock.recv(1024):
                chunks.append(chunk)

        data = json.loads(b''.join(chunks).decode())
        logging.debug(f'server response: {data=}')

        if 'co2' in data:
            self.co2 = data['co2']
        if 'temp' in data:
            self.temp = data['temp']


class AirQCO2Resource(nagiosplugin.Resource):
    """nagiosplugin resource exposing the CO2 reading of an AirQ object."""

    def __init__(self, airq) -> None:
        self.airq = airq

    def probe(self) -> nagiosplugin.Metric:
        """read from airq the co2 value

        Returns:
            nagiosplugin.Metric: metric 'airq_co2' in ppm

        Raises:
            nagiosplugin.CheckError: no CO2 reading is available
        """
        if self.airq.co2 is None:
            # report UNKNOWN instead of failing with a TypeError on int(None)
            raise nagiosplugin.CheckError('no CO2 reading available')
        return nagiosplugin.Metric(name='airq_co2', value=int(self.airq.co2), uom='ppm', context='scalar_context')


class AirQTempResource(nagiosplugin.Resource):
    """nagiosplugin resource exposing the temperature reading of an AirQ object."""

    def __init__(self, airq) -> None:
        self.airq = airq

    def probe(self) -> nagiosplugin.Metric:
        """read from airq the temperature value

        Returns:
            nagiosplugin.Metric: metric 'airq_temp' in °C

        Raises:
            nagiosplugin.CheckError: no temperature reading is available
        """
        if self.airq.temp is None:
            # report UNKNOWN instead of failing with a TypeError on float(None)
            raise nagiosplugin.CheckError('no temperature reading available')
        return nagiosplugin.Metric(name='airq_temp', value=float(self.airq.temp), uom='°C', context='scalar_context')


def parse_args() -> argparse.Namespace:
    """evaluates given arguments

    Returns:
        argparse.Namespace: Namespace object with all arguments
            (use: args.long_name_of_argument)
    """
    argp = argparse.ArgumentParser(description=__doc__)

    # Default args
    argp.add_argument('-v', '--verbose', action='count', default=0,
                      help='increase output verbosity (use up to 3 times)')
    argp.add_argument('-H', '--hostname', default='localhost',
                      help='IP address or hostname of device to query')
    argp.add_argument('-t', '--timeout', type=int, default=30,
                      help='abort execution after TIMEOUT seconds')
    # NOTE(review): the default 'local' and the choice 'client' are not runnable
    # check modes; main() reports UNKNOWN for them - confirm the intended default
    argp.add_argument('-m', '--check_mode',
                      choices=[
                          'co2',
                          'temp',
                          'server',
                          'client',
                      ],
                      default='local',
                      help='check mode to run')

    # Nagios args / see https://nagios-plugins.org/doc/guidelines.html#THRESHOLDFORMAT
    argp.add_argument('-w', '--warning', default=':80',
                      help='warning threshold')
    argp.add_argument('-c', '--critical', default=':90',
                      help='critical threshold')

    ## AirQ args
    argp.add_argument('-A', '--airq_mode',
                      choices=[
                          'local',
                          'client',
                      ],
                      default='local',
                      help='set mode for airq class')
    argp.add_argument('-S', '--airq_server', default='localhost',
                      help='ip or hostname of airq server')
    # type=int: a port given on the command line must reach the socket as int
    argp.add_argument('-P', '--airq_port', type=int, default=4554,
                      help='port of airq server')
    argp.add_argument('-D', '--airq_device', default=None,
                      help='path to AirQ device')

    return argp.parse_args()


def _exit_unknown(message: str):
    """Report *message* as Nagios UNKNOWN and terminate with exit status 3."""
    logging.error(message)
    print(f'AIRQ UNKNOWN - {message}')
    raise SystemExit(3)


def _run_server(args: argparse.Namespace) -> None:
    """Run the AirQ server until it is interrupted or one of its threads ends."""
    server = None
    try:
        server = AirQServer(port=args.airq_port, device=args.airq_device)
        # stay in the main thread: it is the only one that receives Ctrl+C
        while server.threads and all(thread.is_alive() for thread in server.threads):
            sleep(0.5)
    except KeyboardInterrupt:
        logging.debug('stop AirQ server')
    finally:
        # server is still None when the constructor itself was interrupted
        if server is not None:
            server.stop_treads()


# NOTE: @nagiosplugin.guarded(verbose=0) is intentionally not applied here;
# failures are mapped to UNKNOWN explicitly below.
def main():
    """Entry point: run the server or execute the selected Nagios check."""
    args = parse_args()
    if args.verbose >= 3:
        logging.getLogger().setLevel(logging.DEBUG)

    if args.check_mode == 'server':
        _run_server(args)
        return

    # decide which check will be run based on check_mode
    checks = {
        'co2': (AirQCO2Resource, "CO2"),
        'temp': (AirQTempResource, "Temperature"),
    }
    if args.check_mode not in checks:
        # reject an unusable mode before any device or network access
        _exit_unknown(f'Unknown check mode: {args.check_mode}')
    resource_cls, check_name = checks[args.check_mode]

    try:
        if args.airq_mode == 'local':
            airq = AirQLocal(device=args.airq_device)
        elif args.airq_mode == 'client':
            airq = AirQClient(server=args.airq_server, port=args.airq_port,
                              timeout=args.timeout or None)
        else:
            _exit_unknown('Unknown airq mode')
    except (OSError, ValueError, ImportError) as err:
        # connection problems, broken server response or missing CO2Meter package
        _exit_unknown(f'could not read AirQ data: {err}')

    check = nagiosplugin.Check(
        resource_cls(airq=airq),
        nagiosplugin.ScalarContext(name='scalar_context', warning=args.warning, critical=args.critical),
        nagiosplugin.Summary())
    check.name = check_name
    check.main(args.verbose, args.timeout)


if __name__ == '__main__':
    main()