mirror of
https://github.com/SteamDeckHomebrew/decky-loader.git
synced 2026-06-13 09:03:20 +03:00
197 lines
6.4 KiB
Python
197 lines
6.4 KiB
Python
import uuid
|
|
import os
|
|
from json.decoder import JSONDecodeError
|
|
|
|
from aiohttp import ClientSession, web
|
|
|
|
from injector import inject_to_tab
|
|
import helpers
|
|
import subprocess
|
|
|
|
|
|
class Utilities:
|
|
def __init__(self, context) -> None:
|
|
self.context = context
|
|
self.util_methods = {
|
|
"ping": self.ping,
|
|
"http_request": self.http_request,
|
|
"install_plugin": self.install_plugin,
|
|
"cancel_plugin_install": self.cancel_plugin_install,
|
|
"confirm_plugin_install": self.confirm_plugin_install,
|
|
"uninstall_plugin": self.uninstall_plugin,
|
|
"execute_in_tab": self.execute_in_tab,
|
|
"inject_css_into_tab": self.inject_css_into_tab,
|
|
"remove_css_from_tab": self.remove_css_from_tab,
|
|
"allow_remote_debugging": self.allow_remote_debugging,
|
|
"disallow_remote_debugging": self.disallow_remote_debugging,
|
|
"set_setting": self.set_setting,
|
|
"get_setting": self.get_setting,
|
|
"filepicker_ls": self.filepicker_ls
|
|
}
|
|
|
|
if context:
|
|
context.web_app.add_routes([
|
|
web.post("/methods/{method_name}", self._handle_server_method_call)
|
|
])
|
|
|
|
async def _handle_server_method_call(self, request):
    """Dispatch a POST to ``/methods/{method_name}`` to the matching utility method.

    The request body is parsed as JSON and expanded into keyword arguments
    for the method; a body that fails to decode is treated as "no
    arguments". The reply is always a JSON object with two keys:

    * ``success`` - ``True`` if the method returned, ``False`` if anything
      raised (unknown method name, bad arguments, or an error inside the
      method).
    * ``result`` - the method's return value on success, or ``str(exc)``
      on failure.
    """
    # Local import so this method does not rely on a file-level import.
    import inspect

    method_name = request.match_info["method_name"]
    try:
        args = await request.json()
    except JSONDecodeError:
        args = {}

    res = {}
    try:
        r = self.util_methods[method_name](**args)
        # Not every registered method is a coroutine function (for example
        # cancel_plugin_install is a plain ``def``). Awaiting a non-awaitable
        # return value would raise TypeError *after* the action already ran
        # and report a bogus failure, so only await when there is something
        # to await.
        if inspect.isawaitable(r):
            r = await r
        res["result"] = r
        res["success"] = True
    except Exception as e:
        # Boundary handler: every failure is reported to the client as text.
        res["result"] = str(e)
        res["success"] = False
    return web.json_response(res)
|
|
|
|
async def install_plugin(self, artifact="", name="No name", version="dev", hash=False):
    """Forward an install request to the context's plugin browser.

    All four values are passed through unchanged as keyword arguments to
    ``plugin_browser.request_plugin_install``; its awaited result is
    returned.
    """
    browser = self.context.plugin_browser
    install_details = dict(artifact=artifact, name=name, version=version, hash=hash)
    return await browser.request_plugin_install(**install_details)
|
|
|
|
async def confirm_plugin_install(self, request_id):
    """Confirm the pending install identified by ``request_id``.

    Delegates to ``plugin_browser.confirm_plugin_install`` on the context
    and returns its awaited result.
    """
    browser = self.context.plugin_browser
    outcome = await browser.confirm_plugin_install(request_id)
    return outcome
|
|
|
|
def cancel_plugin_install(self, request_id):
    """Cancel the pending install identified by ``request_id``.

    This is a plain (non-async) method: it calls
    ``plugin_browser.cancel_plugin_install`` on the context and returns
    whatever that call returns, without awaiting it.
    """
    browser = self.context.plugin_browser
    return browser.cancel_plugin_install(request_id)
|
|
|
|
async def uninstall_plugin(self, name):
    """Uninstall the plugin called ``name``.

    Delegates to ``plugin_browser.uninstall_plugin`` on the context and
    returns its awaited result.
    """
    browser = self.context.plugin_browser
    outcome = await browser.uninstall_plugin(name)
    return outcome
|
|
|
|
async def http_request(self, method="", url="", **kwargs):
    """Perform an HTTP request and return its status, headers and text body.

    A fresh ``ClientSession`` is opened for the call. ``method`` and ``url``
    are passed positionally to the session's ``request``; the SSL context
    comes from ``helpers.get_ssl_context()`` and any extra keyword
    arguments are forwarded unchanged.

    Returns a dict with keys ``status``, ``headers`` (copied into a plain
    dict) and ``body`` (the response text).
    """
    # Named ``session`` so the aiohttp ``web`` module is not shadowed here.
    async with ClientSession() as session:
        async with session.request(method, url, ssl=helpers.get_ssl_context(), **kwargs) as response:
            status_code = response.status
            header_map = dict(response.headers)
            body_text = await response.text()
            return {
                "status": status_code,
                "headers": header_map,
                "body": body_text,
            }
|
|
|
|
async def ping(self, **kwargs):
    """Liveness check: accepts and ignores any keyword arguments, returns "pong"."""
    reply = "pong"
    return reply
