// @ts-ignore
import Hyperswarm from "@lumeweb/hyperswarm-web";
import type { ActiveQuery } from "@lumeweb/libkernel/module";
import {
  addHandler,
  getKey,
  handleMessage,
  handlePresentKey as handlePresentKeyModule,
  logErr,
} from "@lumeweb/libkernel/module";
import { Buffer } from "buffer";
import { ed25519 } from "@noble/curves/ed25519";
import b4a from "b4a";
import { pubKeyToIpv6 } from "./addr.js";
import { EventEmitter2 as EventEmitter } from "eventemitter2";
// @ts-ignore
import Protomux from "protomux";
import defer, { DeferredPromise } from "p-defer";
import { concatBytes, hexToBytes } from "@lumeweb/libweb";

/** Listener cap applied to every per-swarm event emitter. */
const MAX_PEER_LISTENERS = 20;

/**
 * Book-keeping record for one peer connection.
 *
 * NOTE(review): the generic arguments below were reconstructed from how the
 * maps are used in this file (channel ids come from `getChannelId()`, listener
 * counts are keyed by `aq.domain`) — confirm `aq.domain` is a string.
 */
interface SwarmConnection {
  /** Id of the swarm (key in `swarmInstances`) that produced the connection. */
  swarm: number;
  /** The raw peer connection object handed over by the swarm. */
  conn: any;
  /** Protomux channels opened on this connection, keyed by channel id. */
  channels: Map<number, any>;
  /** Number of active event listeners per calling domain. */
  listeners: Map<string, number>;
}

/** Event emitter associated with one swarm instance. */
interface SwarmEvents {
  swarm: number;
  events: EventEmitter;
}

// Module-wide registries, all keyed by ids produced by the factories below.
const connections = new Map<number, SwarmConnection>();
const swarmInstances = new Map<number, Hyperswarm>();
const swarmEvents = new Map<number, SwarmEvents>();

let defaultSwarm: Hyperswarm;

// `moduleReady` settles once `moduleReadyResolve` is called (done by the
// "presentKey" handler after the default swarm exists).
let moduleReadyResolve: () => void;
let moduleReady: Promise<void> = new Promise((resolve) => {
  moduleReadyResolve = resolve;
});

onmessage = handleMessage;

/**
 * Builds a counter that hands out consecutive integer ids.
 *
 * @param start First id returned by the generated function (default 1).
 * @returns A function that returns the current id and then advances by one.
 */
function idFactory(start = 1) {
  let id = start;

  return function nextId() {
    const _nextId = id;
    id += 1;
    return _nextId;
  };
}

const getSwarmId = idFactory();
const getSocketId = idFactory();
const getChannelId = idFactory();
const getMessageId = idFactory();

addHandler("presentKey", handlePresentKey);
addHandler("join", handleJoin);
addHandler("getPeerByPubkey", handleGetPeerByPubkey);
addHandler("addRelay", handleAddRelay);
addHandler("removeRelay", handleRemoveRelay);
addHandler("clearRelays", handleClearRelays);
addHandler("getRelays", handleGetRelays);
addHandler("init", handleInit);
addHandler("ready", handleReady);
addHandler("listenConnections", handleListenConnections, {
  receiveUpdates: true,
});
addHandler("socketGetInfo", handleGetSocketInfo);
addHandler("socketExists", handleSocketExists);
addHandler("socketListenEvent", handleSocketListenEvent, {
  receiveUpdates: true,
});
addHandler("socketListeners", handleSocketListenersEvent);
addHandler("socketWrite", handleWriteSocketEvent);
addHandler("socketClose", handleCloseSocketEvent);
addHandler("socketSetKeepAlive", handleSocketSetKeepAliveEvent);
addHandler("createProtomuxChannel", handleCreateProtomuxChannel, {
  receiveUpdates: true,
});
addHandler("createProtomuxMessage", handleCreateProtomuxMessage, {
  receiveUpdates: true,
});
addHandler("createSwarm", handleCreateSwarm);

/**
 * "presentKey" handler: forwards the caller's `rootKey` to the libkernel
 * key handler, lazily creates the default swarm, and marks the module ready.
 */
async function handlePresentKey(aq: ActiveQuery) {
  handlePresentKeyModule({
    callerInput: {
      key: aq.callerInput.rootKey,
    },
  } as ActiveQuery);

  if (!defaultSwarm) {
    defaultSwarm = swarmInstances.get(await createSwarm()) as Hyperswarm;
  }

  moduleReadyResolve();
}

