import logging import os import sqlite3 as sql import shutil class LogHandlerSQLite(logging.Handler): """logging handler for sqlite3""" __AUTHOR__ = 'anima' __VERSION__ = '1.0.0' def __init__(self, level: int | str = 0, dbfile: str = 'logs.db') -> None: """logging handler for sqlite3 :params level: loglevel (see logging lib) :type level: int | str :params dbfile: filename for sqlite db :type dbfile: str """ self._dbfile : str = dbfile self._sqlfiles : str = os.path.dirname(os.path.realpath(__file__)) + '/sql' self.dbVersion : str = '1.0' # needed db version self.__setup_db() super().__init__(level) ##> db general def __db_con(self) -> sql.Connection: """connection handler for with statements :return: connection object :rtype: sqlite3.Connection """ return sql.connect(self._dbfile, detect_types=sql.PARSE_DECLTYPES | sql.PARSE_COLNAMES) def __db_query(self, queryfile: str, payload: list = None, getId: bool = False) -> list | None: """send a query to db set at `dbfile` :param queryfile: filename for sql file (without .sql) multible querys per file possable (split with ;) :type queryfile: str :param payload: send payload to sql query :type payload: list :param getId: if True this will return the id of newest row inerst form this query :type getId: bool :return: db data is query was select or None if no data matches # known bug: works only if payload set (empty possable) :rtype: list | None """ with open(f'{self._sqlfiles}/{queryfile}.sql', 'r') as sqlfile: sqlscript = sqlfile.read() with self.__db_con() as con: cur = con.cursor() if isinstance(payload, list): cur.execute(sqlscript, payload) id = cur.lastrowid else: cur.executescript(sqlscript) data = cur.fetchall() if getId: return id elif len(data) > 0: return data def __setup_db(self) -> None: """init a new db if need check version of db and start update until `dbVersion` match :return: none """ self.__db_query('tables') if self.__get_db_version() is None: self.__db_query('insertDbVersion') while not self.dbVersion == self.__get_db_version(): self.__update_db(self.__get_db_version()) def __update_db(self, version) -> None: """update db file; create a backup for every version :param version: needed version; version structure conroled by sql files :type version: str :return: none """ shutil.copyfile(f'{self._dbfile}', f'{self._dbfile}.v-{version}.bak') self.__db_query(f'update/updateDb-{version}') ##> select (get) querys def __get_db_version(self) -> str | None: """read dbVersion from db :return: version from db :rtype: str """ data = self.__db_query('selectDbVersion', []) if data is None: return data else: return data[0][0] def get_log(self) -> list[tuple] | None: """get all logs from db :return: all logs from db :rtype: list[tuple] """ return self.__db_query('selectLogs', []) def get_log_by_pid(self, pid: int = os.getpid()) -> list[tuple] | None: """get all logs from db match the pid :param pid: process id (default is own from running instance) :type pid: int :return: all logs match the pid or none :rtype: list[tuple] | None """ if isinstance(pid, int): return self.__db_query('selectLogsByPID', [pid]) else: return None ##> logging methods def emit(self, record: logging.LogRecord) -> None: """save log to db :param record: logging record called from logging lib :type record: logging.LogRecord """ data = [ record.created, record.process, record.name, record.levelname, record.levelno, record.filename, record.lineno, record.msg, ] self.__db_query('insertLogs', data) if __name__ == "__main__": log = logging.getLogger(__name__) log.setLevel(10) logDB = LogHandlerSQLite() log.addHandler(logDB) log.error('Here is a info') print(logDB.get_log_by_pid()) t = LogHandlerSQLite()