Files
decky-loader/backend/plugin.py
Philipp Richter f1e679c3fb Expose a 'decky_plugin' module to decky plugins (#353)
* Expose a 'decky_plugin' module to decky plugins

* expose decky user home path
* support 'py_modules' python modules in plugins
* allow for a '_migration' method in plugins to have an explicit file
  moving step

* Expose the plugin python module as .pyi stub interface

* Expose system and user python paths to plugins
2023-02-19 14:42:55 -08:00

204 lines
8.7 KiB
Python

import multiprocessing
from asyncio import (Lock, get_event_loop, new_event_loop,
open_unix_connection, set_event_loop, sleep,
start_unix_server, IncompleteReadError, LimitOverrunError)
from concurrent.futures import ProcessPoolExecutor
from importlib.util import module_from_spec, spec_from_file_location
from json import dumps, load, loads
from logging import getLogger
from traceback import format_exc
from os import path, setgid, setuid, environ
from signal import SIGINT, signal
from sys import exit, path as syspath
from time import time
import helpers
from updater import Updater
# Select the "fork" start method for every process created through
# multiprocessing in this module.
multiprocessing.set_start_method("fork")

# Stream buffer limit, in bytes, handed to the asyncio unix-socket
# server and client as their `limit` argument (1 MiB).
BUFFER_LIMIT = 1 << 20
class PluginWrapper:
    """Handle for a single plugin and entry point of its backend process.

    The constructor reads the plugin's ``plugin.json`` (and ``package.json``
    when present). ``start()`` spawns a separate process that runs ``_init``,
    which loads the plugin's Python file and serves method calls over a unix
    socket. ``execute_method()`` and ``stop()`` talk to that process through
    the socket using newline-terminated JSON messages.
    """

    # Number of connection attempts and delay (seconds) between attempts when
    # waiting for the plugin process to create its socket.
    _CONNECT_RETRIES = 10
    _CONNECT_RETRY_DELAY = 2

    def __init__(self, file, plugin_directory, plugin_path) -> None:
        """Read the plugin metadata from disk.

        Args:
            file: Path of the plugin's Python entry file. When it does not
                exist the plugin is treated as passive (no backend process).
            plugin_directory: Name of the plugin's directory inside
                ``plugin_path``.
            plugin_path: Directory that contains all plugin directories.

        Raises:
            OSError: ``plugin.json`` cannot be opened.
            KeyError: a required key (``name``, ``author``, ``flags`` or the
                ``version`` of ``package.json``) is missing.
        """
        self.file = file
        self.plugin_path = plugin_path
        self.plugin_directory = plugin_directory

        self.reader = None
        self.writer = None
        self.socket_addr = f"/tmp/plugin_socket_{time()}"
        self.method_call_lock = Lock()

        self.version = None

        plugin_root = path.join(plugin_path, plugin_directory)

        # Use context managers so the file handles are closed deterministically.
        with open(path.join(plugin_root, "plugin.json"), "r", encoding="utf-8") as manifest_file:
            manifest = load(manifest_file)

        package_json_path = path.join(plugin_root, "package.json")
        if path.isfile(package_json_path):
            with open(package_json_path, "r", encoding="utf-8") as package_file:
                self.version = load(package_file)["version"]

        self.main_view_html = manifest.get("main_view_html", "")
        self.tile_view_html = manifest.get("tile_view_html", "")
        # Truthy (the non-empty html value) when either view is declared.
        self.legacy = self.main_view_html or self.tile_view_html

        self.name = manifest["name"]
        self.author = manifest["author"]
        self.flags = manifest["flags"]

        self.log = getLogger("plugin")

        self.passive = not path.isfile(self.file)

    def __str__(self) -> str:
        """Return the plugin's name."""
        return self.name

    def _init(self):
        """Body of the plugin process: set up the environment and run the plugin.

        Switches group/user id, exports the ``DECKY_*`` environment variables,
        extends ``sys.path``, loads the plugin file, runs the optional
        ``_migration`` and ``_main`` hooks and then serves method calls until
        the event loop is stopped. Any error is logged and the process exits
        with status 0.
        """
        try:
            signal(SIGINT, lambda s, f: exit(0))

            loop = new_event_loop()
            set_event_loop(loop)
            if self.passive:
                return

            is_root = "root" in self.flags

            # The group must be changed before the user: after dropping the
            # user id the process may no longer be allowed to change groups.
            setgid(0 if is_root else helpers.get_user_group_id())
            setuid(0 if is_root else helpers.get_user_id())

            # export a bunch of environment variables to help plugin developers
            environ["HOME"] = helpers.get_home_path("root" if is_root else helpers.get_user())
            environ["USER"] = "root" if is_root else helpers.get_user()
            environ["DECKY_VERSION"] = helpers.get_loader_version()
            environ["DECKY_USER"] = helpers.get_user()
            environ["DECKY_USER_HOME"] = helpers.get_home_path()
            environ["DECKY_HOME"] = helpers.get_homebrew_path()
            environ["DECKY_PLUGIN_SETTINGS_DIR"] = path.join(environ["DECKY_HOME"], "settings", self.plugin_directory)
            helpers.mkdir_as_user(environ["DECKY_PLUGIN_SETTINGS_DIR"])
            environ["DECKY_PLUGIN_RUNTIME_DIR"] = path.join(environ["DECKY_HOME"], "data", self.plugin_directory)
            helpers.mkdir_as_user(environ["DECKY_PLUGIN_RUNTIME_DIR"])
            environ["DECKY_PLUGIN_LOG_DIR"] = path.join(environ["DECKY_HOME"], "logs", self.plugin_directory)
            helpers.mkdir_as_user(environ["DECKY_PLUGIN_LOG_DIR"])
            environ["DECKY_PLUGIN_DIR"] = path.join(self.plugin_path, self.plugin_directory)
            environ["DECKY_PLUGIN_NAME"] = self.name
            # os.environ only accepts strings; the version is None when the
            # plugin ships no package.json, so export an empty string then.
            environ["DECKY_PLUGIN_VERSION"] = self.version or ""
            environ["DECKY_PLUGIN_AUTHOR"] = self.author

            # append the loader's plugin path to the recognized python paths
            syspath.append(path.realpath(path.join(path.dirname(__file__), "plugin")))

            # append the plugin's `py_modules` to the recognized python paths
            syspath.append(path.join(environ["DECKY_PLUGIN_DIR"], "py_modules"))

            # append the system and user python paths
            syspath.extend(helpers.get_system_pythonpaths())

            spec = spec_from_file_location("_", self.file)
            module = module_from_spec(spec)
            spec.loader.exec_module(module)
            self.Plugin = module.Plugin

            # The migration hook runs to completion before anything else.
            if hasattr(self.Plugin, "_migration"):
                loop.run_until_complete(self.Plugin._migration(self.Plugin))
            if hasattr(self.Plugin, "_main"):
                loop.create_task(self.Plugin._main(self.Plugin))
            loop.create_task(self._setup_socket())
            loop.run_forever()
        except Exception:
            # `Exception` rather than a bare except: a SystemExit raised by the
            # SIGINT handler must not be reported as a start-up failure.
            self.log.error(f"Failed to start {self.name}!\n{format_exc()}")
            exit(0)

    async def _unload(self):
        """Call the plugin's optional ``_unload`` hook.

        A failure inside the hook is logged and the process exits with
        status 0.
        """
        try:
            self.log.info(f"Attempting to unload with plugin {self.name}'s \"_unload\" function.\n")
            if hasattr(self.Plugin, "_unload"):
                await self.Plugin._unload(self.Plugin)
                self.log.info(f"Unloaded {self.name}\n")
            else:
                self.log.info(f"Could not find \"_unload\" in {self.name}'s main.py\n")
        except Exception:
            self.log.error(f"Failed to unload {self.name}!\n{format_exc()}")
            exit(0)

    async def _setup_socket(self):
        """Start the unix-socket server that receives method calls."""
        self.socket = await start_unix_server(self._listen_for_method_call, path=self.socket_addr, limit=BUFFER_LIMIT)

    @staticmethod
    async def _read_line(reader) -> bytearray:
        """Read one newline-terminated message from ``reader``.

        Messages longer than the stream's buffer limit are collected in
        chunks. At end of stream the bytes received so far (possibly none)
        are returned.
        """
        line = bytearray()
        while True:
            try:
                line.extend(await reader.readuntil())
            except LimitOverrunError:
                # No separator within the limit yet: drain what is buffered
                # and keep looking for the end of the line.
                line.extend(await reader.read(reader._limit))
                continue
            except IncompleteReadError as err:
                line.extend(err.partial)
                break
            else:
                break
        return line

    async def _listen_for_method_call(self, reader, writer):
        """Serve one client connection inside the plugin process.

        Each request is a JSON object on one line. ``{"stop": ...}`` unloads
        the plugin and stops the event loop. Any other request must carry
        ``method`` and ``args``; the reply is a JSON line of the form
        ``{"res": ..., "success": bool}``.
        """
        while True:
            line = await self._read_line(reader)
            if not line:
                # The peer closed the connection without sending anything
                # further; there is nothing to parse or answer.
                return

            data = loads(line.decode("utf-8"))

            if "stop" in data:
                self.log.info("Calling Loader unload function.")
                await self._unload()
                get_event_loop().stop()
                while get_event_loop().is_running():
                    await sleep(0)
                get_event_loop().close()
                return

            d = {"res": None, "success": True}
            try:
                d["res"] = await getattr(self.Plugin, data["method"])(self.Plugin, **data["args"])
            except Exception as e:
                d["res"] = str(e)
                d["success"] = False
            finally:
                try:
                    payload = dumps(d, ensure_ascii=False)
                except (TypeError, ValueError) as e:
                    # The plugin returned something JSON cannot represent.
                    # Report that as a failed call instead of dying here and
                    # leaving the caller waiting for a reply forever.
                    payload = dumps({"res": str(e), "success": False}, ensure_ascii=False)
                writer.write((payload + "\n").encode("utf-8"))
                await writer.drain()

    async def _open_socket_if_not_exists(self):
        """Connect to the plugin process unless a connection already exists.

        Returns:
            True when a connection is available, False when every attempt
            failed.
        """
        if self.reader:
            return True

        for _ in range(self._CONNECT_RETRIES):
            try:
                self.reader, self.writer = await open_unix_connection(self.socket_addr, limit=BUFFER_LIMIT)
                return True
            except Exception:
                # Best effort: the plugin process may not have created its
                # socket yet. `Exception` keeps task cancellation working.
                await sleep(self._CONNECT_RETRY_DELAY)
        return False

    def start(self):
        """Spawn the plugin process (no-op for passive plugins); return self."""
        if self.passive:
            return self
        multiprocessing.Process(target=self._init).start()
        return self

    def stop(self):
        """Schedule a stop request to the plugin process (no-op when passive)."""
        if self.passive:
            return

        async def _send_stop(self):
            if await self._open_socket_if_not_exists():
                self.writer.write((dumps({"stop": True}, ensure_ascii=False) + "\n").encode("utf-8"))
                await self.writer.drain()
                self.writer.close()

        get_event_loop().create_task(_send_stop(self))

    async def execute_method(self, method_name, kwargs):
        """Call ``method_name`` in the plugin process and return its result.

        Calls are serialized through ``method_call_lock`` so request and
        reply lines cannot interleave on the shared connection.

        Args:
            method_name: Name of the attribute of the plugin class to call.
            kwargs: Keyword arguments for the call (must be JSON-serializable).

        Returns:
            The value returned by the plugin method, or None when no
            connection to the plugin process could be established.

        Raises:
            RuntimeError: the plugin is passive.
            Exception: the plugin method failed; the message is the error
                text reported by the plugin process.
        """
        if self.passive:
            raise RuntimeError("This plugin is passive (aka does not implement main.py)")

        async with self.method_call_lock:
            if await self._open_socket_if_not_exists():
                request = dumps({"method": method_name, "args": kwargs}, ensure_ascii=False) + "\n"
                self.writer.write(request.encode("utf-8"))
                await self.writer.drain()

                line = await self._read_line(self.reader)
                res = loads(line.decode("utf-8"))
                if not res["success"]:
                    raise Exception(res["res"])
                return res["res"]