Compare commits

...

15 Commits

Author SHA1 Message Date
ececdf0b04 add performance data 2025-03-29 12:08:38 +01:00
d15d40e902 fix error if oid_index empty 2025-03-21 20:41:25 +01:00
03cf24f072 add error if no response 2025-03-21 19:40:43 +01:00
42c3eac20a inital check with disk function 2025-03-21 19:30:32 +01:00
4d6becffa1 add disk counter 2025-03-11 20:27:15 +01:00
493a395150 optimize codestyle 2025-03-11 18:59:15 +01:00
4294372e41 try other uom because icinga / graphana do shit with this 2025-02-28 21:48:56 +01:00
6f0655e8fe fix type for raid status 2025-02-28 21:04:12 +01:00
2099b14708 inital version of speedtest 2025-02-28 21:01:43 +01:00
8a5fab8a24 fix bug for raid status warnings 2025-02-28 20:57:35 +01:00
a3afc470bb add uom 2025-02-27 23:52:00 +01:00
87147e6dff fix naming and docstring 2025-02-27 23:40:58 +01:00
b9fec51fda add requirement files for pip 2025-02-27 23:07:28 +01:00
bb1d45335a fix regex for device search 2025-02-27 23:02:31 +01:00
6628c638af fix regex for device search 2025-02-27 23:02:23 +01:00
7 changed files with 473 additions and 58 deletions

View File

@@ -8,7 +8,7 @@ or
- pip3 install git+https://git.ao-it.net/python/CO2Meter - pip3 install git+https://git.ao-it.net/python/CO2Meter
""" """
__version__ = '0.3.0' __version__ = '0.3.3'
__author__ = 'anima' __author__ = 'anima'
# imports # imports
@@ -103,7 +103,7 @@ class AirQ:
matches = list() matches = list()
# log has con is the log entry rolls out # log has con is the log entry rolls out
command = ["journalctl", "-k", "--no-pager"] # alternative: dmesg # maybe usefull lsusb command = ["journalctl", "-k", "--no-pager"] # alternative: dmesg # maybe usefull lsusb
pattern = re.compile(r'hidraw\d{2}') pattern = re.compile(r'hidraw\d{1,2}')
## get kernel logs ## get kernel logs
result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if result.returncode != 0: if result.returncode != 0:
@@ -121,9 +121,9 @@ class AirQ:
## search in found log line the usb name and verify path exists ## search in found log line the usb name and verify path exists
if len(matches) > 0: if len(matches) > 0:
dev_path = f'/dev/{matches[0]}' dev_path = f'/dev/{matches[0]}'
if path.exists(f'/dev/{matches[0]}'): if path.exists(dev_path):
logging.debug(f'usb device is in /dev/{matches[0]}') logging.debug(f'usb device is under path {dev_path}')
self.device = f'/dev/{matches[0]}' self.device = dev_path
return True return True
logging.error(f'usb device not found!') logging.error(f'usb device not found!')
return False return False
@@ -159,9 +159,9 @@ class AirQCO2Resource(nagiosplugin.Resource):
"""read from airq the co2 value """read from airq the co2 value
Returns: Returns:
list[nagisplugin.Metric]: multiple metric elements (yield) nagisplugin.Metric: metric elements
""" """
return nagiosplugin.Metric(name='airq_co2', value=int(self.airq.co2), context='scalar_context') return nagiosplugin.Metric(name='airq_co2', value=int(self.airq.co2), uom='ppm', context='scalar_context')
class AirQTempResource(nagiosplugin.Resource): class AirQTempResource(nagiosplugin.Resource):
@@ -172,9 +172,9 @@ class AirQTempResource(nagiosplugin.Resource):
"""read from airq the temperature value """read from airq the temperature value
Returns: Returns:
list[nagisplugin.Metric]: multiple metric elements (yield) nagisplugin.Metric: metric elements
""" """
return nagiosplugin.Metric(name='airq_co2', value=float(self.airq.temp), context='scalar_context') return nagiosplugin.Metric(name='airq_temp', value=float(self.airq.temp), uom='°C', context='scalar_context')
def parse_args() -> argparse.Namespace: def parse_args() -> argparse.Namespace:

