diff --git a/LICENSE b/LICENSE index 13a5b6e..8995c8b 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2022 Lume Web +Copyright (c) 2022 Hammer Technologies LLC Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/package.json b/package.json new file mode 100644 index 0000000..e1322da --- /dev/null +++ b/package.json @@ -0,0 +1,25 @@ +{ + "name": "@lumeweb/relay", + "type": "module", + "version": "0.1.0", + "description": "", + "main": "build/index.js", + "dependencies": { + "@hyperswarm/dht": "^6.0.1", + "@hyperswarm/dht-relay": "^0.3.0", + "@types/node": "^18.0.0", + "@types/ws": "^8.5.3", + "async-mutex": "^0.3.2", + "jayson": "^3.6.6", + "json-stable-stringify": "^1.0.1", + "libskynet": "^0.0.48", + "libskynetnode": "^0.1.3", + "msgpackr": "^1.6.1", + "node-cache": "^5.1.2", + "random-access-memory": "^4.1.0" + }, + "devDependencies": { + "hyper-typings": "^1.0.0", + "prettier": "^2.7.1" + } +} diff --git a/src/dht.ts b/src/dht.ts new file mode 100644 index 0000000..3e2f49e --- /dev/null +++ b/src/dht.ts @@ -0,0 +1,46 @@ +import { createRequire } from "module"; +const require = createRequire(import.meta.url); +const DHT = require("@hyperswarm/dht"); +import { errorExit } from "./util.js"; +import { + deriveMyskyRootKeypair, + ed25519Keypair, + seedPhraseToSeed, + validSeedPhrase, +} from "libskynet"; + +let server: { + listen: (arg0: ed25519Keypair) => void; + ready: () => any; +}; + +async function start() { + const RELAY_SEED = process.env.RELAY_SEED ?? null; + + if (!RELAY_SEED) { + errorExit("RELAY_SEED missing. Aborting."); + } + + let [, err] = validSeedPhrase(RELAY_SEED as string); + if (err !== null) { + errorExit("RELAY_SEED is invalid. Aborting."); + } + + const keyPair = deriveMyskyRootKeypair( + seedPhraseToSeed(RELAY_SEED as string)[0] + ); + + const node = new DHT({ keyPair }); + + await node.ready(); + + return (server = node); +} + +export async function get() { + if (!server) { + return start(); + } + + return server; +} diff --git a/src/index.ts b/src/index.ts new file mode 100644 index 0000000..a43a6fb --- /dev/null +++ b/src/index.ts @@ -0,0 +1,11 @@ +import { start as startRpc } from "./rpc.js"; +import { start as startRelay } from "./relay.js"; + +startRelay(); +startRpc(); + +process.on("uncaughtException", function (err) { + console.log("Caught exception: " + err); +}); + +export {}; diff --git a/src/relay.ts b/src/relay.ts new file mode 100644 index 0000000..be34d52 --- /dev/null +++ b/src/relay.ts @@ -0,0 +1,57 @@ +import WS from "ws"; + +// @ts-ignore +import DHT from "@hyperswarm/dht"; +// @ts-ignore +import { relay } from "@hyperswarm/dht-relay"; +// @ts-ignore +import Stream from "@hyperswarm/dht-relay/ws"; +import { get as getDHT } from "./dht.js"; +import { overwriteRegistryEntry } from "libskynetnode/dist"; +import { Buffer } from "buffer"; +import { blake2b } from "libskynet/dist"; + +export async function start() { + const RELAY_PORT = process.env.RELAY_PORT ?? (8080 as unknown as string); + + const server = new WS.Server({ + port: RELAY_PORT as unknown as number, + }); + + const dht = await getDHT(); + + await overwriteRegistryEntry( + dht.defaultKeyPair, + hashDataKey("lume-dht-relay"), + stringToUint8ArrayUtf8(`${dht.localAddress()}:${RELAY_PORT}`) + ); + + server.on("connection", (socket) => { + relay(dht, new Stream(false, socket)); + }); +} + +export function hashDataKey(dataKey: string): Uint8Array { + return blake2b(encodeUtf8String(dataKey)); +} + +function encodeUtf8String(str: string): Uint8Array { + const byteArray = stringToUint8ArrayUtf8(str); + const encoded = new Uint8Array(8 + byteArray.length); + encoded.set(encodeNumber(byteArray.length)); + encoded.set(byteArray, 8); + return encoded; +} + +function stringToUint8ArrayUtf8(str: string): Uint8Array { + return Uint8Array.from(Buffer.from(str, "utf-8")); +} + +function encodeNumber(num: number): Uint8Array { + const encoded = new Uint8Array(8); + for (let index = 0; index < encoded.length; index++) { + encoded[index] = num & 0xff; + num = num >> 8; + } + return encoded; +} diff --git a/src/rpc.ts b/src/rpc.ts new file mode 100644 index 0000000..c86dd8f --- /dev/null +++ b/src/rpc.ts @@ -0,0 +1,149 @@ +import crypto from "crypto"; +import jayson from "jayson/promise/index.js"; +import { pack, unpack } from "msgpackr"; +import { Mutex } from "async-mutex"; +import { createRequire } from "module"; +import NodeCache from "node-cache"; +import { get as getDHT } from "./dht.js"; +const require = createRequire(import.meta.url); + +const stringify = require("json-stable-stringify"); + +const clients: { [chain: string]: any } = {}; +const pendingRequests = new NodeCache(); +const processedRequests = new NodeCache({ + stdTTL: 60 * 60 * 12, +}); + +interface RPCRequest { + force: boolean; + chain: string; + query: string; + data: string; +} + +interface RPCResponse { + updated: number; + data: + | any + | { + error: string | boolean; + }; +} + +function hash(data: string): string { + return crypto.createHash("sha256").update(data).digest("hex"); +} + +function getClient(chain: string): Function { + chain = chain.replace(/[^a-z0-9\-]/g, ""); + + if (!(chain in clients)) { + clients[chain] = jayson.Client.http({ + host: process.env.RPC_PROXY_HOST, + port: parseInt(process.env.RPC_PROXY_PORT as string), + path: "/", + headers: { + "X-Chain": chain, + }, + }); + } + + return clients[chain]; +} + +function getRequestId(request: RPCRequest) { + const clonedRequest = Object.assign({}, request); + + // @ts-ignore + delete clonedRequest.force; + + return hash(stringify(clonedRequest)); +} + +function maybeProcessRequest(item: any) { + let request: RPCRequest = unpack(item) as RPCRequest; + + if (!request.chain) { + throw new Error("RPC chain missing"); + } + + if (!request.data) { + throw new Error("RPC data missing"); + } + + return processRequest(request); +} + +async function processRequest(request: RPCRequest): Promise { + const reqId = getRequestId(request); + + let lock: Mutex = pendingRequests.get(reqId) as Mutex; + const lockExists = !!lock; + + if (!lockExists) { + lock = new Mutex(); + pendingRequests.set(reqId, lock); + } + + if (lock.isLocked()) { + await lock.waitForUnlock(); + return processedRequests.get(reqId) as RPCResponse; + } + await lock.acquire(); + + if (!request.force && processedRequests.get(reqId)) { + return processedRequests.get(reqId) as RPCResponse; + } + + let rpcResp; + + let error; + try { + // @ts-ignore + rpcResp = await getClient(request.chain).request( + request.query, + request.data + ); + } catch (e) { + error = (e as Error).message; + } + + let dbData: RPCResponse = { + updated: Date.now(), + data: "", + }; + + if (rpcResp) { + if (false === rpcResp.result) { + error = true; + } + if (rpcResp.error) { + error = rpcResp.error.message; + } + } + + dbData.data = error ? { error } : rpcResp.result; + + if (!processedRequests.get(reqId) || request.force) { + processedRequests.set(reqId, dbData); + } + + await lock.release(); + + return dbData; +} + +export async function start() { + (await getDHT()).on("connection", (socket: any) => { + socket.on("data", async (data: any) => { + try { + socket.write(pack(await maybeProcessRequest(data))); + } catch (error) { + console.trace(error); + socket.write(pack({ error })); + } + socket.end(); + }); + }); +} diff --git a/src/util.ts b/src/util.ts new file mode 100644 index 0000000..740b339 --- /dev/null +++ b/src/util.ts @@ -0,0 +1,4 @@ +export function errorExit(msg: string): void { + console.log(msg); + process.exit(1); +} diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 0000000..570cfad --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,13 @@ +{ + "compilerOptions": { + "target": "esnext", + "module": "esnext", + "moduleResolution": "node", + "rootDir": "src", + "outDir": "build", + "esModuleInterop": true, + "forceConsistentCasingInFileNames": true, + "strict": true, + "skipLibCheck": true + } +} diff --git a/tslint.json b/tslint.json new file mode 100644 index 0000000..fea9cee --- /dev/null +++ b/tslint.json @@ -0,0 +1,6 @@ +{ + "defaultSeverity": "error", + "extends": ["tslint:latest", "tslint-config-prettier"], + "jsRules": {}, + "rulesDirectory": [] +}