diff --git a/index.js b/index.js index bf88bed..4f021e5 100644 --- a/index.js +++ b/index.js @@ -148,8 +148,12 @@ class Channel { if (this.closed === true) return; this.closed = true; + await this._mux.pullInfos(); + this._info.opened--; + await this._mux.pushInfos(); + if (this._remoteId > 0) { this._mux._remote[this._remoteId - 1] = null; this._remoteId = 0; @@ -168,7 +172,7 @@ class Channel { this._mux._local[this._localId - 1] = null; this._localId = 0; - this._mux._gc(this._info); + await this._mux._gc(this._info); this._track(this.onclose(isRemote, this)); if (this._active === 0) this._destroy(); @@ -361,7 +365,7 @@ module.exports = class Protomux { return info ? info.opened > 0 : false; } - createChannel({ + async createChannel({ userData = null, protocol, aliases = [], @@ -375,7 +379,7 @@ module.exports = class Protomux { }) { if (this.stream.destroyed) return null; - const info = this._get(protocol, id, aliases); + const info = await this._get(protocol, id, aliases); if (unique && info.opened > 0) return null; if (info.incoming.length === 0) { @@ -396,7 +400,12 @@ module.exports = class Protomux { this._remoteBacklog--; + await this.pullInfos(); + const remoteId = info.incoming.shift(); + + await this.pushInfos(); + const r = this._remote[remoteId - 1]; if (r === null) return null; @@ -461,9 +470,11 @@ module.exports = class Protomux { this.stream.write(state.buffer); } - _get(protocol, id, aliases = []) { + async _get(protocol, id, aliases = []) { const key = toKey(protocol, id); + await this.pullInfos(); + let info = this._infos.get(key); if (info) return info; @@ -486,18 +497,22 @@ module.exports = class Protomux { this._infos.set(key, info); } + await this.pushInfos(); + return info; } - _gc(info) { + async _gc(info) { if ( info.opened === 0 && info.outgoing.length === 0 && info.incoming.length === 0 ) { + await this.pullInfos(); this._infos.delete(info.key); for (const alias of info.aliases) this._infos.delete(alias); + await this.pullInfos(); } } @@ -606,7 +621,7 @@ module.exports = class Protomux { } const rid = remoteId - 1; - const info = this._get(protocol, id); + const info = await this._get(protocol, id); if (this._slave) { await this.pullRemote(); @@ -625,9 +640,12 @@ module.exports = class Protomux { } if (info.outgoing.length > 0) { + await this.pullInfos(); const localId = info.outgoing.shift(); const session = this._local[localId - 1]; + await this.pushInfos(); + if (session === null) { if (this._slave) { await this.pullFree(); @@ -662,9 +680,13 @@ module.exports = class Protomux { throw new Error("Remote exceeded backlog"); } + await this.pullInfos(); + info.pairing++; info.incoming.push(remoteId); + await this.pushInfos(); + if (this._slave) { await this.pushRemote(remoteId); } @@ -680,8 +702,12 @@ module.exports = class Protomux { const i = info.outgoing.indexOf(localId); if (i === -1) continue; + await this.pullInfos(); + info.outgoing.splice(i, 1); + await this.pushInfos(); + const session = this._local[localId - 1]; if (this._slave) { @@ -695,7 +721,7 @@ module.exports = class Protomux { if (session !== null) session._close(true); - this._gc(info); + await this._gc(info); return; } @@ -722,13 +748,25 @@ module.exports = class Protomux { if (notify) await notify(id); - if (--info.pairing > 0) return; + await this.pullInfos(); + + let p = --info.pairing; + + await this.pushInfos(); + + if (p > 0) return; while (info.incoming.length > 0) { - this._rejectSession(info, info.incoming.shift()); + await this.pullInfos(); + + const id = info.incoming.shift(); + + await this.pushInfos(); + + this._rejectSession(info, id); } - this._gc(info); + await this._gc(info); } _rejectSession(info, remoteId) { @@ -794,6 +832,15 @@ module.exports = class Protomux { async pushRemote(id) { await this.stream.syncProtomux("pushRemote", id); } + async pushInfos() { + if (!this._slave) { + return; + } + await this.stream.syncProtomux( + "pushInfos", + Array.from(this._infos.entries()) + ); + } async pullLocal() { const ids = await this.stream.syncProtomux("pullLocal"); @@ -821,6 +868,18 @@ module.exports = class Protomux { } }); } + async pullInfos() { + if (!this._slave) { + return; + } + + const info = await this.stream.syncProtomux( + "infos", + Array.from(this._infos.entries()) + ); + + this._infos = new Map(info); + } }; function noop() {}