Compare commits
10 Commits
7933e9cee3
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 333b708c65 | |||
| e55ae7bbe1 | |||
| a502e6387f | |||
| 2736f70f80 | |||
| 3ad75b6eb8 | |||
| 34960aa608 | |||
| 2ebb6ff77e | |||
| 89ac85bacd | |||
| 5681316673 | |||
| 590faef1bb |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -1,4 +1,5 @@
|
|||||||
*.conf
|
*.conf
|
||||||
|
*.json
|
||||||
|
|
||||||
# ---> Python
|
# ---> Python
|
||||||
# Byte-compiled / optimized / DLL files
|
# Byte-compiled / optimized / DLL files
|
||||||
|
|||||||
25
Pihole.py
25
Pihole.py
@@ -10,7 +10,7 @@ class Pihole:
|
|||||||
__VERSION__ = '1.0.0'
|
__VERSION__ = '1.0.0'
|
||||||
|
|
||||||
_log = logging.getLogger(__name__)
|
_log = logging.getLogger(__name__)
|
||||||
__logFile = logging.FileHandler(__name__)
|
__logFile = logging.FileHandler('Pihole.log')
|
||||||
__logPrinter = logging.StreamHandler()
|
__logPrinter = logging.StreamHandler()
|
||||||
__logFormat = logging.Formatter('%(asctime)s %(levelname)s \t %(name)s : %(message)s')
|
__logFormat = logging.Formatter('%(asctime)s %(levelname)s \t %(name)s : %(message)s')
|
||||||
__logFile.setFormatter(__logFormat)
|
__logFile.setFormatter(__logFormat)
|
||||||
@@ -123,10 +123,7 @@ class Pihole:
|
|||||||
data = { 'action': 'add' }
|
data = { 'action': 'add' }
|
||||||
data['domain'] = domain
|
data['domain'] = domain
|
||||||
data['ip'] = ip
|
data['ip'] = ip
|
||||||
if reload:
|
data['reload'] = str(reload).lower()
|
||||||
data['reload'] = 'true'
|
|
||||||
else:
|
|
||||||
data['reload'] = 'false'
|
|
||||||
|
|
||||||
self._log.info(f'try to add new {domain=} for {ip=}')
|
self._log.info(f'try to add new {domain=} for {ip=}')
|
||||||
return self._query('customdns', data)
|
return self._query('customdns', data)
|
||||||
@@ -145,10 +142,7 @@ class Pihole:
|
|||||||
data = { 'action': 'delete' }
|
data = { 'action': 'delete' }
|
||||||
data['domain'] = domain
|
data['domain'] = domain
|
||||||
data['ip'] = ip
|
data['ip'] = ip
|
||||||
if reload:
|
data['reload'] = str(reload).lower()
|
||||||
data['reload'] = 'true'
|
|
||||||
else:
|
|
||||||
data['reload'] = 'false'
|
|
||||||
|
|
||||||
self._log.info(f'try to delete {domain=} for {ip=}')
|
self._log.info(f'try to delete {domain=} for {ip=}')
|
||||||
return self._query('customdns', data)
|
return self._query('customdns', data)
|
||||||
@@ -177,10 +171,8 @@ class Pihole:
|
|||||||
data = { 'action': 'add' }
|
data = { 'action': 'add' }
|
||||||
data['domain'] = domain
|
data['domain'] = domain
|
||||||
data['target'] = target
|
data['target'] = target
|
||||||
if reload:
|
data['reload'] = str(reload).lower()
|
||||||
data['reload'] = 'true'
|
|
||||||
else:
|
|
||||||
data['reload'] = 'false'
|
|
||||||
|
|
||||||
self._log.info(f'try to add {domain=} cname for {target=}')
|
self._log.info(f'try to add {domain=} cname for {target=}')
|
||||||
return self._query('customcname', data)
|
return self._query('customcname', data)
|
||||||
@@ -199,10 +191,7 @@ class Pihole:
|
|||||||
data = { 'action': 'delete' }
|
data = { 'action': 'delete' }
|
||||||
data['domain'] = domain
|
data['domain'] = domain
|
||||||
data['target'] = target
|
data['target'] = target
|
||||||
if reload:
|
data['reload'] = str(reload).lower()
|
||||||
data['reload'] = 'true'
|
|
||||||
else:
|
|
||||||
data['reload'] = 'false'
|
|
||||||
|
|
||||||
self._log.info(f'try to delete {domain=} cname for {target=}')
|
self._log.info(f'try to delete {domain=} cname for {target=}')
|
||||||
return self._query('customcname', data)
|
return self._query('customcname', data)
|
||||||
@@ -354,7 +343,7 @@ class Pihole:
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
def set_tmp_unit(self, unit: str = 'c') -> dict | None:
|
def set_tmp_unit(self, unit: str = 'c') -> dict | None:
|
||||||
"""set temperature unit (supported: c, f or k)
|
"""set cpu temperature unit (supported: c, f or k)
|
||||||
|
|
||||||
:param unit: temp unit (c, f or k)
|
:param unit: temp unit (c, f or k)
|
||||||
:type: unit: str
|
:type: unit: str
|
||||||
|
|||||||
407
PiholeAPI.py
Normal file
407
PiholeAPI.py
Normal file
@@ -0,0 +1,407 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""pihole v6 API wrapper"""
|
||||||
|
"""
|
||||||
|
requirements:
|
||||||
|
- pip3 install requests
|
||||||
|
-
|
||||||
|
"""
|
||||||
|
__version__ = '0.3.0'
|
||||||
|
__author__ = 'anima'
|
||||||
|
|
||||||
|
# imports
|
||||||
|
import logging
|
||||||
|
import requests
|
||||||
|
import json
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
from os import path
|
||||||
|
|
||||||
|
|
||||||
|
# log settings
|
||||||
|
logging.basicConfig(format='[%(asctime)s] %(levelname)s %(message)s', level=logging.INFO)
|
||||||
|
|
||||||
|
|
||||||
|
class PiholeAPI:
|
||||||
|
def __init__(self, config: str = 'pihole.json', host: str = None, port: int = 80, ssl: bool = False, password: str = None):
|
||||||
|
self.host = host
|
||||||
|
self.port = port
|
||||||
|
self.ssl = ssl
|
||||||
|
self.password = password
|
||||||
|
self.valid_auth = datetime.now()
|
||||||
|
self.sid = None
|
||||||
|
self.csrf = None
|
||||||
|
if path.exists(config):
|
||||||
|
with open(config, 'r') as file:
|
||||||
|
for key, value in json.load(file).items():
|
||||||
|
self.__dict__[key] = value
|
||||||
|
|
||||||
|
def __query(self, query: str, method: str = 'GET', payload: dict = None, auth_need: bool = True):
|
||||||
|
headers = dict()
|
||||||
|
|
||||||
|
# check if http method valid
|
||||||
|
valid_methods = ['GET', 'POST', 'PUT', 'PATCH', 'DELETE']
|
||||||
|
if method not in valid_methods:
|
||||||
|
logging.error(f'http {method=} is none of {valid_methods=}')
|
||||||
|
return False
|
||||||
|
|
||||||
|
# refresh auth if required and set auth info
|
||||||
|
if auth_need:
|
||||||
|
if not self._check_auth():
|
||||||
|
self.do_auth()
|
||||||
|
headers['X-FTL-SID'] = self.sid
|
||||||
|
headers['X-FTL-CSRF'] = self.csrf
|
||||||
|
|
||||||
|
# create url
|
||||||
|
url = 'http'
|
||||||
|
if self.ssl:
|
||||||
|
url += 's'
|
||||||
|
url += f'://{self.host}:{self.port}/api/{query}'
|
||||||
|
|
||||||
|
if payload:
|
||||||
|
response = requests.request(method=method, url=url, headers=headers, json=payload, verify=False)
|
||||||
|
else:
|
||||||
|
response = requests.request(method=method, url=url, headers=headers, verify=False)
|
||||||
|
|
||||||
|
match response.status_code:
|
||||||
|
case (200):
|
||||||
|
# comment: successfull request
|
||||||
|
return json.loads(response.content)
|
||||||
|
case (204):
|
||||||
|
# comment: successfull (e.g. auth delete)
|
||||||
|
return True
|
||||||
|
case (_):
|
||||||
|
# comment: default fallback
|
||||||
|
print()
|
||||||
|
print(response.__dict__)
|
||||||
|
print(response.raw)
|
||||||
|
return False
|
||||||
|
#
|
||||||
|
## Auth based methods
|
||||||
|
#
|
||||||
|
def need_auth(self) -> bool:
|
||||||
|
"""generate a valid session if no auth required
|
||||||
|
# TODO: better handling if not successful
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: True if a valid session created
|
||||||
|
"""
|
||||||
|
data = self.__query('auth', auth_need=False)
|
||||||
|
if not data:
|
||||||
|
logging.error(f'can not get a valid session without password')
|
||||||
|
return False
|
||||||
|
if data['session']['valid']:
|
||||||
|
self.valid_auth = datetime.now() + timedelta(seconds=data['session']['validity'])
|
||||||
|
self.sid = data['session']['sid']
|
||||||
|
self.csrf = data['session']['csrf']
|
||||||
|
logging.info(f'successfull new auth created until {self.valid_auth}')
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def _check_auth(self) -> bool:
|
||||||
|
"""check if an auth valid (in time)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: True if auth valid
|
||||||
|
"""
|
||||||
|
now = datetime.now()
|
||||||
|
if datetime.now() < self.valid_auth:
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def do_auth(self) -> bool:
|
||||||
|
"""auth via password to get sid / csrf
|
||||||
|
|
||||||
|
# TODO: TOTP support
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: True if auth successfull
|
||||||
|
"""
|
||||||
|
payload = {'password': self.password}
|
||||||
|
data = self.__query('auth', 'POST', payload, auth_need=False)
|
||||||
|
if not data:
|
||||||
|
raise PermissionError(f'Authentication not possible')
|
||||||
|
if data['session']['valid']:
|
||||||
|
self.valid_auth = datetime.now() + timedelta(seconds=data['session']['validity'])
|
||||||
|
self.sid = data['session']['sid']
|
||||||
|
self.csrf = data['session']['csrf']
|
||||||
|
logging.info(f'successfull new auth created until {self.valid_auth}')
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def get_sessions(self) -> list | None:
|
||||||
|
"""List of all current sessions
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list | None: list of sessions if successfull query
|
||||||
|
"""
|
||||||
|
data = self.__query('auth/sessions')
|
||||||
|
if data:
|
||||||
|
return data['sessions']
|
||||||
|
return None
|
||||||
|
|
||||||
|
def delete_session(self, session_id: int = None) -> bool:
|
||||||
|
"""Delete session by ID
|
||||||
|
|
||||||
|
with no arguments it will clear it own session
|
||||||
|
|
||||||
|
Args:
|
||||||
|
session_id (int, optional): Session ID get from get_sessions. Defaults to None.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: True if session delete successfull
|
||||||
|
"""
|
||||||
|
if session_id is None:
|
||||||
|
data = self.get_sessions()
|
||||||
|
if not data:
|
||||||
|
return False
|
||||||
|
for session in data:
|
||||||
|
if session['current_session']:
|
||||||
|
return self.delete_session(session['id'])
|
||||||
|
if self.__query(f'auth/session/{session_id}', 'DELETE'):
|
||||||
|
logging.info(f'successfull delete session with {session_id=}')
|
||||||
|
return True
|
||||||
|
|
||||||
|
def clear_sessions(self):
|
||||||
|
data = self.get_sessions()
|
||||||
|
for session in data:
|
||||||
|
if not session['current_session']:
|
||||||
|
if not self.delete_session(session['id']):
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
def get_app_password(self) -> dict | None:
|
||||||
|
"""Create new application password
|
||||||
|
|
||||||
|
This password can be used for API instet of regular password an TOTP.
|
||||||
|
All sessions will be delte if this password set!
|
||||||
|
# TODO: set password (hash) to PATCH /api/config/webserver/api/app_pwhash
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict | None: if successful: app password and hash
|
||||||
|
"""
|
||||||
|
data = self.__query('auth/app')
|
||||||
|
return data['app']
|
||||||
|
|
||||||
|
#
|
||||||
|
## metrics based methods
|
||||||
|
#
|
||||||
|
def get_history(self) -> list | None:
|
||||||
|
"""Get activity graph data
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list | None: if successfull: list of history query data
|
||||||
|
"""
|
||||||
|
data = self.__query('history')
|
||||||
|
if not data:
|
||||||
|
return None
|
||||||
|
return data['history']
|
||||||
|
|
||||||
|
def get_history_timerange(self, starttime: int | datetime, endtime: int | datetime) -> list | None:
|
||||||
|
"""Get activity graph data (long-term data)
|
||||||
|
|
||||||
|
# TODO: BUG - always response with You need to specify both \\"from\\" and \\"until\\" in the request.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
starttime (int | datetime): start time of data collection (int as timestamp)
|
||||||
|
endtime (int | datetime): end time of data collection (int as timestamp)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list | None: if successfull: list of history query data in given time range
|
||||||
|
"""
|
||||||
|
payload = dict()
|
||||||
|
if isinstance(starttime, int) and isinstance(endtime, int):
|
||||||
|
payload['from'] = starttime
|
||||||
|
payload['until'] = endtime
|
||||||
|
if isinstance(starttime, datetime) and isinstance(endtime, datetime):
|
||||||
|
payload['from'] = int(starttime.timestamp())
|
||||||
|
payload['until'] = int(endtime.timestamp())
|
||||||
|
|
||||||
|
if len(payload.keys()) > 0:
|
||||||
|
data = self.__query('history/database', payload=payload)
|
||||||
|
if not data:
|
||||||
|
return None
|
||||||
|
return data
|
||||||
|
|
||||||
|
|
||||||
|
def get_client_history(self) -> list | None:
|
||||||
|
"""Get per-client activity graph data
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list | None: if successfull: list of history query data by client
|
||||||
|
"""
|
||||||
|
data = self.__query('history/clients')
|
||||||
|
if not data:
|
||||||
|
return None
|
||||||
|
return data['history']
|
||||||
|
|
||||||
|
def get_client_history_timerange(self, starttime: int | datetime, endtime: int | datetime) -> list | None:
|
||||||
|
"""Get activity graph data (long-term data)
|
||||||
|
|
||||||
|
# TODO: BUG - always response with You need to specify both \\"from\\" and \\"until\\" in the request.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
starttime (int | datetime): start time of data collection (int as timestamp)
|
||||||
|
endtime (int | datetime): end time of data collection (int as timestamp)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list | None: if successfull: list of history query data by client in given time range
|
||||||
|
"""
|
||||||
|
payload = dict()
|
||||||
|
if isinstance(starttime, int) and isinstance(endtime, int):
|
||||||
|
payload['from'] = starttime
|
||||||
|
payload['until'] = endtime
|
||||||
|
if isinstance(starttime, datetime) and isinstance(endtime, datetime):
|
||||||
|
payload['from'] = int(starttime.timestamp())
|
||||||
|
payload['until'] = int(endtime.timestamp())
|
||||||
|
|
||||||
|
if len(payload.keys()) > 0:
|
||||||
|
data = self.__query('history/database', payload=payload)
|
||||||
|
if not data:
|
||||||
|
return None
|
||||||
|
return data
|
||||||
|
|
||||||
|
def get_queries(self) -> list | None:
|
||||||
|
"""Request query details. Query parameters may be used to limit the number of results.
|
||||||
|
|
||||||
|
# TODO: add arguments
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list | None: By default, this API callback returns the most recent 100 queries.
|
||||||
|
"""
|
||||||
|
data = self.__query('queries')
|
||||||
|
if not data:
|
||||||
|
return None
|
||||||
|
print(data['queries'][0])
|
||||||
|
return data['queries']
|
||||||
|
|
||||||
|
def get_upstrams(self) -> list | None:
|
||||||
|
data = self.__query('stats/upstreams')
|
||||||
|
print(data)
|
||||||
|
|
||||||
|
#
|
||||||
|
## confg methods
|
||||||
|
#
|
||||||
|
def get_config(self, element: str = None) -> dict | None:
|
||||||
|
"""Get current configuration of your Pi-hole.
|
||||||
|
|
||||||
|
Can be use to modify and dump back via self.patch_config(config)
|
||||||
|
|
||||||
|
Args:
|
||||||
|
element (str, optional): define sub elements of config. Defaults to None.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict | None: if successfull : dict with config (sub elements)
|
||||||
|
"""
|
||||||
|
if element:
|
||||||
|
data = self.__query(f'config/{element}')
|
||||||
|
else:
|
||||||
|
data = self.__query('config')
|
||||||
|
|
||||||
|
return data
|
||||||
|
|
||||||
|
def patch_config(self, config: dict) -> bool:
|
||||||
|
"""Change configuration of your Pi-hole
|
||||||
|
|
||||||
|
Args:
|
||||||
|
config (dict): new config to push
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: true if query send successfull
|
||||||
|
"""
|
||||||
|
if not 'config' in config.keys():
|
||||||
|
return False
|
||||||
|
payload = config
|
||||||
|
data = self.__query('config', 'PATCH', payload=payload)
|
||||||
|
if data:
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def get_dns_host(self, host: list[str, str], match_ip: bool = True, match_fqdn: bool = True) -> list[list[str, str]] | None:
|
||||||
|
"""get specific dns host from custom dns records
|
||||||
|
|
||||||
|
Args:
|
||||||
|
host (list[str, str]): list with [ip, fqdn]
|
||||||
|
match_ip (bool, optional): controls if ip will be check. Defaults to True.
|
||||||
|
match_fqdn (bool, optional): controls if fqdn will be check. Defaults to True.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list[list[str, str]] | None: list with results of found hosts [[ip, fqdn], [ip, fqdn]]
|
||||||
|
"""
|
||||||
|
if isinstance(host, list) and len(host) != 2:
|
||||||
|
return None
|
||||||
|
data = self.get_config('dns/hosts')
|
||||||
|
if not data:
|
||||||
|
return None
|
||||||
|
|
||||||
|
results = list()
|
||||||
|
for entry in data['config']['dns']['hosts']:
|
||||||
|
entry_host = entry.split(' ')
|
||||||
|
if (not match_ip or entry_host[0] == host[0]) and (not match_fqdn or entry_host[1] == host[1]):
|
||||||
|
results.append(entry_host)
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
||||||
|
def add_dns_host(self, host: list[str, str]) -> bool:
|
||||||
|
"""add a new custom dns record
|
||||||
|
|
||||||
|
Args:
|
||||||
|
host (list[str, str]): list with [ip, fqdn]
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: True if new entry exists or new created
|
||||||
|
"""
|
||||||
|
if not isinstance(host, list) and len(host) != 2:
|
||||||
|
return False
|
||||||
|
data = self.get_config('dns/hosts')
|
||||||
|
if not data:
|
||||||
|
return False
|
||||||
|
|
||||||
|
create_new = True
|
||||||
|
results = self.get_dns_host(host)
|
||||||
|
if results:
|
||||||
|
for result in results:
|
||||||
|
if result == host:
|
||||||
|
create_new = False
|
||||||
|
|
||||||
|
if create_new:
|
||||||
|
data['config']['dns']['hosts'].append(' '.join(host))
|
||||||
|
return self.patch_config(data)
|
||||||
|
else:
|
||||||
|
return True
|
||||||
|
|
||||||
|
def remove_dns_host(self, host: list[str, str], match_ip: bool = True, match_fqdn: bool = True) -> bool:
|
||||||
|
"""remove given host
|
||||||
|
|
||||||
|
Args:
|
||||||
|
host (list[str, str]): list with [ip, fqdn]
|
||||||
|
match_ip (bool, optional): controls if ip will be check. Defaults to True.
|
||||||
|
match_fqdn (bool, optional): controls if fqdn will be check. Defaults to True.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: True if host(s) successfull removed
|
||||||
|
"""
|
||||||
|
if not isinstance(host, list) and len(host) != 2:
|
||||||
|
return False
|
||||||
|
data = self.get_config('dns/hosts')
|
||||||
|
if not data:
|
||||||
|
return False
|
||||||
|
|
||||||
|
hosts = data['config']['dns']['hosts'][::]
|
||||||
|
for entry in data['config']['dns']['hosts']:
|
||||||
|
entry_host = entry.split(' ')
|
||||||
|
if (not match_ip or entry_host[0] == host[0]) and (not match_fqdn or entry_host[1] == host[1]):
|
||||||
|
hosts.remove(entry)
|
||||||
|
|
||||||
|
data['config']['dns']['hosts'] = hosts
|
||||||
|
return self.patch_config(data)
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
pi = PiholeAPI()
|
||||||
|
pi.clear_sessions()
|
||||||
|
config = pi.get_config()
|
||||||
|
print(config)
|
||||||
|
hosts = pi.get_dns_host(['', 'myhost.example.org'], match_fqdn=False)
|
||||||
|
print(hosts)
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
||||||
114
README.md
114
README.md
@@ -1,3 +1,113 @@
|
|||||||
# api-pihole
|
# API Wrapper: Pi-hole
|
||||||
|
>A basic API wrapper for Pi-hole
|
||||||
|
|
||||||
API wrapper for pihole
|
## Usage Pihole v6 (PiholeAPI.py)
|
||||||
|
> still in progress...
|
||||||
|
### base usage
|
||||||
|
```python
|
||||||
|
pih = PiholeAPI() # if pihole.json exists
|
||||||
|
pih = PiholeAPI(host='10.10.10.10', password='mypassword') # without conf file
|
||||||
|
```
|
||||||
|
|
||||||
|
### auth handling
|
||||||
|
```python
|
||||||
|
pih.need_auth() # get a valid session if no auth required
|
||||||
|
pih._check_auth() # check if wrapper has a valid session stored
|
||||||
|
pih.do_auth() # (runs automaticly if need) do a auth with password
|
||||||
|
pih.get_sessions() # get all known sessions (webgui sessions too...)
|
||||||
|
pih.delete_session() # delet a session by id (collect from get_sessions) - withour parameters it will clear it own sessions
|
||||||
|
pih.clear_sessions() # delete all sessions without it own
|
||||||
|
pih.get_app_password() # generate a app password. good for use without TOTP. clears all sessions! (Hint: must be set in config, not implementet at now)
|
||||||
|
```
|
||||||
|
|
||||||
|
### metric handling
|
||||||
|
```python
|
||||||
|
pih.get_history() # Get activity graph data of dns querys
|
||||||
|
pih.get_history_timerange() # bugged!
|
||||||
|
pih.get_client_history() # Get per-client activity graph data of dns querys
|
||||||
|
pih.get_client_history_timerange() # bugged!
|
||||||
|
pih.get_queries() # Request query details.
|
||||||
|
```
|
||||||
|
|
||||||
|
### config handling
|
||||||
|
```python
|
||||||
|
pih.get_config() # get full running config
|
||||||
|
pih.get_config('dns/hosts') # get full tree but only ['dns']['hosts'] be filled
|
||||||
|
pih.patch_config(config) # send config update, all given elemnts will be overriden. config from get_config can be use.
|
||||||
|
```
|
||||||
|
|
||||||
|
#### custom dns handling
|
||||||
|
```python
|
||||||
|
pih.get_dns_host(['ip', 'fqdn']) # get a custom dns hosts als list
|
||||||
|
# return: [[ip, fqdn], [ip, fqdn]]
|
||||||
|
pih.remove_dns_host(['', 'fqdn'], match_ip=False) # get all custom dns hosts als list which match fqdn only
|
||||||
|
pih.remove_dns_host(['ip', ''], match_fqdn=False) # get all custom dns hosts als list which match ip only
|
||||||
|
pih.add_dns_host(['ip', 'fqdn']) # add new custom dns host. checks fist if identical host exists
|
||||||
|
pih.remove_dns_host(['ip', 'fqdn']) # remove a host which match ip and fqdn
|
||||||
|
pih.remove_dns_host(['', 'fqdn'], match_ip=False) # remove all hosts which match fqdn only
|
||||||
|
pih.remove_dns_host(['ip', ''], match_fqdn=False) # remove all hosts which match ip only
|
||||||
|
```
|
||||||
|
|
||||||
|
## Usage Pihole v5 and below (Pihole.py)
|
||||||
|
### Custom DNS
|
||||||
|
Read, add or delete a custom dns entry (no cname's!)
|
||||||
|
```python
|
||||||
|
get_custom_dns()
|
||||||
|
add_custom_dns(domain, ip)
|
||||||
|
del_custom_dns(domain, ip)
|
||||||
|
```
|
||||||
|
|
||||||
|
### Custom CNAMES
|
||||||
|
Same as Custom DNS but for CNAMES only.
|
||||||
|
```python
|
||||||
|
get_custom_cname()
|
||||||
|
add_custom_cname(domain, target)
|
||||||
|
del_custom_cname(domain, target)
|
||||||
|
```
|
||||||
|
|
||||||
|
### Filter Lists
|
||||||
|
Read, add or delete a filter list.
|
||||||
|
Availible lists:
|
||||||
|
- white
|
||||||
|
- black
|
||||||
|
- regex_white
|
||||||
|
- regex_black
|
||||||
|
|
||||||
|
*white and black only match exact machtes!*
|
||||||
|
|
||||||
|
```python
|
||||||
|
get_list(list, entry)
|
||||||
|
""" aliases """
|
||||||
|
get_whitelist(entry)
|
||||||
|
get_regex_whitelist(entry)
|
||||||
|
get_blacklist(entry)
|
||||||
|
get_regex_blacklist(entry)
|
||||||
|
|
||||||
|
add_to_list(list, entry)
|
||||||
|
""" aliases """
|
||||||
|
add_to_whitelist(entry)
|
||||||
|
add_to_regex_whitelist(entry)
|
||||||
|
add_to_blacklist(entry)
|
||||||
|
add_to_regex_blacklist(entry)
|
||||||
|
|
||||||
|
del_from_list(list, entry)
|
||||||
|
""" aliases """
|
||||||
|
del_from_whitelist(entry)
|
||||||
|
del_from_regex_whitelist(entry)
|
||||||
|
del_from_blacklist(entry)
|
||||||
|
del_from_regex_blacklist(entry)
|
||||||
|
```
|
||||||
|
|
||||||
|
### Misc
|
||||||
|
Misc functions for:
|
||||||
|
- enable / disable filter
|
||||||
|
- get / check update status
|
||||||
|
- set temp unit from cpu
|
||||||
|
|
||||||
|
```python
|
||||||
|
enable()
|
||||||
|
disable()
|
||||||
|
get_version()
|
||||||
|
check_updates()
|
||||||
|
set_tmp_unit(unit)
|
||||||
|
```
|
||||||
1
__init__.py
Normal file
1
__init__.py
Normal file
@@ -0,0 +1 @@
|
|||||||
|
from .Pihole import Pihole
|
||||||
6
pihole.json.sample
Normal file
6
pihole.json.sample
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
{
|
||||||
|
"host": "localhost",
|
||||||
|
"port": 80,
|
||||||
|
"ssl": false,
|
||||||
|
"password": "mypassword"
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user