inital version of airq check
This commit is contained in:
220
checks/check_airq.py
Executable file
220
checks/check_airq.py
Executable file
@@ -0,0 +1,220 @@
|
||||
#!/usr/bin/env python3
|
||||
""" Template python check"""
|
||||
"""dependencys:
|
||||
- pip3 install nagiosplugin
|
||||
- pip3 install argparse
|
||||
- pip3 install git+https://github.com/heinemml/CO2Meter
|
||||
or
|
||||
- pip3 install git+https://git.ao-it.net/python/CO2Meter
|
||||
"""
|
||||
|
||||
__version__ = '0.2.0'
|
||||
__author__ = 'anima'
|
||||
|
||||
# imports
|
||||
import logging
|
||||
import argparse
|
||||
import nagiosplugin
|
||||
from time import sleep
|
||||
from os import path
|
||||
import re
|
||||
import subprocess
|
||||
import socket
|
||||
import json
|
||||
|
||||
# log settings
|
||||
logging.basicConfig(format='[%(asctime)s] %(levelname)s %(message)s', level=logging.INFO)
|
||||
|
||||
|
||||
class AirQ:
|
||||
def __init__(self, run_mode: str, server: str = None, port: int = 4554):
|
||||
self.temp = None
|
||||
self.co2 = None
|
||||
self.device = None
|
||||
self.server = server
|
||||
self.port = port
|
||||
|
||||
match run_mode:
|
||||
case 'local':
|
||||
self.get_usb_dev()
|
||||
self.get_local_data()
|
||||
case 'server':
|
||||
self.get_usb_dev()
|
||||
self.run_server()
|
||||
case 'client':
|
||||
self.run_client()
|
||||
|
||||
def run_server(self):
|
||||
s = socket.socket()
|
||||
s.bind(('', self.port))
|
||||
s.listen(1)
|
||||
while True:
|
||||
self.get_local_data()
|
||||
c, addr = s.accept()
|
||||
data = {"co2": self.co2, "temp": self.temp}
|
||||
c.send(str(json.dumps(data)).encode())
|
||||
c.close()
|
||||
s.close()
|
||||
|
||||
def run_client(self):
|
||||
s = socket.socket()
|
||||
s.connect(('127.0.0.1', self.port))
|
||||
data = json.loads(s.recv(1024).decode())
|
||||
if 'co2' in data.keys():
|
||||
self.co2 = data['co2']
|
||||
if 'temp' in data.keys():
|
||||
self.temp = data['temp']
|
||||
|
||||
def get_usb_dev(self) -> bool:
|
||||
"""search the airq usb device
|
||||
|
||||
Returns:
|
||||
bool: True if device path found
|
||||
"""
|
||||
|
||||
search_string = '04D9:A052'
|
||||
# search_string = '04d9:a052' # maybe lowercase on some systems ?
|
||||
# search_string = '[Holtek USB-zyTemp]' # alt but unknown if all systems print this in log
|
||||
matches = list()
|
||||
command = ["journalctl", "-k", "--no-pager"] # maybe --reverse to get newst first ? # alternative: dmesg
|
||||
pattern = re.compile(r'hidraw\d{2}')
|
||||
## get kernel logs
|
||||
result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
|
||||
if result.returncode != 0:
|
||||
return False
|
||||
|
||||
## search in log for usb device
|
||||
lines = result.stdout.splitlines()
|
||||
for line in lines:
|
||||
if search_string in line:
|
||||
matches = pattern.findall(line)
|
||||
break
|
||||
|
||||
## search in found log line the usb name and verify path exists
|
||||
if len(matches) > 0:
|
||||
dev_path = f'/dev/{matches[0]}'
|
||||
if path.exists(f'/dev/{matches[0]}'):
|
||||
self.device = f'/dev/{matches[0]}'
|
||||
return True
|
||||
else: return False
|
||||
|
||||
def get_local_data(self) -> bool:
|
||||
# load only if need (in client mode not needed)
|
||||
from CO2Meter import CO2Meter
|
||||
|
||||
if self.device is None:
|
||||
return False
|
||||
resp = {}
|
||||
sensor = CO2Meter(self.device)
|
||||
while not all(key in resp.keys() for key in ['co2', 'temperature']):
|
||||
sleep(1)
|
||||
resp = sensor.get_data()
|
||||
|
||||
self.co2 = resp['co2']
|
||||
self.temp = resp['temperature']
|
||||
return True
|
||||
|
||||
|
||||
class AirQCO2Resource(nagiosplugin.Resource):
|
||||
def __init__(self, airq) -> None:
|
||||
self.airq = airq
|
||||
|
||||
def probe(self) -> list:
|
||||
"""read from airq the co2 value
|
||||
|
||||
Returns:
|
||||
list[nagisplugin.Metric]: multiple metric elements (yield)
|
||||
"""
|
||||
return nagiosplugin.Metric(name='airq_co2', value=int(self.airq.co2), context='scalar_context')
|
||||
|
||||
|
||||
class AirQTempResource(nagiosplugin.Resource):
|
||||
def __init__(self, airq) -> None:
|
||||
self.airq = airq
|
||||
|
||||
def probe(self) -> list:
|
||||
"""read from airq the temperature value
|
||||
|
||||
Returns:
|
||||
list[nagisplugin.Metric]: multiple metric elements (yield)
|
||||
"""
|
||||
return nagiosplugin.Metric(name='airq_co2', value=float(self.airq.temp), context='scalar_context')
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
|
||||
"""evaluates given arguments
|
||||
|
||||
Returns:
|
||||
argsparse.Namespace: Namespace Object with all arguments insert (use: args.long_name_of_argument)
|
||||
"""
|
||||
argp = argparse.ArgumentParser(description=__doc__)
|
||||
# Default args
|
||||
argp.add_argument('-v', '--verbose', action='count', default=0,
|
||||
help='increase output verbosity (use up to 3 times)')
|
||||
argp.add_argument('-H', '--hostname', default='localhost',
|
||||
help='IP address or hostname of device to query')
|
||||
argp.add_argument('-t', '--timeout', default=30,
|
||||
help='abort execution after TIMEOUT seconds')
|
||||
argp.add_argument('-m', '--check_mode',
|
||||
choices=[
|
||||
'co2',
|
||||
'temp',
|
||||
'server',
|
||||
'client',
|
||||
], default='local',
|
||||
help='check mode to run')
|
||||
|
||||
# Nagios args / see https://nagios-plugins.org/doc/guidelines.html#THRESHOLDFORMAT
|
||||
argp.add_argument('-w', '--warning', default=':80',
|
||||
help='warning threshold')
|
||||
argp.add_argument('-c', '--critical', default=':90',
|
||||
help='critical threshold')
|
||||
|
||||
## AirQ args
|
||||
argp.add_argument('-A', '--airq_mode',
|
||||
choices=[
|
||||
'local',
|
||||
'client',
|
||||
], default='local',
|
||||
help='set mode for airq class')
|
||||
argp.add_argument('-S', '--airq_server', default='localhost',
|
||||
help='ip or hostname of airq server')
|
||||
argp.add_argument('-P', '--airq_port', default=4554,
|
||||
help='port of airq server')
|
||||
args = argp.parse_args()
|
||||
return args
|
||||
|
||||
# @nagiosplugin.guarded(verbose=0)
|
||||
def main():
|
||||
args = parse_args()
|
||||
if args.verbose >= 3:
|
||||
logging.getLogger().setLevel(logging.DEBUG)
|
||||
|
||||
if args.check_mode != 'server':
|
||||
airq = AirQ(run_mode=args.airq_mode, server=args.airq_server, port=args.airq_port)
|
||||
check = None
|
||||
|
||||
# dice which check will be run bases on check_mode
|
||||
match args.check_mode:
|
||||
case 'co2':
|
||||
check = nagiosplugin.Check(
|
||||
AirQCO2Resource(airq=airq),
|
||||
nagiosplugin.ScalarContext(name='scalar_context', warning=args.warning, critical=args.critical),
|
||||
nagiosplugin.Summary())
|
||||
check.name = "CO2"
|
||||
case 'temp':
|
||||
check = nagiosplugin.Check(
|
||||
AirQTempResource(airq=airq),
|
||||
nagiosplugin.ScalarContext(name='scalar_context', warning=args.warning, critical=args.critical),
|
||||
nagiosplugin.Summary())
|
||||
check.name = "Temperature"
|
||||
case 'server':
|
||||
airq = AirQ(run_mode=args.check_mode, server=args.airq_server, port=args.airq_port)
|
||||
case _:
|
||||
raise nagiosplugin.CheckError(f'Unknown check mode: {args.check_mode}')
|
||||
|
||||
if check is not None:
|
||||
check.main(args.verbose, args.timeout)
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Reference in New Issue
Block a user