diff --git a/package.json b/package.json index 7c7cc5a..b97ee95 100644 --- a/package.json +++ b/package.json @@ -15,7 +15,8 @@ "dependencies": { "@lumeweb/hyperswarm-web": "git+https://git.lumeweb.com/LumeWeb/hyperswarm-web.git", "@noble/ed25519": "^1.7.3", - "b4a": "^1.6.2", + "b4a": "^1.6.3", + "eventemitter2": "^6.4.9", "hyperswarm": "^4.3.7", "libkmodule": "^0.2.53", "libskynet": "^0.0.62", diff --git a/src/index.ts b/src/index.ts index 768b055..53b43f5 100644 --- a/src/index.ts +++ b/src/index.ts @@ -7,6 +7,8 @@ import type { Buffer } from "buffer"; import * as ed from "@noble/ed25519"; import b4a from "b4a"; import { pubKeyToIpv6 } from "./addr.js"; +import { EventEmitter2 as EventEmitter } from "eventemitter2"; +import { logErr } from "libkmodule/dist"; const MAX_PEER_LISTENERS = 20; @@ -15,8 +17,14 @@ interface SwarmConnection { conn: any; } +interface SwarmEvents { + swarm: number; + events: EventEmitter; +} + const connections = new Map(); const swarmInstances = new Map(); +const swarmEvents = new Map(); let defaultSwarm: Hyperswarm; @@ -83,16 +91,30 @@ async function createSwarm(): Promise { swarmInstances.set(id, swarmInstance); swarmInstance.onSelf("init", () => { + const swarmInstanceEvents = new EventEmitter(); + swarmInstanceEvents.setMaxListeners(MAX_PEER_LISTENERS); + swarmEvents.set(id, { swarm: id, events: swarmInstanceEvents }); swarmInstance.on("connection", (peer: any) => { const socketId = getSocketId(); - connections.set(socketId, { swarm: id, conn: peer }); + connections.set(socketId, { + swarm: id, + conn: peer, + }); - peer.on("close", () => { + peer.once("close", () => { connections.delete(socketId); }); + + swarmInstanceEvents.emit("connection", peer); }); }); + swarmInstance.onSelf("close", (...args) => { + swarmEvents.get(id)?.events.emit("close", ...args); + swarmEvents.get(id)?.events.removeAllListeners(); + swarmEvents.delete(id); + }); + return id; } @@ -312,32 +334,39 @@ async function handleListenConnections(aq: ActiveQuery) { const swarm = await getSwarm(aq); const listener = (peer: any) => { - peer.setMaxListeners(MAX_PEER_LISTENERS); aq.sendUpdate(getSwarmToSocketConnectionId(peer)); }; - swarm.on("connection", listener); + const swarmEvent = swarmEvents.get( + getSwarmToSwarmId(swarm) as number + )?.events; + + if (!swarmEvent) { + logErr("swarm event object is missing"); + } + + swarmEvent?.on("connection", listener); aq.setReceiveUpdate?.(() => { - swarm.off("connection", listener); + swarmEvent?.off("connection", listener); aq.respond(); }); const closeCb = () => { - swarm.off("connection", listener); - swarm.emit("close"); + swarmEvent?.off("connection", listener); + swarmEvent?.emit("close"); aq.respond(); }; const hookClose = () => { - swarm.activeRelay.dht._protocol._stream.once("close", closeCb); + swarm.onceSelf("close", closeCb); }; if (swarm.activeRelay) { hookClose(); return; } - swarm.once("ready", hookClose); + swarm.onceSelf("ready", hookClose); } async function handleGetSocketInfo(aq: ActiveQuery) { @@ -367,3 +396,12 @@ function getSwarmToSocketConnectionId(socket: any) { return false; } +function getSwarmToSwarmId(swarm: any) { + for (const swarmInstance of swarmInstances) { + if (swarmInstance[1] === swarm) { + return swarmInstance[0]; + } + } + + return false; +}