|
|
|
|
async def execute_in_tab(self, tab, run_async, code):
    """Run ``code`` in the given tab via ``inject_to_tab`` and report the outcome.

    Returns a dict with:

    * ``success`` - ``False`` if the injector response contains
      ``exceptionDetails`` or if the call itself raised; ``True`` otherwise.
    * ``result`` - on success, the ``value`` of the response's nested
      result (``None`` when absent); when the response carries
      ``exceptionDetails``, the response's ``result`` object; when an
      exception was raised, its message as a string.
    """
    try:
        result = await inject_to_tab(tab, code, run_async)

        if "exceptionDetails" in result["result"]:
            return {
                "success": False,
                "result": result["result"]
            }

        return {
            "success": True,
            "result": result["result"]["result"].get("value")
        }
    except Exception as e:
        # Return the message, not the exception object: this dict is sent
        # back through web.json_response, and an exception instance is not
        # JSON-serializable.
        return {
            "success": False,
            "result": str(e)
        }
|
|
|
|
async def inject_css_into_tab(self, tab, style):
    """Append a ``<style>`` element containing ``style`` to the tab's document head.

    A fresh UUID4 string is generated and used as the element's ``id`` so
    the style can be located again later.

    Returns a dict with:

    * ``success`` - ``False`` if the injector response contains
      ``exceptionDetails`` or if the call itself raised; ``True`` otherwise.
    * ``result`` - the generated element id on success; the response's
      ``result`` object when it carries ``exceptionDetails``; the exception
      message as a string when an exception was raised.
    """
    try:
        css_id = str(uuid.uuid4())

        # NOTE(review): ``style`` is interpolated verbatim into a JavaScript
        # template literal, so backticks, ``${...}`` and backslash escapes in
        # the CSS are interpreted by JavaScript. Left unchanged because
        # callers may depend on it.
        result = await inject_to_tab(tab,
            f"""
            (function() {{
                const style = document.createElement('style');
                style.id = "{css_id}";
                document.head.append(style);
                style.textContent = `{style}`;
            }})()
            """, False)

        if "exceptionDetails" in result["result"]:
            return {
                "success": False,
                "result": result["result"]
            }

        return {
            "success": True,
            "result": css_id
        }
    except Exception as e:
        # Return the message, not the exception object: this dict is sent
        # back through web.json_response, and an exception instance is not
        # JSON-serializable.
        return {
            "success": False,
            "result": str(e)
        }
|
|
|
|
async def remove_css_from_tab(self, tab, css_id):
    """Remove the ``<style>`` element whose id is ``css_id`` from the tab's document.

    The injected script looks the element up by id and detaches it from
    its parent only if its node name is ``style``.

    Returns a dict with:

    * ``success`` - ``False`` if the injector response contains
      ``exceptionDetails`` or if the call itself raised; ``True`` otherwise.
    * ``result`` - only present on failure: the whole injector response
      when it carries ``exceptionDetails``, or the exception message as a
      string when an exception was raised.
    """
    try:
        result = await inject_to_tab(tab,
            f"""
            (function() {{
                let style = document.getElementById("{css_id}");

                if (style.nodeName.toLowerCase() == 'style')
                    style.parentNode.removeChild(style);
            }})()
            """, False)

        if "exceptionDetails" in result["result"]:
            # NOTE(review): this returns the whole response, whereas the
            # sibling tab helpers return result["result"]. Kept as-is so the
            # shape callers already receive does not change.
            return {
                "success": False,
                "result": result
            }

        return {
            "success": True
        }
    except Exception as e:
        # Return the message, not the exception object: this dict is sent
        # back through web.json_response, and an exception instance is not
        # JSON-serializable.
        return {
            "success": False,
            "result": str(e)
        }
|
|
|
|
async def get_setting(self, key, default):
    """Return ``context.settings.getSetting(key, default)``."""
    settings = self.context.settings
    return settings.getSetting(key, default)
|
|
|
|
async def set_setting(self, key, value):
    """Return ``context.settings.setSetting(key, value)``."""
    settings = self.context.settings
    return settings.setSetting(key, value)
|
|
|
|
async def allow_remote_debugging(self):
    """Await ``helpers.start_systemd_unit`` for ``helpers.REMOTE_DEBUGGER_UNIT``.

    Returns ``True`` once the helper call has completed; any exception it
    raises propagates to the caller.
    """
    start_unit = helpers.start_systemd_unit
    await start_unit(helpers.REMOTE_DEBUGGER_UNIT)
    return True
|
|
|
|
async def disallow_remote_debugging(self):
    """Await ``helpers.stop_systemd_unit`` for ``helpers.REMOTE_DEBUGGER_UNIT``.

    Returns ``True`` once the helper call has completed; any exception it
    raises propagates to the caller.
    """
    stop_unit = helpers.stop_systemd_unit
    await stop_unit(helpers.REMOTE_DEBUGGER_UNIT)
    return True
|
|
|
|
async def filepicker_ls(self, path, include_files=True):
    """List the entries of directory ``path`` for the file picker.

    Entries are ordered by name using plain string sorting. Directories are
    always listed; other entries are listed only when ``include_files`` is
    truthy.

    Returns a dict with:

    * ``realpath`` - ``os.path.realpath(path)``.
    * ``files`` - one dict per listed entry with ``isdir`` (bool), ``name``
      (the entry name) and ``realpath`` (resolved path of the entry).

    Errors from ``os.listdir`` (for example a missing directory) propagate.
    """
    # TODO provide more sort options (e.g. newest first by modification time)
    listing = []

    for entry_name in sorted(os.listdir(path)):
        entry_path = os.path.join(path, entry_name)
        entry_is_dir = os.path.isdir(entry_path)

        # Non-directories are dropped when only directories were requested.
        if not entry_is_dir and not include_files:
            continue

        listing.append({
            "isdir": entry_is_dir,
            "name": entry_name,
            "realpath": os.path.realpath(entry_path),
        })

    return {
        "realpath": os.path.realpath(path),
        "files": listing,
    }
|