Merge aa/type-cleanup-py (work by marios, aa, wolv)

This commit is contained in:
TrainDoctor
2023-10-25 19:47:33 -07:00
parent dacd2c19eb
commit a7669799bc
27 changed files with 571 additions and 439 deletions
+3 -2
View File
@@ -27,6 +27,7 @@ jobs:
python-version: "3.11.4" python-version: "3.11.4"
- name: Install Python dependencies ⬇️ - name: Install Python dependencies ⬇️
working-directory: ./backend
run: | run: |
python -m pip install --upgrade pip python -m pip install --upgrade pip
pip install pyinstaller==5.13.0 pip install pyinstaller==5.13.0
@@ -43,10 +44,10 @@ jobs:
run: pnpm run build run: pnpm run build
- name: Build Python Backend 🛠️ - name: Build Python Backend 🛠️
run: pyinstaller --noconfirm --onefile --name "PluginLoader" --add-data "./backend/static;/static" --add-data "./backend/locales;/locales" --add-data "./backend/legacy;/legacy" --add-data "./plugin;/plugin" --hidden-import=sqlite3 ./backend/main.py run: pyinstaller --noconfirm --onefile --name "PluginLoader" --add-data "./backend/static;/static" --add-data "./backend/locales;/locales" --add-data "./backend/src/legacy;/src/legacy" --add-data "./plugin/*;/" --hidden-import=sqlite3 ./backend/main.py
- name: Build Python Backend (noconsole) 🛠️ - name: Build Python Backend (noconsole) 🛠️
run: pyinstaller --noconfirm --noconsole --onefile --name "PluginLoader_noconsole" --add-data "./backend/static;/static" --add-data "./backend/locales;/locales" --add-data "./backend/legacy;/legacy" --add-data "./plugin;/plugin" --hidden-import=sqlite3 ./backend/main.py run: pyinstaller --noconfirm --noconsole --onefile --name "PluginLoader_noconsole" --add-data "./backend/static;/static" --add-data "./backend/locales;/locales" --add-data "./backend/src/legacy;/src/legacy" --add-data "./plugin/*;/" --hidden-import=sqlite3 ./backend/main.py
- name: Upload package artifact ⬆️ - name: Upload package artifact ⬆️
uses: actions/upload-artifact@v3 uses: actions/upload-artifact@v3
+6 -5
View File
@@ -62,7 +62,7 @@ jobs:
-DSQLITE_ENABLE_UNLOCK_NOTIFY -DSQLITE_ENABLE_DBSTAT_VTAB=1 -DSQLITE_ENABLE_FTS3_TOKENIZER=1 \ -DSQLITE_ENABLE_UNLOCK_NOTIFY -DSQLITE_ENABLE_DBSTAT_VTAB=1 -DSQLITE_ENABLE_FTS3_TOKENIZER=1 \
-DSQLITE_ENABLE_FTS3_PARENTHESIS -DSQLITE_SECURE_DELETE -DSQLITE_ENABLE_STMTVTAB -DSQLITE_MAX_VARIABLE_NUMBER=250000 \ -DSQLITE_ENABLE_FTS3_PARENTHESIS -DSQLITE_SECURE_DELETE -DSQLITE_ENABLE_STMTVTAB -DSQLITE_MAX_VARIABLE_NUMBER=250000 \
-DSQLITE_MAX_EXPR_DEPTH=10000 -DSQLITE_ENABLE_MATH_FUNCTIONS" && -DSQLITE_MAX_EXPR_DEPTH=10000 -DSQLITE_ENABLE_MATH_FUNCTIONS" &&
make && make -j$(nproc) &&
sudo make install && sudo make install &&
sudo cp /usr/lib/libsqlite3.so /usr/lib/x86_64-linux-gnu/ && sudo cp /usr/lib/libsqlite3.so /usr/lib/x86_64-linux-gnu/ &&
sudo cp /usr/lib/libsqlite3.so.0 /usr/lib/x86_64-linux-gnu/ && sudo cp /usr/lib/libsqlite3.so.0 /usr/lib/x86_64-linux-gnu/ &&
@@ -70,10 +70,11 @@ jobs:
rm -r /tmp/sqlite-autoconf-3420000 rm -r /tmp/sqlite-autoconf-3420000
- name: Install Python dependencies ⬇️ - name: Install Python dependencies ⬇️
working-directory: ./backend
run: | run: |
python -m pip install --upgrade pip python -m pip install --upgrade pip
pip install pyinstaller==5.13.0 pip install pyinstaller==5.13.0
[ -f requirements.txt ] && pip install -r requirements.txt pip install -r requirements.txt
- name: Install JS dependencies ⬇️ - name: Install JS dependencies ⬇️
working-directory: ./frontend working-directory: ./frontend
@@ -86,7 +87,7 @@ jobs:
run: pnpm run build run: pnpm run build
- name: Build Python Backend 🛠️ - name: Build Python Backend 🛠️
run: pyinstaller --noconfirm --onefile --name "PluginLoader" --add-data ./backend/static:/static --add-data ./backend/locales:/locales --add-data ./backend/legacy:/legacy --add-data ./plugin:/plugin --hidden-import=sqlite3 ./backend/*.py run: pyinstaller --noconfirm --onefile --name "PluginLoader" --add-data ./backend/static:/static --add-data ./backend/locales:/locales --add-data ./backend/src/legacy:/src/legacy --add-data ./plugin/*:/ --hidden-import=sqlite3 ./backend/main.py
- name: Upload package artifact ⬆️ - name: Upload package artifact ⬆️
if: ${{ !env.ACT }} if: ${{ !env.ACT }}
@@ -127,7 +128,7 @@ jobs:
- name: Get latest release - name: Get latest release
uses: rez0n/actions-github-release@main uses: rez0n/actions-github-release@main
id: latest_release id: latest_release
env: with:
token: ${{ secrets.GITHUB_TOKEN }} token: ${{ secrets.GITHUB_TOKEN }}
repository: "SteamDeckHomebrew/decky-loader" repository: "SteamDeckHomebrew/decky-loader"
type: "nodraft" type: "nodraft"
@@ -206,7 +207,7 @@ jobs:
- name: Get latest release - name: Get latest release
uses: rez0n/actions-github-release@main uses: rez0n/actions-github-release@main
id: latest_release id: latest_release
env: with:
token: ${{ secrets.GITHUB_TOKEN }} token: ${{ secrets.GITHUB_TOKEN }}
repository: "SteamDeckHomebrew/decky-loader" repository: "SteamDeckHomebrew/decky-loader"
type: "nodraft" type: "nodraft"
+10 -4
View File
@@ -2,6 +2,7 @@ name: Lint
on: on:
push: push:
pull_request:
jobs: jobs:
lint: lint:
@@ -10,8 +11,13 @@ jobs:
steps: steps:
- uses: actions/checkout@v3 # Check out the repository first. - uses: actions/checkout@v3 # Check out the repository first.
- name: Run prettier (JavaScript & TypeScript)
- name: Install TypeScript dependencies
working-directory: frontend
run: | run: |
pushd frontend npm i -g pnpm
npm install pnpm i --frozen-lockfile
npm run lint
- name: Run prettier (TypeScript)
working-directory: frontend
run: pnpm run lint
+36
View File
@@ -0,0 +1,36 @@
name: Type Check
on:
push:
pull_request:
jobs:
typecheck:
name: Run type checkers
runs-on: ubuntu-20.04
steps:
- uses: actions/checkout@v2 # Check out the repository first.
- name: Install Python dependencies
working-directory: backend
run: |
python -m pip install --upgrade pip
[ -f requirements.txt ] && pip install -r requirements.txt
- name: Install TypeScript dependencies
working-directory: frontend
run: |
npm i -g pnpm
pnpm i --frozen-lockfile
- name: Run pyright (Python)
uses: jakebailey/pyright-action@v1
with:
python-version: "3.10.6"
no-comments: true
working-directory: backend
- name: Run tsc (TypeScript)
working-directory: frontend
run: $(pnpm bin)/tsc --noEmit
+3 -192
View File
@@ -1,193 +1,4 @@
# Change PyInstaller files permissions # This file is needed to make the relative imports in src/ work properly.
import sys
from localplatform import (chmod, chown, service_stop, service_start,
ON_WINDOWS, get_log_level, get_live_reload,
get_server_port, get_server_host, get_chown_plugin_path,
get_unprivileged_user, get_unprivileged_path,
get_privileged_path)
if hasattr(sys, '_MEIPASS'):
chmod(sys._MEIPASS, 755)
# Full imports
from asyncio import new_event_loop, set_event_loop, sleep
from json import dumps, loads
from logging import DEBUG, INFO, basicConfig, getLogger
from os import getenv, path
from traceback import format_exc
import multiprocessing
import aiohttp_cors
# Partial imports
from aiohttp import client_exceptions, WSMsgType
from aiohttp.web import Application, Response, get, run_app, static
from aiohttp_jinja2 import setup as jinja_setup
# local modules
from browser import PluginBrowser
from helpers import (REMOTE_DEBUGGER_UNIT, csrf_middleware, get_csrf_token,
mkdir_as_user, get_system_pythonpaths, get_effective_user_id)
from injector import get_gamepadui_tab, Tab, get_tabs, close_old_tabs
from loader import Loader
from settings import SettingsManager
from updater import Updater
from utilities import Utilities
from customtypes import UserType
basicConfig(
level=get_log_level(),
format="[%(module)s][%(levelname)s]: %(message)s"
)
logger = getLogger("Main")
plugin_path = path.join(get_privileged_path(), "plugins")
def chown_plugin_dir():
if not path.exists(plugin_path): # For safety, create the folder before attempting to do anything with it
mkdir_as_user(plugin_path)
if not chown(plugin_path, UserType.HOST_USER) or not chmod(plugin_path, 555):
logger.error(f"chown/chmod exited with a non-zero exit code")
if get_chown_plugin_path() == True:
chown_plugin_dir()
class PluginManager:
def __init__(self, loop) -> None:
self.loop = loop
self.web_app = Application()
self.web_app.middlewares.append(csrf_middleware)
self.cors = aiohttp_cors.setup(self.web_app, defaults={
"https://steamloopback.host": aiohttp_cors.ResourceOptions(
expose_headers="*",
allow_headers="*",
allow_credentials=True
)
})
self.plugin_loader = Loader(self.web_app, plugin_path, self.loop, get_live_reload())
self.settings = SettingsManager("loader", path.join(get_privileged_path(), "settings"))
self.plugin_browser = PluginBrowser(plugin_path, self.plugin_loader.plugins, self.plugin_loader, self.settings)
self.utilities = Utilities(self)
self.updater = Updater(self)
jinja_setup(self.web_app)
async def startup(_):
if self.settings.getSetting("cef_forward", False):
self.loop.create_task(service_start(REMOTE_DEBUGGER_UNIT))
else:
self.loop.create_task(service_stop(REMOTE_DEBUGGER_UNIT))
self.loop.create_task(self.loader_reinjector())
self.loop.create_task(self.load_plugins())
self.web_app.on_startup.append(startup)
self.loop.set_exception_handler(self.exception_handler)
self.web_app.add_routes([get("/auth/token", self.get_auth_token)])
for route in list(self.web_app.router.routes()):
self.cors.add(route)
self.web_app.add_routes([static("/static", path.join(path.dirname(__file__), 'static'))])
self.web_app.add_routes([static("/legacy", path.join(path.dirname(__file__), 'legacy'))])
def exception_handler(self, loop, context):
if context["message"] == "Unclosed connection":
return
loop.default_exception_handler(context)
async def get_auth_token(self, request):
return Response(text=get_csrf_token())
async def load_plugins(self):
# await self.wait_for_server()
logger.debug("Loading plugins")
self.plugin_loader.import_plugins()
# await inject_to_tab("SP", "window.syncDeckyPlugins();")
if self.settings.getSetting("pluginOrder", None) == None:
self.settings.setSetting("pluginOrder", list(self.plugin_loader.plugins.keys()))
logger.debug("Did not find pluginOrder setting, set it to default")
async def loader_reinjector(self):
while True:
tab = None
nf = False
dc = False
while not tab:
try:
tab = await get_gamepadui_tab()
except (client_exceptions.ClientConnectorError, client_exceptions.ServerDisconnectedError):
if not dc:
logger.debug("Couldn't connect to debugger, waiting...")
dc = True
pass
except ValueError:
if not nf:
logger.debug("Couldn't find GamepadUI tab, waiting...")
nf = True
pass
if not tab:
await sleep(5)
await tab.open_websocket()
await tab.enable()
await self.inject_javascript(tab, True)
try:
async for msg in tab.listen_for_message():
# this gets spammed a lot
if msg.get("method", None) != "Page.navigatedWithinDocument":
logger.debug("Page event: " + str(msg.get("method", None)))
if msg.get("method", None) == "Page.domContentEventFired":
if not await tab.has_global_var("deckyHasLoaded", False):
await self.inject_javascript(tab)
if msg.get("method", None) == "Inspector.detached":
logger.info("CEF has requested that we detach.")
await tab.close_websocket()
break
# If this is a forceful disconnect the loop will just stop without any failure message. In this case, injector.py will handle this for us so we don't need to close the socket.
# This is because of https://github.com/aio-libs/aiohttp/blob/3ee7091b40a1bc58a8d7846e7878a77640e96996/aiohttp/client_ws.py#L321
logger.info("CEF has disconnected...")
# At this point the loop starts again and we connect to the freshly started Steam client once it is ready.
except Exception as e:
logger.error("Exception while reading page events " + format_exc())
await tab.close_websocket()
pass
# while True:
# await sleep(5)
# if not await tab.has_global_var("deckyHasLoaded", False):
# logger.info("Plugin loader isn't present in Steam anymore, reinjecting...")
# await self.inject_javascript(tab)
async def inject_javascript(self, tab: Tab, first=False, request=None):
logger.info("Loading Decky frontend!")
try:
if first:
if await tab.has_global_var("deckyHasLoaded", False):
await close_old_tabs()
await tab.evaluate_js("try{if (window.deckyHasLoaded){setTimeout(() => location.reload(), 100)}else{window.deckyHasLoaded = true;(async()=>{try{while(!window.SP_REACT){await new Promise(r => setTimeout(r, 10))};await import('http://localhost:1337/frontend/index.js')}catch(e){console.error(e)};})();}}catch(e){console.error(e)}", False, False, False)
except:
logger.info("Failed to inject JavaScript into tab\n" + format_exc())
pass
def run(self):
return run_app(self.web_app, host=get_server_host(), port=get_server_port(), loop=self.loop, access_log=None)
if __name__ == "__main__": if __name__ == "__main__":
if ON_WINDOWS: from src.main import main
# Fix windows/flask not recognising that .js means 'application/javascript' main()
import mimetypes
mimetypes.add_type('application/javascript', '.js')
# Required for multiprocessing support in frozen files
multiprocessing.freeze_support()
else:
if get_effective_user_id() != 0:
logger.warning(f"decky is running as an unprivileged user, this is not officially supported and may cause issues")
# Append the loader's plugin path to the recognized python paths
sys.path.append(path.join(path.dirname(__file__), "plugin"))
# Append the system and user python paths
sys.path.extend(get_system_pythonpaths())
loop = new_event_loop()
set_event_loop(loop)
PluginManager(loop).run()
+3
View File
@@ -0,0 +1,3 @@
{
"strict": ["*"]
}
+42 -22
View File
@@ -4,53 +4,70 @@ import json
# from pprint import pformat # from pprint import pformat
# Partial imports # Partial imports
from aiohttp import ClientSession, web from aiohttp import ClientSession
from asyncio import get_event_loop, sleep from asyncio import sleep
from concurrent.futures import ProcessPoolExecutor
from hashlib import sha256 from hashlib import sha256
from io import BytesIO from io import BytesIO
from logging import getLogger from logging import getLogger
from os import R_OK, W_OK, path, rename, listdir, access, mkdir from os import R_OK, W_OK, path, listdir, access, mkdir
from shutil import rmtree from shutil import rmtree
from time import time from time import time
from zipfile import ZipFile from zipfile import ZipFile
from localplatform import chown, chmod from enum import IntEnum
from typing import Dict, List, TypedDict
# Local modules # Local modules
from helpers import get_ssl_context, download_remote_binary_to_path from .localplatform import chown, chmod
from injector import get_gamepadui_tab from .loader import Loader, Plugins
from .helpers import get_ssl_context, download_remote_binary_to_path
from .settings import SettingsManager
from .injector import get_gamepadui_tab
logger = getLogger("Browser") logger = getLogger("Browser")
class PluginInstallType(IntEnum):
INSTALL = 0
REINSTALL = 1
UPDATE = 2
class PluginInstallRequest(TypedDict):
name: str
artifact: str
version: str
hash: str
install_type: PluginInstallType
class PluginInstallContext: class PluginInstallContext:
def __init__(self, artifact, name, version, hash) -> None: def __init__(self, artifact: str, name: str, version: str, hash: str) -> None:
self.artifact = artifact self.artifact = artifact
self.name = name self.name = name
self.version = version self.version = version
self.hash = hash self.hash = hash
class PluginBrowser: class PluginBrowser:
def __init__(self, plugin_path, plugins, loader, settings) -> None: def __init__(self, plugin_path: str, plugins: Plugins, loader: Loader, settings: SettingsManager) -> None:
self.plugin_path = plugin_path self.plugin_path = plugin_path
self.plugins = plugins self.plugins = plugins
self.loader = loader self.loader = loader
self.settings = settings self.settings = settings
self.install_requests = {} self.install_requests: Dict[str, PluginInstallContext | List[PluginInstallContext]] = {}
def _unzip_to_plugin_dir(self, zip, name, hash): def _unzip_to_plugin_dir(self, zip: BytesIO, name: str, hash: str):
zip_hash = sha256(zip.getbuffer()).hexdigest() zip_hash = sha256(zip.getbuffer()).hexdigest()
if hash and (zip_hash != hash): if hash and (zip_hash != hash):
return False return False
zip_file = ZipFile(zip) zip_file = ZipFile(zip)
zip_file.extractall(self.plugin_path) zip_file.extractall(self.plugin_path)
plugin_dir = path.join(self.plugin_path, self.find_plugin_folder(name)) plugin_folder = self.find_plugin_folder(name)
assert plugin_folder is not None
plugin_dir = path.join(self.plugin_path, plugin_folder)
if not chown(plugin_dir) or not chmod(plugin_dir, 555): if not chown(plugin_dir) or not chmod(plugin_dir, 555):
logger.error(f"chown/chmod exited with a non-zero exit code") logger.error(f"chown/chmod exited with a non-zero exit code")
return False return False
return True return True
async def _download_remote_binaries_for_plugin_with_name(self, pluginBasePath): async def _download_remote_binaries_for_plugin_with_name(self, pluginBasePath: str):
rv = False rv = False
try: try:
packageJsonPath = path.join(pluginBasePath, 'package.json') packageJsonPath = path.join(pluginBasePath, 'package.json')
@@ -91,7 +108,7 @@ class PluginBrowser:
return rv return rv
"""Return the filename (only) for the specified plugin""" """Return the filename (only) for the specified plugin"""
def find_plugin_folder(self, name): def find_plugin_folder(self, name: str) -> str | None:
for folder in listdir(self.plugin_path): for folder in listdir(self.plugin_path):
try: try:
with open(path.join(self.plugin_path, folder, 'plugin.json'), "r", encoding="utf-8") as f: with open(path.join(self.plugin_path, folder, 'plugin.json'), "r", encoding="utf-8") as f:
@@ -102,11 +119,13 @@ class PluginBrowser:
except: except:
logger.debug(f"skipping {folder}") logger.debug(f"skipping {folder}")
async def uninstall_plugin(self, name): async def uninstall_plugin(self, name: str):
if self.loader.watcher: if self.loader.watcher:
self.loader.watcher.disabled = True self.loader.watcher.disabled = True
tab = await get_gamepadui_tab() tab = await get_gamepadui_tab()
plugin_dir = path.join(self.plugin_path, self.find_plugin_folder(name)) plugin_folder = self.find_plugin_folder(name)
assert plugin_folder is not None
plugin_dir = path.join(self.plugin_path, plugin_folder)
try: try:
logger.info("uninstalling " + name) logger.info("uninstalling " + name)
logger.info(" at dir " + plugin_dir) logger.info(" at dir " + plugin_dir)
@@ -133,7 +152,7 @@ class PluginBrowser:
if self.loader.watcher: if self.loader.watcher:
self.loader.watcher.disabled = False self.loader.watcher.disabled = False
async def _install(self, artifact, name, version, hash): async def _install(self, artifact: str, name: str, version: str, hash: str):
# Will be set later in code # Will be set later in code
res_zip = None res_zip = None
@@ -185,6 +204,7 @@ class PluginBrowser:
ret = self._unzip_to_plugin_dir(res_zip, name, hash) ret = self._unzip_to_plugin_dir(res_zip, name, hash)
if ret: if ret:
plugin_folder = self.find_plugin_folder(name) plugin_folder = self.find_plugin_folder(name)
assert plugin_folder is not None
plugin_dir = path.join(self.plugin_path, plugin_folder) plugin_dir = path.join(self.plugin_path, plugin_folder)
ret = await self._download_remote_binaries_for_plugin_with_name(plugin_dir) ret = await self._download_remote_binaries_for_plugin_with_name(plugin_dir)
if ret: if ret:
@@ -206,14 +226,14 @@ class PluginBrowser:
if self.loader.watcher: if self.loader.watcher:
self.loader.watcher.disabled = False self.loader.watcher.disabled = False
async def request_plugin_install(self, artifact, name, version, hash, install_type): async def request_plugin_install(self, artifact: str, name: str, version: str, hash: str, install_type: PluginInstallType):
request_id = str(time()) request_id = str(time())
self.install_requests[request_id] = PluginInstallContext(artifact, name, version, hash) self.install_requests[request_id] = PluginInstallContext(artifact, name, version, hash)
tab = await get_gamepadui_tab() tab = await get_gamepadui_tab()
await tab.open_websocket() await tab.open_websocket()
await tab.evaluate_js(f"DeckyPluginLoader.addPluginInstallPrompt('{name}', '{version}', '{request_id}', '{hash}', {install_type})") await tab.evaluate_js(f"DeckyPluginLoader.addPluginInstallPrompt('{name}', '{version}', '{request_id}', '{hash}', {install_type})")
async def request_multiple_plugin_installs(self, requests): async def request_multiple_plugin_installs(self, requests: List[PluginInstallRequest]):
request_id = str(time()) request_id = str(time())
self.install_requests[request_id] = [PluginInstallContext(req['artifact'], req['name'], req['version'], req['hash']) for req in requests] self.install_requests[request_id] = [PluginInstallContext(req['artifact'], req['name'], req['version'], req['hash']) for req in requests]
js_requests_parameter = ','.join([ js_requests_parameter = ','.join([
@@ -224,17 +244,17 @@ class PluginBrowser:
await tab.open_websocket() await tab.open_websocket()
await tab.evaluate_js(f"DeckyPluginLoader.addMultiplePluginsInstallPrompt('{request_id}', [{js_requests_parameter}])") await tab.evaluate_js(f"DeckyPluginLoader.addMultiplePluginsInstallPrompt('{request_id}', [{js_requests_parameter}])")
async def confirm_plugin_install(self, request_id): async def confirm_plugin_install(self, request_id: str):
requestOrRequests = self.install_requests.pop(request_id) requestOrRequests = self.install_requests.pop(request_id)
if isinstance(requestOrRequests, list): if isinstance(requestOrRequests, list):
[await self._install(req.artifact, req.name, req.version, req.hash) for req in requestOrRequests] [await self._install(req.artifact, req.name, req.version, req.hash) for req in requestOrRequests]
else: else:
await self._install(requestOrRequests.artifact, requestOrRequests.name, requestOrRequests.version, requestOrRequests.hash) await self._install(requestOrRequests.artifact, requestOrRequests.name, requestOrRequests.version, requestOrRequests.hash)
def cancel_plugin_install(self, request_id): def cancel_plugin_install(self, request_id: str):
self.install_requests.pop(request_id) self.install_requests.pop(request_id)
def cleanup_plugin_settings(self, name): def cleanup_plugin_settings(self, name: str):
"""Removes any settings related to a plugin. Propably called when a plugin is uninstalled. """Removes any settings related to a plugin. Propably called when a plugin is uninstalled.
Args: Args:
+25 -34
View File
@@ -2,16 +2,16 @@ import re
import ssl import ssl
import uuid import uuid
import os import os
import sys
import subprocess import subprocess
from hashlib import sha256 from hashlib import sha256
from io import BytesIO from io import BytesIO
import certifi import certifi
from aiohttp.web import Response, middleware from aiohttp.web import Request, Response, middleware
from aiohttp.typedefs import Handler
from aiohttp import ClientSession from aiohttp import ClientSession
import localplatform from . import localplatform
from customtypes import UserType from .customtypes import UserType
from logging import getLogger from logging import getLogger
REMOTE_DEBUGGER_UNIT = "steam-web-debug-portforward.service" REMOTE_DEBUGGER_UNIT = "steam-web-debug-portforward.service"
@@ -31,17 +31,17 @@ def get_csrf_token():
return csrf_token return csrf_token
@middleware @middleware
async def csrf_middleware(request, handler): async def csrf_middleware(request: Request, handler: Handler):
if str(request.method) == "OPTIONS" or request.headers.get('Authentication') == csrf_token or str(request.rel_url) == "/auth/token" or str(request.rel_url).startswith("/plugins/load_main/") or str(request.rel_url).startswith("/static/") or str(request.rel_url).startswith("/legacy/") or str(request.rel_url).startswith("/steam_resource/") or str(request.rel_url).startswith("/frontend/") or assets_regex.match(str(request.rel_url)) or frontend_regex.match(str(request.rel_url)): if str(request.method) == "OPTIONS" or request.headers.get('Authentication') == csrf_token or str(request.rel_url) == "/auth/token" or str(request.rel_url).startswith("/plugins/load_main/") or str(request.rel_url).startswith("/static/") or str(request.rel_url).startswith("/legacy/") or str(request.rel_url).startswith("/steam_resource/") or str(request.rel_url).startswith("/frontend/") or assets_regex.match(str(request.rel_url)) or frontend_regex.match(str(request.rel_url)):
return await handler(request) return await handler(request)
return Response(text='Forbidden', status='403') return Response(text='Forbidden', status=403)
# Get the default homebrew path unless a home_path is specified. home_path argument is deprecated # Get the default homebrew path unless a home_path is specified. home_path argument is deprecated
def get_homebrew_path(home_path = None) -> str: def get_homebrew_path() -> str:
return localplatform.get_unprivileged_path() return localplatform.get_unprivileged_path()
# Recursively create path and chown as user # Recursively create path and chown as user
def mkdir_as_user(path): def mkdir_as_user(path: str):
path = os.path.realpath(path) path = os.path.realpath(path)
os.makedirs(path, exist_ok=True) os.makedirs(path, exist_ok=True)
localplatform.chown(path) localplatform.chown(path)
@@ -57,23 +57,18 @@ def get_loader_version() -> str:
# returns the appropriate system python paths # returns the appropriate system python paths
def get_system_pythonpaths() -> list[str]: def get_system_pythonpaths() -> list[str]:
extra_args = {}
if localplatform.ON_LINUX:
# run as normal normal user to also include user python paths
extra_args["user"] = localplatform.localplatform._get_user_id()
extra_args["env"] = {}
try: try:
# run as normal normal user if on linux to also include user python paths
proc = subprocess.run(["python3" if localplatform.ON_LINUX else "python", "-c", "import sys; print('\\n'.join(x for x in sys.path if x))"], proc = subprocess.run(["python3" if localplatform.ON_LINUX else "python", "-c", "import sys; print('\\n'.join(x for x in sys.path if x))"],
capture_output=True, **extra_args) # TODO make this less insane
capture_output=True, user=localplatform.localplatform._get_user_id() if localplatform.ON_LINUX else None, env={} if localplatform.ON_LINUX else None) # type: ignore
return [x.strip() for x in proc.stdout.decode().strip().split("\n")] return [x.strip() for x in proc.stdout.decode().strip().split("\n")]
except Exception as e: except Exception as e:
logger.warn(f"Failed to execute get_system_pythonpaths(): {str(e)}") logger.warn(f"Failed to execute get_system_pythonpaths(): {str(e)}")
return [] return []
# Download Remote Binaries to local Plugin # Download Remote Binaries to local Plugin
async def download_remote_binary_to_path(url, binHash, path) -> bool: async def download_remote_binary_to_path(url: str, binHash: str, path: str) -> bool:
rv = False rv = False
try: try:
if os.access(os.path.dirname(path), os.W_OK): if os.access(os.path.dirname(path), os.W_OK):
@@ -110,46 +105,42 @@ def set_user_group() -> str:
# Get the user id hosting the plugin loader # Get the user id hosting the plugin loader
def get_user_id() -> int: def get_user_id() -> int:
return localplatform.localplatform._get_user_id() return localplatform.localplatform._get_user_id() # pyright: ignore [reportPrivateUsage]
# Get the user hosting the plugin loader # Get the user hosting the plugin loader
def get_user() -> str: def get_user() -> str:
return localplatform.localplatform._get_user() return localplatform.localplatform._get_user() # pyright: ignore [reportPrivateUsage]
# Get the effective user id of the running process # Get the effective user id of the running process
def get_effective_user_id() -> int: def get_effective_user_id() -> int:
return localplatform.localplatform._get_effective_user_id() return localplatform.localplatform._get_effective_user_id() # pyright: ignore [reportPrivateUsage]
# Get the effective user of the running process # Get the effective user of the running process
def get_effective_user() -> str: def get_effective_user() -> str:
return localplatform.localplatform._get_effective_user() return localplatform.localplatform._get_effective_user() # pyright: ignore [reportPrivateUsage]
# Get the effective user group id of the running process # Get the effective user group id of the running process
def get_effective_user_group_id() -> int: def get_effective_user_group_id() -> int:
return localplatform.localplatform._get_effective_user_group_id() return localplatform.localplatform._get_effective_user_group_id() # pyright: ignore [reportPrivateUsage]
# Get the effective user group of the running process # Get the effective user group of the running process
def get_effective_user_group() -> str: def get_effective_user_group() -> str:
return localplatform.localplatform._get_effective_user_group() return localplatform.localplatform._get_effective_user_group() # pyright: ignore [reportPrivateUsage]
# Get the user owner of the given file path. # Get the user owner of the given file path.
def get_user_owner(file_path) -> str: def get_user_owner(file_path: str) -> str:
return localplatform.localplatform._get_user_owner(file_path) return localplatform.localplatform._get_user_owner(file_path) # pyright: ignore [reportPrivateUsage]
# Get the user group of the given file path. # Get the user group of the given file path, or the user group hosting the plugin loader
def get_user_group(file_path) -> str: def get_user_group(file_path: str | None = None) -> str:
return localplatform.localplatform._get_user_group(file_path) return localplatform.localplatform._get_user_group(file_path) # pyright: ignore [reportPrivateUsage]
# Get the group id of the user hosting the plugin loader # Get the group id of the user hosting the plugin loader
def get_user_group_id() -> int: def get_user_group_id() -> int:
return localplatform.localplatform._get_user_group_id() return localplatform.localplatform._get_user_group_id() # pyright: ignore [reportPrivateUsage]
# Get the group of the user hosting the plugin loader
def get_user_group() -> str:
return localplatform.localplatform._get_user_group()
# Get the default home path unless a user is specified # Get the default home path unless a user is specified
def get_home_path(username = None) -> str: def get_home_path(username: str | None = None) -> str:
return localplatform.get_home_path(UserType.ROOT if username == "root" else UserType.HOST_USER) return localplatform.get_home_path(UserType.ROOT if username == "root" else UserType.HOST_USER)
async def is_systemd_unit_active(unit_name: str) -> bool: async def is_systemd_unit_active(unit_name: str) -> bool:
+50 -34
View File
@@ -2,10 +2,9 @@
from asyncio import sleep from asyncio import sleep
from logging import getLogger from logging import getLogger
from traceback import format_exc from typing import Any, Callable, List, TypedDict, Dict
from typing import List
from aiohttp import ClientSession, WSMsgType from aiohttp import ClientSession
from aiohttp.client_exceptions import ClientConnectorError, ClientOSError from aiohttp.client_exceptions import ClientConnectorError, ClientOSError
from asyncio.exceptions import TimeoutError from asyncio.exceptions import TimeoutError
import uuid import uuid
@@ -14,35 +13,43 @@ BASE_ADDRESS = "http://localhost:8080"
logger = getLogger("Injector") logger = getLogger("Injector")
class _TabResponse(TypedDict):
title: str
id: str
url: str
webSocketDebuggerUrl: str
class Tab: class Tab:
cmd_id = 0 cmd_id = 0
def __init__(self, res) -> None: def __init__(self, res: _TabResponse) -> None:
self.title = res["title"] self.title: str = res["title"]
self.id = res["id"] self.id: str = res["id"]
self.url = res["url"] self.url: str = res["url"]
self.ws_url = res["webSocketDebuggerUrl"] self.ws_url: str = res["webSocketDebuggerUrl"]
self.websocket = None self.websocket = None
self.client = None self.client = None
async def open_websocket(self): async def open_websocket(self):
self.client = ClientSession() self.client = ClientSession()
self.websocket = await self.client.ws_connect(self.ws_url) self.websocket = await self.client.ws_connect(self.ws_url) # type: ignore
async def close_websocket(self): async def close_websocket(self):
await self.websocket.close() if self.websocket:
await self.client.close() await self.websocket.close()
if self.client:
await self.client.close()
async def listen_for_message(self): async def listen_for_message(self):
async for message in self.websocket: if self.websocket:
data = message.json() async for message in self.websocket:
yield data data = message.json()
logger.warn(f"The Tab {self.title} socket has been disconnected while listening for messages.") yield data
await self.close_websocket() logger.warn(f"The Tab {self.title} socket has been disconnected while listening for messages.")
await self.close_websocket()
async def _send_devtools_cmd(self, dc, receive=True): async def _send_devtools_cmd(self, dc: Dict[str, Any], receive: bool = True):
if self.websocket: if self.websocket:
self.cmd_id += 1 self.cmd_id += 1
dc["id"] = self.cmd_id dc["id"] = self.cmd_id
@@ -54,7 +61,7 @@ class Tab:
return None return None
raise RuntimeError("Websocket not opened") raise RuntimeError("Websocket not opened")
async def evaluate_js(self, js, run_async=False, manage_socket=True, get_result=True): async def evaluate_js(self, js: str, run_async: bool | None = False, manage_socket: bool | None = True, get_result: bool = True):
try: try:
if manage_socket: if manage_socket:
await self.open_websocket() await self.open_websocket()
@@ -73,15 +80,16 @@ class Tab:
await self.close_websocket() await self.close_websocket()
return res return res
async def has_global_var(self, var_name, manage_socket=True): async def has_global_var(self, var_name: str, manage_socket: bool = True):
res = await self.evaluate_js(f"window['{var_name}'] !== null && window['{var_name}'] !== undefined", False, manage_socket) res = await self.evaluate_js(f"window['{var_name}'] !== null && window['{var_name}'] !== undefined", False, manage_socket)
assert res is not None
if not "result" in res or not "result" in res["result"] or not "value" in res["result"]["result"]: if not "result" in res or not "result" in res["result"] or not "value" in res["result"]["result"]:
return False return False
return res["result"]["result"]["value"] return res["result"]["result"]["value"]
async def close(self, manage_socket=True): async def close(self, manage_socket: bool = True):
try: try:
if manage_socket: if manage_socket:
await self.open_websocket() await self.open_websocket()
@@ -111,7 +119,7 @@ class Tab:
"method": "Page.disable", "method": "Page.disable",
}, False) }, False)
async def refresh(self, manage_socket=True): async def refresh(self, manage_socket: bool = True):
try: try:
if manage_socket: if manage_socket:
await self.open_websocket() await self.open_websocket()
@@ -125,7 +133,7 @@ class Tab:
await self.close_websocket() await self.close_websocket()
return return
async def reload_and_evaluate(self, js, manage_socket=True): async def reload_and_evaluate(self, js: str, manage_socket: bool = True):
""" """
Reloads the current tab, with JS to run on load via debugger Reloads the current tab, with JS to run on load via debugger
""" """
@@ -153,11 +161,13 @@ class Tab:
} }
}, True) }, True)
assert breakpoint_res is not None
logger.info(breakpoint_res) logger.info(breakpoint_res)
# Page finishes loading when breakpoint hits # Page finishes loading when breakpoint hits
for x in range(20): for _ in range(20):
# this works around 1/5 of the time, so just send it 8 times. # this works around 1/5 of the time, so just send it 8 times.
# the js accounts for being injected multiple times allowing only one instance to run at a time anyway # the js accounts for being injected multiple times allowing only one instance to run at a time anyway
await self._send_devtools_cmd({ await self._send_devtools_cmd({
@@ -176,7 +186,7 @@ class Tab:
} }
}, False) }, False)
for x in range(4): for _ in range(4):
await self._send_devtools_cmd({ await self._send_devtools_cmd({
"method": "Debugger.resume" "method": "Debugger.resume"
}, False) }, False)
@@ -190,7 +200,7 @@ class Tab:
await self.close_websocket() await self.close_websocket()
return return
async def add_script_to_evaluate_on_new_document(self, js, add_dom_wrapper=True, manage_socket=True, get_result=True): async def add_script_to_evaluate_on_new_document(self, js: str, add_dom_wrapper: bool = True, manage_socket: bool = True, get_result: bool = True):
""" """
How the underlying call functions is not particularly clear from the devtools docs, so stealing puppeteer's description: How the underlying call functions is not particularly clear from the devtools docs, so stealing puppeteer's description:
@@ -253,7 +263,7 @@ class Tab:
await self.close_websocket() await self.close_websocket()
return res return res
async def remove_script_to_evaluate_on_new_document(self, script_id, manage_socket=True): async def remove_script_to_evaluate_on_new_document(self, script_id: str, manage_socket: bool = True):
""" """
Removes a script from a page that was added with `add_script_to_evaluate_on_new_document` Removes a script from a page that was added with `add_script_to_evaluate_on_new_document`
@@ -267,7 +277,7 @@ class Tab:
if manage_socket: if manage_socket:
await self.open_websocket() await self.open_websocket()
res = await self._send_devtools_cmd({ await self._send_devtools_cmd({
"method": "Page.removeScriptToEvaluateOnNewDocument", "method": "Page.removeScriptToEvaluateOnNewDocument",
"params": { "params": {
"identifier": script_id "identifier": script_id
@@ -278,15 +288,16 @@ class Tab:
if manage_socket: if manage_socket:
await self.close_websocket() await self.close_websocket()
async def has_element(self, element_name, manage_socket=True): async def has_element(self, element_name: str, manage_socket: bool = True):
res = await self.evaluate_js(f"document.getElementById('{element_name}') != null", False, manage_socket) res = await self.evaluate_js(f"document.getElementById('{element_name}') != null", False, manage_socket)
assert res is not None
if not "result" in res or not "result" in res["result"] or not "value" in res["result"]["result"]: if not "result" in res or not "result" in res["result"] or not "value" in res["result"]["result"]:
return False return False
return res["result"]["result"]["value"] return res["result"]["result"]["value"]
async def inject_css(self, style, manage_socket=True): async def inject_css(self, style: str, manage_socket: bool = True):
try: try:
css_id = str(uuid.uuid4()) css_id = str(uuid.uuid4())
@@ -300,6 +311,8 @@ class Tab:
}})() }})()
""", False, manage_socket) """, False, manage_socket)
assert result is not None
if "exceptionDetails" in result["result"]: if "exceptionDetails" in result["result"]:
return { return {
"success": False, "success": False,
@@ -316,7 +329,7 @@ class Tab:
"result": e "result": e
} }
async def remove_css(self, css_id, manage_socket=True): async def remove_css(self, css_id: str, manage_socket: bool = True):
try: try:
result = await self.evaluate_js( result = await self.evaluate_js(
f""" f"""
@@ -328,6 +341,8 @@ class Tab:
}})() }})()
""", False, manage_socket) """, False, manage_socket)
assert result is not None
if "exceptionDetails" in result["result"]: if "exceptionDetails" in result["result"]:
return { return {
"success": False, "success": False,
@@ -343,8 +358,9 @@ class Tab:
"result": e "result": e
} }
async def get_steam_resource(self, url): async def get_steam_resource(self, url: str):
res = await self.evaluate_js(f'(async function test() {{ return await (await fetch("{url}")).text() }})()', True) res = await self.evaluate_js(f'(async function test() {{ return await (await fetch("{url}")).text() }})()', True)
assert res is not None
return res["result"]["result"]["value"] return res["result"]["result"]["value"]
def __repr__(self): def __repr__(self):
@@ -380,14 +396,14 @@ async def get_tabs() -> List[Tab]:
raise Exception(f"/json did not return 200. {await res.text()}") raise Exception(f"/json did not return 200. {await res.text()}")
async def get_tab(tab_name) -> Tab: async def get_tab(tab_name: str) -> Tab:
tabs = await get_tabs() tabs = await get_tabs()
tab = next((i for i in tabs if i.title == tab_name), None) tab = next((i for i in tabs if i.title == tab_name), None)
if not tab: if not tab:
raise ValueError(f"Tab {tab_name} not found") raise ValueError(f"Tab {tab_name} not found")
return tab return tab
async def get_tab_lambda(test) -> Tab: async def get_tab_lambda(test: Callable[[Tab], bool]) -> Tab:
tabs = await get_tabs() tabs = await get_tabs()
tab = next((i for i in tabs if test(i)), None) tab = next((i for i in tabs if test(i)), None)
if not tab: if not tab:
@@ -408,7 +424,7 @@ async def get_gamepadui_tab() -> Tab:
raise ValueError(f"GamepadUI Tab not found") raise ValueError(f"GamepadUI Tab not found")
return tab return tab
async def inject_to_tab(tab_name, js, run_async=False): async def inject_to_tab(tab_name: str, js: str, run_async: bool = False):
tab = await get_tab(tab_name) tab = await get_tab(tab_name)
return await tab.evaluate_js(js, run_async) return await tab.evaluate_js(js, run_async)
+43 -33
View File
@@ -1,34 +1,43 @@
from asyncio import Queue, sleep from __future__ import annotations
from asyncio import AbstractEventLoop, Queue, sleep
from json.decoder import JSONDecodeError from json.decoder import JSONDecodeError
from logging import getLogger from logging import getLogger
from os import listdir, path from os import listdir, path
from pathlib import Path from pathlib import Path
from traceback import print_exc from traceback import print_exc
from typing import Any, Tuple
from aiohttp import web from aiohttp import web
from os.path import exists from os.path import exists
from watchdog.events import RegexMatchingEventHandler from watchdog.events import RegexMatchingEventHandler, DirCreatedEvent, DirModifiedEvent, FileCreatedEvent, FileModifiedEvent # type: ignore
from watchdog.observers import Observer from watchdog.observers import Observer # type: ignore
from injector import get_tab, get_gamepadui_tab from typing import TYPE_CHECKING
from plugin import PluginWrapper if TYPE_CHECKING:
from .main import PluginManager
from .injector import get_tab, get_gamepadui_tab
from .plugin import PluginWrapper
Plugins = dict[str, PluginWrapper]
ReloadQueue = Queue[Tuple[str, str, bool | None] | Tuple[str, str]]
class FileChangeHandler(RegexMatchingEventHandler): class FileChangeHandler(RegexMatchingEventHandler):
def __init__(self, queue, plugin_path) -> None: def __init__(self, queue: ReloadQueue, plugin_path: str) -> None:
super().__init__(regexes=[r'^.*?dist\/index\.js$', r'^.*?main\.py$']) super().__init__(regexes=[r'^.*?dist\/index\.js$', r'^.*?main\.py$']) # type: ignore
self.logger = getLogger("file-watcher") self.logger = getLogger("file-watcher")
self.plugin_path = plugin_path self.plugin_path = plugin_path
self.queue = queue self.queue = queue
self.disabled = True self.disabled = True
def maybe_reload(self, src_path): def maybe_reload(self, src_path: str):
if self.disabled: if self.disabled:
return return
plugin_dir = Path(path.relpath(src_path, self.plugin_path)).parts[0] plugin_dir = Path(path.relpath(src_path, self.plugin_path)).parts[0]
if exists(path.join(self.plugin_path, plugin_dir, "plugin.json")): if exists(path.join(self.plugin_path, plugin_dir, "plugin.json")):
self.queue.put_nowait((path.join(self.plugin_path, plugin_dir, "main.py"), plugin_dir, True)) self.queue.put_nowait((path.join(self.plugin_path, plugin_dir, "main.py"), plugin_dir, True))
def on_created(self, event): def on_created(self, event: DirCreatedEvent | FileCreatedEvent):
src_path = event.src_path src_path = event.src_path
if "__pycache__" in src_path: if "__pycache__" in src_path:
return return
@@ -42,7 +51,7 @@ class FileChangeHandler(RegexMatchingEventHandler):
self.logger.debug(f"file created: {src_path}") self.logger.debug(f"file created: {src_path}")
self.maybe_reload(src_path) self.maybe_reload(src_path)
def on_modified(self, event): def on_modified(self, event: DirModifiedEvent | FileModifiedEvent):
src_path = event.src_path src_path = event.src_path
if "__pycache__" in src_path: if "__pycache__" in src_path:
return return
@@ -57,25 +66,25 @@ class FileChangeHandler(RegexMatchingEventHandler):
self.maybe_reload(src_path) self.maybe_reload(src_path)
class Loader: class Loader:
def __init__(self, server_instance, plugin_path, loop, live_reload=False) -> None: def __init__(self, server_instance: PluginManager, plugin_path: str, loop: AbstractEventLoop, live_reload: bool = False) -> None:
self.loop = loop self.loop = loop
self.logger = getLogger("Loader") self.logger = getLogger("Loader")
self.plugin_path = plugin_path self.plugin_path = plugin_path
self.logger.info(f"plugin_path: {self.plugin_path}") self.logger.info(f"plugin_path: {self.plugin_path}")
self.plugins : dict[str, PluginWrapper] = {} self.plugins: Plugins = {}
self.watcher = None self.watcher = None
self.live_reload = live_reload self.live_reload = live_reload
self.reload_queue = Queue() self.reload_queue: ReloadQueue = Queue()
self.loop.create_task(self.handle_reloads()) self.loop.create_task(self.handle_reloads())
if live_reload: if live_reload:
self.observer = Observer() self.observer = Observer()
self.watcher = FileChangeHandler(self.reload_queue, plugin_path) self.watcher = FileChangeHandler(self.reload_queue, plugin_path)
self.observer.schedule(self.watcher, self.plugin_path, recursive=True) self.observer.schedule(self.watcher, self.plugin_path, recursive=True) # type: ignore
self.observer.start() self.observer.start()
self.loop.create_task(self.enable_reload_wait()) self.loop.create_task(self.enable_reload_wait())
server_instance.add_routes([ server_instance.web_app.add_routes([
web.get("/frontend/{path:.*}", self.handle_frontend_assets), web.get("/frontend/{path:.*}", self.handle_frontend_assets),
web.get("/locales/{path:.*}", self.handle_frontend_locales), web.get("/locales/{path:.*}", self.handle_frontend_locales),
web.get("/plugins", self.get_plugins), web.get("/plugins", self.get_plugins),
@@ -93,40 +102,41 @@ class Loader:
async def enable_reload_wait(self): async def enable_reload_wait(self):
if self.live_reload: if self.live_reload:
await sleep(10) await sleep(10)
self.logger.info("Hot reload enabled") if self.watcher:
self.watcher.disabled = False self.logger.info("Hot reload enabled")
self.watcher.disabled = False
async def handle_frontend_assets(self, request): async def handle_frontend_assets(self, request: web.Request):
file = path.join(path.dirname(__file__), "static", request.match_info["path"]) file = path.join(path.dirname(__file__), "..", "static", request.match_info["path"])
return web.FileResponse(file, headers={"Cache-Control": "no-cache"}) return web.FileResponse(file, headers={"Cache-Control": "no-cache"})
async def handle_frontend_locales(self, request): async def handle_frontend_locales(self, request: web.Request):
req_lang = request.match_info["path"] req_lang = request.match_info["path"]
file = path.join(path.dirname(__file__), "locales", req_lang) file = path.join(path.dirname(__file__), "..", "locales", req_lang)
if exists(file): if exists(file):
return web.FileResponse(file, headers={"Cache-Control": "no-cache", "Content-Type": "application/json"}) return web.FileResponse(file, headers={"Cache-Control": "no-cache", "Content-Type": "application/json"})
else: else:
self.logger.info(f"Language {req_lang} not available, returning an empty dictionary") self.logger.info(f"Language {req_lang} not available, returning an empty dictionary")
return web.json_response(data={}, headers={"Cache-Control": "no-cache"}) return web.json_response(data={}, headers={"Cache-Control": "no-cache"})
async def get_plugins(self, request): async def get_plugins(self, request: web.Request):
plugins = list(self.plugins.values()) plugins = list(self.plugins.values())
return web.json_response([{"name": str(i) if not i.legacy else "$LEGACY_"+str(i), "version": i.version} for i in plugins]) return web.json_response([{"name": str(i) if not i.legacy else "$LEGACY_"+str(i), "version": i.version} for i in plugins])
def handle_plugin_frontend_assets(self, request): async def handle_plugin_frontend_assets(self, request: web.Request):
plugin = self.plugins[request.match_info["plugin_name"]] plugin = self.plugins[request.match_info["plugin_name"]]
file = path.join(self.plugin_path, plugin.plugin_directory, "dist/assets", request.match_info["path"]) file = path.join(self.plugin_path, plugin.plugin_directory, "dist/assets", request.match_info["path"])
return web.FileResponse(file, headers={"Cache-Control": "no-cache"}) return web.FileResponse(file, headers={"Cache-Control": "no-cache"})
def handle_frontend_bundle(self, request): async def handle_frontend_bundle(self, request: web.Request):
plugin = self.plugins[request.match_info["plugin_name"]] plugin = self.plugins[request.match_info["plugin_name"]]
with open(path.join(self.plugin_path, plugin.plugin_directory, "dist/index.js"), "r", encoding="utf-8") as bundle: with open(path.join(self.plugin_path, plugin.plugin_directory, "dist/index.js"), "r", encoding="utf-8") as bundle:
return web.Response(text=bundle.read(), content_type="application/javascript") return web.Response(text=bundle.read(), content_type="application/javascript")
def import_plugin(self, file, plugin_directory, refresh=False, batch=False): def import_plugin(self, file: str, plugin_directory: str, refresh: bool | None = False, batch: bool | None = False):
try: try:
plugin = PluginWrapper(file, plugin_directory, self.plugin_path) plugin = PluginWrapper(file, plugin_directory, self.plugin_path)
if plugin.name in self.plugins: if plugin.name in self.plugins:
@@ -146,7 +156,7 @@ class Loader:
self.logger.error(f"Could not load {file}. {e}") self.logger.error(f"Could not load {file}. {e}")
print_exc() print_exc()
async def dispatch_plugin(self, name, version): async def dispatch_plugin(self, name: str, version: str | None):
gpui_tab = await get_gamepadui_tab() gpui_tab = await get_gamepadui_tab()
await gpui_tab.evaluate_js(f"window.importDeckyPlugin('{name}', '{version}')") await gpui_tab.evaluate_js(f"window.importDeckyPlugin('{name}', '{version}')")
@@ -161,15 +171,15 @@ class Loader:
async def handle_reloads(self): async def handle_reloads(self):
while True: while True:
args = await self.reload_queue.get() args = await self.reload_queue.get()
self.import_plugin(*args) self.import_plugin(*args) # type: ignore
async def handle_plugin_method_call(self, request): async def handle_plugin_method_call(self, request: web.Request):
res = {} res = {}
plugin = self.plugins[request.match_info["plugin_name"]] plugin = self.plugins[request.match_info["plugin_name"]]
method_name = request.match_info["method_name"] method_name = request.match_info["method_name"]
try: try:
method_info = await request.json() method_info = await request.json()
args = method_info["args"] args: Any = method_info["args"]
except JSONDecodeError: except JSONDecodeError:
args = {} args = {}
try: try:
@@ -189,7 +199,7 @@ class Loader:
can introduce it more smoothly and give people the chance to sample the new features even can introduce it more smoothly and give people the chance to sample the new features even
without plugin support. They will be removed once legacy plugins are no longer relevant. without plugin support. They will be removed once legacy plugins are no longer relevant.
""" """
async def load_plugin_main_view(self, request): async def load_plugin_main_view(self, request: web.Request):
plugin = self.plugins[request.match_info["name"]] plugin = self.plugins[request.match_info["name"]]
with open(path.join(self.plugin_path, plugin.plugin_directory, plugin.main_view_html), "r", encoding="utf-8") as template: with open(path.join(self.plugin_path, plugin.plugin_directory, plugin.main_view_html), "r", encoding="utf-8") as template:
template_data = template.read() template_data = template.read()
@@ -201,7 +211,7 @@ class Loader:
""" """
return web.Response(text=ret, content_type="text/html") return web.Response(text=ret, content_type="text/html")
async def handle_sub_route(self, request): async def handle_sub_route(self, request: web.Request):
plugin = self.plugins[request.match_info["name"]] plugin = self.plugins[request.match_info["name"]]
route_path = request.match_info["path"] route_path = request.match_info["path"]
self.logger.info(path) self.logger.info(path)
@@ -212,14 +222,14 @@ class Loader:
return web.Response(text=ret) return web.Response(text=ret)
async def get_steam_resource(self, request): async def get_steam_resource(self, request: web.Request):
tab = await get_tab("SP") tab = await get_tab("SP")
try: try:
return web.Response(text=await tab.get_steam_resource(f"https://steamloopback.host/{request.match_info['path']}"), content_type="text/html") return web.Response(text=await tab.get_steam_resource(f"https://steamloopback.host/{request.match_info['path']}"), content_type="text/html")
except Exception as e: except Exception as e:
return web.Response(text=str(e), status=400) return web.Response(text=str(e), status=400)
async def handle_backend_reload_request(self, request): async def handle_backend_reload_request(self, request: web.Request):
plugin_name : str = request.match_info["plugin_name"] plugin_name : str = request.match_info["plugin_name"]
plugin = self.plugins[plugin_name] plugin = self.plugins[plugin_name]
@@ -4,11 +4,11 @@ ON_WINDOWS = platform.system() == "Windows"
ON_LINUX = not ON_WINDOWS ON_LINUX = not ON_WINDOWS
if ON_WINDOWS: if ON_WINDOWS:
from localplatformwin import * from .localplatformwin import *
import localplatformwin as localplatform from . import localplatformwin as localplatform
else: else:
from localplatformlinux import * from .localplatformlinux import *
import localplatformlinux as localplatform from . import localplatformlinux as localplatform
def get_privileged_path() -> str: def get_privileged_path() -> str:
'''Get path accessible by elevated user. Holds plugins, decky loader and decky loader configs''' '''Get path accessible by elevated user. Holds plugins, decky loader and decky loader configs'''
@@ -1,6 +1,6 @@
import os, pwd, grp, sys, logging import os, pwd, grp, sys, logging
from subprocess import call, run, DEVNULL, PIPE, STDOUT from subprocess import call, run, DEVNULL, PIPE, STDOUT
from customtypes import UserType from .customtypes import UserType
logger = logging.getLogger("localplatform") logger = logging.getLogger("localplatform")
@@ -29,21 +29,17 @@ def _get_effective_user_group() -> str:
return grp.getgrgid(_get_effective_user_group_id()).gr_name return grp.getgrgid(_get_effective_user_group_id()).gr_name
# Get the user owner of the given file path. # Get the user owner of the given file path.
def _get_user_owner(file_path) -> str: def _get_user_owner(file_path: str) -> str:
return pwd.getpwuid(os.stat(file_path).st_uid).pw_name return pwd.getpwuid(os.stat(file_path).st_uid).pw_name
# Get the user group of the given file path. # Get the user group of the given file path, or the user group hosting the plugin loader
def _get_user_group(file_path) -> str: def _get_user_group(file_path: str | None = None) -> str:
return grp.getgrgid(os.stat(file_path).st_gid).gr_name return grp.getgrgid(os.stat(file_path).st_gid if file_path is not None else _get_user_group_id()).gr_name
# Get the group id of the user hosting the plugin loader # Get the group id of the user hosting the plugin loader
def _get_user_group_id() -> int: def _get_user_group_id() -> int:
return pwd.getpwuid(_get_user_id()).pw_gid return pwd.getpwuid(_get_user_id()).pw_gid
# Get the group of the user hosting the plugin loader
def _get_user_group() -> str:
return grp.getgrgid(_get_user_group_id()).gr_name
def chown(path : str, user : UserType = UserType.HOST_USER, recursive : bool = True) -> bool: def chown(path : str, user : UserType = UserType.HOST_USER, recursive : bool = True) -> bool:
user_str = "" user_str = ""
@@ -146,7 +142,7 @@ def get_privileged_path() -> str:
return path return path
def _parent_dir(path : str) -> str: def _parent_dir(path : str | None) -> str | None:
if path == None: if path == None:
return None return None
@@ -166,7 +162,7 @@ def get_unprivileged_path() -> str:
# Expected path of loader binary is /home/deck/homebrew/service/PluginLoader # Expected path of loader binary is /home/deck/homebrew/service/PluginLoader
path = _parent_dir(_parent_dir(os.path.realpath(sys.argv[0]))) path = _parent_dir(_parent_dir(os.path.realpath(sys.argv[0])))
if not os.path.exists(path): if path != None and not os.path.exists(path):
path = None path = None
if path == None: if path == None:
@@ -1,4 +1,4 @@
from customtypes import UserType from .customtypes import UserType
import os, sys import os, sys
def chown(path : str, user : UserType = UserType.HOST_USER, recursive : bool = True) -> bool: def chown(path : str, user : UserType = UserType.HOST_USER, recursive : bool = True) -> bool:
@@ -1,10 +1,13 @@
import asyncio, time, random import asyncio, time
from localplatform import ON_WINDOWS from typing import Awaitable, Callable
import random
from .localplatform import ON_WINDOWS
BUFFER_LIMIT = 2 ** 20 # 1 MiB BUFFER_LIMIT = 2 ** 20 # 1 MiB
class UnixSocket: class UnixSocket:
def __init__(self, on_new_message): def __init__(self, on_new_message: Callable[[str], Awaitable[str|None]]):
''' '''
on_new_message takes 1 string argument. on_new_message takes 1 string argument.
It's return value gets used, if not None, to write data to the socket. It's return value gets used, if not None, to write data to the socket.
@@ -46,28 +49,32 @@ class UnixSocket:
self.reader = None self.reader = None
async def read_single_line(self) -> str|None: async def read_single_line(self) -> str|None:
reader, writer = await self.get_socket_connection() reader, _ = await self.get_socket_connection()
if self.reader == None: try:
return None assert reader
except AssertionError:
return
return await self._read_single_line(reader) return await self._read_single_line(reader)
async def write_single_line(self, message : str): async def write_single_line(self, message : str):
reader, writer = await self.get_socket_connection() _, writer = await self.get_socket_connection()
if self.writer == None: try:
return; assert writer
except AssertionError:
return
await self._write_single_line(writer, message) await self._write_single_line(writer, message)
async def _read_single_line(self, reader) -> str: async def _read_single_line(self, reader: asyncio.StreamReader) -> str:
line = bytearray() line = bytearray()
while True: while True:
try: try:
line.extend(await reader.readuntil()) line.extend(await reader.readuntil())
except asyncio.LimitOverrunError: except asyncio.LimitOverrunError:
line.extend(await reader.read(reader._limit)) line.extend(await reader.read(reader._limit)) # type: ignore
continue continue
except asyncio.IncompleteReadError as err: except asyncio.IncompleteReadError as err:
line.extend(err.partial) line.extend(err.partial)
@@ -77,27 +84,27 @@ class UnixSocket:
return line.decode("utf-8") return line.decode("utf-8")
async def _write_single_line(self, writer, message : str): async def _write_single_line(self, writer: asyncio.StreamWriter, message : str):
if not message.endswith("\n"): if not message.endswith("\n"):
message += "\n" message += "\n"
writer.write(message.encode("utf-8")) writer.write(message.encode("utf-8"))
await writer.drain() await writer.drain()
async def _listen_for_method_call(self, reader, writer): async def _listen_for_method_call(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
while True: while True:
line = await self._read_single_line(reader) line = await self._read_single_line(reader)
try: try:
res = await self.on_new_message(line) res = await self.on_new_message(line)
except Exception as e: except Exception:
return return
if res != None: if res != None:
await self._write_single_line(writer, res) await self._write_single_line(writer, res)
class PortSocket (UnixSocket): class PortSocket (UnixSocket):
def __init__(self, on_new_message): def __init__(self, on_new_message: Callable[[str], Awaitable[str|None]]):
''' '''
on_new_message takes 1 string argument. on_new_message takes 1 string argument.
It's return value gets used, if not None, to write data to the socket. It's return value gets used, if not None, to write data to the socket.
@@ -125,7 +132,7 @@ class PortSocket (UnixSocket):
return True return True
if ON_WINDOWS: if ON_WINDOWS:
class LocalSocket (PortSocket): class LocalSocket (PortSocket): # type: ignore
pass pass
else: else:
class LocalSocket (UnixSocket): class LocalSocket (UnixSocket):
+192
View File
@@ -0,0 +1,192 @@
# Change PyInstaller files permissions
import sys
from typing import Dict
from .localplatform import (chmod, chown, service_stop, service_start,
ON_WINDOWS, get_log_level, get_live_reload,
get_server_port, get_server_host, get_chown_plugin_path,
get_privileged_path)
if hasattr(sys, '_MEIPASS'):
chmod(sys._MEIPASS, 755) # type: ignore
# Full imports
from asyncio import AbstractEventLoop, new_event_loop, set_event_loop, sleep
from logging import basicConfig, getLogger
from os import path
from traceback import format_exc
import multiprocessing
import aiohttp_cors # type: ignore
# Partial imports
from aiohttp import client_exceptions
from aiohttp.web import Application, Response, Request, get, run_app, static # type: ignore
from aiohttp_jinja2 import setup as jinja_setup
# local modules
from .browser import PluginBrowser
from .helpers import (REMOTE_DEBUGGER_UNIT, csrf_middleware, get_csrf_token,
mkdir_as_user, get_system_pythonpaths, get_effective_user_id)
from .injector import get_gamepadui_tab, Tab, close_old_tabs
from .loader import Loader
from .settings import SettingsManager
from .updater import Updater
from .utilities import Utilities
from .customtypes import UserType
basicConfig(
level=get_log_level(),
format="[%(module)s][%(levelname)s]: %(message)s"
)
logger = getLogger("Main")
plugin_path = path.join(get_privileged_path(), "plugins")
def chown_plugin_dir():
if not path.exists(plugin_path): # For safety, create the folder before attempting to do anything with it
mkdir_as_user(plugin_path)
if not chown(plugin_path, UserType.HOST_USER) or not chmod(plugin_path, 555):
logger.error(f"chown/chmod exited with a non-zero exit code")
if get_chown_plugin_path() == True:
chown_plugin_dir()
class PluginManager:
def __init__(self, loop: AbstractEventLoop) -> None:
self.loop = loop
self.web_app = Application()
self.web_app.middlewares.append(csrf_middleware)
self.cors = aiohttp_cors.setup(self.web_app, defaults={
"https://steamloopback.host": aiohttp_cors.ResourceOptions(
expose_headers="*",
allow_headers="*",
allow_credentials=True
)
})
self.plugin_loader = Loader(self, plugin_path, self.loop, get_live_reload())
self.settings = SettingsManager("loader", path.join(get_privileged_path(), "settings"))
self.plugin_browser = PluginBrowser(plugin_path, self.plugin_loader.plugins, self.plugin_loader, self.settings)
self.utilities = Utilities(self)
self.updater = Updater(self)
jinja_setup(self.web_app)
async def startup(_: Application):
if self.settings.getSetting("cef_forward", False):
self.loop.create_task(service_start(REMOTE_DEBUGGER_UNIT))
else:
self.loop.create_task(service_stop(REMOTE_DEBUGGER_UNIT))
self.loop.create_task(self.loader_reinjector())
self.loop.create_task(self.load_plugins())
self.web_app.on_startup.append(startup)
self.loop.set_exception_handler(self.exception_handler)
self.web_app.add_routes([get("/auth/token", self.get_auth_token)])
for route in list(self.web_app.router.routes()):
self.cors.add(route) # type: ignore
self.web_app.add_routes([static("/static", path.join(path.dirname(__file__), '..', 'static'))])
self.web_app.add_routes([static("/legacy", path.join(path.dirname(__file__), 'legacy'))])
def exception_handler(self, loop: AbstractEventLoop, context: Dict[str, str]):
if context["message"] == "Unclosed connection":
return
loop.default_exception_handler(context)
async def get_auth_token(self, request: Request):
return Response(text=get_csrf_token())
async def load_plugins(self):
# await self.wait_for_server()
logger.debug("Loading plugins")
self.plugin_loader.import_plugins()
# await inject_to_tab("SP", "window.syncDeckyPlugins();")
if self.settings.getSetting("pluginOrder", None) == None:
self.settings.setSetting("pluginOrder", list(self.plugin_loader.plugins.keys()))
logger.debug("Did not find pluginOrder setting, set it to default")
async def loader_reinjector(self):
while True:
tab = None
nf = False
dc = False
while not tab:
try:
tab = await get_gamepadui_tab()
except (client_exceptions.ClientConnectorError, client_exceptions.ServerDisconnectedError):
if not dc:
logger.debug("Couldn't connect to debugger, waiting...")
dc = True
pass
except ValueError:
if not nf:
logger.debug("Couldn't find GamepadUI tab, waiting...")
nf = True
pass
if not tab:
await sleep(5)
await tab.open_websocket()
await tab.enable()
await self.inject_javascript(tab, True)
try:
async for msg in tab.listen_for_message():
# this gets spammed a lot
if msg.get("method", None) != "Page.navigatedWithinDocument":
logger.debug("Page event: " + str(msg.get("method", None)))
if msg.get("method", None) == "Page.domContentEventFired":
if not await tab.has_global_var("deckyHasLoaded", False):
await self.inject_javascript(tab)
if msg.get("method", None) == "Inspector.detached":
logger.info("CEF has requested that we detach.")
await tab.close_websocket()
break
# If this is a forceful disconnect the loop will just stop without any failure message. In this case, injector.py will handle this for us so we don't need to close the socket.
# This is because of https://github.com/aio-libs/aiohttp/blob/3ee7091b40a1bc58a8d7846e7878a77640e96996/aiohttp/client_ws.py#L321
logger.info("CEF has disconnected...")
# At this point the loop starts again and we connect to the freshly started Steam client once it is ready.
except Exception:
logger.error("Exception while reading page events " + format_exc())
await tab.close_websocket()
pass
# while True:
# await sleep(5)
# if not await tab.has_global_var("deckyHasLoaded", False):
# logger.info("Plugin loader isn't present in Steam anymore, reinjecting...")
# await self.inject_javascript(tab)
async def inject_javascript(self, tab: Tab, first: bool=False, request: Request|None=None):
logger.info("Loading Decky frontend!")
try:
if first:
if await tab.has_global_var("deckyHasLoaded", False):
await close_old_tabs()
await tab.evaluate_js("try{if (window.deckyHasLoaded){setTimeout(() => location.reload(), 100)}else{window.deckyHasLoaded = true;(async()=>{try{while(!window.SP_REACT){await new Promise(r => setTimeout(r, 10))};await import('http://localhost:1337/frontend/index.js')}catch(e){console.error(e)};})();}}catch(e){console.error(e)}", False, False, False)
except:
logger.info("Failed to inject JavaScript into tab\n" + format_exc())
pass
def run(self):
return run_app(self.web_app, host=get_server_host(), port=get_server_port(), loop=self.loop, access_log=None)
def main():
if ON_WINDOWS:
# Fix windows/flask not recognising that .js means 'application/javascript'
import mimetypes
mimetypes.add_type('application/javascript', '.js')
# Required for multiprocessing support in frozen files
multiprocessing.freeze_support()
else:
if get_effective_user_id() != 0:
logger.warning(f"decky is running as an unprivileged user, this is not officially supported and may cause issues")
# Append the loader's plugin path to the recognized python paths
sys.path.append(path.join(path.dirname(__file__), "plugin"))
# Append the system and user python paths
sys.path.extend(get_system_pythonpaths())
loop = new_event_loop()
set_event_loop(loop)
PluginManager(loop).run()
+17 -13
View File
@@ -1,7 +1,6 @@
import multiprocessing import multiprocessing
from asyncio import (Lock, get_event_loop, new_event_loop, from asyncio import (Lock, get_event_loop, new_event_loop,
set_event_loop, sleep) set_event_loop, sleep)
from concurrent.futures import ProcessPoolExecutor
from importlib.util import module_from_spec, spec_from_file_location from importlib.util import module_from_spec, spec_from_file_location
from json import dumps, load, loads from json import dumps, load, loads
from logging import getLogger from logging import getLogger
@@ -9,19 +8,19 @@ from traceback import format_exc
from os import path, environ from os import path, environ
from signal import SIGINT, signal from signal import SIGINT, signal
from sys import exit, path as syspath from sys import exit, path as syspath
from time import time from typing import Any, Dict
from localsocket import LocalSocket from .localsocket import LocalSocket
from localplatform import setgid, setuid, get_username, get_home_path from .localplatform import setgid, setuid, get_username, get_home_path
from customtypes import UserType from .customtypes import UserType
import helpers from . import helpers
class PluginWrapper: class PluginWrapper:
def __init__(self, file, plugin_directory, plugin_path) -> None: def __init__(self, file: str, plugin_directory: str, plugin_path: str) -> None:
self.file = file self.file = file
self.plugin_path = plugin_path self.plugin_path = plugin_path
self.plugin_directory = plugin_directory self.plugin_directory = plugin_directory
self.method_call_lock = Lock() self.method_call_lock = Lock()
self.socket = LocalSocket(self._on_new_message) self.socket: LocalSocket = LocalSocket(self._on_new_message)
self.version = None self.version = None
@@ -73,14 +72,17 @@ class PluginWrapper:
helpers.mkdir_as_user(environ["DECKY_PLUGIN_LOG_DIR"]) helpers.mkdir_as_user(environ["DECKY_PLUGIN_LOG_DIR"])
environ["DECKY_PLUGIN_DIR"] = path.join(self.plugin_path, self.plugin_directory) environ["DECKY_PLUGIN_DIR"] = path.join(self.plugin_path, self.plugin_directory)
environ["DECKY_PLUGIN_NAME"] = self.name environ["DECKY_PLUGIN_NAME"] = self.name
environ["DECKY_PLUGIN_VERSION"] = self.version if self.version:
environ["DECKY_PLUGIN_VERSION"] = self.version
environ["DECKY_PLUGIN_AUTHOR"] = self.author environ["DECKY_PLUGIN_AUTHOR"] = self.author
# append the plugin's `py_modules` to the recognized python paths # append the plugin's `py_modules` to the recognized python paths
syspath.append(path.join(environ["DECKY_PLUGIN_DIR"], "py_modules")) syspath.append(path.join(environ["DECKY_PLUGIN_DIR"], "py_modules"))
spec = spec_from_file_location("_", self.file) spec = spec_from_file_location("_", self.file)
assert spec is not None
module = module_from_spec(spec) module = module_from_spec(spec)
assert spec.loader is not None
spec.loader.exec_module(module) spec.loader.exec_module(module)
self.Plugin = module.Plugin self.Plugin = module.Plugin
@@ -118,7 +120,8 @@ class PluginWrapper:
get_event_loop().close() get_event_loop().close()
raise Exception("Closing message listener") raise Exception("Closing message listener")
d = {"res": None, "success": True} # TODO there is definitely a better way to type this
d: Dict[str, Any] = {"res": None, "success": True}
try: try:
d["res"] = await getattr(self.Plugin, data["method"])(self.Plugin, **data["args"]) d["res"] = await getattr(self.Plugin, data["method"])(self.Plugin, **data["args"])
except Exception as e: except Exception as e:
@@ -137,17 +140,18 @@ class PluginWrapper:
if self.passive: if self.passive:
return return
async def _(self): async def _(self: PluginWrapper):
await self.socket.write_single_line(dumps({ "stop": True }, ensure_ascii=False)) await self.socket.write_single_line(dumps({ "stop": True }, ensure_ascii=False))
await self.socket.close_socket_connection() await self.socket.close_socket_connection()
get_event_loop().create_task(_(self)) get_event_loop().create_task(_(self))
async def execute_method(self, method_name, kwargs): async def execute_method(self, method_name: str, kwargs: Dict[Any, Any]):
if self.passive: if self.passive:
raise RuntimeError("This plugin is passive (aka does not implement main.py)") raise RuntimeError("This plugin is passive (aka does not implement main.py)")
async with self.method_call_lock: async with self.method_call_lock:
reader, writer = await self.socket.get_socket_connection() # reader, writer =
await self.socket.get_socket_connection()
await self.socket.write_single_line(dumps({ "method": method_name, "args": kwargs }, ensure_ascii=False)) await self.socket.write_single_line(dumps({ "method": method_name, "args": kwargs }, ensure_ascii=False))
@@ -1,13 +1,14 @@
from json import dump, load from json import dump, load
from os import mkdir, path, listdir, rename from os import mkdir, path, listdir, rename
from localplatform import chown, folder_owner, get_chown_plugin_path from typing import Any, Dict
from customtypes import UserType from .localplatform import chown, folder_owner, get_chown_plugin_path
from .customtypes import UserType
from helpers import get_homebrew_path from .helpers import get_homebrew_path
class SettingsManager: class SettingsManager:
def __init__(self, name, settings_directory = None) -> None: def __init__(self, name: str, settings_directory: str | None = None) -> None:
wrong_dir = get_homebrew_path() wrong_dir = get_homebrew_path()
if settings_directory == None: if settings_directory == None:
settings_directory = path.join(wrong_dir, "settings") settings_directory = path.join(wrong_dir, "settings")
@@ -31,11 +32,11 @@ class SettingsManager:
if folder_owner(settings_directory) != expected_user: if folder_owner(settings_directory) != expected_user:
chown(settings_directory, expected_user, False) chown(settings_directory, expected_user, False)
self.settings = {} self.settings: Dict[str, Any] = {}
try: try:
open(self.path, "x", encoding="utf-8") open(self.path, "x", encoding="utf-8")
except FileExistsError as e: except FileExistsError as _:
self.read() self.read()
pass pass
@@ -51,9 +52,9 @@ class SettingsManager:
with open(self.path, "w+", encoding="utf-8") as file: with open(self.path, "w+", encoding="utf-8") as file:
dump(self.settings, file, indent=4, ensure_ascii=False) dump(self.settings, file, indent=4, ensure_ascii=False)
def getSetting(self, key, default=None): def getSetting(self, key: str, default: Any = None) -> Any:
return self.settings.get(key, default) return self.settings.get(key, default)
def setSetting(self, key, value): def setSetting(self, key: str, value: Any) -> Any:
self.settings[key] = value self.settings[key] = value
self.commit() self.commit()
+28 -12
View File
@@ -1,23 +1,33 @@
from __future__ import annotations
import os import os
import shutil import shutil
import uuid
from asyncio import sleep from asyncio import sleep
from ensurepip import version
from json.decoder import JSONDecodeError from json.decoder import JSONDecodeError
from logging import getLogger from logging import getLogger
from os import getcwd, path, remove from os import getcwd, path, remove
from localplatform import chmod, service_restart, ON_LINUX, get_keep_systemd_service, get_selinux from typing import TYPE_CHECKING, List, TypedDict
if TYPE_CHECKING:
from .main import PluginManager
from .localplatform import chmod, service_restart, ON_LINUX, get_keep_systemd_service, get_selinux
from aiohttp import ClientSession, web from aiohttp import ClientSession, web
import helpers from . import helpers
from injector import get_gamepadui_tab, inject_to_tab from .injector import get_gamepadui_tab
from settings import SettingsManager from .settings import SettingsManager
logger = getLogger("Updater") logger = getLogger("Updater")
class RemoteVerAsset(TypedDict):
name: str
browser_download_url: str
class RemoteVer(TypedDict):
tag_name: str
prerelease: bool
assets: List[RemoteVerAsset]
class Updater: class Updater:
def __init__(self, context) -> None: def __init__(self, context: PluginManager) -> None:
self.context = context self.context = context
self.settings = self.context.settings self.settings = self.context.settings
# Exposes updater methods to frontend # Exposes updater methods to frontend
@@ -28,8 +38,8 @@ class Updater:
"do_restart": self.do_restart, "do_restart": self.do_restart,
"check_for_updates": self.check_for_updates "check_for_updates": self.check_for_updates
} }
self.remoteVer = None self.remoteVer: RemoteVer | None = None
self.allRemoteVers = None self.allRemoteVers: List[RemoteVer] = []
self.localVer = helpers.get_loader_version() self.localVer = helpers.get_loader_version()
try: try:
@@ -44,7 +54,7 @@ class Updater:
]) ])
context.loop.create_task(self.version_reloader()) context.loop.create_task(self.version_reloader())
async def _handle_server_method_call(self, request): async def _handle_server_method_call(self, request: web.Request):
method_name = request.match_info["method_name"] method_name = request.match_info["method_name"]
try: try:
args = await request.json() args = await request.json()
@@ -52,7 +62,7 @@ class Updater:
args = {} args = {}
res = {} res = {}
try: try:
r = await self.updater_methods[method_name](**args) r = await self.updater_methods[method_name](**args) # type: ignore
res["result"] = r res["result"] = r
res["success"] = True res["success"] = True
except Exception as e: except Exception as e:
@@ -105,7 +115,7 @@ class Updater:
selectedBranch = self.get_branch(self.context.settings) selectedBranch = self.get_branch(self.context.settings)
async with ClientSession() as web: async with ClientSession() as web:
async with web.request("GET", "https://api.github.com/repos/SteamDeckHomebrew/decky-loader/releases", ssl=helpers.get_ssl_context()) as res: async with web.request("GET", "https://api.github.com/repos/SteamDeckHomebrew/decky-loader/releases", ssl=helpers.get_ssl_context()) as res:
remoteVersions = await res.json() remoteVersions: List[RemoteVer] = await res.json()
if selectedBranch == 0: if selectedBranch == 0:
logger.debug("release type: release") logger.debug("release type: release")
remoteVersions = list(filter(lambda ver: ver["tag_name"].startswith("v") and not ver["prerelease"] and not ver["tag_name"].find("-pre") > 0 and ver["tag_name"], remoteVersions)) remoteVersions = list(filter(lambda ver: ver["tag_name"].startswith("v") and not ver["prerelease"] and not ver["tag_name"].find("-pre") > 0 and ver["tag_name"], remoteVersions))
@@ -142,6 +152,12 @@ class Updater:
async def do_update(self): async def do_update(self):
logger.debug("Starting update.") logger.debug("Starting update.")
try:
assert self.remoteVer
except AssertionError:
logger.error("Unable to update as remoteVer is missing")
return
version = self.remoteVer["tag_name"] version = self.remoteVer["tag_name"]
download_url = None download_url = None
download_filename = "PluginLoader" if ON_LINUX else "PluginLoader.exe" download_filename = "PluginLoader" if ON_LINUX else "PluginLoader.exe"
@@ -1,26 +1,36 @@
from __future__ import annotations
from os import stat_result
import uuid import uuid
import os
from json.decoder import JSONDecodeError from json.decoder import JSONDecodeError
from os.path import splitext from os.path import splitext
import re import re
from traceback import format_exc from traceback import format_exc
from stat import FILE_ATTRIBUTE_HIDDEN from stat import FILE_ATTRIBUTE_HIDDEN # type: ignore
from asyncio import sleep, start_server, gather, open_connection from asyncio import StreamReader, StreamWriter, start_server, gather, open_connection
from aiohttp import ClientSession, web from aiohttp import ClientSession, web
from typing import TYPE_CHECKING, Callable, Coroutine, Dict, Any, List, TypedDict
from logging import getLogger from logging import getLogger
from injector import inject_to_tab, get_gamepadui_tab, close_old_tabs, get_tab
from pathlib import Path from pathlib import Path
from localplatform import ON_WINDOWS
import helpers from .browser import PluginInstallRequest, PluginInstallType
import subprocess if TYPE_CHECKING:
from localplatform import service_stop, service_start, get_home_path, get_username from .main import PluginManager
from .injector import inject_to_tab, get_gamepadui_tab, close_old_tabs, get_tab
from .localplatform import ON_WINDOWS
from . import helpers
from .localplatform import service_stop, service_start, get_home_path, get_username
class FilePickerObj(TypedDict):
file: Path
filest: stat_result
is_dir: bool
class Utilities: class Utilities:
def __init__(self, context) -> None: def __init__(self, context: PluginManager) -> None:
self.context = context self.context = context
self.util_methods = { self.util_methods: Dict[str, Callable[..., Coroutine[Any, Any, Any]]] = {
"ping": self.ping, "ping": self.ping,
"http_request": self.http_request, "http_request": self.http_request,
"install_plugin": self.install_plugin, "install_plugin": self.install_plugin,
@@ -53,7 +63,7 @@ class Utilities:
web.post("/methods/{method_name}", self._handle_server_method_call) web.post("/methods/{method_name}", self._handle_server_method_call)
]) ])
async def _handle_server_method_call(self, request): async def _handle_server_method_call(self, request: web.Request):
method_name = request.match_info["method_name"] method_name = request.match_info["method_name"]
try: try:
args = await request.json() args = await request.json()
@@ -69,7 +79,7 @@ class Utilities:
res["success"] = False res["success"] = False
return web.json_response(res) return web.json_response(res)
async def install_plugin(self, artifact="", name="No name", version="dev", hash=False, install_type=0): async def install_plugin(self, artifact: str="", name: str="No name", version: str="dev", hash: str="", install_type: PluginInstallType=PluginInstallType.INSTALL):
return await self.context.plugin_browser.request_plugin_install( return await self.context.plugin_browser.request_plugin_install(
artifact=artifact, artifact=artifact,
name=name, name=name,
@@ -78,21 +88,21 @@ class Utilities:
install_type=install_type install_type=install_type
) )
async def install_plugins(self, requests): async def install_plugins(self, requests: List[PluginInstallRequest]):
return await self.context.plugin_browser.request_multiple_plugin_installs( return await self.context.plugin_browser.request_multiple_plugin_installs(
requests=requests requests=requests
) )
async def confirm_plugin_install(self, request_id): async def confirm_plugin_install(self, request_id: str):
return await self.context.plugin_browser.confirm_plugin_install(request_id) return await self.context.plugin_browser.confirm_plugin_install(request_id)
def cancel_plugin_install(self, request_id): async def cancel_plugin_install(self, request_id: str):
return self.context.plugin_browser.cancel_plugin_install(request_id) return self.context.plugin_browser.cancel_plugin_install(request_id)
async def uninstall_plugin(self, name): async def uninstall_plugin(self, name: str):
return await self.context.plugin_browser.uninstall_plugin(name) return await self.context.plugin_browser.uninstall_plugin(name)
async def http_request(self, method="", url="", **kwargs): async def http_request(self, method: str="", url: str="", **kwargs: Any):
async with ClientSession() as web: async with ClientSession() as web:
res = await web.request(method, url, ssl=helpers.get_ssl_context(), **kwargs) res = await web.request(method, url, ssl=helpers.get_ssl_context(), **kwargs)
text = await res.text() text = await res.text()
@@ -102,12 +112,13 @@ class Utilities:
"body": text "body": text
} }
async def ping(self, **kwargs): async def ping(self, **kwargs: Any):
return "pong" return "pong"
async def execute_in_tab(self, tab, run_async, code): async def execute_in_tab(self, tab: str, run_async: bool, code: str):
try: try:
result = await inject_to_tab(tab, code, run_async) result = await inject_to_tab(tab, code, run_async)
assert result
if "exceptionDetails" in result["result"]: if "exceptionDetails" in result["result"]:
return { return {
"success": False, "success": False,
@@ -124,7 +135,7 @@ class Utilities:
"result": e "result": e
} }
async def inject_css_into_tab(self, tab, style): async def inject_css_into_tab(self, tab: str, style: str):
try: try:
css_id = str(uuid.uuid4()) css_id = str(uuid.uuid4())
@@ -138,7 +149,7 @@ class Utilities:
}})() }})()
""", False) """, False)
if "exceptionDetails" in result["result"]: if result and "exceptionDetails" in result["result"]:
return { return {
"success": False, "success": False,
"result": result["result"] "result": result["result"]
@@ -154,7 +165,7 @@ class Utilities:
"result": e "result": e
} }
async def remove_css_from_tab(self, tab, css_id): async def remove_css_from_tab(self, tab: str, css_id: str):
try: try:
result = await inject_to_tab(tab, result = await inject_to_tab(tab,
f""" f"""
@@ -166,7 +177,7 @@ class Utilities:
}})() }})()
""", False) """, False)
if "exceptionDetails" in result["result"]: if result and "exceptionDetails" in result["result"]:
return { return {
"success": False, "success": False,
"result": result "result": result
@@ -181,10 +192,10 @@ class Utilities:
"result": e "result": e
} }
async def get_setting(self, key, default): async def get_setting(self, key: str, default: Any):
return self.context.settings.getSetting(key, default) return self.context.settings.getSetting(key, default)
async def set_setting(self, key, value): async def set_setting(self, key: str, value: Any):
return self.context.settings.setSetting(key, value) return self.context.settings.setSetting(key, value)
async def allow_remote_debugging(self): async def allow_remote_debugging(self):
@@ -209,17 +220,18 @@ class Utilities:
if path == None: if path == None:
path = get_home_path() path = get_home_path()
path = Path(path).resolve() path_obj = Path(path).resolve()
files, folders = [], [] files: List[FilePickerObj] = []
folders: List[FilePickerObj] = []
#Resolving all files/folders in the requested directory #Resolving all files/folders in the requested directory
for file in path.iterdir(): for file in path_obj.iterdir():
if file.exists(): if file.exists():
filest = file.stat() filest = file.stat()
is_hidden = file.name.startswith('.') is_hidden = file.name.startswith('.')
if ON_WINDOWS and not is_hidden: if ON_WINDOWS and not is_hidden:
is_hidden = bool(filest.st_file_attributes & FILE_ATTRIBUTE_HIDDEN) is_hidden = bool(filest.st_file_attributes & FILE_ATTRIBUTE_HIDDEN) # type: ignore
if include_folders and file.is_dir(): if include_folders and file.is_dir():
if (is_hidden and include_hidden) or not is_hidden: if (is_hidden and include_hidden) or not is_hidden:
folders.append({"file": file, "filest": filest, "is_dir": True}) folders.append({"file": file, "filest": filest, "is_dir": True})
@@ -233,9 +245,9 @@ class Utilities:
if filter_for is not None: if filter_for is not None:
try: try:
if re.compile(filter_for): if re.compile(filter_for):
files = filter(lambda file: re.search(filter_for, file.name) != None, files) files = list(filter(lambda file: re.search(filter_for, file["file"].name) != None, files))
except re.error: except re.error:
files = filter(lambda file: file.name.find(filter_for) != -1, files) files = list(filter(lambda file: file["file"].name.find(filter_for) != -1, files))
# Ordering logic # Ordering logic
ord_arg = order_by.split("_") ord_arg = order_by.split("_")
@@ -255,6 +267,9 @@ class Utilities:
files.sort(key=lambda x: x['filest'].st_size, reverse = not rev) files.sort(key=lambda x: x['filest'].st_size, reverse = not rev)
# Folders has no file size, order by name instead # Folders has no file size, order by name instead
folders.sort(key=lambda x: x['file'].name.casefold()) folders.sort(key=lambda x: x['file'].name.casefold())
case _:
files.sort(key=lambda x: x['file'].name.casefold(), reverse = rev)
folders.sort(key=lambda x: x['file'].name.casefold(), reverse = rev)
#Constructing the final file list, folders first #Constructing the final file list, folders first
all = [{ all = [{
@@ -274,14 +289,14 @@ class Utilities:
# Based on https://stackoverflow.com/a/46422554/13174603 # Based on https://stackoverflow.com/a/46422554/13174603
def start_rdt_proxy(self, ip, port): def start_rdt_proxy(self, ip: str, port: int):
async def pipe(reader, writer): async def pipe(reader: StreamReader, writer: StreamWriter):
try: try:
while not reader.at_eof(): while not reader.at_eof():
writer.write(await reader.read(2048)) writer.write(await reader.read(2048))
finally: finally:
writer.close() writer.close()
async def handle_client(local_reader, local_writer): async def handle_client(local_reader: StreamReader, local_writer: StreamWriter):
try: try:
remote_reader, remote_writer = await open_connection( remote_reader, remote_writer = await open_connection(
ip, port) ip, port)
@@ -295,9 +310,10 @@ class Utilities:
self.rdt_proxy_task = self.context.loop.create_task(self.rdt_proxy_server) self.rdt_proxy_task = self.context.loop.create_task(self.rdt_proxy_server)
def stop_rdt_proxy(self): def stop_rdt_proxy(self):
if self.rdt_proxy_server: if self.rdt_proxy_server != None:
self.rdt_proxy_server.close() self.rdt_proxy_server.close()
self.rdt_proxy_task.cancel() if self.rdt_proxy_task:
self.rdt_proxy_task.cancel()
async def _enable_rdt(self): async def _enable_rdt(self):
# TODO un-hardcode port # TODO un-hardcode port
@@ -347,11 +363,11 @@ class Utilities:
await tab.evaluate_js("location.reload();", False, True, False) await tab.evaluate_js("location.reload();", False, True, False)
self.logger.info("React DevTools disabled") self.logger.info("React DevTools disabled")
async def get_user_info(self) -> dict: async def get_user_info(self) -> Dict[str, str]:
return { return {
"username": get_username(), "username": get_username(),
"path": get_home_path() "path": get_home_path()
} }
async def get_tab_id(self, name): async def get_tab_id(self, name: str):
return (await get_tab(name)).id return (await get_tab(name)).id
@@ -13,7 +13,7 @@ import {
} from 'decky-frontend-lib'; } from 'decky-frontend-lib';
import { filesize } from 'filesize'; import { filesize } from 'filesize';
import { FunctionComponent, useCallback, useEffect, useMemo, useState } from 'react'; import { FunctionComponent, useCallback, useEffect, useMemo, useState } from 'react';
import { FileIcon, defaultStyles } from 'react-file-icon'; import { DefaultExtensionType, FileIcon, defaultStyles } from 'react-file-icon';
import { useTranslation } from 'react-i18next'; import { useTranslation } from 'react-i18next';
import { FaArrowUp, FaFolder } from 'react-icons/fa'; import { FaArrowUp, FaFolder } from 'react-icons/fa';
@@ -316,7 +316,12 @@ const FilePicker: FunctionComponent<FilePickerProps> = ({
) : ( ) : (
<div style={iconStyles}> <div style={iconStyles}>
{file.realpath.includes('.') ? ( {file.realpath.includes('.') ? (
<FileIcon {...defaultStyles[extension]} {...styleDefObj[extension]} extension={''} /> <FileIcon
{...defaultStyles[extension as DefaultExtensionType]}
// @ts-expect-error
{...styleDefObj[extension]}
extension={''}
/>
) : ( ) : (
<FileIcon /> <FileIcon />
)} )}
@@ -29,10 +29,10 @@ const BranchSelect: FunctionComponent<{}> = () => {
<Field label={t('BranchSelect.update_channel.label')} childrenContainerWidth={'fixed'}> <Field label={t('BranchSelect.update_channel.label')} childrenContainerWidth={'fixed'}>
<Dropdown <Dropdown
rgOptions={Object.values(UpdateBranch) rgOptions={Object.values(UpdateBranch)
.filter((branch) => typeof branch == 'string') .filter((branch) => typeof branch == 'number')
.map((branch) => ({ .map((branch) => ({
label: tBranches[UpdateBranch[branch]], label: tBranches[branch as number],
data: UpdateBranch[branch], data: branch,
}))} }))}
selectedOption={selectedBranch} selectedOption={selectedBranch}
onChange={async (newVal) => { onChange={async (newVal) => {
@@ -26,10 +26,10 @@ const StoreSelect: FunctionComponent<{}> = () => {
<Field label={t('StoreSelect.store_channel.label')} childrenContainerWidth={'fixed'}> <Field label={t('StoreSelect.store_channel.label')} childrenContainerWidth={'fixed'}>
<Dropdown <Dropdown
rgOptions={Object.values(Store) rgOptions={Object.values(Store)
.filter((store) => typeof store == 'string') .filter((store) => typeof store == 'number')
.map((store) => ({ .map((store) => ({
label: tStores[Store[store]], label: tStores[store as number],
data: Store[store], data: store,
}))} }))}
selectedOption={selectedStore} selectedOption={selectedStore}
onChange={async (newVal) => { onChange={async (newVal) => {
+2 -1
View File
@@ -38,7 +38,8 @@ export async function getStore(): Promise<Store> {
export async function getPluginList(): Promise<StorePlugin[]> { export async function getPluginList(): Promise<StorePlugin[]> {
let version = await window.DeckyPluginLoader.updateVersion(); let version = await window.DeckyPluginLoader.updateVersion();
let store = await getSetting<Store>('store', null); let store = await getSetting<Store | null>('store', null);
let customURL = await getSetting<string>('store-url', 'https://plugins.deckbrew.xyz/plugins'); let customURL = await getSetting<string>('store-url', 'https://plugins.deckbrew.xyz/plugins');
let storeURL; let storeURL;
if (store === null) { if (store === null) {
-1
View File
@@ -14,7 +14,6 @@
"noImplicitThis": true, "noImplicitThis": true,
"noImplicitAny": true, "noImplicitAny": true,
"strict": true, "strict": true,
"suppressImplicitAnyIndexErrors": true,
"allowSyntheticDefaultImports": true, "allowSyntheticDefaultImports": true,
"skipLibCheck": true, "skipLibCheck": true,
"resolveJsonModule": true "resolveJsonModule": true