*Move slave checks inside sync functions

This commit is contained in:
Derrick Hammer 2023-04-07 01:32:35 -04:00
parent 5094ec4b26
commit 2b1789f963
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
1 changed files with 42 additions and 41 deletions

View File

@ -51,10 +51,9 @@ class Channel {
} }
async open(handshake) { async open(handshake) {
if (this._mux._slave) {
await this._mux.pullLocal(); await this._mux.pullLocal();
await this._mux.pullFree(); await this._mux.pullFree();
}
const freeLen = this._mux._free.length; const freeLen = this._mux._free.length;
const id = const id =
this._mux._free.length > 0 this._mux._free.length > 0
@ -65,13 +64,11 @@ class Channel {
this._localId = id + 1; this._localId = id + 1;
this._mux._local[id] = this; this._mux._local[id] = this;
if (this._mux._slave) {
if (freeLen > 0) { if (freeLen > 0) {
await this._mux.popFree(id); await this._mux.popFree(id);
} else { } else {
await this._mux.pushLocal(id); await this._mux.pushLocal(id);
} }
}
if (this._remoteId === 0) { if (this._remoteId === 0) {
this._info.outgoing.push(this._localId); this._info.outgoing.push(this._localId);
@ -157,17 +154,13 @@ class Channel {
if (this._remoteId > 0) { if (this._remoteId > 0) {
this._mux._remote[this._remoteId - 1] = null; this._mux._remote[this._remoteId - 1] = null;
this._remoteId = 0; this._remoteId = 0;
if (this._mux._slave) {
await this._mux.pullFree(); await this._mux.pullFree();
}
// If remote has acked, we can reuse the local id now // If remote has acked, we can reuse the local id now
// otherwise, we need to wait for the "ack" to arrive // otherwise, we need to wait for the "ack" to arrive
this._mux._free.push(this._localId - 1); this._mux._free.push(this._localId - 1);
if (this._mux._slave) {
await this._mux.pushFree(this._localId - 1); await this._mux.pushFree(this._localId - 1);
} }
}
this._mux._local[this._localId - 1] = null; this._mux._local[this._localId - 1] = null;
this._localId = 0; this._localId = 0;
@ -623,17 +616,14 @@ module.exports = class Protomux {
const rid = remoteId - 1; const rid = remoteId - 1;
const info = await this._get(protocol, id); const info = await this._get(protocol, id);
if (this._slave) {
await this.pullRemote(); await this.pullRemote();
}
// allow the remote to grow the ids by one // allow the remote to grow the ids by one
if (this._remote.length === rid) { if (this._remote.length === rid) {
this._remote.push(null); this._remote.push(null);
if (this._slave) {
await this.pushRemote(this._remote.length - 1); await this.pushRemote(this._remote.length - 1);
} }
}
if (rid >= this._remote.length || this._remote[rid] !== null) { if (rid >= this._remote.length || this._remote[rid] !== null) {
throw new Error("Invalid open message"); throw new Error("Invalid open message");
@ -647,22 +637,18 @@ module.exports = class Protomux {
await this.pushInfos(); await this.pushInfos();
if (session === null) { if (session === null) {
if (this._slave) {
await this.pullFree(); await this.pullFree();
}
// we already closed the channel - ignore // we already closed the channel - ignore
this._free.push(localId - 1); this._free.push(localId - 1);
if (this._slave) {
await this.pushFree(localId - 1); await this.pushFree(localId - 1);
}
return; return;
} }
this._remote[rid] = { state, pending: null, session: null }; this._remote[rid] = { state, pending: null, session: null };
if (this._slave) {
await this.pushRemote(remoteId); await this.pushRemote(remoteId);
}
session._remoteId = remoteId; session._remoteId = remoteId;
session._fullyOpen(); session._fullyOpen();
@ -686,10 +672,7 @@ module.exports = class Protomux {
info.incoming.push(remoteId); info.incoming.push(remoteId);
await this.pushInfos(); await this.pushInfos();
if (this._slave) {
await this.pushRemote(remoteId); await this.pushRemote(remoteId);
}
this._requestSession(protocol, id, info).catch(this._safeDestroyBound); this._requestSession(protocol, id, info).catch(this._safeDestroyBound);
} }
@ -710,14 +693,11 @@ module.exports = class Protomux {
const session = this._local[localId - 1]; const session = this._local[localId - 1];
if (this._slave) {
await this.pullFree(localId - 1); await this.pullFree(localId - 1);
}
this._free.push(localId - 1); this._free.push(localId - 1);
if (this._slave) {
await this.pushFree(localId - 1); await this.pushFree(localId - 1);
}
if (session !== null) session._close(true); if (session !== null) session._close(true);
@ -821,15 +801,27 @@ module.exports = class Protomux {
} }
async popFree(id) { async popFree(id) {
if (!this._slave) {
return;
}
await this.stream.syncProtomux("popFree", id); await this.stream.syncProtomux("popFree", id);
} }
async pushFree(id) { async pushFree(id) {
if (!this._slave) {
return;
}
await this.stream.syncProtomux("pushFree", id); await this.stream.syncProtomux("pushFree", id);
} }
async pushLocal(id) { async pushLocal(id) {
if (!this._slave) {
return;
}
await this.stream.syncProtomux("pushLocal", id); await this.stream.syncProtomux("pushLocal", id);
} }
async pushRemote(id) { async pushRemote(id) {
if (!this._slave) {
return;
}
await this.stream.syncProtomux("pushRemote", id); await this.stream.syncProtomux("pushRemote", id);
} }
async pushInfos() { async pushInfos() {
@ -843,6 +835,9 @@ module.exports = class Protomux {
} }
async pullLocal() { async pullLocal() {
if (!this._slave) {
return;
}
const ids = await this.stream.syncProtomux("pullLocal"); const ids = await this.stream.syncProtomux("pullLocal");
ids.forEach((item) => { ids.forEach((item) => {
@ -853,12 +848,18 @@ module.exports = class Protomux {
}); });
} }
async pullFree() { async pullFree() {
if (!this._slave) {
return;
}
const ids = await this.stream.syncProtomux("pullFree"); const ids = await this.stream.syncProtomux("pullFree");
this._free = Array.from(new Set([...this._free, ...ids])); this._free = Array.from(new Set([...this._free, ...ids]));
this._free = this._free.filter((item) => item !== null); this._free = this._free.filter((item) => item !== null);
} }
async pullRemote() { async pullRemote() {
if (!this._slave) {
return;
}
const ids = await this.stream.syncProtomux("pullRemote"); const ids = await this.stream.syncProtomux("pullRemote");
ids.forEach((item) => { ids.forEach((item) => {