From 44e1366856aa22188f687e4b9679bd87dd838f13 Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Thu, 4 Aug 2022 23:43:02 -0400 Subject: [PATCH] *Refactor RPCConnection::processRequest to handle and process streams --- src/rpc.ts | 33 +++++++++++++++++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/src/rpc.ts b/src/rpc.ts index 112638d..3eff9bb 100644 --- a/src/rpc.ts +++ b/src/rpc.ts @@ -26,6 +26,8 @@ import { ERR_NOT_READY, errorExit } from "./error.js"; import log from "loglevel"; // @ts-ignore import stringify from "json-stable-stringify"; +import { getStream } from "./streams.js"; +import type { StreamFileResponse } from "./streams.js"; const pendingRequests = new NodeCache(); const processedRequests = new NodeCache({ @@ -281,13 +283,40 @@ class RPCConnection { } const that = this as any; - + let response; try { - that.write(pack(await maybeProcessRequest(request))); + response = await maybeProcessRequest(request); } catch (error) { log.trace(error); that.write(pack({ error })); + that.end(); + return; } + + if (response.data?.streamId) { + const stream = getStream( + response.data?.streamId + ) as AsyncIterable; + const emptyData = Uint8Array.from([]); + const streamResp = { + result: { + data: { + data: emptyData, + done: false, + } as StreamFileResponse, + }, + }; + for await (const chunk of stream) { + streamResp.result.data.data = chunk.slice() as unknown as Uint8Array; + that.write(pack(streamResp)); + } + + streamResp.result.data.data = emptyData; + streamResp.result.data.done = true; + response = streamResp; + } + + that.write(pack(response)); that.end(); }