#!/bin/python3 ######################### # # timeTrack.py # by 4nima # v.0.3.0 # ######################### # simple time tracking with database ######################### import datetime from sqlite3.dbapi2 import Cursor import time import sqlite3 import json import os import logging # Main class class TimeTrack: def __init__(self, DATABASE='timetrack.db', CONFIG='timetrack.conf'): self.DATABASE = DATABASE self.CONFIG = CONFIG self.USERID = 0 self.USERNAME = '' self.LOGFILE = 'timetrack.log' logging.basicConfig( filename=self.LOGFILE, level=logging.DEBUG, format='%(asctime)s - %(process)d-%(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) self.db_setup() self.load_config() ## Prepartation ### Creates a database if none is found def db_setup(self): if os.path.isfile(self.DATABASE): logging.info('Database file was found') else: logging.info('Database file was not found, will create ') sql = [] sql.append(""" CREATE TABLE IF NOT EXISTS time_entries ( id INTEGER PRIMARY KEY AUTOINCREMENT, starttime TIMESTAMP NOT NULL, endtime TIMESTAMP NOT NULL, user_id INTEGER NOT NULL, activity TEXT, catrgory_id INTEGER, client_id INTEGER, lock BOOLEAN ) """) sql.append(""" CREATE TABLE IF NOT EXISTS active_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, starttime TIMESTAMP NOT NULL, user_id INT NOT NULL ) """) sql.append(""" CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, worktime INT, worktime_span INT ) """) sql.append(""" CREATE TABLE IF NOT EXISTS clients ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL ) """) sql.append(""" CREATE TABLE IF NOT EXISTS categories ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL ) """) connect = sqlite3.connect(self.DATABASE) cursor = connect.cursor() logging.debug('Create initial database tables') for SQL in sql: try: cursor.execute(SQL) except: logging.error('Table could not be created') logging.debug(SQL) print('TimeTrack wird beendet: Fehler bei Datanbank Setup') quit() else: logging.debug('Table was created successfully or already exists') connect.commit() connect.close() ### Loads or creates a config file def load_config(self): if os.path.isfile(self.CONFIG): logging.info('Config file was found') try: with open(self.CONFIG) as config_data: data = json.load(config_data) except ValueError: logging.error('Config file has no valid JSON syntax') print('TimeTrack wird beendet: Fehler in der JSON-Konfig ({})'.format(self.CONFIG)) quit() else: logging.info('Config file was loaded successfully') self.USERID = data['user'] logging.debug('UserID {} was used'.format(data['user'])) self.set_user() else: logging.warning('Config file not found') config = { 'user' : 1, 'default' : 'interactive' } with open(self.CONFIG, "w") as outfile: json.dump(config, outfile, indent=4, sort_keys=True) logging.info('Config file created successfully') #> wenn man keine datei erstellen darf/kann, fällt das skript in einen loop ? self.load_config() ## user handling ### Creates a user who does not yet exist def create_user(self, USER=''): if USER == '': username = input('Benutzername eingaben: ') logging.debug('Selected username: {}'.format(username)) else: username = USER while self.get_users(NAME=username): print('Der gewünsche Benutzername ({}) ist schon vergeben'.format(username)) username = input('Wähle einen anderen Benutzernamen: ') logging.debug('Try again: Selected username: {}'.format(username)) logging.debug('Accepted username: {}'.format(username)) connect = sqlite3.connect(self.DATABASE) cursor = connect.cursor() sql = "INSERT INTO users ( name ) values ( ? )" try: cursor.execute(sql, [username]) except: logging.error('User could not be saved in database') logging.debug(sql) else: logging.info('User was saved successfully') connect.commit() connect.close() ### Outputs existing users from the DB def get_users(self, UID=0, NAME=''): if not UID == 0: logging.debug('Get user by ID: {}'.format(UID)) data = UID sql = "SELECT * FROM users WHERE id = ?" elif not NAME == '': logging.debug('Get user by username: {}'.format(NAME)) data = NAME sql = "SELECT * FROM users WHERE name LIKE ?" else: logging.debug('Get all users') data = '' sql = "SELECT * FROM users" connect = sqlite3.connect(self.DATABASE) cursor = connect.cursor() try: if data: cursor.execute(sql, [data]) else: cursor.execute(sql) except: logging.error('Could not get user') logging.debug(sql) print('Fehler beim Zugriff auf die Benutzer Datenbank') return 1 else: logging.debug('User database read out successfully') connect.commit() data = cursor.fetchall() output = [] for row in data: output.append(row) connect.close() return output ### Defines a user for the session def set_user(self): data = self.get_users() if not data: logging.info("No user was found") print("Es wurde kein Benutzer gefunden, bitte legen sie einen neuen an.") self.create_user() data = self.get_users(UID=self.USERID) if data == []: logging.error('User ID was not found') else: self.USERNAME = data[0][1] ## Time handling ### Creates an active event if none exists for the user def set_event(self, TIME=datetime.datetime.now()): if not self.get_event(USERID=self.USERID): logging.debug('No active events found for the user: {}'.format(self.USERID)) connect = sqlite3.connect(self.DATABASE) cursor = connect.cursor() sql = "INSERT INTO active_events ( starttime, user_id ) VALUES ( ?, ? )" try: cursor.execute(sql, [TIME, self.USERID]) except: logging.error('Event could not be created') logging.debug(sql) print('Event konnte nicht gespeichert werden.') else: logging.info('Event was created successfully') connect.commit() connect.close() else: logging.warning('Active events found for the user, new event could not be created') ### Deletes an active event based on a user or event ID def delete_event(self, ENTRYID='', USERID=''): if not ENTRYID == '': logging.info('Deletes event based on eventid: {}'.format(USERID)) sql = "DELETE FROM active_events WHERE id = ?" data = ENTRYID elif not USERID == '': logging.info('Deletes events based on userid: {}'.format(USERID)) sql = "DELETE FROM active_events WHERE user_id = ?" data = USERID else: logging.warning('No indication of what should be deleted') print('Keine angabe was gelöscht werden soll') return 0 connect = sqlite3.connect(self.DATABASE, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES) cursor = connect.cursor() try: cursor.execute(sql, [data]) except: logging.error('Event could not be deleted') logging.debug(sql) print('Fehler beim löschen des Events.') else: logging.debug('Event was successfully deleted') connect.commit() connect.close() ### Get an active event based on a user or event ID def get_event(self, ENTRYID='', USERID=''): if not ENTRYID == '': logging.debug('Search event based on eventid: {}'.format(ENTRYID)) sql = "SELECT * FROM active_events WHERE id = ?" data = ENTRYID elif not USERID == '': logging.debug('Search events based on userid: {}'.format(USERID)) sql = "SELECT * FROM active_events WHERE user_id = ?" data = USERID else: sql = "SELECT * FROM active_events" data = '' connect = sqlite3.connect(self.DATABASE, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES) cursor = connect.cursor() try: if data: logging.debug('Search event') cursor.execute(sql, [data]) else: logging.debug('Get all Events') cursor.execute(sql) except: logging.error('Events could not be read') logging.debug(sql) print('Fehler beim auslesen der aktiven Events') else: logging.debug('Events could be read out successfully') data = cursor.fetchall() connect.close() if data == []: logging.debug('No active events found') return 0 else: logging.debug('{} events found'.format(len(data))) return data[0] #################################################################################################### #################################################################################################### #################################################################################################### #################################################################################################### def time_start(self): pass def time_end(self): pass def get_time(self): pass test = TimeTrack() #test.delete_event(USERID=1) #test.set_event() test.get_event()