2021-11-10 14:15:09 +00:00
|
|
|
const c = require('compact-encoding')
|
2021-12-30 20:13:47 +00:00
|
|
|
const b4a = require('b4a')
|
|
|
|
const safetyCatch = require('safety-catch')
|
|
|
|
const { addProtocol } = require('./messages')
|
|
|
|
|
|
|
|
// Shared placeholder message list for Protocol instances that currently have no local messages attached.
const EMPTY = []
|
2021-11-10 14:15:09 +00:00
|
|
|
|
|
|
|
/**
 * Handle for one named protocol multiplexed over a Protomux stream.
 *
 * Holds the local side (name, version, message encodings, type offset/length, opened flag)
 * and what the remote announced (remoteVersion, remoteOffset, remoteEnd, remoteOpened,
 * remoteClosed), plus the user supplied handlers.
 */
class Protocol {
  /**
   * @param {object} mux - Owning muxer. This class calls `mux.cork()`, `mux.uncork()`,
   *   `mux._push()`, `mux._removeProtocol()` and `mux._catch()`, reads `mux.offset` and
   *   increments `mux._unopened` on close.
   */
  constructor (mux) {
    this.mux = mux

    // Local state, filled in by _attach() and cleared again by close().
    this.name = null
    this.version = null
    this.messages = EMPTY
    this.offset = 0
    this.length = 0
    this.opened = false
    // Initialised here rather than in _attach() so that a cork() issued before the
    // protocol is attached is remembered and can later be undone by uncork().
    this.corked = false

    // Remote state, written by the muxer when the remote adds/removes the protocol.
    this.remoteVersion = null
    this.remoteOffset = 0
    this.remoteEnd = 0
    this.remoteOpened = false
    this.remoteClosed = false

    this.onmessage = noop
    this.onremoteopen = noop
    this.onremoteclose = noop
  }

  /**
   * Attach the local definition of the protocol and mark it opened.
   *
   * The cork state is deliberately left untouched: resetting it here would drop a cork
   * taken just before attaching and leave the muxer corked with no way to release it.
   *
   * @param {object} opts
   * @param {*} opts.name - Protocol name.
   * @param {object} [opts.version] - Defaults to `{ major: 0, minor: 0 }`.
   * @param {Array} opts.messages - Message encodings, indexed by local message type.
   * @param {Function} [opts.onmessage] - Called as `onmessage(type, message)`.
   * @param {Function} [opts.onremoteopen]
   * @param {Function} [opts.onremoteclose]
   * @returns {boolean} true when the protocol was not opened before this call.
   */
  _attach ({ name, version = { major: 0, minor: 0 }, messages, onmessage = noop, onremoteopen = noop, onremoteclose = noop }) {
    const wasOpened = this.opened

    this.name = name
    this.version = version
    this.messages = messages
    this.offset = this.mux.offset
    this.length = messages.length
    this.opened = true

    this.onmessage = onmessage
    this.onremoteopen = onremoteopen
    this.onremoteclose = onremoteclose

    return !wasOpened
  }

  /** Cork the muxer on behalf of this protocol; a no-op when already corked. */
  cork () {
    if (this.corked) return
    this.corked = true
    this.mux.cork()
  }

  /** Release the cork taken by cork(); a no-op when not corked. */
  uncork () {
    if (!this.corked) return
    this.corked = false
    this.mux.uncork()
  }

  /**
   * Send a message of local type `type`.
   *
   * @param {number} type - Index into the attached `messages` array.
   * @param {*} message - Value handed to the encoding for that type.
   * @returns {boolean} false when the protocol is not opened, otherwise whatever
   *   `mux._push()` returns.
   */
  send (type, message) {
    if (!this.opened) return false

    // The wire type is the local type shifted by this protocol's offset.
    const wireType = this.offset + type
    const encoding = this.messages[type]

    return this.mux._push(wireType, encoding, message)
  }

