Compare commits

..

No commits in common. "960c321ecf0b2b461b1721b86792a4823cee40f5" and "71eb37160c386d09be3ecf8d6255d263178797e6" have entirely different histories.

36 changed files with 562 additions and 577 deletions

2
dist/error.d.ts vendored
View File

@ -1,4 +1,2 @@
export declare const ERR_MAX_TRIES_HIT = "ERR_MAX_TRIES_HIT"; export declare const ERR_MAX_TRIES_HIT = "ERR_MAX_TRIES_HIT";
export declare const ERR_INVALID_SIGNATURE = "INVALID_SIGNATURE";
export declare const ERR_NO_RELAYS = "NO_RELAYS";
//# sourceMappingURL=error.d.ts.map //# sourceMappingURL=error.d.ts.map

2
dist/error.d.ts.map vendored
View File

@ -1 +1 @@
{"version":3,"file":"error.d.ts","sourceRoot":"","sources":["../src/error.ts"],"names":[],"mappings":"AAAA,eAAO,MAAM,iBAAiB,sBAAsB,CAAC;AACrD,eAAO,MAAM,qBAAqB,sBAAsB,CAAC;AACzD,eAAO,MAAM,aAAa,cAAc,CAAC"} {"version":3,"file":"error.d.ts","sourceRoot":"","sources":["../src/error.ts"],"names":[],"mappings":"AAAA,eAAO,MAAM,iBAAiB,sBAAsB,CAAC"}

2
dist/error.js vendored
View File

@ -1,3 +1 @@
export const ERR_MAX_TRIES_HIT = "ERR_MAX_TRIES_HIT"; export const ERR_MAX_TRIES_HIT = "ERR_MAX_TRIES_HIT";
export const ERR_INVALID_SIGNATURE = "INVALID_SIGNATURE";
export const ERR_NO_RELAYS = "NO_RELAYS";

9
dist/index.d.ts vendored
View File

@ -1,7 +1,14 @@
import RpcNetwork from "./network.js"; import RpcNetwork from "./network.js";
import RpcQueryBase from "./query/base.js"; import RpcQueryBase from "./query/base.js";
import SimpleRpcQuery from "./query/simple.js"; import SimpleRpcQuery from "./query/simple.js";
import StreamingRpcQuery from "./query/streaming.js";
import WisdomRpcQuery from "./query/wisdom.js"; import WisdomRpcQuery from "./query/wisdom.js";
export * from "./types.js"; export * from "./types.js";
export { RpcNetwork, RpcQueryBase, SimpleRpcQuery, WisdomRpcQuery }; export {
RpcNetwork,
RpcQueryBase,
SimpleRpcQuery,
StreamingRpcQuery,
WisdomRpcQuery,
};
//# sourceMappingURL=index.d.ts.map //# sourceMappingURL=index.d.ts.map

2
dist/index.d.ts.map vendored
View File

@ -1 +1 @@
{"version":3,"file":"index.d.ts","sourceRoot":"","sources":["../src/index.ts"],"names":[],"mappings":"AAAA,OAAO,UAAU,MAAM,cAAc,CAAC;AACtC,OAAO,YAAY,MAAM,iBAAiB,CAAC;AAC3C,OAAO,cAAc,MAAM,mBAAmB,CAAC;AAC/C,OAAO,cAAc,MAAM,mBAAmB,CAAC;AAE/C,cAAc,YAAY,CAAC;AAE3B,OAAO,EAAE,UAAU,EAAE,YAAY,EAAE,cAAc,EAAE,cAAc,EAAE,CAAC"} {"version":3,"file":"index.d.ts","sourceRoot":"","sources":["../src/index.ts"],"names":[],"mappings":"AAAA,OAAO,UAAU,MAAM,cAAc,CAAC;AACtC,OAAO,YAAY,MAAM,iBAAiB,CAAC;AAC3C,OAAO,cAAc,MAAM,mBAAmB,CAAC;AAC/C,OAAO,iBAAiB,MAAM,sBAAsB,CAAC;AACrD,OAAO,cAAc,MAAM,mBAAmB,CAAC;AAE/C,cAAc,YAAY,CAAC;AAE3B,OAAO,EACL,UAAU,EACV,YAAY,EACZ,cAAc,EACd,iBAAiB,EACjB,cAAc,GACf,CAAC"}

3
dist/index.js vendored
View File

@ -1,6 +1,7 @@
import RpcNetwork from "./network.js"; import RpcNetwork from "./network.js";
import RpcQueryBase from "./query/base.js"; import RpcQueryBase from "./query/base.js";
import SimpleRpcQuery from "./query/simple.js"; import SimpleRpcQuery from "./query/simple.js";
import StreamingRpcQuery from "./query/streaming.js";
import WisdomRpcQuery from "./query/wisdom.js"; import WisdomRpcQuery from "./query/wisdom.js";
export * from "./types.js"; export * from "./types.js";
export { RpcNetwork, RpcQueryBase, SimpleRpcQuery, WisdomRpcQuery }; export { RpcNetwork, RpcQueryBase, SimpleRpcQuery, StreamingRpcQuery, WisdomRpcQuery, };

28
dist/network.d.ts vendored
View File

@ -1,10 +1,10 @@
import RPC from "@lumeweb/rpc"; /// <reference types="node" />
import SimpleRpcQuery from "./query/simple.js";
import WisdomRpcQuery from "./query/wisdom.js"; import WisdomRpcQuery from "./query/wisdom.js";
import StreamingRpcQuery from "./query/streaming.js";
import { RpcQueryOptions, StreamHandlerFunction } from "./types.js";
import SimpleRpcQuery from "./query/simple.js";
export default class RpcNetwork { export default class RpcNetwork {
constructor(dht?: any); constructor(dht?: any);
private _activeRelay?;
get activeRelay(): RPC;
private _dht; private _dht;
get dht(): any; get dht(): any;
private _majorityThreshold; private _majorityThreshold;
@ -34,15 +34,23 @@ export default class RpcNetwork {
module: string, module: string,
data?: object | any[], data?: object | any[],
bypassCache?: boolean, bypassCache?: boolean,
options?: {} options?: RpcQueryOptions
): WisdomRpcQuery; ): WisdomRpcQuery;
simpleQuery( streamingQuery(
relay: string, relay: Buffer | string,
method: string, method: string,
module: string, module: string,
data: object | any[] | undefined, streamHandler: StreamHandlerFunction,
bypassCache: boolean | undefined, data?: object | any[],
options: {} options?: RpcQueryOptions
): StreamingRpcQuery;
simpleQuery(
relay: Buffer | string,
method: string,
module: string,
data?: object | any[],
bypassCache?: boolean,
options?: RpcQueryOptions
): SimpleRpcQuery; ): SimpleRpcQuery;
} }
//# sourceMappingURL=network.d.ts.map //# sourceMappingURL=network.d.ts.map

View File

