*Extract RPC setup to a utility function and ensure new streams in getRpcByPeer use it

This commit is contained in:
Derrick Hammer 2022-12-18 06:58:45 -05:00
parent d14320b9d0
commit 9d6a198bca
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
2 changed files with 20 additions and 14 deletions

View File

@ -5,7 +5,11 @@ import config from "../config.js";
import { errorExit } from "../lib/error.js";
// @ts-ignore
import stringify from "json-stable-stringify";
import { getRpcServer, RPC_PROTOCOL_SYMBOL } from "./rpc/server.js";
import {
getRpcServer,
RPC_PROTOCOL_SYMBOL,
setupStream,
} from "./rpc/server.js";
import { get as getSwarm, SecretStream } from "./swarm.js";
import b4a from "b4a";
@ -33,7 +37,7 @@ export async function getRpcByPeer(peer: Buffer | string) {
}
swarm.removeListener("connection", listener);
resolve(peer[RPC_PROTOCOL_SYMBOL]);
resolve(setupStream(peer));
});
swarm.joinPeer(peer);

View File

@ -1,5 +1,4 @@
import {
RPCCacheData,
RPCCacheItem,
RPCMethod,
RPCRequest,
@ -9,7 +8,7 @@ import EventEmitter from "events";
// @ts-ignore
import ProtomuxRPC from "protomux-rpc";
import b4a from "b4a";
import { get as getSwarm, SecretStream } from "../swarm";
import { SecretStream } from "../swarm";
// @ts-ignore
import c from "compact-encoding";
// @ts-ignore
@ -34,6 +33,18 @@ export function getRpcServer(): RPCServer {
return server as RPCServer;
}
export function setupStream(stream: SecretStream) {
const existing = stream[RPC_PROTOCOL_SYMBOL];
if (existing) {
return existing;
}
stream[RPC_PROTOCOL_SYMBOL] = new ProtomuxRPC(stream, {
id: RPC_PROTOCOL_ID,
valueEncoding: c.json,
});
}
export class RPCServer extends EventEmitter {
private _modules: Map<string, Map<string, RPCMethod>> = new Map<
string,
@ -102,16 +113,7 @@ export class RPCServer extends EventEmitter {
}
public setup(stream: SecretStream) {
const existing = stream[RPC_PROTOCOL_SYMBOL];
if (existing) return existing;
const options = {
id: RPC_PROTOCOL_ID,
valueEncoding: c.json,
};
const rpc = new ProtomuxRPC(stream, options);
stream[RPC_PROTOCOL_SYMBOL] = rpc;
const rpc = setupStream(stream);
for (const module of this._modules.keys()) {
for (const method of (