diff --git a/checks/check_airq.py b/checks/check_airq.py new file mode 100755 index 0000000..a52b6f9 --- /dev/null +++ b/checks/check_airq.py @@ -0,0 +1,220 @@ +#!/usr/bin/env python3 +""" Template python check""" +"""dependencys: +- pip3 install nagiosplugin +- pip3 install argparse +- pip3 install git+https://github.com/heinemml/CO2Meter +or +- pip3 install git+https://git.ao-it.net/python/CO2Meter +""" + +__version__ = '0.2.0' +__author__ = 'anima' + +# imports +import logging +import argparse +import nagiosplugin +from time import sleep +from os import path +import re +import subprocess +import socket +import json + +# log settings +logging.basicConfig(format='[%(asctime)s] %(levelname)s %(message)s', level=logging.INFO) + + +class AirQ: + def __init__(self, run_mode: str, server: str = None, port: int = 4554): + self.temp = None + self.co2 = None + self.device = None + self.server = server + self.port = port + + match run_mode: + case 'local': + self.get_usb_dev() + self.get_local_data() + case 'server': + self.get_usb_dev() + self.run_server() + case 'client': + self.run_client() + + def run_server(self): + s = socket.socket() + s.bind(('', self.port)) + s.listen(1) + while True: + self.get_local_data() + c, addr = s.accept() + data = {"co2": self.co2, "temp": self.temp} + c.send(str(json.dumps(data)).encode()) + c.close() + s.close() + + def run_client(self): + s = socket.socket() + s.connect(('127.0.0.1', self.port)) + data = json.loads(s.recv(1024).decode()) + if 'co2' in data.keys(): + self.co2 = data['co2'] + if 'temp' in data.keys(): + self.temp = data['temp'] + + def get_usb_dev(self) -> bool: + """search the airq usb device + + Returns: + bool: True if device path found + """ + + search_string = '04D9:A052' + # search_string = '04d9:a052' # maybe lowercase on some systems ? + # search_string = '[Holtek USB-zyTemp]' # alt but unknown if all systems print this in log + matches = list() + command = ["journalctl", "-k", "--no-pager"] # maybe --reverse to get newst first ? # alternative: dmesg + pattern = re.compile(r'hidraw\d{2}') + ## get kernel logs + result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + if result.returncode != 0: + return False + + ## search in log for usb device + lines = result.stdout.splitlines() + for line in lines: + if search_string in line: + matches = pattern.findall(line) + break + + ## search in found log line the usb name and verify path exists + if len(matches) > 0: + dev_path = f'/dev/{matches[0]}' + if path.exists(f'/dev/{matches[0]}'): + self.device = f'/dev/{matches[0]}' + return True + else: return False + + def get_local_data(self) -> bool: + # load only if need (in client mode not needed) + from CO2Meter import CO2Meter + + if self.device is None: + return False + resp = {} + sensor = CO2Meter(self.device) + while not all(key in resp.keys() for key in ['co2', 'temperature']): + sleep(1) + resp = sensor.get_data() + + self.co2 = resp['co2'] + self.temp = resp['temperature'] + return True + + +class AirQCO2Resource(nagiosplugin.Resource): + def __init__(self, airq) -> None: + self.airq = airq + + def probe(self) -> list: + """read from airq the co2 value + + Returns: + list[nagisplugin.Metric]: multiple metric elements (yield) + """ + return nagiosplugin.Metric(name='airq_co2', value=int(self.airq.co2), context='scalar_context') + + +class AirQTempResource(nagiosplugin.Resource): + def __init__(self, airq) -> None: + self.airq = airq + + def probe(self) -> list: + """read from airq the temperature value + + Returns: + list[nagisplugin.Metric]: multiple metric elements (yield) + """ + return nagiosplugin.Metric(name='airq_co2', value=float(self.airq.temp), context='scalar_context') + + +def parse_args() -> argparse.Namespace: + """evaluates given arguments + + Returns: + argsparse.Namespace: Namespace Object with all arguments insert (use: args.long_name_of_argument) + """ + argp = argparse.ArgumentParser(description=__doc__) + # Default args + argp.add_argument('-v', '--verbose', action='count', default=0, + help='increase output verbosity (use up to 3 times)') + argp.add_argument('-H', '--hostname', default='localhost', + help='IP address or hostname of device to query') + argp.add_argument('-t', '--timeout', default=30, + help='abort execution after TIMEOUT seconds') + argp.add_argument('-m', '--check_mode', + choices=[ + 'co2', + 'temp', + 'server', + 'client', + ], default='local', + help='check mode to run') + + # Nagios args / see https://nagios-plugins.org/doc/guidelines.html#THRESHOLDFORMAT + argp.add_argument('-w', '--warning', default=':80', + help='warning threshold') + argp.add_argument('-c', '--critical', default=':90', + help='critical threshold') + + ## AirQ args + argp.add_argument('-A', '--airq_mode', + choices=[ + 'local', + 'client', + ], default='local', + help='set mode for airq class') + argp.add_argument('-S', '--airq_server', default='localhost', + help='ip or hostname of airq server') + argp.add_argument('-P', '--airq_port', default=4554, + help='port of airq server') + args = argp.parse_args() + return args + +# @nagiosplugin.guarded(verbose=0) +def main(): + args = parse_args() + if args.verbose >= 3: + logging.getLogger().setLevel(logging.DEBUG) + + if args.check_mode != 'server': + airq = AirQ(run_mode=args.airq_mode, server=args.airq_server, port=args.airq_port) + check = None + + # dice which check will be run bases on check_mode + match args.check_mode: + case 'co2': + check = nagiosplugin.Check( + AirQCO2Resource(airq=airq), + nagiosplugin.ScalarContext(name='scalar_context', warning=args.warning, critical=args.critical), + nagiosplugin.Summary()) + check.name = "CO2" + case 'temp': + check = nagiosplugin.Check( + AirQTempResource(airq=airq), + nagiosplugin.ScalarContext(name='scalar_context', warning=args.warning, critical=args.critical), + nagiosplugin.Summary()) + check.name = "Temperature" + case 'server': + airq = AirQ(run_mode=args.check_mode, server=args.airq_server, port=args.airq_port) + case _: + raise nagiosplugin.CheckError(f'Unknown check mode: {args.check_mode}') + + if check is not None: + check.main(args.verbose, args.timeout) + +if __name__ == '__main__': + main() \ No newline at end of file