Compare commits

..

No commits in common. "7f1dde272a43ad8f6a5409282b41b722e0997544" and "d3d0f387b66148dcc202a3ef0cb2cd1b7fa5d571" have entirely different histories.

7 changed files with 179 additions and 15 deletions

View File

@ -19,20 +19,21 @@ switch (os.platform()) {
case "linux":
default:
configDir = "/etc/lumeweb/relay/conf.d";
configDir = "/etc/lumeweb/relay/config.d";
break;
}
config.inject({
"core.confdir": configDir,
"core.port": 8080,
"core.loglevel": "info",
"core.plugindir": path.resolve(configDir, "..", "plugins"),
configDir,
port: 8080,
logLevel: "info",
pluginDir: path.resolve(configDir, "..", "plugins"),
cache: true,
});
config.load();
configDir = config.str("core.confdir");
configDir = config.str("configdir");
if (fs.existsSync(configDir)) {
try {
@ -44,6 +45,6 @@ if (fs.existsSync(configDir)) {
config.load();
log.level = config.get("core.loglevel");
log.level = config.get("loglevel");
export default config;

View File

@ -8,7 +8,7 @@ import b4a from "b4a";
const BIP44_PATH = "m/44'/1627'/0'/0'/0'";
export function getSeed() {
const seed = config.str("core.seed");
const seed = config.str("seed");
let valid = bip39.validateMnemonic(seed, wordlist);
if (!valid) {

View File

@ -130,7 +130,7 @@ export class PluginAPIManager {
const paths = [];
for (const modulePath of [`${moduleName}.js`, `${moduleName}.mjs`]) {
const fullPath = path.join(config.get("core.plugindir"), modulePath);
const fullPath = path.join(config.get("plugindir"), modulePath);
if (fs.existsSync(fullPath)) {
paths.push(fullPath);
break;
@ -239,7 +239,7 @@ export async function loadPlugins() {
await apiManager.loadPluginInstance(plugin);
}
for (const plugin of [...new Set(config.array("core.plugins", []))] as []) {
for (const plugin of [...new Set(config.array("plugins", []))] as []) {
await apiManager.loadPlugin(plugin);
}

View File

@ -38,5 +38,5 @@ export async function start() {
relay(dht, new Stream(false, connection.socket));
});
await relayServer.listen({ port: config.uint("core.port"), host: "0.0.0.0" });
await relayServer.listen({ port: config.uint("port"), host: "0.0.0.0" });
}

119
src/modules/rpc/cache.ts Normal file
View File

@ -0,0 +1,119 @@
import EventEmitter from "events";
import DHTCache from "@lumeweb/dht-cache";
import { RPCCacheItem, RPCRequest, RPCResponse } from "@lumeweb/relay-types";
import { get as getSwarm } from "../swarm";
import { RPCServer } from "./server";
// @ts-ignore
import jsonStringify from "json-stringify-deterministic";
// @ts-ignore
import crypto from "hypercore-crypto";
import NodeCache from "node-cache";
import log from "../../log.js";
export class RPCCache extends EventEmitter {
private server: RPCServer;
constructor(server: RPCServer) {
super();
this.server = server;
this._swarm = getSwarm();
this._dhtCache = new DHTCache(this._swarm, {
protocol: "lumeweb.rpccache",
logger: log.child({ module: "dht-cache" }),
});
this._data.on("del", (key: string) => {
try {
this.deleteItem(key);
} catch {}
});
}
private _dhtCache?: DHTCache;
get dhtCache(): DHTCache {
return this._dhtCache as DHTCache;
}
private _swarm?: any;
get swarm(): any {
return this._swarm;
}
private _data: NodeCache = new NodeCache({ stdTTL: 60 * 60 * 24 });
get data(): NodeCache {
return this._data;
}
public signResponse(item: RPCCacheItem): string {
const field = item.value.signedField || "data";
const updated = item.value.updated;
// @ts-ignore
let json = item.value[field];
if (typeof json !== "string") {
json = jsonStringify(json);
}
return this.server.signData(`${updated}${json}`);
}
public verifyResponse(pubkey: Buffer, item: RPCCacheItem): boolean | Buffer {
const field = item.value.signedField || "data";
const updated = item.value.updated;
// @ts-ignore
let json = item.value[field];
if (typeof json !== "string") {
json = jsonStringify(json);
}
try {
if (
!crypto.verify(
Buffer.from(`${updated}${json}`),
Buffer.from(item?.signature as string, "hex"),
pubkey
)
) {
return false;
}
} catch {
return false;
}
return true;
}
public addItem(query: RPCRequest, response: RPCResponse) {
const queryHash = RPCServer.hashQuery(query);
const clonedResponse = { ...response };
clonedResponse.updated = Date.now();
const item = {
value: clonedResponse,
signature: "",
};
item.signature = this.signResponse(item);
this._dhtCache?.addItem(queryHash);
this._data.set(queryHash, item);
}
public deleteItem(queryHash: string): boolean {
const cache = this._dhtCache?.cache;
if (!cache?.includes(queryHash)) {
throw Error("item does not exist");
}
this._dhtCache?.removeItem(queryHash);
this._data.del(queryHash);
return true;
}
}

View File

@ -8,15 +8,17 @@ import EventEmitter from "events";
// @ts-ignore
import ProtomuxRPC from "protomux-rpc";
import b4a from "b4a";
import { get as getSwarm, SecretStream } from "../swarm";
import { SecretStream } from "../swarm";
// @ts-ignore
import c from "compact-encoding";
// @ts-ignore
import crypto from "hypercore-crypto";
// @ts-ignore
import { Mutex } from "async-mutex";
import { RPCCache } from "./cache";
// @ts-ignore
import jsonStringify from "json-stringify-deterministic";
import config from "../../config";
const sodium = require("sodium-universal");
let server: RPCServer;
@ -53,6 +55,19 @@ export class RPCServer extends EventEmitter {
>();
private pendingRequests: Map<string, Mutex> = new Map<string, Mutex>();
private _cache?: RPCCache;
constructor() {
super();
if (config.bool("cache")) {
this._cache = new RPCCache(this);
}
}
get cache(): RPCCache | undefined {
return this._cache;
}
public static hashQuery(query: RPCRequest): string {
const clonedQuery: RPCRequest = {
module: query.module,
@ -130,7 +145,7 @@ export class RPCServer extends EventEmitter {
}
return crypto
.sign(Buffer.from(raw), getSwarm().keyPair.secretKey)
.sign(Buffer.from(raw), this._cache?.swarm.keyPair.secretKey)
.toString("hex");
}
@ -141,6 +156,13 @@ export class RPCServer extends EventEmitter {
return lockedRequest;
}
let cachedRequest = this.getCachedRequest(request) as RPCCacheItem;
if (cachedRequest) {
this.getRequestLock(request)?.release();
return { ...cachedRequest.value, signature: cachedRequest.signature };
}
let method = this.getMethodByRequest(request);
let ret;
@ -186,11 +208,30 @@ export class RPCServer extends EventEmitter {
};
}
method = method as RPCMethod;
if (config.bool("cache") && method.cacheable) {
this.cache?.addItem(request, rpcResult);
}
this.getRequestLock(request)?.release();
return rpcResult;
}
private getCachedRequest(request: RPCRequest): RPCCacheItem | boolean {
if (!config.bool("cache")) {
return false;
}
const req = RPCServer.hashQuery(request);
if (this._cache?.data.has(req)) {
return this._cache?.data.get<RPCCacheItem>(req) as RPCCacheItem;
}
return false;
}
private getMethodByRequest(request: RPCRequest): Error | RPCMethod {
return this.getMethod(request.module, request.method);
}
@ -228,6 +269,9 @@ export class RPCServer extends EventEmitter {
if (lock.isLocked()) {
await lock.waitForUnlock();
if (this._cache?.data.has(reqId)) {
return this._cache?.data.get<RPCCacheItem>(reqId) as RPCCacheItem;
}
}
await lock.acquire();

View File

@ -51,7 +51,7 @@ export class SSLManager {
}
get enabled() {
return config.bool("core.ssl") && this._renewHandler;
return config.bool("ssl") && this._renewHandler;
}
}
@ -59,7 +59,7 @@ let sslManager: SSLManager;
export function get(): SSLManager {
if (!sslManager) {
sslManager = new SSLManager(config.get("core.domain"));
sslManager = new SSLManager(config.get("domain"));
}
return sslManager;