/**
 * Creates a new Hyperswarm instance keyed with the module key, registers it
 * in `swarmInstances`, and wires its "init"/"close" self-events.
 *
 * @returns The numeric id under which the swarm was registered.
 */
async function createSwarm(): Promise<number> {
  const privateKey = await getKey();
  // Derive the public key once; it is needed both on its own and as the
  // second half of the secret key.
  const publicKey = ed25519.getPublicKey(privateKey);

  const swarmInstance = new Hyperswarm({
    keyPair: {
      publicKey,
      secretKey: concatBytes(privateKey, publicKey),
    },
  });

  const id = getSwarmId();
  swarmInstances.set(id, swarmInstance);

  swarmInstance.onSelf("init", () => {
    const swarmInstanceEvents = new EventEmitter();
    swarmInstanceEvents.setMaxListeners(MAX_PEER_LISTENERS);
    swarmEvents.set(id, { swarm: id, events: swarmInstanceEvents });

    swarmInstance.on("connection", (peer: any) => {
      const socketId = getSocketId();
      connections.set(socketId, {
        swarm: id,
        conn: peer,
        channels: new Map(),
        listeners: new Map(),
      });

      // Drop the book-keeping entry as soon as the peer goes away.
      peer.once("close", () => {
        connections.delete(socketId);
      });

      swarmInstanceEvents.emit("connection", peer);
    });

    swarmInstance.relays.forEach((relay: string) => {
      swarmInstance.activeRelay.joinPeer(hexToBytes(relay));
    });
  });

  swarmInstance.onSelf("close", (...args: any[]) => {
    swarmEvents.get(id)?.events.emit("close", ...args);
    swarmEvents.get(id)?.events.removeAllListeners();
    swarmEvents.delete(id);
  });

  return id;
}

/**
 * "socketListenEvent" handler: subscribes to `callerInput.event` on the
 * connection and streams every emission to the caller as an update. The
 * query is answered when the socket closes or the caller sends any update.
 * A per-domain listener count is maintained on the connection record.
 */
function handleSocketListenEvent(aq: ActiveQuery) {
  const { event = null } = aq.callerInput;

  const socket = validateConnection(aq);
  if (!socket) {
    return;
  }

  if (!event) {
    aq.reject("Invalid event");
    return;
  }

  const conn = connections.get(aq.callerInput.id) as SwarmConnection;

  let responded = false;

  // Idempotent: decrements the domain's listener count and answers once.
  const respond = () => {
    if (responded) {
      return;
    }
    responded = true;

    let count = conn.listeners.get(aq.domain) as number;
    count--;
    if (count > 0) {
      conn.listeners.set(aq.domain, count);
    } else {
      conn.listeners.delete(aq.domain);
    }

    aq.respond();
  };

  const cb = (data: Buffer) => {
    if (responded) {
      return;
    }
    aq.sendUpdate(data);
  };

  socket.on(event, cb);
  socket.once("close", () => {
    socket.off(event, cb);
    respond();
  });

  // Any update from the caller is treated as "stop listening".
  aq.setReceiveUpdate?.(() => {
    socket.off(event, cb);
    respond();
  });

  if (!conn.listeners.has(aq.domain)) {
    conn.listeners.set(aq.domain, 0);
  }
  conn.listeners.set(aq.domain, (conn.listeners.get(aq.domain) as number) + 1);
}

/**
 * "socketListeners" handler: responds with the list of domains that
 * currently hold at least one listener on the connection.
 */
async function handleSocketListenersEvent(aq: ActiveQuery) {
  const socket = validateConnection(aq);
  if (!socket) {
    return;
  }

  const conn = connections.get(aq.callerInput.id) as SwarmConnection;

  aq.respond(
    [...conn.listeners.entries()]
      .filter((item) => {
        return item[1] > 0;
      })
      .map((item) => item[0]),
  );
}

/** "socketExists" handler: responds whether `callerInput.id` is a known connection. */
async function handleSocketExists(aq: ActiveQuery) {
  const { id = null } = aq.callerInput;

  aq.respond(connections.has(Number(id)));
}

/** "socketClose" handler: ends the connection and responds. */
function handleCloseSocketEvent(aq: ActiveQuery) {
  const socket = validateConnection(aq);
  if (!socket) {
    return;
  }

  socket.end();
  aq.respond();
}

/**
 * "socketSetKeepAlive" handler: forwards `callerInput.alive` to
 * `socket.setKeepAlive`. Rejects with "alive required" only when the field
 * is absent.
 */
