diff --git a/src/rpcQuery.ts b/src/rpcQuery.ts index 917684f..1ab505e 100644 --- a/src/rpcQuery.ts +++ b/src/rpcQuery.ts @@ -2,6 +2,8 @@ import { clearTimeout, setTimeout } from "timers"; import RpcNetwork from "./rpcNetwork.js"; import { pack, unpack } from "msgpackr"; import { RPCRequest, RPCResponse } from "./types"; +import { Buffer } from "buffer"; +import {blake2b} from "libskynet" export default class RpcQuery { private _network: RpcNetwork; @@ -89,33 +91,43 @@ export default class RpcQuery { } private checkResponses() { - const responses: { [response: string]: number } = {}; const responseStore = this._responses; - const responseStoreKeys = Object.keys(responseStore); + const responseStoreData = Object.values(responseStore); - // tslint:disable-next-line:forin - for (const peer in responseStore) { - const responseIndex = responseStoreKeys.indexOf(peer); + type ResponseGroup = { [response: string]: number }; - responses[responseIndex] = responses[responseIndex] ?? 0; - responses[responseIndex]++; - } - for (const responseIndex in responses) { + const responseObjects = responseStoreData.reduce((output: any, item) => { + const hash = Buffer.from(blake2b(Buffer.from(JSON.stringify(item?.data)))).toString("hex"); + output[hash] = item?.data; + return output; + }, {}); + const responses: ResponseGroup = responseStoreData.reduce( + (output: ResponseGroup, item) => { + const hash = Buffer.from(blake2b(Buffer.from(JSON.stringify(item?.data)))).toString("hex"); + output[hash] = output[hash] ?? 0; + output[hash]++; + return output; + }, + {} + ); + + for (const responseHash in responses) { if ( - responses[responseIndex] / responseStoreKeys.length >= + responses[responseHash] / responseStoreData.length >= this._network.majorityThreshold ) { - const response: RPCResponse | null = - responseStore[responseStoreKeys[parseInt(responseIndex, 10)]]; + // @ts-ignore + const response: RPCResponse = responseObjects[responseHash]; // @ts-ignore - if (null === response || null === response?.data) { + if (null === response) { this.retry(); return; } - this.resolve(response?.data); + this.resolve(response); + break; } } } @@ -130,3 +142,11 @@ export default class RpcQuery { this.init(); } } + +function isPromise(obj: Promise) { + return ( + !!obj && + (typeof obj === "object" || typeof obj === "function") && + typeof obj.then === "function" + ); +}