relay/src/rpc.ts

298 lines
6.6 KiB
TypeScript
Raw Normal View History

//const require = createRequire(import.meta.url);
//import { createRequire } from "module";
2022-06-27 17:53:00 +00:00
import crypto from "crypto";
import jayson from "jayson/promise/index.js";
2022-07-19 22:31:15 +00:00
import { pack, unpack } from "msgpackr";
import { Mutex } from "async-mutex";
2022-06-27 17:53:00 +00:00
import NodeCache from "node-cache";
2022-07-19 22:31:15 +00:00
import { get as getDHT } from "./dht.js";
import { rpcMethods } from "./rpc/index.js";
import { start as startDns } from "./dns.js";
const {
Configuration,
HttpRpcProvider,
PocketAAT,
Pocket,
} = require("@pokt-network/pocket-js/dist/index.js");
2022-07-04 23:17:58 +00:00
import {
JSONRPCError,
2022-07-19 22:31:15 +00:00
JSONRPCRequest,
JSONRPCResponseWithError,
JSONRPCResponseWithResult,
2022-07-04 23:17:58 +00:00
} from "jayson";
2022-07-19 22:31:15 +00:00
import config, { updateUsePocketGateway, usePocketGateway } from "./config.js";
import { ERR_NOT_READY, errorExit } from "./error.js";
2022-07-25 06:45:16 +00:00
import log from "loglevel";
2022-07-04 23:17:58 +00:00
2022-06-27 17:53:00 +00:00
const stringify = require("json-stable-stringify");
const pendingRequests = new NodeCache();
const processedRequests = new NodeCache({
2022-07-19 22:31:15 +00:00
stdTTL: 60 * 60 * 12,
2022-06-27 17:53:00 +00:00
});
2022-07-05 19:54:57 +00:00
type PocketAATObject = typeof PocketAAT;
let pocketServer: typeof Pocket;
let _aat: PocketAATObject;
2022-07-04 23:17:58 +00:00
let jsonServer: jayson.Server;
2022-06-27 17:53:00 +00:00
interface RPCRequest {
2022-07-19 22:31:15 +00:00
force: boolean;
chain: string;
query: string;
data: string;
2022-06-27 17:53:00 +00:00
}
interface RPCResponse {
2022-07-19 22:31:15 +00:00
updated: number;
data:
| any
| {
2022-06-27 17:53:00 +00:00
error: string | boolean;
2022-07-19 22:31:15 +00:00
};
2022-06-27 17:53:00 +00:00
}
function hash(data: string): string {
2022-07-19 22:31:15 +00:00
return crypto.createHash("sha256").update(data).digest("hex");
2022-06-27 17:53:00 +00:00
}
function getRequestId(request: RPCRequest) {
2022-07-19 22:31:15 +00:00
const clonedRequest = Object.assign({}, request);
2022-06-27 17:53:00 +00:00
2022-07-19 22:31:15 +00:00
// @ts-ignore
delete clonedRequest.force;
2022-06-27 17:53:00 +00:00
2022-07-19 22:31:15 +00:00
return hash(stringify(clonedRequest));
2022-06-27 17:53:00 +00:00
}
function maybeProcessRequest(request: RPCRequest) {
2022-07-19 22:31:15 +00:00
if (!request.chain) {
throw new Error("RPC chain missing");
}
2022-06-27 17:53:00 +00:00
2022-07-19 22:31:15 +00:00
if (!request.data) {
throw new Error("RPC data missing");
}
2022-06-27 17:53:00 +00:00
2022-07-19 22:31:15 +00:00
return processRequest(request);
2022-06-27 17:53:00 +00:00
}
async function processRequest(request: RPCRequest): Promise<RPCResponse> {
2022-07-19 22:31:15 +00:00
const reqId = getRequestId(request);
let lock: Mutex = pendingRequests.get(reqId) as Mutex;
const lockExists = !!lock;
if (!lockExists) {
lock = new Mutex();
pendingRequests.set(reqId, lock);
}
if (lock.isLocked()) {
await lock.waitForUnlock();
return processedRequests.get(reqId) as RPCResponse;
}
await lock.acquire();
if (!request.force && processedRequests.get(reqId)) {
return processedRequests.get(reqId) as RPCResponse;
}
let rpcResp;
let error;
try {
rpcResp = await processRpcRequest(
{
method: request.query,
jsonrpc: "2.0",
params: request.data,
id: 1,
} as unknown as JSONRPCRequest,
request.chain
);
} catch (e) {
error = (e as Error).message;
}
let dbData: RPCResponse = {
updated: Date.now(),
data: "",
};
if (rpcResp) {
rpcResp = rpcResp as JSONRPCResponseWithResult;
if (false === rpcResp.result) {
error = true;
2022-06-27 17:53:00 +00:00
}
2022-07-04 23:17:58 +00:00
2022-07-19 22:31:15 +00:00
rpcResp = rpcResp as unknown as JSONRPCResponseWithError;
2022-07-23 00:57:53 +00:00
if (rpcResp.error && typeof rpcResp.error === "object") {
error = (rpcResp.error as JSONRPCError).message;
2022-06-27 17:53:00 +00:00
}
2022-07-19 22:31:15 +00:00
}
2022-06-27 17:53:00 +00:00
2022-07-19 22:31:15 +00:00
dbData.data = error
? { error }
: (rpcResp as unknown as JSONRPCResponseWithResult).result;
2022-06-27 17:53:00 +00:00
if (
(!processedRequests.get(reqId) || request.force) &&
dbData.data?.error !== ERR_NOT_READY
) {
2022-07-19 22:31:15 +00:00
processedRequests.set(reqId, dbData);
}
2022-06-27 17:53:00 +00:00
2022-07-19 22:31:15 +00:00
await lock.release();
2022-06-27 17:53:00 +00:00
2022-07-19 22:31:15 +00:00
return dbData;
2022-06-27 17:53:00 +00:00
}
2022-07-05 19:54:57 +00:00
export function updateAat(aat: PocketAATObject): void {
2022-07-19 22:31:15 +00:00
_aat = aat;
2022-07-04 23:17:58 +00:00
}
2022-07-05 19:54:57 +00:00
export function getAat(): PocketAATObject {
2022-07-19 22:31:15 +00:00
return _aat;
2022-07-04 23:17:58 +00:00
}
2022-07-05 19:54:57 +00:00
export function getPocketServer(): typeof Pocket {
2022-07-19 22:31:15 +00:00
return pocketServer;
2022-07-04 23:17:58 +00:00
}
export async function unlockAccount(
2022-07-19 22:31:15 +00:00
accountPrivateKey: string,
accountPublicKey: string,
accountPassphrase: string
2022-07-05 19:54:57 +00:00
): Promise<PocketAATObject> {
2022-07-19 22:31:15 +00:00
try {
// @ts-ignore
const account = await pocketServer.keybase.importAccount(
Buffer.from(accountPrivateKey, "hex"),
accountPassphrase
);
if (account instanceof Error) {
// noinspection ExceptionCaughtLocallyJS
throw account;
}
2022-07-19 22:31:15 +00:00
// @ts-ignore
await pocketServer.keybase.unlockAccount(
account.addressHex,
accountPassphrase,
0
);
// @ts-ignore
return await PocketAAT.from(
"0.0.1",
accountPublicKey,
accountPublicKey,
accountPrivateKey
);
} catch (e) {
console.error(e);
process.exit(1);
}
2022-07-04 23:17:58 +00:00
}
export async function processRpcRequest(
2022-07-19 22:31:15 +00:00
request: JSONRPCRequest,
chain: string
2022-07-04 23:17:58 +00:00
): Promise<JSONRPCResponseWithResult | JSONRPCResponseWithError | undefined> {
2022-07-19 22:31:15 +00:00
return new Promise((resolve) => {
jsonServer.call(
request,
{ chain },
(
err?: JSONRPCResponseWithError | null,
result?: JSONRPCResponseWithResult
): void => {
if (err) {
return resolve(err);
}
resolve(result);
}
);
});
2022-07-04 23:17:58 +00:00
}
2022-06-27 17:53:00 +00:00
export async function start() {
2022-07-19 22:31:15 +00:00
if (!config.str("pocket-app-id") || !config.str("pocket-app-key")) {
const pocketHost = config.str("pocket-host");
const pocketPort = config.uint("pocket-port");
if (!pocketHost || !pocketPort) {
errorExit(
"Please set pocket-host and pocket-port config options if you do not have an API key set"
);
}
2022-07-19 22:31:15 +00:00
const dispatchURL = new URL(
`http://${config.str("pocket-host")}:${config.uint("pocket-port")}`
);
const rpcProvider = new HttpRpcProvider(dispatchURL);
const configuration = new Configuration();
// @ts-ignore
pocketServer = new Pocket([dispatchURL], rpcProvider, configuration);
updateUsePocketGateway(false);
}
if (!usePocketGateway()) {
updateAat(
await unlockAccount(
<string>config.str("pocket-account-private-key"),
<string>config.str("pocket-account-public-key"),
"0"
)
);
}
jsonServer = new jayson.Server(rpcMethods, { useContext: true });
(await getDHT("server")).on("connection", RPCConnection.handleRequest);
await startDns();
}
class RPCConnection {
private _socket: any;
constructor(socket: any) {
this._socket = socket;
2022-07-19 22:31:15 +00:00
socket.rawStream._ondestroy = () => false;
socket.once("data", this.checkRpc.bind(this));
}
private async checkRpc(data: Buffer) {
if (data.toString() === "rpc") {
this._socket.once("data", this.processRequest);
}
}
2022-07-19 22:31:15 +00:00
private async processRequest(data: Buffer) {
let request: RPCRequest;
try {
request = unpack(data) as RPCRequest;
} catch (e) {
return;
}
const that = this as any;
try {
that.write(pack(await maybeProcessRequest(request)));
} catch (error) {
2022-07-25 06:45:16 +00:00
log.trace(error);
that.write(pack({ error }));
}
that.end();
}
public static handleRequest(socket: any) {
new RPCConnection(socket);
}
2022-06-27 17:53:00 +00:00
}