function handleSocketSetKeepAliveEvent(aq: ActiveQuery) {
  const socket = validateConnection(aq);
  if (!socket) {
    return;
  }

  const { alive = null } = aq.callerInput;

  // Test for presence, not truthiness, so falsy values such as 0 are still
  // forwarded. NOTE(review): presumably 0 disables keep-alive on the stream —
  // confirm against the stream implementation.
  if (alive === null) {
    aq.reject("alive required");
    return;
  }

  socket.setKeepAlive(alive);
  aq.respond();
}

/** "socketWrite" handler: writes `callerInput.message` to the connection. */
async function handleWriteSocketEvent(aq: ActiveQuery) {
  const socket = validateConnection(aq);
  if (!socket) {
    return;
  }

  const { message = null } = aq.callerInput;

  if (!message) {
    aq.reject("empty message");
    return;
  }

  socket.write(message);
  aq.respond();
}

/**
 * Looks up the connection named by `callerInput.id`.
 *
 * @returns The raw connection object, or `false` after rejecting the query
 *          with "Invalid connection id" when the id is missing or unknown.
 */
function validateConnection(aq: ActiveQuery): any | boolean {
  const { id = null } = aq.callerInput;

  if (!id || !connections.has(id)) {
    aq.reject("Invalid connection id");
    return false;
  }

  return connections.get(id)?.conn;
}

/**
 * Resolves the swarm a query refers to: the instance registered under
 * `callerInput.swarm` when given, otherwise the default swarm. Waits for
 * module readiness first and, if the swarm has an active relay and is ready,
 * for its DHT protocol to open.
 *
 * @throws Error("Invalid swarm id") after rejecting the query when the id is
 *         not registered.
 */
async function getSwarm(aq: ActiveQuery): Promise<Hyperswarm> {
  await moduleReady;

  let swarmId;
  if ("callerInput" in aq && aq.callerInput) {
    swarmId = aq.callerInput.swarm ?? null;

    if (swarmId && !swarmInstances.has(swarmId)) {
      const error = "Invalid swarm id";
      aq.reject(error);
      throw new Error(error);
    }
  }

  // Resolve the id to the instance *before* inspecting relay state; the id
  // itself is just a number and carries no `activeRelay`/`ready` fields.
  const swarm = swarmId
    ? (swarmInstances.get(swarmId) as Hyperswarm)
    : defaultSwarm;

  if (swarm.activeRelay && swarm.ready) {
    await swarm.activeRelay.dht._protocol.opened;
  }

  return swarm;
}

/** "addRelay" handler: adds `callerInput.pubkey` as a relay and responds with the result. */
async function handleAddRelay(aq: ActiveQuery) {
  const { pubkey = null } = aq.callerInput;

  if (!pubkey) {
    aq.reject("invalid pubkey");
    return;
  }

  const swarm = await getSwarm(aq);

  aq.respond(await swarm.addRelay(pubkey));
}

/** "removeRelay" handler: removes `callerInput.pubkey` from the relays and responds with the result. */
async function handleRemoveRelay(aq: ActiveQuery) {
  const { pubkey = null } = aq.callerInput;

  if (!pubkey) {
    aq.reject("invalid pubkey");
    return;
  }

  const swarm = await getSwarm(aq);

  aq.respond(swarm.removeRelay(pubkey));
}

/** "clearRelays" handler: clears all relays on the selected swarm. */
async function handleClearRelays(aq: ActiveQuery) {
  const swarm = await getSwarm(aq);

  swarm.clearRelays();

  aq.respond();
}

/** "getRelays" handler: responds with the selected swarm's `relays`. */
async function handleGetRelays(aq: ActiveQuery) {
  aq.respond(await (await getSwarm(aq)).relays);
}

/** "join" handler: joins `callerInput.topic` (must be a buffer) with `server: false`. */
async function handleJoin(aq: ActiveQuery) {
  const { topic = null } = aq.callerInput;

  const swarm = await getSwarm(aq);

  if (!topic) {
    aq.reject("invalid topic");
    return;
  }
  if (!b4a.isBuffer(topic)) {
    aq.reject("topic must be a buffer");
    return;
  }

  // @ts-ignore
  swarm.join(topic, { server: false });
  aq.respond();
}

/**
 * "getPeerByPubkey" handler: responds with the connection id of the peer
 * whose public key is `callerInput.pubkey` (a buffer), or rejects when the
 * swarm has no such peer.
 */
async function handleGetPeerByPubkey(aq: ActiveQuery) {
  const { pubkey = null } = aq.callerInput;

  const swarm = await getSwarm(aq);

  if (!pubkey) {
    aq.reject("invalid pubkey");
    return;
  }

  if (!b4a.isBuffer(pubkey)) {
    aq.reject("pubkey must be a buffer");
    return;
  }

  // @ts-ignore
  if (!swarm._allConnections.has(pubkey)) {
    aq.reject("peer does not exist");
    return;
  }

  // @ts-ignore
  const peer = swarm._allConnections.get(pubkey);

  aq.respond(getSwarmToSocketConnectionId(peer));
}

