some optimisations and add logging

This commit is contained in:
2025-02-27 22:50:37 +01:00
parent 35284d07b8
commit 8011d1282f

View File

@@ -8,58 +8,79 @@ or
- pip3 install git+https://git.ao-it.net/python/CO2Meter - pip3 install git+https://git.ao-it.net/python/CO2Meter
""" """
__version__ = '0.2.0' __version__ = '0.3.0'
__author__ = 'anima' __author__ = 'anima'
# imports # imports
import logging import logging
import argparse import argparse
import nagiosplugin import json
from time import sleep from time import sleep
from os import path from os import path
import re import re
import subprocess import subprocess
import socket import socket
import json import threading
import nagiosplugin
# log settings # log settings
logging.basicConfig(format='[%(asctime)s] %(levelname)s %(message)s', level=logging.INFO) logging.basicConfig(format='[%(asctime)s] %(levelname)s %(message)s', level=logging.INFO)
class AirQ: class AirQ:
def __init__(self, run_mode: str, server: str = None, port: int = 4554): def __init__(self, run_mode: str = 'local', server: str = '127.0.0.1', port: int = 4554, device: str = None):
self.temp = None self.temp = None
self.co2 = None self.co2 = None
self.device = None self.device = device
self.server = server self.server = server
self.port = port self.port = port
self.threads = list()
match run_mode: match run_mode:
case 'local': case 'local':
self.get_usb_dev() logging.debug(f'start AirQ in {run_mode} mode')
if self.get_usb_dev():
self.get_local_data() self.get_local_data()
case 'server': case 'server':
self.get_usb_dev() logging.debug(f'start AirQ in {run_mode} mode')
self.run_server() thread_data = threading.Thread(target=self.get_local_data_loop, group=None)
thread_server = threading.Thread(target=self.run_server, group=None)
if self.get_usb_dev():
thread_data.start()
thread_server.start()
self.threads.append(thread_server)
case 'client': case 'client':
logging.debug(f'start AirQ in {run_mode} mode')
self.run_client() self.run_client()
def stop_treads(self):
for thread in self.threads:
logging.debug(f'stop threads')
thread.join()
def run_server(self): def run_server(self):
s = socket.socket() s = socket.socket()
s.bind(('', self.port)) s.bind(('', self.port))
s.listen(1) s.listen(1)
logging.debug(f'run AirQ server on port {self.port}')
try:
while True: while True:
self.get_local_data() conn, addr = s.accept()
c, addr = s.accept()
data = {"co2": self.co2, "temp": self.temp} data = {"co2": self.co2, "temp": self.temp}
c.send(str(json.dumps(data)).encode()) logging.debug(f'connection from {addr}, send {data=}')
c.close() conn.send(str(json.dumps(data)).encode())
conn.close()
except KeyboardInterrupt:
logging.debug(f'stop AirQ server')
s.close() s.close()
def run_client(self): def run_client(self):
s = socket.socket() s = socket.socket()
s.connect(('127.0.0.1', self.port)) s.connect((self.server, self.port))
logging.debug(f'connected to {self.server=} on {self.port=}')
data = json.loads(s.recv(1024).decode()) data = json.loads(s.recv(1024).decode())
logging.debug(f'server response: {data=}')
s.close()
if 'co2' in data.keys(): if 'co2' in data.keys():
self.co2 = data['co2'] self.co2 = data['co2']
if 'temp' in data.keys(): if 'temp' in data.keys():
@@ -71,16 +92,22 @@ class AirQ:
Returns: Returns:
bool: True if device path found bool: True if device path found
""" """
if self.device is not None:
if path.exists(self.device):
logging.debug(f'device path allready kown: {self.device=}')
return True
search_string = '04D9:A052' search_string = '04D9:A052'
# search_string = '04d9:a052' # maybe lowercase on some systems ? # search_string = '04d9:a052' # maybe lowercase on some systems ?
# search_string = '[Holtek USB-zyTemp]' # alt but unknown if all systems print this in log # search_string = '[Holtek USB-zyTemp]' # alt but unknown if all systems print this in log
matches = list() matches = list()
command = ["journalctl", "-k", "--no-pager"] # maybe --reverse to get newst first ? # alternative: dmesg # log has con is the log entry rolls out
command = ["journalctl", "-k", "--no-pager"] # alternative: dmesg # maybe usefull lsusb
pattern = re.compile(r'hidraw\d{2}') pattern = re.compile(r'hidraw\d{2}')
## get kernel logs ## get kernel logs
result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if result.returncode != 0: if result.returncode != 0:
logging.debug(f'search for usb device was not successfull, see {result.returncode=} - {result.stderr=}')
return False return False
## search in log for usb device ## search in log for usb device
@@ -88,15 +115,22 @@ class AirQ:
for line in lines: for line in lines:
if search_string in line: if search_string in line:
matches = pattern.findall(line) matches = pattern.findall(line)
logging.debug(f'found usb device mapping in {line=}')
break break
## search in found log line the usb name and verify path exists ## search in found log line the usb name and verify path exists
if len(matches) > 0: if len(matches) > 0:
dev_path = f'/dev/{matches[0]}' dev_path = f'/dev/{matches[0]}'
if path.exists(f'/dev/{matches[0]}'): if path.exists(f'/dev/{matches[0]}'):
logging.debug(f'usb device is in /dev/{matches[0]}')
self.device = f'/dev/{matches[0]}' self.device = f'/dev/{matches[0]}'
return True return True
else: return False logging.error(f'usb device not found!')
return False
def get_local_data_loop(self):
while True:
self.get_local_data()
def get_local_data(self) -> bool: def get_local_data(self) -> bool:
# load only if need (in client mode not needed) # load only if need (in client mode not needed)
@@ -107,8 +141,10 @@ class AirQ:
resp = {} resp = {}
sensor = CO2Meter(self.device) sensor = CO2Meter(self.device)
while not all(key in resp.keys() for key in ['co2', 'temperature']): while not all(key in resp.keys() for key in ['co2', 'temperature']):
sleep(1) logging.debug(f'collect data ...')
resp = sensor.get_data() resp = sensor.get_data()
logging.debug(f'data collected {resp=}')
sleep(1)
self.co2 = resp['co2'] self.co2 = resp['co2']
self.temp = resp['temperature'] self.temp = resp['temperature']
@@ -181,6 +217,8 @@ def parse_args() -> argparse.Namespace:
help='ip or hostname of airq server') help='ip or hostname of airq server')
argp.add_argument('-P', '--airq_port', default=4554, argp.add_argument('-P', '--airq_port', default=4554,
help='port of airq server') help='port of airq server')
argp.add_argument('-D', '--airq_device', default=None,
help='path to AirQ device')
args = argp.parse_args() args = argp.parse_args()
return args return args
@@ -191,7 +229,7 @@ def main():
logging.getLogger().setLevel(logging.DEBUG) logging.getLogger().setLevel(logging.DEBUG)
if args.check_mode != 'server': if args.check_mode != 'server':
airq = AirQ(run_mode=args.airq_mode, server=args.airq_server, port=args.airq_port) airq = AirQ(run_mode=args.airq_mode, server=args.airq_server, port=args.airq_port, device=args.airq_device)
check = None check = None
# dice which check will be run bases on check_mode # dice which check will be run bases on check_mode
@@ -209,7 +247,10 @@ def main():
nagiosplugin.Summary()) nagiosplugin.Summary())
check.name = "Temperature" check.name = "Temperature"
case 'server': case 'server':
airq = AirQ(run_mode=args.check_mode, server=args.airq_server, port=args.airq_port) try:
airq = AirQ(run_mode=args.check_mode, server=args.airq_server, port=args.airq_port, device=args.airq_device)
except KeyboardInterrupt:
airq.stop_treads()
case _: case _:
raise nagiosplugin.CheckError(f'Unknown check mode: {args.check_mode}') raise nagiosplugin.CheckError(f'Unknown check mode: {args.check_mode}')