Code Monkey home page Code Monkey logo

muxdemux's Introduction

muxdemux Build Status

multiplex and demultiplex (mux / demux) streams into an single stream (object mode or not)

Installation

npm i --save muxdemux

Usage

Example: muxdemux substreams

var muxdemux = require('muxdemux')
var mux = muxdemux()
var demux = muxdemux(handleSubstream)
mux.pipe(demux)
mux.substream('foo').write(new Buffer('hello world'))
mux.substream('bar').write(new Buffer('yolo'))
function handleSubstream (substream, name) {
  if (name === 'foo') {
    substream.once('data', function (data) {
      data.toString() // 'hello world'
    })
    substream.pipe(/* any other stream */)
  } else if (name === 'bar') {
    substream.once('data', function (data) {
      data.toString() // 'yolo'
    })
  }
}

Example: muxdemux substream events

Substream events are encoded and sent down the stream as data packets. Demux streams will decode these data packets and emit the events as if they occurred on downstream substreams themselves. All emitted from a mux.substream().emit(...) will be propagated to downstream substreams.

var muxdemux = require('muxdemux')
var mux = muxdemux()
var demux = muxdemux(handleSubstream)
mux.pipe(demux)
mux.substream('foo').emit('buffer-event', new Buffer('🔥'))
mux.substream('bar').emit('error', new Error('boom'))
mux.substream('qux').emit('other-event', { abc: 'hello' })
function handleSubstream (substream, name) {
  if (name === 'foo') {
    // buffers are encoded on json and reparsed as buffers
    substream.once('buffer-event', function (buf, {
      buf instanceof Buffer // true
      buf.toString() // '🔥'
    })
  } else if (name === 'bar') {
    substream.once('error', function (err) {
      // errors are encoded on json and reparsed as errors
      err instanceof Error // true
      err.message // 'boom'
      err.stack // 'Error: boom ...'
    })
  } else if (name === 'qux') {
    substream.once('other-event', function (data) {
      // errors are encoded on json and reparsed as errors
      typeof data // 'object'
      data // { abc: 'hello' }
    })
  }
}

Example: muxdemux object mode streams

For now muxdemux assumes that substreams share the same objectMode as their parents; substreams of objectMode:true mux/demux streams will also be objectMode:true and vice versa.

var muxdemux = require('muxdemux')
var mux = muxdemux.obj()
var demux = muxdemux.obj(handleSubstream)
mux.pipe(demux)
mux.substream('foo').write({ hello: 1 })
mux.substream('bar').write({ world: 2 })
function handleSubstream (substream, name) {
  if (name === 'foo') {
    substream.once('data', function (data) {
      data // { hello: 1 }
    })
    substream.pipe(/* any other stream */)
  } else if (name === 'bar') {
    substream.once('data', function (data) {
      data // { world: 2 }
    })
  }
}

Example: muxdemux substreams end

If all of a mux/demux's substreams end the mux/demux stream will also end. The same is also true for the opposite. If a mux/demux stream ends, it's substreams will be ended.

var muxdemux = require('muxdemux')
var mux = muxdemux.obj()
var demux = muxdemux.obj(function handleSubstream (substream, name) { /* ... */ })
mux.on('finish', handleMuxFinish)
demux.on('finish', handleDemuxFinish)
mux.pipe(demux)
var foo = mux.substream('foo')
foo.write({ hello: 1 })
var bar = mux.substream('bar')
bar.write({ world: 2 })
// end substreams
foo.end()
bar.end()
function handleMuxFinish () {
  // get's called bc all substreams ended (both foo and bar)
}
function handleDemuxFinish () {
  // get's called downstream bc all substreams ended (both foo and bar)
}

Example: unexpected muxdemux finish

If a muxdemux finishes before it's substreams it will emit an error to each unfinished substream. This default behavior can be disabled by passing opts.unexpectedFinishError = false

var muxdemux = require('muxdemux')
var mux = muxdemux.obj()
var foo = mux.substream('foo')
var bar = mux.substream('bar')
foo.on('error', function (err) {
  // called bc mux finished before foo-substream finished
  err // [Error: unexpected muxdemux finish]
})
bar.on('error', function () {
  // not called, bc bar finished before mux
})
bar.end() // bar ends first, hence no error
mux.end()

Example: unexpected muxdemux error

If a muxdemux errors before it's substreams finish it will emit an error to each unfinished substream. This default behavior can be disabled by passing opts.unexpectedFinishError = false

var muxdemux = require('muxdemux')
var mux = muxdemux.obj()
var foo = mux.substream('foo')
var bar = mux.substream('bar')
foo.on('error', function (err) {
  // called bc mux finished before foo-substream finished
  // error message is prepended w/ 'unexpected muxdemux error: '
  err // [Error: unexpected muxdemux error: boom]
})
bar.on('error', function () {
  // not called, bc bar finished before mux
})
bar.end() // bar ends first, hence no error
mux.emit('error', new Error('boom'))

Example: circular streams

Circular substream data is filtered out of muxdemux streams by default. But if you want to be explicit and prevent non-substream data from infinitely circulating through your stream use opts.circular. opts.circular will filter out non-substream data. Circular streams are useful if you want substream events to be emitted "upstream" and "downstream".

// server.js
var muxdemux = require('muxdemux')
var websocket = /* ... */
var mux = muxdemux({ circular: true })
websocket.pipe(mux).pipe(websocket)
var fooSubstream = mux.substream('foo')
fooSubstream.on('custom1', function (data) {
  console.log(data) // "hello"
  fooSubstream.emit('custom2', 'world')
})

// client.js
var muxdemux = require('muxdemux')
var websocket = /* ... */
var mux = muxdemux({ circular: true })
websocket.pipe(demux).pipe(websocket)
var fooSubstream = demux.substream('foo')
fooSubstream.on('custom2', function () {
  console.log(data) // "world"
})
fooSubstream.emit('custom1', 'hello')

License

MIT

muxdemux's People

Contributors

tjmehta avatar greenkeeper[bot] avatar

Stargazers

Summy avatar  avatar

Watchers

 avatar James Cloos avatar  avatar

muxdemux's Issues

Version 10 of node.js has been released

Version 10 of Node.js (code name Dubnium) has been released! 🎊

To see what happens to your code in Node.js 10, Greenkeeper has created a branch with the following changes:

  • Added the new Node.js version to your .travis.yml

If you’re interested in upgrading this repo to Node.js 10, you can open a PR with these changes. Please note that this issue is just intended as a friendly reminder and the PR as a possible starting point for getting your code running on Node.js 10.

More information on this issue

Greenkeeper has checked the engines key in any package.json file, the .nvmrc file, and the .travis.yml file, if present.

  • engines was only updated if it defined a single version, not a range.
  • .nvmrc was updated to Node.js 10
  • .travis.yml was only changed if there was a root-level node_js that didn’t already include Node.js 10, such as node or lts/*. In this case, the new version was appended to the list. We didn’t touch job or matrix configurations because these tend to be quite specific and complex, and it’s difficult to infer what the intentions were.

For many simpler .travis.yml configurations, this PR should suffice as-is, but depending on what you’re doing it may require additional work or may not be applicable at all. We’re also aware that you may have good reasons to not update to Node.js 10, which is why this was sent as an issue and not a pull request. Feel free to delete it without comment, I’m a humble robot and won’t feel rejected 🤖


FAQ and help

There is a collection of frequently asked questions. If those don’t help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

An in-range update of uuid is breaking the build 🚨

Version 3.2.1 of uuid was just published.

Branch Build failing 🚨
Dependency uuid
Current Version 3.2.0
Type dependency

This version is covered by your current version range and after updating it in your project the build failed.

uuid is a direct dependency of this project, and it is very likely causing it to break. If other packages depend on yours, this update is probably also breaking those in turn.

Status Details
  • continuous-integration/travis-ci/push The Travis CI build could not complete due to an error Details

Commits

The new version differs by 2 commits.

  • ce7d317 chore(release): 3.2.1
  • 262a8ea Fix illegal invocation of getRandomValues (#257)

See the full diff

FAQ and help

There is a collection of frequently asked questions. If those don’t help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.