Compare commits

..

23 Commits

Author SHA1 Message Date
Derrick Hammer e0170f9dfc
*make message encoding/decoding async for use with the kernel 2023-04-08 21:09:49 -04:00
Derrick Hammer 912150c6e4
*Add missing pushInfos 2023-04-07 12:22:03 -04:00
Derrick Hammer f9955c18d2
*we don't need a slave mode, just use this async fork on the slave webworkers 2023-04-07 03:34:19 -04:00
Derrick Hammer 2f0deaf2bf
*__get should not be async 2023-04-07 02:15:02 -04:00
Derrick Hammer 702a859661
*refactor to use a a proxy function for createChannnel and _get so that it promises are only used in slave mode 2023-04-07 01:48:06 -04:00
Derrick Hammer 05c91000e1
*Fix api call 2023-04-07 01:32:53 -04:00
Derrick Hammer 2b1789f963
*Move slave checks inside sync functions 2023-04-07 01:32:35 -04:00
Derrick Hammer 5094ec4b26
*add syncing for info
*need to make createChannel async
2023-04-07 01:28:54 -04:00
Derrick Hammer 98bb74d8b1
*Change userData logic to throw error if set, but try to always set it 2023-04-06 22:53:15 -04:00
Derrick Hammer e0178e41ed
_slave is on _mutex on channel 2023-04-06 22:04:27 -04:00
Derrick Hammer 1387b060a4
use this.stream 2023-04-06 17:29:29 -04:00
Derrick Hammer 54904ab1ad
*Bug fix pushRemote 2023-04-06 17:24:06 -04:00
Derrick Hammer ab4632b644
*_mux doesnt exist in Protomux 2023-04-06 17:22:18 -04:00
Derrick Hammer 2b645af1e9
*Bug fix slave 2023-04-06 17:10:08 -04:00
Derrick Hammer eb4a8f1c9e
Revert "*Change pullRemote to throw if the item is not empty"
This reverts commit 74b0f1d596.
2023-04-06 16:21:30 -04:00
Derrick Hammer 74b0f1d596
*Change pullRemote to throw if the item is not empty 2023-04-06 16:20:20 -04:00
Derrick Hammer bef4631c30
*Switch sync approach. sync state properties ondemand before and after accessing them 2023-04-06 15:58:39 -04:00
Derrick Hammer f457294772
*Add concept of being a slave
*on _onopensession, if we are a slave, sync before checking _remote
2023-04-06 14:21:01 -04:00
Derrick Hammer 81eac54453
Revert "Revert "*make syncing async""
This reverts commit d37d37048c.
2023-04-06 14:15:39 -04:00
Derrick Hammer d37d37048c
Revert "*make syncing async"
This reverts commit 5907966d
2023-04-06 13:26:20 -04:00
Derrick Hammer d7dede7894
*need to use _mux on Channel calls to syncState 2023-04-05 20:03:12 -04:00
Derrick Hammer 5907966d77
*make syncing async 2023-04-05 17:46:07 -04:00
Derrick Hammer 6cb9fd8283
*add calls to this.syncState?.() which may be dynamically set higher in the stack to sync the state of the channels and messages 2023-04-05 03:45:10 -04:00
1 changed files with 153 additions and 28 deletions

181
index.js
View File

