Compare commits

...

24 Commits

Author SHA1 Message Date
Derrick Hammer e7aa69a59b
*make message encoding/decoding async for use with the kernel 2023-04-08 21:18:14 -04:00
Derrick Hammer bbe30a3de5
*prettier 2023-04-05 03:36:37 -04:00
Derrick Hammer ce12140ac0
*add prettier 2023-04-05 03:35:15 -04:00
Nina Breznik 6dfa4f32ca
update readme with mux.pair and mux.opened (#9)
* update readme with mux.pair and mux.opened

* add opts to .pair, .unpair and .opened
2023-02-06 18:43:05 +01:00
Mathias Buus 131fce63d7 3.4.1 2023-01-13 14:08:42 +01:00
Mathias Buus 78e83bbfa9 failing test 2023-01-13 14:08:34 +01:00
Lucas b9ff6b50b7
Add test: open + send + close on same tick (#8) 2022-11-08 22:13:32 +01:00
Lucas 69ae35ca8b
Update README.md (#6) 2022-11-03 10:39:15 +01:00
Mathias Buus d47fd52836 3.4.0 2022-08-31 12:49:40 +02:00
Ar Nazeh 43d5192f31
Deduplicate muxer for a given same stream. (#5)
* docs: add documentation for iterating over channels

* fix: deduplicate protomux instances for the same stream

creating multiple channels on multiple muxers for the same stream closes
channels accidently see
[here](https://gist.github.com/Nazeh/a3f1d24b597913303afcb6c568f4b042)

* fix: deduplicate muxers on the same stream

opening multiple channels on multiple muxers causes channels to either
close or reset, see
[here](https://gist.github.com/Nazeh/a3f1d24b597913303afcb6c568f4b042)

* use userData
2022-08-31 12:49:19 +02:00
Mathias Buus fcdc0db1e1 move to brittle 3 2022-08-15 23:51:56 +02:00
Mathias Buus e00098b294 3.3.0 2022-08-13 00:54:18 +02:00
Kasper Isager Dalsgarð 88df528516
Add `aliases` option (#4)
* Add `aliases` option

* Clear aliases on GC
2022-08-13 00:52:34 +02:00
Mathias Buus 699dbed87d 3.2.2 2022-07-06 14:39:16 +02:00
Mathias Buus 5606c5ef85 auto split corks if they get too big 2022-07-06 14:39:04 +02:00
Ar Nazeh 3a2c88c5cb
docs: add documentation for iterating over channels (#3) 2022-04-06 11:48:35 +02:00
Mathias Buus 734be1f070 3.2.1 2022-03-24 15:19:57 +01:00
Mathias Buus c98b9489fa end on end 2022-03-24 15:19:55 +01:00
Mathias Buus 70cd098a83 3.2.0 2022-03-11 19:46:11 +01:00
Mathias Buus 0e5095aa35 re-add destroy 2022-03-11 19:46:06 +01:00
Mathias Buus ab9d484ada 3.1.1 2022-03-11 13:58:06 +01:00
Mathias Buus 21415211e7 session -> channel and make open explicit for easier control 2022-03-11 13:55:39 +01:00
Mathias Buus 90e8bc3288 3.1.0 2022-03-11 02:18:33 +01:00
Mathias Buus 8aff462da3 add isProtomux helper 2022-03-11 02:18:31 +01:00
5 changed files with 796 additions and 388 deletions

1
.gitignore vendored
View File

@ -1,4 +1,5 @@
node_modules
package-lock.json
sandbox.js
sandbox.mjs
sandbox

View File

@ -17,9 +17,9 @@ const c = require('compact-encoding')
const mux = new Protomux(aStreamThatFrames)
// Now add some protocol sessions
// Now add some protocol channels
const cool = mux.open({
const cool = mux.createChannel({
protocol: 'cool-protocol',
id: Buffer.from('optional binary id'),
onopen () {
@ -46,6 +46,10 @@ const two = cool.addMessage({
}
})
// open the channel
cool.open()
// And send some data
one.send('a string')
@ -71,9 +75,9 @@ Options include:
Helper to accept either an existing muxer instance or a stream (which creates a new one).
#### `const session = mux.open(opts)`
#### `const channel = mux.createChannel(opts)`
Add a new protocol session.
Add a new protocol channel.
Options include:
@ -81,14 +85,16 @@ Options include:
{
// Used to match the protocol
protocol: 'name of the protocol',
// Optional additional binary id to identify this session
// Optional additional binary id to identify this channel
id: buffer,
// Optional encoding for a handshake
handshake: encoding,
// Optional array of messages types you want to send/receive.
messages: [],
// Called when the remote side adds this protocol.
// Errors here are caught and forwared to stream.destroy
async onopen () {},
// Called when the session closes - ie the remote side closes or rejects this protocol or we closed it.
async onopen (handshake) {},
// Called when the channel closes - ie the remote side closes or rejects this protocol or we closed it.
// Errors here are caught and forwared to stream.destroy
async onclose () {},
// Called after onclose when all pending promises has resolved.
@ -96,15 +102,31 @@ Options include:
}
```
Sessions are paired based on a queue, so the first remote session with the same `protocol` and `id`.
Sessions are paired based on a queue, so the first remote channel with the same `protocol` and `id`.
__NOTE__: `mux.open` returns `null` if the session should not be opened, ie it's a duplicate session or the remote has already closed this one.
__NOTE__: `mux.createChannel` returns `null` if the channel should not be opened, ie it's a duplicate channel or the remote has already closed this one.
If you want multiple sessions with the same `protocol` and `id`, set `unique: false` as an option.
#### `const m = session.addMessage(opts)`
#### `const opened = mux.opened({ protocol, id })`
Add a message. Options include:
Boolean that indicates if the channel is opened.
#### `mux.pair({ protocol, id }, callback)`
Register a callback to be called everytime a new channel is requested.
#### `mux.unpair({ protocol, id })`
Unregisters the pair callback.
#### `channel.open([handshake])`
Open the channel.
#### `const m = channel.addMessage(opts)`
Add/register a message type for a certain encoding. Options include:
``` js
{
@ -128,25 +150,29 @@ Function that is called when a message arrives.
The encoding for this message.
#### `session.close()`
#### `channel.close()`
Closes the protocol session.
Closes the protocol channel.
#### `sessoin.cork()`
#### `channel.cork()`
Corking the protocol session, makes it buffer messages and send them all in a batch when it uncorks.
Corking the protocol channel, makes it buffer messages and send them all in a batch when it uncorks.
#### `session.uncork()`
#### `channel.uncork()`
Uncork and send the batch.
#### `mux.cork()`
Same as `session.cork` but on the muxer instance.
Same as `channel.cork` but on the muxer instance.
#### `mux.uncork()`
Same as `session.uncork` but on the muxer instance.
Same as `channel.uncork` but on the muxer instance.
#### `for (const channel of muxer) { ... }`
The muxer instance is iterable, so you can iterate over all the channels.
## License

830
index.js

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,6 @@
{
"name": "protomux",
"version": "3.0.2",
"version": "3.4.1",
"description": "Multiplex multiple message oriented protocols over a stream",
"main": "index.js",
"dependencies": {
@ -10,8 +10,9 @@
"safety-catch": "^1.0.1"
},
"devDependencies": {
"@hyperswarm/secret-stream": "^5.1.4",
"brittle": "^2.0.1",
"@hyperswarm/secret-stream": "^6.0.0",
"brittle": "^3.0.0",
"prettier": "^2.8.7",
"standard": "^16.0.4"
},
"scripts": {

284
test.js
View File

@ -2,6 +2,7 @@ const Protomux = require('./')
const SecretStream = require('@hyperswarm/secret-stream')
const test = require('brittle')
const c = require('compact-encoding')
const b4a = require('b4a')
test('basic', function (t) {
const a = new Protomux(new SecretStream(true))
@ -9,13 +10,15 @@ test('basic', function (t) {
replicate(a, b)
const p = a.open({
const p = a.createChannel({
protocol: 'foo',
onopen () {
t.pass('a remote opened')
}
})
p.open()
p.addMessage({
encoding: c.string,
onmessage (message) {
@ -23,12 +26,13 @@ test('basic', function (t) {
}
})
const bp = b.open({
const bp = b.createChannel({
protocol: 'foo'
})
t.plan(2)
bp.open()
bp.addMessage({ encoding: c.string }).send('hello world')
})
@ -38,10 +42,12 @@ test('echo message', function (t) {
replicate(a, b)
const ap = a.open({
const ap = a.createChannel({
protocol: 'foo'
})
ap.open()
const aEcho = ap.addMessage({
encoding: c.string,
onmessage (message) {
@ -49,17 +55,19 @@ test('echo message', function (t) {
}
})
b.open({
b.createChannel({
protocol: 'other'
})
}).open()
const bp = b.open({
const bp = b.createChannel({
protocol: 'foo',
onopen () {
t.pass('b remote opened')
}
})
bp.open()
const bEcho = bp.addMessage({
encoding: c.string,
onmessage (message) {
@ -75,24 +83,28 @@ test('echo message', function (t) {
test('multi message', function (t) {
const a = new Protomux(new SecretStream(true))
a.open({
a.createChannel({
protocol: 'other'
})
}).open()
const ap = a.open({
const ap = a.createChannel({
protocol: 'multi'
})
ap.open()
const a1 = ap.addMessage({ encoding: c.int })
const a2 = ap.addMessage({ encoding: c.string })
const a3 = ap.addMessage({ encoding: c.string })
const b = new Protomux(new SecretStream(false))
const bp = b.open({
const bp = b.createChannel({
protocol: 'multi'
})
bp.open()
const b1 = bp.addMessage({ encoding: c.int })
const b2 = bp.addMessage({ encoding: c.string })
@ -123,23 +135,27 @@ test('corks', function (t) {
a.cork()
a.open({
a.createChannel({
protocol: 'other'
})
}).open()
const ap = a.open({
const ap = a.createChannel({
protocol: 'multi'
})
ap.open()
const a1 = ap.addMessage({ encoding: c.int })
const a2 = ap.addMessage({ encoding: c.string })
const b = new Protomux(new SecretStream(false))
const bp = b.open({
const bp = b.createChannel({
protocol: 'multi'
})
bp.open()
const b1 = bp.addMessage({ encoding: c.int })
const b2 = bp.addMessage({ encoding: c.string })
@ -174,6 +190,246 @@ test('corks', function (t) {
}
})
test('mega cork', function (t) {
const a = new Protomux(new SecretStream(true))
a.cork()
const ap = a.createChannel({
protocol: 'mega'
})
ap.open()
const a1 = ap.addMessage({ encoding: c.buffer })
const b = new Protomux(new SecretStream(false))
const bp = b.createChannel({
protocol: 'mega'
})
bp.open()
const b1 = bp.addMessage({ encoding: c.buffer })
replicate(a, b)
t.plan(32 + 4)
const buf = b4a.alloc(1024 * 1024)
const expected = []
for (let i = 0; i < 32; i++) {
a1.send(buf)
expected.push(buf)
}
a.uncork()
b.stream.on('data', function (data) {
t.ok(data.byteLength > 8000000, 'got big message')
})
b1.onmessage = function (message) {
t.alike(message, expected.shift())
}
})
test('handshake', function (t) {
const a = new Protomux(new SecretStream(true))
const b = new Protomux(new SecretStream(false))
replicate(a, b)
const p = a.createChannel({
protocol: 'foo',
handshake: c.string,
onopen (handshake) {
t.is(handshake, 'b handshake')
}
})
p.open('a handshake')
const bp = b.createChannel({
protocol: 'foo',
handshake: c.string,
onopen (handshake) {
t.is(handshake, 'a handshake')
}
})
t.plan(2)
bp.open('b handshake')
})
test('rejections', function (t) {
t.plan(1)
const a = new Protomux(new SecretStream(true))
const b = new Protomux(new SecretStream(false))
replicate(a, b)
let closed = 0
for (let i = 0; i < 10; i++) {
const p = a.createChannel({
protocol: 'foo#' + i,
onclose () {
closed++
if (closed === 10) t.pass('all closed')
}
})
p.open()
}
})
test('pipeline close and rejections', function (t) {
t.plan(1)
const a = new Protomux(new SecretStream(true))
const b = new Protomux(new SecretStream(false))
replicate(a, b)
let closed = 0
for (let i = 0; i < 10; i++) {
const p = a.createChannel({
protocol: 'foo#' + i,
onclose () {
closed++
if (closed === 10) {
t.pass('all closed')
}
}
})
p.open()
p.close()
}
})
test('alias', function (t) {
const a = new Protomux(new SecretStream(true))
const b = new Protomux(new SecretStream(false))
replicate(a, b)
const p = a.createChannel({
protocol: 'foo',
aliases: ['bar'],
onopen () {
t.pass('a remote opened')
}
})
p.open()
p.addMessage({
encoding: c.string,
onmessage (message) {
t.is(message, 'hello world')
}
})
const bp = b.createChannel({
protocol: 'bar'
})
t.plan(2)
bp.open()
bp.addMessage({ encoding: c.string }).send('hello world')
})
test('deduplicate muxers', function (t) {
const sa = new SecretStream(true)
const sb = new SecretStream(false)
replicate({ stream: sa }, { stream: sb })
const a = Protomux.from(sa)
const foo = a.createChannel({
protocol: 'foo',
onopen () { t.pass('a remote opened') }
})
foo.open()
foo.addMessage({
encoding: c.string,
onmessage (message) { t.is(message, 'hello foo') }
})
const bfoo = Protomux.from(sb).createChannel({ protocol: 'foo' })
// Another Protomux instance for another protocol
const a2 = Protomux.from(sa)
const bar = a2.createChannel({
protocol: 'bar',
onopen () { t.pass('a remote opened') }
})
bar.open()
bar.addMessage({
encoding: c.string,
onmessage (message) { t.is(message, 'hello bar') }
})
const bbar = Protomux.from(sb).createChannel({ protocol: 'bar' })
t.plan(4)
bfoo.open()
bfoo.addMessage({ encoding: c.string }).send('hello foo')
bbar.open()
bbar.addMessage({ encoding: c.string }).send('hello bar')
})
test('open + send + close on same tick', async function (t) {
t.plan(4)
const a = new Protomux(new SecretStream(true))
const b = new Protomux(new SecretStream(false))
replicate(a, b)
const ac = a.createChannel({
protocol: 'foo',
onopen () {
t.pass('a opened')
},
onclose () {
t.pass('a closed')
}
})
ac.open()
ac.addMessage({
encoding: c.string,
onmessage (message) { t.is(message, 'hello') }
})
const bc = b.createChannel({
protocol: 'foo',
onopen () {
t.fail('b opened')
},
onclose () {
t.pass('b closed')
}
})
bc.open()
bc.addMessage({ encoding: c.string }).send('hello')
bc.close()
})
function replicate (a, b) {
a.stream.rawStream.pipe(b.stream.rawStream).pipe(a.stream.rawStream)
}