From 50f21f043450706ea2f218f47d73d6a8aad4fe1f Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Sat, 18 Mar 2023 12:11:23 -0400 Subject: [PATCH] *refactor rpc to use new swarm based p2p *remove wisdom query for now --- README.md | 11 +-- package.json | 5 +- src/error.ts | 2 - src/network.ts | 114 ++++++++++++++++++------- src/query/base.ts | 14 ++-- src/query/clearCache.ts | 48 ++++++----- src/query/index.ts | 39 +++------ src/query/simple.ts | 64 ++++++++++----- src/query/wisdom.ts | 178 ---------------------------------------- src/sharedRelay.ts | 34 -------- src/util.ts | 11 ++- 11 files changed, 189 insertions(+), 331 deletions(-) delete mode 100644 src/query/wisdom.ts delete mode 100644 src/sharedRelay.ts diff --git a/README.md b/README.md index 5db12a2..76029be 100644 --- a/README.md +++ b/README.md @@ -1,10 +1 @@ -# dht-rpc-client -A client library that uses hypercore and the https://github.com/LumeWeb/relay server along with Skynet for web, to perform `Wisdom of the crowd` RPC requests. - -This enables access to blockchain RPC without running a node, and socializes the cost of access to RPC from use of services such as https://pokt.network - -As demand grows for users, so should the community. It is expected that both businesses operating on web3 and community members donating/supporting in altruism will ensure the upkeep of this dht. - -It is the projects hope that blockchains will evolve in the future such that much of this infrastructure becomes unneeded and RPC can be done directly with light clients. This would also need to support over Websockets like how Webtorrent works. - -As very few blockchains actually support this and for use with decentralized nodes, this type of dht/technology is required for mainstream adoption. +# rpc-client diff --git a/package.json b/package.json index 84df936..fa0213d 100644 --- a/package.json +++ b/package.json @@ -20,8 +20,11 @@ "@hyperswarm/dht": "^6.0.1", "@lumeweb/rpc": "git+https://git.lumeweb.com/LumeWeb/rpc.git", "b4a": "^1.6.1", + "hypercore-crypto": "^3.3.1", + "hyperswarm": "^4.3.7", "json-stringify-deterministic": "^1.0.7", "libskynet": "^0.0.61", - "msgpackr": "^1.6.1" + "msgpackr": "^1.6.1", + "sodium-universal": "^4.0.0" } } diff --git a/src/error.ts b/src/error.ts index 35daffb..3a15776 100644 --- a/src/error.ts +++ b/src/error.ts @@ -1,3 +1 @@ -export const ERR_MAX_TRIES_HIT = "ERR_MAX_TRIES_HIT"; export const ERR_INVALID_SIGNATURE = "INVALID_SIGNATURE"; -export const ERR_NO_RELAYS = "NO_RELAYS"; diff --git a/src/network.ts b/src/network.ts index 73f9de7..834349b 100644 --- a/src/network.ts +++ b/src/network.ts @@ -1,10 +1,21 @@ // @ts-ignore -import DHT from "@hyperswarm/dht"; +import Hyperswarm from "hyperswarm"; import RpcNetworkQueryFactory from "./query/index.js"; +import b4a from "b4a"; +import { createHash } from "./util.js"; export default class RpcNetwork { - constructor(dht = new DHT()) { - this._dht = dht; + private _relaysAvailablePromise?: Promise; + private _relaysAvailableResolve?: Function; + constructor(swarm = new Hyperswarm()) { + this._swarm = swarm; + this.init(); + } + + private _methods: Map> = new Map>(); + + get methods(): Map> { + return this._methods; } private _factory = new RpcNetworkQueryFactory(this); @@ -13,10 +24,10 @@ export default class RpcNetwork { return this._factory; } - private _dht: typeof DHT; + private _swarm: typeof Hyperswarm; - get dht() { - return this._dht; + get swarm() { + return this._swarm; } private _majorityThreshold = 0.75; @@ -49,9 +60,9 @@ export default class RpcNetwork { this._relayTimeout = value; } - private _relays: string[] = []; + private _relays: Map = new Map(); - get relays(): string[] { + get relays(): Map { return this._relays; } @@ -59,12 +70,16 @@ export default class RpcNetwork { get ready(): Promise { if (!this._ready) { - this._ready = this._dht.ready() as Promise; + this._ready = this._swarm.dht.ready() as Promise; } return this._ready; } + get readyWithRelays(): Promise { + return this.ready.then(() => this._relaysAvailablePromise); + } + private _bypassCache: boolean = false; get bypassCache(): boolean { @@ -75,33 +90,70 @@ export default class RpcNetwork { this._bypassCache = value; } - private _maxRelays: number = 0; + public getAvailableRelay(module: string, method: string) { + method = `${module}.${method}`; - get maxRelays(): number { - return this._maxRelays; - } + let relays = this._methods.get(method) ?? new Set(); - set maxRelays(value: number) { - this._maxRelays = value; - } - - public addRelay(pubkey: string): void { - this._relays.push(pubkey); - this._relays = [...new Set(this._relays)]; - } - - public removeRelay(pubkey: string): boolean { - if (!this._relays.includes(pubkey)) { - return false; + if (!relays.size) { + throw Error("no available relay"); } - delete this._relays[this._relays.indexOf(pubkey)]; - this._relays = Object.values(this._relays); - - return true; + return Array.from(relays)[Math.floor(Math.random() * relays.size)]; } - public clearRelays(): void { - this._relays = []; + public getRelay(pubkey: string) { + if (this._relays.has(pubkey)) { + return this._relays.get(pubkey); + } + + return undefined; + } + + private init() { + this._swarm.join(createHash("lumeweb")); + this.setupRelayPromise(); + + this._swarm.on("connection", async (relay: any) => { + const query = this._factory.simple({ + relay, + query: { module: "core", method: "get_methods", data: null }, + }); + const resp = await query.result; + + const pubkey = b4a.from(relay.remotePublicKey).toString("hex"); + + if (resp.data) { + this._relays.set(pubkey, relay); + + (resp.data as string[]).forEach((item) => { + const methods: Set = + this._methods.get(item) ?? new Set(); + + methods.add(pubkey); + this._methods.set(item, methods); + }); + this._relaysAvailableResolve?.(); + } + + relay.on("close", () => { + this._methods.forEach((item) => { + if (item.has(pubkey)) { + item.delete(pubkey); + } + }); + this.relays.delete(pubkey); + + if (!this._relays.size) { + this.setupRelayPromise(); + } + }); + }); + } + + private setupRelayPromise() { + this._relaysAvailablePromise = new Promise((resolve) => { + this._relaysAvailableResolve = resolve; + }); } } diff --git a/src/query/base.ts b/src/query/base.ts index 6d1a268..02baf7e 100644 --- a/src/query/base.ts +++ b/src/query/base.ts @@ -20,11 +20,15 @@ export default abstract class RpcQueryBase { protected _error?: string; protected _promiseResolve?: (data: any) => void; - constructor( - network: RpcNetwork, - query: ClientRPCRequest | RPCRequest, - options: RpcQueryOptions = {} - ) { + constructor({ + network, + query, + options = {}, + }: { + network: RpcNetwork; + query: ClientRPCRequest | RPCRequest; + options: RpcQueryOptions; + }) { this._network = network; this._query = query; this._options = options; diff --git a/src/query/clearCache.ts b/src/query/clearCache.ts index d92afa3..3963f1e 100644 --- a/src/query/clearCache.ts +++ b/src/query/clearCache.ts @@ -2,24 +2,27 @@ import RpcNetwork from "../network.js"; import { RPCBroadcastRequest, RPCRequest } from "@lumeweb/relay-types"; import { RpcQueryOptions } from "../types.js"; import { hashQuery } from "../util.js"; -import { getActiveRelay, setupRelay } from "../sharedRelay.js"; import SimpleRpcQuery from "./simple.js"; export default class ClearCacheRpcQuery extends SimpleRpcQuery { protected _relays: string[]; - constructor( - network: RpcNetwork, - relays: string[], - query: RPCRequest, - options: RpcQueryOptions - ) { - super(network, "", query, options); + constructor({ + network, + relays, + query, + options, + }: { + network: RpcNetwork; + relays: string[]; + query: RPCRequest; + options: RpcQueryOptions; + }) { + super({ network, relay: "", query, options }); this._relays = relays; } protected async _run(): Promise { - await setupRelay(this._network); // @ts-ignore this._relay = getActiveRelay().stream.remotePublicKey; await this.queryRelay(); @@ -27,17 +30,20 @@ export default class ClearCacheRpcQuery extends SimpleRpcQuery { } protected async queryRelay(): Promise { - return this.queryRpc(getActiveRelay(), { - module: "rpc", - method: "broadcast_request", - data: { - request: { - module: "rpc", - method: "clear_cached_item", - data: hashQuery(this._query), - }, - relays: this._relays, - } as RPCBroadcastRequest, - }); + return this.queryRpc( + this._network.getAvailableRelay("rpc", "broadcast_request"), + { + module: "rpc", + method: "broadcast_request", + data: { + request: { + module: "rpc", + method: "clear_cached_item", + data: hashQuery(this._query), + }, + relays: this._relays, + } as RPCBroadcastRequest, + } + ); } } diff --git a/src/query/index.ts b/src/query/index.ts index 1ffa92e..fc1ecd9 100644 --- a/src/query/index.ts +++ b/src/query/index.ts @@ -1,6 +1,5 @@ import { ClientRPCRequest, RPCRequest } from "@lumeweb/relay-types"; import { RpcQueryOptions } from "../types.js"; -import WisdomRpcQuery from "./wisdom.js"; import SimpleRpcQuery from "./simple.js"; import ClearCacheRpcQuery from "./clearCache.js"; import RpcNetwork from "../network.js"; @@ -13,23 +12,6 @@ export default class RpcNetworkQueryFactory { this._network = network; } - wisdom({ - query, - options = {}, - }: { - query: ClientRPCRequest; - options?: RpcQueryOptions; - }): WisdomRpcQuery { - return new WisdomRpcQuery( - this._network, - { - ...query, - bypassCache: query.bypassCache || this._network.bypassCache, - }, - options - ).run(); - } - simple({ relay, query, @@ -39,15 +21,15 @@ export default class RpcNetworkQueryFactory { query: ClientRPCRequest; options?: RpcQueryOptions; }): SimpleRpcQuery { - return new SimpleRpcQuery( - this._network, + return new SimpleRpcQuery({ + network: this._network, relay, - { + query: { ...query, - bypassCache: query.bypassCache || this._network.bypassCache, + bypassCache: query?.bypassCache || this._network.bypassCache, }, - options - ).run(); + options, + }).run(); } clearCache({ @@ -59,8 +41,13 @@ export default class RpcNetworkQueryFactory { query: RPCRequest; options?: RpcQueryOptions; }): ClearCacheRpcQuery { - return new ClearCacheRpcQuery(this._network, relays, query, options).run(); + return new ClearCacheRpcQuery({ + network: this._network, + query, + relays, + options, + }).run(); } } -export { RpcNetwork, RpcQueryBase, SimpleRpcQuery, WisdomRpcQuery }; +export { RpcNetwork, RpcQueryBase, SimpleRpcQuery }; diff --git a/src/query/simple.ts b/src/query/simple.ts index ed15d15..3a1101e 100644 --- a/src/query/simple.ts +++ b/src/query/simple.ts @@ -6,18 +6,24 @@ import { hashQuery, isPromise, validateTimestampedResponse } from "../util.js"; import RPC from "@lumeweb/rpc"; import { ERR_INVALID_SIGNATURE } from "../error.js"; import RpcQueryBase from "./base.js"; +import { query } from "express"; export default class SimpleRpcQuery extends RpcQueryBase { - protected _relay: string; + protected _relay?: string | any; protected declare _query: ClientRPCRequest; - constructor( - network: RpcNetwork, - relay: string, - query: ClientRPCRequest, - options: RpcQueryOptions - ) { - super(network, query, options); + constructor({ + network, + relay, + query, + options, + }: { + network: RpcNetwork; + relay?: string | any; + query: ClientRPCRequest; + options: RpcQueryOptions; + }) { + super({ network, query, options }); this._relay = relay; } @@ -27,16 +33,37 @@ export default class SimpleRpcQuery extends RpcQueryBase { } protected async queryRelay(): Promise { - let socket: any; + let socket = this._relay; - try { - socket = this._network.dht.connect(b4a.from(this._relay, "hex")); - if (isPromise(socket)) { - socket = await socket; + if (socket) { + if (socket === "string") { + try { + const relay = this._network.getRelay(socket); + if (this._network.getRelay(socket)) { + socket = relay; + } + } catch {} + } + + if (socket === "string") { + try { + socket = this._network.swarm.connect(b4a.from(this._relay, "hex")); + if (isPromise(socket)) { + socket = await socket; + } + } catch {} } - } catch (e) { - return; } + + if (!socket) { + socket = this._network.getAvailableRelay( + this._query.module, + this._query.method + ); + } + + this._relay = socket; + await socket.opened; const rpc = new RPC(socket); @@ -57,13 +84,8 @@ export default class SimpleRpcQuery extends RpcQueryBase { try { await this.queryRpc(rpc, this._query); } catch (e: any) { - // @ts-ignore - rpc.end(); throw e; } - - // @ts-ignore - rpc.end(); } protected async checkResponses() { @@ -76,7 +98,7 @@ export default class SimpleRpcQuery extends RpcQueryBase { if ( !response.error && !validateTimestampedResponse( - b4a.from(this._relay, "hex") as Buffer, + b4a.from(this._relay.remotePublicKey, "hex") as Buffer, response ) ) { diff --git a/src/query/wisdom.ts b/src/query/wisdom.ts deleted file mode 100644 index 2709bf9..0000000 --- a/src/query/wisdom.ts +++ /dev/null @@ -1,178 +0,0 @@ -import { - ClientRPCRequest, - RPCBroadcastResponse, - RPCRequest, - RPCResponse, -} from "@lumeweb/relay-types"; -import { clearTimeout } from "timers"; -import b4a from "b4a"; -import { - flatten, - validateResponse, - validateTimestampedResponse, -} from "../util.js"; -import { blake2b } from "libskynet"; -import { ERR_INVALID_SIGNATURE, ERR_NO_RELAYS } from "../error.js"; -import RpcQueryBase from "./base.js"; -import { getActiveRelay, setupRelay } from "../sharedRelay.js"; - -function flatHash(data: any) { - const flattenedData = flatten(data).sort(); - return Buffer.from( - blake2b(Buffer.from(JSON.stringify(flattenedData))) - ).toString("hex"); -} - -export default class WisdomRpcQuery extends RpcQueryBase { - protected declare _response?: RPCBroadcastResponse; - protected declare _query: ClientRPCRequest; - get result(): Promise { - return this._promise as Promise; - } - - protected async _run(): Promise { - await setupRelay(this._network); - await this.queryRelay(); - await this.checkResponse(); - } - - protected resolve(data?: RPCResponse, timeout: boolean = false): void { - clearTimeout(this._timeoutTimer); - this._timeout = timeout; - this._completed = true; - - if (timeout) { - data = { - error: "timeout", - }; - } - - this._promiseResolve?.(data); - } - - protected async queryRelay(): Promise { - let activeRelay = getActiveRelay(); - let relays = this.getRelays(); - - if (!relays.length) { - throw new Error(ERR_NO_RELAYS); - } - - if (this._query.bypassCache) { - delete this._query.bypassCache; - const clearCacheQuery = this._network.factory.clearCache({ - relays, - query: this._query, - }); - await clearCacheQuery.result; - } - - if ("bypassCache" in this._query) { - delete this._query.bypassCache; - } - return this.queryRpc(activeRelay, { - module: "rpc", - method: "broadcast_request", - data: { - request: this._query, - relays, - }, - } as RPCRequest); - } - - protected async checkResponse() { - if (this._error) { - this.resolve({ error: this._error }); - return; - } - - if ( - !validateResponse( - // @ts-ignore - getActiveRelay().stream.remotePublicKey, - this._response as RPCResponse - ) - ) { - this.resolve({ error: ERR_INVALID_SIGNATURE }); - return; - } - - let relays: RPCResponse[] = []; - - for (const relay in this._response?.relays) { - const resp = this._response?.relays[relay]; - if (resp?.error) { - continue; - } - if ( - validateTimestampedResponse( - b4a.from(relay, "hex") as Buffer, - resp as RPCResponse - ) - ) { - relays.push(resp as RPCResponse); - } - } - - if (!relays.length) { - this.resolve({ error: ERR_NO_RELAYS }); - return; - } - - type ResponseGroup = { [response: string]: number }; - - const responseObjects = relays.reduce((output: any, item: RPCResponse) => { - const field = item.signedField || "data"; - // @ts-ignore - const hash = flatHash(item[field]); - output[hash] = item?.data; - return output; - }, {}); - - const responses: ResponseGroup = relays.reduce( - (output: ResponseGroup, item: RPCResponse) => { - const field = item.signedField || "data"; - // @ts-ignore - const hash = flatHash(item[field]); - output[hash] = output[hash] ?? 0; - output[hash]++; - return output; - }, - {} - ); - - for (const responseHash in responses) { - if ( - responses[responseHash] / relays.length >= - this._network.majorityThreshold - ) { - let response: RPCResponse = responseObjects[responseHash]; - - response = { data: response }; - - this.resolve(response); - break; - } - } - } - - protected getRelays(): string[] { - if ( - this._network.maxRelays === 0 || - this._network.relays.length <= this._network.maxRelays - ) { - return this._network.relays; - } - - const list: string[] = []; - let available = this._network.relays; - - while (list.length <= this._network.maxRelays) { - const item = Math.floor(Math.random() * available.length); - list.push(available[item]); - available.splice(item, 1); - } - - return list; - } -} diff --git a/src/sharedRelay.ts b/src/sharedRelay.ts deleted file mode 100644 index 003439d..0000000 --- a/src/sharedRelay.ts +++ /dev/null @@ -1,34 +0,0 @@ -import { ERR_NO_RELAYS } from "./error.js"; -import b4a from "b4a"; -import { isPromise } from "./util.js"; -import RPC from "@lumeweb/rpc"; -import RpcNetwork from "./network.js"; - -let activeRelay: RPC | undefined; - -export async function setupRelay(network: RpcNetwork) { - const relays = network.relays; - - if (!activeRelay) { - if (!relays.length) { - throw new Error(ERR_NO_RELAYS); - } - - let relay = relays[Math.floor(Math.random() * relays.length)]; - let socket = network.dht.connect(b4a.from(relay, "hex")); - if (isPromise(socket)) { - socket = await socket; - } - - await socket.opened; - - activeRelay = new RPC(socket); - socket.once("close", () => { - activeRelay = undefined; - }); - } -} - -export function getActiveRelay(): RPC { - return activeRelay as RPC; -} diff --git a/src/util.ts b/src/util.ts index e751481..beafd33 100644 --- a/src/util.ts +++ b/src/util.ts @@ -69,8 +69,7 @@ export function validateResponse( ): boolean { const field = response.signedField || "data"; // @ts-ignore - const data = response[field]; - let json = data; + let json = response[field]; if (typeof json !== "string") { json = stringify(json); } @@ -105,3 +104,11 @@ export function hashQuery(query: RPCRequest): string { return queryHash.toString("hex"); } + +export function createHash(data: string): Buffer { + const buffer = b4a.from(data); + let hash = b4a.allocUnsafe(32) as Buffer; + sodium.crypto_generichash(hash, buffer); + + return hash; +}