@ -1 +1 @@
{"version":3,"file":"network.d.ts","sourceRoot":"","sources":["../src/network.ts"],"names":[],"mappings":"AAGA,OAAO,GAAG,MAAM,cAAc,CAAC;AAE/B,OAAO,cAAc,MAAM,mBAAmB,CAAC;AAC/C,OAAO,cAAc,MAAM,mBAAmB,CAAC;AAE/C,MAAM,CAAC,OAAO,OAAO,UAAU;gBACjB,GAAG,MAAY;IAI3B,OAAO,CAAC,YAAY,CAAC,CAAM;IAE3B,IAAI,WAAW,IAAI,GAAG,CAErB;IAED,OAAO,CAAC,IAAI,CAAa;IAEzB,IAAI,GAAG,QAEN;IAED,OAAO,CAAC,kBAAkB,CAAQ;IAElC,IAAI,iBAAiB,IAAI,MAAM,CAE9B;IAED,IAAI,iBAAiB,CAAC,KAAK,EAAE,MAAM,EAElC;IAED,OAAO,CAAC,aAAa,CAAM;IAE3B,IAAI,YAAY,IAAI,MAAM,CAEzB;IAED,IAAI,YAAY,CAAC,KAAK,EAAE,MAAM,EAE7B;IAED,OAAO,CAAC,aAAa,CAAK;IAE1B,IAAI,YAAY,IAAI,MAAM,CAEzB;IAED,IAAI,YAAY,CAAC,KAAK,EAAE,MAAM,EAE7B;IAED,OAAO,CAAC,OAAO,CAAgB;IAE/B,IAAI,MAAM,IAAI,MAAM,EAAE,CAErB;IAED,OAAO,CAAC,MAAM,CAAC,CAAgB;IAE/B,IAAI,KAAK,IAAI,OAAO,CAAC,IAAI,CAAC,CAMzB;IAED,OAAO,CAAC,YAAY,CAAkB;IAEtC,IAAI,WAAW,IAAI,OAAO,CAEzB;IAED,IAAI,WAAW,CAAC,KAAK,EAAE,OAAO,EAE7B;IAED,OAAO,CAAC,UAAU,CAAa;IAE/B,IAAI,SAAS,IAAI,MAAM,CAEtB;IAED,IAAI,SAAS,CAAC,KAAK,EAAE,MAAM,EAE1B;IAEM,QAAQ,CAAC,MAAM,EAAE,MAAM,GAAG,IAAI;IAK9B,WAAW,CAAC,MAAM,EAAE,MAAM,GAAG,OAAO;IAWpC,WAAW,IAAI,IAAI;IAInB,WAAW,CAChB,MAAM,EAAE,MAAM,EACd,MAAM,EAAE,MAAM,EACd,IAAI,GAAE,MAAM,GAAG,GAAG,EAAO,EACzB,WAAW,GAAE,OAAe,EAC5B,OAAO,KAAK,GACX,cAAc;IAYV,WAAW,CAChB,KAAK,EAAE,MAAM,EACb,MAAM,EAAE,MAAM,EACd,MAAM,EAAE,MAAM,EACd,IAAI,4BAAqB,EACzB,WAAW,qBAAiB,EAC5B,OAAO,EAAE,EAAE,GACV,cAAc;CAalB"} {"version":3,"file":"network.d.ts","sourceRoot":"","sources":["../src/network.ts"],"names":[],"mappings":";AAAA,OAAO,cAAc,MAAM,mBAAmB,CAAC;AAG/C,OAAO,iBAAiB,MAAM,sBAAsB,CAAC;AACrD,OAAO,EAAE,eAAe,EAAE,qBAAqB,EAAE,MAAM,YAAY,CAAC;AACpE,OAAO,cAAc,MAAM,mBAAmB,CAAC;AAE/C,MAAM,CAAC,OAAO,OAAO,UAAU;gBACjB,GAAG,MAAY;IAI3B,OAAO,CAAC,IAAI,CAAa;IAEzB,IAAI,GAAG,QAEN;IAED,OAAO,CAAC,kBAAkB,CAAQ;IAElC,IAAI,iBAAiB,IAAI,MAAM,CAE9B;IAED,IAAI,iBAAiB,CAAC,KAAK,EAAE,MAAM,EAElC;IAED,OAAO,CAAC,aAAa,CAAM;IAE3B,IAAI,YAAY,IAAI,MAAM,CAEzB;IAED,IAAI,YAAY,CAAC,KAAK,EAAE,MAAM,EAE7B;IAED,OAAO,CAAC,aAAa,CAAK;IAE1B,IAAI,YAAY,IAAI,MAAM,CAEzB;IAED,IAAI,YAAY,CAAC,KAAK,EAAE,MAAM,EAE7B;IAED,OAAO,CAAC,OAAO,CAAgB;IAE/B,IAAI,MAAM,IAAI,MAAM,EAAE,CAErB;IAED,OAAO,CAAC,MAAM,CAAC,CAAgB;IAE/B,IAAI,KAAK,IAAI,OAAO,CAAC,IAAI,CAAC,CAKzB;IAED,OAAO,CAAC,YAAY,CAAkB;IAEtC,IAAI,WAAW,IAAI,OAAO,CAEzB;IAED,IAAI,WAAW,CAAC,KAAK,EAAE,OAAO,EAE7B;IAED,OAAO,CAAC,UAAU,CAAa;IAE/B,IAAI,SAAS,IAAI,MAAM,CAEtB;IAED,IAAI,SAAS,CAAC,KAAK,EAAE,MAAM,EAE1B;IAEM,QAAQ,CAAC,MAAM,EAAE,MAAM,GAAG,IAAI;IAK9B,WAAW,CAAC,MAAM,EAAE,MAAM,GAAG,OAAO;IAWpC,WAAW,IAAI,IAAI;IAInB,WAAW,CAChB,MAAM,EAAE,MAAM,EACd,MAAM,EAAE,MAAM,EACd,IAAI,GAAE,MAAM,GAAG,GAAG,EAAO,EACzB,WAAW,GAAE,OAAe,EAC5B,OAAO,GAAE,eAAoB,GAC5B,cAAc;IAaV,cAAc,CACnB,KAAK,EAAE,MAAM,GAAG,MAAM,EACtB,MAAM,EAAE,MAAM,EACd,MAAM,EAAE,MAAM,EACd,aAAa,EAAE,qBAAqB,EACpC,IAAI,GAAE,MAAM,GAAG,GAAG,EAAO,EACzB,OAAO,GAAE,eAAoB,GAC5B,iBAAiB;IASb,WAAW,CAChB,KAAK,EAAE,MAAM,GAAG,MAAM,EACtB,MAAM,EAAE,MAAM,EACd,MAAM,EAAE,MAAM,EACd,IAAI,GAAE,MAAM,GAAG,GAAG,EAAO,EACzB,WAAW,GAAE,OAAe,EAC5B,OAAO,GAAE,eAAoB,GAC5B,cAAc;CAalB"}

12
dist/network.js vendored
View File

