Compare commits
23 Commits
bbe30a3de5
...
e0170f9dfc
Author | SHA1 | Date |
---|---|---|
Derrick Hammer | e0170f9dfc | |
Derrick Hammer | 912150c6e4 | |
Derrick Hammer | f9955c18d2 | |
Derrick Hammer | 2f0deaf2bf | |
Derrick Hammer | 702a859661 | |
Derrick Hammer | 05c91000e1 | |
Derrick Hammer | 2b1789f963 | |
Derrick Hammer | 5094ec4b26 | |
Derrick Hammer | 98bb74d8b1 | |
Derrick Hammer | e0178e41ed | |
Derrick Hammer | 1387b060a4 | |
Derrick Hammer | 54904ab1ad | |
Derrick Hammer | ab4632b644 | |
Derrick Hammer | 2b645af1e9 | |
Derrick Hammer | eb4a8f1c9e | |
Derrick Hammer | 74b0f1d596 | |
Derrick Hammer | bef4631c30 | |
Derrick Hammer | f457294772 | |
Derrick Hammer | 81eac54453 | |
Derrick Hammer | d37d37048c | |
Derrick Hammer | d7dede7894 | |
Derrick Hammer | 5907966d77 | |
Derrick Hammer | 6cb9fd8283 |
181
index.js
181
index.js
|
@ -50,7 +50,11 @@ class Channel {
|
||||||
for (const m of messages) this.addMessage(m);
|
for (const m of messages) this.addMessage(m);
|
||||||
}
|
}
|
||||||
|
|
||||||
open(handshake) {
|
async open(handshake) {
|
||||||
|
await this._mux.pullLocal();
|
||||||
|
await this._mux.pullFree();
|
||||||
|
|
||||||
|
const freeLen = this._mux._free.length;
|
||||||
const id =
|
const id =
|
||||||
this._mux._free.length > 0
|
this._mux._free.length > 0
|
||||||
? this._mux._free.pop()
|
? this._mux._free.pop()
|
||||||
|
@ -60,8 +64,15 @@ class Channel {
|
||||||
this._localId = id + 1;
|
this._localId = id + 1;
|
||||||
this._mux._local[id] = this;
|
this._mux._local[id] = this;
|
||||||
|
|
||||||
|
if (freeLen > 0) {
|
||||||
|
await this._mux.popFree(id);
|
||||||
|
} else {
|
||||||
|
await this._mux.pushLocal(id);
|
||||||
|
}
|
||||||
|
|
||||||
if (this._remoteId === 0) {
|
if (this._remoteId === 0) {
|
||||||
this._info.outgoing.push(this._localId);
|
this._info.outgoing.push(this._localId);
|
||||||
|
await this._mux.pushInfos();
|
||||||
}
|
}
|
||||||
|
|
||||||
const state = { buffer: null, start: 2, end: 2 };
|
const state = { buffer: null, start: 2, end: 2 };
|
||||||
|
@ -92,7 +103,7 @@ class Channel {
|
||||||
this._mux._safeDestroy(err);
|
this._mux._safeDestroy(err);
|
||||||
}
|
}
|
||||||
|
|
||||||
_fullyOpenSoon() {
|
async _fullyOpenSoon() {
|
||||||
this._mux._remote[this._remoteId - 1].session = this;
|
this._mux._remote[this._remoteId - 1].session = this;
|
||||||
queueTick(this._fullyOpen.bind(this));
|
queueTick(this._fullyOpen.bind(this));
|
||||||
}
|
}
|
||||||
|
@ -131,24 +142,31 @@ class Channel {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_close(isRemote) {
|
async _close(isRemote) {
|
||||||
if (this.closed === true) return;
|
if (this.closed === true) return;
|
||||||
this.closed = true;
|
this.closed = true;
|
||||||
|
|
||||||
|
await this._mux.pullInfos();
|
||||||
|
|
||||||
this._info.opened--;
|
this._info.opened--;
|
||||||
|
|
||||||
|
await this._mux.pushInfos();
|
||||||
|
|
||||||
if (this._remoteId > 0) {
|
if (this._remoteId > 0) {
|
||||||
this._mux._remote[this._remoteId - 1] = null;
|
this._mux._remote[this._remoteId - 1] = null;
|
||||||
this._remoteId = 0;
|
this._remoteId = 0;
|
||||||
|
await this._mux.pullFree();
|
||||||
// If remote has acked, we can reuse the local id now
|
// If remote has acked, we can reuse the local id now
|
||||||
// otherwise, we need to wait for the "ack" to arrive
|
// otherwise, we need to wait for the "ack" to arrive
|
||||||
this._mux._free.push(this._localId - 1);
|
this._mux._free.push(this._localId - 1);
|
||||||
|
|
||||||
|
await this._mux.pushFree(this._localId - 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
this._mux._local[this._localId - 1] = null;
|
this._mux._local[this._localId - 1] = null;
|
||||||
this._localId = 0;
|
this._localId = 0;
|
||||||
|
|
||||||
this._mux._gc(this._info);
|
await this._mux._gc(this._info);
|
||||||
this._track(this.onclose(isRemote, this));
|
this._track(this.onclose(isRemote, this));
|
||||||
|
|
||||||
if (this._active === 0) this._destroy();
|
if (this._active === 0) this._destroy();
|
||||||
|
@ -174,7 +192,7 @@ class Channel {
|
||||||
this._mux.uncork();
|
this._mux.uncork();
|
||||||
}
|
}
|
||||||
|
|
||||||
close() {
|
async close() {
|
||||||
if (this.closed === true) return;
|
if (this.closed === true) return;
|
||||||
|
|
||||||
const state = { buffer: null, start: 2, end: 2 };
|
const state = { buffer: null, start: 2, end: 2 };
|
||||||
|
@ -187,7 +205,7 @@ class Channel {
|
||||||
state.buffer[1] = 3;
|
state.buffer[1] = 3;
|
||||||
c.uint.encode(state, this._localId);
|
c.uint.encode(state, this._localId);
|
||||||
|
|
||||||
this._close(false);
|
await this._mux._close(false);
|
||||||
this._mux._write0(state.buffer);
|
this._mux._write0(state.buffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -205,34 +223,34 @@ class Channel {
|
||||||
type,
|
type,
|
||||||
encoding,
|
encoding,
|
||||||
onmessage,
|
onmessage,
|
||||||
recv(state, session) {
|
async recv(state, session) {
|
||||||
session._track(m.onmessage(encoding.decode(state), session));
|
session._track(m.onmessage(await encoding.decode(state), session));
|
||||||
},
|
},
|
||||||
send(m, session = s) {
|
async send(m, session = s) {
|
||||||
if (session.closed === true) return false;
|
if (session.closed === true) return false;
|
||||||
|
|
||||||
const mux = session._mux;
|
const mux = session._mux;
|
||||||
const state = { buffer: null, start: 0, end: typeLen };
|
const state = { buffer: null, start: 0, end: typeLen };
|
||||||
|
|
||||||
if (mux._batch !== null) {
|
if (mux._batch !== null) {
|
||||||
encoding.preencode(state, m);
|
await encoding.preencode(state, m);
|
||||||
state.buffer = mux._alloc(state.end);
|
state.buffer = mux._alloc(state.end);
|
||||||
|
|
||||||
c.uint.encode(state, type);
|
c.uint.encode(state, type);
|
||||||
encoding.encode(state, m);
|
await encoding.encode(state, m);
|
||||||
|
|
||||||
mux._pushBatch(session._localId, state.buffer);
|
mux._pushBatch(session._localId, state.buffer);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
c.uint.preencode(state, session._localId);
|
c.uint.preencode(state, session._localId);
|
||||||
encoding.preencode(state, m);
|
await encoding.preencode(state, m);
|
||||||
|
|
||||||
state.buffer = mux._alloc(state.end);
|
state.buffer = mux._alloc(state.end);
|
||||||
|
|
||||||
c.uint.encode(state, session._localId);
|
c.uint.encode(state, session._localId);
|
||||||
c.uint.encode(state, type);
|
c.uint.encode(state, type);
|
||||||
encoding.encode(state, m);
|
await encoding.encode(state, m);
|
||||||
|
|
||||||
return mux.stream.write(state.buffer);
|
return mux.stream.write(state.buffer);
|
||||||
},
|
},
|
||||||
|
@ -259,8 +277,12 @@ class Channel {
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = class Protomux {
|
module.exports = class Protomux {
|
||||||
constructor(stream, { alloc } = {}) {
|
constructor(stream, { alloc, slave = false } = {}) {
|
||||||
if (stream.userData === null) stream.userData = this;
|
if (stream.userData) {
|
||||||
|
throw new Error("Mux already set for this stream");
|
||||||
|
}
|
||||||
|
|
||||||
|
stream.userData = this;
|
||||||
|
|
||||||
this.isProtomux = true;
|
this.isProtomux = true;
|
||||||
this.stream = stream;
|
this.stream = stream;
|
||||||
|
@ -335,8 +357,7 @@ module.exports = class Protomux {
|
||||||
const info = this._infos.get(key);
|
const info = this._infos.get(key);
|
||||||
return info ? info.opened > 0 : false;
|
return info ? info.opened > 0 : false;
|
||||||
}
|
}
|
||||||
|
async createChannel({
|
||||||
createChannel({
|
|
||||||
userData = null,
|
userData = null,
|
||||||
protocol,
|
protocol,
|
||||||
aliases = [],
|
aliases = [],
|
||||||
|
@ -350,7 +371,7 @@ module.exports = class Protomux {
|
||||||
}) {
|
}) {
|
||||||
if (this.stream.destroyed) return null;
|
if (this.stream.destroyed) return null;
|
||||||
|
|
||||||
const info = this._get(protocol, id, aliases);
|
const info = await this._get(protocol, id, aliases);
|
||||||
if (unique && info.opened > 0) return null;
|
if (unique && info.opened > 0) return null;
|
||||||
|
|
||||||
if (info.incoming.length === 0) {
|
if (info.incoming.length === 0) {
|
||||||
|
@ -371,7 +392,12 @@ module.exports = class Protomux {
|
||||||
|
|
||||||
this._remoteBacklog--;
|
this._remoteBacklog--;
|
||||||
|
|
||||||
|
await this.pullInfos();
|
||||||
|
|
||||||
const remoteId = info.incoming.shift();
|
const remoteId = info.incoming.shift();
|
||||||
|
|
||||||
|
await this.pushInfos();
|
||||||
|
|
||||||
const r = this._remote[remoteId - 1];
|
const r = this._remote[remoteId - 1];
|
||||||
if (r === null) return null;
|
if (r === null) return null;
|
||||||
|
|
||||||
|
@ -436,9 +462,11 @@ module.exports = class Protomux {
|
||||||
this.stream.write(state.buffer);
|
this.stream.write(state.buffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
_get(protocol, id, aliases = []) {
|
async _get(protocol, id, aliases = []) {
|
||||||
const key = toKey(protocol, id);
|
const key = toKey(protocol, id);
|
||||||
|
|
||||||
|
await this.pullInfos();
|
||||||
|
|
||||||
let info = this._infos.get(key);
|
let info = this._infos.get(key);
|
||||||
if (info) return info;
|
if (info) return info;
|
||||||
|
|
||||||
|
@ -461,18 +489,22 @@ module.exports = class Protomux {
|
||||||
this._infos.set(key, info);
|
this._infos.set(key, info);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
await this.pushInfos();
|
||||||
|
|
||||||
return info;
|
return info;
|
||||||
}
|
}
|
||||||
|
|
||||||
_gc(info) {
|
async _gc(info) {
|
||||||
if (
|
if (
|
||||||
info.opened === 0 &&
|
info.opened === 0 &&
|
||||||
info.outgoing.length === 0 &&
|
info.outgoing.length === 0 &&
|
||||||
info.incoming.length === 0
|
info.incoming.length === 0
|
||||||
) {
|
) {
|
||||||
|
await this.pullInfos();
|
||||||
this._infos.delete(info.key);
|
this._infos.delete(info.key);
|
||||||
|
|
||||||
for (const alias of info.aliases) this._infos.delete(alias);
|
for (const alias of info.aliases) this._infos.delete(alias);
|
||||||
|
await this.pullInfos();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -490,7 +522,7 @@ module.exports = class Protomux {
|
||||||
this.stream.end();
|
this.stream.end();
|
||||||
}
|
}
|
||||||
|
|
||||||
_decode(remoteId, state) {
|
async _decode(remoteId, state) {
|
||||||
const type = c.uint.decode(state);
|
const type = c.uint.decode(state);
|
||||||
|
|
||||||
if (remoteId === 0) {
|
if (remoteId === 0) {
|
||||||
|
@ -568,7 +600,7 @@ module.exports = class Protomux {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_onopensession(state) {
|
async _onopensession(state) {
|
||||||
const remoteId = c.uint.decode(state);
|
const remoteId = c.uint.decode(state);
|
||||||
const protocol = c.string.decode(state);
|
const protocol = c.string.decode(state);
|
||||||
const id = c.buffer.decode(state);
|
const id = c.buffer.decode(state);
|
||||||
|
@ -581,11 +613,15 @@ module.exports = class Protomux {
|
||||||
}
|
}
|
||||||
|
|
||||||
const rid = remoteId - 1;
|
const rid = remoteId - 1;
|
||||||
const info = this._get(protocol, id);
|
const info = await this._get(protocol, id);
|
||||||
|
|
||||||
|
await this.pullRemote();
|
||||||
|
|
||||||
// allow the remote to grow the ids by one
|
// allow the remote to grow the ids by one
|
||||||
if (this._remote.length === rid) {
|
if (this._remote.length === rid) {
|
||||||
this._remote.push(null);
|
this._remote.push(null);
|
||||||
|
|
||||||
|
await this.pushRemote(this._remote.length - 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (rid >= this._remote.length || this._remote[rid] !== null) {
|
if (rid >= this._remote.length || this._remote[rid] !== null) {
|
||||||
|
@ -593,17 +629,26 @@ module.exports = class Protomux {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (info.outgoing.length > 0) {
|
if (info.outgoing.length > 0) {
|
||||||
|
await this.pullInfos();
|
||||||
const localId = info.outgoing.shift();
|
const localId = info.outgoing.shift();
|
||||||
const session = this._local[localId - 1];
|
const session = this._local[localId - 1];
|
||||||
|
|
||||||
|
await this.pushInfos();
|
||||||
|
|
||||||
if (session === null) {
|
if (session === null) {
|
||||||
|
await this.pullFree();
|
||||||
// we already closed the channel - ignore
|
// we already closed the channel - ignore
|
||||||
this._free.push(localId - 1);
|
this._free.push(localId - 1);
|
||||||
|
|
||||||
|
await this.pushFree(localId - 1);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
this._remote[rid] = { state, pending: null, session: null };
|
this._remote[rid] = { state, pending: null, session: null };
|
||||||
|
|
||||||
|
await this.pushRemote(remoteId);
|
||||||
|
|
||||||
session._remoteId = remoteId;
|
session._remoteId = remoteId;
|
||||||
session._fullyOpen();
|
session._fullyOpen();
|
||||||
return;
|
return;
|
||||||
|
@ -620,13 +665,18 @@ module.exports = class Protomux {
|
||||||
throw new Error("Remote exceeded backlog");
|
throw new Error("Remote exceeded backlog");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
await this.pullInfos();
|
||||||
|
|
||||||
info.pairing++;
|
info.pairing++;
|
||||||
info.incoming.push(remoteId);
|
info.incoming.push(remoteId);
|
||||||
|
|
||||||
|
await this.pushInfos();
|
||||||
|
await this.pushRemote(remoteId);
|
||||||
|
|
||||||
this._requestSession(protocol, id, info).catch(this._safeDestroyBound);
|
this._requestSession(protocol, id, info).catch(this._safeDestroyBound);
|
||||||
}
|
}
|
||||||
|
|
||||||
_onrejectsession(state) {
|
async _onrejectsession(state) {
|
||||||
const localId = c.uint.decode(state);
|
const localId = c.uint.decode(state);
|
||||||
|
|
||||||
// TODO: can be done smarter...
|
// TODO: can be done smarter...
|
||||||
|
@ -634,14 +684,23 @@ module.exports = class Protomux {
|
||||||
const i = info.outgoing.indexOf(localId);
|
const i = info.outgoing.indexOf(localId);
|
||||||
if (i === -1) continue;
|
if (i === -1) continue;
|
||||||
|
|
||||||
|
await this.pullInfos();
|
||||||
|
|
||||||
info.outgoing.splice(i, 1);
|
info.outgoing.splice(i, 1);
|
||||||
|
|
||||||
|
await this.pushInfos();
|
||||||
|
|
||||||
const session = this._local[localId - 1];
|
const session = this._local[localId - 1];
|
||||||
|
|
||||||
|
await this.pullFree(localId - 1);
|
||||||
|
|
||||||
this._free.push(localId - 1);
|
this._free.push(localId - 1);
|
||||||
|
|
||||||
|
await this.pushFree(localId - 1);
|
||||||
|
|
||||||
if (session !== null) session._close(true);
|
if (session !== null) session._close(true);
|
||||||
|
|
||||||
this._gc(info);
|
await this._gc(info);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -668,13 +727,25 @@ module.exports = class Protomux {
|
||||||
|
|
||||||
if (notify) await notify(id);
|
if (notify) await notify(id);
|
||||||
|
|
||||||
if (--info.pairing > 0) return;
|
await this.pullInfos();
|
||||||
|
|
||||||
|
let p = --info.pairing;
|
||||||
|
|
||||||
|
await this.pushInfos();
|
||||||
|
|
||||||
|
if (p > 0) return;
|
||||||
|
|
||||||
while (info.incoming.length > 0) {
|
while (info.incoming.length > 0) {
|
||||||
this._rejectSession(info, info.incoming.shift());
|
await this.pullInfos();
|
||||||
|
|
||||||
|
const id = info.incoming.shift();
|
||||||
|
|
||||||
|
await this.pushInfos();
|
||||||
|
|
||||||
|
this._rejectSession(info, id);
|
||||||
}
|
}
|
||||||
|
|
||||||
this._gc(info);
|
await this._gc(info);
|
||||||
}
|
}
|
||||||
|
|
||||||
_rejectSession(info, remoteId) {
|
_rejectSession(info, remoteId) {
|
||||||
|
@ -727,6 +798,60 @@ module.exports = class Protomux {
|
||||||
if (s !== null) s._close(true);
|
if (s !== null) s._close(true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async popFree(id) {
|
||||||
|
await this.stream.syncProtomux("popFree", id);
|
||||||
|
}
|
||||||
|
async pushFree(id) {
|
||||||
|
await this.stream.syncProtomux("pushFree", id);
|
||||||
|
}
|
||||||
|
async pushLocal(id) {
|
||||||
|
await this.stream.syncProtomux("pushLocal", id);
|
||||||
|
}
|
||||||
|
async pushRemote(id) {
|
||||||
|
await this.stream.syncProtomux("pushRemote", id);
|
||||||
|
}
|
||||||
|
async pushInfos() {
|
||||||
|
await this.stream.syncProtomux(
|
||||||
|
"pushInfos",
|
||||||
|
Array.from(this._infos.entries())
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
async pullLocal() {
|
||||||
|
const ids = await this.stream.syncProtomux("pullLocal");
|
||||||
|
|
||||||
|
ids.forEach((item) => {
|
||||||
|
item = parseInt(item);
|
||||||
|
if (typeof this._local[item] === "undefined") {
|
||||||
|
this._local[item] = null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
async pullFree() {
|
||||||
|
const ids = await this.stream.syncProtomux("pullFree");
|
||||||
|
|
||||||
|
this._free = Array.from(new Set([...this._free, ...ids]));
|
||||||
|
this._free = this._free.filter((item) => item !== null);
|
||||||
|
}
|
||||||
|
async pullRemote() {
|
||||||
|
const ids = await this.stream.syncProtomux("pullRemote");
|
||||||
|
|
||||||
|
ids.forEach((item) => {
|
||||||
|
item = parseInt(item);
|
||||||
|
if (typeof this._remote[item] === "undefined") {
|
||||||
|
this._remote[item] = null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
async pullInfos() {
|
||||||
|
const info = await this.stream.syncProtomux(
|
||||||
|
"pullInfos",
|
||||||
|
Array.from(this._infos.entries())
|
||||||
|
);
|
||||||
|
|
||||||
|
this._infos = new Map(info);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
function noop() {}
|
function noop() {}
|
||||||
|
|
Reference in New Issue