session -> channel and make open explicit for easier control

This commit is contained in:
Mathias Buus 2022-03-11 13:55:39 +01:00
parent 90e8bc3288
commit 21415211e7
3 changed files with 177 additions and 66 deletions

View File

@ -17,9 +17,9 @@ const c = require('compact-encoding')
const mux = new Protomux(aStreamThatFrames) const mux = new Protomux(aStreamThatFrames)
// Now add some protocol sessions // Now add some protocol channels
const cool = mux.open({ const cool = mux.createChannel({
protocol: 'cool-protocol', protocol: 'cool-protocol',
id: Buffer.from('optional binary id'), id: Buffer.from('optional binary id'),
onopen () { onopen () {
@ -46,6 +46,11 @@ const two = cool.addMessage({
} }
}) })
// open the channels
one.open()
two.open()
// And send some data // And send some data
one.send('a string') one.send('a string')
@ -71,9 +76,9 @@ Options include:
Helper to accept either an existing muxer instance or a stream (which creates a new one). Helper to accept either an existing muxer instance or a stream (which creates a new one).
#### `const session = mux.open(opts)` #### `const channel = mux.createChannel(opts)`
Add a new protocol session. Add a new protocol channel.
Options include: Options include:
@ -81,14 +86,16 @@ Options include:
{ {
// Used to match the protocol // Used to match the protocol
protocol: 'name of the protocol', protocol: 'name of the protocol',
// Optional additional binary id to identify this session // Optional additional binary id to identify this channel
id: buffer, id: buffer,
// Optional encoding for a handshake
handshake: encoding,
// Optional array of messages types you want to send/receive. // Optional array of messages types you want to send/receive.
messages: [], messages: [],
// Called when the remote side adds this protocol. // Called when the remote side adds this protocol.
// Errors here are caught and forwared to stream.destroy // Errors here are caught and forwared to stream.destroy
async onopen () {}, async onopen (handshake) {},
// Called when the session closes - ie the remote side closes or rejects this protocol or we closed it. // Called when the channel closes - ie the remote side closes or rejects this protocol or we closed it.
// Errors here are caught and forwared to stream.destroy // Errors here are caught and forwared to stream.destroy
async onclose () {}, async onclose () {},
// Called after onclose when all pending promises has resolved. // Called after onclose when all pending promises has resolved.
@ -96,13 +103,17 @@ Options include:
} }
``` ```
Sessions are paired based on a queue, so the first remote session with the same `protocol` and `id`. Sessions are paired based on a queue, so the first remote channel with the same `protocol` and `id`.
__NOTE__: `mux.open` returns `null` if the session should not be opened, ie it's a duplicate session or the remote has already closed this one. __NOTE__: `mux.createChannel` returns `null` if the channel should not be opened, ie it's a duplicate channel or the remote has already closed this one.
If you want multiple sessions with the same `protocol` and `id`, set `unique: false` as an option. If you want multiple sessions with the same `protocol` and `id`, set `unique: false` as an option.
#### `const m = session.addMessage(opts)` #### `channel.open([handshake])`
Open the channel.
#### `const m = channel.addMessage(opts)`
Add a message. Options include: Add a message. Options include:
@ -128,25 +139,25 @@ Function that is called when a message arrives.
The encoding for this message. The encoding for this message.
#### `session.close()` #### `channel.close()`
Closes the protocol session. Closes the protocol channel.
#### `sessoin.cork()` #### `channel.cork()`
Corking the protocol session, makes it buffer messages and send them all in a batch when it uncorks. Corking the protocol channel, makes it buffer messages and send them all in a batch when it uncorks.
#### `session.uncork()` #### `channel.uncork()`
Uncork and send the batch. Uncork and send the batch.
#### `mux.cork()` #### `mux.cork()`
Same as `session.cork` but on the muxer instance. Same as `channel.cork` but on the muxer instance.
#### `mux.uncork()` #### `mux.uncork()`
Same as `session.uncork` but on the muxer instance. Same as `channel.uncork` but on the muxer instance.
## License ## License

View File

@ -6,13 +6,13 @@ const safetyCatch = require('safety-catch')
const MAX_BUFFERED = 32768 const MAX_BUFFERED = 32768
const MAX_BACKLOG = Infinity // TODO: impl "open" backpressure const MAX_BACKLOG = Infinity // TODO: impl "open" backpressure
class Session { class Channel {
constructor (mux, info, context, protocol, id, messages, onopen, onclose, ondestroy) { constructor (mux, info, userData, protocol, id, handshake, messages, onopen, onclose, ondestroy) {
this.context = context this.userData = userData
this.protocol = protocol this.protocol = protocol
this.id = id this.id = id
this.handshake = null
this.messages = [] this.messages = []
this.remoteMessages = this.messages
this.opened = false this.opened = false
this.closed = false this.closed = false
@ -22,6 +22,7 @@ class Session {
this.onclose = onclose this.onclose = onclose
this.ondestroy = ondestroy this.ondestroy = ondestroy
this._handshake = handshake
this._mux = mux this._mux = mux
this._info = info this._info = info
this._localId = 0 this._localId = 0
@ -35,7 +36,7 @@ class Session {
for (const m of messages) this.addMessage(m) for (const m of messages) this.addMessage(m)
} }
_open () { open (handshake) {
const id = this._mux._free.length > 0 const id = this._mux._free.length > 0
? this._mux._free.pop() ? this._mux._free.pop()
: this._mux._local.push(null) - 1 : this._mux._local.push(null) - 1
@ -44,19 +45,25 @@ class Session {
this._localId = id + 1 this._localId = id + 1
this._mux._local[id] = this this._mux._local[id] = this
if (this._remoteId === 0) {
this._info.outgoing.push(this._localId)
}
const state = { buffer: null, start: 2, end: 2 } const state = { buffer: null, start: 2, end: 2 }
c.uint.preencode(state, this._localId)
c.string.preencode(state, this.protocol) c.string.preencode(state, this.protocol)
c.buffer.preencode(state, this.id) c.buffer.preencode(state, this.id)
c.uint.preencode(state, this._localId) if (this._handshake) this._handshake.preencode(state, handshake)
state.buffer = this._mux._alloc(state.end) state.buffer = this._mux._alloc(state.end)
state.buffer[0] = 0 state.buffer[0] = 0
state.buffer[1] = 1 state.buffer[1] = 1
c.uint.encode(state, this._localId)
c.string.encode(state, this.protocol) c.string.encode(state, this.protocol)
c.buffer.encode(state, this.id) c.buffer.encode(state, this.id)
c.uint.encode(state, this._localId) if (this._handshake) this._handshake.encode(state, handshake)
this._mux._write0(state.buffer) this._mux._write0(state.buffer)
} }
@ -81,9 +88,11 @@ class Session {
const remote = this._mux._remote[this._remoteId - 1] const remote = this._mux._remote[this._remoteId - 1]
this.opened = true this.opened = true
this._track(this.onopen(this)) this.handshake = this._handshake ? this._handshake.decode(remote.state) : null
this._track(this.onopen(this.handshake, this))
remote.session = this remote.session = this
remote.state = null
if (remote.pending !== null) this._drain(remote) if (remote.pending !== null) this._drain(remote)
} }
@ -135,8 +144,8 @@ class Session {
} }
_recv (type, state) { _recv (type, state) {
if (type < this.remoteMessages.length) { if (type < this.messages.length) {
this.remoteMessages[type].recv(state, this) this.messages[type].recv(state, this)
} }
} }
@ -302,17 +311,14 @@ module.exports = class Protomux {
return info ? info.opened > 0 : false return info ? info.opened > 0 : false
} }
open ({ context = null, protocol, id = null, unique = true, messages = [], onopen = noop, onclose = noop, ondestroy = noop }) { createChannel ({ userData = null, protocol, id = null, unique = true, handshake = null, messages = [], onopen = noop, onclose = noop, ondestroy = noop }) {
if (this.stream.destroyed) return null if (this.stream.destroyed) return null
const info = this._get(protocol, id) const info = this._get(protocol, id)
if (unique && info.opened > 0) return null if (unique && info.opened > 0) return null
if (info.incoming.length === 0) { if (info.incoming.length === 0) {
const session = new Session(this, info, context, protocol, id, messages, onopen, onclose, ondestroy) return new Channel(this, info, userData, protocol, id, handshake, messages, onopen, onclose, ondestroy)
session._open()
info.outgoing.push(session._localId)
return session
} }
this._remoteBacklog-- this._remoteBacklog--
@ -321,10 +327,9 @@ module.exports = class Protomux {
const r = this._remote[remoteId - 1] const r = this._remote[remoteId - 1]
if (r === null) return null if (r === null) return null
const session = new Session(this, info, context, protocol, id, messages, onopen, onclose, ondestroy) const session = new Channel(this, info, userData, protocol, id, handshake, messages, onopen, onclose, ondestroy)
session._remoteId = remoteId session._remoteId = remoteId
session._open()
session._fullyOpenSoon() session._fullyOpenSoon()
return session return session
@ -465,9 +470,9 @@ module.exports = class Protomux {
} }
_onopensession (state) { _onopensession (state) {
const remoteId = c.uint.decode(state)
const protocol = c.string.decode(state) const protocol = c.string.decode(state)
const id = c.buffer.decode(state) const id = c.buffer.decode(state)
const remoteId = c.uint.decode(state)
// remote tried to open the control session - auto reject for now // remote tried to open the control session - auto reject for now
// as we can use as an explicit control protocol declaration if we need to // as we can use as an explicit control protocol declaration if we need to
@ -497,14 +502,14 @@ module.exports = class Protomux {
return return
} }
this._remote[rid] = { pending: null, session: null } this._remote[rid] = { state, pending: null, session: null }
session._remoteId = remoteId session._remoteId = remoteId
session._fullyOpen() session._fullyOpen()
return return
} }
this._remote[rid] = { pending: [], session: null } this._remote[rid] = { state, pending: [], session: null }
if (++this._remoteBacklog > MAX_BACKLOG) { if (++this._remoteBacklog > MAX_BACKLOG) {
throw new Error('Remote exceeded backlog') throw new Error('Remote exceeded backlog')
@ -517,19 +522,25 @@ module.exports = class Protomux {
} }
_onrejectsession (state) { _onrejectsession (state) {
const protocol = c.string.decode(state) const localId = c.uint.decode(state)
const id = c.buffer.decode(state)
const info = this._get(protocol, id)
if (info.outgoing.length === 0) { // TODO: can be done smarter...
throw new Error('Invalid reject message') for (const info of this._infos.values()) {
} const i = info.outgoing.indexOf(localId)
if (i === -1) continue
info.outgoing.splice(i, 1)
const localId = info.outgoing.shift()
const session = this._local[localId - 1] const session = this._local[localId - 1]
this._free.push(localId - 1) this._free.push(localId - 1)
if (session !== null) session._close(true) if (session !== null) session._close(true)
this._gc(info)
return
}
throw new Error('Invalid reject message')
} }
_onclosesession (state) { _onclosesession (state) {
@ -553,7 +564,7 @@ module.exports = class Protomux {
if (--info.pairing > 0) return if (--info.pairing > 0) return
while (info.incoming.length > 0) { while (info.incoming.length > 0) {
this._rejectSession(info, info.incoming.pop()) this._rejectSession(info, info.incoming.shift())
} }
this._gc(info) this._gc(info)
@ -575,15 +586,13 @@ module.exports = class Protomux {
const state = { buffer: null, start: 2, end: 2 } const state = { buffer: null, start: 2, end: 2 }
c.string.preencode(state, info.protocol) c.uint.preencode(state, remoteId)
c.buffer.preencode(state, info.id)
state.buffer = this._alloc(state.end) state.buffer = this._alloc(state.end)
state.buffer[0] = 0 state.buffer[0] = 0
state.buffer[1] = 2 state.buffer[1] = 2
c.string.encode(state, info.protocol) c.uint.encode(state, remoteId)
c.buffer.encode(state, info.id)
this._write0(state.buffer) this._write0(state.buffer)
} }

119
test.js
View File

@ -9,13 +9,15 @@ test('basic', function (t) {
replicate(a, b) replicate(a, b)
const p = a.open({ const p = a.createChannel({
protocol: 'foo', protocol: 'foo',
onopen () { onopen () {
t.pass('a remote opened') t.pass('a remote opened')
} }
}) })
p.open()
p.addMessage({ p.addMessage({
encoding: c.string, encoding: c.string,
onmessage (message) { onmessage (message) {
@ -23,12 +25,13 @@ test('basic', function (t) {
} }
}) })
const bp = b.open({ const bp = b.createChannel({
protocol: 'foo' protocol: 'foo'
}) })
t.plan(2) t.plan(2)
bp.open()
bp.addMessage({ encoding: c.string }).send('hello world') bp.addMessage({ encoding: c.string }).send('hello world')
}) })
@ -38,10 +41,12 @@ test('echo message', function (t) {
replicate(a, b) replicate(a, b)
const ap = a.open({ const ap = a.createChannel({
protocol: 'foo' protocol: 'foo'
}) })
ap.open()
const aEcho = ap.addMessage({ const aEcho = ap.addMessage({
encoding: c.string, encoding: c.string,
onmessage (message) { onmessage (message) {
@ -49,17 +54,19 @@ test('echo message', function (t) {
} }
}) })
b.open({ b.createChannel({
protocol: 'other' protocol: 'other'
}) }).open()
const bp = b.open({ const bp = b.createChannel({
protocol: 'foo', protocol: 'foo',
onopen () { onopen () {
t.pass('b remote opened') t.pass('b remote opened')
} }
}) })
bp.open()
const bEcho = bp.addMessage({ const bEcho = bp.addMessage({
encoding: c.string, encoding: c.string,
onmessage (message) { onmessage (message) {
@ -75,24 +82,28 @@ test('echo message', function (t) {
test('multi message', function (t) { test('multi message', function (t) {
const a = new Protomux(new SecretStream(true)) const a = new Protomux(new SecretStream(true))
a.open({ a.createChannel({
protocol: 'other' protocol: 'other'
}) }).open()
const ap = a.open({ const ap = a.createChannel({
protocol: 'multi' protocol: 'multi'
}) })
ap.open()
const a1 = ap.addMessage({ encoding: c.int }) const a1 = ap.addMessage({ encoding: c.int })
const a2 = ap.addMessage({ encoding: c.string }) const a2 = ap.addMessage({ encoding: c.string })
const a3 = ap.addMessage({ encoding: c.string }) const a3 = ap.addMessage({ encoding: c.string })
const b = new Protomux(new SecretStream(false)) const b = new Protomux(new SecretStream(false))
const bp = b.open({ const bp = b.createChannel({
protocol: 'multi' protocol: 'multi'
}) })
bp.open()
const b1 = bp.addMessage({ encoding: c.int }) const b1 = bp.addMessage({ encoding: c.int })
const b2 = bp.addMessage({ encoding: c.string }) const b2 = bp.addMessage({ encoding: c.string })
@ -123,23 +134,27 @@ test('corks', function (t) {
a.cork() a.cork()
a.open({ a.createChannel({
protocol: 'other' protocol: 'other'
}) }).open()
const ap = a.open({ const ap = a.createChannel({
protocol: 'multi' protocol: 'multi'
}) })
ap.open()
const a1 = ap.addMessage({ encoding: c.int }) const a1 = ap.addMessage({ encoding: c.int })
const a2 = ap.addMessage({ encoding: c.string }) const a2 = ap.addMessage({ encoding: c.string })
const b = new Protomux(new SecretStream(false)) const b = new Protomux(new SecretStream(false))
const bp = b.open({ const bp = b.createChannel({
protocol: 'multi' protocol: 'multi'
}) })
bp.open()
const b1 = bp.addMessage({ encoding: c.int }) const b1 = bp.addMessage({ encoding: c.int })
const b2 = bp.addMessage({ encoding: c.string }) const b2 = bp.addMessage({ encoding: c.string })
@ -174,6 +189,82 @@ test('corks', function (t) {
} }
}) })
test('handshake', function (t) {
const a = new Protomux(new SecretStream(true))
const b = new Protomux(new SecretStream(false))
replicate(a, b)
const p = a.createChannel({
protocol: 'foo',
handshake: c.string,
onopen (handshake) {
t.is(handshake, 'b handshake')
}
})
p.open('a handshake')
const bp = b.createChannel({
protocol: 'foo',
handshake: c.string,
onopen (handshake) {
t.is(handshake, 'a handshake')
}
})
t.plan(2)
bp.open('b handshake')
})
test('rejections', function (t) {
t.plan(1)
const a = new Protomux(new SecretStream(true))
const b = new Protomux(new SecretStream(false))
replicate(a, b)
let closed = 0
for (let i = 0; i < 10; i++) {
const p = a.createChannel({
protocol: 'foo#' + i,
onclose () {
closed++
if (closed === 10) t.pass('all closed')
}
})
p.open()
}
})
test('pipeline close and rejections', function (t) {
t.plan(1)
const a = new Protomux(new SecretStream(true))
const b = new Protomux(new SecretStream(false))
replicate(a, b)
let closed = 0
for (let i = 0; i < 10; i++) {
const p = a.createChannel({
protocol: 'foo#' + i,
onclose () {
closed++
if (closed === 10) {
t.pass('all closed')
}
}
})
p.open()
p.close()
}
})
function replicate (a, b) { function replicate (a, b) {
a.stream.rawStream.pipe(b.stream.rawStream).pipe(a.stream.rawStream) a.stream.rawStream.pipe(b.stream.rawStream).pipe(a.stream.rawStream)
} }