Compare commits

..

No commits in common. "c3c8e6fb3b22e615815f39b0b3b14eee2c2384c7" and "10a7b4ebc63f60fd622dd4621cc46b17276cadb2" have entirely different histories.

3 changed files with 42 additions and 58 deletions

View File

@ -21,7 +21,7 @@ export default class MultiSocketProxy extends Proxy {
get socketMap(): Map<number, number>; get socketMap(): Map<number, number>;
private _sockets; private _sockets;
get sockets(): Map<number, any>; get sockets(): Map<number, any>;
handleNewPeerChannel(peer: Peer): Promise<void>; handleNewPeerChannel(peer: Peer): void;
handleClosePeer(peer: Peer): Promise<void>; handleClosePeer(peer: Peer): Promise<void>;
get(pubkey: Uint8Array): PeerEntity | undefined; get(pubkey: Uint8Array): PeerEntity | undefined;
update(pubkey: Uint8Array, data: Partial<PeerEntity>): void; update(pubkey: Uint8Array, data: Partial<PeerEntity>): void;
@ -32,5 +32,4 @@ export default class MultiSocketProxy extends Proxy {
private _registerTimeoutSocketMessage; private _registerTimeoutSocketMessage;
private _registerErrorSocketMessage; private _registerErrorSocketMessage;
private _toString; private _toString;
private _getPublicKey;
} }

View File

@ -96,15 +96,13 @@ class MultiSocketProxy extends proxy_js_1.default {
get sockets() { get sockets() {
return this._sockets; return this._sockets;
} }
async handleNewPeerChannel(peer) { handleNewPeerChannel(peer) {
this.update(await this._getPublicKey(peer), { this.update(peer.stream.remotePublicKey, { peer });
peer, this._registerOpenSocketMessage(peer);
}); this._registerWriteSocketMessage(peer);
await this._registerOpenSocketMessage(peer); this._registerCloseSocketMessage(peer);
await this._registerWriteSocketMessage(peer); this._registerTimeoutSocketMessage(peer);
await this._registerCloseSocketMessage(peer); this._registerErrorSocketMessage(peer);
await this._registerTimeoutSocketMessage(peer);
await this._registerErrorSocketMessage(peer);
} }
async handleClosePeer(peer) { async handleClosePeer(peer) {
for (const item of this._sockets) { for (const item of this._sockets) {
@ -112,7 +110,7 @@ class MultiSocketProxy extends proxy_js_1.default {
item[1].end(); item[1].end();
} }
} }
const pubkey = this._toString(await this._getPublicKey(peer)); const pubkey = this._toString(peer.stream.remotePublicKey);
if (this._peers.has(pubkey)) { if (this._peers.has(pubkey)) {
this._peers.delete(pubkey); this._peers.delete(pubkey);
} }
@ -146,7 +144,7 @@ class MultiSocketProxy extends proxy_js_1.default {
this._sockets.set(socketId, socket); this._sockets.set(socketId, socket);
return socket; return socket;
} }
async _registerOpenSocketMessage(peer) { _registerOpenSocketMessage(peer) {
const self = this; const self = this;
const message = peer.channel.addMessage({ const message = peer.channel.addMessage({
encoding: { encoding: {
@ -157,7 +155,7 @@ class MultiSocketProxy extends proxy_js_1.default {
async onmessage(m) { async onmessage(m) {
if (self._allowedPorts.length && if (self._allowedPorts.length &&
!self._allowedPorts.includes(m.port)) { !self._allowedPorts.includes(m.port)) {
self.get(await this._getPublicKey(peer)).messages.errorSocket.send({ self.get(peer.stream.remotePublicKey).messages.errorSocket.send({
id: m.id, id: m.id,
err: new Error(`port ${m.port} not allowed`), err: new Error(`port ${m.port} not allowed`),
}); });
@ -165,7 +163,7 @@ class MultiSocketProxy extends proxy_js_1.default {
} }
m = m; m = m;
if (self._server) { if (self._server) {
new self.socketClass(nextSocketId(), m, self, self.get(await this._getPublicKey(peer)), m).connect(); new self.socketClass(nextSocketId(), m, self, self.get(peer.stream.remotePublicKey), m).connect();
return; return;
} }
const socket = self._sockets.get(m.id); const socket = self._sockets.get(m.id);
@ -176,11 +174,11 @@ class MultiSocketProxy extends proxy_js_1.default {
} }
}, },
}); });
this.update(await this._getPublicKey(peer), { this.update(peer.stream.remotePublicKey, {
messages: { openSocket: message }, messages: { openSocket: message },
}); });
} }
async _registerWriteSocketMessage(peer) { _registerWriteSocketMessage(peer) {
const self = this; const self = this;
const message = peer.channel.addMessage({ const message = peer.channel.addMessage({
encoding: writeSocketEncoding, encoding: writeSocketEncoding,
@ -188,11 +186,11 @@ class MultiSocketProxy extends proxy_js_1.default {
self._sockets.get(m.id)?.push(m.data); self._sockets.get(m.id)?.push(m.data);
}, },
}); });
this.update(await this._getPublicKey(peer), { this.update(peer.stream.remotePublicKey, {
messages: { writeSocket: message }, messages: { writeSocket: message },
}); });
} }
async _registerCloseSocketMessage(peer) { _registerCloseSocketMessage(peer) {
const self = this; const self = this;
const message = peer.channel.addMessage({ const message = peer.channel.addMessage({
encoding: socketEncoding, encoding: socketEncoding,
@ -200,11 +198,11 @@ class MultiSocketProxy extends proxy_js_1.default {
self._sockets.get(m.id)?.end(); self._sockets.get(m.id)?.end();
}, },
}); });
this.update(await this._getPublicKey(peer), { this.update(peer.stream.remotePublicKey, {
messages: { closeSocket: message }, messages: { closeSocket: message },
}); });
} }
async _registerTimeoutSocketMessage(peer) { _registerTimeoutSocketMessage(peer) {
const self = this; const self = this;
const message = peer.channel.addMessage({ const message = peer.channel.addMessage({
encoding: socketEncoding, encoding: socketEncoding,
@ -213,11 +211,11 @@ class MultiSocketProxy extends proxy_js_1.default {
self._sockets.get(m.id)?.emit("timeout"); self._sockets.get(m.id)?.emit("timeout");
}, },
}); });
this.update(await this._getPublicKey(peer), { this.update(peer.stream.remotePublicKey, {
messages: { timeoutSocket: message }, messages: { timeoutSocket: message },
}); });
} }
async _registerErrorSocketMessage(peer) { _registerErrorSocketMessage(peer) {
const self = this; const self = this;
const message = peer.channel.addMessage({ const message = peer.channel.addMessage({
encoding: errorSocketEncoding, encoding: errorSocketEncoding,
@ -226,15 +224,12 @@ class MultiSocketProxy extends proxy_js_1.default {
self._sockets.get(m.id)?.emit("error", m.err); self._sockets.get(m.id)?.emit("error", m.err);
}, },
}); });
this.update(await this._getPublicKey(peer), { this.update(peer.stream.remotePublicKey, {
messages: { errorSocket: message }, messages: { errorSocket: message },
}); });
} }
_toString(pubkey) { _toString(pubkey) {
return b4a_1.default.from(pubkey).toString("hex"); return b4a_1.default.from(pubkey).toString("hex");
} }
async _getPublicKey(peer) {
return (0, util_js_1.maybeGetAsyncProperty)(peer.stream.remotePublicKey);
}
} }
exports.default = MultiSocketProxy; exports.default = MultiSocketProxy;

