commit ce309533f4571a82c19861144a1bca2494b89a43 Author: Mathias Buus Date: Wed Nov 10 15:15:09 2021 +0100 first commit diff --git a/.github/workflows/test-node.yml b/.github/workflows/test-node.yml new file mode 100644 index 0000000..a93c431 --- /dev/null +++ b/.github/workflows/test-node.yml @@ -0,0 +1,23 @@ +name: Build Status +on: + push: + branches: + - master + pull_request: + branches: + - master +jobs: + build: + strategy: + matrix: + node-version: [lts/*] + os: [ubuntu-latest, macos-latest, windows-latest] + runs-on: ${{ matrix.os }} + steps: + - uses: actions/checkout@v2 + - name: Use Node.js ${{ matrix.node-version }} + uses: actions/setup-node@v2 + with: + node-version: ${{ matrix.node-version }} + - run: npm install + - run: npm test diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..df9289e --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +node_modules +sandbox.js +sandbox +coverage diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..702b8e7 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2021 Mathias Buus + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..11c898b --- /dev/null +++ b/README.md @@ -0,0 +1,17 @@ +# protomux + +Multiplex multiple message oriented protocols over a stream + +``` +npm install protomux +``` + +## Usage + +``` js +const Protomux = require('protomux') +``` + +## License + +MIT diff --git a/index.js b/index.js new file mode 100644 index 0000000..b891a3f --- /dev/null +++ b/index.js @@ -0,0 +1,305 @@ +const c = require('compact-encoding') +const m = require('./messages') + +class Protocol { + constructor (muxer, offset, protocol) { + this.muxer = muxer + this.stream = muxer.stream + this.start = offset + this.end = offset + protocol.messages.length + this.name = protocol.name + this.version = protocol.version || { major: 0, minor: 0 } + this.messages = protocol.messages.length + this.remoteOpened = false + this.removed = false + this.encodings = protocol.messages + this.onmessage = noop + this.onopen = noop + this.onclose = noop + } + + get corked () { + return this.muxer.corked + } + + cork () { + this.muxer.cork() + } + + uncork () { + this.muxer.uncork() + } + + send (type, message) { + const t = this.start + type + const enc = this.encodings[type] + + if (this.muxer.corked > 0) { + this.muxer._batch.push({ type: t, encoding: enc, message }) + return false + } + + const state = { start: 0, end: 0, buffer: null } + + c.uint.preencode(state, t) + enc.preencode(state, message) + + state.buffer = this.stream.alloc(state.end) + + c.uint.encode(state, t) + enc.encode(state, message) + + return this.stream.write(state.buffer) + } + + recv (type, state) { + this.onmessage(type, this.encodings[type].decode(state)) + } +} + +module.exports = class Protomux { + constructor (stream, protocols, opts = {}) { + this.stream = stream + + this.protocols = [] + this.offset = 2 + + this.remoteProtocols = [] + this.remoteOffset = 2 + + this.remoteHandshake = null + this.onhandshake = noop + + this.corked = 0 + + this._batch = null + this._unmatchedProtocols = [] + + for (const p of protocols) this.addProtocol(p) + + this.stream.on('data', this._ondata.bind(this)) + + if (opts.cork) this.cork() + this._sendHandshake() + } + + remoteOpened (name) { + for (const p of this.remoteProtocols) { + if (p.local.name === name) return true + } + for (const { remote } of this._unmatchedProtocols) { + if (remote.name === name) return true + } + return false + } + + addProtocol (p) { + const local = new Protocol(this, this.offset, p) + + this.protocols.push(local) + this.offset += p.messages.length + + for (let i = 0; i < this._unmatchedProtocols.length; i++) { + const { start, remote } = this._unmatchedProtocols[i] + if (remote.name !== p.name || remote.version.major !== local.version.major) continue + local.remoteOpened = true + this._unmatchedProtocols.splice(i, 1) + const end = start + Math.min(p.messages, local.messages) + this.remoteProtocols.push({ local, remote, start, end }) + break + } + + return local + } + + removeProtocol (p) { + for (let i = 0; i < this.protocols.length; i++) { + const local = this.protocols[i] + if (local.name !== p.name || local.version.major !== p.version.major) continue + p.removed = true + this.protocols.splice(i, 1) + } + + for (let i = 0; i < this.remoteProtocols.length; i++) { + const { local, remote, start } = this.remoteProtocols[i] + if (local.name !== p.name || local.version.major !== p.version.major) continue + this.remoteProtocols.splice(i, 1) + this._unmatchedProtocols.push({ start, remote }) + } + } + + addRemoteProtocol (p) { + const local = this.get(p.name) + const start = this.remoteOffset + + this.remoteOffset += p.messages + + if (!local || local.version.major !== p.version.major) { + this._unmatchedProtocols.push({ start, remote: p }) + return + } + + if (local.remoteOpened) { + this.destroy(new Error('Remote sent duplicate protocols')) + return + } + + const end = start + Math.min(p.messages, local.messages) + + this.remoteProtocols.push({ local, remote: p, start, end }) + + local.remoteOpened = true + local.onopen() + } + + removeRemoteProtocol (p) { + for (let i = 0; i < this.remoteProtocols.length; i++) { + const { local } = this.remoteProtocols[i] + if (local.name !== p.name || local.version.major !== p.version.major) continue + this.remoteProtocols.splice(i, 1) + local.remoteOpened = false + local.onclose() + break + } + + for (let i = 0; i < this._unmatchedProtocols.length; i++) { + const { remote } = this._unmatchedProtocols[i] + if (remote.name !== p.name || remote.version.major !== p.version.major) continue + this._unmatchedProtocols.splice(i, 1) + break + } + } + + _ondata (buffer) { + const state = { start: 0, end: buffer.byteLength, buffer } + + try { + this._recv(state, false) + } catch (err) { + this.destroy(err) + } + } + + _recv (state) { + const t = c.uint.decode(state) + + if (t < 2) { + if (t === 0) { + this._recvBatch(state) + } else { + this._recvHandshake(state) + } + return + } + + for (let i = 0; i < this.remoteProtocols.length; i++) { + const p = this.remoteProtocols[i] + + if (p.start <= t && t <= p.end) { + p.local.recv(t - p.start, state) + break + } + } + + state.start = state.end + } + + _recvBatch (state) { + const end = state.end + + while (state.start < state.end) { + const len = c.uint.decode(state) + state.end = state.start + len + this._recv(state, true) + state.end = end + } + } + + _recvHandshake (state) { + if (this.remoteHandshake !== null) { + this.destroy(new Error('Double handshake')) + return + } + + this.remoteHandshake = m.handshake.decode(state) + for (const p of this.remoteHandshake.protocols) this.addRemoteProtocol(p) + + this.onhandshake(this.remoteHandshake) + } + + destroy (err) { + this.stream.destroy(err) + } + + get (name) { + for (const p of this.protocols) { + if (p.name === name) return p + } + return null + } + + cork () { + if (++this.corked === 1) this._batch = [] + } + + uncork () { + if (--this.corked !== 0) return + + const batch = this._batch + this._batch = null + + const state = { start: 0, end: 1, buffer: null } + const lens = new Array(batch.length) + + for (let i = 0; i < batch.length; i++) { + const b = batch[i] + const end = state.end + + c.uint.preencode(state, b.type) + b.encoding.preencode(state, b.message) + c.uint.preencode(state, lens[i] = (state.end - end)) + } + + state.buffer = this.stream.alloc(state.end) + state.buffer[state.start++] = 0 + + for (let i = 0; i < batch.length; i++) { + const b = batch[i] + + c.uint.encode(state, lens[i]) + c.uint.encode(state, b.type) + b.encoding.encode(state, b.message) + } + + this.stream.write(state.buffer) + } + + sendKeepAlive () { + this.stream.write(this.stream.alloc(0)) + } + + _sendHandshake () { + const hs = { + protocols: this.protocols + } + + if (this.corked > 0) { + this._batch.push({ type: 0, encoding: m.handshake, message: hs }) + return + } + + const state = { start: 0, end: 0, buffer: null } + + c.uint.preencode(state, 1) + m.handshake.preencode(state, hs) + + state.buffer = this.stream.alloc(state.end) + + c.uint.encode(state, 1) + m.handshake.encode(state, hs) + + this.stream.write(state.buffer) + } +} + +function noop () {} diff --git a/package.json b/package.json new file mode 100644 index 0000000..e911340 --- /dev/null +++ b/package.json @@ -0,0 +1,21 @@ +{ + "name": "protomux", + "version": "0.0.0", + "description": "Multiplex multiple message oriented protocols over a stream", + "main": "index.js", + "dependencies": {}, + "devDependencies": { + "brittle": "^1.6.0", + "standard": "^16.0.4" + }, + "repository": { + "type": "git", + "url": "https://github.com/mafintosh/protomux.git" + }, + "author": "Mathias Buus (@mafintosh)", + "license": "MIT", + "bugs": { + "url": "https://github.com/mafintosh/protomux/issues" + }, + "homepage": "https://github.com/mafintosh/protomux" +} diff --git a/test.js b/test.js new file mode 100644 index 0000000..8aa14c0 --- /dev/null +++ b/test.js @@ -0,0 +1,144 @@ +const Protomux = require('./') +const SecretStream = require('@hyperswarm/secret-stream') +const test = require('brittle') +const c = require('compact-encoding') + +test('basic', function (t) { + const a = new Protomux(new SecretStream(true), [{ + name: 'foo', + messages: [c.string] + }]) + + const b = new Protomux(new SecretStream(false), [{ + name: 'foo', + messages: [c.string] + }]) + + replicate(a, b) + + const ap = a.get('foo') + const bp = b.get('foo') + + t.plan(3) + + ap.onopen = function () { + t.pass('a opened') + } + + ap.onmessage = function (type, message) { + t.is(type, 0) + t.is(message, 'hello world') + } + + bp.send(0, 'hello world') +}) + +test('echo message', function (t) { + const a = new Protomux(new SecretStream(true), [{ + name: 'foo', + messages: [c.string] + }]) + + const b = new Protomux(new SecretStream(false), [{ + name: 'other', + messages: [c.bool, c.bool] + }, { + name: 'foo', + messages: [c.string] + }]) + + replicate(a, b) + + const ap = a.get('foo') + const bp = b.get('foo') + + t.plan(3) + + ap.onmessage = function (type, message) { + ap.send(type, 'echo: ' + message) + } + + bp.send(0, 'hello world') + + bp.onopen = function () { + t.pass('b opened') + } + + bp.onmessage = function (type, message) { + t.is(type, 0) + t.is(message, 'echo: hello world') + } +}) + +test('multi message', function (t) { + const a = new Protomux(new SecretStream(true), [{ + name: 'other', + messages: [c.bool, c.bool] + }, { + name: 'multi', + messages: [c.int, c.string, c.string] + }]) + + const b = new Protomux(new SecretStream(false), [{ + name: 'multi', + messages: [c.int, c.string] + }]) + + replicate(a, b) + + t.plan(4) + + const ap = a.get('multi') + const bp = b.get('multi') + + ap.send(0, 42) + ap.send(1, 'a string with 42') + ap.send(2, 'should be ignored') + + const expected = [ + [0, 42], + [1, 'a string with 42'] + ] + + bp.onmessage = function (type, message) { + const e = expected.shift() + t.is(type, e[0]) + t.is(message, e[1]) + } +}) + +// test('corks', function (t) { +// const a = new Protomux(new SecretStream(true), [{ +// name: 'other', +// messages: [c.bool, c.bool] +// }, { +// name: 'multi', +// messages: [c.int, c.string] +// }]) + +// const b = new Protomux(new SecretStream(false), [{ +// name: 'multi', +// messages: [c.int, c.string] +// }]) + +// replicate(a, b) + +// t.plan(4) + +// const ap = a.get('multi') +// const bp = b.get('multi') + +// // ap.cork() +// // ap.send(0, 1) +// // ap.send(0, 2) +// // ap.send(0, 3) +// // ap.uncork() + +// bp.onmessage = function (type, message) { +// console.log(type, message) +// } +// }) + +function replicate (a, b) { + a.stream.rawStream.pipe(b.stream.rawStream).pipe(a.stream.rawStream) +}