diff --git a/dist/error.d.ts b/dist/error.d.ts index d3f0fcc..39707f5 100644 --- a/dist/error.d.ts +++ b/dist/error.d.ts @@ -1,2 +1,4 @@ export declare const ERR_MAX_TRIES_HIT = "ERR_MAX_TRIES_HIT"; +export declare const ERR_INVALID_SIGNATURE = "INVALID_SIGNATURE"; +export declare const ERR_NO_RELAYS = "NO_RELAYS"; //# sourceMappingURL=error.d.ts.map diff --git a/dist/error.d.ts.map b/dist/error.d.ts.map index 2027c0c..47b4c03 100644 --- a/dist/error.d.ts.map +++ b/dist/error.d.ts.map @@ -1 +1 @@ -{"version":3,"file":"error.d.ts","sourceRoot":"","sources":["../src/error.ts"],"names":[],"mappings":"AAAA,eAAO,MAAM,iBAAiB,sBAAsB,CAAC"} \ No newline at end of file +{"version":3,"file":"error.d.ts","sourceRoot":"","sources":["../src/error.ts"],"names":[],"mappings":"AAAA,eAAO,MAAM,iBAAiB,sBAAsB,CAAC;AACrD,eAAO,MAAM,qBAAqB,sBAAsB,CAAC;AACzD,eAAO,MAAM,aAAa,cAAc,CAAC"} \ No newline at end of file diff --git a/dist/error.js b/dist/error.js index efb866d..35daffb 100644 --- a/dist/error.js +++ b/dist/error.js @@ -1 +1,3 @@ export const ERR_MAX_TRIES_HIT = "ERR_MAX_TRIES_HIT"; +export const ERR_INVALID_SIGNATURE = "INVALID_SIGNATURE"; +export const ERR_NO_RELAYS = "NO_RELAYS"; diff --git a/dist/index.d.ts b/dist/index.d.ts index f9eaa37..e44138f 100644 --- a/dist/index.d.ts +++ b/dist/index.d.ts @@ -1,14 +1,7 @@ import RpcNetwork from "./network.js"; import RpcQueryBase from "./query/base.js"; import SimpleRpcQuery from "./query/simple.js"; -import StreamingRpcQuery from "./query/streaming.js"; import WisdomRpcQuery from "./query/wisdom.js"; export * from "./types.js"; -export { - RpcNetwork, - RpcQueryBase, - SimpleRpcQuery, - StreamingRpcQuery, - WisdomRpcQuery, -}; +export { RpcNetwork, RpcQueryBase, SimpleRpcQuery, WisdomRpcQuery }; //# sourceMappingURL=index.d.ts.map diff --git a/dist/index.d.ts.map b/dist/index.d.ts.map index 5cdddd9..c412864 100644 --- a/dist/index.d.ts.map +++ b/dist/index.d.ts.map @@ -1 +1 @@ -{"version":3,"file":"index.d.ts","sourceRoot":"","sources":["../src/index.ts"],"names":[],"mappings":"AAAA,OAAO,UAAU,MAAM,cAAc,CAAC;AACtC,OAAO,YAAY,MAAM,iBAAiB,CAAC;AAC3C,OAAO,cAAc,MAAM,mBAAmB,CAAC;AAC/C,OAAO,iBAAiB,MAAM,sBAAsB,CAAC;AACrD,OAAO,cAAc,MAAM,mBAAmB,CAAC;AAE/C,cAAc,YAAY,CAAC;AAE3B,OAAO,EACL,UAAU,EACV,YAAY,EACZ,cAAc,EACd,iBAAiB,EACjB,cAAc,GACf,CAAC"} \ No newline at end of file +{"version":3,"file":"index.d.ts","sourceRoot":"","sources":["../src/index.ts"],"names":[],"mappings":"AAAA,OAAO,UAAU,MAAM,cAAc,CAAC;AACtC,OAAO,YAAY,MAAM,iBAAiB,CAAC;AAC3C,OAAO,cAAc,MAAM,mBAAmB,CAAC;AAC/C,OAAO,cAAc,MAAM,mBAAmB,CAAC;AAE/C,cAAc,YAAY,CAAC;AAE3B,OAAO,EAAE,UAAU,EAAE,YAAY,EAAE,cAAc,EAAE,cAAc,EAAE,CAAC"} \ No newline at end of file diff --git a/dist/index.js b/dist/index.js index 64345ca..69150cc 100644 --- a/dist/index.js +++ b/dist/index.js @@ -1,7 +1,6 @@ import RpcNetwork from "./network.js"; import RpcQueryBase from "./query/base.js"; import SimpleRpcQuery from "./query/simple.js"; -import StreamingRpcQuery from "./query/streaming.js"; import WisdomRpcQuery from "./query/wisdom.js"; export * from "./types.js"; -export { RpcNetwork, RpcQueryBase, SimpleRpcQuery, StreamingRpcQuery, WisdomRpcQuery, }; +export { RpcNetwork, RpcQueryBase, SimpleRpcQuery, WisdomRpcQuery }; diff --git a/dist/network.d.ts b/dist/network.d.ts index 525f392..dc93fc8 100644 --- a/dist/network.d.ts +++ b/dist/network.d.ts @@ -1,10 +1,10 @@ -/// -import WisdomRpcQuery from "./query/wisdom.js"; -import StreamingRpcQuery from "./query/streaming.js"; -import { RpcQueryOptions, StreamHandlerFunction } from "./types.js"; +import RPC from "@lumeweb/rpc"; import SimpleRpcQuery from "./query/simple.js"; +import WisdomRpcQuery from "./query/wisdom.js"; export default class RpcNetwork { constructor(dht?: any); + private _activeRelay?; + get activeRelay(): RPC; private _dht; get dht(): any; private _majorityThreshold; @@ -34,23 +34,15 @@ export default class RpcNetwork { module: string, data?: object | any[], bypassCache?: boolean, - options?: RpcQueryOptions + options?: {} ): WisdomRpcQuery; - streamingQuery( - relay: Buffer | string, - method: string, - module: string, - streamHandler: StreamHandlerFunction, - data?: object | any[], - options?: RpcQueryOptions - ): StreamingRpcQuery; simpleQuery( - relay: Buffer | string, + relay: string, method: string, module: string, - data?: object | any[], - bypassCache?: boolean, - options?: RpcQueryOptions + data: object | any[] | undefined, + bypassCache: boolean | undefined, + options: {} ): SimpleRpcQuery; } //# sourceMappingURL=network.d.ts.map diff --git a/dist/network.d.ts.map b/dist/network.d.ts.map index c476bec..e31f808 100644 --- a/dist/network.d.ts.map +++ b/dist/network.d.ts.map @@ -1 +1 @@ -{"version":3,"file":"network.d.ts","sourceRoot":"","sources":["../src/network.ts"],"names":[],"mappings":";AAAA,OAAO,cAAc,MAAM,mBAAmB,CAAC;AAG/C,OAAO,iBAAiB,MAAM,sBAAsB,CAAC;AACrD,OAAO,EAAE,eAAe,EAAE,qBAAqB,EAAE,MAAM,YAAY,CAAC;AACpE,OAAO,cAAc,MAAM,mBAAmB,CAAC;AAE/C,MAAM,CAAC,OAAO,OAAO,UAAU;gBACjB,GAAG,MAAY;IAI3B,OAAO,CAAC,IAAI,CAAa;IAEzB,IAAI,GAAG,QAEN;IAED,OAAO,CAAC,kBAAkB,CAAQ;IAElC,IAAI,iBAAiB,IAAI,MAAM,CAE9B;IAED,IAAI,iBAAiB,CAAC,KAAK,EAAE,MAAM,EAElC;IAED,OAAO,CAAC,aAAa,CAAM;IAE3B,IAAI,YAAY,IAAI,MAAM,CAEzB;IAED,IAAI,YAAY,CAAC,KAAK,EAAE,MAAM,EAE7B;IAED,OAAO,CAAC,aAAa,CAAK;IAE1B,IAAI,YAAY,IAAI,MAAM,CAEzB;IAED,IAAI,YAAY,CAAC,KAAK,EAAE,MAAM,EAE7B;IAED,OAAO,CAAC,OAAO,CAAgB;IAE/B,IAAI,MAAM,IAAI,MAAM,EAAE,CAErB;IAED,OAAO,CAAC,MAAM,CAAC,CAAgB;IAE/B,IAAI,KAAK,IAAI,OAAO,CAAC,IAAI,CAAC,CAKzB;IAED,OAAO,CAAC,YAAY,CAAkB;IAEtC,IAAI,WAAW,IAAI,OAAO,CAEzB;IAED,IAAI,WAAW,CAAC,KAAK,EAAE,OAAO,EAE7B;IAED,OAAO,CAAC,UAAU,CAAa;IAE/B,IAAI,SAAS,IAAI,MAAM,CAEtB;IAED,IAAI,SAAS,CAAC,KAAK,EAAE,MAAM,EAE1B;IAEM,QAAQ,CAAC,MAAM,EAAE,MAAM,GAAG,IAAI;IAK9B,WAAW,CAAC,MAAM,EAAE,MAAM,GAAG,OAAO;IAWpC,WAAW,IAAI,IAAI;IAInB,WAAW,CAChB,MAAM,EAAE,MAAM,EACd,MAAM,EAAE,MAAM,EACd,IAAI,GAAE,MAAM,GAAG,GAAG,EAAO,EACzB,WAAW,GAAE,OAAe,EAC5B,OAAO,GAAE,eAAoB,GAC5B,cAAc;IAaV,cAAc,CACnB,KAAK,EAAE,MAAM,GAAG,MAAM,EACtB,MAAM,EAAE,MAAM,EACd,MAAM,EAAE,MAAM,EACd,aAAa,EAAE,qBAAqB,EACpC,IAAI,GAAE,MAAM,GAAG,GAAG,EAAO,EACzB,OAAO,GAAE,eAAoB,GAC5B,iBAAiB;IASb,WAAW,CAChB,KAAK,EAAE,MAAM,GAAG,MAAM,EACtB,MAAM,EAAE,MAAM,EACd,MAAM,EAAE,MAAM,EACd,IAAI,GAAE,MAAM,GAAG,GAAG,EAAO,EACzB,WAAW,GAAE,OAAe,EAC5B,OAAO,GAAE,eAAoB,GAC5B,cAAc;CAalB"} \ No newline at end of file +{"version":3,"file":"network.d.ts","sourceRoot":"","sources":["../src/network.ts"],"names":[],"mappings":"AAGA,OAAO,GAAG,MAAM,cAAc,CAAC;AAE/B,OAAO,cAAc,MAAM,mBAAmB,CAAC;AAC/C,OAAO,cAAc,MAAM,mBAAmB,CAAC;AAE/C,MAAM,CAAC,OAAO,OAAO,UAAU;gBACjB,GAAG,MAAY;IAI3B,OAAO,CAAC,YAAY,CAAC,CAAM;IAE3B,IAAI,WAAW,IAAI,GAAG,CAErB;IAED,OAAO,CAAC,IAAI,CAAa;IAEzB,IAAI,GAAG,QAEN;IAED,OAAO,CAAC,kBAAkB,CAAQ;IAElC,IAAI,iBAAiB,IAAI,MAAM,CAE9B;IAED,IAAI,iBAAiB,CAAC,KAAK,EAAE,MAAM,EAElC;IAED,OAAO,CAAC,aAAa,CAAM;IAE3B,IAAI,YAAY,IAAI,MAAM,CAEzB;IAED,IAAI,YAAY,CAAC,KAAK,EAAE,MAAM,EAE7B;IAED,OAAO,CAAC,aAAa,CAAK;IAE1B,IAAI,YAAY,IAAI,MAAM,CAEzB;IAED,IAAI,YAAY,CAAC,KAAK,EAAE,MAAM,EAE7B;IAED,OAAO,CAAC,OAAO,CAAgB;IAE/B,IAAI,MAAM,IAAI,MAAM,EAAE,CAErB;IAED,OAAO,CAAC,MAAM,CAAC,CAAgB;IAE/B,IAAI,KAAK,IAAI,OAAO,CAAC,IAAI,CAAC,CAMzB;IAED,OAAO,CAAC,YAAY,CAAkB;IAEtC,IAAI,WAAW,IAAI,OAAO,CAEzB;IAED,IAAI,WAAW,CAAC,KAAK,EAAE,OAAO,EAE7B;IAED,OAAO,CAAC,UAAU,CAAa;IAE/B,IAAI,SAAS,IAAI,MAAM,CAEtB;IAED,IAAI,SAAS,CAAC,KAAK,EAAE,MAAM,EAE1B;IAEM,QAAQ,CAAC,MAAM,EAAE,MAAM,GAAG,IAAI;IAK9B,WAAW,CAAC,MAAM,EAAE,MAAM,GAAG,OAAO;IAWpC,WAAW,IAAI,IAAI;IAInB,WAAW,CAChB,MAAM,EAAE,MAAM,EACd,MAAM,EAAE,MAAM,EACd,IAAI,GAAE,MAAM,GAAG,GAAG,EAAO,EACzB,WAAW,GAAE,OAAe,EAC5B,OAAO,KAAK,GACX,cAAc;IAYV,WAAW,CAChB,KAAK,EAAE,MAAM,EACb,MAAM,EAAE,MAAM,EACd,MAAM,EAAE,MAAM,EACd,IAAI,4BAAqB,EACzB,WAAW,qBAAiB,EAC5B,OAAO,EAAE,EAAE,GACV,cAAc;CAalB"} \ No newline at end of file diff --git a/dist/network.js b/dist/network.js index 5656bb8..57c226c 100644 --- a/dist/network.js +++ b/dist/network.js @@ -1,12 +1,15 @@ -import WisdomRpcQuery from "./query/wisdom.js"; // @ts-ignore import DHT from "@hyperswarm/dht"; -import StreamingRpcQuery from "./query/streaming.js"; import SimpleRpcQuery from "./query/simple.js"; +import WisdomRpcQuery from "./query/wisdom.js"; export default class RpcNetwork { constructor(dht = new DHT()) { this._dht = dht; } + _activeRelay; + get activeRelay() { + return this._activeRelay; + } _dht; get dht() { return this._dht; @@ -80,10 +83,7 @@ export default class RpcNetwork { bypassCache: bypassCache || this._bypassCache, }, options).run(); } - streamingQuery(relay, method, module, streamHandler, data = {}, options = {}) { - return new StreamingRpcQuery(this, relay, { method, module, data }, { ...options, streamHandler }).run(); - } - simpleQuery(relay, method, module, data = {}, bypassCache = false, options = {}) { + simpleQuery(relay, method, module, data = {}, bypassCache = false, options) { return new SimpleRpcQuery(this, relay, { method, module, diff --git a/dist/query/base.d.ts b/dist/query/base.d.ts index a9812d8..4a7d565 100644 --- a/dist/query/base.d.ts +++ b/dist/query/base.d.ts @@ -1,8 +1,11 @@ /// -import { Buffer } from "buffer"; import RpcNetwork from "../network.js"; import { RpcQueryOptions } from "../types.js"; -import type { RPCRequest, RPCResponse } from "@lumeweb/relay-types"; +import type { + ClientRPCRequest, + RPCRequest, + RPCResponse, +} from "@lumeweb/relay-types"; export default abstract class RpcQueryBase { protected _network: RpcNetwork; protected _query: RPCRequest; @@ -11,24 +14,21 @@ export default abstract class RpcQueryBase { protected _timeoutTimer?: any; protected _timeout: boolean; protected _completed: boolean; - protected _responses: { - [relay: string]: RPCResponse; - }; - protected _errors: { - [relay: string]: any; - }; + protected _response?: RPCResponse; + protected _error?: string; protected _promiseResolve?: (data: any) => void; constructor( network: RpcNetwork, - query: RPCRequest, + query: ClientRPCRequest | RPCRequest, options?: RpcQueryOptions ); get result(): Promise; - private handeTimeout; + protected handeTimeout(): void; protected resolve(data?: RPCResponse, timeout?: boolean): void; run(): this; - protected queryRelay(relay: string | Buffer): Promise; - protected abstract checkResponses(): void; - protected abstract getRelays(): string[] | Buffer[]; + private _doRun; + protected setupRelayTimeout(reject: Function): NodeJS.Timeout; + protected abstract _run(): void; + protected queryRpc(rpc: any, request: RPCRequest): Promise; } //# sourceMappingURL=base.d.ts.map diff --git a/dist/query/base.d.ts.map b/dist/query/base.d.ts.map index f408ce0..11a2160 100644 --- a/dist/query/base.d.ts.map +++ b/dist/query/base.d.ts.map @@ -1 +1 @@ -{"version":3,"file":"base.d.ts","sourceRoot":"","sources":["../../src/query/base.ts"],"names":[],"mappings":";AAEA,OAAO,EAAE,MAAM,EAAE,MAAM,QAAQ,CAAC;AAEhC,OAAO,UAAU,MAAM,eAAe,CAAC;AACvC,OAAO,EAAE,eAAe,EAAE,MAAM,aAAa,CAAC;AAC9C,OAAO,KAAK,EAAE,UAAU,EAAE,WAAW,EAAE,MAAM,sBAAsB,CAAC;AAEpE,MAAM,CAAC,OAAO,CAAC,QAAQ,OAAO,YAAY;IACxC,SAAS,CAAC,QAAQ,EAAE,UAAU,CAAC;IAC/B,SAAS,CAAC,MAAM,EAAE,UAAU,CAAC;IAC7B,SAAS,CAAC,QAAQ,EAAE,eAAe,CAAC;IAEpC,SAAS,CAAC,QAAQ,CAAC,EAAE,OAAO,CAAC,GAAG,CAAC,CAAC;IAClC,SAAS,CAAC,aAAa,CAAC,EAAE,GAAG,CAAC;IAC9B,SAAS,CAAC,QAAQ,EAAE,OAAO,CAAS;IACpC,SAAS,CAAC,UAAU,EAAE,OAAO,CAAS;IACtC,SAAS,CAAC,UAAU,EAAE;QAAE,CAAC,KAAK,EAAE,MAAM,GAAG,WAAW,CAAA;KAAE,CAAM;IAC5D,SAAS,CAAC,OAAO,EAAE;QAAE,CAAC,KAAK,EAAE,MAAM,GAAG,GAAG,CAAA;KAAE,CAAM;IACjD,SAAS,CAAC,eAAe,CAAC,EAAE,CAAC,IAAI,EAAE,GAAG,KAAK,IAAI,CAAC;gBAG9C,OAAO,EAAE,UAAU,EACnB,KAAK,EAAE,UAAU,EACjB,OAAO,GAAE,eAAoB;IAO/B,IAAI,MAAM,IAAI,OAAO,CAAC,WAAW,CAAC,CAEjC;IAED,OAAO,CAAC,YAAY;IAIpB,SAAS,CAAC,OAAO,CAAC,IAAI,CAAC,EAAE,WAAW,EAAE,OAAO,GAAE,OAAe,GAAG,IAAI;IAc9D,GAAG,IAAI,IAAI;cA2BF,UAAU,CAAC,KAAK,EAAE,MAAM,GAAG,MAAM,GAAG,OAAO,CAAC,GAAG,CAAC;IAoDhE,SAAS,CAAC,QAAQ,CAAC,cAAc,IAAI,IAAI;IAEzC,SAAS,CAAC,QAAQ,CAAC,SAAS,IAAI,MAAM,EAAE,GAAG,MAAM,EAAE;CACpD"} \ No newline at end of file +{"version":3,"file":"base.d.ts","sourceRoot":"","sources":["../../src/query/base.ts"],"names":[],"mappings":";AAIA,OAAO,UAAU,MAAM,eAAe,CAAC;AACvC,OAAO,EAAE,eAAe,EAAE,MAAM,aAAa,CAAC;AAC9C,OAAO,KAAK,EACV,gBAAgB,EAChB,UAAU,EACV,WAAW,EACZ,MAAM,sBAAsB,CAAC;AAI9B,MAAM,CAAC,OAAO,CAAC,QAAQ,OAAO,YAAY;IACxC,SAAS,CAAC,QAAQ,EAAE,UAAU,CAAC;IAC/B,SAAS,CAAC,MAAM,EAAE,UAAU,CAAC;IAC7B,SAAS,CAAC,QAAQ,EAAE,eAAe,CAAC;IAEpC,SAAS,CAAC,QAAQ,CAAC,EAAE,OAAO,CAAC,GAAG,CAAC,CAAC;IAClC,SAAS,CAAC,aAAa,CAAC,EAAE,GAAG,CAAC;IAC9B,SAAS,CAAC,QAAQ,EAAE,OAAO,CAAS;IACpC,SAAS,CAAC,UAAU,EAAE,OAAO,CAAS;IACtC,SAAS,CAAC,SAAS,CAAC,EAAE,WAAW,CAAC;IAClC,SAAS,CAAC,MAAM,CAAC,EAAE,MAAM,CAAC;IAC1B,SAAS,CAAC,eAAe,CAAC,EAAE,CAAC,IAAI,EAAE,GAAG,KAAK,IAAI,CAAC;gBAG9C,OAAO,EAAE,UAAU,EACnB,KAAK,EAAE,gBAAgB,GAAG,UAAU,EACpC,OAAO,GAAE,eAAoB;IAO/B,IAAI,MAAM,IAAI,OAAO,CAAC,WAAW,CAAC,CAEjC;IAED,SAAS,CAAC,YAAY;IAItB,SAAS,CAAC,OAAO,CAAC,IAAI,CAAC,EAAE,WAAW,EAAE,OAAO,GAAE,OAAe,GAAG,IAAI;IAc9D,GAAG,IAAI,IAAI;YAmBJ,MAAM;IASpB,SAAS,CAAC,iBAAiB,CAAC,MAAM,EAAE,QAAQ,GAAG,MAAM,CAAC,OAAO;IAO7D,SAAS,CAAC,QAAQ,CAAC,IAAI,IAAI,IAAI;cAEf,QAAQ,CAAC,GAAG,EAAE,GAAG,EAAE,OAAO,EAAE,UAAU;CAyBvD"} \ No newline at end of file diff --git a/dist/query/base.js b/dist/query/base.js index 39ce2b5..7585bfb 100644 --- a/dist/query/base.js +++ b/dist/query/base.js @@ -1,7 +1,4 @@ import { clearTimeout, setTimeout } from "timers"; -import { pack, unpack } from "msgpackr"; -import { Buffer } from "buffer"; -import { isPromise } from "../util.js"; export default class RpcQueryBase { _network; _query; @@ -10,8 +7,8 @@ export default class RpcQueryBase { _timeoutTimer; _timeout = false; _completed = false; - _responses = {}; - _errors = {}; + _response; + _error; _promiseResolve; constructor(network, query, options = {}) { this._network = network; @@ -43,63 +40,45 @@ export default class RpcQueryBase { }); this._timeoutTimer = this._timeoutTimer ?? - setTimeout(this.handeTimeout.bind(this), (this._options.queryTimeout || this._network.queryTimeout) * 1000); - this._network.ready.then(() => { - const promises = []; - for (const relay of this.getRelays()) { - promises.push(this.queryRelay(relay)); - } - Promise.allSettled(promises).then(() => this.checkResponses()); - }); + setTimeout(this.handeTimeout.bind(this), (this._options?.queryTimeout || this._network.queryTimeout) * 1000); + this._doRun(); return this; } - async queryRelay(relay) { - let socket; - let relayKey = relay; - if (typeof relay === "string") { - relayKey = Buffer.from(relay, "hex"); - } - if (relay instanceof Buffer) { - relayKey = relay; - relay = relay.toString("hex"); - } + async _doRun() { try { - socket = this._network.dht.connect(relayKey); - if (isPromise(socket)) { - socket = await socket; - } + await this._network.ready; + await this._run(); } catch (e) { - return; + this._promiseResolve?.({ error: e.message }); } + } + setupRelayTimeout(reject) { + return setTimeout(() => { + this._error = "timeout"; + reject("timeout"); + }, (this._options.relayTimeout || this._network.relayTimeout) * 1000); + } + async queryRpc(rpc, request) { + let timer; return new Promise((resolve, reject) => { - let timer; - socket.on("data", (res) => { - relay = relay; - if (timer) { - clearTimeout(timer); - timer = null; + rpc + // @ts-ignore + .request(`${request.module}.${request.method}`, request.data) + .then((resp) => { + if (resp.error) { + throw new Error(resp.error); } - socket.end(); - const response = unpack(res); - if (response && response.error) { - this._errors[relay] = response.error; - return reject(null); - } - this._responses[relay] = response; + clearTimeout(timer); + this._response = resp; resolve(null); + }) + .catch((e) => { + this._error = e.message; + reject({ error: e.message }); + clearTimeout(timer); }); - socket.on("error", (error) => { - relay = relay; - this._errors[relay] = error; - reject({ error }); - }); - socket.write("rpc"); - socket.write(pack(this._query)); - timer = setTimeout(() => { - this._errors[relay] = "timeout"; - reject(null); - }, (this._options.relayTimeout || this._network.relayTimeout) * 1000); + timer = this.setupRelayTimeout(reject); }); } } diff --git a/dist/query/simple.d.ts b/dist/query/simple.d.ts index 82c74c2..6673306 100644 --- a/dist/query/simple.d.ts +++ b/dist/query/simple.d.ts @@ -1,18 +1,17 @@ -/// -import RpcQueryBase from "./base.js"; import RpcNetwork from "../network.js"; -import type { RPCRequest } from "@lumeweb/relay-types"; +import { ClientRPCRequest } from "@lumeweb/relay-types"; import { RpcQueryOptions } from "../types.js"; -import type { Buffer } from "buffer"; +import RpcQueryBase from "./base.js"; export default class SimpleRpcQuery extends RpcQueryBase { - private _relay; + protected _relay: string; constructor( network: RpcNetwork, - relay: string | Buffer, - query: RPCRequest, + relay: string, + query: ClientRPCRequest, options: RpcQueryOptions ); - protected checkResponses(): void; - protected getRelays(): string[] | Buffer[]; + protected _run(): Promise; + protected queryRelay(): Promise; + protected checkResponses(): Promise; } //# sourceMappingURL=simple.d.ts.map diff --git a/dist/query/simple.d.ts.map b/dist/query/simple.d.ts.map index 472d96f..fc4d9c4 100644 --- a/dist/query/simple.d.ts.map +++ b/dist/query/simple.d.ts.map @@ -1 +1 @@ -{"version":3,"file":"simple.d.ts","sourceRoot":"","sources":["../../src/query/simple.ts"],"names":[],"mappings":";AAAA,OAAO,YAAY,MAAM,WAAW,CAAC;AACrC,OAAO,UAAU,MAAM,eAAe,CAAC;AACvC,OAAO,KAAK,EAAE,UAAU,EAAE,MAAM,sBAAsB,CAAC;AACvD,OAAO,EAAE,eAAe,EAAE,MAAM,aAAa,CAAC;AAC9C,OAAO,KAAK,EAAE,MAAM,EAAE,MAAM,QAAQ,CAAC;AAErC,MAAM,CAAC,OAAO,OAAO,cAAe,SAAQ,YAAY;IACtD,OAAO,CAAC,MAAM,CAAkB;gBAE9B,OAAO,EAAE,UAAU,EACnB,KAAK,EAAE,MAAM,GAAG,MAAM,EACtB,KAAK,EAAE,UAAU,EACjB,OAAO,EAAE,eAAe;IAM1B,SAAS,CAAC,cAAc,IAAI,IAAI;IAYhC,SAAS,CAAC,SAAS,IAAI,MAAM,EAAE,GAAG,MAAM,EAAE;CAG3C"} \ No newline at end of file +{"version":3,"file":"simple.d.ts","sourceRoot":"","sources":["../../src/query/simple.ts"],"names":[],"mappings":"AAAA,OAAO,UAAU,MAAM,eAAe,CAAC;AACvC,OAAO,EACL,gBAAgB,EAIjB,MAAM,sBAAsB,CAAC;AAC9B,OAAO,EAAE,eAAe,EAAE,MAAM,aAAa,CAAC;AAU9C,OAAO,YAAY,MAAM,WAAW,CAAC;AAErC,MAAM,CAAC,OAAO,OAAO,cAAe,SAAQ,YAAY;IACtD,SAAS,CAAC,MAAM,EAAE,MAAM,CAAC;gBAGvB,OAAO,EAAE,UAAU,EACnB,KAAK,EAAE,MAAM,EACb,KAAK,EAAE,gBAAgB,EACvB,OAAO,EAAE,eAAe;cAMV,IAAI,IAAI,OAAO,CAAC,IAAI,CAAC;cAKrB,UAAU,IAAI,OAAO,CAAC,GAAG,CAAC;cA2B1B,cAAc;CAmB/B"} \ No newline at end of file diff --git a/dist/query/simple.js b/dist/query/simple.js index 5d33dbd..78a18ff 100644 --- a/dist/query/simple.js +++ b/dist/query/simple.js @@ -1,3 +1,7 @@ +import b4a from "b4a"; +import { isPromise, validateTimestampedResponse, } from "../util.js"; +import RPC from "@lumeweb/rpc"; +import { ERR_INVALID_SIGNATURE } from "../error.js"; import RpcQueryBase from "./base.js"; export default class SimpleRpcQuery extends RpcQueryBase { _relay; @@ -5,17 +9,43 @@ export default class SimpleRpcQuery extends RpcQueryBase { super(network, query, options); this._relay = relay; } - checkResponses() { - if (Object.keys(this._responses).length) { - this.resolve(Object.values(this._responses).pop()); + async _run() { + await this.queryRelay(); + await this.checkResponses(); + } + async queryRelay() { + let socket; + try { + socket = this._network.dht.connect(b4a.from(this._relay, "hex")); + if (isPromise(socket)) { + socket = await socket; + } + } + catch (e) { return; } - if (Object.keys(this._errors).length) { - const error = Object.values(this._errors).pop(); - this.resolve(error, error === "timeout"); + await socket.opened; + const rpc = new RPC(socket); + try { + await this.queryRpc(rpc, this._query); } + catch (e) { + // @ts-ignore + rpc.end(); + throw e; + } + // @ts-ignore + rpc.end(); } - getRelays() { - return [this._relay]; + async checkResponses() { + let response = this._response; + if (this._error) { + response = { error: this._error }; + } + if (!response.error && + !validateTimestampedResponse(b4a.from(this._relay, "hex"), response)) { + response = { error: ERR_INVALID_SIGNATURE }; + } + this.resolve(response); } } diff --git a/dist/query/streaming.d.ts b/dist/query/streaming.d.ts deleted file mode 100644 index 0aa4e8a..0000000 --- a/dist/query/streaming.d.ts +++ /dev/null @@ -1,19 +0,0 @@ -/// -import SimpleRpcQuery from "./simple.js"; -import { Buffer } from "buffer"; -import type { RPCRequest } from "@lumeweb/relay-types"; -import RpcNetwork from "../network.js"; -import { StreamingRpcQueryOptions } from "../types.js"; -export default class StreamingRpcQuery extends SimpleRpcQuery { - protected _options: StreamingRpcQueryOptions; - protected _canceled: boolean; - constructor( - network: RpcNetwork, - relay: string | Buffer, - query: RPCRequest, - options: StreamingRpcQueryOptions - ); - cancel(): void; - protected queryRelay(relay: string | Buffer): Promise; -} -//# sourceMappingURL=streaming.d.ts.map diff --git a/dist/query/streaming.d.ts.map b/dist/query/streaming.d.ts.map deleted file mode 100644 index 6ba47c2..0000000 --- a/dist/query/streaming.d.ts.map +++ /dev/null @@ -1 +0,0 @@ -{"version":3,"file":"streaming.d.ts","sourceRoot":"","sources":["../../src/query/streaming.ts"],"names":[],"mappings":";AAAA,OAAO,cAAc,MAAM,aAAa,CAAC;AACzC,OAAO,EAAE,MAAM,EAAE,MAAM,QAAQ,CAAC;AAIhC,OAAO,KAAK,EAAE,UAAU,EAAe,MAAM,sBAAsB,CAAC;AACpE,OAAO,UAAU,MAAM,eAAe,CAAC;AACvC,OAAO,EAAE,wBAAwB,EAAE,MAAM,aAAa,CAAC;AAEvD,MAAM,CAAC,OAAO,OAAO,iBAAkB,SAAQ,cAAc;IAC3D,SAAS,CAAC,QAAQ,EAAE,wBAAwB,CAAC;IAC7C,SAAS,CAAC,SAAS,UAAS;gBAE1B,OAAO,EAAE,UAAU,EACnB,KAAK,EAAE,MAAM,GAAG,MAAM,EACtB,KAAK,EAAE,UAAU,EACjB,OAAO,EAAE,wBAAwB;IAM5B,MAAM;cAIG,UAAU,CAAC,KAAK,EAAE,MAAM,GAAG,MAAM,GAAG,OAAO,CAAC,GAAG,CAAC;CAmEjE"} \ No newline at end of file diff --git a/dist/query/streaming.js b/dist/query/streaming.js deleted file mode 100644 index 6a15518..0000000 --- a/dist/query/streaming.js +++ /dev/null @@ -1,75 +0,0 @@ -import SimpleRpcQuery from "./simple.js"; -import { Buffer } from "buffer"; -import { isPromise } from "../util.js"; -import { clearTimeout } from "timers"; -import { pack, unpack } from "msgpackr"; -export default class StreamingRpcQuery extends SimpleRpcQuery { - _options; - _canceled = false; - constructor(network, relay, query, options) { - super(network, relay, query, options); - this._options = options; - } - cancel() { - this._canceled = true; - } - async queryRelay(relay) { - let socket; - let relayKey = relay; - if (relay === "string") { - relayKey = Buffer.from(relay, "hex"); - } - if (relay instanceof Buffer) { - relayKey = relay; - relay = relay.toString("hex"); - } - try { - socket = this._network.dht.connect(relayKey); - if (isPromise(socket)) { - socket = await socket; - } - } - catch (e) { - return; - } - return new Promise((resolve, reject) => { - const finish = () => { - relay = relay; - this._responses[relay] = {}; - resolve(null); - socket.end(); - }; - const listener = (res) => { - relay = relay; - if (this._timeoutTimer) { - clearTimeout(this._timeoutTimer); - this._timeoutTimer = null; - } - if (this._canceled) { - socket.write(pack({ cancel: true })); - socket.off("data", listener); - finish(); - return; - } - const response = unpack(res); - if (response && response.error) { - this._errors[relay] = response.error; - return reject(null); - } - if (response?.data.done) { - finish(); - return; - } - this._options.streamHandler(response?.data.data); - }; - socket.on("data", listener); - socket.on("error", (error) => { - relay = relay; - this._errors[relay] = error; - reject({ error }); - }); - socket.write("rpc"); - socket.write(pack(this._query)); - }); - } -} diff --git a/dist/query/wisdom.d.ts b/dist/query/wisdom.d.ts index 01440c6..11d43d2 100644 --- a/dist/query/wisdom.d.ts +++ b/dist/query/wisdom.d.ts @@ -1,9 +1,15 @@ +import { RPCBroadcastResponse, RPCResponse } from "@lumeweb/relay-types"; import RpcQueryBase from "./base.js"; export default class WisdomRpcQuery extends RpcQueryBase { - private _maxTries; - private _tries; - protected checkResponses(): void; - private retry; + protected _response?: RPCBroadcastResponse; + private static _activeRelay; + static get activeRelay(): any; + get result(): Promise; + protected _run(): Promise; + protected resolve(data?: RPCResponse, timeout?: boolean): void; + protected queryRelay(): Promise; + protected checkResponse(): Promise; protected getRelays(): string[] | []; + private setupRelay; } //# sourceMappingURL=wisdom.d.ts.map diff --git a/dist/query/wisdom.d.ts.map b/dist/query/wisdom.d.ts.map index 553acdb..f41cff3 100644 --- a/dist/query/wisdom.d.ts.map +++ b/dist/query/wisdom.d.ts.map @@ -1 +1 @@ -{"version":3,"file":"wisdom.d.ts","sourceRoot":"","sources":["../../src/query/wisdom.ts"],"names":[],"mappings":"AAAA,OAAO,YAAY,MAAM,WAAW,CAAC;AAcrC,MAAM,CAAC,OAAO,OAAO,cAAe,SAAQ,YAAY;IACtD,OAAO,CAAC,SAAS,CAAK;IACtB,OAAO,CAAC,MAAM,CAAK;IAEnB,SAAS,CAAC,cAAc,IAAI,IAAI;IA4DhC,OAAO,CAAC,KAAK;IAWb,SAAS,CAAC,SAAS,IAAI,MAAM,EAAE,GAAG,EAAE;CAmBrC"} \ No newline at end of file +{"version":3,"file":"wisdom.d.ts","sourceRoot":"","sources":["../../src/query/wisdom.ts"],"names":[],"mappings":"AAAA,OAAO,EAEL,oBAAoB,EAEpB,WAAW,EACZ,MAAM,sBAAsB,CAAC;AAY9B,OAAO,YAAY,MAAM,WAAW,CAAC;AASrC,MAAM,CAAC,OAAO,OAAO,cAAe,SAAQ,YAAY;IACtD,UAAkB,SAAS,CAAC,EAAE,oBAAoB,CAAC;IACnD,OAAO,CAAC,MAAM,CAAC,YAAY,CAAM;IAEjC,MAAM,KAAK,WAAW,IAAI,GAAG,CAE5B;IAED,IAAI,MAAM,IAAI,OAAO,CAAC,WAAW,CAAC,CAEjC;cAEe,IAAI,IAAI,OAAO,CAAC,IAAI,CAAC;IAMrC,SAAS,CAAC,OAAO,CAAC,IAAI,CAAC,EAAE,WAAW,EAAE,OAAO,GAAE,OAAe,GAAG,IAAI;cAcrD,UAAU,IAAI,OAAO,CAAC,GAAG,CAAC;cAkB1B,aAAa;IAwE7B,SAAS,CAAC,SAAS,IAAI,MAAM,EAAE,GAAG,EAAE;YAoBtB,UAAU;CAsBzB"} \ No newline at end of file diff --git a/dist/query/wisdom.js b/dist/query/wisdom.js index cfb69fa..a2e824a 100644 --- a/dist/query/wisdom.js +++ b/dist/query/wisdom.js @@ -1,71 +1,98 @@ -import RpcQueryBase from "./base.js"; -import { flatten } from "../util.js"; -import { Buffer } from "buffer"; +import { clearTimeout } from "timers"; +import b4a from "b4a"; +import { flatten, isPromise, validateResponse, validateTimestampedResponse, } from "../util.js"; +import RPC from "@lumeweb/rpc"; import { blake2b } from "libskynet"; -import { ERR_MAX_TRIES_HIT } from "../error.js"; +import { ERR_INVALID_SIGNATURE, ERR_NO_RELAYS } from "../error.js"; +import RpcQueryBase from "./base.js"; function flatHash(data) { const flattenedData = flatten(data).sort(); return Buffer.from(blake2b(Buffer.from(JSON.stringify(flattenedData)))).toString("hex"); } export default class WisdomRpcQuery extends RpcQueryBase { - _maxTries = 3; - _tries = 0; - checkResponses() { - const responseStore = this._responses; - const responseStoreData = Object.values(responseStore); - const responseObjects = responseStoreData.reduce((output, item) => { - const hash = flatHash(item?.data); + static _activeRelay; + static get activeRelay() { + return this._activeRelay; + } + get result() { + return this._promise; + } + async _run() { + await this.setupRelay(); + await this.queryRelay(); + await this.checkResponse(); + } + resolve(data, timeout = false) { + clearTimeout(this._timeoutTimer); + this._timeout = timeout; + this._completed = true; + if (timeout) { + data = { + error: "timeout", + }; + } + this._promiseResolve?.(data); + } + async queryRelay() { + let activeRelay = WisdomRpcQuery.activeRelay; + let relays = this.getRelays(); + if (!relays.length) { + throw new Error(ERR_NO_RELAYS); + } + return this.queryRpc(activeRelay, { + module: "rpc", + method: "broadcast_request", + data: { + request: this._query, + relays, + }, + }); + } + async checkResponse() { + if (this._error) { + this.resolve({ error: this._error }); + return; + } + if (!validateResponse(WisdomRpcQuery.activeRelay.stream.remotePublicKey, this._response)) { + this.resolve({ error: ERR_INVALID_SIGNATURE }); + return; + } + let relays = []; + for (const relay in this._response?.relays) { + const resp = this._response?.relays[relay]; + if (validateTimestampedResponse(b4a.from(relay, "hex"), resp)) { + relays.push(resp); + } + } + if (!relays.length) { + this.resolve({ error: ERR_NO_RELAYS }); + return; + } + const responseObjects = relays.reduce((output, item) => { + const field = item.signedField || "data"; + // @ts-ignore + const hash = flatHash(item[field]); output[hash] = item?.data; return output; }, {}); - const responses = responseStoreData.reduce((output, item) => { - const hash = flatHash(item?.data); + const responses = relays.reduce((output, item) => { + const field = item.signedField || "data"; + // @ts-ignore + const hash = flatHash(item[field]); output[hash] = output[hash] ?? 0; output[hash]++; return output; }, {}); - if (!Object.keys(responses).length) { - if (Object.keys(this._errors).length) { - this.resolve({ error: Object.values(this._errors).pop() }); - return; - } - if (this._tries <= this._maxTries) { - this._tries++; - this.retry(); - return; - } - this.resolve({ data: { error: ERR_MAX_TRIES_HIT } }); - return; - } for (const responseHash in responses) { - if (responses[responseHash] / responseStoreData.length >= + if (responses[responseHash] / relays.length >= this._network.majorityThreshold) { let response = responseObjects[responseHash]; - // @ts-ignore - if (null === response) { - if (this._tries <= this._maxTries) { - this._tries++; - this.retry(); - return; - } - response = { error: ERR_MAX_TRIES_HIT }; - } - else { - response = { data: response }; - } + response = { data: response }; this.resolve(response); break; } } } - retry() { - this._responses = {}; - this._errors = {}; - if (this._completed) { - return; - } - this.run(); - } getRelays() { if (this._network.maxRelays === 0 || this._network.relays.length <= this._network.maxRelays) { @@ -80,4 +107,23 @@ export default class WisdomRpcQuery extends RpcQueryBase { } return list; } + async setupRelay() { + let active = WisdomRpcQuery.activeRelay; + let relays = this._network.relays; + if (!active) { + if (!relays.length) { + throw new Error(ERR_NO_RELAYS); + } + let relay = relays[Math.floor(Math.random() * relays.length)]; + let socket = this._network.dht.connect(b4a.from(relay, "hex")); + if (isPromise(socket)) { + socket = await socket; + } + await socket.opened; + WisdomRpcQuery._activeRelay = new RPC(socket); + socket.once("close", () => { + WisdomRpcQuery._activeRelay = undefined; + }); + } + } } diff --git a/dist/types.d.ts b/dist/types.d.ts index f1776b4..3b740df 100644 --- a/dist/types.d.ts +++ b/dist/types.d.ts @@ -2,8 +2,4 @@ export interface RpcQueryOptions { queryTimeout?: number; relayTimeout?: number; } -export interface StreamingRpcQueryOptions extends RpcQueryOptions { - streamHandler: StreamHandlerFunction; -} -export declare type StreamHandlerFunction = (data: Uint8Array) => void; //# sourceMappingURL=types.d.ts.map diff --git a/dist/types.d.ts.map b/dist/types.d.ts.map index ef80f9e..1499f13 100644 --- a/dist/types.d.ts.map +++ b/dist/types.d.ts.map @@ -1 +1 @@ -{"version":3,"file":"types.d.ts","sourceRoot":"","sources":["../src/types.ts"],"names":[],"mappings":"AAAA,MAAM,WAAW,eAAe;IAC9B,YAAY,CAAC,EAAE,MAAM,CAAC;IACtB,YAAY,CAAC,EAAE,MAAM,CAAC;CACvB;AACD,MAAM,WAAW,wBAAyB,SAAQ,eAAe;IAC/D,aAAa,EAAE,qBAAqB,CAAC;CACtC;AAED,oBAAY,qBAAqB,GAAG,CAAC,IAAI,EAAE,UAAU,KAAK,IAAI,CAAC"} \ No newline at end of file +{"version":3,"file":"types.d.ts","sourceRoot":"","sources":["../src/types.ts"],"names":[],"mappings":"AAAA,MAAM,WAAW,eAAe;IAC9B,YAAY,CAAC,EAAE,MAAM,CAAC;IACtB,YAAY,CAAC,EAAE,MAAM,CAAC;CACvB"} \ No newline at end of file diff --git a/dist/util.d.ts b/dist/util.d.ts index 0454630..a023c45 100644 --- a/dist/util.d.ts +++ b/dist/util.d.ts @@ -1,3 +1,14 @@ -export declare function flatten(target: any, opts?: any): any[]; +/// +import type { RPCResponse } from "@lumeweb/relay-types"; export declare function isPromise(obj: Promise): boolean; +export declare function flatten(target: any, opts?: any): any[]; +export declare function validateResponse( + relay: Buffer, + response: RPCResponse, + timestamped?: boolean +): boolean; +export declare function validateTimestampedResponse( + relay: Buffer, + response: RPCResponse +): boolean; //# sourceMappingURL=util.d.ts.map diff --git a/dist/util.d.ts.map b/dist/util.d.ts.map index 21ad54e..5a76bf4 100644 --- a/dist/util.d.ts.map +++ b/dist/util.d.ts.map @@ -1 +1 @@ -{"version":3,"file":"util.d.ts","sourceRoot":"","sources":["../src/util.ts"],"names":[],"mappings":"AAaA,wBAAgB,OAAO,CAAC,MAAM,EAAE,GAAG,EAAE,IAAI,GAAE,GAAQ,GAAG,GAAG,EAAE,CA0C1D;AAED,wBAAgB,SAAS,CAAC,GAAG,EAAE,OAAO,CAAC,GAAG,CAAC,WAM1C"} \ No newline at end of file +{"version":3,"file":"util.d.ts","sourceRoot":"","sources":["../src/util.ts"],"names":[],"mappings":";AAEA,OAAO,KAAK,EAAc,WAAW,EAAE,MAAM,sBAAsB,CAAC;AAKpE,wBAAgB,SAAS,CAAC,GAAG,EAAE,OAAO,CAAC,GAAG,CAAC,WAM1C;AAKD,wBAAgB,OAAO,CAAC,MAAM,EAAE,GAAG,EAAE,IAAI,GAAE,GAAQ,GAAG,GAAG,EAAE,CA0C1D;AAED,wBAAgB,gBAAgB,CAC9B,KAAK,EAAE,MAAM,EACb,QAAQ,EAAE,WAAW,EACrB,WAAW,UAAQ,GAClB,OAAO,CAmBT;AAED,wBAAgB,2BAA2B,CACzC,KAAK,EAAE,MAAM,EACb,QAAQ,EAAE,WAAW,GACpB,OAAO,CAET"} \ No newline at end of file diff --git a/dist/util.js b/dist/util.js index 99f2bfb..f5697a1 100644 --- a/dist/util.js +++ b/dist/util.js @@ -1,8 +1,12 @@ -function isBuffer(obj) { - return (obj && - obj.constructor && - typeof obj.constructor.isBuffer === "function" && - obj.constructor.isBuffer(obj)); +// @ts-ignore +import stringify from "json-stringify-deterministic"; +// @ts-ignore +import crypto from "hypercore-crypto"; +import b4a from "b4a"; +export function isPromise(obj) { + return (!!obj && + (typeof obj === "object" || typeof obj === "function") && + typeof obj.then === "function"); } /* Forked from https://github.com/hughsk/flat @@ -22,7 +26,7 @@ export function flatten(target, opts = {}) { const value = object[key]; const isarray = opts.safe && Array.isArray(value); const type = Object.prototype.toString.call(value); - const isbuffer = isBuffer(value); + const isbuffer = b4a.isBuffer(value); const isobject = type === "[object Object]" || type === "[object Array]"; const newKey = prev ? prev + delimiter + transformKey(key) @@ -40,8 +44,20 @@ export function flatten(target, opts = {}) { step(target); return output; } -export function isPromise(obj) { - return (!!obj && - (typeof obj === "object" || typeof obj === "function") && - typeof obj.then === "function"); +export function validateResponse(relay, response, timestamped = false) { + const field = response.signedField || "data"; + // @ts-ignore + const data = response[field]; + let json = data; + if (typeof json !== "string") { + json = stringify(json); + } + const updated = response.updated; + if (timestamped && updated) { + json = updated.toString() + json; + } + return !!crypto.verify(b4a.from(json), b4a.from(response.signature, "hex"), relay); +} +export function validateTimestampedResponse(relay, response) { + return validateResponse(relay, response, true); }