Files
api-pihole/Pihole.py
2024-10-19 17:58:55 +02:00

382 lines
13 KiB
Python

import requests
import json
import yaml
from os.path import exists
import logging
class Pihole:
"""API wrapper for Pi-hole"""
__AUTHOR__ = 'anima'
__VERSION__ = '1.0.0'
_log = logging.getLogger(__name__)
__logFile = logging.FileHandler(__name__)
__logPrinter = logging.StreamHandler()
__logFormat = logging.Formatter('%(asctime)s %(levelname)s \t %(name)s : %(message)s')
__logFile.setFormatter(__logFormat)
__logPrinter.setFormatter(__logFormat)
_log.addHandler(__logFile)
_log.addHandler(__logPrinter)
def __init__(self, host: str = None, token: str = None, port: int = None, ssl: bool = None, configfile: str = 'pihole.conf') -> None:
"""init a Pihole API wrapper class
:param host: ip or hostname of pihole instanz
:type host: str
:param token: api token get from pyhole gui (settings => api => show api token)
:type token: str
:param port: (optional) port if not 80 or 443
:type port: int
:param ssl: (optional) set true if connection to pyhole allow https (default is False)
:type ssl: bool
:param configfile: path to config file in yaml format (all values with None will read from config file)
:type configfile: str
"""
if host is None and token is None and not exists(configfile):
self._log.error(f'missing inforamtions {host=} or token not set and no config file exists')
raise NotImplementedError
# set defaults
self.host = host
self.token = token
self.port = port
if ssl is None:
self.ssl = False
# replace defaults if cconfig file exists
if exists(configfile):
with open(configfile, 'r') as f:
config = yaml.safe_load(f)
if host is None:
self._log.debug(f'load host from {configfile=}')
self.host = config['pihole']['host']
if token is None:
self._log.debug(f'load token from {configfile=}')
self.token = config['pihole']['token']
if port is None and 'port' in config['pihole'].keys():
self._log.debug(f'load port from {configfile=}')
self.port = config['pihole']['port']
if ssl is None and ssl in config['pihole'].keys():
self._log.debug(f'load ssl from {configfile=}')
self.ssl = config['pihole']['ssl']
def _query(self, query: str = None, data: dict = None) -> dict | None:
"""default query for a pihole api call
:param query: set query type
:type query: str
:param data: needed data for query as dict
:type data: dict
:return: dict with response if successfull or None
:rtype: dict | None
"""
if self.port is None and self.ssl:
self.port = 443
elif self.port is None:
self.port = 80
protocol = 'http'
if self.ssl:
protocol += 's'
if data is not None or query is not None:
if query is None:
query = ''
if data is not None:
for key, value in data.items():
query += f'&{key}={value}'
response = requests.post(f'{protocol}://{self.host}:{self.port}/admin/api.php?auth={self.__token}&{query}')
if response.status_code == 200:
url = response.url
response = json.loads(response.text)
return response
else:
self._log.error(f'return non 200: {response.status_code} - {response.url.replace(self.__token, "SECRETTOKEN")} - {response.text}')
return None
def get_custom_dns(self) -> dict | None:
"""get all custrom dns entrys (no cname's! see get_custom_cname)
:return: json response from pihole or None
:rtype: dict | None
"""
data = { 'action': 'get' }
return self._query('customdns', data)
def add_custom_dns(self, domain: str, ip: str, reload: bool = False) -> dict | None:
"""add custom domain entry for a ip
:param domain: domain name for ip
:type: str
:param ip: ip address of domain
:type ip: str
:pram reload: reload service FTL (default False)
:return: json response from pihole or None
:rtype: dict | None
"""
data = { 'action': 'add' }
data['domain'] = domain
data['ip'] = ip
if reload:
data['reload'] = 'true'
else:
data['reload'] = 'false'
self._log.info(f'try to add new {domain=} for {ip=}')
return self._query('customdns', data)
def del_custom_dns(self, domain: str, ip: str, reload: bool = False) -> dict | None:
"""delete custom domain entry for a ip
:param domain: domain name for ip
:type: str
:param ip: ip address of domain
:type ip: str
:pram reload: reload service FTL (default False)
:return: json response from pihole or None
:rtype: dict | None
"""
data = { 'action': 'delete' }
data['domain'] = domain
data['ip'] = ip
if reload:
data['reload'] = 'true'
else:
data['reload'] = 'false'
self._log.info(f'try to delete {domain=} for {ip=}')
return self._query('customdns', data)
def get_custom_cname(self) -> dict | None:
"""get all custrom cname entrys (no dns's! see get_custom_dns)
:return: json response from pihole or Noneö
:rtype: dict | None
"""
"""{{host}}/admin/api.php?auth={{token}}&customcname=1&action=get"""
data = { 'action': 'get' }
return self._query('customcname', data)
def add_custom_cname(self, domain: str, target: str, reload: bool = False) -> dict | None:
"""add custom cname entry for a ip
:param domain: domain name alias for a existing domain name
:type: str
:param target: existing domain
:type target: str
:pram reload: reload service FTL (default False)
:return: json response from pihole or None
:rtype: dict | None
"""
data = { 'action': 'add' }
data['domain'] = domain
data['target'] = target
if reload:
data['reload'] = 'true'
else:
data['reload'] = 'false'
self._log.info(f'try to add {domain=} cname for {target=}')
return self._query('customcname', data)
def del_custom_cname(self, domain: str, target: str, reload: bool = False) -> dict | None:
"""delete custom domain entry for a ip
:param domain: domain name alias for a existing domain name
:type: str
:param target: existing domain
:type target: str
:pram reload: reload service FTL (default False)
:return: json response from pihole or None
:rtype: dict | None
"""
data = { 'action': 'delete' }
data['domain'] = domain
data['target'] = target
if reload:
data['reload'] = 'true'
else:
data['reload'] = 'false'
self._log.info(f'try to delete {domain=} cname for {target=}')
return self._query('customcname', data)
def get_list(self, list: str) -> dict | None:
"""get all entrys from a filter list
:param list: valid list names: white, black, regex_white, regex_black. (white and black a exact match lists!)
:type list: str
:return: json response from pihole or None
:rtype: dict | None
"""
supportet_lists = ['white', 'black', 'regex_white', 'regex_black']
if list in supportet_lists:
data = { 'list': list }
return self._query(data=data)
else:
self._log.error(f'get_from_list got false {list=}')
return None
def get_whitelist(self):
"""alias for self.get_list('white')"""
return self.get_list('white')
def get_regex_whitelist(self):
"""alias for self.get_list('regex_white')"""
return self.get_list('regex_white')
def get_blacklist(self):
"""alias for self.get_list('black')"""
return self.get_list('black')
def get_regex_blacklist(self):
"""alias for self.get_list('regex_black')"""
return self.get_list('regex_black')
def add_to_list(self, list: str, entry: str):
"""add a entry to a filter list
:param list: valid list names: white, black, regex_white, regex_black. (white and black a exact match lists!)
:type list: str
:param entry: hostname or ip
:type entry: str
:return: json response from pihole or None
:rtype: dict | None
"""
supportet_lists = ['white', 'black', 'regex_white', 'regex_black']
if list in supportet_lists and entry is not None:
data = { 'list': list }
data['add'] = entry
return self._query(data=data)
else:
self._log.error(f'add_from_list got false {list=} or no {entry=}')
return None
def add_to_whitelist(self, entry: str):
"""alias for self.add_to_list('white')"""
return self.add_to_list('white', entry=entry)
def add_to_regex_whitelist(self, entry: str):
"""alias for self.add_to_list('regex_white')"""
return self.add_to_list('regex_white', entry=entry)
def add_to_blacklist(self, entry: str):
"""alias for self.add_to_list('black')"""
return self.add_to_list('black', entry=entry)
def add_to_regex_blacklist(self, entry: str):
"""alias for self.add_to_list('regex_black')"""
return self.add_to_list('regex_black', entry=entry)
def del_from_list(self, list: str, entry: str):
"""delete a entry to a filter list
:param list: valid list names: white, black, regex_white, regex_black. (white and black a exact match lists!)
:type list: str
:param entry: hostname or ip
:type entry: str
:return: json response from pihole or None
:rtype: dict | None
"""
supportet_lists = ['white', 'black', 'regex_white', 'regex_black']
if list in supportet_lists and entry is not None:
data = { 'list': list }
data['sub'] = entry
return self._query(data=data)
else:
self._log.error(f'del_from_list got false {list=} or no {entry=}')
return None
def del_from_whitelist(self, entry: str):
"""alias for self.del_from_list('white')"""
return self.del_from_list('white', entry=entry)
def del_from_regex_whitelist(self, entry: str):
"""alias for self.del_from_list('regex_white')"""
return self.del_from_list('regex_white', entry=entry)
def del_from_blacklist(self, entry: str):
"""alias for self.del_from_list('black')"""
return self.del_from_list('black', entry=entry)
def del_from_regex_blacklist(self, entry: str):
"""alias for self.del_from_list('regex_black')"""
return self.del_from_list('regex_black', entry=entry)
def enable(self) -> dict | None:
"""enable service for dns blocking (not dns it self!)
:return: json response from pihole or None
:rtype: dict | None
"""
self._log.info('enable blocking service')
return self._query('enable')
def disable(self) -> dict | None:
"""disable service for dns blocking (not dns it self!)
:return: json response from pihole or None
:rtype: dict | None
"""
self._log.info('disable blocking service')
return self._query('disable')
def get_version(self) -> dict | None:
"""get all versions from pihole stack (and available updates) see self.check_updates() too
:return: json response from pihole or None
:rtype: dict | None
"""
return self._query('versions')
def check_updates(self) -> bool:
"""check if update needed
:return: true if a update available
:rtype: bool
"""
versions = self.get_version()
if versions is not None:
for version in versions.keys():
if version.endswith('_update'):
if not version:
self._log.warning(f'update available for {version}: {versions=}')
return True
else:
break
return False
def set_tmp_unit(self, unit: str = 'c') -> dict | None:
"""set cpu temperature unit (supported: c, f or k)
:param unit: temp unit (c, f or k)
:type: unit: str
:return: json response from pihole or None
:rtype: dict | None
"""
if unit in ['c', 'f', 'k']:
data = { 'setTempUnit': unit }
return self._query('setTempUnit')
else: return None
@property
def token(self) -> None:
"""return of token is None! only readable in class it self."""
return None
@token.setter
def token(self, value: str) -> None:
"""setter for token"""
self.__token = value
if __name__ == '__main__':
dns = Pihole()
print(dns.get_custom_dns())