From c70c9f989ffca7ddf99559fd2c8c58b4d2622e43 Mon Sep 17 00:00:00 2001 From: Mathias Buus Date: Mon, 27 Dec 2021 16:09:34 +0100 Subject: [PATCH] fix some bugs, iterate the api --- index.js | 58 +++++++++++++++++++++++++++++------------------ messages.js | 2 +- package.json | 5 +++- test.js | 64 ++++++++++++++++++++++++++++++++-------------------- 4 files changed, 80 insertions(+), 49 deletions(-) diff --git a/index.js b/index.js index b891a3f..6b4cc00 100644 --- a/index.js +++ b/index.js @@ -34,6 +34,10 @@ class Protocol { const t = this.start + type const enc = this.encodings[type] + if (this.muxer._handshakeSent === false) { + this.muxer._sendHandshake() + } + if (this.muxer.corked > 0) { this.muxer._batch.push({ type: t, encoding: enc, message }) return false @@ -44,7 +48,7 @@ class Protocol { c.uint.preencode(state, t) enc.preencode(state, message) - state.buffer = this.stream.alloc(state.end) + state.buffer = this.muxer._alloc(state.end) c.uint.encode(state, t) enc.encode(state, message) @@ -74,21 +78,21 @@ module.exports = class Protomux { this._batch = null this._unmatchedProtocols = [] + this._handshakeSent = false + this._alloc = opts.alloc || (typeof stream.alloc === 'function' ? stream.alloc.bind(stream) : Buffer.allocUnsafe) for (const p of protocols) this.addProtocol(p) this.stream.on('data', this._ondata.bind(this)) - - if (opts.cork) this.cork() - this._sendHandshake() + queueMicrotask(this._sendHandshake.bind(this)) } - remoteOpened (name) { - for (const p of this.remoteProtocols) { - if (p.local.name === name) return true + remoteOpened (name, version) { + for (const { remote } of this.remoteProtocols) { + if (remote.name === name || (version === undefined || version.major === remote.version.major)) return true } for (const { remote } of this._unmatchedProtocols) { - if (remote.name === name) return true + if (remote.name === name || (version === undefined || version.major === remote.version.major)) return true } return false } @@ -101,10 +105,10 @@ module.exports = class Protomux { for (let i = 0; i < this._unmatchedProtocols.length; i++) { const { start, remote } = this._unmatchedProtocols[i] - if (remote.name !== p.name || remote.version.major !== local.version.major) continue + if (remote.name !== local.name || remote.version.major !== local.version.major) continue local.remoteOpened = true this._unmatchedProtocols.splice(i, 1) - const end = start + Math.min(p.messages, local.messages) + const end = start + Math.min(remote.messages, local.messages) this.remoteProtocols.push({ local, remote, start, end }) break } @@ -113,22 +117,26 @@ module.exports = class Protomux { } removeProtocol (p) { + const { name, version = { major: 0, minor: 0 } } = typeof p === 'string' ? { name: p, version: undefined } : p + for (let i = 0; i < this.protocols.length; i++) { const local = this.protocols[i] - if (local.name !== p.name || local.version.major !== p.version.major) continue + if (local.name !== name || local.version.major !== version.major) continue p.removed = true this.protocols.splice(i, 1) } for (let i = 0; i < this.remoteProtocols.length; i++) { const { local, remote, start } = this.remoteProtocols[i] - if (local.name !== p.name || local.version.major !== p.version.major) continue + if (local.name !== name || local.version.major !== version.major) continue this.remoteProtocols.splice(i, 1) this._unmatchedProtocols.push({ start, remote }) } } addRemoteProtocol (p) { + if (!p.version) p = { name: p.name, version: { major: 0, minor: 0 }, messages: p.messages } + const local = this.get(p.name) const start = this.remoteOffset @@ -152,10 +160,10 @@ module.exports = class Protomux { local.onopen() } - removeRemoteProtocol (p) { + removeRemoteProtocol ({ name, version = { major: 0, minor: 0 } }) { for (let i = 0; i < this.remoteProtocols.length; i++) { const { local } = this.remoteProtocols[i] - if (local.name !== p.name || local.version.major !== p.version.major) continue + if (local.name !== name || local.version.major !== version.major) continue this.remoteProtocols.splice(i, 1) local.remoteOpened = false local.onclose() @@ -164,17 +172,19 @@ module.exports = class Protomux { for (let i = 0; i < this._unmatchedProtocols.length; i++) { const { remote } = this._unmatchedProtocols[i] - if (remote.name !== p.name || remote.version.major !== p.version.major) continue + if (remote.name !== name || remote.version.major !== version.major) continue this._unmatchedProtocols.splice(i, 1) break } } _ondata (buffer) { + if (buffer.byteLength === 0) return // keep alive + const state = { start: 0, end: buffer.byteLength, buffer } try { - this._recv(state, false) + this._recv(state) } catch (err) { this.destroy(err) } @@ -195,7 +205,7 @@ module.exports = class Protomux { for (let i = 0; i < this.remoteProtocols.length; i++) { const p = this.remoteProtocols[i] - if (p.start <= t && t <= p.end) { + if (p.start <= t && t < p.end) { p.local.recv(t - p.start, state) break } @@ -210,7 +220,7 @@ module.exports = class Protomux { while (state.start < state.end) { const len = c.uint.decode(state) state.end = state.start + len - this._recv(state, true) + this._recv(state) state.end = end } } @@ -228,6 +238,7 @@ module.exports = class Protomux { } destroy (err) { + this._handshakeSent = true // just to avoid sending it again this.stream.destroy(err) } @@ -260,7 +271,7 @@ module.exports = class Protomux { c.uint.preencode(state, lens[i] = (state.end - end)) } - state.buffer = this.stream.alloc(state.end) + state.buffer = this._alloc(state.end) state.buffer[state.start++] = 0 for (let i = 0; i < batch.length; i++) { @@ -275,16 +286,19 @@ module.exports = class Protomux { } sendKeepAlive () { - this.stream.write(this.stream.alloc(0)) + this.stream.write(this._alloc(0)) } _sendHandshake () { + if (this._handshakeSent) return + this._handshakeSent = true + const hs = { protocols: this.protocols } if (this.corked > 0) { - this._batch.push({ type: 0, encoding: m.handshake, message: hs }) + this._batch.push({ type: 1, encoding: m.handshake, message: hs }) return } @@ -293,7 +307,7 @@ module.exports = class Protomux { c.uint.preencode(state, 1) m.handshake.preencode(state, hs) - state.buffer = this.stream.alloc(state.end) + state.buffer = this._alloc(state.end) c.uint.encode(state, 1) m.handshake.encode(state, hs) diff --git a/messages.js b/messages.js index 4768a1a..b9c6efa 100644 --- a/messages.js +++ b/messages.js @@ -45,7 +45,7 @@ exports.handshake = { protocolArray.preencode(state, h.protocols) }, encode (state, h) { - state.buffer[state.start++] = 0 // reversed flags + state.buffer[state.start++] = 0 // reserved flags protocolArray.encode(state, h.protocols) }, decode (state) { diff --git a/package.json b/package.json index e911340..6ed6720 100644 --- a/package.json +++ b/package.json @@ -5,9 +5,12 @@ "main": "index.js", "dependencies": {}, "devDependencies": { - "brittle": "^1.6.0", + "brittle": "^2.0.1", "standard": "^16.0.4" }, + "scripts": { + "test": "standard && brittle test.js" + }, "repository": { "type": "git", "url": "https://github.com/mafintosh/protomux.git" diff --git a/test.js b/test.js index 8aa14c0..59f2587 100644 --- a/test.js +++ b/test.js @@ -107,37 +107,51 @@ test('multi message', function (t) { } }) -// test('corks', function (t) { -// const a = new Protomux(new SecretStream(true), [{ -// name: 'other', -// messages: [c.bool, c.bool] -// }, { -// name: 'multi', -// messages: [c.int, c.string] -// }]) +test('corks', function (t) { + const a = new Protomux(new SecretStream(true), [{ + name: 'other', + messages: [c.bool, c.bool] + }, { + name: 'multi', + messages: [c.int, c.string] + }]) -// const b = new Protomux(new SecretStream(false), [{ -// name: 'multi', -// messages: [c.int, c.string] -// }]) + const b = new Protomux(new SecretStream(false), [{ + name: 'multi', + messages: [c.int, c.string] + }]) -// replicate(a, b) + replicate(a, b) -// t.plan(4) + t.plan(8 + 1) -// const ap = a.get('multi') -// const bp = b.get('multi') + const ap = a.get('multi') + const bp = b.get('multi') -// // ap.cork() -// // ap.send(0, 1) -// // ap.send(0, 2) -// // ap.send(0, 3) -// // ap.uncork() + const expected = [ + [0, 1], + [0, 2], + [0, 3], + [1, 'a string'] + ] -// bp.onmessage = function (type, message) { -// console.log(type, message) -// } -// }) + ap.cork() + ap.send(0, 1) + ap.send(0, 2) + ap.send(0, 3) + ap.send(1, 'a string') + ap.uncork() + + b.stream.once('data', function (data) { + t.ok(expected.length === 0, 'received all messages in one data packet') + }) + + bp.onmessage = function (type, message) { + const e = expected.shift() + t.is(type, e[0]) + t.is(message, e[1]) + } +}) function replicate (a, b) { a.stream.rawStream.pipe(b.stream.rawStream).pipe(a.stream.rawStream)