From dafe044e00a25fbefaf5fe48cc83362e88f47b74 Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Wed, 31 Aug 2022 19:58:44 -0400 Subject: [PATCH] *Add support for canceling a streaming query and stop the data stream --- src/query/streaming.ts | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/src/query/streaming.ts b/src/query/streaming.ts index 1375113..4035fd9 100644 --- a/src/query/streaming.ts +++ b/src/query/streaming.ts @@ -9,6 +9,7 @@ import { StreamingRpcQueryOptions } from "../types.js"; export default class StreamingRpcQuery extends SimpleRpcQuery { protected _options: StreamingRpcQueryOptions; + protected _canceled = false; constructor( network: RpcNetwork, relay: string | Buffer, @@ -18,6 +19,11 @@ export default class StreamingRpcQuery extends SimpleRpcQuery { super(network, relay, query, options); this._options = options; } + + public cancel() { + this._canceled = true; + } + protected async queryRelay(relay: string | Buffer): Promise { let socket: any; @@ -41,11 +47,25 @@ export default class StreamingRpcQuery extends SimpleRpcQuery { } return new Promise((resolve, reject) => { let timer: any; + + const finish = () => { + relay = relay as string; + this._responses[relay] = {}; + resolve(null); + socket.end(); + }; + socket.on("data", (res: Buffer) => { relay = relay as string; if (timer && timer.close) { clearTimeout(timer as any); } + + if (this._canceled) { + finish(); + return; + } + const response = unpack(res as any) as RPCResponse; if (response && response.error) { this._errors[relay] = response.error; @@ -53,9 +73,7 @@ export default class StreamingRpcQuery extends SimpleRpcQuery { } if (response?.data.done) { - this._responses[relay] = {}; - resolve(null); - socket.end(); + finish(); return; }