type hints on main,plugin,updater,utilites.localsocket

This commit is contained in:
marios8543
2023-09-18 00:31:54 +03:00
parent e2d708a6af
commit a7c358844c
5 changed files with 92 additions and 63 deletions
+43 -31
View File
@@ -1,3 +1,4 @@
from os import stat_result
import uuid
from json.decoder import JSONDecodeError
from os.path import splitext
@@ -5,12 +6,12 @@ import re
from traceback import format_exc
from stat import FILE_ATTRIBUTE_HIDDEN # type: ignore
from asyncio import start_server, gather, open_connection
from asyncio import StreamReader, StreamWriter, start_server, gather, open_connection
from aiohttp import ClientSession, web
from typing import Dict
from typing import Callable, Coroutine, Dict, Any, List, TypedDict
from logging import getLogger
from backend.browser import PluginInstallType
from backend.browser import PluginInstallRequest, PluginInstallType
from backend.main import PluginManager
from injector import inject_to_tab, get_gamepadui_tab, close_old_tabs, get_tab
from pathlib import Path
@@ -18,10 +19,15 @@ from localplatform import ON_WINDOWS
import helpers
from localplatform import service_stop, service_start, get_home_path, get_username
class FilePickerObj(TypedDict):
file: Path
filest: stat_result
is_dir: bool
class Utilities:
def __init__(self, context: PluginManager) -> None:
self.context = context
self.util_methods: Dict[] = {
self.util_methods: Dict[str, Callable[..., Coroutine[Any, Any, Any]]] = {
"ping": self.ping,
"http_request": self.http_request,
"install_plugin": self.install_plugin,
@@ -54,7 +60,7 @@ class Utilities:
web.post("/methods/{method_name}", self._handle_server_method_call)
])
async def _handle_server_method_call(self, request):
async def _handle_server_method_call(self, request: web.Request):
method_name = request.match_info["method_name"]
try:
args = await request.json()
@@ -70,7 +76,7 @@ class Utilities:
res["success"] = False
return web.json_response(res)
async def install_plugin(self, artifact="", name="No name", version="dev", hash=False, install_type=PluginInstallType.INSTALL):
async def install_plugin(self, artifact: str="", name: str="No name", version: str="dev", hash: str="", install_type: PluginInstallType=PluginInstallType.INSTALL):
return await self.context.plugin_browser.request_plugin_install(
artifact=artifact,
name=name,
@@ -79,21 +85,21 @@ class Utilities:
install_type=install_type
)
async def install_plugins(self, requests):
async def install_plugins(self, requests: List[PluginInstallRequest]):
return await self.context.plugin_browser.request_multiple_plugin_installs(
requests=requests
)
async def confirm_plugin_install(self, request_id):
async def confirm_plugin_install(self, request_id: str):
return await self.context.plugin_browser.confirm_plugin_install(request_id)
def cancel_plugin_install(self, request_id):
async def cancel_plugin_install(self, request_id: str):
return self.context.plugin_browser.cancel_plugin_install(request_id)
async def uninstall_plugin(self, name):
async def uninstall_plugin(self, name: str):
return await self.context.plugin_browser.uninstall_plugin(name)
async def http_request(self, method="", url="", **kwargs):
async def http_request(self, method: str="", url: str="", **kwargs: Any):
async with ClientSession() as web:
res = await web.request(method, url, ssl=helpers.get_ssl_context(), **kwargs)
text = await res.text()
@@ -103,12 +109,13 @@ class Utilities:
"body": text
}
async def ping(self, **kwargs):
async def ping(self, **kwargs: Any):
return "pong"
async def execute_in_tab(self, tab, run_async, code):
async def execute_in_tab(self, tab: str, run_async: bool, code: str):
try:
result = await inject_to_tab(tab, code, run_async)
assert result
if "exceptionDetails" in result["result"]:
return {
"success": False,
@@ -125,7 +132,7 @@ class Utilities:
"result": e
}
async def inject_css_into_tab(self, tab, style):
async def inject_css_into_tab(self, tab: str, style: str):
try:
css_id = str(uuid.uuid4())
@@ -139,7 +146,7 @@ class Utilities:
}})()
""", False)
if "exceptionDetails" in result["result"]:
if result and "exceptionDetails" in result["result"]:
return {
"success": False,
"result": result["result"]
@@ -155,7 +162,7 @@ class Utilities:
"result": e
}
async def remove_css_from_tab(self, tab, css_id):
async def remove_css_from_tab(self, tab: str, css_id: str):
try:
result = await inject_to_tab(tab,
f"""
@@ -167,7 +174,7 @@ class Utilities:
}})()
""", False)
if "exceptionDetails" in result["result"]:
if result and "exceptionDetails" in result["result"]:
return {
"success": False,
"result": result
@@ -182,10 +189,10 @@ class Utilities:
"result": e
}
async def get_setting(self, key, default):
async def get_setting(self, key: str, default: Any):
return self.context.settings.getSetting(key, default)
async def set_setting(self, key, value):
async def set_setting(self, key: str, value: Any):
return self.context.settings.setSetting(key, value)
async def allow_remote_debugging(self):
@@ -210,17 +217,18 @@ class Utilities:
if path == None:
path = get_home_path()
path = Path(path).resolve()
path_obj = Path(path).resolve()
files, folders = [], []
files: List[FilePickerObj] = []
folders: List[FilePickerObj] = []
#Resolving all files/folders in the requested directory
for file in path.iterdir():
for file in path_obj.iterdir():
if file.exists():
filest = file.stat()
is_hidden = file.name.startswith('.')
if ON_WINDOWS and not is_hidden:
is_hidden = bool(filest.st_file_attributes & FILE_ATTRIBUTE_HIDDEN)
is_hidden = bool(filest.st_file_attributes & FILE_ATTRIBUTE_HIDDEN) # type: ignore
if include_folders and file.is_dir():
if (is_hidden and include_hidden) or not is_hidden:
folders.append({"file": file, "filest": filest, "is_dir": True})
@@ -234,9 +242,9 @@ class Utilities:
if filter_for is not None:
try:
if re.compile(filter_for):
files = filter(lambda file: re.search(filter_for, file.name) != None, files)
files = list(filter(lambda file: re.search(filter_for, file["file"].name) != None, files))
except re.error:
files = filter(lambda file: file.name.find(filter_for) != -1, files)
files = list(filter(lambda file: file["file"].name.find(filter_for) != -1, files))
# Ordering logic
ord_arg = order_by.split("_")
@@ -256,6 +264,9 @@ class Utilities:
files.sort(key=lambda x: x['filest'].st_size, reverse = not rev)
# Folders has no file size, order by name instead
folders.sort(key=lambda x: x['file'].name.casefold())
case _:
files.sort(key=lambda x: x['file'].name.casefold(), reverse = rev)
folders.sort(key=lambda x: x['file'].name.casefold(), reverse = rev)
#Constructing the final file list, folders first
all = [{
@@ -275,14 +286,14 @@ class Utilities:
# Based on https://stackoverflow.com/a/46422554/13174603
def start_rdt_proxy(self, ip, port):
async def pipe(reader, writer):
def start_rdt_proxy(self, ip: str, port: int):
async def pipe(reader: StreamReader, writer: StreamWriter):
try:
while not reader.at_eof():
writer.write(await reader.read(2048))
finally:
writer.close()
async def handle_client(local_reader, local_writer):
async def handle_client(local_reader: StreamReader, local_writer: StreamWriter):
try:
remote_reader, remote_writer = await open_connection(
ip, port)
@@ -298,7 +309,8 @@ class Utilities:
def stop_rdt_proxy(self):
if self.rdt_proxy_server:
self.rdt_proxy_server.close()
self.rdt_proxy_task.cancel()
if self.rdt_proxy_task:
self.rdt_proxy_task.cancel()
async def _enable_rdt(self):
# TODO un-hardcode port
@@ -348,11 +360,11 @@ class Utilities:
await tab.evaluate_js("location.reload();", False, True, False)
self.logger.info("React DevTools disabled")
async def get_user_info(self) -> dict:
async def get_user_info(self) -> Dict[str, str]:
return {
"username": get_username(),
"path": get_home_path()
}
async def get_tab_id(self, name):
async def get_tab_id(self, name: str):
return (await get_tab(name)).id