This repository has been archived on 2023-04-09. You can view files and clone it, but cannot push or open issues or pull requests.
kernel-protomux/index.js

876 lines
19 KiB
JavaScript
Raw Normal View History

2023-04-05 07:36:37 +00:00
const b4a = require("b4a");
const c = require("compact-encoding");
const queueTick = require("queue-tick");
const safetyCatch = require("safety-catch");
2021-12-30 20:13:47 +00:00
2023-04-05 07:36:37 +00:00
// Threshold for the mux-wide `_buffered` counter (bytes of messages queued for
// channels that are not fully open yet); `_pauseMaybe` pauses the stream once
// the counter exceeds this value and `_resumeMaybe` resumes it at or below it.
const MAX_BUFFERED = 32768;
// Maximum number of remote-initiated opens that may wait for a local channel;
// `_onopensession` throws "Remote exceeded backlog" when it is exceeded.
const MAX_BACKLOG = Infinity; // TODO: impl "open" backpressure
// Pre-encoded size at which `_pushBatch` flushes the current batch and starts
// a new one.
const MAX_BATCH = 8 * 1024 * 1024;
2021-11-10 14:15:09 +00:00
/**
 * A single protocol channel multiplexed over a Protomux stream.
 *
 * Instances are created by `Protomux#createChannel`. A channel owns a local
 * id (assigned in `open`), may be paired with a remote id, and carries a list
 * of message types registered through `addMessage`.
 */
class Channel {
  /**
   * @param {object} mux - Owning Protomux instance.
   * @param {object} info - Bookkeeping record for the protocol/id pair (see `Protomux#_get`).
   * @param {*} userData - Opaque value exposed as `channel.userData`.
   * @param {string} protocol - Protocol name written in the open message.
   * @param {string[]} aliases - Alias protocol names.
   * @param {?Uint8Array} id - Optional channel id buffer.
   * @param {?object} handshake - compact-encoding style encoder for the handshake, or null.
   * @param {Array<?object>} messages - Message descriptors passed to `addMessage` in order.
   * @param {Function} onopen - Called as `onopen(handshake, channel)` once fully open.
   * @param {Function} onclose - Called as `onclose(isRemote, channel)` on close.
   * @param {Function} ondestroy - Called as `ondestroy(channel)` after close and all tracked work settled.
   */
  constructor(
    mux,
    info,
    userData,
    protocol,
    aliases,
    id,
    handshake,
    messages,
    onopen,
    onclose,
    ondestroy
  ) {
    this.userData = userData;
    this.protocol = protocol;
    this.aliases = aliases;
    this.id = id;
    this.handshake = null; // decoded remote handshake, filled in by _fullyOpen
    this.messages = [];
    this.opened = false;
    this.closed = false;
    this.destroyed = false;
    this.onopen = onopen;
    this.onclose = onclose;
    this.ondestroy = ondestroy;

    this._handshake = handshake; // handshake encoding (not the value)
    this._mux = mux;
    this._info = info;
    this._localId = 0; // 0 means "no id assigned"
    this._remoteId = 0;
    this._active = 0; // number of tracked promises still pending
    this._extensions = null;

    this._decBound = this._dec.bind(this);
    this._decAndDestroyBound = this._decAndDestroy.bind(this);

    for (const m of messages) this.addMessage(m);
  }

  /**
   * Assigns a local id (reusing a freed one when available), registers the
   * channel with the mux and writes the "open" control message (type 1)
   * containing the local id, protocol, id and optional handshake.
   *
   * @param {*} handshake - Value encoded with the handshake encoding, if any.
   * @returns {Promise<void>}
   */
  async open(handshake) {
    await this._mux.pullLocal();
    await this._mux.pullFree();
    const freeLen = this._mux._free.length;

    const id =
      this._mux._free.length > 0
        ? this._mux._free.pop()
        : this._mux._local.push(null) - 1;

    this._info.opened++;
    this._localId = id + 1; // ids on the wire are 1-based, 0 is the control session
    this._mux._local[id] = this;

    if (freeLen > 0) {
      await this._mux.popFree(id);
    } else {
      await this._mux.pushLocal(id);
    }

    // No remote side paired yet: remember that we are waiting for one.
    if (this._remoteId === 0) {
      this._info.outgoing.push(this._localId);
      await this._mux.pushInfos();
    }

    // Two leading bytes are reserved for the control header [0, 1].
    const state = { buffer: null, start: 2, end: 2 };

    c.uint.preencode(state, this._localId);
    c.string.preencode(state, this.protocol);
    c.buffer.preencode(state, this.id);
    if (this._handshake) this._handshake.preencode(state, handshake);

    state.buffer = this._mux._alloc(state.end);

    state.buffer[0] = 0;
    state.buffer[1] = 1;
    c.uint.encode(state, this._localId);
    c.string.encode(state, this.protocol);
    c.buffer.encode(state, this.id);
    if (this._handshake) this._handshake.encode(state, handshake);

    this._mux._write0(state.buffer);
  }

