2023-03-17 09:36:02 +00:00
|
|
|
import { symbol } from "@libp2p/interface-transport";
|
|
|
|
// @ts-ignore
|
|
|
|
import { TCP, TCPComponents, TCPDialOptions, TCPOptions } from "@libp2p/tcp";
|
|
|
|
import { Multiaddr } from "@multiformats/multiaddr";
|
|
|
|
import { IpcSocketConnectOpts, TcpSocketConnectOpts } from "net";
|
|
|
|
import { logger } from "@libp2p/logger";
|
|
|
|
import { AbortError, CodeError } from "@libp2p/interfaces/errors";
|
|
|
|
// @ts-ignore
|
|
|
|
import { multiaddrToNetConfig } from "@libp2p/tcp/utils";
|
|
|
|
import { Connection } from "@libp2p/interface-connection";
|
|
|
|
// @ts-ignore
|
|
|
|
import { toMultiaddrConnection } from "@libp2p/tcp/socket-to-conn";
|
|
|
|
import * as mafmt from "@multiformats/mafmt";
|
|
|
|
|
|
|
|
// Module-wide debug logger, registered under the "libp2p:hypercore" namespace.
const log = logger("libp2p:hypercore");
|
|
|
|
|
|
|
|
import isPrivateIp from "private-ip";
|
2023-04-17 00:51:32 +00:00
|
|
|
import { DummySocket, MultiSocketProxy, Socket } from "@lumeweb/libhyperproxy";
|
2023-03-17 09:36:02 +00:00
|
|
|
|
|
|
|
// Multiaddr protocol codes that the transport's address filter looks for.

/** Protocol code stripped from an address (via `decapsulateCode`) before it is matched as TCP. */
const CODE_P2P = 421;
/** Protocol code whose presence makes the filter reject an address. */
const CODE_CIRCUIT = 290;
/** Protocol code whose presence makes the filter accept an address outright. */
const CODE_UNIX = 400;
|
|
|
|
|
|
|
|
/**
 * Options for the hypercore transport: everything the TCP transport accepts,
 * plus the socket proxy that outbound connections are created through.
 */
export interface HypercoreOptions extends TCPOptions {
  /**
   * Proxy whose `createSocket()` supplies the socket for every dial.
   * Typed as optional, but the transport constructor throws when it is absent.
   */
  proxy?: MultiSocketProxy;
}
|
|
|
|
/**
 * libp2p transport that reuses the TCP transport's plumbing but obtains its
 * outbound sockets from a `MultiSocketProxy` instead of opening them directly.
 */
class HypercoreTransport extends TCP {
  /** Options handed to the constructor; read for the proxy and socket timeouts. */
  private readonly opts?: HypercoreOptions;

  /**
   * Optional metrics sink. It is never assigned inside this class, so every
   * use below is optional-chained.
   */
  private metrics: any;

  /**
   * @param components - components forwarded unchanged to the TCP base class
   * @param options - transport options; `options.proxy` must be provided
   * @throws Error when `options.proxy` is missing
   */
  constructor(components: TCPComponents, options: HypercoreOptions = {}) {
    super(components, options);

    this.opts = options;

    if (!options.proxy) {
      // The message names the option that is actually being checked.
      throw new Error("options.proxy is required");
    }
  }

  /** Marker property keyed by the transport symbol from `@libp2p/interface-transport`. */
  get [symbol](): true {
    return true;
  }

  get [Symbol.toStringTag](): string {
    return "@libp2p/hypercore";
  }

  /**
   * Opens a proxied socket to `ma`, wraps it as a multiaddr connection and
   * upgrades it with `options.upgrader`.
   *
   * @param ma - address to dial
   * @param options - dial options; `keepAlive` is defaulted to `true` on this object
   * @returns the upgraded connection
   * @throws AbortError when `options.signal` is aborted before the dial completes
   */
  async dial(ma: Multiaddr, options: TCPDialOptions): Promise<Connection> {
    options.keepAlive = options.keepAlive ?? true;

    // options.signal destroys the socket before 'connect' event
    const socket = await this._connect(ma, options);

    // Avoid uncaught errors caused by unstable connections
    socket.on("error", (err: any) => {
      log("socket error", err);
    });

    const maConn = toMultiaddrConnection(socket as any, {
      remoteAddr: ma,
      socketInactivityTimeout: this.opts?.outboundSocketInactivityTimeout,
      socketCloseTimeout: this.opts?.socketCloseTimeout,
      metrics: this.metrics?.dialerEvents,
    });

    const onAbort = (): void => {
      maConn.close().catch((err: any) => {
        log.error("Error closing maConn after abort", err);
      });
    };
    options.signal?.addEventListener("abort", onAbort, { once: true });

    log("new outbound connection %s", maConn.remoteAddr);
    let conn: Connection;
    try {
      conn = await options.upgrader.upgradeOutbound(maConn);
    } finally {
      // Detach the listener even when the upgrade rejects, so a long-lived
      // signal does not keep accumulating handlers from failed dials.
      options.signal?.removeEventListener("abort", onAbort);
    }
    log("outbound connection %s upgraded", maConn.remoteAddr);

    if (options.signal?.aborted === true) {
      conn.close().catch((err) => {
        log.error("Error closing conn after abort", err);
      });

      throw new AbortError();
    }

    return conn;
  }

