kernel-ipfs/src/index.ts

265 lines
5.5 KiB
TypeScript
Raw Normal View History

2022-08-31 19:19:35 +00:00
import { addHandler, handleMessage } from "libkmodule";
2022-08-05 13:25:55 +00:00
import type { ActiveQuery } from "libkmodule";
import PQueue from "p-queue";
import { ipfsPath, ipnsPath } from "is-ipfs";
2022-08-22 01:42:39 +00:00
import { DataFn } from "libskynet";
2022-08-31 19:19:35 +00:00
import { RpcNetwork } from "@lumeweb/kernel-rpc-client";
import { RPCResponse } from "@lumeweb/relay-types";
2022-08-05 13:25:55 +00:00
interface StatFileResponse {
exists: boolean;
contentType: string | null;
error: any;
directory: boolean;
files: string[];
}
2022-08-31 19:19:35 +00:00
interface PingRPCResponse extends RPCResponse {
data?: "pong";
2022-08-31 19:19:35 +00:00
}
interface MethodsRPCResponse extends RPCResponse {
data?: string[];
}
onmessage = handleMessage;
2022-08-05 13:25:55 +00:00
let blockingGatewayUpdate = Promise.resolve();
2022-08-22 01:42:39 +00:00
let activeRelays: string | any[] = [];
2022-08-05 13:25:55 +00:00
let relays = [
"25c2a0a833782d64213c08879b95dd5a60af244b44a058f3a7a70d6722f4bda7",
];
2022-08-31 19:19:35 +00:00
let network: RpcNetwork;
2022-08-05 13:25:55 +00:00
addHandler("presentSeed", handlePresentSeed);
addHandler("refreshGatewayList", handleRefreshGatewayList);
addHandler("statIpfs", handleStatIpfs);
2022-08-06 05:13:15 +00:00
addHandler("fetchIpfs", handleFetchIpfs);
2022-08-05 13:25:55 +00:00
addHandler("statIpns", handleStatIpns);
addHandler("fetchIpns", handleFetchIpns);
let readyPromiseResolve: any;
let readyPromise = new Promise((resolve) => {
readyPromiseResolve = resolve;
});
2022-08-13 09:41:36 +00:00
async function handlePresentSeed() {
2022-08-31 19:19:35 +00:00
network = new RpcNetwork(false);
2022-08-05 13:25:55 +00:00
for (const relay of relays) {
2022-08-31 19:19:35 +00:00
await network.addRelay(relay);
2022-08-05 13:25:55 +00:00
}
2022-08-31 19:19:35 +00:00
2022-08-05 13:25:55 +00:00
refreshGatewayList();
readyPromiseResolve();
}
async function handleRefreshGatewayList(aq: ActiveQuery) {
await readyPromise;
await blockingGatewayUpdate;
await refreshGatewayList();
aq.respond();
}
async function handleStatIpfs(aq: ActiveQuery) {
return handleStat(aq, "stat_ipfs", "ipfs");
}
async function handleFetchIpfs(aq: ActiveQuery) {
return handleFetch(aq, "fetch_ipfs", "ipfs");
}
async function handleStatIpns(aq: ActiveQuery) {
2022-08-06 05:13:15 +00:00
return handleStat(aq, "stat_ipns", "ipns");
2022-08-05 13:25:55 +00:00
}
async function handleFetchIpns(aq: ActiveQuery) {
2022-08-10 21:51:41 +00:00
return handleFetch(aq, "fetch_ipns", "ipns");
2022-08-05 13:25:55 +00:00
}
async function validateInputs(aq: ActiveQuery, type: "ipns" | "ipfs") {
const { hash = null } = aq.callerInput;
const { path = "" } = aq.callerInput;
if (!hash) {
aq.reject("hash missing");
return;
}
if (type === "ipfs" && !ipfsPath(`/ipfs/${hash}`)) {
aq.reject("ipfs hash is invalid");
return;
}
if (type === "ipns" && !ipnsPath(`/ipns/${hash}`)) {
aq.reject("ipns hash is invalid");
return;
}
await readyPromise;
await blockingGatewayUpdate;
return { hash, path };
}
async function handleStat(
aq: ActiveQuery,
method: string,
type: "ipns" | "ipfs"
): Promise<void> {
const valid = await validateInputs(aq, type);
if (!valid) {
return;
}
const { hash, path } = valid;
try {
let resp = (await fetchFromRelays(hash, path, method)) as StatFileResponse;
aq.respond(resp);
2022-08-22 01:42:39 +00:00
} catch (e: any) {
2022-08-05 13:25:55 +00:00
aq.reject(e);
}
}
async function handleFetch(
aq: ActiveQuery,
method: string,
type: "ipns" | "ipfs"
): Promise<void> {
const valid = await validateInputs(aq, type);
if (!valid) {
return;
}
const { hash, path } = valid;
try {
await fetchFromRelays(hash, path, method, aq.sendUpdate);
aq.respond();
2022-08-22 01:42:39 +00:00
} catch (e: any) {
2022-08-05 13:25:55 +00:00
aq.reject(e);
}
}
async function fetchFromRelays(
hash: string,
path: string,
method: string,
2022-08-22 01:42:39 +00:00
stream: DataFn | undefined = undefined
2022-08-05 13:25:55 +00:00
) {
let error = new Error("NOT_FOUND");
if (0 == activeRelays.length) {
await refreshGatewayList();
}
2022-08-05 13:25:55 +00:00
for (const relay of activeRelays) {
2022-08-31 19:19:35 +00:00
let query;
if (stream) {
2022-08-31 22:25:55 +00:00
query = network.streamingQuery(
relay,
method,
"ipfs",
stream,
{
hash,
path,
},
{ relayTimeout: 30 }
);
2022-08-31 19:19:35 +00:00
} else {
2022-08-31 22:25:55 +00:00
query = network.simpleQuery(
relay,
method,
"ipfs",
{
hash,
path,
},
{ relayTimeout: 30 }
);
2022-08-31 19:19:35 +00:00
}
let resp = await query.result;
if (resp.error) {
throw new Error(resp.error);
2022-08-05 13:25:55 +00:00
}
2022-08-31 19:19:35 +00:00
if (!stream) {
return resp.data;
2022-08-05 13:25:55 +00:00
}
}
throw error;
}
async function relayHasMethods(
methodList: string[],
relay: string
): Promise<boolean> {
2022-08-31 22:26:18 +00:00
let methods: string[] = [];
2022-08-31 19:19:35 +00:00
let query = network.simpleQuery(relay, "get_methods", "core");
let resp = (await query.result) as MethodsRPCResponse;
if (resp.data) {
methods = resp.data;
2022-08-05 13:25:55 +00:00
}
let has = true;
methodList.forEach((item) => {
if (!methods.includes(item)) {
has = false;
}
});
return has;
}
async function refreshGatewayList() {
let processResolve: any;
blockingGatewayUpdate = new Promise((resolve) => {
processResolve = resolve;
});
const queue = new PQueue({ concurrency: 10 });
2022-08-22 01:42:39 +00:00
let latencies: any[] = [];
2022-08-05 13:25:55 +00:00
relays.forEach((item) => {
queue.add(checkRelayLatency(item, latencies));
});
await queue.onIdle();
activeRelays = latencies
.sort((a: any[], b: any[]) => {
return a[0] - b[0];
})
.map((item: any[]) => item[1]);
processResolve();
}
function checkRelayLatency(relay: string, list: any[]) {
return async () => {
const start = Date.now();
2022-08-31 19:19:35 +00:00
let query = network.simpleQuery(relay, "ping", "core");
let resp = (await query.result) as PingRPCResponse;
if (resp?.data !== "pong") {
2022-08-05 13:25:55 +00:00
return;
}
const end = Date.now() - start;
2022-08-31 19:19:35 +00:00
if (
!(await relayHasMethods(
[
"ipfs.stat_ipfs",
"ipfs.stat_ipns",
"ipfs.fetch_ipfs",
"ipfs.fetch_ipns",
],
2022-08-05 13:25:55 +00:00
relay
2022-08-31 19:19:35 +00:00
))
) {
2022-08-05 13:25:55 +00:00
return;
}
list.push([end, relay]);
};
}