Files
api-pihole/PiholeAPI.py
2025-03-07 18:41:24 +01:00

407 lines
13 KiB
Python

#!/usr/bin/env python3
"""pihole v6 API wrapper"""
"""
requirements:
- pip3 install requests
-
"""
__version__ = '0.3.0'
__author__ = 'anima'
# imports
import logging
import requests
import json
from datetime import datetime, timedelta
from os import path
# log settings
logging.basicConfig(format='[%(asctime)s] %(levelname)s %(message)s', level=logging.INFO)
class PiholeAPI:
def __init__(self, config: str = 'pihole.json', host: str = None, port: int = 80, ssl: bool = False, password: str = None):
self.host = host
self.port = port
self.ssl = ssl
self.password = password
self.valid_auth = datetime.now()
self.sid = None
self.csrf = None
if path.exists(config):
with open(config, 'r') as file:
for key, value in json.load(file).items():
self.__dict__[key] = value
def __query(self, query: str, method: str = 'GET', payload: dict = None, auth_need: bool = True):
headers = dict()
# check if http method valid
valid_methods = ['GET', 'POST', 'PUT', 'PATCH', 'DELETE']
if method not in valid_methods:
logging.error(f'http {method=} is none of {valid_methods=}')
return False
# refresh auth if required and set auth info
if auth_need:
if not self._check_auth():
self.do_auth()
headers['X-FTL-SID'] = self.sid
headers['X-FTL-CSRF'] = self.csrf
# create url
url = 'http'
if self.ssl:
url += 's'
url += f'://{self.host}:{self.port}/api/{query}'
if payload:
response = requests.request(method=method, url=url, headers=headers, json=payload, verify=False)
else:
response = requests.request(method=method, url=url, headers=headers, verify=False)
match response.status_code:
case (200):
# comment: successfull request
return json.loads(response.content)
case (204):
# comment: successfull (e.g. auth delete)
return True
case (_):
# comment: default fallback
print()
print(response.__dict__)
print(response.raw)
return False
#
## Auth based methods
#
def need_auth(self) -> bool:
"""generate a valid session if no auth required
# TODO: better handling if not successful
Returns:
bool: True if a valid session created
"""
data = self.__query('auth', auth_need=False)
if not data:
logging.error(f'can not get a valid session without password')
return False
if data['session']['valid']:
self.valid_auth = datetime.now() + timedelta(seconds=data['session']['validity'])
self.sid = data['session']['sid']
self.csrf = data['session']['csrf']
logging.info(f'successfull new auth created until {self.valid_auth}')
return True
return False
def _check_auth(self) -> bool:
"""check if an auth valid (in time)
Returns:
bool: True if auth valid
"""
now = datetime.now()
if datetime.now() < self.valid_auth:
return True
return False
def do_auth(self) -> bool:
"""auth via password to get sid / csrf
# TODO: TOTP support
Returns:
bool: True if auth successfull
"""
payload = {'password': self.password}
data = self.__query('auth', 'POST', payload, auth_need=False)
if not data:
raise PermissionError(f'Authentication not possible')
if data['session']['valid']:
self.valid_auth = datetime.now() + timedelta(seconds=data['session']['validity'])
self.sid = data['session']['sid']
self.csrf = data['session']['csrf']
logging.info(f'successfull new auth created until {self.valid_auth}')
return True
return False
def get_sessions(self) -> list | None:
"""List of all current sessions
Returns:
list | None: list of sessions if successfull query
"""
data = self.__query('auth/sessions')
if data:
return data['sessions']
return None
def delete_session(self, session_id: int = None) -> bool:
"""Delete session by ID
with no arguments it will clear it own session
Args:
session_id (int, optional): Session ID get from get_sessions. Defaults to None.
Returns:
bool: True if session delete successfull
"""
if session_id is None:
data = self.get_sessions()
if not data:
return False
for session in data:
if session['current_session']:
return self.delete_session(session['id'])
if self.__query(f'auth/session/{session_id}', 'DELETE'):
logging.info(f'successfull delete session with {session_id=}')
return True
def clear_sessions(self):
data = self.get_sessions()
for session in data:
if not session['current_session']:
if not self.delete_session(session['id']):
return False
return True
def get_app_password(self) -> dict | None:
"""Create new application password
This password can be used for API instet of regular password an TOTP.
All sessions will be delte if this password set!
# TODO: set password (hash) to PATCH /api/config/webserver/api/app_pwhash
Returns:
dict | None: if successful: app password and hash
"""
data = self.__query('auth/app')
return data['app']
#
## metrics based methods
#
def get_history(self) -> list | None:
"""Get activity graph data
Returns:
list | None: if successfull: list of history query data
"""
data = self.__query('history')
if not data:
return None
return data['history']
def get_history_timerange(self, starttime: int | datetime, endtime: int | datetime) -> list | None:
"""Get activity graph data (long-term data)
# TODO: BUG - always response with You need to specify both \\"from\\" and \\"until\\" in the request.
Args:
starttime (int | datetime): start time of data collection (int as timestamp)
endtime (int | datetime): end time of data collection (int as timestamp)
Returns:
list | None: if successfull: list of history query data in given time range
"""
payload = dict()
if isinstance(starttime, int) and isinstance(endtime, int):
payload['from'] = starttime
payload['until'] = endtime
if isinstance(starttime, datetime) and isinstance(endtime, datetime):
payload['from'] = int(starttime.timestamp())
payload['until'] = int(endtime.timestamp())
if len(payload.keys()) > 0:
data = self.__query('history/database', payload=payload)
if not data:
return None
return data
def get_client_history(self) -> list | None:
"""Get per-client activity graph data
Returns:
list | None: if successfull: list of history query data by client
"""
data = self.__query('history/clients')
if not data:
return None
return data['history']
def get_client_history_timerange(self, starttime: int | datetime, endtime: int | datetime) -> list | None:
"""Get activity graph data (long-term data)
# TODO: BUG - always response with You need to specify both \\"from\\" and \\"until\\" in the request.
Args:
starttime (int | datetime): start time of data collection (int as timestamp)
endtime (int | datetime): end time of data collection (int as timestamp)
Returns:
list | None: if successfull: list of history query data by client in given time range
"""
payload = dict()
if isinstance(starttime, int) and isinstance(endtime, int):
payload['from'] = starttime
payload['until'] = endtime
if isinstance(starttime, datetime) and isinstance(endtime, datetime):
payload['from'] = int(starttime.timestamp())
payload['until'] = int(endtime.timestamp())
if len(payload.keys()) > 0:
data = self.__query('history/database', payload=payload)
if not data:
return None
return data
def get_queries(self) -> list | None:
"""Request query details. Query parameters may be used to limit the number of results.
# TODO: add arguments
Returns:
list | None: By default, this API callback returns the most recent 100 queries.
"""
data = self.__query('queries')
if not data:
return None
print(data['queries'][0])
return data['queries']
def get_upstrams(self) -> list | None:
data = self.__query('stats/upstreams')
print(data)
#
## confg methods
#
def get_config(self, element: str = None) -> dict | None:
"""Get current configuration of your Pi-hole.
Can be use to modify and dump back via self.patch_config(config)
Args:
element (str, optional): define sub elements of config. Defaults to None.
Returns:
dict | None: if successfull : dict with config (sub elements)
"""
if element:
data = self.__query(f'config/{element}')
else:
data = self.__query('config')
return data
def patch_config(self, config: dict) -> bool:
"""Change configuration of your Pi-hole
Args:
config (dict): new config to push
Returns:
bool: true if query send successfull
"""
if not 'config' in config.keys():
return False
payload = config
data = self.__query('config', 'PATCH', payload=payload)
if data:
return True
return False
def get_dns_host(self, host: list[str, str], match_ip: bool = True, match_fqdn: bool = True) -> list[list[str, str]] | None:
"""get specific dns host from custom dns records
Args:
host (list[str, str]): list with [ip, fqdn]
match_ip (bool, optional): controls if ip will be check. Defaults to True.
match_fqdn (bool, optional): controls if fqdn will be check. Defaults to True.
Returns:
list[list[str, str]] | None: list with results of found hosts [[ip, fqdn], [ip, fqdn]]
"""
if isinstance(host, list) and len(host) != 2:
return None
data = self.get_config('dns/hosts')
if not data:
return None
results = list()
for entry in data['config']['dns']['hosts']:
entry_host = entry.split(' ')
if (not match_ip or entry_host[0] == host[0]) and (not match_fqdn or entry_host[1] == host[1]):
results.append(entry_host)
return results
def add_dns_host(self, host: list[str, str]) -> bool:
"""add a new custom dns record
Args:
host (list[str, str]): list with [ip, fqdn]
Returns:
bool: True if new entry exists or new created
"""
if not isinstance(host, list) and len(host) != 2:
return False
data = self.get_config('dns/hosts')
if not data:
return False
create_new = True
results = self.get_dns_host(host)
if results:
for result in results:
if result == host:
create_new = False
if create_new:
data['config']['dns']['hosts'].append(' '.join(host))
return self.patch_config(data)
else:
return True
def remove_dns_host(self, host: list[str, str], match_ip: bool = True, match_fqdn: bool = True) -> bool:
"""remove given host
Args:
host (list[str, str]): list with [ip, fqdn]
match_ip (bool, optional): controls if ip will be check. Defaults to True.
match_fqdn (bool, optional): controls if fqdn will be check. Defaults to True.
Returns:
bool: True if host(s) successfull removed
"""
if not isinstance(host, list) and len(host) != 2:
return False
data = self.get_config('dns/hosts')
if not data:
return False
hosts = data['config']['dns']['hosts'][::]
for entry in data['config']['dns']['hosts']:
entry_host = entry.split(' ')
if (not match_ip or entry_host[0] == host[0]) and (not match_fqdn or entry_host[1] == host[1]):
hosts.remove(entry)
data['config']['dns']['hosts'] = hosts
return self.patch_config(data)
def main():
pi = PiholeAPI()
pi.clear_sessions()
config = pi.get_config()
print(config)
hosts = pi.get_dns_host(['', 'myhost.example.org'], match_fqdn=False)
print(hosts)
if __name__ == '__main__':
main()