Multiplex multiple message-oriented protocols over a stream
This repository has been archived on 2023-04-09. You can view files and clone it, but cannot push or open issues or pull requests.
Derrick Hammer e7aa69a59b
*make message encoding/decoding async for use with the kernel
2023-04-08 21:18:14 -04:00

protomux

Multiplex multiple message-oriented protocols over a stream

npm install protomux

Usage

const Protomux = require('protomux')
const c = require('compact-encoding')

// The stream must be framed, i.e. it must preserve message boundaries,
// for example by length-prefixing each message, like @hyperswarm/secret-stream

const mux = new Protomux(aStreamThatFrames)

// Now add some protocol channels

const cool = mux.createChannel({
  protocol: 'cool-protocol',
  id: Buffer.from('optional binary id'),
  onopen () {
    console.log('the other side opened this protocol!')
  },
  onclose () {
    console.log('either side closed the protocol')
  }
})

// And add some messages

const one = cool.addMessage({
  encoding: c.string,
  onmessage (m) {
    console.log('recv message (1)', m)
  }
})

const two = cool.addMessage({
  encoding: c.bool,
  onmessage (m) {
    console.log('recv message (2)', m)
  }
})

// open the channel

cool.open()

// And send some data

one.send('a string')
two.send(true)

API

mux = new Protomux(stream, [options])

Make a new instance. stream should be a framed stream, preserving the messages written.

Options include:

{
  // Called when the muxer needs to allocate a buffer for a message being written. Defaults to Buffer.allocUnsafe.
  alloc (size) {}
}
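
To illustrate what "framed" means for the stream passed to the constructor, here is a minimal sketch of length-prefixed framing. The helpers `frame` and `createDeframer` are hypothetical and not part of protomux, and the 4-byte big-endian prefix is an assumption chosen for the example; a real framed stream such as @hyperswarm/secret-stream handles this for you.

```javascript
// Hypothetical helpers, not part of protomux.
// Each message is prefixed with its length as a 4-byte big-endian integer.
function frame (message) {
  const header = Buffer.alloc(4)
  header.writeUInt32BE(message.length, 0)
  return Buffer.concat([header, message])
}

// Returns a function that accepts arbitrary chunks from the wire and calls
// onmessage exactly once per complete message, however the chunks were split.
function createDeframer (onmessage) {
  let buffered = Buffer.alloc(0)

  return function push (chunk) {
    buffered = Buffer.concat([buffered, chunk])

    while (buffered.length >= 4) {
      const length = buffered.readUInt32BE(0)
      if (buffered.length < 4 + length) break // wait for the rest of the message
      onmessage(buffered.subarray(4, 4 + length))
      buffered = buffered.subarray(4 + length)
    }
  }
}
```

Without framing like this, a plain byte stream may deliver two messages in one chunk or one message across several chunks, and the muxer could not tell where a message ends.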

mux = Protomux.from(stream | muxer, [options])

Helper to accept either an existing muxer instance or a stream (which creates a new one).

const channel = mux.createChannel(opts)

Add a new protocol channel.

Options include:

{
  // Used to match the protocol
  protocol: 'name of the protocol',
  // Optional additional binary id to identify this channel
  id: buffer,
  // Optional encoding for a handshake
  handshake: encoding,
  // Optional array of message types you want to send/receive.
  messages: [],
  // Called when the remote side adds this protocol.
  // Errors here are caught and forwarded to stream.destroy
  async onopen (handshake) {},
  // Called when the channel closes, i.e. the remote side closes or rejects this protocol, or we closed it.
  // Errors here are caught and forwarded to stream.destroy
  async onclose () {},
  // Called after onclose when all pending promises have resolved.
  async ondestroy () {}
}

Sessions are paired based on a queue, so a local channel is paired with the first remote channel that has the same protocol and id.
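
The queue-based pairing described above can be modeled roughly as follows. This is a conceptual sketch only, not protomux's internals: the `PairingQueue` class, its method names, and the key format are all hypothetical.

```javascript
// Hypothetical model of queue-based pairing, not protomux internals.
class PairingQueue {
  constructor () {
    // Maps "protocol + id" to the remote sessions still waiting for a local match.
    this.waiting = new Map()
  }

  static key (protocol, id) {
    return protocol + '##' + (id ? id.toString('hex') : '')
  }

  // Record a session that the remote side opened.
  addRemote (protocol, id, session) {
    const key = PairingQueue.key(protocol, id)
    if (!this.waiting.has(key)) this.waiting.set(key, [])
    this.waiting.get(key).push(session)
  }

  // Pair a local session with the oldest waiting remote session,
  // or return null if none is waiting for this protocol and id.
  pairLocal (protocol, id) {
    const queue = this.waiting.get(PairingQueue.key(protocol, id))
    return queue && queue.length > 0 ? queue.shift() : null
  }
}
```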

NOTE: mux.createChannel returns null if the channel should not be opened, i.e. it is a duplicate channel or the remote has already closed this one.

If you want multiple sessions with the same protocol and id, set unique: false as an option.
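
Since mux.createChannel can return null, callers should check the result before opening. A minimal sketch, assuming `mux` is a Protomux instance; the helper name `openChannel` and its parameters are made up for this example.

```javascript
// Hypothetical helper: opens a channel if the muxer allows it.
// handshakeEncoding is a compact-encoding for the handshake and
// handshakeValue is the value sent to the remote side on open.
function openChannel (mux, protocol, handshakeEncoding, handshakeValue) {
  const channel = mux.createChannel({
    protocol,
    handshake: handshakeEncoding,
    onopen (remoteHandshake) {
      console.log('remote handshake:', remoteHandshake)
    }
  })

  // createChannel returns null for a duplicate or already closed channel.
  if (channel === null) return null

  channel.open(handshakeValue)
  return channel
}
```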

const opened = mux.opened({ protocol, id })

Boolean that indicates if the channel is opened.

mux.pair({ protocol, id }, callback)

Register a callback to be called every time a new channel is requested.

mux.unpair({ protocol, id })

Unregisters the pair callback.
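
One way to use mux.pair and mux.unpair together is to create a channel lazily, only when the remote asks for it. The sketch below assumes `mux` is a Protomux instance and that the pair callback can be used without arguments; `acceptOnRequest` and `onchannel` are hypothetical names.

```javascript
// Hypothetical helper: opens a channel whenever the remote requests `protocol`
// and returns a function that unregisters the pair callback again.
function acceptOnRequest (mux, protocol, onchannel) {
  mux.pair({ protocol }, () => {
    const channel = mux.createChannel({ protocol })
    if (channel === null) return // duplicate or already closed
    onchannel(channel) // let the caller add messages before opening
    channel.open()
  })

  return function stop () {
    mux.unpair({ protocol })
  }
}
```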

channel.open([handshake])

Open the channel.

const m = channel.addMessage(opts)

Add/register a message type for a certain encoding. Options include:

{
  // compact-encoding specifying how to encode/decode this message
  encoding: c.binary,
  // Called when the remote side sends a message.
  // Errors here are caught and forwarded to stream.destroy
  async onmessage (message) { }
}
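
Besides the built-in encodings such as c.string or c.binary, the encoding option can be a custom one. The sketch below assumes the synchronous compact-encoding convention of an object with preencode, encode and decode functions operating on a `{ start, end, buffer }` state; the `point` encoding and the `roundTrip` helper are hypothetical.

```javascript
// Hypothetical encoding for { x, y }, where both values fit in one byte (0..255).
const point = {
  // Reserve the number of bytes this value needs.
  preencode (state, p) {
    state.end += 2
  },
  // Write the value at state.start and advance it.
  encode (state, p) {
    state.buffer[state.start++] = p.x
    state.buffer[state.start++] = p.y
  },
  // Read the value back from state.start and advance it.
  decode (state) {
    if (state.end - state.start < 2) throw new Error('Out of bounds')
    return { x: state.buffer[state.start++], y: state.buffer[state.start++] }
  }
}

// Manual round trip through the three steps, to check the encoding in isolation.
function roundTrip (encoding, value) {
  const state = { start: 0, end: 0, buffer: null }
  encoding.preencode(state, value)
  state.buffer = Buffer.alloc(state.end)
  encoding.encode(state, value)
  state.start = 0
  return encoding.decode(state)
}
```

An object like `point` could then be passed as `encoding` to channel.addMessage.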

m.send(data)

Send a message.

m.onmessage

Function that is called when a message arrives.

m.encoding

The encoding for this message.

channel.close()

Closes the protocol channel.

channel.cork()

Cork the protocol channel. While corked, the channel buffers messages and sends them all in one batch when it is uncorked.

channel.uncork()

Uncork and send the batch.
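
A small sketch of batching with cork and uncork, assuming `channel` comes from mux.createChannel and `message` from channel.addMessage; the helper name `sendBatch` is made up for this example.

```javascript
// Hypothetical helper: sends all items as one batch.
function sendBatch (channel, message, items) {
  channel.cork()
  try {
    for (const item of items) message.send(item)
  } finally {
    // Always uncork, even if a send throws, so the channel does not stay corked.
    channel.uncork()
  }
}
```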

mux.cork()

Same as channel.cork but on the muxer instance.

mux.uncork()

Same as channel.uncork but on the muxer instance.

for (const channel of mux) { ... }

The muxer instance is iterable, so you can iterate over all the channels.
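
For example, iteration can be used to close every channel on a muxer. This is a sketch assuming `mux` is a Protomux instance; `closeAll` is a hypothetical helper.

```javascript
// Hypothetical helper: closes every channel and returns how many were closed.
function closeAll (mux) {
  // Copy the channels first so that closing them cannot affect the iteration.
  const channels = [...mux]
  for (const channel of channels) channel.close()
  return channels.length
}
```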

License

MIT