diff --git a/src/query/streaming.ts b/src/query/streaming.ts index 1375113..4035fd9 100644 --- a/src/query/streaming.ts +++ b/src/query/streaming.ts @@ -9,6 +9,7 @@ import { StreamingRpcQueryOptions } from "../types.js"; export default class StreamingRpcQuery extends SimpleRpcQuery { protected _options: StreamingRpcQueryOptions; + protected _canceled = false; constructor( network: RpcNetwork, relay: string | Buffer, @@ -18,6 +19,11 @@ export default class StreamingRpcQuery extends SimpleRpcQuery { super(network, relay, query, options); this._options = options; } + + public cancel() { + this._canceled = true; + } + protected async queryRelay(relay: string | Buffer): Promise { let socket: any; @@ -41,11 +47,25 @@ export default class StreamingRpcQuery extends SimpleRpcQuery { } return new Promise((resolve, reject) => { let timer: any; + + const finish = () => { + relay = relay as string; + this._responses[relay] = {}; + resolve(null); + socket.end(); + }; + socket.on("data", (res: Buffer) => { relay = relay as string; if (timer && timer.close) { clearTimeout(timer as any); } + + if (this._canceled) { + finish(); + return; + } + const response = unpack(res as any) as RPCResponse; if (response && response.error) { this._errors[relay] = response.error; @@ -53,9 +73,7 @@ export default class StreamingRpcQuery extends SimpleRpcQuery { } if (response?.data.done) { - this._responses[relay] = {}; - resolve(null); - socket.end(); + finish(); return; }