*Rwrite checkResponses

This commit is contained in:
Derrick Hammer 2022-07-19 14:47:33 -04:00
parent da06f787dc
commit 42d53b6137
1 changed files with 34 additions and 14 deletions

View File

@ -2,6 +2,8 @@ import { clearTimeout, setTimeout } from "timers";
import RpcNetwork from "./rpcNetwork.js";
import { pack, unpack } from "msgpackr";
import { RPCRequest, RPCResponse } from "./types";
import { Buffer } from "buffer";
import {blake2b} from "libskynet"
export default class RpcQuery {
private _network: RpcNetwork;
@ -89,33 +91,43 @@ export default class RpcQuery {
}
private checkResponses() {
const responses: { [response: string]: number } = {};
const responseStore = this._responses;
const responseStoreKeys = Object.keys(responseStore);
const responseStoreData = Object.values(responseStore);
// tslint:disable-next-line:forin
for (const peer in responseStore) {
const responseIndex = responseStoreKeys.indexOf(peer);
type ResponseGroup = { [response: string]: number };
responses[responseIndex] = responses[responseIndex] ?? 0;
responses[responseIndex]++;
}
for (const responseIndex in responses) {
const responseObjects = responseStoreData.reduce((output: any, item) => {
const hash = Buffer.from(blake2b(Buffer.from(JSON.stringify(item?.data)))).toString("hex");
output[hash] = item?.data;
return output;
}, {});
const responses: ResponseGroup = responseStoreData.reduce(
(output: ResponseGroup, item) => {
const hash = Buffer.from(blake2b(Buffer.from(JSON.stringify(item?.data)))).toString("hex");
output[hash] = output[hash] ?? 0;
output[hash]++;
return output;
},
{}
);
for (const responseHash in responses) {
if (
responses[responseIndex] / responseStoreKeys.length >=
responses[responseHash] / responseStoreData.length >=
this._network.majorityThreshold
) {
const response: RPCResponse | null =
responseStore[responseStoreKeys[parseInt(responseIndex, 10)]];
// @ts-ignore
const response: RPCResponse = responseObjects[responseHash];
// @ts-ignore
if (null === response || null === response?.data) {
if (null === response) {
this.retry();
return;
}
this.resolve(response?.data);
this.resolve(response);
break;
}
}
}
@ -130,3 +142,11 @@ export default class RpcQuery {
this.init();
}
}
function isPromise(obj: Promise<any>) {
return (
!!obj &&
(typeof obj === "object" || typeof obj === "function") &&
typeof obj.then === "function"
);
}