*Use updated rpc client/protocol

This commit is contained in:
Derrick Hammer 2022-08-31 15:19:35 -04:00
parent 5d0f30c49d
commit d7a17ca163
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
2 changed files with 50 additions and 92 deletions

View File

@ -10,17 +10,17 @@
},
"dependencies": {
"@lumeweb/kernel-dht-client": "https://github.com/LumeWeb/kernel-dht-client.git",
"@lumeweb/kernel-rpc-client": "https://github.com/LumeWeb/kernel-rpc-client.git",
"@lumeweb/kernel-utils": "https://github.com/LumeWeb/kernel-utils.git",
"buffer": "^6.0.3",
"fetch-retry": "^5.0.3",
"is-ipfs": "^6.0.2",
"libkmodule": "^0.2.12",
"libskynet": "^0.0.62",
"msgpackr": "^1.6.2",
"p-queue": "^7.3.0",
"timers-browserify": "^2.0.12"
},
"devDependencies": {
"@lumeweb/relay-types": "https://github.com/LumeWeb/relay-types.git",
"@types/events": "^3.0.0",
"@types/node": "^18.0.3",
"@types/read": "^0.0.29",

View File

@ -1,12 +1,10 @@
import { addHandler, handleMessage, log } from "libkmodule";
import { addHandler, handleMessage } from "libkmodule";
import type { ActiveQuery } from "libkmodule";
import PQueue from "p-queue";
import { ipfsPath, ipnsPath } from "is-ipfs";
import { DHT } from "@lumeweb/kernel-dht-client";
import { pack, unpack } from "msgpackr";
import { DataFn } from "libskynet";
onmessage = handleMessage;
import { RpcNetwork } from "@lumeweb/kernel-rpc-client";
import { RPCResponse } from "@lumeweb/relay-types";
interface StatFileResponse {
exists: boolean;
@ -15,6 +13,17 @@ interface StatFileResponse {
directory: boolean;
files: string[];
}
interface PingRPCResponse extends RPCResponse {
data?: {
ping?: any;
};
}
interface MethodsRPCResponse extends RPCResponse {
data?: string[];
}
onmessage = handleMessage;
let blockingGatewayUpdate = Promise.resolve();
@ -23,7 +32,7 @@ let relays = [
"25c2a0a833782d64213c08879b95dd5a60af244b44a058f3a7a70d6722f4bda7",
];
let dht: DHT;
let network: RpcNetwork;
addHandler("presentSeed", handlePresentSeed);
addHandler("refreshGatewayList", handleRefreshGatewayList);
@ -38,11 +47,11 @@ let readyPromise = new Promise((resolve) => {
});
async function handlePresentSeed() {
dht = new DHT(false);
network = new RpcNetwork(false);
for (const relay of relays) {
await dht.addRelay(relay);
await network.addRelay(relay);
}
await dht.ready();
refreshGatewayList();
readyPromiseResolve();
}
@ -141,23 +150,25 @@ async function fetchFromRelays(
await refreshGatewayList();
}
for (const relay of activeRelays) {
let resp;
try {
resp = await rpcCall(relay, "ipfs", method, stream, {
let query;
if (stream) {
query = network.streamingQuery(relay, method, "ipfs", stream, {
hash,
path,
});
} catch (e: any) {
if (e instanceof Error) {
error = e;
} else {
error = new Error(e);
}
continue;
} else {
query = network.simpleQuery(relay, method, "ipfs", {
hash,
path,
});
}
let resp = await query.result;
if (resp.error) {
throw new Error(resp.error);
}
if (resp) {
return resp;
if (!stream) {
return resp.data;
}
}
@ -168,11 +179,13 @@ async function relayHasMethods(
methodList: string[],
relay: string
): Promise<boolean> {
let methods: string | string[] = [];
try {
methods = (await rpcCall(relay, "misc", "get_methods")) as [];
} catch (e) {
return false;
let methods: string[];
let query = network.simpleQuery(relay, "get_methods", "core");
let resp = (await query.result) as MethodsRPCResponse;
if (resp.data) {
methods = resp.data;
}
let has = true;
@ -185,54 +198,6 @@ async function relayHasMethods(
return has;
}
async function rpcCall(
relay: string,
chain: string,
query: string,
stream?: (data: any) => void,
data = {}
) {
const socket = await dht.connect(relay);
return new Promise((resolve, reject) => {
let dataCount = 0;
socket.on("data", (res) => {
dataCount++;
const response = unpack(res);
if (!response || response.error) {
socket.end();
reject(response?.error);
return;
}
if (!stream && 1 === dataCount) {
socket.end();
resolve(response?.data);
return;
}
if (stream) {
if (response?.data.done) {
socket.end();
resolve(true);
return;
}
stream(response?.data.data);
}
});
socket.write("rpc");
socket.write(
pack({
query,
chain,
data,
bypassCache: true,
})
);
});
}
async function refreshGatewayList() {
let processResolve: any;
blockingGatewayUpdate = new Promise((resolve) => {
@ -255,33 +220,26 @@ async function refreshGatewayList() {
.map((item: any[]) => item[1]);
processResolve();
}
function checkRelayLatency(relay: string, list: any[]) {
return async () => {
const start = Date.now();
let resp;
try {
resp = await rpcCall(relay, "misc", "ping", undefined, {});
} catch {
return;
}
// @ts-ignore
if (!resp.pong) {
let query = network.simpleQuery(relay, "ping", "core");
let resp = (await query.result) as PingRPCResponse;
if (!resp?.data?.ping) {
return;
}
const end = Date.now() - start;
try {
resp = await relayHasMethods(
if (
!(await relayHasMethods(
["stat_ipfs", "stat_ipns", "fetch_ipfs", "fetch_ipns"],
relay
);
if (!resp) {
return;
}
} catch {
))
) {
return;
}