  /**
   * Asks the proxy for a socket to `ma` and resolves once that socket emits
   * "connect".
   *
   * @param ma - address to connect to
   * @param options - dial options; only `signal` is consulted here
   * @returns the connected socket
   * @throws AbortError when the signal is (or becomes) aborted
   * @throws Error when the proxy fails or returns no socket, or when the socket
   *   emits "error" or "timeout" before connecting
   */
  async _connect(ma: Multiaddr, options: TCPDialOptions): Promise<Socket> {
    if (options.signal?.aborted === true) {
      throw new AbortError();
    }

    const start = Date.now();
    const cOpts = multiaddrToNetConfig(ma) as IpcSocketConnectOpts &
      TcpSocketConnectOpts;
    const cOptsStr = cOpts.path ?? `${cOpts.host ?? ""}:${cOpts.port}`;

    log("dialing %j", cOpts);

    // Normalises any thrown value to an Error and prefixes the dial target.
    // Error instances are mutated in place so their identity (and any `code`)
    // is preserved for callers.
    const annotate = (cause: unknown): Error => {
      const err = cause instanceof Error ? cause : new Error(String(cause));
      err.message = `connection error ${cOptsStr}: ${err.message}`;
      return err;
    };

    // Create the socket outside the Promise constructor: a failure here must
    // reject exactly once and must not fall through into the listener wiring.
    let created: Socket | undefined;
    try {
      created = (await this.opts?.proxy?.createSocket(cOpts)) as
        | Socket
        | undefined;
    } catch (e: unknown) {
      this.metrics?.dialerEvents.increment({ error: true });
      throw annotate(e);
    }

    if (created == null) {
      // Without a socket no event would ever fire and the dial would hang.
      this.metrics?.dialerEvents.increment({ error: true });
      throw annotate(new Error("proxy did not return a socket"));
    }

    const rawSocket: Socket = created;

    // The abort listener is only attached below, so an abort that fired while
    // createSocket() was pending has to be detected explicitly.
    if (options.signal?.aborted === true) {
      log("connection aborted %j", cOpts);
      this.metrics?.dialerEvents.increment({ abort: true });
      rawSocket.destroy();
      throw new AbortError();
    }

    return await new Promise<Socket>((resolve, reject) => {
      const onError = (err: Error): void => {
        this.metrics?.dialerEvents.increment({ error: true });

        done(annotate(err));
      };

      const onTimeout = (): void => {
        log("connection timeout %s", cOptsStr);
        this.metrics?.dialerEvents.increment({ timeout: true });

        const err = new CodeError(
          `connection timeout after ${Date.now() - start}ms`,
          "ERR_CONNECT_TIMEOUT"
        );
        // Note: this will result in onError() being called
        rawSocket.emit("error", err);
      };

      const onConnect = (): void => {
        log("connection opened %j", cOpts);
        this.metrics?.dialerEvents.increment({ connect: true });
        done();
      };

      const onAbort = (): void => {
        log("connection aborted %j", cOpts);
        this.metrics?.dialerEvents.increment({ abort: true });
        rawSocket.destroy();
        done(new AbortError());
      };

      // Single exit point: detach every listener, then settle the promise.
      const done = (err?: any): void => {
        rawSocket.removeListener("error", onError);
        // @ts-ignore
        rawSocket.removeListener("timeout", onTimeout);
        // @ts-ignore
        rawSocket.removeListener("connect", onConnect);

        if (options.signal != null) {
          options.signal.removeEventListener("abort", onAbort);
        }

        if (err != null) {
          reject(err);
          return;
        }

        resolve(rawSocket);
      };

      // @ts-ignore
      rawSocket.on("error", onError);
      // @ts-ignore
      rawSocket.on("timeout", onTimeout);
      // @ts-ignore
      rawSocket.on("connect", onConnect);

      if (options.signal != null) {
        options.signal.addEventListener("abort", onAbort);
      }

      try {
        (rawSocket as DummySocket).connect();
      } catch (e: unknown) {
        // Route a synchronous failure through the same cleanup path.
        onError(e instanceof Error ? e : new Error(String(e)));
      }
    });
  }

  /**
   * Selects the addresses this transport is willing to dial.
   *
   * An address is rejected when it contains the circuit protocol, accepted
   * when it contains the unix protocol, and otherwise accepted only when it
   * is TCP-shaped (after stripping the p2p component) and its host is not a
   * private IP.
   *
   * @param multiaddrs - candidate addresses (a single address is tolerated)
   * @returns the subset of `multiaddrs` that can be dialed
   */
  filter(multiaddrs: Multiaddr[]): Multiaddr[] {
    const candidates = Array.isArray(multiaddrs) ? multiaddrs : [multiaddrs];

    return candidates.filter((ma) => {
      const codes = ma.protoCodes();

      if (codes.includes(CODE_CIRCUIT)) {
        return false;
      }

      if (codes.includes(CODE_UNIX)) {
        return true;
      }

      // Check the shape first so nodeAddress() is only attempted on
      // TCP-shaped addresses; anything else is simply filtered out.
      if (!mafmt.TCP.matches(ma.decapsulateCode(CODE_P2P))) {
        return false;
      }

      try {
        if (isPrivateIp(ma.nodeAddress().address)) {
          return false;
        }
      } catch {
        // NOTE(review): nodeAddress() is expected to throw for hosts it
        // cannot reduce to an address/port pair (confirm against the
        // multiaddr version in use). Such hosts are treated as not private
        // so the filter never throws.
      }

      return true;
    });
  }
}
|
|
|
|
|
|
|
|
/**
 * Builds a transport factory in the shape libp2p expects: a function that
 * takes the node's components and returns a `HypercoreTransport`.
 *
 * @param init - transport options captured by the returned factory. The
 *   transport constructor throws when `init.proxy` is missing, so relying on
 *   the `{}` default yields a factory that throws when invoked.
 * @returns a factory creating a `HypercoreTransport` from the given components
 */
export function hypercoreTransport(
  init: HypercoreOptions = {}
): (components?: TCPComponents) => HypercoreTransport {
  return (components: TCPComponents = {}) =>
    new HypercoreTransport(components, init);
}
|