more progress on WS router

This commit is contained in:
AAGaming
2023-07-03 23:11:44 -04:00
committed by marios8543
parent 18d89e76fd
commit 05b41b3410
2 changed files with 160 additions and 18 deletions

View File

@@ -15,23 +15,24 @@ class MessageType(Enum):
CALL = 0
REPLY = 1
ERROR = 2
# Pub/sub
# # Pub/sub
# SUBSCRIBE = 3
# UNSUBSCRIBE = 4
# PUBLISH = 5
running_calls: Dict[str, Future] = {}
subscriptions: Dict[str, Callable[[Any]]]
# {type: MessageType, data: dta, id: id}
# see wsrouter.ts for typings
class WSRouter:
def __init__(self) -> None:
self.ws = None
self.req_id = 0
self.routes = {}
self.running_calls: Dict[int, Future] = {}
# self.subscriptions: Dict[str, Callable[[Any]]] = {}
self.logger = getLogger("WSRouter")
async def add_routes(self, routes):
self.routes.update(routes)
async def add_route(self, name, route):
self.routes[name] = route
async def handle(self, request):
self.logger.debug('Websocket connection starting')
@@ -39,6 +40,13 @@ class WSRouter:
await ws.prepare(request)
self.logger.debug('Websocket connection ready')
if self.ws != None:
try:
await self.ws.close()
except:
pass
self.ws = None
self.ws = ws
try:
@@ -56,19 +64,30 @@ class WSRouter:
data = msg.json()
if self.routes[data.route]:
try:
res = await self.routes[data.route](data.data)
await ws.send_json({type: MessageType.REPLY, id: data.id, data: res})
res = await self.routes[data.route](*data.args)
await self.write({"type": MessageType.REPLY, "id": data.id, "result": res})
self.logger.debug(f"Started PY call {data.route} ID {data.id}")
except:
await ws.send_json({type: MessageType.ERROR, id: data.ud, data: format_exc()})
await self.write({"type": MessageType.ERROR, "id": data.id, "error": format_exc()})
else:
await self.write({"type": MessageType.ERROR, "id": data.id, "error": "Route does not exist."})
case MessageType.REPLY:
if running_calls[data.id]:
running_calls[data.id].set_result(data.data)
if self.running_calls[data.id]:
self.running_calls[data.id].set_result(data.result)
del self.running_calls[data.id]
self.logger.debug(f"Resolved JS call {data.id} with value {str(data.result)}")
case MessageType.ERROR:
if running_calls[data.id]:
running_calls[data.id].set_exception(data.data)
if self.running_calls[data.id]:
self.running_calls[data.id].set_exception(data.error)
del self.running_calls[data.id]
self.logger.debug(f"Errored JS call {data.id} with error {data.error}")
case _:
self.logger.error("Unknown message type", data)
finally:
try:
await ws.close()
self.ws = None
except:
pass

View File

@@ -1,14 +1,52 @@
import Logger from './logger';
enum MessageType {
// Call-reply
CALL,
REPLY,
ERROR,
// Pub/sub
// SUBSCRIBE,
// UNSUBSCRIBE,
// PUBLISH
}
interface CallMessage {
type: MessageType.CALL;
args: any[];
route: string;
id: number;
// TODO implement this
// skipResponse?: boolean;
}
interface ReplyMessage {
type: MessageType.REPLY;
result: any;
id: number;
}
interface ErrorMessage {
type: MessageType.ERROR;
error: any;
id: number;
}
type Message = CallMessage | ReplyMessage | ErrorMessage;
// Helper to resolve a promise from the outside
interface PromiseResolver<T> {
resolve: (res: T) => void;
reject: (error: string) => void;
promise: Promise<T>;
}
class WSRouter extends Logger {
routes: Map<string, (args: any) => any> = new Map();
routes: Map<string, (...args: any) => any> = new Map();
runningCalls: Map<number, PromiseResolver<any>> = new Map();
ws?: WebSocket;
// Used to map results and errors to calls
reqId: number = 0;
constructor() {
super('WSRouter');
}
@@ -21,7 +59,92 @@ class WSRouter extends Logger {
this.ws.addEventListener('message', this.onError.bind(this));
}
onMessage() {}
createPromiseResolver<T>(): PromiseResolver<T> {
let resolver: PromiseResolver<T>;
const promise = new Promise<T>((resolve, reject) => {
resolver = {
promise,
resolve,
reject,
};
this.debug('Created new PromiseResolver');
});
this.debug('Returning new PromiseResolver');
// The promise will always run first
// @ts-expect-error 2454
return resolver;
}
onError() {}
write(data: Message) {
this.ws?.send(JSON.stringify(data));
}
addRoute(name: string, route: (args: any) => any) {
this.routes.set(name, route);
}
removeRoute(name: string) {
this.routes.delete(name);
}
async onMessage(msg: MessageEvent) {
this.debug('WS Message', msg);
try {
const data = JSON.parse(msg.data) as Message;
switch (data.type) {
case MessageType.CALL:
if (this.routes.has(data.route)) {
try {
const res = await this.routes.get(data.route)!(...data.args);
this.write({ type: MessageType.REPLY, id: data.id, result: res });
this.debug(`Started JS call ${data.route} ID ${data.id}`);
} catch (e) {
await this.write({ type: MessageType.ERROR, id: data.id, error: (e as Error)?.stack || e });
}
} else {
await this.write({ type: MessageType.ERROR, id: data.id, error: 'Route does not exist.' });
}
break;
case MessageType.REPLY:
if (this.runningCalls.has(data.id)) {
this.runningCalls.get(data.id)!.resolve(data.result);
this.runningCalls.delete(data.id);
this.debug(`Resolved PY call ${data.id} with value`, data.result);
}
break;
case MessageType.ERROR:
if (this.runningCalls.has(data.id)) {
this.runningCalls.get(data.id)!.reject(data.error);
this.runningCalls.delete(data.id);
this.debug(`Errored PY call ${data.id} with error`, data.error);
}
break;
default:
this.error('Unknown message type', data);
break;
}
} catch (e) {
this.error('Error parsing WebSocket message', e);
}
this.call<[number, number], string>('methodName', 1, 2);
}
call<Args extends any[] = any[], Return = void>(route: string, ...args: Args): Promise<Return> {
const resolver = this.createPromiseResolver<Return>();
const id = ++this.reqId;
this.runningCalls.set(id, resolver);
this.write({ type: MessageType.CALL, route, args, id });
return resolver.promise;
}
onError(error: any) {
this.error('WS ERROR', error);
}
}