  /** Marks one tracked promise as settled; destroys the channel if it was the last one after close. */
  _dec() {
    if (--this._active === 0 && this.closed === true) this._destroy();
  }

  /** Rejection handler for tracked promises: settles the counter and destroys the mux stream. */
  _decAndDestroy(err) {
    this._dec();
    this._mux._safeDestroy(err);
  }

  /** Links the remote slot to this channel and schedules `_fullyOpen` via queueTick. */
  async _fullyOpenSoon() {
    this._mux._remote[this._remoteId - 1].session = this;
    queueTick(this._fullyOpen.bind(this));
  }

  /**
   * Completes the open: decodes the remote handshake, fires `onopen` and
   * replays any messages buffered while the channel was pending.
   */
  _fullyOpen() {
    if (this.opened === true || this.closed === true) return;

    const remote = this._mux._remote[this._remoteId - 1];

    this.opened = true;
    this.handshake = this._handshake
      ? this._handshake.decode(remote.state)
      : null;
    this._track(this.onopen(this.handshake, this));

    remote.session = this;
    remote.state = null;
    if (remote.pending !== null) this._drain(remote);
  }

  /** Delivers every message queued on `remote.pending` and releases its buffered-bytes accounting. */
  _drain(remote) {
    for (let i = 0; i < remote.pending.length; i++) {
      const p = remote.pending[i];
      this._mux._buffered -= byteSize(p.state);
      this._recv(p.type, p.state);
    }

    remote.pending = null;
    this._mux._resumeMaybe();
  }

  /** If `p` is thenable, counts it as active work so destruction waits for it. */
  _track(p) {
    if (isPromise(p) === true) {
      this._active++;
      p.then(this._decBound, this._decAndDestroyBound);
    }
  }

  /**
   * Internal close: updates bookkeeping, releases ids, fires `onclose` and
   * destroys the channel once no tracked work remains.
   *
   * @param {boolean} isRemote - Passed through to `onclose`.
   * @returns {Promise<void>}
   */
  async _close(isRemote) {
    if (this.closed === true) return;
    this.closed = true;

    await this._mux.pullInfos();
    this._info.opened--;
    await this._mux.pushInfos();

    if (this._remoteId > 0) {
      this._mux._remote[this._remoteId - 1] = null;
      this._remoteId = 0;
      await this._mux.pullFree();
      // If remote has acked, we can reuse the local id now
      // otherwise, we need to wait for the "ack" to arrive
      this._mux._free.push(this._localId - 1);
      await this._mux.pushFree(this._localId - 1);
    }

    this._mux._local[this._localId - 1] = null;
    this._localId = 0;

    await this._mux._gc(this._info);

    this._track(this.onclose(isRemote, this));

    if (this._active === 0) this._destroy();
  }

  /** Fires `ondestroy` exactly once. */
  _destroy() {
    if (this.destroyed === true) return;
    this.destroyed = true;
    this._track(this.ondestroy(this));
  }

  /**
   * Dispatches an incoming message to the registered message type; unknown
   * types are ignored.
   */
  _recv(type, state) {
    if (type < this.messages.length) {
      const result = this.messages[type].recv(state, this);
      // `recv` is async for real messages: route decode/handler failures to the
      // mux instead of leaving an unhandled rejection.
      if (isPromise(result) === true) {
        result.catch((err) => this._mux._safeDestroy(err));
      }
    }
  }

  /** Starts batching writes on the underlying mux. */
  cork() {
    this._mux.cork();
  }

  /** Ends batching on the underlying mux. */
  uncork() {
    this._mux.uncork();
  }

  /**
   * Closes the channel locally and writes the "close" control message
   * (type 3) carrying the local id. No-op if already closed.
   *
   * @returns {Promise<void>}
   */
  async close() {
    if (this.closed === true) return;

    // Encode before _close() resets this._localId to 0.
    const state = { buffer: null, start: 2, end: 2 };

    c.uint.preencode(state, this._localId);

    state.buffer = this._mux._alloc(state.end);

    state.buffer[0] = 0;
    state.buffer[1] = 3;
    c.uint.encode(state, this._localId);

    // The close logic lives on the channel; the mux has no `_close` method.
    await this._close(false);
    this._mux._write0(state.buffer);
  }

  /**
   * Registers a message type. The type number is the message's index in
   * `this.messages`. A falsy `opts` registers a placeholder that ignores
   * traffic.
   *
   * @param {?{encoding?: object, onmessage?: Function}} opts
   * @returns {{type: number, encoding: object, onmessage: Function, recv: Function, send: Function}}
   */
  addMessage(opts) {
    if (!opts) return this._skipMessage();

    const type = this.messages.length;
    const encoding = opts.encoding || c.raw;
    const onmessage = opts.onmessage || noop;

    const s = this;
    const typeLen = encodingLength(c.uint, type);

    const m = {
      type,
      encoding,
      onmessage,
      // Decodes the payload and hands it to the handler; a promise returned by
      // the handler is tracked on the session.
      async recv(state, session) {
        session._track(m.onmessage(await encoding.decode(state), session));
      },
      // Resolves to false if the session is closed, true if the message was
      // queued on the current batch, otherwise the result of stream.write().
      // NOTE(review): because of the awaits below, `cork(); send(x); uncork();`
      // without awaiting send() resumes after the batch was reset - confirm
      // callers always await send() while corked.
      async send(m, session = s) {
        if (session.closed === true) return false;

        const mux = session._mux;
        const state = { buffer: null, start: 0, end: typeLen };

        if (mux._batch !== null) {
          await encoding.preencode(state, m);
          state.buffer = mux._alloc(state.end);

          c.uint.encode(state, type);
          await encoding.encode(state, m);

          mux._pushBatch(session._localId, state.buffer);
          return true;
        }

        c.uint.preencode(state, session._localId);
        await encoding.preencode(state, m);

        state.buffer = mux._alloc(state.end);

        c.uint.encode(state, session._localId);
        c.uint.encode(state, type);
        await encoding.encode(state, m);

        return mux.stream.write(state.buffer);
      },
    };

    this.messages.push(m);

    return m;
  }

