mirror of
https://github.com/SteamDeckHomebrew/decky-loader.git
synced 2026-06-12 08:33:34 +03:00
* Fix bad debug print statement in uninstall plugin * Safely remove element from hidden plugin/plugin order list
254 lines
11 KiB
Python
254 lines
11 KiB
Python
# Full imports
|
|
import json
|
|
# import pprint
|
|
# from pprint import pformat
|
|
|
|
# Partial imports
|
|
from aiohttp import ClientSession, web
|
|
from asyncio import get_event_loop, sleep
|
|
from concurrent.futures import ProcessPoolExecutor
|
|
from hashlib import sha256
|
|
from io import BytesIO
|
|
from logging import getLogger
|
|
from os import R_OK, W_OK, path, rename, listdir, access, mkdir
|
|
from shutil import rmtree
|
|
from time import time
|
|
from zipfile import ZipFile
|
|
from localplatform import chown, chmod
|
|
|
|
# Local modules
|
|
from helpers import get_ssl_context, download_remote_binary_to_path
|
|
from injector import get_gamepadui_tab
|
|
|
|
logger = getLogger("Browser")
|
|
|
|
class PluginInstallContext:
|
|
def __init__(self, artifact, name, version, hash) -> None:
|
|
self.artifact = artifact
|
|
self.name = name
|
|
self.version = version
|
|
self.hash = hash
|
|
|
|
class PluginBrowser:
|
|
def __init__(self, plugin_path, plugins, loader, settings) -> None:
|
|
self.plugin_path = plugin_path
|
|
self.plugins = plugins
|
|
self.loader = loader
|
|
self.settings = settings
|
|
self.install_requests = {}
|
|
|
|
def _unzip_to_plugin_dir(self, zip, name, hash):
|
|
zip_hash = sha256(zip.getbuffer()).hexdigest()
|
|
if hash and (zip_hash != hash):
|
|
return False
|
|
zip_file = ZipFile(zip)
|
|
zip_file.extractall(self.plugin_path)
|
|
plugin_dir = path.join(self.plugin_path, self.find_plugin_folder(name))
|
|
|
|
if not chown(plugin_dir) or not chmod(plugin_dir, 555):
|
|
logger.error(f"chown/chmod exited with a non-zero exit code")
|
|
return False
|
|
return True
|
|
|
|
async def _download_remote_binaries_for_plugin_with_name(self, pluginBasePath):
|
|
rv = False
|
|
try:
|
|
packageJsonPath = path.join(pluginBasePath, 'package.json')
|
|
pluginBinPath = path.join(pluginBasePath, 'bin')
|
|
|
|
if access(packageJsonPath, R_OK):
|
|
with open(packageJsonPath, "r", encoding="utf-8") as f:
|
|
packageJson = json.load(f)
|
|
if "remote_binary" in packageJson and len(packageJson["remote_binary"]) > 0:
|
|
# create bin directory if needed.
|
|
chmod(pluginBasePath, 777)
|
|
if access(pluginBasePath, W_OK):
|
|
if not path.exists(pluginBinPath):
|
|
mkdir(pluginBinPath)
|
|
if not access(pluginBinPath, W_OK):
|
|
chmod(pluginBinPath, 777)
|
|
|
|
rv = True
|
|
for remoteBinary in packageJson["remote_binary"]:
|
|
# Required Fields. If any Remote Binary is missing these fail the install.
|
|
binName = remoteBinary["name"]
|
|
binURL = remoteBinary["url"]
|
|
binHash = remoteBinary["sha256hash"]
|
|
if not await download_remote_binary_to_path(binURL, binHash, path.join(pluginBinPath, binName)):
|
|
rv = False
|
|
raise Exception(f"Error Downloading Remote Binary {binName}@{binURL} with hash {binHash} to {path.join(pluginBinPath, binName)}")
|
|
|
|
chown(self.plugin_path)
|
|
chmod(pluginBasePath, 555)
|
|
else:
|
|
rv = True
|
|
logger.debug(f"No Remote Binaries to Download")
|
|
|
|
except Exception as e:
|
|
rv = False
|
|
logger.debug(str(e))
|
|
|
|
return rv
|
|
|
|
"""Return the filename (only) for the specified plugin"""
|
|
def find_plugin_folder(self, name):
|
|
for folder in listdir(self.plugin_path):
|
|
try:
|
|
with open(path.join(self.plugin_path, folder, 'plugin.json'), "r", encoding="utf-8") as f:
|
|
plugin = json.load(f)
|
|
|
|
if plugin['name'] == name:
|
|
return folder
|
|
except:
|
|
logger.debug(f"skipping {folder}")
|
|
|
|
async def uninstall_plugin(self, name):
|
|
if self.loader.watcher:
|
|
self.loader.watcher.disabled = True
|
|
tab = await get_gamepadui_tab()
|
|
plugin_dir = path.join(self.plugin_path, self.find_plugin_folder(name))
|
|
try:
|
|
logger.info("uninstalling " + name)
|
|
logger.info(" at dir " + plugin_dir)
|
|
logger.debug("calling frontend unload for %s" % str(name))
|
|
res = await tab.evaluate_js(f"DeckyPluginLoader.unloadPlugin('{name}')")
|
|
logger.debug("result of unload from UI: %s", res)
|
|
# plugins_snapshot = self.plugins.copy()
|
|
# snapshot_string = pformat(plugins_snapshot)
|
|
# logger.debug("current plugins: %s", snapshot_string)
|
|
if name in self.plugins:
|
|
logger.debug("Plugin %s was found", name)
|
|
self.plugins[name].stop()
|
|
logger.debug("Plugin %s was stopped", name)
|
|
del self.plugins[name]
|
|
logger.debug("Plugin %s was removed from the dictionary", name)
|
|
self.cleanup_plugin_settings(name)
|
|
logger.debug("removing files %s" % str(name))
|
|
rmtree(plugin_dir)
|
|
except FileNotFoundError:
|
|
logger.warning(f"Plugin {name} not installed, skipping uninstallation")
|
|
except Exception as e:
|
|
logger.error(f"Plugin {name} in {plugin_dir} was not uninstalled")
|
|
logger.error(f"Error at {str(e)}", exc_info=e)
|
|
if self.loader.watcher:
|
|
self.loader.watcher.disabled = False
|
|
|
|
async def _install(self, artifact, name, version, hash):
|
|
# Will be set later in code
|
|
res_zip = None
|
|
|
|
# Check if plugin is installed
|
|
isInstalled = False
|
|
if self.loader.watcher:
|
|
self.loader.watcher.disabled = True
|
|
try:
|
|
pluginFolderPath = self.find_plugin_folder(name)
|
|
if pluginFolderPath:
|
|
isInstalled = True
|
|
except:
|
|
logger.error(f"Failed to determine if {name} is already installed, continuing anyway.")
|
|
|
|
# Check if the file is a local file or a URL
|
|
if artifact.startswith("file://"):
|
|
logger.info(f"Installing {name} from local ZIP file (Version: {version})")
|
|
res_zip = BytesIO(open(artifact[7:], "rb").read())
|
|
else:
|
|
logger.info(f"Installing {name} from URL (Version: {version})")
|
|
async with ClientSession() as client:
|
|
logger.debug(f"Fetching {artifact}")
|
|
res = await client.get(artifact, ssl=get_ssl_context())
|
|
if res.status == 200:
|
|
logger.debug("Got 200. Reading...")
|
|
data = await res.read()
|
|
logger.debug(f"Read {len(data)} bytes")
|
|
res_zip = BytesIO(data)
|
|
else:
|
|
logger.fatal(f"Could not fetch from URL. {await res.text()}")
|
|
|
|
# Check to make sure we got the file
|
|
if res_zip is None:
|
|
logger.fatal(f"Could not fetch {artifact}")
|
|
return
|
|
|
|
# If plugin is installed, uninstall it
|
|
if isInstalled:
|
|
try:
|
|
logger.debug("Uninstalling existing plugin...")
|
|
await self.uninstall_plugin(name)
|
|
except:
|
|
logger.error(f"Plugin {name} could not be uninstalled.")
|
|
|
|
# Install the plugin
|
|
logger.debug("Unzipping...")
|
|
ret = self._unzip_to_plugin_dir(res_zip, name, hash)
|
|
if ret:
|
|
plugin_folder = self.find_plugin_folder(name)
|
|
plugin_dir = path.join(self.plugin_path, plugin_folder)
|
|
ret = await self._download_remote_binaries_for_plugin_with_name(plugin_dir)
|
|
if ret:
|
|
logger.info(f"Installed {name} (Version: {version})")
|
|
if name in self.loader.plugins:
|
|
self.loader.plugins[name].stop()
|
|
self.loader.plugins.pop(name, None)
|
|
await sleep(1)
|
|
|
|
current_plugin_order = self.settings.getSetting("pluginOrder")
|
|
current_plugin_order.append(name)
|
|
self.settings.setSetting("pluginOrder", current_plugin_order)
|
|
logger.debug("Plugin %s was added to the pluginOrder setting", name)
|
|
self.loader.import_plugin(path.join(plugin_dir, "main.py"), plugin_folder)
|
|
else:
|
|
logger.fatal(f"Failed Downloading Remote Binaries")
|
|
else:
|
|
self.log.fatal(f"SHA-256 Mismatch!!!! {name} (Version: {version})")
|
|
if self.loader.watcher:
|
|
self.loader.watcher.disabled = False
|
|
|
|
async def request_plugin_install(self, artifact, name, version, hash, install_type):
|
|
request_id = str(time())
|
|
self.install_requests[request_id] = PluginInstallContext(artifact, name, version, hash)
|
|
tab = await get_gamepadui_tab()
|
|
await tab.open_websocket()
|
|
await tab.evaluate_js(f"DeckyPluginLoader.addPluginInstallPrompt('{name}', '{version}', '{request_id}', '{hash}', {install_type})")
|
|
|
|
async def request_multiple_plugin_installs(self, requests):
|
|
request_id = str(time())
|
|
self.install_requests[request_id] = [PluginInstallContext(req['artifact'], req['name'], req['version'], req['hash']) for req in requests]
|
|
js_requests_parameter = ','.join([
|
|
f"{{ name: '{req['name']}', version: '{req['version']}', hash: '{req['hash']}', install_type: {req['install_type']}}}" for req in requests
|
|
])
|
|
|
|
tab = await get_gamepadui_tab()
|
|
await tab.open_websocket()
|
|
await tab.evaluate_js(f"DeckyPluginLoader.addMultiplePluginsInstallPrompt('{request_id}', [{js_requests_parameter}])")
|
|
|
|
async def confirm_plugin_install(self, request_id):
|
|
requestOrRequests = self.install_requests.pop(request_id)
|
|
if isinstance(requestOrRequests, list):
|
|
[await self._install(req.artifact, req.name, req.version, req.hash) for req in requestOrRequests]
|
|
else:
|
|
await self._install(requestOrRequests.artifact, requestOrRequests.name, requestOrRequests.version, requestOrRequests.hash)
|
|
|
|
def cancel_plugin_install(self, request_id):
|
|
self.install_requests.pop(request_id)
|
|
|
|
def cleanup_plugin_settings(self, name):
|
|
"""Removes any settings related to a plugin. Propably called when a plugin is uninstalled.
|
|
|
|
Args:
|
|
name (string): The name of the plugin
|
|
"""
|
|
hidden_plugins = self.settings.getSetting("hiddenPlugins", [])
|
|
if name in hidden_plugins:
|
|
hidden_plugins.remove(name)
|
|
self.settings.setSetting("hiddenPlugins", hidden_plugins)
|
|
|
|
|
|
plugin_order = self.settings.getSetting("pluginOrder", [])
|
|
|
|
if name in plugin_order:
|
|
plugin_order.remove(name)
|
|
self.settings.setSetting("pluginOrder", plugin_order)
|
|
|
|
logger.debug("Removed any settings for plugin %s", name)
|