Files
icinga-plugins/checks/check_airq.py
2025-02-27 23:52:00 +01:00

261 lines
8.8 KiB
Python
Executable File

#!/usr/bin/env python3
"""AirQ (temp & CO2) python check"""
"""dependencys:
- pip3 install nagiosplugin
- pip3 install argparse
- pip3 install git+https://github.com/heinemml/CO2Meter
or
- pip3 install git+https://git.ao-it.net/python/CO2Meter
"""
__version__ = '0.3.3'
__author__ = 'anima'
# imports
import logging
import argparse
import json
from time import sleep
from os import path
import re
import subprocess
import socket
import threading
import nagiosplugin
# log settings: configure the root logger once at import time with timestamped
# records at INFO level; main() lowers the level to DEBUG when -v is given
# three or more times.
logging.basicConfig(format='[%(asctime)s] %(levelname)s %(message)s', level=logging.INFO)
class AirQ:
def __init__(self, run_mode: str = 'local', server: str = '127.0.0.1', port: int = 4554, device: str = None):
self.temp = None
self.co2 = None
self.device = device
self.server = server
self.port = port
self.threads = list()
match run_mode:
case 'local':
logging.debug(f'start AirQ in {run_mode} mode')
if self.get_usb_dev():
self.get_local_data()
case 'server':
logging.debug(f'start AirQ in {run_mode} mode')
thread_data = threading.Thread(target=self.get_local_data_loop, group=None)
thread_server = threading.Thread(target=self.run_server, group=None)
if self.get_usb_dev():
thread_data.start()
thread_server.start()
self.threads.append(thread_server)
case 'client':
logging.debug(f'start AirQ in {run_mode} mode')
self.run_client()
def stop_treads(self):
for thread in self.threads:
logging.debug(f'stop threads')
thread.join()
def run_server(self):
s = socket.socket()
s.bind(('', self.port))
s.listen(1)
logging.debug(f'run AirQ server on port {self.port}')
try:
while True:
conn, addr = s.accept()
data = {"co2": self.co2, "temp": self.temp}
logging.debug(f'connection from {addr}, send {data=}')
conn.send(str(json.dumps(data)).encode())
conn.close()
except KeyboardInterrupt:
logging.debug(f'stop AirQ server')
s.close()
def run_client(self):
s = socket.socket()
s.connect((self.server, self.port))
logging.debug(f'connected to {self.server=} on {self.port=}')
data = json.loads(s.recv(1024).decode())
logging.debug(f'server response: {data=}')
s.close()
if 'co2' in data.keys():
self.co2 = data['co2']
if 'temp' in data.keys():
self.temp = data['temp']
def get_usb_dev(self) -> bool:
"""search the airq usb device
Returns:
bool: True if device path found
"""
if self.device is not None:
if path.exists(self.device):
logging.debug(f'device path allready kown: {self.device=}')
return True
search_string = '04D9:A052'
# search_string = '04d9:a052' # maybe lowercase on some systems ?
# search_string = '[Holtek USB-zyTemp]' # alt but unknown if all systems print this in log
matches = list()
# log has con is the log entry rolls out
command = ["journalctl", "-k", "--no-pager"] # alternative: dmesg # maybe usefull lsusb
pattern = re.compile(r'hidraw\d{1,2}')
## get kernel logs
result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if result.returncode != 0:
logging.debug(f'search for usb device was not successfull, see {result.returncode=} - {result.stderr=}')
return False
## search in log for usb device
lines = result.stdout.splitlines()
for line in lines:
if search_string in line:
matches = pattern.findall(line)
logging.debug(f'found usb device mapping in {line=}')
break
## search in found log line the usb name and verify path exists
if len(matches) > 0:
dev_path = f'/dev/{matches[0]}'
if path.exists(dev_path):
logging.debug(f'usb device is under path {dev_path}')
self.device = dev_path
return True
logging.error(f'usb device not found!')
return False
def get_local_data_loop(self):
while True:
self.get_local_data()
def get_local_data(self) -> bool:
# load only if need (in client mode not needed)
from CO2Meter import CO2Meter
if self.device is None:
return False
resp = {}
sensor = CO2Meter(self.device)
while not all(key in resp.keys() for key in ['co2', 'temperature']):
logging.debug(f'collect data ...')
resp = sensor.get_data()
logging.debug(f'data collected {resp=}')
sleep(1)
self.co2 = resp['co2']
self.temp = resp['temperature']
return True
class AirQCO2Resource(nagiosplugin.Resource):
    """nagiosplugin resource that reports the CO2 value of an AirQ object."""

    def __init__(self, airq) -> None:
        # airq: object that provides the measured value as attribute `co2`
        self.airq = airq

    def probe(self) -> nagiosplugin.Metric:
        """read from airq the co2 value

        Returns:
            nagiosplugin.Metric: metric element

        Raises:
            nagiosplugin.CheckError: if no co2 value was read
        """
        # co2 stays None when no value could be read; fail with a clear check
        # error instead of a TypeError from int(None)
        if self.airq.co2 is None:
            raise nagiosplugin.CheckError('no CO2 value available from AirQ')
        return nagiosplugin.Metric(name='airq_co2', value=int(self.airq.co2), uom='ppm', context='scalar_context')