View File

@@ -6,7 +6,7 @@
- pip3 install requests - pip3 install requests
""" """
__version__ = '0.5.2' __version__ = '0.5.3'
__author__ = 'anima' __author__ = 'anima'
# imports # imports
@@ -165,6 +165,7 @@ class DockerSwarmServiceStatesResource(nagiosplugin.Resource):
data['state'] = service['state'] data['state'] = service['state']
data['status'] = service['status']['tasks'] data['status'] = service['status']['tasks']
yield nagiosplugin.Metric(name='docker_service', value=data, context='docker_service') yield nagiosplugin.Metric(name='docker_service', value=data, context='docker_service')
return nagiosplugin.Metric(name='services', value=len(response), context='scalar_context')
class DockerSwarmServiceStatesContext(nagiosplugin.Context): class DockerSwarmServiceStatesContext(nagiosplugin.Context):
@@ -208,6 +209,7 @@ class DockerSwarmServiceUpdatesResource(nagiosplugin.Resource):
""" """
response = self.api.services() response = self.api.services()
updates = 0
for service in response: for service in response:
data = dict() data = dict()
data['name'] = service['serviceName'] data['name'] = service['serviceName']
@@ -234,6 +236,11 @@ class DockerSwarmServiceUpdatesResource(nagiosplugin.Resource):
data['latestDigest'] = None data['latestDigest'] = None
yield nagiosplugin.Metric(name='docker_service', value=data, context='docker_service') yield nagiosplugin.Metric(name='docker_service', value=data, context='docker_service')
if data['latestDigest'] is not None and data['imageDigest'] != data['latestDigest']:
updates += 1
return nagiosplugin.Metric(name='updates', value=updates, context='scalar_context')
class DockerSwarmServiceUpdatesContext(nagiosplugin.Context): class DockerSwarmServiceUpdatesContext(nagiosplugin.Context):
def __init__(self, name): def __init__(self, name):
@@ -353,12 +360,14 @@ def main():
check = nagiosplugin.Check( check = nagiosplugin.Check(
DockerSwarmServiceStatesResource(api=api), DockerSwarmServiceStatesResource(api=api),
DockerSwarmServiceStatesContext(name='docker_service'), DockerSwarmServiceStatesContext(name='docker_service'),
nagiosplugin.ScalarContext(name='scalar_context', warning=args.warning, critical=args.critical),
DockerSwarmSummary()) DockerSwarmSummary())
check.name = "swarm service states" check.name = "swarm service states"
case 'service_updates': case 'service_updates':
check = nagiosplugin.Check( check = nagiosplugin.Check(
DockerSwarmServiceUpdatesResource(api=api), DockerSwarmServiceUpdatesResource(api=api),
DockerSwarmServiceUpdatesContext(name='docker_service'), DockerSwarmServiceUpdatesContext(name='docker_service'),
nagiosplugin.ScalarContext(name='scalar_context', warning=args.warning, critical=args.critical),
DockerSwarmSummary()) DockerSwarmSummary())
check.name = "swarm service updates" check.name = "swarm service updates"
case _: case _:

262
checks/check_snmp_linux.py Executable file
View File

