*Add setupStream method from relay code to ensure only one RPC instance exists per socket

This commit is contained in:
Derrick Hammer 2023-03-19 10:35:36 -04:00
parent f875cd116f
commit 0dc8eebc7e
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
2 changed files with 22 additions and 2 deletions

View File

@ -2,7 +2,12 @@ import RpcNetwork from "../network.js";
import { ClientRPCRequest, RPCResponse } from "@lumeweb/relay-types"; import { ClientRPCRequest, RPCResponse } from "@lumeweb/relay-types";
import { RpcQueryOptions } from "../types.js"; import { RpcQueryOptions } from "../types.js";
import b4a from "b4a"; import b4a from "b4a";
import { hashQuery, isPromise, validateTimestampedResponse } from "../util.js"; import {
hashQuery,
isPromise,
setupStream,
validateTimestampedResponse,
} from "../util.js";
import RPC from "@lumeweb/rpc"; import RPC from "@lumeweb/rpc";
import { ERR_INVALID_SIGNATURE } from "../error.js"; import { ERR_INVALID_SIGNATURE } from "../error.js";
import RpcQueryBase from "./base.js"; import RpcQueryBase from "./base.js";
@ -69,7 +74,7 @@ export default class SimpleRpcQuery extends RpcQueryBase {
await socket.opened; await socket.opened;
const rpc = new RPC(socket); const rpc = setupStream(socket);
if (this._query.bypassCache) { if (this._query.bypassCache) {
delete this._query.bypassCache; delete this._query.bypassCache;

View File

@ -6,6 +6,10 @@ import crypto from "hypercore-crypto";
// @ts-ignore // @ts-ignore
import sodium from "sodium-universal"; import sodium from "sodium-universal";
import b4a from "b4a"; import b4a from "b4a";
import RPC from "@lumeweb/rpc";
const RPC_PROTOCOL_ID = b4a.from("lumeweb");
export const RPC_PROTOCOL_SYMBOL = Symbol.for(RPC_PROTOCOL_ID.toString());
export function isPromise(obj: Promise<any>) { export function isPromise(obj: Promise<any>) {
return ( return (
@ -112,3 +116,14 @@ export function createHash(data: string): Buffer {
return hash; return hash;
} }
export function setupStream(stream: any) {
const existing = stream[RPC_PROTOCOL_SYMBOL];
if (existing) {
return existing;
}
stream[RPC_PROTOCOL_SYMBOL] = new RPC(stream);
return stream[RPC_PROTOCOL_SYMBOL];
}