From 7933e9cee3ed55da42e1d6dba4b7998a4652ab1c Mon Sep 17 00:00:00 2001
From: anima
Date: Sat, 19 Oct 2024 17:53:30 +0200
Subject: [PATCH] inital version
---
.gitignore | 2 +
Pihole.py | 382 +++++++++++++++++++++++++++++++++++++++++++++
pihole.conf.sample | 5 +
requirements.txt | 2 +
4 files changed, 391 insertions(+)
create mode 100644 Pihole.py
create mode 100644 pihole.conf.sample
create mode 100644 requirements.txt
diff --git a/.gitignore b/.gitignore
index 5d381cc..658bfae 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,5 @@
+*.conf
+
# ---> Python
# Byte-compiled / optimized / DLL files
__pycache__/
diff --git a/Pihole.py b/Pihole.py
new file mode 100644
index 0000000..6e30ec2
--- /dev/null
+++ b/Pihole.py
@@ -0,0 +1,382 @@
+import requests
+import json
+import yaml
+from os.path import exists
+import logging
+
+class Pihole:
+ """API wrapper for Pi-hole"""
+ __AUTHOR__ = 'anima'
+ __VERSION__ = '1.0.0'
+
+ _log = logging.getLogger(__name__)
+ __logFile = logging.FileHandler(__name__)
+ __logPrinter = logging.StreamHandler()
+ __logFormat = logging.Formatter('%(asctime)s %(levelname)s \t %(name)s : %(message)s')
+ __logFile.setFormatter(__logFormat)
+ __logPrinter.setFormatter(__logFormat)
+ _log.addHandler(__logFile)
+ _log.addHandler(__logPrinter)
+
+ def __init__(self, host: str = None, token: str = None, port: int = None, ssl: bool = None, configfile: str = 'pihole.conf') -> None:
+ """init a Pihole API wrapper class
+
+ :param host: ip or hostname of pihole instanz
+ :type host: str
+ :param token: api token get from pyhole gui (settings => api => show api token)
+ :type token: str
+ :param port: (optional) port if not 80 or 443
+ :type port: int
+ :param ssl: (optional) set true if connection to pyhole allow https (default is False)
+ :type ssl: bool
+ :param configfile: path to config file in yaml format (all values with None will read from config file)
+ :type configfile: str
+ """
+ if host is None and token is None and not exists(configfile):
+ self._log.error(f'missing inforamtions {host=} or token not set and no config file exists')
+ raise NotImplementedError
+
+ # set defaults
+ self.host = host
+ self.token = token
+ self.port = port
+ if ssl is None:
+ self.ssl = False
+
+ # replace defaults if cconfig file exists
+ if exists(configfile):
+ with open(configfile, 'r') as f:
+ config = yaml.safe_load(f)
+
+ if host is None:
+ self._log.debug(f'load host from {configfile=}')
+ self.host = config['pihole']['host']
+
+ if token is None:
+ self._log.debug(f'load token from {configfile=}')
+ self.token = config['pihole']['token']
+
+ if port is None and 'port' in config['pihole'].keys():
+ self._log.debug(f'load port from {configfile=}')
+ self.port = config['pihole']['port']
+
+ if ssl is None and ssl in config['pihole'].keys():
+ self._log.debug(f'load ssl from {configfile=}')
+ self.ssl = config['pihole']['ssl']
+
+ def _query(self, query: str = None, data: dict = None) -> dict | None:
+ """default query for a pihole api call
+
+ :param query: set query type
+ :type query: str
+ :param data: needed data for query as dict
+ :type data: dict
+ :return: dict with response if successfull or None
+ :rtype: dict | None
+ """
+ if self.port is None and self.ssl:
+ self.port = 443
+ elif self.port is None:
+ self.port = 80
+
+ protocol = 'http'
+ if self.ssl:
+ protocol += 's'
+
+ if data is not None or query is not None:
+ if query is None:
+ query = ''
+ if data is not None:
+ for key, value in data.items():
+ query += f'&{key}={value}'
+
+ response = requests.post(f'{protocol}://{self.host}:{self.port}/admin/api.php?auth={self.__token}&{query}')
+
+ if response.status_code == 200:
+ url = response.url
+ response = json.loads(response.text)
+ return response
+ else:
+ self._log.error(f'return non 200: {response.status_code} - {response.url.replace(self.__token, "SECRETTOKEN")} - {response.text}')
+ return None
+
+ def get_custom_dns(self) -> dict | None:
+ """get all custrom dns entrys (no cname's! see get_custom_cname)
+
+ :return: json response from pihole or None
+ :rtype: dict | None
+ """
+ data = { 'action': 'get' }
+ return self._query('customdns', data)
+
+ def add_custom_dns(self, domain: str, ip: str, reload: bool = False) -> dict | None:
+ """add custom domain entry for a ip
+
+ :param domain: domain name for ip
+ :type: str
+ :param ip: ip address of domain
+ :type ip: str
+ :pram reload: reload service FTL (default False)
+ :return: json response from pihole or None
+ :rtype: dict | None
+ """
+ data = { 'action': 'add' }
+ data['domain'] = domain
+ data['ip'] = ip
+ if reload:
+ data['reload'] = 'true'
+ else:
+ data['reload'] = 'false'
+
+ self._log.info(f'try to add new {domain=} for {ip=}')
+ return self._query('customdns', data)
+
+ def del_custom_dns(self, domain: str, ip: str, reload: bool = False) -> dict | None:
+ """delete custom domain entry for a ip
+
+ :param domain: domain name for ip
+ :type: str
+ :param ip: ip address of domain
+ :type ip: str
+ :pram reload: reload service FTL (default False)
+ :return: json response from pihole or None
+ :rtype: dict | None
+ """
+ data = { 'action': 'delete' }
+ data['domain'] = domain
+ data['ip'] = ip
+ if reload:
+ data['reload'] = 'true'
+ else:
+ data['reload'] = 'false'
+
+ self._log.info(f'try to delete {domain=} for {ip=}')
+ return self._query('customdns', data)
+
+ def get_custom_cname(self) -> dict | None:
+ """get all custrom cname entrys (no dns's! see get_custom_dns)
+
+ :return: json response from pihole or Noneƶ
+ :rtype: dict | None
+ """
+ """{{host}}/admin/api.php?auth={{token}}&customcname=1&action=get"""
+ data = { 'action': 'get' }
+ return self._query('customcname', data)
+
+ def add_custom_cname(self, domain: str, target: str, reload: bool = False) -> dict | None:
+ """add custom cname entry for a ip
+
+ :param domain: domain name alias for a existing domain name
+ :type: str
+ :param target: existing domain
+ :type target: str
+ :pram reload: reload service FTL (default False)
+ :return: json response from pihole or None
+ :rtype: dict | None
+ """
+ data = { 'action': 'add' }
+ data['domain'] = domain
+ data['target'] = target
+ if reload:
+ data['reload'] = 'true'
+ else:
+ data['reload'] = 'false'
+
+ self._log.info(f'try to add {domain=} cname for {target=}')
+ return self._query('customcname', data)
+
+ def del_custom_cname(self, domain: str, target: str, reload: bool = False) -> dict | None:
+ """delete custom domain entry for a ip
+
+ :param domain: domain name alias for a existing domain name
+ :type: str
+ :param target: existing domain
+ :type target: str
+ :pram reload: reload service FTL (default False)
+ :return: json response from pihole or None
+ :rtype: dict | None
+ """
+ data = { 'action': 'delete' }
+ data['domain'] = domain
+ data['target'] = target
+ if reload:
+ data['reload'] = 'true'
+ else:
+ data['reload'] = 'false'
+
+ self._log.info(f'try to delete {domain=} cname for {target=}')
+ return self._query('customcname', data)
+
+
+ def get_list(self, list: str) -> dict | None:
+ """get all entrys from a filter list
+
+ :param list: valid list names: white, black, regex_white, regex_black. (white and black a exact match lists!)
+ :type list: str
+ :return: json response from pihole or None
+ :rtype: dict | None
+ """
+ supportet_lists = ['white', 'black', 'regex_white', 'regex_black']
+ if list in supportet_lists:
+ data = { 'list': list }
+ return self._query(data=data)
+ else:
+ self._log.error(f'get_from_list got false {list=}')
+ return None
+
+ def get_whitelist(self):
+ """alias for self.get_list('white')"""
+ return self.get_list('white')
+
+ def get_regex_whitelist(self):
+ """alias for self.get_list('regex_white')"""
+ return self.get_list('regex_white')
+
+ def get_blacklist(self):
+ """alias for self.get_list('black')"""
+ return self.get_list('black')
+
+ def get_regex_blacklist(self):
+ """alias for self.get_list('regex_black')"""
+ return self.get_list('regex_black')
+
+ def add_to_list(self, list: str, entry: str):
+ """add a entry to a filter list
+
+ :param list: valid list names: white, black, regex_white, regex_black. (white and black a exact match lists!)
+ :type list: str
+ :param entry: hostname or ip
+ :type entry: str
+ :return: json response from pihole or None
+ :rtype: dict | None
+ """
+ supportet_lists = ['white', 'black', 'regex_white', 'regex_black']
+ if list in supportet_lists and entry is not None:
+ data = { 'list': list }
+ data['add'] = entry
+ return self._query(data=data)
+ else:
+ self._log.error(f'add_from_list got false {list=} or no {entry=}')
+ return None
+
+ def add_to_whitelist(self, entry: str):
+ """alias for self.add_to_list('white')"""
+ return self.add_to_list('white', entry=entry)
+
+ def add_to_regex_whitelist(self, entry: str):
+ """alias for self.add_to_list('regex_white')"""
+ return self.add_to_list('regex_white', entry=entry)
+
+ def add_to_blacklist(self, entry: str):
+ """alias for self.add_to_list('black')"""
+ return self.add_to_list('black', entry=entry)
+
+ def add_to_regex_blacklist(self, entry: str):
+ """alias for self.add_to_list('regex_black')"""
+ return self.add_to_list('regex_black', entry=entry)
+
+ def del_from_list(self, list: str, entry: str):
+ """delete a entry to a filter list
+
+ :param list: valid list names: white, black, regex_white, regex_black. (white and black a exact match lists!)
+ :type list: str
+ :param entry: hostname or ip
+ :type entry: str
+ :return: json response from pihole or None
+ :rtype: dict | None
+ """
+ supportet_lists = ['white', 'black', 'regex_white', 'regex_black']
+ if list in supportet_lists and entry is not None:
+ data = { 'list': list }
+ data['sub'] = entry
+ return self._query(data=data)
+ else:
+ self._log.error(f'del_from_list got false {list=} or no {entry=}')
+ return None
+
+ def del_from_whitelist(self, entry: str):
+ """alias for self.del_from_list('white')"""
+ return self.del_from_list('white', entry=entry)
+
+ def del_from_regex_whitelist(self, entry: str):
+ """alias for self.del_from_list('regex_white')"""
+ return self.del_from_list('regex_white', entry=entry)
+
+ def del_from_blacklist(self, entry: str):
+ """alias for self.del_from_list('black')"""
+ return self.del_from_list('black', entry=entry)
+
+ def del_from_regex_blacklist(self, entry: str):
+ """alias for self.del_from_list('regex_black')"""
+ return self.del_from_list('regex_black', entry=entry)
+
+ def enable(self) -> dict | None:
+ """enable service for dns blocking (not dns it self!)
+
+ :return: json response from pihole or None
+ :rtype: dict | None
+ """
+ self._log.info('enable blocking service')
+ return self._query('enable')
+
+ def disable(self) -> dict | None:
+ """disable service for dns blocking (not dns it self!)
+
+ :return: json response from pihole or None
+ :rtype: dict | None
+ """
+ self._log.info('disable blocking service')
+ return self._query('disable')
+
+ def get_version(self) -> dict | None:
+ """get all versions from pihole stack (and available updates) see self.check_updates() too
+
+ :return: json response from pihole or None
+ :rtype: dict | None
+ """
+ return self._query('versions')
+
+ def check_updates(self) -> bool:
+ """check if update needed
+
+ :return: true if a update available
+ :rtype: bool
+ """
+ versions = self.get_version()
+ if versions is not None:
+ for version in versions.keys():
+ if version.endswith('_update'):
+ if not version:
+ self._log.warning(f'update available for {version}: {versions=}')
+ return True
+ else:
+ break
+ return False
+
+ def set_tmp_unit(self, unit: str = 'c') -> dict | None:
+ """set temperature unit (supported: c, f or k)
+
+ :param unit: temp unit (c, f or k)
+ :type: unit: str
+ :return: json response from pihole or None
+ :rtype: dict | None
+ """
+ if unit in ['c', 'f', 'k']:
+ data = { 'setTempUnit': unit }
+ return self._query('setTempUnit')
+ else: return None
+
+ @property
+ def token(self) -> None:
+ """return of token is None! only readable in class it self."""
+ return None
+
+ @token.setter
+ def token(self, value: str) -> None:
+ """setter for token"""
+ self.__token = value
+
+
+if __name__ == '__main__':
+ dns = Pihole()
+ print(dns.get_custom_dns())
\ No newline at end of file
diff --git a/pihole.conf.sample b/pihole.conf.sample
new file mode 100644
index 0000000..e173433
--- /dev/null
+++ b/pihole.conf.sample
@@ -0,0 +1,5 @@
+pihole:
+ host: 10.1.2.3
+ token: "see settings => api => show api"
+ # port: 80
+ # ssl: False
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..5bb1125
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,2 @@
+requests
+pyaml
\ No newline at end of file