From 7263ecf9074a028d14db32684d35376c4dd7aa94 Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Sun, 4 Dec 2022 02:40:36 -0500 Subject: [PATCH] *Epic refactor based on new RPC query design and protocol --- package.json | 9 +- src/error.ts | 2 + src/index.ts | 9 +- src/network.ts | 37 ++++---- src/query/base.ts | 116 +++++++++++-------------- src/query/simple.ts | 78 +++++++++++++---- src/query/streaming.ts | 94 --------------------- src/query/wisdom.ts | 186 +++++++++++++++++++++++++++++------------ src/types.ts | 5 -- src/util.ts | 53 +++++++++--- 10 files changed, 314 insertions(+), 275 deletions(-) delete mode 100644 src/query/streaming.ts diff --git a/package.json b/package.json index a2c048d..8f12279 100644 --- a/package.json +++ b/package.json @@ -8,14 +8,19 @@ "build": "rimraf dist && tsc" }, "devDependencies": { - "@lumeweb/relay-types": "https://github.com/LumeWeb/relay-types.git", - "@types/json-stable-stringify": "^1.0.34", + "@lumeweb/relay-types": "https://git.lumeweb.com/LumeWeb/relay-types.git", + "@types/b4a": "^1.6.0", + "@types/express": "^4.17.14", "@types/node": "^18.0.0", + "node-cache": "^5.1.2", "prettier": "^2.7.1", "typescript": "^4.7.4" }, "dependencies": { "@hyperswarm/dht": "^6.0.1", + "@lumeweb/rpc": "https://git.lumeweb.com/LumeWeb/rpc.git", + "b4a": "^1.6.1", + "json-stringify-deterministic": "^1.0.7", "libskynet": "^0.0.61", "msgpackr": "^1.6.1" } diff --git a/src/error.ts b/src/error.ts index efb866d..35daffb 100644 --- a/src/error.ts +++ b/src/error.ts @@ -1 +1,3 @@ export const ERR_MAX_TRIES_HIT = "ERR_MAX_TRIES_HIT"; +export const ERR_INVALID_SIGNATURE = "INVALID_SIGNATURE"; +export const ERR_NO_RELAYS = "NO_RELAYS"; diff --git a/src/index.ts b/src/index.ts index e775698..d63353f 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,15 +1,8 @@ import RpcNetwork from "./network.js"; import RpcQueryBase from "./query/base.js"; import SimpleRpcQuery from "./query/simple.js"; -import StreamingRpcQuery from "./query/streaming.js"; import WisdomRpcQuery from "./query/wisdom.js"; export * from "./types.js"; -export { - RpcNetwork, - RpcQueryBase, - SimpleRpcQuery, - StreamingRpcQuery, - WisdomRpcQuery, -}; +export { RpcNetwork, RpcQueryBase, SimpleRpcQuery, WisdomRpcQuery }; diff --git a/src/network.ts b/src/network.ts index c08adf8..288b35b 100644 --- a/src/network.ts +++ b/src/network.ts @@ -1,15 +1,22 @@ -import WisdomRpcQuery from "./query/wisdom.js"; // @ts-ignore import DHT from "@hyperswarm/dht"; -import StreamingRpcQuery from "./query/streaming.js"; -import { RpcQueryOptions, StreamHandlerFunction } from "./types.js"; +import b4a from "b4a"; +import RPC from "@lumeweb/rpc"; +import { isPromise } from "./util.js"; import SimpleRpcQuery from "./query/simple.js"; +import WisdomRpcQuery from "./query/wisdom.js"; export default class RpcNetwork { constructor(dht = new DHT()) { this._dht = dht; } + private _activeRelay?: RPC; + + get activeRelay(): RPC { + return this._activeRelay as RPC; + } + private _dht: typeof DHT; get dht() { @@ -58,6 +65,7 @@ export default class RpcNetwork { if (!this._ready) { this._ready = this._dht.ready() as Promise; } + return this._ready; } @@ -106,7 +114,7 @@ export default class RpcNetwork { module: string, data: object | any[] = {}, bypassCache: boolean = false, - options: RpcQueryOptions = {} + options = {} ): WisdomRpcQuery { return new WisdomRpcQuery( this, @@ -119,30 +127,13 @@ export default class RpcNetwork { options ).run(); } - - public streamingQuery( - relay: Buffer | string, - method: string, - module: string, - streamHandler: StreamHandlerFunction, - data: object | any[] = {}, - options: RpcQueryOptions = {} - ): StreamingRpcQuery { - return new StreamingRpcQuery( - this, - relay, - { method, module, data }, - { ...options, streamHandler } - ).run(); - } - public simpleQuery( - relay: Buffer | string, + relay: string, method: string, module: string, data: object | any[] = {}, bypassCache: boolean = false, - options: RpcQueryOptions = {} + options: {} ): SimpleRpcQuery { return new SimpleRpcQuery( this, diff --git a/src/query/base.ts b/src/query/base.ts index 434fa32..ad7a67a 100644 --- a/src/query/base.ts +++ b/src/query/base.ts @@ -4,7 +4,13 @@ import { Buffer } from "buffer"; import { isPromise } from "../util.js"; import RpcNetwork from "../network.js"; import { RpcQueryOptions } from "../types.js"; -import type { RPCRequest, RPCResponse } from "@lumeweb/relay-types"; +import type { + ClientRPCRequest, + RPCRequest, + RPCResponse, +} from "@lumeweb/relay-types"; +import RPC from "@lumeweb/rpc"; +import { RPCBroadcastRequest } from "@lumeweb/relay-types"; export default abstract class RpcQueryBase { protected _network: RpcNetwork; @@ -15,13 +21,13 @@ export default abstract class RpcQueryBase { protected _timeoutTimer?: any; protected _timeout: boolean = false; protected _completed: boolean = false; - protected _responses: { [relay: string]: RPCResponse } = {}; - protected _errors: { [relay: string]: any } = {}; + protected _response?: RPCResponse; + protected _error?: string; protected _promiseResolve?: (data: any) => void; constructor( network: RpcNetwork, - query: RPCRequest, + query: ClientRPCRequest | RPCRequest, options: RpcQueryOptions = {} ) { this._network = network; @@ -33,7 +39,7 @@ export default abstract class RpcQueryBase { return this._promise as Promise; } - private handeTimeout() { + protected handeTimeout() { this.resolve(undefined, true); } @@ -62,75 +68,55 @@ export default abstract class RpcQueryBase { this._timeoutTimer ?? setTimeout( this.handeTimeout.bind(this), - (this._options.queryTimeout || this._network.queryTimeout) * 1000 + (this._options?.queryTimeout || this._network.queryTimeout) * 1000 ); - this._network.ready.then(() => { - const promises = []; - - for (const relay of this.getRelays()) { - promises.push(this.queryRelay(relay)); - } - - Promise.allSettled(promises).then(() => this.checkResponses()); - }); + this._doRun(); return this; } - protected async queryRelay(relay: string | Buffer): Promise { - let socket: any; - - let relayKey: Buffer = relay as Buffer; - - if (typeof relay === "string") { - relayKey = Buffer.from(relay, "hex"); - } - if (relay instanceof Buffer) { - relayKey = relay; - relay = relay.toString("hex"); - } - + private async _doRun() { try { - socket = this._network.dht.connect(relayKey); - if (isPromise(socket)) { - socket = await socket; - } - } catch (e) { - return; + await this._network.ready; + await this._run(); + } catch (e: any) { + this._promiseResolve?.({ error: e.message }); } - return new Promise((resolve, reject) => { - let timer: any; - socket.on("data", (res: Buffer) => { - relay = relay as string; - if (timer) { - clearTimeout(timer as any); - timer = null; - } - socket.end(); - const response = unpack(res as any) as RPCResponse; - if (response && response.error) { - this._errors[relay] = response.error; - return reject(null); - } - this._responses[relay] = response; - resolve(null); - }); - socket.on("error", (error: any) => { - relay = relay as string; - this._errors[relay] = error; - reject({ error }); - }); - socket.write("rpc"); - socket.write(pack(this._query)); - timer = setTimeout(() => { - this._errors[relay as string] = "timeout"; - reject(null); - }, (this._options.relayTimeout || this._network.relayTimeout) * 1000) as NodeJS.Timeout; - }); } - protected abstract checkResponses(): void; + protected setupRelayTimeout(reject: Function): NodeJS.Timeout { + return setTimeout(() => { + this._error = "timeout"; + reject("timeout"); + }, (this._options.relayTimeout || this._network.relayTimeout) * 1000) as NodeJS.Timeout; + } - protected abstract getRelays(): string[] | Buffer[]; + protected abstract _run(): void; + + protected async queryRpc(rpc: any, request: RPCRequest) { + let timer: NodeJS.Timeout; + + return new Promise((resolve, reject) => { + rpc + // @ts-ignore + .request(`${request.module}.${request.method}`, request.data) + .then((resp: any) => { + if (resp.error) { + throw new Error(resp.error); + } + clearTimeout(timer as any); + + this._response = resp; + resolve(null); + }) + .catch((e: Error) => { + this._error = e.message; + reject({ error: e.message }); + clearTimeout(timer as any); + }); + + timer = this.setupRelayTimeout(reject); + }); + } } diff --git a/src/query/simple.ts b/src/query/simple.ts index 5f3f7c5..3003d6b 100644 --- a/src/query/simple.ts +++ b/src/query/simple.ts @@ -1,34 +1,84 @@ -import RpcQueryBase from "./base.js"; import RpcNetwork from "../network.js"; -import type { RPCRequest } from "@lumeweb/relay-types"; +import { + ClientRPCRequest, + RPCBroadcastRequest, + RPCRequest, + RPCResponse, +} from "@lumeweb/relay-types"; import { RpcQueryOptions } from "../types.js"; -import type { Buffer } from "buffer"; +import { clearTimeout, setTimeout } from "timers"; +import b4a from "b4a"; +import { + isPromise, + validateResponse, + validateTimestampedResponse, +} from "../util.js"; +import RPC from "@lumeweb/rpc"; +import { ERR_INVALID_SIGNATURE } from "../error.js"; +import RpcQueryBase from "./base.js"; export default class SimpleRpcQuery extends RpcQueryBase { - private _relay: string | Buffer; + protected _relay: string; + constructor( network: RpcNetwork, - relay: string | Buffer, - query: RPCRequest, + relay: string, + query: ClientRPCRequest, options: RpcQueryOptions ) { super(network, query, options); this._relay = relay; } - protected checkResponses(): void { - if (Object.keys(this._responses).length) { - this.resolve(Object.values(this._responses).pop()); + protected async _run(): Promise { + await this.queryRelay(); + await this.checkResponses(); + } + + protected async queryRelay(): Promise { + let socket: any; + + try { + socket = this._network.dht.connect(b4a.from(this._relay, "hex")); + if (isPromise(socket)) { + socket = await socket; + } + } catch (e) { return; } + await socket.opened; - if (Object.keys(this._errors).length) { - const error = Object.values(this._errors).pop(); - this.resolve(error, error === "timeout"); + const rpc = new RPC(socket); + + try { + await this.queryRpc(rpc, this._query); + } catch (e: any) { + // @ts-ignore + rpc.end(); + throw e; } + + // @ts-ignore + rpc.end(); } - protected getRelays(): string[] | Buffer[] { - return [this._relay] as string[] | Buffer[]; + protected async checkResponses() { + let response: RPCResponse = this._response as RPCResponse; + + if (this._error) { + response = { error: this._error }; + } + + if ( + !response.error && + !validateTimestampedResponse( + b4a.from(this._relay, "hex") as Buffer, + response + ) + ) { + response = { error: ERR_INVALID_SIGNATURE }; + } + + this.resolve(response); } } diff --git a/src/query/streaming.ts b/src/query/streaming.ts deleted file mode 100644 index 935657c..0000000 --- a/src/query/streaming.ts +++ /dev/null @@ -1,94 +0,0 @@ -import SimpleRpcQuery from "./simple.js"; -import { Buffer } from "buffer"; -import { isPromise } from "../util.js"; -import { clearTimeout, setTimeout } from "timers"; -import { pack, unpack } from "msgpackr"; -import type { RPCRequest, RPCResponse } from "@lumeweb/relay-types"; -import RpcNetwork from "../network.js"; -import { StreamingRpcQueryOptions } from "../types.js"; - -export default class StreamingRpcQuery extends SimpleRpcQuery { - protected _options: StreamingRpcQueryOptions; - protected _canceled = false; - constructor( - network: RpcNetwork, - relay: string | Buffer, - query: RPCRequest, - options: StreamingRpcQueryOptions - ) { - super(network, relay, query, options); - this._options = options; - } - - public cancel() { - this._canceled = true; - } - - protected async queryRelay(relay: string | Buffer): Promise { - let socket: any; - - let relayKey: Buffer = relay as Buffer; - - if (relay === "string") { - relayKey = Buffer.from(relay, "hex"); - } - if (relay instanceof Buffer) { - relayKey = relay; - relay = relay.toString("hex"); - } - - try { - socket = this._network.dht.connect(relayKey); - if (isPromise(socket)) { - socket = await socket; - } - } catch (e) { - return; - } - return new Promise((resolve, reject) => { - const finish = () => { - relay = relay as string; - this._responses[relay] = {}; - resolve(null); - socket.end(); - }; - - const listener = (res: Buffer) => { - relay = relay as string; - if (this._timeoutTimer) { - clearTimeout(this._timeoutTimer as any); - this._timeoutTimer = null; - } - - if (this._canceled) { - socket.write(pack({ cancel: true })); - socket.off("data", listener); - finish(); - return; - } - - const response = unpack(res as any) as RPCResponse; - if (response && response.error) { - this._errors[relay] = response.error; - return reject(null); - } - - if (response?.data.done) { - finish(); - return; - } - - this._options.streamHandler(response?.data.data); - }; - - socket.on("data", listener); - socket.on("error", (error: any) => { - relay = relay as string; - this._errors[relay] = error; - reject({ error }); - }); - socket.write("rpc"); - socket.write(pack(this._query)); - }); - } -} diff --git a/src/query/wisdom.ts b/src/query/wisdom.ts index 652c991..e0905ec 100644 --- a/src/query/wisdom.ts +++ b/src/query/wisdom.ts @@ -1,9 +1,21 @@ -import RpcQueryBase from "./base.js"; -import { flatten } from "../util.js"; -import { Buffer } from "buffer"; -import type { RPCResponse } from "@lumeweb/relay-types"; +import { + RPCBroadcastRequest, + RPCBroadcastResponse, + RPCRequest, + RPCResponse, +} from "@lumeweb/relay-types"; +import { clearTimeout } from "timers"; +import b4a from "b4a"; +import { + flatten, + isPromise, + validateResponse, + validateTimestampedResponse, +} from "../util.js"; +import RPC from "@lumeweb/rpc"; import { blake2b } from "libskynet"; -import { ERR_MAX_TRIES_HIT } from "../error.js"; +import { ERR_INVALID_SIGNATURE, ERR_NO_RELAYS } from "../error.js"; +import RpcQueryBase from "./base.js"; function flatHash(data: any) { const flattenedData = flatten(data).sort(); @@ -13,23 +25,105 @@ function flatHash(data: any) { } export default class WisdomRpcQuery extends RpcQueryBase { - private _maxTries = 3; - private _tries = 0; + protected declare _response?: RPCBroadcastResponse; + private static _activeRelay: any; - protected checkResponses(): void { - const responseStore = this._responses; - const responseStoreData = Object.values(responseStore); + static get activeRelay(): any { + return this._activeRelay; + } + + get result(): Promise { + return this._promise as Promise; + } + + protected async _run(): Promise { + await this.setupRelay(); + await this.queryRelay(); + await this.checkResponse(); + } + + protected resolve(data?: RPCResponse, timeout: boolean = false): void { + clearTimeout(this._timeoutTimer); + this._timeout = timeout; + this._completed = true; + + if (timeout) { + data = { + error: "timeout", + }; + } + + this._promiseResolve?.(data); + } + + protected async queryRelay(): Promise { + let activeRelay = WisdomRpcQuery.activeRelay; + let relays = this.getRelays(); + + if (!relays.length) { + throw new Error(ERR_NO_RELAYS); + } + + return this.queryRpc(activeRelay, { + module: "rpc", + method: "broadcast_request", + data: { + request: this._query, + relays, + }, + } as RPCRequest); + } + + protected async checkResponse() { + if (this._error) { + this.resolve({ error: this._error }); + return; + } + + if ( + !validateResponse( + WisdomRpcQuery.activeRelay.stream.remotePublicKey, + this._response as RPCResponse + ) + ) { + this.resolve({ error: ERR_INVALID_SIGNATURE }); + return; + } + + let relays: RPCResponse[] = []; + + for (const relay in this._response?.relays) { + const resp = this._response?.relays[relay]; + if ( + validateTimestampedResponse( + b4a.from(relay, "hex") as Buffer, + resp as RPCResponse + ) + ) { + relays.push(resp as RPCResponse); + } + } + + if (!relays.length) { + this.resolve({ error: ERR_NO_RELAYS }); + return; + } type ResponseGroup = { [response: string]: number }; - const responseObjects = responseStoreData.reduce((output: any, item) => { - const hash = flatHash(item?.data); + const responseObjects = relays.reduce((output: any, item: RPCResponse) => { + const field = item.signedField || "data"; + // @ts-ignore + const hash = flatHash(item[field]); output[hash] = item?.data; return output; }, {}); - const responses: ResponseGroup = responseStoreData.reduce( - (output: ResponseGroup, item) => { - const hash = flatHash(item?.data); + + const responses: ResponseGroup = relays.reduce( + (output: ResponseGroup, item: RPCResponse) => { + const field = item.signedField || "data"; + // @ts-ignore + const hash = flatHash(item[field]); output[hash] = output[hash] ?? 0; output[hash]++; return output; @@ -37,38 +131,14 @@ export default class WisdomRpcQuery extends RpcQueryBase { {} ); - if (!Object.keys(responses).length) { - if (Object.keys(this._errors).length) { - this.resolve({ error: Object.values(this._errors).pop() }); - return; - } - if (this._tries <= this._maxTries) { - this._tries++; - this.retry(); - return; - } - this.resolve({ data: { error: ERR_MAX_TRIES_HIT } }); - return; - } for (const responseHash in responses) { if ( - responses[responseHash] / responseStoreData.length >= + responses[responseHash] / relays.length >= this._network.majorityThreshold ) { let response: RPCResponse = responseObjects[responseHash]; - // @ts-ignore - if (null === response) { - if (this._tries <= this._maxTries) { - this._tries++; - this.retry(); - return; - } - - response = { error: ERR_MAX_TRIES_HIT }; - } else { - response = { data: response }; - } + response = { data: response }; this.resolve(response); break; @@ -76,17 +146,6 @@ export default class WisdomRpcQuery extends RpcQueryBase { } } - private retry() { - this._responses = {}; - this._errors = {}; - - if (this._completed) { - return; - } - - this.run(); - } - protected getRelays(): string[] | [] { if ( this._network.maxRelays === 0 || @@ -106,4 +165,27 @@ export default class WisdomRpcQuery extends RpcQueryBase { return list; } + + private async setupRelay() { + let active = WisdomRpcQuery.activeRelay; + let relays = this._network.relays; + + if (!active) { + if (!relays.length) { + throw new Error(ERR_NO_RELAYS); + } + + let relay = relays[Math.floor(Math.random() * relays.length)]; + let socket = this._network.dht.connect(b4a.from(relay, "hex")); + if (isPromise(socket)) { + socket = await socket; + } + await socket.opened; + + WisdomRpcQuery._activeRelay = new RPC(socket); + socket.once("close", () => { + WisdomRpcQuery._activeRelay = undefined; + }); + } + } } diff --git a/src/types.ts b/src/types.ts index 69b326b..98627b4 100644 --- a/src/types.ts +++ b/src/types.ts @@ -2,8 +2,3 @@ export interface RpcQueryOptions { queryTimeout?: number; relayTimeout?: number; } -export interface StreamingRpcQueryOptions extends RpcQueryOptions { - streamHandler: StreamHandlerFunction; -} - -export type StreamHandlerFunction = (data: Uint8Array) => void; diff --git a/src/util.ts b/src/util.ts index 208a905..af260cd 100644 --- a/src/util.ts +++ b/src/util.ts @@ -1,13 +1,18 @@ -import { isArray } from "util"; +// @ts-ignore +import stringify from "json-stringify-deterministic"; +import type { RPCRequest, RPCResponse } from "@lumeweb/relay-types"; +// @ts-ignore +import crypto from "hypercore-crypto"; +import b4a from "b4a"; -function isBuffer(obj: any): boolean { +export function isPromise(obj: Promise) { return ( - obj && - obj.constructor && - typeof obj.constructor.isBuffer === "function" && - obj.constructor.isBuffer(obj) + !!obj && + (typeof obj === "object" || typeof obj === "function") && + typeof obj.then === "function" ); } + /* Forked from https://github.com/hughsk/flat */ @@ -29,7 +34,7 @@ export function flatten(target: any, opts: any = {}): any[] { const value = object[key]; const isarray = opts.safe && Array.isArray(value); const type = Object.prototype.toString.call(value); - const isbuffer = isBuffer(value); + const isbuffer = b4a.isBuffer(value); const isobject = type === "[object Object]" || type === "[object Array]"; const newKey = prev @@ -55,10 +60,34 @@ export function flatten(target: any, opts: any = {}): any[] { return output; } -export function isPromise(obj: Promise) { - return ( - !!obj && - (typeof obj === "object" || typeof obj === "function") && - typeof obj.then === "function" +export function validateResponse( + relay: Buffer, + response: RPCResponse, + timestamped = false +): boolean { + const field = response.signedField || "data"; + // @ts-ignore + const data = response[field]; + let json = data; + if (typeof json !== "string") { + json = stringify(json); + } + const updated = response.updated as number; + + if (timestamped && updated) { + json = updated.toString() + json; + } + + return !!crypto.verify( + b4a.from(json), + b4a.from(response.signature as string, "hex"), + relay ); } + +export function validateTimestampedResponse( + relay: Buffer, + response: RPCResponse +): boolean { + return validateResponse(relay, response, true); +}