#!/usr/bin/env python3
"""Pi-hole v6 API wrapper.

Requirements:
    - pip3 install requests
"""

__version__ = '0.1.0'
__author__ = 'anima'

# imports
import json
import logging
from datetime import datetime, timedelta
from os import path

import requests

# log settings
logging.basicConfig(format='[%(asctime)s] %(levelname)s %(message)s', level=logging.INFO)


class PiholeAPI:
    """Small client for the Pi-hole v6 HTTP API.

    Connection settings come from the constructor arguments and, when the
    config file exists, from the keys of that JSON file. Session credentials
    (``sid`` / ``csrf``) are obtained on demand and cached until
    ``valid_auth`` has passed.
    """

    # HTTP verbs accepted by the internal query helper.
    _VALID_METHODS = ('GET', 'POST', 'PUT', 'DELETE')

    def __init__(self, config: str = 'pihole.json', host: str | None = None, port: int = 80,
                 ssl: bool = False, password: str | None = None):
        """Store the connection settings and optionally load a config file.

        Args:
            config (str, optional): Path of a JSON file whose top-level keys
                are copied onto the instance. Defaults to 'pihole.json'.
            host (str, optional): Host name or address of the Pi-hole.
            port (int, optional): Port of the web server. Defaults to 80.
            ssl (bool, optional): Use https instead of http. Defaults to False.
            password (str, optional): Password used by do_auth.
        """
        self.host = host
        self.port = port
        self.ssl = ssl
        self.password = password
        # "now" means: no valid session yet, the first query has to authenticate.
        self.valid_auth = datetime.now()
        self.sid = None
        self.csrf = None

        # NOTE: values from the config file override the constructor arguments.
        if path.exists(config):
            with open(config, 'r', encoding='utf-8') as file:
                for key, value in json.load(file).items():
                    setattr(self, key, value)

    def __query(self, query: str, method: str = 'GET', payload: dict = None, auth_need: bool = True,
                params: dict = None):
        """Send one request to ``/api/<query>`` and decode the answer.

        Args:
            query (str): API path below ``/api/``.
            method (str, optional): One of GET, POST, PUT, DELETE. Defaults to 'GET'.
            payload (dict, optional): Sent as JSON body when not empty.
            auth_need (bool, optional): Attach session headers and refresh
                the session first if it has expired. Defaults to True.
            params (dict, optional): Sent as URL query parameters when not empty.

        Returns:
            dict | bool: decoded JSON on status 200, True on status 204,
            False on an invalid method or any other status code.
        """
        headers = {}

        # check if http method valid
        if method not in self._VALID_METHODS:
            valid_methods = list(self._VALID_METHODS)
            logging.error(f'http {method=} is none of {valid_methods=}')
            return False

        # refresh auth if required and set auth info
        if auth_need:
            if not self._check_auth():
                self.do_auth()
            headers['X-FTL-SID'] = self.sid
            headers['X-FTL-CSRF'] = self.csrf

        # create url
        scheme = 'https' if self.ssl else 'http'
        url = f'{scheme}://{self.host}:{self.port}/api/{query}'

        # NOTE(review): verify=False disables TLS certificate verification.
        # Kept as in the original behavior - confirm this is intended.
        response = requests.request(
            method=method,
            url=url,
            headers=headers,
            params=params or None,
            json=payload or None,
            verify=False,
        )

        if response.status_code == 200:
            # successful request with a JSON body
            return json.loads(response.content)
        if response.status_code == 204:
            # successful request without a body (e.g. auth delete)
            return True

        # default fallback: report the failure through logging
        logging.error(f'{method} {url} failed with status {response.status_code}: {response.text}')
        return False

    #
    # Auth based methods
    #

    def _store_session(self, data: dict) -> bool:
        """Remember sid / csrf / validity from an auth response.

        Args:
            data (dict): Decoded answer of the ``auth`` endpoint.

        Returns:
            bool: True if the answer describes a valid session.
        """
        session = data['session']
        if not session['valid']:
            return False

        self.valid_auth = datetime.now() + timedelta(seconds=session['validity'])
        self.sid = session['sid']
        self.csrf = session['csrf']
        logging.info(f'successfull new auth created until {self.valid_auth}')
        return True

    def _reset_session(self) -> None:
        """Forget the cached session so the next query authenticates again."""
        self.valid_auth = datetime.now()
        self.sid = None
        self.csrf = None

    def need_auth(self) -> bool:
        """Generate a valid session if no auth is required.

        # TODO: better handling if not successful

        Returns:
            bool: True if a valid session was created
        """
        data = self.__query('auth', auth_need=False)
        if not data:
            logging.error(f'can not get a valid session without password')
            return False
        return self._store_session(data)

    def _check_auth(self) -> bool:
        """Check if the cached auth is still valid (in time).

        Returns:
            bool: True if auth valid
        """
        return datetime.now() < self.valid_auth

    def do_auth(self) -> bool:
        """Auth via password to get sid / csrf.

        # TODO: TOTP support

        Raises:
            PermissionError: if the auth request itself fails.

        Returns:
            bool: True if auth successful
        """
        payload = {'password': self.password}
        data = self.__query('auth', 'POST', payload, auth_need=False)
        if not data:
            raise PermissionError(f'Authentication not possible')
        return self._store_session(data)

    def get_sessions(self) -> list | None:
        """List of all current sessions.

        Returns:
            list | None: list of sessions if the query was successful
        """
        data = self.__query('auth/sessions')
        if data:
            return data['sessions']
        return None

    def delete_session(self, session_id: int = None) -> bool:
        """Delete a session by ID.

        Args:
            session_id (int, optional): Session ID as returned by
                get_sessions. When None, the session flagged as
                ``current_session`` is deleted. Defaults to None.

        Returns:
            bool: True if the session was deleted, False if the lookup or
            the delete request failed or no current session was found.
        """
        deleting_current = session_id is None
        if deleting_current:
            sessions = self.get_sessions()
            if not sessions:
                return False
            session_id = next(
                (session['id'] for session in sessions if session['current_session']),
                None,
            )
            if session_id is None:
                # never send DELETE auth/session/None
                logging.error('no current session found to delete')
                return False

        if not self.__query(f'auth/session/{session_id}', 'DELETE'):
            return False

        logging.info(f'successfull delete session with {session_id=}')
        if deleting_current:
            # the cached sid belongs to the session that was just removed
            self._reset_session()
        return True

    def clear_sessions(self) -> bool:
        """Delete every session except the current one.

        Returns:
            bool: True if all other sessions were deleted, False if the
            sessions could not be listed or a delete failed.
        """
        sessions = self.get_sessions()
        if sessions is None:
            return False

        for session in sessions:
            if session['current_session']:
                continue
            if not self.delete_session(session['id']):
                return False
        return True

    def get_app_password(self) -> dict | None:
        """Create a new application password.

        This password can be used for the API instead of the regular
        password and TOTP. All sessions will be deleted if this password
        is set!

        # TODO: set password (hash) to PATCH /api/config/webserver/api/app_pwhash

        Returns:
            dict | None: if successful: app password and hash
        """
        data = self.__query('auth/app')
        if not data:
            return None
        return data['app']

    #
    # metrics based methods
    #

    @staticmethod
    def _to_timestamp(value: int | datetime) -> int:
        """Convert an int timestamp or a datetime to an int timestamp.

        Raises:
            TypeError: if value is neither an int nor a datetime.
        """
        if isinstance(value, datetime):
            return int(value.timestamp())
        if isinstance(value, int):
            return value
        raise TypeError(f'expected int or datetime, got {type(value).__name__}')

    def _history_range(self, query: str, starttime: int | datetime, endtime: int | datetime) -> list | None:
        """Query a long-term history endpoint for a time range.

        ``from`` / ``until`` are sent as URL query parameters; a JSON body
        on the GET request was answered with "You need to specify both
        from and until".
        """
        params = {
            'from': self._to_timestamp(starttime),
            'until': self._to_timestamp(endtime),
        }
        data = self.__query(query, params=params)
        if not data:
            return None
        return data['history']

    def get_history(self) -> list | None:
        """Get activity graph data.

        Returns:
            list | None: if successful: list of history query data
        """
        data = self.__query('history')
        if not data:
            return None
        return data['history']

    def get_history_timerange(self, starttime: int | datetime, endtime: int | datetime) -> list | None:
        """Get activity graph data (long-term data).

        Args:
            starttime (int | datetime): start time of data collection (int as timestamp)
            endtime (int | datetime): end time of data collection (int as timestamp)

        Raises:
            TypeError: if a bound is neither an int nor a datetime.

        Returns:
            list | None: if successful: list of history query data in given time range
        """
        return self._history_range('history/database', starttime, endtime)

    def get_client_history(self) -> list | None:
        """Get per-client activity graph data.

        Returns:
            list | None: if successful: list of history query data by client
        """
        data = self.__query('history/clients')
        if not data:
            return None
        return data['history']

    def get_client_history_timerange(self, starttime: int | datetime, endtime: int | datetime) -> list | None:
        """Get per-client activity graph data (long-term data).

        Args:
            starttime (int | datetime): start time of data collection (int as timestamp)
            endtime (int | datetime): end time of data collection (int as timestamp)

        Raises:
            TypeError: if a bound is neither an int nor a datetime.

        Returns:
            list | None: if successful: list of history query data by client in given time range
        """
        return self._history_range('history/database/clients', starttime, endtime)

    def get_queries(self) -> list | None:
        """Request query details.

        Query parameters may be used to limit the number of results.

        # TODO: add arguments

        Returns:
            list | None: By default, this API callback returns the most recent 100 queries.
        """
        data = self.__query('queries')
        if not data:
            return None
        return data['queries']

    def get_upstreams(self) -> list | None:
        """Get the upstream destination statistics.

        Returns:
            list | None: if successful: list of upstreams
        """
        data = self.__query('stats/upstreams')
        if not data:
            return None
        return data['upstreams']

    # Backward-compatible alias for the original (misspelled) method name.
    get_upstrams = get_upstreams


def main():
    """Remove all foreign sessions and print the upstream statistics."""
    pi = PiholeAPI()
    pi.clear_sessions()
    print(pi.get_upstrams())


if __name__ == '__main__':
    main()