diff --git a/dist/query/streaming.d.ts b/dist/query/streaming.d.ts index 322ed7d..0aa4e8a 100644 --- a/dist/query/streaming.d.ts +++ b/dist/query/streaming.d.ts @@ -6,12 +6,14 @@ import RpcNetwork from "../network.js"; import { StreamingRpcQueryOptions } from "../types.js"; export default class StreamingRpcQuery extends SimpleRpcQuery { protected _options: StreamingRpcQueryOptions; + protected _canceled: boolean; constructor( network: RpcNetwork, relay: string | Buffer, query: RPCRequest, options: StreamingRpcQueryOptions ); + cancel(): void; protected queryRelay(relay: string | Buffer): Promise; } //# sourceMappingURL=streaming.d.ts.map diff --git a/dist/query/streaming.d.ts.map b/dist/query/streaming.d.ts.map index b0d1846..f295ffa 100644 --- a/dist/query/streaming.d.ts.map +++ b/dist/query/streaming.d.ts.map @@ -1 +1 @@ -{"version":3,"file":"streaming.d.ts","sourceRoot":"","sources":["../../src/query/streaming.ts"],"names":[],"mappings":";AAAA,OAAO,cAAc,MAAM,aAAa,CAAC;AACzC,OAAO,EAAE,MAAM,EAAE,MAAM,QAAQ,CAAC;AAIhC,OAAO,KAAK,EAAE,UAAU,EAAe,MAAM,sBAAsB,CAAC;AACpE,OAAO,UAAU,MAAM,eAAe,CAAC;AACvC,OAAO,EAAE,wBAAwB,EAAE,MAAM,aAAa,CAAC;AAEvD,MAAM,CAAC,OAAO,OAAO,iBAAkB,SAAQ,cAAc;IAC3D,SAAS,CAAC,QAAQ,EAAE,wBAAwB,CAAC;gBAE3C,OAAO,EAAE,UAAU,EACnB,KAAK,EAAE,MAAM,GAAG,MAAM,EACtB,KAAK,EAAE,UAAU,EACjB,OAAO,EAAE,wBAAwB;cAKnB,UAAU,CAAC,KAAK,EAAE,MAAM,GAAG,MAAM,GAAG,OAAO,CAAC,GAAG,CAAC;CAwDjE"} \ No newline at end of file +{"version":3,"file":"streaming.d.ts","sourceRoot":"","sources":["../../src/query/streaming.ts"],"names":[],"mappings":";AAAA,OAAO,cAAc,MAAM,aAAa,CAAC;AACzC,OAAO,EAAE,MAAM,EAAE,MAAM,QAAQ,CAAC;AAIhC,OAAO,KAAK,EAAE,UAAU,EAAe,MAAM,sBAAsB,CAAC;AACpE,OAAO,UAAU,MAAM,eAAe,CAAC;AACvC,OAAO,EAAE,wBAAwB,EAAE,MAAM,aAAa,CAAC;AAEvD,MAAM,CAAC,OAAO,OAAO,iBAAkB,SAAQ,cAAc;IAC3D,SAAS,CAAC,QAAQ,EAAE,wBAAwB,CAAC;IAC7C,SAAS,CAAC,SAAS,UAAS;gBAE1B,OAAO,EAAE,UAAU,EACnB,KAAK,EAAE,MAAM,GAAG,MAAM,EACtB,KAAK,EAAE,UAAU,EACjB,OAAO,EAAE,wBAAwB;IAM5B,MAAM;cAIG,UAAU,CAAC,KAAK,EAAE,MAAM,GAAG,MAAM,GAAG,OAAO,CAAC,GAAG,CAAC;CAoEjE"} \ No newline at end of file diff --git a/dist/query/streaming.js b/dist/query/streaming.js index 4812b9f..e4119c2 100644 --- a/dist/query/streaming.js +++ b/dist/query/streaming.js @@ -5,10 +5,14 @@ import { clearTimeout, setTimeout } from "timers"; import { pack, unpack } from "msgpackr"; export default class StreamingRpcQuery extends SimpleRpcQuery { _options; + _canceled = false; constructor(network, relay, query, options) { super(network, relay, query, options); this._options = options; } + cancel() { + this._canceled = true; + } async queryRelay(relay) { let socket; let relayKey = relay; @@ -30,20 +34,28 @@ export default class StreamingRpcQuery extends SimpleRpcQuery { } return new Promise((resolve, reject) => { let timer; + const finish = () => { + relay = relay; + this._responses[relay] = {}; + resolve(null); + socket.end(); + }; socket.on("data", (res) => { relay = relay; if (timer && timer.close) { clearTimeout(timer); } + if (this._canceled) { + finish(); + return; + } const response = unpack(res); if (response && response.error) { this._errors[relay] = response.error; return reject(null); } if (response?.data.done) { - this._responses[relay] = {}; - resolve(null); - socket.end(); + finish(); return; } this._options.streamHandler(response?.data.data);