161 lines
5.0 KiB
Python
161 lines
5.0 KiB
Python
import logging
|
|
|
|
import os
|
|
import sqlite3 as sql
|
|
import shutil
|
|
|
|
class LogHandlerSQLite(logging.Handler):
|
|
"""logging handler for sqlite3"""
|
|
__AUTHOR__ = 'anima'
|
|
__VERSION__ = '1.1.0'
|
|
|
|
def __init__(self, level: int | str = 0, dbfile: str = 'logs.db') -> None:
|
|
"""logging handler for sqlite3
|
|
|
|
:params level: loglevel (see logging lib)
|
|
:type level: int | str
|
|
:params dbfile: filename for sqlite db
|
|
:type dbfile: str
|
|
"""
|
|
self._dbfile : str = dbfile
|
|
self._sqlfiles : str = os.path.dirname(os.path.realpath(__file__)) + '/sql'
|
|
self.dbVersion : str = '1.0' # needed db version
|
|
self.__setup_db()
|
|
super().__init__(level)
|
|
|
|
##> db general
|
|
def __db_con(self) -> sql.Connection:
|
|
"""connection handler for with statements
|
|
|
|
:return: connection object
|
|
:rtype: sqlite3.Connection
|
|
"""
|
|
return sql.connect(self._dbfile, detect_types=sql.PARSE_DECLTYPES | sql.PARSE_COLNAMES)
|
|
|
|
def __db_query(self, queryfile: str, payload: list = None, getId: bool = False) -> list | None:
|
|
"""send a query to db set at `dbfile`
|
|
|
|
:param queryfile: filename for sql file (without .sql) multible querys per file possable (split with ;)
|
|
:type queryfile: str
|
|
:param payload: send payload to sql query
|
|
:type payload: list
|
|
:param getId: if True this will return the id of newest row inerst form this query
|
|
:type getId: bool
|
|
|
|
:return: db data is query was select or None if no data matches # known bug: works only if payload set (empty possable)
|
|
:rtype: list | None
|
|
"""
|
|
with open(f'{self._sqlfiles}/{queryfile}.sql', 'r') as sqlfile:
|
|
sqlscript = sqlfile.read()
|
|
|
|
with self.__db_con() as con:
|
|
cur = con.cursor()
|
|
if isinstance(payload, list):
|
|
cur.execute(sqlscript, payload)
|
|
id = cur.lastrowid
|
|
else:
|
|
cur.executescript(sqlscript)
|
|
data = cur.fetchall()
|
|
if getId:
|
|
return id
|
|
elif len(data) > 0:
|
|
return data
|
|
|
|
def __setup_db(self) -> None:
|
|
"""init a new db if need check version of db and start update until `dbVersion` match
|
|
|
|
:return: none
|
|
"""
|
|
self.__db_query('tables')
|
|
if self.__get_db_version() is None:
|
|
self.__db_query('insertDbVersion')
|
|
|
|
while not self.dbVersion == self.__get_db_version():
|
|
self.__update_db(self.__get_db_version())
|
|
|
|
def __update_db(self, version) -> None:
|
|
"""update db file; create a backup for every version
|
|
|
|
:param version: needed version; version structure conroled by sql files
|
|
:type version: str
|
|
|
|
:return: none
|
|
"""
|
|
shutil.copyfile(f'{self._dbfile}', f'{self._dbfile}.v-{version}.bak')
|
|
self.__db_query(f'update/updateDb-{version}')
|
|
|
|
##> select (get) querys
|
|
def __get_db_version(self) -> str | None:
|
|
"""read dbVersion from db
|
|
|
|
:return: version from db
|
|
:rtype: str
|
|
|
|
"""
|
|
data = self.__db_query('selectDbVersion', [])
|
|
if data is None:
|
|
return data
|
|
else:
|
|
return data[0][0]
|
|
|
|
def get_log(self) -> list[tuple] | None:
|
|
"""get all logs from db
|
|
|
|
:return: all logs from db
|
|
:rtype: list[tuple]
|
|
"""
|
|
return self.__db_query('selectLogs', [])
|
|
|
|
def get_log_by_pid(self, pid: int = os.getpid()) -> list[tuple] | None:
|
|
"""get all logs from db match the pid
|
|
|
|
:param pid: process id (default is own from running instance)
|
|
:type pid: int
|
|
|
|
:return: all logs match the pid or none
|
|
:rtype: list[tuple] | None
|
|
"""
|
|
if isinstance(pid, int):
|
|
return self.__db_query('selectLogsByPID', [pid])
|
|
else: return None
|
|
|
|
def search_log(self, search: str) -> list[tuple] | None:
|
|
"""search str in log message
|
|
|
|
:param search: search string for message
|
|
:type search: str
|
|
|
|
:return: all matched log entrys or None
|
|
:rtype: list[tuple] | None
|
|
"""
|
|
return self.__db_query('selectLogsLikeMessage', [f'%{search}%'])
|
|
|
|
##> logging methods
|
|
def emit(self, record: logging.LogRecord) -> None:
|
|
"""save log to db
|
|
|
|
:param record: logging record called from logging lib
|
|
:type record: logging.LogRecord
|
|
|
|
"""
|
|
data = [
|
|
record.created,
|
|
record.process,
|
|
record.name,
|
|
record.levelname,
|
|
record.levelno,
|
|
record.filename,
|
|
record.lineno,
|
|
record.msg,
|
|
]
|
|
self.__db_query('insertLogs', data)
|
|
|
|
if __name__ == "__main__":
    # Manual smoke test: attach the SQLite handler to this module's logger,
    # write one record and print every stored record of the current process.
    demo_logger = logging.getLogger(__name__)
    demo_logger.setLevel(logging.DEBUG)  # numeric level 10

    sqlite_handler = LogHandlerSQLite()
    demo_logger.addHandler(sqlite_handler)

    demo_logger.error('Here is a info')
    print(sqlite_handler.get_log_by_pid())