*Refactor again to use an api for protomux channels and messages

This commit is contained in:
Derrick Hammer 2023-04-07 19:48:23 -04:00
parent 41751c7d2b
commit 875af27733
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
2 changed files with 139 additions and 54 deletions

View File

@ -24,7 +24,8 @@
"libkmodule": "^0.2.53", "libkmodule": "^0.2.53",
"libskynet": "^0.0.62", "libskynet": "^0.0.62",
"noise-handshake": "^3.0.2", "noise-handshake": "^3.0.2",
"protomux": "git+https://git.lumeweb.com/LumeWeb/protomux.git", "p-defer": "^4.0.0",
"protomux": "^3.4.1",
"randombytes": "github:LumeWeb/randombytes-browser" "randombytes": "github:LumeWeb/randombytes-browser"
}, },
"devDependencies": { "devDependencies": {
@ -62,10 +63,5 @@
"sodium-universal": "@screamingvoid/sodium-universal", "sodium-universal": "@screamingvoid/sodium-universal",
"crypto": "crypto-browserify", "crypto": "crypto-browserify",
"stream": "stream-browserify" "stream": "stream-browserify"
},
"pnpm": {
"overrides": {
"protomux": "git+https://git.lumeweb.com/LumeWeb/protomux.git"
}
} }
} }

View File

@ -3,7 +3,7 @@ import Hyperswarm from "@lumeweb/hyperswarm-web";
import type { ActiveQuery } from "libkmodule"; import type { ActiveQuery } from "libkmodule";
import { addHandler, getSeed, handleMessage } from "libkmodule"; import { addHandler, getSeed, handleMessage } from "libkmodule";
import { handlePresentSeed as handlePresentSeedModule } from "libkmodule/dist/seed.js"; import { handlePresentSeed as handlePresentSeedModule } from "libkmodule/dist/seed.js";
import type { Buffer } from "buffer"; import { Buffer } from "buffer";
import * as ed from "@noble/ed25519"; import * as ed from "@noble/ed25519";
import b4a from "b4a"; import b4a from "b4a";
import { pubKeyToIpv6 } from "./addr.js"; import { pubKeyToIpv6 } from "./addr.js";
@ -11,13 +11,14 @@ import { EventEmitter2 as EventEmitter } from "eventemitter2";
import { logErr } from "libkmodule/dist"; import { logErr } from "libkmodule/dist";
// @ts-ignore // @ts-ignore
import Protomux from "protomux"; import Protomux from "protomux";
import { Mutex } from "async-mutex"; import defer, { DeferredPromise } from "p-defer";
const MAX_PEER_LISTENERS = 20; const MAX_PEER_LISTENERS = 20;
interface SwarmConnection { interface SwarmConnection {
swarm: number; swarm: number;
conn: any; conn: any;
channels: Map<number, Protomux>;
} }
interface SwarmEvents { interface SwarmEvents {
@ -50,6 +51,7 @@ function idFactory(start = 1) {
const getSwarmId = idFactory(); const getSwarmId = idFactory();
const getSocketId = idFactory(); const getSocketId = idFactory();
const getChannelId = idFactory();
addHandler("presentSeed", handlePresentSeed); addHandler("presentSeed", handlePresentSeed);
addHandler("join", handleJoin); addHandler("join", handleJoin);
@ -72,7 +74,10 @@ addHandler("socketListenEvent", handleSocketListenEvent, {
}); });
addHandler("socketWrite", handleWriteSocketEvent); addHandler("socketWrite", handleWriteSocketEvent);
addHandler("socketClose", handleCloseSocketEvent); addHandler("socketClose", handleCloseSocketEvent);
addHandler("syncProtomux", handleSyncProtomux, { addHandler("createProtomuxChannel", createProtomuxChannel, {
receiveUpdates: true,
});
addHandler("createProtomuxMessage", createProtomuxMessage, {
receiveUpdates: true, receiveUpdates: true,
}); });
@ -107,6 +112,7 @@ async function createSwarm(): Promise<number> {
connections.set(socketId, { connections.set(socketId, {
swarm: id, swarm: id,
conn: peer, conn: peer,
channels: new Map<number, Protomux>(),
}); });
peer.once("close", () => { peer.once("close", () => {
@ -418,69 +424,152 @@ async function handleGetSocketInfo(aq: ActiveQuery) {
}); });
} }
async function handleSyncProtomux(aq: ActiveQuery) { async function createProtomuxChannel(aq: ActiveQuery) {
const socket = validateConnection(aq); const socket = validateConnection(aq);
if (!socket) { if (!socket) {
return; return;
} }
if (!("action" in aq.callerInput)) { if (!("data" in aq.callerInput)) {
aq.reject("action required"); aq.reject("data required");
return; return;
} }
const mux = Protomux.from(socket); const mux = Protomux.from(socket);
let mutex: Mutex | null = null; const data = aq.callerInput.data;
if (mux.mutex) {
mutex = mux.mutex; const handleCallback = (name: string, enabled: boolean) => {
} if (!enabled && name !== "destroy") {
if (!mutex) { return undefined;
mutex = new Mutex(); }
mux.mutex = mutex; return (...args: any) => {
args = args.filter(
(item: any) => item.constructor.name.toLowerCase() !== "channel"
);
if (name === "destroy") {
connections.get(aq.callerInput.id)?.channels.delete(channelId);
aq.respond();
}
if (!enabled) {
return;
}
aq.sendUpdate({
action: name,
args,
});
};
};
aq.setReceiveUpdate?.((data: any) => {
switch (data.action) {
case "open":
channel.open();
}
});
const channel = mux.createChannel({
protocol: data?.protocol,
id: data?.id,
handshake: data?.handshake,
onopen: handleCallback("onopen", data?.onopen ?? undefined),
onclose: handleCallback("onclose", data?.onclose ?? undefined),
ondestroy: handleCallback("ondestroy", data?.ondestroy ?? undefined),
});
if (channel === null) {
aq.reject("duplicate channel");
return;
} }
await mutex.acquire(); const channelId = getChannelId();
connections.get(aq.callerInput.id)?.channels.set(channelId, channel);
aq.sendUpdate(channelId);
}
async function createProtomuxMessage(aq: ActiveQuery) {
const socket = validateConnection(aq);
if (!socket) {
return;
}
if (!("data" in aq.callerInput)) {
aq.reject("action required");
return;
}
if (!("channelId" in aq.callerInput)) {
aq.reject("channel id required");
return;
}
const channel = connections
.get(aq.callerInput.id)
?.channels.get(aq.callerInput.channelId);
if (!channel) {
aq.reject("invalid channel");
}
const data = aq.callerInput.data; const data = aq.callerInput.data;
let ret; const defers: { [action: string]: DeferredPromise<any> } = {};
switch (aq.callerInput.action) { const handleEncoding = (enabled: boolean) => {
case "popFree": if (!enabled) {
if (mux._free.includes(data)) { return undefined;
mux._free[mux._free.indexOf(data)] = null; }
mux._free = mux._free.filter((item: number) => item !== null);
}
break;
case "pushFree":
mux._free.push(data);
mux._free = Array.from(new Set(mux._free));
break;
case "pushLocal":
if (typeof mux._local[data] === "undefined") {
mux._local[data] = null;
}
break;
case "pushRemote":
if (typeof mux._remote[data] === "undefined") {
mux._remote[data] = null;
}
break;
case "pullFree":
ret = Object.values(mux._free);
break;
case "pullLocal":
ret = Object.keys(mux._local);
break;
case "pullRemote":
ret = Object.keys(mux._remote);
break;
}
aq.respond(ret); const update = async (action: string, args: any) => {
await defers[action]?.promise;
defers[action] = defer();
aq.sendUpdate({
action,
args,
});
mutex.release(); const ret = await defers[action]?.promise;
if (ret[1]) {
args[0].buffer = Buffer.from(ret[1].buffer);
args[0].start = ret[1].start;
args[0].end = ret[1].end;
}
return ret[0];
};
return {
async preencode(...args: any) {
return update("preencode", args);
},
async encode(...args: any) {
return update("encode", args);
},
async decode(...args: any) {
return update("encode", args);
},
};
};
aq.setReceiveUpdate?.((data) => {
defers[data.action]?.resolve(data.args[0]);
});
const message = channel.addMessage({
encoding: handleEncoding(data.encoding ?? false),
onmessage: data.encoding ?? undefined,
});
aq.sendUpdate({
action: "created",
});
} }
function getSwarmToSocketConnectionId(socket: any) { function getSwarmToSocketConnectionId(socket: any) {