inital check with disk function
This commit is contained in:
253
checks/check_snmp_linux.py
Executable file
253
checks/check_snmp_linux.py
Executable file
@@ -0,0 +1,253 @@
|
||||
#!/usr/bin/env python3
|
||||
"""SNMP defaults linux python check"""
|
||||
"""dependencys:
|
||||
- pip3 install nagiosplugin
|
||||
- pip3 install argparse
|
||||
- pip3 install easysnmp
|
||||
- https://easysnmp.readthedocs.io/en/latest/index.html
|
||||
- https://easysnmp.readthedocs.io/en/latest/session_api.html
|
||||
"""
|
||||
|
||||
__version__ = '0.1.0'
|
||||
__author__ = 'anima'
|
||||
|
||||
# imports
|
||||
import logging
|
||||
import argparse
|
||||
import nagiosplugin
|
||||
from easysnmp import Session
|
||||
|
||||
# log settings
|
||||
logging.basicConfig(format='[%(asctime)s] %(levelname)s %(message)s', level=logging.INFO)
|
||||
|
||||
def init_snmp_session(hostname: str = 'localhost', snmp_version: str | int = '2', snmp_community: str = 'public',
|
||||
snmp_security_level: str = 'auth_with_privacy', snmp_security_username: str = 'monitoring',
|
||||
snmp_auth_protocol: str = 'SHA256', snmp_auth_password: str | None = None,
|
||||
snmp_privacy_protocol: str = 'AES', snmp_privacy_password: str | None = None
|
||||
) -> Session:
|
||||
"""init a easysnmp session
|
||||
|
||||
Args:
|
||||
hostname (str, optional): hostname or ip of host. Defaults to 'localhost'.
|
||||
snmp_version (str, optional): version of snmp (1, 2, 3). Defaults to 2.
|
||||
snmp_community (str, only need for v1 & v2): snmp community if snmp v1 or v2 used (any). Defaults to 'public'.
|
||||
snmp_security_level (str, only need for v3): security level (no_auth_or_privacy, auth_without_privacy or auth_with_privacy). Defaults to 'auth_with_privacy'.
|
||||
snmp_security_username (str, only need for v3): security name (any). Defaults to 'monitoring'.
|
||||
snmp_auth_protocol (str, only need for v3): authentication protocol (MD5, SHA256, SHA512). Defaults to 'SHA256'.
|
||||
snmp_auth_password (str, only need for v3): authentication passphrase (any). Defaults to None.
|
||||
snmp_privacy_protocol (str, only need for v3): privacy protocol (AES, DES). Defaults to 'AES'.
|
||||
snmp_privacy_password (str, only need for v3): privacy passphrase (any). Defaults to None.
|
||||
|
||||
Return:
|
||||
easysnmp.Session: session for get or walk actions
|
||||
"""
|
||||
# check if snmp version an valid str or int and saves as int
|
||||
valid_versions: list[str, int] = ['1','2','3', 1, 2, 3]
|
||||
if snmp_version not in valid_versions:
|
||||
raise ValueError(f'{snmp_version=} non of {valid_versions=}')
|
||||
else:
|
||||
snmp_version = int(snmp_version)
|
||||
|
||||
if snmp_version == '3':
|
||||
# check if snmp security level valid
|
||||
valid_security_levels: list[str] = ['no_auth_or_privacy', 'auth_without_privacy', 'auth_with_privacy']
|
||||
if snmp_security_level not in valid_security_levels:
|
||||
raise ValueError(f'{snmp_security_level=} is non of {valid_security_levels=}')
|
||||
|
||||
# check if auth protocol valid
|
||||
if snmp_security_level.startswith('auth'):
|
||||
valid_auth_protocols: list[str] = ['MD5', 'SHA', 'SHA256', 'SHA512']
|
||||
if snmp_auth_protocol not in valid_auth_protocols:
|
||||
raise ValueError(f'{snmp_auth_protocol=} is non of {valid_auth_protocols=}')
|
||||
|
||||
if snmp_auth_password is None:
|
||||
raise ValueError(f'{snmp_auth_password=} is not set')
|
||||
|
||||
# check if privacy protocol valid
|
||||
if not snmp_security_level.startswith('no'):
|
||||
valid_privacy_protocols: list[str] = ['AES', 'DES']
|
||||
if snmp_privacy_protocol not in valid_privacy_protocols:
|
||||
raise ValueError(f'{snmp_privacy_protocol=} is non of {valid_privacy_protocols=}')
|
||||
|
||||
if snmp_privacy_password is None:
|
||||
raise ValueError(f'{snmp_privacy_password=} is not set')
|
||||
|
||||
|
||||
if snmp_version == 3: # 2025-01-02: only 'auth_with_privacy' is tested
|
||||
session = Session(hostname=hostname, version=snmp_version,
|
||||
security_level=snmp_security_level, security_username=snmp_security_username,
|
||||
privacy_protocol=snmp_privacy_protocol, privacy_password=snmp_privacy_password,
|
||||
auth_protocol=snmp_auth_protocol, auth_password=snmp_auth_password)
|
||||
else:
|
||||
session = Session(hostname=hostname, version=snmp_version, community=snmp_community)
|
||||
return session
|
||||
|
||||
|
||||
#
|
||||
# General Summary
|
||||
#
|
||||
class SNMPSummary(nagiosplugin.Summary):
|
||||
"""default summary for own results"""
|
||||
def verbose(self, results):
|
||||
"""creates a multiline response"""
|
||||
result_str = ''
|
||||
for result in results:
|
||||
result_str += f'{str(result)}\n'
|
||||
return result_str
|
||||
|
||||
def ok(self, results):
|
||||
return
|
||||
|
||||
def problem(self, results):
|
||||
return
|
||||
|
||||
|
||||
#
|
||||
# Disk
|
||||
#
|
||||
class SNMPDiskResource(nagiosplugin.Resource):
|
||||
def __init__(self, session, exclude_tmpfs:bool = True):
|
||||
"""disk check via snmp
|
||||
|
||||
Args:
|
||||
session (easysnmp.Session): Easysnmp Session object
|
||||
exclude_tmpfs (bool, optional): ignore disks from dev type tmpfs. Defaults to True.
|
||||
"""
|
||||
self.session = session
|
||||
self.exclude_tmpfs = exclude_tmpfs
|
||||
|
||||
def probe(self) -> list:
|
||||
"""check disk size of all disks
|
||||
|
||||
Returns:
|
||||
list[nagisplugin.Metric]: multiple metric elements (yield)
|
||||
"""
|
||||
baseoid = '.1.3.6.1.4.1.2021.9.1'
|
||||
oids = dict()
|
||||
oids['dskIndex'] = '.1'
|
||||
oids['dskPath'] = '.2'
|
||||
oids['dskDevice'] = '.3'
|
||||
oids['dskTotal'] = '.6'
|
||||
oids['dskAvail'] = '.7'
|
||||
oids['dskUsed'] = '.8'
|
||||
oids['dskPercent'] = '.9'
|
||||
oids['dskPercentNode'] = '.10'
|
||||
results = dict()
|
||||
disks = dict()
|
||||
for key,value in oids.items():
|
||||
results[key] = self.session.walk(baseoid + value)
|
||||
for key, value in results.items():
|
||||
for item in value:
|
||||
if key == 'dskIndex':
|
||||
disks[item.value] = dict()
|
||||
else:
|
||||
disks[item.oid_index][key] = item.value
|
||||
|
||||
for disk in disks.values():
|
||||
if not (self.exclude_tmpfs and disk['dskDevice'] == 'tmpfs'):
|
||||
yield nagiosplugin.Metric(name=disk['dskPath'], value=int(disk['dskPercent']), uom='%', context='scalar_context')
|
||||
yield nagiosplugin.Metric(name=disk['dskPath'], value=disk, context='disk_info')
|
||||
|
||||
|
||||
class SNMPDiskResult(nagiosplugin.Result):
|
||||
def human_size(self, value: int | float, source_unit: str = 'b', target_unit: str = 'gb') -> list[float, str]:
|
||||
"""convert a byte int to a higher unit
|
||||
|
||||
stop convert if value < 1024
|
||||
supported units: ['b', 'kb', 'mb', 'gb', 'tb']
|
||||
|
||||
Args:
|
||||
value (int | float): original value
|
||||
source_unit (str, optional): unit of original unit. Defaults to 'b'.
|
||||
target_unit (str, optional): target unit. Defaults to 'gb'.
|
||||
|
||||
Returns:
|
||||
list[float, str]: [new_value, unit]
|
||||
"""
|
||||
units = ['b', 'kb', 'mb', 'gb', 'tb']
|
||||
if value < 1024 or source_unit == target_unit:
|
||||
return [value, source_unit]
|
||||
return self.human_size(value / 1024, units[units.index(source_unit) + 1], target_unit=target_unit)
|
||||
|
||||
def __str__(self):
|
||||
free_size, free_unit = self.human_size(int(self.metric.value["dskAvail"]), 'kb')
|
||||
return f'{self.metric.value["dskPath"]} has {round(free_size, 2)} {free_unit} free'
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
|
||||
"""evaluates given arguments
|
||||
|
||||
Returns:
|
||||
argsparse.Namespace: Namespace Object with all arguments insert (use: args.long_name_of_argument)
|
||||
"""
|
||||
argp = argparse.ArgumentParser(description=__doc__)
|
||||
# Default args
|
||||
argp.add_argument('-v', '--verbose', action='count', default=0,
|
||||
help='increase output verbosity (use up to 3 times)')
|
||||
argp.add_argument('-H', '--hostname', default='localhost',
|
||||
help='IP address or hostname of device to query')
|
||||
argp.add_argument('-t', '--timeout', default=30,
|
||||
help='abort execution after TIMEOUT seconds')
|
||||
argp.add_argument('-m', '--check_mode',
|
||||
choices=[
|
||||
'disk',
|
||||
'cpu',
|
||||
'load?',
|
||||
'memory',
|
||||
],
|
||||
help='check mode to run')
|
||||
|
||||
# Nagios args / see https://nagios-plugins.org/doc/guidelines.html#THRESHOLDFORMAT
|
||||
argp.add_argument('-w', '--warning', default=':80',
|
||||
help='warning threshold')
|
||||
argp.add_argument('-c', '--critical', default=':90',
|
||||
help='critical threshold')
|
||||
|
||||
# SNMP args (same -* as snmpwalk / snmpget linux command; exept -C/-c & -V /-v because already used (see above))
|
||||
argp.add_argument('-V', '--snmp-version', default='2',
|
||||
help='Used SNMP Version. (1, 2, 3)')
|
||||
argp.add_argument('-C', '--snmp_community', default='public',
|
||||
help='SNMP community if snmp v1 or v2 used (any).')
|
||||
argp.add_argument('-l', '--snmp-security-level', default='auth_with_privacy',
|
||||
help='Security level (no_auth_or_privacy, auth_without_privacy or auth_with_privacy)')
|
||||
argp.add_argument('-u', '--snmp-security-username', default='monitoring',
|
||||
help='Security name (any)')
|
||||
argp.add_argument('-a', '--snmp-auth-protocol', default='SHA256',
|
||||
help='Authentication protocol (MD5, SHA256, SHA512)')
|
||||
argp.add_argument('-A', '--snmp-auth-pass', default=None,
|
||||
help='Authentication passphrase (any)')
|
||||
argp.add_argument('-x', '--snmp-privacy-protocol', default='AES',
|
||||
help='Privacy protocol (AES, DES)')
|
||||
argp.add_argument('-X', '--snmp-privacy-pass', default=None,
|
||||
help='Privacy passphrase (any)')
|
||||
|
||||
args = argp.parse_args()
|
||||
return args
|
||||
|
||||
|
||||
def main():
|
||||
args = parse_args()
|
||||
if args.verbose >= 3:
|
||||
logging.getLogger().setLevel(logging.DEBUG)
|
||||
|
||||
session = init_snmp_session(hostname=args.hostname, snmp_version=args.snmp_version, snmp_community=args.snmp_community,
|
||||
snmp_security_level=args.snmp_security_level, snmp_security_username=args.snmp_security_username,
|
||||
snmp_auth_protocol=args.snmp_auth_protocol, snmp_auth_password=args.snmp_auth_pass,
|
||||
snmp_privacy_protocol=args.snmp_privacy_protocol, snmp_privacy_password=args.snmp_privacy_pass)
|
||||
|
||||
# dice which check will be run bases on check_mode
|
||||
match args.check_mode:
|
||||
case 'disk':
|
||||
check = nagiosplugin.Check(
|
||||
SNMPDiskResource(session=session),
|
||||
nagiosplugin.Context(name='disk_info', result_cls=SNMPDiskResult),
|
||||
nagiosplugin.ScalarContext(name='scalar_context', fmt_metric='{name} is {value}{uom} used', warning=args.warning, critical=args.critical),
|
||||
SNMPSummary())
|
||||
check.name = "Disk usage"
|
||||
case _:
|
||||
raise nagiosplugin.CheckError(f'Unknown check mode: {args.check_mode}')
|
||||
|
||||
check.main(args.verbose, args.timeout)
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Reference in New Issue
Block a user