Compare commits

..

2 Commits

Author SHA1 Message Date
1609b8fb1a initial version 2024-08-09 16:37:30 +02:00
caf5371beb add *.db files to ignore 2024-08-09 15:28:16 +02:00
9 changed files with 187 additions and 0 deletions

3
.gitignore vendored
View File

@@ -1,3 +1,6 @@
*.db
*.conf
# ---> Python # ---> Python
# Byte-compiled / optimized / DLL files # Byte-compiled / optimized / DLL files
__pycache__/ __pycache__/

152
LogHandlerSQLite.py Normal file
View File

@@ -0,0 +1,152 @@
import logging
import os
import sqlite3 as sql
import shutil
class LogHandlerSQLite(logging.Handler):
"""logging handler for sqlite3"""
__AUTHOR__ = 'anima'
__VERSION__ = '1.0.0'
def __init__(self, level: int | str = 0, dbfile: str = 'logs.db') -> None:
"""logging handler for sqlite3
:params level: loglevel (see logging lib)
:type level: int | str
:params dbfile: filename for sqlite db
:type dbfile: str
"""
self._dbfile : str = dbfile
self._sqlfiles : str = os.path.dirname(os.path.realpath(__file__)) + '/sql'
self.dbVersion : str = '1.0' # needed db version
self.__setup_db()
super().__init__(level)
##> db general
def __db_con(self) -> sql.Connection:
"""connection handler for with statements
:return: connection object
:rtype: sqlite3.Connection
"""
return sql.connect(self._dbfile, detect_types=sql.PARSE_DECLTYPES | sql.PARSE_COLNAMES)
def __db_query(self, queryfile: str, payload: list = None, getId: bool = False) -> list | None:
"""send a query to db set at `dbfile`
:param queryfile: filename for sql file (without .sql) multible querys per file possable (split with ;)
:type queryfile: str
:param payload: send payload to sql query
:type payload: list
:param getId: if True this will return the id of newest row inerst form this query
:type getId: bool
:return: db data is query was select or None if no data matches # known bug: works only if payload set (empty possable)
:rtype: list | None
"""
with open(f'{self._sqlfiles}/{queryfile}.sql', 'r') as sqlfile:
sqlscript = sqlfile.read()
with self.__db_con() as con:
cur = con.cursor()
if isinstance(payload, list):
cur.execute(sqlscript, payload)
id = cur.lastrowid
else:
cur.executescript(sqlscript)
data = cur.fetchall()
if getId:
return id
elif len(data) > 0:
return data
def __setup_db(self) -> None:
"""init a new db if need check version of db and start update until `dbVersion` match
:return: none
"""
self.__db_query('tables')
if self.__get_db_version() is None:
self.__db_query('insertDbVersion')
while not self.dbVersion == self.__get_db_version():
self.__update_db(self.__get_db_version())
def __update_db(self, version) -> None:
"""update db file; create a backup for every version
:param version: needed version; version structure conroled by sql files
:type version: str
:return: none
"""
shutil.copyfile(f'{self._dbfile}', f'{self._dbfile}.v-{version}.bak')
self.__db_query(f'update/updateDb-{version}')
##> select (get) querys
def __get_db_version(self) -> str | None:
"""read dbVersion from db
:return: version from db
:rtype: str
"""
data = self.__db_query('selectDbVersion', [])
if data is None:
return data
else:
return data[0][0]
def get_log(self) -> list[tuple] | None:
"""get all logs from db
:return: all logs from db
:rtype: list[tuple]
"""
return self.__db_query('selectLogs', [])
def get_log_by_pid(self, pid: int = os.getpid()) -> list[tuple] | None:
"""get all logs from db match the pid
:param pid: process id (default is own from running instance)
:type pid: int
:return: all logs match the pid or none
:rtype: list[tuple] | None
"""
if isinstance(pid, int):
return self.__db_query('selectLogsByPID', [pid])
else: return None
##> logging methods
def emit(self, record: logging.LogRecord) -> None:
"""save log to db
:param record: logging record called from logging lib
:type record: logging.LogRecord
"""
data = [
record.created,
record.process,
record.name,
record.levelname,
record.levelno,
record.filename,
record.lineno,
record.msg,
]
self.__db_query('insertLogs', data)
if __name__ == "__main__":
log = logging.getLogger(__name__)
log.setLevel(10)
logDB = LogHandlerSQLite()
log.addHandler(logDB)
log.error('Here is a info')
print(logDB.get_log_by_pid())
t = LogHandlerSQLite()

1
__init__.py Normal file
View File

@@ -0,0 +1 @@
from .LogHandlerSQLite import LogHandlerSQLite

4
sql/insertDbVersion.sql Normal file
View File

@@ -0,0 +1,4 @@
INSERT INTO dbVersion
(dbVersion)
VALUES
('1.0');

5
sql/insertLogs.sql Normal file
View File

@@ -0,0 +1,5 @@
INSERT INTO logs
(logTime, process, logHandler, levelName, levelNo, fileName, fileLineNo, message)
VALUES
(:1, :2, :3, :4, :5, :6, :7, :8)
;

1
sql/selectDbVersion.sql Normal file
View File

@@ -0,0 +1 @@
SELECT * FROM dbVersion;

2
sql/selectLogs.sql Normal file
View File

@@ -0,0 +1,2 @@
SELECT *
FROM logs

4
sql/selectLogsByPID.sql Normal file
View File

@@ -0,0 +1,4 @@
SELECT *
FROM logs
WHERE process == ?
;

15
sql/tables.sql Normal file
View File

@@ -0,0 +1,15 @@
CREATE TABLE IF NOT EXISTS dbVersion (
dbVersion TEXT PRIMARY KEY NOT NULL
);
CREATE TABLE IF NOT EXISTS logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
logTime datetime,
process INTEGER,
logHandler TEXT,
levelName TEXT,
levelNo INTEGER,
fileName TEXT,
fileLineNo INTEGER,
message TEXT NOT NULL
);