From 35df37fdff377affd5c82dbb7a0fd8e308b905be Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Sat, 8 Apr 2023 20:18:56 -0400 Subject: [PATCH] * Add import for b4a, create a _created Promise, replace _opened Promise with _created Promise, and add getter for ready Promise. Also, add methods to add and queue messages, and fix package.json dependencies. --- package.json | 8 +++-- src/protomux.ts | 80 ++++++++++++++++++++++++------------------------- 2 files changed, 45 insertions(+), 43 deletions(-) diff --git a/package.json b/package.json index 81203ef..0af7318 100644 --- a/package.json +++ b/package.json @@ -12,15 +12,16 @@ "@lumeweb/interface-relay": "git+https://git.lumeweb.com/LumeWeb/interface-relay.git", "@lumeweb/kernel-swarm-client": "git+https://git.lumeweb.com/LumeWeb/kernel-swarm-client.git", "@lumeweb/libkernel-universal": "git+https://git.lumeweb.com/LumeWeb/libkernel-universal.git", + "@lumeweb/protomux-rpc-web": "git+https://git.lumeweb.com/LumeWeb/protomux-rpc-web.git", "@lumeweb/rpc-client": "git+https://git.lumeweb.com/LumeWeb/rpc-client.git", "b4a": "^1.6.3", "libkmodule": "^0.2.53", - "p-defer": "^4.0.0", - "protomux": "git+https://git.lumeweb.com/LumeWeb/protomux.git" + "p-defer": "^4.0.0" }, "devDependencies": { "@scure/bip39": "^1.2.0", "@skynetlabs/skynet-nodejs": "^2.9.0", + "@types/b4a": "^1.6.0", "@types/node": "^18.15.11", "@types/read": "^0.0.29", "cli-progress": "^3.12.0", @@ -31,6 +32,7 @@ }, "browser": { "timers": "timers-browserify", - "protomux": "./src/protomux.ts" + "protomux": "./src/protomux.ts", + "protomux-rpc": "@lumeweb/protomux-rpc-web" } } diff --git a/src/protomux.ts b/src/protomux.ts index a07a264..9cbdf6b 100644 --- a/src/protomux.ts +++ b/src/protomux.ts @@ -2,6 +2,7 @@ import { Client, factory } from "@lumeweb/libkernel-universal"; const MODULE = "_AXYJDzn2fjd-YfuchEegna7iErhun6QwQK7gSa3UNjHvw"; import defer from "p-defer"; +import b4a from "b4a"; class Protomux { private isProtomux = true; @@ -59,14 +60,9 @@ class Channel extends Client { private onopen?: Function; private onclose?: Function; private ondestroy?: Function; - private _ready?: Promise; - + private _created = defer(); private _send?: (data?: any) => void; - - private _opened = defer(); - private _queue: Message[] = []; - private _inited = false; constructor( @@ -88,6 +84,12 @@ class Channel extends Client { this.ondestroy = ondestroy; } + private _ready = defer(); + + get ready(): Promise { + return this._ready.promise as Promise; + } + private _mux: Protomux; get mux(): Protomux { @@ -100,6 +102,31 @@ class Channel extends Client { return this._channelId; } + async open(): Promise { + await this.init(); + await this._created; + + while (this._queue.length) { + await this._queue.shift()?.init(); + } + + this._ready.resolve(); + } + + public addMessage({ + encoding = undefined, + onmessage, + }: { + encoding?: any; + onmessage: Function; + }) { + return createMessage({ channel: this, encoding, onmessage }); + } + + public async queueMessage(message: Message) { + this._queue.push(message); + } + private async init(): Promise { if (this._inited) { return; @@ -107,7 +134,6 @@ class Channel extends Client { this._inited = true; - const created = defer(); const [update, ret] = this.connectModule( "createProtomuxChannel", { @@ -123,9 +149,6 @@ class Channel extends Client { }, (data: any) => { switch (data.action) { - case "open": - this._opened.resolve(); - break; case "onopen": this.onopen?.(...data.args); break; @@ -137,42 +160,15 @@ class Channel extends Client { break; default: this._channelId = data; - created.resolve(); + this._created.resolve(); } } ); this._send = update; - ret.catch((e) => created.reject(e)); + ret.catch((e) => this._created.reject(e)); - this._ready = created.promise as Promise; - } - - async open(): Promise { - await this.init(); - await this._ready; - - while (this._queue.length) { - await this._queue.shift()?.init(); - } - - this._send?.({ action: "open" }); - - return this._opened.promise as Promise; - } - - public addMessage({ - encoding = undefined, - onmessage, - }: { - encoding?: any; - onmessage: Function; - }) { - return createMessage({ channel: this, encoding, onmessage }); - } - - public async queueMessage(message: Message) { - this._queue.push(message); + return this._created.promise as Promise; } } @@ -215,6 +211,9 @@ class Message extends Client { }, }, async (data: any) => { + if (data?.args && data?.args[0] instanceof Uint8Array) { + data.args[0] = b4a.from(data.args[0]); + } switch (data.action) { case "encode": update({ @@ -260,4 +259,5 @@ class Message extends Client { const createChannel = factory(Channel, MODULE); const createMessage = factory(Message, MODULE); +// @ts-ignore export = Protomux;