From fb849550dbbad16bcae5a94cf8e84c8b9a76016d Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Sat, 27 Aug 2022 15:09:34 -0400 Subject: [PATCH] *Heavily refactor to use new RPC schema *Create basic, wisdom, and streaming rpc request variants --- package.json | 1 + src/error.ts | 1 + src/index.ts | 15 ++- src/{rpcNetwork.ts => network.ts} | 75 +++++++++---- src/query/base.ts | 135 +++++++++++++++++++++++ src/query/simple.ts | 35 ++++++ src/query/streaming.ts | 78 ++++++++++++++ src/query/wisdom.ts | 78 ++++++++++++++ src/rpcQuery.ts | 174 ------------------------------ src/types.ts | 17 ++- src/util.ts | 8 ++ 11 files changed, 409 insertions(+), 208 deletions(-) rename src/{rpcNetwork.ts => network.ts} (59%) create mode 100644 src/query/base.ts create mode 100644 src/query/simple.ts create mode 100644 src/query/streaming.ts create mode 100644 src/query/wisdom.ts delete mode 100644 src/rpcQuery.ts diff --git a/package.json b/package.json index 7dccfeb..a541c69 100644 --- a/package.json +++ b/package.json @@ -8,6 +8,7 @@ "build": "rimraf dist && tsc" }, "devDependencies": { + "@lumeweb/relay": "https://github.com/LumeWeb/relay.git", "@types/json-stable-stringify": "^1.0.34", "@types/node": "^18.0.0", "prettier": "^2.7.1", diff --git a/src/error.ts b/src/error.ts index 74282fc..99d7a43 100644 --- a/src/error.ts +++ b/src/error.ts @@ -1 +1,2 @@ export const ERR_NOT_READY = "NOT_READY"; +export const ERR_MAX_TRIES_HIT = "ERR_MAX_TRIES_HIT"; diff --git a/src/index.ts b/src/index.ts index 3179401..e775698 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,6 +1,15 @@ -import RpcNetwork from "./rpcNetwork.js"; -import RpcQuery from "./rpcNetwork.js"; +import RpcNetwork from "./network.js"; +import RpcQueryBase from "./query/base.js"; +import SimpleRpcQuery from "./query/simple.js"; +import StreamingRpcQuery from "./query/streaming.js"; +import WisdomRpcQuery from "./query/wisdom.js"; export * from "./types.js"; -export { RpcNetwork, RpcQuery }; +export { + RpcNetwork, + RpcQueryBase, + SimpleRpcQuery, + StreamingRpcQuery, + WisdomRpcQuery, +}; diff --git a/src/rpcNetwork.ts b/src/network.ts similarity index 59% rename from src/rpcNetwork.ts rename to src/network.ts index e740140..bc59632 100644 --- a/src/rpcNetwork.ts +++ b/src/network.ts @@ -1,6 +1,9 @@ -import RpcQuery from "./rpcQuery.js"; +import WisdomRpcQuery from "./query/wisdom.js"; // @ts-ignore import DHT from "@hyperswarm/dht"; +import StreamingRpcQuery from "./query/streaming.js"; +import { RpcQueryOptions, StreamHandlerFunction } from "./types.js"; +import SimpleRpcQuery from "./query/simple.js"; export default class RpcNetwork { constructor(dht = new DHT()) { @@ -23,16 +26,6 @@ export default class RpcNetwork { this._majorityThreshold = value; } - private _maxTtl = 12 * 60 * 60; - - get maxTtl(): number { - return this._maxTtl; - } - - set maxTtl(value: number) { - this._maxTtl = value; - } - private _queryTimeout = 30; get queryTimeout(): number { @@ -98,17 +91,57 @@ export default class RpcNetwork { this._relays = []; } - public query( - query: string, - chain: string, + public wisdomQuery( + method: string, + module: string, data: object | any[] = {}, - bypassCache: boolean = false - ): RpcQuery { - return new RpcQuery(this, { - query, - chain, - data, + bypassCache: boolean = false, + options: RpcQueryOptions = {} + ): WisdomRpcQuery { + return new WisdomRpcQuery( + this, + { + method, + module, + data, bypassCache: bypassCache || this._bypassCache, - }); + }, + options + ); + } + + public streamingQuery( + relay: Buffer | string, + method: string, + module: string, + streamHandler: StreamHandlerFunction, + data: object | any[] = {}, + options: RpcQueryOptions = {} + ): StreamingRpcQuery { + return new StreamingRpcQuery( + this, + relay, + { method, module, data }, + { streamHandler, ...options } + ); + } + + public simpleQuery( + relay: Buffer | string, + method: string, + module: string, + data: object | any[] = {}, + options: RpcQueryOptions = {} + ): SimpleRpcQuery { + return new SimpleRpcQuery( + this, + relay, + { + method, + module, + data, + }, + options + ); } } diff --git a/src/query/base.ts b/src/query/base.ts new file mode 100644 index 0000000..abea3c7 --- /dev/null +++ b/src/query/base.ts @@ -0,0 +1,135 @@ +import { clearTimeout, setTimeout } from "timers"; +import { pack, unpack } from "msgpackr"; +import { Buffer } from "buffer"; +import { isPromise } from "../util.js"; +import RpcNetwork from "../rpcNetwork.js"; +import { RpcQueryOptions } from "../types.js"; +import type { RPCRequest, RPCResponse } from "@lumeweb/relay"; + +export default abstract class RpcQueryBase { + protected _network: RpcNetwork; + protected _query: RPCRequest; + protected _options: RpcQueryOptions; + + protected _promise?: Promise; + protected _timeoutTimer?: any; + protected _timeout: boolean = false; + protected _completed: boolean = false; + protected _responses: { [relay: string]: RPCResponse } = {}; + protected _errors: { [relay: string]: any } = {}; + protected _promiseResolve?: (data: any) => void; + + constructor( + network: RpcNetwork, + query: RPCRequest, + options: RpcQueryOptions = {} + ) { + this._network = network; + this._query = query; + this._options = options; + this.init(); + } + + get result(): Promise { + return this._promise as Promise; + } + + private handeTimeout() { + this.resolve(undefined, true); + } + + protected resolve(data?: RPCResponse, timeout: boolean = false): void { + clearTimeout(this._timeoutTimer); + this._timeout = timeout; + this._completed = true; + + if (timeout) { + data = { + error: "timeout", + }; + } + + this._promiseResolve?.(data); + } + + protected async init() { + this._promise = + this._promise ?? + new Promise((resolve) => { + this._promiseResolve = resolve; + }); + + this._timeoutTimer = + this._timeoutTimer ?? + setTimeout( + this.handeTimeout.bind(this), + (this._options.queryTimeout || this._network.queryTimeout) * 1000 + ); + + await this._network.ready; + + const promises = []; + + for (const relay of this.getRelays()) { + promises.push(this.queryRelay(relay)); + } + + await Promise.allSettled(promises); + this.checkResponses(); + } + + protected async queryRelay(relay: string | Buffer): Promise { + let socket: any; + + let relayKey: Buffer = relay as Buffer; + + if (relay === "string") { + relayKey = Buffer.from(relay, "hex"); + } + if (relay instanceof Buffer) { + relayKey = relay; + relay = relay.toString("hex"); + } + + try { + socket = this._network.dht.connect(relayKey); + if (isPromise(socket)) { + socket = await socket; + } + } catch (e) { + return; + } + return new Promise((resolve, reject) => { + let timer: any; + socket.on("data", (res: Buffer) => { + relay = relay as string; + if (timer && timer.close) { + clearTimeout(timer as any); + } + socket.end(); + const response = unpack(res as any) as RPCResponse; + if (response && response.error) { + this._errors[relay] = response.error; + return reject(null); + } + this._responses[relay] = response; + resolve(null); + }); + socket.on("error", (error: any) => { + relay = relay as string; + this._errors[relay] = error; + reject({ error }); + }); + socket.write("rpc"); + socket.write(pack(this._query)); + timer = setTimeout(() => { + this._errors[relay as string] = "timeout"; + reject(null); + }, (this._options.relayTimeout || this._network.relayTimeout) * 1000) as NodeJS.Timeout; + }); + } + + protected abstract checkResponses(): void; + + protected abstract getRelays(): string[] | Buffer[]; +} diff --git a/src/query/simple.ts b/src/query/simple.ts new file mode 100644 index 0000000..801288b --- /dev/null +++ b/src/query/simple.ts @@ -0,0 +1,35 @@ +import RpcQueryBase from "./base.js"; +import RpcNetwork from "../rpcNetwork.js"; +import type { RPCRequest } from "@lumeweb/relay"; +import { RpcQueryOptions } from "../types.js"; +import type { Buffer } from "buffer"; + +export default class SimpleRpcQuery extends RpcQueryBase { + private _relay: string | Buffer; + constructor( + network: RpcNetwork, + relay: string | Buffer, + query: RPCRequest, + options: RpcQueryOptions + ) { + super(network, query, options); + this._relay = relay; + this.init(); + } + + protected checkResponses(): void { + if (Object.keys(this._responses).length) { + this.resolve(Object.values(this._responses).pop()); + return; + } + + if (Object.keys(this._errors).length) { + this.resolve({ error: Object.values(this._errors).pop() }); + return; + } + } + + protected getRelays(): string[] | Buffer[] { + return [this._relay] as string[] | Buffer[]; + } +} diff --git a/src/query/streaming.ts b/src/query/streaming.ts new file mode 100644 index 0000000..795565d --- /dev/null +++ b/src/query/streaming.ts @@ -0,0 +1,78 @@ +import SimpleRpcQuery from "./simple.js"; +import { Buffer } from "buffer"; +import { isPromise } from "../util.js"; +import { clearTimeout, setTimeout } from "timers"; +import { pack, unpack } from "msgpackr"; +import type { RPCRequest } from "@lumeweb/relay"; +import { RPCResponse } from "@lumeweb/relay"; +import RpcNetwork from "../rpcNetwork.js"; +import { StreamingRpcQueryOptions } from "../types.js"; + +export default class StreamingRpcQuery extends SimpleRpcQuery { + protected _options: StreamingRpcQueryOptions; + constructor( + network: RpcNetwork, + relay: string | Buffer, + query: RPCRequest, + options: StreamingRpcQueryOptions + ) { + super(network, relay, query, options); + this._options = options; + } + protected async queryRelay(relay: string | Buffer): Promise { + let socket: any; + + let relayKey: Buffer = relay as Buffer; + + if (relay === "string") { + relayKey = Buffer.from(relay, "hex"); + } + if (relay instanceof Buffer) { + relayKey = relay; + relay = relay.toString("hex"); + } + + try { + socket = this._network.dht.connect(relayKey); + if (isPromise(socket)) { + socket = await socket; + } + } catch (e) { + return; + } + return new Promise((resolve, reject) => { + let timer: any; + socket.on("data", (res: Buffer) => { + relay = relay as string; + if (timer && timer.close) { + clearTimeout(timer as any); + } + socket.end(); + const response = unpack(res as any) as RPCResponse; + if (response && response.error) { + this._errors[relay] = response.error; + return reject(null); + } + + if (response?.data.done) { + this._responses[relay] = {}; + resolve(null); + return; + } + + this._options.streamHandler(response?.data.data); + }); + socket.on("error", (error: any) => { + relay = relay as string; + this._errors[relay] = error; + reject({ error }); + }); + socket.write("rpc"); + socket.write(pack(this._query)); + timer = setTimeout(() => { + this._errors[relay as string] = "timeout"; + reject(null); + }, (this._options.relayTimeout || this._network.relayTimeout) * 1000) as NodeJS.Timeout; + }); + } +} diff --git a/src/query/wisdom.ts b/src/query/wisdom.ts new file mode 100644 index 0000000..a1ceed1 --- /dev/null +++ b/src/query/wisdom.ts @@ -0,0 +1,78 @@ +import RpcQueryBase from "./base.js"; +import { flatten } from "../util.js"; +import { Buffer } from "buffer"; +import type { RPCResponse } from "@lumeweb/relay"; +import { blake2b } from "libskynet"; +import { ERR_MAX_TRIES_HIT } from "../error.js"; + +export default class WisdomRpcQuery extends RpcQueryBase { + private _maxTries = 3; + private _tries = 0; + + protected checkResponses(): void { + const responseStore = this._responses; + const responseStoreData = Object.values(responseStore); + + type ResponseGroup = { [response: string]: number }; + + const responseObjects = responseStoreData.reduce((output: any, item) => { + const itemFlattened = flatten(item?.data).sort(); + + const hash = Buffer.from( + blake2b(Buffer.from(JSON.stringify(itemFlattened))) + ).toString("hex"); + output[hash] = item?.data; + return output; + }, {}); + const responses: ResponseGroup = responseStoreData.reduce( + (output: ResponseGroup, item) => { + const itemFlattened = flatten(item?.data).sort(); + const hash = Buffer.from( + blake2b(Buffer.from(JSON.stringify(itemFlattened))) + ).toString("hex"); + output[hash] = output[hash] ?? 0; + output[hash]++; + return output; + }, + {} + ); + + for (const responseHash in responses) { + if ( + responses[responseHash] / responseStoreData.length >= + this._network.majorityThreshold + ) { + let response: RPCResponse = responseObjects[responseHash]; + + // @ts-ignore + if (null === response) { + if (this._tries <= this._maxTries) { + this._tries++; + this.retry(); + return; + } + + response = { error: ERR_MAX_TRIES_HIT }; + } + + this.resolve(response); + break; + } + } + } + + private retry() { + this._responses = {}; + this._errors = {}; + + if (this._completed) { + return; + } + + this.init(); + } + + protected getRelays(): string[] | [] { + return this._network.relays; + } +} diff --git a/src/rpcQuery.ts b/src/rpcQuery.ts deleted file mode 100644 index 1cbce30..0000000 --- a/src/rpcQuery.ts +++ /dev/null @@ -1,174 +0,0 @@ -import { clearTimeout, setTimeout } from "timers"; -import RpcNetwork from "./rpcNetwork.js"; -import { pack, unpack } from "msgpackr"; -import { RPCRequest, RPCResponse } from "./types.js"; -import { Buffer } from "buffer"; -import { blake2b } from "libskynet"; -import { flatten } from "./util.js"; - -export default class RpcQuery { - private _network: RpcNetwork; - private _query: RPCRequest; - private _promise?: Promise; - private _timeoutTimer?: any; - private _timeout: boolean = false; - private _completed: boolean = false; - private _responses: { [relay: string]: RPCResponse } = {}; - private _promiseResolve?: (data: any) => void; - private _maxTries = 3; - private _tries = 0; - - constructor(network: RpcNetwork, query: RPCRequest) { - this._network = network; - this._query = query; - this.init(); - } - - get result(): Promise { - return this._promise as Promise; - } - - private handeTimeout() { - this.resolve(false, true); - } - - private resolve(data: any, timeout: boolean = false): void { - clearTimeout(this._timeoutTimer); - this._timeout = timeout; - this._completed = true; - // @ts-ignore - this._promiseResolve(data); - } - - private async init() { - this._promise = - this._promise ?? - new Promise((resolve) => { - this._promiseResolve = resolve; - }); - - this._timeoutTimer = - this._timeoutTimer ?? - setTimeout( - this.handeTimeout.bind(this), - this._network.queryTimeout * 1000 - ); - - await this._network.ready; - - const promises = []; - - // tslint:disable-next-line:forin - for (const relay of this._network.relays) { - promises.push(this.queryRelay(relay)); - } - - await Promise.allSettled(promises); - this.checkResponses(); - } - - private async queryRelay(relay: string): Promise { - let socket: any; - - try { - socket = this._network.dht.connect(Buffer.from(relay, "hex")); - if (isPromise(socket)) { - socket = await socket; - } - } catch (e) { - return; - } - return new Promise((resolve, reject) => { - let timer: any; - socket.on("data", (res: Buffer) => { - if (timer && timer.close) { - clearTimeout(timer as any); - } - socket.end(); - const response = unpack(res as any) as RPCResponse; - if (response && response.error) { - return reject(response); - } - this._responses[relay] = response; - resolve(null); - }); - socket.on("error", (error: any) => reject({ error })); - socket.write("rpc"); - socket.write(pack(this._query)); - timer = setTimeout(() => { - reject("timeout"); - }, this._network.relayTimeout * 1000) as NodeJS.Timeout; - }); - } - - private checkResponses() { - const responseStore = this._responses; - const responseStoreData = Object.values(responseStore); - - type ResponseGroup = { [response: string]: number }; - - const responseObjects = responseStoreData.reduce((output: any, item) => { - const itemFlattened = flatten(item?.data).sort(); - - const hash = Buffer.from( - blake2b(Buffer.from(JSON.stringify(itemFlattened))) - ).toString("hex"); - output[hash] = item?.data; - return output; - }, {}); - const responses: ResponseGroup = responseStoreData.reduce( - (output: ResponseGroup, item) => { - const itemFlattened = flatten(item?.data).sort(); - const hash = Buffer.from( - blake2b(Buffer.from(JSON.stringify(itemFlattened))) - ).toString("hex"); - output[hash] = output[hash] ?? 0; - output[hash]++; - return output; - }, - {} - ); - - for (const responseHash in responses) { - if ( - responses[responseHash] / responseStoreData.length >= - this._network.majorityThreshold - ) { - // @ts-ignore - let response: RPCResponse | boolean = responseObjects[responseHash]; - - // @ts-ignore - if (null === response) { - if (this._tries <= this._maxTries) { - this._tries++; - this.retry(); - return; - } - - response = false; - } - - this.resolve(response); - break; - } - } - } - - private retry() { - this._responses = {}; - - if (this._completed) { - return; - } - - this.init(); - } -} - -function isPromise(obj: Promise) { - return ( - !!obj && - (typeof obj === "object" || typeof obj === "function") && - typeof obj.then === "function" - ); -} diff --git a/src/types.ts b/src/types.ts index eabf0f5..69b326b 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1,12 +1,9 @@ -export interface RPCRequest { - bypassCache: boolean; - chain: string; - query: string; - data: any; +export interface RpcQueryOptions { + queryTimeout?: number; + relayTimeout?: number; +} +export interface StreamingRpcQueryOptions extends RpcQueryOptions { + streamHandler: StreamHandlerFunction; } -export interface RPCResponse { - updated: number; - data: any; - error?: string -} +export type StreamHandlerFunction = (data: Uint8Array) => void; diff --git a/src/util.ts b/src/util.ts index 6090d9f..208a905 100644 --- a/src/util.ts +++ b/src/util.ts @@ -54,3 +54,11 @@ export function flatten(target: any, opts: any = {}): any[] { return output; } + +export function isPromise(obj: Promise) { + return ( + !!obj && + (typeof obj === "object" || typeof obj === "function") && + typeof obj.then === "function" + ); +}