relay/src/rpc.ts

277 lines
6.3 KiB
TypeScript
Raw Normal View History

2022-06-27 17:53:00 +00:00
import crypto from "crypto";
import jayson from "jayson/promise/index.js";
2022-07-19 22:31:15 +00:00
import { pack, unpack } from "msgpackr";
import { Mutex } from "async-mutex";
import { createRequire } from "module";
2022-06-27 17:53:00 +00:00
import NodeCache from "node-cache";
2022-07-19 22:31:15 +00:00
import { get as getDHT } from "./dht.js";
import { rpcMethods } from "./rpc/index.js";
2022-07-05 19:54:57 +00:00
import PocketPKG from "@pokt-network/pocket-js";
2022-07-19 22:31:15 +00:00
const { Configuration, HttpRpcProvider, PocketAAT, Pocket } = PocketPKG;
2022-07-04 23:17:58 +00:00
import {
JSONRPCError,
2022-07-19 22:31:15 +00:00
JSONRPCRequest,
JSONRPCResponseWithError,
JSONRPCResponseWithResult,
2022-07-04 23:17:58 +00:00
} from "jayson";
2022-07-19 22:31:15 +00:00
import config, { updateUsePocketGateway, usePocketGateway } from "./config.js";
import { ERR_NOT_READY, errorExit } from "./error.js";
2022-07-04 23:17:58 +00:00
2022-06-27 17:53:00 +00:00
// createRequire gives this ES module a CommonJS-style require(), which is
// used below to load json-stable-stringify.
const require = createRequire(import.meta.url);
const stringify = require("json-stable-stringify");

// Per-request Mutex instances keyed by request id (see processRequest).
// NOTE(review): node-cache clones values on set/get unless it is constructed
// with `useClones: false`. With the default, every lookup yields a copy of
// the stored Mutex, so callers would not observe each other's lock state —
// confirm whether that is intended before relying on these locks.
const pendingRequests = new NodeCache();

// Stored RPC responses keyed by request id; stdTTL is 12 hours in seconds.
const processedRequests = new NodeCache({
  stdTTL: 60 * 60 * 12,
});

type PocketAATObject = typeof PocketAAT;

// Assigned in start() when no Pocket app id/key is configured.
let pocketServer: typeof Pocket;
// Written by updateAat() and read by getAat().
let _aat: PocketAATObject;
// jayson server created in start(); processRpcRequest() dispatches through it.
let jsonServer: jayson.Server;
2022-06-27 17:53:00 +00:00
/**
 * Request payload decoded from a peer socket and handed to the RPC pipeline.
 */
interface RPCRequest {
  /** When true, skip the stored response and execute the call again. */
  force: boolean;
  /** Passed to the JSON-RPC server as the `chain` value of the call context. */
  chain: string;
  /** Used as the JSON-RPC `method` of the dispatched call. */
  query: string;
  /** Used as the JSON-RPC `params` of the dispatched call. */
  data: string;
}
/**
 * Response record stored in the processed-request cache and sent to peers.
 */
interface RPCResponse {
  /** `Date.now()` value captured when the record was built. */
  updated: number;
  /**
   * The JSON-RPC `result` on success, or an `{ error }` object when the call
   * failed (`error` is a message string, or `true` when no message exists).
   */
  data:
    | any
    | {
        error: string | boolean;
      };
}
/**
 * Returns the SHA-256 digest of `data` as a hex string.
 */
function hash(data: string): string {
  return crypto.createHash("sha256").update(data).digest("hex");
}
/**
 * Derives the cache key for a request: the hash of its stable JSON
 * serialization with the `force` flag left out, so a forced and a non-forced
 * request for the same call share one id.
 */
function getRequestId(request: RPCRequest): string {
  // Rest destructuring drops `force` without mutating the caller's object.
  const { force: _force, ...identifyingFields } = request;

  return hash(stringify(identifyingFields));
}
/**
 * Validates the mandatory request fields, then delegates to processRequest.
 *
 * Validation failures are thrown synchronously (this function is not async).
 *
 * @throws Error("RPC chain missing") when `request.chain` is falsy.
 * @throws Error("RPC data missing") when `request.data` is falsy.
 */
