From 053e309d98c2f1c3edbcab78d8bed84a426fc040 Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Wed, 5 Apr 2023 02:41:57 -0400 Subject: [PATCH] *Initial version of syncProtomux api protocol that will keep Protomux channel/message tracking in sync between workers --- package.json | 3 +++ src/index.ts | 69 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+) diff --git a/package.json b/package.json index 6cd6b03..99b55a9 100644 --- a/package.json +++ b/package.json @@ -14,14 +14,17 @@ "type": "module", "dependencies": { "@lumeweb/hyperswarm-web": "git+https://git.lumeweb.com/LumeWeb/hyperswarm-web.git", + "@lumeweb/rpc": "git+https://git.lumeweb.com/LumeWeb/rpc.git", "@noble/ed25519": "^1.7.3", "@peculiar/webcrypto": "git+https://git.lumeweb.com/LumeWeb/webcrypto.git", + "async-mutex": "^0.4.0", "b4a": "^1.6.3", "eventemitter2": "^6.4.9", "hyperswarm": "^4.4.0", "libkmodule": "^0.2.53", "libskynet": "^0.0.62", "noise-handshake": "^3.0.2", + "protomux": "^3.4.1", "randombytes": "github:LumeWeb/randombytes-browser" }, "devDependencies": { diff --git a/src/index.ts b/src/index.ts index c3128b9..eacb615 100644 --- a/src/index.ts +++ b/src/index.ts @@ -9,6 +9,9 @@ import b4a from "b4a"; import { pubKeyToIpv6 } from "./addr.js"; import { EventEmitter2 as EventEmitter } from "eventemitter2"; import { logErr } from "libkmodule/dist"; +// @ts-ignore +import Protomux from "protomux"; +import { Mutex } from "async-mutex"; const MAX_PEER_LISTENERS = 20; @@ -68,6 +71,9 @@ addHandler("socketListenEvent", handleSocketListenEvent, { }); addHandler("socketWrite", handleWriteSocketEvent); addHandler("socketClose", handleCloseSocketEvent); +addHandler("syncProtomux", handleSyncProtomux, { + receiveUpdates: true, +}); async function handlePresentSeed(aq: ActiveQuery) { const pubkey = await ed.getPublicKey(aq.callerInput.rootKey); handlePresentSeedModule({ @@ -392,6 +398,69 @@ async function handleGetSocketInfo(aq: ActiveQuery) { }); } +async function handleSyncProtomux(aq: ActiveQuery) { + const socket = validateConnection(aq); + + if (!socket) { + return; + } + + const mux = Protomux.from(socket); + const mutex: Mutex = mux.mutex ?? new Mutex(); + if (!mux.mutex) { + mux.mutex = mutex; + } + + socket.once("close", () => { + socket.off("syncProtomux", sync); + aq.respond(); + }); + + const send = (mux: any) => { + aq.sendUpdate({ + remote: Object.keys(mux._remote), + local: Object.keys(mux._local), + free: mux._free, + }); + }; + + const sync = () => send(mux); + + mux.syncState = send.bind(undefined, mux); + + sync(); + + socket.on("syncProtomux", sync); + + aq.setReceiveUpdate?.(async (data: any) => { + await mutex.acquire(); + + ["remote", "local"].forEach((field) => { + const rField = `_${field}`; + data[field].forEach((item: any) => { + if (!mux[rField][item]) { + while (item > mux[rField].length) { + mux[rField].push(null); + } + } + if (!mux[rField][item]) { + mux[rField][item] = null; + } + }); + }); + + data.free.forEach((index: number) => { + if (mux._free[index] === null) { + mux._free[index] = undefined; + } + }); + mux._free = mux._free.filter((item: any) => item !== undefined); + socket.emit("syncProtomux"); + + mutex.release(); + }); +} + function getSwarmToSocketConnectionId(socket: any) { for (const conn of connections) { if (conn[1].conn === socket) {