@ -50,7 +50,11 @@ class Channel {
for (const m of messages) this.addMessage(m);
}
open(handshake) {
async open(handshake) {
await this._mux.pullLocal();
await this._mux.pullFree();
const freeLen = this._mux._free.length;
const id =
this._mux._free.length > 0
? this._mux._free.pop()
@ -60,8 +64,15 @@ class Channel {
this._localId = id + 1;
this._mux._local[id] = this;
if (freeLen > 0) {
await this._mux.popFree(id);
} else {
await this._mux.pushLocal(id);
}
if (this._remoteId === 0) {
this._info.outgoing.push(this._localId);
await this._mux.pushInfos();
}
const state = { buffer: null, start: 2, end: 2 };
@ -92,7 +103,7 @@ class Channel {
this._mux._safeDestroy(err);
}
_fullyOpenSoon() {
async _fullyOpenSoon() {
this._mux._remote[this._remoteId - 1].session = this;
queueTick(this._fullyOpen.bind(this));
}
@ -131,24 +142,31 @@ class Channel {
}
}
_close(isRemote) {
async _close(isRemote) {
if (this.closed === true) return;
this.closed = true;
await this._mux.pullInfos();
this._info.opened--;
await this._mux.pushInfos();
if (this._remoteId > 0) {
this._mux._remote[this._remoteId - 1] = null;
this._remoteId = 0;
await this._mux.pullFree();
// If remote has acked, we can reuse the local id now
// otherwise, we need to wait for the "ack" to arrive
this._mux._free.push(this._localId - 1);
await this._mux.pushFree(this._localId - 1);
}
this._mux._local[this._localId - 1] = null;
this._localId = 0;
this._mux._gc(this._info);
await this._mux._gc(this._info);
this._track(this.onclose(isRemote, this));
if (this._active === 0) this._destroy();
@ -174,7 +192,7 @@ class Channel {
this._mux.uncork();
}
close() {
async close() {
if (this.closed === true) return;
const state = { buffer: null, start: 2, end: 2 };
@ -187,7 +205,7 @@ class Channel {
state.buffer[1] = 3;
c.uint.encode(state, this._localId);
this._close(false);
await this._mux._close(false);
this._mux._write0(state.buffer);
}
@ -205,34 +223,34 @@ class Channel {
type,
encoding,
onmessage,
recv(state, session) {
session._track(m.onmessage(encoding.decode(state), session));
async recv(state, session) {
session._track(m.onmessage(await encoding.decode(state), session));
},
send(m, session = s) {
async send(m, session = s) {
if (session.closed === true) return false;
const mux = session._mux;
const state = { buffer: null, start: 0, end: typeLen };
if (mux._batch !== null) {
encoding.preencode(state, m);
await encoding.preencode(state, m);
state.buffer = mux._alloc(state.end);
c.uint.encode(state, type);
encoding.encode(state, m);
await encoding.encode(state, m);
mux._pushBatch(session._localId, state.buffer);
return true;
}
c.uint.preencode(state, session._localId);
encoding.preencode(state, m);
await encoding.preencode(state, m);
state.buffer = mux._alloc(state.end);
c.uint.encode(state, session._localId);
c.uint.encode(state, type);
encoding.encode(state, m);
await encoding.encode(state, m);
return mux.stream.write(state.buffer);
},
@ -259,8 +277,12 @@ class Channel {
}
module.exports = class Protomux {
constructor(stream, { alloc } = {}) {
if (stream.userData === null) stream.userData = this;
constructor(stream, { alloc, slave = false } = {}) {
if (stream.userData) {
throw new Error("Mux already set for this stream");
}
stream.userData = this;
this.isProtomux = true;
this.stream = stream;
@ -335,8 +357,7 @@ module.exports = class Protomux {
const info = this._infos.get(key);
return info ? info.opened > 0 : false;
}
createChannel({
async createChannel({
userData = null,
protocol,
aliases = [],
@ -350,7 +371,7 @@ module.exports = class Protomux {
}) {
if (this.stream.destroyed) return null;
const info = this._get(protocol, id, aliases);
const info = await this._get(protocol, id, aliases);
if (unique && info.opened > 0) return null;
if (info.incoming.length === 0) {
@ -371,7 +392,12 @@ module.exports = class Protomux {
this._remoteBacklog--;
await this.pullInfos();
const remoteId = info.incoming.shift();
await this.pushInfos();
const r = this._remote[remoteId - 1];
if (r === null) return null;
@ -436,9 +462,11 @@ module.exports = class Protomux {
this.stream.write(state.buffer);
}
_get(protocol, id, aliases = []) {
async _get(protocol, id, aliases = []) {
const key = toKey(protocol, id);
await this.pullInfos();
let info = this._infos.get(key);
if (info) return info;
@ -461,18 +489,22 @@ module.exports = class Protomux {
this._infos.set(key, info);
}
await this.pushInfos();
return info;
}
_gc(info) {
async _gc(info) {
if (
info.opened === 0 &&
info.outgoing.length === 0 &&
info.incoming.length === 0
) {
await this.pullInfos();
this._infos.delete(info.key);
for (const alias of info.aliases) this._infos.delete(alias);
await this.pullInfos();
}
}
@ -490,7 +522,7 @@ module.exports = class Protomux {
this.stream.end();
}
_decode(remoteId, state) {
async _decode(remoteId, state) {
const type = c.uint.decode(state);
if (remoteId === 0) {
@ -568,7 +600,7 @@ module.exports = class Protomux {
}
}
_onopensession(state) {
async _onopensession(state) {
const remoteId = c.uint.decode(state);
const protocol = c.string.decode(state);
const id = c.buffer.decode(state);
@ -581,11 +613,15 @@ module.exports = class Protomux {
}
const rid = remoteId - 1;
const info = this._get(protocol, id);
const info = await this._get(protocol, id);
await this.pullRemote();
// allow the remote to grow the ids by one
if (this._remote.length === rid) {
this._remote.push(null);
await this.pushRemote(this._remote.length - 1);
}
if (rid >= this._remote.length || this._remote[rid] !== null) {
@ -593,17 +629,26 @@ module.exports = class Protomux {
}
if (info.outgoing.length > 0) {
await this.pullInfos();
const localId = info.outgoing.shift();
const session = this._local[localId - 1];
await this.pushInfos();
if (session === null) {
await this.pullFree();
// we already closed the channel - ignore
this._free.push(localId - 1);
await this.pushFree(localId - 1);
return;
}
this._remote[rid] = { state, pending: null, session: null };
await this.pushRemote(remoteId);
session._remoteId = remoteId;
session._fullyOpen();
return;
@ -620,13 +665,18 @@ module.exports = class Protomux {
throw new Error("Remote exceeded backlog");
}
await this.pullInfos();
info.pairing++;
info.incoming.push(remoteId);
await this.pushInfos();
await this.pushRemote(remoteId);
this._requestSession(protocol, id, info).catch(this._safeDestroyBound);
}
_onrejectsession(state) {
async _onrejectsession(state) {
const localId = c.uint.decode(state);
// TODO: can be done smarter...
@ -634,14 +684,23 @@ module.exports = class Protomux {
const i = info.outgoing.indexOf(localId);
if (i === -1) continue;
await this.pullInfos();
info.outgoing.splice(i, 1);
await this.pushInfos();
const session = this._local[localId - 1];
await this.pullFree(localId - 1);
this._free.push(localId - 1);
await this.pushFree(localId - 1);
if (session !== null) session._close(true);
this._gc(info);
await this._gc(info);
return;
}
@ -668,13 +727,25 @@ module.exports = class Protomux {
if (notify) await notify(id);
if (--info.pairing > 0) return;
await this.pullInfos();
let p = --info.pairing;
await this.pushInfos();
if (p > 0) return;
while (info.incoming.length > 0) {
this._rejectSession(info, info.incoming.shift());
await this.pullInfos();
const id = info.incoming.shift();
await this.pushInfos();
this._rejectSession(info, id);
}
this._gc(info);
await this._gc(info);
}
_rejectSession(info, remoteId) {
@ -727,6 +798,60 @@ module.exports = class Protomux {
if (s !== null) s._close(true);
}
}
async popFree(id) {
await this.stream.syncProtomux("popFree", id);
}
async pushFree(id) {
await this.stream.syncProtomux("pushFree", id);
}
async pushLocal(id) {
await this.stream.syncProtomux("pushLocal", id);
}
async pushRemote(id) {
await this.stream.syncProtomux("pushRemote", id);
}
async pushInfos() {
await this.stream.syncProtomux(
"pushInfos",
Array.from(this._infos.entries())
);
}
async pullLocal() {
const ids = await this.stream.syncProtomux("pullLocal");
ids.forEach((item) => {
item = parseInt(item);
if (typeof this._local[item] === "undefined") {
this._local[item] = null;
}
});
}
async pullFree() {
const ids = await this.stream.syncProtomux("pullFree");
this._free = Array.from(new Set([...this._free, ...ids]));
this._free = this._free.filter((item) => item !== null);
}
async pullRemote() {
const ids = await this.stream.syncProtomux("pullRemote");
ids.forEach((item) => {
item = parseInt(item);
if (typeof this._remote[item] === "undefined") {
this._remote[item] = null;
}
});
}
async pullInfos() {
const info = await this.stream.syncProtomux(
"pullInfos",
Array.from(this._infos.entries())
);
this._infos = new Map(info);
}
};
function noop() {}