function maybeProcessRequest(request: RPCRequest): Promise<RPCResponse> {
  if (!request.chain) {
    throw new Error("RPC chain missing");
  }

  if (!request.data) {
    throw new Error("RPC data missing");
  }

  return processRequest(request);
}
/**
 * Executes an RPC request, reusing a previously stored response when one
 * exists and the request is not forced.
 *
 * A per-request-id Mutex guards execution: a caller that finds the lock held
 * waits for it and then returns whatever response was stored. Failures of the
 * RPC call are not thrown; they are reported as `{ error }` in the response's
 * `data` field and stored like any other response.
 *
 * @param request Validated request (see maybeProcessRequest).
 * @returns The stored or freshly built response record.
 */
async function processRequest(request: RPCRequest): Promise<RPCResponse> {
  const reqId = getRequestId(request);

  // NOTE(review): if pendingRequests uses node-cache's default cloning, the
  // Mutex read back here is a copy and never reflects another caller's lock —
  // confirm the cache is configured as intended.
  let lock = pendingRequests.get(reqId) as Mutex | undefined;
  if (!lock) {
    lock = new Mutex();
    pendingRequests.set(reqId, lock);
  }

  if (lock.isLocked()) {
    await lock.waitForUnlock();
    return processedRequests.get(reqId) as RPCResponse;
  }

  await lock.acquire();
  try {
    if (!request.force) {
      const cached = processedRequests.get(reqId) as RPCResponse | undefined;
      if (cached) {
        // Returning from inside the try block lets `finally` release the lock.
        return cached;
      }
    }

    let rpcResp:
      | JSONRPCResponseWithResult
      | JSONRPCResponseWithError
      | undefined;
    let error: string | boolean | undefined;

    try {
      rpcResp = await processRpcRequest(
        {
          method: request.query,
          jsonrpc: "2.0",
          params: request.data,
          id: 1,
        } as unknown as JSONRPCRequest,
        request.chain
      );
    } catch (e) {
      error = e instanceof Error ? e.message : String(e);
    }

    if (rpcResp) {
      // A literal `false` result is treated as a failure without a message.
      if ((rpcResp as JSONRPCResponseWithResult).result === false) {
        error = true;
      }

      const rpcError = (rpcResp as JSONRPCResponseWithError).error;
      if (rpcError && typeof rpcError === "object") {
        error = (rpcError as JSONRPCError).message;
      }
    } else if (!error) {
      // No response and no captured error: report a generic failure rather
      // than dereferencing an undefined response below.
      error = true;
    }

    const dbData: RPCResponse = {
      updated: Date.now(),
      data: error
        ? { error }
        : (rpcResp as JSONRPCResponseWithResult).result,
    };

    // Only overwrite an existing entry when the caller explicitly forced it.
    if (!processedRequests.get(reqId) || request.force) {
      processedRequests.set(reqId, dbData);
    }

    return dbData;
  } finally {
    lock.release();
    // pendingRequests has no TTL; drop the entry so it does not grow with
    // every distinct request ever seen.
    pendingRequests.del(reqId);
  }
}
2022-07-05 19:54:57 +00:00
/**
 * Stores the given AAT object in module state, where getAat() returns it.
 */
export function updateAat(aat: PocketAATObject): void {
  _aat = aat;
}
2022-07-05 19:54:57 +00:00
/**
 * Returns the AAT object last stored with updateAat(); the value is
 * unassigned until updateAat() has been called.
 */
export function getAat(): PocketAATObject {
  return _aat;
}
2022-07-05 19:54:57 +00:00
/**
 * Returns the Pocket client created by start(); it is unassigned when start()
 * has not run or did not take the self-hosted (non-gateway) branch.
 */
export function getPocketServer(): typeof Pocket {
  return pocketServer;
}
/**
 * Imports the account into the Pocket keybase, unlocks it, and builds an AAT
 * from the account keys.
 *
 * Any failure is logged with console.error and terminates the process with
 * exit code 1, so this function never rejects.
 *
 * @param accountPrivateKey Hex-encoded private key of the account.
 * @param accountPublicKey Public key, passed as both public-key arguments of
 *   `PocketAAT.from`.
 * @param accountPassphrase Passphrase used to import and unlock the account.
 * @returns The AAT produced by `PocketAAT.from`.
 */
