198 lines
6.7 KiB
Python
198 lines
6.7 KiB
Python
#!/usr/bin/env python3
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import logging
|
|
|
|
logging.basicConfig(format='[%(asctime)s] %(process)s %(levelname)s {%(filename)s:%(lineno)d} - %(message)s', level=logging.DEBUG)
|
|
|
|
try:
|
|
import cv2
|
|
import mss
|
|
import numpy
|
|
from PIL import Image
|
|
except ImportError as e:
|
|
logging.error(f"Missing dependency: {e}")
|
|
logging.error("Install with: pip install mss Pillow opencv-python numpy")
|
|
sys.exit(1)
|
|
|
|
from .command import Command
|
|
|
|
|
|
class Window:
|
|
__AUTHOR__ = 'anima'
|
|
__VERSION__ = '1.0.0'
|
|
|
|
key_press_duration = 0.1
|
|
supported_keys = ['return', 'Up', 'Down', 'Right', 'Left']
|
|
|
|
@staticmethod
|
|
def check_requrements() -> None:
|
|
## xdotool
|
|
try:
|
|
subprocess.run(["xdotool"], capture_output=True)
|
|
except FileNotFoundError:
|
|
logging.error("xdotool not found. Install it first.")
|
|
sys.exit(1)
|
|
|
|
def __init__(self, window_name: str, monitor_id: int = 1, poll_intervall: float = 1):
|
|
self.check_requrements()
|
|
self.window_name = window_name
|
|
self.monitor_id = monitor_id
|
|
self.window_id = None
|
|
self.poll_intervall = poll_intervall
|
|
self.commands = list()
|
|
self.dry_run = False
|
|
|
|
|
|
def run(self):
|
|
if not len(self.commands) > 0:
|
|
logging.error(f'no commands to run found')
|
|
sys.exit(1)
|
|
with mss.mss() as self.sct:
|
|
while True:
|
|
command = self.match_screen()
|
|
# print(self.match_screen())
|
|
if command:
|
|
self.activate_window()
|
|
for key in command.commands:
|
|
if isinstance(key, str):
|
|
self.send_key(key)
|
|
elif isinstance(key, int) or isinstance(key, float):
|
|
time.sleep(key)
|
|
time.sleep(self.poll_intervall)
|
|
|
|
## Screen Managament
|
|
def grab_screen(self) -> numpy.ndarray:
|
|
"""Capture screen and return as normalized BGR numpy array.
|
|
|
|
Returns:
|
|
numpy.ndarray: screenshot converted as matchable
|
|
"""
|
|
screenshot = self.sct.grab(self.sct.monitors[self.monitor_id])
|
|
screen = numpy.array(screenshot)
|
|
screen = cv2.cvtColor(screen, cv2.COLOR_BGRA2BGR)
|
|
return screen
|
|
|
|
def match_screen(self) -> Command | None:
|
|
"""match screenshot with all known commands
|
|
|
|
Returns:
|
|
Command | None: command with hightest screen/image match
|
|
"""
|
|
best_match = None
|
|
best_pos = None
|
|
best_value = 0.0
|
|
screen = self.grab_screen()
|
|
|
|
screen_gray = cv2.cvtColor(screen, cv2.COLOR_BGR2GRAY)
|
|
for command in self.commands:
|
|
img_h, img_w = command.image.shape[:2]
|
|
screen_h, screen_w = screen.shape[:2]
|
|
|
|
if img_h > screen_h or img_w > screen_w:
|
|
continue
|
|
|
|
img_gray = cv2.cvtColor(command.image, cv2.COLOR_BGR2GRAY)
|
|
result = cv2.matchTemplate(screen_gray, img_gray, cv2.TM_CCOEFF_NORMED)
|
|
_, match_value, _, match_pos = cv2.minMaxLoc(result)
|
|
|
|
if match_value >= command.threshold and match_value > best_value:
|
|
logging.debug(f'new best match: {command.name} ({match_value=}%, {match_pos=})')
|
|
best_match = command
|
|
best_value = match_value
|
|
best_pos = match_pos
|
|
|
|
if best_match:
|
|
return best_match
|
|
return None
|
|
|
|
## Window Management
|
|
def _get_window_id(self) -> bool:
|
|
"""Find window ID by name using xdotool.
|
|
|
|
Returns:
|
|
bool: True if window found and id saved
|
|
"""
|
|
result = subprocess.run(
|
|
["xdotool", "search", "--name", self.window_name],
|
|
capture_output=True, text=True
|
|
)
|
|
ids = result.stdout.strip().splitlines()
|
|
logging.debug(f'found ids with string {self.window_name=}: {ids=}')
|
|
if ids:
|
|
# last match is usually the main window
|
|
self.window_id = ids[-1]
|
|
logging.debug(f'used id: {self.window_id=}')
|
|
return True
|
|
logging.debug(f'no matching ids found for {self.window_name=}')
|
|
return False
|
|
|
|
def activate_window(self) -> bool:
|
|
"""activate given window
|
|
|
|
Returns:
|
|
bool: True if window found and activate
|
|
"""
|
|
# TODO: replace search with id ?
|
|
if self.dry_run:
|
|
logging.info(f'dry-run: activate window {self.window_name=}')
|
|
return True
|
|
|
|
# TODO: window_id insteat?
|
|
result = subprocess.run(
|
|
['xdotool', 'search', '--name', self.window_name, 'windowactivate'],
|
|
capture_output=True, text=True
|
|
)
|
|
|
|
if result.returncode: return True
|
|
return False
|
|
|
|
## Key Management
|
|
def send_key(self, key_name: str = None) -> bool:
|
|
"""Send a key via xdotool, optionally targeting a specific window.
|
|
|
|
Args:
|
|
key_name (str, optional): xdotool name of key to press. Defaults to None.
|
|
|
|
Returns:
|
|
bool: True if key send successfully
|
|
"""
|
|
if not isinstance(key_name, str):
|
|
logging.error(f'given key is not a string: {key_name=} ({type(key_name)})')
|
|
return False
|
|
#=> disabled because no need atm
|
|
# if not key_name in self.supported_keys:
|
|
# logging.error(f'given key is not a xdotool {key_name=}, supoprtet keys a: {self.supportet_keys}')
|
|
# return False
|
|
if self.dry_run:
|
|
logging.info(f'dry-run: send {key_name=}')
|
|
return True
|
|
|
|
if self.window_id:
|
|
logging.debug(f'press {key_name=} for window {self.window_id=} ({self.window_name=})')
|
|
key_down = subprocess.run(["xdotool", "keydown", "--window", self.window_id, key_name])
|
|
time.sleep(self.key_press_duration)
|
|
key_up = subprocess.run(["xdotool", "keyup", "--window", self.window_id, key_name])
|
|
if key_down.returncode and key_up.returncode:
|
|
return True
|
|
else: # TODO: only if window_id known ?
|
|
logging.debug(f'press {key_name=}')
|
|
key_down = subprocess.run(["xdotool", "keydown", key_name])
|
|
time.sleep(self.key_press_duration)
|
|
key_up = subprocess.run(["xdotool", "keyup", key_name])
|
|
if key_down.returncode and key_up.returncode:
|
|
return True
|
|
return False
|
|
|
|
def main():
|
|
check_requrements()
|
|
if 1 <= args.verbose <= 3:
|
|
logging.getLogger().setLevel(40 - (args.verbose * 10))
|
|
game = Saki('Final Fantasy X')
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|