begin adding static types to backend code

This commit is contained in:
AAGaming
2023-08-26 22:06:01 -04:00
parent ae399b8c0e
commit ecc5f5c2fa
9 changed files with 189 additions and 153 deletions

View File

@@ -4,53 +4,70 @@ import json
# from pprint import pformat
# Partial imports
from aiohttp import ClientSession, web
from asyncio import get_event_loop, sleep
from concurrent.futures import ProcessPoolExecutor
from aiohttp import ClientSession
from asyncio import sleep
from hashlib import sha256
from io import BytesIO
from logging import getLogger
from os import R_OK, W_OK, path, rename, listdir, access, mkdir
from os import R_OK, W_OK, path, listdir, access, mkdir
from shutil import rmtree
from time import time
from zipfile import ZipFile
from localplatform import chown, chmod
from enum import IntEnum
from typing import Dict, List, TypedDict
# Local modules
from helpers import get_ssl_context, download_remote_binary_to_path
from injector import get_gamepadui_tab
from .loader import Loader, Plugins
from .helpers import get_ssl_context, download_remote_binary_to_path
from .settings import SettingsManager
from .injector import get_gamepadui_tab
logger = getLogger("Browser")
class PluginInstallType(IntEnum):
INSTALL = 0
REINSTALL = 1
UPDATE = 2
class PluginInstallRequest(TypedDict):
name: str
artifact: str
version: str
hash: str
install_type: PluginInstallType
class PluginInstallContext:
def __init__(self, artifact, name, version, hash) -> None:
def __init__(self, artifact: str, name: str, version: str, hash: str) -> None:
self.artifact = artifact
self.name = name
self.version = version
self.hash = hash
class PluginBrowser:
def __init__(self, plugin_path, plugins, loader, settings) -> None:
def __init__(self, plugin_path: str, plugins: Plugins, loader: Loader, settings: SettingsManager) -> None:
self.plugin_path = plugin_path
self.plugins = plugins
self.loader = loader
self.settings = settings
self.install_requests = {}
self.install_requests: Dict[str, PluginInstallContext | List[PluginInstallContext]] = {}
def _unzip_to_plugin_dir(self, zip, name, hash):
def _unzip_to_plugin_dir(self, zip: BytesIO, name: str, hash: str):
zip_hash = sha256(zip.getbuffer()).hexdigest()
if hash and (zip_hash != hash):
return False
zip_file = ZipFile(zip)
zip_file.extractall(self.plugin_path)
plugin_dir = path.join(self.plugin_path, self.find_plugin_folder(name))
plugin_folder = self.find_plugin_folder(name)
assert plugin_folder is not None
plugin_dir = path.join(self.plugin_path, plugin_folder)
if not chown(plugin_dir) or not chmod(plugin_dir, 555):
logger.error(f"chown/chmod exited with a non-zero exit code")
return False
return True
async def _download_remote_binaries_for_plugin_with_name(self, pluginBasePath):
async def _download_remote_binaries_for_plugin_with_name(self, pluginBasePath: str):
rv = False
try:
packageJsonPath = path.join(pluginBasePath, 'package.json')
@@ -91,7 +108,7 @@ class PluginBrowser:
return rv
"""Return the filename (only) for the specified plugin"""
def find_plugin_folder(self, name):
def find_plugin_folder(self, name: str) -> str | None:
for folder in listdir(self.plugin_path):
try:
with open(path.join(self.plugin_path, folder, 'plugin.json'), "r", encoding="utf-8") as f:
@@ -102,11 +119,13 @@ class PluginBrowser:
except:
logger.debug(f"skipping {folder}")
async def uninstall_plugin(self, name):
async def uninstall_plugin(self, name: str):
if self.loader.watcher:
self.loader.watcher.disabled = True
tab = await get_gamepadui_tab()
plugin_dir = path.join(self.plugin_path, self.find_plugin_folder(name))
plugin_folder = self.find_plugin_folder(name)
assert plugin_folder is not None
plugin_dir = path.join(self.plugin_path, )
try:
logger.info("uninstalling " + name)
logger.info(" at dir " + plugin_dir)
@@ -133,7 +152,7 @@ class PluginBrowser:
if self.loader.watcher:
self.loader.watcher.disabled = False
async def _install(self, artifact, name, version, hash):
async def _install(self, artifact: str, name: str, version: str, hash: str):
# Will be set later in code
res_zip = None
@@ -185,6 +204,7 @@ class PluginBrowser:
ret = self._unzip_to_plugin_dir(res_zip, name, hash)
if ret:
plugin_folder = self.find_plugin_folder(name)
assert plugin_folder is not None
plugin_dir = path.join(self.plugin_path, plugin_folder)
ret = await self._download_remote_binaries_for_plugin_with_name(plugin_dir)
if ret:
@@ -206,14 +226,14 @@ class PluginBrowser:
if self.loader.watcher:
self.loader.watcher.disabled = False
async def request_plugin_install(self, artifact, name, version, hash, install_type):
async def request_plugin_install(self, artifact: str, name: str, version: str, hash: str, install_type: PluginInstallType):
request_id = str(time())
self.install_requests[request_id] = PluginInstallContext(artifact, name, version, hash)
tab = await get_gamepadui_tab()
await tab.open_websocket()
await tab.evaluate_js(f"DeckyPluginLoader.addPluginInstallPrompt('{name}', '{version}', '{request_id}', '{hash}', {install_type})")
async def request_multiple_plugin_installs(self, requests):
async def request_multiple_plugin_installs(self, requests: List[PluginInstallRequest]):
request_id = str(time())
self.install_requests[request_id] = [PluginInstallContext(req['artifact'], req['name'], req['version'], req['hash']) for req in requests]
js_requests_parameter = ','.join([
@@ -224,17 +244,17 @@ class PluginBrowser:
await tab.open_websocket()
await tab.evaluate_js(f"DeckyPluginLoader.addMultiplePluginsInstallPrompt('{request_id}', [{js_requests_parameter}])")
async def confirm_plugin_install(self, request_id):
async def confirm_plugin_install(self, request_id: str):
requestOrRequests = self.install_requests.pop(request_id)
if isinstance(requestOrRequests, list):
[await self._install(req.artifact, req.name, req.version, req.hash) for req in requestOrRequests]
else:
await self._install(requestOrRequests.artifact, requestOrRequests.name, requestOrRequests.version, requestOrRequests.hash)
def cancel_plugin_install(self, request_id):
def cancel_plugin_install(self, request_id: str):
self.install_requests.pop(request_id)
def cleanup_plugin_settings(self, name):
def cleanup_plugin_settings(self, name: str):
"""Removes any settings related to a plugin. Propably called when a plugin is uninstalled.
Args:

View File

@@ -2,13 +2,13 @@ import re
import ssl
import uuid
import os
import sys
import subprocess
from hashlib import sha256
from io import BytesIO
import certifi
from aiohttp.web import Response, middleware
from aiohttp.web import Request, Response, middleware
from aiohttp.typedefs import Handler
from aiohttp import ClientSession
import localplatform
from customtypes import UserType
@@ -31,17 +31,17 @@ def get_csrf_token():
return csrf_token
@middleware
async def csrf_middleware(request, handler):
async def csrf_middleware(request: Request, handler: Handler):
if str(request.method) == "OPTIONS" or request.headers.get('Authentication') == csrf_token or str(request.rel_url) == "/auth/token" or str(request.rel_url).startswith("/plugins/load_main/") or str(request.rel_url).startswith("/static/") or str(request.rel_url).startswith("/legacy/") or str(request.rel_url).startswith("/steam_resource/") or str(request.rel_url).startswith("/frontend/") or assets_regex.match(str(request.rel_url)) or frontend_regex.match(str(request.rel_url)):
return await handler(request)
return Response(text='Forbidden', status='403')
return Response(text='Forbidden', status=403)
# Get the default homebrew path unless a home_path is specified. home_path argument is deprecated
def get_homebrew_path(home_path = None) -> str:
def get_homebrew_path() -> str:
return localplatform.get_unprivileged_path()
# Recursively create path and chown as user
def mkdir_as_user(path):
def mkdir_as_user(path: str):
path = os.path.realpath(path)
os.makedirs(path, exist_ok=True)
localplatform.chown(path)
@@ -57,23 +57,18 @@ def get_loader_version() -> str:
# returns the appropriate system python paths
def get_system_pythonpaths() -> list[str]:
extra_args = {}
if localplatform.ON_LINUX:
# run as normal normal user to also include user python paths
extra_args["user"] = localplatform.localplatform._get_user_id()
extra_args["env"] = {}
try:
# run as normal normal user if on linux to also include user python paths
proc = subprocess.run(["python3" if localplatform.ON_LINUX else "python", "-c", "import sys; print('\\n'.join(x for x in sys.path if x))"],
capture_output=True, **extra_args)
# TODO make this less insane
capture_output=True, user=localplatform.localplatform._get_user_id() if localplatform.ON_LINUX else None, env={} if localplatform.ON_LINUX else None) # type: ignore
return [x.strip() for x in proc.stdout.decode().strip().split("\n")]
except Exception as e:
logger.warn(f"Failed to execute get_system_pythonpaths(): {str(e)}")
return []
# Download Remote Binaries to local Plugin
async def download_remote_binary_to_path(url, binHash, path) -> bool:
async def download_remote_binary_to_path(url: str, binHash: str, path: str) -> bool:
rv = False
try:
if os.access(os.path.dirname(path), os.W_OK):
@@ -110,46 +105,42 @@ def set_user_group() -> str:
# Get the user id hosting the plugin loader
def get_user_id() -> int:
return localplatform.localplatform._get_user_id()
return localplatform.localplatform._get_user_id() # pyright: ignore [reportPrivateUsage]
# Get the user hosting the plugin loader
def get_user() -> str:
return localplatform.localplatform._get_user()
return localplatform.localplatform._get_user() # pyright: ignore [reportPrivateUsage]
# Get the effective user id of the running process
def get_effective_user_id() -> int:
return localplatform.localplatform._get_effective_user_id()
return localplatform.localplatform._get_effective_user_id() # pyright: ignore [reportPrivateUsage]
# Get the effective user of the running process
def get_effective_user() -> str:
return localplatform.localplatform._get_effective_user()
return localplatform.localplatform._get_effective_user() # pyright: ignore [reportPrivateUsage]
# Get the effective user group id of the running process
def get_effective_user_group_id() -> int:
return localplatform.localplatform._get_effective_user_group_id()
return localplatform.localplatform._get_effective_user_group_id() # pyright: ignore [reportPrivateUsage]
# Get the effective user group of the running process
def get_effective_user_group() -> str:
return localplatform.localplatform._get_effective_user_group()
return localplatform.localplatform._get_effective_user_group() # pyright: ignore [reportPrivateUsage]
# Get the user owner of the given file path.
def get_user_owner(file_path) -> str:
return localplatform.localplatform._get_user_owner(file_path)
def get_user_owner(file_path: str) -> str:
return localplatform.localplatform._get_user_owner(file_path) # pyright: ignore [reportPrivateUsage]
# Get the user group of the given file path.
def get_user_group(file_path) -> str:
return localplatform.localplatform._get_user_group(file_path)
# Get the user group of the given file path, or the user group hosting the plugin loader
def get_user_group(file_path: str | None = None) -> str:
return localplatform.localplatform._get_user_group(file_path) # pyright: ignore [reportPrivateUsage]
# Get the group id of the user hosting the plugin loader
def get_user_group_id() -> int:
return localplatform.localplatform._get_user_group_id()
# Get the group of the user hosting the plugin loader
def get_user_group() -> str:
return localplatform.localplatform._get_user_group()
return localplatform.localplatform._get_user_group_id() # pyright: ignore [reportPrivateUsage]
# Get the default home path unless a user is specified
def get_home_path(username = None) -> str:
def get_home_path(username: str | None = None) -> str:
return localplatform.get_home_path(UserType.ROOT if username == "root" else UserType.HOST_USER)
async def is_systemd_unit_active(unit_name: str) -> bool:

View File

@@ -2,10 +2,9 @@
from asyncio import sleep
from logging import getLogger
from traceback import format_exc
from typing import List
from typing import Any, Callable, List, TypedDict, Dict
from aiohttp import ClientSession, WSMsgType
from aiohttp import ClientSession
from aiohttp.client_exceptions import ClientConnectorError, ClientOSError
from asyncio.exceptions import TimeoutError
import uuid
@@ -14,35 +13,43 @@ BASE_ADDRESS = "http://localhost:8080"
logger = getLogger("Injector")
class _TabResponse(TypedDict):
title: str
id: str
url: str
webSocketDebuggerUrl: str
class Tab:
cmd_id = 0
def __init__(self, res) -> None:
self.title = res["title"]
self.id = res["id"]
self.url = res["url"]
self.ws_url = res["webSocketDebuggerUrl"]
def __init__(self, res: _TabResponse) -> None:
self.title: str = res["title"]
self.id: str = res["id"]
self.url: str = res["url"]
self.ws_url: str = res["webSocketDebuggerUrl"]
self.websocket = None
self.client = None
async def open_websocket(self):
self.client = ClientSession()
self.websocket = await self.client.ws_connect(self.ws_url)
self.websocket = await self.client.ws_connect(self.ws_url) # type: ignore
async def close_websocket(self):
await self.websocket.close()
await self.client.close()
if self.websocket:
await self.websocket.close()
if self.client:
await self.client.close()
async def listen_for_message(self):
async for message in self.websocket:
data = message.json()
yield data
logger.warn(f"The Tab {self.title} socket has been disconnected while listening for messages.")
await self.close_websocket()
if self.websocket:
async for message in self.websocket:
data = message.json()
yield data
logger.warn(f"The Tab {self.title} socket has been disconnected while listening for messages.")
await self.close_websocket()
async def _send_devtools_cmd(self, dc, receive=True):
async def _send_devtools_cmd(self, dc: Dict[str, Any], receive: bool = True):
if self.websocket:
self.cmd_id += 1
dc["id"] = self.cmd_id
@@ -54,7 +61,7 @@ class Tab:
return None
raise RuntimeError("Websocket not opened")
async def evaluate_js(self, js, run_async=False, manage_socket=True, get_result=True):
async def evaluate_js(self, js: str, run_async: bool | None = False, manage_socket: bool | None = True, get_result: bool = True):
try:
if manage_socket:
await self.open_websocket()
@@ -73,15 +80,16 @@ class Tab:
await self.close_websocket()
return res
async def has_global_var(self, var_name, manage_socket=True):
async def has_global_var(self, var_name: str, manage_socket: bool = True):
res = await self.evaluate_js(f"window['{var_name}'] !== null && window['{var_name}'] !== undefined", False, manage_socket)
assert res is not None
if not "result" in res or not "result" in res["result"] or not "value" in res["result"]["result"]:
return False
return res["result"]["result"]["value"]
async def close(self, manage_socket=True):
async def close(self, manage_socket: bool = True):
try:
if manage_socket:
await self.open_websocket()
@@ -111,7 +119,7 @@ class Tab:
"method": "Page.disable",
}, False)
async def refresh(self, manage_socket=True):
async def refresh(self, manage_socket: bool = True):
try:
if manage_socket:
await self.open_websocket()
@@ -125,7 +133,7 @@ class Tab:
await self.close_websocket()
return
async def reload_and_evaluate(self, js, manage_socket=True):
async def reload_and_evaluate(self, js: str, manage_socket: bool = True):
"""
Reloads the current tab, with JS to run on load via debugger
"""
@@ -153,11 +161,13 @@ class Tab:
}
}, True)
assert breakpoint_res is not None
logger.info(breakpoint_res)
# Page finishes loading when breakpoint hits
for x in range(20):
for _ in range(20):
# this works around 1/5 of the time, so just send it 8 times.
# the js accounts for being injected multiple times allowing only one instance to run at a time anyway
await self._send_devtools_cmd({
@@ -176,7 +186,7 @@ class Tab:
}
}, False)
for x in range(4):
for _ in range(4):
await self._send_devtools_cmd({
"method": "Debugger.resume"
}, False)
@@ -190,7 +200,7 @@ class Tab:
await self.close_websocket()
return
async def add_script_to_evaluate_on_new_document(self, js, add_dom_wrapper=True, manage_socket=True, get_result=True):
async def add_script_to_evaluate_on_new_document(self, js: str, add_dom_wrapper: bool = True, manage_socket: bool = True, get_result: bool = True):
"""
How the underlying call functions is not particularly clear from the devtools docs, so stealing puppeteer's description:
@@ -253,7 +263,7 @@ class Tab:
await self.close_websocket()
return res
async def remove_script_to_evaluate_on_new_document(self, script_id, manage_socket=True):
async def remove_script_to_evaluate_on_new_document(self, script_id: str, manage_socket: bool = True):
"""
Removes a script from a page that was added with `add_script_to_evaluate_on_new_document`
@@ -267,7 +277,7 @@ class Tab:
if manage_socket:
await self.open_websocket()
res = await self._send_devtools_cmd({
await self._send_devtools_cmd({
"method": "Page.removeScriptToEvaluateOnNewDocument",
"params": {
"identifier": script_id
@@ -278,15 +288,16 @@ class Tab:
if manage_socket:
await self.close_websocket()
async def has_element(self, element_name, manage_socket=True):
async def has_element(self, element_name: str, manage_socket: bool = True):
res = await self.evaluate_js(f"document.getElementById('{element_name}') != null", False, manage_socket)
assert res is not None
if not "result" in res or not "result" in res["result"] or not "value" in res["result"]["result"]:
return False
return res["result"]["result"]["value"]
async def inject_css(self, style, manage_socket=True):
async def inject_css(self, style: str, manage_socket: bool = True):
try:
css_id = str(uuid.uuid4())
@@ -300,6 +311,8 @@ class Tab:
}})()
""", False, manage_socket)
assert result is not None
if "exceptionDetails" in result["result"]:
return {
"success": False,
@@ -316,7 +329,7 @@ class Tab:
"result": e
}
async def remove_css(self, css_id, manage_socket=True):
async def remove_css(self, css_id: str, manage_socket: bool = True):
try:
result = await self.evaluate_js(
f"""
@@ -328,6 +341,8 @@ class Tab:
}})()
""", False, manage_socket)
assert result is not None
if "exceptionDetails" in result["result"]:
return {
"success": False,
@@ -343,8 +358,9 @@ class Tab:
"result": e
}
async def get_steam_resource(self, url):
async def get_steam_resource(self, url: str):
res = await self.evaluate_js(f'(async function test() {{ return await (await fetch("{url}")).text() }})()', True)
assert res is not None
return res["result"]["result"]["value"]
def __repr__(self):
@@ -380,14 +396,14 @@ async def get_tabs() -> List[Tab]:
raise Exception(f"/json did not return 200. {await res.text()}")
async def get_tab(tab_name) -> Tab:
async def get_tab(tab_name: str) -> Tab:
tabs = await get_tabs()
tab = next((i for i in tabs if i.title == tab_name), None)
if not tab:
raise ValueError(f"Tab {tab_name} not found")
return tab
async def get_tab_lambda(test) -> Tab:
async def get_tab_lambda(test: Callable[[Tab], bool]) -> Tab:
tabs = await get_tabs()
tab = next((i for i in tabs if test(i)), None)
if not tab:
@@ -408,7 +424,7 @@ async def get_gamepadui_tab() -> Tab:
raise ValueError(f"GamepadUI Tab not found")
return tab
async def inject_to_tab(tab_name, js, run_async=False):
async def inject_to_tab(tab_name: str, js: str, run_async: bool = False):
tab = await get_tab(tab_name)
return await tab.evaluate_js(js, run_async)

View File

@@ -1,34 +1,40 @@
from asyncio import Queue, sleep
from asyncio import AbstractEventLoop, Queue, sleep
from json.decoder import JSONDecodeError
from logging import getLogger
from os import listdir, path
from pathlib import Path
from traceback import print_exc
from typing import Any, Tuple
from aiohttp import web
from os.path import exists
from watchdog.events import RegexMatchingEventHandler
from watchdog.observers import Observer
from watchdog.events import RegexMatchingEventHandler, DirCreatedEvent, DirModifiedEvent, FileCreatedEvent, FileModifiedEvent # type: ignore
from watchdog.observers import Observer # type: ignore
from injector import get_tab, get_gamepadui_tab
from plugin import PluginWrapper
from backend.main import PluginManager # type: ignore
from .injector import get_tab, get_gamepadui_tab
from .plugin import PluginWrapper
Plugins = dict[str, PluginWrapper]
ReloadQueue = Queue[Tuple[str, str, bool | None] | Tuple[str, str]]
class FileChangeHandler(RegexMatchingEventHandler):
def __init__(self, queue, plugin_path) -> None:
super().__init__(regexes=[r'^.*?dist\/index\.js$', r'^.*?main\.py$'])
def __init__(self, queue: ReloadQueue, plugin_path: str) -> None:
super().__init__(regexes=[r'^.*?dist\/index\.js$', r'^.*?main\.py$']) # type: ignore
self.logger = getLogger("file-watcher")
self.plugin_path = plugin_path
self.queue = queue
self.disabled = True
def maybe_reload(self, src_path):
def maybe_reload(self, src_path: str):
if self.disabled:
return
plugin_dir = Path(path.relpath(src_path, self.plugin_path)).parts[0]
if exists(path.join(self.plugin_path, plugin_dir, "plugin.json")):
self.queue.put_nowait((path.join(self.plugin_path, plugin_dir, "main.py"), plugin_dir, True))
def on_created(self, event):
def on_created(self, event: DirCreatedEvent | FileCreatedEvent):
src_path = event.src_path
if "__pycache__" in src_path:
return
@@ -42,7 +48,7 @@ class FileChangeHandler(RegexMatchingEventHandler):
self.logger.debug(f"file created: {src_path}")
self.maybe_reload(src_path)
def on_modified(self, event):
def on_modified(self, event: DirModifiedEvent | FileModifiedEvent):
src_path = event.src_path
if "__pycache__" in src_path:
return
@@ -57,25 +63,25 @@ class FileChangeHandler(RegexMatchingEventHandler):
self.maybe_reload(src_path)
class Loader:
def __init__(self, server_instance, plugin_path, loop, live_reload=False) -> None:
def __init__(self, server_instance: PluginManager, plugin_path: str, loop: AbstractEventLoop, live_reload: bool =False) -> None:
self.loop = loop
self.logger = getLogger("Loader")
self.plugin_path = plugin_path
self.logger.info(f"plugin_path: {self.plugin_path}")
self.plugins : dict[str, PluginWrapper] = {}
self.plugins: Plugins = {}
self.watcher = None
self.live_reload = live_reload
self.reload_queue = Queue()
self.reload_queue: ReloadQueue = Queue()
self.loop.create_task(self.handle_reloads())
if live_reload:
self.observer = Observer()
self.watcher = FileChangeHandler(self.reload_queue, plugin_path)
self.observer.schedule(self.watcher, self.plugin_path, recursive=True)
self.observer.schedule(self.watcher, self.plugin_path, recursive=True) # type: ignore
self.observer.start()
self.loop.create_task(self.enable_reload_wait())
server_instance.add_routes([
server_instance.web_app.add_routes([
web.get("/frontend/{path:.*}", self.handle_frontend_assets),
web.get("/locales/{path:.*}", self.handle_frontend_locales),
web.get("/plugins", self.get_plugins),
@@ -93,15 +99,16 @@ class Loader:
async def enable_reload_wait(self):
if self.live_reload:
await sleep(10)
self.logger.info("Hot reload enabled")
self.watcher.disabled = False
if self.watcher:
self.logger.info("Hot reload enabled")
self.watcher.disabled = False
async def handle_frontend_assets(self, request):
async def handle_frontend_assets(self, request: web.Request):
file = path.join(path.dirname(__file__), "static", request.match_info["path"])
return web.FileResponse(file, headers={"Cache-Control": "no-cache"})
async def handle_frontend_locales(self, request):
async def handle_frontend_locales(self, request: web.Request):
req_lang = request.match_info["path"]
file = path.join(path.dirname(__file__), "locales", req_lang)
if exists(file):
@@ -110,23 +117,23 @@ class Loader:
self.logger.info(f"Language {req_lang} not available, returning an empty dictionary")
return web.json_response(data={}, headers={"Cache-Control": "no-cache"})
async def get_plugins(self, request):
async def get_plugins(self, request: web.Request):
plugins = list(self.plugins.values())
return web.json_response([{"name": str(i) if not i.legacy else "$LEGACY_"+str(i), "version": i.version} for i in plugins])
def handle_plugin_frontend_assets(self, request):
async def handle_plugin_frontend_assets(self, request: web.Request):
plugin = self.plugins[request.match_info["plugin_name"]]
file = path.join(self.plugin_path, plugin.plugin_directory, "dist/assets", request.match_info["path"])
return web.FileResponse(file, headers={"Cache-Control": "no-cache"})
def handle_frontend_bundle(self, request):
async def handle_frontend_bundle(self, request: web.Request):
plugin = self.plugins[request.match_info["plugin_name"]]
with open(path.join(self.plugin_path, plugin.plugin_directory, "dist/index.js"), "r", encoding="utf-8") as bundle:
return web.Response(text=bundle.read(), content_type="application/javascript")
def import_plugin(self, file, plugin_directory, refresh=False, batch=False):
def import_plugin(self, file: str, plugin_directory: str, refresh: bool | None = False, batch: bool | None = False):
try:
plugin = PluginWrapper(file, plugin_directory, self.plugin_path)
if plugin.name in self.plugins:
@@ -146,7 +153,7 @@ class Loader:
self.logger.error(f"Could not load {file}. {e}")
print_exc()
async def dispatch_plugin(self, name, version):
async def dispatch_plugin(self, name: str, version: str | None):
gpui_tab = await get_gamepadui_tab()
await gpui_tab.evaluate_js(f"window.importDeckyPlugin('{name}', '{version}')")
@@ -161,15 +168,15 @@ class Loader:
async def handle_reloads(self):
while True:
args = await self.reload_queue.get()
self.import_plugin(*args)
self.import_plugin(*args) # type: ignore
async def handle_plugin_method_call(self, request):
async def handle_plugin_method_call(self, request: web.Request):
res = {}
plugin = self.plugins[request.match_info["plugin_name"]]
method_name = request.match_info["method_name"]
try:
method_info = await request.json()
args = method_info["args"]
args: Any = method_info["args"]
except JSONDecodeError:
args = {}
try:
@@ -189,7 +196,7 @@ class Loader:
can introduce it more smoothly and give people the chance to sample the new features even
without plugin support. They will be removed once legacy plugins are no longer relevant.
"""
async def load_plugin_main_view(self, request):
async def load_plugin_main_view(self, request: web.Request):
plugin = self.plugins[request.match_info["name"]]
with open(path.join(self.plugin_path, plugin.plugin_directory, plugin.main_view_html), "r", encoding="utf-8") as template:
template_data = template.read()
@@ -201,7 +208,7 @@ class Loader:
"""
return web.Response(text=ret, content_type="text/html")
async def handle_sub_route(self, request):
async def handle_sub_route(self, request: web.Request):
plugin = self.plugins[request.match_info["name"]]
route_path = request.match_info["path"]
self.logger.info(path)
@@ -212,14 +219,14 @@ class Loader:
return web.Response(text=ret)
async def get_steam_resource(self, request):
async def get_steam_resource(self, request: web.Request):
tab = await get_tab("SP")
try:
return web.Response(text=await tab.get_steam_resource(f"https://steamloopback.host/{request.match_info['path']}"), content_type="text/html")
except Exception as e:
return web.Response(text=str(e), status=400)
async def handle_backend_reload_request(self, request):
async def handle_backend_reload_request(self, request: web.Request):
plugin_name : str = request.match_info["plugin_name"]
plugin = self.plugins[plugin_name]

View File

@@ -29,21 +29,17 @@ def _get_effective_user_group() -> str:
return grp.getgrgid(_get_effective_user_group_id()).gr_name
# Get the user owner of the given file path.
def _get_user_owner(file_path) -> str:
def _get_user_owner(file_path: str) -> str:
return pwd.getpwuid(os.stat(file_path).st_uid).pw_name
# Get the user group of the given file path.
def _get_user_group(file_path) -> str:
return grp.getgrgid(os.stat(file_path).st_gid).gr_name
# Get the user group of the given file path, or the user group hosting the plugin loader
def _get_user_group(file_path: str | None = None) -> str:
return grp.getgrgid(os.stat(file_path).st_gid if file_path is not None else _get_user_group_id()).gr_name
# Get the group id of the user hosting the plugin loader
def _get_user_group_id() -> int:
return pwd.getpwuid(_get_user_id()).pw_gid
# Get the group of the user hosting the plugin loader
def _get_user_group() -> str:
return grp.getgrgid(_get_user_group_id()).gr_name
def chown(path : str, user : UserType = UserType.HOST_USER, recursive : bool = True) -> bool:
user_str = ""
@@ -146,7 +142,7 @@ def get_privileged_path() -> str:
return path
def _parent_dir(path : str) -> str:
def _parent_dir(path : str | None) -> str | None:
if path == None:
return None
@@ -166,7 +162,7 @@ def get_unprivileged_path() -> str:
# Expected path of loader binary is /home/deck/homebrew/service/PluginLoader
path = _parent_dir(_parent_dir(os.path.realpath(sys.argv[0])))
if not os.path.exists(path):
if path != None and not os.path.exists(path):
path = None
if path == None:

View File

@@ -7,16 +7,16 @@ from localplatform import (chmod, chown, service_stop, service_start,
if hasattr(sys, '_MEIPASS'):
chmod(sys._MEIPASS, 755) # type: ignore
# Full imports
from asyncio import new_event_loop, set_event_loop, sleep
from asyncio import AbstractEventLoop, new_event_loop, set_event_loop, sleep
from logging import basicConfig, getLogger
from os import path
from traceback import format_exc
import multiprocessing
import aiohttp_cors
import aiohttp_cors # type: ignore
# Partial imports
from aiohttp import client_exceptions
from aiohttp.web import Application, Response, get, run_app, static
from aiohttp.web import Application, Response, get, run_app, static # type: ignore
from aiohttp_jinja2 import setup as jinja_setup
# local modules
@@ -51,7 +51,7 @@ if get_chown_plugin_path() == True:
chown_plugin_dir()
class PluginManager:
def __init__(self, loop) -> None:
def __init__(self, loop: AbstractEventLoop) -> None:
self.loop = loop
self.web_app = Application()
self.web_app.middlewares.append(csrf_middleware)
@@ -62,7 +62,7 @@ class PluginManager:
allow_credentials=True
)
})
self.plugin_loader = Loader(self.web_app, plugin_path, self.loop, get_live_reload())
self.plugin_loader = Loader(self, plugin_path, self.loop, get_live_reload())
self.settings = SettingsManager("loader", path.join(get_privileged_path(), "settings"))
self.plugin_browser = PluginBrowser(plugin_path, self.plugin_loader.plugins, self.plugin_loader, self.settings)
self.utilities = Utilities(self)

View File

@@ -1,7 +1,6 @@
import multiprocessing
from asyncio import (Lock, get_event_loop, new_event_loop,
set_event_loop, sleep)
from concurrent.futures import ProcessPoolExecutor
from importlib.util import module_from_spec, spec_from_file_location
from json import dumps, load, loads
from logging import getLogger
@@ -9,14 +8,14 @@ from traceback import format_exc
from os import path, environ
from signal import SIGINT, signal
from sys import exit, path as syspath
from time import time
from typing import Any, Dict
from localsocket import LocalSocket
from localplatform import setgid, setuid, get_username, get_home_path
from customtypes import UserType
import helpers
class PluginWrapper:
def __init__(self, file, plugin_directory, plugin_path) -> None:
def __init__(self, file: str, plugin_directory: str, plugin_path: str) -> None:
self.file = file
self.plugin_path = plugin_path
self.plugin_directory = plugin_directory
@@ -73,14 +72,17 @@ class PluginWrapper:
helpers.mkdir_as_user(environ["DECKY_PLUGIN_LOG_DIR"])
environ["DECKY_PLUGIN_DIR"] = path.join(self.plugin_path, self.plugin_directory)
environ["DECKY_PLUGIN_NAME"] = self.name
environ["DECKY_PLUGIN_VERSION"] = self.version
if self.version:
environ["DECKY_PLUGIN_VERSION"] = self.version
environ["DECKY_PLUGIN_AUTHOR"] = self.author
# append the plugin's `py_modules` to the recognized python paths
syspath.append(path.join(environ["DECKY_PLUGIN_DIR"], "py_modules"))
spec = spec_from_file_location("_", self.file)
assert spec is not None
module = module_from_spec(spec)
assert spec.loader is not None
spec.loader.exec_module(module)
self.Plugin = module.Plugin
@@ -118,7 +120,8 @@ class PluginWrapper:
get_event_loop().close()
raise Exception("Closing message listener")
d = {"res": None, "success": True}
# TODO there is definitely a better way to type this
d: Dict[str, Any] = {"res": None, "success": True}
try:
d["res"] = await getattr(self.Plugin, data["method"])(self.Plugin, **data["args"])
except Exception as e:
@@ -137,17 +140,18 @@ class PluginWrapper:
if self.passive:
return
async def _(self):
async def _(self: PluginWrapper):
await self.socket.write_single_line(dumps({ "stop": True }, ensure_ascii=False))
await self.socket.close_socket_connection()
get_event_loop().create_task(_(self))
async def execute_method(self, method_name, kwargs):
async def execute_method(self, method_name: str, kwargs: Dict[Any, Any]):
if self.passive:
raise RuntimeError("This plugin is passive (aka does not implement main.py)")
async with self.method_call_lock:
reader, writer = await self.socket.get_socket_connection()
# reader, writer =
await self.socket.get_socket_connection()
await self.socket.write_single_line(dumps({ "method": method_name, "args": kwargs }, ensure_ascii=False))

View File

@@ -1,5 +1,6 @@
from json import dump, load
from os import mkdir, path, listdir, rename
from typing import Any, Dict
from localplatform import chown, folder_owner, get_chown_plugin_path
from customtypes import UserType
@@ -7,7 +8,7 @@ from helpers import get_homebrew_path
class SettingsManager:
def __init__(self, name, settings_directory = None) -> None:
def __init__(self, name: str, settings_directory: str | None = None) -> None:
wrong_dir = get_homebrew_path()
if settings_directory == None:
settings_directory = path.join(wrong_dir, "settings")
@@ -31,11 +32,11 @@ class SettingsManager:
if folder_owner(settings_directory) != expected_user:
chown(settings_directory, expected_user, False)
self.settings = {}
self.settings: Dict[str, Any] = {}
try:
open(self.path, "x", encoding="utf-8")
except FileExistsError as e:
except FileExistsError as _:
self.read()
pass
@@ -51,9 +52,9 @@ class SettingsManager:
with open(self.path, "w+", encoding="utf-8") as file:
dump(self.settings, file, indent=4, ensure_ascii=False)
def getSetting(self, key, default=None):
def getSetting(self, key: str, default: Any = None) -> Any:
return self.settings.get(key, default)
def setSetting(self, key, value):
def setSetting(self, key: str, value: Any) -> Any:
self.settings[key] = value
self.commit()

View File

@@ -1,26 +1,27 @@
import uuid
import os
from json.decoder import JSONDecodeError
from os.path import splitext
import re
from traceback import format_exc
from stat import FILE_ATTRIBUTE_HIDDEN
from stat import FILE_ATTRIBUTE_HIDDEN # type: ignore
from asyncio import sleep, start_server, gather, open_connection
from asyncio import start_server, gather, open_connection
from aiohttp import ClientSession, web
from typing import Dict
from logging import getLogger
from backend.browser import PluginInstallType
from backend.main import PluginManager
from injector import inject_to_tab, get_gamepadui_tab, close_old_tabs, get_tab
from pathlib import Path
from localplatform import ON_WINDOWS
import helpers
import subprocess
from localplatform import service_stop, service_start, get_home_path, get_username
class Utilities:
def __init__(self, context) -> None:
def __init__(self, context: PluginManager) -> None:
self.context = context
self.util_methods = {
self.util_methods: Dict[] = {
"ping": self.ping,
"http_request": self.http_request,
"install_plugin": self.install_plugin,
@@ -69,7 +70,7 @@ class Utilities:
res["success"] = False
return web.json_response(res)
async def install_plugin(self, artifact="", name="No name", version="dev", hash=False, install_type=0):
async def install_plugin(self, artifact="", name="No name", version="dev", hash=False, install_type=PluginInstallType.INSTALL):
return await self.context.plugin_browser.request_plugin_install(
artifact=artifact,
name=name,