#!/usr/bin/env python3 """pihole v6 API wrapper""" """ requirements: - pip3 install requests - """ __version__ = '0.1.0' __author__ = 'anima' # imports import logging import requests import json from datetime import datetime, timedelta from os import path # log settings logging.basicConfig(format='[%(asctime)s] %(levelname)s %(message)s', level=logging.INFO) class PiholeAPI: def __init__(self, config: str = 'pihole.json', host: str = None, port: int = 80, ssl: bool = False, password: str = None): self.host = host self.port = port self.ssl = ssl self.password = password self.valid_auth = datetime.now() self.sid = None self.csrf = None if path.exists(config): with open(config, 'r') as file: for key, value in json.load(file).items(): self.__dict__[key] = value def __query(self, query: str, method: str = 'GET', payload: dict = None, auth_need: bool = True): headers = dict() # check if http method valid valid_methods = ['GET', 'POST', 'PUT', 'PATCH', 'DELETE'] if method not in valid_methods: logging.error(f'http {method=} is none of {valid_methods=}') return False # refresh auth if required and set auth info if auth_need: if not self._check_auth(): self.do_auth() headers['X-FTL-SID'] = self.sid headers['X-FTL-CSRF'] = self.csrf # create url url = 'http' if self.ssl: url += 's' url += f'://{self.host}:{self.port}/api/{query}' if payload: response = requests.request(method=method, url=url, headers=headers, json=payload, verify=False) else: response = requests.request(method=method, url=url, headers=headers, verify=False) match response.status_code: case (200): # comment: successfull request return json.loads(response.content) case (204): # comment: successfull (e.g. auth delete) return True case (_): # comment: default fallback print() print(response.__dict__) print(response.raw) return False # ## Auth based methods # def need_auth(self) -> bool: """generate a valid session if no auth required # TODO: better handling if not successful Returns: bool: True if a valid session created """ data = self.__query('auth', auth_need=False) if not data: logging.error(f'can not get a valid session without password') return False if data['session']['valid']: self.valid_auth = datetime.now() + timedelta(seconds=data['session']['validity']) self.sid = data['session']['sid'] self.csrf = data['session']['csrf'] logging.info(f'successfull new auth created until {self.valid_auth}') return True return False def _check_auth(self) -> bool: """check if an auth valid (in time) Returns: bool: True if auth valid """ now = datetime.now() if datetime.now() < self.valid_auth: return True return False def do_auth(self) -> bool: """auth via password to get sid / csrf # TODO: TOTP support Returns: bool: True if auth successfull """ payload = {'password': self.password} data = self.__query('auth', 'POST', payload, auth_need=False) if not data: raise PermissionError(f'Authentication not possible') if data['session']['valid']: self.valid_auth = datetime.now() + timedelta(seconds=data['session']['validity']) self.sid = data['session']['sid'] self.csrf = data['session']['csrf'] logging.info(f'successfull new auth created until {self.valid_auth}') return True return False def get_sessions(self) -> list | None: """List of all current sessions Returns: list | None: list of sessions if successfull query """ data = self.__query('auth/sessions') if data: return data['sessions'] return None def delete_session(self, session_id: int = None) -> bool: """Delete session by ID with no arguments it will clear it own session Args: session_id (int, optional): Session ID get from get_sessions. Defaults to None. Returns: bool: True if session delete successfull """ if session_id is None: data = self.get_sessions() if not data: return False for session in data: if session['current_session']: return self.delete_session(session['id']) if self.__query(f'auth/session/{session_id}', 'DELETE'): logging.info(f'successfull delete session with {session_id=}') return True def clear_sessions(self): data = self.get_sessions() for session in data: if not session['current_session']: if not self.delete_session(session['id']): return False return True def get_app_password(self) -> dict | None: """Create new application password This password can be used for API instet of regular password an TOTP. All sessions will be delte if this password set! # TODO: set password (hash) to PATCH /api/config/webserver/api/app_pwhash Returns: dict | None: if successful: app password and hash """ data = self.__query('auth/app') return data['app'] # ## metrics based methods # def get_history(self) -> list | None: """Get activity graph data Returns: list | None: if successfull: list of history query data """ data = self.__query('history') if not data: return None return data['history'] def get_history_timerange(self, starttime: int | datetime, endtime: int | datetime) -> list | None: """Get activity graph data (long-term data) # TODO: BUG - always response with You need to specify both \\"from\\" and \\"until\\" in the request. Args: starttime (int | datetime): start time of data collection (int as timestamp) endtime (int | datetime): end time of data collection (int as timestamp) Returns: list | None: if successfull: list of history query data in given time range """ payload = dict() if isinstance(starttime, int) and isinstance(endtime, int): payload['from'] = starttime payload['until'] = endtime if isinstance(starttime, datetime) and isinstance(endtime, datetime): payload['from'] = int(starttime.timestamp()) payload['until'] = int(endtime.timestamp()) if len(payload.keys()) > 0: data = self.__query('history/database', payload=payload) if not data: return None return data def get_client_history(self) -> list | None: """Get per-client activity graph data Returns: list | None: if successfull: list of history query data by client """ data = self.__query('history/clients') if not data: return None return data['history'] def get_client_history_timerange(self, starttime: int | datetime, endtime: int | datetime) -> list | None: """Get activity graph data (long-term data) # TODO: BUG - always response with You need to specify both \\"from\\" and \\"until\\" in the request. Args: starttime (int | datetime): start time of data collection (int as timestamp) endtime (int | datetime): end time of data collection (int as timestamp) Returns: list | None: if successfull: list of history query data by client in given time range """ payload = dict() if isinstance(starttime, int) and isinstance(endtime, int): payload['from'] = starttime payload['until'] = endtime if isinstance(starttime, datetime) and isinstance(endtime, datetime): payload['from'] = int(starttime.timestamp()) payload['until'] = int(endtime.timestamp()) if len(payload.keys()) > 0: data = self.__query('history/database', payload=payload) if not data: return None return data def get_queries(self) -> list | None: """Request query details. Query parameters may be used to limit the number of results. # TODO: add arguments Returns: list | None: By default, this API callback returns the most recent 100 queries. """ data = self.__query('queries') if not data: return None print(data['queries'][0]) return data['queries'] def get_upstrams(self) -> list | None: data = self.__query('stats/upstreams') print(data) # ## confg methods # def get_config(self, element: str = None) -> dict | None: """Get current configuration of your Pi-hole. Can be use to modify and dump back via self.patch_config(config) Args: element (str, optional): define sub elements of config. Defaults to None. Returns: dict | None: if successfull : dict with config (sub elements) """ if element: data = self.__query(f'config/{element}') else: data = self.__query('config') return data def patch_config(self, config: dict) -> bool: """Change configuration of your Pi-hole Args: config (dict): new config to push Returns: bool: true if query send successfull """ if not 'config' in config.keys(): return False payload = config data = self.__query('config', 'PATCH', payload=payload) if data: return True return False def get_dns_host(self, host: list[str, str], match_ip: bool = True, match_fqdn: bool = True) -> list[list[str, str]] | None: """get specific dns host from custom dns records Args: host (list[str, str]): list with [ip, fqdn] match_ip (bool, optional): controls if ip will be check. Defaults to True. match_fqdn (bool, optional): controls if fqdn will be check. Defaults to True. Returns: list[list[str, str]] | None: list with results of found hosts [[ip, fqdn], [ip, fqdn]] """ if isinstance(host, list) and len(host) != 2: return None data = self.get_config('dns/hosts') if not data: return None results = list() for entry in data['config']['dns']['hosts']: entry_host = entry.split(' ') if (not match_ip or entry_host[0] == host[0]) and (not match_fqdn or entry_host[1] == host[1]): results.append(entry_host) return results def add_dns_host(self, host: list[str, str]) -> bool: """add a new custom dns record Args: host (list[str, str]): list with [ip, fqdn] Returns: bool: True if new entry exists or new created """ if not isinstance(host, list) and len(host) != 2: return False data = self.get_config('dns/hosts') if not data: return False create_new = True results = self.get_dns_host(host) if results: for result in results: if result == host: create_new = False if create_new: data['config']['dns']['hosts'].append(' '.join(host)) return self.patch_config(data) else: return True def remove_dns_host(self, host: list[str, str], match_ip: bool = True, match_fqdn: bool = True) -> bool: """remove given host Args: host (list[str, str]): list with [ip, fqdn] match_ip (bool, optional): controls if ip will be check. Defaults to True. match_fqdn (bool, optional): controls if fqdn will be check. Defaults to True. Returns: bool: True if host(s) successfull removed """ if not isinstance(host, list) and len(host) != 2: return False data = self.get_config('dns/hosts') if not data: return False hosts = data['config']['dns']['hosts'][::] for entry in data['config']['dns']['hosts']: entry_host = entry.split(' ') if (not match_ip or entry_host[0] == host[0]) and (not match_fqdn or entry_host[1] == host[1]): hosts.remove(entry) data['config']['dns']['hosts'] = hosts return self.patch_config(data) def main(): pi = PiholeAPI() pi.clear_sessions() config = pi.get_config() print(config) hosts = pi.get_dns_host(['', 'myhost.example.org'], match_fqdn=False) print(hosts) if __name__ == '__main__': main()