View File

@ -5,11 +5,7 @@ import { deserializeError } from "serialize-error";
import b4a from "b4a"; import b4a from "b4a";
import type { TcpSocketConnectOpts } from "net"; import type { TcpSocketConnectOpts } from "net";
import { DataSocketOptions, PeerOptions } from "../peer.js"; import { DataSocketOptions, PeerOptions } from "../peer.js";
import { import { roundRobinFactory, idFactory } from "../util.js";
roundRobinFactory,
idFactory,
maybeGetAsyncProperty,
} from "../util.js";
import { import {
CloseSocketRequest, CloseSocketRequest,
ErrorSocketRequest, ErrorSocketRequest,
@ -123,16 +119,14 @@ export default class MultiSocketProxy extends Proxy {
return this._sockets; return this._sockets;
} }
async handleNewPeerChannel(peer: Peer) { handleNewPeerChannel(peer: Peer) {
this.update(await this._getPublicKey(peer), { this.update(peer.stream.remotePublicKey, { peer });
peer,
});
await this._registerOpenSocketMessage(peer); this._registerOpenSocketMessage(peer);
await this._registerWriteSocketMessage(peer); this._registerWriteSocketMessage(peer);
await this._registerCloseSocketMessage(peer); this._registerCloseSocketMessage(peer);
await this._registerTimeoutSocketMessage(peer); this._registerTimeoutSocketMessage(peer);
await this._registerErrorSocketMessage(peer); this._registerErrorSocketMessage(peer);
} }
async handleClosePeer(peer: Peer) { async handleClosePeer(peer: Peer) {
@ -142,7 +136,7 @@ export default class MultiSocketProxy extends Proxy {
} }
} }
const pubkey = this._toString(await this._getPublicKey(peer)); const pubkey = this._toString(peer.stream.remotePublicKey);
if (this._peers.has(pubkey)) { if (this._peers.has(pubkey)) {
this._peers.delete(pubkey); this._peers.delete(pubkey);
@ -187,7 +181,7 @@ export default class MultiSocketProxy extends Proxy {
return socket; return socket;
} }
private async _registerOpenSocketMessage(peer: Peer) { private _registerOpenSocketMessage(peer: Peer) {
const self = this; const self = this;
const message = peer.channel.addMessage({ const message = peer.channel.addMessage({
encoding: { encoding: {
@ -200,7 +194,7 @@ export default class MultiSocketProxy extends Proxy {
self._allowedPorts.length && self._allowedPorts.length &&
!self._allowedPorts.includes((m as TcpSocketConnectOpts).port) !self._allowedPorts.includes((m as TcpSocketConnectOpts).port)
) { ) {
self.get(await this._getPublicKey(peer)).messages.errorSocket.send({ self.get(peer.stream.remotePublicKey).messages.errorSocket.send({
id: (m as SocketRequest).id, id: (m as SocketRequest).id,
err: new Error( err: new Error(
`port ${(m as TcpSocketConnectOpts).port} not allowed` `port ${(m as TcpSocketConnectOpts).port} not allowed`
@ -216,7 +210,7 @@ export default class MultiSocketProxy extends Proxy {
nextSocketId(), nextSocketId(),
m, m,
self, self,
self.get(await this._getPublicKey(peer)) as PeerEntity, self.get(peer.stream.remotePublicKey) as PeerEntity,
m m
).connect(); ).connect();
return; return;
@ -230,12 +224,12 @@ export default class MultiSocketProxy extends Proxy {
} }
}, },
}); });
this.update(await this._getPublicKey(peer), { this.update(peer.stream.remotePublicKey, {
messages: { openSocket: message }, messages: { openSocket: message },
}); });
} }
private async _registerWriteSocketMessage(peer: Peer) { private _registerWriteSocketMessage(peer: Peer) {
const self = this; const self = this;
const message = peer.channel.addMessage({ const message = peer.channel.addMessage({
encoding: writeSocketEncoding, encoding: writeSocketEncoding,
@ -243,12 +237,12 @@ export default class MultiSocketProxy extends Proxy {
self._sockets.get(m.id)?.push(m.data); self._sockets.get(m.id)?.push(m.data);
}, },
}); });
this.update(await this._getPublicKey(peer), { this.update(peer.stream.remotePublicKey, {
messages: { writeSocket: message }, messages: { writeSocket: message },
}); });
} }
private async _registerCloseSocketMessage(peer: Peer) { private _registerCloseSocketMessage(peer: Peer) {
const self = this; const self = this;
const message = peer.channel.addMessage({ const message = peer.channel.addMessage({
encoding: socketEncoding, encoding: socketEncoding,
@ -256,12 +250,12 @@ export default class MultiSocketProxy extends Proxy {
self._sockets.get(m.id)?.end(); self._sockets.get(m.id)?.end();
}, },
}); });
this.update(await this._getPublicKey(peer), { this.update(peer.stream.remotePublicKey, {
messages: { closeSocket: message }, messages: { closeSocket: message },
}); });
} }
private async _registerTimeoutSocketMessage(peer: Peer) { private _registerTimeoutSocketMessage(peer: Peer) {
const self = this; const self = this;
const message = peer.channel.addMessage({ const message = peer.channel.addMessage({
encoding: socketEncoding, encoding: socketEncoding,
@ -270,12 +264,12 @@ export default class MultiSocketProxy extends Proxy {
self._sockets.get(m.id)?.emit("timeout"); self._sockets.get(m.id)?.emit("timeout");
}, },
}); });
this.update(await this._getPublicKey(peer), { this.update(peer.stream.remotePublicKey, {
messages: { timeoutSocket: message }, messages: { timeoutSocket: message },
}); });
} }
private async _registerErrorSocketMessage(peer: Peer) { private _registerErrorSocketMessage(peer: Peer) {
const self = this; const self = this;
const message = peer.channel.addMessage({ const message = peer.channel.addMessage({
encoding: errorSocketEncoding, encoding: errorSocketEncoding,
@ -284,7 +278,7 @@ export default class MultiSocketProxy extends Proxy {
self._sockets.get(m.id)?.emit("error", m.err); self._sockets.get(m.id)?.emit("error", m.err);
}, },
}); });
this.update(await this._getPublicKey(peer), { this.update(peer.stream.remotePublicKey, {
messages: { errorSocket: message }, messages: { errorSocket: message },
}); });
} }
@ -292,8 +286,4 @@ export default class MultiSocketProxy extends Proxy {
private _toString(pubkey: Uint8Array) { private _toString(pubkey: Uint8Array) {
return b4a.from(pubkey).toString("hex"); return b4a.from(pubkey).toString("hex");
} }
private async _getPublicKey(peer: Peer) {
return maybeGetAsyncProperty(peer.stream.remotePublicKey);
}
} }