From bef4631c30c9b2fd524b36cc11d027e3806e78f0 Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Thu, 6 Apr 2023 15:58:39 -0400 Subject: [PATCH] *Switch sync approach. sync state properties ondemand before and after accessing them --- index.js | 95 ++++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 85 insertions(+), 10 deletions(-) diff --git a/index.js b/index.js index 4313d42..41139a2 100644 --- a/index.js +++ b/index.js @@ -51,6 +51,11 @@ class Channel { } async open(handshake) { + if (this._slave) { + await this._mux.pullLocal(); + await this._mux.pullFree(); + } + const freeLen = this._mux._free.length; const id = this._mux._free.length > 0 ? this._mux._free.pop() @@ -60,6 +65,14 @@ class Channel { this._localId = id + 1; this._mux._local[id] = this; + if (this._slave) { + if (freeLen > 0) { + await this._mux.popFree(id); + } else { + await this._mux.pushLocal(id); + } + } + if (this._remoteId === 0) { this._info.outgoing.push(this._localId); } @@ -81,7 +94,6 @@ class Channel { if (this._handshake) this._handshake.encode(state, handshake); this._mux._write0(state.buffer); - await this._mux.syncState?.(); } _dec() { @@ -96,7 +108,6 @@ class Channel { async _fullyOpenSoon() { this._mux._remote[this._remoteId - 1].session = this; queueTick(this._fullyOpen.bind(this)); - await this._mux.syncState?.(); } _fullyOpen() { @@ -142,9 +153,16 @@ class Channel { if (this._remoteId > 0) { this._mux._remote[this._remoteId - 1] = null; this._remoteId = 0; + if (this._slave) { + await this._mux.pullFree(); + } // If remote has acked, we can reuse the local id now // otherwise, we need to wait for the "ack" to arrive this._mux._free.push(this._localId - 1); + + if (this._slave) { + await this._mux.pushFree(this._localId - 1); + } } this._mux._local[this._localId - 1] = null; @@ -154,7 +172,6 @@ class Channel { this._track(this.onclose(isRemote, this)); if (this._active === 0) this._destroy(); - await this._mux.syncState?.(); } _destroy() { @@ -498,7 +515,7 @@ module.exports = class Protomux { const type = c.uint.decode(state); if (remoteId === 0) { - await this._oncontrolsession(type, state); + this._oncontrolsession(type, state); return; } @@ -513,11 +530,10 @@ module.exports = class Protomux { return; } - await this.syncState?.(); r.session._recv(type, state); } - async _oncontrolsession(type, state) { + _oncontrolsession(type, state) { switch (type) { case 0: this._onbatch(state); @@ -535,8 +551,6 @@ module.exports = class Protomux { this._onclosesession(state); break; } - - await this.syncState?.(); } _bufferMessage(r, type, { buffer, start, end }) { @@ -591,12 +605,15 @@ module.exports = class Protomux { const info = this._get(protocol, id); if (this._slave) { - await this._mux.syncState?.(); + await this._mux.pullRemote(); } // allow the remote to grow the ids by one if (this._remote.length === rid) { this._remote.push(null); + if (this._slave) { + await this._mux.pushRemote(this._remote.length - 1); + } } if (rid >= this._remote.length || this._remote[rid] !== null) { @@ -608,8 +625,14 @@ module.exports = class Protomux { const session = this._local[localId - 1]; if (session === null) { + if (this._slave) { + await this._mux.pullFree(); + } // we already closed the channel - ignore this._free.push(localId - 1); + if (this._slave) { + await this._mux.pushFree(localId - 1); + } return; } @@ -634,10 +657,14 @@ module.exports = class Protomux { info.pairing++; info.incoming.push(remoteId); + if (this._slave) { + await this._mux.pushRemotes(); + } + this._requestSession(protocol, id, info).catch(this._safeDestroyBound); } - _onrejectsession(state) { + async _onrejectsession(state) { const localId = c.uint.decode(state); // TODO: can be done smarter... @@ -649,7 +676,15 @@ module.exports = class Protomux { const session = this._local[localId - 1]; + if (this._slave) { + await this._mux.pullFree(localId - 1); + } + this._free.push(localId - 1); + if (this._slave) { + await this._mux.pushFree(localId - 1); + } + if (session !== null) session._close(true); this._gc(info); @@ -738,6 +773,46 @@ module.exports = class Protomux { if (s !== null) s._close(true); } } + + async popFree(id) { + await this.userData.syncProtomux("popFree", id); + } + async pushFree(id) { + await this.userData.syncProtomux("pushFree", id); + } + async pushLocal(id) { + await this.userData.syncProtomux("pushLocal", id); + } + async pushRemote(id) { + await this.userData.syncProtomux("pushRemote", id); + } + + async pullLocal() { + const ids = await this.userData.syncProtomux("pullLocal"); + + ids.forEach((item) => { + item = parseInt(item); + if (typeof this._local[item] === "undefined") { + this._local[item] = null; + } + }); + } + async pullFree() { + const ids = await this.userData.syncProtomux("pullFree"); + + this._free = Array.from(new Set([...this._free, ...ids])); + this._free = this._free.filter((item) => item !== null); + } + async pullRemote() { + const ids = await this.userData.syncProtomux("pullRemote"); + + ids.forEach((item) => { + item = parseInt(item); + if (typeof this._remote[item] === "undefined") { + this._remote[item] = null; + } + }); + } }; function noop() {}