Files
SatiAPI/SatiAPI.py
2024-09-21 21:00:50 +02:00

321 lines
11 KiB
Python

#!/usr/bin/env python3
"""
https://github.com/wolveix/satisfactory-server/wiki/Official-API-Docs
https://satisfactory.wiki.gg/wiki/Dedicated_servers/HTTPS_API
"""
import requests
import logging
import json
import yaml
from os.path import exists
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class SatiAPI:
"""A API wrapper for Satisfactory dedicated server"""
__AUTHOR__ = 'anima'
__VERSION__ = '0.5.2'
def __init__(self, host: str = None, port: int = 7777, token: str = None, conffile: str = 'conf.yml', logfile: str = 'SatiAPI.log', loglevel: int = 20) -> None:
"""create a wrapper for satisfactory dedicated server
:param host: host ip or dns
:type host: string
:param port: port of host
:type port: int | str
:pram token: token for auth on server (can be get with `get_token`)
:type token: string
:param conffile: path to config file in yaml format
:type conffile: string
:param logfile: path to logfile
:type logfile: string
:param loglevel: level of logging (10 => debug, 20 => info (default), 30 => warning, 40 => error)
all server specific settings can be set via config file
"""
self._log = logging.getLogger(__name__)
logHandler = logging.FileHandler(logfile)
logPrinter = logging.StreamHandler()
logFormat = logging.Formatter('%(asctime)s %(levelname)s \t %(name)s : %(message)s')
logHandler.setFormatter(logFormat) # Format for File
logPrinter.setFormatter(logFormat) # Format for Console
self._log.addHandler(logHandler)
self._log.addHandler(logPrinter)
self._log.setLevel(loglevel)
self.host = host
self.port = port
self.token = token
self.__map_config(conffile)
## propertys
@property
def host(self):
return self.__host
@host.setter
def host(self, value) -> bool:
"""setter method for host. check if value valid.
:param value: host ip or dns to dedicated server
:type value: string
:return: true if set successfull
:rtype: bool
"""
if isinstance(value, str) or value is None:
self.__host = value
return True
else:
self._log.debug(f'wrong {value=} ({type(value)}) for host')
return False
@property
def port(self):
return self.__port
@port.setter
def port(self, value) -> bool:
"""setter method for port. check if value a valid network port
:param value: port of dedicated server instance
:type value: int | str
:return: true if set successfull
:rtype: bool
"""
if not isinstance(value, int):
if isinstance(value, str):
try:
value = int(value)
except ValueError:
self._log.debug(f'wrong {value=} ({type(value)}) for port')
return False
else:
self._log.debug(f'wrong {value=} ({type(value)}) for port')
return False
if value > 0 and value < 65535:
self.__port = value
return True
else:
self._log.debug(f'out of port range {value=}')
return False
@property
def token(self):
return self.__token
@token.setter
def token(self, value) -> bool:
"""setter method for token. check if value a valid token
:param value: token for auth on server
:type value: string
:return: true if set successfull
:rtype: bool
"""
if isinstance(value, str) or value is None:
self.__token = value
return True
else:
self._log.debug(f'wrong {value=} ({type(value)}) for token')
## methods
def __map_config(self, conffile) -> None:
"""map config settings with multible usage in this class override only default values not changed.
:param conffile: path to config file in yaml format
:type conffile: string
"""
if exists(conffile):
with open(conffile, 'r') as f:
self._config = yaml.safe_load(f.read())
if 'host' in self._config and self.host is None:
self.host = self._config['host']
if 'port' in self._config and self.port == 7777:
self.port = self._config['port']
if 'auth' in self._config:
if 'token' in self._config['auth'] and self.token is None:
self.token = self._config['auth']['token']
def __query(self, query: str, data: dict = None, auth: bool = False) -> dict | bool | None:
"""run a query to satisfacoty dedicated server
:param query: name of querytype
:type query: string
:param data: (maybe optional) data to be send as dict
:type data: dict
:param auth: set true if authentication requiered
:type auth: bool
:return: response from server
:rtype: dict | None | bool
"""
payload = dict()
payload['function'] = query
if data is not None:
payload['data'] = data
headers = dict()
headers['Content-Type'] = 'application/json'
if auth:
if self.token is None:
self.get_token()
if self.token is None:
headers['Authorization'] = f'Bearer {self.token}'
else:
self._log.error('no data to authencitate')
return False
response = requests.post(f'https://{self.host}:{self.port}/api/v1', headers=headers, json=payload, verify=False)
if response.status_code == 200:
json_response = json.loads(response.text)
if 'data' in json_response:
return json_response
else:
# if json_response['errorMessage'] == '':
# self._log.debug(f'token expired')
# self.get_token()
self._log.error(f'wrong response data {json_response=}')
else:
self._log.error(f'non successfull response {response.content} [{response.status_code}]')
self._log.debug(f'{response.request.url=}')
self._log.debug(f'{response.request.headers=}')
self._log.debug(f'{response.request.body=}')
return False
def _passwordless_login(self, privilegeLevel: str = 'InitialAdmin') -> bool:
"""Attempts to perform a passwordless login to the Dedicated Server as a player. Passwordless login is possible if the Dedicated Server is not claimed, or if Client Protection Password is not set for the Dedicated Server. This function requires no Authentication.
:param privilegeLevel: Minimum privilege level to attempt to acquire by logging in. See Privilege Level enum for possible values
:type privilegeLevel: string
:return: true if successfull
:rtype: bool
"""
data = dict()
data['MinimumPrivilegeLevel'] = privilegeLevel
response = self.__query('PasswordlessLogin', data)
if response:
self.token = response['data']['authenticationToken']
return True
else:
return response
def get_token(self, password: str = None, privilegeLevel: str = 'Administrator') -> bool:
"""get token from satisfacory dedicated server, password is needed!
Attempts to log in to the Dedicated Server as a player using either Admin Password or Client Protection Password. This function requires no Authentication.
:param password: password of admin level
:type password: string
:param privilegeLevel: level of authentication
:type privilegeLevel: string
:return: true if successfull get a token
:rtype: bool
"""
if self.token is not None:
self._log.warning('you have already a token')
if isinstance(password, str):
self._log.debug('password used from parameter')
else:
if self._config is not None and 'auth' in self._config:
if 'password' in self._config['auth']:
password = self._config['auth']['password']
self._log.debug('password used from conf file')
else:
self._log('missing information in config file under auth')
if 'privilegeLevel' in self._config['auth']:
privilegeLevel = self._config['auth']['privilegeLevel']
self._log.debug('privilege level used from conff ile')
else:
self._log.error(f'no password for auth given!')
return False
data = dict()
data['password'] = password
data['MinimumPrivilegeLevel'] = privilegeLevel
response = self.__query('PasswordLogin', data)
if response:
self.token = response['data']['authenticationToken']
return True
else:
return response
def get_status(self) -> dict:
"""Retrieves the current state of the Dedicated Server. Does not require any input parameters.
:return: full return from server
:rtpye: dict
"""
return self.__query('QueryServerState')
def get_health(self, ClientCustomData: str = '') -> dict:
"""get health check
:param ClientCustomData: Custom Data passed from the Game Client or Third Party service. Not used by vanilla Dedicated Servers
:type ClientCustomData: string
:return: full return from server
:rtpye: dict
"""
data = dict()
data['ClientCustomData'] = ClientCustomData
return self.__query('HealthCheck', data)
def get_server_options(self) -> dict:
"""Retrieves currently applied server options and server options that are still pending application (because of needing session or server restart) Does not require input parameters.
:return: full return from server
:rtpye: dict
"""
return self.__query('GetServerOptions')
def get_advanced_game_settings(self):
"""Retrieves currently applied advanced game settings. Does not require input parameters.
:return: full return from server
:rtpye: dict
"""
return self.__query('GetAdvancedGameSettings')
## todo
#PasswordlessLogin
#ClaimServer
#RenameServer
#SetClientPassword
#SetAdminPassword
#SetAutoLoadSessionName
#ApplyServerOptions
#ApplyAdvancedGameSettings
#RunCommand
#Shutdown
#CreateNewGame
#SaveGame
#DeleteSaveFile
#DeleteSaveSession
#EnumerateSessions
#LoadGame
#UploadSaveGame
#DownloadSaveGame
if __name__ == "__main__":
sati = SatiAPI(loglevel=10)
# print('Verbundene Spieler: ', sati.get_status()['serverGameState']['numConnectedPlayers'])
# print(sati.get_token())
# print(sati.token)
# print(sati.get_status())
print(sati._passwordless_login())
# print(sati.claim_server('SDGame01', 'Admin123!'))
# print(sati.get_status())