move to sessions
This commit is contained in:
parent
717dbd0e50
commit
576e08180f
|
@ -1,4 +1,5 @@
|
||||||
node_modules
|
node_modules
|
||||||
sandbox.js
|
sandbox.js
|
||||||
|
sandbox.mjs
|
||||||
sandbox
|
sandbox
|
||||||
coverage
|
coverage
|
||||||
|
|
820
index.js
820
index.js
|
@ -1,410 +1,626 @@
|
||||||
const c = require('compact-encoding')
|
|
||||||
const b4a = require('b4a')
|
const b4a = require('b4a')
|
||||||
|
const c = require('compact-encoding')
|
||||||
|
const queueTick = require('queue-tick')
|
||||||
const safetyCatch = require('safety-catch')
|
const safetyCatch = require('safety-catch')
|
||||||
const { addProtocol } = require('./messages')
|
|
||||||
|
|
||||||
const EMPTY = []
|
const MAX_BUFFERED = 32768
|
||||||
|
const MAX_BACKLOG = 256
|
||||||
|
|
||||||
class Protocol {
|
class Session {
|
||||||
constructor (mux) {
|
constructor (mux, info, protocol, id, context, messages, onopen, onclose, ondestroy) {
|
||||||
this.mux = mux
|
this.protocol = protocol
|
||||||
|
this.id = id
|
||||||
this.name = null
|
|
||||||
this.version = null
|
|
||||||
this.messages = EMPTY
|
|
||||||
this.context = null
|
|
||||||
this.offset = 0
|
|
||||||
this.length = 0
|
|
||||||
this.opened = false
|
|
||||||
this.corked = false
|
|
||||||
|
|
||||||
this.remoteVersion = null
|
|
||||||
this.remoteOffset = 0
|
|
||||||
this.remoteEnd = 0
|
|
||||||
this.remoteOpened = false
|
|
||||||
this.remoteClosed = false
|
|
||||||
|
|
||||||
this.onremoteopen = noop
|
|
||||||
this.onremoteclose = noop
|
|
||||||
}
|
|
||||||
|
|
||||||
_attach ({ name, version = { major: 0, minor: 0 }, messages = 0, context = null, onremoteopen = noop, onremoteclose = noop }) {
|
|
||||||
const opened = this.opened
|
|
||||||
|
|
||||||
this.name = name
|
|
||||||
this.version = version
|
|
||||||
this.messages = new Array(messages)
|
|
||||||
this.context = context
|
this.context = context
|
||||||
this.offset = this.mux.offset
|
this.messages = []
|
||||||
this.length = messages
|
this.remoteMessages = this.messages
|
||||||
|
|
||||||
|
this.opened = false
|
||||||
|
this.closed = false
|
||||||
|
this.destroyed = false
|
||||||
|
|
||||||
|
this.onopen = onopen
|
||||||
|
this.onclose = onclose
|
||||||
|
this.ondestroy = ondestroy
|
||||||
|
|
||||||
|
this._mux = mux
|
||||||
|
this._info = info
|
||||||
|
this._localId = 0
|
||||||
|
this._remoteId = 0
|
||||||
|
this._active = 0
|
||||||
|
this._extensions = null
|
||||||
|
|
||||||
|
this._decBound = this._dec.bind(this)
|
||||||
|
this._decAndDestroyBound = this._decAndDestroy.bind(this)
|
||||||
|
|
||||||
|
for (const m of messages) this.addMessage(m)
|
||||||
|
}
|
||||||
|
|
||||||
|
_open () {
|
||||||
|
const id = this._mux._free.length > 0
|
||||||
|
? this._mux._free.pop()
|
||||||
|
: this._mux._local.push(null) - 1
|
||||||
|
|
||||||
|
this._info.opened++
|
||||||
|
this._localId = id + 1
|
||||||
|
this._mux._local[id] = this
|
||||||
|
|
||||||
|
const state = { buffer: null, start: 2, end: 2 }
|
||||||
|
|
||||||
|
c.string.preencode(state, this.protocol)
|
||||||
|
c.buffer.preencode(state, this.id)
|
||||||
|
c.uint.preencode(state, this._localId)
|
||||||
|
|
||||||
|
state.buffer = this._mux._alloc(state.end)
|
||||||
|
|
||||||
|
state.buffer[0] = 0
|
||||||
|
state.buffer[1] = 1
|
||||||
|
c.string.encode(state, this.protocol)
|
||||||
|
c.buffer.encode(state, this.id)
|
||||||
|
c.uint.encode(state, this._localId)
|
||||||
|
|
||||||
|
this._mux._write0(state.buffer)
|
||||||
|
}
|
||||||
|
|
||||||
|
_dec () {
|
||||||
|
if (--this._active === 0 && this.closed === true) this._destroy()
|
||||||
|
}
|
||||||
|
|
||||||
|
_decAndDestroy (err) {
|
||||||
|
this._dec()
|
||||||
|
this._mux._safeDestroy(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_fullyOpenSoon () {
|
||||||
|
this._mux._remote[this._remoteId - 1].session = this
|
||||||
|
queueTick(this._fullyOpen.bind(this))
|
||||||
|
}
|
||||||
|
|
||||||
|
_fullyOpen () {
|
||||||
|
if (this.opened === true || this.closed === true) return
|
||||||
|
|
||||||
|
const remote = this._mux._remote[this._remoteId - 1]
|
||||||
|
|
||||||
this.opened = true
|
this.opened = true
|
||||||
this.corked = false
|
this._track(this.onopen())
|
||||||
|
|
||||||
this.onremoteopen = onremoteopen
|
remote.session = this
|
||||||
this.onremoteclose = onremoteclose
|
if (remote.pending !== null) this._drain(remote)
|
||||||
|
|
||||||
return !opened
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_nextMessage () {
|
_drain (remote) {
|
||||||
for (let i = this.messages.length - 1; i >= 0; i--) {
|
for (let i = 0; i < remote.pending.length; i++) {
|
||||||
if (this.messages[i] === undefined && (i === 0 || this.messages[i - 1] !== undefined)) {
|
const p = remote.pending[i]
|
||||||
return i
|
this._mux._buffered -= byteSize(p.state)
|
||||||
}
|
this._recv(p.type, p.state)
|
||||||
}
|
|
||||||
return -1
|
|
||||||
}
|
}
|
||||||
|
|
||||||
addMessage ({ type = this._nextMessage(), encoding = c.binary, onmessage = noop } = {}) {
|
remote.pending = null
|
||||||
if (type < 0 || type >= this.messages.length) {
|
this._mux._resumeMaybe()
|
||||||
throw new Error('Invalid type, must be <= ' + this.messages.length)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const t = this.offset + type
|
_track (p) {
|
||||||
const send = (message) => this.opened && this.mux._push(t, m.encoding, message)
|
if (isPromise(p) === true) {
|
||||||
const m = this.messages[type] = { encoding, onmessage, send }
|
this._active++
|
||||||
|
p.then(this._decBound, this._decAndDestroyBound)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_close (isRemote) {
|
||||||
|
if (this.closed === true) return
|
||||||
|
this.closed = true
|
||||||
|
|
||||||
|
this._info.opened--
|
||||||
|
|
||||||
|
if (this._remoteId > 0) {
|
||||||
|
this._mux._remote[this._remoteId - 1] = null
|
||||||
|
this._remoteId = 0
|
||||||
|
// If remote has acked, we can reuse the local id now
|
||||||
|
// otherwise, we need to wait for the "ack" to arrive
|
||||||
|
this._mux._free.push(this._localId - 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
this._mux._local[this._localId - 1] = null
|
||||||
|
this._localId = 0
|
||||||
|
|
||||||
|
this._mux._gc(this._info)
|
||||||
|
this._track(this.onclose(isRemote))
|
||||||
|
|
||||||
|
if (this._active === 0) this._destroy()
|
||||||
|
}
|
||||||
|
|
||||||
|
_destroy () {
|
||||||
|
if (this.destroyed === true) return
|
||||||
|
this.destroyed = true
|
||||||
|
this._track(this.ondestroy())
|
||||||
|
}
|
||||||
|
|
||||||
|
_recv (type, state) {
|
||||||
|
if (type < this.remoteMessages.length) {
|
||||||
|
this.remoteMessages[type].recv(state, this)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
cork () {
|
||||||
|
this._mux.cork()
|
||||||
|
}
|
||||||
|
|
||||||
|
uncork () {
|
||||||
|
this._mux.uncork()
|
||||||
|
}
|
||||||
|
|
||||||
|
close () {
|
||||||
|
if (this.closed === true) return
|
||||||
|
|
||||||
|
const state = { buffer: null, start: 2, end: 2 }
|
||||||
|
|
||||||
|
c.uint.preencode(state, this._localId)
|
||||||
|
|
||||||
|
state.buffer = this._mux._alloc(state.end)
|
||||||
|
|
||||||
|
state.buffer[0] = 0
|
||||||
|
state.buffer[1] = 3
|
||||||
|
c.uint.encode(state, this._localId)
|
||||||
|
|
||||||
|
this._close(false)
|
||||||
|
this._mux._write0(state.buffer)
|
||||||
|
}
|
||||||
|
|
||||||
|
addMessage (opts) {
|
||||||
|
if (!opts) return this._skipMessage()
|
||||||
|
|
||||||
|
const type = this.messages.length
|
||||||
|
const encoding = opts.encoding || c.raw
|
||||||
|
const onmessage = opts.onmessage || noop
|
||||||
|
|
||||||
|
const s = this
|
||||||
|
const typeLen = encodingLength(c.uint, type)
|
||||||
|
|
||||||
|
const m = {
|
||||||
|
type,
|
||||||
|
encoding,
|
||||||
|
onmessage,
|
||||||
|
recv (state, session) {
|
||||||
|
session._track(m.onmessage(encoding.decode(state), session))
|
||||||
|
},
|
||||||
|
send (m, session = s) {
|
||||||
|
if (session.closed === true) return false
|
||||||
|
|
||||||
|
const mux = session._mux
|
||||||
|
const state = { buffer: null, start: 0, end: typeLen }
|
||||||
|
|
||||||
|
if (mux._batch !== null) {
|
||||||
|
encoding.preencode(state, m)
|
||||||
|
state.buffer = mux._alloc(state.end)
|
||||||
|
|
||||||
|
c.uint.encode(state, type)
|
||||||
|
encoding.encode(state, m)
|
||||||
|
|
||||||
|
mux._pushBatch(session._localId, state.buffer)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
c.uint.preencode(state, session._localId)
|
||||||
|
encoding.preencode(state, m)
|
||||||
|
|
||||||
|
state.buffer = mux._alloc(state.end)
|
||||||
|
|
||||||
|
c.uint.encode(state, session._localId)
|
||||||
|
c.uint.encode(state, type)
|
||||||
|
encoding.encode(state, m)
|
||||||
|
|
||||||
|
return mux.stream.write(state.buffer)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
this.messages.push(m)
|
||||||
|
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
cork () {
|
_skipMessage () {
|
||||||
if (this.corked) return
|
const type = this.messages.length
|
||||||
this.corked = true
|
const m = {
|
||||||
this.mux.cork()
|
type,
|
||||||
|
encoding: c.raw,
|
||||||
|
onmessage: noop,
|
||||||
|
recv (state, session) {},
|
||||||
|
send (m, session) {}
|
||||||
}
|
}
|
||||||
|
|
||||||
uncork () {
|
this.messages.push(m)
|
||||||
if (!this.corked) return
|
return m
|
||||||
this.corked = false
|
|
||||||
this.mux.uncork()
|
|
||||||
}
|
|
||||||
|
|
||||||
close () {
|
|
||||||
if (this.opened === false) return
|
|
||||||
this.opened = false
|
|
||||||
this.mux._unopened++
|
|
||||||
|
|
||||||
const offset = this.offset
|
|
||||||
|
|
||||||
this.version = null
|
|
||||||
this.messages = EMPTY
|
|
||||||
this.offset = 0
|
|
||||||
this.length = 0
|
|
||||||
this.onremoteopen = this.onremoteclose = noop
|
|
||||||
this.mux._push(2, c.uint, offset)
|
|
||||||
this._gc()
|
|
||||||
|
|
||||||
if (this.corked) this.uncork()
|
|
||||||
}
|
|
||||||
|
|
||||||
_gc () {
|
|
||||||
if (this.opened || this.remoteOpened) return
|
|
||||||
this.mux._removeProtocol(this)
|
|
||||||
}
|
|
||||||
|
|
||||||
_recv (type, state) {
|
|
||||||
if (type >= this.messages.length) return
|
|
||||||
|
|
||||||
const m = this.messages[type]
|
|
||||||
if (m !== undefined) this.mux._catch(m.onmessage(m.encoding.decode(state), this.context))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = class Protomux {
|
module.exports = class Protomux {
|
||||||
constructor (stream, { backlog = 128, alloc, onacceptprotocol } = {}) {
|
constructor (stream, { alloc } = {}) {
|
||||||
this.stream = stream
|
|
||||||
this.protocols = []
|
|
||||||
this.remoteProtocols = []
|
|
||||||
this.offset = 4 // 4 messages reserved
|
|
||||||
this.corked = 0
|
|
||||||
this.backlog = backlog
|
|
||||||
this.onacceptprotocol = onacceptprotocol || (() => this._unopened < this.backlog)
|
|
||||||
this.isProtomux = true
|
this.isProtomux = true
|
||||||
|
this.stream = stream
|
||||||
|
this.corked = 0
|
||||||
|
|
||||||
this._unopened = 0
|
|
||||||
this._batch = null
|
|
||||||
this._alloc = alloc || (typeof stream.alloc === 'function' ? stream.alloc.bind(stream) : b4a.allocUnsafe)
|
this._alloc = alloc || (typeof stream.alloc === 'function' ? stream.alloc.bind(stream) : b4a.allocUnsafe)
|
||||||
this._safeDestroyBound = this._safeDestroy.bind(this)
|
this._safeDestroyBound = this._safeDestroy.bind(this)
|
||||||
|
|
||||||
|
this._remoteBacklog = 0
|
||||||
|
this._buffered = 0
|
||||||
|
this._paused = false
|
||||||
|
this._remote = []
|
||||||
|
this._local = []
|
||||||
|
this._free = []
|
||||||
|
this._batch = null
|
||||||
|
this._batchState = null
|
||||||
|
|
||||||
|
this._infos = new Map()
|
||||||
|
this._notify = new Map()
|
||||||
|
|
||||||
this.stream.on('data', this._ondata.bind(this))
|
this.stream.on('data', this._ondata.bind(this))
|
||||||
|
this.stream.on('error', noop) // we handle this in "close"
|
||||||
this.stream.on('close', this._shutdown.bind(this))
|
this.stream.on('close', this._shutdown.bind(this))
|
||||||
}
|
}
|
||||||
|
|
||||||
static from (other, opts) {
|
static from (stream, opts) {
|
||||||
return other.isProtomux === true ? other : new Protomux(other, opts)
|
if (stream.isProtomux) return stream
|
||||||
|
return new this(stream, opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
sendKeepAlive () {
|
* [Symbol.iterator] () {
|
||||||
this.stream.write(this._alloc(0))
|
for (const session of this._local) {
|
||||||
|
if (session !== null) yield session
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
cork () {
|
cork () {
|
||||||
if (++this.corked === 1) this._batch = []
|
if (++this.corked === 1) {
|
||||||
|
this._batch = []
|
||||||
|
this._batchState = { buffer: null, start: 0, end: 1 }
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
uncork () {
|
uncork () {
|
||||||
if (--this.corked !== 0) return
|
if (--this.corked === 0) {
|
||||||
|
this._sendBatch(this._batch, this._batchState)
|
||||||
const batch = this._batch
|
|
||||||
this._batch = null
|
this._batch = null
|
||||||
|
this._batchState = null
|
||||||
const state = { start: 0, end: 1, buffer: null }
|
|
||||||
const lens = new Array(batch.length)
|
|
||||||
|
|
||||||
for (let i = 0; i < batch.length; i++) {
|
|
||||||
const b = batch[i]
|
|
||||||
const end = state.end
|
|
||||||
|
|
||||||
c.uint.preencode(state, b.type)
|
|
||||||
b.encoding.preencode(state, b.message)
|
|
||||||
c.uint.preencode(state, lens[i] = (state.end - end))
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pair ({ protocol, id = null }, notify) {
|
||||||
|
this._notify.set(toKey(protocol, id), notify)
|
||||||
|
}
|
||||||
|
|
||||||
|
unpair ({ protocol, id = null }) {
|
||||||
|
this._notify.delete(toKey(protocol, id))
|
||||||
|
}
|
||||||
|
|
||||||
|
opened ({ protocol, id = null }) {
|
||||||
|
const key = toKey(protocol, id)
|
||||||
|
const info = this._infos.get(key)
|
||||||
|
return info ? info.opened > 0 : false
|
||||||
|
}
|
||||||
|
|
||||||
|
open ({ protocol, id = null, context = null, unique = true, messages = [], onopen = noop, onclose = noop, ondestroy = noop }) {
|
||||||
|
if (this.stream.destroyed) return null
|
||||||
|
|
||||||
|
const info = this._get(protocol, id)
|
||||||
|
if (unique && info.opened > 0) return null
|
||||||
|
|
||||||
|
if (info.incoming.length === 0) {
|
||||||
|
const session = new Session(this, info, protocol, id, context, messages, onopen, onclose, ondestroy)
|
||||||
|
session._open()
|
||||||
|
info.outgoing.push(session._localId)
|
||||||
|
return session
|
||||||
|
}
|
||||||
|
|
||||||
|
this._remoteBacklog--
|
||||||
|
|
||||||
|
const remoteId = info.incoming.shift()
|
||||||
|
const r = this._remote[remoteId - 1]
|
||||||
|
if (r === null) return null
|
||||||
|
|
||||||
|
const session = new Session(this, info, protocol, id, context, messages, onopen, onclose, ondestroy)
|
||||||
|
|
||||||
|
session._remoteId = remoteId
|
||||||
|
session._open()
|
||||||
|
session._fullyOpenSoon()
|
||||||
|
|
||||||
|
return session
|
||||||
|
}
|
||||||
|
|
||||||
|
_pushBatch (localId, buffer) {
|
||||||
|
if (this._batch.length === 0 || this._batch[this._batch.length - 1].localId !== localId) {
|
||||||
|
this._batchState.end++
|
||||||
|
c.uint.preencode(this._batchState, localId)
|
||||||
|
}
|
||||||
|
c.buffer.preencode(this._batchState, buffer)
|
||||||
|
this._batch.push({ localId, buffer })
|
||||||
|
}
|
||||||
|
|
||||||
|
_sendBatch (batch, state) {
|
||||||
|
if (batch.length === 0) return
|
||||||
|
|
||||||
|
let prev = batch[0].localId
|
||||||
|
|
||||||
state.buffer = this._alloc(state.end)
|
state.buffer = this._alloc(state.end)
|
||||||
state.buffer[state.start++] = 0
|
state.buffer[state.start++] = 0
|
||||||
|
state.buffer[state.start++] = 0
|
||||||
|
|
||||||
|
c.uint.encode(state, prev)
|
||||||
|
|
||||||
for (let i = 0; i < batch.length; i++) {
|
for (let i = 0; i < batch.length; i++) {
|
||||||
const b = batch[i]
|
const b = batch[i]
|
||||||
|
if (prev !== b.localId) {
|
||||||
c.uint.encode(state, lens[i])
|
state.buffer[state.start++] = 0
|
||||||
c.uint.encode(state, b.type)
|
c.uint.encode(state, (prev = b.localId))
|
||||||
b.encoding.encode(state, b.message)
|
}
|
||||||
|
c.buffer.encode(state, b.buffer)
|
||||||
}
|
}
|
||||||
|
|
||||||
this.stream.write(state.buffer)
|
this.stream.write(state.buffer)
|
||||||
}
|
}
|
||||||
|
|
||||||
hasProtocol (opts) {
|
_get (protocol, id) {
|
||||||
return !!this.getProtocol(opts)
|
const key = toKey(protocol, id)
|
||||||
|
|
||||||
|
let info = this._infos.get(key)
|
||||||
|
if (info) return info
|
||||||
|
|
||||||
|
info = { key, protocol, id, pairing: 0, opened: 0, incoming: [], outgoing: [] }
|
||||||
|
this._infos.set(key, info)
|
||||||
|
return info
|
||||||
}
|
}
|
||||||
|
|
||||||
getProtocol ({ name, version }) {
|
_gc (info) {
|
||||||
return this._getProtocol(name, version, false)
|
if (info.opened === 0 && info.outgoing.length === 0 && info.incoming.length === 0) {
|
||||||
}
|
this._infos.delete(info.key)
|
||||||
|
|
||||||
addProtocol (opts) {
|
|
||||||
const p = this._getProtocol(opts.name, (opts.version && opts.version.major) || 0, true)
|
|
||||||
|
|
||||||
if (opts.cork) p.cork()
|
|
||||||
if (!p._attach(opts)) return p
|
|
||||||
|
|
||||||
this._unopened--
|
|
||||||
this.offset += p.length
|
|
||||||
this._push(1, addProtocol, {
|
|
||||||
name: p.name,
|
|
||||||
version: p.version,
|
|
||||||
offset: p.offset,
|
|
||||||
length: p.length
|
|
||||||
})
|
|
||||||
|
|
||||||
return p
|
|
||||||
}
|
|
||||||
|
|
||||||
destroy (err) {
|
|
||||||
this.stream.destroy(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
_shutdown () {
|
|
||||||
while (this.protocols.length) {
|
|
||||||
const p = this.protocols.pop()
|
|
||||||
if (!p.remoteOpened) continue
|
|
||||||
if (p.remoteClosed) continue
|
|
||||||
p.remoteOpened = false
|
|
||||||
p.remoteClosed = true
|
|
||||||
this._catch(p.onremoteclose())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
_safeDestroy (err) {
|
|
||||||
safetyCatch(err)
|
|
||||||
this.destroy(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
_catch (p) {
|
|
||||||
if (isPromise(p)) p.catch(this._safeDestroyBound)
|
|
||||||
}
|
|
||||||
|
|
||||||
async _acceptMaybe (added) {
|
|
||||||
let accept = false
|
|
||||||
|
|
||||||
try {
|
|
||||||
accept = await this.onacceptprotocol(added)
|
|
||||||
} catch (err) {
|
|
||||||
this._safeDestroy(err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!accept) this._rejectProtocol(added)
|
|
||||||
}
|
|
||||||
|
|
||||||
_rejectProtocol (added) {
|
|
||||||
for (let i = 0; i < this.protocols.length; i++) {
|
|
||||||
const p = this.protocols[i]
|
|
||||||
if (p.opened || p.name !== added.name || !p.remoteOpened) continue
|
|
||||||
if (p.remoteVersion.major !== added.version.major) continue
|
|
||||||
|
|
||||||
this._unopened--
|
|
||||||
this.protocols.splice(i, 1)
|
|
||||||
this._push(3, c.uint, added.offset)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_ondata (buffer) {
|
_ondata (buffer) {
|
||||||
if (buffer.byteLength === 0) return // keep alive
|
|
||||||
|
|
||||||
const end = buffer.byteLength
|
|
||||||
const state = { start: 0, end, buffer }
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const type = c.uint.decode(state)
|
const state = { buffer, start: 0, end: buffer.byteLength }
|
||||||
if (type === 0) this._recvBatch(end, state)
|
this._decode(c.uint.decode(state), state)
|
||||||
else this._recv(type, state)
|
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
this._safeDestroy(err)
|
this._safeDestroy(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_getProtocol (name, major, upsert) {
|
_decode (remoteId, state) {
|
||||||
for (let i = 0; i < this.protocols.length; i++) {
|
const type = c.uint.decode(state)
|
||||||
const p = this.protocols[i]
|
|
||||||
const v = p.remoteVersion === null ? p.version : p.remoteVersion
|
|
||||||
if (p.name === name && (v !== null && v.major === major)) return p
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!upsert) return null
|
if (remoteId === 0) {
|
||||||
|
this._oncontrolsession(type, state)
|
||||||
const p = new Protocol(this)
|
|
||||||
this.protocols.push(p)
|
|
||||||
this._unopened++
|
|
||||||
return p
|
|
||||||
}
|
|
||||||
|
|
||||||
_removeProtocol (p) {
|
|
||||||
const i = this.protocols.indexOf(this)
|
|
||||||
if (i > -1) this.protocols.splice(i, 1)
|
|
||||||
if (!p.opened) this._unopened--
|
|
||||||
}
|
|
||||||
|
|
||||||
_recvAddProtocol (state) {
|
|
||||||
const add = addProtocol.decode(state)
|
|
||||||
|
|
||||||
const p = this._getProtocol(add.name, add.version.major, true)
|
|
||||||
if (p.remoteOpened) throw new Error('Duplicate protocol received')
|
|
||||||
|
|
||||||
p.name = add.name
|
|
||||||
p.remoteVersion = add.version
|
|
||||||
p.remoteOffset = add.offset
|
|
||||||
p.remoteEnd = add.offset + add.length
|
|
||||||
p.remoteOpened = true
|
|
||||||
p.remoteClosed = false
|
|
||||||
|
|
||||||
if (p.opened) {
|
|
||||||
this._catch(p.onremoteopen())
|
|
||||||
} else {
|
|
||||||
this._acceptMaybe(add)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
_recvRemoveProtocol (state) {
|
|
||||||
const offset = c.uint.decode(state)
|
|
||||||
|
|
||||||
for (let i = 0; i < this.protocols.length; i++) {
|
|
||||||
const p = this.protocols[i]
|
|
||||||
|
|
||||||
if (p.remoteOffset === offset && p.remoteOpened) {
|
|
||||||
p.remoteVersion = null
|
|
||||||
p.remoteOpened = false
|
|
||||||
p.remoteClosed = true
|
|
||||||
this._catch(p.onremoteclose())
|
|
||||||
p._gc()
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const r = remoteId <= this._remote.length ? this._remote[remoteId - 1] : null
|
||||||
|
|
||||||
|
// if the channel is closed ignore - could just be a pipeline message...
|
||||||
|
if (r === null) return
|
||||||
|
|
||||||
|
if (r.pending !== null) {
|
||||||
|
this._bufferMessage(r, type, state)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
r.session._recv(type, state)
|
||||||
|
}
|
||||||
|
|
||||||
|
_oncontrolsession (type, state) {
|
||||||
|
switch (type) {
|
||||||
|
case 0:
|
||||||
|
this._onbatch(state)
|
||||||
|
break
|
||||||
|
|
||||||
|
case 1:
|
||||||
|
this._onopensession(state)
|
||||||
|
break
|
||||||
|
|
||||||
|
case 2:
|
||||||
|
this._onrejectsession(state)
|
||||||
|
break
|
||||||
|
|
||||||
|
case 3:
|
||||||
|
this._onclosesession(state)
|
||||||
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_recvRejectedProtocol (state) {
|
_bufferMessage (r, type, { buffer, start, end }) {
|
||||||
const offset = c.uint.decode(state)
|
const state = { buffer, start, end } // copy
|
||||||
|
r.pending.push({ type, state })
|
||||||
for (let i = 0; i < this.protocols.length; i++) {
|
this._buffered += byteSize(state)
|
||||||
const p = this.protocols[i]
|
this._pauseMaybe()
|
||||||
|
|
||||||
if (p.offset === offset && !p.remoteClosed) {
|
|
||||||
p.remoteClosed = true
|
|
||||||
this._catch(p.onremoteclose())
|
|
||||||
p._gc()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_recvBatch (end, state) {
|
_pauseMaybe () {
|
||||||
while (state.start < state.end) {
|
if (this._paused === true || this._buffered <= MAX_BUFFERED) return
|
||||||
|
this._paused = true
|
||||||
|
this.stream.pause()
|
||||||
|
}
|
||||||
|
|
||||||
|
_resumeMaybe () {
|
||||||
|
if (this._paused === false || this._buffered > MAX_BUFFERED) return
|
||||||
|
this._paused = false
|
||||||
|
this.stream.resume()
|
||||||
|
}
|
||||||
|
|
||||||
|
_onbatch (state) {
|
||||||
|
const end = state.end
|
||||||
|
let remoteId = c.uint.decode(state)
|
||||||
|
|
||||||
|
while (state.end > state.start) {
|
||||||
const len = c.uint.decode(state)
|
const len = c.uint.decode(state)
|
||||||
const type = c.uint.decode(state)
|
if (len === 0) {
|
||||||
state.end = state.start + len
|
remoteId = c.uint.decode(state)
|
||||||
this._recv(type, state)
|
continue
|
||||||
|
}
|
||||||
|
state.end = state.start + end
|
||||||
|
this._decode(remoteId, state)
|
||||||
state.end = end
|
state.end = end
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_recv (type, state) {
|
_onopensession (state) {
|
||||||
if (type < 4) {
|
const protocol = c.string.decode(state)
|
||||||
if (type === 0) {
|
const id = c.buffer.decode(state)
|
||||||
throw new Error('Invalid nested batch')
|
const remoteId = c.uint.decode(state)
|
||||||
}
|
|
||||||
|
|
||||||
if (type === 1) {
|
// remote tried to open the control session - auto reject for now
|
||||||
this._recvAddProtocol(state)
|
// as we can use as an explicit control protocol declaration if we need to
|
||||||
|
if (remoteId === 0) {
|
||||||
|
this._rejectSession(0)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if (type === 2) {
|
const rid = remoteId - 1
|
||||||
this._recvRemoveProtocol(state)
|
const info = this._get(protocol, id)
|
||||||
|
|
||||||
|
// allow the remote to grow the ids by one
|
||||||
|
if (this._remote.length === rid) {
|
||||||
|
this._remote.push(null)
|
||||||
|
}
|
||||||
|
|
||||||
|
if (rid >= this._remote.length || this._remote[rid] !== null) {
|
||||||
|
throw new Error('Invalid open message')
|
||||||
|
}
|
||||||
|
|
||||||
|
if (info.outgoing.length > 0) {
|
||||||
|
const localId = info.outgoing.shift()
|
||||||
|
const session = this._local[localId - 1]
|
||||||
|
|
||||||
|
if (session === null) { // we already closed the channel - ignore
|
||||||
|
this._free.push(localId - 1)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if (type === 3) {
|
this._remote[rid] = { pending: null, session: null }
|
||||||
this._recvRejectedProtocol(state)
|
|
||||||
|
session._remoteId = remoteId
|
||||||
|
session._fullyOpen()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
this._remote[rid] = { pending: [], session: null }
|
||||||
|
|
||||||
|
if (++this._remoteBacklog > MAX_BACKLOG) {
|
||||||
|
throw new Error('Remote exceeded backlog')
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Consider make this array sorted by remoteOffset and use a bisect here.
|
info.pairing++
|
||||||
// For now we use very few protocols in practice, so it might be overkill.
|
info.incoming.push(remoteId)
|
||||||
for (let i = 0; i < this.protocols.length; i++) {
|
|
||||||
const p = this.protocols[i]
|
|
||||||
|
|
||||||
if (p.remoteOffset <= type && type < p.remoteEnd) {
|
this._requestSession(protocol, id, info).catch(this._safeDestroyBound)
|
||||||
p._recv(type - p.remoteOffset, state)
|
|
||||||
break
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_onrejectsession (state) {
|
||||||
|
const protocol = c.string.decode(state)
|
||||||
|
const id = c.buffer.decode(state)
|
||||||
|
const info = this._get(protocol, id)
|
||||||
|
|
||||||
|
if (info.outgoing.length === 0) {
|
||||||
|
throw new Error('Invalid reject message')
|
||||||
|
}
|
||||||
|
|
||||||
|
const localId = info.outgoing.shift()
|
||||||
|
const session = this._local[localId - 1]
|
||||||
|
|
||||||
|
this._free.push(localId - 1)
|
||||||
|
if (session !== null) session._close(true)
|
||||||
|
}
|
||||||
|
|
||||||
|
_onclosesession (state) {
|
||||||
|
const remoteId = c.uint.decode(state)
|
||||||
|
|
||||||
|
if (remoteId === 0) return // ignore
|
||||||
|
|
||||||
|
const rid = remoteId - 1
|
||||||
|
const r = rid < this._remote.length ? this._remote[rid] : null
|
||||||
|
|
||||||
|
if (r === null) return
|
||||||
|
|
||||||
|
if (r.session !== null) r.session._close(true)
|
||||||
|
}
|
||||||
|
|
||||||
|
async _requestSession (protocol, id, info) {
|
||||||
|
const notify = this._notify.get(toKey(protocol, id)) || this._notify.get(toKey(protocol, null))
|
||||||
|
|
||||||
|
if (notify) await notify(id)
|
||||||
|
|
||||||
|
if (--info.pairing > 0) return
|
||||||
|
|
||||||
|
while (info.incoming.length > 0) {
|
||||||
|
this._rejectSession(info, info.incoming.pop())
|
||||||
|
}
|
||||||
|
|
||||||
|
this._gc(info)
|
||||||
|
}
|
||||||
|
|
||||||
|
_rejectSession (info, remoteId) {
|
||||||
|
if (remoteId > 0) {
|
||||||
|
const r = this._remote[remoteId - 1]
|
||||||
|
|
||||||
|
if (r.pending !== null) {
|
||||||
|
for (let i = 0; i < r.pending.length; i++) {
|
||||||
|
this._buffered -= byteSize(r.pending[i].state)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_push (type, enc, message) {
|
this._remote[remoteId - 1] = null
|
||||||
if (this.corked > 0) {
|
this._resumeMaybe()
|
||||||
this._batch.push({ type, encoding: enc, message })
|
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const state = { start: 0, end: 0, buffer: null }
|
const state = { buffer: null, start: 2, end: 2 }
|
||||||
|
|
||||||
c.uint.preencode(state, type)
|
c.string.preencode(state, info.protocol)
|
||||||
enc.preencode(state, message)
|
c.buffer.preencode(state, info.id)
|
||||||
|
|
||||||
state.buffer = this._alloc(state.end)
|
state.buffer = this._alloc(state.end)
|
||||||
|
|
||||||
c.uint.encode(state, type)
|
state.buffer[0] = 0
|
||||||
enc.encode(state, message)
|
state.buffer[1] = 2
|
||||||
|
c.string.encode(state, info.protocol)
|
||||||
|
c.buffer.encode(state, info.id)
|
||||||
|
|
||||||
return this.stream.write(state.buffer)
|
this._write0(state.buffer)
|
||||||
|
}
|
||||||
|
|
||||||
|
_write0 (buffer) {
|
||||||
|
if (this._batch !== null) {
|
||||||
|
this._pushBatch(0, buffer.subarray(1))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
this.stream.write(buffer)
|
||||||
|
}
|
||||||
|
|
||||||
|
_safeDestroy (err) {
|
||||||
|
safetyCatch(err)
|
||||||
|
this.stream.destroy(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_shutdown () {
|
||||||
|
for (const s of this._local) {
|
||||||
|
if (s !== null) s._close(true)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function noop () {}
|
function noop () {}
|
||||||
|
|
||||||
function isPromise (p) {
|
function toKey (protocol, id) {
|
||||||
return typeof p === 'object' && p !== null && !!p.catch
|
return protocol + '##' + (id ? b4a.toString(id, 'hex') : '')
|
||||||
|
}
|
||||||
|
|
||||||
|
function byteSize (state) {
|
||||||
|
return 512 + (state.end - state.start)
|
||||||
|
}
|
||||||
|
|
||||||
|
function isPromise (p) {
|
||||||
|
return !!(p && typeof p.then === 'function')
|
||||||
|
}
|
||||||
|
|
||||||
|
function encodingLength (enc, val) {
|
||||||
|
const state = { buffer: null, start: 0, end: 0 }
|
||||||
|
enc.preencode(state, val)
|
||||||
|
return state.end
|
||||||
}
|
}
|
||||||
|
|
41
messages.js
41
messages.js
|
@ -1,41 +0,0 @@
|
||||||
const c = require('compact-encoding')
|
|
||||||
|
|
||||||
const version = {
|
|
||||||
preencode (state, v) {
|
|
||||||
c.uint.preencode(state, v.major)
|
|
||||||
c.uint.preencode(state, v.minor)
|
|
||||||
},
|
|
||||||
encode (state, v) {
|
|
||||||
c.uint.encode(state, v.major)
|
|
||||||
c.uint.encode(state, v.minor)
|
|
||||||
},
|
|
||||||
decode (state, v) {
|
|
||||||
return {
|
|
||||||
major: c.uint.decode(state),
|
|
||||||
minor: c.uint.decode(state)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
exports.addProtocol = {
|
|
||||||
preencode (state, p) {
|
|
||||||
c.string.preencode(state, p.name)
|
|
||||||
version.preencode(state, p.version)
|
|
||||||
c.uint.preencode(state, p.offset)
|
|
||||||
c.uint.preencode(state, p.length)
|
|
||||||
},
|
|
||||||
encode (state, p) {
|
|
||||||
c.string.encode(state, p.name)
|
|
||||||
version.encode(state, p.version)
|
|
||||||
c.uint.encode(state, p.offset)
|
|
||||||
c.uint.encode(state, p.length)
|
|
||||||
},
|
|
||||||
decode (state, p) {
|
|
||||||
return {
|
|
||||||
name: c.string.decode(state),
|
|
||||||
version: version.decode(state),
|
|
||||||
offset: c.uint.decode(state),
|
|
||||||
length: c.uint.decode(state)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
59
test.js
59
test.js
|
@ -9,10 +9,9 @@ test('basic', function (t) {
|
||||||
|
|
||||||
replicate(a, b)
|
replicate(a, b)
|
||||||
|
|
||||||
const p = a.addProtocol({
|
const p = a.open({
|
||||||
name: 'foo',
|
protocol: 'foo',
|
||||||
messages: 1,
|
onopen () {
|
||||||
onremoteopen () {
|
|
||||||
t.pass('a remote opened')
|
t.pass('a remote opened')
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
@ -24,9 +23,8 @@ test('basic', function (t) {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
const bp = b.addProtocol({
|
const bp = b.open({
|
||||||
name: 'foo',
|
protocol: 'foo'
|
||||||
messages: 1
|
|
||||||
})
|
})
|
||||||
|
|
||||||
t.plan(2)
|
t.plan(2)
|
||||||
|
@ -40,9 +38,8 @@ test('echo message', function (t) {
|
||||||
|
|
||||||
replicate(a, b)
|
replicate(a, b)
|
||||||
|
|
||||||
const ap = a.addProtocol({
|
const ap = a.open({
|
||||||
name: 'foo',
|
protocol: 'foo'
|
||||||
messages: 1
|
|
||||||
})
|
})
|
||||||
|
|
||||||
const aEcho = ap.addMessage({
|
const aEcho = ap.addMessage({
|
||||||
|
@ -52,15 +49,13 @@ test('echo message', function (t) {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
b.addProtocol({
|
b.open({
|
||||||
name: 'other',
|
protocol: 'other'
|
||||||
messages: 2
|
|
||||||
})
|
})
|
||||||
|
|
||||||
const bp = b.addProtocol({
|
const bp = b.open({
|
||||||
name: 'foo',
|
protocol: 'foo',
|
||||||
messages: 1,
|
onopen () {
|
||||||
onremoteopen () {
|
|
||||||
t.pass('b remote opened')
|
t.pass('b remote opened')
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
@ -80,14 +75,12 @@ test('echo message', function (t) {
|
||||||
test('multi message', function (t) {
|
test('multi message', function (t) {
|
||||||
const a = new Protomux(new SecretStream(true))
|
const a = new Protomux(new SecretStream(true))
|
||||||
|
|
||||||
a.addProtocol({
|
a.open({
|
||||||
name: 'other',
|
protocol: 'other'
|
||||||
messages: 2
|
|
||||||
})
|
})
|
||||||
|
|
||||||
const ap = a.addProtocol({
|
const ap = a.open({
|
||||||
name: 'multi',
|
protocol: 'multi'
|
||||||
messages: 3
|
|
||||||
})
|
})
|
||||||
|
|
||||||
const a1 = ap.addMessage({ encoding: c.int })
|
const a1 = ap.addMessage({ encoding: c.int })
|
||||||
|
@ -96,9 +89,8 @@ test('multi message', function (t) {
|
||||||
|
|
||||||
const b = new Protomux(new SecretStream(false))
|
const b = new Protomux(new SecretStream(false))
|
||||||
|
|
||||||
const bp = b.addProtocol({
|
const bp = b.open({
|
||||||
name: 'multi',
|
protocol: 'multi'
|
||||||
messages: 2
|
|
||||||
})
|
})
|
||||||
|
|
||||||
const b1 = bp.addMessage({ encoding: c.int })
|
const b1 = bp.addMessage({ encoding: c.int })
|
||||||
|
@ -131,14 +123,12 @@ test('corks', function (t) {
|
||||||
|
|
||||||
a.cork()
|
a.cork()
|
||||||
|
|
||||||
a.addProtocol({
|
a.open({
|
||||||
name: 'other',
|
protocol: 'other'
|
||||||
messages: 2
|
|
||||||
})
|
})
|
||||||
|
|
||||||
const ap = a.addProtocol({
|
const ap = a.open({
|
||||||
name: 'multi',
|
protocol: 'multi'
|
||||||
messages: 2
|
|
||||||
})
|
})
|
||||||
|
|
||||||
const a1 = ap.addMessage({ encoding: c.int })
|
const a1 = ap.addMessage({ encoding: c.int })
|
||||||
|
@ -146,9 +136,8 @@ test('corks', function (t) {
|
||||||
|
|
||||||
const b = new Protomux(new SecretStream(false))
|
const b = new Protomux(new SecretStream(false))
|
||||||
|
|
||||||
const bp = b.addProtocol({
|
const bp = b.open({
|
||||||
name: 'multi',
|
protocol: 'multi'
|
||||||
messages: 2
|
|
||||||
})
|
})
|
||||||
|
|
||||||
const b1 = bp.addMessage({ encoding: c.int })
|
const b1 = bp.addMessage({ encoding: c.int })
|
||||||
|
|
Reference in New Issue