From 824881ed88477155ece20a4dcfae73be7d36002a Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Sun, 9 Apr 2023 17:16:12 -0400 Subject: [PATCH] * Add support for chunked streaming in `index.ts` by setting up a next chunk promise and handling "next" and "abort" messages in the receive update callback. --- src/index.ts | 31 ++++++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/src/index.ts b/src/index.ts index 327abaa..9964f5c 100644 --- a/src/index.ts +++ b/src/index.ts @@ -265,11 +265,19 @@ async function handleLs(aq: ActiveQuery) { } let aborted = false; + let nextChunk = defer(); - aq.setReceiveUpdate?.(() => { - aborted = true; + aq.setReceiveUpdate?.((data: any) => { + switch (data) { + case "abort": + aborted = true; + break; + case "next": + nextChunk.resolve(); + nextChunk = defer(); + break; + } }); - const iterable = fs.ls( getCID(aq.callerInput.cid), aq.callerInput.options ?? {} @@ -280,6 +288,8 @@ async function handleLs(aq: ActiveQuery) { break; } aq.sendUpdate(JSON.parse(JSON.stringify(item))); + + await nextChunk.promise; } aq.respond(); @@ -294,9 +304,18 @@ async function handleCat(aq: ActiveQuery) { } let aborted = false; + let nextChunk = defer(); - aq.setReceiveUpdate?.(() => { - aborted = true; + aq.setReceiveUpdate?.((data: any) => { + switch (data) { + case "abort": + aborted = true; + break; + case "next": + nextChunk.resolve(); + nextChunk = defer(); + break; + } }); const iterable = fs.cat( @@ -310,6 +329,8 @@ async function handleCat(aq: ActiveQuery) { } aq.sendUpdate(chunk); + + await nextChunk.promise; } aq.respond();