@@ -0,0 +1,262 @@
#!/usr/bin/env python3
"""SNMP defaults linux python check"""
"""dependencys:
- pip3 install nagiosplugin
- pip3 install argparse
- pip3 install easysnmp
- https://easysnmp.readthedocs.io/en/latest/index.html
- https://easysnmp.readthedocs.io/en/latest/session_api.html
"""
__version__ = '0.1.2'
__author__ = 'anima'
# imports
import logging
import argparse
import nagiosplugin
from easysnmp import Session
# log settings
logging.basicConfig(format='[%(asctime)s] %(levelname)s %(message)s', level=logging.INFO)
def init_snmp_session(hostname: str = 'localhost', snmp_version: str | int = '2', snmp_community: str = 'public',
snmp_security_level: str = 'auth_with_privacy', snmp_security_username: str = 'monitoring',
snmp_auth_protocol: str = 'SHA256', snmp_auth_password: str | None = None,
snmp_privacy_protocol: str = 'AES', snmp_privacy_password: str | None = None
) -> Session:
"""init a easysnmp session
Args:
hostname (str, optional): hostname or ip of host. Defaults to 'localhost'.
snmp_version (str, optional): version of snmp (1, 2, 3). Defaults to 2.
snmp_community (str, only need for v1 & v2): snmp community if snmp v1 or v2 used (any). Defaults to 'public'.
snmp_security_level (str, only need for v3): security level (no_auth_or_privacy, auth_without_privacy or auth_with_privacy). Defaults to 'auth_with_privacy'.
snmp_security_username (str, only need for v3): security name (any). Defaults to 'monitoring'.
snmp_auth_protocol (str, only need for v3): authentication protocol (MD5, SHA256, SHA512). Defaults to 'SHA256'.
snmp_auth_password (str, only need for v3): authentication passphrase (any). Defaults to None.
snmp_privacy_protocol (str, only need for v3): privacy protocol (AES, DES). Defaults to 'AES'.
snmp_privacy_password (str, only need for v3): privacy passphrase (any). Defaults to None.
Return:
easysnmp.Session: session for get or walk actions
"""
# check if snmp version an valid str or int and saves as int
valid_versions: list[str, int] = ['1','2','3', 1, 2, 3]
if snmp_version not in valid_versions:
raise ValueError(f'{snmp_version=} non of {valid_versions=}')
else:
snmp_version = int(snmp_version)
if snmp_version == '3':
# check if snmp security level valid
valid_security_levels: list[str] = ['no_auth_or_privacy', 'auth_without_privacy', 'auth_with_privacy']
if snmp_security_level not in valid_security_levels:
raise ValueError(f'{snmp_security_level=} is non of {valid_security_levels=}')
# check if auth protocol valid
if snmp_security_level.startswith('auth'):
valid_auth_protocols: list[str] = ['MD5', 'SHA', 'SHA256', 'SHA512']
if snmp_auth_protocol not in valid_auth_protocols:
raise ValueError(f'{snmp_auth_protocol=} is non of {valid_auth_protocols=}')
if snmp_auth_password is None:
raise ValueError(f'{snmp_auth_password=} is not set')
# check if privacy protocol valid
if not snmp_security_level.startswith('no'):
valid_privacy_protocols: list[str] = ['AES', 'DES']
if snmp_privacy_protocol not in valid_privacy_protocols:
raise ValueError(f'{snmp_privacy_protocol=} is non of {valid_privacy_protocols=}')
if snmp_privacy_password is None:
raise ValueError(f'{snmp_privacy_password=} is not set')
if snmp_version == 3: # 2025-01-02: only 'auth_with_privacy' is tested
session = Session(hostname=hostname, version=snmp_version,
security_level=snmp_security_level, security_username=snmp_security_username,
privacy_protocol=snmp_privacy_protocol, privacy_password=snmp_privacy_password,
auth_protocol=snmp_auth_protocol, auth_password=snmp_auth_password)
else:
session = Session(hostname=hostname, version=snmp_version, community=snmp_community)
return session
#
# General Summary
#
class SNMPSummary(nagiosplugin.Summary):
"""default summary for own results"""
def verbose(self, results):
"""creates a multiline response"""
result_str = ''
for result in results:
result_str += f'{str(result)}\n'
return result_str
def ok(self, results):
return
def problem(self, results):
return
#
# Disk
#
class SNMPDiskResource(nagiosplugin.Resource):
def __init__(self, session, exclude_tmpfs:bool = True):
"""disk check via snmp
Args:
session (easysnmp.Session): Easysnmp Session object
exclude_tmpfs (bool, optional): ignore disks from dev type tmpfs. Defaults to True.
"""
self.session = session
self.exclude_tmpfs = exclude_tmpfs
def probe(self) -> list:
"""check disk size of all disks
Returns:
list[nagisplugin.Metric]: multiple metric elements (yield)
"""
baseoid = '.1.3.6.1.4.1.2021.9.1'
oids = dict()
oids['dskIndex'] = '.1'
oids['dskPath'] = '.2'
oids['dskDevice'] = '.3'
oids['dskTotal'] = '.6'
oids['dskAvail'] = '.7'
oids['dskUsed'] = '.8'
oids['dskPercent'] = '.9'
oids['dskPercentNode'] = '.10'
results = dict()
disks = dict()
for key,value in oids.items():
results[key] = self.session.walk(baseoid + value)
if len(results[key]) <= 0:
raise ValueError('no values from snmp')
for key, value in results.items():
for item in value:
if item.oid_index == '':
if key == 'dskIndex':
disks[item.value] = dict()
else:
disks[item.oid.split('.')[-1]][key] = item.value
else:
if key == 'dskIndex':
disks[item.value] = dict()
else:
disks[item.oid_index][key] = item.value
for disk in disks.values():
if not (self.exclude_tmpfs and disk['dskDevice'] == 'tmpfs'):
yield nagiosplugin.Metric(name=disk['dskPath'], value=int(disk['dskPercent']), uom='%', context='scalar_context')
yield nagiosplugin.Metric(name=disk['dskPath'], value=disk, context='disk_info')
class SNMPDiskResult(nagiosplugin.Result):
def human_size(self, value: int | float, source_unit: str = 'b', target_unit: str = 'gb') -> list[float, str]:
"""convert a byte int to a higher unit
stop convert if value < 1024
supported units: ['b', 'kb', 'mb', 'gb', 'tb']
Args:
value (int | float): original value
source_unit (str, optional): unit of original unit. Defaults to 'b'.
target_unit (str, optional): target unit. Defaults to 'gb'.
Returns:
list[float, str]: [new_value, unit]
"""
units = ['b', 'kb', 'mb', 'gb', 'tb']
if value < 1024 or source_unit == target_unit:
return [value, source_unit]
return self.human_size(value / 1024, units[units.index(source_unit) + 1], target_unit=target_unit)
def __str__(self):
free_size, free_unit = self.human_size(int(self.metric.value["dskAvail"]), 'kb')
return f'{self.metric.value["dskPath"]} has {round(free_size, 2)} {free_unit} free'
def parse_args() -> argparse.Namespace:
"""evaluates given arguments
Returns:
argsparse.Namespace: Namespace Object with all arguments insert (use: args.long_name_of_argument)
"""
argp = argparse.ArgumentParser(description=__doc__)
# Default args
argp.add_argument('-v', '--verbose', action='count', default=0,
help='increase output verbosity (use up to 3 times)')
argp.add_argument('-H', '--hostname', default='localhost',
help='IP address or hostname of device to query')
argp.add_argument('-t', '--timeout', default=30,
help='abort execution after TIMEOUT seconds')
argp.add_argument('-m', '--check_mode',
choices=[
'disk',
'cpu',
'load?',
'memory',
],
help='check mode to run')
# Nagios args / see https://nagios-plugins.org/doc/guidelines.html#THRESHOLDFORMAT
argp.add_argument('-w', '--warning', default=':80',
help='warning threshold')
argp.add_argument('-c', '--critical', default=':90',
help='critical threshold')
# SNMP args (same -* as snmpwalk / snmpget linux command; exept -C/-c & -V /-v because already used (see above))
argp.add_argument('-V', '--snmp-version', default='2',
help='Used SNMP Version. (1, 2, 3)')
argp.add_argument('-C', '--snmp_community', default='public',
help='SNMP community if snmp v1 or v2 used (any).')
argp.add_argument('-l', '--snmp-security-level', default='auth_with_privacy',
help='Security level (no_auth_or_privacy, auth_without_privacy or auth_with_privacy)')
argp.add_argument('-u', '--snmp-security-username', default='monitoring',
help='Security name (any)')
argp.add_argument('-a', '--snmp-auth-protocol', default='SHA256',
help='Authentication protocol (MD5, SHA256, SHA512)')
argp.add_argument('-A', '--snmp-auth-pass', default=None,
help='Authentication passphrase (any)')
argp.add_argument('-x', '--snmp-privacy-protocol', default='AES',
help='Privacy protocol (AES, DES)')
argp.add_argument('-X', '--snmp-privacy-pass', default=None,
help='Privacy passphrase (any)')
args = argp.parse_args()
return args
def main():
args = parse_args()
if args.verbose >= 3:
logging.getLogger().setLevel(logging.DEBUG)
session = init_snmp_session(hostname=args.hostname, snmp_version=args.snmp_version, snmp_community=args.snmp_community,
snmp_security_level=args.snmp_security_level, snmp_security_username=args.snmp_security_username,
snmp_auth_protocol=args.snmp_auth_protocol, snmp_auth_password=args.snmp_auth_pass,
snmp_privacy_protocol=args.snmp_privacy_protocol, snmp_privacy_password=args.snmp_privacy_pass)
# dice which check will be run bases on check_mode
match args.check_mode:
case 'disk':
check = nagiosplugin.Check(
SNMPDiskResource(session=session),
nagiosplugin.Context(name='disk_info', result_cls=SNMPDiskResult),
nagiosplugin.ScalarContext(name='scalar_context', fmt_metric='{name} is {value}{uom} used', warning=args.warning, critical=args.critical),
SNMPSummary())
check.name = "Disk usage"
case _:
raise nagiosplugin.CheckError(f'Unknown check mode: {args.check_mode}')
check.main(args.verbose, args.timeout)
if __name__ == '__main__':
main()