export async function unlockAccount(
  accountPrivateKey: string,
  accountPublicKey: string,
  accountPassphrase: string
): Promise<PocketAATObject> {
  try {
    // @ts-ignore
    const account = await pocketServer.keybase.importAccount(
      Buffer.from(accountPrivateKey, "hex"),
      accountPassphrase
    );

    // importAccount reports failure by returning an Error instead of throwing.
    if (account instanceof Error) {
      // noinspection ExceptionCaughtLocallyJS
      throw account;
    }

    // @ts-ignore
    const unlockResult = await pocketServer.keybase.unlockAccount(
      account.addressHex,
      accountPassphrase,
      0
    );

    // Treat a returned Error the same way as for importAccount, so a failed
    // unlock is not silently ignored.
    if (unlockResult instanceof Error) {
      // noinspection ExceptionCaughtLocallyJS
      throw unlockResult;
    }

    // @ts-ignore
    return await PocketAAT.from(
      "0.0.1",
      accountPublicKey,
      accountPublicKey,
      accountPrivateKey
    );
  } catch (e) {
    console.error(e);
    process.exit(1);
  }
}
/**
 * Dispatches a JSON-RPC request through the module's jayson server, passing
 * `{ chain }` as the call context.
 *
 * The returned promise resolves in both outcomes: with the error response
 * when jayson reports one, otherwise with the result response (which may be
 * undefined). Callers must inspect the resolved value to detect failure.
 *
 * @param request JSON-RPC request object to execute.
 * @param chain Value exposed to the RPC method as `context.chain`.
 */
export async function processRpcRequest(
  request: JSONRPCRequest,
  chain: string
): Promise<JSONRPCResponseWithResult | JSONRPCResponseWithError | undefined> {
  return new Promise((resolve) => {
    jsonServer.call(
      request,
      { chain },
      (
        err?: JSONRPCResponseWithError | null,
        result?: JSONRPCResponseWithResult
      ): void => {
        if (err) {
          resolve(err);
          return;
        }

        resolve(result);
      }
    );
  });
}
2022-06-27 17:53:00 +00:00
/**
 * Boots the RPC relay:
 *  1. When no Pocket app id/key is configured, builds a Pocket client from
 *     `pocket-host`/`pocket-port` and switches the gateway flag off.
 *  2. When the gateway is not in use, unlocks the configured account and
 *     stores the resulting AAT.
 *  3. Creates the jayson server and answers RPC requests arriving on DHT
 *     connections.
 *
 * Wire protocol handled per connection: the first chunk must be the marker
 * "rpc"; the next chunk is a msgpack-encoded RPCRequest. The packed response
 * (or `{ error }`) is written back and the socket is ended.
 */
export async function start(): Promise<void> {
  if (!config.str("pocket-app-id") || !config.str("pocket-app-key")) {
    const pocketHost = config.str("pocket-host");
    const pocketPort = config.uint("pocket-port");
    if (!pocketHost || !pocketPort) {
      errorExit(
        "Please set pocket-host and pocket-port config options if you do not have an API key set"
      );
    }

    const dispatchURL = new URL(`http://${pocketHost}:${pocketPort}`);
    const rpcProvider = new HttpRpcProvider(dispatchURL);
    const configuration = new Configuration();
    // @ts-ignore
    pocketServer = new Pocket([dispatchURL], rpcProvider, configuration);
    updateUsePocketGateway(false);
  }

  if (!usePocketGateway()) {
    updateAat(
      await unlockAccount(
        <string>config.str("pocket-account-private-key"),
        <string>config.str("pocket-account-public-key"),
        "0"
      )
    );
  }

  jsonServer = new jayson.Server(rpcMethods, { useContext: true });

  (await getDHT("server")).on("connection", (socket: any) => {
    socket.rawStream._ondestroy = () => false;

    let isRpc = false;

    // A single stateful listener is used because two `once("data")` listeners
    // would both fire on the same first chunk, leaving the request chunk
    // that follows the marker unhandled.
    const onData = async (data: any): Promise<void> => {
      if (!isRpc) {
        // toString() makes the marker comparison work for Buffer chunks too.
        if (data?.toString() === "rpc") {
          isRpc = true;
        } else {
          // Not an RPC connection: stop inspecting its traffic.
          socket.removeListener("data", onData);
        }
        return;
      }

      // Only the first chunk after the marker is treated as the request.
      socket.removeListener("data", onData);

      let request: RPCRequest;
      try {
        request = unpack(data) as RPCRequest;
      } catch (e) {
        // Undecodable payload: ignored without a reply, as before.
        return;
      }

      try {
        socket.write(pack(await maybeProcessRequest(request)));
      } catch (error) {
        console.trace(error);
        socket.write(pack({ error }));
      }
      socket.end();
    };

    socket.on("data", onData);
  });
}