@ -1,15 +1,12 @@
import WisdomRpcQuery from "./query/wisdom.js";
// @ts-ignore // @ts-ignore
import DHT from "@hyperswarm/dht"; import DHT from "@hyperswarm/dht";
import StreamingRpcQuery from "./query/streaming.js";
import SimpleRpcQuery from "./query/simple.js"; import SimpleRpcQuery from "./query/simple.js";
import WisdomRpcQuery from "./query/wisdom.js";
export default class RpcNetwork { export default class RpcNetwork {
constructor(dht = new DHT()) { constructor(dht = new DHT()) {
this._dht = dht; this._dht = dht;
} }
_activeRelay;
get activeRelay() {
return this._activeRelay;
}
_dht; _dht;
get dht() { get dht() {
return this._dht; return this._dht;
@ -83,7 +80,10 @@ export default class RpcNetwork {
bypassCache: bypassCache || this._bypassCache, bypassCache: bypassCache || this._bypassCache,
}, options).run(); }, options).run();
} }
simpleQuery(relay, method, module, data = {}, bypassCache = false, options) { streamingQuery(relay, method, module, streamHandler, data = {}, options = {}) {
return new StreamingRpcQuery(this, relay, { method, module, data }, { ...options, streamHandler }).run();
}
simpleQuery(relay, method, module, data = {}, bypassCache = false, options = {}) {
return new SimpleRpcQuery(this, relay, { return new SimpleRpcQuery(this, relay, {
method, method,
module, module,

26
dist/query/base.d.ts vendored
View File

@ -1,11 +1,8 @@
/// <reference types="node" /> /// <reference types="node" />
import { Buffer } from "buffer";
import RpcNetwork from "../network.js"; import RpcNetwork from "../network.js";
import { RpcQueryOptions } from "../types.js"; import { RpcQueryOptions } from "../types.js";
import type { import type { RPCRequest, RPCResponse } from "@lumeweb/relay-types";
ClientRPCRequest,
RPCRequest,
RPCResponse,
} from "@lumeweb/relay-types";
export default abstract class RpcQueryBase { export default abstract class RpcQueryBase {
protected _network: RpcNetwork; protected _network: RpcNetwork;
protected _query: RPCRequest; protected _query: RPCRequest;
@ -14,21 +11,24 @@ export default abstract class RpcQueryBase {
protected _timeoutTimer?: any; protected _timeoutTimer?: any;
protected _timeout: boolean; protected _timeout: boolean;
protected _completed: boolean; protected _completed: boolean;
protected _response?: RPCResponse; protected _responses: {
protected _error?: string; [relay: string]: RPCResponse;
};
protected _errors: {
[relay: string]: any;
};
protected _promiseResolve?: (data: any) => void; protected _promiseResolve?: (data: any) => void;
constructor( constructor(
network: RpcNetwork, network: RpcNetwork,
query: ClientRPCRequest | RPCRequest, query: RPCRequest,
options?: RpcQueryOptions options?: RpcQueryOptions
); );
get result(): Promise<RPCResponse>; get result(): Promise<RPCResponse>;
protected handeTimeout(): void; private handeTimeout;
protected resolve(data?: RPCResponse, timeout?: boolean): void; protected resolve(data?: RPCResponse, timeout?: boolean): void;
run(): this; run(): this;
private _doRun; protected queryRelay(relay: string | Buffer): Promise<any>;
protected setupRelayTimeout(reject: Function): NodeJS.Timeout; protected abstract checkResponses(): void;
protected abstract _run(): void; protected abstract getRelays(): string[] | Buffer[];
protected queryRpc(rpc: any, request: RPCRequest): Promise<unknown>;
} }
//# sourceMappingURL=base.d.ts.map //# sourceMappingURL=base.d.ts.map

View File

@ -1 +1 @@
{"version":3,"file":"base.d.ts","sourceRoot":"","sources":["../../src/query/base.ts"],"names":[],"mappings":";AAIA,OAAO,UAAU,MAAM,eAAe,CAAC;AACvC,OAAO,EAAE,eAAe,EAAE,MAAM,aAAa,CAAC;AAC9C,OAAO,KAAK,EACV,gBAAgB,EAChB,UAAU,EACV,WAAW,EACZ,MAAM,sBAAsB,CAAC;AAI9B,MAAM,CAAC,OAAO,CAAC,QAAQ,OAAO,YAAY;IACxC,SAAS,CAAC,QAAQ,EAAE,UAAU,CAAC;IAC/B,SAAS,CAAC,MAAM,EAAE,UAAU,CAAC;IAC7B,SAAS,CAAC,QAAQ,EAAE,eAAe,CAAC;IAEpC,SAAS,CAAC,QAAQ,CAAC,EAAE,OAAO,CAAC,GAAG,CAAC,CAAC;IAClC,SAAS,CAAC,aAAa,CAAC,EAAE,GAAG,CAAC;IAC9B,SAAS,CAAC,QAAQ,EAAE,OAAO,CAAS;IACpC,SAAS,CAAC,UAAU,EAAE,OAAO,CAAS;IACtC,SAAS,CAAC,SAAS,CAAC,EAAE,WAAW,CAAC;IAClC,SAAS,CAAC,MAAM,CAAC,EAAE,MAAM,CAAC;IAC1B,SAAS,CAAC,eAAe,CAAC,EAAE,CAAC,IAAI,EAAE,GAAG,KAAK,IAAI,CAAC;gBAG9C,OAAO,EAAE,UAAU,EACnB,KAAK,EAAE,gBAAgB,GAAG,UAAU,EACpC,OAAO,GAAE,eAAoB;IAO/B,IAAI,MAAM,IAAI,OAAO,CAAC,WAAW,CAAC,CAEjC;IAED,SAAS,CAAC,YAAY;IAItB,SAAS,CAAC,OAAO,CAAC,IAAI,CAAC,EAAE,WAAW,EAAE,OAAO,GAAE,OAAe,GAAG,IAAI;IAc9D,GAAG,IAAI,IAAI;YAmBJ,MAAM;IASpB,SAAS,CAAC,iBAAiB,CAAC,MAAM,EAAE,QAAQ,GAAG,MAAM,CAAC,OAAO;IAO7D,SAAS,CAAC,QAAQ,CAAC,IAAI,IAAI,IAAI;cAEf,QAAQ,CAAC,GAAG,EAAE,GAAG,EAAE,OAAO,EAAE,UAAU;CAyBvD"} {"version":3,"file":"base.d.ts","sourceRoot":"","sources":["../../src/query/base.ts"],"names":[],"mappings":";AAEA,OAAO,EAAE,MAAM,EAAE,MAAM,QAAQ,CAAC;AAEhC,OAAO,UAAU,MAAM,eAAe,CAAC;AACvC,OAAO,EAAE,eAAe,EAAE,MAAM,aAAa,CAAC;AAC9C,OAAO,KAAK,EAAE,UAAU,EAAE,WAAW,EAAE,MAAM,sBAAsB,CAAC;AAEpE,MAAM,CAAC,OAAO,CAAC,QAAQ,OAAO,YAAY;IACxC,SAAS,CAAC,QAAQ,EAAE,UAAU,CAAC;IAC/B,SAAS,CAAC,MAAM,EAAE,UAAU,CAAC;IAC7B,SAAS,CAAC,QAAQ,EAAE,eAAe,CAAC;IAEpC,SAAS,CAAC,QAAQ,CAAC,EAAE,OAAO,CAAC,GAAG,CAAC,CAAC;IAClC,SAAS,CAAC,aAAa,CAAC,EAAE,GAAG,CAAC;IAC9B,SAAS,CAAC,QAAQ,EAAE,OAAO,CAAS;IACpC,SAAS,CAAC,UAAU,EAAE,OAAO,CAAS;IACtC,SAAS,CAAC,UAAU,EAAE;QAAE,CAAC,KAAK,EAAE,MAAM,GAAG,WAAW,CAAA;KAAE,CAAM;IAC5D,SAAS,CAAC,OAAO,EAAE;QAAE,CAAC,KAAK,EAAE,MAAM,GAAG,GAAG,CAAA;KAAE,CAAM;IACjD,SAAS,CAAC,eAAe,CAAC,EAAE,CAAC,IAAI,EAAE,GAAG,KAAK,IAAI,CAAC;gBAG9C,OAAO,EAAE,UAAU,EACnB,KAAK,EAAE,UAAU,EACjB,OAAO,GAAE,eAAoB;IAO/B,IAAI,MAAM,IAAI,OAAO,CAAC,WAAW,CAAC,CAEjC;IAED,OAAO,CAAC,YAAY;IAIpB,SAAS,CAAC,OAAO,CAAC,IAAI,CAAC,EAAE,WAAW,EAAE,OAAO,GAAE,OAAe,GAAG,IAAI;IAc9D,GAAG,IAAI,IAAI;cA2BF,UAAU,CAAC,KAAK,EAAE,MAAM,GAAG,MAAM,GAAG,OAAO,CAAC,GAAG,CAAC;IAoDhE,SAAS,CAAC,QAAQ,CAAC,cAAc,IAAI,IAAI;IAEzC,SAAS,CAAC,QAAQ,CAAC,SAAS,IAAI,MAAM,EAAE,GAAG,MAAM,EAAE;CACpD"}

83
dist/query/base.js vendored
View File

@ -1,4 +1,7 @@
import { clearTimeout, setTimeout } from "timers"; import { clearTimeout, setTimeout } from "timers";
import { pack, unpack } from "msgpackr";
import { Buffer } from "buffer";
import { isPromise } from "../util.js";
export default class RpcQueryBase { export default class RpcQueryBase {
_network; _network;
_query; _query;
@ -7,8 +10,8 @@ export default class RpcQueryBase {
_timeoutTimer; _timeoutTimer;
_timeout = false; _timeout = false;
_completed = false; _completed = false;
_response; _responses = {};
_error; _errors = {};
_promiseResolve; _promiseResolve;
constructor(network, query, options = {}) { constructor(network, query, options = {}) {
this._network = network; this._network = network;
@ -40,45 +43,63 @@ export default class RpcQueryBase {
}); });
this._timeoutTimer = this._timeoutTimer =
this._timeoutTimer ?? this._timeoutTimer ??
setTimeout(this.handeTimeout.bind(this), (this._options?.queryTimeout || this._network.queryTimeout) * 1000); setTimeout(this.handeTimeout.bind(this), (this._options.queryTimeout || this._network.queryTimeout) * 1000);
this._doRun(); this._network.ready.then(() => {
const promises = [];
for (const relay of this.getRelays()) {
promises.push(this.queryRelay(relay));
}
Promise.allSettled(promises).then(() => this.checkResponses());
});
return this; return this;
} }
async _doRun() { async queryRelay(relay) {
let socket;
let relayKey = relay;
if (typeof relay === "string") {
relayKey = Buffer.from(relay, "hex");
}
if (relay instanceof Buffer) {
relayKey = relay;
relay = relay.toString("hex");
}
try { try {
await this._network.ready; socket = this._network.dht.connect(relayKey);
await this._run(); if (isPromise(socket)) {
socket = await socket;
}
} }
catch (e) { catch (e) {
this._promiseResolve?.({ error: e.message }); return;
} }
}
setupRelayTimeout(reject) {
return setTimeout(() => {
this._error = "timeout";
reject("timeout");
}, (this._options.relayTimeout || this._network.relayTimeout) * 1000);
}
async queryRpc(rpc, request) {
let timer;
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
rpc let timer;
// @ts-ignore socket.on("data", (res) => {
.request(`${request.module}.${request.method}`, request.data) relay = relay;
.then((resp) => { if (timer) {
if (resp.error) { clearTimeout(timer);
throw new Error(resp.error); timer = null;
} }
clearTimeout(timer); socket.end();
this._response = resp; const response = unpack(res);
if (response && response.error) {
this._errors[relay] = response.error;
return reject(null);
}
this._responses[relay] = response;
resolve(null); resolve(null);
})
.catch((e) => {
this._error = e.message;
reject({ error: e.message });
clearTimeout(timer);
}); });
timer = this.setupRelayTimeout(reject); socket.on("error", (error) => {
relay = relay;
this._errors[relay] = error;
reject({ error });
});
socket.write("rpc");
socket.write(pack(this._query));
timer = setTimeout(() => {
this._errors[relay] = "timeout";
reject(null);
}, (this._options.relayTimeout || this._network.relayTimeout) * 1000);
}); });
} }
} }

View File

@ -1,17 +1,18 @@
import RpcNetwork from "../network.js"; /// <reference types="node" />
import { ClientRPCRequest } from "@lumeweb/relay-types";
import { RpcQueryOptions } from "../types.js";
import RpcQueryBase from "./base.js"; import RpcQueryBase from "./base.js";
import RpcNetwork from "../network.js";
import type { RPCRequest } from "@lumeweb/relay-types";
import { RpcQueryOptions } from "../types.js";
import type { Buffer } from "buffer";
export default class SimpleRpcQuery extends RpcQueryBase { export default class SimpleRpcQuery extends RpcQueryBase {
protected _relay: string; private _relay;
constructor( constructor(
network: RpcNetwork, network: RpcNetwork,
relay: string, relay: string | Buffer,
query: ClientRPCRequest, query: RPCRequest,
options: RpcQueryOptions options: RpcQueryOptions
); );
protected _run(): Promise<void>; protected checkResponses(): void;
protected queryRelay(): Promise<any>; protected getRelays(): string[] | Buffer[];
protected checkResponses(): Promise<void>;
} }
//# sourceMappingURL=simple.d.ts.map //# sourceMappingURL=simple.d.ts.map

View File