  /** Registers a placeholder message type whose `recv` and `send` do nothing. */
  _skipMessage() {
    const type = this.messages.length;
    const m = {
      type,
      encoding: c.raw,
      onmessage: noop,
      recv(state, session) {},
      send(m, session) {},
    };

    this.messages.push(m);
    return m;
  }
}
module.exports = class Protomux {
constructor(stream, { alloc, slave = false } = {}) {
if (stream.userData) {
throw new Error("Mux already set for this stream");
}
stream.userData = this;
2023-04-05 07:36:37 +00:00
this.isProtomux = true;
this.stream = stream;
this.corked = 0;
2021-11-10 14:15:09 +00:00
2023-04-05 07:36:37 +00:00
this._alloc =
alloc ||
(typeof stream.alloc === "function"
? stream.alloc.bind(stream)
: b4a.allocUnsafe);
this._safeDestroyBound = this._safeDestroy.bind(this);
2021-11-10 14:15:09 +00:00
2023-04-05 07:36:37 +00:00
this._remoteBacklog = 0;
this._buffered = 0;
this._paused = false;
this._remote = [];
this._local = [];
this._free = [];
this._batch = null;
this._batchState = null;
2022-03-10 22:08:37 +00:00
2023-04-05 07:36:37 +00:00
this._infos = new Map();
this._notify = new Map();
2022-03-10 22:08:37 +00:00
2023-04-05 07:36:37 +00:00
this.stream.on("data", this._ondata.bind(this));
this.stream.on("end", this._onend.bind(this));
this.stream.on("error", noop); // we handle this in "close"
this.stream.on("close", this._shutdown.bind(this));
2021-11-10 14:15:09 +00:00
}
2023-04-05 07:36:37 +00:00
static from(stream, opts) {
if (stream.userData && stream.userData.isProtomux) return stream.userData;
if (stream.isProtomux) return stream;
return new this(stream, opts);
2022-01-28 15:00:09 +00:00
}
2023-04-05 07:36:37 +00:00
static isProtomux(mux) {
return typeof mux === "object" && mux.isProtomux === true;
2022-03-11 01:18:31 +00:00
}
2023-04-05 07:36:37 +00:00
*[Symbol.iterator]() {
2022-03-10 22:08:37 +00:00
for (const session of this._local) {
2023-04-05 07:36:37 +00:00
if (session !== null) yield session;
2022-03-10 22:08:37 +00:00
}
2021-11-10 14:15:09 +00:00
}
2023-04-05 07:36:37 +00:00
cork() {
2022-03-10 22:08:37 +00:00
if (++this.corked === 1) {
2023-04-05 07:36:37 +00:00
this._batch = [];
this._batchState = { buffer: null, start: 0, end: 1 };
2022-03-10 22:08:37 +00:00
}
2021-12-30 20:13:47 +00:00
}
2021-11-10 14:15:09 +00:00
2023-04-05 07:36:37 +00:00
uncork() {
2022-03-10 22:08:37 +00:00
if (--this.corked === 0) {
2023-04-05 07:36:37 +00:00
this._sendBatch(this._batch, this._batchState);
this._batch = null;
this._batchState = null;
2022-03-10 22:08:37 +00:00
}
}
2021-11-10 14:15:09 +00:00
2023-04-05 07:36:37 +00:00
pair({ protocol, id = null }, notify) {
this._notify.set(toKey(protocol, id), notify);
2022-03-10 22:08:37 +00:00
}
2021-11-10 14:15:09 +00:00
2023-04-05 07:36:37 +00:00
unpair({ protocol, id = null }) {
this._notify.delete(toKey(protocol, id));
2022-03-10 22:08:37 +00:00
}
2021-11-10 14:15:09 +00:00
2023-04-05 07:36:37 +00:00
opened({ protocol, id = null }) {
const key = toKey(protocol, id);
const info = this._infos.get(key);
return info ? info.opened > 0 : false;
2022-03-10 22:08:37 +00:00
}
async createChannel({
2023-04-05 07:36:37 +00:00
userData = null,
protocol,
aliases = [],
id = null,
unique = true,
handshake = null,
messages = [],
onopen = noop,
onclose = noop,
ondestroy = noop,
}) {
if (this.stream.destroyed) return null;
2022-03-10 22:08:37 +00:00
const info = await this._get(protocol, id, aliases);
2023-04-05 07:36:37 +00:00
if (unique && info.opened > 0) return null;
2021-12-27 15:09:34 +00:00
2022-03-10 22:08:37 +00:00
if (info.incoming.length === 0) {
2023-04-05 07:36:37 +00:00
return new Channel(
this,
info,
userData,
protocol,
aliases,
id,
handshake,
messages,
onopen,
onclose,
ondestroy
);
2021-11-10 14:15:09 +00:00
}
2023-04-05 07:36:37 +00:00
this._remoteBacklog--;
2021-12-30 20:13:47 +00:00
await this.pullInfos();
2023-04-05 07:36:37 +00:00
const remoteId = info.incoming.shift();
await this.pushInfos();
2023-04-05 07:36:37 +00:00
const r = this._remote[remoteId - 1];
if (r === null) return null;
2021-12-30 20:13:47 +00:00
2023-04-05 07:36:37 +00:00
const session = new Channel(
this,
info,
userData,
protocol,
aliases,
id,
handshake,
messages,
onopen,
onclose,
ondestroy
);
2021-12-30 20:13:47 +00:00
2023-04-05 07:36:37 +00:00
session._remoteId = remoteId;
session._fullyOpenSoon();
2021-11-10 14:15:09 +00:00
2023-04-05 07:36:37 +00:00
return session;
2021-12-30 20:13:47 +00:00
}
2021-12-27 15:09:34 +00:00
2023-04-05 07:36:37 +00:00
_pushBatch(localId, buffer) {
2022-07-06 12:39:04 +00:00
if (this._batchState.end >= MAX_BATCH) {
2023-04-05 07:36:37 +00:00
this._sendBatch(this._batch, this._batchState);
this._batch = [];
this._batchState = { buffer: null, start: 0, end: 1 };
2022-07-06 12:39:04 +00:00
}
2023-04-05 07:36:37 +00:00
if (
this._batch.length === 0 ||
this._batch[this._batch.length - 1].localId !== localId
) {
this._batchState.end++;
c.uint.preencode(this._batchState, localId);
2022-03-10 22:08:37 +00:00
}
2023-04-05 07:36:37 +00:00
c.buffer.preencode(this._batchState, buffer);
this._batch.push({ localId, buffer });
2021-12-30 20:13:47 +00:00
}
2021-11-10 14:15:09 +00:00
2023-04-05 07:36:37 +00:00
_sendBatch(batch, state) {
if (batch.length === 0) return;
2021-11-10 14:15:09 +00:00
2023-04-05 07:36:37 +00:00
let prev = batch[0].localId;
2021-11-10 14:15:09 +00:00
2023-04-05 07:36:37 +00:00
state.buffer = this._alloc(state.end);
state.buffer[state.start++] = 0;
state.buffer[state.start++] = 0;
2021-11-10 14:15:09 +00:00
2023-04-05 07:36:37 +00:00
c.uint.encode(state, prev);
2021-11-10 14:15:09 +00:00
2022-03-10 22:08:37 +00:00
for (let i = 0; i < batch.length; i++) {
2023-04-05 07:36:37 +00:00
const b = batch[i];
2022-03-10 22:08:37 +00:00
if (prev !== b.localId) {
2023-04-05 07:36:37 +00:00
state.buffer[state.start++] = 0;
c.uint.encode(state, (prev = b.localId));
2022-03-10 22:08:37 +00:00
}
2023-04-05 07:36:37 +00:00
c.buffer.encode(state, b.buffer);
2021-11-10 14:15:09 +00:00
}
2023-04-05 07:36:37 +00:00
this.stream.write(state.buffer);
2021-11-10 14:15:09 +00:00
}
async _get(protocol, id, aliases = []) {
2023-04-05 07:36:37 +00:00
const key = toKey(protocol, id);
2022-03-10 22:08:37 +00:00
await this.pullInfos();
2023-04-05 07:36:37 +00:00
let info = this._infos.get(key);
if (info) return info;
2022-03-10 22:08:37 +00:00
2023-04-05 07:36:37 +00:00
info = {
key,
protocol,
aliases: [],
id,
pairing: 0,
opened: 0,
incoming: [],
outgoing: [],
};
this._infos.set(key, info);
for (const alias of aliases) {
2023-04-05 07:36:37 +00:00
const key = toKey(alias, id);
info.aliases.push(key);
2023-04-05 07:36:37 +00:00
this._infos.set(key, info);
}
await this.pushInfos();
2023-04-05 07:36:37 +00:00
return info;
2021-12-30 20:13:47 +00:00
}
2021-12-27 15:09:34 +00:00
async _gc(info) {
2023-04-05 07:36:37 +00:00
if (
info.opened === 0 &&
info.outgoing.length === 0 &&
info.incoming.length === 0
) {
await this.pullInfos();
2023-04-05 07:36:37 +00:00
this._infos.delete(info.key);
2023-04-05 07:36:37 +00:00
for (const alias of info.aliases) this._infos.delete(alias);
await this.pullInfos();
2022-03-10 22:08:37 +00:00
}
}
2021-11-10 14:15:09 +00:00
2023-04-05 07:36:37 +00:00
_ondata(buffer) {
2021-11-10 14:15:09 +00:00
try {
2023-04-05 07:36:37 +00:00
const state = { buffer, start: 0, end: buffer.byteLength };
this._decode(c.uint.decode(state), state);
2021-11-10 14:15:09 +00:00
} catch (err) {
2023-04-05 07:36:37 +00:00
this._safeDestroy(err);
2022-03-10 22:08:37 +00:00
}
}
2023-04-05 07:36:37 +00:00
_onend() {
// TODO: support half open mode for the users who wants that here
this.stream.end();
2022-03-24 14:19:55 +00:00
}
async _decode(remoteId, state) {
2023-04-05 07:36:37 +00:00
const type = c.uint.decode(state);
2022-03-10 22:08:37 +00:00
if (remoteId === 0) {
this._oncontrolsession(type, state);
2023-04-05 07:36:37 +00:00
return;
2021-11-10 14:15:09 +00:00
}
2021-12-30 20:13:47 +00:00
2023-04-05 07:36:37 +00:00
const r =
remoteId <= this._remote.length ? this._remote[remoteId - 1] : null;
2021-11-10 14:15:09 +00:00
2022-03-10 22:08:37 +00:00
// if the channel is closed ignore - could just be a pipeline message...
2023-04-05 07:36:37 +00:00
if (r === null) return;
2021-11-10 14:15:09 +00:00
2022-03-10 22:08:37 +00:00
if (r.pending !== null) {
2023-04-05 07:36:37 +00:00
this._bufferMessage(r, type, state);
return;
2021-11-10 14:15:09 +00:00
}
2022-03-10 22:08:37 +00:00
2023-04-05 07:36:37 +00:00
r.session._recv(type, state);
2021-11-10 14:15:09 +00:00
}
_oncontrolsession(type, state) {
2022-03-10 22:08:37 +00:00
switch (type) {
case 0:
2023-04-05 07:36:37 +00:00
this._onbatch(state);
break;
2021-12-30 20:13:47 +00:00
2022-03-10 22:08:37 +00:00
case 1:
2023-04-05 07:36:37 +00:00
this._onopensession(state);
break;
2021-11-10 14:15:09 +00:00
2022-03-10 22:08:37 +00:00
case 2:
2023-04-05 07:36:37 +00:00
this._onrejectsession(state);
break;
2022-03-10 22:08:37 +00:00
case 3:
2023-04-05 07:36:37 +00:00
this._onclosesession(state);
break;
2021-11-10 14:15:09 +00:00
}
}
2023-04-05 07:36:37 +00:00
_bufferMessage(r, type, { buffer, start, end }) {
const state = { buffer, start, end }; // copy
r.pending.push({ type, state });
this._buffered += byteSize(state);
this._pauseMaybe();
2022-03-10 22:08:37 +00:00
}
2021-11-10 14:15:09 +00:00
2023-04-05 07:36:37 +00:00
_pauseMaybe() {
if (this._paused === true || this._buffered <= MAX_BUFFERED) return;
this._paused = true;
this.stream.pause();
2022-03-10 22:08:37 +00:00
}
2021-11-10 14:15:09 +00:00
2023-04-05 07:36:37 +00:00
_resumeMaybe() {
if (this._paused === false || this._buffered > MAX_BUFFERED) return;
this._paused = false;
this.stream.resume();
2021-11-10 14:15:09 +00:00
}
2023-04-05 07:36:37 +00:00
_onbatch(state) {
const end = state.end;
let remoteId = c.uint.decode(state);
2022-03-10 22:08:37 +00:00
while (state.end > state.start) {
2023-04-05 07:36:37 +00:00
const len = c.uint.decode(state);
2022-03-10 22:08:37 +00:00
if (len === 0) {
2023-04-05 07:36:37 +00:00
remoteId = c.uint.decode(state);
continue;
2022-03-10 22:08:37 +00:00
}
2023-04-05 07:36:37 +00:00
state.end = state.start + len;
this._decode(remoteId, state);
state.start = state.end;
state.end = end;
2022-03-10 22:08:37 +00:00
}
2021-11-10 14:15:09 +00:00
}
async _onopensession(state) {
2023-04-05 07:36:37 +00:00
const remoteId = c.uint.decode(state);
const protocol = c.string.decode(state);
const id = c.buffer.decode(state);
2021-12-30 20:13:47 +00:00
2022-03-10 22:08:37 +00:00
// remote tried to open the control session - auto reject for now
// as we can use as an explicit control protocol declaration if we need to
if (remoteId === 0) {
2023-04-05 07:36:37 +00:00
this._rejectSession(0);
return;
2022-03-10 22:08:37 +00:00
}
2021-12-30 20:13:47 +00:00
2023-04-05 07:36:37 +00:00
const rid = remoteId - 1;
const info = await this._get(protocol, id);
2021-12-30 20:13:47 +00:00
await this.pullRemote();
2022-03-10 22:08:37 +00:00
// allow the remote to grow the ids by one
if (this._remote.length === rid) {
2023-04-05 07:36:37 +00:00
this._remote.push(null);
await this.pushRemote(this._remote.length - 1);
2021-11-10 14:15:09 +00:00
}
2022-03-10 22:08:37 +00:00
if (rid >= this._remote.length || this._remote[rid] !== null) {
2023-04-05 07:36:37 +00:00
throw new Error("Invalid open message");
2022-03-10 22:08:37 +00:00
}
2021-11-10 14:15:09 +00:00
2022-03-10 22:08:37 +00:00
if (info.outgoing.length > 0) {
await this.pullInfos();
2023-04-05 07:36:37 +00:00
const localId = info.outgoing.shift();
const session = this._local[localId - 1];
2021-12-30 20:13:47 +00:00
await this.pushInfos();
2023-04-05 07:36:37 +00:00
if (session === null) {
await this.pullFree();
2023-04-05 07:36:37 +00:00
// we already closed the channel - ignore
this._free.push(localId - 1);
await this.pushFree(localId - 1);
2023-04-05 07:36:37 +00:00
return;
2021-12-30 20:13:47 +00:00
}
2021-11-10 14:15:09 +00:00
2023-04-05 07:36:37 +00:00
this._remote[rid] = { state, pending: null, session: null };
2021-11-10 14:15:09 +00:00
await this.pushRemote(remoteId);
2023-04-06 21:24:06 +00:00
2023-04-05 07:36:37 +00:00
session._remoteId = remoteId;
session._fullyOpen();
return;
2022-03-10 22:08:37 +00:00
}
2021-11-10 14:15:09 +00:00
2023-04-05 07:36:37 +00:00
const copyState = {
buffer: state.buffer,
start: state.start,
end: state.end,
};
this._remote[rid] = { state: copyState, pending: [], session: null };
2022-03-10 22:08:37 +00:00
if (++this._remoteBacklog > MAX_BACKLOG) {
2023-04-05 07:36:37 +00:00
throw new Error("Remote exceeded backlog");
2021-12-30 20:13:47 +00:00
}
2022-03-10 22:08:37 +00:00
await this.pullInfos();
2023-04-05 07:36:37 +00:00
info.pairing++;
info.incoming.push(remoteId);
2022-03-10 22:08:37 +00:00
await this.pushInfos();
await this.pushRemote(remoteId);
2023-04-05 07:36:37 +00:00
this._requestSession(protocol, id, info).catch(this._safeDestroyBound);
2021-12-30 20:13:47 +00:00
}
2021-11-10 14:15:09 +00:00
async _onrejectsession(state) {
2023-04-05 07:36:37 +00:00
const localId = c.uint.decode(state);
2022-03-10 22:08:37 +00:00
// TODO: can be done smarter...
for (const info of this._infos.values()) {
2023-04-05 07:36:37 +00:00
const i = info.outgoing.indexOf(localId);
if (i === -1) continue;
2022-03-10 22:08:37 +00:00
await this.pullInfos();
2023-04-05 07:36:37 +00:00
info.outgoing.splice(i, 1);
await this.pushInfos();
2023-04-05 07:36:37 +00:00
const session = this._local[localId - 1];
await this.pullFree(localId - 1);
2023-04-05 07:36:37 +00:00
this._free.push(localId - 1);
await this.pushFree(localId - 1);
2023-04-05 07:36:37 +00:00
if (session !== null) session._close(true);
await this._gc(info);
2023-04-05 07:36:37 +00:00
return;
}
2022-03-10 22:08:37 +00:00
2023-04-05 07:36:37 +00:00
throw new Error("Invalid reject message");
2021-12-30 20:13:47 +00:00
}
2021-11-10 14:15:09 +00:00
2023-04-05 07:36:37 +00:00
_onclosesession(state) {
const remoteId = c.uint.decode(state);
2021-11-10 14:15:09 +00:00
2023-04-05 07:36:37 +00:00
if (remoteId === 0) return; // ignore
2021-11-10 14:15:09 +00:00
2023-04-05 07:36:37 +00:00
const rid = remoteId - 1;
const r = rid < this._remote.length ? this._remote[rid] : null;
2021-11-10 14:15:09 +00:00
2023-04-05 07:36:37 +00:00
if (r === null) return;
2021-11-10 14:15:09 +00:00
2023-04-05 07:36:37 +00:00
if (r.session !== null) r.session._close(true);
2022-03-10 22:08:37 +00:00
}
2021-11-10 14:15:09 +00:00
2023-04-05 07:36:37 +00:00
async _requestSession(protocol, id, info) {
const notify =
this._notify.get(toKey(protocol, id)) ||
this._notify.get(toKey(protocol, null));
2021-12-27 15:09:34 +00:00
2023-04-05 07:36:37 +00:00
if (notify) await notify(id);
2022-03-10 22:08:37 +00:00
await this.pullInfos();
let p = --info.pairing;
await this.pushInfos();
if (p > 0) return;
2022-03-10 22:08:37 +00:00
while (info.incoming.length > 0) {
await this.pullInfos();
const id = info.incoming.shift();
await this.pushInfos();
this._rejectSession(info, id);
2021-11-10 14:15:09 +00:00
}
2022-03-10 22:08:37 +00:00
await this._gc(info);
2021-12-30 20:13:47 +00:00
}
2021-11-10 14:15:09 +00:00
2023-04-05 07:36:37 +00:00
_rejectSession(info, remoteId) {
2022-03-10 22:08:37 +00:00
if (remoteId > 0) {
2023-04-05 07:36:37 +00:00
const r = this._remote[remoteId - 1];
2022-03-10 22:08:37 +00:00
if (r.pending !== null) {
for (let i = 0; i < r.pending.length; i++) {
2023-04-05 07:36:37 +00:00
this._buffered -= byteSize(r.pending[i].state);
2022-03-10 22:08:37 +00:00
}
}
2023-04-05 07:36:37 +00:00
this._remote[remoteId - 1] = null;
this._resumeMaybe();
2021-11-10 14:15:09 +00:00
}
2023-04-05 07:36:37 +00:00
const state = { buffer: null, start: 2, end: 2 };
2021-11-10 14:15:09 +00:00
2023-04-05 07:36:37 +00:00
c.uint.preencode(state, remoteId);
2021-11-10 14:15:09 +00:00
2023-04-05 07:36:37 +00:00
state.buffer = this._alloc(state.end);
2021-11-10 14:15:09 +00:00
2023-04-05 07:36:37 +00:00
state.buffer[0] = 0;
state.buffer[1] = 2;
c.uint.encode(state, remoteId);
2022-03-10 22:08:37 +00:00
2023-04-05 07:36:37 +00:00
this._write0(state.buffer);
2022-03-10 22:08:37 +00:00
}
2023-04-05 07:36:37 +00:00
_write0(buffer) {
2022-03-10 22:08:37 +00:00
if (this._batch !== null) {
2023-04-05 07:36:37 +00:00
this._pushBatch(0, buffer.subarray(1));
return;
2022-03-10 22:08:37 +00:00
}
2023-04-05 07:36:37 +00:00
this.stream.write(buffer);
2022-03-10 22:08:37 +00:00
}
2023-04-05 07:36:37 +00:00
destroy(err) {
this.stream.destroy(err);
2022-03-11 18:46:06 +00:00
}
2023-04-05 07:36:37 +00:00
_safeDestroy(err) {
safetyCatch(err);
this.stream.destroy(err);
2022-03-10 22:08:37 +00:00
}
2021-11-10 14:15:09 +00:00
2023-04-05 07:36:37 +00:00
_shutdown() {
2022-03-10 22:08:37 +00:00
for (const s of this._local) {
2023-04-05 07:36:37 +00:00
if (s !== null) s._close(true);
2022-03-10 22:08:37 +00:00
}
2021-11-10 14:15:09 +00:00
}
async popFree(id) {
2023-04-06 21:29:29 +00:00
await this.stream.syncProtomux("popFree", id);
}
async pushFree(id) {
2023-04-06 21:29:29 +00:00
await this.stream.syncProtomux("pushFree", id);
}
async pushLocal(id) {
2023-04-06 21:29:29 +00:00
await this.stream.syncProtomux("pushLocal", id);
}
async pushRemote(id) {
2023-04-06 21:29:29 +00:00
await this.stream.syncProtomux("pushRemote", id);
}
async pushInfos() {
await this.stream.syncProtomux(
"pushInfos",
Array.from(this._infos.entries())
);
}
async pullLocal() {
2023-04-06 21:29:29 +00:00
const ids = await this.stream.syncProtomux("pullLocal");
ids.forEach((item) => {
item = parseInt(item);
if (typeof this._local[item] === "undefined") {
this._local[item] = null;
}
});
}
async pullFree() {
2023-04-06 21:29:29 +00:00
const ids = await this.stream.syncProtomux("pullFree");
this._free = Array.from(new Set([...this._free, ...ids]));
this._free = this._free.filter((item) => item !== null);
}
async pullRemote() {
2023-04-06 21:29:29 +00:00
const ids = await this.stream.syncProtomux("pullRemote");
ids.forEach((item) => {
item = parseInt(item);
if (typeof this._remote[item] === "undefined") {
this._remote[item] = null;
}
});
}
async pullInfos() {
const info = await this.stream.syncProtomux(
2023-04-07 05:32:53 +00:00
"pullInfos",
Array.from(this._infos.entries())
);
this._infos = new Map(info);
}
2023-04-05 07:36:37 +00:00
};
2021-11-10 14:15:09 +00:00
2023-04-05 07:36:37 +00:00
/** Default callback that intentionally does nothing and returns undefined. */
function noop() {
  // intentionally empty
}
2021-12-30 20:13:47 +00:00
2023-04-05 07:36:37 +00:00
/**
 * Builds the lookup key used for the info and notify maps.
 *
 * @param {string} protocol - Protocol (or alias) name.
 * @param {?Uint8Array} id - Optional id buffer; falsy ids contribute an empty suffix.
 * @returns {string} `protocol + "##" + hex(id)`.
 */
function toKey(protocol, id) {
  const suffix = id ? b4a.toString(id, "hex") : "";
  return protocol + "##" + suffix;
}
2023-04-05 07:36:37 +00:00
/**
 * Accounting size of a buffered message: the unread byte range of `state`
 * plus a fixed 512 charged per message.
 *
 * @param {{start: number, end: number}} state
 * @returns {number}
 */
function byteSize(state) {
  return 512 + (state.end - state.start);
}
2023-04-05 07:36:37 +00:00
/**
 * Thenable check: true for any truthy value exposing a callable `then`.
 *
 * @param {*} p
 * @returns {boolean}
 */
function isPromise(p) {
  return !!(p && typeof p.then === "function");
}
2023-04-05 07:36:37 +00:00
/**
 * Number of bytes `enc` reserves for `val`, measured by running its
 * `preencode` against a fresh state and reading back `state.end`.
 *
 * @param {{preencode: Function}} enc - Encoder with a `preencode(state, value)` method.
 * @param {*} val - Value to measure.
 * @returns {number}
 */
function encodingLength(enc, val) {
  const state = { buffer: null, start: 0, end: 0 };
  enc.preencode(state, val);
  return state.end;
}