Compare commits
3 Commits
824881ed88
...
499d2cfbf7
Author | SHA1 | Date |
---|---|---|
Derrick Hammer | 499d2cfbf7 | |
Derrick Hammer | acca3680ac | |
Derrick Hammer | 2fd5b11582 |
22
package.json
22
package.json
|
@ -9,22 +9,22 @@
|
|||
"build": "npm run compile && node ./dist-build/build.mjs dev"
|
||||
},
|
||||
"dependencies": {
|
||||
"@chainsafe/libp2p-gossipsub": "^6.2.0",
|
||||
"@chainsafe/libp2p-gossipsub": "^6.3.0",
|
||||
"@chainsafe/libp2p-noise": "^11.0.4",
|
||||
"@chainsafe/libp2p-yamux": "^3.0.7",
|
||||
"@chainsafe/libp2p-yamux": "^3.0.10",
|
||||
"@dao-xyz/libp2p-noise": "^11.1.3",
|
||||
"@helia/ipns": "^1.1.0",
|
||||
"@helia/unixfs": "^1.2.1",
|
||||
"@helia/unixfs": "^1.2.2",
|
||||
"@libp2p/bootstrap": "^6.0.3",
|
||||
"@libp2p/crypto": "^1.0.15",
|
||||
"@libp2p/delegated-content-routing": "^4.0.3",
|
||||
"@libp2p/delegated-peer-routing": "^4.0.5",
|
||||
"@libp2p/interface-metrics": "^4.0.5",
|
||||
"@libp2p/interface-metrics": "^4.0.6",
|
||||
"@libp2p/interfaces": "^3.3.1",
|
||||
"@libp2p/logger": "^2.0.7",
|
||||
"@libp2p/mplex": "^7.1.3",
|
||||
"@libp2p/tcp": "^6.1.5",
|
||||
"@libp2p/utils": "^3.0.7",
|
||||
"@libp2p/mplex": "^7.1.7",
|
||||
"@libp2p/tcp": "^6.2.1",
|
||||
"@libp2p/utils": "^3.0.8",
|
||||
"@lumeweb/kernel-protomux-client": "git+https://git.lumeweb.com/LumeWeb/kernel-protomux-client.git",
|
||||
"@lumeweb/kernel-swarm-client": "git+https://git.lumeweb.com/LumeWeb/kernel-swarm-client.git",
|
||||
"@lumeweb/libkernel-universal": "git+https://git.lumeweb.com/LumeWeb/libkernel-universal.git",
|
||||
|
@ -46,7 +46,7 @@
|
|||
"p-queue": "^7.3.4",
|
||||
"private-ip": "^3.0.0",
|
||||
"rewire": "^6.0.0",
|
||||
"runes2": "^1.0.10",
|
||||
"runes2": "^1.1.2",
|
||||
"serialize-error": "^11.0.0",
|
||||
"sodium-universal": "^4.0.0",
|
||||
"streamx": "^2.13.2",
|
||||
|
@ -56,11 +56,11 @@
|
|||
"@helia/interface": "^0.0.0",
|
||||
"@libp2p/interface-connection": "^3.1.1",
|
||||
"@libp2p/interface-peer-info": "^1.0.9",
|
||||
"@libp2p/interface-transport": "^2.1.2",
|
||||
"@libp2p/interface-transport": "^2.1.3",
|
||||
"@libp2p/kad-dht": "^7.0.3",
|
||||
"@libp2p/peer-id": "^2.0.3",
|
||||
"@libp2p/peer-id-factory": "^2.0.3",
|
||||
"@libp2p/websockets": "^5.0.8",
|
||||
"@libp2p/websockets": "^5.0.10",
|
||||
"@lumeweb/libhyperproxy": "git+https://git.lumeweb.com/LumeWeb/libhyperproxy.git",
|
||||
"@multiformats/multiaddr": "^11.6.1",
|
||||
"@scure/bip39": "^1.2.0",
|
||||
|
@ -109,7 +109,7 @@
|
|||
"protomux": "git+https://git.lumeweb.com/LumeWeb/kernel-protomux-client.git"
|
||||
},
|
||||
"patchedDependencies": {
|
||||
"@libp2p/tcp@6.1.5": "patches/@libp2p__tcp@6.1.5.patch",
|
||||
"@libp2p/tcp@6.2.1": "patches/@libp2p__tcp@6.2.1.patch",
|
||||
"b4a@1.6.3": "patches/b4a@1.6.3.patch"
|
||||
}
|
||||
}
|
||||
|
|
66
src/index.ts
66
src/index.ts
|
@ -3,7 +3,7 @@ import { createHelia } from "helia";
|
|||
import { yamux } from "@chainsafe/libp2p-yamux";
|
||||
// @ts-ignore
|
||||
import Hyperswarm from "hyperswarm";
|
||||
import { Peer, Proxy } from "@lumeweb/libhyperproxy";
|
||||
import { Peer, MultiSocketProxy } from "@lumeweb/libhyperproxy";
|
||||
// @ts-ignore
|
||||
import sodium from "sodium-universal";
|
||||
// @ts-ignore
|
||||
|
@ -11,7 +11,6 @@ import { CustomEvent } from "@libp2p/interfaces/events";
|
|||
// @ts-ignore
|
||||
import { fixed32, raw } from "compact-encoding";
|
||||
import { mplex } from "@libp2p/mplex";
|
||||
import PeerManager from "./peerManager.js";
|
||||
import { hypercoreTransport } from "./libp2p/transport.js";
|
||||
import { UnixFS, unixfs } from "@helia/unixfs";
|
||||
// @ts-ignore
|
||||
|
@ -39,6 +38,7 @@ import { bootstrap } from "@libp2p/bootstrap";
|
|||
import { IDBBlockstore } from "blockstore-idb";
|
||||
import { IDBDatastore } from "datastore-idb";
|
||||
import defer from "p-defer";
|
||||
import { Helia } from "@helia/interface";
|
||||
|
||||
const basesByPrefix: { [prefix: string]: MultibaseDecoder<any> } = Object.keys(
|
||||
bases
|
||||
|
@ -51,12 +51,14 @@ const basesByPrefix: { [prefix: string]: MultibaseDecoder<any> } = Object.keys(
|
|||
onmessage = handleMessage;
|
||||
|
||||
const moduleDefer = defer();
|
||||
let activePeersDefer = defer();
|
||||
let activeIpfsPeersDefer = defer();
|
||||
let networkPeersAvailable = defer();
|
||||
|
||||
let swarm;
|
||||
let proxy: Proxy;
|
||||
let proxy: MultiSocketProxy;
|
||||
let fs: UnixFS;
|
||||
let IPNS: IPNS;
|
||||
let ipfs: Helia;
|
||||
|
||||
// @ts-ignore
|
||||
BigInt.prototype.toJSON = function () {
|
||||
|
@ -76,6 +78,15 @@ async function handlePresentSeed() {
|
|||
|
||||
const client = createIpfsHttpClient(getDelegateConfig());
|
||||
|
||||
proxy = new MultiSocketProxy({
|
||||
swarm,
|
||||
listen: true,
|
||||
protocol: PROTOCOL,
|
||||
autostart: true,
|
||||
emulateWebsocket: true,
|
||||
server: false,
|
||||
});
|
||||
|
||||
const libp2p = await createLibp2p({
|
||||
peerDiscovery: [
|
||||
bootstrap({
|
||||
|
@ -137,10 +148,12 @@ async function handlePresentSeed() {
|
|||
],
|
||||
}),
|
||||
],
|
||||
transports: [hypercoreTransport({ peerManager: PeerManager.instance })],
|
||||
transports: [hypercoreTransport({ proxy })],
|
||||
connectionEncryption: [noise()],
|
||||
connectionManager: {
|
||||
autoDial: true,
|
||||
minConnections: 5,
|
||||
maxConnections: 20,
|
||||
},
|
||||
streamMuxers: [yamux(), mplex()],
|
||||
start: false,
|
||||
|
@ -169,7 +182,7 @@ async function handlePresentSeed() {
|
|||
await blockstore.open();
|
||||
await datastore.open();
|
||||
|
||||
const ipfs = await createHelia({
|
||||
ipfs = await createHelia({
|
||||
// @ts-ignore
|
||||
blockstore,
|
||||
// @ts-ignore
|
||||
|
@ -177,28 +190,13 @@ async function handlePresentSeed() {
|
|||
libp2p,
|
||||
});
|
||||
|
||||
PeerManager.instance.ipfs = ipfs;
|
||||
|
||||
proxy = new Proxy({
|
||||
swarm,
|
||||
listen: true,
|
||||
protocol: PROTOCOL,
|
||||
autostart: true,
|
||||
emulateWebsocket: true,
|
||||
createDefaultMessage: false,
|
||||
onchannel(peer: Peer, channel: any) {
|
||||
PeerManager.instance.handleNewPeerChannel(peer, channel);
|
||||
},
|
||||
onopen() {
|
||||
PeerManager.instance.handleNewPeer();
|
||||
},
|
||||
onclose(peer: Peer) {
|
||||
PeerManager.instance.handleClosePeer(peer);
|
||||
},
|
||||
proxy.on("peerChannelOpen", async () => {
|
||||
if (!ipfs.libp2p.isStarted()) {
|
||||
await ipfs.libp2p.start();
|
||||
networkPeersAvailable.resolve();
|
||||
}
|
||||
});
|
||||
|
||||
PeerManager.instance.ipfsReady;
|
||||
|
||||
swarm.join(PROTOCOL);
|
||||
await swarm.start();
|
||||
await swarm.ready();
|
||||
|
@ -208,13 +206,13 @@ async function handlePresentSeed() {
|
|||
|
||||
ipfs.libp2p.addEventListener("peer:connect", () => {
|
||||
if (ipfs.libp2p.getPeers().length > 0) {
|
||||
activePeersDefer.resolve();
|
||||
activeIpfsPeersDefer.resolve();
|
||||
}
|
||||
});
|
||||
|
||||
ipfs.libp2p.addEventListener("peer:disconnect", () => {
|
||||
if (ipfs.libp2p.getPeers().length === 0) {
|
||||
activePeersDefer = defer();
|
||||
activeIpfsPeersDefer = defer();
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -339,13 +337,13 @@ async function handleCat(aq: ActiveQuery) {
|
|||
async function handleIpnsResolve(aq: ActiveQuery) {
|
||||
await ready();
|
||||
|
||||
await activePeersDefer.promise;
|
||||
await activeIpfsPeersDefer.promise;
|
||||
|
||||
if (PeerManager.instance.ipfs.libp2p.getPeers().length === 0) {
|
||||
activePeersDefer = defer();
|
||||
if (ipfs.libp2p.getPeers().length === 0) {
|
||||
activeIpfsPeersDefer = defer();
|
||||
}
|
||||
|
||||
await activePeersDefer.promise;
|
||||
await activeIpfsPeersDefer.promise;
|
||||
|
||||
if (!aq.callerInput || !("cid" in aq.callerInput)) {
|
||||
aq.reject("cid required");
|
||||
|
@ -384,12 +382,12 @@ function getCID(cid: string): CID {
|
|||
async function handleGetActivePeers(aq: ActiveQuery) {
|
||||
await ready();
|
||||
|
||||
aq.respond(PeerManager.instance.ipfs.libp2p.getPeers());
|
||||
aq.respond(ipfs.libp2p.getPeers());
|
||||
}
|
||||
|
||||
async function ready() {
|
||||
await moduleDefer.promise;
|
||||
await PeerManager.instance.ipfsReady;
|
||||
await networkPeersAvailable.promise;
|
||||
}
|
||||
|
||||
function getDelegateConfig(): Options {
|
||||
|
|
|
@ -1,14 +1,12 @@
|
|||
import { symbol } from "@libp2p/interface-transport";
|
||||
// @ts-ignore
|
||||
import { TCP, TCPComponents, TCPDialOptions, TCPOptions } from "@libp2p/tcp";
|
||||
import PeerManager from "../peerManager.js";
|
||||
import { Multiaddr } from "@multiformats/multiaddr";
|
||||
import { IpcSocketConnectOpts, TcpSocketConnectOpts } from "net";
|
||||
import { logger } from "@libp2p/logger";
|
||||
import { AbortError, CodeError } from "@libp2p/interfaces/errors";
|
||||
// @ts-ignore
|
||||
import { multiaddrToNetConfig } from "@libp2p/tcp/utils";
|
||||
import { Socket } from "../socket.js";
|
||||
import { Connection } from "@libp2p/interface-connection";
|
||||
// @ts-ignore
|
||||
import { toMultiaddrConnection } from "@libp2p/tcp/socket-to-conn";
|
||||
|
@ -17,13 +15,14 @@ import * as mafmt from "@multiformats/mafmt";
|
|||
const log = logger("libp2p:hypercore");
|
||||
|
||||
import isPrivateIp from "private-ip";
|
||||
import { DummySocket, MultiSocketProxy, Socket } from "@lumeweb/libhyperproxy";
|
||||
|
||||
const CODE_P2P = 421;
|
||||
const CODE_CIRCUIT = 290;
|
||||
const CODE_UNIX = 400;
|
||||
|
||||
export interface HypercoreOptions extends TCPOptions {
|
||||
peerManager?: PeerManager;
|
||||
proxy?: MultiSocketProxy;
|
||||
}
|
||||
class HypercoreTransport extends TCP {
|
||||
private readonly opts?: HypercoreOptions;
|
||||
|
@ -31,10 +30,9 @@ class HypercoreTransport extends TCP {
|
|||
constructor(components: TCPComponents, options: HypercoreOptions = {}) {
|
||||
super(components, options);
|
||||
this.opts = options;
|
||||
if (!options.peerManager) {
|
||||
if (!options.proxy) {
|
||||
throw new Error("options.peerManager is required");
|
||||
}
|
||||
this.opts?.peerManager;
|
||||
}
|
||||
|
||||
get [symbol](): true {
|
||||
|
@ -52,7 +50,7 @@ class HypercoreTransport extends TCP {
|
|||
const socket = await this._connect(ma, options);
|
||||
|
||||
// Avoid uncaught errors caused by unstable connections
|
||||
socket.on("error", (err) => {
|
||||
socket.on("error", (err: any) => {
|
||||
log("socket error", err);
|
||||
});
|
||||
|
||||
|
@ -154,9 +152,7 @@ class HypercoreTransport extends TCP {
|
|||
};
|
||||
|
||||
try {
|
||||
rawSocket = (await this.opts?.peerManager?.createSocket(
|
||||
cOpts
|
||||
)) as Socket;
|
||||
rawSocket = (await this.opts?.proxy?.createSocket(cOpts)) as Socket;
|
||||
} catch (e: any) {
|
||||
onError(e);
|
||||
}
|
||||
|
@ -175,7 +171,7 @@ class HypercoreTransport extends TCP {
|
|||
options.signal.addEventListener("abort", onAbort);
|
||||
}
|
||||
|
||||
rawSocket?.connect();
|
||||
(rawSocket as DummySocket)?.connect();
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
@ -1,290 +0,0 @@
|
|||
import { Peer } from "@lumeweb/libhyperproxy";
|
||||
import b4a from "b4a";
|
||||
// @ts-ignore
|
||||
import { fixed32, json, raw, uint } from "compact-encoding";
|
||||
import { TcpSocketConnectOpts } from "net";
|
||||
import { Helia } from "@helia/interface";
|
||||
import { deserializeError } from "serialize-error";
|
||||
import defer from "p-defer";
|
||||
import {
|
||||
CloseSocketRequest,
|
||||
ErrorSocketRequest,
|
||||
PeerEntity,
|
||||
PeerInfoResult,
|
||||
SocketRequest,
|
||||
WriteSocketRequest,
|
||||
} from "./types.js";
|
||||
import { Socket } from "./socket.js";
|
||||
|
||||
function idFactory(start: number, step = 1, limit = 2 ** 32) {
|
||||
let id = start;
|
||||
|
||||
return function nextId() {
|
||||
const nextId = id;
|
||||
id += step;
|
||||
if (id >= limit) id = start;
|
||||
return nextId;
|
||||
};
|
||||
}
|
||||
|
||||
const nextSocketId = idFactory(1);
|
||||
|
||||
function roundRobinFactory(list: Map<string, any>) {
|
||||
let index = 0;
|
||||
|
||||
return (): PeerEntity => {
|
||||
const keys = [...list.keys()].sort();
|
||||
if (index >= keys.length) {
|
||||
index = 0;
|
||||
}
|
||||
|
||||
return list.get(keys[index++]);
|
||||
};
|
||||
}
|
||||
|
||||
const socketEncoding = {
|
||||
preencode(state: any, m: SocketRequest) {
|
||||
uint.preencode(state, m.id);
|
||||
uint.preencode(state, m.remoteId);
|
||||
},
|
||||
encode(state: any, m: SocketRequest) {
|
||||
uint.encode(state, m.id);
|
||||
uint.encode(state, m.remoteId);
|
||||
},
|
||||
decode(state: any, m: any): SocketRequest {
|
||||
return {
|
||||
remoteId: uint.decode(state, m),
|
||||
id: uint.decode(state, m),
|
||||
};
|
||||
},
|
||||
};
|
||||
|
||||
const writeSocketEncoding = {
|
||||
preencode(state: any, m: WriteSocketRequest) {
|
||||
socketEncoding.preencode(state, m);
|
||||
raw.preencode(state, m.data);
|
||||
},
|
||||
encode(state: any, m: WriteSocketRequest) {
|
||||
socketEncoding.encode(state, m);
|
||||
raw.encode(state, m.data);
|
||||
},
|
||||
decode(state: any, m: any): WriteSocketRequest {
|
||||
const socket = socketEncoding.decode(state, m);
|
||||
return {
|
||||
...socket,
|
||||
data: raw.decode(state, m),
|
||||
};
|
||||
},
|
||||
};
|
||||
|
||||
const errorSocketEncoding = {
|
||||
decode(state: any, m: any): ErrorSocketRequest {
|
||||
const socket = socketEncoding.decode(state, m);
|
||||
return {
|
||||
...socket,
|
||||
err: deserializeError(json.decode(state, m)),
|
||||
};
|
||||
},
|
||||
};
|
||||
|
||||
export default class PeerManager {
|
||||
private static _instance: PeerManager;
|
||||
|
||||
public static get instance(): PeerManager {
|
||||
if (!PeerManager._instance) {
|
||||
PeerManager._instance = new PeerManager();
|
||||
}
|
||||
|
||||
return PeerManager._instance;
|
||||
}
|
||||
|
||||
private _sockets = new Map<number, Socket>();
|
||||
|
||||
get sockets(): Map<number, Socket> {
|
||||
return this._sockets;
|
||||
}
|
||||
|
||||
private _socketMap = new Map<number, number>();
|
||||
|
||||
get socketMap(): Map<number, number> {
|
||||
return this._socketMap;
|
||||
}
|
||||
|
||||
private _peers: Map<string, PeerEntity> = new Map<string, PeerEntity>();
|
||||
|
||||
private _nextPeer = roundRobinFactory(this._peers);
|
||||
|
||||
get peers(): Map<string, PeerEntity> {
|
||||
return this._peers;
|
||||
}
|
||||
|
||||
private _ipfs?: Helia;
|
||||
|
||||
get ipfs(): Helia {
|
||||
return this._ipfs as Helia;
|
||||
}
|
||||
|
||||
set ipfs(value: Helia) {
|
||||
this._ipfs = value as Helia;
|
||||
}
|
||||
|
||||
private _ipfsReady?: Promise<unknown>;
|
||||
private _ipfsResolve?: () => void;
|
||||
|
||||
get ipfsReady(): Promise<void> {
|
||||
if (!this._ipfsReady) {
|
||||
let ipfsDefer = defer();
|
||||
this._ipfsReady = ipfsDefer.promise;
|
||||
this._ipfsResolve = ipfsDefer.resolve;
|
||||
}
|
||||
|
||||
return this._ipfsReady as Promise<any>;
|
||||
}
|
||||
|
||||
handleNewPeerChannel(peer: Peer, channel: any) {
|
||||
this.update(peer.socket.remotePublicKey, { peer });
|
||||
|
||||
this._registerOpenSocketMessage(peer, channel);
|
||||
this._registerWriteSocketMessage(peer, channel);
|
||||
this._registerCloseSocketMessage(peer, channel);
|
||||
this._registerTimeoutSocketMessage(peer, channel);
|
||||
this._registerErrorSocketMessage(peer, channel);
|
||||
}
|
||||
|
||||
async handleNewPeer() {
|
||||
if (!this.ipfs.libp2p.isStarted()) {
|
||||
await this.ipfs.libp2p.start();
|
||||
this._ipfsResolve?.();
|
||||
}
|
||||
}
|
||||
|
||||
async handleClosePeer(peer: Peer) {
|
||||
for (const item of this._sockets) {
|
||||
if (item[1].peer.peer === peer) {
|
||||
item[1].end();
|
||||
}
|
||||
}
|
||||
|
||||
const pubkey = this._toString(peer.socket.remotePublicKey);
|
||||
|
||||
if (this._peers.has(pubkey)) {
|
||||
this._peers.delete(pubkey);
|
||||
}
|
||||
}
|
||||
|
||||
public get(pubkey: Uint8Array): PeerEntity | undefined {
|
||||
if (this._peers.has(this._toString(pubkey))) {
|
||||
return this._peers.get(this._toString(pubkey)) as PeerEntity;
|
||||
}
|
||||
|
||||
return undefined;
|
||||
}
|
||||
|
||||
public update(pubkey: Uint8Array, data: Partial<PeerEntity>): void {
|
||||
const peer = this.get(pubkey) ?? ({} as PeerEntity);
|
||||
|
||||
this._peers.set(this._toString(pubkey), {
|
||||
...peer,
|
||||
...data,
|
||||
...{
|
||||
messages: {
|
||||
...peer?.messages,
|
||||
...data?.messages,
|
||||
},
|
||||
},
|
||||
} as PeerEntity);
|
||||
}
|
||||
|
||||
public async createSocket(options: TcpSocketConnectOpts): Promise<Socket> {
|
||||
if (!this.peers.size) {
|
||||
throw new Error("no peers found");
|
||||
}
|
||||
|
||||
const peer = this._nextPeer();
|
||||
const socketId = nextSocketId();
|
||||
const socket = new Socket(socketId, this, peer, options);
|
||||
this._sockets.set(socketId, socket);
|
||||
|
||||
return socket;
|
||||
}
|
||||
|
||||
private _registerOpenSocketMessage(peer: Peer, channel: any) {
|
||||
const self = this;
|
||||
const message = channel.addMessage({
|
||||
encoding: {
|
||||
preencode: json.preencode,
|
||||
encode: json.encode,
|
||||
decode: socketEncoding.decode,
|
||||
},
|
||||
async onmessage(m: SocketRequest) {
|
||||
const socket = self._sockets.get(m.id);
|
||||
if (socket) {
|
||||
socket.remoteId = m.remoteId;
|
||||
// @ts-ignore
|
||||
socket.emit("connect");
|
||||
}
|
||||
},
|
||||
});
|
||||
this.update(peer.socket.remotePublicKey, {
|
||||
messages: { openSocket: message },
|
||||
});
|
||||
}
|
||||
|
||||
private _registerWriteSocketMessage(peer: Peer, channel: any) {
|
||||
const self = this;
|
||||
const message = channel.addMessage({
|
||||
encoding: writeSocketEncoding,
|
||||
onmessage(m: WriteSocketRequest) {
|
||||
self._sockets.get(m.id)?.push(m.data);
|
||||
},
|
||||
});
|
||||
this.update(peer.socket.remotePublicKey, {
|
||||
messages: { writeSocket: message },
|
||||
});
|
||||
}
|
||||
|
||||
private _registerCloseSocketMessage(peer: Peer, channel: any) {
|
||||
const self = this;
|
||||
const message = channel.addMessage({
|
||||
encoding: socketEncoding,
|
||||
onmessage(m: CloseSocketRequest) {
|
||||
self._sockets.get(m.id)?.end();
|
||||
},
|
||||
});
|
||||
this.update(peer.socket.remotePublicKey, {
|
||||
messages: { closeSocket: message },
|
||||
});
|
||||
}
|
||||
|
||||
private _registerTimeoutSocketMessage(peer: Peer, channel: any) {
|
||||
const self = this;
|
||||
const message = channel.addMessage({
|
||||
encoding: socketEncoding,
|
||||
onmessage(m: SocketRequest) {
|
||||
// @ts-ignore
|
||||
self._sockets.get(m.id)?.emit("timeout");
|
||||
},
|
||||
});
|
||||
this.update(peer.socket.remotePublicKey, {
|
||||
messages: { timeoutSocket: message },
|
||||
});
|
||||
}
|
||||
|
||||
private _registerErrorSocketMessage(peer: Peer, channel: any) {
|
||||
const self = this;
|
||||
const message = channel.addMessage({
|
||||
encoding: errorSocketEncoding,
|
||||
onmessage(m: ErrorSocketRequest) {
|
||||
// @ts-ignore
|
||||
self._sockets.get(m.id)?.emit("error", m.err);
|
||||
},
|
||||
});
|
||||
this.update(peer.socket.remotePublicKey, {
|
||||
messages: { errorSocket: message },
|
||||
});
|
||||
}
|
||||
|
||||
private _toString(pubkey: Uint8Array) {
|
||||
return b4a.from(pubkey).toString("hex");
|
||||
}
|
||||
}
|
158
src/socket.ts
158
src/socket.ts
|
@ -1,158 +0,0 @@
|
|||
import { Callback, Duplex } from "streamx";
|
||||
import { TcpSocketConnectOpts } from "net";
|
||||
import { PeerEntity, SocketRequest, WriteSocketRequest } from "./types.js";
|
||||
import PeerManager from "./peerManager.js";
|
||||
import { clearTimeout } from "timers";
|
||||
import { maybeGetAsyncProperty } from "@lumeweb/libkernel-universal";
|
||||
|
||||
const asyncIterator = Symbol.asyncIterator || Symbol("asyncIterator");
|
||||
|
||||
const STREAM_DESTROYED = new Error("Stream was destroyed");
|
||||
const READ_DONE = 0b0010000000000 << 4;
|
||||
const DESTROYED = 0b1000;
|
||||
|
||||
export class Socket extends Duplex {
|
||||
private _options: TcpSocketConnectOpts;
|
||||
private _id: number;
|
||||
private _manager: PeerManager;
|
||||
|
||||
private _connectTimeout?: number;
|
||||
|
||||
constructor(
|
||||
id: number,
|
||||
manager: PeerManager,
|
||||
peer: PeerEntity,
|
||||
options: TcpSocketConnectOpts
|
||||
) {
|
||||
super();
|
||||
this._id = id;
|
||||
this._manager = manager;
|
||||
this._peer = peer;
|
||||
this._options = options;
|
||||
|
||||
// @ts-ignore
|
||||
this.on("timeout", () => {
|
||||
if (this._connectTimeout) {
|
||||
clearTimeout(this._connectTimeout);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private _remoteId = 0;
|
||||
|
||||
set remoteId(value: number) {
|
||||
this._remoteId = value;
|
||||
this._manager.socketMap.set(this._id, value);
|
||||
}
|
||||
|
||||
private _peer;
|
||||
|
||||
get peer() {
|
||||
return this._peer;
|
||||
}
|
||||
|
||||
public async _write(data: any, cb: any): Promise<void> {
|
||||
(await maybeGetAsyncProperty(this._peer.messages.writeSocket))?.send({
|
||||
id: this._id,
|
||||
remoteId: this._remoteId,
|
||||
data,
|
||||
} as WriteSocketRequest);
|
||||
cb();
|
||||
}
|
||||
|
||||
public async _destroy(cb: Callback) {
|
||||
(await maybeGetAsyncProperty(this._peer.messages.closeSocket))?.send({
|
||||
id: this._id,
|
||||
remoteId: this._remoteId,
|
||||
} as SocketRequest);
|
||||
this._manager.socketMap.delete(this._id);
|
||||
this._manager.sockets.delete(this._id);
|
||||
}
|
||||
|
||||
public async connect() {
|
||||
(await maybeGetAsyncProperty(this._peer.messages.openSocket))?.send({
|
||||
...this._options,
|
||||
id: this._id,
|
||||
});
|
||||
}
|
||||
|
||||
public setTimeout(ms: number, cb: Function) {
|
||||
if (this._connectTimeout) {
|
||||
clearTimeout(this._connectTimeout);
|
||||
}
|
||||
|
||||
this._connectTimeout = setTimeout(() => {
|
||||
cb && cb();
|
||||
}, ms) as any;
|
||||
}
|
||||
|
||||
[asyncIterator]() {
|
||||
const stream = this;
|
||||
|
||||
let error: Error | null = null;
|
||||
let promiseResolve: ((arg0: { value: any; done: boolean }) => void) | null =
|
||||
null;
|
||||
let promiseReject: ((arg0: Error) => void) | null = null;
|
||||
|
||||
this.on("error", (err) => {
|
||||
error = err;
|
||||
});
|
||||
this.on("data", ondata);
|
||||
this.on("close", onclose);
|
||||
|
||||
return {
|
||||
[asyncIterator]() {
|
||||
return this;
|
||||
},
|
||||
next() {
|
||||
return new Promise(function (resolve, reject) {
|
||||
promiseResolve = resolve;
|
||||
promiseReject = reject;
|
||||
const data = stream.read();
|
||||
if (data !== null) ondata(data);
|
||||
else {
|
||||
// @ts-ignore
|
||||
if ((stream._duplexState & DESTROYED) !== 0) ondata(null);
|
||||
}
|
||||
});
|
||||
},
|
||||
return() {
|
||||
return destroy(null);
|
||||
},
|
||||
throw(err: any) {
|
||||
return destroy(err);
|
||||
},
|
||||
};
|
||||
|
||||
function onreadable() {
|
||||
if (promiseResolve !== null) ondata(stream.read());
|
||||
}
|
||||
|
||||
function onclose() {
|
||||
if (promiseResolve !== null) ondata(null);
|
||||
}
|
||||
|
||||
function ondata(data: any) {
|
||||
if (promiseReject === null) return;
|
||||
if (error) promiseReject(error);
|
||||
// @ts-ignore
|
||||
else if (data === null && (stream._duplexState & READ_DONE) === 0)
|
||||
promiseReject(STREAM_DESTROYED);
|
||||
else promiseResolve?.({ value: data, done: data === null });
|
||||
promiseReject = promiseResolve = null;
|
||||
}
|
||||
|
||||
function destroy(err: any) {
|
||||
stream.destroy(err);
|
||||
return new Promise((resolve, reject) => {
|
||||
// @ts-ignore
|
||||
if (stream._duplexState & DESTROYED)
|
||||
return resolve({ value: undefined, done: true });
|
||||
stream.once("close", function () {
|
||||
if (err) reject(err);
|
||||
else resolve({ value: undefined, done: true });
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
40
src/types.ts
40
src/types.ts
|
@ -1,40 +0,0 @@
|
|||
import { Peer } from "@lumeweb/libhyperproxy";
|
||||
|
||||
type Message = {
|
||||
send: (pubkey: Uint8Array | any) => void;
|
||||
};
|
||||
|
||||
export interface PeerEntityMessages {
|
||||
keyExchange: Message;
|
||||
openSocket: Message;
|
||||
writeSocket: Message;
|
||||
closeSocket: Message;
|
||||
timeoutSocket: Message;
|
||||
errorSocket: Message;
|
||||
}
|
||||
|
||||
export interface PeerEntity {
|
||||
messages: PeerEntityMessages | Partial<PeerEntityMessages>;
|
||||
submitKeyExchange: (pubkey: Uint8Array) => void;
|
||||
peer: Peer;
|
||||
}
|
||||
|
||||
export interface PeerInfoResult {
|
||||
publicKey: Uint8Array;
|
||||
libp2pPublicKey: Uint8Array;
|
||||
}
|
||||
|
||||
export interface SocketRequest {
|
||||
remoteId: number;
|
||||
id: number;
|
||||
}
|
||||
|
||||
export type CloseSocketRequest = SocketRequest;
|
||||
|
||||
export interface WriteSocketRequest extends SocketRequest {
|
||||
data: Uint8Array;
|
||||
}
|
||||
|
||||
export interface ErrorSocketRequest extends SocketRequest {
|
||||
err: Error;
|
||||
}
|
Loading…
Reference in New Issue