*Heavily refactor to use new RPC schema
*Create basic, wisdom, and streaming rpc request variants
This commit is contained in:
parent
f7a8b69a55
commit
fb849550db
|
@ -8,6 +8,7 @@
|
|||
"build": "rimraf dist && tsc"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@lumeweb/relay": "https://github.com/LumeWeb/relay.git",
|
||||
"@types/json-stable-stringify": "^1.0.34",
|
||||
"@types/node": "^18.0.0",
|
||||
"prettier": "^2.7.1",
|
||||
|
|
|
@ -1 +1,2 @@
|
|||
export const ERR_NOT_READY = "NOT_READY";
|
||||
export const ERR_MAX_TRIES_HIT = "ERR_MAX_TRIES_HIT";
|
||||
|
|
15
src/index.ts
15
src/index.ts
|
@ -1,6 +1,15 @@
|
|||
import RpcNetwork from "./rpcNetwork.js";
|
||||
import RpcQuery from "./rpcNetwork.js";
|
||||
import RpcNetwork from "./network.js";
|
||||
import RpcQueryBase from "./query/base.js";
|
||||
import SimpleRpcQuery from "./query/simple.js";
|
||||
import StreamingRpcQuery from "./query/streaming.js";
|
||||
import WisdomRpcQuery from "./query/wisdom.js";
|
||||
|
||||
export * from "./types.js";
|
||||
|
||||
export { RpcNetwork, RpcQuery };
|
||||
export {
|
||||
RpcNetwork,
|
||||
RpcQueryBase,
|
||||
SimpleRpcQuery,
|
||||
StreamingRpcQuery,
|
||||
WisdomRpcQuery,
|
||||
};
|
||||
|
|
|
@ -1,6 +1,9 @@
|
|||
import RpcQuery from "./rpcQuery.js";
|
||||
import WisdomRpcQuery from "./query/wisdom.js";
|
||||
// @ts-ignore
|
||||
import DHT from "@hyperswarm/dht";
|
||||
import StreamingRpcQuery from "./query/streaming.js";
|
||||
import { RpcQueryOptions, StreamHandlerFunction } from "./types.js";
|
||||
import SimpleRpcQuery from "./query/simple.js";
|
||||
|
||||
export default class RpcNetwork {
|
||||
constructor(dht = new DHT()) {
|
||||
|
@ -23,16 +26,6 @@ export default class RpcNetwork {
|
|||
this._majorityThreshold = value;
|
||||
}
|
||||
|
||||
private _maxTtl = 12 * 60 * 60;
|
||||
|
||||
get maxTtl(): number {
|
||||
return this._maxTtl;
|
||||
}
|
||||
|
||||
set maxTtl(value: number) {
|
||||
this._maxTtl = value;
|
||||
}
|
||||
|
||||
private _queryTimeout = 30;
|
||||
|
||||
get queryTimeout(): number {
|
||||
|
@ -98,17 +91,57 @@ export default class RpcNetwork {
|
|||
this._relays = [];
|
||||
}
|
||||
|
||||
public query(
|
||||
query: string,
|
||||
chain: string,
|
||||
public wisdomQuery(
|
||||
method: string,
|
||||
module: string,
|
||||
data: object | any[] = {},
|
||||
bypassCache: boolean = false
|
||||
): RpcQuery {
|
||||
return new RpcQuery(this, {
|
||||
query,
|
||||
chain,
|
||||
data,
|
||||
bypassCache: boolean = false,
|
||||
options: RpcQueryOptions = {}
|
||||
): WisdomRpcQuery {
|
||||
return new WisdomRpcQuery(
|
||||
this,
|
||||
{
|
||||
method,
|
||||
module,
|
||||
data,
|
||||
bypassCache: bypassCache || this._bypassCache,
|
||||
});
|
||||
},
|
||||
options
|
||||
);
|
||||
}
|
||||
|
||||
public streamingQuery(
|
||||
relay: Buffer | string,
|
||||
method: string,
|
||||
module: string,
|
||||
streamHandler: StreamHandlerFunction,
|
||||
data: object | any[] = {},
|
||||
options: RpcQueryOptions = {}
|
||||
): StreamingRpcQuery {
|
||||
return new StreamingRpcQuery(
|
||||
this,
|
||||
relay,
|
||||
{ method, module, data },
|
||||
{ streamHandler, ...options }
|
||||
);
|
||||
}
|
||||
|
||||
public simpleQuery(
|
||||
relay: Buffer | string,
|
||||
method: string,
|
||||
module: string,
|
||||
data: object | any[] = {},
|
||||
options: RpcQueryOptions = {}
|
||||
): SimpleRpcQuery {
|
||||
return new SimpleRpcQuery(
|
||||
this,
|
||||
relay,
|
||||
{
|
||||
method,
|
||||
module,
|
||||
data,
|
||||
},
|
||||
options
|
||||
);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,135 @@
|
|||
import { clearTimeout, setTimeout } from "timers";
|
||||
import { pack, unpack } from "msgpackr";
|
||||
import { Buffer } from "buffer";
|
||||
import { isPromise } from "../util.js";
|
||||
import RpcNetwork from "../rpcNetwork.js";
|
||||
import { RpcQueryOptions } from "../types.js";
|
||||
import type { RPCRequest, RPCResponse } from "@lumeweb/relay";
|
||||
|
||||
export default abstract class RpcQueryBase {
|
||||
protected _network: RpcNetwork;
|
||||
protected _query: RPCRequest;
|
||||
protected _options: RpcQueryOptions;
|
||||
|
||||
protected _promise?: Promise<any>;
|
||||
protected _timeoutTimer?: any;
|
||||
protected _timeout: boolean = false;
|
||||
protected _completed: boolean = false;
|
||||
protected _responses: { [relay: string]: RPCResponse } = {};
|
||||
protected _errors: { [relay: string]: any } = {};
|
||||
protected _promiseResolve?: (data: any) => void;
|
||||
|
||||
constructor(
|
||||
network: RpcNetwork,
|
||||
query: RPCRequest,
|
||||
options: RpcQueryOptions = {}
|
||||
) {
|
||||
this._network = network;
|
||||
this._query = query;
|
||||
this._options = options;
|
||||
this.init();
|
||||
}
|
||||
|
||||
get result(): Promise<RPCResponse> {
|
||||
return this._promise as Promise<RPCResponse>;
|
||||
}
|
||||
|
||||
private handeTimeout() {
|
||||
this.resolve(undefined, true);
|
||||
}
|
||||
|
||||
protected resolve(data?: RPCResponse, timeout: boolean = false): void {
|
||||
clearTimeout(this._timeoutTimer);
|
||||
this._timeout = timeout;
|
||||
this._completed = true;
|
||||
|
||||
if (timeout) {
|
||||
data = {
|
||||
error: "timeout",
|
||||
};
|
||||
}
|
||||
|
||||
this._promiseResolve?.(data);
|
||||
}
|
||||
|
||||
protected async init() {
|
||||
this._promise =
|
||||
this._promise ??
|
||||
new Promise<any>((resolve) => {
|
||||
this._promiseResolve = resolve;
|
||||
});
|
||||
|
||||
this._timeoutTimer =
|
||||
this._timeoutTimer ??
|
||||
setTimeout(
|
||||
this.handeTimeout.bind(this),
|
||||
(this._options.queryTimeout || this._network.queryTimeout) * 1000
|
||||
);
|
||||
|
||||
await this._network.ready;
|
||||
|
||||
const promises = [];
|
||||
|
||||
for (const relay of this.getRelays()) {
|
||||
promises.push(this.queryRelay(relay));
|
||||
}
|
||||
|
||||
await Promise.allSettled(promises);
|
||||
this.checkResponses();
|
||||
}
|
||||
|
||||
protected async queryRelay(relay: string | Buffer): Promise<any> {
|
||||
let socket: any;
|
||||
|
||||
let relayKey: Buffer = relay as Buffer;
|
||||
|
||||
if (relay === "string") {
|
||||
relayKey = Buffer.from(relay, "hex");
|
||||
}
|
||||
if (relay instanceof Buffer) {
|
||||
relayKey = relay;
|
||||
relay = relay.toString("hex");
|
||||
}
|
||||
|
||||
try {
|
||||
socket = this._network.dht.connect(relayKey);
|
||||
if (isPromise(socket)) {
|
||||
socket = await socket;
|
||||
}
|
||||
} catch (e) {
|
||||
return;
|
||||
}
|
||||
return new Promise((resolve, reject) => {
|
||||
let timer: any;
|
||||
socket.on("data", (res: Buffer) => {
|
||||
relay = relay as string;
|
||||
if (timer && timer.close) {
|
||||
clearTimeout(timer as any);
|
||||
}
|
||||
socket.end();
|
||||
const response = unpack(res as any) as RPCResponse;
|
||||
if (response && response.error) {
|
||||
this._errors[relay] = response.error;
|
||||
return reject(null);
|
||||
}
|
||||
this._responses[relay] = response;
|
||||
resolve(null);
|
||||
});
|
||||
socket.on("error", (error: any) => {
|
||||
relay = relay as string;
|
||||
this._errors[relay] = error;
|
||||
reject({ error });
|
||||
});
|
||||
socket.write("rpc");
|
||||
socket.write(pack(this._query));
|
||||
timer = setTimeout(() => {
|
||||
this._errors[relay as string] = "timeout";
|
||||
reject(null);
|
||||
}, (this._options.relayTimeout || this._network.relayTimeout) * 1000) as NodeJS.Timeout;
|
||||
});
|
||||
}
|
||||
|
||||
protected abstract checkResponses(): void;
|
||||
|
||||
protected abstract getRelays(): string[] | Buffer[];
|
||||
}
|
|
@ -0,0 +1,35 @@
|
|||
import RpcQueryBase from "./base.js";
|
||||
import RpcNetwork from "../rpcNetwork.js";
|
||||
import type { RPCRequest } from "@lumeweb/relay";
|
||||
import { RpcQueryOptions } from "../types.js";
|
||||
import type { Buffer } from "buffer";
|
||||
|
||||
export default class SimpleRpcQuery extends RpcQueryBase {
|
||||
private _relay: string | Buffer;
|
||||
constructor(
|
||||
network: RpcNetwork,
|
||||
relay: string | Buffer,
|
||||
query: RPCRequest,
|
||||
options: RpcQueryOptions
|
||||
) {
|
||||
super(network, query, options);
|
||||
this._relay = relay;
|
||||
this.init();
|
||||
}
|
||||
|
||||
protected checkResponses(): void {
|
||||
if (Object.keys(this._responses).length) {
|
||||
this.resolve(Object.values(this._responses).pop());
|
||||
return;
|
||||
}
|
||||
|
||||
if (Object.keys(this._errors).length) {
|
||||
this.resolve({ error: Object.values(this._errors).pop() });
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
protected getRelays(): string[] | Buffer[] {
|
||||
return [this._relay] as string[] | Buffer[];
|
||||
}
|
||||
}
|
|
@ -0,0 +1,78 @@
|
|||
import SimpleRpcQuery from "./simple.js";
|
||||
import { Buffer } from "buffer";
|
||||
import { isPromise } from "../util.js";
|
||||
import { clearTimeout, setTimeout } from "timers";
|
||||
import { pack, unpack } from "msgpackr";
|
||||
import type { RPCRequest } from "@lumeweb/relay";
|
||||
import { RPCResponse } from "@lumeweb/relay";
|
||||
import RpcNetwork from "../rpcNetwork.js";
|
||||
import { StreamingRpcQueryOptions } from "../types.js";
|
||||
|
||||
export default class StreamingRpcQuery extends SimpleRpcQuery {
|
||||
protected _options: StreamingRpcQueryOptions;
|
||||
constructor(
|
||||
network: RpcNetwork,
|
||||
relay: string | Buffer,
|
||||
query: RPCRequest,
|
||||
options: StreamingRpcQueryOptions
|
||||
) {
|
||||
super(network, relay, query, options);
|
||||
this._options = options;
|
||||
}
|
||||
protected async queryRelay(relay: string | Buffer): Promise<any> {
|
||||
let socket: any;
|
||||
|
||||
let relayKey: Buffer = relay as Buffer;
|
||||
|
||||
if (relay === "string") {
|
||||
relayKey = Buffer.from(relay, "hex");
|
||||
}
|
||||
if (relay instanceof Buffer) {
|
||||
relayKey = relay;
|
||||
relay = relay.toString("hex");
|
||||
}
|
||||
|
||||
try {
|
||||
socket = this._network.dht.connect(relayKey);
|
||||
if (isPromise(socket)) {
|
||||
socket = await socket;
|
||||
}
|
||||
} catch (e) {
|
||||
return;
|
||||
}
|
||||
return new Promise((resolve, reject) => {
|
||||
let timer: any;
|
||||
socket.on("data", (res: Buffer) => {
|
||||
relay = relay as string;
|
||||
if (timer && timer.close) {
|
||||
clearTimeout(timer as any);
|
||||
}
|
||||
socket.end();
|
||||
const response = unpack(res as any) as RPCResponse;
|
||||
if (response && response.error) {
|
||||
this._errors[relay] = response.error;
|
||||
return reject(null);
|
||||
}
|
||||
|
||||
if (response?.data.done) {
|
||||
this._responses[relay] = {};
|
||||
resolve(null);
|
||||
return;
|
||||
}
|
||||
|
||||
this._options.streamHandler(response?.data.data);
|
||||
});
|
||||
socket.on("error", (error: any) => {
|
||||
relay = relay as string;
|
||||
this._errors[relay] = error;
|
||||
reject({ error });
|
||||
});
|
||||
socket.write("rpc");
|
||||
socket.write(pack(this._query));
|
||||
timer = setTimeout(() => {
|
||||
this._errors[relay as string] = "timeout";
|
||||
reject(null);
|
||||
}, (this._options.relayTimeout || this._network.relayTimeout) * 1000) as NodeJS.Timeout;
|
||||
});
|
||||
}
|
||||
}
|
|
@ -0,0 +1,78 @@
|
|||
import RpcQueryBase from "./base.js";
|
||||
import { flatten } from "../util.js";
|
||||
import { Buffer } from "buffer";
|
||||
import type { RPCResponse } from "@lumeweb/relay";
|
||||
import { blake2b } from "libskynet";
|
||||
import { ERR_MAX_TRIES_HIT } from "../error.js";
|
||||
|
||||
export default class WisdomRpcQuery extends RpcQueryBase {
|
||||
private _maxTries = 3;
|
||||
private _tries = 0;
|
||||
|
||||
protected checkResponses(): void {
|
||||
const responseStore = this._responses;
|
||||
const responseStoreData = Object.values(responseStore);
|
||||
|
||||
type ResponseGroup = { [response: string]: number };
|
||||
|
||||
const responseObjects = responseStoreData.reduce((output: any, item) => {
|
||||
const itemFlattened = flatten(item?.data).sort();
|
||||
|
||||
const hash = Buffer.from(
|
||||
blake2b(Buffer.from(JSON.stringify(itemFlattened)))
|
||||
).toString("hex");
|
||||
output[hash] = item?.data;
|
||||
return output;
|
||||
}, {});
|
||||
const responses: ResponseGroup = responseStoreData.reduce(
|
||||
(output: ResponseGroup, item) => {
|
||||
const itemFlattened = flatten(item?.data).sort();
|
||||
const hash = Buffer.from(
|
||||
blake2b(Buffer.from(JSON.stringify(itemFlattened)))
|
||||
).toString("hex");
|
||||
output[hash] = output[hash] ?? 0;
|
||||
output[hash]++;
|
||||
return output;
|
||||
},
|
||||
{}
|
||||
);
|
||||
|
||||
for (const responseHash in responses) {
|
||||
if (
|
||||
responses[responseHash] / responseStoreData.length >=
|
||||
this._network.majorityThreshold
|
||||
) {
|
||||
let response: RPCResponse = responseObjects[responseHash];
|
||||
|
||||
// @ts-ignore
|
||||
if (null === response) {
|
||||
if (this._tries <= this._maxTries) {
|
||||
this._tries++;
|
||||
this.retry();
|
||||
return;
|
||||
}
|
||||
|
||||
response = { error: ERR_MAX_TRIES_HIT };
|
||||
}
|
||||
|
||||
this.resolve(response);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private retry() {
|
||||
this._responses = {};
|
||||
this._errors = {};
|
||||
|
||||
if (this._completed) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.init();
|
||||
}
|
||||
|
||||
protected getRelays(): string[] | [] {
|
||||
return this._network.relays;
|
||||
}
|
||||
}
|
174
src/rpcQuery.ts
174
src/rpcQuery.ts
|
@ -1,174 +0,0 @@
|
|||
import { clearTimeout, setTimeout } from "timers";
|
||||
import RpcNetwork from "./rpcNetwork.js";
|
||||
import { pack, unpack } from "msgpackr";
|
||||
import { RPCRequest, RPCResponse } from "./types.js";
|
||||
import { Buffer } from "buffer";
|
||||
import { blake2b } from "libskynet";
|
||||
import { flatten } from "./util.js";
|
||||
|
||||
export default class RpcQuery {
|
||||
private _network: RpcNetwork;
|
||||
private _query: RPCRequest;
|
||||
private _promise?: Promise<any>;
|
||||
private _timeoutTimer?: any;
|
||||
private _timeout: boolean = false;
|
||||
private _completed: boolean = false;
|
||||
private _responses: { [relay: string]: RPCResponse } = {};
|
||||
private _promiseResolve?: (data: any) => void;
|
||||
private _maxTries = 3;
|
||||
private _tries = 0;
|
||||
|
||||
constructor(network: RpcNetwork, query: RPCRequest) {
|
||||
this._network = network;
|
||||
this._query = query;
|
||||
this.init();
|
||||
}
|
||||
|
||||
get result(): Promise<any> {
|
||||
return this._promise as Promise<any>;
|
||||
}
|
||||
|
||||
private handeTimeout() {
|
||||
this.resolve(false, true);
|
||||
}
|
||||
|
||||
private resolve(data: any, timeout: boolean = false): void {
|
||||
clearTimeout(this._timeoutTimer);
|
||||
this._timeout = timeout;
|
||||
this._completed = true;
|
||||
// @ts-ignore
|
||||
this._promiseResolve(data);
|
||||
}
|
||||
|
||||
private async init() {
|
||||
this._promise =
|
||||
this._promise ??
|
||||
new Promise<any>((resolve) => {
|
||||
this._promiseResolve = resolve;
|
||||
});
|
||||
|
||||
this._timeoutTimer =
|
||||
this._timeoutTimer ??
|
||||
setTimeout(
|
||||
this.handeTimeout.bind(this),
|
||||
this._network.queryTimeout * 1000
|
||||
);
|
||||
|
||||
await this._network.ready;
|
||||
|
||||
const promises = [];
|
||||
|
||||
// tslint:disable-next-line:forin
|
||||
for (const relay of this._network.relays) {
|
||||
promises.push(this.queryRelay(relay));
|
||||
}
|
||||
|
||||
await Promise.allSettled(promises);
|
||||
this.checkResponses();
|
||||
}
|
||||
|
||||
private async queryRelay(relay: string): Promise<any> {
|
||||
let socket: any;
|
||||
|
||||
try {
|
||||
socket = this._network.dht.connect(Buffer.from(relay, "hex"));
|
||||
if (isPromise(socket)) {
|
||||
socket = await socket;
|
||||
}
|
||||
} catch (e) {
|
||||
return;
|
||||
}
|
||||
return new Promise((resolve, reject) => {
|
||||
let timer: any;
|
||||
socket.on("data", (res: Buffer) => {
|
||||
if (timer && timer.close) {
|
||||
clearTimeout(timer as any);
|
||||
}
|
||||
socket.end();
|
||||
const response = unpack(res as any) as RPCResponse;
|
||||
if (response && response.error) {
|
||||
return reject(response);
|
||||
}
|
||||
this._responses[relay] = response;
|
||||
resolve(null);
|
||||
});
|
||||
socket.on("error", (error: any) => reject({ error }));
|
||||
socket.write("rpc");
|
||||
socket.write(pack(this._query));
|
||||
timer = setTimeout(() => {
|
||||
reject("timeout");
|
||||
}, this._network.relayTimeout * 1000) as NodeJS.Timeout;
|
||||
});
|
||||
}
|
||||
|
||||
private checkResponses() {
|
||||
const responseStore = this._responses;
|
||||
const responseStoreData = Object.values(responseStore);
|
||||
|
||||
type ResponseGroup = { [response: string]: number };
|
||||
|
||||
const responseObjects = responseStoreData.reduce((output: any, item) => {
|
||||
const itemFlattened = flatten(item?.data).sort();
|
||||
|
||||
const hash = Buffer.from(
|
||||
blake2b(Buffer.from(JSON.stringify(itemFlattened)))
|
||||
).toString("hex");
|
||||
output[hash] = item?.data;
|
||||
return output;
|
||||
}, {});
|
||||
const responses: ResponseGroup = responseStoreData.reduce(
|
||||
(output: ResponseGroup, item) => {
|
||||
const itemFlattened = flatten(item?.data).sort();
|
||||
const hash = Buffer.from(
|
||||
blake2b(Buffer.from(JSON.stringify(itemFlattened)))
|
||||
).toString("hex");
|
||||
output[hash] = output[hash] ?? 0;
|
||||
output[hash]++;
|
||||
return output;
|
||||
},
|
||||
{}
|
||||
);
|
||||
|
||||
for (const responseHash in responses) {
|
||||
if (
|
||||
responses[responseHash] / responseStoreData.length >=
|
||||
this._network.majorityThreshold
|
||||
) {
|
||||
// @ts-ignore
|
||||
let response: RPCResponse | boolean = responseObjects[responseHash];
|
||||
|
||||
// @ts-ignore
|
||||
if (null === response) {
|
||||
if (this._tries <= this._maxTries) {
|
||||
this._tries++;
|
||||
this.retry();
|
||||
return;
|
||||
}
|
||||
|
||||
response = false;
|
||||
}
|
||||
|
||||
this.resolve(response);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private retry() {
|
||||
this._responses = {};
|
||||
|
||||
if (this._completed) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.init();
|
||||
}
|
||||
}
|
||||
|
||||
function isPromise(obj: Promise<any>) {
|
||||
return (
|
||||
!!obj &&
|
||||
(typeof obj === "object" || typeof obj === "function") &&
|
||||
typeof obj.then === "function"
|
||||
);
|
||||
}
|
17
src/types.ts
17
src/types.ts
|
@ -1,12 +1,9 @@
|
|||
export interface RPCRequest {
|
||||
bypassCache: boolean;
|
||||
chain: string;
|
||||
query: string;
|
||||
data: any;
|
||||
export interface RpcQueryOptions {
|
||||
queryTimeout?: number;
|
||||
relayTimeout?: number;
|
||||
}
|
||||
export interface StreamingRpcQueryOptions extends RpcQueryOptions {
|
||||
streamHandler: StreamHandlerFunction;
|
||||
}
|
||||
|
||||
export interface RPCResponse {
|
||||
updated: number;
|
||||
data: any;
|
||||
error?: string
|
||||
}
|
||||
export type StreamHandlerFunction = (data: Uint8Array) => void;
|
||||
|
|
|
@ -54,3 +54,11 @@ export function flatten(target: any, opts: any = {}): any[] {
|
|||
|
||||
return output;
|
||||
}
|
||||
|
||||
export function isPromise(obj: Promise<any>) {
|
||||
return (
|
||||
!!obj &&
|
||||
(typeof obj === "object" || typeof obj === "function") &&
|
||||
typeof obj.then === "function"
|
||||
);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue