*Initial version of syncProtomux api protocol that will keep Protomux channel/message tracking in sync between workers

This commit is contained in:
Derrick Hammer 2023-04-05 02:41:57 -04:00
parent 2f30b743f6
commit 053e309d98
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
2 changed files with 72 additions and 0 deletions

View File

@ -14,14 +14,17 @@
"type": "module",
"dependencies": {
"@lumeweb/hyperswarm-web": "git+https://git.lumeweb.com/LumeWeb/hyperswarm-web.git",
"@lumeweb/rpc": "git+https://git.lumeweb.com/LumeWeb/rpc.git",
"@noble/ed25519": "^1.7.3",
"@peculiar/webcrypto": "git+https://git.lumeweb.com/LumeWeb/webcrypto.git",
"async-mutex": "^0.4.0",
"b4a": "^1.6.3",
"eventemitter2": "^6.4.9",
"hyperswarm": "^4.4.0",
"libkmodule": "^0.2.53",
"libskynet": "^0.0.62",
"noise-handshake": "^3.0.2",
"protomux": "^3.4.1",
"randombytes": "github:LumeWeb/randombytes-browser"
},
"devDependencies": {

View File

@ -9,6 +9,9 @@ import b4a from "b4a";
import { pubKeyToIpv6 } from "./addr.js";
import { EventEmitter2 as EventEmitter } from "eventemitter2";
import { logErr } from "libkmodule/dist";
// @ts-ignore
import Protomux from "protomux";
import { Mutex } from "async-mutex";
const MAX_PEER_LISTENERS = 20;
@ -68,6 +71,9 @@ addHandler("socketListenEvent", handleSocketListenEvent, {
});
addHandler("socketWrite", handleWriteSocketEvent);
addHandler("socketClose", handleCloseSocketEvent);
addHandler("syncProtomux", handleSyncProtomux, {
receiveUpdates: true,
});
async function handlePresentSeed(aq: ActiveQuery) {
const pubkey = await ed.getPublicKey(aq.callerInput.rootKey);
handlePresentSeedModule({
@ -392,6 +398,69 @@ async function handleGetSocketInfo(aq: ActiveQuery) {
});
}
async function handleSyncProtomux(aq: ActiveQuery) {
const socket = validateConnection(aq);
if (!socket) {
return;
}
const mux = Protomux.from(socket);
const mutex: Mutex = mux.mutex ?? new Mutex();
if (!mux.mutex) {
mux.mutex = mutex;
}
socket.once("close", () => {
socket.off("syncProtomux", sync);
aq.respond();
});
const send = (mux: any) => {
aq.sendUpdate({
remote: Object.keys(mux._remote),
local: Object.keys(mux._local),
free: mux._free,
});
};
const sync = () => send(mux);
mux.syncState = send.bind(undefined, mux);
sync();
socket.on("syncProtomux", sync);
aq.setReceiveUpdate?.(async (data: any) => {
await mutex.acquire();
["remote", "local"].forEach((field) => {
const rField = `_${field}`;
data[field].forEach((item: any) => {
if (!mux[rField][item]) {
while (item > mux[rField].length) {
mux[rField].push(null);
}
}
if (!mux[rField][item]) {
mux[rField][item] = null;
}
});
});
data.free.forEach((index: number) => {
if (mux._free[index] === null) {
mux._free[index] = undefined;
}
});
mux._free = mux._free.filter((item: any) => item !== undefined);
socket.emit("syncProtomux");
mutex.release();
});
}
function getSwarmToSocketConnectionId(socket: any) {
for (const conn of connections) {
if (conn[1].conn === socket) {