Files
decky-loader/backend/loader.py

169 lines
6.6 KiB
Python

from asyncio import Queue
from logging import getLogger
from os import listdir, path
from pathlib import Path
from time import time
from traceback import print_exc
from aiohttp import web
from aiohttp_jinja2 import template
from genericpath import exists
from watchdog.events import RegexMatchingEventHandler
from watchdog.observers.inotify import InotifyObserver as Observer
from injector import inject_to_tab
from plugin import PluginWrapper
class FileChangeHandler(RegexMatchingEventHandler):
def __init__(self, queue, plugin_path) -> None:
super().__init__(regexes=[r'^.*?dist\/index\.js$', r'^.*?main\.py$'])
self.logger = getLogger("file-watcher")
self.plugin_path = plugin_path
self.queue = queue
def maybe_reload(self, src_path):
plugin_dir = Path(path.relpath(src_path, self.plugin_path)).parts[0]
if exists(path.join(self.plugin_path, plugin_dir, "plugin.json")):
self.queue.put_nowait((path.join(self.plugin_path, plugin_dir, "main.py"), plugin_dir, True))
def on_created(self, event):
src_path = event.src_path
if "__pycache__" in src_path:
return
# check to make sure this isn't a directory
if path.isdir(src_path):
return
# get the directory name of the plugin so that we can find its "main.py" and reload it; the
# file that changed is not necessarily the one that needs to be reloaded
self.logger.debug(f"file created: {src_path}")
self.maybe_reload(src_path)
def on_modified(self, event):
src_path = event.src_path
if "__pycache__" in src_path:
return
# check to make sure this isn't a directory
if path.isdir(src_path):
return
# get the directory name of the plugin so that we can find its "main.py" and reload it; the
# file that changed is not necessarily the one that needs to be reloaded
self.logger.debug(f"file modified: {src_path}")
self.maybe_reload(src_path)
class Loader:
def __init__(self, server_instance, plugin_path, loop, live_reload=False) -> None:
self.loop = loop
self.logger = getLogger("Loader")
self.plugin_path = plugin_path
self.logger.info(f"plugin_path: {self.plugin_path}")
self.plugins = {}
self.import_plugins()
if live_reload:
self.reload_queue = Queue()
self.observer = Observer()
self.observer.schedule(FileChangeHandler(self.reload_queue, plugin_path), self.plugin_path, recursive=True)
self.observer.start()
self.loop.create_task(self.handle_reloads())
server_instance.add_routes([
web.get("/plugins", self.handle_plugins),
web.get("/plugins/{plugin_name}/frontend_bundle", self.handle_frontend_bundle),
web.post("/plugins/{plugin_name}/methods/{method_name}", self.handle_plugin_method_call),
web.get("/plugins/{plugin_name}/assets/{path:.*}", self.handle_frontend_assets),
web.post("/methods/{method_name}", self.handle_server_method_call)
])
def handle_plugins(self, request):
plugins = list(map(lambda kv: dict([("name", kv[0])]), self.plugins.items()))
return web.json_response(plugins)
def handle_frontend_assets(self, request):
plugin = self.plugins[request.match_info["plugin_name"]]
file = path.join(self.plugin_path, plugin.plugin_directory, "dist/assets", request.match_info["path"])
return web.FileResponse(file)
def handle_frontend_bundle(self, request):
plugin = self.plugins[request.match_info["plugin_name"]]
with open(path.join(self.plugin_path, plugin.plugin_directory, "dist/index.js"), 'r') as bundle:
return web.Response(text=bundle.read(), content_type="application/javascript")
def import_plugin(self, file, plugin_directory, refresh=False):
try:
plugin = PluginWrapper(file, plugin_directory, self.plugin_path)
if plugin.name in self.plugins:
if not "debug" in plugin.flags and refresh:
self.logger.info(f"Plugin {plugin.name} is already loaded and has requested to not be re-loaded")
return
else:
self.plugins[plugin.name].stop()
self.plugins.pop(plugin.name, None)
if plugin.passive:
self.logger.info(f"Plugin {plugin.name} is passive")
self.plugins[plugin.name] = plugin.start()
self.logger.info(f"Loaded {plugin.name}")
if refresh:
self.loop.create_task(self.reload_frontend_plugin(plugin.name))
except Exception as e:
self.logger.error(f"Could not load {file}. {e}")
print_exc()
async def reload_frontend_plugin(self, name):
await inject_to_tab("SP", f"window.DeckyPluginLoader?.loadPlugin('{name}')")
def import_plugins(self):
self.logger.info(f"import plugins from {self.plugin_path}")
directories = [i for i in listdir(self.plugin_path) if path.isdir(path.join(self.plugin_path, i)) and path.isfile(path.join(self.plugin_path, i, "plugin.json"))]
for directory in directories:
self.logger.info(f"found plugin: {directory}")
self.import_plugin(path.join(self.plugin_path, directory, "main.py"), directory)
async def handle_reloads(self):
while True:
args = await self.reload_queue.get()
self.import_plugin(*args)
async def handle_server_method_call(self, request):
method_name = request.match_info["method_name"]
method_info = await request.json()
args = method_info["args"]
res = {}
try:
r = await self.utilities.util_methods[method_name](**args)
res["result"] = r
res["success"] = True
except Exception as e:
res["result"] = str(e)
res["success"] = False
return web.json_response(res)
async def handle_plugin_method_call(self, request):
res = {}
plugin = self.plugins[request.match_info["plugin_name"]]
method_name = request.match_info["method_name"]
method_info = await request.json()
args = method_info["args"]
try:
if method_name.startswith("_"):
raise RuntimeError("Tried to call private method")
res["result"] = await plugin.execute_method(method_name, args)
res["success"] = True
except Exception as e:
res["result"] = str(e)
res["success"] = False
return web.json_response(res)