Compare commits

...

7 Commits

Author SHA1 Message Date
Derrick Hammer 86ce21a4b4
*If we have a cached request, release the lock
ci/woodpecker/push/woodpecker Pipeline failed Details
2022-11-26 18:23:26 -05:00
Derrick Hammer 9ce66b15a3
*Add query hash to dht cache 2022-11-26 17:53:39 -05:00
Derrick Hammer 83b62bfdcb
*Refactor mutex lock logic 2022-11-26 17:53:16 -05:00
Derrick Hammer 5c02356595
*start swarm in boot first, before even plugins since they kickstart the rpc singleton 2022-11-26 17:13:37 -05:00
Derrick Hammer ec33e40c74
*Export swarm start
*Make swarm get non async to prevent race conditions
2022-11-26 17:13:02 -05:00
Derrick Hammer ebd09f9a52
*Bug fix signData 2022-11-26 17:11:48 -05:00
Derrick Hammer 0d5aa24b74
*privateKey needs to be secretKey 2022-11-26 14:36:44 -05:00
7 changed files with 41 additions and 28 deletions

View File

@ -6,6 +6,7 @@ import config from "./config.js";
import { loadPlugins } from "./modules/plugin.js";
import { start as startDns } from "./modules/dns.js";
import { start as startSSl } from "./modules/ssl.js";
import { start as startSwarm } from "./modules/swarm.js";
import { generateSeedPhraseDeterministic } from "libskynet";
import * as crypto from "crypto";
@ -20,6 +21,7 @@ if (!config.str("seed")) {
}
async function boot() {
await startSwarm();
await loadPlugins();
await startApp();
await startRpc();

View File

@ -36,7 +36,7 @@ async function ipUpdate() {
}
export async function start() {
const swarm = (await getSwarm()) as any;
const swarm = getSwarm();
await ipUpdate();

View File

@ -20,7 +20,7 @@ import { getSslContext } from "./ssl.js";
export async function start() {
const relayPort = config.uint("port");
const dht = await getSwarm();
const dht = getSwarm();
const statusCodeServer = http.createServer(function (req, res) {
// @ts-ignore

View File

@ -13,13 +13,13 @@ export async function start() {
errorExit("Please set pocket-app-id and pocket-app-key config options.");
}
(await getSwarm()).on("connection", (stream: SecretStream) =>
getSwarm().on("connection", (stream: SecretStream) =>
getRpcServer().setup(stream)
);
}
export async function getRpcByPeer(peer: string) {
const swarm = await getSwarm();
const swarm = getSwarm();
if (swarm._allConnections.has(peer)) {
return swarm._allConnections.get(peer)[RPC_PROTOCOL_SYMBOL];

View File

@ -34,7 +34,10 @@ export class RPCCache extends EventEmitter {
constructor(server: RPCServer) {
super();
this.server = server;
this.init();
this._swarm = getSwarm();
this.dhtCache = new DHTCache(this._swarm, {
protocol: "lumeweb.rpccache",
});
}
public async getNodeQuery(
@ -110,6 +113,7 @@ export class RPCCache extends EventEmitter {
item.signature = this.signResponse(item);
this.dhtCache?.addItem(queryHash);
this._data[queryHash] = item;
}
@ -125,11 +129,4 @@ export class RPCCache extends EventEmitter {
return true;
}
private async init() {
this.dhtCache = new DHTCache(await getSwarm(), {
protocol: "lumeweb.rpccache",
});
this._swarm = await getSwarm();
}
}

View File

@ -133,7 +133,7 @@ export class RPCServer extends EventEmitter {
}
return crypto
.sign(Buffer.from(raw, this._cache.swarm.keyPair.privateKey))
.sign(Buffer.from(raw), this._cache.swarm.keyPair.secretKey)
.toString("hex");
}
@ -147,6 +147,7 @@ export class RPCServer extends EventEmitter {
let cachedRequest = this.getCachedRequest(request) as RPCCacheItem;
if (cachedRequest) {
this.getRequestLock(request)?.release();
return cachedRequest.value;
}
@ -185,13 +186,15 @@ export class RPCServer extends EventEmitter {
this.cache.addItem(request, rpcResult);
}
this.getRequestLock(request)?.release();
return rpcResult;
}
private getCachedRequest(request: RPCRequest): RPCCacheItem | boolean {
const req = RPCServer.hashQuery(request);
if (RPCServer.hashQuery(request) in this._cache.data) {
this._cache.data[req];
return this._cache.data[req] as RPCCacheItem;
}
return false;
@ -225,16 +228,13 @@ export class RPCServer extends EventEmitter {
return;
}
const reqId = RPCServer.hashQuery(request);
let lock: Mutex = this.pendingRequests.get(reqId) as Mutex;
const lockExists = !!lock;
if (!lockExists) {
lock = new Mutex();
this.pendingRequests.set(reqId, lock);
if (!this.getRequestLock(request)) {
this.createRequestLock(request);
}
const reqId = RPCServer.hashQuery(request);
const lock: Mutex = this.getRequestLock(request) as Mutex;
if (lock.isLocked()) {
await lock.waitForUnlock();
if (reqId in this._cache.data) {
@ -244,4 +244,22 @@ export class RPCServer extends EventEmitter {
await lock.acquire();
}
private getRequestLock(request: RPCRequest): Mutex | null {
const reqId = RPCServer.hashQuery(request);
let lock: Mutex = this.pendingRequests.get(reqId) as Mutex;
if (!lock) {
return null;
}
return lock;
}
private createRequestLock(request: RPCRequest) {
const reqId = RPCServer.hashQuery(request);
this.pendingRequests.set(reqId, new Mutex());
}
}

View File

@ -34,7 +34,7 @@ export function getKeyPair() {
return deriveMyskyRootKeypair(seedPhraseToSeed(seed)[0]);
}
async function start() {
export async function start() {
const keyPair = getKeyPair();
node = new Hyperswarm({ keyPair, dht: new DHT({ keyPair }) });
@ -49,10 +49,6 @@ async function start() {
return node;
}
export async function get(): Promise<Hyperswarm> {
if (!node) {
await start();
}
export function get(): Hyperswarm {
return node;
}