diff --git a/index.js b/index.js index 08b82bc..2df9936 100644 --- a/index.js +++ b/index.js @@ -5,6 +5,7 @@ const safetyCatch = require('safety-catch') const MAX_BUFFERED = 32768 const MAX_BACKLOG = Infinity // TODO: impl "open" backpressure +const MAX_BATCH = 8 * 1024 * 1024 class Channel { constructor (mux, info, userData, protocol, id, handshake, messages, onopen, onclose, ondestroy) { @@ -337,6 +338,12 @@ module.exports = class Protomux { } _pushBatch (localId, buffer) { + if (this._batchState.end >= MAX_BATCH) { + this._sendBatch(this._batch, this._batchState) + this._batch = [] + this._batchState = { buffer: null, start: 0, end: 1 } + } + if (this._batch.length === 0 || this._batch[this._batch.length - 1].localId !== localId) { this._batchState.end++ c.uint.preencode(this._batchState, localId) diff --git a/package.json b/package.json index 9e1cac4..e363092 100644 --- a/package.json +++ b/package.json @@ -10,7 +10,7 @@ "safety-catch": "^1.0.1" }, "devDependencies": { - "@hyperswarm/secret-stream": "^5.1.4", + "@hyperswarm/secret-stream": "^6.0.0", "brittle": "^2.0.1", "standard": "^16.0.4" }, diff --git a/test.js b/test.js index af58cd0..6eefcdd 100644 --- a/test.js +++ b/test.js @@ -2,6 +2,7 @@ const Protomux = require('./') const SecretStream = require('@hyperswarm/secret-stream') const test = require('brittle') const c = require('compact-encoding') +const b4a = require('b4a') test('basic', function (t) { const a = new Protomux(new SecretStream(true)) @@ -189,6 +190,52 @@ test('corks', function (t) { } }) +test('mega cork', function (t) { + const a = new Protomux(new SecretStream(true)) + + a.cork() + + const ap = a.createChannel({ + protocol: 'mega' + }) + + ap.open() + + const a1 = ap.addMessage({ encoding: c.buffer }) + + const b = new Protomux(new SecretStream(false)) + + const bp = b.createChannel({ + protocol: 'mega' + }) + + bp.open() + + const b1 = bp.addMessage({ encoding: c.buffer }) + + replicate(a, b) + + t.plan(32 + 4) + + const buf = b4a.alloc(1024 * 1024) + const expected = [] + + for (let i = 0; i < 32; i++) { + a1.send(buf) + expected.push(buf) + } + + a.uncork() + + b.stream.on('data', function (data) { + t.ok(data.byteLength > 8000000, 'got big message') + }) + + b1.onmessage = function (message) { + t.alike(message, expected.shift()) + } +}) + test('handshake', function (t) { const a = new Protomux(new SecretStream(true)) const b = new Protomux(new SecretStream(false))