From 1bd159c19e09b5eebb5a93f213e8c10e779a9136 Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Thu, 6 Apr 2023 16:32:17 -0400 Subject: [PATCH] *Switch to a different, simpler method of syncing protomux state --- src/index.ts | 108 ++++++++++++++++++++++++++++----------------------- 1 file changed, 60 insertions(+), 48 deletions(-) diff --git a/src/index.ts b/src/index.ts index a9fbadb..1f39079 100644 --- a/src/index.ts +++ b/src/index.ts @@ -12,7 +12,6 @@ import { logErr } from "libkmodule/dist"; // @ts-ignore import Protomux from "protomux"; import { Mutex } from "async-mutex"; -import defer from "p-defer"; const MAX_PEER_LISTENERS = 20; @@ -38,6 +37,7 @@ let moduleReady: Promise = new Promise((resolve) => { }); onmessage = handleMessage; + function idFactory(start = 1) { let id = start; @@ -75,6 +75,7 @@ addHandler("socketClose", handleCloseSocketEvent); addHandler("syncProtomux", handleSyncProtomux, { receiveUpdates: true, }); + async function handlePresentSeed(aq: ActiveQuery) { const pubkey = await ed.getPublicKey(aq.callerInput.rootKey); handlePresentSeedModule({ @@ -295,6 +296,7 @@ async function handleJoin(aq: ActiveQuery) { swarm.join(topic, { server: false }); aq.respond(); } + async function handleGetPeerByPubkey(aq: ActiveQuery) { const { pubkey = null } = aq.callerInput; @@ -333,6 +335,7 @@ async function handleInit(aq: ActiveQuery) { aq.respond(); } + async function handleReady(aq: ActiveQuery) { const swarm = await getSwarm(aq); @@ -346,6 +349,7 @@ async function handleReady(aq: ActiveQuery) { aq.respond(); }); } + async function handleListenConnections(aq: ActiveQuery) { const swarm = await getSwarm(aq); const swarmId = getSwarmToSwarmId(swarm); @@ -415,62 +419,69 @@ async function handleSyncProtomux(aq: ActiveQuery) { return; } + if (!("action" in aq.callerInput)) { + aq.reject("action required"); + return; + } + if (!("data" in aq.callerInput)) { + aq.reject("data required"); + return; + } + + if (isNaN(parseInt(aq.callerInput))) { + aq.reject("data must be a number"); + return; + } + const mux = Protomux.from(socket); - const mutex: Mutex = mux.mutex ?? new Mutex(); - if (!mux.mutex) { + let mutex: Mutex | null = null; + if (mux.mutex) { + mutex = mux.mutex; + } + if (!mutex) { + mutex = new Mutex(); mux.mutex = mutex; } - socket.once("close", () => { - socket.off("syncProtomux", sync); - aq.respond(); - }); + await mutex.acquire(); - const send = (mux: any) => { - aq.sendUpdate({ - remote: Object.keys(mux._remote), - local: Object.keys(mux._local), - free: mux._free, - }); - }; + const data = aq.callerInput.data; - const sync = () => send(mux); + let ret; - if (!mux.syncState) { - mux.syncState = socket.emit.bind(socket, "syncProtomux"); + switch (aq.callerInput.action) { + case "popFree": + if (mux._free.includes(data)) { + mux._free[mux._free.indexOf(data)] = null; + mux._free = mux._free.filter((item: number) => item !== null); + } + break; + case "pushFree": + mux._free.push(data); + mux._free = Array.from(new Set(mux._free)); + break; + case "pushLocal": + if (typeof mux._local[data] === "undefined") { + mux._local[data] = null; + } + break; + case "pushRemote": + if (typeof mux._remote[data] === "undefined") { + mux._remote[data] = null; + } + break; + case "pullFree": + ret = Object.values(mux._free); + break; + case "pullLocal": + ret = Object.keys(mux._local); + break; + case "pullRemote": + ret = Object.keys(mux._remote); + break; } - socket.on("syncProtomux", sync); - - aq.setReceiveUpdate?.(async (data: any) => { - await mutex.acquire(); - - ["remote", "local"].forEach((field) => { - const rField = `_${field}`; - data[field].forEach((item: any) => { - item = parseInt(item); - if (typeof mux[rField][item] === "undefined") { - while (item > mux[rField].length) { - mux[rField].push(null); - } - } - if (typeof mux[rField][item] === "undefined") { - mux[rField][item] = null; - } - }); - }); - - data.free.forEach((index: number) => { - if (mux._free[index] === null) { - mux._free[index] = undefined; - } - }); - mux._free = mux._free.filter((item: any) => item !== undefined); - aq.sendUpdate(true); - socket.emit("syncProtomux"); - - mutex.release(); - }); + aq.respond(ret); } function getSwarmToSocketConnectionId(socket: any) { @@ -482,6 +493,7 @@ function getSwarmToSocketConnectionId(socket: any) { return false; } + function getSwarmToSwarmId(swarm: any) { for (const swarmInstance of swarmInstances) { if (swarmInstance[1] === swarm) {