From eec17edf93d1bd1b488ee5beaf990252f230e68a Mon Sep 17 00:00:00 2001
From: anima
Date: Sun, 21 Sep 2025 00:23:33 +0200
Subject: [PATCH] reorga code
---
checks/check_airq.py | 157 ++++++++++++++++++++++++-------------------
1 file changed, 86 insertions(+), 71 deletions(-)
diff --git a/checks/check_airq.py b/checks/check_airq.py
index 70c3d74..95b0c10 100755
--- a/checks/check_airq.py
+++ b/checks/check_airq.py
@@ -8,7 +8,7 @@ or
- pip3 install git+https://git.ao-it.net/python/CO2Meter
"""
-__version__ = '0.3.3'
+__version__ = '0.4.0'
__author__ = 'anima'
# imports
@@ -27,64 +27,19 @@ import nagiosplugin
logging.basicConfig(format='[%(asctime)s] %(levelname)s %(message)s', level=logging.INFO)
-class AirQ:
- def __init__(self, run_mode: str = 'local', server: str = '127.0.0.1', port: int = 4554, device: str = None):
+class AirQBase:
+ def __init__(self):
self.temp = None
self.co2 = None
+
+
+class AirQLocal(AirQBase):
+ def __init__(self, device: str = None):
+ super().__init__()
self.device = device
- self.server = server
- self.port = port
- self.threads = list()
-
- match run_mode:
- case 'local':
- logging.debug(f'start AirQ in {run_mode} mode')
- if self.get_usb_dev():
- self.get_local_data()
- case 'server':
- logging.debug(f'start AirQ in {run_mode} mode')
- thread_data = threading.Thread(target=self.get_local_data_loop, group=None)
- thread_server = threading.Thread(target=self.run_server, group=None)
- if self.get_usb_dev():
- thread_data.start()
- thread_server.start()
- self.threads.append(thread_server)
- case 'client':
- logging.debug(f'start AirQ in {run_mode} mode')
- self.run_client()
-
- def stop_treads(self):
- for thread in self.threads:
- logging.debug(f'stop threads')
- thread.join()
-
- def run_server(self):
- s = socket.socket()
- s.bind(('', self.port))
- s.listen(1)
- logging.debug(f'run AirQ server on port {self.port}')
- try:
- while True:
- conn, addr = s.accept()
- data = {"co2": self.co2, "temp": self.temp}
- logging.debug(f'connection from {addr}, send {data=}')
- conn.send(str(json.dumps(data)).encode())
- conn.close()
- except KeyboardInterrupt:
- logging.debug(f'stop AirQ server')
- s.close()
-
- def run_client(self):
- s = socket.socket()
- s.connect((self.server, self.port))
- logging.debug(f'connected to {self.server=} on {self.port=}')
- data = json.loads(s.recv(1024).decode())
- logging.debug(f'server response: {data=}')
- s.close()
- if 'co2' in data.keys():
- self.co2 = data['co2']
- if 'temp' in data.keys():
- self.temp = data['temp']
+ logging.debug(f'start AirQ in local mode')
+ if self.get_usb_dev():
+ self.get_local_data()
def get_usb_dev(self) -> bool:
"""search the airq usb device
@@ -98,10 +53,8 @@ class AirQ:
return True
search_string = '04D9:A052'
- # search_string = '04d9:a052' # maybe lowercase on some systems ?
# search_string = '[Holtek USB-zyTemp]' # alt but unknown if all systems print this in log
matches = list()
- # log has con is the log entry rolls out
command = ["journalctl", "-k", "--no-pager"] # alternative: dmesg # maybe usefull lsusb
pattern = re.compile(r'hidraw\d{1,2}')
## get kernel logs
@@ -128,10 +81,6 @@ class AirQ:
logging.error(f'usb device not found!')
return False
- def get_local_data_loop(self):
- while True:
- self.get_local_data()
-
def get_local_data(self) -> bool:
# load only if need (in client mode not needed)
from CO2Meter import CO2Meter
@@ -149,7 +98,67 @@ class AirQ:
self.co2 = resp['co2']
self.temp = resp['temperature']
return True
-
+
+
+class AirQServer(AirQLocal):
+ def __init__(self, port: int = 4554, device: str = None, ):
+ super().__init__(device = device)
+ self.threads = list()
+ self.port = port
+ logging.debug(f'start AirQ in server mode')
+ thread_data = threading.Thread(target=self.get_local_data_loop, group=None)
+ thread_server = threading.Thread(target=self.run_server, group=None)
+ if self.get_usb_dev():
+ thread_data.start()
+ thread_server.start()
+ self.threads.append(thread_server)
+
+ def stop_treads(self):
+ for thread in self.threads:
+ logging.debug(f'stop threads')
+ thread.join()
+
+ def get_local_data_loop(self):
+ while True:
+ self.get_local_data()
+
+ def run_server(self):
+ s = socket.socket()
+ s.bind(('', self.port))
+ s.listen(1)
+ logging.debug(f'run AirQ server on port {self.port}')
+ try:
+ while True:
+ conn, addr = s.accept()
+ data = {"co2": self.co2, "temp": self.temp}
+ logging.debug(f'connection from {addr}, send {data=}')
+ conn.send(str(json.dumps(data)).encode())
+ conn.close()
+ except KeyboardInterrupt:
+ logging.debug(f'stop AirQ server')
+ s.close()
+
+
+class AirQClient(AirQBase):
+ def __init__(self, server: str = 'localhost', port: int = 4554):
+ super().__init__()
+ self.server = server
+ self.port = port
+ logging.debug(f'start AirQ in client mode')
+ self.run_client()
+
+ def run_client(self):
+ s = socket.socket()
+ s.connect((self.server, self.port))
+ logging.debug(f'connected to {self.server=} on {self.port=}')
+ data = json.loads(s.recv(1024).decode())
+ logging.debug(f'server response: {data=}')
+ s.close()
+ if 'co2' in data.keys():
+ self.co2 = data['co2']
+ if 'temp' in data.keys():
+ self.temp = data['temp']
+
class AirQCO2Resource(nagiosplugin.Resource):
def __init__(self, airq) -> None:
@@ -229,26 +238,32 @@ def main():
logging.getLogger().setLevel(logging.DEBUG)
if args.check_mode != 'server':
- airq = AirQ(run_mode=args.airq_mode, server=args.airq_server, port=args.airq_port, device=args.airq_device)
+ if args.airq_mode == 'local':
+ airq = AirQLocal(device=args.airq_device)
+ elif args.airq_mode == 'client':
+ airq = AirQClient(server=args.airq_server, port=args.airq_port)
+ else:
+ logging.error('Unknown airq mode')
+ exit()
check = None
# dice which check will be run bases on check_mode
match args.check_mode:
case 'co2':
check = nagiosplugin.Check(
- AirQCO2Resource(airq=airq),
- nagiosplugin.ScalarContext(name='scalar_context', warning=args.warning, critical=args.critical),
- nagiosplugin.Summary())
+ AirQCO2Resource(airq=airq),
+ nagiosplugin.ScalarContext(name='scalar_context', warning=args.warning, critical=args.critical),
+ nagiosplugin.Summary())
check.name = "CO2"
case 'temp':
check = nagiosplugin.Check(
- AirQTempResource(airq=airq),
- nagiosplugin.ScalarContext(name='scalar_context', warning=args.warning, critical=args.critical),
- nagiosplugin.Summary())
+ AirQTempResource(airq=airq),
+ nagiosplugin.ScalarContext(name='scalar_context', warning=args.warning, critical=args.critical),
+ nagiosplugin.Summary())
check.name = "Temperature"
case 'server':
try:
- airq = AirQ(run_mode=args.check_mode, server=args.airq_server, port=args.airq_port, device=args.airq_device)
+ airq = AirQServer(port=args.airq_port, device=args.airq_device)
except KeyboardInterrupt:
airq.stop_treads()
case _: