Compare commits

..

No commits in common. "05c91000e165fbca440ca60f42f0748feff8fd8c" and "98bb74d8b1a890a7be9c38c625ccca5a968c0303" have entirely different histories.

1 changed files with 50 additions and 110 deletions

160
index.js
View File

@ -51,9 +51,10 @@ class Channel {
} }
async open(handshake) { async open(handshake) {
await this._mux.pullLocal(); if (this._mux._slave) {
await this._mux.pullFree(); await this._mux.pullLocal();
await this._mux.pullFree();
}
const freeLen = this._mux._free.length; const freeLen = this._mux._free.length;
const id = const id =
this._mux._free.length > 0 this._mux._free.length > 0
@ -64,10 +65,12 @@ class Channel {
this._localId = id + 1; this._localId = id + 1;
this._mux._local[id] = this; this._mux._local[id] = this;
if (freeLen > 0) { if (this._mux._slave) {
await this._mux.popFree(id); if (freeLen > 0) {
} else { await this._mux.popFree(id);
await this._mux.pushLocal(id); } else {
await this._mux.pushLocal(id);
}
} }
if (this._remoteId === 0) { if (this._remoteId === 0) {
@ -145,27 +148,27 @@ class Channel {
if (this.closed === true) return; if (this.closed === true) return;
this.closed = true; this.closed = true;
await this._mux.pullInfos();
this._info.opened--; this._info.opened--;
await this._mux.pushInfos();
if (this._remoteId > 0) { if (this._remoteId > 0) {
this._mux._remote[this._remoteId - 1] = null; this._mux._remote[this._remoteId - 1] = null;
this._remoteId = 0; this._remoteId = 0;
await this._mux.pullFree(); if (this._mux._slave) {
await this._mux.pullFree();
}
// If remote has acked, we can reuse the local id now // If remote has acked, we can reuse the local id now
// otherwise, we need to wait for the "ack" to arrive // otherwise, we need to wait for the "ack" to arrive
this._mux._free.push(this._localId - 1); this._mux._free.push(this._localId - 1);
await this._mux.pushFree(this._localId - 1); if (this._mux._slave) {
await this._mux.pushFree(this._localId - 1);
}
} }
this._mux._local[this._localId - 1] = null; this._mux._local[this._localId - 1] = null;
this._localId = 0; this._localId = 0;
await this._mux._gc(this._info); this._mux._gc(this._info);
this._track(this.onclose(isRemote, this)); this._track(this.onclose(isRemote, this));
if (this._active === 0) this._destroy(); if (this._active === 0) this._destroy();
@ -358,7 +361,7 @@ module.exports = class Protomux {
return info ? info.opened > 0 : false; return info ? info.opened > 0 : false;
} }
async createChannel({ createChannel({
userData = null, userData = null,
protocol, protocol,
aliases = [], aliases = [],
@ -372,7 +375,7 @@ module.exports = class Protomux {
}) { }) {
if (this.stream.destroyed) return null; if (this.stream.destroyed) return null;
const info = await this._get(protocol, id, aliases); const info = this._get(protocol, id, aliases);
if (unique && info.opened > 0) return null; if (unique && info.opened > 0) return null;
if (info.incoming.length === 0) { if (info.incoming.length === 0) {
@ -393,12 +396,7 @@ module.exports = class Protomux {
this._remoteBacklog--; this._remoteBacklog--;
await this.pullInfos();
const remoteId = info.incoming.shift(); const remoteId = info.incoming.shift();
await this.pushInfos();
const r = this._remote[remoteId - 1]; const r = this._remote[remoteId - 1];
if (r === null) return null; if (r === null) return null;
@ -463,11 +461,9 @@ module.exports = class Protomux {
this.stream.write(state.buffer); this.stream.write(state.buffer);
} }
async _get(protocol, id, aliases = []) { _get(protocol, id, aliases = []) {
const key = toKey(protocol, id); const key = toKey(protocol, id);
await this.pullInfos();
let info = this._infos.get(key); let info = this._infos.get(key);
if (info) return info; if (info) return info;
@ -490,22 +486,18 @@ module.exports = class Protomux {
this._infos.set(key, info); this._infos.set(key, info);
} }
await this.pushInfos();
return info; return info;
} }
async _gc(info) { _gc(info) {
if ( if (
info.opened === 0 && info.opened === 0 &&
info.outgoing.length === 0 && info.outgoing.length === 0 &&
info.incoming.length === 0 info.incoming.length === 0
) { ) {
await this.pullInfos();
this._infos.delete(info.key); this._infos.delete(info.key);
for (const alias of info.aliases) this._infos.delete(alias); for (const alias of info.aliases) this._infos.delete(alias);
await this.pullInfos();
} }
} }
@ -614,15 +606,18 @@ module.exports = class Protomux {
} }
const rid = remoteId - 1; const rid = remoteId - 1;
const info = await this._get(protocol, id); const info = this._get(protocol, id);
await this.pullRemote(); if (this._slave) {
await this.pullRemote();
}
// allow the remote to grow the ids by one // allow the remote to grow the ids by one
if (this._remote.length === rid) { if (this._remote.length === rid) {
this._remote.push(null); this._remote.push(null);
if (this._slave) {
await this.pushRemote(this._remote.length - 1); await this.pushRemote(this._remote.length - 1);
}
} }
if (rid >= this._remote.length || this._remote[rid] !== null) { if (rid >= this._remote.length || this._remote[rid] !== null) {
@ -630,25 +625,26 @@ module.exports = class Protomux {
} }
if (info.outgoing.length > 0) { if (info.outgoing.length > 0) {
await this.pullInfos();
const localId = info.outgoing.shift(); const localId = info.outgoing.shift();
const session = this._local[localId - 1]; const session = this._local[localId - 1];
await this.pushInfos();
if (session === null) { if (session === null) {
await this.pullFree(); if (this._slave) {
await this.pullFree();
}
// we already closed the channel - ignore // we already closed the channel - ignore
this._free.push(localId - 1); this._free.push(localId - 1);
if (this._slave) {
await this.pushFree(localId - 1); await this.pushFree(localId - 1);
}
return; return;
} }
this._remote[rid] = { state, pending: null, session: null }; this._remote[rid] = { state, pending: null, session: null };
await this.pushRemote(remoteId); if (this._slave) {
await this.pushRemote(remoteId);
}
session._remoteId = remoteId; session._remoteId = remoteId;
session._fullyOpen(); session._fullyOpen();
@ -666,13 +662,12 @@ module.exports = class Protomux {
throw new Error("Remote exceeded backlog"); throw new Error("Remote exceeded backlog");
} }
await this.pullInfos();
info.pairing++; info.pairing++;
info.incoming.push(remoteId); info.incoming.push(remoteId);
await this.pushInfos(); if (this._slave) {
await this.pushRemote(remoteId); await this.pushRemote(remoteId);
}
this._requestSession(protocol, id, info).catch(this._safeDestroyBound); this._requestSession(protocol, id, info).catch(this._safeDestroyBound);
} }
@ -685,23 +680,22 @@ module.exports = class Protomux {
const i = info.outgoing.indexOf(localId); const i = info.outgoing.indexOf(localId);
if (i === -1) continue; if (i === -1) continue;
await this.pullInfos();
info.outgoing.splice(i, 1); info.outgoing.splice(i, 1);
await this.pushInfos();
const session = this._local[localId - 1]; const session = this._local[localId - 1];
await this.pullFree(localId - 1); if (this._slave) {
await this.pullFree(localId - 1);
}
this._free.push(localId - 1); this._free.push(localId - 1);
if (this._slave) {
await this.pushFree(localId - 1); await this.pushFree(localId - 1);
}
if (session !== null) session._close(true); if (session !== null) session._close(true);
await this._gc(info); this._gc(info);
return; return;
} }
@ -728,25 +722,13 @@ module.exports = class Protomux {
if (notify) await notify(id); if (notify) await notify(id);
await this.pullInfos(); if (--info.pairing > 0) return;
let p = --info.pairing;
await this.pushInfos();
if (p > 0) return;
while (info.incoming.length > 0) { while (info.incoming.length > 0) {
await this.pullInfos(); this._rejectSession(info, info.incoming.shift());
const id = info.incoming.shift();
await this.pushInfos();
this._rejectSession(info, id);
} }
await this._gc(info); this._gc(info);
} }
_rejectSession(info, remoteId) { _rejectSession(info, remoteId) {
@ -801,43 +783,19 @@ module.exports = class Protomux {
} }
async popFree(id) { async popFree(id) {
if (!this._slave) {
return;
}
await this.stream.syncProtomux("popFree", id); await this.stream.syncProtomux("popFree", id);
} }
async pushFree(id) { async pushFree(id) {
if (!this._slave) {
return;
}
await this.stream.syncProtomux("pushFree", id); await this.stream.syncProtomux("pushFree", id);
} }
async pushLocal(id) { async pushLocal(id) {
if (!this._slave) {
return;
}
await this.stream.syncProtomux("pushLocal", id); await this.stream.syncProtomux("pushLocal", id);
} }
async pushRemote(id) { async pushRemote(id) {
if (!this._slave) {
return;
}
await this.stream.syncProtomux("pushRemote", id); await this.stream.syncProtomux("pushRemote", id);
} }
async pushInfos() {
if (!this._slave) {
return;
}
await this.stream.syncProtomux(
"pushInfos",
Array.from(this._infos.entries())
);
}
async pullLocal() { async pullLocal() {
if (!this._slave) {
return;
}
const ids = await this.stream.syncProtomux("pullLocal"); const ids = await this.stream.syncProtomux("pullLocal");
ids.forEach((item) => { ids.forEach((item) => {
@ -848,18 +806,12 @@ module.exports = class Protomux {
}); });
} }
async pullFree() { async pullFree() {
if (!this._slave) {
return;
}
const ids = await this.stream.syncProtomux("pullFree"); const ids = await this.stream.syncProtomux("pullFree");
this._free = Array.from(new Set([...this._free, ...ids])); this._free = Array.from(new Set([...this._free, ...ids]));
this._free = this._free.filter((item) => item !== null); this._free = this._free.filter((item) => item !== null);
} }
async pullRemote() { async pullRemote() {
if (!this._slave) {
return;
}
const ids = await this.stream.syncProtomux("pullRemote"); const ids = await this.stream.syncProtomux("pullRemote");
ids.forEach((item) => { ids.forEach((item) => {
@ -869,18 +821,6 @@ module.exports = class Protomux {
} }
}); });
} }
async pullInfos() {
if (!this._slave) {
return;
}
const info = await this.stream.syncProtomux(
"pullInfos",
Array.from(this._infos.entries())
);
this._infos = new Map(info);
}
}; };
function noop() {} function noop() {}