From 774e84996e84ad067abce2f50bad90821f113419 Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Sat, 15 Apr 2023 22:17:30 -0400 Subject: [PATCH] *further refactoring --- src/peer.ts | 108 ++++++------------------- src/proxies/basic.ts | 13 ++- src/proxies/basic/peer.ts | 61 ++++++++++++++ src/proxies/multiSocket.ts | 8 +- src/proxies/multiSocket/dummySocket.ts | 10 ++- src/proxies/multiSocket/peer.ts | 16 ++++ src/proxies/multiSocket/tcpSocket.ts | 3 +- src/proxy.ts | 12 +-- src/socket.ts | 2 +- 9 files changed, 131 insertions(+), 102 deletions(-) create mode 100644 src/proxies/basic/peer.ts create mode 100644 src/proxies/multiSocket/peer.ts diff --git a/src/peer.ts b/src/peer.ts index 63ea25f..a8c39b7 100644 --- a/src/peer.ts +++ b/src/peer.ts @@ -1,6 +1,7 @@ import Proxy from "./proxy.js"; import Socket from "./socket.js"; import { Buffer } from "buffer"; +import { maybeGetAsyncProperty } from "./util.js"; export type OnOpen = ( peer: Peer, socket: Socket, @@ -38,7 +39,6 @@ export interface DataSocketOptions { onclose?: OnClose; onchannel?: OnChannel; emulateWebsocket?: boolean; - createDefaultMessage?: boolean; } export interface PeerOptions { @@ -50,17 +50,16 @@ export interface PeerOptionsWithProxy extends PeerOptions { proxy: Proxy; } -export default class Peer { - private _proxy: Proxy; - private _peer: any; - private _muxer: any; - private _onopen: OnOpenBound; - private _onreceive: OnReceiveBound; - private _onsend: OnSendBound; - private _onclose: OnCloseBound; - private _onchannel: OnChannelBound; - private _emulateWebsocket: boolean; - private _createDefaultMessage: boolean; +export default abstract class Peer { + protected _proxy: Proxy; + protected _peer: any; + protected _muxer: any; + protected _onopen: OnOpenBound; + protected _onreceive: OnReceiveBound; + protected _onsend: OnSendBound; + protected _onclose: OnCloseBound; + protected _onchannel: OnChannelBound; + protected _emulateWebsocket: boolean; constructor({ proxy, @@ -72,7 +71,6 @@ export default class Peer { onclose, onchannel, emulateWebsocket = false, - createDefaultMessage = true, }: PeerOptionsWithProxy & DataSocketOptions) { this._proxy = proxy; this._peer = peer; @@ -83,98 +81,44 @@ export default class Peer { this._onclose = onclose?.bind(undefined, this); this._onchannel = onchannel?.bind(undefined, this); this._emulateWebsocket = emulateWebsocket; - this._createDefaultMessage = createDefaultMessage; } - private _socket?: Socket; + protected _socket?: Socket; get socket(): Socket { return this._socket; } - private _channel?: any; + protected _channel?: any; get channel(): any { return this._channel; } - async init() { + protected abstract initSocket(); + + protected abstract handleChannelOnOpen(m: any): Promise; + protected abstract handleChannelOnClose(socket: Socket): Promise; + + protected async initChannel() { const self = this; - let pipe; - const raw = await maybeGetAsyncProperty(self._peer.rawStream); - this._socket = new Socket({ - remoteAddress: raw.remoteHost, - remotePort: raw.remotePort, - remotePublicKey: await maybeGetAsyncProperty(self._peer.remotePublicKey), - async write(data: any, cb: Function) { - if (pipe) { - pipe.send(data); - } - await self._onsend?.(data); - cb(); - }, - emulateWebsocket: self._emulateWebsocket, - }); this._channel = await this._muxer.createChannel({ protocol: this._proxy.protocol, - async onopen(m: any) { - if (!m) { - m = Buffer.from([]); - } - - if (m instanceof Uint8Array) { - m = Buffer.from(m); - } - - self._socket.on("end", () => this._channel.close()); - let ret = await self._onopen?.(self._socket, m); - if (!ret || (ret && ret.connect === false)) { - // @ts-ignore - self._socket.emit("connect"); - } - - self._socket.emit("data", m); - }, - async onclose() { - self._socket?.destroy(); - await self._onclose?.(self._socket); - }, + onopen: this.handleChannelOnOpen.bind(this), + onclose: this.handleChannelOnClose.bind(this), }); - if (this._createDefaultMessage) { - pipe = await this._channel.addMessage({ - async onmessage(m: any) { - if (m instanceof Uint8Array) { - m = Buffer.from(m); - } - self._socket.emit("data", m); - await self._onreceive?.(m); - }, - }); - } + await this.initMessages(); await this._onchannel?.(this._channel); await this._channel.open(); } -} -async function maybeGetAsyncProperty(object: any) { - if (typeof object === "function") { - object = object(); + async init() { + await this.initSocket(); + await this.initChannel(); } - if (isPromise(object)) { - object = await object; - } - - return object; -} - -function isPromise(obj: Promise) { - return ( - !!obj && - (typeof obj === "object" || typeof obj === "function") && - typeof obj.then === "function" - ); + protected async initMessages() {} } diff --git a/src/proxies/basic.ts b/src/proxies/basic.ts index 53bcef6..bcd692a 100644 --- a/src/proxies/basic.ts +++ b/src/proxies/basic.ts @@ -1,3 +1,14 @@ import Proxy from "../proxy.js"; +import { DataSocketOptions, PeerOptions } from "../peer.js"; +import BasicPeer from "./basic/peer.js"; -export default class BasicProxy extends Proxy {} +export default class BasicProxy extends Proxy { + protected handlePeer({ + peer, + muxer, + ...options + }: DataSocketOptions & PeerOptions) { + const conn = new BasicPeer({ proxy: this, peer, muxer, ...options }); + conn.init(); + } +} diff --git a/src/proxies/basic/peer.ts b/src/proxies/basic/peer.ts new file mode 100644 index 0000000..39df9f4 --- /dev/null +++ b/src/proxies/basic/peer.ts @@ -0,0 +1,61 @@ +import BasePeer from "../../peer.js"; +import { maybeGetAsyncProperty } from "../../util.js"; +import Socket from "../../socket.js"; +import { Buffer } from "buffer"; + +export default class Peer extends BasePeer { + private _pipe?: any; + protected async initSocket() { + const self = this; + + const raw = await maybeGetAsyncProperty(self._peer.rawStream); + this._socket = new Socket({ + remoteAddress: raw.remoteHost, + remotePort: raw.remotePort, + remotePublicKey: await maybeGetAsyncProperty(self._peer.remotePublicKey), + async write(data: any, cb: Function) { + self._pipe?.send(data); + await self._onsend?.(data); + cb(); + }, + emulateWebsocket: self._emulateWebsocket, + }); + } + + protected async handleChannelOnOpen(m: any) { + if (!m) { + m = Buffer.from([]); + } + + if (m instanceof Uint8Array) { + m = Buffer.from(m); + } + + this._socket?.on("end", () => this._channel.close()); + let ret = await this._onopen?.(this._socket, m); + if (!ret || (ret && ret.connect === false)) { + // @ts-ignore + self._socket?.emit("connect"); + } + + this._socket?.emit("data", m); + } + + protected async handleChannelOnClose(socket: Socket): Promise { + this._socket?.destroy(); + await this._onclose?.(this._socket); + } + + protected async initMessages(): Promise { + const self = this; + this._pipe = await this._channel.addMessage({ + async onmessage(m: any) { + if (m instanceof Uint8Array) { + m = Buffer.from(m); + } + self._socket.emit("data", m); + await self._onreceive?.(m); + }, + }); + } +} diff --git a/src/proxies/multiSocket.ts b/src/proxies/multiSocket.ts index f3e5a6f..50d0b2a 100644 --- a/src/proxies/multiSocket.ts +++ b/src/proxies/multiSocket.ts @@ -4,7 +4,7 @@ import { json, raw, uint } from "compact-encoding"; import { deserializeError } from "serialize-error"; import b4a from "b4a"; import type { TcpSocketConnectOpts } from "net"; -import Peer from "../peer.js"; +import Peer, { DataSocketOptions, PeerOptions } from "../peer.js"; import { roundRobinFactory, idFactory } from "../util.js"; import { CloseSocketRequest, @@ -69,6 +69,7 @@ const errorSocketEncoding = { const nextSocketId = idFactory(1); export default class MultiSocketProxy extends Proxy { + handlePeer({ peer, muxer, ...options }: DataSocketOptions & PeerOptions) {} private socketClass: any; private _peers: Map = new Map(); private _nextPeer = roundRobinFactory(this._peers); @@ -76,10 +77,7 @@ export default class MultiSocketProxy extends Proxy { private _allowedPorts = []; constructor(options: MultiSocketProxyOptions) { - super({ - createDefaultMessage: false, - ...options, - }); + super(options); this._socketOptions.onchannel = this.handleNewPeerChannel.bind(this); this._socketOptions.onclose = this.handleClosePeer.bind(this); this._socketOptions.onopen = this.handlePeer.bind(this); diff --git a/src/proxies/multiSocket/dummySocket.ts b/src/proxies/multiSocket/dummySocket.ts index bd84e7e..1ba0783 100644 --- a/src/proxies/multiSocket/dummySocket.ts +++ b/src/proxies/multiSocket/dummySocket.ts @@ -4,8 +4,9 @@ import { clearTimeout } from "timers"; import MultiSocketProxy from "../multiSocket.js"; import { PeerEntity, SocketRequest, WriteSocketRequest } from "./types.js"; import { maybeGetAsyncProperty } from "../../util.js"; +import Socket, { SocketOptions } from "../../socket.js"; -export default class DummySocket extends Duplex { +export default class DummySocket extends Socket { private _options: TcpSocketConnectOpts; private _id: number; private _proxy: MultiSocketProxy; @@ -16,13 +17,14 @@ export default class DummySocket extends Duplex { id: number, manager: MultiSocketProxy, peer: PeerEntity, - options: TcpSocketConnectOpts + connectOptions: TcpSocketConnectOpts, + socketOptions: SocketOptions ) { - super(); + super(socketOptions); this._id = id; this._proxy = manager; this._peer = peer; - this._options = options; + this._options = connectOptions; // @ts-ignore this.on("timeout", () => { diff --git a/src/proxies/multiSocket/peer.ts b/src/proxies/multiSocket/peer.ts new file mode 100644 index 0000000..1ba59ae --- /dev/null +++ b/src/proxies/multiSocket/peer.ts @@ -0,0 +1,16 @@ +import BasePeer from "../../peer.js"; +import Socket from "../../socket.js"; +import MultiSocketProxy from "../multiSocket.js"; + +export default class Peer extends BasePeer { + protected declare _proxy: MultiSocketProxy; + protected async initSocket() {} + + protected async handleChannelOnClose(socket: Socket): Promise { + return this._proxy.handleClosePeer(this); + } + + protected async handleChannelOnOpen(m: any): Promise { + await this._proxy.handleNewPeerChannel(this._peer, this._channel); + } +} diff --git a/src/proxies/multiSocket/tcpSocket.ts b/src/proxies/multiSocket/tcpSocket.ts index b5a99b7..e862250 100644 --- a/src/proxies/multiSocket/tcpSocket.ts +++ b/src/proxies/multiSocket/tcpSocket.ts @@ -3,8 +3,9 @@ import { Socket, TcpSocketConnectOpts } from "net"; import MultiSocketProxy from "../multiSocket.js"; import { PeerEntity, SocketRequest, WriteSocketRequest } from "./types.js"; import * as net from "net"; +import BaseSocket from "../../socket.js"; -export default class TcpSocket extends Duplex { +export default class TcpSocket extends BaseSocket { private _options; private _id: number; private _remoteId: number; diff --git a/src/proxy.ts b/src/proxy.ts index 32b704f..39cfbad 100644 --- a/src/proxy.ts +++ b/src/proxy.ts @@ -1,5 +1,5 @@ import Protomux from "protomux"; -import Peer, { PeerOptions, DataSocketOptions } from "./peer.js"; +import { DataSocketOptions, PeerOptions } from "./peer.js"; export interface ProxyOptions extends DataSocketOptions { swarm: any; @@ -23,7 +23,6 @@ export default abstract class Proxy { listen = false, autostart = false, emulateWebsocket = false, - createDefaultMessage = true, }: ProxyOptions) { this._swarm = swarm; this._protocol = protocol; @@ -36,7 +35,6 @@ export default abstract class Proxy { onclose, onchannel, emulateWebsocket, - createDefaultMessage, }; this.init(); } @@ -58,14 +56,12 @@ export default abstract class Proxy { get protocol(): string { return this._protocol; } - public handlePeer({ + + protected abstract handlePeer({ peer, muxer, ...options - }: DataSocketOptions & PeerOptions) { - const conn = new Peer({ proxy: this, peer, muxer, ...options }); - conn.init(); - } + }: DataSocketOptions & PeerOptions); protected _init() { // Implement in subclasses diff --git a/src/socket.ts b/src/socket.ts index 46356e6..d5ee641 100644 --- a/src/socket.ts +++ b/src/socket.ts @@ -6,7 +6,7 @@ const IPV6 = "IPv6"; type AddressFamily = "IPv6" | "IPv4"; -interface SocketOptions { +export interface SocketOptions { allowHalfOpen?: boolean; remoteAddress?: string; remotePort?: number;