From 33c53e5bca860b7692953b2a7cabe9b4da7c0429 Mon Sep 17 00:00:00 2001
From: anima
Date: Sat, 21 Sep 2024 19:34:23 +0200
Subject: [PATCH] inital version of satisfactory api wrapper
---
.gitignore | 4 +
SatiAPI.py | 227 +++++++++++++++++++++++++++++++++++++++++++++++
conf.yml.sample | 6 ++
requirements.txt | 2 +
4 files changed, 239 insertions(+)
create mode 100644 SatiAPI.py
create mode 100644 conf.yml.sample
create mode 100644 requirements.txt
diff --git a/.gitignore b/.gitignore
index 5d381cc..c6317ae 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,7 @@
+*.conf
+*.ymal
+*.yml
+
# ---> Python
# Byte-compiled / optimized / DLL files
__pycache__/
diff --git a/SatiAPI.py b/SatiAPI.py
new file mode 100644
index 0000000..906dcdd
--- /dev/null
+++ b/SatiAPI.py
@@ -0,0 +1,227 @@
+#!/usr/bin/env python3
+"""
+https://github.com/wolveix/satisfactory-server/wiki/Official-API-Docs
+https://satisfactory.wiki.gg/wiki/Dedicated_servers/HTTPS_API
+"""
+
+import requests
+import logging
+import json
+import yaml
+from os.path import exists
+
+import urllib3
+urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
+
+class SatiAPI:
+ """A API wrapper for Satisfactory dedicated server"""
+ __AUTHOR__ = 'anima'
+ __VERSION__ = '0.1.0'
+
+ def __init__(self, host: str = None, port: int = 7777, token: str = None, conffile: str = 'conf.yml', logfile: str = 'SatiAPI.log', loglevel: int = 20) -> None:
+ """create a wrapper for satisfactory dedicated server
+
+ :param host: host ip or dns
+ :type host: string
+ :param port: port of host
+ :type port: int | str
+ :pram token: token for auth on server (can be get with `get_token`)
+ :type token: string
+ :param conffile: path to config file in yaml format
+ :type conffile: string
+ :param logfile: path to logfile
+ :type logfile: string
+ :param loglevel: level of logging (10 => debug, 20 => info (default), 30 => warning, 40 => error)
+
+ all server specific settings can be set via config file
+ """
+ self._log = logging.getLogger(__name__)
+ logHandler = logging.FileHandler(logfile)
+ logPrinter = logging.StreamHandler()
+ logFormat = logging.Formatter('%(asctime)s %(levelname)s \t %(name)s : %(message)s')
+ logHandler.setFormatter(logFormat) # Format for File
+ logPrinter.setFormatter(logFormat) # Format for Console
+ self._log.addHandler(logHandler)
+ self._log.addHandler(logPrinter)
+ self._log.setLevel(loglevel)
+
+ self.host = host
+ self.port = port
+ self.token = token
+ self.__map_config(conffile)
+
+ ## propertys
+ @property
+ def host(self):
+ return self.__host
+
+ @host.setter
+ def host(self, value) -> bool:
+ """setter method for host. check if value valid.
+
+ :param value: host ip or dns to dedicated server
+ :type value: string
+ :return: true if set successfull
+ :rtype: bool
+ """
+ if isinstance(value, str) or value is None:
+ self.__host = value
+ return True
+ else:
+ self._log.debug(f'wrong {value=} ({type(value)}) for host')
+ return False
+
+ @property
+ def port(self):
+ return self.__port
+
+ @port.setter
+ def port(self, value) -> bool:
+ """setter method for port. check if value a valid network port
+
+ :param value: port of dedicated server instance
+ :type value: int | str
+ :return: true if set successfull
+ :rtype: bool
+ """
+ if not isinstance(value, int):
+ if isinstance(value, str):
+ try:
+ value = int(value)
+ except ValueError:
+ self._log.debug(f'wrong {value=} ({type(value)}) for port')
+ return False
+ else:
+ self._log.debug(f'wrong {value=} ({type(value)}) for port')
+ return False
+
+ if value > 0 and value < 65535:
+ self.__port = value
+ return True
+ else:
+ self._log.debug(f'out of port range {value=}')
+ return False
+
+ @property
+ def token(self):
+ return self.__token
+
+ @token.setter
+ def token(self, value) -> bool:
+ """setter method for token. check if value a valid token
+
+ :param value: token for auth on server
+ :type value: string
+ :return: true if set successfull
+ :rtype: bool
+ """
+ if isinstance(value, str) or value is None:
+ self.__token = value
+ return True
+ else:
+ self._log.debug(f'wrong {value=} ({type(value)}) for token')
+
+ ## methods
+ def __map_config(self, conffile) -> None:
+ """map config settings with multible usage in this class override only default values not changed.
+
+ :param conffile: path to config file in yaml format
+ :type conffile: string
+ """
+ if exists(conffile):
+ with open(conffile, 'r') as f:
+ self._config = yaml.safe_load(f.read())
+
+ if 'host' in self._config and self.host is None:
+ self.host = self._config['host']
+
+ if 'port' in self._config and self.port == 7777:
+ self.port = self._config['port']
+
+ if 'auth' in self._config:
+ if 'token' in self._config['auth'] and self.token is None:
+ self.token = self._config['auth']['token']
+
+ def __query(self, query: str, data: dict = None) -> dict | bool | None:
+ """run a query to satisfacoty dedicated server
+
+ :param query: name of querytype
+ :type query: string
+ :param data: (maybe optional) data to be send as dict
+ :type data: dict
+ :return: response from server striped from "data" wrapper or False
+ :rtype: dict | None | bool
+ """
+ payload = dict()
+ payload['function'] = query
+
+ if data is not None:
+ payload['data'] = data
+
+ headers = dict()
+ headers['Content-Type'] = 'application/json'
+ if self.token is not None:
+ headers['Authorization'] = f'Bearer {self.token}'
+
+ response = requests.post(f'https://{self.host}:7777/api/v1', headers=headers, json=payload, verify=False)
+ if response.status_code == 200:
+ return json.loads(response.text)['data']
+ else:
+ self._log.error(f'non successfull response {response.content} [{response.status_code}]')
+ self._log.debug(f'{response.request.url=}')
+ self._log.debug(f'{response.request.headers=}')
+ self._log.debug(f'{response.request.body=}')
+ return False
+
+
+ def get_token(self, password: str = None, privilegeLevel: str = 'Administrator') -> bool:
+ """get token from satisfacory dedicated server, password is needed!
+
+ :param password: password of admin level
+ :type password: string
+ :param privilegeLevel: level of authentication
+ :type privilegeLevel: string
+ :return: true if successfull get a token
+ :rtype: bool
+ """
+ if self.token is not None:
+ self._log.warning('you have already a token')
+ return True
+ if isinstance(password, str):
+ self._log.debug('password used from parameter')
+ else:
+ if self._config is not None and 'auth' in self._config:
+ if 'password' in self._config['auth']:
+ password = self._config['auth']['password']
+ self._log.debug('password used from conf file')
+ else:
+ self._log('missing information in config file under auth')
+
+ if 'privilegeLevel' in self._config['auth']:
+ privilegeLevel = self._config['auth']['privilegeLevel']
+ self._log.debug('privilege level used from conff ile')
+
+ else:
+ self._log.error(f'no password for auth given!')
+ return False
+
+ data = dict()
+ data['password'] = password
+ data['MinimumPrivilegeLevel'] = privilegeLevel
+
+ response = self.__query('PasswordLogin', data)
+ self.token = response['authenticationToken']
+ return True
+
+ def get_status(self) -> dict:
+ """get basis status of satisfacory server
+
+ :return: full return from server
+ :rtpye: dict
+ """
+ return self.__query('QueryServerState')
+
+
+if __name__ == "__main__":
+ sati = SatiAPI(loglevel=10)
+ print('Verbundene Spieler: ', sati.get_status()['serverGameState']['numConnectedPlayers'])
\ No newline at end of file
diff --git a/conf.yml.sample b/conf.yml.sample
new file mode 100644
index 0000000..d6534e9
--- /dev/null
+++ b/conf.yml.sample
@@ -0,0 +1,6 @@
+host: ServerIP
+port: 7777
+auth:
+ token: myToken
+ password: myAdminPassword # not need if token existend
+ privilegeLevel: Administrator
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..b3663cb
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,2 @@
+requests
+PyYAML
\ No newline at end of file