mirror of
https://github.com/SteamDeckHomebrew/decky-loader.git
synced 2026-06-17 08:47:49 +00:00
begin adding static types to backend code
This commit is contained in:
+50
-34
@@ -2,10 +2,9 @@
|
||||
|
||||
from asyncio import sleep
|
||||
from logging import getLogger
|
||||
from traceback import format_exc
|
||||
from typing import List
|
||||
from typing import Any, Callable, List, TypedDict, Dict
|
||||
|
||||
from aiohttp import ClientSession, WSMsgType
|
||||
from aiohttp import ClientSession
|
||||
from aiohttp.client_exceptions import ClientConnectorError, ClientOSError
|
||||
from asyncio.exceptions import TimeoutError
|
||||
import uuid
|
||||
@@ -14,35 +13,43 @@ BASE_ADDRESS = "http://localhost:8080"
|
||||
|
||||
logger = getLogger("Injector")
|
||||
|
||||
class _TabResponse(TypedDict):
|
||||
title: str
|
||||
id: str
|
||||
url: str
|
||||
webSocketDebuggerUrl: str
|
||||
|
||||
class Tab:
|
||||
cmd_id = 0
|
||||
|
||||
def __init__(self, res) -> None:
|
||||
self.title = res["title"]
|
||||
self.id = res["id"]
|
||||
self.url = res["url"]
|
||||
self.ws_url = res["webSocketDebuggerUrl"]
|
||||
def __init__(self, res: _TabResponse) -> None:
|
||||
self.title: str = res["title"]
|
||||
self.id: str = res["id"]
|
||||
self.url: str = res["url"]
|
||||
self.ws_url: str = res["webSocketDebuggerUrl"]
|
||||
|
||||
self.websocket = None
|
||||
self.client = None
|
||||
|
||||
async def open_websocket(self):
|
||||
self.client = ClientSession()
|
||||
self.websocket = await self.client.ws_connect(self.ws_url)
|
||||
self.websocket = await self.client.ws_connect(self.ws_url) # type: ignore
|
||||
|
||||
async def close_websocket(self):
|
||||
await self.websocket.close()
|
||||
await self.client.close()
|
||||
if self.websocket:
|
||||
await self.websocket.close()
|
||||
if self.client:
|
||||
await self.client.close()
|
||||
|
||||
async def listen_for_message(self):
|
||||
async for message in self.websocket:
|
||||
data = message.json()
|
||||
yield data
|
||||
logger.warn(f"The Tab {self.title} socket has been disconnected while listening for messages.")
|
||||
await self.close_websocket()
|
||||
if self.websocket:
|
||||
async for message in self.websocket:
|
||||
data = message.json()
|
||||
yield data
|
||||
logger.warn(f"The Tab {self.title} socket has been disconnected while listening for messages.")
|
||||
await self.close_websocket()
|
||||
|
||||
async def _send_devtools_cmd(self, dc, receive=True):
|
||||
async def _send_devtools_cmd(self, dc: Dict[str, Any], receive: bool = True):
|
||||
if self.websocket:
|
||||
self.cmd_id += 1
|
||||
dc["id"] = self.cmd_id
|
||||
@@ -54,7 +61,7 @@ class Tab:
|
||||
return None
|
||||
raise RuntimeError("Websocket not opened")
|
||||
|
||||
async def evaluate_js(self, js, run_async=False, manage_socket=True, get_result=True):
|
||||
async def evaluate_js(self, js: str, run_async: bool | None = False, manage_socket: bool | None = True, get_result: bool = True):
|
||||
try:
|
||||
if manage_socket:
|
||||
await self.open_websocket()
|
||||
@@ -73,15 +80,16 @@ class Tab:
|
||||
await self.close_websocket()
|
||||
return res
|
||||
|
||||
async def has_global_var(self, var_name, manage_socket=True):
|
||||
async def has_global_var(self, var_name: str, manage_socket: bool = True):
|
||||
res = await self.evaluate_js(f"window['{var_name}'] !== null && window['{var_name}'] !== undefined", False, manage_socket)
|
||||
assert res is not None
|
||||
|
||||
if not "result" in res or not "result" in res["result"] or not "value" in res["result"]["result"]:
|
||||
return False
|
||||
|
||||
return res["result"]["result"]["value"]
|
||||
|
||||
async def close(self, manage_socket=True):
|
||||
async def close(self, manage_socket: bool = True):
|
||||
try:
|
||||
if manage_socket:
|
||||
await self.open_websocket()
|
||||
@@ -111,7 +119,7 @@ class Tab:
|
||||
"method": "Page.disable",
|
||||
}, False)
|
||||
|
||||
async def refresh(self, manage_socket=True):
|
||||
async def refresh(self, manage_socket: bool = True):
|
||||
try:
|
||||
if manage_socket:
|
||||
await self.open_websocket()
|
||||
@@ -125,7 +133,7 @@ class Tab:
|
||||
await self.close_websocket()
|
||||
|
||||
return
|
||||
async def reload_and_evaluate(self, js, manage_socket=True):
|
||||
async def reload_and_evaluate(self, js: str, manage_socket: bool = True):
|
||||
"""
|
||||
Reloads the current tab, with JS to run on load via debugger
|
||||
"""
|
||||
@@ -153,11 +161,13 @@ class Tab:
|
||||
}
|
||||
}, True)
|
||||
|
||||
assert breakpoint_res is not None
|
||||
|
||||
logger.info(breakpoint_res)
|
||||
|
||||
# Page finishes loading when breakpoint hits
|
||||
|
||||
for x in range(20):
|
||||
for _ in range(20):
|
||||
# this works around 1/5 of the time, so just send it 8 times.
|
||||
# the js accounts for being injected multiple times allowing only one instance to run at a time anyway
|
||||
await self._send_devtools_cmd({
|
||||
@@ -176,7 +186,7 @@ class Tab:
|
||||
}
|
||||
}, False)
|
||||
|
||||
for x in range(4):
|
||||
for _ in range(4):
|
||||
await self._send_devtools_cmd({
|
||||
"method": "Debugger.resume"
|
||||
}, False)
|
||||
@@ -190,7 +200,7 @@ class Tab:
|
||||
await self.close_websocket()
|
||||
return
|
||||
|
||||
async def add_script_to_evaluate_on_new_document(self, js, add_dom_wrapper=True, manage_socket=True, get_result=True):
|
||||
async def add_script_to_evaluate_on_new_document(self, js: str, add_dom_wrapper: bool = True, manage_socket: bool = True, get_result: bool = True):
|
||||
"""
|
||||
How the underlying call functions is not particularly clear from the devtools docs, so stealing puppeteer's description:
|
||||
|
||||
@@ -253,7 +263,7 @@ class Tab:
|
||||
await self.close_websocket()
|
||||
return res
|
||||
|
||||
async def remove_script_to_evaluate_on_new_document(self, script_id, manage_socket=True):
|
||||
async def remove_script_to_evaluate_on_new_document(self, script_id: str, manage_socket: bool = True):
|
||||
"""
|
||||
Removes a script from a page that was added with `add_script_to_evaluate_on_new_document`
|
||||
|
||||
@@ -267,7 +277,7 @@ class Tab:
|
||||
if manage_socket:
|
||||
await self.open_websocket()
|
||||
|
||||
res = await self._send_devtools_cmd({
|
||||
await self._send_devtools_cmd({
|
||||
"method": "Page.removeScriptToEvaluateOnNewDocument",
|
||||
"params": {
|
||||
"identifier": script_id
|
||||
@@ -278,15 +288,16 @@ class Tab:
|
||||
if manage_socket:
|
||||
await self.close_websocket()
|
||||
|
||||
async def has_element(self, element_name, manage_socket=True):
|
||||
async def has_element(self, element_name: str, manage_socket: bool = True):
|
||||
res = await self.evaluate_js(f"document.getElementById('{element_name}') != null", False, manage_socket)
|
||||
assert res is not None
|
||||
|
||||
if not "result" in res or not "result" in res["result"] or not "value" in res["result"]["result"]:
|
||||
return False
|
||||
|
||||
return res["result"]["result"]["value"]
|
||||
|
||||
async def inject_css(self, style, manage_socket=True):
|
||||
async def inject_css(self, style: str, manage_socket: bool = True):
|
||||
try:
|
||||
css_id = str(uuid.uuid4())
|
||||
|
||||
@@ -300,6 +311,8 @@ class Tab:
|
||||
}})()
|
||||
""", False, manage_socket)
|
||||
|
||||
assert result is not None
|
||||
|
||||
if "exceptionDetails" in result["result"]:
|
||||
return {
|
||||
"success": False,
|
||||
@@ -316,7 +329,7 @@ class Tab:
|
||||
"result": e
|
||||
}
|
||||
|
||||
async def remove_css(self, css_id, manage_socket=True):
|
||||
async def remove_css(self, css_id: str, manage_socket: bool = True):
|
||||
try:
|
||||
result = await self.evaluate_js(
|
||||
f"""
|
||||
@@ -328,6 +341,8 @@ class Tab:
|
||||
}})()
|
||||
""", False, manage_socket)
|
||||
|
||||
assert result is not None
|
||||
|
||||
if "exceptionDetails" in result["result"]:
|
||||
return {
|
||||
"success": False,
|
||||
@@ -343,8 +358,9 @@ class Tab:
|
||||
"result": e
|
||||
}
|
||||
|
||||
async def get_steam_resource(self, url):
|
||||
async def get_steam_resource(self, url: str):
|
||||
res = await self.evaluate_js(f'(async function test() {{ return await (await fetch("{url}")).text() }})()', True)
|
||||
assert res is not None
|
||||
return res["result"]["result"]["value"]
|
||||
|
||||
def __repr__(self):
|
||||
@@ -380,14 +396,14 @@ async def get_tabs() -> List[Tab]:
|
||||
raise Exception(f"/json did not return 200. {await res.text()}")
|
||||
|
||||
|
||||
async def get_tab(tab_name) -> Tab:
|
||||
async def get_tab(tab_name: str) -> Tab:
|
||||
tabs = await get_tabs()
|
||||
tab = next((i for i in tabs if i.title == tab_name), None)
|
||||
if not tab:
|
||||
raise ValueError(f"Tab {tab_name} not found")
|
||||
return tab
|
||||
|
||||
async def get_tab_lambda(test) -> Tab:
|
||||
async def get_tab_lambda(test: Callable[[Tab], bool]) -> Tab:
|
||||
tabs = await get_tabs()
|
||||
tab = next((i for i in tabs if test(i)), None)
|
||||
if not tab:
|
||||
@@ -408,7 +424,7 @@ async def get_gamepadui_tab() -> Tab:
|
||||
raise ValueError(f"GamepadUI Tab not found")
|
||||
return tab
|
||||
|
||||
async def inject_to_tab(tab_name, js, run_async=False):
|
||||
async def inject_to_tab(tab_name: str, js: str, run_async: bool = False):
|
||||
tab = await get_tab(tab_name)
|
||||
|
||||
return await tab.evaluate_js(js, run_async)
|
||||
|
||||
Reference in New Issue
Block a user