@ -1 +1 @@
{"version":3,"file":"simple.d.ts","sourceRoot":"","sources":["../../src/query/simple.ts"],"names":[],"mappings":"AAAA,OAAO,UAAU,MAAM,eAAe,CAAC;AACvC,OAAO,EACL,gBAAgB,EAIjB,MAAM,sBAAsB,CAAC;AAC9B,OAAO,EAAE,eAAe,EAAE,MAAM,aAAa,CAAC;AAU9C,OAAO,YAAY,MAAM,WAAW,CAAC;AAErC,MAAM,CAAC,OAAO,OAAO,cAAe,SAAQ,YAAY;IACtD,SAAS,CAAC,MAAM,EAAE,MAAM,CAAC;gBAGvB,OAAO,EAAE,UAAU,EACnB,KAAK,EAAE,MAAM,EACb,KAAK,EAAE,gBAAgB,EACvB,OAAO,EAAE,eAAe;cAMV,IAAI,IAAI,OAAO,CAAC,IAAI,CAAC;cAKrB,UAAU,IAAI,OAAO,CAAC,GAAG,CAAC;cA2B1B,cAAc;CAmB/B"} {"version":3,"file":"simple.d.ts","sourceRoot":"","sources":["../../src/query/simple.ts"],"names":[],"mappings":";AAAA,OAAO,YAAY,MAAM,WAAW,CAAC;AACrC,OAAO,UAAU,MAAM,eAAe,CAAC;AACvC,OAAO,KAAK,EAAE,UAAU,EAAE,MAAM,sBAAsB,CAAC;AACvD,OAAO,EAAE,eAAe,EAAE,MAAM,aAAa,CAAC;AAC9C,OAAO,KAAK,EAAE,MAAM,EAAE,MAAM,QAAQ,CAAC;AAErC,MAAM,CAAC,OAAO,OAAO,cAAe,SAAQ,YAAY;IACtD,OAAO,CAAC,MAAM,CAAkB;gBAE9B,OAAO,EAAE,UAAU,EACnB,KAAK,EAAE,MAAM,GAAG,MAAM,EACtB,KAAK,EAAE,UAAU,EACjB,OAAO,EAAE,eAAe;IAM1B,SAAS,CAAC,cAAc,IAAI,IAAI;IAYhC,SAAS,CAAC,SAAS,IAAI,MAAM,EAAE,GAAG,MAAM,EAAE;CAG3C"}

46
dist/query/simple.js vendored
View File

@ -1,7 +1,3 @@
import b4a from "b4a";
import { isPromise, validateTimestampedResponse, } from "../util.js";
import RPC from "@lumeweb/rpc";
import { ERR_INVALID_SIGNATURE } from "../error.js";
import RpcQueryBase from "./base.js"; import RpcQueryBase from "./base.js";
export default class SimpleRpcQuery extends RpcQueryBase { export default class SimpleRpcQuery extends RpcQueryBase {
_relay; _relay;
@ -9,43 +5,17 @@ export default class SimpleRpcQuery extends RpcQueryBase {
super(network, query, options); super(network, query, options);
this._relay = relay; this._relay = relay;
} }
async _run() { checkResponses() {
await this.queryRelay(); if (Object.keys(this._responses).length) {
await this.checkResponses(); this.resolve(Object.values(this._responses).pop());
}
async queryRelay() {
let socket;
try {
socket = this._network.dht.connect(b4a.from(this._relay, "hex"));
if (isPromise(socket)) {
socket = await socket;
}
}
catch (e) {
return; return;
} }
await socket.opened; if (Object.keys(this._errors).length) {
const rpc = new RPC(socket); const error = Object.values(this._errors).pop();
try { this.resolve(error, error === "timeout");
await this.queryRpc(rpc, this._query);
} }
catch (e) {
// @ts-ignore
rpc.end();
throw e;
} }
// @ts-ignore getRelays() {
rpc.end(); return [this._relay];
}
async checkResponses() {
let response = this._response;
if (this._error) {
response = { error: this._error };
}
if (!response.error &&
!validateTimestampedResponse(b4a.from(this._relay, "hex"), response)) {
response = { error: ERR_INVALID_SIGNATURE };
}
this.resolve(response);
} }
} }

19
dist/query/streaming.d.ts vendored Normal file
View File

@ -0,0 +1,19 @@
/// <reference types="node" />
import SimpleRpcQuery from "./simple.js";
import { Buffer } from "buffer";
import type { RPCRequest } from "@lumeweb/relay-types";
import RpcNetwork from "../network.js";
import { StreamingRpcQueryOptions } from "../types.js";
export default class StreamingRpcQuery extends SimpleRpcQuery {
protected _options: StreamingRpcQueryOptions;
protected _canceled: boolean;
constructor(
network: RpcNetwork,
relay: string | Buffer,
query: RPCRequest,
options: StreamingRpcQueryOptions
);
cancel(): void;
protected queryRelay(relay: string | Buffer): Promise<any>;
}
//# sourceMappingURL=streaming.d.ts.map

1
dist/query/streaming.d.ts.map vendored Normal file
View File

@ -0,0 +1 @@
{"version":3,"file":"streaming.d.ts","sourceRoot":"","sources":["../../src/query/streaming.ts"],"names":[],"mappings":";AAAA,OAAO,cAAc,MAAM,aAAa,CAAC;AACzC,OAAO,EAAE,MAAM,EAAE,MAAM,QAAQ,CAAC;AAIhC,OAAO,KAAK,EAAE,UAAU,EAAe,MAAM,sBAAsB,CAAC;AACpE,OAAO,UAAU,MAAM,eAAe,CAAC;AACvC,OAAO,EAAE,wBAAwB,EAAE,MAAM,aAAa,CAAC;AAEvD,MAAM,CAAC,OAAO,OAAO,iBAAkB,SAAQ,cAAc;IAC3D,SAAS,CAAC,QAAQ,EAAE,wBAAwB,CAAC;IAC7C,SAAS,CAAC,SAAS,UAAS;gBAE1B,OAAO,EAAE,UAAU,EACnB,KAAK,EAAE,MAAM,GAAG,MAAM,EACtB,KAAK,EAAE,UAAU,EACjB,OAAO,EAAE,wBAAwB;IAM5B,MAAM;cAIG,UAAU,CAAC,KAAK,EAAE,MAAM,GAAG,MAAM,GAAG,OAAO,CAAC,GAAG,CAAC;CAmEjE"}

75
dist/query/streaming.js vendored Normal file
View File

@ -0,0 +1,75 @@
import SimpleRpcQuery from "./simple.js";
import { Buffer } from "buffer";
import { isPromise } from "../util.js";
import { clearTimeout } from "timers";
import { pack, unpack } from "msgpackr";
export default class StreamingRpcQuery extends SimpleRpcQuery {
_options;
_canceled = false;
constructor(network, relay, query, options) {
super(network, relay, query, options);
this._options = options;
}
cancel() {
this._canceled = true;
}
async queryRelay(relay) {
let socket;
let relayKey = relay;
if (relay === "string") {
relayKey = Buffer.from(relay, "hex");
}
if (relay instanceof Buffer) {
relayKey = relay;
relay = relay.toString("hex");
}
try {
socket = this._network.dht.connect(relayKey);
if (isPromise(socket)) {
socket = await socket;
}
}
catch (e) {
return;
}
return new Promise((resolve, reject) => {
const finish = () => {
relay = relay;
this._responses[relay] = {};
resolve(null);
socket.end();
};
const listener = (res) => {
relay = relay;
if (this._timeoutTimer) {
clearTimeout(this._timeoutTimer);
this._timeoutTimer = null;
}
if (this._canceled) {
socket.write(pack({ cancel: true }));
socket.off("data", listener);
finish();
return;
}
const response = unpack(res);
if (response && response.error) {
this._errors[relay] = response.error;
return reject(null);
}
if (response?.data.done) {
finish();
return;
}
this._options.streamHandler(response?.data.data);
};
socket.on("data", listener);
socket.on("error", (error) => {
relay = relay;
this._errors[relay] = error;
reject({ error });
});
socket.write("rpc");
socket.write(pack(this._query));
});
}
}

View File

@ -1,15 +1,9 @@
import { RPCBroadcastResponse, RPCResponse } from "@lumeweb/relay-types";
import RpcQueryBase from "./base.js"; import RpcQueryBase from "./base.js";
export default class WisdomRpcQuery extends RpcQueryBase { export default class WisdomRpcQuery extends RpcQueryBase {
protected _response?: RPCBroadcastResponse; private _maxTries;
private static _activeRelay; private _tries;
static get activeRelay(): any; protected checkResponses(): void;
get result(): Promise<RPCResponse>; private retry;
protected _run(): Promise<void>;
protected resolve(data?: RPCResponse, timeout?: boolean): void;
protected queryRelay(): Promise<any>;
protected checkResponse(): Promise<void>;
protected getRelays(): string[] | []; protected getRelays(): string[] | [];
private setupRelay;
} }
//# sourceMappingURL=wisdom.d.ts.map //# sourceMappingURL=wisdom.d.ts.map

View File

