diff --git a/src/query/simple.ts b/src/query/simple.ts index a8e0b38..a54a625 100644 --- a/src/query/simple.ts +++ b/src/query/simple.ts @@ -2,7 +2,12 @@ import RpcNetwork from "../network.js"; import { ClientRPCRequest, RPCResponse } from "@lumeweb/relay-types"; import { RpcQueryOptions } from "../types.js"; import b4a from "b4a"; -import { hashQuery, isPromise, validateTimestampedResponse } from "../util.js"; +import { + hashQuery, + isPromise, + setupStream, + validateTimestampedResponse, +} from "../util.js"; import RPC from "@lumeweb/rpc"; import { ERR_INVALID_SIGNATURE } from "../error.js"; import RpcQueryBase from "./base.js"; @@ -69,7 +74,7 @@ export default class SimpleRpcQuery extends RpcQueryBase { await socket.opened; - const rpc = new RPC(socket); + const rpc = setupStream(socket); if (this._query.bypassCache) { delete this._query.bypassCache; diff --git a/src/util.ts b/src/util.ts index beafd33..2c4acf6 100644 --- a/src/util.ts +++ b/src/util.ts @@ -6,6 +6,10 @@ import crypto from "hypercore-crypto"; // @ts-ignore import sodium from "sodium-universal"; import b4a from "b4a"; +import RPC from "@lumeweb/rpc"; + +const RPC_PROTOCOL_ID = b4a.from("lumeweb"); +export const RPC_PROTOCOL_SYMBOL = Symbol.for(RPC_PROTOCOL_ID.toString()); export function isPromise(obj: Promise) { return ( @@ -112,3 +116,14 @@ export function createHash(data: string): Buffer { return hash; } + +export function setupStream(stream: any) { + const existing = stream[RPC_PROTOCOL_SYMBOL]; + if (existing) { + return existing; + } + + stream[RPC_PROTOCOL_SYMBOL] = new RPC(stream); + + return stream[RPC_PROTOCOL_SYMBOL]; +}