add basic config handling and custom dns mgmt
This commit is contained in:
126
PiholeAPI.py
126
PiholeAPI.py
@@ -38,7 +38,7 @@ class PiholeAPI:
|
|||||||
headers = dict()
|
headers = dict()
|
||||||
|
|
||||||
# check if http method valid
|
# check if http method valid
|
||||||
valid_methods = ['GET', 'POST', 'PUT', 'DELETE']
|
valid_methods = ['GET', 'POST', 'PUT', 'PATCH', 'DELETE']
|
||||||
if method not in valid_methods:
|
if method not in valid_methods:
|
||||||
logging.error(f'http {method=} is none of {valid_methods=}')
|
logging.error(f'http {method=} is none of {valid_methods=}')
|
||||||
return False
|
return False
|
||||||
@@ -275,11 +275,131 @@ class PiholeAPI:
|
|||||||
data = self.__query('stats/upstreams')
|
data = self.__query('stats/upstreams')
|
||||||
print(data)
|
print(data)
|
||||||
|
|
||||||
|
#
|
||||||
|
## confg methods
|
||||||
|
#
|
||||||
|
def get_config(self, element: str = None) -> dict | None:
|
||||||
|
"""Get current configuration of your Pi-hole.
|
||||||
|
|
||||||
|
Can be use to modify and dump back via self.patch_config(config)
|
||||||
|
|
||||||
|
Args:
|
||||||
|
element (str, optional): define sub elements of config. Defaults to None.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict | None: if successfull : dict with config (sub elements)
|
||||||
|
"""
|
||||||
|
if element:
|
||||||
|
data = self.__query(f'config/{element}')
|
||||||
|
else:
|
||||||
|
data = self.__query('config')
|
||||||
|
|
||||||
|
return data
|
||||||
|
|
||||||
|
def patch_config(self, config: dict) -> bool:
|
||||||
|
"""Change configuration of your Pi-hole
|
||||||
|
|
||||||
|
Args:
|
||||||
|
config (dict): new config to push
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: true if query send successfull
|
||||||
|
"""
|
||||||
|
if not 'config' in config.keys():
|
||||||
|
return False
|
||||||
|
payload = config
|
||||||
|
data = self.__query('config', 'PATCH', payload=payload)
|
||||||
|
if data:
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def get_dns_host(self, host: list[str, str], match_ip: bool = True, match_fqdn: bool = True) -> list[list[str, str]] | None:
|
||||||
|
"""get specific dns host from custom dns records
|
||||||
|
|
||||||
|
Args:
|
||||||
|
host (list[str, str]): list with [ip, fqdn]
|
||||||
|
match_ip (bool, optional): controls if ip will be check. Defaults to True.
|
||||||
|
match_fqdn (bool, optional): controls if fqdn will be check. Defaults to True.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list[list[str, str]] | None: list with results of found hosts [[ip, fqdn], [ip, fqdn]]
|
||||||
|
"""
|
||||||
|
if isinstance(host, list) and len(host) != 2:
|
||||||
|
return None
|
||||||
|
data = self.get_config('dns/hosts')
|
||||||
|
if not data:
|
||||||
|
return None
|
||||||
|
|
||||||
|
results = list()
|
||||||
|
for entry in data['config']['dns']['hosts']:
|
||||||
|
entry_host = entry.split(' ')
|
||||||
|
if (not match_ip or entry_host[0] == host[0]) and (not match_fqdn or entry_host[1] == host[1]):
|
||||||
|
results.append(entry_host)
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
||||||
|
def add_dns_host(self, host: list[str, str]) -> bool:
|
||||||
|
"""add a new custom dns record
|
||||||
|
|
||||||
|
Args:
|
||||||
|
host (list[str, str]): list with [ip, fqdn]
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: True if new entry exists or new created
|
||||||
|
"""
|
||||||
|
if not isinstance(host, list) and len(host) != 2:
|
||||||
|
return False
|
||||||
|
data = self.get_config('dns/hosts')
|
||||||
|
if not data:
|
||||||
|
return False
|
||||||
|
|
||||||
|
create_new = True
|
||||||
|
results = self.get_dns_host(host)
|
||||||
|
if results:
|
||||||
|
for result in results:
|
||||||
|
if result == host:
|
||||||
|
create_new = False
|
||||||
|
|
||||||
|
if create_new:
|
||||||
|
data['config']['dns']['hosts'].append(' '.join(host))
|
||||||
|
return self.patch_config(data)
|
||||||
|
else:
|
||||||
|
return True
|
||||||
|
|
||||||
|
def remove_dns_host(self, host: list[str, str], match_ip: bool = True, match_fqdn: bool = True) -> bool:
|
||||||
|
"""remove given host
|
||||||
|
|
||||||
|
Args:
|
||||||
|
host (list[str, str]): list with [ip, fqdn]
|
||||||
|
match_ip (bool, optional): controls if ip will be check. Defaults to True.
|
||||||
|
match_fqdn (bool, optional): controls if fqdn will be check. Defaults to True.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: True if host(s) successfull removed
|
||||||
|
"""
|
||||||
|
if not isinstance(host, list) and len(host) != 2:
|
||||||
|
return False
|
||||||
|
data = self.get_config('dns/hosts')
|
||||||
|
if not data:
|
||||||
|
return False
|
||||||
|
|
||||||
|
hosts = data['config']['dns']['hosts'][::]
|
||||||
|
for entry in data['config']['dns']['hosts']:
|
||||||
|
entry_host = entry.split(' ')
|
||||||
|
if (not match_ip or entry_host[0] == host[0]) and (not match_fqdn or entry_host[1] == host[1]):
|
||||||
|
hosts.remove(entry)
|
||||||
|
|
||||||
|
data['config']['dns']['hosts'] = hosts
|
||||||
|
return self.patch_config(data)
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
pi = PiholeAPI()
|
pi = PiholeAPI()
|
||||||
pi.clear_sessions()
|
pi.clear_sessions()
|
||||||
pi.get_upstrams()
|
config = pi.get_config()
|
||||||
|
print(config)
|
||||||
|
hosts = pi.get_dns_host(['', 'myhost.example.org'], match_fqdn=False)
|
||||||
|
print(hosts)
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
main()
|
main()
|
||||||
Reference in New Issue
Block a user