Deduplicate muxer for a given same stream. (#5)
* docs: add documentation for iterating over channels * fix: deduplicate protomux instances for the same stream creating multiple channels on multiple muxers for the same stream closes channels accidently see [here](https://gist.github.com/Nazeh/a3f1d24b597913303afcb6c568f4b042) * fix: deduplicate muxers on the same stream opening multiple channels on multiple muxers causes channels to either close or reset, see [here](https://gist.github.com/Nazeh/a3f1d24b597913303afcb6c568f4b042) * use userData
This commit is contained in:
parent
fcdc0db1e1
commit
43d5192f31
3
index.js
3
index.js
|
@ -245,6 +245,8 @@ class Channel {
|
||||||
|
|
||||||
module.exports = class Protomux {
|
module.exports = class Protomux {
|
||||||
constructor (stream, { alloc } = {}) {
|
constructor (stream, { alloc } = {}) {
|
||||||
|
if (stream.userData === null) stream.userData = this
|
||||||
|
|
||||||
this.isProtomux = true
|
this.isProtomux = true
|
||||||
this.stream = stream
|
this.stream = stream
|
||||||
this.corked = 0
|
this.corked = 0
|
||||||
|
@ -271,6 +273,7 @@ module.exports = class Protomux {
|
||||||
}
|
}
|
||||||
|
|
||||||
static from (stream, opts) {
|
static from (stream, opts) {
|
||||||
|
if (stream.userData && stream.userData.isProtomux) return stream.userData
|
||||||
if (stream.isProtomux) return stream
|
if (stream.isProtomux) return stream
|
||||||
return new this(stream, opts)
|
return new this(stream, opts)
|
||||||
}
|
}
|
||||||
|
|
46
test.js
46
test.js
|
@ -345,6 +345,52 @@ test('alias', function (t) {
|
||||||
bp.addMessage({ encoding: c.string }).send('hello world')
|
bp.addMessage({ encoding: c.string }).send('hello world')
|
||||||
})
|
})
|
||||||
|
|
||||||
|
test('deduplicate muxers', function (t) {
|
||||||
|
const sa = new SecretStream(true)
|
||||||
|
const sb = new SecretStream(false)
|
||||||
|
|
||||||
|
replicate({ stream: sa }, { stream: sb })
|
||||||
|
|
||||||
|
const a = Protomux.from(sa)
|
||||||
|
const foo = a.createChannel({
|
||||||
|
protocol: 'foo',
|
||||||
|
onopen () { t.pass('a remote opened') }
|
||||||
|
})
|
||||||
|
|
||||||
|
foo.open()
|
||||||
|
|
||||||
|
foo.addMessage({
|
||||||
|
encoding: c.string,
|
||||||
|
onmessage (message) { t.is(message, 'hello foo') }
|
||||||
|
})
|
||||||
|
|
||||||
|
const bfoo = Protomux.from(sb).createChannel({ protocol: 'foo' })
|
||||||
|
|
||||||
|
// Another Protomux instance for another protocol
|
||||||
|
const a2 = Protomux.from(sa)
|
||||||
|
const bar = a2.createChannel({
|
||||||
|
protocol: 'bar',
|
||||||
|
onopen () { t.pass('a remote opened') }
|
||||||
|
})
|
||||||
|
|
||||||
|
bar.open()
|
||||||
|
|
||||||
|
bar.addMessage({
|
||||||
|
encoding: c.string,
|
||||||
|
onmessage (message) { t.is(message, 'hello bar') }
|
||||||
|
})
|
||||||
|
|
||||||
|
const bbar = Protomux.from(sb).createChannel({ protocol: 'bar' })
|
||||||
|
|
||||||
|
t.plan(4)
|
||||||
|
|
||||||
|
bfoo.open()
|
||||||
|
bfoo.addMessage({ encoding: c.string }).send('hello foo')
|
||||||
|
|
||||||
|
bbar.open()
|
||||||
|
bbar.addMessage({ encoding: c.string }).send('hello bar')
|
||||||
|
})
|
||||||
|
|
||||||
function replicate (a, b) {
|
function replicate (a, b) {
|
||||||
a.stream.rawStream.pipe(b.stream.rawStream).pipe(a.stream.rawStream)
|
a.stream.rawStream.pipe(b.stream.rawStream).pipe(a.stream.rawStream)
|
||||||
}
|
}
|
||||||
|
|
Reference in New Issue