*Epic refactor based on new RPC query design and protocol
This commit is contained in:
parent
71eb37160c
commit
7263ecf907
|
@ -8,14 +8,19 @@
|
|||
"build": "rimraf dist && tsc"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@lumeweb/relay-types": "https://github.com/LumeWeb/relay-types.git",
|
||||
"@types/json-stable-stringify": "^1.0.34",
|
||||
"@lumeweb/relay-types": "https://git.lumeweb.com/LumeWeb/relay-types.git",
|
||||
"@types/b4a": "^1.6.0",
|
||||
"@types/express": "^4.17.14",
|
||||
"@types/node": "^18.0.0",
|
||||
"node-cache": "^5.1.2",
|
||||
"prettier": "^2.7.1",
|
||||
"typescript": "^4.7.4"
|
||||
},
|
||||
"dependencies": {
|
||||
"@hyperswarm/dht": "^6.0.1",
|
||||
"@lumeweb/rpc": "https://git.lumeweb.com/LumeWeb/rpc.git",
|
||||
"b4a": "^1.6.1",
|
||||
"json-stringify-deterministic": "^1.0.7",
|
||||
"libskynet": "^0.0.61",
|
||||
"msgpackr": "^1.6.1"
|
||||
}
|
||||
|
|
|
@ -1 +1,3 @@
|
|||
export const ERR_MAX_TRIES_HIT = "ERR_MAX_TRIES_HIT";
|
||||
export const ERR_INVALID_SIGNATURE = "INVALID_SIGNATURE";
|
||||
export const ERR_NO_RELAYS = "NO_RELAYS";
|
||||
|
|
|
@ -1,15 +1,8 @@
|
|||
import RpcNetwork from "./network.js";
|
||||
import RpcQueryBase from "./query/base.js";
|
||||
import SimpleRpcQuery from "./query/simple.js";
|
||||
import StreamingRpcQuery from "./query/streaming.js";
|
||||
import WisdomRpcQuery from "./query/wisdom.js";
|
||||
|
||||
export * from "./types.js";
|
||||
|
||||
export {
|
||||
RpcNetwork,
|
||||
RpcQueryBase,
|
||||
SimpleRpcQuery,
|
||||
StreamingRpcQuery,
|
||||
WisdomRpcQuery,
|
||||
};
|
||||
export { RpcNetwork, RpcQueryBase, SimpleRpcQuery, WisdomRpcQuery };
|
||||
|
|
|
@ -1,15 +1,22 @@
|
|||
import WisdomRpcQuery from "./query/wisdom.js";
|
||||
// @ts-ignore
|
||||
import DHT from "@hyperswarm/dht";
|
||||
import StreamingRpcQuery from "./query/streaming.js";
|
||||
import { RpcQueryOptions, StreamHandlerFunction } from "./types.js";
|
||||
import b4a from "b4a";
|
||||
import RPC from "@lumeweb/rpc";
|
||||
import { isPromise } from "./util.js";
|
||||
import SimpleRpcQuery from "./query/simple.js";
|
||||
import WisdomRpcQuery from "./query/wisdom.js";
|
||||
|
||||
export default class RpcNetwork {
|
||||
constructor(dht = new DHT()) {
|
||||
this._dht = dht;
|
||||
}
|
||||
|
||||
private _activeRelay?: RPC;
|
||||
|
||||
get activeRelay(): RPC {
|
||||
return this._activeRelay as RPC;
|
||||
}
|
||||
|
||||
private _dht: typeof DHT;
|
||||
|
||||
get dht() {
|
||||
|
@ -58,6 +65,7 @@ export default class RpcNetwork {
|
|||
if (!this._ready) {
|
||||
this._ready = this._dht.ready() as Promise<void>;
|
||||
}
|
||||
|
||||
return this._ready;
|
||||
}
|
||||
|
||||
|
@ -106,7 +114,7 @@ export default class RpcNetwork {
|
|||
module: string,
|
||||
data: object | any[] = {},
|
||||
bypassCache: boolean = false,
|
||||
options: RpcQueryOptions = {}
|
||||
options = {}
|
||||
): WisdomRpcQuery {
|
||||
return new WisdomRpcQuery(
|
||||
this,
|
||||
|
@ -119,30 +127,13 @@ export default class RpcNetwork {
|
|||
options
|
||||
).run();
|
||||
}
|
||||
|
||||
public streamingQuery(
|
||||
relay: Buffer | string,
|
||||
method: string,
|
||||
module: string,
|
||||
streamHandler: StreamHandlerFunction,
|
||||
data: object | any[] = {},
|
||||
options: RpcQueryOptions = {}
|
||||
): StreamingRpcQuery {
|
||||
return new StreamingRpcQuery(
|
||||
this,
|
||||
relay,
|
||||
{ method, module, data },
|
||||
{ ...options, streamHandler }
|
||||
).run();
|
||||
}
|
||||
|
||||
public simpleQuery(
|
||||
relay: Buffer | string,
|
||||
relay: string,
|
||||
method: string,
|
||||
module: string,
|
||||
data: object | any[] = {},
|
||||
bypassCache: boolean = false,
|
||||
options: RpcQueryOptions = {}
|
||||
options: {}
|
||||
): SimpleRpcQuery {
|
||||
return new SimpleRpcQuery(
|
||||
this,
|
||||
|
|
|
@ -4,7 +4,13 @@ import { Buffer } from "buffer";
|
|||
import { isPromise } from "../util.js";
|
||||
import RpcNetwork from "../network.js";
|
||||
import { RpcQueryOptions } from "../types.js";
|
||||
import type { RPCRequest, RPCResponse } from "@lumeweb/relay-types";
|
||||
import type {
|
||||
ClientRPCRequest,
|
||||
RPCRequest,
|
||||
RPCResponse,
|
||||
} from "@lumeweb/relay-types";
|
||||
import RPC from "@lumeweb/rpc";
|
||||
import { RPCBroadcastRequest } from "@lumeweb/relay-types";
|
||||
|
||||
export default abstract class RpcQueryBase {
|
||||
protected _network: RpcNetwork;
|
||||
|
@ -15,13 +21,13 @@ export default abstract class RpcQueryBase {
|
|||
protected _timeoutTimer?: any;
|
||||
protected _timeout: boolean = false;
|
||||
protected _completed: boolean = false;
|
||||
protected _responses: { [relay: string]: RPCResponse } = {};
|
||||
protected _errors: { [relay: string]: any } = {};
|
||||
protected _response?: RPCResponse;
|
||||
protected _error?: string;
|
||||
protected _promiseResolve?: (data: any) => void;
|
||||
|
||||
constructor(
|
||||
network: RpcNetwork,
|
||||
query: RPCRequest,
|
||||
query: ClientRPCRequest | RPCRequest,
|
||||
options: RpcQueryOptions = {}
|
||||
) {
|
||||
this._network = network;
|
||||
|
@ -33,7 +39,7 @@ export default abstract class RpcQueryBase {
|
|||
return this._promise as Promise<RPCResponse>;
|
||||
}
|
||||
|
||||
private handeTimeout() {
|
||||
protected handeTimeout() {
|
||||
this.resolve(undefined, true);
|
||||
}
|
||||
|
||||
|
@ -62,75 +68,55 @@ export default abstract class RpcQueryBase {
|
|||
this._timeoutTimer ??
|
||||
setTimeout(
|
||||
this.handeTimeout.bind(this),
|
||||
(this._options.queryTimeout || this._network.queryTimeout) * 1000
|
||||
(this._options?.queryTimeout || this._network.queryTimeout) * 1000
|
||||
);
|
||||
|
||||
this._network.ready.then(() => {
|
||||
const promises = [];
|
||||
|
||||
for (const relay of this.getRelays()) {
|
||||
promises.push(this.queryRelay(relay));
|
||||
}
|
||||
|
||||
Promise.allSettled(promises).then(() => this.checkResponses());
|
||||
});
|
||||
this._doRun();
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
protected async queryRelay(relay: string | Buffer): Promise<any> {
|
||||
let socket: any;
|
||||
|
||||
let relayKey: Buffer = relay as Buffer;
|
||||
|
||||
if (typeof relay === "string") {
|
||||
relayKey = Buffer.from(relay, "hex");
|
||||
}
|
||||
if (relay instanceof Buffer) {
|
||||
relayKey = relay;
|
||||
relay = relay.toString("hex");
|
||||
}
|
||||
|
||||
private async _doRun() {
|
||||
try {
|
||||
socket = this._network.dht.connect(relayKey);
|
||||
if (isPromise(socket)) {
|
||||
socket = await socket;
|
||||
await this._network.ready;
|
||||
await this._run();
|
||||
} catch (e: any) {
|
||||
this._promiseResolve?.({ error: e.message });
|
||||
}
|
||||
} catch (e) {
|
||||
return;
|
||||
}
|
||||
return new Promise((resolve, reject) => {
|
||||
let timer: any;
|
||||
socket.on("data", (res: Buffer) => {
|
||||
relay = relay as string;
|
||||
if (timer) {
|
||||
clearTimeout(timer as any);
|
||||
timer = null;
|
||||
}
|
||||
socket.end();
|
||||
const response = unpack(res as any) as RPCResponse;
|
||||
if (response && response.error) {
|
||||
this._errors[relay] = response.error;
|
||||
return reject(null);
|
||||
}
|
||||
this._responses[relay] = response;
|
||||
resolve(null);
|
||||
});
|
||||
socket.on("error", (error: any) => {
|
||||
relay = relay as string;
|
||||
this._errors[relay] = error;
|
||||
reject({ error });
|
||||
});
|
||||
socket.write("rpc");
|
||||
socket.write(pack(this._query));
|
||||
timer = setTimeout(() => {
|
||||
this._errors[relay as string] = "timeout";
|
||||
reject(null);
|
||||
|
||||
protected setupRelayTimeout(reject: Function): NodeJS.Timeout {
|
||||
return setTimeout(() => {
|
||||
this._error = "timeout";
|
||||
reject("timeout");
|
||||
}, (this._options.relayTimeout || this._network.relayTimeout) * 1000) as NodeJS.Timeout;
|
||||
});
|
||||
}
|
||||
|
||||
protected abstract checkResponses(): void;
|
||||
protected abstract _run(): void;
|
||||
|
||||
protected abstract getRelays(): string[] | Buffer[];
|
||||
protected async queryRpc(rpc: any, request: RPCRequest) {
|
||||
let timer: NodeJS.Timeout;
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
rpc
|
||||
// @ts-ignore
|
||||
.request(`${request.module}.${request.method}`, request.data)
|
||||
.then((resp: any) => {
|
||||
if (resp.error) {
|
||||
throw new Error(resp.error);
|
||||
}
|
||||
clearTimeout(timer as any);
|
||||
|
||||
this._response = resp;
|
||||
resolve(null);
|
||||
})
|
||||
.catch((e: Error) => {
|
||||
this._error = e.message;
|
||||
reject({ error: e.message });
|
||||
clearTimeout(timer as any);
|
||||
});
|
||||
|
||||
timer = this.setupRelayTimeout(reject);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,34 +1,84 @@
|
|||
import RpcQueryBase from "./base.js";
|
||||
import RpcNetwork from "../network.js";
|
||||
import type { RPCRequest } from "@lumeweb/relay-types";
|
||||
import {
|
||||
ClientRPCRequest,
|
||||
RPCBroadcastRequest,
|
||||
RPCRequest,
|
||||
RPCResponse,
|
||||
} from "@lumeweb/relay-types";
|
||||
import { RpcQueryOptions } from "../types.js";
|
||||
import type { Buffer } from "buffer";
|
||||
import { clearTimeout, setTimeout } from "timers";
|
||||
import b4a from "b4a";
|
||||
import {
|
||||
isPromise,
|
||||
validateResponse,
|
||||
validateTimestampedResponse,
|
||||
} from "../util.js";
|
||||
import RPC from "@lumeweb/rpc";
|
||||
import { ERR_INVALID_SIGNATURE } from "../error.js";
|
||||
import RpcQueryBase from "./base.js";
|
||||
|
||||
export default class SimpleRpcQuery extends RpcQueryBase {
|
||||
private _relay: string | Buffer;
|
||||
protected _relay: string;
|
||||
|
||||
constructor(
|
||||
network: RpcNetwork,
|
||||
relay: string | Buffer,
|
||||
query: RPCRequest,
|
||||
relay: string,
|
||||
query: ClientRPCRequest,
|
||||
options: RpcQueryOptions
|
||||
) {
|
||||
super(network, query, options);
|
||||
this._relay = relay;
|
||||
}
|
||||
|
||||
protected checkResponses(): void {
|
||||
if (Object.keys(this._responses).length) {
|
||||
this.resolve(Object.values(this._responses).pop());
|
||||
protected async _run(): Promise<void> {
|
||||
await this.queryRelay();
|
||||
await this.checkResponses();
|
||||
}
|
||||
|
||||
protected async queryRelay(): Promise<any> {
|
||||
let socket: any;
|
||||
|
||||
try {
|
||||
socket = this._network.dht.connect(b4a.from(this._relay, "hex"));
|
||||
if (isPromise(socket)) {
|
||||
socket = await socket;
|
||||
}
|
||||
} catch (e) {
|
||||
return;
|
||||
}
|
||||
await socket.opened;
|
||||
|
||||
if (Object.keys(this._errors).length) {
|
||||
const error = Object.values(this._errors).pop();
|
||||
this.resolve(error, error === "timeout");
|
||||
}
|
||||
const rpc = new RPC(socket);
|
||||
|
||||
try {
|
||||
await this.queryRpc(rpc, this._query);
|
||||
} catch (e: any) {
|
||||
// @ts-ignore
|
||||
rpc.end();
|
||||
throw e;
|
||||
}
|
||||
|
||||
protected getRelays(): string[] | Buffer[] {
|
||||
return [this._relay] as string[] | Buffer[];
|
||||
// @ts-ignore
|
||||
rpc.end();
|
||||
}
|
||||
|
||||
protected async checkResponses() {
|
||||
let response: RPCResponse = this._response as RPCResponse;
|
||||
|
||||
if (this._error) {
|
||||
response = { error: this._error };
|
||||
}
|
||||
|
||||
if (
|
||||
!response.error &&
|
||||
!validateTimestampedResponse(
|
||||
b4a.from(this._relay, "hex") as Buffer,
|
||||
response
|
||||
)
|
||||
) {
|
||||
response = { error: ERR_INVALID_SIGNATURE };
|
||||
}
|
||||
|
||||
this.resolve(response);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,94 +0,0 @@
|
|||
import SimpleRpcQuery from "./simple.js";
|
||||
import { Buffer } from "buffer";
|
||||
import { isPromise } from "../util.js";
|
||||
import { clearTimeout, setTimeout } from "timers";
|
||||
import { pack, unpack } from "msgpackr";
|
||||
import type { RPCRequest, RPCResponse } from "@lumeweb/relay-types";
|
||||
import RpcNetwork from "../network.js";
|
||||
import { StreamingRpcQueryOptions } from "../types.js";
|
||||
|
||||
export default class StreamingRpcQuery extends SimpleRpcQuery {
|
||||
protected _options: StreamingRpcQueryOptions;
|
||||
protected _canceled = false;
|
||||
constructor(
|
||||
network: RpcNetwork,
|
||||
relay: string | Buffer,
|
||||
query: RPCRequest,
|
||||
options: StreamingRpcQueryOptions
|
||||
) {
|
||||
super(network, relay, query, options);
|
||||
this._options = options;
|
||||
}
|
||||
|
||||
public cancel() {
|
||||
this._canceled = true;
|
||||
}
|
||||
|
||||
protected async queryRelay(relay: string | Buffer): Promise<any> {
|
||||
let socket: any;
|
||||
|
||||
let relayKey: Buffer = relay as Buffer;
|
||||
|
||||
if (relay === "string") {
|
||||
relayKey = Buffer.from(relay, "hex");
|
||||
}
|
||||
if (relay instanceof Buffer) {
|
||||
relayKey = relay;
|
||||
relay = relay.toString("hex");
|
||||
}
|
||||
|
||||
try {
|
||||
socket = this._network.dht.connect(relayKey);
|
||||
if (isPromise(socket)) {
|
||||
socket = await socket;
|
||||
}
|
||||
} catch (e) {
|
||||
return;
|
||||
}
|
||||
return new Promise((resolve, reject) => {
|
||||
const finish = () => {
|
||||
relay = relay as string;
|
||||
this._responses[relay] = {};
|
||||
resolve(null);
|
||||
socket.end();
|
||||
};
|
||||
|
||||
const listener = (res: Buffer) => {
|
||||
relay = relay as string;
|
||||
if (this._timeoutTimer) {
|
||||
clearTimeout(this._timeoutTimer as any);
|
||||
this._timeoutTimer = null;
|
||||
}
|
||||
|
||||
if (this._canceled) {
|
||||
socket.write(pack({ cancel: true }));
|
||||
socket.off("data", listener);
|
||||
finish();
|
||||
return;
|
||||
}
|
||||
|
||||
const response = unpack(res as any) as RPCResponse;
|
||||
if (response && response.error) {
|
||||
this._errors[relay] = response.error;
|
||||
return reject(null);
|
||||
}
|
||||
|
||||
if (response?.data.done) {
|
||||
finish();
|
||||
return;
|
||||
}
|
||||
|
||||
this._options.streamHandler(response?.data.data);
|
||||
};
|
||||
|
||||
socket.on("data", listener);
|
||||
socket.on("error", (error: any) => {
|
||||
relay = relay as string;
|
||||
this._errors[relay] = error;
|
||||
reject({ error });
|
||||
});
|
||||
socket.write("rpc");
|
||||
socket.write(pack(this._query));
|
||||
});
|
||||
}
|
||||
}
|
|
@ -1,9 +1,21 @@
|
|||
import RpcQueryBase from "./base.js";
|
||||
import { flatten } from "../util.js";
|
||||
import { Buffer } from "buffer";
|
||||
import type { RPCResponse } from "@lumeweb/relay-types";
|
||||
import {
|
||||
RPCBroadcastRequest,
|
||||
RPCBroadcastResponse,
|
||||
RPCRequest,
|
||||
RPCResponse,
|
||||
} from "@lumeweb/relay-types";
|
||||
import { clearTimeout } from "timers";
|
||||
import b4a from "b4a";
|
||||
import {
|
||||
flatten,
|
||||
isPromise,
|
||||
validateResponse,
|
||||
validateTimestampedResponse,
|
||||
} from "../util.js";
|
||||
import RPC from "@lumeweb/rpc";
|
||||
import { blake2b } from "libskynet";
|
||||
import { ERR_MAX_TRIES_HIT } from "../error.js";
|
||||
import { ERR_INVALID_SIGNATURE, ERR_NO_RELAYS } from "../error.js";
|
||||
import RpcQueryBase from "./base.js";
|
||||
|
||||
function flatHash(data: any) {
|
||||
const flattenedData = flatten(data).sort();
|
||||
|
@ -13,23 +25,105 @@ function flatHash(data: any) {
|
|||
}
|
||||
|
||||
export default class WisdomRpcQuery extends RpcQueryBase {
|
||||
private _maxTries = 3;
|
||||
private _tries = 0;
|
||||
protected declare _response?: RPCBroadcastResponse;
|
||||
private static _activeRelay: any;
|
||||
|
||||
protected checkResponses(): void {
|
||||
const responseStore = this._responses;
|
||||
const responseStoreData = Object.values(responseStore);
|
||||
static get activeRelay(): any {
|
||||
return this._activeRelay;
|
||||
}
|
||||
|
||||
get result(): Promise<RPCResponse> {
|
||||
return this._promise as Promise<RPCResponse>;
|
||||
}
|
||||
|
||||
protected async _run(): Promise<void> {
|
||||
await this.setupRelay();
|
||||
await this.queryRelay();
|
||||
await this.checkResponse();
|
||||
}
|
||||
|
||||
protected resolve(data?: RPCResponse, timeout: boolean = false): void {
|
||||
clearTimeout(this._timeoutTimer);
|
||||
this._timeout = timeout;
|
||||
this._completed = true;
|
||||
|
||||
if (timeout) {
|
||||
data = {
|
||||
error: "timeout",
|
||||
};
|
||||
}
|
||||
|
||||
this._promiseResolve?.(data);
|
||||
}
|
||||
|
||||
protected async queryRelay(): Promise<any> {
|
||||
let activeRelay = WisdomRpcQuery.activeRelay;
|
||||
let relays = this.getRelays();
|
||||
|
||||
if (!relays.length) {
|
||||
throw new Error(ERR_NO_RELAYS);
|
||||
}
|
||||
|
||||
return this.queryRpc(activeRelay, {
|
||||
module: "rpc",
|
||||
method: "broadcast_request",
|
||||
data: {
|
||||
request: this._query,
|
||||
relays,
|
||||
},
|
||||
} as RPCRequest);
|
||||
}
|
||||
|
||||
protected async checkResponse() {
|
||||
if (this._error) {
|
||||
this.resolve({ error: this._error });
|
||||
return;
|
||||
}
|
||||
|
||||
if (
|
||||
!validateResponse(
|
||||
WisdomRpcQuery.activeRelay.stream.remotePublicKey,
|
||||
this._response as RPCResponse
|
||||
)
|
||||
) {
|
||||
this.resolve({ error: ERR_INVALID_SIGNATURE });
|
||||
return;
|
||||
}
|
||||
|
||||
let relays: RPCResponse[] = [];
|
||||
|
||||
for (const relay in this._response?.relays) {
|
||||
const resp = this._response?.relays[relay];
|
||||
if (
|
||||
validateTimestampedResponse(
|
||||
b4a.from(relay, "hex") as Buffer,
|
||||
resp as RPCResponse
|
||||
)
|
||||
) {
|
||||
relays.push(resp as RPCResponse);
|
||||
}
|
||||
}
|
||||
|
||||
if (!relays.length) {
|
||||
this.resolve({ error: ERR_NO_RELAYS });
|
||||
return;
|
||||
}
|
||||
|
||||
type ResponseGroup = { [response: string]: number };
|
||||
|
||||
const responseObjects = responseStoreData.reduce((output: any, item) => {
|
||||
const hash = flatHash(item?.data);
|
||||
const responseObjects = relays.reduce((output: any, item: RPCResponse) => {
|
||||
const field = item.signedField || "data";
|
||||
// @ts-ignore
|
||||
const hash = flatHash(item[field]);
|
||||
output[hash] = item?.data;
|
||||
return output;
|
||||
}, {});
|
||||
const responses: ResponseGroup = responseStoreData.reduce(
|
||||
(output: ResponseGroup, item) => {
|
||||
const hash = flatHash(item?.data);
|
||||
|
||||
const responses: ResponseGroup = relays.reduce(
|
||||
(output: ResponseGroup, item: RPCResponse) => {
|
||||
const field = item.signedField || "data";
|
||||
// @ts-ignore
|
||||
const hash = flatHash(item[field]);
|
||||
output[hash] = output[hash] ?? 0;
|
||||
output[hash]++;
|
||||
return output;
|
||||
|
@ -37,38 +131,14 @@ export default class WisdomRpcQuery extends RpcQueryBase {
|
|||
{}
|
||||
);
|
||||
|
||||
if (!Object.keys(responses).length) {
|
||||
if (Object.keys(this._errors).length) {
|
||||
this.resolve({ error: Object.values(this._errors).pop() });
|
||||
return;
|
||||
}
|
||||
if (this._tries <= this._maxTries) {
|
||||
this._tries++;
|
||||
this.retry();
|
||||
return;
|
||||
}
|
||||
this.resolve({ data: { error: ERR_MAX_TRIES_HIT } });
|
||||
return;
|
||||
}
|
||||
for (const responseHash in responses) {
|
||||
if (
|
||||
responses[responseHash] / responseStoreData.length >=
|
||||
responses[responseHash] / relays.length >=
|
||||
this._network.majorityThreshold
|
||||
) {
|
||||
let response: RPCResponse = responseObjects[responseHash];
|
||||
|
||||
// @ts-ignore
|
||||
if (null === response) {
|
||||
if (this._tries <= this._maxTries) {
|
||||
this._tries++;
|
||||
this.retry();
|
||||
return;
|
||||
}
|
||||
|
||||
response = { error: ERR_MAX_TRIES_HIT };
|
||||
} else {
|
||||
response = { data: response };
|
||||
}
|
||||
|
||||
this.resolve(response);
|
||||
break;
|
||||
|
@ -76,17 +146,6 @@ export default class WisdomRpcQuery extends RpcQueryBase {
|
|||
}
|
||||
}
|
||||
|
||||
private retry() {
|
||||
this._responses = {};
|
||||
this._errors = {};
|
||||
|
||||
if (this._completed) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.run();
|
||||
}
|
||||
|
||||
protected getRelays(): string[] | [] {
|
||||
if (
|
||||
this._network.maxRelays === 0 ||
|
||||
|
@ -106,4 +165,27 @@ export default class WisdomRpcQuery extends RpcQueryBase {
|
|||
|
||||
return list;
|
||||
}
|
||||
|
||||
private async setupRelay() {
|
||||
let active = WisdomRpcQuery.activeRelay;
|
||||
let relays = this._network.relays;
|
||||
|
||||
if (!active) {
|
||||
if (!relays.length) {
|
||||
throw new Error(ERR_NO_RELAYS);
|
||||
}
|
||||
|
||||
let relay = relays[Math.floor(Math.random() * relays.length)];
|
||||
let socket = this._network.dht.connect(b4a.from(relay, "hex"));
|
||||
if (isPromise(socket)) {
|
||||
socket = await socket;
|
||||
}
|
||||
await socket.opened;
|
||||
|
||||
WisdomRpcQuery._activeRelay = new RPC(socket);
|
||||
socket.once("close", () => {
|
||||
WisdomRpcQuery._activeRelay = undefined;
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,8 +2,3 @@ export interface RpcQueryOptions {
|
|||
queryTimeout?: number;
|
||||
relayTimeout?: number;
|
||||
}
|
||||
export interface StreamingRpcQueryOptions extends RpcQueryOptions {
|
||||
streamHandler: StreamHandlerFunction;
|
||||
}
|
||||
|
||||
export type StreamHandlerFunction = (data: Uint8Array) => void;
|
||||
|
|
53
src/util.ts
53
src/util.ts
|
@ -1,13 +1,18 @@
|
|||
import { isArray } from "util";
|
||||
// @ts-ignore
|
||||
import stringify from "json-stringify-deterministic";
|
||||
import type { RPCRequest, RPCResponse } from "@lumeweb/relay-types";
|
||||
// @ts-ignore
|
||||
import crypto from "hypercore-crypto";
|
||||
import b4a from "b4a";
|
||||
|
||||
function isBuffer(obj: any): boolean {
|
||||
export function isPromise(obj: Promise<any>) {
|
||||
return (
|
||||
obj &&
|
||||
obj.constructor &&
|
||||
typeof obj.constructor.isBuffer === "function" &&
|
||||
obj.constructor.isBuffer(obj)
|
||||
!!obj &&
|
||||
(typeof obj === "object" || typeof obj === "function") &&
|
||||
typeof obj.then === "function"
|
||||
);
|
||||
}
|
||||
|
||||
/*
|
||||
Forked from https://github.com/hughsk/flat
|
||||
*/
|
||||
|
@ -29,7 +34,7 @@ export function flatten(target: any, opts: any = {}): any[] {
|
|||
const value = object[key];
|
||||
const isarray = opts.safe && Array.isArray(value);
|
||||
const type = Object.prototype.toString.call(value);
|
||||
const isbuffer = isBuffer(value);
|
||||
const isbuffer = b4a.isBuffer(value);
|
||||
const isobject = type === "[object Object]" || type === "[object Array]";
|
||||
|
||||
const newKey = prev
|
||||
|
@ -55,10 +60,34 @@ export function flatten(target: any, opts: any = {}): any[] {
|
|||
return output;
|
||||
}
|
||||
|
||||
export function isPromise(obj: Promise<any>) {
|
||||
return (
|
||||
!!obj &&
|
||||
(typeof obj === "object" || typeof obj === "function") &&
|
||||
typeof obj.then === "function"
|
||||
export function validateResponse(
|
||||
relay: Buffer,
|
||||
response: RPCResponse,
|
||||
timestamped = false
|
||||
): boolean {
|
||||
const field = response.signedField || "data";
|
||||
// @ts-ignore
|
||||
const data = response[field];
|
||||
let json = data;
|
||||
if (typeof json !== "string") {
|
||||
json = stringify(json);
|
||||
}
|
||||
const updated = response.updated as number;
|
||||
|
||||
if (timestamped && updated) {
|
||||
json = updated.toString() + json;
|
||||
}
|
||||
|
||||
return !!crypto.verify(
|
||||
b4a.from(json),
|
||||
b4a.from(response.signature as string, "hex"),
|
||||
relay
|
||||
);
|
||||
}
|
||||
|
||||
export function validateTimestampedResponse(
|
||||
relay: Buffer,
|
||||
response: RPCResponse
|
||||
): boolean {
|
||||
return validateResponse(relay, response, true);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue