*Refactor to use persistent websocket connections, randomly (secure) picked, then load balance connect requests the same way

This commit is contained in:
Derrick Hammer 2022-07-26 19:27:59 -04:00
parent 918c49f146
commit 685bc59894
2 changed files with 44 additions and 28 deletions

View File

@ -7,6 +7,7 @@
"build": "rimraf dist && tsc" "build": "rimraf dist && tsc"
}, },
"devDependencies": { "devDependencies": {
"@types/random-number-csprng": "^1.0.0",
"@types/ws": "^8.5.3", "@types/ws": "^8.5.3",
"esbuild": "^0.14.48", "esbuild": "^0.14.48",
"prettier": "^2.7.1", "prettier": "^2.7.1",
@ -20,6 +21,7 @@
"libkmodule": "^0.2.12", "libkmodule": "^0.2.12",
"libskynet": "^0.0.48", "libskynet": "^0.0.48",
"msgpackr": "^1.6.1", "msgpackr": "^1.6.1",
"random-number-csprng": "^1.0.2",
"safe-buffer": "^5.2.1", "safe-buffer": "^5.2.1",
"websocket-pool": "^1.3.1" "websocket-pool": "^1.3.1"
} }

View File

@ -7,34 +7,36 @@ import createRoundRobin from "@derhuerst/round-robin-scheduler";
// @ts-ignore // @ts-ignore
import {Buffer} from "buffer"; import {Buffer} from "buffer";
// @ts-ignore // @ts-ignore
import {blake2b} from "libskynet"; // @ts-ignore
import {blake2b, errTuple} from "libskynet";
// @ts-ignore // @ts-ignore
import {registryRead} from "libkmodule"; import {registryRead} from "libkmodule";
// @ts-ignore
import {errTuple} from "libskynet";
import {unpack} from "msgpackr"; import {unpack} from "msgpackr";
import randomNumber from "random-number-csprng";
const REGISTRY_DHT_KEY = "lumeweb-dht-node"; const REGISTRY_DHT_KEY = "lumeweb-dht-node";
export default class DHT { export default class DHT {
private _wsPool: createRoundRobin.RoundRobin;
private _options: any; private _options: any;
private _relays: { [pubkey: string]: string } = {}; private _relays: Map<string, string> = new Map();
private _activeRelays: Map<string, typeof DhtNode> = new Map();
private _maxConnections = 10;
private _inited = false;
constructor(opts = {}) { constructor(opts = {}) {
// @ts-ignore // @ts-ignore
opts.custodial = false; opts.custodial = false;
this._options = opts; this._options = opts;
this._wsPool = createRoundRobin();
} }
ready(): Promise<void> { ready(): Promise<void> {
return Promise.resolve(); this._inited = true;
return this.fillConnections();
} }
get relays(): string[] { get relays(): string[] {
return Object.keys(this._relays); return [...this._relays.keys()];
} }
public async addRelay(pubkey: string): Promise<boolean> { public async addRelay(pubkey: string): Promise<boolean> {
@ -61,32 +63,37 @@ export default class DHT {
return false; return false;
} }
const connection = `wss://${domain}:${port}/`; this._relays[pubkey] = `wss://${domain}:${port}/`;
this._wsPool.add(connection); if (this._inited) {
this._relays[pubkey] = connection; await this.fillConnections();
}
return true; return true;
} }
public removeRelay(pubkey: string): boolean { public removeRelay(pubkey: string): boolean {
if (!(pubkey in this._relays)) { if (!this._relays.has(pubkey)) {
return false; return false;
} }
this._wsPool.remove(this._relays[pubkey]);
delete this._relays[pubkey]; if (this._activeRelays.has(pubkey)) {
this._activeRelays.get(pubkey).destroy();
this._activeRelays.delete(pubkey)
}
this._relays.delete(pubkey)
return true; return true;
} }
public clearRelays(): void { public clearRelays(): void {
this._wsPool = createRoundRobin(); [...this._relays.keys()].forEach(this.removeRelay);
this._relays = {};
} }
private async isServerAvailable(connection: string): Promise<boolean> { private async isServerAvailable(connection: string): Promise<boolean> {
return new Promise((resolve) => { return new Promise<boolean>((resolve) => {
const ws = new WebSocket(connection); const ws = new WebSocket(connection);
ws.addEventListener("open", () => { ws.addEventListener("open", () => {
ws.close(); ws.close();
@ -99,27 +106,34 @@ export default class DHT {
} }
async connect(pubkey: string, options = {}): Promise<DhtNode> { async connect(pubkey: string, options = {}): Promise<DhtNode> {
const relay = await this.getAvailableRelay(); if (this._activeRelays.size === 0) {
if (!relay) {
throw new Error("Failed to find an available relay"); throw new Error("Failed to find an available relay");
} }
const node = new DhtNode(new Stream(true, new WebSocket(relay as string)), this._options); const node = this._activeRelays.get([...this._activeRelays.keys()][await randomNumber(0, this._activeRelays.size - 1)]);
await node.ready();
return node.connect(pubkey, options) return node.connect(pubkey, options)
} }
async getAvailableRelay(): Promise<string | boolean> { private async fillConnections(): Promise<any> {
for (let i = 0; i < this._wsPool.length; i++) { let available = [...this._relays.keys()].filter(x => [...this._activeRelays.keys()].includes(x));
const relay = this._wsPool.get(); let relayPromises = [];
if (await this.isServerAvailable(relay)) { while (this._activeRelays.size <= Math.min(this._maxConnections, available.length + this._activeRelays.size)) {
return relay; const relayIndex = await randomNumber(0, available.length - 1);
}
const connection = available[relayIndex];
if (!this.isServerAvailable(connection)) {
continue;
} }
return false; const node = new DhtNode(new Stream(true, new WebSocket(this._activeRelays.get(connection) as string)), this._options);
this._activeRelays.set(available[relayIndex], node);
relayPromises.push(node.ready());
}
return Promise.allSettled(relayPromises);
} }
} }