diff --git a/.gitignore b/.gitignore index df9289e..34c4488 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ node_modules sandbox.js +sandbox.mjs sandbox coverage diff --git a/index.js b/index.js index fc9505f..55026d0 100644 --- a/index.js +++ b/index.js @@ -1,410 +1,626 @@ -const c = require('compact-encoding') const b4a = require('b4a') +const c = require('compact-encoding') +const queueTick = require('queue-tick') const safetyCatch = require('safety-catch') -const { addProtocol } = require('./messages') -const EMPTY = [] +const MAX_BUFFERED = 32768 +const MAX_BACKLOG = 256 -class Protocol { - constructor (mux) { - this.mux = mux - - this.name = null - this.version = null - this.messages = EMPTY - this.context = null - this.offset = 0 - this.length = 0 - this.opened = false - this.corked = false - - this.remoteVersion = null - this.remoteOffset = 0 - this.remoteEnd = 0 - this.remoteOpened = false - this.remoteClosed = false - - this.onremoteopen = noop - this.onremoteclose = noop - } - - _attach ({ name, version = { major: 0, minor: 0 }, messages = 0, context = null, onremoteopen = noop, onremoteclose = noop }) { - const opened = this.opened - - this.name = name - this.version = version - this.messages = new Array(messages) +class Session { + constructor (mux, info, protocol, id, context, messages, onopen, onclose, ondestroy) { + this.protocol = protocol + this.id = id this.context = context - this.offset = this.mux.offset - this.length = messages - this.opened = true - this.corked = false + this.messages = [] + this.remoteMessages = this.messages - this.onremoteopen = onremoteopen - this.onremoteclose = onremoteclose + this.opened = false + this.closed = false + this.destroyed = false - return !opened + this.onopen = onopen + this.onclose = onclose + this.ondestroy = ondestroy + + this._mux = mux + this._info = info + this._localId = 0 + this._remoteId = 0 + this._active = 0 + this._extensions = null + + this._decBound = this._dec.bind(this) + this._decAndDestroyBound = this._decAndDestroy.bind(this) + + for (const m of messages) this.addMessage(m) } - _nextMessage () { - for (let i = this.messages.length - 1; i >= 0; i--) { - if (this.messages[i] === undefined && (i === 0 || this.messages[i - 1] !== undefined)) { - return i + _open () { + const id = this._mux._free.length > 0 + ? this._mux._free.pop() + : this._mux._local.push(null) - 1 + + this._info.opened++ + this._localId = id + 1 + this._mux._local[id] = this + + const state = { buffer: null, start: 2, end: 2 } + + c.string.preencode(state, this.protocol) + c.buffer.preencode(state, this.id) + c.uint.preencode(state, this._localId) + + state.buffer = this._mux._alloc(state.end) + + state.buffer[0] = 0 + state.buffer[1] = 1 + c.string.encode(state, this.protocol) + c.buffer.encode(state, this.id) + c.uint.encode(state, this._localId) + + this._mux._write0(state.buffer) + } + + _dec () { + if (--this._active === 0 && this.closed === true) this._destroy() + } + + _decAndDestroy (err) { + this._dec() + this._mux._safeDestroy(err) + } + + _fullyOpenSoon () { + this._mux._remote[this._remoteId - 1].session = this + queueTick(this._fullyOpen.bind(this)) + } + + _fullyOpen () { + if (this.opened === true || this.closed === true) return + + const remote = this._mux._remote[this._remoteId - 1] + + this.opened = true + this._track(this.onopen()) + + remote.session = this + if (remote.pending !== null) this._drain(remote) + } + + _drain (remote) { + for (let i = 0; i < remote.pending.length; i++) { + const p = remote.pending[i] + this._mux._buffered -= byteSize(p.state) + this._recv(p.type, p.state) + } + + remote.pending = null + this._mux._resumeMaybe() + } + + _track (p) { + if (isPromise(p) === true) { + this._active++ + p.then(this._decBound, this._decAndDestroyBound) + } + } + + _close (isRemote) { + if (this.closed === true) return + this.closed = true + + this._info.opened-- + + if (this._remoteId > 0) { + this._mux._remote[this._remoteId - 1] = null + this._remoteId = 0 + // If remote has acked, we can reuse the local id now + // otherwise, we need to wait for the "ack" to arrive + this._mux._free.push(this._localId - 1) + } + + this._mux._local[this._localId - 1] = null + this._localId = 0 + + this._mux._gc(this._info) + this._track(this.onclose(isRemote)) + + if (this._active === 0) this._destroy() + } + + _destroy () { + if (this.destroyed === true) return + this.destroyed = true + this._track(this.ondestroy()) + } + + _recv (type, state) { + if (type < this.remoteMessages.length) { + this.remoteMessages[type].recv(state, this) + } + } + + cork () { + this._mux.cork() + } + + uncork () { + this._mux.uncork() + } + + close () { + if (this.closed === true) return + + const state = { buffer: null, start: 2, end: 2 } + + c.uint.preencode(state, this._localId) + + state.buffer = this._mux._alloc(state.end) + + state.buffer[0] = 0 + state.buffer[1] = 3 + c.uint.encode(state, this._localId) + + this._close(false) + this._mux._write0(state.buffer) + } + + addMessage (opts) { + if (!opts) return this._skipMessage() + + const type = this.messages.length + const encoding = opts.encoding || c.raw + const onmessage = opts.onmessage || noop + + const s = this + const typeLen = encodingLength(c.uint, type) + + const m = { + type, + encoding, + onmessage, + recv (state, session) { + session._track(m.onmessage(encoding.decode(state), session)) + }, + send (m, session = s) { + if (session.closed === true) return false + + const mux = session._mux + const state = { buffer: null, start: 0, end: typeLen } + + if (mux._batch !== null) { + encoding.preencode(state, m) + state.buffer = mux._alloc(state.end) + + c.uint.encode(state, type) + encoding.encode(state, m) + + mux._pushBatch(session._localId, state.buffer) + return true + } + + c.uint.preencode(state, session._localId) + encoding.preencode(state, m) + + state.buffer = mux._alloc(state.end) + + c.uint.encode(state, session._localId) + c.uint.encode(state, type) + encoding.encode(state, m) + + return mux.stream.write(state.buffer) } } - return -1 - } - addMessage ({ type = this._nextMessage(), encoding = c.binary, onmessage = noop } = {}) { - if (type < 0 || type >= this.messages.length) { - throw new Error('Invalid type, must be <= ' + this.messages.length) - } - - const t = this.offset + type - const send = (message) => this.opened && this.mux._push(t, m.encoding, message) - const m = this.messages[type] = { encoding, onmessage, send } + this.messages.push(m) return m } - cork () { - if (this.corked) return - this.corked = true - this.mux.cork() - } + _skipMessage () { + const type = this.messages.length + const m = { + type, + encoding: c.raw, + onmessage: noop, + recv (state, session) {}, + send (m, session) {} + } - uncork () { - if (!this.corked) return - this.corked = false - this.mux.uncork() - } - - close () { - if (this.opened === false) return - this.opened = false - this.mux._unopened++ - - const offset = this.offset - - this.version = null - this.messages = EMPTY - this.offset = 0 - this.length = 0 - this.onremoteopen = this.onremoteclose = noop - this.mux._push(2, c.uint, offset) - this._gc() - - if (this.corked) this.uncork() - } - - _gc () { - if (this.opened || this.remoteOpened) return - this.mux._removeProtocol(this) - } - - _recv (type, state) { - if (type >= this.messages.length) return - - const m = this.messages[type] - if (m !== undefined) this.mux._catch(m.onmessage(m.encoding.decode(state), this.context)) + this.messages.push(m) + return m } } module.exports = class Protomux { - constructor (stream, { backlog = 128, alloc, onacceptprotocol } = {}) { - this.stream = stream - this.protocols = [] - this.remoteProtocols = [] - this.offset = 4 // 4 messages reserved - this.corked = 0 - this.backlog = backlog - this.onacceptprotocol = onacceptprotocol || (() => this._unopened < this.backlog) + constructor (stream, { alloc } = {}) { this.isProtomux = true + this.stream = stream + this.corked = 0 - this._unopened = 0 - this._batch = null this._alloc = alloc || (typeof stream.alloc === 'function' ? stream.alloc.bind(stream) : b4a.allocUnsafe) this._safeDestroyBound = this._safeDestroy.bind(this) + this._remoteBacklog = 0 + this._buffered = 0 + this._paused = false + this._remote = [] + this._local = [] + this._free = [] + this._batch = null + this._batchState = null + + this._infos = new Map() + this._notify = new Map() + this.stream.on('data', this._ondata.bind(this)) + this.stream.on('error', noop) // we handle this in "close" this.stream.on('close', this._shutdown.bind(this)) } - static from (other, opts) { - return other.isProtomux === true ? other : new Protomux(other, opts) + static from (stream, opts) { + if (stream.isProtomux) return stream + return new this(stream, opts) } - sendKeepAlive () { - this.stream.write(this._alloc(0)) + * [Symbol.iterator] () { + for (const session of this._local) { + if (session !== null) yield session + } } cork () { - if (++this.corked === 1) this._batch = [] + if (++this.corked === 1) { + this._batch = [] + this._batchState = { buffer: null, start: 0, end: 1 } + } } uncork () { - if (--this.corked !== 0) return - - const batch = this._batch - this._batch = null - - const state = { start: 0, end: 1, buffer: null } - const lens = new Array(batch.length) - - for (let i = 0; i < batch.length; i++) { - const b = batch[i] - const end = state.end - - c.uint.preencode(state, b.type) - b.encoding.preencode(state, b.message) - c.uint.preencode(state, lens[i] = (state.end - end)) + if (--this.corked === 0) { + this._sendBatch(this._batch, this._batchState) + this._batch = null + this._batchState = null } + } + + pair ({ protocol, id = null }, notify) { + this._notify.set(toKey(protocol, id), notify) + } + + unpair ({ protocol, id = null }) { + this._notify.delete(toKey(protocol, id)) + } + + opened ({ protocol, id = null }) { + const key = toKey(protocol, id) + const info = this._infos.get(key) + return info ? info.opened > 0 : false + } + + open ({ protocol, id = null, context = null, unique = true, messages = [], onopen = noop, onclose = noop, ondestroy = noop }) { + if (this.stream.destroyed) return null + + const info = this._get(protocol, id) + if (unique && info.opened > 0) return null + + if (info.incoming.length === 0) { + const session = new Session(this, info, protocol, id, context, messages, onopen, onclose, ondestroy) + session._open() + info.outgoing.push(session._localId) + return session + } + + this._remoteBacklog-- + + const remoteId = info.incoming.shift() + const r = this._remote[remoteId - 1] + if (r === null) return null + + const session = new Session(this, info, protocol, id, context, messages, onopen, onclose, ondestroy) + + session._remoteId = remoteId + session._open() + session._fullyOpenSoon() + + return session + } + + _pushBatch (localId, buffer) { + if (this._batch.length === 0 || this._batch[this._batch.length - 1].localId !== localId) { + this._batchState.end++ + c.uint.preencode(this._batchState, localId) + } + c.buffer.preencode(this._batchState, buffer) + this._batch.push({ localId, buffer }) + } + + _sendBatch (batch, state) { + if (batch.length === 0) return + + let prev = batch[0].localId state.buffer = this._alloc(state.end) state.buffer[state.start++] = 0 + state.buffer[state.start++] = 0 + + c.uint.encode(state, prev) for (let i = 0; i < batch.length; i++) { const b = batch[i] - - c.uint.encode(state, lens[i]) - c.uint.encode(state, b.type) - b.encoding.encode(state, b.message) + if (prev !== b.localId) { + state.buffer[state.start++] = 0 + c.uint.encode(state, (prev = b.localId)) + } + c.buffer.encode(state, b.buffer) } this.stream.write(state.buffer) } - hasProtocol (opts) { - return !!this.getProtocol(opts) + _get (protocol, id) { + const key = toKey(protocol, id) + + let info = this._infos.get(key) + if (info) return info + + info = { key, protocol, id, pairing: 0, opened: 0, incoming: [], outgoing: [] } + this._infos.set(key, info) + return info } - getProtocol ({ name, version }) { - return this._getProtocol(name, version, false) - } - - addProtocol (opts) { - const p = this._getProtocol(opts.name, (opts.version && opts.version.major) || 0, true) - - if (opts.cork) p.cork() - if (!p._attach(opts)) return p - - this._unopened-- - this.offset += p.length - this._push(1, addProtocol, { - name: p.name, - version: p.version, - offset: p.offset, - length: p.length - }) - - return p - } - - destroy (err) { - this.stream.destroy(err) - } - - _shutdown () { - while (this.protocols.length) { - const p = this.protocols.pop() - if (!p.remoteOpened) continue - if (p.remoteClosed) continue - p.remoteOpened = false - p.remoteClosed = true - this._catch(p.onremoteclose()) - } - } - - _safeDestroy (err) { - safetyCatch(err) - this.destroy(err) - } - - _catch (p) { - if (isPromise(p)) p.catch(this._safeDestroyBound) - } - - async _acceptMaybe (added) { - let accept = false - - try { - accept = await this.onacceptprotocol(added) - } catch (err) { - this._safeDestroy(err) - return - } - - if (!accept) this._rejectProtocol(added) - } - - _rejectProtocol (added) { - for (let i = 0; i < this.protocols.length; i++) { - const p = this.protocols[i] - if (p.opened || p.name !== added.name || !p.remoteOpened) continue - if (p.remoteVersion.major !== added.version.major) continue - - this._unopened-- - this.protocols.splice(i, 1) - this._push(3, c.uint, added.offset) - return + _gc (info) { + if (info.opened === 0 && info.outgoing.length === 0 && info.incoming.length === 0) { + this._infos.delete(info.key) } } _ondata (buffer) { - if (buffer.byteLength === 0) return // keep alive - - const end = buffer.byteLength - const state = { start: 0, end, buffer } - try { - const type = c.uint.decode(state) - if (type === 0) this._recvBatch(end, state) - else this._recv(type, state) + const state = { buffer, start: 0, end: buffer.byteLength } + this._decode(c.uint.decode(state), state) } catch (err) { this._safeDestroy(err) } } - _getProtocol (name, major, upsert) { - for (let i = 0; i < this.protocols.length; i++) { - const p = this.protocols[i] - const v = p.remoteVersion === null ? p.version : p.remoteVersion - if (p.name === name && (v !== null && v.major === major)) return p + _decode (remoteId, state) { + const type = c.uint.decode(state) + + if (remoteId === 0) { + this._oncontrolsession(type, state) + return } - if (!upsert) return null + const r = remoteId <= this._remote.length ? this._remote[remoteId - 1] : null - const p = new Protocol(this) - this.protocols.push(p) - this._unopened++ - return p + // if the channel is closed ignore - could just be a pipeline message... + if (r === null) return + + if (r.pending !== null) { + this._bufferMessage(r, type, state) + return + } + + r.session._recv(type, state) } - _removeProtocol (p) { - const i = this.protocols.indexOf(this) - if (i > -1) this.protocols.splice(i, 1) - if (!p.opened) this._unopened-- - } + _oncontrolsession (type, state) { + switch (type) { + case 0: + this._onbatch(state) + break - _recvAddProtocol (state) { - const add = addProtocol.decode(state) + case 1: + this._onopensession(state) + break - const p = this._getProtocol(add.name, add.version.major, true) - if (p.remoteOpened) throw new Error('Duplicate protocol received') + case 2: + this._onrejectsession(state) + break - p.name = add.name - p.remoteVersion = add.version - p.remoteOffset = add.offset - p.remoteEnd = add.offset + add.length - p.remoteOpened = true - p.remoteClosed = false - - if (p.opened) { - this._catch(p.onremoteopen()) - } else { - this._acceptMaybe(add) + case 3: + this._onclosesession(state) + break } } - _recvRemoveProtocol (state) { - const offset = c.uint.decode(state) - - for (let i = 0; i < this.protocols.length; i++) { - const p = this.protocols[i] - - if (p.remoteOffset === offset && p.remoteOpened) { - p.remoteVersion = null - p.remoteOpened = false - p.remoteClosed = true - this._catch(p.onremoteclose()) - p._gc() - return - } - } + _bufferMessage (r, type, { buffer, start, end }) { + const state = { buffer, start, end } // copy + r.pending.push({ type, state }) + this._buffered += byteSize(state) + this._pauseMaybe() } - _recvRejectedProtocol (state) { - const offset = c.uint.decode(state) - - for (let i = 0; i < this.protocols.length; i++) { - const p = this.protocols[i] - - if (p.offset === offset && !p.remoteClosed) { - p.remoteClosed = true - this._catch(p.onremoteclose()) - p._gc() - } - } + _pauseMaybe () { + if (this._paused === true || this._buffered <= MAX_BUFFERED) return + this._paused = true + this.stream.pause() } - _recvBatch (end, state) { - while (state.start < state.end) { + _resumeMaybe () { + if (this._paused === false || this._buffered > MAX_BUFFERED) return + this._paused = false + this.stream.resume() + } + + _onbatch (state) { + const end = state.end + let remoteId = c.uint.decode(state) + + while (state.end > state.start) { const len = c.uint.decode(state) - const type = c.uint.decode(state) - state.end = state.start + len - this._recv(type, state) + if (len === 0) { + remoteId = c.uint.decode(state) + continue + } + state.end = state.start + end + this._decode(remoteId, state) state.end = end } } - _recv (type, state) { - if (type < 4) { - if (type === 0) { - throw new Error('Invalid nested batch') - } - - if (type === 1) { - this._recvAddProtocol(state) - return - } - - if (type === 2) { - this._recvRemoveProtocol(state) - return - } - - if (type === 3) { - this._recvRejectedProtocol(state) - return - } + _onopensession (state) { + const protocol = c.string.decode(state) + const id = c.buffer.decode(state) + const remoteId = c.uint.decode(state) + // remote tried to open the control session - auto reject for now + // as we can use as an explicit control protocol declaration if we need to + if (remoteId === 0) { + this._rejectSession(0) return } - // TODO: Consider make this array sorted by remoteOffset and use a bisect here. - // For now we use very few protocols in practice, so it might be overkill. - for (let i = 0; i < this.protocols.length; i++) { - const p = this.protocols[i] + const rid = remoteId - 1 + const info = this._get(protocol, id) - if (p.remoteOffset <= type && type < p.remoteEnd) { - p._recv(type - p.remoteOffset, state) - break - } + // allow the remote to grow the ids by one + if (this._remote.length === rid) { + this._remote.push(null) } + + if (rid >= this._remote.length || this._remote[rid] !== null) { + throw new Error('Invalid open message') + } + + if (info.outgoing.length > 0) { + const localId = info.outgoing.shift() + const session = this._local[localId - 1] + + if (session === null) { // we already closed the channel - ignore + this._free.push(localId - 1) + return + } + + this._remote[rid] = { pending: null, session: null } + + session._remoteId = remoteId + session._fullyOpen() + return + } + + this._remote[rid] = { pending: [], session: null } + + if (++this._remoteBacklog > MAX_BACKLOG) { + throw new Error('Remote exceeded backlog') + } + + info.pairing++ + info.incoming.push(remoteId) + + this._requestSession(protocol, id, info).catch(this._safeDestroyBound) } - _push (type, enc, message) { - if (this.corked > 0) { - this._batch.push({ type, encoding: enc, message }) - return false + _onrejectsession (state) { + const protocol = c.string.decode(state) + const id = c.buffer.decode(state) + const info = this._get(protocol, id) + + if (info.outgoing.length === 0) { + throw new Error('Invalid reject message') } - const state = { start: 0, end: 0, buffer: null } + const localId = info.outgoing.shift() + const session = this._local[localId - 1] - c.uint.preencode(state, type) - enc.preencode(state, message) + this._free.push(localId - 1) + if (session !== null) session._close(true) + } + + _onclosesession (state) { + const remoteId = c.uint.decode(state) + + if (remoteId === 0) return // ignore + + const rid = remoteId - 1 + const r = rid < this._remote.length ? this._remote[rid] : null + + if (r === null) return + + if (r.session !== null) r.session._close(true) + } + + async _requestSession (protocol, id, info) { + const notify = this._notify.get(toKey(protocol, id)) || this._notify.get(toKey(protocol, null)) + + if (notify) await notify(id) + + if (--info.pairing > 0) return + + while (info.incoming.length > 0) { + this._rejectSession(info, info.incoming.pop()) + } + + this._gc(info) + } + + _rejectSession (info, remoteId) { + if (remoteId > 0) { + const r = this._remote[remoteId - 1] + + if (r.pending !== null) { + for (let i = 0; i < r.pending.length; i++) { + this._buffered -= byteSize(r.pending[i].state) + } + } + + this._remote[remoteId - 1] = null + this._resumeMaybe() + } + + const state = { buffer: null, start: 2, end: 2 } + + c.string.preencode(state, info.protocol) + c.buffer.preencode(state, info.id) state.buffer = this._alloc(state.end) - c.uint.encode(state, type) - enc.encode(state, message) + state.buffer[0] = 0 + state.buffer[1] = 2 + c.string.encode(state, info.protocol) + c.buffer.encode(state, info.id) - return this.stream.write(state.buffer) + this._write0(state.buffer) + } + + _write0 (buffer) { + if (this._batch !== null) { + this._pushBatch(0, buffer.subarray(1)) + return + } + + this.stream.write(buffer) + } + + _safeDestroy (err) { + safetyCatch(err) + this.stream.destroy(err) + } + + _shutdown () { + for (const s of this._local) { + if (s !== null) s._close(true) + } } } function noop () {} -function isPromise (p) { - return typeof p === 'object' && p !== null && !!p.catch +function toKey (protocol, id) { + return protocol + '##' + (id ? b4a.toString(id, 'hex') : '') +} + +function byteSize (state) { + return 512 + (state.end - state.start) +} + +function isPromise (p) { + return !!(p && typeof p.then === 'function') +} + +function encodingLength (enc, val) { + const state = { buffer: null, start: 0, end: 0 } + enc.preencode(state, val) + return state.end } diff --git a/messages.js b/messages.js deleted file mode 100644 index 4ea764c..0000000 --- a/messages.js +++ /dev/null @@ -1,41 +0,0 @@ -const c = require('compact-encoding') - -const version = { - preencode (state, v) { - c.uint.preencode(state, v.major) - c.uint.preencode(state, v.minor) - }, - encode (state, v) { - c.uint.encode(state, v.major) - c.uint.encode(state, v.minor) - }, - decode (state, v) { - return { - major: c.uint.decode(state), - minor: c.uint.decode(state) - } - } -} - -exports.addProtocol = { - preencode (state, p) { - c.string.preencode(state, p.name) - version.preencode(state, p.version) - c.uint.preencode(state, p.offset) - c.uint.preencode(state, p.length) - }, - encode (state, p) { - c.string.encode(state, p.name) - version.encode(state, p.version) - c.uint.encode(state, p.offset) - c.uint.encode(state, p.length) - }, - decode (state, p) { - return { - name: c.string.decode(state), - version: version.decode(state), - offset: c.uint.decode(state), - length: c.uint.decode(state) - } - } -} diff --git a/test.js b/test.js index 6ecc90f..a6d017c 100644 --- a/test.js +++ b/test.js @@ -9,10 +9,9 @@ test('basic', function (t) { replicate(a, b) - const p = a.addProtocol({ - name: 'foo', - messages: 1, - onremoteopen () { + const p = a.open({ + protocol: 'foo', + onopen () { t.pass('a remote opened') } }) @@ -24,9 +23,8 @@ test('basic', function (t) { } }) - const bp = b.addProtocol({ - name: 'foo', - messages: 1 + const bp = b.open({ + protocol: 'foo' }) t.plan(2) @@ -40,9 +38,8 @@ test('echo message', function (t) { replicate(a, b) - const ap = a.addProtocol({ - name: 'foo', - messages: 1 + const ap = a.open({ + protocol: 'foo' }) const aEcho = ap.addMessage({ @@ -52,15 +49,13 @@ test('echo message', function (t) { } }) - b.addProtocol({ - name: 'other', - messages: 2 + b.open({ + protocol: 'other' }) - const bp = b.addProtocol({ - name: 'foo', - messages: 1, - onremoteopen () { + const bp = b.open({ + protocol: 'foo', + onopen () { t.pass('b remote opened') } }) @@ -80,14 +75,12 @@ test('echo message', function (t) { test('multi message', function (t) { const a = new Protomux(new SecretStream(true)) - a.addProtocol({ - name: 'other', - messages: 2 + a.open({ + protocol: 'other' }) - const ap = a.addProtocol({ - name: 'multi', - messages: 3 + const ap = a.open({ + protocol: 'multi' }) const a1 = ap.addMessage({ encoding: c.int }) @@ -96,9 +89,8 @@ test('multi message', function (t) { const b = new Protomux(new SecretStream(false)) - const bp = b.addProtocol({ - name: 'multi', - messages: 2 + const bp = b.open({ + protocol: 'multi' }) const b1 = bp.addMessage({ encoding: c.int }) @@ -131,14 +123,12 @@ test('corks', function (t) { a.cork() - a.addProtocol({ - name: 'other', - messages: 2 + a.open({ + protocol: 'other' }) - const ap = a.addProtocol({ - name: 'multi', - messages: 2 + const ap = a.open({ + protocol: 'multi' }) const a1 = ap.addMessage({ encoding: c.int }) @@ -146,9 +136,8 @@ test('corks', function (t) { const b = new Protomux(new SecretStream(false)) - const bp = b.addProtocol({ - name: 'multi', - messages: 2 + const bp = b.open({ + protocol: 'multi' }) const b1 = bp.addMessage({ encoding: c.int })