From 2fd5b11582a58667f08f7bf46bfdb232ebcd5195 Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Sun, 16 Apr 2023 20:51:32 -0400 Subject: [PATCH] *Update to use new multisocket proxy --- src/index.ts | 66 +++++---- src/libp2p/transport.ts | 16 +-- src/peerManager.ts | 290 ---------------------------------------- src/socket.ts | 158 ---------------------- src/types.ts | 40 ------ 5 files changed, 38 insertions(+), 532 deletions(-) delete mode 100644 src/peerManager.ts delete mode 100644 src/socket.ts delete mode 100644 src/types.ts diff --git a/src/index.ts b/src/index.ts index 9964f5c..999e5a0 100644 --- a/src/index.ts +++ b/src/index.ts @@ -3,7 +3,7 @@ import { createHelia } from "helia"; import { yamux } from "@chainsafe/libp2p-yamux"; // @ts-ignore import Hyperswarm from "hyperswarm"; -import { Peer, Proxy } from "@lumeweb/libhyperproxy"; +import { Peer, MultiSocketProxy } from "@lumeweb/libhyperproxy"; // @ts-ignore import sodium from "sodium-universal"; // @ts-ignore @@ -11,7 +11,6 @@ import { CustomEvent } from "@libp2p/interfaces/events"; // @ts-ignore import { fixed32, raw } from "compact-encoding"; import { mplex } from "@libp2p/mplex"; -import PeerManager from "./peerManager.js"; import { hypercoreTransport } from "./libp2p/transport.js"; import { UnixFS, unixfs } from "@helia/unixfs"; // @ts-ignore @@ -39,6 +38,7 @@ import { bootstrap } from "@libp2p/bootstrap"; import { IDBBlockstore } from "blockstore-idb"; import { IDBDatastore } from "datastore-idb"; import defer from "p-defer"; +import { Helia } from "@helia/interface"; const basesByPrefix: { [prefix: string]: MultibaseDecoder } = Object.keys( bases @@ -51,12 +51,14 @@ const basesByPrefix: { [prefix: string]: MultibaseDecoder } = Object.keys( onmessage = handleMessage; const moduleDefer = defer(); -let activePeersDefer = defer(); +let activeIpfsPeersDefer = defer(); +let networkPeersAvailable = defer(); let swarm; -let proxy: Proxy; +let proxy: MultiSocketProxy; let fs: UnixFS; let IPNS: IPNS; +let ipfs: Helia; // @ts-ignore BigInt.prototype.toJSON = function () { @@ -76,6 +78,15 @@ async function handlePresentSeed() { const client = createIpfsHttpClient(getDelegateConfig()); + proxy = new MultiSocketProxy({ + swarm, + listen: true, + protocol: PROTOCOL, + autostart: true, + emulateWebsocket: true, + server: false, + }); + const libp2p = await createLibp2p({ peerDiscovery: [ bootstrap({ @@ -137,10 +148,12 @@ async function handlePresentSeed() { ], }), ], - transports: [hypercoreTransport({ peerManager: PeerManager.instance })], + transports: [hypercoreTransport({ proxy })], connectionEncryption: [noise()], connectionManager: { autoDial: true, + minConnections: 5, + maxConnections: 20, }, streamMuxers: [yamux(), mplex()], start: false, @@ -169,7 +182,7 @@ async function handlePresentSeed() { await blockstore.open(); await datastore.open(); - const ipfs = await createHelia({ + ipfs = await createHelia({ // @ts-ignore blockstore, // @ts-ignore @@ -177,28 +190,13 @@ async function handlePresentSeed() { libp2p, }); - PeerManager.instance.ipfs = ipfs; - - proxy = new Proxy({ - swarm, - listen: true, - protocol: PROTOCOL, - autostart: true, - emulateWebsocket: true, - createDefaultMessage: false, - onchannel(peer: Peer, channel: any) { - PeerManager.instance.handleNewPeerChannel(peer, channel); - }, - onopen() { - PeerManager.instance.handleNewPeer(); - }, - onclose(peer: Peer) { - PeerManager.instance.handleClosePeer(peer); - }, + proxy.on("peerChannelOpen", async () => { + if (!ipfs.libp2p.isStarted()) { + await ipfs.libp2p.start(); + networkPeersAvailable.resolve(); + } }); - PeerManager.instance.ipfsReady; - swarm.join(PROTOCOL); await swarm.start(); await swarm.ready(); @@ -208,13 +206,13 @@ async function handlePresentSeed() { ipfs.libp2p.addEventListener("peer:connect", () => { if (ipfs.libp2p.getPeers().length > 0) { - activePeersDefer.resolve(); + activeIpfsPeersDefer.resolve(); } }); ipfs.libp2p.addEventListener("peer:disconnect", () => { if (ipfs.libp2p.getPeers().length === 0) { - activePeersDefer = defer(); + activeIpfsPeersDefer = defer(); } }); @@ -339,13 +337,13 @@ async function handleCat(aq: ActiveQuery) { async function handleIpnsResolve(aq: ActiveQuery) { await ready(); - await activePeersDefer.promise; + await activeIpfsPeersDefer.promise; - if (PeerManager.instance.ipfs.libp2p.getPeers().length === 0) { - activePeersDefer = defer(); + if (ipfs.libp2p.getPeers().length === 0) { + activeIpfsPeersDefer = defer(); } - await activePeersDefer.promise; + await activeIpfsPeersDefer.promise; if (!aq.callerInput || !("cid" in aq.callerInput)) { aq.reject("cid required"); @@ -384,12 +382,12 @@ function getCID(cid: string): CID { async function handleGetActivePeers(aq: ActiveQuery) { await ready(); - aq.respond(PeerManager.instance.ipfs.libp2p.getPeers()); + aq.respond(ipfs.libp2p.getPeers()); } async function ready() { await moduleDefer.promise; - await PeerManager.instance.ipfsReady; + await networkPeersAvailable.promise; } function getDelegateConfig(): Options { diff --git a/src/libp2p/transport.ts b/src/libp2p/transport.ts index 7a66a33..94dcee0 100644 --- a/src/libp2p/transport.ts +++ b/src/libp2p/transport.ts @@ -1,14 +1,12 @@ import { symbol } from "@libp2p/interface-transport"; // @ts-ignore import { TCP, TCPComponents, TCPDialOptions, TCPOptions } from "@libp2p/tcp"; -import PeerManager from "../peerManager.js"; import { Multiaddr } from "@multiformats/multiaddr"; import { IpcSocketConnectOpts, TcpSocketConnectOpts } from "net"; import { logger } from "@libp2p/logger"; import { AbortError, CodeError } from "@libp2p/interfaces/errors"; // @ts-ignore import { multiaddrToNetConfig } from "@libp2p/tcp/utils"; -import { Socket } from "../socket.js"; import { Connection } from "@libp2p/interface-connection"; // @ts-ignore import { toMultiaddrConnection } from "@libp2p/tcp/socket-to-conn"; @@ -17,13 +15,14 @@ import * as mafmt from "@multiformats/mafmt"; const log = logger("libp2p:hypercore"); import isPrivateIp from "private-ip"; +import { DummySocket, MultiSocketProxy, Socket } from "@lumeweb/libhyperproxy"; const CODE_P2P = 421; const CODE_CIRCUIT = 290; const CODE_UNIX = 400; export interface HypercoreOptions extends TCPOptions { - peerManager?: PeerManager; + proxy?: MultiSocketProxy; } class HypercoreTransport extends TCP { private readonly opts?: HypercoreOptions; @@ -31,10 +30,9 @@ class HypercoreTransport extends TCP { constructor(components: TCPComponents, options: HypercoreOptions = {}) { super(components, options); this.opts = options; - if (!options.peerManager) { + if (!options.proxy) { throw new Error("options.peerManager is required"); } - this.opts?.peerManager; } get [symbol](): true { @@ -52,7 +50,7 @@ class HypercoreTransport extends TCP { const socket = await this._connect(ma, options); // Avoid uncaught errors caused by unstable connections - socket.on("error", (err) => { + socket.on("error", (err: any) => { log("socket error", err); }); @@ -154,9 +152,7 @@ class HypercoreTransport extends TCP { }; try { - rawSocket = (await this.opts?.peerManager?.createSocket( - cOpts - )) as Socket; + rawSocket = (await this.opts?.proxy?.createSocket(cOpts)) as Socket; } catch (e: any) { onError(e); } @@ -175,7 +171,7 @@ class HypercoreTransport extends TCP { options.signal.addEventListener("abort", onAbort); } - rawSocket?.connect(); + (rawSocket as DummySocket)?.connect(); }); } diff --git a/src/peerManager.ts b/src/peerManager.ts deleted file mode 100644 index 8a07356..0000000 --- a/src/peerManager.ts +++ /dev/null @@ -1,290 +0,0 @@ -import { Peer } from "@lumeweb/libhyperproxy"; -import b4a from "b4a"; -// @ts-ignore -import { fixed32, json, raw, uint } from "compact-encoding"; -import { TcpSocketConnectOpts } from "net"; -import { Helia } from "@helia/interface"; -import { deserializeError } from "serialize-error"; -import defer from "p-defer"; -import { - CloseSocketRequest, - ErrorSocketRequest, - PeerEntity, - PeerInfoResult, - SocketRequest, - WriteSocketRequest, -} from "./types.js"; -import { Socket } from "./socket.js"; - -function idFactory(start: number, step = 1, limit = 2 ** 32) { - let id = start; - - return function nextId() { - const nextId = id; - id += step; - if (id >= limit) id = start; - return nextId; - }; -} - -const nextSocketId = idFactory(1); - -function roundRobinFactory(list: Map) { - let index = 0; - - return (): PeerEntity => { - const keys = [...list.keys()].sort(); - if (index >= keys.length) { - index = 0; - } - - return list.get(keys[index++]); - }; -} - -const socketEncoding = { - preencode(state: any, m: SocketRequest) { - uint.preencode(state, m.id); - uint.preencode(state, m.remoteId); - }, - encode(state: any, m: SocketRequest) { - uint.encode(state, m.id); - uint.encode(state, m.remoteId); - }, - decode(state: any, m: any): SocketRequest { - return { - remoteId: uint.decode(state, m), - id: uint.decode(state, m), - }; - }, -}; - -const writeSocketEncoding = { - preencode(state: any, m: WriteSocketRequest) { - socketEncoding.preencode(state, m); - raw.preencode(state, m.data); - }, - encode(state: any, m: WriteSocketRequest) { - socketEncoding.encode(state, m); - raw.encode(state, m.data); - }, - decode(state: any, m: any): WriteSocketRequest { - const socket = socketEncoding.decode(state, m); - return { - ...socket, - data: raw.decode(state, m), - }; - }, -}; - -const errorSocketEncoding = { - decode(state: any, m: any): ErrorSocketRequest { - const socket = socketEncoding.decode(state, m); - return { - ...socket, - err: deserializeError(json.decode(state, m)), - }; - }, -}; - -export default class PeerManager { - private static _instance: PeerManager; - - public static get instance(): PeerManager { - if (!PeerManager._instance) { - PeerManager._instance = new PeerManager(); - } - - return PeerManager._instance; - } - - private _sockets = new Map(); - - get sockets(): Map { - return this._sockets; - } - - private _socketMap = new Map(); - - get socketMap(): Map { - return this._socketMap; - } - - private _peers: Map = new Map(); - - private _nextPeer = roundRobinFactory(this._peers); - - get peers(): Map { - return this._peers; - } - - private _ipfs?: Helia; - - get ipfs(): Helia { - return this._ipfs as Helia; - } - - set ipfs(value: Helia) { - this._ipfs = value as Helia; - } - - private _ipfsReady?: Promise; - private _ipfsResolve?: () => void; - - get ipfsReady(): Promise { - if (!this._ipfsReady) { - let ipfsDefer = defer(); - this._ipfsReady = ipfsDefer.promise; - this._ipfsResolve = ipfsDefer.resolve; - } - - return this._ipfsReady as Promise; - } - - handleNewPeerChannel(peer: Peer, channel: any) { - this.update(peer.socket.remotePublicKey, { peer }); - - this._registerOpenSocketMessage(peer, channel); - this._registerWriteSocketMessage(peer, channel); - this._registerCloseSocketMessage(peer, channel); - this._registerTimeoutSocketMessage(peer, channel); - this._registerErrorSocketMessage(peer, channel); - } - - async handleNewPeer() { - if (!this.ipfs.libp2p.isStarted()) { - await this.ipfs.libp2p.start(); - this._ipfsResolve?.(); - } - } - - async handleClosePeer(peer: Peer) { - for (const item of this._sockets) { - if (item[1].peer.peer === peer) { - item[1].end(); - } - } - - const pubkey = this._toString(peer.socket.remotePublicKey); - - if (this._peers.has(pubkey)) { - this._peers.delete(pubkey); - } - } - - public get(pubkey: Uint8Array): PeerEntity | undefined { - if (this._peers.has(this._toString(pubkey))) { - return this._peers.get(this._toString(pubkey)) as PeerEntity; - } - - return undefined; - } - - public update(pubkey: Uint8Array, data: Partial): void { - const peer = this.get(pubkey) ?? ({} as PeerEntity); - - this._peers.set(this._toString(pubkey), { - ...peer, - ...data, - ...{ - messages: { - ...peer?.messages, - ...data?.messages, - }, - }, - } as PeerEntity); - } - - public async createSocket(options: TcpSocketConnectOpts): Promise { - if (!this.peers.size) { - throw new Error("no peers found"); - } - - const peer = this._nextPeer(); - const socketId = nextSocketId(); - const socket = new Socket(socketId, this, peer, options); - this._sockets.set(socketId, socket); - - return socket; - } - - private _registerOpenSocketMessage(peer: Peer, channel: any) { - const self = this; - const message = channel.addMessage({ - encoding: { - preencode: json.preencode, - encode: json.encode, - decode: socketEncoding.decode, - }, - async onmessage(m: SocketRequest) { - const socket = self._sockets.get(m.id); - if (socket) { - socket.remoteId = m.remoteId; - // @ts-ignore - socket.emit("connect"); - } - }, - }); - this.update(peer.socket.remotePublicKey, { - messages: { openSocket: message }, - }); - } - - private _registerWriteSocketMessage(peer: Peer, channel: any) { - const self = this; - const message = channel.addMessage({ - encoding: writeSocketEncoding, - onmessage(m: WriteSocketRequest) { - self._sockets.get(m.id)?.push(m.data); - }, - }); - this.update(peer.socket.remotePublicKey, { - messages: { writeSocket: message }, - }); - } - - private _registerCloseSocketMessage(peer: Peer, channel: any) { - const self = this; - const message = channel.addMessage({ - encoding: socketEncoding, - onmessage(m: CloseSocketRequest) { - self._sockets.get(m.id)?.end(); - }, - }); - this.update(peer.socket.remotePublicKey, { - messages: { closeSocket: message }, - }); - } - - private _registerTimeoutSocketMessage(peer: Peer, channel: any) { - const self = this; - const message = channel.addMessage({ - encoding: socketEncoding, - onmessage(m: SocketRequest) { - // @ts-ignore - self._sockets.get(m.id)?.emit("timeout"); - }, - }); - this.update(peer.socket.remotePublicKey, { - messages: { timeoutSocket: message }, - }); - } - - private _registerErrorSocketMessage(peer: Peer, channel: any) { - const self = this; - const message = channel.addMessage({ - encoding: errorSocketEncoding, - onmessage(m: ErrorSocketRequest) { - // @ts-ignore - self._sockets.get(m.id)?.emit("error", m.err); - }, - }); - this.update(peer.socket.remotePublicKey, { - messages: { errorSocket: message }, - }); - } - - private _toString(pubkey: Uint8Array) { - return b4a.from(pubkey).toString("hex"); - } -} diff --git a/src/socket.ts b/src/socket.ts deleted file mode 100644 index 000e450..0000000 --- a/src/socket.ts +++ /dev/null @@ -1,158 +0,0 @@ -import { Callback, Duplex } from "streamx"; -import { TcpSocketConnectOpts } from "net"; -import { PeerEntity, SocketRequest, WriteSocketRequest } from "./types.js"; -import PeerManager from "./peerManager.js"; -import { clearTimeout } from "timers"; -import { maybeGetAsyncProperty } from "@lumeweb/libkernel-universal"; - -const asyncIterator = Symbol.asyncIterator || Symbol("asyncIterator"); - -const STREAM_DESTROYED = new Error("Stream was destroyed"); -const READ_DONE = 0b0010000000000 << 4; -const DESTROYED = 0b1000; - -export class Socket extends Duplex { - private _options: TcpSocketConnectOpts; - private _id: number; - private _manager: PeerManager; - - private _connectTimeout?: number; - - constructor( - id: number, - manager: PeerManager, - peer: PeerEntity, - options: TcpSocketConnectOpts - ) { - super(); - this._id = id; - this._manager = manager; - this._peer = peer; - this._options = options; - - // @ts-ignore - this.on("timeout", () => { - if (this._connectTimeout) { - clearTimeout(this._connectTimeout); - } - }); - } - - private _remoteId = 0; - - set remoteId(value: number) { - this._remoteId = value; - this._manager.socketMap.set(this._id, value); - } - - private _peer; - - get peer() { - return this._peer; - } - - public async _write(data: any, cb: any): Promise { - (await maybeGetAsyncProperty(this._peer.messages.writeSocket))?.send({ - id: this._id, - remoteId: this._remoteId, - data, - } as WriteSocketRequest); - cb(); - } - - public async _destroy(cb: Callback) { - (await maybeGetAsyncProperty(this._peer.messages.closeSocket))?.send({ - id: this._id, - remoteId: this._remoteId, - } as SocketRequest); - this._manager.socketMap.delete(this._id); - this._manager.sockets.delete(this._id); - } - - public async connect() { - (await maybeGetAsyncProperty(this._peer.messages.openSocket))?.send({ - ...this._options, - id: this._id, - }); - } - - public setTimeout(ms: number, cb: Function) { - if (this._connectTimeout) { - clearTimeout(this._connectTimeout); - } - - this._connectTimeout = setTimeout(() => { - cb && cb(); - }, ms) as any; - } - - [asyncIterator]() { - const stream = this; - - let error: Error | null = null; - let promiseResolve: ((arg0: { value: any; done: boolean }) => void) | null = - null; - let promiseReject: ((arg0: Error) => void) | null = null; - - this.on("error", (err) => { - error = err; - }); - this.on("data", ondata); - this.on("close", onclose); - - return { - [asyncIterator]() { - return this; - }, - next() { - return new Promise(function (resolve, reject) { - promiseResolve = resolve; - promiseReject = reject; - const data = stream.read(); - if (data !== null) ondata(data); - else { - // @ts-ignore - if ((stream._duplexState & DESTROYED) !== 0) ondata(null); - } - }); - }, - return() { - return destroy(null); - }, - throw(err: any) { - return destroy(err); - }, - }; - - function onreadable() { - if (promiseResolve !== null) ondata(stream.read()); - } - - function onclose() { - if (promiseResolve !== null) ondata(null); - } - - function ondata(data: any) { - if (promiseReject === null) return; - if (error) promiseReject(error); - // @ts-ignore - else if (data === null && (stream._duplexState & READ_DONE) === 0) - promiseReject(STREAM_DESTROYED); - else promiseResolve?.({ value: data, done: data === null }); - promiseReject = promiseResolve = null; - } - - function destroy(err: any) { - stream.destroy(err); - return new Promise((resolve, reject) => { - // @ts-ignore - if (stream._duplexState & DESTROYED) - return resolve({ value: undefined, done: true }); - stream.once("close", function () { - if (err) reject(err); - else resolve({ value: undefined, done: true }); - }); - }); - } - } -} diff --git a/src/types.ts b/src/types.ts deleted file mode 100644 index f1d8a2b..0000000 --- a/src/types.ts +++ /dev/null @@ -1,40 +0,0 @@ -import { Peer } from "@lumeweb/libhyperproxy"; - -type Message = { - send: (pubkey: Uint8Array | any) => void; -}; - -export interface PeerEntityMessages { - keyExchange: Message; - openSocket: Message; - writeSocket: Message; - closeSocket: Message; - timeoutSocket: Message; - errorSocket: Message; -} - -export interface PeerEntity { - messages: PeerEntityMessages | Partial; - submitKeyExchange: (pubkey: Uint8Array) => void; - peer: Peer; -} - -export interface PeerInfoResult { - publicKey: Uint8Array; - libp2pPublicKey: Uint8Array; -} - -export interface SocketRequest { - remoteId: number; - id: number; -} - -export type CloseSocketRequest = SocketRequest; - -export interface WriteSocketRequest extends SocketRequest { - data: Uint8Array; -} - -export interface ErrorSocketRequest extends SocketRequest { - err: Error; -}