Compare commits

...

15 Commits

6 changed files with 556 additions and 1 deletions

View File

@ -1,6 +1,6 @@
MIT License
Copyright (c) <year> <copyright holders>
Copyright (c) 2023 Hammer Technologies LLC
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

57
dist/index.d.ts vendored Normal file
View File

@ -0,0 +1,57 @@
import { Client } from "@lumeweb/libkernel-universal";
export default class Protomux {
private isProtomux;
constructor(stream: any);
private _stream;
get stream(): any;
static from(stream: any): any;
createChannel({ protocol, id, handshake, onopen, onclose, ondestroy, }: {
protocol: string;
id: any;
handshake: any;
onopen?: Function;
onclose?: Function;
ondestroy?: Function;
}): Promise<Channel>;
}
declare class Channel extends Client {
private protocol;
private id;
private handshake;
private onopen?;
private onclose?;
private ondestroy?;
private _created;
private _send?;
private _queue;
private _inited;
constructor(mux: Protomux, protocol: string, id: any, handshake: any, onopen?: Function, onclose?: Function, ondestroy?: Function);
private _ready;
get ready(): Promise<void>;
private _mux;
get mux(): Protomux;
private _channelId;
get channelId(): number;
open(): Promise<void>;
addMessage({ encoding, onmessage, }: {
encoding?: any;
onmessage: Function;
}): Message;
queueMessage(message: Message): Promise<void>;
private init;
destroy(error: Error): void;
}
declare class Message extends Client {
private encoding;
private onmessage;
private channel;
private _send?;
constructor({ channel, encoding, onmessage, }: {
channel: Channel;
encoding?: any;
onmessage: Function;
});
init(): Promise<void>;
send(data: any): void;
}
export {};

189
dist/index.js vendored Normal file
View File

@ -0,0 +1,189 @@
import { Client, factory } from "@lumeweb/libkernel-universal";
import { MODULE } from "@lumeweb/kernel-swarm-client";
import defer from "p-defer";
import { Buffer } from "buffer";
export default class Protomux {
isProtomux = true;
constructor(stream) {
this._stream = stream;
if (!stream.userData) {
stream.userData = this;
}
}
_stream;
get stream() {
return this._stream;
}
static from(stream) {
if (stream.userData && stream.userData.isProtomux)
return stream.userData;
if (stream.isProtomux)
return stream;
return new this(stream);
}
async createChannel({ protocol, id = null, handshake = null, onopen = undefined, onclose = undefined, ondestroy = undefined, }) {
return createChannel(this, protocol, id, handshake, onopen, onclose, ondestroy);
}
}
class Channel extends Client {
protocol;
id;
handshake;
onopen;
onclose;
ondestroy;
_created = defer();
_send;
_queue = [];
_inited = false;
constructor(mux, protocol, id, handshake, onopen, onclose, ondestroy) {
super();
this._mux = mux;
this.protocol = protocol;
this.id = id;
this.handshake = handshake;
this.onopen = onopen;
this.onclose = onclose;
this.ondestroy = ondestroy;
}
_ready = defer();
get ready() {
return this._ready.promise;
}
_mux;
get mux() {
return this._mux;
}
_channelId = -1;
get channelId() {
return this._channelId;
}
async open() {
await this.init();
await this._created;
while (this._queue.length) {
await this._queue.shift()?.init();
}
this._ready.resolve();
}
addMessage({ encoding = undefined, onmessage, }) {
return createMessage({ channel: this, encoding, onmessage });
}
async queueMessage(message) {
this._queue.push(message);
}
async init() {
if (this._inited) {
return;
}
this._inited = true;
const [update, ret] = this.connectModule("createProtomuxChannel", {
id: this._mux.stream.id,
data: {
protocol: this.protocol,
id: this.id,
handshake: this.handshake,
onopen: !!this.onopen,
onclose: !!this.onclose,
ondestroy: !!this.ondestroy,
},
}, (data) => {
switch (data.action) {
case "onopen":
this.onopen?.(...data.args);
break;
case "onclose":
this.onclose?.(...data.args);
break;
case "ondestroy":
this.ondestroy?.(...data.args);
break;
default:
this._channelId = data;
this._created.resolve();
}
});
this._send = update;
ret.catch((e) => this._created.reject(e));
return this._created.promise;
}
destroy(error) {
this._send?.({ action: "destroy", args: [error] });
}
}
class Message extends Client {
encoding;
onmessage;
channel;
_send;
constructor({ channel, encoding = undefined, onmessage = () => { }, }) {
super();
this.channel = channel;
this.encoding = encoding;
this.onmessage = onmessage;
this.channel.queueMessage(this);
}
async init() {
const created = defer();
await this.loadLibs(MODULE);
const [update] = this.connectModule("createProtomuxMessage", {
id: this.channel.mux.stream.id,
channelId: this.channel.channelId,
data: {
encoding: !!this.encoding,
onmessage: !!this.onmessage,
},
}, async (data) => {
if (data.args) {
data.args = data.args.filter((arg) => {
if (arg instanceof Uint8Array) {
return Buffer.from(arg);
}
return arg;
});
}
if (data?.args && data?.args[0]?.buffer instanceof Uint8Array) {
data.args[0].buffer = Buffer.from(data.args[0].buffer);
}
switch (data.action) {
case "encode":
update({
id: data.id,
action: "encode",
args: [await this.encoding.encode?.(...data.args), data.args[0]],
});
break;
case "decode":
update({
id: data.id,
action: "decode",
args: [await this.encoding.decode?.(...data.args), data.args[0]],
});
break;
case "preencode":
update({
id: data.id,
action: "preencode",
args: [
await this.encoding.preencode?.(...data.args),
data.args[0],
],
});
break;
case "onmessage":
this.onmessage?.(...data.args);
break;
case "created":
created.resolve();
break;
}
});
this._send = update;
return created.promise;
}
send(data) {
this._send?.({ action: "send", args: [data] });
}
}
const createChannel = factory(Channel, MODULE);
const createMessage = factory(Message, MODULE);

17
package.json Normal file
View File

@ -0,0 +1,17 @@
{
"name": "@lumeweb/kernel-protomux-client",
"version": "0.1.0",
"main": "dist/index.js",
"dependencies": {
"@lumeweb/kernel-swarm-client": "git+https://git.lumeweb.com/LumeWeb/kernel-swarm-client.git",
"@lumeweb/libkernel-universal": "git+https://git.lumeweb.com/LumeWeb/libkernel-universal.git",
"b4a": "^1.6.3",
"buffer": "^6.0.3",
"p-defer": "^4.0.0"
},
"devDependencies": {
"@types/b4a": "^1.6.0",
"prettier": "^2.8.7",
"typescript": "^5.0.4"
}
}

279
src/index.ts Normal file
View File

@ -0,0 +1,279 @@
import { Client, factory } from "@lumeweb/libkernel-universal";
import { MODULE } from "@lumeweb/kernel-swarm-client";
import defer from "p-defer";
import b4a from "b4a";
import { Buffer } from "buffer";
export default class Protomux {
private isProtomux = true;
constructor(stream: any) {
this._stream = stream;
if (!stream.userData) {
stream.userData = this;
}
}
private _stream: any;
get stream(): any {
return this._stream;
}
static from(stream: any) {
if (stream.userData && stream.userData.isProtomux) return stream.userData;
if (stream.isProtomux) return stream;
return new this(stream);
}
public async createChannel({
protocol,
id = null,
handshake = null,
onopen = undefined,
onclose = undefined,
ondestroy = undefined,
}: {
protocol: string;
id: any;
handshake: any;
onopen?: Function;
onclose?: Function;
ondestroy?: Function;
}) {
return createChannel(
this,
protocol,
id,
handshake,
onopen,
onclose,
ondestroy
);
}
}
class Channel extends Client {
private protocol: string;
private id: any;
private handshake: any;
private onopen?: Function;
private onclose?: Function;
private ondestroy?: Function;
private _created = defer();
private _send?: (data?: any) => void;
private _queue: Message[] = [];
private _inited = false;
constructor(
mux: Protomux,
protocol: string,
id: any,
handshake: any,
onopen?: Function,
onclose?: Function,
ondestroy?: Function
) {
super();
this._mux = mux;
this.protocol = protocol;
this.id = id;
this.handshake = handshake;
this.onopen = onopen;
this.onclose = onclose;
this.ondestroy = ondestroy;
}
private _ready = defer();
get ready(): Promise<void> {
return this._ready.promise as Promise<void>;
}
private _mux: Protomux;
get mux(): Protomux {
return this._mux;
}
private _channelId = -1;
get channelId(): number {
return this._channelId;
}
async open(): Promise<void> {
await this.init();
await this._created;
while (this._queue.length) {
await this._queue.shift()?.init();
}
this._ready.resolve();
}
public addMessage({
encoding = undefined,
onmessage,
}: {
encoding?: any;
onmessage: Function;
}) {
return createMessage({ channel: this, encoding, onmessage });
}
public async queueMessage(message: Message) {
this._queue.push(message);
}
private async init(): Promise<void> {
if (this._inited) {
return;
}
this._inited = true;
const [update, ret] = this.connectModule(
"createProtomuxChannel",
{
id: this._mux.stream.id,
data: {
protocol: this.protocol,
id: this.id,
handshake: this.handshake,
onopen: !!this.onopen,
onclose: !!this.onclose,
ondestroy: !!this.ondestroy,
},
},
(data: any) => {
switch (data.action) {
case "onopen":
this.onopen?.(...data.args);
break;
case "onclose":
this.onclose?.(...data.args);
break;
case "ondestroy":
this.ondestroy?.(...data.args);
break;
default:
this._channelId = data;
this._created.resolve();
}
}
);
this._send = update;
ret.catch((e) => this._created.reject(e));
return this._created.promise as Promise<void>;
}
public destroy(error: Error) {
this._send?.({ action: "destroy", args: [error] });
}
}
class Message extends Client {
private encoding: any;
private onmessage: Function;
private channel: Channel;
private _send?: (data?: any) => void;
constructor({
channel,
encoding = undefined,
onmessage = () => {},
}: {
channel: Channel;
encoding?: any;
onmessage: Function;
}) {
super();
this.channel = channel;
this.encoding = encoding;
this.onmessage = onmessage;
this.channel.queueMessage(this);
}
async init(): Promise<void> {
const created = defer();
await this.loadLibs(MODULE);
const [update] = this.connectModule(
"createProtomuxMessage",
{
id: this.channel.mux.stream.id,
channelId: this.channel.channelId,
data: {
encoding: !!this.encoding,
onmessage: !!this.onmessage,
},
},
async (data: any) => {
if (data.args) {
data.args = data.args.filter((arg: any) => {
if (arg instanceof Uint8Array) {
return Buffer.from(arg);
}
return arg;
});
}
if (data?.args && data?.args[0]?.buffer instanceof Uint8Array) {
data.args[0].buffer = Buffer.from(data.args[0].buffer);
}
switch (data.action) {
case "encode":
update({
id: data.id,
action: "encode",
args: [await this.encoding.encode?.(...data.args), data.args[0]],
});
break;
case "decode":
update({
id: data.id,
action: "decode",
args: [await this.encoding.decode?.(...data.args), data.args[0]],
});
break;
case "preencode":
update({
id: data.id,
action: "preencode",
args: [
await this.encoding.preencode?.(...data.args),
data.args[0],
],
});
break;
case "onmessage":
this.onmessage?.(...data.args);
break;
case "created":
created.resolve();
break;
}
}
);
this._send = update;
return created.promise as Promise<void>;
}
public send(data: any) {
this._send?.({ action: "send", args: [data] });
}
}
const createChannel = factory<Channel>(Channel, MODULE);
const createMessage = factory<Message>(Message, MODULE);

13
tsconfig.json Normal file
View File

@ -0,0 +1,13 @@
{
"compilerOptions": {
"target": "esnext",
"declaration": true,
"moduleResolution": "node",
"outDir": "./dist",
"strict": true,
"allowSyntheticDefaultImports": true,
"esModuleInterop": true
},
"include": ["src"],
"exclude": ["node_modules", "**/__tests__/*"]
}