*further refactoring

This commit is contained in:
Derrick Hammer 2023-04-15 22:17:30 -04:00
parent 4b1b828c69
commit 774e84996e
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
9 changed files with 131 additions and 102 deletions

View File

@ -1,6 +1,7 @@
import Proxy from "./proxy.js";
import Socket from "./socket.js";
import { Buffer } from "buffer";
import { maybeGetAsyncProperty } from "./util.js";
export type OnOpen = (
peer: Peer,
socket: Socket,
@ -38,7 +39,6 @@ export interface DataSocketOptions {
onclose?: OnClose;
onchannel?: OnChannel;
emulateWebsocket?: boolean;
createDefaultMessage?: boolean;
}
export interface PeerOptions {
@ -50,17 +50,16 @@ export interface PeerOptionsWithProxy extends PeerOptions {
proxy: Proxy;
}
export default class Peer {
private _proxy: Proxy;
private _peer: any;
private _muxer: any;
private _onopen: OnOpenBound;
private _onreceive: OnReceiveBound;
private _onsend: OnSendBound;
private _onclose: OnCloseBound;
private _onchannel: OnChannelBound;
private _emulateWebsocket: boolean;
private _createDefaultMessage: boolean;
export default abstract class Peer {
protected _proxy: Proxy;
protected _peer: any;
protected _muxer: any;
protected _onopen: OnOpenBound;
protected _onreceive: OnReceiveBound;
protected _onsend: OnSendBound;
protected _onclose: OnCloseBound;
protected _onchannel: OnChannelBound;
protected _emulateWebsocket: boolean;
constructor({
proxy,
@ -72,7 +71,6 @@ export default class Peer {
onclose,
onchannel,
emulateWebsocket = false,
createDefaultMessage = true,
}: PeerOptionsWithProxy & DataSocketOptions) {
this._proxy = proxy;
this._peer = peer;
@ -83,98 +81,44 @@ export default class Peer {
this._onclose = onclose?.bind(undefined, this);
this._onchannel = onchannel?.bind(undefined, this);
this._emulateWebsocket = emulateWebsocket;
this._createDefaultMessage = createDefaultMessage;
}
private _socket?: Socket;
protected _socket?: Socket;
get socket(): Socket {
return this._socket;
}
private _channel?: any;
protected _channel?: any;
get channel(): any {
return this._channel;
}
async init() {
protected abstract initSocket();
protected abstract handleChannelOnOpen(m: any): Promise<void>;
protected abstract handleChannelOnClose(socket: Socket): Promise<void>;
protected async initChannel() {
const self = this;
let pipe;
const raw = await maybeGetAsyncProperty(self._peer.rawStream);
this._socket = new Socket({
remoteAddress: raw.remoteHost,
remotePort: raw.remotePort,
remotePublicKey: await maybeGetAsyncProperty(self._peer.remotePublicKey),
async write(data: any, cb: Function) {
if (pipe) {
pipe.send(data);
}
await self._onsend?.(data);
cb();
},
emulateWebsocket: self._emulateWebsocket,
});
this._channel = await this._muxer.createChannel({
protocol: this._proxy.protocol,
async onopen(m: any) {
if (!m) {
m = Buffer.from([]);
}
if (m instanceof Uint8Array) {
m = Buffer.from(m);
}
self._socket.on("end", () => this._channel.close());
let ret = await self._onopen?.(self._socket, m);
if (!ret || (ret && ret.connect === false)) {
// @ts-ignore
self._socket.emit("connect");
}
self._socket.emit("data", m);
},
async onclose() {
self._socket?.destroy();
await self._onclose?.(self._socket);
},
onopen: this.handleChannelOnOpen.bind(this),
onclose: this.handleChannelOnClose.bind(this),
});
if (this._createDefaultMessage) {
pipe = await this._channel.addMessage({
async onmessage(m: any) {
if (m instanceof Uint8Array) {
m = Buffer.from(m);
}
self._socket.emit("data", m);
await self._onreceive?.(m);
},
});
}
await this.initMessages();
await this._onchannel?.(this._channel);
await this._channel.open();
}
}
async function maybeGetAsyncProperty(object: any) {
if (typeof object === "function") {
object = object();
async init() {
await this.initSocket();
await this.initChannel();
}
if (isPromise(object)) {
object = await object;
}
return object;
}
function isPromise(obj: Promise<any>) {
return (
!!obj &&
(typeof obj === "object" || typeof obj === "function") &&
typeof obj.then === "function"
);
protected async initMessages() {}
}

View File

@ -1,3 +1,14 @@
import Proxy from "../proxy.js";
import { DataSocketOptions, PeerOptions } from "../peer.js";
import BasicPeer from "./basic/peer.js";
export default class BasicProxy extends Proxy {}
export default class BasicProxy extends Proxy {
protected handlePeer({
peer,
muxer,
...options
}: DataSocketOptions & PeerOptions) {
const conn = new BasicPeer({ proxy: this, peer, muxer, ...options });
conn.init();
}
}

61
src/proxies/basic/peer.ts Normal file
View File

@ -0,0 +1,61 @@
import BasePeer from "../../peer.js";
import { maybeGetAsyncProperty } from "../../util.js";
import Socket from "../../socket.js";
import { Buffer } from "buffer";
export default class Peer extends BasePeer {
private _pipe?: any;
protected async initSocket() {
const self = this;
const raw = await maybeGetAsyncProperty(self._peer.rawStream);
this._socket = new Socket({
remoteAddress: raw.remoteHost,
remotePort: raw.remotePort,
remotePublicKey: await maybeGetAsyncProperty(self._peer.remotePublicKey),
async write(data: any, cb: Function) {
self._pipe?.send(data);
await self._onsend?.(data);
cb();
},
emulateWebsocket: self._emulateWebsocket,
});
}
protected async handleChannelOnOpen(m: any) {
if (!m) {
m = Buffer.from([]);
}
if (m instanceof Uint8Array) {
m = Buffer.from(m);
}
this._socket?.on("end", () => this._channel.close());
let ret = await this._onopen?.(this._socket, m);
if (!ret || (ret && ret.connect === false)) {
// @ts-ignore
self._socket?.emit("connect");
}
this._socket?.emit("data", m);
}
protected async handleChannelOnClose(socket: Socket): Promise<void> {
this._socket?.destroy();
await this._onclose?.(this._socket);
}
protected async initMessages(): Promise<void> {
const self = this;
this._pipe = await this._channel.addMessage({
async onmessage(m: any) {
if (m instanceof Uint8Array) {
m = Buffer.from(m);
}
self._socket.emit("data", m);
await self._onreceive?.(m);
},
});
}
}

View File

@ -4,7 +4,7 @@ import { json, raw, uint } from "compact-encoding";
import { deserializeError } from "serialize-error";
import b4a from "b4a";
import type { TcpSocketConnectOpts } from "net";
import Peer from "../peer.js";
import Peer, { DataSocketOptions, PeerOptions } from "../peer.js";
import { roundRobinFactory, idFactory } from "../util.js";
import {
CloseSocketRequest,
@ -69,6 +69,7 @@ const errorSocketEncoding = {
const nextSocketId = idFactory(1);
export default class MultiSocketProxy extends Proxy {
handlePeer({ peer, muxer, ...options }: DataSocketOptions & PeerOptions) {}
private socketClass: any;
private _peers: Map<string, PeerEntity> = new Map<string, PeerEntity>();
private _nextPeer = roundRobinFactory(this._peers);
@ -76,10 +77,7 @@ export default class MultiSocketProxy extends Proxy {
private _allowedPorts = [];
constructor(options: MultiSocketProxyOptions) {
super({
createDefaultMessage: false,
...options,
});
super(options);
this._socketOptions.onchannel = this.handleNewPeerChannel.bind(this);
this._socketOptions.onclose = this.handleClosePeer.bind(this);
this._socketOptions.onopen = this.handlePeer.bind(this);

View File

@ -4,8 +4,9 @@ import { clearTimeout } from "timers";
import MultiSocketProxy from "../multiSocket.js";
import { PeerEntity, SocketRequest, WriteSocketRequest } from "./types.js";
import { maybeGetAsyncProperty } from "../../util.js";
import Socket, { SocketOptions } from "../../socket.js";
export default class DummySocket extends Duplex {
export default class DummySocket extends Socket {
private _options: TcpSocketConnectOpts;
private _id: number;
private _proxy: MultiSocketProxy;
@ -16,13 +17,14 @@ export default class DummySocket extends Duplex {
id: number,
manager: MultiSocketProxy,
peer: PeerEntity,
options: TcpSocketConnectOpts
connectOptions: TcpSocketConnectOpts,
socketOptions: SocketOptions
) {
super();
super(socketOptions);
this._id = id;
this._proxy = manager;
this._peer = peer;
this._options = options;
this._options = connectOptions;
// @ts-ignore
this.on("timeout", () => {

View File

@ -0,0 +1,16 @@
import BasePeer from "../../peer.js";
import Socket from "../../socket.js";
import MultiSocketProxy from "../multiSocket.js";
export default class Peer extends BasePeer {
protected declare _proxy: MultiSocketProxy;
protected async initSocket() {}
protected async handleChannelOnClose(socket: Socket): Promise<void> {
return this._proxy.handleClosePeer(this);
}
protected async handleChannelOnOpen(m: any): Promise<void> {
await this._proxy.handleNewPeerChannel(this._peer, this._channel);
}
}

View File

@ -3,8 +3,9 @@ import { Socket, TcpSocketConnectOpts } from "net";
import MultiSocketProxy from "../multiSocket.js";
import { PeerEntity, SocketRequest, WriteSocketRequest } from "./types.js";
import * as net from "net";
import BaseSocket from "../../socket.js";
export default class TcpSocket extends Duplex {
export default class TcpSocket extends BaseSocket {
private _options;
private _id: number;
private _remoteId: number;

View File

@ -1,5 +1,5 @@
import Protomux from "protomux";
import Peer, { PeerOptions, DataSocketOptions } from "./peer.js";
import { DataSocketOptions, PeerOptions } from "./peer.js";
export interface ProxyOptions extends DataSocketOptions {
swarm: any;
@ -23,7 +23,6 @@ export default abstract class Proxy {
listen = false,
autostart = false,
emulateWebsocket = false,
createDefaultMessage = true,
}: ProxyOptions) {
this._swarm = swarm;
this._protocol = protocol;
@ -36,7 +35,6 @@ export default abstract class Proxy {
onclose,
onchannel,
emulateWebsocket,
createDefaultMessage,
};
this.init();
}
@ -58,14 +56,12 @@ export default abstract class Proxy {
get protocol(): string {
return this._protocol;
}
public handlePeer({
protected abstract handlePeer({
peer,
muxer,
...options
}: DataSocketOptions & PeerOptions) {
const conn = new Peer({ proxy: this, peer, muxer, ...options });
conn.init();
}
}: DataSocketOptions & PeerOptions);
protected _init() {
// Implement in subclasses

View File

@ -6,7 +6,7 @@ const IPV6 = "IPv6";
type AddressFamily = "IPv6" | "IPv4";
interface SocketOptions {
export interface SocketOptions {
allowHalfOpen?: boolean;
remoteAddress?: string;
remotePort?: number;