*Switch to a different, simpler method of syncing protomux state

This commit is contained in:
Derrick Hammer 2023-04-06 16:32:17 -04:00
parent 2e62597cd6
commit 1bd159c19e
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
1 changed files with 60 additions and 48 deletions

View File

@ -12,7 +12,6 @@ import { logErr } from "libkmodule/dist";
// @ts-ignore
import Protomux from "protomux";
import { Mutex } from "async-mutex";
import defer from "p-defer";
const MAX_PEER_LISTENERS = 20;
@ -38,6 +37,7 @@ let moduleReady: Promise<void> = new Promise((resolve) => {
});
onmessage = handleMessage;
function idFactory(start = 1) {
let id = start;
@ -75,6 +75,7 @@ addHandler("socketClose", handleCloseSocketEvent);
addHandler("syncProtomux", handleSyncProtomux, {
receiveUpdates: true,
});
async function handlePresentSeed(aq: ActiveQuery) {
const pubkey = await ed.getPublicKey(aq.callerInput.rootKey);
handlePresentSeedModule({
@ -295,6 +296,7 @@ async function handleJoin(aq: ActiveQuery) {
swarm.join(topic, { server: false });
aq.respond();
}
async function handleGetPeerByPubkey(aq: ActiveQuery) {
const { pubkey = null } = aq.callerInput;
@ -333,6 +335,7 @@ async function handleInit(aq: ActiveQuery) {
aq.respond();
}
async function handleReady(aq: ActiveQuery) {
const swarm = await getSwarm(aq);
@ -346,6 +349,7 @@ async function handleReady(aq: ActiveQuery) {
aq.respond();
});
}
async function handleListenConnections(aq: ActiveQuery) {
const swarm = await getSwarm(aq);
const swarmId = getSwarmToSwarmId(swarm);
@ -415,62 +419,69 @@ async function handleSyncProtomux(aq: ActiveQuery) {
return;
}
if (!("action" in aq.callerInput)) {
aq.reject("action required");
return;
}
if (!("data" in aq.callerInput)) {
aq.reject("data required");
return;
}
if (isNaN(parseInt(aq.callerInput))) {
aq.reject("data must be a number");
return;
}
const mux = Protomux.from(socket);
const mutex: Mutex = mux.mutex ?? new Mutex();
if (!mux.mutex) {
let mutex: Mutex | null = null;
if (mux.mutex) {
mutex = mux.mutex;
}
if (!mutex) {
mutex = new Mutex();
mux.mutex = mutex;
}
socket.once("close", () => {
socket.off("syncProtomux", sync);
aq.respond();
});
await mutex.acquire();
const send = (mux: any) => {
aq.sendUpdate({
remote: Object.keys(mux._remote),
local: Object.keys(mux._local),
free: mux._free,
});
};
const data = aq.callerInput.data;
const sync = () => send(mux);
let ret;
if (!mux.syncState) {
mux.syncState = socket.emit.bind(socket, "syncProtomux");
switch (aq.callerInput.action) {
case "popFree":
if (mux._free.includes(data)) {
mux._free[mux._free.indexOf(data)] = null;
mux._free = mux._free.filter((item: number) => item !== null);
}
break;
case "pushFree":
mux._free.push(data);
mux._free = Array.from(new Set(mux._free));
break;
case "pushLocal":
if (typeof mux._local[data] === "undefined") {
mux._local[data] = null;
}
break;
case "pushRemote":
if (typeof mux._remote[data] === "undefined") {
mux._remote[data] = null;
}
break;
case "pullFree":
ret = Object.values(mux._free);
break;
case "pullLocal":
ret = Object.keys(mux._local);
break;
case "pullRemote":
ret = Object.keys(mux._remote);
break;
}
socket.on("syncProtomux", sync);
aq.setReceiveUpdate?.(async (data: any) => {
await mutex.acquire();
["remote", "local"].forEach((field) => {
const rField = `_${field}`;
data[field].forEach((item: any) => {
item = parseInt(item);
if (typeof mux[rField][item] === "undefined") {
while (item > mux[rField].length) {
mux[rField].push(null);
}
}
if (typeof mux[rField][item] === "undefined") {
mux[rField][item] = null;
}
});
});
data.free.forEach((index: number) => {
if (mux._free[index] === null) {
mux._free[index] = undefined;
}
});
mux._free = mux._free.filter((item: any) => item !== undefined);
aq.sendUpdate(true);
socket.emit("syncProtomux");
mutex.release();
});
aq.respond(ret);
}
function getSwarmToSocketConnectionId(socket: any) {
@ -482,6 +493,7 @@ function getSwarmToSocketConnectionId(socket: any) {
return false;
}
function getSwarmToSwarmId(swarm: any) {
for (const swarmInstance of swarmInstances) {
if (swarmInstance[1] === swarm) {