Files
logHandlerSQLite/LogHandlerSQLite.py
2024-08-10 23:24:37 +02:00

161 lines
5.0 KiB
Python

import logging
import os
import sqlite3 as sql
import shutil
class LogHandlerSQLite(logging.Handler):
"""logging handler for sqlite3"""
__AUTHOR__ = 'anima'
__VERSION__ = '1.1.0'
def __init__(self, level: int | str = 0, dbfile: str = 'logs.db') -> None:
"""logging handler for sqlite3
:params level: loglevel (see logging lib)
:type level: int | str
:params dbfile: filename for sqlite db
:type dbfile: str
"""
self._dbfile : str = dbfile
self._sqlfiles : str = os.path.dirname(os.path.realpath(__file__)) + '/sql'
self.dbVersion : str = '1.0' # needed db version
self.__setup_db()
super().__init__(level)
##> db general
def __db_con(self) -> sql.Connection:
"""connection handler for with statements
:return: connection object
:rtype: sqlite3.Connection
"""
return sql.connect(self._dbfile, detect_types=sql.PARSE_DECLTYPES | sql.PARSE_COLNAMES)
def __db_query(self, queryfile: str, payload: list = None, getId: bool = False) -> list | None:
"""send a query to db set at `dbfile`
:param queryfile: filename for sql file (without .sql) multible querys per file possable (split with ;)
:type queryfile: str
:param payload: send payload to sql query
:type payload: list
:param getId: if True this will return the id of newest row inerst form this query
:type getId: bool
:return: db data is query was select or None if no data matches # known bug: works only if payload set (empty possable)
:rtype: list | None
"""
with open(f'{self._sqlfiles}/{queryfile}.sql', 'r') as sqlfile:
sqlscript = sqlfile.read()
with self.__db_con() as con:
cur = con.cursor()
if isinstance(payload, list):
cur.execute(sqlscript, payload)
id = cur.lastrowid
else:
cur.executescript(sqlscript)
data = cur.fetchall()
if getId:
return id
elif len(data) > 0:
return data
def __setup_db(self) -> None:
"""init a new db if need check version of db and start update until `dbVersion` match
:return: none
"""
self.__db_query('tables')
if self.__get_db_version() is None:
self.__db_query('insertDbVersion')
while not self.dbVersion == self.__get_db_version():
self.__update_db(self.__get_db_version())
def __update_db(self, version) -> None:
"""update db file; create a backup for every version
:param version: needed version; version structure conroled by sql files
:type version: str
:return: none
"""
shutil.copyfile(f'{self._dbfile}', f'{self._dbfile}.v-{version}.bak')
self.__db_query(f'update/updateDb-{version}')
##> select (get) querys
def __get_db_version(self) -> str | None:
"""read dbVersion from db
:return: version from db
:rtype: str
"""
data = self.__db_query('selectDbVersion', [])
if data is None:
return data
else:
return data[0][0]
def get_log(self) -> list[tuple] | None:
"""get all logs from db
:return: all logs from db
:rtype: list[tuple]
"""
return self.__db_query('selectLogs', [])
def get_log_by_pid(self, pid: int = os.getpid()) -> list[tuple] | None:
"""get all logs from db match the pid
:param pid: process id (default is own from running instance)
:type pid: int
:return: all logs match the pid or none
:rtype: list[tuple] | None
"""
if isinstance(pid, int):
return self.__db_query('selectLogsByPID', [pid])
else: return None
def search_log(self, search: str) -> list[tuple] | None:
"""search str in log message
:param search: search string for message
:type search: str
:return: all matched log entrys or None
:rtype: list[tuple] | None
"""
return self.__db_query('selectLogsLikeMessage', [f'%{search}%'])
##> logging methods
def emit(self, record: logging.LogRecord) -> None:
"""save log to db
:param record: logging record called from logging lib
:type record: logging.LogRecord
"""
data = [
record.created,
record.process,
record.name,
record.levelname,
record.levelno,
record.filename,
record.lineno,
record.msg,
]
self.__db_query('insertLogs', data)
if __name__ == "__main__":
    # Small demo: attach the SQLite handler to this module's logger, write
    # one record and print every stored record of the current process.
    handler = LogHandlerSQLite()
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)  # DEBUG == 10
    logger.addHandler(handler)
    logger.error('Here is a info')
    print(handler.get_log_by_pid())