View File

@@ -8,7 +8,7 @@
- https://easysnmp.readthedocs.io/en/latest/session_api.html - https://easysnmp.readthedocs.io/en/latest/session_api.html
""" """
__version__ = '0.13.1' __version__ = '0.14.0'
__author__ = 'anima' __author__ = 'anima'
# imports # imports
@@ -491,6 +491,7 @@ class SNMPSynologyDiskHealthResource(nagiosplugin.Resource, SNMPSynologyDisk):
Generator[nagisplugin.Metric]: multiple metric elements (yield) Generator[nagisplugin.Metric]: multiple metric elements (yield)
""" """
disks = self.get_disks() disks = self.get_disks()
yield nagiosplugin.Metric(name='disk_count', value=len(disks), context='disk_scalar_context')
for disk in disks: for disk in disks:
yield nagiosplugin.Metric(name=disk['id'], value=disk, context='disk_status_context') yield nagiosplugin.Metric(name=disk['id'], value=disk, context='disk_status_context')
yield nagiosplugin.Metric(name=disk['id'], value=disk, context='disk_health_context') yield nagiosplugin.Metric(name=disk['id'], value=disk, context='disk_health_context')
@@ -595,7 +596,7 @@ class SNMPSynologyRaidStatusContext(nagiosplugin.Context):
def evaluate(self, metric, resource): def evaluate(self, metric, resource):
if metric.value['status'] in ['12', '18']: if metric.value['status'] in ['12', '18']:
return self.result_cls(nagiosplugin.Critical, "critical", metric) return self.result_cls(nagiosplugin.Critical, "critical", metric)
elif metric.value['status'] in range(2,21): elif 2 <= int(metric.value['status']) <= 21:
return self.result_cls(nagiosplugin.Warn, "warning", metric) return self.result_cls(nagiosplugin.Warn, "warning", metric)
elif metric.value['status'] == '1': elif metric.value['status'] == '1':
return self.result_cls(nagiosplugin.Ok, "ok", metric) return self.result_cls(nagiosplugin.Ok, "ok", metric)
@@ -759,78 +760,94 @@ def main():
# dice which check will be run bases on check_mode # dice which check will be run bases on check_mode
match args.check_mode: match args.check_mode:
case 'system': case 'system':
check = nagiosplugin.Check(SNMPSynologySystemResource(session=session), check = nagiosplugin.Check(
SNMPSynologySystemResource(session=session),
SNMPSynologySystemContext(name='system_context'), SNMPSynologySystemContext(name='system_context'),
nagiosplugin.Summary()) nagiosplugin.Summary())
check.name = "System Status" check.name = "System Status"
case 'temperature': case 'temperature':
check = nagiosplugin.Check(SNMPSynologyTemperatureResource(session=session), check = nagiosplugin.Check(
SNMPSynologyTemperatureResource(session=session),
nagiosplugin.ScalarContext(name='temperature_scalar_context', warning=args.warning, critical=args.critical), nagiosplugin.ScalarContext(name='temperature_scalar_context', warning=args.warning, critical=args.critical),
nagiosplugin.Summary()) nagiosplugin.Summary())
check.name = "System Temperature" check.name = "System Temperature"
case 'powersupply': case 'powersupply':
check = nagiosplugin.Check(SNMPSynologyPowerSupplyResource(session=session), check = nagiosplugin.Check(
SNMPSynologyPowerSupplyResource(session=session),
SNMPSynologyPowerSupplyContext(name='powersupply_context'), SNMPSynologyPowerSupplyContext(name='powersupply_context'),
nagiosplugin.Summary()) nagiosplugin.Summary())
check.name = "Power Supply Status" check.name = "Power Supply Status"
case 'fans': case 'fans':
check = nagiosplugin.Check(SNMPSynologyFansResource(session=session), check = nagiosplugin.Check(
SNMPSynologyFansResource(session=session),
SNMPSynologyFansContext(name='fan_context'), SNMPSynologyFansContext(name='fan_context'),
SNMPSynologySummary()) SNMPSynologySummary())
check.name = "Fans Status" check.name = "Fans Status"
case 'firmware': case 'firmware':
check = nagiosplugin.Check(SNMPSynologyFirmwareResource(session=session), check = nagiosplugin.Check(
SNMPSynologyFirmwareResource(session=session),
SNMPSynologyFirmwareContext(name='firmware_context'), SNMPSynologyFirmwareContext(name='firmware_context'),
nagiosplugin.Summary()) nagiosplugin.Summary())
check.name = "Firmware" check.name = "Firmware"
case 'cpu': case 'cpu':
check = nagiosplugin.Check(SNMPSynologyCPUResource(session=session), check = nagiosplugin.Check(
SNMPSynologyCPUResource(session=session),
nagiosplugin.ScalarContext(name='cpu_scalar_context', warning=args.warning, critical=args.critical), nagiosplugin.ScalarContext(name='cpu_scalar_context', warning=args.warning, critical=args.critical),
nagiosplugin.Summary()) nagiosplugin.Summary())
check.name = "CPU Usage" check.name = "CPU Usage"
case 'memory': case 'memory':
check = nagiosplugin.Check(SNMPSynologyMemoryResource(session=session), check = nagiosplugin.Check(
SNMPSynologyMemoryResource(session=session),
nagiosplugin.ScalarContext(name='memory_scalar_context', warning=args.warning, critical=args.critical), nagiosplugin.ScalarContext(name='memory_scalar_context', warning=args.warning, critical=args.critical),
nagiosplugin.Summary()) nagiosplugin.Summary())
check.name = "Memory Usage" check.name = "Memory Usage"
case 'disk': case 'disk':
check = nagiosplugin.Check(SNMPSynologyDiskHealthResource(session=session), check = nagiosplugin.Check(
SNMPSynologyDiskHealthResource(session=session),
SNMPSynologyDiskHealthContext(name='disk_health_context'), SNMPSynologyDiskHealthContext(name='disk_health_context'),
SNMPSynologyDiskStatusContext(name='disk_status_context'), SNMPSynologyDiskStatusContext(name='disk_status_context'),
nagiosplugin.ScalarContext(name='disk_scalar_context', warning=args.warning, critical=args.critical),
SNMPSynologySummary()) SNMPSynologySummary())
check.name = "Disk Health" check.name = "Disk Health"
case 'disk_retry': case 'disk_retry':
check = nagiosplugin.Check(SNMPSynologyDiskRetryResource(session=session), check = nagiosplugin.Check(
SNMPSynologyDiskRetryResource(session=session),
nagiosplugin.ScalarContext(name='disk_retry_scalar_context', warning=args.warning, critical=args.critical), nagiosplugin.ScalarContext(name='disk_retry_scalar_context', warning=args.warning, critical=args.critical),
nagiosplugin.Summary()) nagiosplugin.Summary())
check.name = "Disk Connection Retrys" check.name = "Disk Connection Retrys"
case 'disk_life': case 'disk_life':
check = nagiosplugin.Check(SNMPSynologyDiskLifeResource(session=session), check = nagiosplugin.Check(
SNMPSynologyDiskLifeResource(session=session),
nagiosplugin.ScalarContext(name='disk_life_scalar_context', warning=args.warning, critical=args.critical), nagiosplugin.ScalarContext(name='disk_life_scalar_context', warning=args.warning, critical=args.critical),
nagiosplugin.Summary()) nagiosplugin.Summary())
check.name = "Disk Remaining Life" check.name = "Disk Remaining Life"
case 'disk_ident': case 'disk_ident':
check = nagiosplugin.Check(SNMPSynologyDiskIdentResource(session=session), check = nagiosplugin.Check(
SNMPSynologyDiskIdentResource(session=session),
nagiosplugin.ScalarContext(name='disk_ident_scalar_context', warning=args.warning, critical=args.critical), nagiosplugin.ScalarContext(name='disk_ident_scalar_context', warning=args.warning, critical=args.critical),
nagiosplugin.Summary()) nagiosplugin.Summary())
check.name = "Disk IdentFail" check.name = "Disk IdentFail"
case 'disk_sectors': case 'disk_sectors':
check = nagiosplugin.Check(SNMPSynologyDiskSectorsResource(session=session), check = nagiosplugin.Check(
SNMPSynologyDiskSectorsResource(session=session),
nagiosplugin.ScalarContext(name='disk_sectors_scalar_context', warning=args.warning, critical=args.critical), nagiosplugin.ScalarContext(name='disk_sectors_scalar_context', warning=args.warning, critical=args.critical),
nagiosplugin.Summary()) nagiosplugin.Summary())
check.name = "Disk BadSectors" check.name = "Disk BadSectors"
case 'raid': case 'raid':
check = nagiosplugin.Check(SNMPSynologyRaidStatusResource(session=session), check = nagiosplugin.Check(
SNMPSynologyRaidStatusResource(session=session),
SNMPSynologyRaidStatusContext(name='raid_status_context'), SNMPSynologyRaidStatusContext(name='raid_status_context'),
SNMPSynologySummary()) SNMPSynologySummary())
check.name = "Raid status" check.name = "Raid status"
case 'raid_space': case 'raid_space':
check = nagiosplugin.Check(SNMPSynologyRaidSpaceResource(session=session), check = nagiosplugin.Check(
SNMPSynologyRaidSpaceResource(session=session),
nagiosplugin.ScalarContext(name='raid_space_scalar_context', warning=args.warning, critical=args.critical), nagiosplugin.ScalarContext(name='raid_space_scalar_context', warning=args.warning, critical=args.critical),
nagiosplugin.Summary()) nagiosplugin.Summary())
check.name = "Raid space usage" check.name = "Raid space usage"
case 'ups': case 'ups':
check = nagiosplugin.Check(SNMPSynologyUPSResource(session=session), check = nagiosplugin.Check(
SNMPSynologyUPSResource(session=session),
SNMPSynologyUPSContext(name='ups_context'), SNMPSynologyUPSContext(name='ups_context'),
SNMPSynologySummary()) SNMPSynologySummary())
check.name = "UPS status" check.name = "UPS status"

