From 34960aa6083b79fb12725a8de66ae3b43fb1d7c3 Mon Sep 17 00:00:00 2001 From: anima Date: Fri, 7 Mar 2025 15:43:30 +0100 Subject: [PATCH] inital version for v6 api --- PiholeAPI.py | 175 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 175 insertions(+) create mode 100644 PiholeAPI.py diff --git a/PiholeAPI.py b/PiholeAPI.py new file mode 100644 index 0000000..2304836 --- /dev/null +++ b/PiholeAPI.py @@ -0,0 +1,175 @@ +#!/usr/bin/env python3 +"""pihole v6 API wrapper""" +""" +requirements: +- pip3 install requests +- +""" +__version__ = '0.1.0' +__author__ = 'anima' + +# imports +import logging +import requests +import json +from datetime import datetime, timedelta + + +# log settings +logging.basicConfig(format='[%(asctime)s] %(levelname)s %(message)s', level=logging.INFO) + + +class PiholeAPI: + def __init__(self, url: str, port: int = 80, ssl: bool = False, passwd: str = None): + self.url = url + self.port = port + self.ssl = ssl + self.passwd = passwd + self.valid_auth = datetime.now() + self.sid = None + self.csrf = None + + def __query(self, query: str, method: str = 'GET', payload: dict = None, auth_need: bool = True): + headers = dict() + + # check if http method valid + valid_methods = ['GET', 'POST', 'PUT', 'DELETE'] + if method not in valid_methods: + logging.error(f'http {method=} is none of {valid_methods=}') + return False + + # refresh auth if required and set auth info + if auth_need: + if not self._check_auth(): + self.do_auth() + headers['X-FTL-SID'] = self.sid + headers['X-FTL-CSRF'] = self.csrf + + # create url + url = 'http' + if self.ssl: + url += 's' + url += f'://{self.url}:{self.port}/api/{query}' + + if payload: + response = requests.request(method=method, url=url, headers=headers, json=payload, verify=False) + else: + response = requests.request(method=method, url=url, headers=headers, verify=False) + + match response.status_code: + case (200): + # comment: successfull request + return json.loads(response.content) + case (204): + # comment: successfull (e.g. auth delete) + return True + case (_): + # comment: default fallback + print() + print(response.__dict__) + return False + # + ## Auth based methods + # + def need_auth(self) -> bool: + """generate a valid session if no auth required + # TODO: better handling if not successful + + Returns: + bool: True if a valid session created + """ + data = self.__query('auth', auth_need=False) + if not data: + logging.error(f'can not get a valid session without password') + return False + if data['session']['valid']: + self.valid_auth = datetime.now() + timedelta(seconds=data['session']['validity']) + self.sid = data['session']['sid'] + self.csrf = data['session']['csrf'] + logging.info(f'successfull new auth created until {self.valid_auth}') + return True + return False + + def _check_auth(self) -> bool: + """check if an auth valid (in time) + + Returns: + bool: True if auth valid + """ + now = datetime.now() + if datetime.now() < self.valid_auth: + return True + return False + + def do_auth(self) -> bool: + """auth via password to get sid / csrf + + # TODO: TOTP support + + Returns: + bool: True if auth successfull + """ + payload = {'password': self.passwd} + data = self.__query('auth', 'POST', payload, auth_need=False) + if not data: + raise PermissionError(f'Authentication not possible') + if data['session']['valid']: + self.valid_auth = datetime.now() + timedelta(seconds=data['session']['validity']) + self.sid = data['session']['sid'] + self.csrf = data['session']['csrf'] + logging.info(f'successfull new auth created until {self.valid_auth}') + return True + return False + + def get_sessions(self) -> list | None: + """List of all current sessions + + Returns: + list | None: list of sessions if successfull query + """ + data = self.__query('auth/sessions') + if data: + return data['sessions'] + return False + + def delete_session(self, session_id: int = None) -> bool: + """Delete session by ID + + Args: + session_id (int, optional): Session ID get from get_sessions. Defaults to None. + + Returns: + bool: True if session delete successfull + """ + if session_id is None: + data = self.get_sessions() + if not data: + return False + for session in data: + if session['current_session']: + return self.delete_session(session['id']) + if self.__query(f'auth/session/{session_id}', 'DELETE'): + logging.info(f'successfull delete session with {session_id=}') + return True + + def get_app_password(self) -> dict | None: + """Create new application password + + This password can be used for API instet of regular password an TOTP. + All sessions will be delte if this password set! + # TODO: set password (hash) to PATCH /api/config/webserver/api/app_pwhash + + Returns: + dict | None: if successful: app password and hash + """ + data = self.__query('auth/app') + return data['app'] + + +def main(): + pi = PiholeAPI(url='localhost', passwd='my-password!') + pi.get_sessions() + + +if __name__ == '__main__': + main() \ No newline at end of file