# Source listing header (code viewer residue): 371 lines, 13 KiB, Python
import json
import logging
from os.path import exists
from urllib.parse import urlencode

import requests
import yaml


class Pihole:
|
|
"""API wrapper for Pi-hole"""
|
|
__AUTHOR__ = 'anima'
|
|
__VERSION__ = '1.0.0'
|
|
|
|
_log = logging.getLogger(__name__)
|
|
__logFile = logging.FileHandler(__name__)
|
|
__logPrinter = logging.StreamHandler()
|
|
__logFormat = logging.Formatter('%(asctime)s %(levelname)s \t %(name)s : %(message)s')
|
|
__logFile.setFormatter(__logFormat)
|
|
__logPrinter.setFormatter(__logFormat)
|
|
_log.addHandler(__logFile)
|
|
_log.addHandler(__logPrinter)
|
|
|
|
def __init__(self, host: str = None, token: str = None, port: int = None, ssl: bool = None, configfile: str = 'pihole.conf') -> None:
|
|
"""init a Pihole API wrapper class
|
|
|
|
:param host: ip or hostname of pihole instanz
|
|
:type host: str
|
|
:param token: api token get from pyhole gui (settings => api => show api token)
|
|
:type token: str
|
|
:param port: (optional) port if not 80 or 443
|
|
:type port: int
|
|
:param ssl: (optional) set true if connection to pyhole allow https (default is False)
|
|
:type ssl: bool
|
|
:param configfile: path to config file in yaml format (all values with None will read from config file)
|
|
:type configfile: str
|
|
"""
|
|
if host is None and token is None and not exists(configfile):
|
|
self._log.error(f'missing inforamtions {host=} or token not set and no config file exists')
|
|
raise NotImplementedError
|
|
|
|
# set defaults
|
|
self.host = host
|
|
self.token = token
|
|
self.port = port
|
|
if ssl is None:
|
|
self.ssl = False
|
|
|
|
# replace defaults if cconfig file exists
|
|
if exists(configfile):
|
|
with open(configfile, 'r') as f:
|
|
config = yaml.safe_load(f)
|
|
|
|
if host is None:
|
|
self._log.debug(f'load host from {configfile=}')
|
|
self.host = config['pihole']['host']
|
|
|
|
if token is None:
|
|
self._log.debug(f'load token from {configfile=}')
|
|
self.token = config['pihole']['token']
|
|
|
|
if port is None and 'port' in config['pihole'].keys():
|
|
self._log.debug(f'load port from {configfile=}')
|
|
self.port = config['pihole']['port']
|
|
|
|
if ssl is None and ssl in config['pihole'].keys():
|
|
self._log.debug(f'load ssl from {configfile=}')
|
|
self.ssl = config['pihole']['ssl']
|
|
|
|
def _query(self, query: str = None, data: dict = None) -> dict | None:
|
|
"""default query for a pihole api call
|
|
|
|
:param query: set query type
|
|
:type query: str
|
|
:param data: needed data for query as dict
|
|
:type data: dict
|
|
:return: dict with response if successfull or None
|
|
:rtype: dict | None
|
|
"""
|
|
if self.port is None and self.ssl:
|
|
self.port = 443
|
|
elif self.port is None:
|
|
self.port = 80
|
|
|
|
protocol = 'http'
|
|
if self.ssl:
|
|
protocol += 's'
|
|
|
|
if data is not None or query is not None:
|
|
if query is None:
|
|
query = ''
|
|
if data is not None:
|
|
for key, value in data.items():
|
|
query += f'&{key}={value}'
|
|
|
|
response = requests.post(f'{protocol}://{self.host}:{self.port}/admin/api.php?auth={self.__token}&{query}')
|
|
|
|
if response.status_code == 200:
|
|
url = response.url
|
|
response = json.loads(response.text)
|
|
return response
|
|
else:
|
|
self._log.error(f'return non 200: {response.status_code} - {response.url.replace(self.__token, "SECRETTOKEN")} - {response.text}')
|
|
return None
|
|
|
|
def get_custom_dns(self) -> dict | None:
|
|
"""get all custrom dns entrys (no cname's! see get_custom_cname)
|
|
|
|
:return: json response from pihole or None
|
|
:rtype: dict | None
|
|
"""
|
|
data = { 'action': 'get' }
|
|
return self._query('customdns', data)
|
|
|
|
def add_custom_dns(self, domain: str, ip: str, reload: bool = False) -> dict | None:
|
|
"""add custom domain entry for a ip
|
|
|
|
:param domain: domain name for ip
|
|
:type: str
|
|
:param ip: ip address of domain
|
|
:type ip: str
|
|
:pram reload: reload service FTL (default False)
|
|
:return: json response from pihole or None
|
|
:rtype: dict | None
|
|
"""
|
|
data = { 'action': 'add' }
|
|
data['domain'] = domain
|
|
data['ip'] = ip
|
|
data['reload'] = str(reload).lower()
|
|
|
|
self._log.info(f'try to add new {domain=} for {ip=}')
|
|
return self._query('customdns', data)
|
|
|
|
def del_custom_dns(self, domain: str, ip: str, reload: bool = False) -> dict | None:
|
|
"""delete custom domain entry for a ip
|
|
|
|
:param domain: domain name for ip
|
|
:type: str
|
|
:param ip: ip address of domain
|
|
:type ip: str
|
|
:pram reload: reload service FTL (default False)
|
|
:return: json response from pihole or None
|
|
:rtype: dict | None
|
|
"""
|
|
data = { 'action': 'delete' }
|
|
data['domain'] = domain
|
|
data['ip'] = ip
|
|
data['reload'] = str(reload).lower()
|
|
|
|
self._log.info(f'try to delete {domain=} for {ip=}')
|
|
return self._query('customdns', data)
|
|
|
|
def get_custom_cname(self) -> dict | None:
|
|
"""get all custrom cname entrys (no dns's! see get_custom_dns)
|
|
|
|
:return: json response from pihole or Noneö
|
|
:rtype: dict | None
|
|
"""
|
|
"""{{host}}/admin/api.php?auth={{token}}&customcname=1&action=get"""
|
|
data = { 'action': 'get' }
|
|
return self._query('customcname', data)
|
|
|
|
def add_custom_cname(self, domain: str, target: str, reload: bool = False) -> dict | None:
|
|
"""add custom cname entry for a ip
|
|
|
|
:param domain: domain name alias for a existing domain name
|
|
:type: str
|
|
:param target: existing domain
|
|
:type target: str
|
|
:pram reload: reload service FTL (default False)
|
|
:return: json response from pihole or None
|
|
:rtype: dict | None
|
|
"""
|
|
data = { 'action': 'add' }
|
|
data['domain'] = domain
|
|
data['target'] = target
|
|
data['reload'] = str(reload).lower()
|
|
|
|
|
|
self._log.info(f'try to add {domain=} cname for {target=}')
|
|
return self._query('customcname', data)
|
|
|
|
def del_custom_cname(self, domain: str, target: str, reload: bool = False) -> dict | None:
|
|
"""delete custom domain entry for a ip
|
|
|
|
:param domain: domain name alias for a existing domain name
|
|
:type: str
|
|
:param target: existing domain
|
|
:type target: str
|
|
:pram reload: reload service FTL (default False)
|
|
:return: json response from pihole or None
|
|
:rtype: dict | None
|
|
"""
|
|
data = { 'action': 'delete' }
|
|
data['domain'] = domain
|
|
data['target'] = target
|
|
data['reload'] = str(reload).lower()
|
|
|
|
self._log.info(f'try to delete {domain=} cname for {target=}')
|
|
return self._query('customcname', data)
|
|
|
|
|
|
def get_list(self, list: str) -> dict | None:
|
|
"""get all entrys from a filter list
|
|
|
|
:param list: valid list names: white, black, regex_white, regex_black. (white and black a exact match lists!)
|
|
:type list: str
|
|
:return: json response from pihole or None
|
|
:rtype: dict | None
|
|
"""
|
|
supportet_lists = ['white', 'black', 'regex_white', 'regex_black']
|
|
if list in supportet_lists:
|
|
data = { 'list': list }
|
|
return self._query(data=data)
|
|
else:
|
|
self._log.error(f'get_from_list got false {list=}')
|
|
return None
|
|
|
|
def get_whitelist(self):
|
|
"""alias for self.get_list('white')"""
|
|
return self.get_list('white')
|
|
|
|
def get_regex_whitelist(self):
|
|
"""alias for self.get_list('regex_white')"""
|
|
return self.get_list('regex_white')
|
|
|
|
def get_blacklist(self):
|
|
"""alias for self.get_list('black')"""
|
|
return self.get_list('black')
|
|
|
|
def get_regex_blacklist(self):
|
|
"""alias for self.get_list('regex_black')"""
|
|
return self.get_list('regex_black')
|
|
|
|
def add_to_list(self, list: str, entry: str):
|
|
"""add a entry to a filter list
|
|
|
|
:param list: valid list names: white, black, regex_white, regex_black. (white and black a exact match lists!)
|
|
:type list: str
|
|
:param entry: hostname or ip
|
|
:type entry: str
|
|
:return: json response from pihole or None
|
|
:rtype: dict | None
|
|
"""
|
|
supportet_lists = ['white', 'black', 'regex_white', 'regex_black']
|
|
if list in supportet_lists and entry is not None:
|
|
data = { 'list': list }
|
|
data['add'] = entry
|
|
return self._query(data=data)
|
|
else:
|
|
self._log.error(f'add_from_list got false {list=} or no {entry=}')
|
|
return None
|
|
|
|
def add_to_whitelist(self, entry: str):
|
|
"""alias for self.add_to_list('white')"""
|
|
return self.add_to_list('white', entry=entry)
|
|
|
|
def add_to_regex_whitelist(self, entry: str):
|
|
"""alias for self.add_to_list('regex_white')"""
|
|
return self.add_to_list('regex_white', entry=entry)
|
|
|
|
def add_to_blacklist(self, entry: str):
|
|
"""alias for self.add_to_list('black')"""
|
|
return self.add_to_list('black', entry=entry)
|
|
|
|
def add_to_regex_blacklist(self, entry: str):
|
|
"""alias for self.add_to_list('regex_black')"""
|
|
return self.add_to_list('regex_black', entry=entry)
|
|
|
|
def del_from_list(self, list: str, entry: str):
|
|
"""delete a entry to a filter list
|
|
|
|
:param list: valid list names: white, black, regex_white, regex_black. (white and black a exact match lists!)
|
|
:type list: str
|
|
:param entry: hostname or ip
|
|
:type entry: str
|
|
:return: json response from pihole or None
|
|
:rtype: dict | None
|
|
"""
|
|
supportet_lists = ['white', 'black', 'regex_white', 'regex_black']
|
|
if list in supportet_lists and entry is not None:
|
|
data = { 'list': list }
|
|
data['sub'] = entry
|
|
return self._query(data=data)
|
|
else:
|
|
self._log.error(f'del_from_list got false {list=} or no {entry=}')
|
|
return None
|
|
|
|
def del_from_whitelist(self, entry: str):
|
|
"""alias for self.del_from_list('white')"""
|
|
return self.del_from_list('white', entry=entry)
|
|
|
|
def del_from_regex_whitelist(self, entry: str):
|
|
"""alias for self.del_from_list('regex_white')"""
|
|
return self.del_from_list('regex_white', entry=entry)
|
|
|
|
def del_from_blacklist(self, entry: str):
|
|
"""alias for self.del_from_list('black')"""
|
|
return self.del_from_list('black', entry=entry)
|
|
|
|
def del_from_regex_blacklist(self, entry: str):
|
|
"""alias for self.del_from_list('regex_black')"""
|
|
return self.del_from_list('regex_black', entry=entry)
|
|
|
|
def enable(self) -> dict | None:
|
|
"""enable service for dns blocking (not dns it self!)
|
|
|
|
:return: json response from pihole or None
|
|
:rtype: dict | None
|
|
"""
|
|
self._log.info('enable blocking service')
|
|
return self._query('enable')
|
|
|
|
def disable(self) -> dict | None:
|
|
"""disable service for dns blocking (not dns it self!)
|
|
|
|
:return: json response from pihole or None
|
|
:rtype: dict | None
|
|
"""
|
|
self._log.info('disable blocking service')
|
|
return self._query('disable')
|
|
|
|
def get_version(self) -> dict | None:
|
|
"""get all versions from pihole stack (and available updates) see self.check_updates() too
|
|
|
|
:return: json response from pihole or None
|
|
:rtype: dict | None
|
|
"""
|
|
return self._query('versions')
|
|
|
|
def check_updates(self) -> bool:
|
|
"""check if update needed
|
|
|
|
:return: true if a update available
|
|
:rtype: bool
|
|
"""
|
|
versions = self.get_version()
|
|
if versions is not None:
|
|
for version in versions.keys():
|
|
if version.endswith('_update'):
|
|
if not version:
|
|
self._log.warning(f'update available for {version}: {versions=}')
|
|
return True
|
|
else:
|
|
break
|
|
return False
|
|
|
|
def set_tmp_unit(self, unit: str = 'c') -> dict | None:
|
|
"""set cpu temperature unit (supported: c, f or k)
|
|
|
|
:param unit: temp unit (c, f or k)
|
|
:type: unit: str
|
|
:return: json response from pihole or None
|
|
:rtype: dict | None
|
|
"""
|
|
if unit in ['c', 'f', 'k']:
|
|
data = { 'setTempUnit': unit }
|
|
return self._query('setTempUnit')
|
|
else: return None
|
|
|
|
@property
|
|
def token(self) -> None:
|
|
"""return of token is None! only readable in class it self."""
|
|
return None
|
|
|
|
@token.setter
|
|
def token(self, value: str) -> None:
|
|
"""setter for token"""
|
|
self.__token = value
|
|
|
|
|
|
if __name__ == '__main__':
    # Manual smoke test: build a wrapper from the default config file
    # ('pihole.conf') and print the custom dns entries it returns.
    pihole = Pihole()
    print(pihole.get_custom_dns())