Compare commits
2 Commits
10a7b4ebc6
...
c3c8e6fb3b
Author | SHA1 | Date |
---|---|---|
Derrick Hammer | c3c8e6fb3b | |
Derrick Hammer | 96dd1ad46e |
|
@ -21,7 +21,7 @@ export default class MultiSocketProxy extends Proxy {
|
|||
get socketMap(): Map<number, number>;
|
||||
private _sockets;
|
||||
get sockets(): Map<number, any>;
|
||||
handleNewPeerChannel(peer: Peer): void;
|
||||
handleNewPeerChannel(peer: Peer): Promise<void>;
|
||||
handleClosePeer(peer: Peer): Promise<void>;
|
||||
get(pubkey: Uint8Array): PeerEntity | undefined;
|
||||
update(pubkey: Uint8Array, data: Partial<PeerEntity>): void;
|
||||
|
@ -32,4 +32,5 @@ export default class MultiSocketProxy extends Proxy {
|
|||
private _registerTimeoutSocketMessage;
|
||||
private _registerErrorSocketMessage;
|
||||
private _toString;
|
||||
private _getPublicKey;
|
||||
}
|
||||
|
|
|
@ -96,13 +96,15 @@ class MultiSocketProxy extends proxy_js_1.default {
|
|||
get sockets() {
|
||||
return this._sockets;
|
||||
}
|
||||
handleNewPeerChannel(peer) {
|
||||
this.update(peer.stream.remotePublicKey, { peer });
|
||||
this._registerOpenSocketMessage(peer);
|
||||
this._registerWriteSocketMessage(peer);
|
||||
this._registerCloseSocketMessage(peer);
|
||||
this._registerTimeoutSocketMessage(peer);
|
||||
this._registerErrorSocketMessage(peer);
|
||||
async handleNewPeerChannel(peer) {
|
||||
this.update(await this._getPublicKey(peer), {
|
||||
peer,
|
||||
});
|
||||
await this._registerOpenSocketMessage(peer);
|
||||
await this._registerWriteSocketMessage(peer);
|
||||
await this._registerCloseSocketMessage(peer);
|
||||
await this._registerTimeoutSocketMessage(peer);
|
||||
await this._registerErrorSocketMessage(peer);
|
||||
}
|
||||
async handleClosePeer(peer) {
|
||||
for (const item of this._sockets) {
|
||||
|
@ -110,7 +112,7 @@ class MultiSocketProxy extends proxy_js_1.default {
|
|||
item[1].end();
|
||||
}
|
||||
}
|
||||
const pubkey = this._toString(peer.stream.remotePublicKey);
|
||||
const pubkey = this._toString(await this._getPublicKey(peer));
|
||||
if (this._peers.has(pubkey)) {
|
||||
this._peers.delete(pubkey);
|
||||
}
|
||||
|
@ -144,7 +146,7 @@ class MultiSocketProxy extends proxy_js_1.default {
|
|||
this._sockets.set(socketId, socket);
|
||||
return socket;
|
||||
}
|
||||
_registerOpenSocketMessage(peer) {
|
||||
async _registerOpenSocketMessage(peer) {
|
||||
const self = this;
|
||||
const message = peer.channel.addMessage({
|
||||
encoding: {
|
||||
|
@ -155,7 +157,7 @@ class MultiSocketProxy extends proxy_js_1.default {
|
|||
async onmessage(m) {
|
||||
if (self._allowedPorts.length &&
|
||||
!self._allowedPorts.includes(m.port)) {
|
||||
self.get(peer.stream.remotePublicKey).messages.errorSocket.send({
|
||||
self.get(await this._getPublicKey(peer)).messages.errorSocket.send({
|
||||
id: m.id,
|
||||
err: new Error(`port ${m.port} not allowed`),
|
||||
});
|
||||
|
@ -163,7 +165,7 @@ class MultiSocketProxy extends proxy_js_1.default {
|
|||
}
|
||||
m = m;
|
||||
if (self._server) {
|
||||
new self.socketClass(nextSocketId(), m, self, self.get(peer.stream.remotePublicKey), m).connect();
|
||||
new self.socketClass(nextSocketId(), m, self, self.get(await this._getPublicKey(peer)), m).connect();
|
||||
return;
|
||||
}
|
||||
const socket = self._sockets.get(m.id);
|
||||
|
@ -174,11 +176,11 @@ class MultiSocketProxy extends proxy_js_1.default {
|
|||
}
|
||||
},
|
||||
});
|
||||
this.update(peer.stream.remotePublicKey, {
|
||||
this.update(await this._getPublicKey(peer), {
|
||||
messages: { openSocket: message },
|
||||
});
|
||||
}
|
||||
_registerWriteSocketMessage(peer) {
|
||||
async _registerWriteSocketMessage(peer) {
|
||||
const self = this;
|
||||
const message = peer.channel.addMessage({
|
||||
encoding: writeSocketEncoding,
|
||||
|
@ -186,11 +188,11 @@ class MultiSocketProxy extends proxy_js_1.default {
|
|||
self._sockets.get(m.id)?.push(m.data);
|
||||
},
|
||||
});
|
||||
this.update(peer.stream.remotePublicKey, {
|
||||
this.update(await this._getPublicKey(peer), {
|
||||
messages: { writeSocket: message },
|
||||
});
|
||||
}
|
||||
_registerCloseSocketMessage(peer) {
|
||||
async _registerCloseSocketMessage(peer) {
|
||||
const self = this;
|
||||
const message = peer.channel.addMessage({
|
||||
encoding: socketEncoding,
|
||||
|
@ -198,11 +200,11 @@ class MultiSocketProxy extends proxy_js_1.default {
|
|||
self._sockets.get(m.id)?.end();
|
||||
},
|
||||
});
|
||||
this.update(peer.stream.remotePublicKey, {
|
||||
this.update(await this._getPublicKey(peer), {
|
||||
messages: { closeSocket: message },
|
||||
});
|
||||
}
|
||||
_registerTimeoutSocketMessage(peer) {
|
||||
async _registerTimeoutSocketMessage(peer) {
|
||||
const self = this;
|
||||
const message = peer.channel.addMessage({
|
||||
encoding: socketEncoding,
|
||||
|
@ -211,11 +213,11 @@ class MultiSocketProxy extends proxy_js_1.default {
|
|||
self._sockets.get(m.id)?.emit("timeout");
|
||||
},
|
||||
});
|
||||
this.update(peer.stream.remotePublicKey, {
|
||||
this.update(await this._getPublicKey(peer), {
|
||||
messages: { timeoutSocket: message },
|
||||
});
|
||||
}
|
||||
_registerErrorSocketMessage(peer) {
|
||||
async _registerErrorSocketMessage(peer) {
|
||||
const self = this;
|
||||
const message = peer.channel.addMessage({
|
||||
encoding: errorSocketEncoding,
|
||||
|
@ -224,12 +226,15 @@ class MultiSocketProxy extends proxy_js_1.default {
|
|||
self._sockets.get(m.id)?.emit("error", m.err);
|
||||
},
|
||||
});
|
||||
this.update(peer.stream.remotePublicKey, {
|
||||
this.update(await this._getPublicKey(peer), {
|
||||
messages: { errorSocket: message },
|
||||
});
|
||||
}
|
||||
_toString(pubkey) {
|
||||
return b4a_1.default.from(pubkey).toString("hex");
|
||||
}
|
||||
async _getPublicKey(peer) {
|
||||
return (0, util_js_1.maybeGetAsyncProperty)(peer.stream.remotePublicKey);
|
||||
}
|
||||
}
|
||||
exports.default = MultiSocketProxy;
|
||||
|
|
|
@ -5,7 +5,11 @@ import { deserializeError } from "serialize-error";
|
|||
import b4a from "b4a";
|
||||
import type { TcpSocketConnectOpts } from "net";
|
||||
import { DataSocketOptions, PeerOptions } from "../peer.js";
|
||||
import { roundRobinFactory, idFactory } from "../util.js";
|
||||
import {
|
||||
roundRobinFactory,
|
||||
idFactory,
|
||||
maybeGetAsyncProperty,
|
||||
} from "../util.js";
|
||||
import {
|
||||
CloseSocketRequest,
|
||||
ErrorSocketRequest,
|
||||
|
@ -119,14 +123,16 @@ export default class MultiSocketProxy extends Proxy {
|
|||
return this._sockets;
|
||||
}
|
||||
|
||||
handleNewPeerChannel(peer: Peer) {
|
||||
this.update(peer.stream.remotePublicKey, { peer });
|
||||
async handleNewPeerChannel(peer: Peer) {
|
||||
this.update(await this._getPublicKey(peer), {
|
||||
peer,
|
||||
});
|
||||
|
||||
this._registerOpenSocketMessage(peer);
|
||||
this._registerWriteSocketMessage(peer);
|
||||
this._registerCloseSocketMessage(peer);
|
||||
this._registerTimeoutSocketMessage(peer);
|
||||
this._registerErrorSocketMessage(peer);
|
||||
await this._registerOpenSocketMessage(peer);
|
||||
await this._registerWriteSocketMessage(peer);
|
||||
await this._registerCloseSocketMessage(peer);
|
||||
await this._registerTimeoutSocketMessage(peer);
|
||||
await this._registerErrorSocketMessage(peer);
|
||||
}
|
||||
|
||||
async handleClosePeer(peer: Peer) {
|
||||
|
@ -136,7 +142,7 @@ export default class MultiSocketProxy extends Proxy {
|
|||
}
|
||||
}
|
||||
|
||||
const pubkey = this._toString(peer.stream.remotePublicKey);
|
||||
const pubkey = this._toString(await this._getPublicKey(peer));
|
||||
|
||||
if (this._peers.has(pubkey)) {
|
||||
this._peers.delete(pubkey);
|
||||
|
@ -181,7 +187,7 @@ export default class MultiSocketProxy extends Proxy {
|
|||
return socket;
|
||||
}
|
||||
|
||||
private _registerOpenSocketMessage(peer: Peer) {
|
||||
private async _registerOpenSocketMessage(peer: Peer) {
|
||||
const self = this;
|
||||
const message = peer.channel.addMessage({
|
||||
encoding: {
|
||||
|
@ -194,7 +200,7 @@ export default class MultiSocketProxy extends Proxy {
|
|||
self._allowedPorts.length &&
|
||||
!self._allowedPorts.includes((m as TcpSocketConnectOpts).port)
|
||||
) {
|
||||
self.get(peer.stream.remotePublicKey).messages.errorSocket.send({
|
||||
self.get(await this._getPublicKey(peer)).messages.errorSocket.send({
|
||||
id: (m as SocketRequest).id,
|
||||
err: new Error(
|
||||
`port ${(m as TcpSocketConnectOpts).port} not allowed`
|
||||
|
@ -210,7 +216,7 @@ export default class MultiSocketProxy extends Proxy {
|
|||
nextSocketId(),
|
||||
m,
|
||||
self,
|
||||
self.get(peer.stream.remotePublicKey) as PeerEntity,
|
||||
self.get(await this._getPublicKey(peer)) as PeerEntity,
|
||||
m
|
||||
).connect();
|
||||
return;
|
||||
|
@ -224,12 +230,12 @@ export default class MultiSocketProxy extends Proxy {
|
|||
}
|
||||
},
|
||||
});
|
||||
this.update(peer.stream.remotePublicKey, {
|
||||
this.update(await this._getPublicKey(peer), {
|
||||
messages: { openSocket: message },
|
||||
});
|
||||
}
|
||||
|
||||
private _registerWriteSocketMessage(peer: Peer) {
|
||||
private async _registerWriteSocketMessage(peer: Peer) {
|
||||
const self = this;
|
||||
const message = peer.channel.addMessage({
|
||||
encoding: writeSocketEncoding,
|
||||
|
@ -237,12 +243,12 @@ export default class MultiSocketProxy extends Proxy {
|
|||
self._sockets.get(m.id)?.push(m.data);
|
||||
},
|
||||
});
|
||||
this.update(peer.stream.remotePublicKey, {
|
||||
this.update(await this._getPublicKey(peer), {
|
||||
messages: { writeSocket: message },
|
||||
});
|
||||
}
|
||||
|
||||
private _registerCloseSocketMessage(peer: Peer) {
|
||||
private async _registerCloseSocketMessage(peer: Peer) {
|
||||
const self = this;
|
||||
const message = peer.channel.addMessage({
|
||||
encoding: socketEncoding,
|
||||
|
@ -250,12 +256,12 @@ export default class MultiSocketProxy extends Proxy {
|
|||
self._sockets.get(m.id)?.end();
|
||||
},
|
||||
});
|
||||
this.update(peer.stream.remotePublicKey, {
|
||||
this.update(await this._getPublicKey(peer), {
|
||||
messages: { closeSocket: message },
|
||||
});
|
||||
}
|
||||
|
||||
private _registerTimeoutSocketMessage(peer: Peer) {
|
||||
private async _registerTimeoutSocketMessage(peer: Peer) {
|
||||
const self = this;
|
||||
const message = peer.channel.addMessage({
|
||||
encoding: socketEncoding,
|
||||
|
@ -264,12 +270,12 @@ export default class MultiSocketProxy extends Proxy {
|
|||
self._sockets.get(m.id)?.emit("timeout");
|
||||
},
|
||||
});
|
||||
this.update(peer.stream.remotePublicKey, {
|
||||
this.update(await this._getPublicKey(peer), {
|
||||
messages: { timeoutSocket: message },
|
||||
});
|
||||
}
|
||||
|
||||
private _registerErrorSocketMessage(peer: Peer) {
|
||||
private async _registerErrorSocketMessage(peer: Peer) {
|
||||
const self = this;
|
||||
const message = peer.channel.addMessage({
|
||||
encoding: errorSocketEncoding,
|
||||
|
@ -278,7 +284,7 @@ export default class MultiSocketProxy extends Proxy {
|
|||
self._sockets.get(m.id)?.emit("error", m.err);
|
||||
},
|
||||
});
|
||||
this.update(peer.stream.remotePublicKey, {
|
||||
this.update(await this._getPublicKey(peer), {
|
||||
messages: { errorSocket: message },
|
||||
});
|
||||
}
|
||||
|
@ -286,4 +292,8 @@ export default class MultiSocketProxy extends Proxy {
|
|||
private _toString(pubkey: Uint8Array) {
|
||||
return b4a.from(pubkey).toString("hex");
|
||||
}
|
||||
|
||||
private async _getPublicKey(peer: Peer) {
|
||||
return maybeGetAsyncProperty(peer.stream.remotePublicKey);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue