*Add support for canceling a streaming query and stop the data stream

This commit is contained in:
Derrick Hammer 2022-08-31 19:58:44 -04:00
parent 925bbba9e5
commit dafe044e00
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
1 changed files with 21 additions and 3 deletions

View File

@ -9,6 +9,7 @@ import { StreamingRpcQueryOptions } from "../types.js";
export default class StreamingRpcQuery extends SimpleRpcQuery { export default class StreamingRpcQuery extends SimpleRpcQuery {
protected _options: StreamingRpcQueryOptions; protected _options: StreamingRpcQueryOptions;
protected _canceled = false;
constructor( constructor(
network: RpcNetwork, network: RpcNetwork,
relay: string | Buffer, relay: string | Buffer,
@ -18,6 +19,11 @@ export default class StreamingRpcQuery extends SimpleRpcQuery {
super(network, relay, query, options); super(network, relay, query, options);
this._options = options; this._options = options;
} }
public cancel() {
this._canceled = true;
}
protected async queryRelay(relay: string | Buffer): Promise<any> { protected async queryRelay(relay: string | Buffer): Promise<any> {
let socket: any; let socket: any;
@ -41,11 +47,25 @@ export default class StreamingRpcQuery extends SimpleRpcQuery {
} }
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
let timer: any; let timer: any;
const finish = () => {
relay = relay as string;
this._responses[relay] = {};
resolve(null);
socket.end();
};
socket.on("data", (res: Buffer) => { socket.on("data", (res: Buffer) => {
relay = relay as string; relay = relay as string;
if (timer && timer.close) { if (timer && timer.close) {
clearTimeout(timer as any); clearTimeout(timer as any);
} }
if (this._canceled) {
finish();
return;
}
const response = unpack(res as any) as RPCResponse; const response = unpack(res as any) as RPCResponse;
if (response && response.error) { if (response && response.error) {
this._errors[relay] = response.error; this._errors[relay] = response.error;
@ -53,9 +73,7 @@ export default class StreamingRpcQuery extends SimpleRpcQuery {
} }
if (response?.data.done) { if (response?.data.done) {
this._responses[relay] = {}; finish();
resolve(null);
socket.end();
return; return;
} }