*If stream is canceled remove listener to prevent repeat cancel messages
This commit is contained in:
parent
f201005112
commit
e151647e8f
|
@ -55,7 +55,7 @@ export default class StreamingRpcQuery extends SimpleRpcQuery {
|
||||||
socket.end();
|
socket.end();
|
||||||
};
|
};
|
||||||
|
|
||||||
socket.on("data", (res: Buffer) => {
|
const listener = (res: Buffer) => {
|
||||||
relay = relay as string;
|
relay = relay as string;
|
||||||
if (timer && timer.close) {
|
if (timer && timer.close) {
|
||||||
clearTimeout(timer as any);
|
clearTimeout(timer as any);
|
||||||
|
@ -63,6 +63,7 @@ export default class StreamingRpcQuery extends SimpleRpcQuery {
|
||||||
|
|
||||||
if (this._canceled) {
|
if (this._canceled) {
|
||||||
socket.write(pack({ cancel: true }));
|
socket.write(pack({ cancel: true }));
|
||||||
|
socket.off("data", listener);
|
||||||
finish();
|
finish();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -79,7 +80,9 @@ export default class StreamingRpcQuery extends SimpleRpcQuery {
|
||||||
}
|
}
|
||||||
|
|
||||||
this._options.streamHandler(response?.data.data);
|
this._options.streamHandler(response?.data.data);
|
||||||
});
|
};
|
||||||
|
|
||||||
|
socket.on("data", listener);
|
||||||
socket.on("error", (error: any) => {
|
socket.on("error", (error: any) => {
|
||||||
relay = relay as string;
|
relay = relay as string;
|
||||||
this._errors[relay] = error;
|
this._errors[relay] = error;
|
||||||
|
|
Loading…
Reference in New Issue