* Add import for b4a, create a _created Promise, replace _opened Promise with _created Promise, and add getter for ready Promise. Also, add methods to add and queue messages, and fix package.json dependencies.

This commit is contained in:
Derrick Hammer 2023-04-08 20:18:56 -04:00
parent 876d56b25a
commit 35df37fdff
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
2 changed files with 45 additions and 43 deletions

View File

@ -12,15 +12,16 @@
"@lumeweb/interface-relay": "git+https://git.lumeweb.com/LumeWeb/interface-relay.git", "@lumeweb/interface-relay": "git+https://git.lumeweb.com/LumeWeb/interface-relay.git",
"@lumeweb/kernel-swarm-client": "git+https://git.lumeweb.com/LumeWeb/kernel-swarm-client.git", "@lumeweb/kernel-swarm-client": "git+https://git.lumeweb.com/LumeWeb/kernel-swarm-client.git",
"@lumeweb/libkernel-universal": "git+https://git.lumeweb.com/LumeWeb/libkernel-universal.git", "@lumeweb/libkernel-universal": "git+https://git.lumeweb.com/LumeWeb/libkernel-universal.git",
"@lumeweb/protomux-rpc-web": "git+https://git.lumeweb.com/LumeWeb/protomux-rpc-web.git",
"@lumeweb/rpc-client": "git+https://git.lumeweb.com/LumeWeb/rpc-client.git", "@lumeweb/rpc-client": "git+https://git.lumeweb.com/LumeWeb/rpc-client.git",
"b4a": "^1.6.3", "b4a": "^1.6.3",
"libkmodule": "^0.2.53", "libkmodule": "^0.2.53",
"p-defer": "^4.0.0", "p-defer": "^4.0.0"
"protomux": "git+https://git.lumeweb.com/LumeWeb/protomux.git"
}, },
"devDependencies": { "devDependencies": {
"@scure/bip39": "^1.2.0", "@scure/bip39": "^1.2.0",
"@skynetlabs/skynet-nodejs": "^2.9.0", "@skynetlabs/skynet-nodejs": "^2.9.0",
"@types/b4a": "^1.6.0",
"@types/node": "^18.15.11", "@types/node": "^18.15.11",
"@types/read": "^0.0.29", "@types/read": "^0.0.29",
"cli-progress": "^3.12.0", "cli-progress": "^3.12.0",
@ -31,6 +32,7 @@
}, },
"browser": { "browser": {
"timers": "timers-browserify", "timers": "timers-browserify",
"protomux": "./src/protomux.ts" "protomux": "./src/protomux.ts",
"protomux-rpc": "@lumeweb/protomux-rpc-web"
} }
} }

View File

@ -2,6 +2,7 @@ import { Client, factory } from "@lumeweb/libkernel-universal";
const MODULE = "_AXYJDzn2fjd-YfuchEegna7iErhun6QwQK7gSa3UNjHvw"; const MODULE = "_AXYJDzn2fjd-YfuchEegna7iErhun6QwQK7gSa3UNjHvw";
import defer from "p-defer"; import defer from "p-defer";
import b4a from "b4a";
class Protomux { class Protomux {
private isProtomux = true; private isProtomux = true;
@ -59,14 +60,9 @@ class Channel extends Client {
private onopen?: Function; private onopen?: Function;
private onclose?: Function; private onclose?: Function;
private ondestroy?: Function; private ondestroy?: Function;
private _ready?: Promise<void>; private _created = defer();
private _send?: (data?: any) => void; private _send?: (data?: any) => void;
private _opened = defer();
private _queue: Message[] = []; private _queue: Message[] = [];
private _inited = false; private _inited = false;
constructor( constructor(
@ -88,6 +84,12 @@ class Channel extends Client {
this.ondestroy = ondestroy; this.ondestroy = ondestroy;
} }
private _ready = defer();
get ready(): Promise<void> {
return this._ready.promise as Promise<void>;
}
private _mux: Protomux; private _mux: Protomux;
get mux(): Protomux { get mux(): Protomux {
@ -100,6 +102,31 @@ class Channel extends Client {
return this._channelId; return this._channelId;
} }
async open(): Promise<void> {
await this.init();
await this._created;
while (this._queue.length) {
await this._queue.shift()?.init();
}
this._ready.resolve();
}
public addMessage({
encoding = undefined,
onmessage,
}: {
encoding?: any;
onmessage: Function;
}) {
return createMessage({ channel: this, encoding, onmessage });
}
public async queueMessage(message: Message) {
this._queue.push(message);
}
private async init(): Promise<void> { private async init(): Promise<void> {
if (this._inited) { if (this._inited) {
return; return;
@ -107,7 +134,6 @@ class Channel extends Client {
this._inited = true; this._inited = true;
const created = defer();
const [update, ret] = this.connectModule( const [update, ret] = this.connectModule(
"createProtomuxChannel", "createProtomuxChannel",
{ {
@ -123,9 +149,6 @@ class Channel extends Client {
}, },
(data: any) => { (data: any) => {
switch (data.action) { switch (data.action) {
case "open":
this._opened.resolve();
break;
case "onopen": case "onopen":
this.onopen?.(...data.args); this.onopen?.(...data.args);
break; break;
@ -137,42 +160,15 @@ class Channel extends Client {
break; break;
default: default:
this._channelId = data; this._channelId = data;
created.resolve(); this._created.resolve();
} }
} }
); );
this._send = update; this._send = update;
ret.catch((e) => created.reject(e)); ret.catch((e) => this._created.reject(e));
this._ready = created.promise as Promise<void>; return this._created.promise as Promise<void>;
}
async open(): Promise<void> {
await this.init();
await this._ready;
while (this._queue.length) {
await this._queue.shift()?.init();
}
this._send?.({ action: "open" });
return this._opened.promise as Promise<void>;
}
public addMessage({
encoding = undefined,
onmessage,
}: {
encoding?: any;
onmessage: Function;
}) {
return createMessage({ channel: this, encoding, onmessage });
}
public async queueMessage(message: Message) {
this._queue.push(message);
} }
} }
@ -215,6 +211,9 @@ class Message extends Client {
}, },
}, },
async (data: any) => { async (data: any) => {
if (data?.args && data?.args[0] instanceof Uint8Array) {
data.args[0] = b4a.from(data.args[0]);
}
switch (data.action) { switch (data.action) {
case "encode": case "encode":
update({ update({
@ -260,4 +259,5 @@ class Message extends Client {
const createChannel = factory<Channel>(Channel, MODULE); const createChannel = factory<Channel>(Channel, MODULE);
const createMessage = factory<Message>(Message, MODULE); const createMessage = factory<Message>(Message, MODULE);
// @ts-ignore
export = Protomux; export = Protomux;