Compare commits

..

3 Commits

1 changed files with 85 additions and 10 deletions

View File

@ -51,6 +51,11 @@ class Channel {
} }
async open(handshake) { async open(handshake) {
if (this._slave) {
await this._mux.pullLocal();
await this._mux.pullFree();
}
const freeLen = this._mux._free.length;
const id = const id =
this._mux._free.length > 0 this._mux._free.length > 0
? this._mux._free.pop() ? this._mux._free.pop()
@ -60,6 +65,14 @@ class Channel {
this._localId = id + 1; this._localId = id + 1;
this._mux._local[id] = this; this._mux._local[id] = this;
if (this._slave) {
if (freeLen > 0) {
await this._mux.popFree(id);
} else {
await this._mux.pushLocal(id);
}
}
if (this._remoteId === 0) { if (this._remoteId === 0) {
this._info.outgoing.push(this._localId); this._info.outgoing.push(this._localId);
} }
@ -81,7 +94,6 @@ class Channel {
if (this._handshake) this._handshake.encode(state, handshake); if (this._handshake) this._handshake.encode(state, handshake);
this._mux._write0(state.buffer); this._mux._write0(state.buffer);
await this._mux.syncState?.();
} }
_dec() { _dec() {
@ -96,7 +108,6 @@ class Channel {
async _fullyOpenSoon() { async _fullyOpenSoon() {
this._mux._remote[this._remoteId - 1].session = this; this._mux._remote[this._remoteId - 1].session = this;
queueTick(this._fullyOpen.bind(this)); queueTick(this._fullyOpen.bind(this));
await this._mux.syncState?.();
} }
_fullyOpen() { _fullyOpen() {
@ -142,9 +153,16 @@ class Channel {
if (this._remoteId > 0) { if (this._remoteId > 0) {
this._mux._remote[this._remoteId - 1] = null; this._mux._remote[this._remoteId - 1] = null;
this._remoteId = 0; this._remoteId = 0;
if (this._slave) {
await this._mux.pullFree();
}
// If remote has acked, we can reuse the local id now // If remote has acked, we can reuse the local id now
// otherwise, we need to wait for the "ack" to arrive // otherwise, we need to wait for the "ack" to arrive
this._mux._free.push(this._localId - 1); this._mux._free.push(this._localId - 1);
if (this._slave) {
await this._mux.pushFree(this._localId - 1);
}
} }
this._mux._local[this._localId - 1] = null; this._mux._local[this._localId - 1] = null;
@ -154,7 +172,6 @@ class Channel {
this._track(this.onclose(isRemote, this)); this._track(this.onclose(isRemote, this));
if (this._active === 0) this._destroy(); if (this._active === 0) this._destroy();
await this._mux.syncState?.();
} }
_destroy() { _destroy() {
@ -498,7 +515,7 @@ module.exports = class Protomux {
const type = c.uint.decode(state); const type = c.uint.decode(state);
if (remoteId === 0) { if (remoteId === 0) {
await this._oncontrolsession(type, state); this._oncontrolsession(type, state);
return; return;
} }
@ -513,11 +530,10 @@ module.exports = class Protomux {
return; return;
} }
await this.syncState?.();
r.session._recv(type, state); r.session._recv(type, state);
} }
async _oncontrolsession(type, state) { _oncontrolsession(type, state) {
switch (type) { switch (type) {
case 0: case 0:
this._onbatch(state); this._onbatch(state);
@ -535,8 +551,6 @@ module.exports = class Protomux {
this._onclosesession(state); this._onclosesession(state);
break; break;
} }
await this.syncState?.();
} }
_bufferMessage(r, type, { buffer, start, end }) { _bufferMessage(r, type, { buffer, start, end }) {
@ -591,12 +605,15 @@ module.exports = class Protomux {
const info = this._get(protocol, id); const info = this._get(protocol, id);
if (this._slave) { if (this._slave) {
await this._mux.syncState?.(); await this._mux.pullRemote();
} }
// allow the remote to grow the ids by one // allow the remote to grow the ids by one
if (this._remote.length === rid) { if (this._remote.length === rid) {
this._remote.push(null); this._remote.push(null);
if (this._slave) {
await this._mux.pushRemote(this._remote.length - 1);
}
} }
if (rid >= this._remote.length || this._remote[rid] !== null) { if (rid >= this._remote.length || this._remote[rid] !== null) {
@ -608,8 +625,14 @@ module.exports = class Protomux {
const session = this._local[localId - 1]; const session = this._local[localId - 1];
if (session === null) { if (session === null) {
if (this._slave) {
await this._mux.pullFree();
}
// we already closed the channel - ignore // we already closed the channel - ignore
this._free.push(localId - 1); this._free.push(localId - 1);
if (this._slave) {
await this._mux.pushFree(localId - 1);
}
return; return;
} }
@ -634,10 +657,14 @@ module.exports = class Protomux {
info.pairing++; info.pairing++;
info.incoming.push(remoteId); info.incoming.push(remoteId);
if (this._slave) {
await this._mux.pushRemotes();
}
this._requestSession(protocol, id, info).catch(this._safeDestroyBound); this._requestSession(protocol, id, info).catch(this._safeDestroyBound);
} }
_onrejectsession(state) { async _onrejectsession(state) {
const localId = c.uint.decode(state); const localId = c.uint.decode(state);
// TODO: can be done smarter... // TODO: can be done smarter...
@ -649,7 +676,15 @@ module.exports = class Protomux {
const session = this._local[localId - 1]; const session = this._local[localId - 1];
if (this._slave) {
await this._mux.pullFree(localId - 1);
}
this._free.push(localId - 1); this._free.push(localId - 1);
if (this._slave) {
await this._mux.pushFree(localId - 1);
}
if (session !== null) session._close(true); if (session !== null) session._close(true);
this._gc(info); this._gc(info);
@ -738,6 +773,46 @@ module.exports = class Protomux {
if (s !== null) s._close(true); if (s !== null) s._close(true);
} }
} }
async popFree(id) {
await this.userData.syncProtomux("popFree", id);
}
async pushFree(id) {
await this.userData.syncProtomux("pushFree", id);
}
async pushLocal(id) {
await this.userData.syncProtomux("pushLocal", id);
}
async pushRemote(id) {
await this.userData.syncProtomux("pushRemote", id);
}
async pullLocal() {
const ids = await this.userData.syncProtomux("pullLocal");
ids.forEach((item) => {
item = parseInt(item);
if (typeof this._local[item] === "undefined") {
this._local[item] = null;
}
});
}
async pullFree() {
const ids = await this.userData.syncProtomux("pullFree");
this._free = Array.from(new Set([...this._free, ...ids]));
this._free = this._free.filter((item) => item !== null);
}
async pullRemote() {
const ids = await this.userData.syncProtomux("pullRemote");
ids.forEach((item) => {
item = parseInt(item);
if (typeof this._remote[item] === "undefined") {
this._remote[item] = null;
}
});
}
}; };
function noop() {} function noop() {}