*Refactor RPCConnection::processRequest to handle and process streams

This commit is contained in:
Derrick Hammer 2022-08-04 23:43:02 -04:00
parent 26e12d19a4
commit 44e1366856
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
1 changed files with 31 additions and 2 deletions

View File

@ -26,6 +26,8 @@ import { ERR_NOT_READY, errorExit } from "./error.js";
import log from "loglevel"; import log from "loglevel";
// @ts-ignore // @ts-ignore
import stringify from "json-stable-stringify"; import stringify from "json-stable-stringify";
import { getStream } from "./streams.js";
import type { StreamFileResponse } from "./streams.js";
const pendingRequests = new NodeCache(); const pendingRequests = new NodeCache();
const processedRequests = new NodeCache({ const processedRequests = new NodeCache({
@ -281,13 +283,40 @@ class RPCConnection {
} }
const that = this as any; const that = this as any;
let response;
try { try {
that.write(pack(await maybeProcessRequest(request))); response = await maybeProcessRequest(request);
} catch (error) { } catch (error) {
log.trace(error); log.trace(error);
that.write(pack({ error })); that.write(pack({ error }));
that.end();
return;
} }
if (response.data?.streamId) {
const stream = getStream(
response.data?.streamId
) as AsyncIterable<Uint8Array>;
const emptyData = Uint8Array.from([]);
const streamResp = {
result: {
data: {
data: emptyData,
done: false,
} as StreamFileResponse,
},
};
for await (const chunk of stream) {
streamResp.result.data.data = chunk.slice() as unknown as Uint8Array;
that.write(pack(streamResp));
}
streamResp.result.data.data = emptyData;
streamResp.result.data.done = true;
response = streamResp;
}
that.write(pack(response));
that.end(); that.end();
} }