diff --git a/package.json b/package.json index c22d963..5733aee 100644 --- a/package.json +++ b/package.json @@ -10,17 +10,17 @@ }, "dependencies": { "@lumeweb/kernel-dht-client": "https://github.com/LumeWeb/kernel-dht-client.git", + "@lumeweb/kernel-rpc-client": "https://github.com/LumeWeb/kernel-rpc-client.git", "@lumeweb/kernel-utils": "https://github.com/LumeWeb/kernel-utils.git", "buffer": "^6.0.3", - "fetch-retry": "^5.0.3", "is-ipfs": "^6.0.2", "libkmodule": "^0.2.12", "libskynet": "^0.0.62", - "msgpackr": "^1.6.2", "p-queue": "^7.3.0", "timers-browserify": "^2.0.12" }, "devDependencies": { + "@lumeweb/relay-types": "https://github.com/LumeWeb/relay-types.git", "@types/events": "^3.0.0", "@types/node": "^18.0.3", "@types/read": "^0.0.29", diff --git a/src/index.ts b/src/index.ts index b782c62..346dd13 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,12 +1,10 @@ -import { addHandler, handleMessage, log } from "libkmodule"; +import { addHandler, handleMessage } from "libkmodule"; import type { ActiveQuery } from "libkmodule"; import PQueue from "p-queue"; import { ipfsPath, ipnsPath } from "is-ipfs"; -import { DHT } from "@lumeweb/kernel-dht-client"; -import { pack, unpack } from "msgpackr"; import { DataFn } from "libskynet"; - -onmessage = handleMessage; +import { RpcNetwork } from "@lumeweb/kernel-rpc-client"; +import { RPCResponse } from "@lumeweb/relay-types"; interface StatFileResponse { exists: boolean; @@ -15,6 +13,17 @@ interface StatFileResponse { directory: boolean; files: string[]; } +interface PingRPCResponse extends RPCResponse { + data?: { + ping?: any; + }; +} + +interface MethodsRPCResponse extends RPCResponse { + data?: string[]; +} + +onmessage = handleMessage; let blockingGatewayUpdate = Promise.resolve(); @@ -23,7 +32,7 @@ let relays = [ "25c2a0a833782d64213c08879b95dd5a60af244b44a058f3a7a70d6722f4bda7", ]; -let dht: DHT; +let network: RpcNetwork; addHandler("presentSeed", handlePresentSeed); addHandler("refreshGatewayList", handleRefreshGatewayList); @@ -38,11 +47,11 @@ let readyPromise = new Promise((resolve) => { }); async function handlePresentSeed() { - dht = new DHT(false); + network = new RpcNetwork(false); for (const relay of relays) { - await dht.addRelay(relay); + await network.addRelay(relay); } - await dht.ready(); + refreshGatewayList(); readyPromiseResolve(); } @@ -141,23 +150,25 @@ async function fetchFromRelays( await refreshGatewayList(); } for (const relay of activeRelays) { - let resp; - try { - resp = await rpcCall(relay, "ipfs", method, stream, { + let query; + if (stream) { + query = network.streamingQuery(relay, method, "ipfs", stream, { hash, path, }); - } catch (e: any) { - if (e instanceof Error) { - error = e; - } else { - error = new Error(e); - } - continue; + } else { + query = network.simpleQuery(relay, method, "ipfs", { + hash, + path, + }); + } + let resp = await query.result; + if (resp.error) { + throw new Error(resp.error); } - if (resp) { - return resp; + if (!stream) { + return resp.data; } } @@ -168,11 +179,13 @@ async function relayHasMethods( methodList: string[], relay: string ): Promise { - let methods: string | string[] = []; - try { - methods = (await rpcCall(relay, "misc", "get_methods")) as []; - } catch (e) { - return false; + let methods: string[]; + let query = network.simpleQuery(relay, "get_methods", "core"); + + let resp = (await query.result) as MethodsRPCResponse; + + if (resp.data) { + methods = resp.data; } let has = true; @@ -185,54 +198,6 @@ async function relayHasMethods( return has; } - -async function rpcCall( - relay: string, - chain: string, - query: string, - stream?: (data: any) => void, - data = {} -) { - const socket = await dht.connect(relay); - return new Promise((resolve, reject) => { - let dataCount = 0; - socket.on("data", (res) => { - dataCount++; - const response = unpack(res); - - if (!response || response.error) { - socket.end(); - reject(response?.error); - return; - } - - if (!stream && 1 === dataCount) { - socket.end(); - resolve(response?.data); - return; - } - - if (stream) { - if (response?.data.done) { - socket.end(); - resolve(true); - return; - } - stream(response?.data.data); - } - }); - socket.write("rpc"); - socket.write( - pack({ - query, - chain, - data, - bypassCache: true, - }) - ); - }); -} - async function refreshGatewayList() { let processResolve: any; blockingGatewayUpdate = new Promise((resolve) => { @@ -255,33 +220,26 @@ async function refreshGatewayList() { .map((item: any[]) => item[1]); processResolve(); } - function checkRelayLatency(relay: string, list: any[]) { return async () => { const start = Date.now(); - let resp; - try { - resp = await rpcCall(relay, "misc", "ping", undefined, {}); - } catch { - return; - } - // @ts-ignore - if (!resp.pong) { + let query = network.simpleQuery(relay, "ping", "core"); + + let resp = (await query.result) as PingRPCResponse; + + if (!resp?.data?.ping) { return; } const end = Date.now() - start; - try { - resp = await relayHasMethods( + if ( + !(await relayHasMethods( ["stat_ipfs", "stat_ipns", "fetch_ipfs", "fetch_ipns"], relay - ); - if (!resp) { - return; - } - } catch { + )) + ) { return; }