  /**
   * Close the local side: reset local state, announce the removal (reserved type 2 with
   * the old offset), let the muxer drop the entry if the remote side is not open, and
   * release any cork still held. A no-op when not opened.
   */
  close () {
    if (this.opened === false) return
    this.opened = false
    this.mux._unopened++

    // Remember the offset for the removal message before it is reset below.
    const offset = this.offset

    this.version = null
    this.messages = EMPTY
    this.offset = 0
    this.length = 0
    this.onmessage = this.onremoteopen = this.onremoteclose = noop
    this.mux._push(2, c.uint, offset)
    this._gc()

    // Uncork last so the removal message above goes out with the pending batch.
    if (this.corked) this.uncork()
  }

  /** Ask the muxer to remove this protocol once neither side has it open. */
  _gc () {
    if (this.opened || this.remoteOpened) return
    this.mux._removeProtocol(this)
  }

  /**
   * Decode an incoming message and hand it to `onmessage`.
   *
   * @param {number} type - Message type relative to this protocol.
   * @param {object} state - Decoding state passed to the encoding's `decode`.
   */
  _recv (type, state) {
    // Types outside the attached messages are ignored.
    if (type >= this.messages.length) return

    const encoding = this.messages[type]
    const message = encoding.decode(state)

    // onmessage may return a promise; the muxer takes care of its rejection.
    this.mux._catch(this.onmessage(type, message))
  }
}
|
|
|
|
|
|
|
|
module.exports = class Protomux {
|
2021-12-30 20:13:47 +00:00
|
|
|
constructor (stream, { backlog = 128, alloc, onacceptprotocol } = {}) {
|
2021-11-10 14:15:09 +00:00
|
|
|
this.stream = stream
|
|
|
|
this.protocols = []
|
|
|
|
this.remoteProtocols = []
|
2021-12-30 20:13:47 +00:00
|
|
|
this.offset = 4 // 4 messages reserved
|
2021-11-10 14:15:09 +00:00
|
|
|
this.corked = 0
|
2021-12-30 20:13:47 +00:00
|
|
|
this.backlog = backlog
|
|
|
|
this.onacceptprotocol = onacceptprotocol || (() => this._unopened < this.backlog)
|
2021-11-10 14:15:09 +00:00
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
this._unopened = 0
|
2021-11-10 14:15:09 +00:00
|
|
|
this._batch = null
|
2021-12-30 20:13:47 +00:00
|
|
|
this._alloc = alloc || (typeof stream.alloc === 'function' ? stream.alloc.bind(stream) : b4a.allocUnsafe)
|
|
|
|
this._safeDestroyBound = this._safeDestroy.bind(this)
|
2021-11-10 14:15:09 +00:00
|
|
|
|
|
|
|
this.stream.on('data', this._ondata.bind(this))
|
2021-12-30 20:13:47 +00:00
|
|
|
this.stream.on('close', this._shutdown.bind(this))
|
2021-11-10 14:15:09 +00:00
|
|
|
}
|
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
sendKeepAlive () {
|
|
|
|
this.stream.write(this._alloc(0))
|
2021-11-10 14:15:09 +00:00
|
|
|
}
|
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
cork () {
|
|
|
|
if (++this.corked === 1) this._batch = []
|
|
|
|
}
|
2021-11-10 14:15:09 +00:00
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
uncork () {
|
|
|
|
if (--this.corked !== 0) return
|
2021-11-10 14:15:09 +00:00
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
const batch = this._batch
|
|
|
|
this._batch = null
|
2021-11-10 14:15:09 +00:00
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
const state = { start: 0, end: 1, buffer: null }
|
|
|
|
const lens = new Array(batch.length)
|
2021-11-10 14:15:09 +00:00
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
for (let i = 0; i < batch.length; i++) {
|
|
|
|
const b = batch[i]
|
|
|
|
const end = state.end
|
2021-12-27 15:09:34 +00:00
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
c.uint.preencode(state, b.type)
|
|
|
|
b.encoding.preencode(state, b.message)
|
|
|
|
c.uint.preencode(state, lens[i] = (state.end - end))
|
2021-11-10 14:15:09 +00:00
|
|
|
}
|
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
state.buffer = this._alloc(state.end)
|
|
|
|
state.buffer[state.start++] = 0
|
|
|
|
|
|
|
|
for (let i = 0; i < batch.length; i++) {
|
|
|
|
const b = batch[i]
|
|
|
|
|
|
|
|
c.uint.encode(state, lens[i])
|
|
|
|
c.uint.encode(state, b.type)
|
|
|
|
b.encoding.encode(state, b.message)
|
2021-11-10 14:15:09 +00:00
|
|
|
}
|
2021-12-30 20:13:47 +00:00
|
|
|
|
|
|
|
this.stream.write(state.buffer)
|
2021-11-10 14:15:09 +00:00
|
|
|
}
|
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
hasProtocol (opts) {
|
|
|
|
return !!this.getProtocol(opts)
|
|
|
|
}
|
2021-12-27 15:09:34 +00:00
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
getProtocol ({ name, version }) {
|
|
|
|
return this._getProtocol(name, version, false)
|
|
|
|
}
|
2021-11-10 14:15:09 +00:00
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
addProtocol (opts) {
|
|
|
|
const p = this._getProtocol(opts.name, (opts.version && opts.version.major) || 0, true)
|
2021-11-10 14:15:09 +00:00
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
if (opts.cork) p.cork()
|
|
|
|
if (!p._attach(opts)) return p
|
2021-11-10 14:15:09 +00:00
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
this._unopened--
|
|
|
|
this.offset += p.length
|
|
|
|
this._push(1, addProtocol, {
|
|
|
|
name: p.name,
|
|
|
|
version: p.version,
|
|
|
|
offset: p.offset,
|
|
|
|
length: p.length
|
|
|
|
})
|
2021-11-10 14:15:09 +00:00
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
return p
|
|
|
|
}
|
2021-11-10 14:15:09 +00:00
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
destroy (err) {
|
|
|
|
this.stream.destroy(err)
|
2021-11-10 14:15:09 +00:00
|
|
|
}
|
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
_shutdown () {
|
|
|
|
while (this.protocols.length) {
|
|
|
|
const p = this.protocols.pop()
|
|
|
|
if (!p.remoteOpened) continue
|
|
|
|
if (p.remoteClosed) continue
|
|
|
|
p.remoteOpened = false
|
|
|
|
p.remoteClosed = true
|
|
|
|
this._catch(p.onremoteclose())
|
2021-11-10 14:15:09 +00:00
|
|
|
}
|
2021-12-30 20:13:47 +00:00
|
|
|
}
|
2021-11-10 14:15:09 +00:00
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
_safeDestroy (err) {
|
|
|
|
safetyCatch(err)
|
|
|
|
this.destroy(err)
|
2021-11-10 14:15:09 +00:00
|
|
|
}
|
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
_catch (p) {
|
|
|
|
if (isPromise(p)) p.catch(this._safeDestroyBound)
|
|
|
|
}
|
2021-12-27 15:09:34 +00:00
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
async _acceptMaybe (added) {
|
|
|
|
let accept = false
|
2021-11-10 14:15:09 +00:00
|
|
|
|
|
|
|
try {
|
2021-12-30 20:13:47 +00:00
|
|
|
accept = await this.onacceptprotocol(added)
|
2021-11-10 14:15:09 +00:00
|
|
|
} catch (err) {
|
2021-12-30 20:13:47 +00:00
|
|
|
this._safeDestroy(err)
|
|
|
|
return
|
2021-11-10 14:15:09 +00:00
|
|
|
}
|
2021-12-30 20:13:47 +00:00
|
|
|
|
|
|
|
if (!accept) this._rejectProtocol(added)
|
2021-11-10 14:15:09 +00:00
|
|
|
}
|
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
_rejectProtocol (added) {
|
|
|
|
for (let i = 0; i < this.protocols.length; i++) {
|
|
|
|
const p = this.protocols[i]
|
|
|
|
if (p.opened || p.name !== added.name || !p.remoteOpened) continue
|
|
|
|
if (p.remoteVersion.major !== added.version.major) continue
|
2021-11-10 14:15:09 +00:00
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
this._unopened--
|
|
|
|
this.protocols.splice(i, 1)
|
|
|
|
this._push(3, c.uint, added.offset)
|
2021-11-10 14:15:09 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
_ondata (buffer) {
|
|
|
|
if (buffer.byteLength === 0) return // keep alive
|
|
|
|
|
|
|
|
const end = buffer.byteLength
|
|
|
|
const state = { start: 0, end, buffer }
|
2021-11-10 14:15:09 +00:00
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
try {
|
|
|
|
const type = c.uint.decode(state)
|
|
|
|
if (type === 0) this._recvBatch(end, state)
|
|
|
|
else this._recv(type, state)
|
|
|
|
} catch (err) {
|
|
|
|
this._safeDestroy(err)
|
2021-11-10 14:15:09 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
_getProtocol (name, major, upsert) {
|
|
|
|
for (let i = 0; i < this.protocols.length; i++) {
|
|
|
|
const p = this.protocols[i]
|
|
|
|
const v = p.remoteVersion === null ? p.version : p.remoteVersion
|
|
|
|
if (p.name === name && (v !== null && v.major === major)) return p
|
2021-11-10 14:15:09 +00:00
|
|
|
}
|
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
if (!upsert) return null
|
2021-11-10 14:15:09 +00:00
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
const p = new Protocol(this)
|
|
|
|
this.protocols.push(p)
|
|
|
|
this._unopened++
|
|
|
|
return p
|
2021-11-10 14:15:09 +00:00
|
|
|
}
|
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
_removeProtocol (p) {
|
|
|
|
const i = this.protocols.indexOf(this)
|
|
|
|
if (i > -1) this.protocols.splice(i, 1)
|
|
|
|
if (!p.opened) this._unopened--
|
2021-11-10 14:15:09 +00:00
|
|
|
}
|
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
_recvAddProtocol (state) {
|
|
|
|
const add = addProtocol.decode(state)
|
|
|
|
|
|
|
|
const p = this._getProtocol(add.name, add.version.major, true)
|
|
|
|
if (p.remoteOpened) throw new Error('Duplicate protocol received')
|
|
|
|
|
|
|
|
p.name = add.name
|
|
|
|
p.remoteVersion = add.version
|
|
|
|
p.remoteOffset = add.offset
|
|
|
|
p.remoteEnd = add.offset + add.length
|
|
|
|
p.remoteOpened = true
|
|
|
|
p.remoteClosed = false
|
|
|
|
|
|
|
|
if (p.opened) {
|
|
|
|
this._catch(p.onremoteopen())
|
|
|
|
} else {
|
|
|
|
this._acceptMaybe(add)
|
2021-11-10 14:15:09 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
_recvRemoveProtocol (state) {
|
|
|
|
const offset = c.uint.decode(state)
|
2021-11-10 14:15:09 +00:00
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
for (let i = 0; i < this.protocols.length; i++) {
|
|
|
|
const p = this.protocols[i]
|
|
|
|
|
|
|
|
if (p.remoteOffset === offset && p.remoteOpened) {
|
|
|
|
p.remoteVersion = null
|
|
|
|
p.remoteOpened = false
|
|
|
|
p.remoteClosed = true
|
|
|
|
this._catch(p.onremoteclose())
|
|
|
|
p._gc()
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2021-11-10 14:15:09 +00:00
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
_recvRejectedProtocol (state) {
|
|
|
|
const offset = c.uint.decode(state)
|
2021-11-10 14:15:09 +00:00
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
for (let i = 0; i < this.protocols.length; i++) {
|
|
|
|
const p = this.protocols[i]
|
2021-11-10 14:15:09 +00:00
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
if (p.offset === offset && !p.remoteClosed) {
|
|
|
|
p.remoteClosed = true
|
|
|
|
this._catch(p.onremoteclose())
|
|
|
|
p._gc()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2021-11-10 14:15:09 +00:00
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
_recvBatch (end, state) {
|
|
|
|
while (state.start < state.end) {
|
|
|
|
const len = c.uint.decode(state)
|
|
|
|
const type = c.uint.decode(state)
|
|
|
|
state.end = state.start + len
|
|
|
|
this._recv(type, state)
|
|
|
|
state.end = end
|
2021-11-10 14:15:09 +00:00
|
|
|
}
|
2021-12-30 20:13:47 +00:00
|
|
|
}
|
2021-11-10 14:15:09 +00:00
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
_recv (type, state) {
|
|
|
|
if (type < 4) {
|
|
|
|
if (type === 0) {
|
|
|
|
throw new Error('Invalid nested batch')
|
|
|
|
}
|
2021-11-10 14:15:09 +00:00
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
if (type === 1) {
|
|
|
|
this._recvAddProtocol(state)
|
|
|
|
return
|
|
|
|
}
|
2021-11-10 14:15:09 +00:00
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
if (type === 2) {
|
|
|
|
this._recvRemoveProtocol(state)
|
|
|
|
return
|
|
|
|
}
|
2021-11-10 14:15:09 +00:00
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
if (type === 3) {
|
|
|
|
this._recvRejectedProtocol(state)
|
|
|
|
return
|
|
|
|
}
|
2021-11-10 14:15:09 +00:00
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
return
|
|
|
|
}
|
2021-11-10 14:15:09 +00:00
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
// TODO: Consider make this array sorted by remoteOffset and use a bisect here.
|
|
|
|
// For now we use very few protocols in practice, so it might be overkill.
|
|
|
|
for (let i = 0; i < this.protocols.length; i++) {
|
|
|
|
const p = this.protocols[i]
|
2021-12-27 15:09:34 +00:00
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
if (p.remoteOffset <= type && type < p.remoteEnd) {
|
|
|
|
p._recv(type - p.remoteOffset, state)
|
|
|
|
break
|
|
|
|
}
|
2021-11-10 14:15:09 +00:00
|
|
|
}
|
2021-12-30 20:13:47 +00:00
|
|
|
}
|
2021-11-10 14:15:09 +00:00
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
_push (type, enc, message) {
|
2021-11-10 14:15:09 +00:00
|
|
|
if (this.corked > 0) {
|
2021-12-30 20:13:47 +00:00
|
|
|
this._batch.push({ type, encoding: enc, message })
|
|
|
|
return false
|
2021-11-10 14:15:09 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
const state = { start: 0, end: 0, buffer: null }
|
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
c.uint.preencode(state, type)
|
|
|
|
enc.preencode(state, message)
|
2021-11-10 14:15:09 +00:00
|
|
|
|
2021-12-27 15:09:34 +00:00
|
|
|
state.buffer = this._alloc(state.end)
|
2021-11-10 14:15:09 +00:00
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
c.uint.encode(state, type)
|
|
|
|
enc.encode(state, message)
|
2021-11-10 14:15:09 +00:00
|
|
|
|
2021-12-30 20:13:47 +00:00
|
|
|
return this.stream.write(state.buffer)
|
2021-11-10 14:15:09 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Default do-nothing handler used for onmessage / onremoteopen / onremoteclose.
function noop () {}
|
2021-12-30 20:13:47 +00:00
|
|
|
|
|
|
|
/**
 * Duck-type check for promise-like values.
 *
 * @param {*} p - Any value.
 * @returns {boolean} true for non-null objects that expose a truthy `catch` property.
 */
function isPromise (p) {
  if (p === null || typeof p !== 'object') return false
  return Boolean(p.catch)
}
|