*Initial version, extracted from kernel-swarm-client

This commit is contained in:
Derrick Hammer 2023-04-08 20:41:38 -04:00
parent 31ec4203fb
commit 5f3e9a465f
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
4 changed files with 289 additions and 1 deletions

View File

@ -1,6 +1,6 @@
MIT License
Copyright (c) <year> <copyright holders>
Copyright (c) 2023 Hammer Technologies LLC
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

14
package.json Normal file
View File

@ -0,0 +1,14 @@
{
"name": "@lumeweb/kernel-protomux-client",
"dependencies": {
"@lumeweb/kernel-swarm-client": "git+https://git.lumeweb.com/LumeWeb/kernel-swarm-client.git",
"@lumeweb/libkernel-universal": "git+https://git.lumeweb.com/LumeWeb/libkernel-universal.git",
"b4a": "^1.6.3",
"p-defer": "^4.0.0"
},
"devDependencies": {
"@types/b4a": "^1.6.0",
"prettier": "^2.8.7",
"typescript": "^5.0.4"
}
}

261
src/index.ts Normal file
View File

@ -0,0 +1,261 @@
import { Client, factory } from "@lumeweb/libkernel-universal";
import { MODULE } from "@lumeweb/kernel-swarm-client";
import defer from "p-defer";
import b4a from "b4a";
export default class Protomux {
private isProtomux = true;
constructor(stream: any) {
this._stream = stream;
if (!stream.userData) {
stream.userData = this;
}
}
private _stream: any;
get stream(): any {
return this._stream;
}
static from(stream: any) {
if (stream.userData && stream.userData.isProtomux) return stream.userData;
if (stream.isProtomux) return stream;
return new this(stream);
}
public async createChannel({
protocol,
id = null,
handshake = null,
onopen = undefined,
onclose = undefined,
ondestroy = undefined,
}: {
protocol: string;
id: any;
handshake: any;
onopen?: Function;
onclose?: Function;
ondestroy?: Function;
}) {
return createChannel(
this,
protocol,
id,
handshake,
onopen,
onclose,
ondestroy
);
}
}
class Channel extends Client {
private protocol: string;
private id: any;
private handshake: any;
private onopen?: Function;
private onclose?: Function;
private ondestroy?: Function;
private _created = defer();
private _send?: (data?: any) => void;
private _queue: Message[] = [];
private _inited = false;
constructor(
mux: Protomux,
protocol: string,
id: any,
handshake: any,
onopen?: Function,
onclose?: Function,
ondestroy?: Function
) {
super();
this._mux = mux;
this.protocol = protocol;
this.id = id;
this.handshake = handshake;
this.onopen = onopen;
this.onclose = onclose;
this.ondestroy = ondestroy;
}
private _ready = defer();
get ready(): Promise<void> {
return this._ready.promise as Promise<void>;
}
private _mux: Protomux;
get mux(): Protomux {
return this._mux;
}
private _channelId = -1;
get channelId(): number {
return this._channelId;
}
async open(): Promise<void> {
await this.init();
await this._created;
while (this._queue.length) {
await this._queue.shift()?.init();
}
this._ready.resolve();
}
public addMessage({
encoding = undefined,
onmessage,
}: {
encoding?: any;
onmessage: Function;
}) {
return createMessage({ channel: this, encoding, onmessage });
}
public async queueMessage(message: Message) {
this._queue.push(message);
}
private async init(): Promise<void> {
if (this._inited) {
return;
}
this._inited = true;
const [update, ret] = this.connectModule(
"createProtomuxChannel",
{
id: this._mux.stream.id,
data: {
protocol: this.protocol,
id: this.id,
handshake: this.handshake,
onopen: !!this.onopen,
onclose: !!this.onclose,
ondestroy: !!this.ondestroy,
},
},
(data: any) => {
switch (data.action) {
case "onopen":
this.onopen?.(...data.args);
break;
case "onclose":
this.onclose?.(...data.args);
break;
case "ondestroy":
this.ondestroy?.(...data.args);
break;
default:
this._channelId = data;
this._created.resolve();
}
}
);
this._send = update;
ret.catch((e) => this._created.reject(e));
return this._created.promise as Promise<void>;
}
}
class Message extends Client {
private encoding: any;
private onmessage: Function;
private channel: Channel;
private _send?: (data?: any) => void;
constructor({
channel,
encoding = undefined,
onmessage = () => {},
}: {
channel: Channel;
encoding?: any;
onmessage: Function;
}) {
super();
this.channel = channel;
this.encoding = encoding;
this.onmessage = onmessage;
this.channel.queueMessage(this);
}
async init(): Promise<void> {
const created = defer();
await this.loadLibs(MODULE);
const [update] = this.connectModule(
"createProtomuxMessage",
{
id: this.channel.mux.stream.id,
channelId: this.channel.channelId,
data: {
encoding: !!this.encoding,
onmessage: !!this.onmessage,
},
},
async (data: any) => {
if (data?.args && data?.args[0] instanceof Uint8Array) {
data.args[0] = b4a.from(data.args[0]);
}
switch (data.action) {
case "encode":
update({
action: "encode",
args: [await this.encoding.encode?.(...data.args), data.args[0]],
});
break;
case "decode":
update({
action: "decode",
args: [await this.encoding.decode?.(...data.args), data.args[0]],
});
break;
case "preencode":
update({
action: "preencode",
args: [
await this.encoding.preencode?.(...data.args),
data.args[0],
],
});
break;
case "onmessage":
this.onmessage?.(...data.args);
break;
case "created":
created.resolve();
break;
}
}
);
this._send = update;
return created.promise as Promise<void>;
}
public send(data: any) {
this._send?.({ action: "send", args: [data] });
}
}
const createChannel = factory<Channel>(Channel, MODULE);
const createMessage = factory<Message>(Message, MODULE);

13
tsconfig.json Normal file
View File

@ -0,0 +1,13 @@
{
"compilerOptions": {
"target": "esnext",
"declaration": true,
"moduleResolution": "node",
"outDir": "./dist",
"strict": true,
"allowSyntheticDefaultImports": true,
"esModuleInterop": true
},
"include": ["src"],
"exclude": ["node_modules", "**/__tests__/*"]
}