*Refactor swarm event management to use a dedicated event emitter per swarm for efficiency

This commit is contained in:
Derrick Hammer 2023-03-19 15:16:51 -04:00
parent e7a0c381b9
commit 2e1780e28f
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
2 changed files with 49 additions and 10 deletions

View File

@ -15,7 +15,8 @@
"dependencies": { "dependencies": {
"@lumeweb/hyperswarm-web": "git+https://git.lumeweb.com/LumeWeb/hyperswarm-web.git", "@lumeweb/hyperswarm-web": "git+https://git.lumeweb.com/LumeWeb/hyperswarm-web.git",
"@noble/ed25519": "^1.7.3", "@noble/ed25519": "^1.7.3",
"b4a": "^1.6.2", "b4a": "^1.6.3",
"eventemitter2": "^6.4.9",
"hyperswarm": "^4.3.7", "hyperswarm": "^4.3.7",
"libkmodule": "^0.2.53", "libkmodule": "^0.2.53",
"libskynet": "^0.0.62", "libskynet": "^0.0.62",

View File

@ -7,6 +7,8 @@ import type { Buffer } from "buffer";
import * as ed from "@noble/ed25519"; import * as ed from "@noble/ed25519";
import b4a from "b4a"; import b4a from "b4a";
import { pubKeyToIpv6 } from "./addr.js"; import { pubKeyToIpv6 } from "./addr.js";
import { EventEmitter2 as EventEmitter } from "eventemitter2";
import { logErr } from "libkmodule/dist";
const MAX_PEER_LISTENERS = 20; const MAX_PEER_LISTENERS = 20;
@ -15,8 +17,14 @@ interface SwarmConnection {
conn: any; conn: any;
} }
interface SwarmEvents {
swarm: number;
events: EventEmitter;
}
const connections = new Map<number, SwarmConnection>(); const connections = new Map<number, SwarmConnection>();
const swarmInstances = new Map<number, Hyperswarm>(); const swarmInstances = new Map<number, Hyperswarm>();
const swarmEvents = new Map<number, SwarmEvents>();
let defaultSwarm: Hyperswarm; let defaultSwarm: Hyperswarm;
@ -83,16 +91,30 @@ async function createSwarm(): Promise<number> {
swarmInstances.set(id, swarmInstance); swarmInstances.set(id, swarmInstance);
swarmInstance.onSelf("init", () => { swarmInstance.onSelf("init", () => {
const swarmInstanceEvents = new EventEmitter();
swarmInstanceEvents.setMaxListeners(MAX_PEER_LISTENERS);
swarmEvents.set(id, { swarm: id, events: swarmInstanceEvents });
swarmInstance.on("connection", (peer: any) => { swarmInstance.on("connection", (peer: any) => {
const socketId = getSocketId(); const socketId = getSocketId();
connections.set(socketId, { swarm: id, conn: peer }); connections.set(socketId, {
swarm: id,
conn: peer,
});
peer.on("close", () => { peer.once("close", () => {
connections.delete(socketId); connections.delete(socketId);
}); });
swarmInstanceEvents.emit("connection", peer);
}); });
}); });
swarmInstance.onSelf("close", (...args) => {
swarmEvents.get(id)?.events.emit("close", ...args);
swarmEvents.get(id)?.events.removeAllListeners();
swarmEvents.delete(id);
});
return id; return id;
} }
@ -312,32 +334,39 @@ async function handleListenConnections(aq: ActiveQuery) {
const swarm = await getSwarm(aq); const swarm = await getSwarm(aq);
const listener = (peer: any) => { const listener = (peer: any) => {
peer.setMaxListeners(MAX_PEER_LISTENERS);
aq.sendUpdate(getSwarmToSocketConnectionId(peer)); aq.sendUpdate(getSwarmToSocketConnectionId(peer));
}; };
swarm.on("connection", listener); const swarmEvent = swarmEvents.get(
getSwarmToSwarmId(swarm) as number
)?.events;
if (!swarmEvent) {
logErr("swarm event object is missing");
}
swarmEvent?.on("connection", listener);
aq.setReceiveUpdate?.(() => { aq.setReceiveUpdate?.(() => {
swarm.off("connection", listener); swarmEvent?.off("connection", listener);
aq.respond(); aq.respond();
}); });
const closeCb = () => { const closeCb = () => {
swarm.off("connection", listener); swarmEvent?.off("connection", listener);
swarm.emit("close"); swarmEvent?.emit("close");
aq.respond(); aq.respond();
}; };
const hookClose = () => { const hookClose = () => {
swarm.activeRelay.dht._protocol._stream.once("close", closeCb); swarm.onceSelf("close", closeCb);
}; };
if (swarm.activeRelay) { if (swarm.activeRelay) {
hookClose(); hookClose();
return; return;
} }
swarm.once("ready", hookClose); swarm.onceSelf("ready", hookClose);
} }
async function handleGetSocketInfo(aq: ActiveQuery) { async function handleGetSocketInfo(aq: ActiveQuery) {
@ -367,3 +396,12 @@ function getSwarmToSocketConnectionId(socket: any) {
return false; return false;
} }
function getSwarmToSwarmId(swarm: any) {
for (const swarmInstance of swarmInstances) {
if (swarmInstance[1] === swarm) {
return swarmInstance[0];
}
}
return false;
}