diff --git a/src/index.ts b/src/index.ts index 4e9f605..38cb8cf 100644 --- a/src/index.ts +++ b/src/index.ts @@ -3,7 +3,11 @@ import type { ActiveQuery } from "libkmodule"; import PQueue from "p-queue"; import { ipfsPath, ipnsPath } from "is-ipfs"; import { DataFn } from "libskynet"; -import { RpcNetwork } from "@lumeweb/kernel-rpc-client"; +import { + RpcNetwork, + SimpleRpcQuery, + StreamingRpcQuery, +} from "@lumeweb/kernel-rpc-client"; import { RPCResponse } from "@lumeweb/relay-types"; interface StatFileResponse { @@ -35,9 +39,9 @@ let network: RpcNetwork; addHandler("presentSeed", handlePresentSeed); addHandler("refreshGatewayList", handleRefreshGatewayList); addHandler("statIpfs", handleStatIpfs); -addHandler("fetchIpfs", handleFetchIpfs); +addHandler("fetchIpfs", handleFetchIpfs, { receiveUpdates: true }); addHandler("statIpns", handleStatIpns); -addHandler("fetchIpns", handleFetchIpns); +addHandler("fetchIpns", handleFetchIpns, { receiveUpdates: true }); let readyPromiseResolve: any; let readyPromise = new Promise((resolve) => { @@ -130,7 +134,13 @@ async function handleFetch( const { hash, path } = valid; try { - await fetchFromRelays(hash, path, method, aq.sendUpdate); + await fetchFromRelays( + hash, + path, + method, + aq.sendUpdate, + aq.setReceiveUpdate + ); aq.respond(); } catch (e: any) { aq.reject(e); @@ -141,14 +151,15 @@ async function fetchFromRelays( hash: string, path: string, method: string, - stream: DataFn | undefined = undefined + stream: DataFn | undefined = undefined, + receiveUpdate: ((receiveUpdate: DataFn) => void) | undefined = undefined ) { let error = new Error("NOT_FOUND"); if (0 == activeRelays.length) { await refreshGatewayList(); } for (const relay of activeRelays) { - let query; + let query: any; if (stream) { query = network.streamingQuery( relay, @@ -161,6 +172,11 @@ async function fetchFromRelays( }, { queryTimeout: 30, relayTimeout: 30 } ); + receiveUpdate?.((message) => { + if (message && message.cancel) { + query.cancel(); + } + }); } else { query = network.simpleQuery( relay,