use message object api (#1)

This commit is contained in:
Mathias Buus 2022-02-01 19:09:56 +01:00 committed by GitHub
parent 08f976b283
commit e2d05941d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 160 additions and 98 deletions

View File

@ -25,26 +25,37 @@ const cool = mux.addProtocol({
major: 1, major: 1,
minor: 0 minor: 0
}, },
// an array of compact encoders, each encoding/decoding the messages sent messages: 2, // protocol has 2 messages
messages: [
c.string,
c.bool
],
onremoteopen () { onremoteopen () {
console.log('the other side opened this protocol!') console.log('the other side opened this protocol!')
}, },
onemoteclose () { onremoteclose () {
console.log('the other side closed this protocol!') console.log('the other side closed this protocol!')
},
onmessage (type, message) {
console.log('the other side sent a message', type, message)
} }
}) })
// And send some messages // And add some messages
cool.send(0, 'a string') const one = cool.addMessage({
cool.send(1, true) type: 0,
encoding: c.string,
onmessage (m) {
console.log('recv message (1)', m)
}
})
const two = cool.addMessage({
type: 1,
encoding: c.bool,
onmessage (m) {
console.log('recv message (2)', m)
}
})
// And send some data
one.send('a string')
two.send(true)
``` ```
## API ## API
@ -87,31 +98,50 @@ Options include:
major: 0, major: 0,
minor: 0 minor: 0
}, },
// Array of the message types you want to send/receive. Should be compact-encoders // Number of messages types you want to send/receive.
messages: [ messages: 2,
...
],
// Called when the remote side adds this protocol. // Called when the remote side adds this protocol.
// Errors here are caught and forwared to stream.destroy // Errors here are caught and forwared to stream.destroy
async onremoteopen () {}, async onremoteopen () {},
// Called when the remote side closes or rejects this protocol. // Called when the remote side closes or rejects this protocol.
// Errors here are caught and forwared to stream.destroy // Errors here are caught and forwared to stream.destroy
async onremoteclose () {}, async onremoteclose () {}
// Called when the remote sends a message
// Errors here are caught and forwared to stream.destroy
async onmessage (type, message) {}
} }
``` ```
Each of the functions can also be set directly on the instance with the same name. Each of the functions can also be set directly on the instance with the same name.
#### `const m = p.addMessage(opts)`
Specify a message. Options include:
``` js
{
// Defaults to an incrementing number
type: numericIndex,
// compact-encoding specifying how to encode/decode this message
encoding: c.binary,
// Called when the remote side sends a message.
// Errors here are caught and forwared to stream.destroy
async onmessage (message) { }
}
```
#### `m.send(data)`
Send a message.
#### `m.onmessage`
Function that is called when a message arrives.
#### `m.encoding`
The encoding for this message.
#### `p.close()` #### `p.close()`
Closes the protocol Closes the protocol.
#### `p.send(type, message)`
Send a message, type is the offset into the messages array.
#### `p.cork()` #### `p.cork()`

View File

@ -8,12 +8,15 @@ const EMPTY = []
class Protocol { class Protocol {
constructor (mux) { constructor (mux) {
this.mux = mux this.mux = mux
this.name = null this.name = null
this.version = null this.version = null
this.messages = EMPTY this.messages = EMPTY
this.context = null
this.offset = 0 this.offset = 0
this.length = 0 this.length = 0
this.opened = false this.opened = false
this.corked = false
this.remoteVersion = null this.remoteVersion = null
this.remoteOffset = 0 this.remoteOffset = 0
@ -21,29 +24,49 @@ class Protocol {
this.remoteOpened = false this.remoteOpened = false
this.remoteClosed = false this.remoteClosed = false
this.onmessage = noop
this.onremoteopen = noop this.onremoteopen = noop
this.onremoteclose = noop this.onremoteclose = noop
} }
_attach ({ name, version = { major: 0, minor: 0 }, messages, onmessage = noop, onremoteopen = noop, onremoteclose = noop }) { _attach ({ name, version = { major: 0, minor: 0 }, messages = 0, context = null, onremoteopen = noop, onremoteclose = noop }) {
const opened = this.opened const opened = this.opened
this.name = name this.name = name
this.version = version this.version = version
this.messages = messages this.messages = new Array(messages)
this.context = context
this.offset = this.mux.offset this.offset = this.mux.offset
this.length = messages.length this.length = messages
this.opened = true this.opened = true
this.corked = false this.corked = false
this.onmessage = onmessage
this.onremoteopen = onremoteopen this.onremoteopen = onremoteopen
this.onremoteclose = onremoteclose this.onremoteclose = onremoteclose
return !opened return !opened
} }
_nextMessage () {
for (let i = this.messages.length - 1; i >= 0; i--) {
if (this.messages[i] === undefined && (i === 0 || this.messages[i - 1] !== undefined)) {
return i
}
}
return -1
}
addMessage ({ type = this._nextMessage(), encoding = c.binary, onmessage = noop } = {}) {
if (type < 0 || type >= this.messages.length) {
throw new Error('Invalid type, must be <= ' + this.messages.length)
}
const t = this.offset + type
const send = (message) => this.opened && this.mux._push(t, m.encoding, message)
const m = this.messages[type] = { encoding, onmessage, send }
return m
}
cork () { cork () {
if (this.corked) return if (this.corked) return
this.corked = true this.corked = true
@ -56,15 +79,6 @@ class Protocol {
this.mux.uncork() this.mux.uncork()
} }
send (type, message) {
if (!this.opened) return false
const t = this.offset + type
const m = this.messages[type]
return this.mux._push(t, m, message)
}
close () { close () {
if (this.opened === false) return if (this.opened === false) return
this.opened = false this.opened = false
@ -76,7 +90,7 @@ class Protocol {
this.messages = EMPTY this.messages = EMPTY
this.offset = 0 this.offset = 0
this.length = 0 this.length = 0
this.onmessage = this.onremoteopen = this.onremoteclose = noop this.onremoteopen = this.onremoteclose = noop
this.mux._push(2, c.uint, offset) this.mux._push(2, c.uint, offset)
this._gc() this._gc()
@ -92,9 +106,7 @@ class Protocol {
if (type >= this.messages.length) return if (type >= this.messages.length) return
const m = this.messages[type] const m = this.messages[type]
const message = m.decode(state) if (m !== undefined) this.mux._catch(m.onmessage(m.encoding.decode(state), this.context))
this.mux._catch(this.onmessage(type, message))
} }
} }

130
test.js
View File

@ -9,69 +9,72 @@ test('basic', function (t) {
replicate(a, b) replicate(a, b)
a.addProtocol({ const p = a.addProtocol({
name: 'foo', name: 'foo',
messages: [c.string], messages: 1,
onremoteopen () { onremoteopen () {
t.pass('a remote opened') t.pass('a remote opened')
}, }
onmessage (type, message) { })
t.is(type, 0)
p.addMessage({
encoding: c.string,
onmessage (message) {
t.is(message, 'hello world') t.is(message, 'hello world')
} }
}) })
const bp = b.addProtocol({ const bp = b.addProtocol({
name: 'foo', name: 'foo',
messages: [c.string] messages: 1
}) })
t.plan(3) t.plan(2)
bp.send(0, 'hello world') bp.addMessage({ encoding: c.string }).send('hello world')
}) })
test('echo message', function (t) { test('echo message', function (t) {
const a = new Protomux(new SecretStream(true)) const a = new Protomux(new SecretStream(true))
const b = new Protomux(new SecretStream(false))
const b = new Protomux(new SecretStream(false), [{
name: 'other',
messages: [c.bool, c.bool]
}, {
name: 'foo',
messages: [c.string]
}])
replicate(a, b) replicate(a, b)
const ap = a.addProtocol({ const ap = a.addProtocol({
name: 'foo', name: 'foo',
messages: [c.string], messages: 1
onmessage (type, message) { })
ap.send(type, 'echo: ' + message)
const aEcho = ap.addMessage({
encoding: c.string,
onmessage (message) {
aEcho.send('echo: ' + message)
} }
}) })
b.addProtocol({ b.addProtocol({
name: 'other', name: 'other',
messages: [c.bool, c.bool] messages: 2
}) })
const bp = b.addProtocol({ const bp = b.addProtocol({
name: 'foo', name: 'foo',
messages: [c.string], messages: 1,
onremoteopen () { onremoteopen () {
t.pass('b remote opened') t.pass('b remote opened')
}, }
onmessage (type, message) { })
t.is(type, 0)
const bEcho = bp.addMessage({
encoding: c.string,
onmessage (message) {
t.is(message, 'echo: hello world') t.is(message, 'echo: hello world')
} }
}) })
t.plan(3) t.plan(2)
bp.send(0, 'hello world') bEcho.send('hello world')
}) })
test('multi message', function (t) { test('multi message', function (t) {
@ -79,38 +82,47 @@ test('multi message', function (t) {
a.addProtocol({ a.addProtocol({
name: 'other', name: 'other',
messages: [c.bool, c.bool] messages: 2
}) })
const ap = a.addProtocol({ const ap = a.addProtocol({
name: 'multi', name: 'multi',
messages: [c.int, c.string, c.string] messages: 3
}) })
const a1 = ap.addMessage({ encoding: c.int })
const a2 = ap.addMessage({ encoding: c.string })
const a3 = ap.addMessage({ encoding: c.string })
const b = new Protomux(new SecretStream(false)) const b = new Protomux(new SecretStream(false))
const bp = b.addProtocol({ const bp = b.addProtocol({
name: 'multi', name: 'multi',
messages: [c.int, c.string] messages: 2
}) })
const b1 = bp.addMessage({ encoding: c.int })
const b2 = bp.addMessage({ encoding: c.string })
replicate(a, b) replicate(a, b)
t.plan(4) t.plan(2)
ap.send(0, 42) a1.send(42)
ap.send(1, 'a string with 42') a2.send('a string with 42')
ap.send(2, 'should be ignored') a3.send('should be ignored')
const expected = [ const expected = [
[0, 42], 42,
[1, 'a string with 42'] 'a string with 42'
] ]
bp.onmessage = function (type, message) { b1.onmessage = function (message) {
const e = expected.shift() t.is(message, expected.shift())
t.is(type, e[0]) }
t.is(message, e[1])
b2.onmessage = function (message) {
t.is(message, expected.shift())
} }
}) })
@ -121,36 +133,42 @@ test('corks', function (t) {
a.addProtocol({ a.addProtocol({
name: 'other', name: 'other',
messages: [c.bool, c.bool] messages: 2
}) })
const ap = a.addProtocol({ const ap = a.addProtocol({
name: 'multi', name: 'multi',
messages: [c.int, c.string] messages: 2
}) })
const a1 = ap.addMessage({ encoding: c.int })
const a2 = ap.addMessage({ encoding: c.string })
const b = new Protomux(new SecretStream(false)) const b = new Protomux(new SecretStream(false))
const bp = b.addProtocol({ const bp = b.addProtocol({
name: 'multi', name: 'multi',
messages: [c.int, c.string] messages: 2
}) })
const b1 = bp.addMessage({ encoding: c.int })
const b2 = bp.addMessage({ encoding: c.string })
replicate(a, b) replicate(a, b)
t.plan(8 + 1) t.plan(4 + 1)
const expected = [ const expected = [
[0, 1], 1,
[0, 2], 2,
[0, 3], 3,
[1, 'a string'] 'a string'
] ]
ap.send(0, 1) a1.send(1)
ap.send(0, 2) a1.send(2)
ap.send(0, 3) a1.send(3)
ap.send(1, 'a string') a2.send('a string')
a.uncork() a.uncork()
@ -158,10 +176,12 @@ test('corks', function (t) {
t.ok(expected.length === 0, 'received all messages in one data packet') t.ok(expected.length === 0, 'received all messages in one data packet')
}) })
bp.onmessage = function (type, message) { b1.onmessage = function (message) {
const e = expected.shift() t.is(message, expected.shift())
t.is(type, e[0]) }
t.is(message, e[1])
b2.onmessage = function (message) {
t.is(message, expected.shift())
} }
}) })