Compare commits

..

2 Commits

Author SHA1 Message Date
Derrick Hammer 5fa5385249
*update dist 2023-04-15 22:17:57 -04:00
Derrick Hammer 774e84996e
*further refactoring 2023-04-15 22:17:30 -04:00
26 changed files with 288 additions and 213 deletions

33
dist/peer.d.ts vendored
View File

@ -27,7 +27,6 @@ export interface DataSocketOptions {
onclose?: OnClose; onclose?: OnClose;
onchannel?: OnChannel; onchannel?: OnChannel;
emulateWebsocket?: boolean; emulateWebsocket?: boolean;
createDefaultMessage?: boolean;
} }
export interface PeerOptions { export interface PeerOptions {
peer: any; peer: any;
@ -36,21 +35,25 @@ export interface PeerOptions {
export interface PeerOptionsWithProxy extends PeerOptions { export interface PeerOptionsWithProxy extends PeerOptions {
proxy: Proxy; proxy: Proxy;
} }
export default class Peer { export default abstract class Peer {
private _proxy; protected _proxy: Proxy;
private _peer; protected _peer: any;
private _muxer; protected _muxer: any;
private _onopen; protected _onopen: OnOpenBound;
private _onreceive; protected _onreceive: OnReceiveBound;
private _onsend; protected _onsend: OnSendBound;
private _onclose; protected _onclose: OnCloseBound;
private _onchannel; protected _onchannel: OnChannelBound;
private _emulateWebsocket; protected _emulateWebsocket: boolean;
private _createDefaultMessage; constructor({ proxy, peer, muxer, onopen, onreceive, onsend, onclose, onchannel, emulateWebsocket, }: PeerOptionsWithProxy & DataSocketOptions);
constructor({ proxy, peer, muxer, onopen, onreceive, onsend, onclose, onchannel, emulateWebsocket, createDefaultMessage, }: PeerOptionsWithProxy & DataSocketOptions); protected _socket?: Socket;
private _socket?;
get socket(): Socket; get socket(): Socket;
private _channel?; protected _channel?: any;
get channel(): any; get channel(): any;
protected abstract initSocket(): any;
protected abstract handleChannelOnOpen(m: any): Promise<void>;
protected abstract handleChannelOnClose(socket: Socket): Promise<void>;
protected initChannel(): Promise<void>;
init(): Promise<void>; init(): Promise<void>;
protected initMessages(): Promise<void>;
} }

78
dist/peer.js vendored
View File

@ -1,10 +1,5 @@
"use strict"; "use strict";
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true }); Object.defineProperty(exports, "__esModule", { value: true });
const socket_js_1 = __importDefault(require("./socket.js"));
const buffer_1 = require("buffer");
class Peer { class Peer {
_proxy; _proxy;
_peer; _peer;
@ -15,8 +10,7 @@ class Peer {
_onclose; _onclose;
_onchannel; _onchannel;
_emulateWebsocket; _emulateWebsocket;
_createDefaultMessage; constructor({ proxy, peer, muxer, onopen, onreceive, onsend, onclose, onchannel, emulateWebsocket = false, }) {
constructor({ proxy, peer, muxer, onopen, onreceive, onsend, onclose, onchannel, emulateWebsocket = false, createDefaultMessage = true, }) {
this._proxy = proxy; this._proxy = proxy;
this._peer = peer; this._peer = peer;
this._muxer = muxer; this._muxer = muxer;
@ -26,7 +20,6 @@ class Peer {
this._onclose = onclose?.bind(undefined, this); this._onclose = onclose?.bind(undefined, this);
this._onchannel = onchannel?.bind(undefined, this); this._onchannel = onchannel?.bind(undefined, this);
this._emulateWebsocket = emulateWebsocket; this._emulateWebsocket = emulateWebsocket;
this._createDefaultMessage = createDefaultMessage;
} }
_socket; _socket;
get socket() { get socket() {
@ -36,72 +29,21 @@ class Peer {
get channel() { get channel() {
return this._channel; return this._channel;
} }
async init() { async initChannel() {
const self = this; const self = this;
let pipe;
const raw = await maybeGetAsyncProperty(self._peer.rawStream);
this._socket = new socket_js_1.default({
remoteAddress: raw.remoteHost,
remotePort: raw.remotePort,
remotePublicKey: await maybeGetAsyncProperty(self._peer.remotePublicKey),
async write(data, cb) {
if (pipe) {
pipe.send(data);
}
await self._onsend?.(data);
cb();
},
emulateWebsocket: self._emulateWebsocket,
});
this._channel = await this._muxer.createChannel({ this._channel = await this._muxer.createChannel({
protocol: this._proxy.protocol, protocol: this._proxy.protocol,
async onopen(m) { onopen: this.handleChannelOnOpen.bind(this),
if (!m) { onclose: this.handleChannelOnClose.bind(this),
m = buffer_1.Buffer.from([]);
}
if (m instanceof Uint8Array) {
m = buffer_1.Buffer.from(m);
}
self._socket.on("end", () => this._channel.close());
let ret = await self._onopen?.(self._socket, m);
if (!ret || (ret && ret.connect === false)) {
// @ts-ignore
self._socket.emit("connect");
}
self._socket.emit("data", m);
},
async onclose() {
self._socket?.destroy();
await self._onclose?.(self._socket);
},
}); });
if (this._createDefaultMessage) { await this.initMessages();
pipe = await this._channel.addMessage({
async onmessage(m) {
if (m instanceof Uint8Array) {
m = buffer_1.Buffer.from(m);
}
self._socket.emit("data", m);
await self._onreceive?.(m);
},
});
}
await this._onchannel?.(this._channel); await this._onchannel?.(this._channel);
await this._channel.open(); await this._channel.open();
} }
async init() {
await this.initSocket();
await this.initChannel();
}
async initMessages() { }
} }
exports.default = Peer; exports.default = Peer;
async function maybeGetAsyncProperty(object) {
if (typeof object === "function") {
object = object();
}
if (isPromise(object)) {
object = await object;
}
return object;
}
function isPromise(obj) {
return (!!obj &&
(typeof obj === "object" || typeof obj === "function") &&
typeof obj.then === "function");
}

View File

@ -1,3 +1,5 @@
import Proxy from "../proxy.js"; import Proxy from "../proxy.js";
import { DataSocketOptions, PeerOptions } from "../peer.js";
export default class BasicProxy extends Proxy { export default class BasicProxy extends Proxy {
protected handlePeer({ peer, muxer, ...options }: DataSocketOptions & PeerOptions): void;
} }

View File

@ -4,6 +4,11 @@ var __importDefault = (this && this.__importDefault) || function (mod) {
}; };
Object.defineProperty(exports, "__esModule", { value: true }); Object.defineProperty(exports, "__esModule", { value: true });
const proxy_js_1 = __importDefault(require("../proxy.js")); const proxy_js_1 = __importDefault(require("../proxy.js"));
const peer_js_1 = __importDefault(require("./basic/peer.js"));
class BasicProxy extends proxy_js_1.default { class BasicProxy extends proxy_js_1.default {
handlePeer({ peer, muxer, ...options }) {
const conn = new peer_js_1.default({ proxy: this, peer, muxer, ...options });
conn.init();
}
} }
exports.default = BasicProxy; exports.default = BasicProxy;

9
dist/proxies/basic/peer.d.ts vendored Normal file
View File

@ -0,0 +1,9 @@
import BasePeer from "../../peer.js";
import Socket from "../../socket.js";
export default class Peer extends BasePeer {
private _pipe?;
protected initSocket(): Promise<void>;
protected handleChannelOnOpen(m: any): Promise<void>;
protected handleChannelOnClose(socket: Socket): Promise<void>;
protected initMessages(): Promise<void>;
}

59
dist/proxies/basic/peer.js vendored Normal file
View File

@ -0,0 +1,59 @@
"use strict";
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true });
const peer_js_1 = __importDefault(require("../../peer.js"));
const util_js_1 = require("../../util.js");
const socket_js_1 = __importDefault(require("../../socket.js"));
const buffer_1 = require("buffer");
class Peer extends peer_js_1.default {
_pipe;
async initSocket() {
const self = this;
const raw = await (0, util_js_1.maybeGetAsyncProperty)(self._peer.rawStream);
this._socket = new socket_js_1.default({
remoteAddress: raw.remoteHost,
remotePort: raw.remotePort,
remotePublicKey: await (0, util_js_1.maybeGetAsyncProperty)(self._peer.remotePublicKey),
async write(data, cb) {
self._pipe?.send(data);
await self._onsend?.(data);
cb();
},
emulateWebsocket: self._emulateWebsocket,
});
}
async handleChannelOnOpen(m) {
if (!m) {
m = buffer_1.Buffer.from([]);
}
if (m instanceof Uint8Array) {
m = buffer_1.Buffer.from(m);
}
this._socket?.on("end", () => this._channel.close());
let ret = await this._onopen?.(this._socket, m);
if (!ret || (ret && ret.connect === false)) {
// @ts-ignore
self._socket?.emit("connect");
}
this._socket?.emit("data", m);
}
async handleChannelOnClose(socket) {
this._socket?.destroy();
await this._onclose?.(this._socket);
}
async initMessages() {
const self = this;
this._pipe = await this._channel.addMessage({
async onmessage(m) {
if (m instanceof Uint8Array) {
m = buffer_1.Buffer.from(m);
}
self._socket.emit("data", m);
await self._onreceive?.(m);
},
});
}
}
exports.default = Peer;

View File

@ -1,7 +1,7 @@
/// <reference types="node" /> /// <reference types="node" />
import Proxy, { ProxyOptions } from "../proxy.js"; import Proxy, { ProxyOptions } from "../proxy.js";
import type { TcpSocketConnectOpts } from "net"; import type { TcpSocketConnectOpts } from "net";
import Peer from "../peer.js"; import Peer, { DataSocketOptions, PeerOptions } from "../peer.js";
import { PeerEntity } from "./multiSocket/types.js"; import { PeerEntity } from "./multiSocket/types.js";
export interface MultiSocketProxyOptions extends ProxyOptions { export interface MultiSocketProxyOptions extends ProxyOptions {
socketClass?: any; socketClass?: any;
@ -9,6 +9,7 @@ export interface MultiSocketProxyOptions extends ProxyOptions {
allowedPorts?: number[]; allowedPorts?: number[];
} }
export default class MultiSocketProxy extends Proxy { export default class MultiSocketProxy extends Proxy {
handlePeer({ peer, muxer, ...options }: DataSocketOptions & PeerOptions): void;
private socketClass; private socketClass;
private _peers; private _peers;
private _nextPeer; private _nextPeer;

View File

@ -54,16 +54,14 @@ const errorSocketEncoding = {
}; };
const nextSocketId = (0, util_js_1.idFactory)(1); const nextSocketId = (0, util_js_1.idFactory)(1);
class MultiSocketProxy extends proxy_js_1.default { class MultiSocketProxy extends proxy_js_1.default {
handlePeer({ peer, muxer, ...options }) { }
socketClass; socketClass;
_peers = new Map(); _peers = new Map();
_nextPeer = (0, util_js_1.roundRobinFactory)(this._peers); _nextPeer = (0, util_js_1.roundRobinFactory)(this._peers);
_server = false; _server = false;
_allowedPorts = []; _allowedPorts = [];
constructor(options) { constructor(options) {
super({ super(options);
createDefaultMessage: false,
...options,
});
this._socketOptions.onchannel = this.handleNewPeerChannel.bind(this); this._socketOptions.onchannel = this.handleNewPeerChannel.bind(this);
this._socketOptions.onclose = this.handleClosePeer.bind(this); this._socketOptions.onclose = this.handleClosePeer.bind(this);
this._socketOptions.onopen = this.handlePeer.bind(this); this._socketOptions.onopen = this.handlePeer.bind(this);

View File

@ -1,14 +1,15 @@
/// <reference types="node" /> /// <reference types="node" />
import { Callback, Duplex } from "streamx"; import { Callback } from "streamx";
import { TcpSocketConnectOpts } from "net"; import { TcpSocketConnectOpts } from "net";
import MultiSocketProxy from "../multiSocket.js"; import MultiSocketProxy from "../multiSocket.js";
import { PeerEntity } from "./types.js"; import { PeerEntity } from "./types.js";
export default class DummySocket extends Duplex { import Socket, { SocketOptions } from "../../socket.js";
export default class DummySocket extends Socket {
private _options; private _options;
private _id; private _id;
private _proxy; private _proxy;
private _connectTimeout?; private _connectTimeout?;
constructor(id: number, manager: MultiSocketProxy, peer: PeerEntity, options: TcpSocketConnectOpts); constructor(id: number, manager: MultiSocketProxy, peer: PeerEntity, connectOptions: TcpSocketConnectOpts, socketOptions: SocketOptions);
private _remoteId; private _remoteId;
set remoteId(value: number); set remoteId(value: number);
private _peer; private _peer;

View File

@ -1,19 +1,22 @@
"use strict"; "use strict";
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true }); Object.defineProperty(exports, "__esModule", { value: true });
const streamx_1 = require("streamx");
const timers_1 = require("timers"); const timers_1 = require("timers");
const util_js_1 = require("../../util.js"); const util_js_1 = require("../../util.js");
class DummySocket extends streamx_1.Duplex { const socket_js_1 = __importDefault(require("../../socket.js"));
class DummySocket extends socket_js_1.default {
_options; _options;
_id; _id;
_proxy; _proxy;
_connectTimeout; _connectTimeout;
constructor(id, manager, peer, options) { constructor(id, manager, peer, connectOptions, socketOptions) {
super(); super(socketOptions);
this._id = id; this._id = id;
this._proxy = manager; this._proxy = manager;
this._peer = peer; this._peer = peer;
this._options = options; this._options = connectOptions;
// @ts-ignore // @ts-ignore
this.on("timeout", () => { this.on("timeout", () => {
if (this._connectTimeout) { if (this._connectTimeout) {

9
dist/proxies/multiSocket/peer.d.ts vendored Normal file
View File

@ -0,0 +1,9 @@
import BasePeer from "../../peer.js";
import Socket from "../../socket.js";
import MultiSocketProxy from "../multiSocket.js";
export default class Peer extends BasePeer {
protected _proxy: MultiSocketProxy;
protected initSocket(): Promise<void>;
protected handleChannelOnClose(socket: Socket): Promise<void>;
protected handleChannelOnOpen(m: any): Promise<void>;
}

16
dist/proxies/multiSocket/peer.js vendored Normal file
View File

@ -0,0 +1,16 @@
"use strict";
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true });
const peer_js_1 = __importDefault(require("../../peer.js"));
class Peer extends peer_js_1.default {
async initSocket() { }
async handleChannelOnClose(socket) {
return this._proxy.handleClosePeer(this);
}
async handleChannelOnOpen(m) {
await this._proxy.handleNewPeerChannel(this._peer, this._channel);
}
}
exports.default = Peer;

View File

@ -1,9 +1,10 @@
/// <reference types="node" /> /// <reference types="node" />
import { Callback, Duplex } from "streamx"; import { Callback } from "streamx";
import { TcpSocketConnectOpts } from "net"; import { TcpSocketConnectOpts } from "net";
import MultiSocketProxy from "../multiSocket.js"; import MultiSocketProxy from "../multiSocket.js";
import { PeerEntity } from "./types.js"; import { PeerEntity } from "./types.js";
export default class TcpSocket extends Duplex { import BaseSocket from "../../socket.js";
export default class TcpSocket extends BaseSocket {
private _options; private _options;
private _id; private _id;
private _remoteId; private _remoteId;

View File

@ -22,10 +22,13 @@ var __importStar = (this && this.__importStar) || function (mod) {
__setModuleDefault(result, mod); __setModuleDefault(result, mod);
return result; return result;
}; };
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true }); Object.defineProperty(exports, "__esModule", { value: true });
const streamx_1 = require("streamx");
const net = __importStar(require("net")); const net = __importStar(require("net"));
class TcpSocket extends streamx_1.Duplex { const socket_js_1 = __importDefault(require("../../socket.js"));
class TcpSocket extends socket_js_1.default {
_options; _options;
_id; _id;
_remoteId; _remoteId;

6
dist/proxy.d.ts vendored
View File

@ -1,4 +1,4 @@
import { PeerOptions, DataSocketOptions } from "./peer.js"; import { DataSocketOptions, PeerOptions } from "./peer.js";
export interface ProxyOptions extends DataSocketOptions { export interface ProxyOptions extends DataSocketOptions {
swarm: any; swarm: any;
protocol: string; protocol: string;
@ -8,14 +8,14 @@ export interface ProxyOptions extends DataSocketOptions {
export default abstract class Proxy { export default abstract class Proxy {
protected _listen: any; protected _listen: any;
protected _autostart: boolean; protected _autostart: boolean;
constructor({ swarm, protocol, onopen, onreceive, onsend, onclose, onchannel, listen, autostart, emulateWebsocket, createDefaultMessage, }: ProxyOptions); constructor({ swarm, protocol, onopen, onreceive, onsend, onclose, onchannel, listen, autostart, emulateWebsocket, }: ProxyOptions);
protected _socketOptions: DataSocketOptions; protected _socketOptions: DataSocketOptions;
get socketOptions(): DataSocketOptions; get socketOptions(): DataSocketOptions;
private _swarm; private _swarm;
get swarm(): any; get swarm(): any;
private _protocol; private _protocol;
get protocol(): string; get protocol(): string;
handlePeer({ peer, muxer, ...options }: DataSocketOptions & PeerOptions): void; protected abstract handlePeer({ peer, muxer, ...options }: DataSocketOptions & PeerOptions): any;
protected _init(): void; protected _init(): void;
private init; private init;
private _handleConnection; private _handleConnection;

8
dist/proxy.js vendored
View File

@ -4,11 +4,10 @@ var __importDefault = (this && this.__importDefault) || function (mod) {
}; };
Object.defineProperty(exports, "__esModule", { value: true }); Object.defineProperty(exports, "__esModule", { value: true });
const protomux_1 = __importDefault(require("protomux")); const protomux_1 = __importDefault(require("protomux"));
const peer_js_1 = __importDefault(require("./peer.js"));
class Proxy { class Proxy {
_listen; _listen;
_autostart; _autostart;
constructor({ swarm, protocol, onopen, onreceive, onsend, onclose, onchannel, listen = false, autostart = false, emulateWebsocket = false, createDefaultMessage = true, }) { constructor({ swarm, protocol, onopen, onreceive, onsend, onclose, onchannel, listen = false, autostart = false, emulateWebsocket = false, }) {
this._swarm = swarm; this._swarm = swarm;
this._protocol = protocol; this._protocol = protocol;
this._listen = listen; this._listen = listen;
@ -20,7 +19,6 @@ class Proxy {
onclose, onclose,
onchannel, onchannel,
emulateWebsocket, emulateWebsocket,
createDefaultMessage,
}; };
this.init(); this.init();
} }
@ -36,10 +34,6 @@ class Proxy {
get protocol() { get protocol() {
return this._protocol; return this._protocol;
} }
handlePeer({ peer, muxer, ...options }) {
const conn = new peer_js_1.default({ proxy: this, peer, muxer, ...options });
conn.init();
}
_init() { _init() {
// Implement in subclasses // Implement in subclasses
} }

2
dist/socket.d.ts vendored
View File

@ -1,6 +1,6 @@
import { Duplex, DuplexEvents, Callback } from "streamx"; import { Duplex, DuplexEvents, Callback } from "streamx";
type AddressFamily = "IPv6" | "IPv4"; type AddressFamily = "IPv6" | "IPv4";
interface SocketOptions { export interface SocketOptions {
allowHalfOpen?: boolean; allowHalfOpen?: boolean;
remoteAddress?: string; remoteAddress?: string;
remotePort?: number; remotePort?: number;

View File

@ -1,6 +1,7 @@
import Proxy from "./proxy.js"; import Proxy from "./proxy.js";
import Socket from "./socket.js"; import Socket from "./socket.js";
import { Buffer } from "buffer"; import { Buffer } from "buffer";
import { maybeGetAsyncProperty } from "./util.js";
export type OnOpen = ( export type OnOpen = (
peer: Peer, peer: Peer,
socket: Socket, socket: Socket,
@ -38,7 +39,6 @@ export interface DataSocketOptions {
onclose?: OnClose; onclose?: OnClose;
onchannel?: OnChannel; onchannel?: OnChannel;
emulateWebsocket?: boolean; emulateWebsocket?: boolean;
createDefaultMessage?: boolean;
} }
export interface PeerOptions { export interface PeerOptions {
@ -50,17 +50,16 @@ export interface PeerOptionsWithProxy extends PeerOptions {
proxy: Proxy; proxy: Proxy;
} }
export default class Peer { export default abstract class Peer {
private _proxy: Proxy; protected _proxy: Proxy;
private _peer: any; protected _peer: any;
private _muxer: any; protected _muxer: any;
private _onopen: OnOpenBound; protected _onopen: OnOpenBound;
private _onreceive: OnReceiveBound; protected _onreceive: OnReceiveBound;
private _onsend: OnSendBound; protected _onsend: OnSendBound;
private _onclose: OnCloseBound; protected _onclose: OnCloseBound;
private _onchannel: OnChannelBound; protected _onchannel: OnChannelBound;
private _emulateWebsocket: boolean; protected _emulateWebsocket: boolean;
private _createDefaultMessage: boolean;
constructor({ constructor({
proxy, proxy,
@ -72,7 +71,6 @@ export default class Peer {
onclose, onclose,
onchannel, onchannel,
emulateWebsocket = false, emulateWebsocket = false,
createDefaultMessage = true,
}: PeerOptionsWithProxy & DataSocketOptions) { }: PeerOptionsWithProxy & DataSocketOptions) {
this._proxy = proxy; this._proxy = proxy;
this._peer = peer; this._peer = peer;
@ -83,98 +81,44 @@ export default class Peer {
this._onclose = onclose?.bind(undefined, this); this._onclose = onclose?.bind(undefined, this);
this._onchannel = onchannel?.bind(undefined, this); this._onchannel = onchannel?.bind(undefined, this);
this._emulateWebsocket = emulateWebsocket; this._emulateWebsocket = emulateWebsocket;
this._createDefaultMessage = createDefaultMessage;
} }
private _socket?: Socket; protected _socket?: Socket;
get socket(): Socket { get socket(): Socket {
return this._socket; return this._socket;
} }
private _channel?: any; protected _channel?: any;
get channel(): any { get channel(): any {
return this._channel; return this._channel;
} }
async init() { protected abstract initSocket();
protected abstract handleChannelOnOpen(m: any): Promise<void>;
protected abstract handleChannelOnClose(socket: Socket): Promise<void>;
protected async initChannel() {
const self = this; const self = this;
let pipe;
const raw = await maybeGetAsyncProperty(self._peer.rawStream);
this._socket = new Socket({
remoteAddress: raw.remoteHost,
remotePort: raw.remotePort,
remotePublicKey: await maybeGetAsyncProperty(self._peer.remotePublicKey),
async write(data: any, cb: Function) {
if (pipe) {
pipe.send(data);
}
await self._onsend?.(data);
cb();
},
emulateWebsocket: self._emulateWebsocket,
});
this._channel = await this._muxer.createChannel({ this._channel = await this._muxer.createChannel({
protocol: this._proxy.protocol, protocol: this._proxy.protocol,
async onopen(m: any) { onopen: this.handleChannelOnOpen.bind(this),
if (!m) { onclose: this.handleChannelOnClose.bind(this),
m = Buffer.from([]);
}
if (m instanceof Uint8Array) {
m = Buffer.from(m);
}
self._socket.on("end", () => this._channel.close());
let ret = await self._onopen?.(self._socket, m);
if (!ret || (ret && ret.connect === false)) {
// @ts-ignore
self._socket.emit("connect");
}
self._socket.emit("data", m);
},
async onclose() {
self._socket?.destroy();
await self._onclose?.(self._socket);
},
}); });
if (this._createDefaultMessage) { await this.initMessages();
pipe = await this._channel.addMessage({
async onmessage(m: any) {
if (m instanceof Uint8Array) {
m = Buffer.from(m);
}
self._socket.emit("data", m);
await self._onreceive?.(m);
},
});
}
await this._onchannel?.(this._channel); await this._onchannel?.(this._channel);
await this._channel.open(); await this._channel.open();
} }
async init() {
await this.initSocket();
await this.initChannel();
} }
async function maybeGetAsyncProperty(object: any) { protected async initMessages() {}
if (typeof object === "function") {
object = object();
}
if (isPromise(object)) {
object = await object;
}
return object;
}
function isPromise(obj: Promise<any>) {
return (
!!obj &&
(typeof obj === "object" || typeof obj === "function") &&
typeof obj.then === "function"
);
} }

View File

@ -1,3 +1,14 @@
import Proxy from "../proxy.js"; import Proxy from "../proxy.js";
import { DataSocketOptions, PeerOptions } from "../peer.js";
import BasicPeer from "./basic/peer.js";
export default class BasicProxy extends Proxy {} export default class BasicProxy extends Proxy {
protected handlePeer({
peer,
muxer,
...options
}: DataSocketOptions & PeerOptions) {
const conn = new BasicPeer({ proxy: this, peer, muxer, ...options });
conn.init();
}
}

61
src/proxies/basic/peer.ts Normal file
View File

@ -0,0 +1,61 @@
import BasePeer from "../../peer.js";
import { maybeGetAsyncProperty } from "../../util.js";
import Socket from "../../socket.js";
import { Buffer } from "buffer";
export default class Peer extends BasePeer {
private _pipe?: any;
protected async initSocket() {
const self = this;
const raw = await maybeGetAsyncProperty(self._peer.rawStream);
this._socket = new Socket({
remoteAddress: raw.remoteHost,
remotePort: raw.remotePort,
remotePublicKey: await maybeGetAsyncProperty(self._peer.remotePublicKey),
async write(data: any, cb: Function) {
self._pipe?.send(data);
await self._onsend?.(data);
cb();
},
emulateWebsocket: self._emulateWebsocket,
});
}
protected async handleChannelOnOpen(m: any) {
if (!m) {
m = Buffer.from([]);
}
if (m instanceof Uint8Array) {
m = Buffer.from(m);
}
this._socket?.on("end", () => this._channel.close());
let ret = await this._onopen?.(this._socket, m);
if (!ret || (ret && ret.connect === false)) {
// @ts-ignore
self._socket?.emit("connect");
}
this._socket?.emit("data", m);
}
protected async handleChannelOnClose(socket: Socket): Promise<void> {
this._socket?.destroy();
await this._onclose?.(this._socket);
}
protected async initMessages(): Promise<void> {
const self = this;
this._pipe = await this._channel.addMessage({
async onmessage(m: any) {
if (m instanceof Uint8Array) {
m = Buffer.from(m);
}
self._socket.emit("data", m);
await self._onreceive?.(m);
},
});
}
}

View File

@ -4,7 +4,7 @@ import { json, raw, uint } from "compact-encoding";
import { deserializeError } from "serialize-error"; import { deserializeError } from "serialize-error";
import b4a from "b4a"; import b4a from "b4a";
import type { TcpSocketConnectOpts } from "net"; import type { TcpSocketConnectOpts } from "net";
import Peer from "../peer.js"; import Peer, { DataSocketOptions, PeerOptions } from "../peer.js";
import { roundRobinFactory, idFactory } from "../util.js"; import { roundRobinFactory, idFactory } from "../util.js";
import { import {
CloseSocketRequest, CloseSocketRequest,
@ -69,6 +69,7 @@ const errorSocketEncoding = {
const nextSocketId = idFactory(1); const nextSocketId = idFactory(1);
export default class MultiSocketProxy extends Proxy { export default class MultiSocketProxy extends Proxy {
handlePeer({ peer, muxer, ...options }: DataSocketOptions & PeerOptions) {}
private socketClass: any; private socketClass: any;
private _peers: Map<string, PeerEntity> = new Map<string, PeerEntity>(); private _peers: Map<string, PeerEntity> = new Map<string, PeerEntity>();
private _nextPeer = roundRobinFactory(this._peers); private _nextPeer = roundRobinFactory(this._peers);
@ -76,10 +77,7 @@ export default class MultiSocketProxy extends Proxy {
private _allowedPorts = []; private _allowedPorts = [];
constructor(options: MultiSocketProxyOptions) { constructor(options: MultiSocketProxyOptions) {
super({ super(options);
createDefaultMessage: false,
...options,
});
this._socketOptions.onchannel = this.handleNewPeerChannel.bind(this); this._socketOptions.onchannel = this.handleNewPeerChannel.bind(this);
this._socketOptions.onclose = this.handleClosePeer.bind(this); this._socketOptions.onclose = this.handleClosePeer.bind(this);
this._socketOptions.onopen = this.handlePeer.bind(this); this._socketOptions.onopen = this.handlePeer.bind(this);

View File

@ -4,8 +4,9 @@ import { clearTimeout } from "timers";
import MultiSocketProxy from "../multiSocket.js"; import MultiSocketProxy from "../multiSocket.js";
import { PeerEntity, SocketRequest, WriteSocketRequest } from "./types.js"; import { PeerEntity, SocketRequest, WriteSocketRequest } from "./types.js";
import { maybeGetAsyncProperty } from "../../util.js"; import { maybeGetAsyncProperty } from "../../util.js";
import Socket, { SocketOptions } from "../../socket.js";
export default class DummySocket extends Duplex { export default class DummySocket extends Socket {
private _options: TcpSocketConnectOpts; private _options: TcpSocketConnectOpts;
private _id: number; private _id: number;
private _proxy: MultiSocketProxy; private _proxy: MultiSocketProxy;
@ -16,13 +17,14 @@ export default class DummySocket extends Duplex {
id: number, id: number,
manager: MultiSocketProxy, manager: MultiSocketProxy,
peer: PeerEntity, peer: PeerEntity,
options: TcpSocketConnectOpts connectOptions: TcpSocketConnectOpts,
socketOptions: SocketOptions
) { ) {
super(); super(socketOptions);
this._id = id; this._id = id;
this._proxy = manager; this._proxy = manager;
this._peer = peer; this._peer = peer;
this._options = options; this._options = connectOptions;
// @ts-ignore // @ts-ignore
this.on("timeout", () => { this.on("timeout", () => {

View File

@ -0,0 +1,16 @@
import BasePeer from "../../peer.js";
import Socket from "../../socket.js";
import MultiSocketProxy from "../multiSocket.js";
export default class Peer extends BasePeer {
protected declare _proxy: MultiSocketProxy;
protected async initSocket() {}
protected async handleChannelOnClose(socket: Socket): Promise<void> {
return this._proxy.handleClosePeer(this);
}
protected async handleChannelOnOpen(m: any): Promise<void> {
await this._proxy.handleNewPeerChannel(this._peer, this._channel);
}
}

View File

@ -3,8 +3,9 @@ import { Socket, TcpSocketConnectOpts } from "net";
import MultiSocketProxy from "../multiSocket.js"; import MultiSocketProxy from "../multiSocket.js";
import { PeerEntity, SocketRequest, WriteSocketRequest } from "./types.js"; import { PeerEntity, SocketRequest, WriteSocketRequest } from "./types.js";
import * as net from "net"; import * as net from "net";
import BaseSocket from "../../socket.js";
export default class TcpSocket extends Duplex { export default class TcpSocket extends BaseSocket {
private _options; private _options;
private _id: number; private _id: number;
private _remoteId: number; private _remoteId: number;

View File

@ -1,5 +1,5 @@
import Protomux from "protomux"; import Protomux from "protomux";
import Peer, { PeerOptions, DataSocketOptions } from "./peer.js"; import { DataSocketOptions, PeerOptions } from "./peer.js";
export interface ProxyOptions extends DataSocketOptions { export interface ProxyOptions extends DataSocketOptions {
swarm: any; swarm: any;
@ -23,7 +23,6 @@ export default abstract class Proxy {
listen = false, listen = false,
autostart = false, autostart = false,
emulateWebsocket = false, emulateWebsocket = false,
createDefaultMessage = true,
}: ProxyOptions) { }: ProxyOptions) {
this._swarm = swarm; this._swarm = swarm;
this._protocol = protocol; this._protocol = protocol;
@ -36,7 +35,6 @@ export default abstract class Proxy {
onclose, onclose,
onchannel, onchannel,
emulateWebsocket, emulateWebsocket,
createDefaultMessage,
}; };
this.init(); this.init();
} }
@ -58,14 +56,12 @@ export default abstract class Proxy {
get protocol(): string { get protocol(): string {
return this._protocol; return this._protocol;
} }
public handlePeer({
protected abstract handlePeer({
peer, peer,
muxer, muxer,
...options ...options
}: DataSocketOptions & PeerOptions) { }: DataSocketOptions & PeerOptions);
const conn = new Peer({ proxy: this, peer, muxer, ...options });
conn.init();
}
protected _init() { protected _init() {
// Implement in subclasses // Implement in subclasses

View File

@ -6,7 +6,7 @@ const IPV6 = "IPv6";
type AddressFamily = "IPv6" | "IPv4"; type AddressFamily = "IPv6" | "IPv4";
interface SocketOptions { export interface SocketOptions {
allowHalfOpen?: boolean; allowHalfOpen?: boolean;
remoteAddress?: string; remoteAddress?: string;
remotePort?: number; remotePort?: number;