inital version for v6 api
This commit is contained in:
175
PiholeAPI.py
Normal file
175
PiholeAPI.py
Normal file
@@ -0,0 +1,175 @@
|
||||
#!/usr/bin/env python3
|
||||
"""pihole v6 API wrapper"""
|
||||
"""
|
||||
requirements:
|
||||
- pip3 install requests
|
||||
-
|
||||
"""
|
||||
__version__ = '0.1.0'
|
||||
__author__ = 'anima'
|
||||
|
||||
# imports
|
||||
import logging
|
||||
import requests
|
||||
import json
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
|
||||
# log settings
|
||||
logging.basicConfig(format='[%(asctime)s] %(levelname)s %(message)s', level=logging.INFO)
|
||||
|
||||
|
||||
class PiholeAPI:
|
||||
def __init__(self, url: str, port: int = 80, ssl: bool = False, passwd: str = None):
|
||||
self.url = url
|
||||
self.port = port
|
||||
self.ssl = ssl
|
||||
self.passwd = passwd
|
||||
self.valid_auth = datetime.now()
|
||||
self.sid = None
|
||||
self.csrf = None
|
||||
|
||||
def __query(self, query: str, method: str = 'GET', payload: dict = None, auth_need: bool = True):
|
||||
headers = dict()
|
||||
|
||||
# check if http method valid
|
||||
valid_methods = ['GET', 'POST', 'PUT', 'DELETE']
|
||||
if method not in valid_methods:
|
||||
logging.error(f'http {method=} is none of {valid_methods=}')
|
||||
return False
|
||||
|
||||
# refresh auth if required and set auth info
|
||||
if auth_need:
|
||||
if not self._check_auth():
|
||||
self.do_auth()
|
||||
headers['X-FTL-SID'] = self.sid
|
||||
headers['X-FTL-CSRF'] = self.csrf
|
||||
|
||||
# create url
|
||||
url = 'http'
|
||||
if self.ssl:
|
||||
url += 's'
|
||||
url += f'://{self.url}:{self.port}/api/{query}'
|
||||
|
||||
if payload:
|
||||
response = requests.request(method=method, url=url, headers=headers, json=payload, verify=False)
|
||||
else:
|
||||
response = requests.request(method=method, url=url, headers=headers, verify=False)
|
||||
|
||||
match response.status_code:
|
||||
case (200):
|
||||
# comment: successfull request
|
||||
return json.loads(response.content)
|
||||
case (204):
|
||||
# comment: successfull (e.g. auth delete)
|
||||
return True
|
||||
case (_):
|
||||
# comment: default fallback
|
||||
print()
|
||||
print(response.__dict__)
|
||||
return False
|
||||
#
|
||||
## Auth based methods
|
||||
#
|
||||
def need_auth(self) -> bool:
|
||||
"""generate a valid session if no auth required
|
||||
# TODO: better handling if not successful
|
||||
|
||||
Returns:
|
||||
bool: True if a valid session created
|
||||
"""
|
||||
data = self.__query('auth', auth_need=False)
|
||||
if not data:
|
||||
logging.error(f'can not get a valid session without password')
|
||||
return False
|
||||
if data['session']['valid']:
|
||||
self.valid_auth = datetime.now() + timedelta(seconds=data['session']['validity'])
|
||||
self.sid = data['session']['sid']
|
||||
self.csrf = data['session']['csrf']
|
||||
logging.info(f'successfull new auth created until {self.valid_auth}')
|
||||
return True
|
||||
return False
|
||||
|
||||
def _check_auth(self) -> bool:
|
||||
"""check if an auth valid (in time)
|
||||
|
||||
Returns:
|
||||
bool: True if auth valid
|
||||
"""
|
||||
now = datetime.now()
|
||||
if datetime.now() < self.valid_auth:
|
||||
return True
|
||||
return False
|
||||
|
||||
def do_auth(self) -> bool:
|
||||
"""auth via password to get sid / csrf
|
||||
|
||||
# TODO: TOTP support
|
||||
|
||||
Returns:
|
||||
bool: True if auth successfull
|
||||
"""
|
||||
payload = {'password': self.passwd}
|
||||
data = self.__query('auth', 'POST', payload, auth_need=False)
|
||||
if not data:
|
||||
raise PermissionError(f'Authentication not possible')
|
||||
if data['session']['valid']:
|
||||
self.valid_auth = datetime.now() + timedelta(seconds=data['session']['validity'])
|
||||
self.sid = data['session']['sid']
|
||||
self.csrf = data['session']['csrf']
|
||||
logging.info(f'successfull new auth created until {self.valid_auth}')
|
||||
return True
|
||||
return False
|
||||
|
||||
def get_sessions(self) -> list | None:
|
||||
"""List of all current sessions
|
||||
|
||||
Returns:
|
||||
list | None: list of sessions if successfull query
|
||||
"""
|
||||
data = self.__query('auth/sessions')
|
||||
if data:
|
||||
return data['sessions']
|
||||
return False
|
||||
|
||||
def delete_session(self, session_id: int = None) -> bool:
|
||||
"""Delete session by ID
|
||||
|
||||
Args:
|
||||
session_id (int, optional): Session ID get from get_sessions. Defaults to None.
|
||||
|
||||
Returns:
|
||||
bool: True if session delete successfull
|
||||
"""
|
||||
if session_id is None:
|
||||
data = self.get_sessions()
|
||||
if not data:
|
||||
return False
|
||||
for session in data:
|
||||
if session['current_session']:
|
||||
return self.delete_session(session['id'])
|
||||
if self.__query(f'auth/session/{session_id}', 'DELETE'):
|
||||
logging.info(f'successfull delete session with {session_id=}')
|
||||
return True
|
||||
|
||||
def get_app_password(self) -> dict | None:
|
||||
"""Create new application password
|
||||
|
||||
This password can be used for API instet of regular password an TOTP.
|
||||
All sessions will be delte if this password set!
|
||||
# TODO: set password (hash) to PATCH /api/config/webserver/api/app_pwhash
|
||||
|
||||
Returns:
|
||||
dict | None: if successful: app password and hash
|
||||
"""
|
||||
data = self.__query('auth/app')
|
||||
return data['app']
|
||||
|
||||
|
||||
def main():
|
||||
pi = PiholeAPI(url='localhost', passwd='my-password!')
|
||||
pi.get_sessions()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Reference in New Issue
Block a user