auto split corks if they get too big
This commit is contained in:
parent
3a2c88c5cb
commit
5606c5ef85
7
index.js
7
index.js
|
@ -5,6 +5,7 @@ const safetyCatch = require('safety-catch')
|
||||||
|
|
||||||
const MAX_BUFFERED = 32768
|
const MAX_BUFFERED = 32768
|
||||||
const MAX_BACKLOG = Infinity // TODO: impl "open" backpressure
|
const MAX_BACKLOG = Infinity // TODO: impl "open" backpressure
|
||||||
|
const MAX_BATCH = 8 * 1024 * 1024
|
||||||
|
|
||||||
class Channel {
|
class Channel {
|
||||||
constructor (mux, info, userData, protocol, id, handshake, messages, onopen, onclose, ondestroy) {
|
constructor (mux, info, userData, protocol, id, handshake, messages, onopen, onclose, ondestroy) {
|
||||||
|
@ -337,6 +338,12 @@ module.exports = class Protomux {
|
||||||
}
|
}
|
||||||
|
|
||||||
_pushBatch (localId, buffer) {
|
_pushBatch (localId, buffer) {
|
||||||
|
if (this._batchState.end >= MAX_BATCH) {
|
||||||
|
this._sendBatch(this._batch, this._batchState)
|
||||||
|
this._batch = []
|
||||||
|
this._batchState = { buffer: null, start: 0, end: 1 }
|
||||||
|
}
|
||||||
|
|
||||||
if (this._batch.length === 0 || this._batch[this._batch.length - 1].localId !== localId) {
|
if (this._batch.length === 0 || this._batch[this._batch.length - 1].localId !== localId) {
|
||||||
this._batchState.end++
|
this._batchState.end++
|
||||||
c.uint.preencode(this._batchState, localId)
|
c.uint.preencode(this._batchState, localId)
|
||||||
|
|
|
@ -10,7 +10,7 @@
|
||||||
"safety-catch": "^1.0.1"
|
"safety-catch": "^1.0.1"
|
||||||
},
|
},
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
"@hyperswarm/secret-stream": "^5.1.4",
|
"@hyperswarm/secret-stream": "^6.0.0",
|
||||||
"brittle": "^2.0.1",
|
"brittle": "^2.0.1",
|
||||||
"standard": "^16.0.4"
|
"standard": "^16.0.4"
|
||||||
},
|
},
|
||||||
|
|
47
test.js
47
test.js
|
@ -2,6 +2,7 @@ const Protomux = require('./')
|
||||||
const SecretStream = require('@hyperswarm/secret-stream')
|
const SecretStream = require('@hyperswarm/secret-stream')
|
||||||
const test = require('brittle')
|
const test = require('brittle')
|
||||||
const c = require('compact-encoding')
|
const c = require('compact-encoding')
|
||||||
|
const b4a = require('b4a')
|
||||||
|
|
||||||
test('basic', function (t) {
|
test('basic', function (t) {
|
||||||
const a = new Protomux(new SecretStream(true))
|
const a = new Protomux(new SecretStream(true))
|
||||||
|
@ -189,6 +190,52 @@ test('corks', function (t) {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
test('mega cork', function (t) {
|
||||||
|
const a = new Protomux(new SecretStream(true))
|
||||||
|
|
||||||
|
a.cork()
|
||||||
|
|
||||||
|
const ap = a.createChannel({
|
||||||
|
protocol: 'mega'
|
||||||
|
})
|
||||||
|
|
||||||
|
ap.open()
|
||||||
|
|
||||||
|
const a1 = ap.addMessage({ encoding: c.buffer })
|
||||||
|
|
||||||
|
const b = new Protomux(new SecretStream(false))
|
||||||
|
|
||||||
|
const bp = b.createChannel({
|
||||||
|
protocol: 'mega'
|
||||||
|
})
|
||||||
|
|
||||||
|
bp.open()
|
||||||
|
|
||||||
|
const b1 = bp.addMessage({ encoding: c.buffer })
|
||||||
|
|
||||||
|
replicate(a, b)
|
||||||
|
|
||||||
|
t.plan(32 + 4)
|
||||||
|
|
||||||
|
const buf = b4a.alloc(1024 * 1024)
|
||||||
|
const expected = []
|
||||||
|
|
||||||
|
for (let i = 0; i < 32; i++) {
|
||||||
|
a1.send(buf)
|
||||||
|
expected.push(buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
a.uncork()
|
||||||
|
|
||||||
|
b.stream.on('data', function (data) {
|
||||||
|
t.ok(data.byteLength > 8000000, 'got big message')
|
||||||
|
})
|
||||||
|
|
||||||
|
b1.onmessage = function (message) {
|
||||||
|
t.alike(message, expected.shift())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
test('handshake', function (t) {
|
test('handshake', function (t) {
|
||||||
const a = new Protomux(new SecretStream(true))
|
const a = new Protomux(new SecretStream(true))
|
||||||
const b = new Protomux(new SecretStream(false))
|
const b = new Protomux(new SecretStream(false))
|
||||||
|
|
Reference in New Issue