@ -1 +1 @@
{"version":3,"file":"wisdom.d.ts","sourceRoot":"","sources":["../../src/query/wisdom.ts"],"names":[],"mappings":"AAAA,OAAO,EAEL,oBAAoB,EAEpB,WAAW,EACZ,MAAM,sBAAsB,CAAC;AAY9B,OAAO,YAAY,MAAM,WAAW,CAAC;AASrC,MAAM,CAAC,OAAO,OAAO,cAAe,SAAQ,YAAY;IACtD,UAAkB,SAAS,CAAC,EAAE,oBAAoB,CAAC;IACnD,OAAO,CAAC,MAAM,CAAC,YAAY,CAAM;IAEjC,MAAM,KAAK,WAAW,IAAI,GAAG,CAE5B;IAED,IAAI,MAAM,IAAI,OAAO,CAAC,WAAW,CAAC,CAEjC;cAEe,IAAI,IAAI,OAAO,CAAC,IAAI,CAAC;IAMrC,SAAS,CAAC,OAAO,CAAC,IAAI,CAAC,EAAE,WAAW,EAAE,OAAO,GAAE,OAAe,GAAG,IAAI;cAcrD,UAAU,IAAI,OAAO,CAAC,GAAG,CAAC;cAkB1B,aAAa;IAwE7B,SAAS,CAAC,SAAS,IAAI,MAAM,EAAE,GAAG,EAAE;YAoBtB,UAAU;CAsBzB"} {"version":3,"file":"wisdom.d.ts","sourceRoot":"","sources":["../../src/query/wisdom.ts"],"names":[],"mappings":"AAAA,OAAO,YAAY,MAAM,WAAW,CAAC;AAcrC,MAAM,CAAC,OAAO,OAAO,cAAe,SAAQ,YAAY;IACtD,OAAO,CAAC,SAAS,CAAK;IACtB,OAAO,CAAC,MAAM,CAAK;IAEnB,SAAS,CAAC,cAAc,IAAI,IAAI;IA4DhC,OAAO,CAAC,KAAK;IAWb,SAAS,CAAC,SAAS,IAAI,MAAM,EAAE,GAAG,EAAE;CAmBrC"}

138
dist/query/wisdom.js vendored
View File

@ -1,98 +1,71 @@
import { clearTimeout } from "timers";
import b4a from "b4a";
import { flatten, isPromise, validateResponse, validateTimestampedResponse, } from "../util.js";
import RPC from "@lumeweb/rpc";
import { blake2b } from "libskynet";
import { ERR_INVALID_SIGNATURE, ERR_NO_RELAYS } from "../error.js";
import RpcQueryBase from "./base.js"; import RpcQueryBase from "./base.js";
import { flatten } from "../util.js";
import { Buffer } from "buffer";
import { blake2b } from "libskynet";
import { ERR_MAX_TRIES_HIT } from "../error.js";
function flatHash(data) { function flatHash(data) {
const flattenedData = flatten(data).sort(); const flattenedData = flatten(data).sort();
return Buffer.from(blake2b(Buffer.from(JSON.stringify(flattenedData)))).toString("hex"); return Buffer.from(blake2b(Buffer.from(JSON.stringify(flattenedData)))).toString("hex");
} }
export default class WisdomRpcQuery extends RpcQueryBase { export default class WisdomRpcQuery extends RpcQueryBase {
static _activeRelay; _maxTries = 3;
static get activeRelay() { _tries = 0;
return this._activeRelay; checkResponses() {
} const responseStore = this._responses;
get result() { const responseStoreData = Object.values(responseStore);
return this._promise; const responseObjects = responseStoreData.reduce((output, item) => {
} const hash = flatHash(item?.data);
async _run() {
await this.setupRelay();
await this.queryRelay();
await this.checkResponse();
}
resolve(data, timeout = false) {
clearTimeout(this._timeoutTimer);
this._timeout = timeout;
this._completed = true;
if (timeout) {
data = {
error: "timeout",
};
}
this._promiseResolve?.(data);
}
async queryRelay() {
let activeRelay = WisdomRpcQuery.activeRelay;
let relays = this.getRelays();
if (!relays.length) {
throw new Error(ERR_NO_RELAYS);
}
return this.queryRpc(activeRelay, {
module: "rpc",
method: "broadcast_request",
data: {
request: this._query,
relays,
},
});
}
async checkResponse() {
if (this._error) {
this.resolve({ error: this._error });
return;
}
if (!validateResponse(WisdomRpcQuery.activeRelay.stream.remotePublicKey, this._response)) {
this.resolve({ error: ERR_INVALID_SIGNATURE });
return;
}
let relays = [];
for (const relay in this._response?.relays) {
const resp = this._response?.relays[relay];
if (validateTimestampedResponse(b4a.from(relay, "hex"), resp)) {
relays.push(resp);
}
}
if (!relays.length) {
this.resolve({ error: ERR_NO_RELAYS });
return;
}
const responseObjects = relays.reduce((output, item) => {
const field = item.signedField || "data";
// @ts-ignore
const hash = flatHash(item[field]);
output[hash] = item?.data; output[hash] = item?.data;
return output; return output;
}, {}); }, {});
const responses = relays.reduce((output, item) => { const responses = responseStoreData.reduce((output, item) => {
const field = item.signedField || "data"; const hash = flatHash(item?.data);
// @ts-ignore
const hash = flatHash(item[field]);
output[hash] = output[hash] ?? 0; output[hash] = output[hash] ?? 0;
output[hash]++; output[hash]++;
return output; return output;
}, {}); }, {});
if (!Object.keys(responses).length) {
if (Object.keys(this._errors).length) {
this.resolve({ error: Object.values(this._errors).pop() });
return;
}
if (this._tries <= this._maxTries) {
this._tries++;
this.retry();
return;
}
this.resolve({ data: { error: ERR_MAX_TRIES_HIT } });
return;
}
for (const responseHash in responses) { for (const responseHash in responses) {
if (responses[responseHash] / relays.length >= if (responses[responseHash] / responseStoreData.length >=
this._network.majorityThreshold) { this._network.majorityThreshold) {
let response = responseObjects[responseHash]; let response = responseObjects[responseHash];
// @ts-ignore
if (null === response) {
if (this._tries <= this._maxTries) {
this._tries++;
this.retry();
return;
}
response = { error: ERR_MAX_TRIES_HIT };
}
else {
response = { data: response }; response = { data: response };
}
this.resolve(response); this.resolve(response);
break; break;
} }
} }
} }
retry() {
this._responses = {};
this._errors = {};
if (this._completed) {
return;
}
this.run();
}
getRelays() { getRelays() {
if (this._network.maxRelays === 0 || if (this._network.maxRelays === 0 ||
this._network.relays.length <= this._network.maxRelays) { this._network.relays.length <= this._network.maxRelays) {
@ -107,23 +80,4 @@ export default class WisdomRpcQuery extends RpcQueryBase {
} }
return list; return list;
} }
async setupRelay() {
let active = WisdomRpcQuery.activeRelay;
let relays = this._network.relays;
if (!active) {
if (!relays.length) {
throw new Error(ERR_NO_RELAYS);
}
let relay = relays[Math.floor(Math.random() * relays.length)];
let socket = this._network.dht.connect(b4a.from(relay, "hex"));
if (isPromise(socket)) {
socket = await socket;
}
await socket.opened;
WisdomRpcQuery._activeRelay = new RPC(socket);
socket.once("close", () => {
WisdomRpcQuery._activeRelay = undefined;
});
}
}
} }

4
dist/types.d.ts vendored
View File

@ -2,4 +2,8 @@ export interface RpcQueryOptions {
queryTimeout?: number; queryTimeout?: number;
relayTimeout?: number; relayTimeout?: number;
} }
export interface StreamingRpcQueryOptions extends RpcQueryOptions {
streamHandler: StreamHandlerFunction;
}
export declare type StreamHandlerFunction = (data: Uint8Array) => void;
//# sourceMappingURL=types.d.ts.map //# sourceMappingURL=types.d.ts.map

2
dist/types.d.ts.map vendored
View File

@ -1 +1 @@
{"version":3,"file":"types.d.ts","sourceRoot":"","sources":["../src/types.ts"],"names":[],"mappings":"AAAA,MAAM,WAAW,eAAe;IAC9B,YAAY,CAAC,EAAE,MAAM,CAAC;IACtB,YAAY,CAAC,EAAE,MAAM,CAAC;CACvB"} {"version":3,"file":"types.d.ts","sourceRoot":"","sources":["../src/types.ts"],"names":[],"mappings":"AAAA,MAAM,WAAW,eAAe;IAC9B,YAAY,CAAC,EAAE,MAAM,CAAC;IACtB,YAAY,CAAC,EAAE,MAAM,CAAC;CACvB;AACD,MAAM,WAAW,wBAAyB,SAAQ,eAAe;IAC/D,aAAa,EAAE,qBAAqB,CAAC;CACtC;AAED,oBAAY,qBAAqB,GAAG,CAAC,IAAI,EAAE,UAAU,KAAK,IAAI,CAAC"}

13
dist/util.d.ts vendored
View File

