diff --git a/package.json b/package.json index 3c05854..384e677 100644 --- a/package.json +++ b/package.json @@ -6,6 +6,7 @@ }, "scripts": { "test": "jest", + "format": "prettier -w src", "build-script": "tsc --project tsconfig.build.json && mv dist-build/build.js dist-build/build.mjs", "compile": "npm run build-script && node build.js", "build": "npm run compile && rimraf node_modules/@hyperswarm/secret-stream/node_modules && node ./dist-build/build.mjs dev" diff --git a/src/index.ts b/src/index.ts index 3ec8841..62f6a5f 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,17 +1,26 @@ // @ts-ignore import DHT from "@lumeweb/dht-web"; -import { addHandler, handleMessage } from "libkmodule"; import type { ActiveQuery } from "libkmodule"; +import { addHandler, getSeed, handleMessage } from "libkmodule"; +import { handlePresentSeed as handlePresentSeedModule } from "libkmodule/dist/seed.js"; import { nextId } from "./id"; import { Buffer } from "buffer"; -let dht: DHT; +interface DhtConnection { + dht: number; + conn: any; +} -const connections = new Map(); +const connections = new Map(); +const dhtInstances = new Map(); + +let defaultDht; onmessage = handleMessage; addHandler("presentSeed", handlePresentSeed); +addHandler("openDht", handleOpenDht); +addHandler("closeDht", handleCloseDht); addHandler("connect", handleConnect); addHandler("listenSocketEvent", handleListenSocketEvent, { receiveUpdates: true, @@ -23,18 +32,60 @@ addHandler("removeRelay", handleRemoveRelay); addHandler("clearRelays", handleClearRelays); addHandler("ready", handleReady); -function handlePresentSeed(aq: ActiveQuery) { +async function handlePresentSeed(aq: ActiveQuery) { const keyPair = aq.callerInput.myskyRootKeypair; - if (!dht) { - dht = new DHT({ keyPair }); + handlePresentSeedModule({ callerInput: { seed: keyPair } }); + if (!defaultDht) { + defaultDht = await createDht(); } } +async function handleOpenDht(aq: ActiveQuery) { + const id = await createDht(); + aq.respond({ dht: id }); +} + +async function handleCloseDht(aq: ActiveQuery) { + const { dht = null } = aq.callerInput; + + if (!dht) { + aq.reject("Invalid DHT id"); + return; + } + + if (dht === defaultDht) { + aq.reject("Cannot close default DHT"); + return; + } + + dhtInstances.delete(dht); + Array.from(connections.values()) + .filter((item) => item.dht === dht) + .forEach((item) => { + item.conn.end(); + }); + + aq.respond(); +} + +async function createDht(): Promise { + const dhtInstance = new DHT({ keyPair: await getSeed() }); + const id = nextId(); + dhtInstances.set(id, dhtInstance); + return id; +} + async function handleConnect(aq: ActiveQuery) { const { pubkey, options = {} } = aq.callerInput; let socket: any; + const dht = validateDht(aq); + + if (!dht) { + return; + } + try { // @ts-ignore socket = await dht.connect( @@ -49,21 +100,26 @@ async function handleConnect(aq: ActiveQuery) { const id = nextId(); socket.on("open", () => { - connections.set(id, socket); + setDhtConnection(id, dht as number, socket); aq.respond({ id }); }); + socket.on("end", () => { + deleteDhtConnection(id); + }); + socket.on("error", (e: any) => { - connections.set(id, socket); + deleteDhtConnection(id); aq.reject(e); }); } function handleListenSocketEvent(aq: ActiveQuery) { const { event = null } = aq.callerInput; - const id = validateConnection(aq); - if (!id) { + const socket = validateConnection(aq); + + if (!socket) { return; } @@ -72,7 +128,6 @@ function handleListenSocketEvent(aq: ActiveQuery) { return; } - const socket = connections.get(id); const cb = (data: Buffer) => { aq.sendUpdate(data); }; @@ -94,21 +149,21 @@ function handleListenSocketEvent(aq: ActiveQuery) { } function handleCloseSocketEvent(aq: ActiveQuery) { - const id = validateConnection(aq); + const socket = validateConnection(aq); - if (!id) { + if (!socket) { return; } - connections.get(id).end(); + socket.end(); aq.respond(); } function handleWriteSocketEvent(aq: ActiveQuery) { - const id = validateConnection(aq); + const socket = validateConnection(aq); - if (!id) { + if (!socket) { return; } const { message = null } = aq.callerInput; @@ -118,25 +173,46 @@ function handleWriteSocketEvent(aq: ActiveQuery) { return false; } - connections.get(id).write(message); + socket.write(message); aq.respond(); } -function validateConnection(aq: ActiveQuery): number | boolean { +function validateConnection(aq: ActiveQuery): any | boolean { const { id = null } = aq.callerInput; - if (!id || !connections.has(id)) { + if (!id || !hasDhtConnection(id)) { aq.reject("Invalid connection id"); return false; } - return id; + return getDhtConnection(id).conn; +} + +function validateDht(aq: ActiveQuery): DHT | boolean { + let { dht = null } = aq.callerInput; + + if (dht && !dhtInstances.has(dht)) { + aq.reject("Invalid DHT id"); + return false; + } + + if (!dht) { + dht = defaultDht; + } + + return dhtInstances.get(dht); } async function handleAddRelay(aq: ActiveQuery) { const { pubkey = null } = aq.callerInput; + const dht = validateDht(aq); + + if (!dht) { + return; + } + if (!pubkey) { aq.reject("invalid pubkey"); return; @@ -148,6 +224,12 @@ async function handleAddRelay(aq: ActiveQuery) { function handleRemoveRelay(aq: ActiveQuery) { const { pubkey = null } = aq.callerInput; + const dht = validateDht(aq); + + if (!dht) { + return; + } + if (!pubkey) { aq.reject("invalid pubkey"); return; @@ -157,13 +239,41 @@ function handleRemoveRelay(aq: ActiveQuery) { } function handleClearRelays(aq: ActiveQuery) { + const dht = validateDht(aq); + + if (!dht) { + return; + } + dht.clearRelays(); aq.respond(); } async function handleReady(aq: ActiveQuery) { + const dht = validateDht(aq); + + if (!dht) { + return; + } + // @ts-ignore await dht.ready(); aq.respond(); } + +function setDhtConnection(id: number, dht: number, conn: any) { + connections.set(id, { dht, conn }); +} + +function getDhtConnection(id: number) { + return connections.get(id); +} + +function hasDhtConnection(id: number) { + return connections.has(id); +} + +function deleteDhtConnection(id: number) { + connections.delete(id); +}