*Update DHT management

This commit is contained in:
Derrick Hammer 2022-08-31 15:35:24 -04:00
parent 4956592d1d
commit b25706e8ac
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
2 changed files with 54 additions and 74 deletions

View File

@ -1,12 +0,0 @@
function idFactory(start = 1, step = 1, limit = 2 ** 32) {
let id = start;
return function nextId() {
const nextId = id;
id += step;
if (id >= limit) id = start;
return nextId;
};
}
export const nextId = idFactory(1);

View File

@ -3,7 +3,6 @@ import DHT from "@lumeweb/dht-web";
import type { ActiveQuery } from "libkmodule";
import { addHandler, getSeed, handleMessage } from "libkmodule";
import { handlePresentSeed as handlePresentSeedModule } from "libkmodule/dist/seed.js";
import { nextId } from "./id";
import type { Buffer } from "buffer";
import { hexToBuf } from "libskynet";
@ -13,11 +12,28 @@ interface DhtConnection {
}
const connections = new Map<number, DhtConnection>();
const dhtInstances = new Map()<number, DHT>;
const dhtInstances = new Map<number, DHT>();
let defaultDht;
let defaultDht: DHT;
let moduleReadyResolve: Function;
let moduleReady: Promise<void> = new Promise((resolve) => {
moduleReadyResolve = resolve;
});
onmessage = handleMessage;
function idFactory(start = 1, step = 1, limit = 2 ** 32) {
let id = start;
return function nextId() {
const nextId = id;
id += step;
if (id >= limit) id = start;
return nextId;
};
}
const nextId = idFactory(1);
addHandler("presentSeed", handlePresentSeed);
addHandler("openDht", handleOpenDht);
@ -38,10 +54,11 @@ addHandler("ready", handleReady);
async function handlePresentSeed(aq: ActiveQuery) {
const keyPair = aq.callerInput.myskyRootKeypair;
handlePresentSeedModule({ callerInput: { seed: keyPair } });
handlePresentSeedModule({ callerInput: { seed: keyPair } } as ActiveQuery);
if (!defaultDht) {
defaultDht = await createDht();
defaultDht = dhtInstances.get(await createDht()) as DHT;
}
moduleReadyResolve();
}
async function handleOpenDht(aq: ActiveQuery) {
@ -84,11 +101,7 @@ async function handleConnect(aq: ActiveQuery) {
let socket: any;
const dht = validateDht(aq);
if (!dht) {
return;
}
const dht = await getDht(aq);
try {
// @ts-ignore
@ -104,7 +117,12 @@ async function handleConnect(aq: ActiveQuery) {
const id = nextId();
socket.on("open", () => {
setDhtConnection(id, dht as number, socket);
let dhtId: any = [...dhtInstances.entries()].filter(
(item) => item[1] === dht
);
dhtId = dhtId.shift()[0];
setDhtConnection(id, dhtId as number, socket);
aq.respond({ id });
});
@ -196,98 +214,72 @@ function validateConnection(aq: ActiveQuery): any | boolean {
return false;
}
return getDhtConnection(id).conn;
return getDhtConnection(id)?.conn;
}
function validateDht(aq: ActiveQuery): DHT | boolean {
let { dht = null } = aq.callerInput;
async function getDht(aq: ActiveQuery): Promise<DHT> {
await moduleReady;
let dht;
if ("callerInput" in aq && aq.callerInput) {
dht = aq.callerInput.dht ?? null;
if (dht && !dhtInstances.has(dht)) {
aq.reject("Invalid DHT id");
return false;
if (dht && !dhtInstances.has(dht)) {
const error = "Invalid DHT id";
aq.reject(error);
throw new Error(error);
}
}
if (!dht) {
dht = defaultDht;
return defaultDht;
}
return dhtInstances.get(dht);
return dhtInstances.get(dht) as DHT;
}
async function handleAddRelay(aq: ActiveQuery) {
const { pubkey = null } = aq.callerInput;
const dht = validateDht(aq);
if (!dht) {
return;
}
if (!pubkey) {
aq.reject("invalid pubkey");
return;
}
const dht = await getDht(aq);
aq.respond(await dht.addRelay(pubkey));
}
function handleRemoveRelay(aq: ActiveQuery) {
async function handleRemoveRelay(aq: ActiveQuery) {
const { pubkey = null } = aq.callerInput;
const dht = validateDht(aq);
if (!dht) {
return;
}
if (!pubkey) {
aq.reject("invalid pubkey");
return;
}
const dht = await getDht(aq);
aq.respond(dht.removeRelay(pubkey));
}
function handleClearRelays(aq: ActiveQuery) {
const dht = validateDht(aq);
if (!dht) {
return;
}
async function handleClearRelays(aq: ActiveQuery) {
const dht = await getDht(aq);
dht.clearRelays();
aq.respond();
}
function handleGetRelays(aq: ActiveQuery) {
const dht = validateDht(aq);
if (!dht) {
return;
}
aq.respond(dht.relays);
async function handleGetRelays(aq: ActiveQuery) {
aq.respond(await (await getDht(aq)).relays);
}
function handleGetRelayServers(aq: ActiveQuery) {
const dht = validateDht(aq);
if (!dht) {
return;
}
aq.respond(dht.relayServers);
async function handleGetRelayServers(aq: ActiveQuery) {
aq.respond(await (await getDht(aq)).relayServers);
}
async function handleReady(aq: ActiveQuery) {
const dht = validateDht(aq);
if (!dht) {
return;
}
// @ts-ignore
await dht.ready();
await (await getDht(aq)).ready();
aq.respond();
}