@ -1,14 +1,3 @@
/// <reference types="node" />
import type { RPCResponse } from "@lumeweb/relay-types";
export declare function isPromise(obj: Promise<any>): boolean;
export declare function flatten(target: any, opts?: any): any[]; export declare function flatten(target: any, opts?: any): any[];
export declare function validateResponse( export declare function isPromise(obj: Promise<any>): boolean;
relay: Buffer,
response: RPCResponse,
timestamped?: boolean
): boolean;
export declare function validateTimestampedResponse(
relay: Buffer,
response: RPCResponse
): boolean;
//# sourceMappingURL=util.d.ts.map //# sourceMappingURL=util.d.ts.map

2
dist/util.d.ts.map vendored
View File

@ -1 +1 @@
{"version":3,"file":"util.d.ts","sourceRoot":"","sources":["../src/util.ts"],"names":[],"mappings":";AAEA,OAAO,KAAK,EAAc,WAAW,EAAE,MAAM,sBAAsB,CAAC;AAKpE,wBAAgB,SAAS,CAAC,GAAG,EAAE,OAAO,CAAC,GAAG,CAAC,WAM1C;AAKD,wBAAgB,OAAO,CAAC,MAAM,EAAE,GAAG,EAAE,IAAI,GAAE,GAAQ,GAAG,GAAG,EAAE,CA0C1D;AAED,wBAAgB,gBAAgB,CAC9B,KAAK,EAAE,MAAM,EACb,QAAQ,EAAE,WAAW,EACrB,WAAW,UAAQ,GAClB,OAAO,CAmBT;AAED,wBAAgB,2BAA2B,CACzC,KAAK,EAAE,MAAM,EACb,QAAQ,EAAE,WAAW,GACpB,OAAO,CAET"} {"version":3,"file":"util.d.ts","sourceRoot":"","sources":["../src/util.ts"],"names":[],"mappings":"AAaA,wBAAgB,OAAO,CAAC,MAAM,EAAE,GAAG,EAAE,IAAI,GAAE,GAAQ,GAAG,GAAG,EAAE,CA0C1D;AAED,wBAAgB,SAAS,CAAC,GAAG,EAAE,OAAO,CAAC,GAAG,CAAC,WAM1C"}

36
dist/util.js vendored
View File

