From 6fcb73ea08fc0b86e46f9575eb46f541bd1e8412 Mon Sep 17 00:00:00 2001
From: anima
Date: Thu, 27 Feb 2025 01:11:07 +0100
Subject: [PATCH] inital version of airq check
---
checks/check_airq.py | 220 +++++++++++++++++++++++++++++++++++++++++++
1 file changed, 220 insertions(+)
create mode 100755 checks/check_airq.py
diff --git a/checks/check_airq.py b/checks/check_airq.py
new file mode 100755
index 0000000..a52b6f9
--- /dev/null
+++ b/checks/check_airq.py
@@ -0,0 +1,220 @@
+#!/usr/bin/env python3
+""" Template python check"""
+"""dependencys:
+- pip3 install nagiosplugin
+- pip3 install argparse
+- pip3 install git+https://github.com/heinemml/CO2Meter
+or
+- pip3 install git+https://git.ao-it.net/python/CO2Meter
+"""
+
+__version__ = '0.2.0'
+__author__ = 'anima'
+
+# imports
+import logging
+import argparse
+import nagiosplugin
+from time import sleep
+from os import path
+import re
+import subprocess
+import socket
+import json
+
+# log settings
+logging.basicConfig(format='[%(asctime)s] %(levelname)s %(message)s', level=logging.INFO)
+
+
+class AirQ:
+ def __init__(self, run_mode: str, server: str = None, port: int = 4554):
+ self.temp = None
+ self.co2 = None
+ self.device = None
+ self.server = server
+ self.port = port
+
+ match run_mode:
+ case 'local':
+ self.get_usb_dev()
+ self.get_local_data()
+ case 'server':
+ self.get_usb_dev()
+ self.run_server()
+ case 'client':
+ self.run_client()
+
+ def run_server(self):
+ s = socket.socket()
+ s.bind(('', self.port))
+ s.listen(1)
+ while True:
+ self.get_local_data()
+ c, addr = s.accept()
+ data = {"co2": self.co2, "temp": self.temp}
+ c.send(str(json.dumps(data)).encode())
+ c.close()
+ s.close()
+
+ def run_client(self):
+ s = socket.socket()
+ s.connect(('127.0.0.1', self.port))
+ data = json.loads(s.recv(1024).decode())
+ if 'co2' in data.keys():
+ self.co2 = data['co2']
+ if 'temp' in data.keys():
+ self.temp = data['temp']
+
+ def get_usb_dev(self) -> bool:
+ """search the airq usb device
+
+ Returns:
+ bool: True if device path found
+ """
+
+ search_string = '04D9:A052'
+ # search_string = '04d9:a052' # maybe lowercase on some systems ?
+ # search_string = '[Holtek USB-zyTemp]' # alt but unknown if all systems print this in log
+ matches = list()
+ command = ["journalctl", "-k", "--no-pager"] # maybe --reverse to get newst first ? # alternative: dmesg
+ pattern = re.compile(r'hidraw\d{2}')
+ ## get kernel logs
+ result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
+ if result.returncode != 0:
+ return False
+
+ ## search in log for usb device
+ lines = result.stdout.splitlines()
+ for line in lines:
+ if search_string in line:
+ matches = pattern.findall(line)
+ break
+
+ ## search in found log line the usb name and verify path exists
+ if len(matches) > 0:
+ dev_path = f'/dev/{matches[0]}'
+ if path.exists(f'/dev/{matches[0]}'):
+ self.device = f'/dev/{matches[0]}'
+ return True
+ else: return False
+
+ def get_local_data(self) -> bool:
+ # load only if need (in client mode not needed)
+ from CO2Meter import CO2Meter
+
+ if self.device is None:
+ return False
+ resp = {}
+ sensor = CO2Meter(self.device)
+ while not all(key in resp.keys() for key in ['co2', 'temperature']):
+ sleep(1)
+ resp = sensor.get_data()
+
+ self.co2 = resp['co2']
+ self.temp = resp['temperature']
+ return True
+
+
+class AirQCO2Resource(nagiosplugin.Resource):
+ def __init__(self, airq) -> None:
+ self.airq = airq
+
+ def probe(self) -> list:
+ """read from airq the co2 value
+
+ Returns:
+ list[nagisplugin.Metric]: multiple metric elements (yield)
+ """
+ return nagiosplugin.Metric(name='airq_co2', value=int(self.airq.co2), context='scalar_context')
+
+
+class AirQTempResource(nagiosplugin.Resource):
+ def __init__(self, airq) -> None:
+ self.airq = airq
+
+ def probe(self) -> list:
+ """read from airq the temperature value
+
+ Returns:
+ list[nagisplugin.Metric]: multiple metric elements (yield)
+ """
+ return nagiosplugin.Metric(name='airq_co2', value=float(self.airq.temp), context='scalar_context')
+
+
+def parse_args() -> argparse.Namespace:
+ """evaluates given arguments
+
+ Returns:
+ argsparse.Namespace: Namespace Object with all arguments insert (use: args.long_name_of_argument)
+ """
+ argp = argparse.ArgumentParser(description=__doc__)
+ # Default args
+ argp.add_argument('-v', '--verbose', action='count', default=0,
+ help='increase output verbosity (use up to 3 times)')
+ argp.add_argument('-H', '--hostname', default='localhost',
+ help='IP address or hostname of device to query')
+ argp.add_argument('-t', '--timeout', default=30,
+ help='abort execution after TIMEOUT seconds')
+ argp.add_argument('-m', '--check_mode',
+ choices=[
+ 'co2',
+ 'temp',
+ 'server',
+ 'client',
+ ], default='local',
+ help='check mode to run')
+
+ # Nagios args / see https://nagios-plugins.org/doc/guidelines.html#THRESHOLDFORMAT
+ argp.add_argument('-w', '--warning', default=':80',
+ help='warning threshold')
+ argp.add_argument('-c', '--critical', default=':90',
+ help='critical threshold')
+
+ ## AirQ args
+ argp.add_argument('-A', '--airq_mode',
+ choices=[
+ 'local',
+ 'client',
+ ], default='local',
+ help='set mode for airq class')
+ argp.add_argument('-S', '--airq_server', default='localhost',
+ help='ip or hostname of airq server')
+ argp.add_argument('-P', '--airq_port', default=4554,
+ help='port of airq server')
+ args = argp.parse_args()
+ return args
+
+# @nagiosplugin.guarded(verbose=0)
+def main():
+ args = parse_args()
+ if args.verbose >= 3:
+ logging.getLogger().setLevel(logging.DEBUG)
+
+ if args.check_mode != 'server':
+ airq = AirQ(run_mode=args.airq_mode, server=args.airq_server, port=args.airq_port)
+ check = None
+
+ # dice which check will be run bases on check_mode
+ match args.check_mode:
+ case 'co2':
+ check = nagiosplugin.Check(
+ AirQCO2Resource(airq=airq),
+ nagiosplugin.ScalarContext(name='scalar_context', warning=args.warning, critical=args.critical),
+ nagiosplugin.Summary())
+ check.name = "CO2"
+ case 'temp':
+ check = nagiosplugin.Check(
+ AirQTempResource(airq=airq),
+ nagiosplugin.ScalarContext(name='scalar_context', warning=args.warning, critical=args.critical),
+ nagiosplugin.Summary())
+ check.name = "Temperature"
+ case 'server':
+ airq = AirQ(run_mode=args.check_mode, server=args.airq_server, port=args.airq_port)
+ case _:
+ raise nagiosplugin.CheckError(f'Unknown check mode: {args.check_mode}')
+
+ if check is not None:
+ check.main(args.verbose, args.timeout)
+
+if __name__ == '__main__':
+ main()
\ No newline at end of file