308 lines
10 KiB
Python
308 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
https://github.com/wolveix/satisfactory-server/wiki/Official-API-Docs
|
|
https://satisfactory.wiki.gg/wiki/Dedicated_servers/HTTPS_API
|
|
"""
|
|
|
|
import requests
|
|
import logging
|
|
import json
|
|
import yaml
|
|
from os.path import exists
|
|
|
|
import urllib3
|
|
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
|
|
|
class SatiAPI:
|
|
"""A API wrapper for Satisfactory dedicated server"""
|
|
__AUTHOR__ = 'anima'
|
|
__VERSION__ = '0.4.3'
|
|
|
|
def __init__(self, host: str = None, port: int = 7777, token: str = None, conffile: str = 'conf.yml', logfile: str = 'SatiAPI.log', loglevel: int = 20) -> None:
|
|
"""create a wrapper for satisfactory dedicated server
|
|
|
|
:param host: host ip or dns
|
|
:type host: string
|
|
:param port: port of host
|
|
:type port: int | str
|
|
:pram token: token for auth on server (can be get with `get_token`)
|
|
:type token: string
|
|
:param conffile: path to config file in yaml format
|
|
:type conffile: string
|
|
:param logfile: path to logfile
|
|
:type logfile: string
|
|
:param loglevel: level of logging (10 => debug, 20 => info (default), 30 => warning, 40 => error)
|
|
|
|
all server specific settings can be set via config file
|
|
"""
|
|
self._log = logging.getLogger(__name__)
|
|
logHandler = logging.FileHandler(logfile)
|
|
logPrinter = logging.StreamHandler()
|
|
logFormat = logging.Formatter('%(asctime)s %(levelname)s \t %(name)s : %(message)s')
|
|
logHandler.setFormatter(logFormat) # Format for File
|
|
logPrinter.setFormatter(logFormat) # Format for Console
|
|
self._log.addHandler(logHandler)
|
|
self._log.addHandler(logPrinter)
|
|
self._log.setLevel(loglevel)
|
|
|
|
self.host = host
|
|
self.port = port
|
|
self.token = token
|
|
self.__map_config(conffile)
|
|
|
|
## propertys
|
|
@property
|
|
def host(self):
|
|
return self.__host
|
|
|
|
@host.setter
|
|
def host(self, value) -> bool:
|
|
"""setter method for host. check if value valid.
|
|
|
|
:param value: host ip or dns to dedicated server
|
|
:type value: string
|
|
:return: true if set successfull
|
|
:rtype: bool
|
|
"""
|
|
if isinstance(value, str) or value is None:
|
|
self.__host = value
|
|
return True
|
|
else:
|
|
self._log.debug(f'wrong {value=} ({type(value)}) for host')
|
|
return False
|
|
|
|
@property
|
|
def port(self):
|
|
return self.__port
|
|
|
|
@port.setter
|
|
def port(self, value) -> bool:
|
|
"""setter method for port. check if value a valid network port
|
|
|
|
:param value: port of dedicated server instance
|
|
:type value: int | str
|
|
:return: true if set successfull
|
|
:rtype: bool
|
|
"""
|
|
if not isinstance(value, int):
|
|
if isinstance(value, str):
|
|
try:
|
|
value = int(value)
|
|
except ValueError:
|
|
self._log.debug(f'wrong {value=} ({type(value)}) for port')
|
|
return False
|
|
else:
|
|
self._log.debug(f'wrong {value=} ({type(value)}) for port')
|
|
return False
|
|
|
|
if value > 0 and value < 65535:
|
|
self.__port = value
|
|
return True
|
|
else:
|
|
self._log.debug(f'out of port range {value=}')
|
|
return False
|
|
|
|
@property
|
|
def token(self):
|
|
return self.__token
|
|
|
|
@token.setter
|
|
def token(self, value) -> bool:
|
|
"""setter method for token. check if value a valid token
|
|
|
|
:param value: token for auth on server
|
|
:type value: string
|
|
:return: true if set successfull
|
|
:rtype: bool
|
|
"""
|
|
if isinstance(value, str) or value is None:
|
|
self.__token = value
|
|
return True
|
|
else:
|
|
self._log.debug(f'wrong {value=} ({type(value)}) for token')
|
|
|
|
## methods
|
|
def __map_config(self, conffile) -> None:
|
|
"""map config settings with multible usage in this class override only default values not changed.
|
|
|
|
:param conffile: path to config file in yaml format
|
|
:type conffile: string
|
|
"""
|
|
if exists(conffile):
|
|
with open(conffile, 'r') as f:
|
|
self._config = yaml.safe_load(f.read())
|
|
|
|
if 'host' in self._config and self.host is None:
|
|
self.host = self._config['host']
|
|
|
|
if 'port' in self._config and self.port == 7777:
|
|
self.port = self._config['port']
|
|
|
|
if 'auth' in self._config:
|
|
if 'token' in self._config['auth'] and self.token is None:
|
|
self.token = self._config['auth']['token']
|
|
|
|
def __query(self, query: str, data: dict = None, auth: bool = False) -> dict | bool | None:
|
|
"""run a query to satisfacoty dedicated server
|
|
|
|
:param query: name of querytype
|
|
:type query: string
|
|
:param data: (maybe optional) data to be send as dict
|
|
:type data: dict
|
|
:param auth: set true if authentication requiered
|
|
:type auth: bool
|
|
:return: response from server
|
|
:rtype: dict | None | bool
|
|
"""
|
|
payload = dict()
|
|
payload['function'] = query
|
|
|
|
if data is not None:
|
|
payload['data'] = data
|
|
|
|
headers = dict()
|
|
headers['Content-Type'] = 'application/json'
|
|
if auth:
|
|
if self.token is None:
|
|
print(self.token)
|
|
self.get_token()
|
|
|
|
if self.token is None:
|
|
print(self.token)
|
|
headers['Authorization'] = f'Bearer {self.token}'
|
|
else:
|
|
self._log.error('no data to authencitate')
|
|
return False
|
|
|
|
response = requests.post(f'https://{self.host}:{self.port}/api/v1', headers=headers, json=payload, verify=False)
|
|
if response.status_code == 200:
|
|
json_response = json.loads(response.text)
|
|
if 'data' in json_response:
|
|
return json_response
|
|
else:
|
|
#or if 403
|
|
# if json_response['errorMessage'] == '':
|
|
# self._log.debug(f'token expired')
|
|
# self.get_token()
|
|
self._log.error(f'wrong response data {json_response=}')
|
|
|
|
else:
|
|
self._log.error(f'non successfull response {response.content} [{response.status_code}]')
|
|
self._log.debug(f'{response.request.url=}')
|
|
self._log.debug(f'{response.request.headers=}')
|
|
self._log.debug(f'{response.request.body=}')
|
|
return False
|
|
|
|
|
|
def get_token(self, password: str = None, privilegeLevel: str = 'Administrator') -> bool:
|
|
"""get token from satisfacory dedicated server, password is needed!
|
|
Attempts to log in to the Dedicated Server as a player using either Admin Password or Client Protection Password. This function requires no Authentication.
|
|
|
|
:param password: password of admin level
|
|
:type password: string
|
|
:param privilegeLevel: level of authentication
|
|
:type privilegeLevel: string
|
|
:return: true if successfull get a token
|
|
:rtype: bool
|
|
"""
|
|
if self.token is not None:
|
|
self._log.warning('you have already a token')
|
|
if isinstance(password, str):
|
|
self._log.debug('password used from parameter')
|
|
else:
|
|
if self._config is not None and 'auth' in self._config:
|
|
if 'password' in self._config['auth']:
|
|
password = self._config['auth']['password']
|
|
self._log.debug('password used from conf file')
|
|
else:
|
|
self._log('missing information in config file under auth')
|
|
|
|
if 'privilegeLevel' in self._config['auth']:
|
|
privilegeLevel = self._config['auth']['privilegeLevel']
|
|
self._log.debug('privilege level used from conff ile')
|
|
|
|
else:
|
|
self._log.error(f'no password for auth given!')
|
|
return False
|
|
|
|
data = dict()
|
|
data['password'] = password
|
|
data['MinimumPrivilegeLevel'] = privilegeLevel
|
|
|
|
response = self.__query('PasswordLogin', data)
|
|
if response:
|
|
self.token = response['authenticationToken']
|
|
return True
|
|
else:
|
|
return response
|
|
|
|
def get_status(self) -> dict:
|
|
"""Retrieves the current state of the Dedicated Server. Does not require any input parameters.
|
|
|
|
:return: full return from server
|
|
:rtpye: dict
|
|
"""
|
|
return self.__query('QueryServerState')
|
|
|
|
def get_health(self, ClientCustomData: str = '') -> dict:
|
|
"""get health check
|
|
|
|
:param ClientCustomData: Custom Data passed from the Game Client or Third Party service. Not used by vanilla Dedicated Servers
|
|
:type ClientCustomData: string
|
|
:return: full return from server
|
|
:rtpye: dict
|
|
"""
|
|
data = dict()
|
|
data['ClientCustomData'] = ClientCustomData
|
|
|
|
return self.__query('HealthCheck', data)
|
|
|
|
def get_server_options(self) -> dict:
|
|
"""Retrieves currently applied server options and server options that are still pending application (because of needing session or server restart) Does not require input parameters.
|
|
|
|
:return: full return from server
|
|
:rtpye: dict
|
|
"""
|
|
return self.__query('GetServerOptions')
|
|
|
|
def get_advanced_game_settings(self):
|
|
"""Retrieves currently applied advanced game settings. Does not require input parameters.
|
|
|
|
:return: full return from server
|
|
:rtpye: dict
|
|
"""
|
|
return self.__query('GetAdvancedGameSettings')
|
|
|
|
|
|
|
|
|
|
|
|
## todo
|
|
#PasswordlessLogin
|
|
#ClaimServer
|
|
#RenameServer
|
|
#SetClientPassword
|
|
#SetAdminPassword
|
|
#SetAutoLoadSessionName
|
|
#ApplyServerOptions
|
|
#ApplyAdvancedGameSettings
|
|
#RunCommand
|
|
#Shutdown
|
|
#CreateNewGame
|
|
#SaveGame
|
|
#DeleteSaveFile
|
|
#DeleteSaveSession
|
|
#EnumerateSessions
|
|
#LoadGame
|
|
#UploadSaveGame
|
|
#DownloadSaveGame
|
|
|
|
if __name__ == "__main__":
|
|
sati = SatiAPI(loglevel=10)
|
|
# print('Verbundene Spieler: ', sati.get_status()['serverGameState']['numConnectedPlayers'])
|
|
# print(sati.get_token())
|
|
# print(sati.token)
|
|
# print(sati.get_status())
|
|
print(sati.claim_server('SDGame01', 'Admin123!'))
|
|
|
|
# print(sati.get_status()) |