#!/usr/bin/env python3 """SNMP defaults linux python check""" """dependencys: - pip3 install nagiosplugin - pip3 install argparse - pip3 install easysnmp - https://easysnmp.readthedocs.io/en/latest/index.html - https://easysnmp.readthedocs.io/en/latest/session_api.html """ __version__ = '0.1.2' __author__ = 'anima' # imports import logging import argparse import nagiosplugin from easysnmp import Session # log settings logging.basicConfig(format='[%(asctime)s] %(levelname)s %(message)s', level=logging.INFO) def init_snmp_session(hostname: str = 'localhost', snmp_version: str | int = '2', snmp_community: str = 'public', snmp_security_level: str = 'auth_with_privacy', snmp_security_username: str = 'monitoring', snmp_auth_protocol: str = 'SHA256', snmp_auth_password: str | None = None, snmp_privacy_protocol: str = 'AES', snmp_privacy_password: str | None = None ) -> Session: """init a easysnmp session Args: hostname (str, optional): hostname or ip of host. Defaults to 'localhost'. snmp_version (str, optional): version of snmp (1, 2, 3). Defaults to 2. snmp_community (str, only need for v1 & v2): snmp community if snmp v1 or v2 used (any). Defaults to 'public'. snmp_security_level (str, only need for v3): security level (no_auth_or_privacy, auth_without_privacy or auth_with_privacy). Defaults to 'auth_with_privacy'. snmp_security_username (str, only need for v3): security name (any). Defaults to 'monitoring'. snmp_auth_protocol (str, only need for v3): authentication protocol (MD5, SHA256, SHA512). Defaults to 'SHA256'. snmp_auth_password (str, only need for v3): authentication passphrase (any). Defaults to None. snmp_privacy_protocol (str, only need for v3): privacy protocol (AES, DES). Defaults to 'AES'. snmp_privacy_password (str, only need for v3): privacy passphrase (any). Defaults to None. Return: easysnmp.Session: session for get or walk actions """ # check if snmp version an valid str or int and saves as int valid_versions: list[str, int] = ['1','2','3', 1, 2, 3] if snmp_version not in valid_versions: raise ValueError(f'{snmp_version=} non of {valid_versions=}') else: snmp_version = int(snmp_version) if snmp_version == '3': # check if snmp security level valid valid_security_levels: list[str] = ['no_auth_or_privacy', 'auth_without_privacy', 'auth_with_privacy'] if snmp_security_level not in valid_security_levels: raise ValueError(f'{snmp_security_level=} is non of {valid_security_levels=}') # check if auth protocol valid if snmp_security_level.startswith('auth'): valid_auth_protocols: list[str] = ['MD5', 'SHA', 'SHA256', 'SHA512'] if snmp_auth_protocol not in valid_auth_protocols: raise ValueError(f'{snmp_auth_protocol=} is non of {valid_auth_protocols=}') if snmp_auth_password is None: raise ValueError(f'{snmp_auth_password=} is not set') # check if privacy protocol valid if not snmp_security_level.startswith('no'): valid_privacy_protocols: list[str] = ['AES', 'DES'] if snmp_privacy_protocol not in valid_privacy_protocols: raise ValueError(f'{snmp_privacy_protocol=} is non of {valid_privacy_protocols=}') if snmp_privacy_password is None: raise ValueError(f'{snmp_privacy_password=} is not set') if snmp_version == 3: # 2025-01-02: only 'auth_with_privacy' is tested session = Session(hostname=hostname, version=snmp_version, security_level=snmp_security_level, security_username=snmp_security_username, privacy_protocol=snmp_privacy_protocol, privacy_password=snmp_privacy_password, auth_protocol=snmp_auth_protocol, auth_password=snmp_auth_password) else: session = Session(hostname=hostname, version=snmp_version, community=snmp_community) return session # # General Summary # class SNMPSummary(nagiosplugin.Summary): """default summary for own results""" def verbose(self, results): """creates a multiline response""" result_str = '' for result in results: result_str += f'{str(result)}\n' return result_str def ok(self, results): return def problem(self, results): return # # Disk # class SNMPDiskResource(nagiosplugin.Resource): def __init__(self, session, exclude_tmpfs:bool = True): """disk check via snmp Args: session (easysnmp.Session): Easysnmp Session object exclude_tmpfs (bool, optional): ignore disks from dev type tmpfs. Defaults to True. """ self.session = session self.exclude_tmpfs = exclude_tmpfs def probe(self) -> list: """check disk size of all disks Returns: list[nagisplugin.Metric]: multiple metric elements (yield) """ baseoid = '.1.3.6.1.4.1.2021.9.1' oids = dict() oids['dskIndex'] = '.1' oids['dskPath'] = '.2' oids['dskDevice'] = '.3' oids['dskTotal'] = '.6' oids['dskAvail'] = '.7' oids['dskUsed'] = '.8' oids['dskPercent'] = '.9' oids['dskPercentNode'] = '.10' results = dict() disks = dict() for key,value in oids.items(): results[key] = self.session.walk(baseoid + value) if len(results[key]) <= 0: raise ValueError('no values from snmp') for key, value in results.items(): for item in value: if item.oid_index == '': if key == 'dskIndex': disks[item.value] = dict() else: disks[item.oid.split('.')[-1]][key] = item.value else: if key == 'dskIndex': disks[item.value] = dict() else: disks[item.oid_index][key] = item.value for disk in disks.values(): if not (self.exclude_tmpfs and disk['dskDevice'] == 'tmpfs'): yield nagiosplugin.Metric(name=disk['dskPath'], value=int(disk['dskPercent']), uom='%', context='scalar_context') yield nagiosplugin.Metric(name=disk['dskPath'], value=disk, context='disk_info') class SNMPDiskResult(nagiosplugin.Result): def human_size(self, value: int | float, source_unit: str = 'b', target_unit: str = 'gb') -> list[float, str]: """convert a byte int to a higher unit stop convert if value < 1024 supported units: ['b', 'kb', 'mb', 'gb', 'tb'] Args: value (int | float): original value source_unit (str, optional): unit of original unit. Defaults to 'b'. target_unit (str, optional): target unit. Defaults to 'gb'. Returns: list[float, str]: [new_value, unit] """ units = ['b', 'kb', 'mb', 'gb', 'tb'] if value < 1024 or source_unit == target_unit: return [value, source_unit] return self.human_size(value / 1024, units[units.index(source_unit) + 1], target_unit=target_unit) def __str__(self): free_size, free_unit = self.human_size(int(self.metric.value["dskAvail"]), 'kb') return f'{self.metric.value["dskPath"]} has {round(free_size, 2)} {free_unit} free' def parse_args() -> argparse.Namespace: """evaluates given arguments Returns: argsparse.Namespace: Namespace Object with all arguments insert (use: args.long_name_of_argument) """ argp = argparse.ArgumentParser(description=__doc__) # Default args argp.add_argument('-v', '--verbose', action='count', default=0, help='increase output verbosity (use up to 3 times)') argp.add_argument('-H', '--hostname', default='localhost', help='IP address or hostname of device to query') argp.add_argument('-t', '--timeout', default=30, help='abort execution after TIMEOUT seconds') argp.add_argument('-m', '--check_mode', choices=[ 'disk', 'cpu', 'load?', 'memory', ], help='check mode to run') # Nagios args / see https://nagios-plugins.org/doc/guidelines.html#THRESHOLDFORMAT argp.add_argument('-w', '--warning', default=':80', help='warning threshold') argp.add_argument('-c', '--critical', default=':90', help='critical threshold') # SNMP args (same -* as snmpwalk / snmpget linux command; exept -C/-c & -V /-v because already used (see above)) argp.add_argument('-V', '--snmp-version', default='2', help='Used SNMP Version. (1, 2, 3)') argp.add_argument('-C', '--snmp_community', default='public', help='SNMP community if snmp v1 or v2 used (any).') argp.add_argument('-l', '--snmp-security-level', default='auth_with_privacy', help='Security level (no_auth_or_privacy, auth_without_privacy or auth_with_privacy)') argp.add_argument('-u', '--snmp-security-username', default='monitoring', help='Security name (any)') argp.add_argument('-a', '--snmp-auth-protocol', default='SHA256', help='Authentication protocol (MD5, SHA256, SHA512)') argp.add_argument('-A', '--snmp-auth-pass', default=None, help='Authentication passphrase (any)') argp.add_argument('-x', '--snmp-privacy-protocol', default='AES', help='Privacy protocol (AES, DES)') argp.add_argument('-X', '--snmp-privacy-pass', default=None, help='Privacy passphrase (any)') args = argp.parse_args() return args def main(): args = parse_args() if args.verbose >= 3: logging.getLogger().setLevel(logging.DEBUG) session = init_snmp_session(hostname=args.hostname, snmp_version=args.snmp_version, snmp_community=args.snmp_community, snmp_security_level=args.snmp_security_level, snmp_security_username=args.snmp_security_username, snmp_auth_protocol=args.snmp_auth_protocol, snmp_auth_password=args.snmp_auth_pass, snmp_privacy_protocol=args.snmp_privacy_protocol, snmp_privacy_password=args.snmp_privacy_pass) # dice which check will be run bases on check_mode match args.check_mode: case 'disk': check = nagiosplugin.Check( SNMPDiskResource(session=session), nagiosplugin.Context(name='disk_info', result_cls=SNMPDiskResult), nagiosplugin.ScalarContext(name='scalar_context', fmt_metric='{name} is {value}{uom} used', warning=args.warning, critical=args.critical), SNMPSummary()) check.name = "Disk usage" case _: raise nagiosplugin.CheckError(f'Unknown check mode: {args.check_mode}') check.main(args.verbose, args.timeout) if __name__ == '__main__': main()