Add message emit mechanism

This commit is contained in:
marios8543
2023-10-18 14:45:36 +03:00
parent dcee5ca4e4
commit d9ba637cd9
5 changed files with 54 additions and 31 deletions
+8
View File
@@ -185,6 +185,14 @@
"runpydeck" "runpydeck"
], ],
"problemMatcher": [] "problemMatcher": []
},
{
"label": "act",
"type": "shell",
"group": "none",
"detail": "Run the act thing",
"command": "./act/run-act.sh release",
"problemMatcher": []
} }
] ]
} }
+27 -14
View File
@@ -2,12 +2,13 @@ from asyncio import Task, create_task
from json import dumps, load, loads from json import dumps, load, loads
from logging import getLogger from logging import getLogger
from os import path from os import path
from multiprocessing import Process
from .sandboxed_plugin import SandboxedPlugin from .sandboxed_plugin import SandboxedPlugin
from .method_call_request import MethodCallRequest from .method_call_request import MethodCallRequest
from ..localplatform.localsocket import LocalSocket from ..localplatform.localsocket import LocalSocket
from typing import Any, Dict from typing import Any, Callable, Coroutine, Dict
class PluginWrapper: class PluginWrapper:
def __init__(self, file: str, plugin_directory: str, plugin_path: str) -> None: def __init__(self, file: str, plugin_directory: str, plugin_path: str) -> None:
@@ -28,40 +29,52 @@ class PluginWrapper:
self.passive = not path.isfile(self.file) self.passive = not path.isfile(self.file)
self.log = getLogger("plugin") self.log = getLogger("plugin")
self.method_call_requests: Dict[str, MethodCallRequest] = {}
self.sandboxed_plugin = SandboxedPlugin(self.name, self.passive, self.flags, self.file, self.plugin_directory, self.plugin_path, self.version, self.author) self.sandboxed_plugin = SandboxedPlugin(self.name, self.passive, self.flags, self.file, self.plugin_directory, self.plugin_path, self.version, self.author)
#TODO: Maybe somehow make LocalSocket not require on_new_message to make this more clear #TODO: Maybe make LocalSocket not require on_new_message to make this cleaner
self.socket = LocalSocket(self.sandboxed_plugin.on_new_message) self._socket = LocalSocket(self.sandboxed_plugin.on_new_message)
self.listener_task: Task[Any] self._listener_task: Task[Any]
self._method_call_requests: Dict[str, MethodCallRequest] = {}
self.emitted_message_callback: Callable[[Dict[Any, Any]], Coroutine[Any, Any, Any]]
def __str__(self) -> str: def __str__(self) -> str:
return self.name return self.name
async def _response_listener(self): async def _response_listener(self):
while True: while True:
line = await self.socket.read_single_line() line = await self._socket.read_single_line()
if line != None: if line != None:
res = loads(line) res = loads(line)
self.method_call_requests.pop(res["id"]).set_result(res) if res["id"] == 0:
create_task(self.emitted_message_callback(res["payload"]))
return
self._method_call_requests.pop(res["id"]).set_result(res)
async def set_emitted_message_callback(self, callback: Callable[[Dict[Any, Any]], Coroutine[Any, Any, Any]]):
self.emitted_message_callback = callback
async def execute_method(self, method_name: str, kwargs: Dict[Any, Any]): async def execute_method(self, method_name: str, kwargs: Dict[Any, Any]):
if self.passive: if self.passive:
raise RuntimeError("This plugin is passive (aka does not implement main.py)") raise RuntimeError("This plugin is passive (aka does not implement main.py)")
request = MethodCallRequest() request = MethodCallRequest()
await self.socket.get_socket_connection() await self._socket.get_socket_connection()
await self.socket.write_single_line(dumps({ "method": method_name, "args": kwargs, "id": request.id }, ensure_ascii=False)) await self._socket.write_single_line(dumps({ "method": method_name, "args": kwargs, "id": request.id }, ensure_ascii=False))
self.method_call_requests[request.id] = request self._method_call_requests[request.id] = request
return await request.wait_for_result() return await request.wait_for_result()
async def start(self): async def start(self):
self.sandboxed_plugin.start(self.socket) if self.passive:
return self
Process(target=self.sandboxed_plugin.initialize, args=[self._socket]).start()
self.listener_task = create_task(self._response_listener()) self.listener_task = create_task(self._response_listener())
return self
async def stop(self): async def stop(self):
self.listener_task.cancel() self._listener_task.cancel()
async def _(self: PluginWrapper): async def _(self: PluginWrapper):
await self.socket.write_single_line(dumps({ "stop": True }, ensure_ascii=False)) await self._socket.write_single_line(dumps({ "stop": True }, ensure_ascii=False))
await self.socket.close_socket_connection() await self._socket.close_socket_connection()
create_task(_(self)) create_task(_(self))
+12 -10
View File
@@ -3,7 +3,6 @@ from signal import SIGINT, signal
from importlib.util import module_from_spec, spec_from_file_location from importlib.util import module_from_spec, spec_from_file_location
from json import dumps, loads from json import dumps, loads
from logging import getLogger from logging import getLogger
import multiprocessing
from sys import exit, path as syspath from sys import exit, path as syspath
from traceback import format_exc from traceback import format_exc
from asyncio import (get_event_loop, new_event_loop, from asyncio import (get_event_loop, new_event_loop,
@@ -15,7 +14,7 @@ from ..localplatform.localplatform import setgid, setuid, get_username, get_home
from ..customtypes import UserType from ..customtypes import UserType
from .. import helpers from .. import helpers
from typing import List from typing import Any, Dict, List
class SandboxedPlugin: class SandboxedPlugin:
def __init__(self, def __init__(self,
@@ -38,7 +37,9 @@ class SandboxedPlugin:
self.log = getLogger("plugin") self.log = getLogger("plugin")
def _init(self, socket: LocalSocket): def initialize(self, socket: LocalSocket):
self._socket = socket
try: try:
signal(SIGINT, lambda s, f: exit(0)) signal(SIGINT, lambda s, f: exit(0))
@@ -79,6 +80,9 @@ class SandboxedPlugin:
spec.loader.exec_module(module) spec.loader.exec_module(module)
self.Plugin = module.Plugin self.Plugin = module.Plugin
setattr(self.Plugin, "emit_message", self.emit_message)
#TODO: Find how to put emit_message on global namespace so it doesn't pollute Plugin
if hasattr(self.Plugin, "_migration"): if hasattr(self.Plugin, "_migration"):
get_event_loop().run_until_complete(self.Plugin._migration(self.Plugin)) get_event_loop().run_until_complete(self.Plugin._migration(self.Plugin))
if hasattr(self.Plugin, "_main"): if hasattr(self.Plugin, "_main"):
@@ -113,7 +117,6 @@ class SandboxedPlugin:
get_event_loop().close() get_event_loop().close()
raise Exception("Closing message listener") raise Exception("Closing message listener")
# TODO there is definitely a better way to type this
d: SocketResponseDict = {"res": None, "success": True, "id": data["id"]} d: SocketResponseDict = {"res": None, "success": True, "id": data["id"]}
try: try:
d["res"] = await getattr(self.Plugin, data["method"])(self.Plugin, **data["args"]) d["res"] = await getattr(self.Plugin, data["method"])(self.Plugin, **data["args"])
@@ -123,9 +126,8 @@ class SandboxedPlugin:
finally: finally:
return dumps(d, ensure_ascii=False) return dumps(d, ensure_ascii=False)
async def emit_message(self, message: Dict[Any, Any]):
def start(self, socket: LocalSocket): await self._socket.write_single_line(dumps({
if self.passive: "id": 0,
return self "payload": message
multiprocessing.Process(target=self._init, args=[socket]).start() }))
return self
BIN
View File
Binary file not shown.
+7 -7
View File
@@ -44,7 +44,7 @@ export async function getPluginList(): Promise<StorePlugin[]> {
if (store === null) { if (store === null) {
console.log('Could not get store, using Default.'); console.log('Could not get store, using Default.');
await setSetting('store', Store.Default); await setSetting('store', Store.Default);
store = Store.Default store = Store.Default;
} }
switch (+store) { switch (+store) {
case Store.Default: case Store.Default:
@@ -60,12 +60,12 @@ export async function getPluginList(): Promise<StorePlugin[]> {
console.error('Somehow you ended up without a standard URL, using the default URL.'); console.error('Somehow you ended up without a standard URL, using the default URL.');
storeURL = 'https://plugins.deckbrew.xyz/plugins'; storeURL = 'https://plugins.deckbrew.xyz/plugins';
break; break;
return fetch(storeURL, { return fetch(storeURL, {
method: 'GET', method: 'GET',
headers: { headers: {
'X-Decky-Version': version.current, 'X-Decky-Version': version.current,
}, },
}).then((r) => r.json()); }).then((r) => r.json());
} }
switch (+store) { switch (+store) {
case Store.Default: case Store.Default: