Compare commits

..

No commits in common. "499d2cfbf71c28cb579bcd111a18b5510950b44a" and "824881ed88477155ece20a4dcfae73be7d36002a" have entirely different histories.

7 changed files with 543 additions and 49 deletions

View File

@ -9,22 +9,22 @@
"build": "npm run compile && node ./dist-build/build.mjs dev" "build": "npm run compile && node ./dist-build/build.mjs dev"
}, },
"dependencies": { "dependencies": {
"@chainsafe/libp2p-gossipsub": "^6.3.0", "@chainsafe/libp2p-gossipsub": "^6.2.0",
"@chainsafe/libp2p-noise": "^11.0.4", "@chainsafe/libp2p-noise": "^11.0.4",
"@chainsafe/libp2p-yamux": "^3.0.10", "@chainsafe/libp2p-yamux": "^3.0.7",
"@dao-xyz/libp2p-noise": "^11.1.3", "@dao-xyz/libp2p-noise": "^11.1.3",
"@helia/ipns": "^1.1.0", "@helia/ipns": "^1.1.0",
"@helia/unixfs": "^1.2.2", "@helia/unixfs": "^1.2.1",
"@libp2p/bootstrap": "^6.0.3", "@libp2p/bootstrap": "^6.0.3",
"@libp2p/crypto": "^1.0.15", "@libp2p/crypto": "^1.0.15",
"@libp2p/delegated-content-routing": "^4.0.3", "@libp2p/delegated-content-routing": "^4.0.3",
"@libp2p/delegated-peer-routing": "^4.0.5", "@libp2p/delegated-peer-routing": "^4.0.5",
"@libp2p/interface-metrics": "^4.0.6", "@libp2p/interface-metrics": "^4.0.5",
"@libp2p/interfaces": "^3.3.1", "@libp2p/interfaces": "^3.3.1",
"@libp2p/logger": "^2.0.7", "@libp2p/logger": "^2.0.7",
"@libp2p/mplex": "^7.1.7", "@libp2p/mplex": "^7.1.3",
"@libp2p/tcp": "^6.2.1", "@libp2p/tcp": "^6.1.5",
"@libp2p/utils": "^3.0.8", "@libp2p/utils": "^3.0.7",
"@lumeweb/kernel-protomux-client": "git+https://git.lumeweb.com/LumeWeb/kernel-protomux-client.git", "@lumeweb/kernel-protomux-client": "git+https://git.lumeweb.com/LumeWeb/kernel-protomux-client.git",
"@lumeweb/kernel-swarm-client": "git+https://git.lumeweb.com/LumeWeb/kernel-swarm-client.git", "@lumeweb/kernel-swarm-client": "git+https://git.lumeweb.com/LumeWeb/kernel-swarm-client.git",
"@lumeweb/libkernel-universal": "git+https://git.lumeweb.com/LumeWeb/libkernel-universal.git", "@lumeweb/libkernel-universal": "git+https://git.lumeweb.com/LumeWeb/libkernel-universal.git",
@ -46,7 +46,7 @@
"p-queue": "^7.3.4", "p-queue": "^7.3.4",
"private-ip": "^3.0.0", "private-ip": "^3.0.0",
"rewire": "^6.0.0", "rewire": "^6.0.0",
"runes2": "^1.1.2", "runes2": "^1.0.10",
"serialize-error": "^11.0.0", "serialize-error": "^11.0.0",
"sodium-universal": "^4.0.0", "sodium-universal": "^4.0.0",
"streamx": "^2.13.2", "streamx": "^2.13.2",
@ -56,11 +56,11 @@
"@helia/interface": "^0.0.0", "@helia/interface": "^0.0.0",
"@libp2p/interface-connection": "^3.1.1", "@libp2p/interface-connection": "^3.1.1",
"@libp2p/interface-peer-info": "^1.0.9", "@libp2p/interface-peer-info": "^1.0.9",
"@libp2p/interface-transport": "^2.1.3", "@libp2p/interface-transport": "^2.1.2",
"@libp2p/kad-dht": "^7.0.3", "@libp2p/kad-dht": "^7.0.3",
"@libp2p/peer-id": "^2.0.3", "@libp2p/peer-id": "^2.0.3",
"@libp2p/peer-id-factory": "^2.0.3", "@libp2p/peer-id-factory": "^2.0.3",
"@libp2p/websockets": "^5.0.10", "@libp2p/websockets": "^5.0.8",
"@lumeweb/libhyperproxy": "git+https://git.lumeweb.com/LumeWeb/libhyperproxy.git", "@lumeweb/libhyperproxy": "git+https://git.lumeweb.com/LumeWeb/libhyperproxy.git",
"@multiformats/multiaddr": "^11.6.1", "@multiformats/multiaddr": "^11.6.1",
"@scure/bip39": "^1.2.0", "@scure/bip39": "^1.2.0",
@ -109,7 +109,7 @@
"protomux": "git+https://git.lumeweb.com/LumeWeb/kernel-protomux-client.git" "protomux": "git+https://git.lumeweb.com/LumeWeb/kernel-protomux-client.git"
}, },
"patchedDependencies": { "patchedDependencies": {
"@libp2p/tcp@6.2.1": "patches/@libp2p__tcp@6.2.1.patch", "@libp2p/tcp@6.1.5": "patches/@libp2p__tcp@6.1.5.patch",
"b4a@1.6.3": "patches/b4a@1.6.3.patch" "b4a@1.6.3": "patches/b4a@1.6.3.patch"
} }
} }

