*Super refactor for the beginning of a new rpc server, changed protocol and simple plugin system

*Remove all rpc methods to prepare them for becoming plugins
This commit is contained in:
Derrick Hammer 2022-08-26 21:52:19 -04:00
parent a0cab03073
commit 48a5928256
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
19 changed files with 502 additions and 1035 deletions

View File

@ -31,6 +31,7 @@
"@types/ws": "^8.5.3",
"@types/xml2js": "^0.4.11",
"acme-client": "^4.2.5",
"ajv": "^8.11.0",
"algosdk": "^1.18.1",
"async-mutex": "^0.3.2",
"bcfg": "^0.1.7",
@ -40,13 +41,13 @@
"dotenv": "^16.0.1",
"ethers": "^5.6.9",
"express": "^4.18.1",
"globby": "^13.1.2",
"hsd": "https://github.com/LumeWeb/hsd.git#spv-namestate",
"ipfs-core": "^0.15.4",
"ipfs-http-response": "^3.0.4",
"ipfs-repo": "^14.0.1",
"it-last": "^1.0.6",
"it-to-stream": "^1.0.0",
"jayson": "^3.6.6",
"json-stable-stringify": "^1.0.1",
"libskynet": "^0.0.62",
"libskynetnode": "^0.1.3",
@ -60,6 +61,7 @@
"promise-retry": "^2.0.1",
"random-access-memory": "^4.1.0",
"random-key": "^0.3.2",
"slugify": "^1.6.5",
"sprintf-js": "^1.1.2",
"xml2js": "^0.4.23"
},

View File

