diff --git a/src/index.ts b/src/index.ts index 7d30cdc..0d367a9 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,4 +1,4 @@ -import { addHandler, handleMessage } from "libkmodule"; +import { addHandler, handleMessage , log} from "libkmodule"; import type { ActiveQuery } from "libkmodule"; import PQueue from "p-queue"; import { ipfsPath, ipnsPath } from "is-ipfs"; @@ -68,7 +68,7 @@ async function handleStatIpns(aq: ActiveQuery) { } async function handleFetchIpns(aq: ActiveQuery) { - return handleStat(aq, "fetch_ipns", "ipns"); + return handleFetch(aq, "fetch_ipns", "ipns"); } async function validateInputs(aq: ActiveQuery, type: "ipns" | "ipfs") { @@ -206,6 +206,7 @@ async function rpcCall( reject(response?.error || response?.data?.error); return; } + if (!stream && 1 === dataCount) { socket.end(); resolve(response?.data); @@ -216,6 +217,7 @@ async function rpcCall( stream(response?.data.data); if (response?.data.done) { socket.end(); + resolve(true); } } }); @@ -228,37 +230,13 @@ async function rpcCall( force: true, }) ); - timer = setTimeout(() => { +/* timer = setTimeout(() => { socket.end(); reject("timeout"); - }, 10 * 1000) as NodeJS.Timeout; + }, 10 * 1000) as NodeJS.Timeout;*/ }); } -async function processStream(aq: ActiveQuery, response: Response) { - const reader = response.body.getReader(); - - let aqResp = { - headers: Array.from(response.headers), - status: response.status, - }; - while (true) { - let chunk; - - try { - chunk = await reader.read(); - aq.sendUpdate(chunk.value); - if (chunk.done) { - aq.respond(aqResp); - break; - } - } catch (e) { - aq.respond(aqResp); - break; - } - } -} - async function refreshGatewayList() { let processResolve: any; blockingGatewayUpdate = new Promise((resolve) => {