/** "init" handler: runs `swarm.init()` and rejects with the error message on failure. */
async function handleInit(aq: ActiveQuery) {
  const swarm = await getSwarm(aq);
  try {
    await swarm.init();
  } catch (e) {
    aq.reject((e as Error).message);
    return;
  }

  aq.respond();
}

/**
 * "ready" handler: responds immediately when the swarm already has an active
 * relay and is ready; otherwise waits for its "ready" event and the DHT
 * protocol to open before responding.
 */
async function handleReady(aq: ActiveQuery) {
  const swarm = await getSwarm(aq);

  if (swarm.activeRelay && swarm.ready) {
    aq.respond();
    await swarm.activeRelay.dht._protocol.opened;
    return;
  }

  swarm.once("ready", async () => {
    await swarm.activeRelay.dht._protocol.opened;
    aq.respond();
  });
}

/**
 * "listenConnections" handler: sends the connection id of every existing and
 * future connection of the selected swarm as an update. The query is answered
 * when the caller sends any update or when the swarm's emitter fires "close".
 */
async function handleListenConnections(aq: ActiveQuery) {
  const swarm = await getSwarm(aq);
  const swarmId = getSwarmToSwarmId(swarm);

  const listener = (peer: any) => {
    aq.sendUpdate(getSwarmToSocketConnectionId(peer));
  };

  const swarmEvent = swarmEvents.get(swarmId as number)?.events;

  if (!swarmEvent) {
    logErr("swarm event object is missing");
  }

  swarmEvent?.on("connection", listener);

  aq.setReceiveUpdate?.(() => {
    swarmEvent?.off("connection", listener);
    aq.respond();
  });

  // Replay connections that were established before the caller subscribed.
  for (const [, conn] of connections) {
    if (conn.swarm === swarmId) {
      listener(conn.conn);
    }
  }

  const closeCb = () => {
    swarmEvent?.off("connection", listener);
    swarmEvent?.emit("close");
    aq.respond();
  };

  const hookClose = () => {
    swarmEvent?.once("close", closeCb);
  };

  if (swarm.activeRelay) {
    hookClose();
    return;
  }
  swarm.onceSelf("ready", hookClose);
}

/**
 * "socketGetInfo" handler: responds with the connection's public keys and a
 * synthetic `rawStream` whose host is derived from the remote public key via
 * `pubKeyToIpv6`.
 */
async function handleGetSocketInfo(aq: ActiveQuery) {
  const socket = validateConnection(aq);
  if (!socket) {
    return;
  }

  aq.respond({
    remotePublicKey: socket.remotePublicKey,
    publicKey: socket.publicKey,
    rawStream: {
      remoteHost: pubKeyToIpv6(socket.remotePublicKey),
      remotePort: 0,
      remoteFamily: "IPv6",
    },
  });
}

/**
 * "createProtomuxChannel" handler: opens a Protomux channel on the connection
 * using `callerInput.data` (protocol, id, handshake, and flags selecting
 * which of onopen/onclose/ondestroy are relayed as updates). Sends the new
 * channel id as an update; the query is answered when the channel is
 * destroyed.
 */
async function handleCreateProtomuxChannel(aq: ActiveQuery) {
  const socket = validateConnection(aq);
  if (!socket) {
    return;
  }

  if (!("data" in aq.callerInput)) {
    aq.reject("data required");
    return;
  }

  const mux = Protomux.from(socket);
  const data = aq.callerInput.data;

  // Declared up front so a callback that fires during `open()` sees
  // `undefined` instead of hitting the temporal dead zone.
  let channelId: number | undefined;

  const handleCallback = (name: string, enabled: boolean) => {
    // The destroy hook is always installed because it performs cleanup,
    // whether or not the caller asked to be notified.
    if (!enabled && name !== "ondestroy") {
      return undefined;
    }

    return (...args: any) => {
      args = args.filter(
        (item: any) => item?.constructor.name.toLowerCase() !== "channel",
      );

      // Notify before answering so the update is not sent on a finished query.
      if (enabled) {
        aq.sendUpdate({
          action: name,
          args,
        });
      }

      if (name === "ondestroy") {
        if (channelId !== undefined) {
          connections.get(aq.callerInput.id)?.channels.delete(channelId);
        }
        aq.respond();
      }
    };
  };

  let channel = mux.createChannel({
    protocol: data?.protocol,
    id: data?.id,
    handshake: data?.handshake,
    onopen: handleCallback("onopen", data?.onopen ?? undefined),
    onclose: handleCallback("onclose", data?.onclose ?? undefined),
    ondestroy: handleCallback("ondestroy", data?.ondestroy ?? undefined),
  });

  if (channel === null) {
    aq.reject("duplicate channel");
    return;
  }

  channel.open();

  channelId = getChannelId();

  connections.get(aq.callerInput.id)?.channels.set(channelId, channel);

  aq.sendUpdate(channelId);
}

