diff --git a/package.json b/package.json index 3341f58..eea20e6 100644 --- a/package.json +++ b/package.json @@ -7,6 +7,7 @@ "build": "rimraf dist && tsc" }, "devDependencies": { + "@types/random-number-csprng": "^1.0.0", "@types/ws": "^8.5.3", "esbuild": "^0.14.48", "prettier": "^2.7.1", @@ -20,6 +21,7 @@ "libkmodule": "^0.2.12", "libskynet": "^0.0.48", "msgpackr": "^1.6.1", + "random-number-csprng": "^1.0.2", "safe-buffer": "^5.2.1", "websocket-pool": "^1.3.1" } diff --git a/src/index.ts b/src/index.ts index 1c37089..0351918 100644 --- a/src/index.ts +++ b/src/index.ts @@ -7,34 +7,36 @@ import createRoundRobin from "@derhuerst/round-robin-scheduler"; // @ts-ignore import {Buffer} from "buffer"; // @ts-ignore -import {blake2b} from "libskynet"; +// @ts-ignore +import {blake2b, errTuple} from "libskynet"; // @ts-ignore import {registryRead} from "libkmodule"; -// @ts-ignore -import {errTuple} from "libskynet"; import {unpack} from "msgpackr"; +import randomNumber from "random-number-csprng"; const REGISTRY_DHT_KEY = "lumeweb-dht-node"; export default class DHT { - private _wsPool: createRoundRobin.RoundRobin; private _options: any; - private _relays: { [pubkey: string]: string } = {}; + private _relays: Map = new Map(); + private _activeRelays: Map = new Map(); + private _maxConnections = 10; + private _inited = false; constructor(opts = {}) { // @ts-ignore opts.custodial = false; this._options = opts; - this._wsPool = createRoundRobin(); } ready(): Promise { - return Promise.resolve(); + this._inited = true; + return this.fillConnections(); } get relays(): string[] { - return Object.keys(this._relays); + return [...this._relays.keys()]; } public async addRelay(pubkey: string): Promise { @@ -61,32 +63,37 @@ export default class DHT { return false; } - const connection = `wss://${domain}:${port}/`; + this._relays[pubkey] = `wss://${domain}:${port}/`; - this._wsPool.add(connection); - this._relays[pubkey] = connection; + if (this._inited) { + await this.fillConnections(); + } return true; } public removeRelay(pubkey: string): boolean { - if (!(pubkey in this._relays)) { + if (!this._relays.has(pubkey)) { return false; } - this._wsPool.remove(this._relays[pubkey]); - delete this._relays[pubkey]; + + if (this._activeRelays.has(pubkey)) { + this._activeRelays.get(pubkey).destroy(); + this._activeRelays.delete(pubkey) + } + + this._relays.delete(pubkey) return true; } public clearRelays(): void { - this._wsPool = createRoundRobin(); - this._relays = {}; + [...this._relays.keys()].forEach(this.removeRelay); } private async isServerAvailable(connection: string): Promise { - return new Promise((resolve) => { + return new Promise((resolve) => { const ws = new WebSocket(connection); ws.addEventListener("open", () => { ws.close(); @@ -99,27 +106,34 @@ export default class DHT { } async connect(pubkey: string, options = {}): Promise { - const relay = await this.getAvailableRelay(); - - if (!relay) { + if (this._activeRelays.size === 0) { throw new Error("Failed to find an available relay"); } - const node = new DhtNode(new Stream(true, new WebSocket(relay as string)), this._options); - await node.ready(); + const node = this._activeRelays.get([...this._activeRelays.keys()][await randomNumber(0, this._activeRelays.size - 1)]); return node.connect(pubkey, options) } - async getAvailableRelay(): Promise { - for (let i = 0; i < this._wsPool.length; i++) { - const relay = this._wsPool.get(); - if (await this.isServerAvailable(relay)) { - return relay; + private async fillConnections(): Promise { + let available = [...this._relays.keys()].filter(x => [...this._activeRelays.keys()].includes(x)); + let relayPromises = []; + while (this._activeRelays.size <= Math.min(this._maxConnections, available.length + this._activeRelays.size)) { + const relayIndex = await randomNumber(0, available.length - 1); + + const connection = available[relayIndex]; + + if (!this.isServerAvailable(connection)) { + continue; } + + const node = new DhtNode(new Stream(true, new WebSocket(this._activeRelays.get(connection) as string)), this._options); + this._activeRelays.set(available[relayIndex], node); + + relayPromises.push(node.ready()); } - return false; + return Promise.allSettled(relayPromises); } }