*Add support for canceling the stream

This commit is contained in:
Derrick Hammer 2022-09-05 06:30:20 -04:00
parent c674e18229
commit 9ebee0eefd
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
1 changed files with 22 additions and 6 deletions

View File

@ -3,7 +3,11 @@ import type { ActiveQuery } from "libkmodule";
import PQueue from "p-queue"; import PQueue from "p-queue";
import { ipfsPath, ipnsPath } from "is-ipfs"; import { ipfsPath, ipnsPath } from "is-ipfs";
import { DataFn } from "libskynet"; import { DataFn } from "libskynet";
import { RpcNetwork } from "@lumeweb/kernel-rpc-client"; import {
RpcNetwork,
SimpleRpcQuery,
StreamingRpcQuery,
} from "@lumeweb/kernel-rpc-client";
import { RPCResponse } from "@lumeweb/relay-types"; import { RPCResponse } from "@lumeweb/relay-types";
interface StatFileResponse { interface StatFileResponse {
@ -35,9 +39,9 @@ let network: RpcNetwork;
addHandler("presentSeed", handlePresentSeed); addHandler("presentSeed", handlePresentSeed);
addHandler("refreshGatewayList", handleRefreshGatewayList); addHandler("refreshGatewayList", handleRefreshGatewayList);
addHandler("statIpfs", handleStatIpfs); addHandler("statIpfs", handleStatIpfs);
addHandler("fetchIpfs", handleFetchIpfs); addHandler("fetchIpfs", handleFetchIpfs, { receiveUpdates: true });
addHandler("statIpns", handleStatIpns); addHandler("statIpns", handleStatIpns);
addHandler("fetchIpns", handleFetchIpns); addHandler("fetchIpns", handleFetchIpns, { receiveUpdates: true });
let readyPromiseResolve: any; let readyPromiseResolve: any;
let readyPromise = new Promise((resolve) => { let readyPromise = new Promise((resolve) => {
@ -130,7 +134,13 @@ async function handleFetch(
const { hash, path } = valid; const { hash, path } = valid;
try { try {
await fetchFromRelays(hash, path, method, aq.sendUpdate); await fetchFromRelays(
hash,
path,
method,
aq.sendUpdate,
aq.setReceiveUpdate
);
aq.respond(); aq.respond();
} catch (e: any) { } catch (e: any) {
aq.reject(e); aq.reject(e);
@ -141,14 +151,15 @@ async function fetchFromRelays(
hash: string, hash: string,
path: string, path: string,
method: string, method: string,
stream: DataFn | undefined = undefined stream: DataFn | undefined = undefined,
receiveUpdate: ((receiveUpdate: DataFn) => void) | undefined = undefined
) { ) {
let error = new Error("NOT_FOUND"); let error = new Error("NOT_FOUND");
if (0 == activeRelays.length) { if (0 == activeRelays.length) {
await refreshGatewayList(); await refreshGatewayList();
} }
for (const relay of activeRelays) { for (const relay of activeRelays) {
let query; let query: any;
if (stream) { if (stream) {
query = network.streamingQuery( query = network.streamingQuery(
relay, relay,
@ -161,6 +172,11 @@ async function fetchFromRelays(
}, },
{ queryTimeout: 30, relayTimeout: 30 } { queryTimeout: 30, relayTimeout: 30 }
); );
receiveUpdate?.((message) => {
if (message && message.cancel) {
query.cancel();
}
});
} else { } else {
query = network.simpleQuery( query = network.simpleQuery(
relay, relay,