From 90c6f789cc5621b796e00cc49d1ed86cfbc61036 Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Sun, 16 Apr 2023 20:52:56 -0400 Subject: [PATCH] *Update to use new multisocket proxy --- package.json | 12 +-- src/index.ts | 22 ++--- src/peerManager.ts | 220 --------------------------------------------- src/socket.ts | 101 --------------------- src/types.ts | 36 -------- 5 files changed, 11 insertions(+), 380 deletions(-) delete mode 100644 src/peerManager.ts delete mode 100644 src/socket.ts delete mode 100644 src/types.ts diff --git a/package.json b/package.json index fb9c72a..d62d20c 100644 --- a/package.json +++ b/package.json @@ -8,21 +8,21 @@ "devDependencies": { "@libp2p/interface-connection-encrypter": "^3.0.6", "@libp2p/interface-peer-id": "^2.0.1", - "@libp2p/multistream-select": "^3.1.2", + "@libp2p/multistream-select": "^3.1.4", "@lumeweb/relay-types": "git+https://git.lumeweb.com/LumeWeb/relay-types.git", "@types/b4a": "^1.6.0", "@types/streamx": "^2.9.1", "esbuild": "^0.15.18", - "prettier": "^2.8.4", + "prettier": "^2.8.7", "rimraf": "^3.0.2" }, "dependencies": { - "@chainsafe/libp2p-noise": "^11.0.1", - "@libp2p/interface-connection": "^3.1.0", - "@libp2p/peer-id": "^2.0.2", + "@chainsafe/libp2p-noise": "^11.0.4", + "@libp2p/interface-connection": "^3.1.1", + "@libp2p/peer-id": "^2.0.3", "@lumeweb/libhyperproxy": "git+https://git.lumeweb.com/LumeWeb/libhyperproxy.git", "@multiformats/multiaddr": "^11.6.1", - "b4a": "^1.6.2", + "b4a": "^1.6.3", "compact-encoding": "^2.11.0", "debug-stream": "^3.0.1", "serialize-error": "^11.0.0", diff --git a/src/index.ts b/src/index.ts index 18e595b..22d60ca 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,12 +1,5 @@ import type { Plugin, PluginAPI } from "@lumeweb/relay-types"; -import { Peer, Proxy, Socket } from "@lumeweb/libhyperproxy"; -// @ts-ignore -import debugStream from "debug-stream"; -// @ts-ignore -import toIterable from "stream-to-it"; -// @ts-ignore -import { fixed32, raw } from "compact-encoding"; -import PeerManager from "./peerManager"; +import { MultiSocketProxy } from "@lumeweb/libhyperproxy"; const PROTOCOL = "lumeweb.proxy.ipfs"; @@ -18,22 +11,17 @@ interface PeerInfoResult { const plugin: Plugin = { name: "ipfs", async plugin(api: PluginAPI): Promise { - api.swarm.join(api.util.crypto.createHash(PROTOCOL)); - const proxy = new Proxy({ + const proxy = new MultiSocketProxy({ swarm: api.swarm, protocol: PROTOCOL, + allowedPorts: [5001, 5002], + server: true, }); + api.swarm.join(api.util.crypto.createHash(PROTOCOL)); api.protocols.register(PROTOCOL, (peer: any, muxer: any) => { proxy.handlePeer({ peer, muxer, - createDefaultMessage: false, - onchannel(peer: Peer, channel: any) { - PeerManager.instance(api).handleNewPeerChannel(peer, channel); - }, - onclose(peer: Peer) { - PeerManager.instance(api).handleClosePeer(peer); - }, }); }); }, diff --git a/src/peerManager.ts b/src/peerManager.ts deleted file mode 100644 index 454a32e..0000000 --- a/src/peerManager.ts +++ /dev/null @@ -1,220 +0,0 @@ -import { PluginAPI } from "@lumeweb/relay-types"; -import { Peer, Socket } from "@lumeweb/libhyperproxy"; -import net from "net"; -// @ts-ignore -import { fixed32, json, raw, uint } from "compact-encoding"; -import b4a from "b4a"; -import { - CloseSocketRequest, - ErrorSocketRequest, - PeerEntity, - PeerInfoResult, - SocketRequest, - WriteSocketRequest, -} from "./types"; -import { TCPSocket } from "./socket"; -import { serializeError } from "serialize-error"; - -const socketEncoding = { - preencode(state: any, m: SocketRequest) { - uint.preencode(state, m.id); - uint.preencode(state, m.remoteId); - }, - encode(state: any, m: SocketRequest) { - uint.encode(state, m.id); - uint.encode(state, m.remoteId); - }, - decode(state: any, m: any): SocketRequest { - return { - remoteId: uint.decode(state, m), - id: uint.decode(state, m), - }; - }, -}; - -const writeSocketEncoding = { - preencode(state: any, m: WriteSocketRequest) { - socketEncoding.preencode(state, m); - raw.preencode(state, m.data); - }, - encode(state: any, m: WriteSocketRequest) { - socketEncoding.encode(state, m); - raw.encode(state, m.data); - }, - decode(state: any, m: any): WriteSocketRequest { - const socket = socketEncoding.decode(state, m); - return { - ...socket, - data: raw.decode(state, m), - }; - }, -}; - -const errorSocketEncoding = { - preencode(state: any, m: ErrorSocketRequest) { - socketEncoding.preencode(state, m); - json.preencode(state, serializeError(m.err)); - }, - encode(state: any, m: ErrorSocketRequest) { - socketEncoding.encode(state, m); - json.encode(state, serializeError(m.err)); - }, -}; - -function idFactory(start: number, step = 1, limit = 2 ** 32) { - let id = start; - - return function nextId() { - const nextId = id; - id += step; - if (id >= limit) id = start; - return nextId; - }; -} - -const nextSocketId = idFactory(1); - -export default class PeerManager { - private static _instance: PeerManager; - private _api: PluginAPI; - private _peers: Map = new Map(); - - constructor(api: PluginAPI) { - this._api = api; - } - - private _sockets = new Map(); - - get sockets(): Map { - return this._sockets; - } - - private _socketMap = new Map(); - - get socketMap(): Map { - return this._socketMap; - } - - public static instance(api?: PluginAPI): PeerManager { - if (!PeerManager._instance) { - if (!api) { - throw new Error("api argument required"); - } - PeerManager._instance = new PeerManager(api as PluginAPI); - } - - return PeerManager._instance; - } - - handleNewPeerChannel(peer: Peer, channel: any) { - this._registerOpenSocketMessage(peer, channel); - this._registerWriteSocketMessage(peer, channel); - this._registerCloseSocketMessage(peer, channel); - this._registerTimeoutSocketMessage(peer, channel); - this._registerErrorSocketMessage(peer, channel); - } - - public get(pubkey: Uint8Array): PeerEntity | undefined { - if (this._peers.has(this._toString(pubkey))) { - return this._peers.get(this._toString(pubkey)) as PeerEntity; - } - - return undefined; - } - - public update(pubkey: Uint8Array, data: Partial): void { - const peer = this.get(pubkey) ?? ({} as PeerEntity); - - this._peers.set(this._toString(pubkey), { - ...peer, - ...data, - ...{ - messages: { - ...peer?.messages, - ...data?.messages, - }, - }, - } as PeerEntity); - } - async handleClosePeer(peer: Peer) { - for (const item of this._sockets) { - if (item[1].peer.peer === peer) { - item[1].end(); - } - } - - const pubkey = this._toString(peer.socket.remotePublicKey); - - if (this._peers.has(pubkey)) { - this._peers.delete(pubkey); - } - } - - private _registerOpenSocketMessage(peer: Peer, channel: any) { - const self = this; - const message = channel.addMessage({ - encoding: { - ...socketEncoding, - decode: json.decode, - }, - async onmessage(m: any) { - // @ts-ignore - new TCPSocket( - nextSocketId(), - m.id, - self, - self.get(peer.socket.remotePublicKey) as PeerEntity, - m - ).connect(); - }, - }); - this.update(peer.socket.remotePublicKey, { - messages: { openSocket: message }, - }); - } - - private _registerWriteSocketMessage(peer: Peer, channel: any) { - const self = this; - const message = channel.addMessage({ - encoding: writeSocketEncoding, - onmessage(m: WriteSocketRequest) { - self._sockets.get(m.id)?.push(m.data); - }, - }); - this.update(peer.socket.remotePublicKey, { - messages: { writeSocket: message }, - }); - } - private _registerCloseSocketMessage(peer: Peer, channel: any) { - const self = this; - const message = channel.addMessage({ - encoding: socketEncoding, - async onmessage(m: CloseSocketRequest) { - self._sockets.get(m.id)?.end(); - }, - }); - this.update(peer.socket.remotePublicKey, { - messages: { closeSocket: message }, - }); - } - private _registerTimeoutSocketMessage(peer: Peer, channel: any) { - const message = channel.addMessage({ - encoding: socketEncoding, - }); - this.update(peer.socket.remotePublicKey, { - messages: { timeoutSocket: message }, - }); - } - private _registerErrorSocketMessage(peer: Peer, channel: any) { - const message = channel.addMessage({ - encoding: errorSocketEncoding, - }); - this.update(peer.socket.remotePublicKey, { - messages: { errorSocket: message }, - }); - } - - private _toString(pubkey: Uint8Array) { - return b4a.from(pubkey).toString("hex"); - } -} diff --git a/src/socket.ts b/src/socket.ts deleted file mode 100644 index d0a28e2..0000000 --- a/src/socket.ts +++ /dev/null @@ -1,101 +0,0 @@ -import { - Callback, - Duplex, - DuplexEvents, - EventName, - EventListener, -} from "streamx"; -import net, { TcpSocketConnectOpts } from "net"; -import { PeerEntity, SocketRequest, WriteSocketRequest } from "./types"; -import PeerManager from "./peerManager"; -import { Socket } from "net"; - -export class TCPSocket extends Duplex { - private _options; - private _id: number; - private _remoteId: number; - private _manager: PeerManager; - - private _socket?: Socket; - - constructor( - id: number, - remoteId: number, - manager: PeerManager, - peer: PeerEntity, - options: TcpSocketConnectOpts - ) { - super(); - this._remoteId = remoteId; - this._manager = manager; - this._id = id; - this._peer = peer; - this._options = options; - - this._manager.sockets.set(this._id, this); - this._manager.socketMap.set(this._id, this._remoteId); - console.log(options); - } - - private _peer; - - get peer() { - return this._peer; - } - - public _write(data: any, cb: any): void { - this._peer.messages.writeSocket?.send({ - ...this._getSocketRequest(), - data, - } as WriteSocketRequest); - cb(); - } - - public _destroy(cb: Callback) { - this._manager.sockets.delete(this._id); - this._manager.socketMap.delete(this._id); - this._peer.messages.closeSocket?.send(this._getSocketRequest()); - } - - public connect() { - this.on("error", (err: Error) => { - this._peer.messages.errorSocket?.send({ - ...this._getSocketRequest(), - err, - }); - }); - - // @ts-ignore - this.on("timeout", () => { - this._peer.messages.timeoutSocket?.send(this._getSocketRequest()); - }); - // @ts-ignore - this.on("connect", () => { - this._peer.messages.openSocket?.send(this._getSocketRequest()); - }); - - if (![4001, 4002].includes(this._options.port)) { - this.emit("error", new Error(`port ${this._options.port} not allowed`)); - return; - } - - this._socket = net.connect(this._options); - ["timeout", "error", "connect", "end", "destroy", "close"].forEach( - (event) => { - this._socket?.on(event, (...args: any) => - this.emit(event as any, ...args) - ); - } - ); - - this._socket.pipe(this as any); - this.pipe(this._socket); - } - - private _getSocketRequest(): SocketRequest { - return { - id: this._id, - remoteId: this._remoteId, - }; - } -} diff --git a/src/types.ts b/src/types.ts deleted file mode 100644 index 424ccf0..0000000 --- a/src/types.ts +++ /dev/null @@ -1,36 +0,0 @@ -import { Peer } from "@lumeweb/libhyperproxy"; - -export interface PeerInfoResult { - publicKey: Uint8Array; - libp2pPublicKey: Uint8Array; -} - -export type Message = { - send: (pubkey: Uint8Array | any) => void; -}; - -export interface PeerEntityMessages { - keyExchange: Message; - openSocket: Message; - writeSocket: Message; - closeSocket: Message; - timeoutSocket: Message; - errorSocket: Message; -} - -export interface PeerEntity { - messages: PeerEntityMessages | Partial; - peer: Peer; -} -export interface SocketRequest { - remoteId: number; - id: number; -} -export type CloseSocketRequest = SocketRequest; - -export interface WriteSocketRequest extends SocketRequest { - data: Uint8Array; -} -export interface ErrorSocketRequest extends SocketRequest { - err: Error; -}