This commit is contained in:
commit
3b1c8662d6
|
@ -1,4 +1,5 @@
|
|||
node_modules
|
||||
package-lock.json
|
||||
sandbox.js
|
||||
sandbox.mjs
|
||||
sandbox
|
||||
|
|
32
index.js
32
index.js
|
@ -5,11 +5,13 @@ const safetyCatch = require('safety-catch')
|
|||
|
||||
const MAX_BUFFERED = 32768
|
||||
const MAX_BACKLOG = Infinity // TODO: impl "open" backpressure
|
||||
const MAX_BATCH = 8 * 1024 * 1024
|
||||
|
||||
class Channel {
|
||||
constructor (mux, info, userData, protocol, id, handshake, messages, onopen, onclose, ondestroy) {
|
||||
constructor (mux, info, userData, protocol, aliases, id, handshake, messages, onopen, onclose, ondestroy) {
|
||||
this.userData = userData
|
||||
this.protocol = protocol
|
||||
this.aliases = aliases
|
||||
this.id = id
|
||||
this.handshake = null
|
||||
this.messages = []
|
||||
|
@ -315,14 +317,14 @@ module.exports = class Protomux {
|
|||
return info ? info.opened > 0 : false
|
||||
}
|
||||
|
||||
createChannel ({ userData = null, protocol, id = null, unique = true, handshake = null, messages = [], onopen = noop, onclose = noop, ondestroy = noop }) {
|
||||
createChannel ({ userData = null, protocol, aliases = [], id = null, unique = true, handshake = null, messages = [], onopen = noop, onclose = noop, ondestroy = noop }) {
|
||||
if (this.stream.destroyed) return null
|
||||
|
||||
const info = this._get(protocol, id)
|
||||
const info = this._get(protocol, id, aliases)
|
||||
if (unique && info.opened > 0) return null
|
||||
|
||||
if (info.incoming.length === 0) {
|
||||
return new Channel(this, info, userData, protocol, id, handshake, messages, onopen, onclose, ondestroy)
|
||||
return new Channel(this, info, userData, protocol, aliases, id, handshake, messages, onopen, onclose, ondestroy)
|
||||
}
|
||||
|
||||
this._remoteBacklog--
|
||||
|
@ -331,7 +333,7 @@ module.exports = class Protomux {
|
|||
const r = this._remote[remoteId - 1]
|
||||
if (r === null) return null
|
||||
|
||||
const session = new Channel(this, info, userData, protocol, id, handshake, messages, onopen, onclose, ondestroy)
|
||||
const session = new Channel(this, info, userData, protocol, aliases, id, handshake, messages, onopen, onclose, ondestroy)
|
||||
|
||||
session._remoteId = remoteId
|
||||
session._fullyOpenSoon()
|
||||
|
@ -340,6 +342,12 @@ module.exports = class Protomux {
|
|||
}
|
||||
|
||||
_pushBatch (localId, buffer) {
|
||||
if (this._batchState.end >= MAX_BATCH) {
|
||||
this._sendBatch(this._batch, this._batchState)
|
||||
this._batch = []
|
||||
this._batchState = { buffer: null, start: 0, end: 1 }
|
||||
}
|
||||
|
||||
if (this._batch.length === 0 || this._batch[this._batch.length - 1].localId !== localId) {
|
||||
this._batchState.end++
|
||||
c.uint.preencode(this._batchState, localId)
|
||||
|
@ -371,20 +379,30 @@ module.exports = class Protomux {
|
|||
this.stream.write(state.buffer)
|
||||
}
|
||||
|
||||
_get (protocol, id) {
|
||||
_get (protocol, id, aliases = []) {
|
||||
const key = toKey(protocol, id)
|
||||
|
||||
let info = this._infos.get(key)
|
||||
if (info) return info
|
||||
|
||||
info = { key, protocol, id, pairing: 0, opened: 0, incoming: [], outgoing: [] }
|
||||
info = { key, protocol, aliases: [], id, pairing: 0, opened: 0, incoming: [], outgoing: [] }
|
||||
this._infos.set(key, info)
|
||||
|
||||
for (const alias of aliases) {
|
||||
const key = toKey(alias, id)
|
||||
info.aliases.push(key)
|
||||
|
||||
this._infos.set(key, info)
|
||||
}
|
||||
|
||||
return info
|
||||
}
|
||||
|
||||
_gc (info) {
|
||||
if (info.opened === 0 && info.outgoing.length === 0 && info.incoming.length === 0) {
|
||||
this._infos.delete(info.key)
|
||||
|
||||
for (const alias of info.aliases) this._infos.delete(alias)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{
|
||||
"name": "protomux",
|
||||
"version": "3.2.1",
|
||||
"version": "3.3.0",
|
||||
"description": "Multiplex multiple message oriented protocols over a stream",
|
||||
"main": "index.js",
|
||||
"dependencies": {
|
||||
|
@ -10,8 +10,8 @@
|
|||
"safety-catch": "^1.0.1"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@hyperswarm/secret-stream": "^5.1.4",
|
||||
"brittle": "^2.0.1",
|
||||
"@hyperswarm/secret-stream": "^6.0.0",
|
||||
"brittle": "^3.0.0",
|
||||
"standard": "^16.0.4"
|
||||
},
|
||||
"scripts": {
|
||||
|
|
80
test.js
80
test.js
|
@ -2,6 +2,7 @@ const Protomux = require('./')
|
|||
const SecretStream = require('@hyperswarm/secret-stream')
|
||||
const test = require('brittle')
|
||||
const c = require('compact-encoding')
|
||||
const b4a = require('b4a')
|
||||
|
||||
test('basic', function (t) {
|
||||
const a = new Protomux(new SecretStream(true))
|
||||
|
@ -189,6 +190,52 @@ test('corks', function (t) {
|
|||
}
|
||||
})
|
||||
|
||||
test('mega cork', function (t) {
|
||||
const a = new Protomux(new SecretStream(true))
|
||||
|
||||
a.cork()
|
||||
|
||||
const ap = a.createChannel({
|
||||
protocol: 'mega'
|
||||
})
|
||||
|
||||
ap.open()
|
||||
|
||||
const a1 = ap.addMessage({ encoding: c.buffer })
|
||||
|
||||
const b = new Protomux(new SecretStream(false))
|
||||
|
||||
const bp = b.createChannel({
|
||||
protocol: 'mega'
|
||||
})
|
||||
|
||||
bp.open()
|
||||
|
||||
const b1 = bp.addMessage({ encoding: c.buffer })
|
||||
|
||||
replicate(a, b)
|
||||
|
||||
t.plan(32 + 4)
|
||||
|
||||
const buf = b4a.alloc(1024 * 1024)
|
||||
const expected = []
|
||||
|
||||
for (let i = 0; i < 32; i++) {
|
||||
a1.send(buf)
|
||||
expected.push(buf)
|
||||
}
|
||||
|
||||
a.uncork()
|
||||
|
||||
b.stream.on('data', function (data) {
|
||||
t.ok(data.byteLength > 8000000, 'got big message')
|
||||
})
|
||||
|
||||
b1.onmessage = function (message) {
|
||||
t.alike(message, expected.shift())
|
||||
}
|
||||
})
|
||||
|
||||
test('handshake', function (t) {
|
||||
const a = new Protomux(new SecretStream(true))
|
||||
const b = new Protomux(new SecretStream(false))
|
||||
|
@ -265,6 +312,39 @@ test('pipeline close and rejections', function (t) {
|
|||
}
|
||||
})
|
||||
|
||||
test('alias', function (t) {
|
||||
const a = new Protomux(new SecretStream(true))
|
||||
const b = new Protomux(new SecretStream(false))
|
||||
|
||||
replicate(a, b)
|
||||
|
||||
const p = a.createChannel({
|
||||
protocol: 'foo',
|
||||
aliases: ['bar'],
|
||||
onopen () {
|
||||
t.pass('a remote opened')
|
||||
}
|
||||
})
|
||||
|
||||
p.open()
|
||||
|
||||
p.addMessage({
|
||||
encoding: c.string,
|
||||
onmessage (message) {
|
||||
t.is(message, 'hello world')
|
||||
}
|
||||
})
|
||||
|
||||
const bp = b.createChannel({
|
||||
protocol: 'bar'
|
||||
})
|
||||
|
||||
t.plan(2)
|
||||
|
||||
bp.open()
|
||||
bp.addMessage({ encoding: c.string }).send('hello world')
|
||||
})
|
||||
|
||||
test('deduplicate muxers', function (t) {
|
||||
const sa = new SecretStream(true)
|
||||
const sb = new SecretStream(false)
|
||||
|
|
Reference in New Issue