Compare commits

...

7 Commits

Author SHA1 Message Date
Derrick Hammer 86ce21a4b4
*If we have a cached request, release the lock
ci/woodpecker/push/woodpecker Pipeline failed Details
2022-11-26 18:23:26 -05:00
Derrick Hammer 9ce66b15a3
*Add query hash to dht cache 2022-11-26 17:53:39 -05:00
Derrick Hammer 83b62bfdcb
*Refactor mutex lock logic 2022-11-26 17:53:16 -05:00
Derrick Hammer 5c02356595
*start swarm in boot first, before even plugins since they kickstart the rpc singleton 2022-11-26 17:13:37 -05:00
Derrick Hammer ec33e40c74
*Export swarm start
*Make swarm get non async to prevent race conditions
2022-11-26 17:13:02 -05:00
Derrick Hammer ebd09f9a52
*Bug fix signData 2022-11-26 17:11:48 -05:00
Derrick Hammer 0d5aa24b74
*privateKey needs to be secretKey 2022-11-26 14:36:44 -05:00
7 changed files with 41 additions and 28 deletions

View File

@ -6,6 +6,7 @@ import config from "./config.js";
import { loadPlugins } from "./modules/plugin.js"; import { loadPlugins } from "./modules/plugin.js";
import { start as startDns } from "./modules/dns.js"; import { start as startDns } from "./modules/dns.js";
import { start as startSSl } from "./modules/ssl.js"; import { start as startSSl } from "./modules/ssl.js";
import { start as startSwarm } from "./modules/swarm.js";
import { generateSeedPhraseDeterministic } from "libskynet"; import { generateSeedPhraseDeterministic } from "libskynet";
import * as crypto from "crypto"; import * as crypto from "crypto";
@ -20,6 +21,7 @@ if (!config.str("seed")) {
} }
async function boot() { async function boot() {
await startSwarm();
await loadPlugins(); await loadPlugins();
await startApp(); await startApp();
await startRpc(); await startRpc();

View File

@ -36,7 +36,7 @@ async function ipUpdate() {
} }
export async function start() { export async function start() {
const swarm = (await getSwarm()) as any; const swarm = getSwarm();
await ipUpdate(); await ipUpdate();

View File

@ -20,7 +20,7 @@ import { getSslContext } from "./ssl.js";
export async function start() { export async function start() {
const relayPort = config.uint("port"); const relayPort = config.uint("port");
const dht = await getSwarm(); const dht = getSwarm();
const statusCodeServer = http.createServer(function (req, res) { const statusCodeServer = http.createServer(function (req, res) {
// @ts-ignore // @ts-ignore

View File

@ -13,13 +13,13 @@ export async function start() {
errorExit("Please set pocket-app-id and pocket-app-key config options."); errorExit("Please set pocket-app-id and pocket-app-key config options.");
} }
(await getSwarm()).on("connection", (stream: SecretStream) => getSwarm().on("connection", (stream: SecretStream) =>
getRpcServer().setup(stream) getRpcServer().setup(stream)
); );
} }
export async function getRpcByPeer(peer: string) { export async function getRpcByPeer(peer: string) {
const swarm = await getSwarm(); const swarm = getSwarm();
if (swarm._allConnections.has(peer)) { if (swarm._allConnections.has(peer)) {
return swarm._allConnections.get(peer)[RPC_PROTOCOL_SYMBOL]; return swarm._allConnections.get(peer)[RPC_PROTOCOL_SYMBOL];

View File

@ -34,7 +34,10 @@ export class RPCCache extends EventEmitter {
constructor(server: RPCServer) { constructor(server: RPCServer) {
super(); super();
this.server = server; this.server = server;
this.init(); this._swarm = getSwarm();
this.dhtCache = new DHTCache(this._swarm, {
protocol: "lumeweb.rpccache",
});
} }
public async getNodeQuery( public async getNodeQuery(
@ -110,6 +113,7 @@ export class RPCCache extends EventEmitter {
item.signature = this.signResponse(item); item.signature = this.signResponse(item);
this.dhtCache?.addItem(queryHash);
this._data[queryHash] = item; this._data[queryHash] = item;
} }
@ -125,11 +129,4 @@ export class RPCCache extends EventEmitter {
return true; return true;
} }
private async init() {
this.dhtCache = new DHTCache(await getSwarm(), {
protocol: "lumeweb.rpccache",
});
this._swarm = await getSwarm();
}
} }

View File

@ -133,7 +133,7 @@ export class RPCServer extends EventEmitter {
} }
return crypto return crypto
.sign(Buffer.from(raw, this._cache.swarm.keyPair.privateKey)) .sign(Buffer.from(raw), this._cache.swarm.keyPair.secretKey)
.toString("hex"); .toString("hex");
} }
@ -147,6 +147,7 @@ export class RPCServer extends EventEmitter {
let cachedRequest = this.getCachedRequest(request) as RPCCacheItem; let cachedRequest = this.getCachedRequest(request) as RPCCacheItem;
if (cachedRequest) { if (cachedRequest) {
this.getRequestLock(request)?.release();
return cachedRequest.value; return cachedRequest.value;
} }
@ -185,13 +186,15 @@ export class RPCServer extends EventEmitter {
this.cache.addItem(request, rpcResult); this.cache.addItem(request, rpcResult);
} }
this.getRequestLock(request)?.release();
return rpcResult; return rpcResult;
} }
private getCachedRequest(request: RPCRequest): RPCCacheItem | boolean { private getCachedRequest(request: RPCRequest): RPCCacheItem | boolean {
const req = RPCServer.hashQuery(request); const req = RPCServer.hashQuery(request);
if (RPCServer.hashQuery(request) in this._cache.data) { if (RPCServer.hashQuery(request) in this._cache.data) {
this._cache.data[req]; return this._cache.data[req] as RPCCacheItem;
} }
return false; return false;
@ -225,16 +228,13 @@ export class RPCServer extends EventEmitter {
return; return;
} }
const reqId = RPCServer.hashQuery(request); if (!this.getRequestLock(request)) {
this.createRequestLock(request);
let lock: Mutex = this.pendingRequests.get(reqId) as Mutex;
const lockExists = !!lock;
if (!lockExists) {
lock = new Mutex();
this.pendingRequests.set(reqId, lock);
} }
const reqId = RPCServer.hashQuery(request);
const lock: Mutex = this.getRequestLock(request) as Mutex;
if (lock.isLocked()) { if (lock.isLocked()) {
await lock.waitForUnlock(); await lock.waitForUnlock();
if (reqId in this._cache.data) { if (reqId in this._cache.data) {
@ -244,4 +244,22 @@ export class RPCServer extends EventEmitter {
await lock.acquire(); await lock.acquire();
} }
private getRequestLock(request: RPCRequest): Mutex | null {
const reqId = RPCServer.hashQuery(request);
let lock: Mutex = this.pendingRequests.get(reqId) as Mutex;
if (!lock) {
return null;
}
return lock;
}
private createRequestLock(request: RPCRequest) {
const reqId = RPCServer.hashQuery(request);
this.pendingRequests.set(reqId, new Mutex());
}
} }

View File

@ -34,7 +34,7 @@ export function getKeyPair() {
return deriveMyskyRootKeypair(seedPhraseToSeed(seed)[0]); return deriveMyskyRootKeypair(seedPhraseToSeed(seed)[0]);
} }
async function start() { export async function start() {
const keyPair = getKeyPair(); const keyPair = getKeyPair();
node = new Hyperswarm({ keyPair, dht: new DHT({ keyPair }) }); node = new Hyperswarm({ keyPair, dht: new DHT({ keyPair }) });
@ -49,10 +49,6 @@ async function start() {
return node; return node;
} }
export async function get(): Promise<Hyperswarm> { export function get(): Hyperswarm {
if (!node) {
await start();
}
return node; return node;
} }