*Support multiple dht connections with a default DHT connection

This commit is contained in:
Derrick Hammer 2022-08-03 12:01:11 -04:00
parent e01d4fc9b5
commit 77c473611a
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
2 changed files with 131 additions and 20 deletions

View File

@ -6,6 +6,7 @@
},
"scripts": {
"test": "jest",
"format": "prettier -w src",
"build-script": "tsc --project tsconfig.build.json && mv dist-build/build.js dist-build/build.mjs",
"compile": "npm run build-script && node build.js",
"build": "npm run compile && rimraf node_modules/@hyperswarm/secret-stream/node_modules && node ./dist-build/build.mjs dev"

View File

@ -1,17 +1,26 @@
// @ts-ignore
import DHT from "@lumeweb/dht-web";
import { addHandler, handleMessage } from "libkmodule";
import type { ActiveQuery } from "libkmodule";
import { addHandler, getSeed, handleMessage } from "libkmodule";
import { handlePresentSeed as handlePresentSeedModule } from "libkmodule/dist/seed.js";
import { nextId } from "./id";
import { Buffer } from "buffer";
let dht: DHT;
interface DhtConnection {
dht: number;
conn: any;
}
const connections = new Map();
const connections = new Map<number, DhtConnection>();
const dhtInstances = new Map()<number, DHT>;
let defaultDht;
onmessage = handleMessage;
addHandler("presentSeed", handlePresentSeed);
addHandler("openDht", handleOpenDht);
addHandler("closeDht", handleCloseDht);
addHandler("connect", handleConnect);
addHandler("listenSocketEvent", handleListenSocketEvent, {
receiveUpdates: true,
@ -23,18 +32,60 @@ addHandler("removeRelay", handleRemoveRelay);
addHandler("clearRelays", handleClearRelays);
addHandler("ready", handleReady);
function handlePresentSeed(aq: ActiveQuery) {
async function handlePresentSeed(aq: ActiveQuery) {
const keyPair = aq.callerInput.myskyRootKeypair;
if (!dht) {
dht = new DHT({ keyPair });
handlePresentSeedModule({ callerInput: { seed: keyPair } });
if (!defaultDht) {
defaultDht = await createDht();
}
}
async function handleOpenDht(aq: ActiveQuery) {
const id = await createDht();
aq.respond({ dht: id });
}
async function handleCloseDht(aq: ActiveQuery) {
const { dht = null } = aq.callerInput;
if (!dht) {
aq.reject("Invalid DHT id");
return;
}
if (dht === defaultDht) {
aq.reject("Cannot close default DHT");
return;
}
dhtInstances.delete(dht);
Array.from(connections.values())
.filter((item) => item.dht === dht)
.forEach((item) => {
item.conn.end();
});
aq.respond();
}
async function createDht(): Promise<number> {
const dhtInstance = new DHT({ keyPair: await getSeed() });
const id = nextId();
dhtInstances.set(id, dhtInstance);
return id;
}
async function handleConnect(aq: ActiveQuery) {
const { pubkey, options = {} } = aq.callerInput;
let socket: any;
const dht = validateDht(aq);
if (!dht) {
return;
}
try {
// @ts-ignore
socket = await dht.connect(
@ -49,21 +100,26 @@ async function handleConnect(aq: ActiveQuery) {
const id = nextId();
socket.on("open", () => {
connections.set(id, socket);
setDhtConnection(id, dht as number, socket);
aq.respond({ id });
});
socket.on("end", () => {
deleteDhtConnection(id);
});
socket.on("error", (e: any) => {
connections.set(id, socket);
deleteDhtConnection(id);
aq.reject(e);
});
}
function handleListenSocketEvent(aq: ActiveQuery) {
const { event = null } = aq.callerInput;
const id = validateConnection(aq);
if (!id) {
const socket = validateConnection(aq);
if (!socket) {
return;
}
@ -72,7 +128,6 @@ function handleListenSocketEvent(aq: ActiveQuery) {
return;
}
const socket = connections.get(id);
const cb = (data: Buffer) => {
aq.sendUpdate(data);
};
@ -94,21 +149,21 @@ function handleListenSocketEvent(aq: ActiveQuery) {
}
function handleCloseSocketEvent(aq: ActiveQuery) {
const id = validateConnection(aq);
const socket = validateConnection(aq);
if (!id) {
if (!socket) {
return;
}
connections.get(id).end();
socket.end();
aq.respond();
}
function handleWriteSocketEvent(aq: ActiveQuery) {
const id = validateConnection(aq);
const socket = validateConnection(aq);
if (!id) {
if (!socket) {
return;
}
const { message = null } = aq.callerInput;
@ -118,25 +173,46 @@ function handleWriteSocketEvent(aq: ActiveQuery) {
return false;
}
connections.get(id).write(message);
socket.write(message);
aq.respond();
}
function validateConnection(aq: ActiveQuery): number | boolean {
function validateConnection(aq: ActiveQuery): any | boolean {
const { id = null } = aq.callerInput;
if (!id || !connections.has(id)) {
if (!id || !hasDhtConnection(id)) {
aq.reject("Invalid connection id");
return false;
}
return id;
return getDhtConnection(id).conn;
}
function validateDht(aq: ActiveQuery): DHT | boolean {
let { dht = null } = aq.callerInput;
if (dht && !dhtInstances.has(dht)) {
aq.reject("Invalid DHT id");
return false;
}
if (!dht) {
dht = defaultDht;
}
return dhtInstances.get(dht);
}
async function handleAddRelay(aq: ActiveQuery) {
const { pubkey = null } = aq.callerInput;
const dht = validateDht(aq);
if (!dht) {
return;
}
if (!pubkey) {
aq.reject("invalid pubkey");
return;
@ -148,6 +224,12 @@ async function handleAddRelay(aq: ActiveQuery) {
function handleRemoveRelay(aq: ActiveQuery) {
const { pubkey = null } = aq.callerInput;
const dht = validateDht(aq);
if (!dht) {
return;
}
if (!pubkey) {
aq.reject("invalid pubkey");
return;
@ -157,13 +239,41 @@ function handleRemoveRelay(aq: ActiveQuery) {
}
function handleClearRelays(aq: ActiveQuery) {
const dht = validateDht(aq);
if (!dht) {
return;
}
dht.clearRelays();
aq.respond();
}
async function handleReady(aq: ActiveQuery) {
const dht = validateDht(aq);
if (!dht) {
return;
}
// @ts-ignore
await dht.ready();
aq.respond();
}
function setDhtConnection(id: number, dht: number, conn: any) {
connections.set(id, { dht, conn });
}
function getDhtConnection(id: number) {
return connections.get(id);
}
function hasDhtConnection(id: number) {
return connections.has(id);
}
function deleteDhtConnection(id: number) {
connections.delete(id);
}