*Update to use new multisocket proxy

This commit is contained in:
Derrick Hammer 2023-04-16 20:51:32 -04:00
parent 824881ed88
commit 2fd5b11582
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
5 changed files with 38 additions and 532 deletions

View File

@ -3,7 +3,7 @@ import { createHelia } from "helia";
import { yamux } from "@chainsafe/libp2p-yamux"; import { yamux } from "@chainsafe/libp2p-yamux";
// @ts-ignore // @ts-ignore
import Hyperswarm from "hyperswarm"; import Hyperswarm from "hyperswarm";
import { Peer, Proxy } from "@lumeweb/libhyperproxy"; import { Peer, MultiSocketProxy } from "@lumeweb/libhyperproxy";
// @ts-ignore // @ts-ignore
import sodium from "sodium-universal"; import sodium from "sodium-universal";
// @ts-ignore // @ts-ignore
@ -11,7 +11,6 @@ import { CustomEvent } from "@libp2p/interfaces/events";
// @ts-ignore // @ts-ignore
import { fixed32, raw } from "compact-encoding"; import { fixed32, raw } from "compact-encoding";
import { mplex } from "@libp2p/mplex"; import { mplex } from "@libp2p/mplex";
import PeerManager from "./peerManager.js";
import { hypercoreTransport } from "./libp2p/transport.js"; import { hypercoreTransport } from "./libp2p/transport.js";
import { UnixFS, unixfs } from "@helia/unixfs"; import { UnixFS, unixfs } from "@helia/unixfs";
// @ts-ignore // @ts-ignore
@ -39,6 +38,7 @@ import { bootstrap } from "@libp2p/bootstrap";
import { IDBBlockstore } from "blockstore-idb"; import { IDBBlockstore } from "blockstore-idb";
import { IDBDatastore } from "datastore-idb"; import { IDBDatastore } from "datastore-idb";
import defer from "p-defer"; import defer from "p-defer";
import { Helia } from "@helia/interface";
const basesByPrefix: { [prefix: string]: MultibaseDecoder<any> } = Object.keys( const basesByPrefix: { [prefix: string]: MultibaseDecoder<any> } = Object.keys(
bases bases
@ -51,12 +51,14 @@ const basesByPrefix: { [prefix: string]: MultibaseDecoder<any> } = Object.keys(
onmessage = handleMessage; onmessage = handleMessage;
const moduleDefer = defer(); const moduleDefer = defer();
let activePeersDefer = defer(); let activeIpfsPeersDefer = defer();
let networkPeersAvailable = defer();
let swarm; let swarm;
let proxy: Proxy; let proxy: MultiSocketProxy;
let fs: UnixFS; let fs: UnixFS;
let IPNS: IPNS; let IPNS: IPNS;
let ipfs: Helia;
// @ts-ignore // @ts-ignore
BigInt.prototype.toJSON = function () { BigInt.prototype.toJSON = function () {
@ -76,6 +78,15 @@ async function handlePresentSeed() {
const client = createIpfsHttpClient(getDelegateConfig()); const client = createIpfsHttpClient(getDelegateConfig());
proxy = new MultiSocketProxy({
swarm,
listen: true,
protocol: PROTOCOL,
autostart: true,
emulateWebsocket: true,
server: false,
});
const libp2p = await createLibp2p({ const libp2p = await createLibp2p({
peerDiscovery: [ peerDiscovery: [
bootstrap({ bootstrap({
@ -137,10 +148,12 @@ async function handlePresentSeed() {
], ],
}), }),
], ],
transports: [hypercoreTransport({ peerManager: PeerManager.instance })], transports: [hypercoreTransport({ proxy })],
connectionEncryption: [noise()], connectionEncryption: [noise()],
connectionManager: { connectionManager: {
autoDial: true, autoDial: true,
minConnections: 5,
maxConnections: 20,
}, },
streamMuxers: [yamux(), mplex()], streamMuxers: [yamux(), mplex()],
start: false, start: false,
@ -169,7 +182,7 @@ async function handlePresentSeed() {
await blockstore.open(); await blockstore.open();
await datastore.open(); await datastore.open();
const ipfs = await createHelia({ ipfs = await createHelia({
// @ts-ignore // @ts-ignore
blockstore, blockstore,
// @ts-ignore // @ts-ignore
@ -177,28 +190,13 @@ async function handlePresentSeed() {
libp2p, libp2p,
}); });
PeerManager.instance.ipfs = ipfs; proxy.on("peerChannelOpen", async () => {
if (!ipfs.libp2p.isStarted()) {
proxy = new Proxy({ await ipfs.libp2p.start();
swarm, networkPeersAvailable.resolve();
listen: true, }
protocol: PROTOCOL,
autostart: true,
emulateWebsocket: true,
createDefaultMessage: false,
onchannel(peer: Peer, channel: any) {
PeerManager.instance.handleNewPeerChannel(peer, channel);
},
onopen() {
PeerManager.instance.handleNewPeer();
},
onclose(peer: Peer) {
PeerManager.instance.handleClosePeer(peer);
},
}); });
PeerManager.instance.ipfsReady;
swarm.join(PROTOCOL); swarm.join(PROTOCOL);
await swarm.start(); await swarm.start();
await swarm.ready(); await swarm.ready();
@ -208,13 +206,13 @@ async function handlePresentSeed() {
ipfs.libp2p.addEventListener("peer:connect", () => { ipfs.libp2p.addEventListener("peer:connect", () => {
if (ipfs.libp2p.getPeers().length > 0) { if (ipfs.libp2p.getPeers().length > 0) {
activePeersDefer.resolve(); activeIpfsPeersDefer.resolve();
} }
}); });
ipfs.libp2p.addEventListener("peer:disconnect", () => { ipfs.libp2p.addEventListener("peer:disconnect", () => {
if (ipfs.libp2p.getPeers().length === 0) { if (ipfs.libp2p.getPeers().length === 0) {
activePeersDefer = defer(); activeIpfsPeersDefer = defer();
} }
}); });
@ -339,13 +337,13 @@ async function handleCat(aq: ActiveQuery) {
async function handleIpnsResolve(aq: ActiveQuery) { async function handleIpnsResolve(aq: ActiveQuery) {
await ready(); await ready();
await activePeersDefer.promise; await activeIpfsPeersDefer.promise;
if (PeerManager.instance.ipfs.libp2p.getPeers().length === 0) { if (ipfs.libp2p.getPeers().length === 0) {
activePeersDefer = defer(); activeIpfsPeersDefer = defer();
} }
await activePeersDefer.promise; await activeIpfsPeersDefer.promise;
if (!aq.callerInput || !("cid" in aq.callerInput)) { if (!aq.callerInput || !("cid" in aq.callerInput)) {
aq.reject("cid required"); aq.reject("cid required");
@ -384,12 +382,12 @@ function getCID(cid: string): CID {
async function handleGetActivePeers(aq: ActiveQuery) { async function handleGetActivePeers(aq: ActiveQuery) {
await ready(); await ready();
aq.respond(PeerManager.instance.ipfs.libp2p.getPeers()); aq.respond(ipfs.libp2p.getPeers());
} }
async function ready() { async function ready() {
await moduleDefer.promise; await moduleDefer.promise;
await PeerManager.instance.ipfsReady; await networkPeersAvailable.promise;
} }
function getDelegateConfig(): Options { function getDelegateConfig(): Options {

View File

@ -1,14 +1,12 @@
import { symbol } from "@libp2p/interface-transport"; import { symbol } from "@libp2p/interface-transport";
// @ts-ignore // @ts-ignore
import { TCP, TCPComponents, TCPDialOptions, TCPOptions } from "@libp2p/tcp"; import { TCP, TCPComponents, TCPDialOptions, TCPOptions } from "@libp2p/tcp";
import PeerManager from "../peerManager.js";
import { Multiaddr } from "@multiformats/multiaddr"; import { Multiaddr } from "@multiformats/multiaddr";
import { IpcSocketConnectOpts, TcpSocketConnectOpts } from "net"; import { IpcSocketConnectOpts, TcpSocketConnectOpts } from "net";
import { logger } from "@libp2p/logger"; import { logger } from "@libp2p/logger";
import { AbortError, CodeError } from "@libp2p/interfaces/errors"; import { AbortError, CodeError } from "@libp2p/interfaces/errors";
// @ts-ignore // @ts-ignore
import { multiaddrToNetConfig } from "@libp2p/tcp/utils"; import { multiaddrToNetConfig } from "@libp2p/tcp/utils";
import { Socket } from "../socket.js";
import { Connection } from "@libp2p/interface-connection"; import { Connection } from "@libp2p/interface-connection";
// @ts-ignore // @ts-ignore
import { toMultiaddrConnection } from "@libp2p/tcp/socket-to-conn"; import { toMultiaddrConnection } from "@libp2p/tcp/socket-to-conn";
@ -17,13 +15,14 @@ import * as mafmt from "@multiformats/mafmt";
const log = logger("libp2p:hypercore"); const log = logger("libp2p:hypercore");
import isPrivateIp from "private-ip"; import isPrivateIp from "private-ip";
import { DummySocket, MultiSocketProxy, Socket } from "@lumeweb/libhyperproxy";
const CODE_P2P = 421; const CODE_P2P = 421;
const CODE_CIRCUIT = 290; const CODE_CIRCUIT = 290;
const CODE_UNIX = 400; const CODE_UNIX = 400;
export interface HypercoreOptions extends TCPOptions { export interface HypercoreOptions extends TCPOptions {
peerManager?: PeerManager; proxy?: MultiSocketProxy;
} }
class HypercoreTransport extends TCP { class HypercoreTransport extends TCP {
private readonly opts?: HypercoreOptions; private readonly opts?: HypercoreOptions;
@ -31,10 +30,9 @@ class HypercoreTransport extends TCP {
constructor(components: TCPComponents, options: HypercoreOptions = {}) { constructor(components: TCPComponents, options: HypercoreOptions = {}) {
super(components, options); super(components, options);
this.opts = options; this.opts = options;
if (!options.peerManager) { if (!options.proxy) {
throw new Error("options.peerManager is required"); throw new Error("options.peerManager is required");
} }
this.opts?.peerManager;
} }
get [symbol](): true { get [symbol](): true {
@ -52,7 +50,7 @@ class HypercoreTransport extends TCP {
const socket = await this._connect(ma, options); const socket = await this._connect(ma, options);
// Avoid uncaught errors caused by unstable connections // Avoid uncaught errors caused by unstable connections
socket.on("error", (err) => { socket.on("error", (err: any) => {
log("socket error", err); log("socket error", err);
}); });
@ -154,9 +152,7 @@ class HypercoreTransport extends TCP {
}; };
try { try {
rawSocket = (await this.opts?.peerManager?.createSocket( rawSocket = (await this.opts?.proxy?.createSocket(cOpts)) as Socket;
cOpts
)) as Socket;
} catch (e: any) { } catch (e: any) {
onError(e); onError(e);
} }
@ -175,7 +171,7 @@ class HypercoreTransport extends TCP {
options.signal.addEventListener("abort", onAbort); options.signal.addEventListener("abort", onAbort);
} }
rawSocket?.connect(); (rawSocket as DummySocket)?.connect();
}); });
} }

View File

@ -1,290 +0,0 @@
import { Peer } from "@lumeweb/libhyperproxy";
import b4a from "b4a";
// @ts-ignore
import { fixed32, json, raw, uint } from "compact-encoding";
import { TcpSocketConnectOpts } from "net";
import { Helia } from "@helia/interface";
import { deserializeError } from "serialize-error";
import defer from "p-defer";
import {
CloseSocketRequest,
ErrorSocketRequest,
PeerEntity,
PeerInfoResult,
SocketRequest,
WriteSocketRequest,
} from "./types.js";
import { Socket } from "./socket.js";
function idFactory(start: number, step = 1, limit = 2 ** 32) {
let id = start;
return function nextId() {
const nextId = id;
id += step;
if (id >= limit) id = start;
return nextId;
};
}
const nextSocketId = idFactory(1);
function roundRobinFactory(list: Map<string, any>) {
let index = 0;
return (): PeerEntity => {
const keys = [...list.keys()].sort();
if (index >= keys.length) {
index = 0;
}
return list.get(keys[index++]);
};
}
const socketEncoding = {
preencode(state: any, m: SocketRequest) {
uint.preencode(state, m.id);
uint.preencode(state, m.remoteId);
},
encode(state: any, m: SocketRequest) {
uint.encode(state, m.id);
uint.encode(state, m.remoteId);
},
decode(state: any, m: any): SocketRequest {
return {
remoteId: uint.decode(state, m),
id: uint.decode(state, m),
};
},
};
const writeSocketEncoding = {
preencode(state: any, m: WriteSocketRequest) {
socketEncoding.preencode(state, m);
raw.preencode(state, m.data);
},
encode(state: any, m: WriteSocketRequest) {
socketEncoding.encode(state, m);
raw.encode(state, m.data);
},
decode(state: any, m: any): WriteSocketRequest {
const socket = socketEncoding.decode(state, m);
return {
...socket,
data: raw.decode(state, m),
};
},
};
const errorSocketEncoding = {
decode(state: any, m: any): ErrorSocketRequest {
const socket = socketEncoding.decode(state, m);
return {
...socket,
err: deserializeError(json.decode(state, m)),
};
},
};
export default class PeerManager {
private static _instance: PeerManager;
public static get instance(): PeerManager {
if (!PeerManager._instance) {
PeerManager._instance = new PeerManager();
}
return PeerManager._instance;
}
private _sockets = new Map<number, Socket>();
get sockets(): Map<number, Socket> {
return this._sockets;
}
private _socketMap = new Map<number, number>();
get socketMap(): Map<number, number> {
return this._socketMap;
}
private _peers: Map<string, PeerEntity> = new Map<string, PeerEntity>();
private _nextPeer = roundRobinFactory(this._peers);
get peers(): Map<string, PeerEntity> {
return this._peers;
}
private _ipfs?: Helia;
get ipfs(): Helia {
return this._ipfs as Helia;
}
set ipfs(value: Helia) {
this._ipfs = value as Helia;
}
private _ipfsReady?: Promise<unknown>;
private _ipfsResolve?: () => void;
get ipfsReady(): Promise<void> {
if (!this._ipfsReady) {
let ipfsDefer = defer();
this._ipfsReady = ipfsDefer.promise;
this._ipfsResolve = ipfsDefer.resolve;
}
return this._ipfsReady as Promise<any>;
}
handleNewPeerChannel(peer: Peer, channel: any) {
this.update(peer.socket.remotePublicKey, { peer });
this._registerOpenSocketMessage(peer, channel);
this._registerWriteSocketMessage(peer, channel);
this._registerCloseSocketMessage(peer, channel);
this._registerTimeoutSocketMessage(peer, channel);
this._registerErrorSocketMessage(peer, channel);
}
async handleNewPeer() {
if (!this.ipfs.libp2p.isStarted()) {
await this.ipfs.libp2p.start();
this._ipfsResolve?.();
}
}
async handleClosePeer(peer: Peer) {
for (const item of this._sockets) {
if (item[1].peer.peer === peer) {
item[1].end();
}
}
const pubkey = this._toString(peer.socket.remotePublicKey);
if (this._peers.has(pubkey)) {
this._peers.delete(pubkey);
}
}
public get(pubkey: Uint8Array): PeerEntity | undefined {
if (this._peers.has(this._toString(pubkey))) {
return this._peers.get(this._toString(pubkey)) as PeerEntity;
}
return undefined;
}
public update(pubkey: Uint8Array, data: Partial<PeerEntity>): void {
const peer = this.get(pubkey) ?? ({} as PeerEntity);
this._peers.set(this._toString(pubkey), {
...peer,
...data,
...{
messages: {
...peer?.messages,
...data?.messages,
},
},
} as PeerEntity);
}
public async createSocket(options: TcpSocketConnectOpts): Promise<Socket> {
if (!this.peers.size) {
throw new Error("no peers found");
}
const peer = this._nextPeer();
const socketId = nextSocketId();
const socket = new Socket(socketId, this, peer, options);
this._sockets.set(socketId, socket);
return socket;
}
private _registerOpenSocketMessage(peer: Peer, channel: any) {
const self = this;
const message = channel.addMessage({
encoding: {
preencode: json.preencode,
encode: json.encode,
decode: socketEncoding.decode,
},
async onmessage(m: SocketRequest) {
const socket = self._sockets.get(m.id);
if (socket) {
socket.remoteId = m.remoteId;
// @ts-ignore
socket.emit("connect");
}
},
});
this.update(peer.socket.remotePublicKey, {
messages: { openSocket: message },
});
}
private _registerWriteSocketMessage(peer: Peer, channel: any) {
const self = this;
const message = channel.addMessage({
encoding: writeSocketEncoding,
onmessage(m: WriteSocketRequest) {
self._sockets.get(m.id)?.push(m.data);
},
});
this.update(peer.socket.remotePublicKey, {
messages: { writeSocket: message },
});
}
private _registerCloseSocketMessage(peer: Peer, channel: any) {
const self = this;
const message = channel.addMessage({
encoding: socketEncoding,
onmessage(m: CloseSocketRequest) {
self._sockets.get(m.id)?.end();
},
});
this.update(peer.socket.remotePublicKey, {
messages: { closeSocket: message },
});
}
private _registerTimeoutSocketMessage(peer: Peer, channel: any) {
const self = this;
const message = channel.addMessage({
encoding: socketEncoding,
onmessage(m: SocketRequest) {
// @ts-ignore
self._sockets.get(m.id)?.emit("timeout");
},
});
this.update(peer.socket.remotePublicKey, {
messages: { timeoutSocket: message },
});
}
private _registerErrorSocketMessage(peer: Peer, channel: any) {
const self = this;
const message = channel.addMessage({
encoding: errorSocketEncoding,
onmessage(m: ErrorSocketRequest) {
// @ts-ignore
self._sockets.get(m.id)?.emit("error", m.err);
},
});
this.update(peer.socket.remotePublicKey, {
messages: { errorSocket: message },
});
}
private _toString(pubkey: Uint8Array) {
return b4a.from(pubkey).toString("hex");
}
}

View File

@ -1,158 +0,0 @@
import { Callback, Duplex } from "streamx";
import { TcpSocketConnectOpts } from "net";
import { PeerEntity, SocketRequest, WriteSocketRequest } from "./types.js";
import PeerManager from "./peerManager.js";
import { clearTimeout } from "timers";
import { maybeGetAsyncProperty } from "@lumeweb/libkernel-universal";
const asyncIterator = Symbol.asyncIterator || Symbol("asyncIterator");
const STREAM_DESTROYED = new Error("Stream was destroyed");
const READ_DONE = 0b0010000000000 << 4;
const DESTROYED = 0b1000;
export class Socket extends Duplex {
private _options: TcpSocketConnectOpts;
private _id: number;
private _manager: PeerManager;
private _connectTimeout?: number;
constructor(
id: number,
manager: PeerManager,
peer: PeerEntity,
options: TcpSocketConnectOpts
) {
super();
this._id = id;
this._manager = manager;
this._peer = peer;
this._options = options;
// @ts-ignore
this.on("timeout", () => {
if (this._connectTimeout) {
clearTimeout(this._connectTimeout);
}
});
}
private _remoteId = 0;
set remoteId(value: number) {
this._remoteId = value;
this._manager.socketMap.set(this._id, value);
}
private _peer;
get peer() {
return this._peer;
}
public async _write(data: any, cb: any): Promise<void> {
(await maybeGetAsyncProperty(this._peer.messages.writeSocket))?.send({
id: this._id,
remoteId: this._remoteId,
data,
} as WriteSocketRequest);
cb();
}
public async _destroy(cb: Callback) {
(await maybeGetAsyncProperty(this._peer.messages.closeSocket))?.send({
id: this._id,
remoteId: this._remoteId,
} as SocketRequest);
this._manager.socketMap.delete(this._id);
this._manager.sockets.delete(this._id);
}
public async connect() {
(await maybeGetAsyncProperty(this._peer.messages.openSocket))?.send({
...this._options,
id: this._id,
});
}
public setTimeout(ms: number, cb: Function) {
if (this._connectTimeout) {
clearTimeout(this._connectTimeout);
}
this._connectTimeout = setTimeout(() => {
cb && cb();
}, ms) as any;
}
[asyncIterator]() {
const stream = this;
let error: Error | null = null;
let promiseResolve: ((arg0: { value: any; done: boolean }) => void) | null =
null;
let promiseReject: ((arg0: Error) => void) | null = null;
this.on("error", (err) => {
error = err;
});
this.on("data", ondata);
this.on("close", onclose);
return {
[asyncIterator]() {
return this;
},
next() {
return new Promise(function (resolve, reject) {
promiseResolve = resolve;
promiseReject = reject;
const data = stream.read();
if (data !== null) ondata(data);
else {
// @ts-ignore
if ((stream._duplexState & DESTROYED) !== 0) ondata(null);
}
});
},
return() {
return destroy(null);
},
throw(err: any) {
return destroy(err);
},
};
function onreadable() {
if (promiseResolve !== null) ondata(stream.read());
}
function onclose() {
if (promiseResolve !== null) ondata(null);
}
function ondata(data: any) {
if (promiseReject === null) return;
if (error) promiseReject(error);
// @ts-ignore
else if (data === null && (stream._duplexState & READ_DONE) === 0)
promiseReject(STREAM_DESTROYED);
else promiseResolve?.({ value: data, done: data === null });
promiseReject = promiseResolve = null;
}
function destroy(err: any) {
stream.destroy(err);
return new Promise((resolve, reject) => {
// @ts-ignore
if (stream._duplexState & DESTROYED)
return resolve({ value: undefined, done: true });
stream.once("close", function () {
if (err) reject(err);
else resolve({ value: undefined, done: true });
});
});
}
}
}

View File

@ -1,40 +0,0 @@
import { Peer } from "@lumeweb/libhyperproxy";
type Message = {
send: (pubkey: Uint8Array | any) => void;
};
export interface PeerEntityMessages {
keyExchange: Message;
openSocket: Message;
writeSocket: Message;
closeSocket: Message;
timeoutSocket: Message;
errorSocket: Message;
}
export interface PeerEntity {
messages: PeerEntityMessages | Partial<PeerEntityMessages>;
submitKeyExchange: (pubkey: Uint8Array) => void;
peer: Peer;
}
export interface PeerInfoResult {
publicKey: Uint8Array;
libp2pPublicKey: Uint8Array;
}
export interface SocketRequest {
remoteId: number;
id: number;
}
export type CloseSocketRequest = SocketRequest;
export interface WriteSocketRequest extends SocketRequest {
data: Uint8Array;
}
export interface ErrorSocketRequest extends SocketRequest {
err: Error;
}