class AirQTempResource(nagiosplugin.Resource):
    """nagiosplugin resource that reports the temperature of an AirQ object."""

    def __init__(self, airq) -> None:
        # airq: object that provides the measured value as attribute `temp`
        self.airq = airq

    def probe(self) -> nagiosplugin.Metric:
        """read from airq the temperature value

        Returns:
            nagiosplugin.Metric: metric element

        Raises:
            nagiosplugin.CheckError: if no temperature value was read
        """
        # temp stays None when no value could be read; fail with a clear check
        # error instead of a TypeError from float(None)
        if self.airq.temp is None:
            raise nagiosplugin.CheckError('no temperature value available from AirQ')
        return nagiosplugin.Metric(name='airq_temp', value=float(self.airq.temp), uom='°C', context='scalar_context')
def parse_args() -> argparse.Namespace:
    """evaluates given arguments

    Numeric options (timeout, airq_port) are converted to int, so values given
    on the command line have the same type as the defaults.

    Returns:
        argparse.Namespace: Namespace Object with all arguments insert (use: args.long_name_of_argument)
    """
    argp = argparse.ArgumentParser(description=__doc__)

    # Default args
    argp.add_argument('-v', '--verbose', action='count', default=0,
                      help='increase output verbosity (use up to 3 times)')
    argp.add_argument('-H', '--hostname', default='localhost',
                      help='IP address or hostname of device to query')
    argp.add_argument('-t', '--timeout', type=int, default=30,
                      help='abort execution after TIMEOUT seconds')
    # NOTE(review): the default 'local' is not one of the choices; argparse does
    # not validate defaults, so the value is passed on unchanged when -m is omitted
    argp.add_argument('-m', '--check_mode',
                      choices=[
                          'co2',
                          'temp',
                          'server',
                          'client',
                      ], default='local',
                      help='check mode to run')

    # Nagios args / see https://nagios-plugins.org/doc/guidelines.html#THRESHOLDFORMAT
    argp.add_argument('-w', '--warning', default=':80',
                      help='warning threshold')
    argp.add_argument('-c', '--critical', default=':90',
                      help='critical threshold')

    ## AirQ args
    argp.add_argument('-A', '--airq_mode',
                      choices=[
                          'local',
                          'client',
                      ], default='local',
                      help='set mode for airq class')
    argp.add_argument('-S', '--airq_server', default='localhost',
                      help='ip or hostname of airq server')
    # without type=int a port given on the command line is a str, which the
    # socket calls do not accept
    argp.add_argument('-P', '--airq_port', type=int, default=4554,
                      help='port of airq server')
    argp.add_argument('-D', '--airq_device', default=None,
                      help='path to AirQ device')

    return argp.parse_args()
# @nagiosplugin.guarded(verbose=0)
def main():
args = parse_args()
if args.verbose >= 3:
logging.getLogger().setLevel(logging.DEBUG)
if args.check_mode != 'server':
airq = AirQ(run_mode=args.airq_mode, server=args.airq_server, port=args.airq_port, device=args.airq_device)
check = None
# dice which check will be run bases on check_mode
match args.check_mode:
case 'co2':
check = nagiosplugin.Check(
AirQCO2Resource(airq=airq),
nagiosplugin.ScalarContext(name='scalar_context', warning=args.warning, critical=args.critical),
nagiosplugin.Summary())
check.name = "CO2"
case 'temp':
check = nagiosplugin.Check(
AirQTempResource(airq=airq),
nagiosplugin.ScalarContext(name='scalar_context', warning=args.warning, critical=args.critical),
nagiosplugin.Summary())
check.name = "Temperature"
case 'server':
try:
airq = AirQ(run_mode=args.check_mode, server=args.airq_server, port=args.airq_port, device=args.airq_device)
except KeyboardInterrupt:
airq.stop_treads()
case _:
raise nagiosplugin.CheckError(f'Unknown check mode: {args.check_mode}')
if check is not None:
check.main(args.verbose, args.timeout)
# run the check only when executed as a script, not when imported as a module
if __name__ == '__main__':
    main()