@ -11,19 +11,19 @@ import { errorExit } from "./error.js";
const config = new BConfig("lumeweb-relay");
let configLocation;
let configDir;
const configFile = "config.conf";
switch (os.platform()) {
case "win32":
configLocation = path.resolve(
require?.main?.filename as string,
configFile
);
configDir = path.dirname(require?.main?.filename as string);
configLocation = path.resolve(configDir, configFile);
break;
case "linux":
default:
configLocation = path.join("/etc/lumeweb/relay", configFile);
configDir = "/etc/lumeweb/relay";
configLocation = path.join(configDir, configFile);
break;
}
@ -31,6 +31,8 @@ config.inject({
port: 8080,
config: configLocation,
logLevel: "info",
pluginFolder: path.join(configDir, "plugins"),
plugins: ["core"],
});
config.load({

View File

@ -2,11 +2,13 @@ import { start as startRpc } from "./rpc.js";
import { start as startRelay } from "./relay.js";
import log from "loglevel";
import config from "./config.js";
import { loadPlugins } from "./plugin.js";
log.setDefaultLevel(config.str("log-level"));
async function boot() {
await startRpc();
await loadPlugins();
await startRelay();
}

75
src/plugin.ts Normal file
View File

@ -0,0 +1,75 @@
import { globby } from "globby";
import config from "./config.js";
import { getRpcServer } from "./rpc/server.js";
import { RelayPluginAPI, RPCMethod, Plugin } from "./types.js";
import slugify from "slugify";
let pluginApi: PluginAPI;
const sanitizeName = (name: string) =>
slugify(name, { lower: true, strict: true });
export class PluginAPI {
private registeredPlugins: Map<string, Plugin> = new Map<string, Plugin>();
public async loadPlugin(moduleName: string): Promise<Plugin> {
moduleName = sanitizeName(moduleName);
if (this.registeredPlugins.has(moduleName)) {
return this.registeredPlugins.get(moduleName) as Plugin;
}
const paths = await globby([`${moduleName}.js`, "${moduleName}.mjs"], {
cwd: config.get("plugin-folder"),
});
if (!paths.length) {
throw new Error(`Plugin ${moduleName} does not exist`);
}
let plugin: Plugin;
try {
plugin = (await import(paths.shift() as string)) as Plugin;
} catch (e) {
throw e;
}
plugin.name = sanitizeName(plugin.name);
this.registeredPlugins.set(plugin.name, plugin);
try {
plugin.plugin(this.getPluginAPI(plugin.name));
} catch (e) {
throw e;
}
return plugin;
}
private getPluginAPI(pluginName: string): RelayPluginAPI {
return {
config,
api: {
registerMethod: (methodName: string, method: RPCMethod): void => {
getRpcServer().registerMethod(pluginName, methodName, method);
},
loadPlugin: getPluginAPI().loadPlugin,
},
};
}
}
export function getPluginAPI(): PluginAPI {
if (!pluginApi) {
pluginApi = new PluginAPI();
}
return pluginApi as PluginAPI;
}
export async function loadPlugins() {
for (const plugin of config.array("plugins")) {
await getPluginAPI().loadPlugin(plugin);
}
}

View File

@ -1,239 +1,18 @@
//const require = createRequire(import.meta.url);
//import { createRequire } from "module";
import crypto from "crypto";
import jayson from "jayson/promise/index.js";
import { pack, unpack } from "msgpackr";
import { Mutex } from "async-mutex";
import NodeCache from "node-cache";
import { get as getDHT } from "./dht.js";
import { rpcMethods } from "./rpc/index.js";
import { start as startDns } from "./dns.js";
import {
JSONRPCError,
JSONRPCRequest,
JSONRPCResponseWithError,
JSONRPCResponseWithResult,
} from "jayson";
import config from "./config.js";
import { ERR_NOT_READY, errorExit } from "./error.js";
import log from "loglevel";
import { errorExit } from "./error.js";
// @ts-ignore
import stringify from "json-stable-stringify";
import type { StreamFileResponse } from "./streams.js";
import { getStream } from "./streams.js";
const pendingRequests = new NodeCache();
const processedRequests = new NodeCache({
stdTTL: 60 * 60 * 12,
});
let jsonServer: jayson.Server;
interface RPCRequest {
bypassCache: boolean;
chain: string;
query: string;
data: string;
}
interface RPCResponse {
updated: number;
data: any;
error?: string;
}
function hash(data: string): string {
return crypto.createHash("sha256").update(data).digest("hex");
}
function getRequestId(request: RPCRequest) {
const clonedRequest = Object.assign({}, request);
// @ts-ignore
delete clonedRequest.bypassCache;
return hash(stringify(clonedRequest));
}
function maybeProcessRequest(request: RPCRequest) {
if (!request.chain) {
throw new Error("RPC chain missing");
}
if (!request.data) {
throw new Error("RPC data missing");
}
return processRequest(request);
}
async function processRequest(request: RPCRequest): Promise<RPCResponse> {
const reqId = getRequestId(request);
let lock: Mutex = pendingRequests.get(reqId) as Mutex;
const lockExists = !!lock;
if (!lockExists) {
lock = new Mutex();
pendingRequests.set(reqId, lock);
}
if (lock.isLocked()) {
await lock.waitForUnlock();
return processedRequests.get(reqId) as RPCResponse;
}
await lock.acquire();
if (!request.bypassCache && processedRequests.get(reqId)) {
return processedRequests.get(reqId) as RPCResponse;
}
let rpcResp;
let error;
try {
rpcResp = await processRpcRequest(
{
method: request.query,
jsonrpc: "2.0",
params: request.data,
id: 1,
} as unknown as JSONRPCRequest,
request.chain
);
} catch (e) {
error = (e as Error).message;
}
let dbData: RPCResponse = {
updated: Date.now(),
data: "",
};
if (rpcResp) {
rpcResp = rpcResp as JSONRPCResponseWithResult;
if (false === rpcResp.result) {
error = true;
}
rpcResp = rpcResp as unknown as JSONRPCResponseWithError;
if (rpcResp.error && typeof rpcResp.error === "object") {
error = (rpcResp.error as JSONRPCError).message;
}
}
if (error) {
dbData.error = error as string;
} else {
dbData.data = (rpcResp as unknown as JSONRPCResponseWithResult).result;
}
if (
(!processedRequests.get(reqId) || request.bypassCache) &&
dbData.data?.error !== ERR_NOT_READY
) {
processedRequests.set(reqId, dbData);
}
await lock.release();
return dbData;
}
export async function processRpcRequest(
request: JSONRPCRequest,
chain: string
): Promise<JSONRPCResponseWithResult | JSONRPCResponseWithError | undefined> {
return new Promise((resolve) => {
jsonServer.call(
request,
{ chain },
(
err?: JSONRPCResponseWithError | null,
result?: JSONRPCResponseWithResult
): void => {
if (err) {
return resolve(err);
}
resolve(result);
}
);
});
}
import { getRpcServer } from "./rpc/server.js";
export async function start() {
if (!config.str("pocket-app-id") || !config.str("pocket-app-key")) {
errorExit("Please set pocket-app-id and pocket-app-key config options.");
}
jsonServer = new jayson.Server(rpcMethods, { useContext: true });
(await getDHT("server")).on("connection", RPCConnection.handleRequest);
getRpcServer();
await startDns();
}
class RPCConnection {
private _socket: any;
constructor(socket: any) {
this._socket = socket;
socket.rawStream._ondestroy = () => false;
socket.once("data", this.checkRpc.bind(this));
}
private async checkRpc(data: Buffer) {
if (data.toString() === "rpc") {
this._socket.once("data", this.processRequest);
}
}
private async processRequest(data: Buffer) {
let request: RPCRequest;
try {
request = unpack(data) as RPCRequest;
} catch (e) {
return;
}
const that = this as any;
let response;
try {
response = await maybeProcessRequest(request);
} catch (error) {
log.trace(error);
that.write(pack({ error }));
that.end();
return;
}
if (response.data?.streamId) {
const stream = getStream(
response.data?.streamId
) as AsyncIterable<Uint8Array>;
const emptyData = Uint8Array.from([]);
const streamResp = {
data: {
data: emptyData,
done: false,
} as StreamFileResponse,
};
for await (const chunk of stream) {
streamResp.data.data = chunk as unknown as Uint8Array;
that.write(pack(streamResp));
}
streamResp.data.data = emptyData;
streamResp.data.done = true;
response = streamResp;
}
that.write(pack(response));
that.end();
}
public static handleRequest(socket: any) {
new RPCConnection(socket);
}
}

View File

@ -1,120 +0,0 @@
import { maybeMapChainId, reverseMapChainId } from "../util.js";
import minimatch from "minimatch";
// @ts-ignore
import HTTPClient from "algosdk/dist/cjs/src/client/client.js";
import { sprintf } from "sprintf-js";
import { rpcError, RpcMethodList } from "./index.js";
import config from "../config.js";
import {
ERR_ENDPOINT_INVALID,
ERR_INVALID_CHAIN,
ERR_METHOD_INVALID,
} from "../error.js";
const allowedEndpoints: { [endpoint: string]: ("GET" | "POST")[] } = {
"/v2/teal/compile": ["POST"],
"/v2/accounts/*": ["GET"],
};
export function proxyRestMethod(
apiServer: string,
matchChainId: string
): Function {
return async function (args: any, context: object) {
// @ts-ignore
let chain = context.chain;
let chainId = maybeMapChainId(chain);
if (!chainId) {
return rpcError(ERR_INVALID_CHAIN);
}
chainId = reverseMapChainId(chainId as string);
if (!chainId || chainId !== matchChainId) {
return rpcError(ERR_INVALID_CHAIN);
}
let method = args.method ?? false;
let endpoint = args.endpoint ?? false;
let data = args.data ?? false;
let query = args.query ?? false;
let fullHeaders = args.fullHeaders ?? {};
fullHeaders = { ...fullHeaders, Referer: "lumeweb_dns_relay" };
if (method) {
method = method.toUpperCase();
}
if (!endpoint) {
throw new Error("Endpoint Missing");
}
let found = false;
for (const theEndpoint in allowedEndpoints) {
if (minimatch(endpoint, theEndpoint)) {
found = true;
break;
}
}
if (!found) {
return rpcError(ERR_ENDPOINT_INVALID);
}
let apiUrl;
try {
apiUrl = sprintf(apiServer, chainId, config.str("pocket-app-id"));
} catch (e) {
apiUrl = apiServer;
}
const client = new HTTPClient({}, apiUrl);
let resp;
switch (method) {
case "GET":
resp = await client.get(endpoint, query, fullHeaders);
break;
case "POST":
if (Array.isArray(data?.data)) {
data = new Uint8Array(Buffer.from(data.data));
}
resp = await client.post(endpoint, data, { ...fullHeaders });
break;
default:
return rpcError(ERR_METHOD_INVALID);
}
const getCircularReplacer = () => {
const seen = new WeakSet();
return (key: string, value: any): any => {
if (typeof value === "object" && value !== null) {
if (seen.has(value)) {
return;
}
seen.add(value);
}
return value;
};
};
if (resp?.body && "current-round" in resp?.body) {
delete resp?.body["current-round"];
}
return JSON.parse(JSON.stringify(resp?.body, getCircularReplacer()));
};
}
export default {
algorand_rest_request: proxyRestMethod(
"http://mainnet-api.algonode.network",
"algorand-mainnet"
),
//'algorand_rest_request': proxyRestMethod("https://%s.gateway.pokt.network/v1/lb/%s", "algorand-mainnet"),
algorand_rest_indexer_request: proxyRestMethod(
"http://mainnet-idx.algonode.network",
"algorand-mainnet-indexer"
),
} as RpcMethodList;

View File

@ -1,142 +0,0 @@
import { ethers } from "ethers";
import { Pocket, PocketAAT } from "@pokt-network/pocket-js/dist/index.js";
import { maybeMapChainId, reverseMapChainId } from "../util.js";
import { Connection } from "@solana/web3.js";
import { getAat, getPocketServer } from "../rpc.js";
import config, { usePocketGateway } from "../config.js";
import { ERR_INVALID_CHAIN } from "../error.js";
type RpcProviderMethod = (method: string, params: Array<any>) => Promise<any>;
interface RpcContext {
chain?: string;
}
const gatewayProviders: { [name: string]: RpcProviderMethod } = {};
const gatewayMethods: {
[name: string]: (chainId: string) => RpcProviderMethod;
} = {
default: (chainId: string): RpcProviderMethod => {
const provider = new ethers.providers.JsonRpcProvider({
url: `https://${chainId}.gateway.pokt.network/v1/lb/${config.str(
"pocket-app-id"
)}`,
password: config.str("pocket-app-key"),
});
return provider.send.bind(provider);
},
"solana-mainnet": (chainId: string): RpcProviderMethod => {
const provider = new Connection(
`https://solana-mainnet.gateway.pokt.network/v1/lb/${config.str(
"pocket-app-id"
)}`
);
// @ts-ignore
return provider._rpcRequest.bind(provider);
},
};
export function proxyRpcMethod(
method: string,
chains: string[] = []
): Function {
return async function (args: any, context: RpcContext) {
// @ts-ignore
let chain = context.chain;
let chainId = maybeMapChainId(chain as string);
let chainMatch = true;
if (
chains.length > 0 &&
!chains.includes(chain as string) &&
!chains.includes(chainId.toString())
) {
chainMatch = false;
}
if (!chainId || !chainMatch) {
return rpcError(ERR_INVALID_CHAIN);
}
if (usePocketGateway()) {
chainId = reverseMapChainId(chainId as string);
if (!chainId) {
return rpcError(ERR_INVALID_CHAIN);
}
let provider: RpcProviderMethod | boolean =
gatewayProviders[chainId as string] || false;
if (!provider) {
provider = getRpcProvider(chainId as string);
}
gatewayProviders[chainId as string] = provider;
let resp;
try {
resp = await provider(method, args);
} catch (e: any) {
e = e as Error;
if ("error" in e) {
return e.error;
}
return e;
}
return resp;
}
return await sendRelay(
JSON.stringify(args),
<string>chainId,
getAat() as unknown as PocketAAT
);
};
}
// Call this every time you want to fetch RPC data
async function sendRelay(
rpcQuery: string,
blockchain: string,
pocketAAT: PocketAAT
) {
try {
return await (getPocketServer() as unknown as Pocket).sendRelay(
rpcQuery,
blockchain,
pocketAAT
);
} catch (e) {
console.log(e);
throw e;
}
}
function getRpcProvider(chain: string): RpcProviderMethod {
if (chain in gatewayMethods) {
return gatewayMethods[chain](chain);
}
return gatewayMethods.default(chain);
}
class RpcError extends Error {
public code: number = -1;
}
export function rpcError(message: string): Promise<RpcError> {
return Promise.reject(new RpcError(message));
}
export function validateChain(chain: string, handler: any) {
return async (args: any, context: RpcContext) => {
if (!context?.chain || chain !== context?.chain) {
return rpcError(ERR_INVALID_CHAIN);
}
return handler(args, context);
};
}

67
src/rpc/connection.ts Normal file
View File

@ -0,0 +1,67 @@
import {
RPCRequest,
RPCResponse,
RPCStreamHandler,
StreamFileResponse,
} from "../types";
import { pack, unpack } from "msgpackr";
import log from "loglevel";
import { getRpcServer } from "./server";
export default class RPCConnection {
private _socket: any;
constructor(socket: any) {
this._socket = socket;
socket.rawStream._ondestroy = () => false;
socket.once("data", this.checkRpc.bind(this));
}
private async checkRpc(data: Buffer) {
if (data.toString() === "rpc") {
this._socket.once("data", this.processRequest);
}
}
private async processRequest(data: Buffer) {
let request: RPCRequest;
try {
request = unpack(data) as RPCRequest;
} catch (e) {
return;
}
const that = this as any;
let response;
const handleStream: RPCStreamHandler = async (
stream: AsyncIterable<Uint8Array>
): Promise<RPCResponse> => {
const emptyData = Uint8Array.from([]);
const streamResp = {
data: {
data: emptyData,
done: false,
} as StreamFileResponse,
};
for await (const chunk of stream) {
streamResp.data.data = chunk as unknown as Uint8Array;
that.write(pack(streamResp));
}
streamResp.data.data = emptyData;
streamResp.data.done = true;
return streamResp;
};
try {
response = await getRpcServer().handleRequest(request, handleStream);
} catch (error) {
log.trace(error);
that.write(pack({ error }));
that.end();
return;
}
that.write(pack(response));
that.end();
}
}

View File

@ -1,108 +0,0 @@
//import { createRequire } from "module";
//const require = createRequire(import.meta.url);
import { isIp } from "../util.js";
import { RpcMethodList, validateChain } from "./index.js";
// @ts-ignore
import bns from "bns";
const { StubResolver, RecursiveResolver } = bns;
const resolverOpt = {
tcp: true,
inet6: false,
edns: true,
dnssec: true,
};
const globalResolver = new RecursiveResolver(resolverOpt);
globalResolver.hints.setDefault();
globalResolver.open();
async function resolveNameServer(ns: string): Promise<string | boolean> {
if (isIp(ns)) {
return ns;
}
let result = await getDnsRecords(ns, "A");
if (result.length) {
return result[0];
}
return false;
}
async function getDnsRecords(
domain: string,
type: string,
authority: boolean = false,
resolver = globalResolver
): Promise<string[]> {
let result;
try {
result = await resolver.lookup(domain, type);
} catch (e) {
return [];
}
let prop = authority ? "authority" : "answer";
if (!result || !result[prop].length) {
return [];
}
return result[prop].map(
(item: object) =>
// @ts-ignore
item.data.address ?? item.data.target ?? item.data.ns ?? null
);
}
export default {
dnslookup: validateChain("icann", async function (args: any) {
let dnsResults: string[] = [];
let domain = args.domain;
let ns = args.nameserver;
let dnsResolver = ns ? new StubResolver(resolverOpt) : globalResolver;
await dnsResolver.open();
if (ns) {
let nextNs = ns;
let prevNs = null;
while (nextNs) {
nextNs = await resolveNameServer(nextNs);
if (!nextNs) {
nextNs = prevNs;
}
dnsResolver.setServers([nextNs]);
if (nextNs === prevNs) {
break;
}
let result = await getDnsRecords(domain, "NS", true, dnsResolver);
prevNs = nextNs;
nextNs = result.length ? result[0] : false;
}
}
for (const queryType of ["CNAME", "A"]) {
let result = await getDnsRecords(domain, queryType, false, dnsResolver);
if (result) {
dnsResults = dnsResults.concat(result);
}
}
await dnsResolver.close();
dnsResults = dnsResults.filter(Boolean);
if (dnsResults.length) {
return dnsResults[0];
}
return false;
}),
} as RpcMethodList;

View File

@ -1,14 +0,0 @@
import { proxyRpcMethod } from "./common.js";
import { RpcMethodList } from "./index.js";
const rpcMethods: RpcMethodList = {};
function proxyEvmRpcMethod(method: string): Function {
return proxyRpcMethod(method);
}
["eth_call", "eth_chainId", "net_version"].forEach((method) => {
rpcMethods[method] = proxyEvmRpcMethod(method);
});
export default rpcMethods;

View File

@ -1,99 +0,0 @@
//const require = createRequire(import.meta.url);
//import { createRequire } from "module";
import { rpcError, RpcMethodList, validateChain } from "./index.js";
// @ts-ignore
import rand from "random-key";
// @ts-ignore
import SPVNode from "hsd/lib/node/spvnode.js";
import config from "../config.js";
import { ERR_INVALID_CHAIN, ERR_NOT_READY } from "../error.js";
// @ts-ignore
import { NodeClient } from "hs-client";
let hsdServer: SPVNode;
let clientArgs = {
network: "main",
host: "127.0.0.1",
port: 12037,
apiKey: rand.generate(),
};
if (!config.bool("hsd-use-external-node")) {
hsdServer = new SPVNode({
config: false,
argv: false,
env: true,
noDns: true,
memory: false,
httpHost: "127.0.0.1",
apiKey: clientArgs.apiKey,
logFile: false,
logConsole: true,
logLevel: "info",
workers: true,
network: "main",
});
hsdServer.on("abort", async (err: any) => {
const timeout = setTimeout(() => {
console.error("Shutdown is taking a long time. Exiting.");
process.exit(3);
}, 5000);
timeout.unref();
try {
console.error("Shutting down...");
await hsdServer.close();
clearTimeout(timeout);
console.error((err as Error).stack);
process.exit(2);
} catch (e: any) {
console.error(`Error occurred during shutdown: ${(e as Error).message}`);
process.exit(3);
}
});
(async () => {
try {
await hsdServer.ensure();
await hsdServer.open();
await hsdServer.connect();
hsdServer.startSync();
} catch (e: any) {
console.error((e as Error).stack);
}
})();
} else {
clientArgs = {
network: config.str("hsd-network-type"),
host: config.str("hsd-host"),
port: config.uint("hsd-port"),
apiKey: config.str("hsd-api-key"),
};
}
const hnsClient = new NodeClient(clientArgs);
export default {
getnameresource: validateChain("hns", async (args: any) => {
let resp;
try {
resp = await hnsClient.execute("getnameresource", args);
} catch (e: any) {
e = e as Error;
const eType = e.type.toLowerCase();
const eMessage = e.message.toLowerCase();
if (eType === "rpcerror" && eMessage.includes("chain is not synced")) {
return rpcError(ERR_NOT_READY);
}
return rpcError(eMessage);
}
return resp;
}),
} as RpcMethodList;

View File

@ -1,22 +0,0 @@
export type RpcMethodList = { [name: string]: Function };
export * from "./common.js";
import { default as DnsMethods } from "./dns.js";
import { default as EvmMethods } from "./evm.js";
import { default as HnsMethods } from "./handshake.js";
import { default as SolMethods } from "./solana.js";
import { default as AlgoMethods } from "./algorand.js";
import { default as IpfsMethods } from "./ipfs.js";
import { default as MiscMethods } from "./misc.js";
export const rpcMethods: RpcMethodList = Object.assign(
{},
DnsMethods,
EvmMethods,
HnsMethods,
SolMethods,
AlgoMethods,
IpfsMethods,
MiscMethods
);

View File

@ -1,243 +0,0 @@
import { rpcError, RpcMethodList, validateChain } from "./index.js";
import type { IPFS } from "ipfs-core";
import { dynImport } from "../util.js";
import { CID } from "multiformats/cid";
// @ts-ignore
import toStream from "it-to-stream";
import { addStream } from "../streams.js";
import { ERR_HASH_IS_DIRECTORY } from "../error.js";
import type { StatResult } from "ipfs-core/dist/src/components/files/stat";
let client: IPFS | Promise<any>;
let utils: typeof import("ipfs-http-response").utils;
let detectContentType: typeof import("ipfs-http-response").utils.detectContentType;
interface StatFileResponse {
exists: boolean;
contentType: string | null;
error: any;
directory: boolean;
files: StatFileSubfile[];
timeout: boolean;
size: number;
}
interface StatFileSubfile {
name: string;
size: number;
}
function normalizeCidPath(path: any) {
if (path instanceof Uint8Array) {
return CID.decode(path).toString();
}
path = path.toString();
if (path.indexOf("/ipfs/") === 0) {
path = path.substring("/ipfs/".length);
}
if (path.charAt(path.length - 1) === "/") {
path = path.substring(0, path.length - 1);
}
return path;
}
async function initIpfs() {
if (client) {
if (client instanceof Promise) {
await client;
}
return;
}
const IPFS: typeof import("ipfs-http-client") = await dynImport(
"ipfs-http-client"
);
const ipfsHttpResponse: typeof import("ipfs-http-response") = await dynImport(
"ipfs-http-response"
);
utils = ipfsHttpResponse.utils;
detectContentType = utils.detectContentType;
client = IPFS.create({
host: "127.0.0.1",
});
client = await client;
}
initIpfs();
function normalizePath(
hash?: string,
path?: string,
fullPath?: string
): string {
if (!fullPath) {
if (!path) {
path = "/";
}
fullPath = `${hash}/${path}`;
}
fullPath = fullPath.replace(/\/{2,}/, "/");
return normalizeCidPath(fullPath);
}
async function fetchFile(hash?: string, path?: string, fullPath?: string) {
let data = await fileExists(hash, path, fullPath);
if (data instanceof Error) {
return data;
}
if (data?.type === "directory") {
return rpcError(ERR_HASH_IS_DIRECTORY);
}
client = client as IPFS;
const streamId = addStream(client.cat(data.cid));
return { streamId };
}
async function statFile(hash?: string, path?: string, fullPath?: string) {
let stats: StatFileResponse = {
exists: false,
contentType: null,
error: null,
directory: false,
files: [],
timeout: false,
size: 0,
};
client = client as IPFS;
let exists = await fileExists(hash, path, fullPath);
fullPath = normalizePath(hash, path, fullPath);
if (exists instanceof Error) {
stats.error = exists.toString();
if (exists.message.includes("aborted")) {
stats.timeout = true;
}
return stats;
}
stats.exists = true;
if (exists?.type === "directory") {
stats.directory = true;
for await (const item of client.ls(exists.cid)) {
stats.files.push({
name: item.name,
size: item.size,
} as StatFileSubfile);
}
return stats;
}
const { size } = await client.files.stat(`/ipfs/${exists.cid}`);
stats.size = size;
const { contentType } = await detectContentType(
fullPath,
client.cat(exists.cid)
);
stats.contentType = contentType ?? null;
return stats;
}
async function fileExists(
hash?: string,
path?: string,
fullPath?: string
): Promise<Error | StatResult> {
await initIpfs();
client = client as IPFS;
let ipfsPath = normalizePath(hash, path, fullPath);
try {
const controller = new AbortController();
// setTimeout(() => controller.abort(), 5000);
const ret = await client.files.stat(`/ipfs/${ipfsPath}`, {
signal: controller.signal,
});
return ret;
} catch (err: any) {
return err;
}
}
async function resolveIpns(
hash: string,
path: string
): Promise<string | boolean> {
client = client as IPFS;
for await (const result of client.name.resolve(hash)) {
return normalizePath(undefined, undefined, `${result}/${path}`);
}
return false;
}
const CHAIN = "ipfs";
export default {
stat_ipfs: validateChain(CHAIN, async (args: any) => {
try {
return await statFile(args?.hash, args?.path);
} catch (e: any) {
return rpcError((e as Error).message);
}
}),
stat_ipns: validateChain(CHAIN, async (args: any) => {
try {
let ipfsPath = await resolveIpns(args?.hash, args?.path);
if (!ipfsPath) {
throw new Error("ipns lookup failed");
}
return statFile(undefined, undefined, ipfsPath as string);
} catch (e: any) {
return rpcError((e as Error).message);
}
}),
fetch_ipfs: validateChain(CHAIN, async (args: any) => {
try {
const ret = await fetchFile(args?.hash, args?.path);
if (ret instanceof Error) {
throw ret;
}
return ret;
} catch (e: any) {
return rpcError((e as Error).message);
}
}),
fetch_ipns: validateChain(CHAIN, async (args: any) => {
try {
let ipfsPath = await resolveIpns(args?.hash, args?.path);
if (!ipfsPath) {
throw new Error("ipns lookup failed");
}
const ret = await fetchFile(undefined, undefined, ipfsPath as string);
if (ret instanceof Error) {
throw ret;
}
return ret;
} catch (e: any) {
return rpcError((e as Error).message);
}
}),
} as RpcMethodList;

View File

@ -1,12 +0,0 @@
import { RpcMethodList, rpcMethods, validateChain } from "./index.js";
const CHAIN = "misc";
export default {
ping: validateChain(CHAIN, async () => {
return { pong: true };
}),
get_methods: validateChain(CHAIN, async () => {
return Object.keys(rpcMethods);
}),
} as RpcMethodList;

217
src/rpc/server.ts Normal file
View File

@ -0,0 +1,217 @@
import {
RPC_REQUEST_SCHEMA,
RPCMethod,
RPCRequest,
RPCResponse,
RPCStreamHandler,
} from "../types.js";
import NodeCache from "node-cache";
import { get as getDHT } from "../dht.js";
import { Mutex } from "async-mutex";
import crypto from "crypto";
// @ts-ignore
import stringify from "json-stable-stringify";
import Ajv from "ajv";
import RPCConnection from "./connection.js";
const ajv = new Ajv();
ajv.addSchema(RPC_REQUEST_SCHEMA, "rpc_request");
let server: RPCServer;
export function getRpcServer(): RPCServer {
if (!server) {
server = new RPCServer();
}
return server as RPCServer;
}
export class RPCServer {
private methods = new Map<string, Map<string, RPCMethod>>();
private pendingRequests = new NodeCache();
private processedRequests = new NodeCache({
stdTTL: 60 * 60 * 12,
});
constructor() {
this.init();
}
public registerMethod(
moduleName: string,
methodName: string,
options: RPCMethod
): void {
const module = this.methods.get(moduleName);
if (module && module.get(methodName)) {
throw new Error(
`Method ${methodName} already exists for module ${moduleName}`
);
}
let methodMap: Map<string, RPCMethod> | null = null;
if (!module) {
methodMap = new Map<string, RPCMethod>();
this.methods.set(moduleName, methodMap);
}
if (!methodMap) {
methodMap = this.methods.get(moduleName) as Map<string, RPCMethod>;
}
methodMap.set(methodName, options);
}
private async init(): Promise<void> {
(await getDHT("server")).on(
"connection",
(socket: any) => new RPCConnection(socket)
);
}
public async handleRequest(
request: RPCRequest,
streamHandler: RPCStreamHandler
): Promise<RPCResponse> {
let valid = this.verifyRequest(request);
if (valid instanceof Error) {
return {
error: valid.message,
};
}
let lockedRequest = await this.waitOnRequestLock(request);
if (lockedRequest) {
return lockedRequest;
}
let cachedRequest = this.getCachedRequest(request);
if (cachedRequest) {
return cachedRequest;
}
let method = this.getMethodByRequest(request) as RPCMethod;
let result;
let isStream: AsyncIterable<Uint8Array> | boolean = false;
const flagIsStream = (stream: AsyncIterable<Uint8Array>) => {
isStream = stream;
};
try {
result = await method.handler(request, flagIsStream);
} catch (e) {
return {
error: (e as Error).message,
};
}
if (isStream) {
result = await streamHandler(isStream);
}
result = result as RPCResponse;
cachedRequest = this.getCachedRequest(request);
if (!cachedRequest && !isStream) {
this.cacheRequest(request, result);
}
return result;
}
private async waitOnRequestLock(request: RPCRequest) {
let method = this.getMethodByRequest(request) as RPCMethod;
if (!method.cachable) {
return;
}
const reqId = RPCServer.getRequestId(request);
let lock: Mutex = this.pendingRequests.get(reqId) as Mutex;
const lockExists = !!lock;
if (!lockExists) {
lock = new Mutex();
this.pendingRequests.set(reqId, lock);
}
if (lock.isLocked()) {
await lock.waitForUnlock();
return this.processedRequests.get(reqId) as RPCResponse;
}
await lock.acquire();
}
private getCachedRequest(request: RPCRequest): RPCResponse | undefined {
let method = this.getMethodByRequest(request) as RPCMethod;
if (!method.cachable) {
return;
}
const reqId = RPCServer.getRequestId(request);
if (!request.bypassCache && this.processedRequests.get(reqId)) {
return this.processedRequests.get(reqId) as RPCResponse;
}
}
private cacheRequest(request: RPCRequest, response: RPCResponse): void {
const reqId = RPCServer.getRequestId(request);
this.processedRequests.set(reqId, response);
}
private static hash(data: string): string {
return crypto.createHash("sha256").update(data).digest("hex");
}
private static getRequestId(request: RPCRequest) {
const clonedRequest = Object.assign({}, request) as RPCRequest;
delete clonedRequest.bypassCache;
return RPCServer.hash(stringify(clonedRequest));
}
private verifyRequest(request: RPCRequest) {
let valid: any = ajv.getSchema("rpc_request")?.(request);
if (!valid) {
return new Error("Invalid request");
}
valid = this.getMethodByRequest(request);
if (valid instanceof Error) {
return valid;
}
return true;
}
private getMethodByRequest(request: RPCRequest): Error | RPCMethod {
return this.getMethod(request.module, request.method);
}
private getMethod(moduleName: string, method: string): Error | RPCMethod {
let item: any = this.methods.get(moduleName);
if (!item) {
return new Error("Invalid module");
}
item = item.get(method);
if (!item) {
return new Error("Invalid method");
}
return item;
}
}

View File

@ -1,9 +0,0 @@
import { proxyRpcMethod } from "./common.js";
import { RpcMethodList } from "./index.js";
import chainNetworks from "@lumeweb/pokt-rpc-endpoints";
export default {
getAccountInfo: proxyRpcMethod("getAccountInfo", [
chainNetworks["solana-mainnet"],
]),
} as RpcMethodList;

View File

@ -1,33 +0,0 @@
function idFactory(start = 1, step = 1, limit = 2 ** 32) {
let id = start;
return function nextId() {
const nextId = id;
id += step;
if (id >= limit) id = start;
return nextId;
};
}
export interface StreamFileResponse {
data?: Uint8Array;
done: boolean;
}
const nextId = idFactory(1);
const streams = new Map<number, AsyncIterable<Uint8Array>>();
export function getStream(id: number): AsyncIterable<Uint8Array> | boolean {
if (!streams.has(id)) {
return false;
}
return streams.get(id) as AsyncIterable<Uint8Array>;
}
export function addStream(stream: AsyncIterable<Uint8Array>): number {
const id = nextId();
streams.set(id, stream);
return id;
}

72
src/types.ts Normal file
View File

@ -0,0 +1,72 @@
import { JSONSchemaType } from "ajv";
import { PluginAPI } from "./plugin.js";
export interface RPCRequest {
bypassCache?: boolean;
module: string;
method: string;
data: string;
}
export interface RPCResponse {
updated?: number;
data?: any;
error?: string;
}
export interface RPCMethod {
cachable: boolean;
handler: (
request: RPCRequest,
sendStream: (stream: AsyncIterable<Uint8Array>) => void
) => RPCResponse | null;
}
export const RPC_REQUEST_SCHEMA: JSONSchemaType<RPCRequest> = {
anyOf: [],
oneOf: [],
type: "object",
properties: {
module: {
type: "string",
},
method: {
type: "string",
},
data: {
type: "string",
anyOf: [
{ type: "string" },
{ type: "number" },
{ type: "integer" },
{ type: "object" },
{ type: "array" },
],
},
bypassCache: {
type: "boolean",
nullable: true,
},
},
};
export interface StreamFileResponse {
data?: Uint8Array;
done: boolean;
}
export interface RelayPluginAPI {
config: any;
api: {
registerMethod: (methodName: string, method: RPCMethod) => void;
loadPlugin: PluginAPI["loadPlugin"];
};
}
export type PluginFunction = (api: RelayPluginAPI) => Promise<void>;
export interface Plugin {
name: string;
plugin: PluginFunction;
exports?: any;
}
export type RPCStreamHandler = (
stream: AsyncIterable<Uint8Array>
) => Promise<RPCResponse>;

View File

@ -1751,6 +1751,16 @@ agent-base@6:
dependencies:
debug "4"
ajv@^8.11.0:
version "8.11.0"
resolved "https://registry.yarnpkg.com/ajv/-/ajv-8.11.0.tgz#977e91dd96ca669f54a11e23e378e33b884a565f"
integrity sha512-wGgprdCvMalC0BztXvitD2hC04YffAvtsUn93JbGXYLAtCUO4xd17mCCZQxUOItiBwZvJScWo8NIvQMQ71rdpg==
dependencies:
fast-deep-equal "^3.1.1"
json-schema-traverse "^1.0.0"
require-from-string "^2.0.2"
uri-js "^4.2.2"
algo-msgpack-with-bigint@^2.1.1:
version "2.1.1"
resolved "https://registry.yarnpkg.com/algo-msgpack-with-bigint/-/algo-msgpack-with-bigint-2.1.1.tgz#38bb717220525b3ff42232eefdcd9efb9ad405d6"
@ -3624,12 +3634,17 @@ eyes@^0.1.8:
resolved "https://registry.yarnpkg.com/eyes/-/eyes-0.1.8.tgz#62cf120234c683785d902348a800ef3e0cc20bc0"
integrity sha512-GipyPsXO1anza0AOZdy69Im7hGFCNB7Y/NGjDlZGJ3GJJLtwNSb2vrzYrTYJRrRloVx7pl+bhUaTB8yiccPvFQ==
fast-deep-equal@^3.1.1:
version "3.1.3"
resolved "https://registry.yarnpkg.com/fast-deep-equal/-/fast-deep-equal-3.1.3.tgz#3a7d56b559d6cbc3eb512325244e619a65c6c525"
integrity sha512-f3qQ9oQy9j2AhBe/H9VC91wLmKBCCU/gDOnKNAYG5hswO7BLKj09Hc5HYNz9cGI++xlpDCIgDaitVs03ATR84Q==
fast-fifo@^1.0.0:
version "1.1.0"
resolved "https://registry.yarnpkg.com/fast-fifo/-/fast-fifo-1.1.0.tgz#17d1a3646880b9891dfa0c54e69c5fef33cad779"
integrity sha512-Kl29QoNbNvn4nhDsLYjyIAaIqaJB6rBx5p3sL9VjaefJ+eMFBWVZiaoguaoZfzEKr5RhAti0UgM8703akGPJ6g==
fast-glob@^3.2.9:
fast-glob@^3.2.11, fast-glob@^3.2.9:
version "3.2.11"
resolved "https://registry.yarnpkg.com/fast-glob/-/fast-glob-3.2.11.tgz#a1172ad95ceb8a16e20caa5c5e56480e5129c1d9"
integrity sha512-xrO3+1bxSo3ZVHAnqzyuewYT6aMFHRAd4Kcs92MAonjwQZLsK9d0SF1IyQ3k5PoirxTW0Oe/RqFgMQ6TcNE5Ew==
@ -3928,6 +3943,17 @@ globby@^11.1.0:
merge2 "^1.4.1"
slash "^3.0.0"
globby@^13.1.2:
version "13.1.2"
resolved "https://registry.yarnpkg.com/globby/-/globby-13.1.2.tgz#29047105582427ab6eca4f905200667b056da515"
integrity sha512-LKSDZXToac40u8Q1PQtZihbNdTYSNMuWe+K5l+oa6KgDzSvVrHXlJy40hUP522RjAIoNLJYBJi7ow+rbFpIhHQ==
dependencies:
dir-glob "^3.0.1"
fast-glob "^3.2.11"
ignore "^5.2.0"
merge2 "^1.4.1"
slash "^4.0.0"
goosig@~0.10.0:
version "0.10.0"
resolved "https://registry.yarnpkg.com/goosig/-/goosig-0.10.0.tgz#f73da6234af54bd47420c29672ce877b6c05d902"
@ -5091,7 +5117,7 @@ jake@^10.8.5:
filelist "^1.0.1"
minimatch "^3.0.4"
jayson@^3.4.4, jayson@^3.6.6:
jayson@^3.4.4:
version "3.6.6"
resolved "https://registry.yarnpkg.com/jayson/-/jayson-3.6.6.tgz#189984f624e398f831bd2be8e8c80eb3abf764a1"
integrity sha512-f71uvrAWTtrwoww6MKcl9phQTC+56AopLyEenWvKVAIMz+q0oVGj6tenLZ7Z6UiPBkJtKLj4kt0tACllFQruGQ==
@ -5149,6 +5175,11 @@ json-bigint@^1.0.0:
dependencies:
bignumber.js "^9.0.0"
json-schema-traverse@^1.0.0:
version "1.0.0"
resolved "https://registry.yarnpkg.com/json-schema-traverse/-/json-schema-traverse-1.0.0.tgz#ae7bcb3656ab77a73ba5c49bf654f38e6b6860e2"
integrity sha512-NM8/P9n3XjXhIZn1lLhkFaACTOURQXjWhV4BA/RnOv8xvgqtqpAX9IO4mRQxSx1Rlo4tqzeqb0sOlruaOy3dug==
json-stable-stringify@^1.0.1:
version "1.0.1"
resolved "https://registry.yarnpkg.com/json-stable-stringify/-/json-stable-stringify-1.0.1.tgz#9a759d39c5f2ff503fd5300646ed445f88c4f9af"
@ -6490,7 +6521,7 @@ punycode@^1.3.2:
resolved "https://registry.yarnpkg.com/punycode/-/punycode-1.4.1.tgz#c0d5a63b2718800ad8e1eb0fa5269c84dd41845e"
integrity sha512-jmYNElW7yvO7TV33CjSmvSiE2yco3bV2czu/OzDKdMNVZQWfxCblURLhf+47syQRBntjfLdd/H0egrzIG+oaFQ==
punycode@^2.1.1:
punycode@^2.1.0, punycode@^2.1.1:
version "2.1.1"
resolved "https://registry.yarnpkg.com/punycode/-/punycode-2.1.1.tgz#b58b010ac40c22c5657616c8d2c2c02c7bf479ec"
integrity sha512-XRsRjdf+j5ml+y/6GKHPZbrF/8p2Yga0JPtdqTIY2Xe5ohJPD9saDJJLPvp9+NSBprVvevdXZybnj2cv8OEd0A==
@ -6698,6 +6729,11 @@ require-directory@^2.1.1:
resolved "https://registry.yarnpkg.com/require-directory/-/require-directory-2.1.1.tgz#8c64ad5fd30dab1c976e2344ffe7f792a6a6df42"
integrity sha512-fGxEI7+wsG9xrvdjsrlmL22OMTTiHRwAMroiEeMgq8gzoLC/PQr7RsRDSTLUg/bZAZtF+TVIkHc6/4RIKrui+Q==
require-from-string@^2.0.2:
version "2.0.2"
resolved "https://registry.yarnpkg.com/require-from-string/-/require-from-string-2.0.2.tgz#89a7fdd938261267318eafe14f9c32e598c36909"
integrity sha512-Xf0nWe6RseziFMu+Ap9biiUbmplq6S9/p+7w7YXP/JBHhrUDDUhwa+vANyubuqfZWTveU//DYVGsDG7RKL/vEw==
requires-port@^1.0.0:
version "1.0.0"
resolved "https://registry.yarnpkg.com/requires-port/-/requires-port-1.0.0.tgz#925d2601d39ac485e091cf0da5c6e694dc3dcaff"
@ -7040,11 +7076,21 @@ slash@^3.0.0:
resolved "https://registry.yarnpkg.com/slash/-/slash-3.0.0.tgz#6539be870c165adbd5240220dbe361f1bc4d4634"
integrity sha512-g9Q1haeby36OSStwb4ntCGGGaKsaVSjQ68fBxoQcutl5fS1vuY18H3wSt3jFyFtrkx+Kz0V1G85A4MyAdDMi2Q==
slash@^4.0.0:
version "4.0.0"
resolved "https://registry.yarnpkg.com/slash/-/slash-4.0.0.tgz#2422372176c4c6c5addb5e2ada885af984b396a7"
integrity sha512-3dOsAHXXUkQTpOYcoAxLIorMTp4gIQr5IW3iVb7A7lFIp0VHhnynm9izx6TssdrIcVIESAlVjtnO2K8bg+Coew==
slide@^1.1.5:
version "1.1.6"
resolved "https://registry.yarnpkg.com/slide/-/slide-1.1.6.tgz#56eb027d65b4d2dce6cb2e2d32c4d4afc9e1d707"
integrity sha512-NwrtjCg+lZoqhFU8fOwl4ay2ei8PaqCBOUV3/ektPY9trO1yQ1oXEfmHAhKArUVUr/hOHvy5f6AdP17dCM0zMw==
slugify@^1.6.5:
version "1.6.5"
resolved "https://registry.yarnpkg.com/slugify/-/slugify-1.6.5.tgz#c8f5c072bf2135b80703589b39a3d41451fbe8c8"
integrity sha512-8mo9bslnBO3tr5PEVFzMPIWwWnipGS0xVbYf65zxDqfNwmzYn1LpiKNrR6DlClusuvo+hDHd1zKpmfAe83NQSQ==
socket.io-client@^4.1.2:
version "4.5.1"
resolved "https://registry.yarnpkg.com/socket.io-client/-/socket.io-client-4.5.1.tgz#cab8da71976a300d3090414e28c2203a47884d84"
@ -7728,6 +7774,13 @@ unpipe@1.0.0, unpipe@~1.0.0:
resolved "https://registry.yarnpkg.com/unpipe/-/unpipe-1.0.0.tgz#b2bf4ee8514aae6165b4817829d21b2ef49904ec"
integrity sha512-pjy2bYhSsufwWlKwPc+l3cN7+wuJlK6uz0YdJEOlQDbl6jo/YlPi4mb8agUkVC8BF7V8NuzeyPNqRksA3hztKQ==
uri-js@^4.2.2:
version "4.4.1"
resolved "https://registry.yarnpkg.com/uri-js/-/uri-js-4.4.1.tgz#9b1a52595225859e55f669d928f88c6c57f2a77e"
integrity sha512-7rKUyy33Q1yc98pQ1DAmLtwX109F7TIfWlW1Ydo8Wl1ii1SeHieeh0HHfPeL2fMXK6z0s8ecKs9frCuLJvndBg==
dependencies:
punycode "^2.1.0"
urkel@~1.0.2:
version "1.0.2"
resolved "https://registry.yarnpkg.com/urkel/-/urkel-1.0.2.tgz#c3e97f1266dcce30428ad817171497efe30b0793"