mirror of
https://github.com/SteamDeckHomebrew/decky-loader.git
synced 2026-06-13 17:13:45 +03:00
54 lines
2.0 KiB
Python
54 lines
2.0 KiB
Python
from asyncio import StreamReader, create_task, sleep, create_subprocess_exec
|
|
from asyncio.subprocess import Process
|
|
from subprocess import PIPE
|
|
|
|
from .sandboxed_plugin import SandboxedPlugin
|
|
from ..localplatform.localsocket import LocalSocket
|
|
from ..customtypes import UserType
|
|
|
|
from typing import Dict, List
|
|
|
|
class BinaryPlugin(SandboxedPlugin):
|
|
def __init__(self,
|
|
socket: LocalSocket,
|
|
name: str,
|
|
flags: List[str],
|
|
file: str,
|
|
plugin_directory: str,
|
|
plugin_path: str,
|
|
version: str | None,
|
|
author: str,
|
|
env: Dict[str, str]) -> None:
|
|
super().__init__(socket, name, flags, file, plugin_directory, plugin_path, version, author, env)
|
|
self.process: Process
|
|
|
|
def start(self):
|
|
create_task(self._start())
|
|
|
|
async def stop(self):
|
|
self.process.terminate()
|
|
while not self.process.returncode:
|
|
await sleep(0)
|
|
|
|
async def _start(self):
|
|
self.env["DECKY_SOCKET"] = self.socket.socket_addr
|
|
user_type = UserType.ROOT.value if "root" in self.flags else UserType.HOST_USER.value
|
|
self.process = await create_subprocess_exec(self.file,
|
|
env=self.env,
|
|
user=user_type,
|
|
group=user_type,
|
|
stdout=PIPE,
|
|
stderr=PIPE)
|
|
assert self.process.stderr and self.process.stdout
|
|
create_task(self._stream_watcher(self.process.stdout, False))
|
|
create_task(self._stream_watcher(self.process.stderr, True))
|
|
|
|
async def _stream_watcher(self, stream: StreamReader, is_err: bool):
|
|
async for line in stream:
|
|
line = line.decode("utf-8")
|
|
if not line.strip():
|
|
continue
|
|
if is_err:
|
|
self.log.error(line)
|
|
else:
|
|
self.log.info(line) |