Compare commits

...

3 Commits

3 changed files with 39 additions and 16 deletions

View File

@ -39,6 +39,7 @@
"libkmodule": "^0.2.53", "libkmodule": "^0.2.53",
"libp2p": "^0.42.2", "libp2p": "^0.42.2",
"multiformats": "^11.0.2", "multiformats": "^11.0.2",
"p-defer": "^4.0.0",
"p-queue": "^7.3.4", "p-queue": "^7.3.4",
"private-ip": "^3.0.0", "private-ip": "^3.0.0",
"rewire": "^6.0.0", "rewire": "^6.0.0",

View File

@ -38,6 +38,7 @@ import { peerIdFromCID } from "@libp2p/peer-id";
import { bootstrap } from "@libp2p/bootstrap"; import { bootstrap } from "@libp2p/bootstrap";
import { IDBBlockstore } from "blockstore-idb"; import { IDBBlockstore } from "blockstore-idb";
import { IDBDatastore } from "datastore-idb"; import { IDBDatastore } from "datastore-idb";
import defer from "p-defer";
const basesByPrefix: { [prefix: string]: MultibaseDecoder<any> } = Object.keys( const basesByPrefix: { [prefix: string]: MultibaseDecoder<any> } = Object.keys(
bases bases
@ -49,10 +50,8 @@ const basesByPrefix: { [prefix: string]: MultibaseDecoder<any> } = Object.keys(
onmessage = handleMessage; onmessage = handleMessage;
let moduleLoadedResolve: Function; const moduleDefer = defer();
let moduleLoaded: Promise<void> = new Promise((resolve) => { let activePeersDefer = defer();
moduleLoadedResolve = resolve;
});
let swarm; let swarm;
let proxy: Proxy; let proxy: Proxy;
@ -69,6 +68,7 @@ addHandler("stat", handleStat);
addHandler("ls", handleLs, { receiveUpdates: true }); addHandler("ls", handleLs, { receiveUpdates: true });
addHandler("cat", handleCat, { receiveUpdates: true }); addHandler("cat", handleCat, { receiveUpdates: true });
addHandler("ipnsResolve", handleIpnsResolve); addHandler("ipnsResolve", handleIpnsResolve);
addHandler("getActivePeers", handleGetActivePeers);
async function handlePresentSeed() { async function handlePresentSeed() {
swarm = createClient(); swarm = createClient();
@ -168,7 +168,7 @@ async function handlePresentSeed() {
await blockstore.open(); await blockstore.open();
await datastore.open(); await datastore.open();
PeerManager.instance.ipfs = await createHelia({ const ipfs = await createHelia({
// @ts-ignore // @ts-ignore
blockstore, blockstore,
// @ts-ignore // @ts-ignore
@ -176,6 +176,8 @@ async function handlePresentSeed() {
libp2p, libp2p,
}); });
PeerManager.instance.ipfs = ipfs;
proxy = new Proxy({ proxy = new Proxy({
swarm, swarm,
listen: true, listen: true,
@ -200,12 +202,22 @@ async function handlePresentSeed() {
await swarm.start(); await swarm.start();
await swarm.ready(); await swarm.ready();
// @ts-ignore // @ts-ignore
fs = unixfs(PeerManager.instance.ipfs); fs = unixfs(ipfs);
IPNS = ipns(PeerManager.instance.ipfs as any, [ IPNS = ipns(ipfs as any, [dht(ipfs), pubsub(ipfs as any)]);
dht(PeerManager.instance.ipfs),
pubsub(PeerManager.instance.ipfs as any), ipfs.libp2p.addEventListener("peer:connect", () => {
]); if (ipfs.libp2p.getPeers().length > 0) {
moduleLoadedResolve(); activePeersDefer.resolve();
}
});
ipfs.libp2p.addEventListener("peer:disconnect", () => {
if (ipfs.libp2p.getPeers().length === 0) {
activePeersDefer = defer();
}
});
moduleDefer.resolve();
} }
async function handleStat(aq: ActiveQuery) { async function handleStat(aq: ActiveQuery) {
@ -289,6 +301,9 @@ async function handleCat(aq: ActiveQuery) {
async function handleIpnsResolve(aq: ActiveQuery) { async function handleIpnsResolve(aq: ActiveQuery) {
await ready(); await ready();
await activePeersDefer.promise;
if (!aq.callerInput || !("cid" in aq.callerInput)) { if (!aq.callerInput || !("cid" in aq.callerInput)) {
aq.reject("cid required"); aq.reject("cid required");
return; return;
@ -315,8 +330,14 @@ async function handleIpnsResolve(aq: ActiveQuery) {
} }
} }
async function handleGetActivePeers(aq: ActiveQuery) {
await ready();
aq.respond(PeerManager.instance.ipfs.libp2p.getPeers());
}
async function ready() { async function ready() {
await moduleLoaded; await moduleDefer.promise;
await PeerManager.instance.ipfsReady; await PeerManager.instance.ipfsReady;
} }

View File

@ -5,6 +5,7 @@ import { fixed32, json, raw, uint } from "compact-encoding";
import { TcpSocketConnectOpts } from "net"; import { TcpSocketConnectOpts } from "net";
import { Helia } from "@helia/interface"; import { Helia } from "@helia/interface";
import { deserializeError } from "serialize-error"; import { deserializeError } from "serialize-error";
import defer from "p-defer";
import { import {
CloseSocketRequest, CloseSocketRequest,
ErrorSocketRequest, ErrorSocketRequest,
@ -127,14 +128,14 @@ export default class PeerManager {
this._ipfs = value as Helia; this._ipfs = value as Helia;
} }
private _ipfsReady?: Promise<void>; private _ipfsReady?: Promise<unknown>;
private _ipfsResolve?: () => void; private _ipfsResolve?: () => void;
get ipfsReady(): Promise<void> { get ipfsReady(): Promise<void> {
if (!this._ipfsReady) { if (!this._ipfsReady) {
this._ipfsReady = new Promise((resolve) => { let ipfsDefer = defer();
this._ipfsResolve = resolve; this._ipfsReady = ipfsDefer.promise;
}); this._ipfsResolve = ipfsDefer.resolve;
} }
return this._ipfsReady as Promise<any>; return this._ipfsReady as Promise<any>;