#!/usr/bin/env python3 """AirQ (temp & CO2) python check""" """dependencys: - pip3 install nagiosplugin - pip3 install argparse - pip3 install git+https://github.com/heinemml/CO2Meter or - pip3 install git+https://git.ao-it.net/python/CO2Meter """ __version__ = '0.2.0' __author__ = 'anima' # imports import logging import argparse import nagiosplugin from time import sleep from os import path import re import subprocess import socket import json # log settings logging.basicConfig(format='[%(asctime)s] %(levelname)s %(message)s', level=logging.INFO) class AirQ: def __init__(self, run_mode: str, server: str = None, port: int = 4554): self.temp = None self.co2 = None self.device = None self.server = server self.port = port match run_mode: case 'local': self.get_usb_dev() self.get_local_data() case 'server': self.get_usb_dev() self.run_server() case 'client': self.run_client() def run_server(self): s = socket.socket() s.bind(('', self.port)) s.listen(1) while True: self.get_local_data() c, addr = s.accept() data = {"co2": self.co2, "temp": self.temp} c.send(str(json.dumps(data)).encode()) c.close() s.close() def run_client(self): s = socket.socket() s.connect(('127.0.0.1', self.port)) data = json.loads(s.recv(1024).decode()) if 'co2' in data.keys(): self.co2 = data['co2'] if 'temp' in data.keys(): self.temp = data['temp'] def get_usb_dev(self) -> bool: """search the airq usb device Returns: bool: True if device path found """ search_string = '04D9:A052' # search_string = '04d9:a052' # maybe lowercase on some systems ? # search_string = '[Holtek USB-zyTemp]' # alt but unknown if all systems print this in log matches = list() command = ["journalctl", "-k", "--no-pager"] # maybe --reverse to get newst first ? # alternative: dmesg pattern = re.compile(r'hidraw\d{2}') ## get kernel logs result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) if result.returncode != 0: return False ## search in log for usb device lines = result.stdout.splitlines() for line in lines: if search_string in line: matches = pattern.findall(line) break ## search in found log line the usb name and verify path exists if len(matches) > 0: dev_path = f'/dev/{matches[0]}' if path.exists(f'/dev/{matches[0]}'): self.device = f'/dev/{matches[0]}' return True else: return False def get_local_data(self) -> bool: # load only if need (in client mode not needed) from CO2Meter import CO2Meter if self.device is None: return False resp = {} sensor = CO2Meter(self.device) while not all(key in resp.keys() for key in ['co2', 'temperature']): sleep(1) resp = sensor.get_data() self.co2 = resp['co2'] self.temp = resp['temperature'] return True class AirQCO2Resource(nagiosplugin.Resource): def __init__(self, airq) -> None: self.airq = airq def probe(self) -> list: """read from airq the co2 value Returns: list[nagisplugin.Metric]: multiple metric elements (yield) """ return nagiosplugin.Metric(name='airq_co2', value=int(self.airq.co2), context='scalar_context') class AirQTempResource(nagiosplugin.Resource): def __init__(self, airq) -> None: self.airq = airq def probe(self) -> list: """read from airq the temperature value Returns: list[nagisplugin.Metric]: multiple metric elements (yield) """ return nagiosplugin.Metric(name='airq_co2', value=float(self.airq.temp), context='scalar_context') def parse_args() -> argparse.Namespace: """evaluates given arguments Returns: argsparse.Namespace: Namespace Object with all arguments insert (use: args.long_name_of_argument) """ argp = argparse.ArgumentParser(description=__doc__) # Default args argp.add_argument('-v', '--verbose', action='count', default=0, help='increase output verbosity (use up to 3 times)') argp.add_argument('-H', '--hostname', default='localhost', help='IP address or hostname of device to query') argp.add_argument('-t', '--timeout', default=30, help='abort execution after TIMEOUT seconds') argp.add_argument('-m', '--check_mode', choices=[ 'co2', 'temp', 'server', 'client', ], default='local', help='check mode to run') # Nagios args / see https://nagios-plugins.org/doc/guidelines.html#THRESHOLDFORMAT argp.add_argument('-w', '--warning', default=':80', help='warning threshold') argp.add_argument('-c', '--critical', default=':90', help='critical threshold') ## AirQ args argp.add_argument('-A', '--airq_mode', choices=[ 'local', 'client', ], default='local', help='set mode for airq class') argp.add_argument('-S', '--airq_server', default='localhost', help='ip or hostname of airq server') argp.add_argument('-P', '--airq_port', default=4554, help='port of airq server') args = argp.parse_args() return args # @nagiosplugin.guarded(verbose=0) def main(): args = parse_args() if args.verbose >= 3: logging.getLogger().setLevel(logging.DEBUG) if args.check_mode != 'server': airq = AirQ(run_mode=args.airq_mode, server=args.airq_server, port=args.airq_port) check = None # dice which check will be run bases on check_mode match args.check_mode: case 'co2': check = nagiosplugin.Check( AirQCO2Resource(airq=airq), nagiosplugin.ScalarContext(name='scalar_context', warning=args.warning, critical=args.critical), nagiosplugin.Summary()) check.name = "CO2" case 'temp': check = nagiosplugin.Check( AirQTempResource(airq=airq), nagiosplugin.ScalarContext(name='scalar_context', warning=args.warning, critical=args.critical), nagiosplugin.Summary()) check.name = "Temperature" case 'server': airq = AirQ(run_mode=args.check_mode, server=args.airq_server, port=args.airq_port) case _: raise nagiosplugin.CheckError(f'Unknown check mode: {args.check_mode}') if check is not None: check.main(args.verbose, args.timeout) if __name__ == '__main__': main()