View File

@ -3,7 +3,7 @@ import { createHelia } from "helia";
import { yamux } from "@chainsafe/libp2p-yamux"; import { yamux } from "@chainsafe/libp2p-yamux";
// @ts-ignore // @ts-ignore
import Hyperswarm from "hyperswarm"; import Hyperswarm from "hyperswarm";
import { Peer, MultiSocketProxy } from "@lumeweb/libhyperproxy"; import { Peer, Proxy } from "@lumeweb/libhyperproxy";
// @ts-ignore // @ts-ignore
import sodium from "sodium-universal"; import sodium from "sodium-universal";
// @ts-ignore // @ts-ignore
@ -11,6 +11,7 @@ import { CustomEvent } from "@libp2p/interfaces/events";
// @ts-ignore // @ts-ignore
import { fixed32, raw } from "compact-encoding"; import { fixed32, raw } from "compact-encoding";
import { mplex } from "@libp2p/mplex"; import { mplex } from "@libp2p/mplex";
import PeerManager from "./peerManager.js";
import { hypercoreTransport } from "./libp2p/transport.js"; import { hypercoreTransport } from "./libp2p/transport.js";
import { UnixFS, unixfs } from "@helia/unixfs"; import { UnixFS, unixfs } from "@helia/unixfs";
// @ts-ignore // @ts-ignore
@ -38,7 +39,6 @@ import { bootstrap } from "@libp2p/bootstrap";
import { IDBBlockstore } from "blockstore-idb"; import { IDBBlockstore } from "blockstore-idb";
import { IDBDatastore } from "datastore-idb"; import { IDBDatastore } from "datastore-idb";
import defer from "p-defer"; import defer from "p-defer";
import { Helia } from "@helia/interface";
const basesByPrefix: { [prefix: string]: MultibaseDecoder<any> } = Object.keys( const basesByPrefix: { [prefix: string]: MultibaseDecoder<any> } = Object.keys(
bases bases
@ -51,14 +51,12 @@ const basesByPrefix: { [prefix: string]: MultibaseDecoder<any> } = Object.keys(
onmessage = handleMessage; onmessage = handleMessage;
const moduleDefer = defer(); const moduleDefer = defer();
let activeIpfsPeersDefer = defer(); let activePeersDefer = defer();
let networkPeersAvailable = defer();
let swarm; let swarm;
let proxy: MultiSocketProxy; let proxy: Proxy;
let fs: UnixFS; let fs: UnixFS;
let IPNS: IPNS; let IPNS: IPNS;
let ipfs: Helia;
// @ts-ignore // @ts-ignore
BigInt.prototype.toJSON = function () { BigInt.prototype.toJSON = function () {
@ -78,15 +76,6 @@ async function handlePresentSeed() {
const client = createIpfsHttpClient(getDelegateConfig()); const client = createIpfsHttpClient(getDelegateConfig());
proxy = new MultiSocketProxy({
swarm,
listen: true,
protocol: PROTOCOL,
autostart: true,
emulateWebsocket: true,
server: false,
});
const libp2p = await createLibp2p({ const libp2p = await createLibp2p({
peerDiscovery: [ peerDiscovery: [
bootstrap({ bootstrap({
@ -148,12 +137,10 @@ async function handlePresentSeed() {
], ],
}), }),
], ],
transports: [hypercoreTransport({ proxy })], transports: [hypercoreTransport({ peerManager: PeerManager.instance })],
connectionEncryption: [noise()], connectionEncryption: [noise()],
connectionManager: { connectionManager: {
autoDial: true, autoDial: true,
minConnections: 5,
maxConnections: 20,
}, },
streamMuxers: [yamux(), mplex()], streamMuxers: [yamux(), mplex()],
start: false, start: false,
@ -182,7 +169,7 @@ async function handlePresentSeed() {
await blockstore.open(); await blockstore.open();
await datastore.open(); await datastore.open();
ipfs = await createHelia({ const ipfs = await createHelia({
// @ts-ignore // @ts-ignore
blockstore, blockstore,
// @ts-ignore // @ts-ignore
@ -190,13 +177,28 @@ async function handlePresentSeed() {
libp2p, libp2p,
}); });
proxy.on("peerChannelOpen", async () => { PeerManager.instance.ipfs = ipfs;
if (!ipfs.libp2p.isStarted()) {
await ipfs.libp2p.start(); proxy = new Proxy({
networkPeersAvailable.resolve(); swarm,
} listen: true,
protocol: PROTOCOL,
autostart: true,
emulateWebsocket: true,
createDefaultMessage: false,
onchannel(peer: Peer, channel: any) {
PeerManager.instance.handleNewPeerChannel(peer, channel);
},
onopen() {
PeerManager.instance.handleNewPeer();
},
onclose(peer: Peer) {
PeerManager.instance.handleClosePeer(peer);
},
}); });
PeerManager.instance.ipfsReady;
swarm.join(PROTOCOL); swarm.join(PROTOCOL);
await swarm.start(); await swarm.start();
await swarm.ready(); await swarm.ready();
@ -206,13 +208,13 @@ async function handlePresentSeed() {
ipfs.libp2p.addEventListener("peer:connect", () => { ipfs.libp2p.addEventListener("peer:connect", () => {
if (ipfs.libp2p.getPeers().length > 0) { if (ipfs.libp2p.getPeers().length > 0) {
activeIpfsPeersDefer.resolve(); activePeersDefer.resolve();
} }
}); });
ipfs.libp2p.addEventListener("peer:disconnect", () => { ipfs.libp2p.addEventListener("peer:disconnect", () => {
if (ipfs.libp2p.getPeers().length === 0) { if (ipfs.libp2p.getPeers().length === 0) {
activeIpfsPeersDefer = defer(); activePeersDefer = defer();
} }
}); });
@ -337,13 +339,13 @@ async function handleCat(aq: ActiveQuery) {
async function handleIpnsResolve(aq: ActiveQuery) { async function handleIpnsResolve(aq: ActiveQuery) {
await ready(); await ready();
await activeIpfsPeersDefer.promise; await activePeersDefer.promise;
if (ipfs.libp2p.getPeers().length === 0) { if (PeerManager.instance.ipfs.libp2p.getPeers().length === 0) {
activeIpfsPeersDefer = defer(); activePeersDefer = defer();
} }
await activeIpfsPeersDefer.promise; await activePeersDefer.promise;
if (!aq.callerInput || !("cid" in aq.callerInput)) { if (!aq.callerInput || !("cid" in aq.callerInput)) {
aq.reject("cid required"); aq.reject("cid required");
@ -382,12 +384,12 @@ function getCID(cid: string): CID {
async function handleGetActivePeers(aq: ActiveQuery) { async function handleGetActivePeers(aq: ActiveQuery) {
await ready(); await ready();
aq.respond(ipfs.libp2p.getPeers()); aq.respond(PeerManager.instance.ipfs.libp2p.getPeers());
} }
async function ready() { async function ready() {
await moduleDefer.promise; await moduleDefer.promise;
await networkPeersAvailable.promise; await PeerManager.instance.ipfsReady;
} }
function getDelegateConfig(): Options { function getDelegateConfig(): Options {

View File

@ -1,12 +1,14 @@
import { symbol } from "@libp2p/interface-transport"; import { symbol } from "@libp2p/interface-transport";
// @ts-ignore // @ts-ignore
import { TCP, TCPComponents, TCPDialOptions, TCPOptions } from "@libp2p/tcp"; import { TCP, TCPComponents, TCPDialOptions, TCPOptions } from "@libp2p/tcp";
import PeerManager from "../peerManager.js";
import { Multiaddr } from "@multiformats/multiaddr"; import { Multiaddr } from "@multiformats/multiaddr";
import { IpcSocketConnectOpts, TcpSocketConnectOpts } from "net"; import { IpcSocketConnectOpts, TcpSocketConnectOpts } from "net";
import { logger } from "@libp2p/logger"; import { logger } from "@libp2p/logger";
import { AbortError, CodeError } from "@libp2p/interfaces/errors"; import { AbortError, CodeError } from "@libp2p/interfaces/errors";
// @ts-ignore // @ts-ignore
import { multiaddrToNetConfig } from "@libp2p/tcp/utils"; import { multiaddrToNetConfig } from "@libp2p/tcp/utils";
import { Socket } from "../socket.js";
import { Connection } from "@libp2p/interface-connection"; import { Connection } from "@libp2p/interface-connection";
// @ts-ignore // @ts-ignore
import { toMultiaddrConnection } from "@libp2p/tcp/socket-to-conn"; import { toMultiaddrConnection } from "@libp2p/tcp/socket-to-conn";
@ -15,14 +17,13 @@ import * as mafmt from "@multiformats/mafmt";
const log = logger("libp2p:hypercore"); const log = logger("libp2p:hypercore");
import isPrivateIp from "private-ip"; import isPrivateIp from "private-ip";
import { DummySocket, MultiSocketProxy, Socket } from "@lumeweb/libhyperproxy";
const CODE_P2P = 421; const CODE_P2P = 421;
const CODE_CIRCUIT = 290; const CODE_CIRCUIT = 290;
const CODE_UNIX = 400; const CODE_UNIX = 400;
export interface HypercoreOptions extends TCPOptions { export interface HypercoreOptions extends TCPOptions {
proxy?: MultiSocketProxy; peerManager?: PeerManager;
} }
class HypercoreTransport extends TCP { class HypercoreTransport extends TCP {
private readonly opts?: HypercoreOptions; private readonly opts?: HypercoreOptions;
@ -30,9 +31,10 @@ class HypercoreTransport extends TCP {
constructor(components: TCPComponents, options: HypercoreOptions = {}) { constructor(components: TCPComponents, options: HypercoreOptions = {}) {
super(components, options); super(components, options);
this.opts = options; this.opts = options;
if (!options.proxy) { if (!options.peerManager) {
throw new Error("options.peerManager is required"); throw new Error("options.peerManager is required");
} }
this.opts?.peerManager;
} }
get [symbol](): true { get [symbol](): true {
@ -50,7 +52,7 @@ class HypercoreTransport extends TCP {
const socket = await this._connect(ma, options); const socket = await this._connect(ma, options);
// Avoid uncaught errors caused by unstable connections // Avoid uncaught errors caused by unstable connections
socket.on("error", (err: any) => { socket.on("error", (err) => {
log("socket error", err); log("socket error", err);
}); });
@ -152,7 +154,9 @@ class HypercoreTransport extends TCP {
}; };
try { try {
rawSocket = (await this.opts?.proxy?.createSocket(cOpts)) as Socket; rawSocket = (await this.opts?.peerManager?.createSocket(
cOpts
)) as Socket;
} catch (e: any) { } catch (e: any) {
onError(e); onError(e);
} }
@ -171,7 +175,7 @@ class HypercoreTransport extends TCP {
options.signal.addEventListener("abort", onAbort); options.signal.addEventListener("abort", onAbort);
} }
(rawSocket as DummySocket)?.connect(); rawSocket?.connect();
}); });
} }

290
src/peerManager.ts Normal file
View File

@ -0,0 +1,290 @@
import { Peer } from "@lumeweb/libhyperproxy";
import b4a from "b4a";
// @ts-ignore
import { fixed32, json, raw, uint } from "compact-encoding";
import { TcpSocketConnectOpts } from "net";
import { Helia } from "@helia/interface";
import { deserializeError } from "serialize-error";
import defer from "p-defer";
import {
CloseSocketRequest,
ErrorSocketRequest,
PeerEntity,
PeerInfoResult,
SocketRequest,
WriteSocketRequest,
} from "./types.js";
import { Socket } from "./socket.js";
function idFactory(start: number, step = 1, limit = 2 ** 32) {
let id = start;
return function nextId() {
const nextId = id;
id += step;
if (id >= limit) id = start;
return nextId;
};
}
const nextSocketId = idFactory(1);
function roundRobinFactory(list: Map<string, any>) {
let index = 0;
return (): PeerEntity => {
const keys = [...list.keys()].sort();
if (index >= keys.length) {
index = 0;
}
return list.get(keys[index++]);
};
}
const socketEncoding = {
preencode(state: any, m: SocketRequest) {
uint.preencode(state, m.id);
uint.preencode(state, m.remoteId);
},
encode(state: any, m: SocketRequest) {
uint.encode(state, m.id);
uint.encode(state, m.remoteId);
},
decode(state: any, m: any): SocketRequest {
return {
remoteId: uint.decode(state, m),
id: uint.decode(state, m),
};
},
};
const writeSocketEncoding = {
preencode(state: any, m: WriteSocketRequest) {
socketEncoding.preencode(state, m);
raw.preencode(state, m.data);
},
encode(state: any, m: WriteSocketRequest) {
socketEncoding.encode(state, m);
raw.encode(state, m.data);
},
decode(state: any, m: any): WriteSocketRequest {
const socket = socketEncoding.decode(state, m);
return {
...socket,
data: raw.decode(state, m),
};
},
};
const errorSocketEncoding = {
decode(state: any, m: any): ErrorSocketRequest {
const socket = socketEncoding.decode(state, m);
return {
...socket,
err: deserializeError(json.decode(state, m)),
};
},
};
export default class PeerManager {
private static _instance: PeerManager;
public static get instance(): PeerManager {
if (!PeerManager._instance) {
PeerManager._instance = new PeerManager();
}
return PeerManager._instance;
}
private _sockets = new Map<number, Socket>();
get sockets(): Map<number, Socket> {
return this._sockets;
}
private _socketMap = new Map<number, number>();
get socketMap(): Map<number, number> {
return this._socketMap;
}
private _peers: Map<string, PeerEntity> = new Map<string, PeerEntity>();
private _nextPeer = roundRobinFactory(this._peers);
get peers(): Map<string, PeerEntity> {
return this._peers;
}
private _ipfs?: Helia;
get ipfs(): Helia {
return this._ipfs as Helia;
}
set ipfs(value: Helia) {
this._ipfs = value as Helia;
}
private _ipfsReady?: Promise<unknown>;
private _ipfsResolve?: () => void;
get ipfsReady(): Promise<void> {
if (!this._ipfsReady) {
let ipfsDefer = defer();
this._ipfsReady = ipfsDefer.promise;
this._ipfsResolve = ipfsDefer.resolve;
}
return this._ipfsReady as Promise<any>;
}
handleNewPeerChannel(peer: Peer, channel: any) {
this.update(peer.socket.remotePublicKey, { peer });
this._registerOpenSocketMessage(peer, channel);
this._registerWriteSocketMessage(peer, channel);
this._registerCloseSocketMessage(peer, channel);
this._registerTimeoutSocketMessage(peer, channel);
this._registerErrorSocketMessage(peer, channel);
}
async handleNewPeer() {
if (!this.ipfs.libp2p.isStarted()) {
await this.ipfs.libp2p.start();
this._ipfsResolve?.();
}
}
async handleClosePeer(peer: Peer) {
for (const item of this._sockets) {
if (item[1].peer.peer === peer) {
item[1].end();
}
}
const pubkey = this._toString(peer.socket.remotePublicKey);
if (this._peers.has(pubkey)) {
this._peers.delete(pubkey);
}
}
public get(pubkey: Uint8Array): PeerEntity | undefined {
if (this._peers.has(this._toString(pubkey))) {
return this._peers.get(this._toString(pubkey)) as PeerEntity;
}
return undefined;
}
public update(pubkey: Uint8Array, data: Partial<PeerEntity>): void {
const peer = this.get(pubkey) ?? ({} as PeerEntity);
this._peers.set(this._toString(pubkey), {
...peer,
...data,
...{
messages: {
...peer?.messages,
...data?.messages,
},
},
} as PeerEntity);
}
public async createSocket(options: TcpSocketConnectOpts): Promise<Socket> {
if (!this.peers.size) {
throw new Error("no peers found");
}
const peer = this._nextPeer();
const socketId = nextSocketId();
const socket = new Socket(socketId, this, peer, options);
this._sockets.set(socketId, socket);
return socket;
}
private _registerOpenSocketMessage(peer: Peer, channel: any) {
const self = this;
const message = channel.addMessage({
encoding: {
preencode: json.preencode,
encode: json.encode,
decode: socketEncoding.decode,
},
async onmessage(m: SocketRequest) {
const socket = self._sockets.get(m.id);
if (socket) {
socket.remoteId = m.remoteId;
// @ts-ignore
socket.emit("connect");
}
},
});
this.update(peer.socket.remotePublicKey, {
messages: { openSocket: message },
});
}
private _registerWriteSocketMessage(peer: Peer, channel: any) {
const self = this;
const message = channel.addMessage({
encoding: writeSocketEncoding,
onmessage(m: WriteSocketRequest) {
self._sockets.get(m.id)?.push(m.data);
},
});
this.update(peer.socket.remotePublicKey, {
messages: { writeSocket: message },
});
}
private _registerCloseSocketMessage(peer: Peer, channel: any) {
const self = this;
const message = channel.addMessage({
encoding: socketEncoding,
onmessage(m: CloseSocketRequest) {
self._sockets.get(m.id)?.end();
},
});
this.update(peer.socket.remotePublicKey, {
messages: { closeSocket: message },
});
}
private _registerTimeoutSocketMessage(peer: Peer, channel: any) {
const self = this;
const message = channel.addMessage({
encoding: socketEncoding,
onmessage(m: SocketRequest) {
// @ts-ignore
self._sockets.get(m.id)?.emit("timeout");
},
});
this.update(peer.socket.remotePublicKey, {
messages: { timeoutSocket: message },
});
}
private _registerErrorSocketMessage(peer: Peer, channel: any) {
const self = this;
const message = channel.addMessage({
encoding: errorSocketEncoding,
onmessage(m: ErrorSocketRequest) {
// @ts-ignore
self._sockets.get(m.id)?.emit("error", m.err);
},
});
this.update(peer.socket.remotePublicKey, {
messages: { errorSocket: message },
});
}
private _toString(pubkey: Uint8Array) {
return b4a.from(pubkey).toString("hex");
}
}

158
src/socket.ts Normal file
View File

@ -0,0 +1,158 @@
import { Callback, Duplex } from "streamx";
import { TcpSocketConnectOpts } from "net";
import { PeerEntity, SocketRequest, WriteSocketRequest } from "./types.js";
import PeerManager from "./peerManager.js";
import { clearTimeout } from "timers";
import { maybeGetAsyncProperty } from "@lumeweb/libkernel-universal";
const asyncIterator = Symbol.asyncIterator || Symbol("asyncIterator");
const STREAM_DESTROYED = new Error("Stream was destroyed");
const READ_DONE = 0b0010000000000 << 4;
const DESTROYED = 0b1000;
export class Socket extends Duplex {
private _options: TcpSocketConnectOpts;
private _id: number;
private _manager: PeerManager;
private _connectTimeout?: number;
constructor(
id: number,
manager: PeerManager,
peer: PeerEntity,
options: TcpSocketConnectOpts
) {
super();
this._id = id;
this._manager = manager;
this._peer = peer;
this._options = options;
// @ts-ignore
this.on("timeout", () => {
if (this._connectTimeout) {
clearTimeout(this._connectTimeout);
}
});
}
private _remoteId = 0;
set remoteId(value: number) {
this._remoteId = value;
this._manager.socketMap.set(this._id, value);
}
private _peer;
get peer() {
return this._peer;
}
public async _write(data: any, cb: any): Promise<void> {
(await maybeGetAsyncProperty(this._peer.messages.writeSocket))?.send({
id: this._id,
remoteId: this._remoteId,
data,
} as WriteSocketRequest);
cb();
}
public async _destroy(cb: Callback) {
(await maybeGetAsyncProperty(this._peer.messages.closeSocket))?.send({
id: this._id,
remoteId: this._remoteId,
} as SocketRequest);
this._manager.socketMap.delete(this._id);
this._manager.sockets.delete(this._id);
}
public async connect() {
(await maybeGetAsyncProperty(this._peer.messages.openSocket))?.send({
...this._options,
id: this._id,
});
}
public setTimeout(ms: number, cb: Function) {
if (this._connectTimeout) {
clearTimeout(this._connectTimeout);
}
this._connectTimeout = setTimeout(() => {
cb && cb();
}, ms) as any;
}
[asyncIterator]() {
const stream = this;
let error: Error | null = null;
let promiseResolve: ((arg0: { value: any; done: boolean }) => void) | null =
null;
let promiseReject: ((arg0: Error) => void) | null = null;
this.on("error", (err) => {
error = err;
});
this.on("data", ondata);
this.on("close", onclose);
return {
[asyncIterator]() {
return this;
},
next() {
return new Promise(function (resolve, reject) {
promiseResolve = resolve;
promiseReject = reject;
const data = stream.read();
if (data !== null) ondata(data);
else {
// @ts-ignore
if ((stream._duplexState & DESTROYED) !== 0) ondata(null);
}
});
},
return() {
return destroy(null);
},
throw(err: any) {
return destroy(err);
},
};
function onreadable() {
if (promiseResolve !== null) ondata(stream.read());
}
function onclose() {
if (promiseResolve !== null) ondata(null);
}
function ondata(data: any) {
if (promiseReject === null) return;
if (error) promiseReject(error);
// @ts-ignore
else if (data === null && (stream._duplexState & READ_DONE) === 0)
promiseReject(STREAM_DESTROYED);
else promiseResolve?.({ value: data, done: data === null });
promiseReject = promiseResolve = null;
}
function destroy(err: any) {
stream.destroy(err);
return new Promise((resolve, reject) => {
// @ts-ignore
if (stream._duplexState & DESTROYED)
return resolve({ value: undefined, done: true });
stream.once("close", function () {
if (err) reject(err);
else resolve({ value: undefined, done: true });
});
});
}
}
}

40
src/types.ts Normal file
View File

@ -0,0 +1,40 @@
import { Peer } from "@lumeweb/libhyperproxy";
type Message = {
send: (pubkey: Uint8Array | any) => void;
};
export interface PeerEntityMessages {
keyExchange: Message;
openSocket: Message;
writeSocket: Message;
closeSocket: Message;
timeoutSocket: Message;
errorSocket: Message;
}
export interface PeerEntity {
messages: PeerEntityMessages | Partial<PeerEntityMessages>;
submitKeyExchange: (pubkey: Uint8Array) => void;
peer: Peer;
}
export interface PeerInfoResult {
publicKey: Uint8Array;
libp2pPublicKey: Uint8Array;
}
export interface SocketRequest {
remoteId: number;
id: number;
}
export type CloseSocketRequest = SocketRequest;
export interface WriteSocketRequest extends SocketRequest {
data: Uint8Array;
}
export interface ErrorSocketRequest extends SocketRequest {
err: Error;
}