diff --git a/README.md b/README.md index 2bfd145..6cda015 100644 --- a/README.md +++ b/README.md @@ -17,9 +17,9 @@ const c = require('compact-encoding') const mux = new Protomux(aStreamThatFrames) -// Now add some protocol sessions +// Now add some protocol channels -const cool = mux.open({ +const cool = mux.createChannel({ protocol: 'cool-protocol', id: Buffer.from('optional binary id'), onopen () { @@ -46,6 +46,11 @@ const two = cool.addMessage({ } }) +// open the channels + +one.open() +two.open() + // And send some data one.send('a string') @@ -71,9 +76,9 @@ Options include: Helper to accept either an existing muxer instance or a stream (which creates a new one). -#### `const session = mux.open(opts)` +#### `const channel = mux.createChannel(opts)` -Add a new protocol session. +Add a new protocol channel. Options include: @@ -81,14 +86,16 @@ Options include: { // Used to match the protocol protocol: 'name of the protocol', - // Optional additional binary id to identify this session + // Optional additional binary id to identify this channel id: buffer, + // Optional encoding for a handshake + handshake: encoding, // Optional array of messages types you want to send/receive. messages: [], // Called when the remote side adds this protocol. // Errors here are caught and forwared to stream.destroy - async onopen () {}, - // Called when the session closes - ie the remote side closes or rejects this protocol or we closed it. + async onopen (handshake) {}, + // Called when the channel closes - ie the remote side closes or rejects this protocol or we closed it. // Errors here are caught and forwared to stream.destroy async onclose () {}, // Called after onclose when all pending promises has resolved. @@ -96,13 +103,17 @@ Options include: } ``` -Sessions are paired based on a queue, so the first remote session with the same `protocol` and `id`. +Sessions are paired based on a queue, so the first remote channel with the same `protocol` and `id`. -__NOTE__: `mux.open` returns `null` if the session should not be opened, ie it's a duplicate session or the remote has already closed this one. +__NOTE__: `mux.createChannel` returns `null` if the channel should not be opened, ie it's a duplicate channel or the remote has already closed this one. If you want multiple sessions with the same `protocol` and `id`, set `unique: false` as an option. -#### `const m = session.addMessage(opts)` +#### `channel.open([handshake])` + +Open the channel. + +#### `const m = channel.addMessage(opts)` Add a message. Options include: @@ -128,25 +139,25 @@ Function that is called when a message arrives. The encoding for this message. -#### `session.close()` +#### `channel.close()` -Closes the protocol session. +Closes the protocol channel. -#### `sessoin.cork()` +#### `channel.cork()` -Corking the protocol session, makes it buffer messages and send them all in a batch when it uncorks. +Corking the protocol channel, makes it buffer messages and send them all in a batch when it uncorks. -#### `session.uncork()` +#### `channel.uncork()` Uncork and send the batch. #### `mux.cork()` -Same as `session.cork` but on the muxer instance. +Same as `channel.cork` but on the muxer instance. #### `mux.uncork()` -Same as `session.uncork` but on the muxer instance. +Same as `channel.uncork` but on the muxer instance. ## License diff --git a/index.js b/index.js index cf0b25f..25462b3 100644 --- a/index.js +++ b/index.js @@ -6,13 +6,13 @@ const safetyCatch = require('safety-catch') const MAX_BUFFERED = 32768 const MAX_BACKLOG = Infinity // TODO: impl "open" backpressure -class Session { - constructor (mux, info, context, protocol, id, messages, onopen, onclose, ondestroy) { - this.context = context +class Channel { + constructor (mux, info, userData, protocol, id, handshake, messages, onopen, onclose, ondestroy) { + this.userData = userData this.protocol = protocol this.id = id + this.handshake = null this.messages = [] - this.remoteMessages = this.messages this.opened = false this.closed = false @@ -22,6 +22,7 @@ class Session { this.onclose = onclose this.ondestroy = ondestroy + this._handshake = handshake this._mux = mux this._info = info this._localId = 0 @@ -35,7 +36,7 @@ class Session { for (const m of messages) this.addMessage(m) } - _open () { + open (handshake) { const id = this._mux._free.length > 0 ? this._mux._free.pop() : this._mux._local.push(null) - 1 @@ -44,19 +45,25 @@ class Session { this._localId = id + 1 this._mux._local[id] = this + if (this._remoteId === 0) { + this._info.outgoing.push(this._localId) + } + const state = { buffer: null, start: 2, end: 2 } + c.uint.preencode(state, this._localId) c.string.preencode(state, this.protocol) c.buffer.preencode(state, this.id) - c.uint.preencode(state, this._localId) + if (this._handshake) this._handshake.preencode(state, handshake) state.buffer = this._mux._alloc(state.end) state.buffer[0] = 0 state.buffer[1] = 1 + c.uint.encode(state, this._localId) c.string.encode(state, this.protocol) c.buffer.encode(state, this.id) - c.uint.encode(state, this._localId) + if (this._handshake) this._handshake.encode(state, handshake) this._mux._write0(state.buffer) } @@ -81,9 +88,11 @@ class Session { const remote = this._mux._remote[this._remoteId - 1] this.opened = true - this._track(this.onopen(this)) + this.handshake = this._handshake ? this._handshake.decode(remote.state) : null + this._track(this.onopen(this.handshake, this)) remote.session = this + remote.state = null if (remote.pending !== null) this._drain(remote) } @@ -135,8 +144,8 @@ class Session { } _recv (type, state) { - if (type < this.remoteMessages.length) { - this.remoteMessages[type].recv(state, this) + if (type < this.messages.length) { + this.messages[type].recv(state, this) } } @@ -302,17 +311,14 @@ module.exports = class Protomux { return info ? info.opened > 0 : false } - open ({ context = null, protocol, id = null, unique = true, messages = [], onopen = noop, onclose = noop, ondestroy = noop }) { + createChannel ({ userData = null, protocol, id = null, unique = true, handshake = null, messages = [], onopen = noop, onclose = noop, ondestroy = noop }) { if (this.stream.destroyed) return null const info = this._get(protocol, id) if (unique && info.opened > 0) return null if (info.incoming.length === 0) { - const session = new Session(this, info, context, protocol, id, messages, onopen, onclose, ondestroy) - session._open() - info.outgoing.push(session._localId) - return session + return new Channel(this, info, userData, protocol, id, handshake, messages, onopen, onclose, ondestroy) } this._remoteBacklog-- @@ -321,10 +327,9 @@ module.exports = class Protomux { const r = this._remote[remoteId - 1] if (r === null) return null - const session = new Session(this, info, context, protocol, id, messages, onopen, onclose, ondestroy) + const session = new Channel(this, info, userData, protocol, id, handshake, messages, onopen, onclose, ondestroy) session._remoteId = remoteId - session._open() session._fullyOpenSoon() return session @@ -465,9 +470,9 @@ module.exports = class Protomux { } _onopensession (state) { + const remoteId = c.uint.decode(state) const protocol = c.string.decode(state) const id = c.buffer.decode(state) - const remoteId = c.uint.decode(state) // remote tried to open the control session - auto reject for now // as we can use as an explicit control protocol declaration if we need to @@ -497,14 +502,14 @@ module.exports = class Protomux { return } - this._remote[rid] = { pending: null, session: null } + this._remote[rid] = { state, pending: null, session: null } session._remoteId = remoteId session._fullyOpen() return } - this._remote[rid] = { pending: [], session: null } + this._remote[rid] = { state, pending: [], session: null } if (++this._remoteBacklog > MAX_BACKLOG) { throw new Error('Remote exceeded backlog') @@ -517,19 +522,25 @@ module.exports = class Protomux { } _onrejectsession (state) { - const protocol = c.string.decode(state) - const id = c.buffer.decode(state) - const info = this._get(protocol, id) + const localId = c.uint.decode(state) - if (info.outgoing.length === 0) { - throw new Error('Invalid reject message') + // TODO: can be done smarter... + for (const info of this._infos.values()) { + const i = info.outgoing.indexOf(localId) + if (i === -1) continue + + info.outgoing.splice(i, 1) + + const session = this._local[localId - 1] + + this._free.push(localId - 1) + if (session !== null) session._close(true) + + this._gc(info) + return } - const localId = info.outgoing.shift() - const session = this._local[localId - 1] - - this._free.push(localId - 1) - if (session !== null) session._close(true) + throw new Error('Invalid reject message') } _onclosesession (state) { @@ -553,7 +564,7 @@ module.exports = class Protomux { if (--info.pairing > 0) return while (info.incoming.length > 0) { - this._rejectSession(info, info.incoming.pop()) + this._rejectSession(info, info.incoming.shift()) } this._gc(info) @@ -575,15 +586,13 @@ module.exports = class Protomux { const state = { buffer: null, start: 2, end: 2 } - c.string.preencode(state, info.protocol) - c.buffer.preencode(state, info.id) + c.uint.preencode(state, remoteId) state.buffer = this._alloc(state.end) state.buffer[0] = 0 state.buffer[1] = 2 - c.string.encode(state, info.protocol) - c.buffer.encode(state, info.id) + c.uint.encode(state, remoteId) this._write0(state.buffer) } diff --git a/test.js b/test.js index a6d017c..af58cd0 100644 --- a/test.js +++ b/test.js @@ -9,13 +9,15 @@ test('basic', function (t) { replicate(a, b) - const p = a.open({ + const p = a.createChannel({ protocol: 'foo', onopen () { t.pass('a remote opened') } }) + p.open() + p.addMessage({ encoding: c.string, onmessage (message) { @@ -23,12 +25,13 @@ test('basic', function (t) { } }) - const bp = b.open({ + const bp = b.createChannel({ protocol: 'foo' }) t.plan(2) + bp.open() bp.addMessage({ encoding: c.string }).send('hello world') }) @@ -38,10 +41,12 @@ test('echo message', function (t) { replicate(a, b) - const ap = a.open({ + const ap = a.createChannel({ protocol: 'foo' }) + ap.open() + const aEcho = ap.addMessage({ encoding: c.string, onmessage (message) { @@ -49,17 +54,19 @@ test('echo message', function (t) { } }) - b.open({ + b.createChannel({ protocol: 'other' - }) + }).open() - const bp = b.open({ + const bp = b.createChannel({ protocol: 'foo', onopen () { t.pass('b remote opened') } }) + bp.open() + const bEcho = bp.addMessage({ encoding: c.string, onmessage (message) { @@ -75,24 +82,28 @@ test('echo message', function (t) { test('multi message', function (t) { const a = new Protomux(new SecretStream(true)) - a.open({ + a.createChannel({ protocol: 'other' - }) + }).open() - const ap = a.open({ + const ap = a.createChannel({ protocol: 'multi' }) + ap.open() + const a1 = ap.addMessage({ encoding: c.int }) const a2 = ap.addMessage({ encoding: c.string }) const a3 = ap.addMessage({ encoding: c.string }) const b = new Protomux(new SecretStream(false)) - const bp = b.open({ + const bp = b.createChannel({ protocol: 'multi' }) + bp.open() + const b1 = bp.addMessage({ encoding: c.int }) const b2 = bp.addMessage({ encoding: c.string }) @@ -123,23 +134,27 @@ test('corks', function (t) { a.cork() - a.open({ + a.createChannel({ protocol: 'other' - }) + }).open() - const ap = a.open({ + const ap = a.createChannel({ protocol: 'multi' }) + ap.open() + const a1 = ap.addMessage({ encoding: c.int }) const a2 = ap.addMessage({ encoding: c.string }) const b = new Protomux(new SecretStream(false)) - const bp = b.open({ + const bp = b.createChannel({ protocol: 'multi' }) + bp.open() + const b1 = bp.addMessage({ encoding: c.int }) const b2 = bp.addMessage({ encoding: c.string }) @@ -174,6 +189,82 @@ test('corks', function (t) { } }) +test('handshake', function (t) { + const a = new Protomux(new SecretStream(true)) + const b = new Protomux(new SecretStream(false)) + + replicate(a, b) + + const p = a.createChannel({ + protocol: 'foo', + handshake: c.string, + onopen (handshake) { + t.is(handshake, 'b handshake') + } + }) + + p.open('a handshake') + + const bp = b.createChannel({ + protocol: 'foo', + handshake: c.string, + onopen (handshake) { + t.is(handshake, 'a handshake') + } + }) + + t.plan(2) + + bp.open('b handshake') +}) + +test('rejections', function (t) { + t.plan(1) + + const a = new Protomux(new SecretStream(true)) + const b = new Protomux(new SecretStream(false)) + + replicate(a, b) + + let closed = 0 + for (let i = 0; i < 10; i++) { + const p = a.createChannel({ + protocol: 'foo#' + i, + onclose () { + closed++ + if (closed === 10) t.pass('all closed') + } + }) + + p.open() + } +}) + +test('pipeline close and rejections', function (t) { + t.plan(1) + + const a = new Protomux(new SecretStream(true)) + const b = new Protomux(new SecretStream(false)) + + replicate(a, b) + + let closed = 0 + for (let i = 0; i < 10; i++) { + const p = a.createChannel({ + protocol: 'foo#' + i, + onclose () { + closed++ + if (closed === 10) { + t.pass('all closed') + } + } + }) + + p.open() + p.close() + } +}) + function replicate (a, b) { a.stream.rawStream.pipe(b.stream.rawStream).pipe(a.stream.rawStream) }