*Wrap sendUpdate in a function to ensure the request has not been canceled since the low level streamx appears to buffer
This commit is contained in:
parent
320b5c0a35
commit
f4e1dad0f7
|
@ -166,12 +166,18 @@ async function handleStreamingQuery(aq: ActiveQuery) {
|
||||||
|
|
||||||
let resp: RPCResponse | null = null;
|
let resp: RPCResponse | null = null;
|
||||||
|
|
||||||
|
let canceled = false;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const rpcQuery = network.streamingQuery(
|
const rpcQuery = network.streamingQuery(
|
||||||
relay as Buffer | string,
|
relay as Buffer | string,
|
||||||
query.method,
|
query.method,
|
||||||
query.module,
|
query.module,
|
||||||
aq.sendUpdate,
|
(data) => {
|
||||||
|
if (!canceled) {
|
||||||
|
aq.sendUpdate?.(data);
|
||||||
|
}
|
||||||
|
},
|
||||||
query.data,
|
query.data,
|
||||||
{ ...options }
|
{ ...options }
|
||||||
);
|
);
|
||||||
|
@ -179,6 +185,7 @@ async function handleStreamingQuery(aq: ActiveQuery) {
|
||||||
aq.setReceiveUpdate?.((message: any) => {
|
aq.setReceiveUpdate?.((message: any) => {
|
||||||
if (message && message.cancel) {
|
if (message && message.cancel) {
|
||||||
rpcQuery.cancel();
|
rpcQuery.cancel();
|
||||||
|
canceled = true;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue