pumpify's Introduction

pumpify

Combine an array of streams into a single duplex stream using pump and duplexify. If one of the streams closes or errors, all streams in the pipeline will be destroyed.

npm install pumpify

Usage

Pass the streams you want to pipe together to pumpify: pipeline = pumpify(s1, s2, s3, ...). pipeline is a duplex stream that writes to the first stream and reads from the last one. Streams are piped together using pump, so if one of them closes or errors, all streams will be destroyed.

var pumpify = require('pumpify')
var tar = require('tar-fs')
var zlib = require('zlib')
var fs = require('fs')

var untar = pumpify(zlib.createGunzip(), tar.extract('output-folder'))
// you can also pass an array instead
// var untar = pumpify([zlib.createGunzip(), tar.extract('output-folder')])

fs.createReadStream('some-gzipped-tarball.tgz').pipe(untar)

If you are pumping object streams together, use pipeline = pumpify.obj(s1, s2, ...). Call pipeline.destroy() to destroy the pipeline (including the streams passed to pumpify).
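
A minimal object-mode sketch (the two transforms below are hypothetical and use through2, which is not a dependency of pumpify):

var pumpify = require('pumpify')
var through = require('through2') // assumed helper, not part of pumpify

// two trivial object-mode transforms
var addOne = through.obj(function (n, enc, cb) { cb(null, n + 1) })
var square = through.obj(function (n, enc, cb) { cb(null, n * n) })

var pipeline = pumpify.obj(addOne, square)

pipeline.on('data', function (n) { console.log(n) }) // 4, then 9
pipeline.write(1)
pipeline.write(2)
pipeline.end()

// pipeline.destroy() would tear down addOne and square as well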

Using setPipeline(s1, s2, ...)

Similar to duplexify, you can also define the pipeline asynchronously using setPipeline(s1, s2, ...):

var untar = pumpify()

setTimeout(function() {
  // will start draining the input now
  untar.setPipeline(zlib.createGunzip(), tar.extract('output-folder'))
}, 1000)

fs.createReadStream('some-gzipped-tarball.tgz').pipe(untar)

License

MIT

Related

pumpify is part of the mississippi stream utility collection, which includes more useful stream modules similar to this one.

pumpify's People

Contributors

allouis, devinivy, eomm, lostthetrail, mafintosh


pumpify's Issues

Mixing object mode with "normal" mode.

If I have a mix of streams that are in object mode and streams that are in "normal" buffer mode, what is the practical difference between using pumpify.obj and regular pumpify?
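
One way to reason about it (a sketch of my own, not an official answer): pumpify.obj only changes the object-mode setting of the outer duplex ends, so pick the variant that matches what you write into the first stream and read out of the last one; the inner streams keep whatever modes they were created with and negotiate among themselves via pipe. For example, objects in and objects out with buffer-mode streams in the middle still calls for pumpify.obj:

const pumpify = require('pumpify')
const zlib = require('zlib')
const { Transform } = require('stream')

// object -> buffer
const stringify = new Transform({
  writableObjectMode: true,
  transform (obj, enc, cb) { cb(null, JSON.stringify(obj) + '\n') }
})

// buffer -> object
const parse = new Transform({
  readableObjectMode: true,
  transform (chunk, enc, cb) { cb(null, { bytes: chunk.length }) }
})

// objects in, buffer-mode gzip/gunzip in the middle, objects out
const pipeline = pumpify.obj(stringify, zlib.createGzip(), zlib.createGunzip(), parse)

pipeline.on('data', obj => console.log(obj)) // e.g. { bytes: 18 }
pipeline.write({ hello: 'world' })
pipeline.end()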

Write after end

I'm not sure if I'm using pumpify wrong or if I stumbled upon a bug.

This example causes a "write after end" error:

var pumpify = require('pumpify');
var miss = require('mississippi');

function double() {
  return miss.through.obj(function(num, _, cb) {
    cb(null, num * 2);
  });
}

function log(results) {
  console.log(results);
}

miss.pipe([
  miss.from.obj([1,2,3]),
  pumpify.obj(miss.from.obj([100,200,300]), double()),
  miss.concat({ encoding: 'object' }, log)
], log);

I have a readable stream as the first argument to pumpify (which is likely causing this), but I thought that a duplex stream could be used in this fashion. Let me know if you need any other information. Thanks!
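
For what it's worth, a hedged workaround sketch, assuming the goal is to transform the outer source rather than merge the two readables: keep readable sources out of the pumpify pipeline and only combine transforms, so the pipeline's writable side stays open for the data piped into it.

var pumpify = require('pumpify');
var miss = require('mississippi');

function double() {
  return miss.through.obj(function(num, _, cb) {
    cb(null, num * 2);
  });
}

miss.pipe(
  miss.from.obj([1, 2, 3]),
  pumpify.obj(double(), double()),   // transforms only, no readable source inside
  miss.concat({ encoding: 'object' }, function(results) {
    console.log(results);            // [4, 8, 12]
  }),
  function(err) {
    if (err) console.error(err);
  }
);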

Unhandled error

Hello, I have an unhandled error with the following code:

const { once } = require('events')
const { Readable } = require('stream')
const miss = require('mississippi')
const through = require('through')

const createStream = (content) => {
  const s = new Readable()
  s._read = () => {}
  s.push(content)
  s.push(null)
  return s
}

;(async function () {
  const transform = through(function () {
    transform.emit('error', new Error('Invalid JSON (Unexpected "m" at position 1 in state STOP)'))
  })
  const source = createStream('<message>No Feed.</message>')
  source.on('error', () => {
    console.log('handled')
  })
  const stream = miss.pipeline.obj(source, transform)
  const error = await once(stream, 'error')
  console.log(error)
})()

Node: v14.5.0
through: 2.3.8
pumpify: 1.5.1

Note: I've run into this issue while using mississippi.pipeline.obj with JSONStream (see error here)

Different behaviour between node 8 and 10

For some reason, when an error is emitted on a stream in Node 8 it is propagated correctly and followed by the end event; however, on Node 10 this happens in reverse order (using the latest 2.0.1 version of pumpify).

Example:

'use strict';

const { PassThrough, Writable } = require('stream');

const pumpify = require('pumpify');

const stream1 = new PassThrough();
const stream2 = new Writable();

const combined = pumpify(stream1, stream2);

combined.on('end', () => {
    console.log('Stream end');
});
combined.on('error', e => {
    console.log('Stream error: ', e);
});

stream1.emit('error', 'aaa');

$ nvm use 8
Now using node v8.17.0 (npm v6.13.4)

$ node tmp.js
Stream error:  aaa
Stream end

$ nvm use 10
Now using node v10.19.0 (npm v6.13.4)

$ node tmp.js
Stream end
Stream error:  aaa

Notice the reverse order of events.

Is this expected behaviour? Am I doing something wrong?
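
A sketch of a workaround I'd reach for (an assumption on my part, not a documented recommendation): rather than depending on the relative order of 'end' and 'error', which evidently differs between Node versions, let end-of-stream report the single outcome once:

'use strict';

const eos = require('end-of-stream'); // used internally by pump; install it directly to rely on it
const { PassThrough, Writable } = require('stream');
const pumpify = require('pumpify');

const stream1 = new PassThrough();
const stream2 = new Writable({ write(chunk, enc, cb) { cb(); } });

const combined = pumpify(stream1, stream2);

eos(combined, e => {
  if (e) console.log('Pipeline failed: ', e);   // called once on error...
  else console.log('Pipeline finished');        // ...or once on successful completion
});

stream1.emit('error', new Error('aaa'));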

Class 'Pumpify' incorrectly implements interface 'Duplexify'

Using [email protected] and [email protected]

Running npm run build gives the following error:

node_modules/@types/pumpify/index.d.ts(12,15): error TS2420: Class 'Pumpify' incorrectly implements interface 'Duplexify'.
  Property 'cork' is missing in type 'Pumpify'.

I believe the error may actually lie in https://github.com/DefinitelyTyped/DefinitelyTyped/blob/master/types/pumpify/index.d.ts, but it may also be a version mismatch somewhere. (Let me know if this issue should be moved to the types repo.)

autoDestroy change hiding error from duplexify stream

Hello again, @mafintosh! I went a few years without having a stream question, so I figured I was due :)

With the recent autoDestroy change in 1.5.0, some tests started to silently time out in @google-cloud/storage. I've re-created our structure with a failing test:

var tape = require('tape')
var through = require('through2') // assuming through2 and duplexify, as in pumpify's own tests
var duplexify = require('duplexify')
var pumpify = require('pumpify')

tape('returns error from duplexify', function (t) {
  var a = through()
  var b = duplexify()
  var s = pumpify()

  s.setPipeline(a, b)

  s.on('error', function (err) {
    t.same(err.message, 'stop')
    t.end()
  })

  s.write('data')
  // Test passes if `.end()` is not called
  s.end()

  b.setWritable(through())

  setImmediate(function () {
    b.destroy(new Error('stop'))
  })
})
Error: test exited without ending
  at Test.assert [as _assert] (C:\Users\sawch\dev\pumpify\node_modules\tape\lib\test.js:225:54)
  at Test.bound [as _assert] (C:\Users\sawch\dev\pumpify\node_modules\tape\lib\test.js:77:32)
  at Test.fail (C:\Users\sawch\dev\pumpify\node_modules\tape\lib\test.js:318:10)
  at Test.bound [as fail] (C:\Users\sawch\dev\pumpify\node_modules\tape\lib\test.js:77:32)
  at Test._exit (C:\Users\sawch\dev\pumpify\node_modules\tape\lib\test.js:191:14)
  at Test.bound [as _exit] (C:\Users\sawch\dev\pumpify\node_modules\tape\lib\test.js:77:32)
  at process.<anonymous> (C:\Users\sawch\dev\pumpify\node_modules\tape\index.js:90:19)
  at emitOne (events.js:115:13)
  at process.emit (events.js:210:7)

Is this a strange pattern that I've found myself in, or could the recent changes have introduced an unintentional side effect? Thanks again for all of your help!

Typescript return type?

I see this isn't quite a stream.Writable; it feels like it includes all the functionality of a writable stream, and then some (readable and writable?).

Object keys for a stream.Writable (Node 16):

[
  '_writableState',
  '_events',
  '_eventsCount',
  '_maxListeners',
  '_write'
]

Usage of pumpify:

const parseJsonStream = streams.parseJsonStream()
const batchStream = streams.batchStream(1)

const aiClientWriteStream =
  createAppInsightsWriteSteam(
    setupAppInsights,
  )

let a = new pumpify(
  parseJsonStream,
  batchStream,
  aiClientWriteStream,
)

Object.keys for final stream:

[
  '_readableState',  'readable',
  '_events',         '_eventsCount',
  '_maxListeners',   '_writableState',
  'writable',        'allowHalfOpen',
  '_writable',       '_readable',
  '_readable2',      '_autoDestroy',
  '_forwardDestroy', '_forwardEnd',
  '_corked',         '_ondrain',
  '_drained',        '_forwarding',
  '_unwrite',        '_unread',
  '_ended'
]
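
A small sketch of why a plain stream.Writable type is too narrow (my own illustration, not official typing guidance): a pumpify pipeline exposes both the writable and the readable interface, so you can write into it and read or pipe out of it.

const pumpify = require('pumpify')
const { Transform, PassThrough } = require('stream')

const upper = new Transform({
  transform (chunk, enc, cb) { cb(null, chunk.toString().toUpperCase()) }
})

const pipeline = pumpify(upper, new PassThrough())

pipeline.on('data', chunk => console.log(chunk.toString())) // readable side: "HELLO"
pipeline.write('hello')                                     // writable side
pipeline.end()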

fails test with latest end-of-stream

I don't exactly understand how mafintosh/end-of-stream@c023407 can cause the failure. But the failure seems related to two things:

  • error/close is emitted asynchronously from destroy, which causes unpipe to be delayed, resulting in a write after destroy (nodejs/node#29790)
  • error can be emitted twice (this I believe has been fixed in Node master, so it might be worth just waiting for a readable-stream update). The second error should just be swallowed. However, the problem here is that the write-after-destroy error might be preceded by the actual error (see comment in nodejs/node#29790)

Ref: nodejs/node#29504

# preserves error again
ok 19 should be truthy
ok 20 does not close with premature close
ok 21 should be truthy
ok 22 does not close with premature close
not ok 23 .end() called twice
  ---
    operator: fail
    at: Pumpify.<anonymous> (/Users/ronagy/GitHub/public/pumpify/test.js:210:9)
    stack: |-
      Error: .end() called twice
          at Test.assert [as _assert] (/Users/ronagy/GitHub/public/pumpify/node_modules/tape/lib/test.js:225:54)
          at Test.bound [as _assert] (/Users/ronagy/GitHub/public/pumpify/node_modules/tape/lib/test.js:77:32)
          at Test.fail (/Users/ronagy/GitHub/public/pumpify/node_modules/tape/lib/test.js:318:10)
          at Test.bound [as fail] (/Users/ronagy/GitHub/public/pumpify/node_modules/tape/lib/test.js:77:32)
          at Test.end (/Users/ronagy/GitHub/public/pumpify/node_modules/tape/lib/test.js:151:14)
          at Test.bound [as end] (/Users/ronagy/GitHub/public/pumpify/node_modules/tape/lib/test.js:77:32)
          at Pumpify.<anonymous> (/Users/ronagy/GitHub/public/pumpify/test.js:210:9)
          at Pumpify.emit (events.js:209:13)
          at Pumpify.Duplexify._destroy (/Users/ronagy/GitHub/public/pumpify/node_modules/duplexify/index.js:195:15)
          at /Users/ronagy/GitHub/public/pumpify/node_modules/duplexify/index.js:185:10
  ...

pumpify messes with browser duplex

Hey,

I just figured out that when I require pumpify in my code, Duplex streams no longer work.

So if I do

var pumpify = require('pumpify') // without this line it works
var Duplex = require('stream').Duplex
new Duplex()

and run this in the browser or in testling with browserify temp.js | testling, I get

not ok 1 Error: TypeError: 'undefined' is not a function (evaluating 'this.once('end', onend)') on line 7182

The line that throws the error looks like this: this.once('end', onend);.

Best,
Finn

does pumpify not emit "end" when an error happens?

Given streams A, B, and C, I have

const stream = pumpify.obj(A, B, C)

If A emits an error followed by end, the pumpified stream emits error but does not emit end. Is that the correct behavior? Does a pumpify pipeline close on an error, emitting only an error event without emitting end?

I'm still trying to understand Node streams, but from what I gather, with normal Node streams an error does not stop the stream, so there is also an end. I am writing some code that has to know when a pipeline finishes, hence my confusion about error and end.

Therefore, to know whether something finished successfully, do I just listen for end, and to know whether it failed, listen for error?
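
A hedged sketch of how I'd read the behaviour described above (not a maintainer's answer): on error the whole pipeline is destroyed, so 'error' and 'end' can be treated as mutually exclusive terminal events. The PassThrough streams below stand in for A, B and C.

const pumpify = require('pumpify')
const { PassThrough } = require('stream')

const A = new PassThrough({ objectMode: true })
const B = new PassThrough({ objectMode: true })
const C = new PassThrough({ objectMode: true })

const stream = pumpify.obj(A, B, C)
stream.resume() // drain the readable side so 'end' can fire

stream.on('end', () => console.log('finished successfully'))     // success path
stream.on('error', err => console.log('failed:', err.message))   // failure path; no 'end' follows

stream.write({ n: 1 })
stream.end()

// Emitting an error on A instead, e.g. A.emit('error', new Error('boom')),
// takes the failure path and destroys every stream in the pipeline.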

Problem with `readable-stream@3` and async iteration

Note: to reproduce, duplexify must be updated to version 4 (to get readable-stream@3). I published @targos/pumpify with upgraded dependencies (targos@396b360).

If async iteration is used on a pumpify stream, it works correctly as long as the loop is not ended prematurely (by throwing an error or breaking).
According to readable-stream, the destroy method must support a callback as its second argument. However, it is described as a private API that shouldn't be handled by userland.

See https://github.com/targos/bug-stream-async-iteration/blob/master/test.js for an example of the issue.
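
For reference, a rough self-contained sketch along the lines of the linked test, assuming duplexify@4 / readable-stream@3 as noted above; breaking out of the loop early is what triggers the destroy(err, callback) path:

const pumpify = require('pumpify') // or @targos/pumpify with the upgraded dependencies
const { PassThrough } = require('stream')

async function main () {
  const a = new PassThrough({ objectMode: true })
  const b = new PassThrough({ objectMode: true })
  const stream = pumpify.obj(a, b)

  stream.write('one')
  stream.write('two')
  stream.end()

  for await (const chunk of stream) {
    console.log(chunk)
    break // ending the loop prematurely destroys the stream
  }
}

main().catch(console.error)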

/cc @mcollina @mafintosh
