Compare commits

...

2 Commits

Author SHA1 Message Date
Derrick Hammer f457294772
*Add concept of being a slave
*on _onopensession, if we are a slave, sync before checking _remote
2023-04-06 14:21:01 -04:00
Derrick Hammer 81eac54453
Revert "Revert "*make syncing async""
This reverts commit d37d37048c.
2023-04-06 14:15:39 -04:00
1 changed files with 20 additions and 15 deletions

View File

@ -50,7 +50,7 @@ class Channel {
for (const m of messages) this.addMessage(m); for (const m of messages) this.addMessage(m);
} }
open(handshake) { async open(handshake) {
const id = const id =
this._mux._free.length > 0 this._mux._free.length > 0
? this._mux._free.pop() ? this._mux._free.pop()
@ -81,7 +81,7 @@ class Channel {
if (this._handshake) this._handshake.encode(state, handshake); if (this._handshake) this._handshake.encode(state, handshake);
this._mux._write0(state.buffer); this._mux._write0(state.buffer);
this._mux.syncState?.(); await this._mux.syncState?.();
} }
_dec() { _dec() {
@ -93,10 +93,10 @@ class Channel {
this._mux._safeDestroy(err); this._mux._safeDestroy(err);
} }
_fullyOpenSoon() { async _fullyOpenSoon() {
this._mux._remote[this._remoteId - 1].session = this; this._mux._remote[this._remoteId - 1].session = this;
queueTick(this._fullyOpen.bind(this)); queueTick(this._fullyOpen.bind(this));
this._mux.syncState?.(); await this._mux.syncState?.();
} }
_fullyOpen() { _fullyOpen() {
@ -133,7 +133,7 @@ class Channel {
} }
} }
_close(isRemote) { async _close(isRemote) {
if (this.closed === true) return; if (this.closed === true) return;
this.closed = true; this.closed = true;
@ -154,7 +154,7 @@ class Channel {
this._track(this.onclose(isRemote, this)); this._track(this.onclose(isRemote, this));
if (this._active === 0) this._destroy(); if (this._active === 0) this._destroy();
this._mux.syncState?.(); await this._mux.syncState?.();
} }
_destroy() { _destroy() {
@ -177,7 +177,7 @@ class Channel {
this._mux.uncork(); this._mux.uncork();
} }
close() { async close() {
if (this.closed === true) return; if (this.closed === true) return;
const state = { buffer: null, start: 2, end: 2 }; const state = { buffer: null, start: 2, end: 2 };
@ -190,7 +190,7 @@ class Channel {
state.buffer[1] = 3; state.buffer[1] = 3;
c.uint.encode(state, this._localId); c.uint.encode(state, this._localId);
this._mux._close(false); await this._mux._close(false);
this._mux._write0(state.buffer); this._mux._write0(state.buffer);
} }
@ -262,7 +262,7 @@ class Channel {
} }
module.exports = class Protomux { module.exports = class Protomux {
constructor(stream, { alloc } = {}) { constructor(stream, { alloc, slave = false } = {}) {
if (stream.userData === null) stream.userData = this; if (stream.userData === null) stream.userData = this;
this.isProtomux = true; this.isProtomux = true;
@ -287,6 +287,7 @@ module.exports = class Protomux {
this._infos = new Map(); this._infos = new Map();
this._notify = new Map(); this._notify = new Map();
this._slave = false;
this.stream.on("data", this._ondata.bind(this)); this.stream.on("data", this._ondata.bind(this));
this.stream.on("end", this._onend.bind(this)); this.stream.on("end", this._onend.bind(this));
@ -493,11 +494,11 @@ module.exports = class Protomux {
this.stream.end(); this.stream.end();
} }
_decode(remoteId, state) { async _decode(remoteId, state) {
const type = c.uint.decode(state); const type = c.uint.decode(state);
if (remoteId === 0) { if (remoteId === 0) {
this._oncontrolsession(type, state); await this._oncontrolsession(type, state);
return; return;
} }
@ -512,11 +513,11 @@ module.exports = class Protomux {
return; return;
} }
this.syncState?.(); await this.syncState?.();
r.session._recv(type, state); r.session._recv(type, state);
} }
_oncontrolsession(type, state) { async _oncontrolsession(type, state) {
switch (type) { switch (type) {
case 0: case 0:
this._onbatch(state); this._onbatch(state);
@ -535,7 +536,7 @@ module.exports = class Protomux {
break; break;
} }
this.syncState?.(); await this.syncState?.();
} }
_bufferMessage(r, type, { buffer, start, end }) { _bufferMessage(r, type, { buffer, start, end }) {
@ -574,7 +575,7 @@ module.exports = class Protomux {
} }
} }
_onopensession(state) { async _onopensession(state) {
const remoteId = c.uint.decode(state); const remoteId = c.uint.decode(state);
const protocol = c.string.decode(state); const protocol = c.string.decode(state);
const id = c.buffer.decode(state); const id = c.buffer.decode(state);
@ -589,6 +590,10 @@ module.exports = class Protomux {
const rid = remoteId - 1; const rid = remoteId - 1;
const info = this._get(protocol, id); const info = this._get(protocol, id);
if (this._slave) {
await this._mux.syncState?.();
}
// allow the remote to grow the ids by one // allow the remote to grow the ids by one
if (this._remote.length === rid) { if (this._remote.length === rid) {
this._remote.push(null); this._remote.push(null);