@ -1,12 +1,8 @@
// @ts-ignore function isBuffer(obj) {
import stringify from "json-stringify-deterministic"; return (obj &&
// @ts-ignore obj.constructor &&
import crypto from "hypercore-crypto"; typeof obj.constructor.isBuffer === "function" &&
import b4a from "b4a"; obj.constructor.isBuffer(obj));
export function isPromise(obj) {
return (!!obj &&
(typeof obj === "object" || typeof obj === "function") &&
typeof obj.then === "function");
} }
/* /*
Forked from https://github.com/hughsk/flat Forked from https://github.com/hughsk/flat
@ -26,7 +22,7 @@ export function flatten(target, opts = {}) {
const value = object[key]; const value = object[key];
const isarray = opts.safe && Array.isArray(value); const isarray = opts.safe && Array.isArray(value);
const type = Object.prototype.toString.call(value); const type = Object.prototype.toString.call(value);
const isbuffer = b4a.isBuffer(value); const isbuffer = isBuffer(value);
const isobject = type === "[object Object]" || type === "[object Array]"; const isobject = type === "[object Object]" || type === "[object Array]";
const newKey = prev const newKey = prev
? prev + delimiter + transformKey(key) ? prev + delimiter + transformKey(key)
@ -44,20 +40,8 @@ export function flatten(target, opts = {}) {
step(target); step(target);
return output; return output;
} }
export function validateResponse(relay, response, timestamped = false) { export function isPromise(obj) {
const field = response.signedField || "data"; return (!!obj &&
// @ts-ignore (typeof obj === "object" || typeof obj === "function") &&
const data = response[field]; typeof obj.then === "function");
let json = data;
if (typeof json !== "string") {
json = stringify(json);
}
const updated = response.updated;
if (timestamped && updated) {
json = updated.toString() + json;
}
return !!crypto.verify(b4a.from(json), b4a.from(response.signature, "hex"), relay);
}
export function validateTimestampedResponse(relay, response) {
return validateResponse(relay, response, true);
} }

View File

@ -1,5 +1,5 @@
{ {
"name": "@lumeweb/rpc-client", "name": "@lumeweb/dht-rpc-client",
"type": "module", "type": "module",
"version": "0.1.0", "version": "0.1.0",
"description": "", "description": "",
@ -8,19 +8,14 @@
"build": "rimraf dist && tsc" "build": "rimraf dist && tsc"
}, },
"devDependencies": { "devDependencies": {
"@lumeweb/relay-types": "https://git.lumeweb.com/LumeWeb/relay-types.git", "@lumeweb/relay-types": "https://github.com/LumeWeb/relay-types.git",
"@types/b4a": "^1.6.0", "@types/json-stable-stringify": "^1.0.34",
"@types/express": "^4.17.14",
"@types/node": "^18.0.0", "@types/node": "^18.0.0",
"node-cache": "^5.1.2",
"prettier": "^2.7.1", "prettier": "^2.7.1",
"typescript": "^4.7.4" "typescript": "^4.7.4"
}, },
"dependencies": { "dependencies": {
"@hyperswarm/dht": "^6.0.1", "@hyperswarm/dht": "^6.0.1",
"@lumeweb/rpc": "https://git.lumeweb.com/LumeWeb/rpc.git",
"b4a": "^1.6.1",
"json-stringify-deterministic": "^1.0.7",
"libskynet": "^0.0.61", "libskynet": "^0.0.61",
"msgpackr": "^1.6.1" "msgpackr": "^1.6.1"
} }

View File

@ -1,3 +1 @@
export const ERR_MAX_TRIES_HIT = "ERR_MAX_TRIES_HIT"; export const ERR_MAX_TRIES_HIT = "ERR_MAX_TRIES_HIT";
export const ERR_INVALID_SIGNATURE = "INVALID_SIGNATURE";
export const ERR_NO_RELAYS = "NO_RELAYS";

View File

@ -1,8 +1,15 @@
import RpcNetwork from "./network.js"; import RpcNetwork from "./network.js";
import RpcQueryBase from "./query/base.js"; import RpcQueryBase from "./query/base.js";
import SimpleRpcQuery from "./query/simple.js"; import SimpleRpcQuery from "./query/simple.js";
import StreamingRpcQuery from "./query/streaming.js";
import WisdomRpcQuery from "./query/wisdom.js"; import WisdomRpcQuery from "./query/wisdom.js";
export * from "./types.js"; export * from "./types.js";
export { RpcNetwork, RpcQueryBase, SimpleRpcQuery, WisdomRpcQuery }; export {
RpcNetwork,
RpcQueryBase,
SimpleRpcQuery,
StreamingRpcQuery,
WisdomRpcQuery,
};

View File

@ -1,22 +1,15 @@
import WisdomRpcQuery from "./query/wisdom.js";
// @ts-ignore // @ts-ignore
import DHT from "@hyperswarm/dht"; import DHT from "@hyperswarm/dht";
import b4a from "b4a"; import StreamingRpcQuery from "./query/streaming.js";
import RPC from "@lumeweb/rpc"; import { RpcQueryOptions, StreamHandlerFunction } from "./types.js";
import { isPromise } from "./util.js";
import SimpleRpcQuery from "./query/simple.js"; import SimpleRpcQuery from "./query/simple.js";
import WisdomRpcQuery from "./query/wisdom.js";
export default class RpcNetwork { export default class RpcNetwork {
constructor(dht = new DHT()) { constructor(dht = new DHT()) {
this._dht = dht; this._dht = dht;
} }
private _activeRelay?: RPC;
get activeRelay(): RPC {
return this._activeRelay as RPC;
}
private _dht: typeof DHT; private _dht: typeof DHT;
get dht() { get dht() {
@ -65,7 +58,6 @@ export default class RpcNetwork {
if (!this._ready) { if (!this._ready) {
this._ready = this._dht.ready() as Promise<void>; this._ready = this._dht.ready() as Promise<void>;
} }
return this._ready; return this._ready;
} }
@ -114,7 +106,7 @@ export default class RpcNetwork {
module: string, module: string,
data: object | any[] = {}, data: object | any[] = {},
bypassCache: boolean = false, bypassCache: boolean = false,
options = {} options: RpcQueryOptions = {}
): WisdomRpcQuery { ): WisdomRpcQuery {
return new WisdomRpcQuery( return new WisdomRpcQuery(
this, this,
@ -127,13 +119,30 @@ export default class RpcNetwork {
options options
).run(); ).run();
} }
public streamingQuery(
relay: Buffer | string,
method: string,
module: string,
streamHandler: StreamHandlerFunction,
data: object | any[] = {},
options: RpcQueryOptions = {}
): StreamingRpcQuery {
return new StreamingRpcQuery(
this,
relay,
{ method, module, data },
{ ...options, streamHandler }
).run();
}
public simpleQuery( public simpleQuery(
relay: string, relay: Buffer | string,
method: string, method: string,
module: string, module: string,
data: object | any[] = {}, data: object | any[] = {},
bypassCache: boolean = false, bypassCache: boolean = false,
options: {} options: RpcQueryOptions = {}
): SimpleRpcQuery { ): SimpleRpcQuery {
return new SimpleRpcQuery( return new SimpleRpcQuery(
this, this,

View File

@ -4,13 +4,7 @@ import { Buffer } from "buffer";
import { isPromise } from "../util.js"; import { isPromise } from "../util.js";
import RpcNetwork from "../network.js"; import RpcNetwork from "../network.js";
import { RpcQueryOptions } from "../types.js"; import { RpcQueryOptions } from "../types.js";
import type { import type { RPCRequest, RPCResponse } from "@lumeweb/relay-types";
ClientRPCRequest,
RPCRequest,
RPCResponse,
} from "@lumeweb/relay-types";
import RPC from "@lumeweb/rpc";
import { RPCBroadcastRequest } from "@lumeweb/relay-types";
export default abstract class RpcQueryBase { export default abstract class RpcQueryBase {
protected _network: RpcNetwork; protected _network: RpcNetwork;
@ -21,13 +15,13 @@ export default abstract class RpcQueryBase {
protected _timeoutTimer?: any; protected _timeoutTimer?: any;
protected _timeout: boolean = false; protected _timeout: boolean = false;
protected _completed: boolean = false; protected _completed: boolean = false;
protected _response?: RPCResponse; protected _responses: { [relay: string]: RPCResponse } = {};
protected _error?: string; protected _errors: { [relay: string]: any } = {};
protected _promiseResolve?: (data: any) => void; protected _promiseResolve?: (data: any) => void;
constructor( constructor(
network: RpcNetwork, network: RpcNetwork,
query: ClientRPCRequest | RPCRequest, query: RPCRequest,
options: RpcQueryOptions = {} options: RpcQueryOptions = {}
) { ) {
this._network = network; this._network = network;
@ -39,7 +33,7 @@ export default abstract class RpcQueryBase {
return this._promise as Promise<RPCResponse>; return this._promise as Promise<RPCResponse>;
} }
protected handeTimeout() { private handeTimeout() {
this.resolve(undefined, true); this.resolve(undefined, true);
} }
@ -68,55 +62,75 @@ export default abstract class RpcQueryBase {
this._timeoutTimer ?? this._timeoutTimer ??
setTimeout( setTimeout(
this.handeTimeout.bind(this), this.handeTimeout.bind(this),
(this._options?.queryTimeout || this._network.queryTimeout) * 1000 (this._options.queryTimeout || this._network.queryTimeout) * 1000
); );
this._doRun(); this._network.ready.then(() => {
const promises = [];
for (const relay of this.getRelays()) {
promises.push(this.queryRelay(relay));
}
Promise.allSettled(promises).then(() => this.checkResponses());
});
return this; return this;
} }
private async _doRun() { protected async queryRelay(relay: string | Buffer): Promise<any> {
let socket: any;
let relayKey: Buffer = relay as Buffer;
if (typeof relay === "string") {
relayKey = Buffer.from(relay, "hex");
}
if (relay instanceof Buffer) {
relayKey = relay;
relay = relay.toString("hex");
}
try { try {
await this._network.ready; socket = this._network.dht.connect(relayKey);
await this._run(); if (isPromise(socket)) {
} catch (e: any) { socket = await socket;
this._promiseResolve?.({ error: e.message });
} }
} catch (e) {
return;
} }
protected setupRelayTimeout(reject: Function): NodeJS.Timeout {
return setTimeout(() => {
this._error = "timeout";
reject("timeout");
}, (this._options.relayTimeout || this._network.relayTimeout) * 1000) as NodeJS.Timeout;
}
protected abstract _run(): void;
protected async queryRpc(rpc: any, request: RPCRequest) {
let timer: NodeJS.Timeout;
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
rpc let timer: any;
// @ts-ignore socket.on("data", (res: Buffer) => {
.request(`${request.module}.${request.method}`, request.data) relay = relay as string;
.then((resp: any) => { if (timer) {
if (resp.error) {
throw new Error(resp.error);
}
clearTimeout(timer as any); clearTimeout(timer as any);
timer = null;
this._response = resp; }
socket.end();
const response = unpack(res as any) as RPCResponse;
if (response && response.error) {
this._errors[relay] = response.error;
return reject(null);
}
this._responses[relay] = response;
resolve(null); resolve(null);
})
.catch((e: Error) => {
this._error = e.message;
reject({ error: e.message });
clearTimeout(timer as any);
}); });
socket.on("error", (error: any) => {
timer = this.setupRelayTimeout(reject); relay = relay as string;
this._errors[relay] = error;
reject({ error });
});
socket.write("rpc");
socket.write(pack(this._query));
timer = setTimeout(() => {
this._errors[relay as string] = "timeout";
reject(null);
}, (this._options.relayTimeout || this._network.relayTimeout) * 1000) as NodeJS.Timeout;
}); });
} }
protected abstract checkResponses(): void;
protected abstract getRelays(): string[] | Buffer[];
} }

View File

@ -1,84 +1,34 @@
import RpcNetwork from "../network.js";
import {
ClientRPCRequest,
RPCBroadcastRequest,
RPCRequest,
RPCResponse,
} from "@lumeweb/relay-types";
import { RpcQueryOptions } from "../types.js";
import { clearTimeout, setTimeout } from "timers";
import b4a from "b4a";
import {
isPromise,
validateResponse,
validateTimestampedResponse,
} from "../util.js";
import RPC from "@lumeweb/rpc";
import { ERR_INVALID_SIGNATURE } from "../error.js";
import RpcQueryBase from "./base.js"; import RpcQueryBase from "./base.js";
import RpcNetwork from "../network.js";
import type { RPCRequest } from "@lumeweb/relay-types";
import { RpcQueryOptions } from "../types.js";
import type { Buffer } from "buffer";
export default class SimpleRpcQuery extends RpcQueryBase { export default class SimpleRpcQuery extends RpcQueryBase {
protected _relay: string; private _relay: string | Buffer;
constructor( constructor(
network: RpcNetwork, network: RpcNetwork,
relay: string, relay: string | Buffer,
query: ClientRPCRequest, query: RPCRequest,
options: RpcQueryOptions options: RpcQueryOptions
) { ) {
super(network, query, options); super(network, query, options);
this._relay = relay; this._relay = relay;
} }
protected async _run(): Promise<void> { protected checkResponses(): void {
await this.queryRelay(); if (Object.keys(this._responses).length) {
await this.checkResponses(); this.resolve(Object.values(this._responses).pop());
}
protected async queryRelay(): Promise<any> {
let socket: any;
try {
socket = this._network.dht.connect(b4a.from(this._relay, "hex"));
if (isPromise(socket)) {
socket = await socket;
}
} catch (e) {
return; return;
} }
await socket.opened;
const rpc = new RPC(socket); if (Object.keys(this._errors).length) {
const error = Object.values(this._errors).pop();
try { this.resolve(error, error === "timeout");
await this.queryRpc(rpc, this._query); }
} catch (e: any) {
// @ts-ignore
rpc.end();
throw e;
} }
// @ts-ignore protected getRelays(): string[] | Buffer[] {
rpc.end(); return [this._relay] as string[] | Buffer[];
}
protected async checkResponses() {
let response: RPCResponse = this._response as RPCResponse;
if (this._error) {
response = { error: this._error };
}
if (
!response.error &&
!validateTimestampedResponse(
b4a.from(this._relay, "hex") as Buffer,
response
)
) {
response = { error: ERR_INVALID_SIGNATURE };
}
this.resolve(response);
} }
} }

94
src/query/streaming.ts Normal file
View File

@ -0,0 +1,94 @@
import SimpleRpcQuery from "./simple.js";
import { Buffer } from "buffer";
import { isPromise } from "../util.js";
import { clearTimeout, setTimeout } from "timers";
import { pack, unpack } from "msgpackr";
import type { RPCRequest, RPCResponse } from "@lumeweb/relay-types";
import RpcNetwork from "../network.js";
import { StreamingRpcQueryOptions } from "../types.js";
export default class StreamingRpcQuery extends SimpleRpcQuery {
protected _options: StreamingRpcQueryOptions;
protected _canceled = false;
constructor(
network: RpcNetwork,
relay: string | Buffer,
query: RPCRequest,
options: StreamingRpcQueryOptions
) {
super(network, relay, query, options);
this._options = options;
}
public cancel() {
this._canceled = true;
}
protected async queryRelay(relay: string | Buffer): Promise<any> {
let socket: any;
let relayKey: Buffer = relay as Buffer;
if (relay === "string") {
relayKey = Buffer.from(relay, "hex");
}
if (relay instanceof Buffer) {
relayKey = relay;
relay = relay.toString("hex");
}
try {
socket = this._network.dht.connect(relayKey);
if (isPromise(socket)) {
socket = await socket;
}
} catch (e) {
return;
}
return new Promise((resolve, reject) => {
const finish = () => {
relay = relay as string;
this._responses[relay] = {};
resolve(null);
socket.end();
};
const listener = (res: Buffer) => {
relay = relay as string;
if (this._timeoutTimer) {
clearTimeout(this._timeoutTimer as any);
this._timeoutTimer = null;
}
if (this._canceled) {
socket.write(pack({ cancel: true }));
socket.off("data", listener);
finish();
return;
}
const response = unpack(res as any) as RPCResponse;
if (response && response.error) {
this._errors[relay] = response.error;
return reject(null);
}
if (response?.data.done) {
finish();
return;
}
this._options.streamHandler(response?.data.data);
};
socket.on("data", listener);
socket.on("error", (error: any) => {
relay = relay as string;
this._errors[relay] = error;
reject({ error });
});
socket.write("rpc");
socket.write(pack(this._query));
});
}
}

View File

@ -1,21 +1,9 @@
import {
RPCBroadcastRequest,
RPCBroadcastResponse,
RPCRequest,
RPCResponse,
} from "@lumeweb/relay-types";
import { clearTimeout } from "timers";
import b4a from "b4a";
import {
flatten,
isPromise,
validateResponse,
validateTimestampedResponse,
} from "../util.js";
import RPC from "@lumeweb/rpc";
import { blake2b } from "libskynet";
import { ERR_INVALID_SIGNATURE, ERR_NO_RELAYS } from "../error.js";
import RpcQueryBase from "./base.js"; import RpcQueryBase from "./base.js";
import { flatten } from "../util.js";
import { Buffer } from "buffer";
import type { RPCResponse } from "@lumeweb/relay-types";
import { blake2b } from "libskynet";
import { ERR_MAX_TRIES_HIT } from "../error.js";
function flatHash(data: any) { function flatHash(data: any) {
const flattenedData = flatten(data).sort(); const flattenedData = flatten(data).sort();
@ -25,105 +13,23 @@ function flatHash(data: any) {
} }
export default class WisdomRpcQuery extends RpcQueryBase { export default class WisdomRpcQuery extends RpcQueryBase {
protected declare _response?: RPCBroadcastResponse; private _maxTries = 3;
private static _activeRelay: any; private _tries = 0;
static get activeRelay(): any { protected checkResponses(): void {
return this._activeRelay; const responseStore = this._responses;
} const responseStoreData = Object.values(responseStore);
get result(): Promise<RPCResponse> {
return this._promise as Promise<RPCResponse>;
}
protected async _run(): Promise<void> {
await this.setupRelay();
await this.queryRelay();
await this.checkResponse();
}
protected resolve(data?: RPCResponse, timeout: boolean = false): void {
clearTimeout(this._timeoutTimer);
this._timeout = timeout;
this._completed = true;
if (timeout) {
data = {
error: "timeout",
};
}
this._promiseResolve?.(data);
}
protected async queryRelay(): Promise<any> {
let activeRelay = WisdomRpcQuery.activeRelay;
let relays = this.getRelays();
if (!relays.length) {
throw new Error(ERR_NO_RELAYS);
}
return this.queryRpc(activeRelay, {
module: "rpc",
method: "broadcast_request",
data: {
request: this._query,
relays,
},
} as RPCRequest);
}
protected async checkResponse() {
if (this._error) {
this.resolve({ error: this._error });
return;
}
if (
!validateResponse(
WisdomRpcQuery.activeRelay.stream.remotePublicKey,
this._response as RPCResponse
)
) {
this.resolve({ error: ERR_INVALID_SIGNATURE });
return;
}
let relays: RPCResponse[] = [];
for (const relay in this._response?.relays) {
const resp = this._response?.relays[relay];
if (
validateTimestampedResponse(
b4a.from(relay, "hex") as Buffer,
resp as RPCResponse
)
) {
relays.push(resp as RPCResponse);
}
}
if (!relays.length) {
this.resolve({ error: ERR_NO_RELAYS });
return;
}
type ResponseGroup = { [response: string]: number }; type ResponseGroup = { [response: string]: number };
const responseObjects = relays.reduce((output: any, item: RPCResponse) => { const responseObjects = responseStoreData.reduce((output: any, item) => {
const field = item.signedField || "data"; const hash = flatHash(item?.data);
// @ts-ignore
const hash = flatHash(item[field]);
output[hash] = item?.data; output[hash] = item?.data;
return output; return output;
}, {}); }, {});
const responses: ResponseGroup = responseStoreData.reduce(
const responses: ResponseGroup = relays.reduce( (output: ResponseGroup, item) => {
(output: ResponseGroup, item: RPCResponse) => { const hash = flatHash(item?.data);
const field = item.signedField || "data";
// @ts-ignore
const hash = flatHash(item[field]);
output[hash] = output[hash] ?? 0; output[hash] = output[hash] ?? 0;
output[hash]++; output[hash]++;
return output; return output;
@ -131,14 +37,38 @@ export default class WisdomRpcQuery extends RpcQueryBase {
{} {}
); );
if (!Object.keys(responses).length) {
if (Object.keys(this._errors).length) {
this.resolve({ error: Object.values(this._errors).pop() });
return;
}
if (this._tries <= this._maxTries) {
this._tries++;
this.retry();
return;
}
this.resolve({ data: { error: ERR_MAX_TRIES_HIT } });
return;
}
for (const responseHash in responses) { for (const responseHash in responses) {
if ( if (
responses[responseHash] / relays.length >= responses[responseHash] / responseStoreData.length >=
this._network.majorityThreshold this._network.majorityThreshold
) { ) {
let response: RPCResponse = responseObjects[responseHash]; let response: RPCResponse = responseObjects[responseHash];
// @ts-ignore
if (null === response) {
if (this._tries <= this._maxTries) {
this._tries++;
this.retry();
return;
}
response = { error: ERR_MAX_TRIES_HIT };
} else {
response = { data: response }; response = { data: response };
}
this.resolve(response); this.resolve(response);
break; break;
@ -146,6 +76,17 @@ export default class WisdomRpcQuery extends RpcQueryBase {
} }
} }
private retry() {
this._responses = {};
this._errors = {};
if (this._completed) {
return;
}
this.run();
}
protected getRelays(): string[] | [] { protected getRelays(): string[] | [] {
if ( if (
this._network.maxRelays === 0 || this._network.maxRelays === 0 ||
@ -165,27 +106,4 @@ export default class WisdomRpcQuery extends RpcQueryBase {
return list; return list;
} }
private async setupRelay() {
let active = WisdomRpcQuery.activeRelay;
let relays = this._network.relays;
if (!active) {
if (!relays.length) {
throw new Error(ERR_NO_RELAYS);
}
let relay = relays[Math.floor(Math.random() * relays.length)];
let socket = this._network.dht.connect(b4a.from(relay, "hex"));
if (isPromise(socket)) {
socket = await socket;
}
await socket.opened;
WisdomRpcQuery._activeRelay = new RPC(socket);
socket.once("close", () => {
WisdomRpcQuery._activeRelay = undefined;
});
}
}
} }

View File

@ -2,3 +2,8 @@ export interface RpcQueryOptions {
queryTimeout?: number; queryTimeout?: number;
relayTimeout?: number; relayTimeout?: number;
} }
export interface StreamingRpcQueryOptions extends RpcQueryOptions {
streamHandler: StreamHandlerFunction;
}
export type StreamHandlerFunction = (data: Uint8Array) => void;

View File

@ -1,18 +1,13 @@
// @ts-ignore import { isArray } from "util";
import stringify from "json-stringify-deterministic";
import type { RPCRequest, RPCResponse } from "@lumeweb/relay-types";
// @ts-ignore
import crypto from "hypercore-crypto";
import b4a from "b4a";
export function isPromise(obj: Promise<any>) { function isBuffer(obj: any): boolean {
return ( return (
!!obj && obj &&
(typeof obj === "object" || typeof obj === "function") && obj.constructor &&
typeof obj.then === "function" typeof obj.constructor.isBuffer === "function" &&
obj.constructor.isBuffer(obj)
); );
} }
/* /*
Forked from https://github.com/hughsk/flat Forked from https://github.com/hughsk/flat
*/ */
@ -34,7 +29,7 @@ export function flatten(target: any, opts: any = {}): any[] {
const value = object[key]; const value = object[key];
const isarray = opts.safe && Array.isArray(value); const isarray = opts.safe && Array.isArray(value);
const type = Object.prototype.toString.call(value); const type = Object.prototype.toString.call(value);
const isbuffer = b4a.isBuffer(value); const isbuffer = isBuffer(value);
const isobject = type === "[object Object]" || type === "[object Array]"; const isobject = type === "[object Object]" || type === "[object Array]";
const newKey = prev const newKey = prev
@ -60,34 +55,10 @@ export function flatten(target: any, opts: any = {}): any[] {
return output; return output;
} }
export function validateResponse( export function isPromise(obj: Promise<any>) {
relay: Buffer, return (
response: RPCResponse, !!obj &&
timestamped = false (typeof obj === "object" || typeof obj === "function") &&
): boolean { typeof obj.then === "function"
const field = response.signedField || "data";
// @ts-ignore
const data = response[field];
let json = data;
if (typeof json !== "string") {
json = stringify(json);
}
const updated = response.updated as number;
if (timestamped && updated) {
json = updated.toString() + json;
}
return !!crypto.verify(
b4a.from(json),
b4a.from(response.signature as string, "hex"),
relay
); );
} }
export function validateTimestampedResponse(
relay: Buffer,
response: RPCResponse
): boolean {
return validateResponse(relay, response, true);
}