From b25706e8ac850054c3644b5d96baff5e84338607 Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Wed, 31 Aug 2022 15:35:24 -0400 Subject: [PATCH] *Update DHT management --- src/id.ts | 12 ------ src/index.ts | 116 ++++++++++++++++++++++++--------------------------- 2 files changed, 54 insertions(+), 74 deletions(-) delete mode 100644 src/id.ts diff --git a/src/id.ts b/src/id.ts deleted file mode 100644 index 15292e7..0000000 --- a/src/id.ts +++ /dev/null @@ -1,12 +0,0 @@ -function idFactory(start = 1, step = 1, limit = 2 ** 32) { - let id = start; - - return function nextId() { - const nextId = id; - id += step; - if (id >= limit) id = start; - return nextId; - }; -} - -export const nextId = idFactory(1); diff --git a/src/index.ts b/src/index.ts index 084641a..6320b5d 100644 --- a/src/index.ts +++ b/src/index.ts @@ -3,7 +3,6 @@ import DHT from "@lumeweb/dht-web"; import type { ActiveQuery } from "libkmodule"; import { addHandler, getSeed, handleMessage } from "libkmodule"; import { handlePresentSeed as handlePresentSeedModule } from "libkmodule/dist/seed.js"; -import { nextId } from "./id"; import type { Buffer } from "buffer"; import { hexToBuf } from "libskynet"; @@ -13,11 +12,28 @@ interface DhtConnection { } const connections = new Map(); -const dhtInstances = new Map(); +const dhtInstances = new Map(); -let defaultDht; +let defaultDht: DHT; + +let moduleReadyResolve: Function; +let moduleReady: Promise = new Promise((resolve) => { + moduleReadyResolve = resolve; +}); onmessage = handleMessage; +function idFactory(start = 1, step = 1, limit = 2 ** 32) { + let id = start; + + return function nextId() { + const nextId = id; + id += step; + if (id >= limit) id = start; + return nextId; + }; +} + +const nextId = idFactory(1); addHandler("presentSeed", handlePresentSeed); addHandler("openDht", handleOpenDht); @@ -38,10 +54,11 @@ addHandler("ready", handleReady); async function handlePresentSeed(aq: ActiveQuery) { const keyPair = aq.callerInput.myskyRootKeypair; - handlePresentSeedModule({ callerInput: { seed: keyPair } }); + handlePresentSeedModule({ callerInput: { seed: keyPair } } as ActiveQuery); if (!defaultDht) { - defaultDht = await createDht(); + defaultDht = dhtInstances.get(await createDht()) as DHT; } + moduleReadyResolve(); } async function handleOpenDht(aq: ActiveQuery) { @@ -84,11 +101,7 @@ async function handleConnect(aq: ActiveQuery) { let socket: any; - const dht = validateDht(aq); - - if (!dht) { - return; - } + const dht = await getDht(aq); try { // @ts-ignore @@ -104,7 +117,12 @@ async function handleConnect(aq: ActiveQuery) { const id = nextId(); socket.on("open", () => { - setDhtConnection(id, dht as number, socket); + let dhtId: any = [...dhtInstances.entries()].filter( + (item) => item[1] === dht + ); + dhtId = dhtId.shift()[0]; + + setDhtConnection(id, dhtId as number, socket); aq.respond({ id }); }); @@ -196,98 +214,72 @@ function validateConnection(aq: ActiveQuery): any | boolean { return false; } - return getDhtConnection(id).conn; + return getDhtConnection(id)?.conn; } -function validateDht(aq: ActiveQuery): DHT | boolean { - let { dht = null } = aq.callerInput; +async function getDht(aq: ActiveQuery): Promise { + await moduleReady; + let dht; + if ("callerInput" in aq && aq.callerInput) { + dht = aq.callerInput.dht ?? null; - if (dht && !dhtInstances.has(dht)) { - aq.reject("Invalid DHT id"); - return false; + if (dht && !dhtInstances.has(dht)) { + const error = "Invalid DHT id"; + aq.reject(error); + throw new Error(error); + } } if (!dht) { - dht = defaultDht; + return defaultDht; } - return dhtInstances.get(dht); + return dhtInstances.get(dht) as DHT; } async function handleAddRelay(aq: ActiveQuery) { const { pubkey = null } = aq.callerInput; - const dht = validateDht(aq); - - if (!dht) { - return; - } - if (!pubkey) { aq.reject("invalid pubkey"); return; } + const dht = await getDht(aq); + aq.respond(await dht.addRelay(pubkey)); } -function handleRemoveRelay(aq: ActiveQuery) { +async function handleRemoveRelay(aq: ActiveQuery) { const { pubkey = null } = aq.callerInput; - const dht = validateDht(aq); - - if (!dht) { - return; - } - if (!pubkey) { aq.reject("invalid pubkey"); return; } + const dht = await getDht(aq); + aq.respond(dht.removeRelay(pubkey)); } -function handleClearRelays(aq: ActiveQuery) { - const dht = validateDht(aq); - - if (!dht) { - return; - } +async function handleClearRelays(aq: ActiveQuery) { + const dht = await getDht(aq); dht.clearRelays(); aq.respond(); } -function handleGetRelays(aq: ActiveQuery) { - const dht = validateDht(aq); - - if (!dht) { - return; - } - - aq.respond(dht.relays); +async function handleGetRelays(aq: ActiveQuery) { + aq.respond(await (await getDht(aq)).relays); } -function handleGetRelayServers(aq: ActiveQuery) { - const dht = validateDht(aq); - - if (!dht) { - return; - } - - aq.respond(dht.relayServers); +async function handleGetRelayServers(aq: ActiveQuery) { + aq.respond(await (await getDht(aq)).relayServers); } async function handleReady(aq: ActiveQuery) { - const dht = validateDht(aq); - - if (!dht) { - return; - } - - // @ts-ignore - await dht.ready(); + await (await getDht(aq)).ready(); aq.respond(); }