From 5d0d7a52e8d427fae350267852b0861b05de74b5 Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Fri, 7 Apr 2023 22:08:49 -0400 Subject: [PATCH] *Change approach to proxying protomux requests over the kernel --- package.json | 17 ++-- src/protomux.ts | 263 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 269 insertions(+), 11 deletions(-) create mode 100644 src/protomux.ts diff --git a/package.json b/package.json index cef05c8..81203ef 100644 --- a/package.json +++ b/package.json @@ -11,14 +11,13 @@ "dependencies": { "@lumeweb/interface-relay": "git+https://git.lumeweb.com/LumeWeb/interface-relay.git", "@lumeweb/kernel-swarm-client": "git+https://git.lumeweb.com/LumeWeb/kernel-swarm-client.git", + "@lumeweb/libkernel-universal": "git+https://git.lumeweb.com/LumeWeb/libkernel-universal.git", "@lumeweb/rpc-client": "git+https://git.lumeweb.com/LumeWeb/rpc-client.git", "b4a": "^1.6.3", "libkmodule": "^0.2.53", + "p-defer": "^4.0.0", "protomux": "git+https://git.lumeweb.com/LumeWeb/protomux.git" }, - "browser": { - "timers": "timers-browserify" - }, "devDependencies": { "@scure/bip39": "^1.2.0", "@skynetlabs/skynet-nodejs": "^2.9.0", @@ -28,14 +27,10 @@ "esbuild": "^0.17.15", "read": "^2.0.0", "timers-browserify": "^2.0.12", - "typescript": "^5.0.3" + "typescript": "^5.0.4" }, - "pnpm": { - "patchedDependencies": { - "b4a@1.6.3": "patches/b4a@1.6.3.patch" - }, - "overrides": { - "protomux": "git+https://git.lumeweb.com/LumeWeb/protomux.git" - } + "browser": { + "timers": "timers-browserify", + "protomux": "./src/protomux.ts" } } diff --git a/src/protomux.ts b/src/protomux.ts new file mode 100644 index 0000000..a07a264 --- /dev/null +++ b/src/protomux.ts @@ -0,0 +1,263 @@ +import { Client, factory } from "@lumeweb/libkernel-universal"; +const MODULE = "_AXYJDzn2fjd-YfuchEegna7iErhun6QwQK7gSa3UNjHvw"; + +import defer from "p-defer"; + +class Protomux { + private isProtomux = true; + + constructor(stream: any) { + this._stream = stream; + if (!stream.userData) { + stream.userData = this; + } + } + + private _stream: any; + + get stream(): any { + return this._stream; + } + + static from(stream: any) { + if (stream.userData && stream.userData.isProtomux) return stream.userData; + if (stream.isProtomux) return stream; + return new this(stream); + } + + public async createChannel({ + protocol, + id = null, + handshake = null, + onopen = undefined, + onclose = undefined, + ondestroy = undefined, + }: { + protocol: string; + id: any; + handshake: any; + onopen?: Function; + onclose?: Function; + ondestroy?: Function; + }) { + return createChannel( + this, + protocol, + id, + handshake, + onopen, + onclose, + ondestroy + ); + } +} + +class Channel extends Client { + private protocol: string; + private id: any; + private handshake: any; + private onopen?: Function; + private onclose?: Function; + private ondestroy?: Function; + private _ready?: Promise; + + private _send?: (data?: any) => void; + + private _opened = defer(); + + private _queue: Message[] = []; + + private _inited = false; + + constructor( + mux: Protomux, + protocol: string, + id: any, + handshake: any, + onopen?: Function, + onclose?: Function, + ondestroy?: Function + ) { + super(); + this._mux = mux; + this.protocol = protocol; + this.id = id; + this.handshake = handshake; + this.onopen = onopen; + this.onclose = onclose; + this.ondestroy = ondestroy; + } + + private _mux: Protomux; + + get mux(): Protomux { + return this._mux; + } + + private _channelId = -1; + + get channelId(): number { + return this._channelId; + } + + private async init(): Promise { + if (this._inited) { + return; + } + + this._inited = true; + + const created = defer(); + const [update, ret] = this.connectModule( + "createProtomuxChannel", + { + id: this._mux.stream.id, + data: { + protocol: this.protocol, + id: this.id, + handshake: this.handshake, + onopen: !!this.onopen, + onclose: !!this.onclose, + ondestroy: !!this.ondestroy, + }, + }, + (data: any) => { + switch (data.action) { + case "open": + this._opened.resolve(); + break; + case "onopen": + this.onopen?.(...data.args); + break; + case "onclose": + this.onclose?.(...data.args); + break; + case "ondestroy": + this.ondestroy?.(...data.args); + break; + default: + this._channelId = data; + created.resolve(); + } + } + ); + this._send = update; + + ret.catch((e) => created.reject(e)); + + this._ready = created.promise as Promise; + } + + async open(): Promise { + await this.init(); + await this._ready; + + while (this._queue.length) { + await this._queue.shift()?.init(); + } + + this._send?.({ action: "open" }); + + return this._opened.promise as Promise; + } + + public addMessage({ + encoding = undefined, + onmessage, + }: { + encoding?: any; + onmessage: Function; + }) { + return createMessage({ channel: this, encoding, onmessage }); + } + + public async queueMessage(message: Message) { + this._queue.push(message); + } +} + +class Message extends Client { + private encoding: any; + private onmessage: Function; + private channel: Channel; + + private _send?: (data?: any) => void; + + constructor({ + channel, + encoding = undefined, + onmessage = () => {}, + }: { + channel: Channel; + encoding?: any; + onmessage: Function; + }) { + super(); + this.channel = channel; + this.encoding = encoding; + this.onmessage = onmessage; + this.channel.queueMessage(this); + } + + async init(): Promise { + const created = defer(); + + await this.loadLibs(MODULE); + + const [update] = this.connectModule( + "createProtomuxMessage", + { + id: this.channel.mux.stream.id, + channelId: this.channel.channelId, + data: { + encoding: !!this.encoding, + onmessage: !!this.onmessage, + }, + }, + async (data: any) => { + switch (data.action) { + case "encode": + update({ + action: "encode", + args: [await this.encoding.encode?.(...data.args), data.args[0]], + }); + break; + case "decode": + update({ + action: "decode", + args: [await this.encoding.decode?.(...data.args), data.args[0]], + }); + break; + case "preencode": + update({ + action: "preencode", + args: [ + await this.encoding.preencode?.(...data.args), + data.args[0], + ], + }); + break; + case "onmessage": + this.onmessage?.(...data.args); + break; + case "created": + created.resolve(); + break; + } + } + ); + + this._send = update; + + return created.promise as Promise; + } + + public send(data: any) { + this._send?.({ action: "send", args: [data] }); + } +} + +const createChannel = factory(Channel, MODULE); +const createMessage = factory(Message, MODULE); + +export = Protomux;