/**
 * "createProtomuxMessage" handler: registers a message type on an existing
 * channel. When `data.encoding` is set, preencode/encode/decode are delegated
 * to the caller through an update round-trip correlated by message id; when
 * `data.onmessage` is set, incoming messages are relayed as "onmessage"
 * updates. A caller update with `action: "send"` sends on the message.
 */
async function handleCreateProtomuxMessage(aq: ActiveQuery) {
  const socket = validateConnection(aq);
  if (!socket) {
    return;
  }

  if (!("data" in aq.callerInput)) {
    aq.reject("action required");
    return;
  }

  if (!("channelId" in aq.callerInput)) {
    aq.reject("channel id required");
    return;
  }

  const channel = connections
    .get(aq.callerInput.id)
    ?.channels.get(aq.callerInput.channelId);

  if (!channel) {
    aq.reject("invalid channel");
    // Stop here: everything below dereferences `channel`.
    return;
  }

  const data = aq.callerInput.data;

  // Pending encoder round-trips, keyed by the message id sent to the caller.
  const defers = new Map<number, DeferredPromise<any>>();

  const handleEncoding = (enabled: boolean) => {
    if (!enabled) {
      return undefined;
    }

    const update = async (action: string, args: any) => {
      const messageId = getMessageId();
      const d = defer();
      defers.set(messageId, d);
      aq.sendUpdate({
        id: messageId,
        action,
        args,
      });

      const ret = (await d.promise) as any;

      // The caller may return an updated encoder state as the second element;
      // copy it back onto the state object that was passed in.
      if (ret[1]) {
        if (ret[1].buffer) {
          args[0].buffer = b4a.from(ret[1].buffer);
        }

        args[0].start = ret[1].start;
        args[0].end = ret[1].end;
      }

      return ret[0];
    };

    return {
      async preencode(...args: any) {
        return update("preencode", args);
      },
      async encode(...args: any) {
        return update("encode", args);
      },
      async decode(...args: any) {
        return update("decode", args);
      },
    };
  };

  aq.setReceiveUpdate?.((data: any) => {
    if (data.action === "send") {
      message.send(...data.args);
    }

    defers.get(data.id)?.resolve(data.args);
    defers.delete(data.id);
  });

  if (data.onmessage) {
    data.onmessage = async (...args: any) => {
      for (let i = 0; i < args.length; i++) {
        if (isPromise(args[i])) {
          args[i] = await args[i];
        }
      }

      args = args.filter(
        (item: any) => item?.constructor.name.toLowerCase() !== "channel",
      );

      aq.sendUpdate({
        action: "onmessage",
        args,
      });
    };
  }

  const message = channel.addMessage({
    encoding: handleEncoding(data.encoding ?? false),
    onmessage: data.onmessage ?? noop,
  });

  aq.sendUpdate({
    action: "created",
  });
}

/** "createSwarm" handler: creates a swarm and responds with its id. */
async function handleCreateSwarm(aq: ActiveQuery) {
  aq.respond(await createSwarm());
}

/**
 * Reverse lookup in `connections`.
 *
 * @returns The id whose record holds `socket`, or `false` when none does.
 */
function getSwarmToSocketConnectionId(socket: any) {
  for (const [socketId, record] of connections) {
    if (record.conn === socket) {
      return socketId;
    }
  }

  return false;
}

/**
 * Reverse lookup in `swarmInstances`.
 *
 * @returns The id under which `swarm` is registered, or `false` when it is not.
 */
function getSwarmToSwarmId(swarm: any) {
  for (const [swarmId, instance] of swarmInstances) {
    if (instance === swarm) {
      return swarmId;
    }
  }

  return false;
}

/** Default no-op message callback. */
function noop() {}

/** Duck-typed thenable check: a truthy object or function with a `then` method. */
function isPromise(obj: Promise<any>) {
  return (
    !!obj &&
    (typeof obj === "object" || typeof obj === "function") &&
    typeof obj.then === "function"
  );
}