117
checks/check_speedtest.py Executable file
View File

@@ -0,0 +1,117 @@
#!/usr/bin/env python3
"""speedtest.net python check for icinga"""
"""dependencys:
- pip3 install nagiosplugin
- pip3 install argparse
- pip3 install speedtest-cli
or
system package on debian:
- apt install speedtest
"""
__version__ = '0.1.0'
__author__ = 'anima'
# imports
import logging
import argparse
import nagiosplugin
import speedtest
# log settings
logging.basicConfig(format='[%(asctime)s] %(levelname)s %(message)s', level=logging.INFO)
class SpeedtestResource(nagiosplugin.Resource):
def __init__(self) -> None:
self.test = speedtest.Speedtest(secure=True)
def probe(self) -> list:
"""speedtest check for up & download
Returns:
Generator[nagisplugin.Metric]: multiple metric elements (yield)
"""
def to_mb(byte) -> float:
kb = byte / 1024
mb = kb / 1024
return mb
logging.debug('start download speedtest...')
self.test.download()
logging.debug(f'download speed: {self.test.results.download}')
logging.debug('start upload speedtest...')
self.test.upload()
logging.debug(f'upload speed: {self.test.results.upload}')
logging.debug(f'used server: {self.test.results.server}')
yield nagiosplugin.Metric(name='download', value=round(to_mb(self.test.results.download), 2), uom='mbps', context='download_scalar_context')
yield nagiosplugin.Metric(name='upload', value=round(to_mb(self.test.results.upload), 2), uom='mbps', context='upload_scalar_context')
yield nagiosplugin.Metric(name='ping', value=self.test.results.ping, uom='ms', context='ping_scalar_context')
def parse_args() -> argparse.Namespace:
"""evaluates given arguments
Returns:
argsparse.Namespace: Namespace Object with all arguments insert (use: args.long_name_of_argument)
"""
argp = argparse.ArgumentParser(description=__doc__)
# Default args
argp.add_argument('-v', '--verbose', action='count', default=0,
help='increase output verbosity (use up to 3 times)')
argp.add_argument('-H', '--hostname', default='localhost',
help='IP address or hostname of device to query')
argp.add_argument('-t', '--timeout', default=60,
help='abort execution after TIMEOUT seconds')
argp.add_argument('-m', '--check_mode',
choices=[
'speedtest',
], default='speedtest',
help='check mode to run')
# Nagios args / see https://nagios-plugins.org/doc/guidelines.html#THRESHOLDFORMAT
argp.add_argument('-w', '--warning', default='80:',
help='not used for this check') # for compatibility not removed
argp.add_argument('-c', '--critical', default='90:',
help='not used for this check') # for compatibility not removed
argp.add_argument('-wd', '--warning-download', default='90:',
help='warning threshold for download in mb')
argp.add_argument('-cd', '--critical-download', default='25:',
help='critical threshold for download in mb')
argp.add_argument('-wu', '--warning-upload', default='25:',
help='warning threshold for upload in mb')
argp.add_argument('-cu', '--critical-upload', default='10:',
help='critical threshold for upload in mb')
argp.add_argument('-wp', '--warning-ping', default=':50',
help='warning threshold for ping in ms')
argp.add_argument('-cp', '--critical-ping', default=':100',
help='critical threshold for ping in ms')
args = argp.parse_args()
return args
# @nagiosplugin.guarded(verbose=0)
def main():
args = parse_args()
if args.verbose >= 3:
logging.getLogger().setLevel(logging.DEBUG)
# dice which check will be run bases on check_mode
match args.check_mode:
case 'speedtest':
check = nagiosplugin.Check(
SpeedtestResource(),
nagiosplugin.ScalarContext(name='download_scalar_context', warning=args.warning_download, critical=args.critical_download),
nagiosplugin.ScalarContext(name='upload_scalar_context', warning=args.warning_upload, critical=args.critical_upload),
nagiosplugin.ScalarContext(name='ping_scalar_context', warning=args.warning_ping, critical=args.critical_ping),
nagiosplugin.Summary())
check.name = "speedtest"
case _:
raise nagiosplugin.CheckError(f'Unknown check mode: {args.check_mode}')
check.main(args.verbose, args.timeout)
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,8 @@
## for all checks
nagiosplugin
## for api checks
requests
## for snmp checks
easysnmp

2
requirements/airq.txt Normal file
View File

@@ -0,0 +1,2 @@
## for airq
pip3 install git+https://github.com/heinemml/CO2Meter