Compare commits
24 Commits
Author | SHA1 | Date |
---|---|---|
Derrick Hammer | e7aa69a59b | |
Derrick Hammer | bbe30a3de5 | |
Derrick Hammer | ce12140ac0 | |
Nina Breznik | 6dfa4f32ca | |
Mathias Buus | 131fce63d7 | |
Mathias Buus | 78e83bbfa9 | |
Lucas | b9ff6b50b7 | |
Lucas | 69ae35ca8b | |
Mathias Buus | d47fd52836 | |
Ar Nazeh | 43d5192f31 | |
Mathias Buus | fcdc0db1e1 | |
Mathias Buus | e00098b294 | |
Kasper Isager Dalsgarð | 88df528516 | |
Mathias Buus | 699dbed87d | |
Mathias Buus | 5606c5ef85 | |
Ar Nazeh | 3a2c88c5cb | |
Mathias Buus | 734be1f070 | |
Mathias Buus | c98b9489fa | |
Mathias Buus | 70cd098a83 | |
Mathias Buus | 0e5095aa35 | |
Mathias Buus | ab9d484ada | |
Mathias Buus | 21415211e7 | |
Mathias Buus | 90e8bc3288 | |
Mathias Buus | 8aff462da3 |
|
@ -1,4 +1,5 @@
|
|||
node_modules
|
||||
package-lock.json
|
||||
sandbox.js
|
||||
sandbox.mjs
|
||||
sandbox
|
||||
|
|
62
README.md
62
README.md
|
@ -17,9 +17,9 @@ const c = require('compact-encoding')
|
|||
|
||||
const mux = new Protomux(aStreamThatFrames)
|
||||
|
||||
// Now add some protocol sessions
|
||||
// Now add some protocol channels
|
||||
|
||||
const cool = mux.open({
|
||||
const cool = mux.createChannel({
|
||||
protocol: 'cool-protocol',
|
||||
id: Buffer.from('optional binary id'),
|
||||
onopen () {
|
||||
|
@ -46,6 +46,10 @@ const two = cool.addMessage({
|
|||
}
|
||||
})
|
||||
|
||||
// open the channel
|
||||
|
||||
cool.open()
|
||||
|
||||
// And send some data
|
||||
|
||||
one.send('a string')
|
||||
|
@ -71,9 +75,9 @@ Options include:
|
|||
|
||||
Helper to accept either an existing muxer instance or a stream (which creates a new one).
|
||||
|
||||
#### `const session = mux.open(opts)`
|
||||
#### `const channel = mux.createChannel(opts)`
|
||||
|
||||
Add a new protocol session.
|
||||
Add a new protocol channel.
|
||||
|
||||
Options include:
|
||||
|
||||
|
@ -81,14 +85,16 @@ Options include:
|
|||
{
|
||||
// Used to match the protocol
|
||||
protocol: 'name of the protocol',
|
||||
// Optional additional binary id to identify this session
|
||||
// Optional additional binary id to identify this channel
|
||||
id: buffer,
|
||||
// Optional encoding for a handshake
|
||||
handshake: encoding,
|
||||
// Optional array of messages types you want to send/receive.
|
||||
messages: [],
|
||||
// Called when the remote side adds this protocol.
|
||||
// Errors here are caught and forwared to stream.destroy
|
||||
async onopen () {},
|
||||
// Called when the session closes - ie the remote side closes or rejects this protocol or we closed it.
|
||||
async onopen (handshake) {},
|
||||
// Called when the channel closes - ie the remote side closes or rejects this protocol or we closed it.
|
||||
// Errors here are caught and forwared to stream.destroy
|
||||
async onclose () {},
|
||||
// Called after onclose when all pending promises has resolved.
|
||||
|
@ -96,15 +102,31 @@ Options include:
|
|||
}
|
||||
```
|
||||
|
||||
Sessions are paired based on a queue, so the first remote session with the same `protocol` and `id`.
|
||||
Sessions are paired based on a queue, so the first remote channel with the same `protocol` and `id`.
|
||||
|
||||
__NOTE__: `mux.open` returns `null` if the session should not be opened, ie it's a duplicate session or the remote has already closed this one.
|
||||
__NOTE__: `mux.createChannel` returns `null` if the channel should not be opened, ie it's a duplicate channel or the remote has already closed this one.
|
||||
|
||||
If you want multiple sessions with the same `protocol` and `id`, set `unique: false` as an option.
|
||||
|
||||
#### `const m = session.addMessage(opts)`
|
||||
#### `const opened = mux.opened({ protocol, id })`
|
||||
|
||||
Add a message. Options include:
|
||||
Boolean that indicates if the channel is opened.
|
||||
|
||||
#### `mux.pair({ protocol, id }, callback)`
|
||||
|
||||
Register a callback to be called everytime a new channel is requested.
|
||||
|
||||
#### `mux.unpair({ protocol, id })`
|
||||
|
||||
Unregisters the pair callback.
|
||||
|
||||
#### `channel.open([handshake])`
|
||||
|
||||
Open the channel.
|
||||
|
||||
#### `const m = channel.addMessage(opts)`
|
||||
|
||||
Add/register a message type for a certain encoding. Options include:
|
||||
|
||||
``` js
|
||||
{
|
||||
|
@ -128,25 +150,29 @@ Function that is called when a message arrives.
|
|||
|
||||
The encoding for this message.
|
||||
|
||||
#### `session.close()`
|
||||
#### `channel.close()`
|
||||
|
||||
Closes the protocol session.
|
||||
Closes the protocol channel.
|
||||
|
||||
#### `sessoin.cork()`
|
||||
#### `channel.cork()`
|
||||
|
||||
Corking the protocol session, makes it buffer messages and send them all in a batch when it uncorks.
|
||||
Corking the protocol channel, makes it buffer messages and send them all in a batch when it uncorks.
|
||||
|
||||
#### `session.uncork()`
|
||||
#### `channel.uncork()`
|
||||
|
||||
Uncork and send the batch.
|
||||
|
||||
#### `mux.cork()`
|
||||
|
||||
Same as `session.cork` but on the muxer instance.
|
||||
Same as `channel.cork` but on the muxer instance.
|
||||
|
||||
#### `mux.uncork()`
|
||||
|
||||
Same as `session.uncork` but on the muxer instance.
|
||||
Same as `channel.uncork` but on the muxer instance.
|
||||
|
||||
#### `for (const channel of muxer) { ... }`
|
||||
|
||||
The muxer instance is iterable, so you can iterate over all the channels.
|
||||
|
||||
## License
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{
|
||||
"name": "protomux",
|
||||
"version": "3.0.2",
|
||||
"version": "3.4.1",
|
||||
"description": "Multiplex multiple message oriented protocols over a stream",
|
||||
"main": "index.js",
|
||||
"dependencies": {
|
||||
|
@ -10,8 +10,9 @@
|
|||
"safety-catch": "^1.0.1"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@hyperswarm/secret-stream": "^5.1.4",
|
||||
"brittle": "^2.0.1",
|
||||
"@hyperswarm/secret-stream": "^6.0.0",
|
||||
"brittle": "^3.0.0",
|
||||
"prettier": "^2.8.7",
|
||||
"standard": "^16.0.4"
|
||||
},
|
||||
"scripts": {
|
||||
|
|
284
test.js
284
test.js
|
@ -2,6 +2,7 @@ const Protomux = require('./')
|
|||
const SecretStream = require('@hyperswarm/secret-stream')
|
||||
const test = require('brittle')
|
||||
const c = require('compact-encoding')
|
||||
const b4a = require('b4a')
|
||||
|
||||
test('basic', function (t) {
|
||||
const a = new Protomux(new SecretStream(true))
|
||||
|
@ -9,13 +10,15 @@ test('basic', function (t) {
|
|||
|
||||
replicate(a, b)
|
||||
|
||||
const p = a.open({
|
||||
const p = a.createChannel({
|
||||
protocol: 'foo',
|
||||
onopen () {
|
||||
t.pass('a remote opened')
|
||||
}
|
||||
})
|
||||
|
||||
p.open()
|
||||
|
||||
p.addMessage({
|
||||
encoding: c.string,
|
||||
onmessage (message) {
|
||||
|
@ -23,12 +26,13 @@ test('basic', function (t) {
|
|||
}
|
||||
})
|
||||
|
||||
const bp = b.open({
|
||||
const bp = b.createChannel({
|
||||
protocol: 'foo'
|
||||
})
|
||||
|
||||
t.plan(2)
|
||||
|
||||
bp.open()
|
||||
bp.addMessage({ encoding: c.string }).send('hello world')
|
||||
})
|
||||
|
||||
|
@ -38,10 +42,12 @@ test('echo message', function (t) {
|
|||
|
||||
replicate(a, b)
|
||||
|
||||
const ap = a.open({
|
||||
const ap = a.createChannel({
|
||||
protocol: 'foo'
|
||||
})
|
||||
|
||||
ap.open()
|
||||
|
||||
const aEcho = ap.addMessage({
|
||||
encoding: c.string,
|
||||
onmessage (message) {
|
||||
|
@ -49,17 +55,19 @@ test('echo message', function (t) {
|
|||
}
|
||||
})
|
||||
|
||||
b.open({
|
||||
b.createChannel({
|
||||
protocol: 'other'
|
||||
})
|
||||
}).open()
|
||||
|
||||
const bp = b.open({
|
||||
const bp = b.createChannel({
|
||||
protocol: 'foo',
|
||||
onopen () {
|
||||
t.pass('b remote opened')
|
||||
}
|
||||
})
|
||||
|
||||
bp.open()
|
||||
|
||||
const bEcho = bp.addMessage({
|
||||
encoding: c.string,
|
||||
onmessage (message) {
|
||||
|
@ -75,24 +83,28 @@ test('echo message', function (t) {
|
|||
test('multi message', function (t) {
|
||||
const a = new Protomux(new SecretStream(true))
|
||||
|
||||
a.open({
|
||||
a.createChannel({
|
||||
protocol: 'other'
|
||||
})
|
||||
}).open()
|
||||
|
||||
const ap = a.open({
|
||||
const ap = a.createChannel({
|
||||
protocol: 'multi'
|
||||
})
|
||||
|
||||
ap.open()
|
||||
|
||||
const a1 = ap.addMessage({ encoding: c.int })
|
||||
const a2 = ap.addMessage({ encoding: c.string })
|
||||
const a3 = ap.addMessage({ encoding: c.string })
|
||||
|
||||
const b = new Protomux(new SecretStream(false))
|
||||
|
||||
const bp = b.open({
|
||||
const bp = b.createChannel({
|
||||
protocol: 'multi'
|
||||
})
|
||||
|
||||
bp.open()
|
||||
|
||||
const b1 = bp.addMessage({ encoding: c.int })
|
||||
const b2 = bp.addMessage({ encoding: c.string })
|
||||
|
||||
|
@ -123,23 +135,27 @@ test('corks', function (t) {
|
|||
|
||||
a.cork()
|
||||
|
||||
a.open({
|
||||
a.createChannel({
|
||||
protocol: 'other'
|
||||
})
|
||||
}).open()
|
||||
|
||||
const ap = a.open({
|
||||
const ap = a.createChannel({
|
||||
protocol: 'multi'
|
||||
})
|
||||
|
||||
ap.open()
|
||||
|
||||
const a1 = ap.addMessage({ encoding: c.int })
|
||||
const a2 = ap.addMessage({ encoding: c.string })
|
||||
|
||||
const b = new Protomux(new SecretStream(false))
|
||||
|
||||
const bp = b.open({
|
||||
const bp = b.createChannel({
|
||||
protocol: 'multi'
|
||||
})
|
||||
|
||||
bp.open()
|
||||
|
||||
const b1 = bp.addMessage({ encoding: c.int })
|
||||
const b2 = bp.addMessage({ encoding: c.string })
|
||||
|
||||
|
@ -174,6 +190,246 @@ test('corks', function (t) {
|
|||
}
|
||||
})
|
||||
|
||||
test('mega cork', function (t) {
|
||||
const a = new Protomux(new SecretStream(true))
|
||||
|
||||
a.cork()
|
||||
|
||||
const ap = a.createChannel({
|
||||
protocol: 'mega'
|
||||
})
|
||||
|
||||
ap.open()
|
||||
|
||||
const a1 = ap.addMessage({ encoding: c.buffer })
|
||||
|
||||
const b = new Protomux(new SecretStream(false))
|
||||
|
||||
const bp = b.createChannel({
|
||||
protocol: 'mega'
|
||||
})
|
||||
|
||||
bp.open()
|
||||
|
||||
const b1 = bp.addMessage({ encoding: c.buffer })
|
||||
|
||||
replicate(a, b)
|
||||
|
||||
t.plan(32 + 4)
|
||||
|
||||
const buf = b4a.alloc(1024 * 1024)
|
||||
const expected = []
|
||||
|
||||
for (let i = 0; i < 32; i++) {
|
||||
a1.send(buf)
|
||||
expected.push(buf)
|
||||
}
|
||||
|
||||
a.uncork()
|
||||
|
||||
b.stream.on('data', function (data) {
|
||||
t.ok(data.byteLength > 8000000, 'got big message')
|
||||
})
|
||||
|
||||
b1.onmessage = function (message) {
|
||||
t.alike(message, expected.shift())
|
||||
}
|
||||
})
|
||||
|
||||
test('handshake', function (t) {
|
||||
const a = new Protomux(new SecretStream(true))
|
||||
const b = new Protomux(new SecretStream(false))
|
||||
|
||||
replicate(a, b)
|
||||
|
||||
const p = a.createChannel({
|
||||
protocol: 'foo',
|
||||
handshake: c.string,
|
||||
onopen (handshake) {
|
||||
t.is(handshake, 'b handshake')
|
||||
}
|
||||
})
|
||||
|
||||
p.open('a handshake')
|
||||
|
||||
const bp = b.createChannel({
|
||||
protocol: 'foo',
|
||||
handshake: c.string,
|
||||
onopen (handshake) {
|
||||
t.is(handshake, 'a handshake')
|
||||
}
|
||||
})
|
||||
|
||||
t.plan(2)
|
||||
|
||||
bp.open('b handshake')
|
||||
})
|
||||
|
||||
test('rejections', function (t) {
|
||||
t.plan(1)
|
||||
|
||||
const a = new Protomux(new SecretStream(true))
|
||||
const b = new Protomux(new SecretStream(false))
|
||||
|
||||
replicate(a, b)
|
||||
|
||||
let closed = 0
|
||||
for (let i = 0; i < 10; i++) {
|
||||
const p = a.createChannel({
|
||||
protocol: 'foo#' + i,
|
||||
onclose () {
|
||||
closed++
|
||||
if (closed === 10) t.pass('all closed')
|
||||
}
|
||||
})
|
||||
|
||||
p.open()
|
||||
}
|
||||
})
|
||||
|
||||
test('pipeline close and rejections', function (t) {
|
||||
t.plan(1)
|
||||
|
||||
const a = new Protomux(new SecretStream(true))
|
||||
const b = new Protomux(new SecretStream(false))
|
||||
|
||||
replicate(a, b)
|
||||
|
||||
let closed = 0
|
||||
for (let i = 0; i < 10; i++) {
|
||||
const p = a.createChannel({
|
||||
protocol: 'foo#' + i,
|
||||
onclose () {
|
||||
closed++
|
||||
if (closed === 10) {
|
||||
t.pass('all closed')
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
p.open()
|
||||
p.close()
|
||||
}
|
||||
})
|
||||
|
||||
test('alias', function (t) {
|
||||
const a = new Protomux(new SecretStream(true))
|
||||
const b = new Protomux(new SecretStream(false))
|
||||
|
||||
replicate(a, b)
|
||||
|
||||
const p = a.createChannel({
|
||||
protocol: 'foo',
|
||||
aliases: ['bar'],
|
||||
onopen () {
|
||||
t.pass('a remote opened')
|
||||
}
|
||||
})
|
||||
|
||||
p.open()
|
||||
|
||||
p.addMessage({
|
||||
encoding: c.string,
|
||||
onmessage (message) {
|
||||
t.is(message, 'hello world')
|
||||
}
|
||||
})
|
||||
|
||||
const bp = b.createChannel({
|
||||
protocol: 'bar'
|
||||
})
|
||||
|
||||
t.plan(2)
|
||||
|
||||
bp.open()
|
||||
bp.addMessage({ encoding: c.string }).send('hello world')
|
||||
})
|
||||
|
||||
test('deduplicate muxers', function (t) {
|
||||
const sa = new SecretStream(true)
|
||||
const sb = new SecretStream(false)
|
||||
|
||||
replicate({ stream: sa }, { stream: sb })
|
||||
|
||||
const a = Protomux.from(sa)
|
||||
const foo = a.createChannel({
|
||||
protocol: 'foo',
|
||||
onopen () { t.pass('a remote opened') }
|
||||
})
|
||||
|
||||
foo.open()
|
||||
|
||||
foo.addMessage({
|
||||
encoding: c.string,
|
||||
onmessage (message) { t.is(message, 'hello foo') }
|
||||
})
|
||||
|
||||
const bfoo = Protomux.from(sb).createChannel({ protocol: 'foo' })
|
||||
|
||||
// Another Protomux instance for another protocol
|
||||
const a2 = Protomux.from(sa)
|
||||
const bar = a2.createChannel({
|
||||
protocol: 'bar',
|
||||
onopen () { t.pass('a remote opened') }
|
||||
})
|
||||
|
||||
bar.open()
|
||||
|
||||
bar.addMessage({
|
||||
encoding: c.string,
|
||||
onmessage (message) { t.is(message, 'hello bar') }
|
||||
})
|
||||
|
||||
const bbar = Protomux.from(sb).createChannel({ protocol: 'bar' })
|
||||
|
||||
t.plan(4)
|
||||
|
||||
bfoo.open()
|
||||
bfoo.addMessage({ encoding: c.string }).send('hello foo')
|
||||
|
||||
bbar.open()
|
||||
bbar.addMessage({ encoding: c.string }).send('hello bar')
|
||||
})
|
||||
|
||||
test('open + send + close on same tick', async function (t) {
|
||||
t.plan(4)
|
||||
|
||||
const a = new Protomux(new SecretStream(true))
|
||||
const b = new Protomux(new SecretStream(false))
|
||||
|
||||
replicate(a, b)
|
||||
|
||||
const ac = a.createChannel({
|
||||
protocol: 'foo',
|
||||
onopen () {
|
||||
t.pass('a opened')
|
||||
},
|
||||
onclose () {
|
||||
t.pass('a closed')
|
||||
}
|
||||
})
|
||||
|
||||
ac.open()
|
||||
ac.addMessage({
|
||||
encoding: c.string,
|
||||
onmessage (message) { t.is(message, 'hello') }
|
||||
})
|
||||
|
||||
const bc = b.createChannel({
|
||||
protocol: 'foo',
|
||||
onopen () {
|
||||
t.fail('b opened')
|
||||
},
|
||||
onclose () {
|
||||
t.pass('b closed')
|
||||
}
|
||||
})
|
||||
|
||||
bc.open()
|
||||
bc.addMessage({ encoding: c.string }).send('hello')
|
||||
bc.close()
|
||||
})
|
||||
|
||||
function replicate (a, b) {
|
||||
a.stream.rawStream.pipe(b.stream.rawStream).pipe(a.stream.rawStream)
|
||||
}
|
||||
|
|
Reference in New Issue