
node-pg-query-stream's Introduction

pg-query-stream

This repo has been merged into the node-postgres monorepo. Please file issues & PRs over there!

license

The MIT License (MIT)

Copyright (c) 2013 Brian M. Carlson

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

node-pg-query-stream's People

Contributors

abenhamdine, amilajack, brianc, calvinmetcalf, dependabot[bot], grncdr, mandyreal, tbuchok

node-pg-query-stream's Issues

stream modification for elastic search format !

Hi,
I want to add the object below before every object in the stream:

{"index":{"_index":"tvseries","_type":"internindex"}}

My stream looks like this:

[
  {"showname":"The X Files","episode":"04","content":"Before what?","season":"1"},
  {"showname":"The X Files","episode":"04","content":"Before what?","season":"1"},
  {"showname":"The X Files","episode":"01","content":"What?","season":"1"}
]

This is what my stream should look like:

-> POST http://localhost:9200/_bulk
  {"index":{"_index":"tvseries","_type":"internindex"}}
  {"showname":"The X Files","episode":"04","content":"Before what?","season":"1"}
  {"index":{"_index":"tvseries","_type":"internindex"}}
  {"showname":"The X Files","episode":"04","content":"Great.","season":"1"}
  {"index":{"_index":"tvseries","_type":"internindex"}}
  {"showname":"The X Files","episode":"01","content":"What?","season":"1"}

How can I achieve this using JSONStream in my existing code below?

var stream = new ElasticsearchWritableStream(client, {
  highWaterMark: 256,
  flushTimeout: 500
});

pg.connect(connectionString,function(err, client, done) {
  if(err) throw err;
  var query = new QueryStream('SELECT * FROM srt limit 2')
  var streams = client.query(query)
  
  //release the client when the stream is finished
  streams.on('end', done)
  streams.pipe(JSONStream.stringify()).pipe(stream)
})

The npm packages I am currently using:

  1. For bulk insertion into Elasticsearch:
    elasticsearch-writable-stream
  2. For reading data from Postgres as a stream:
    pg-query-stream
  3. The missing piece is converting the Postgres stream into the format the Elasticsearch writable stream expects.

Any suggestions, pointers, or recommendations on how to achieve this?
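
A minimal sketch of one way to do this, assuming the bulk endpoint should receive newline-delimited JSON with an action line before every document, as in the expected output above. The names `ACTION`, `toBulkLines` and `createBulkTransform` are hypothetical, and `Readable.from` stands in for the real query stream. Whether elasticsearch-writable-stream wants text or objects is not stated here, so check its documentation before wiring this in.

```javascript
const { Readable, Transform } = require('stream')

// Action line placed before every document (values taken from the question).
const ACTION = { index: { _index: 'tvseries', _type: 'internindex' } }

// Serializes one row as two newline-terminated lines: action, then document.
function toBulkLines (row, action = ACTION) {
  return JSON.stringify(action) + '\n' + JSON.stringify(row) + '\n'
}

// Objects in, text out: each row object becomes one action/document pair.
function createBulkTransform (action = ACTION) {
  return new Transform({
    writableObjectMode: true,
    transform (row, _encoding, callback) {
      callback(null, toBulkLines(row, action))
    }
  })
}

// Stand-in for client.query(new QueryStream(...)), which also emits row objects.
const rows = Readable.from([
  { showname: 'The X Files', episode: '04', content: 'Before what?', season: '1' },
  { showname: 'The X Files', episode: '01', content: 'What?', season: '1' }
])

rows.pipe(createBulkTransform()).pipe(process.stdout)
```

In the code from the question, the transform would take the place of `JSONStream.stringify()`, which wraps the rows in a single JSON array instead of emitting one line per object.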

Should work with a QueryConfig

Hi Brianc, and thx for this great module !

Unlike pg.Client, QueryStream doesn't seem to work when a QueryConfig is passed, i.e. an object such as:

{
    text: 'SELECT * FROM foo WHERE bar = $1',
    values: [1]
}

I get the following error :

Unhandled rejection TypeError: "string" must be a string, Buffer, or ArrayBuffer
    at Function.byteLength (buffer.js:363:11)
    at module.exports.Writer.addCString (C:\node-projects\payroll-app\node_modules\buffer-writer\index.js:57:22)
    at Connection.parse (C:\node-projects\payroll-app\node_modules\pg\lib\connection.js:212:6)
    at module.exports.Cursor.submit (C:\node-projects\payroll-app\node_modules\pg-query-stream\node_modules\pg-cursor\index.js:21:7)
    at Client._pulseQueryQueue (C:\node-projects\payroll-app\node_modules\pg\lib\client.js:298:24)
    at Client.query (C:\node-projects\payroll-app\node_modules\pg\lib\client.js:326:8)

Is this a bug, or is the feature missing?

Thanks in advance.
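
Until a QueryConfig is accepted directly, a possible workaround is to unpack the object before constructing the stream. This is only a sketch: `toQueryStreamArgs` is a hypothetical helper, and it assumes the positional `(text, values)` constructor form used in the other snippets on this page.

```javascript
// Hypothetical helper: accepts either (text, values) or a { text, values }
// object and returns the positional arguments [text, values].
function toQueryStreamArgs (textOrConfig, values) {
  if (typeof textOrConfig === 'string') {
    return [textOrConfig, values || []]
  }
  if (textOrConfig && typeof textOrConfig.text === 'string') {
    return [textOrConfig.text, textOrConfig.values || values || []]
  }
  throw new TypeError('expected a SQL string or an object with a "text" property')
}

const args = toQueryStreamArgs({
  text: 'SELECT * FROM foo WHERE bar = $1',
  values: [1]
})
console.log(args)

// With the real library this would be used as (not executed here):
//   const stream = client.query(new QueryStream(...args))
```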

Get the same behavior for native bindings

Hi. Sorry for the noob question.
The README for this library says that it only works for pure JS node-postgres bindings. Is it possible to get the same behavior for native bindings? Ideally with the same API.

It would be useful for the Knex library: now it supports only pure JS bindings because of the pg-query-stream use: https://github.com/tgriesser/knex/blob/master/lib/dialects/postgres/runner.js#L21-L28 If pg-query-stream supports native bindings as well Knex will transparently support both native and pure JS bindings.

readable event no longer works

Version 1.1.0 broke compatibility: the readable event is no longer supported.

The general rule/recommendation for supporting data vs readable is as follows:

  • Both should be supported, ideally.
  • data must be present when the data is provided in chunks (as an array of rows)
  • readable must be supported when data is provided as one-by-one item

See this question for more details: https://stackoverflow.com/questions/26174308/what-are-the-differences-between-readable-and-data-event-of-process-stdin-stream

This library provides only data in simplified form one-by-one, which means readable should be supported first of all, and data secondarily.

However, this library stopped supporting readable event since version 1.1.0

Not compatible with pg 7.5.0

This module became incompatible with pg's 'minor' update 7.5.0.
Here you can see changes in query.submit function - it now returns null or Error, while current pg-query-stream returns this
This basically breaks the whole streaming thing.
I believe returning null should be enough to fix this issue.

Get number of rows?

Would it be difficult to make the library report the number of rows processed, as a parameter of the end event?

It just seems to be the key information for any app that uses the stream, and no way to find out how many rows have been processed.

At the moment I have to intercept function _fetch as shown in the following code, which is quite awkward, though it works:

var count = 0, qs = new QueryStream('SELECT * FROM test');

var stream = client.query(qs);

// intercepting the _fetch call to count rows;
var fetch = stream._fetch;
stream._fetch = function (size, func) {
    fetch.call(stream, size, function (err, rows) {
        count += rows.length;
        return func(err, rows);
    });
};

stream.pipe(JSONStream.stringify()).pipe(process.stdout);

stream.once('end', function () {
    resolve(count); // resolve with number of rows;
});
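
A less intrusive alternative, sketched here with a stand-in stream, is to count rows from a separate 'data' listener instead of patching `_fetch`. It assumes the stream emits one row per 'data' event, as described elsewhere on this page; `countRows` is a hypothetical helper, not part of the library.

```javascript
const { Readable } = require('stream')

// Resolves with the number of objects the stream emitted before 'end'.
// Attaching a 'data' listener switches the stream into flowing mode.
function countRows (stream) {
  return new Promise((resolve, reject) => {
    let count = 0
    stream.on('data', () => { count += 1 })
    stream.once('end', () => resolve(count))
    stream.once('error', reject)
  })
}

// Stand-in for client.query(new QueryStream('SELECT * FROM test')).
const fakeQueryStream = Readable.from([{ id: 1 }, { id: 2 }, { id: 3 }])

countRows(fakeQueryStream).then((count) => {
  console.log('rows processed:', count)
})
```

The listener can sit alongside an existing `pipe()` call, since a readable stream delivers each chunk to every 'data' listener.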

Transaction support

return dbModule.asyncTransaction(self.db, async function (t) {
        await t.query('SET LOCAL statement_timeout = 30000')
        return t.query(sql.text, sql.values)
      })

Is there any way to achieve this with pg-query-stream?

Stream not closed if for:await loop break

Considering the first utility (that yields the first result and then breaks):

import { first } from 'axax/esnext/first'

async function* toAsyncIterable(conn, config) {
  const stream = conn.query(new QueryStream(config.text, config.values))
  return yield* stream // It's ok, stream.Readable has an [Symbol.asyncIterator] method
}

await first(toAsyncIterable(conn, { text: 'SELECT * FROM some_table_with_more_than_one_entry' }))

Doing so will result in the connection being stuck because the cursor was never closed.

My current workaround:

import { first } from 'axax/esnext/first'

async function* toAsyncIterable(conn, config) {
  const stream = conn.query(new QueryStream(config.text, config.values))
  try {
    return yield* stream
  } finally {
    // This will be called when iterator.return() is called
    stream.close()
  }
}

await first(toAsyncIterable(conn, { text: 'SELECT * FROM some_table_with_more_than_one_entry' }))

I believe that internally, node will call the "destroy" method of the stream once the iterator consumer stops requesting output.
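
That behaviour can be checked with a stand-in stream. The sketch below assumes a current Node.js release, where leaving a for-await loop early destroys the readable. `FakeCursorStream` is hypothetical; whether pg-query-stream closes its cursor from `_destroy` depends on the installed version and is not shown here.

```javascript
const { Readable } = require('stream')

// Hypothetical stand-in for a cursor-backed query stream. The _destroy hook is
// where a real implementation would close its cursor.
class FakeCursorStream extends Readable {
  constructor (rowCount) {
    super({ objectMode: true })
    this.position = 0
    this.rowCount = rowCount
    this.cursorClosed = false
  }

  _read () {
    // Push one row per call, then null to signal the end of the data.
    this.push(this.position < this.rowCount ? { id: this.position++ } : null)
  }

  _destroy (err, callback) {
    this.cursorClosed = true // a real stream would close the cursor here
    callback(err)
  }
}

// Returns the first row, leaving the loop early.
async function first (stream) {
  for await (const row of stream) {
    return row
  }
}

async function demo () {
  const stream = new FakeCursorStream(5)
  const row = await first(stream)
  console.log(row, 'destroyed:', stream.destroyed, 'cursor closed:', stream.cursorClosed)
  return stream
}

demo()
```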

Issue when using Pool

const { Pool } = require('pg');
const QueryStream = require('pg-query-stream');
const config = require('config');

const pool = new Pool({
  host: config.redshift.host,
  database: config.redshift.database,
  user: config.redshift.user,
  password: config.redshift.password,
  port: 5439,
});

const getReportDataAsync = (queryString, onNewRow, done) => {
  pool.connect((err, client, release) => {
    if (err) throw err;
    const query = new QueryStream(queryString);
    const stream = client.query(query);
    stream.on('end', release);
    stream.pipe(process.stdout);
  });
};

module.exports = { getReportDataAsync };

This seems to break on even simple queries. Looks like the pool isn't allowing the same abilities as a client connection? I've used a lot of ctrl + z here. Anything I can do to help?

'end' semantics

I was debugging a problem for someone, and I noticed that 'end' is emitted earlier than in node-postgres proper. Consider the following schema:

create table p(id serial primary key);
create table c(id int primary key references p);

and the following code:

var client1 = new pg.Client(conString);
client1.connect();
var client2 = new pg.Client(conString);
client2.connect();

var qr = new QueryStream("INSERT INTO p DEFAULT VALUES RETURNING id");
var id = null;
qr.on('data', function(row) {
    console.log(row);
    id = row.id;
});
qr.on('end', function () {
    client2.query("INSERT INTO c(id) VALUES ($1)", [id], function (err, rows) {
        console.log(err)
    });
});
client1.query(qr);

which results in: error: insert or update on table "c" violates foreign key constraint "c_id_fkey"

The problem seems to come from here; the underlying cursor does not guarantee that ReadyForQuery has been received from the server when it returns rows with a length of 0. This also means that the effects of the first transaction are not guaranteed to be visible to the second transaction, which may result in the error above.

Do you see this as something worth fixing?

Do not emit 'close' event after finish with query

We found that one of our apps could not use Node 11 because of a breaking change. I filed a bug in the nodejs repo, and the Node maintainers said it is a bug in this module instead: a stream should not emit 'close' before 'end'.

highWaterMark not considered during read

I'd like to use this in conjunction with the pg-promise library. One of the things I'm trying to do is stream data from a Redshift table to a DynamoDB table in AWS, but when I use promises, the buffer of the readable stream fills up more than I want at a time. You are implementing readable._read(n), where n is supposed to reflect the highWaterMark.

// abstract method.  to be overridden in specific implementation classes.
// call cb(er, data) where data is <= n in length.
// for virtual (non-string, non-buffer) streams, "length" is somewhat
// arbitrary, and perhaps not very meaningful.
Readable.prototype._read = function (n) {
  this.emit('error', new Error('not implemented'));
};

It seems like you should look at n and the current number of records (state.length?) before doing the self.push.

read and chunk large text column

Hi, I created a table with keys and body columns. The body column holds large text data, and I want to read the whole text value of body.

Column Type Value
keys numeric[] array[1, 2]
body centered abcd... (20Mb)

Can the value of body be split into multiple records, or read in chunks?

pg-query-stream chunks record by record; can it also chunk the value of a single column?

Feature Request: RowMode

It would be great if we could set the RowMode to array, so that the stream returns arrays, instead of JSON objects.

use-case:
e.g. when we want to export the data to a csv file, it seems a waste of CPU cycles that this lib converts the db-data to a JSON object and then we must convert this JSON object back to an array (and restoring the order of the fields - which is not guaranteed when we simply loop over the fields of the JSON object).
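
For reference, the conversion described in the use case looks roughly like the sketch below. The column order is passed in explicitly so it does not depend on object key order. All function names are hypothetical, and the CSV quoting is deliberately minimal.

```javascript
// Column order is stated explicitly instead of relying on object key order.
function rowToArray (row, columns) {
  return columns.map((name) => row[name])
}

// Minimal CSV quoting: wrap a field in quotes when it contains a comma, quote
// or line break, and double any embedded quotes. null/undefined become empty.
function toCsvField (value) {
  if (value === null || value === undefined) return ''
  const text = String(value)
  return /[",\r\n]/.test(text) ? '"' + text.replace(/"/g, '""') + '"' : text
}

function rowToCsvLine (row, columns) {
  return rowToArray(row, columns).map(toCsvField).join(',') + '\n'
}

const columns = ['showname', 'season', 'episode']
const row = { episode: '04', showname: 'The X Files', season: '1' }
console.log(rowToCsvLine(row, columns))
```

An array row mode would make `rowToArray` unnecessary, which is the saving this request is after.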

Bad data paging / streaming

According to Node.js documentation for readable streams: https://nodejs.org/api/stream.html#stream_readable_push_chunk_encoding

We are supposed to use method push on the stream class for the entire chunk of data. This way the data is delivered at the same pace as it arrives.

But in the current implementation here we for some reason chop it into individual records: https://github.com/brianc/node-pg-query-stream/blob/master/index.js#L55

for (var i = 0; i < rows.length; i++) {
  this.push(rows[i])
}

This creates the following problems:

  1. The data is delivered at a different pace than it arrives, throwing event data on every single row rather than on each page, which makes it impossible to sync data processing/scaling with the rate at which it arrives.

  2. The extra loop stalls a lot of the CPU time, as it results in a needless separate processing for each row, which includes firing event data for each row.

  3. It makes it impossible for the client to efficiently handle pages of data, forcing it to do this row-by-row, thus counteracting what is otherwise good in streams scaling.

What the code should do instead - throw that loop (above) away, and just do:

this.push(rows);

This will also make sense out of parameter batchSize that we can pass, which in the current implementation doesn't affect the client-side at all.
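
Until something like that exists, rows can be regrouped on the consumer side. The sketch below is not the change proposed above: it is a hypothetical object-mode Transform (`createBatcher`) that collects rows into arrays, and its batch boundaries need not match the pages the cursor actually fetched.

```javascript
const { Readable, Transform } = require('stream')

// Groups incoming rows into arrays of up to `size` rows each.
function createBatcher (size) {
  let batch = []
  return new Transform({
    objectMode: true,
    transform (row, _encoding, callback) {
      batch.push(row)
      if (batch.length >= size) {
        this.push(batch)
        batch = []
      }
      callback()
    },
    flush (callback) {
      // Emit whatever is left when the source ends.
      if (batch.length > 0) this.push(batch)
      callback()
    }
  })
}

// Stand-in for a query stream that emits one row per 'data' event.
const source = Readable.from([1, 2, 3, 4, 5].map((id) => ({ id })))

source.pipe(createBatcher(2)).on('data', (rows) => {
  console.log('page of', rows.length, 'rows')
})
```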

This library does NOT use the latest pg-cursor

This is causing a problem with empty queries which was fixed between 1.0.0 (version being still used here) and 1.2.0 (the latest) of pg-cursor.

Can you please update to the latest pg-cursor. Thanks.

Number of rows?

When I added support for v1.0 of this library to pg-promise, I had to add a hack (code injection) for the read operation (method _fetch) in order to pull such basic information as the number of rows that went through the stream.

With version 1.1, do I have to write a new hack for it or is there now a civilized approach to getting the number of rows once the streaming has finished?


This is the hack that I had to do for v1.0:

    let stream, fetch, nRows = 0;
    try {
        stream = client.query(qs);
        fetch = stream._fetch;
        stream._fetch = (size, func) => {
            fetch.call(stream, size, (err, rows) => {
                if (!err && rows.length) {
                    nRows += rows.length;
                    if (error) {
                        stream.close();
                    }
                }
                return func(err, rows);
            });
        };
    } catch (err) {
        error = err;
    }

TypeScript and es6 module import

Not sure exactly why, but doing this doesn't work in TypeScript:

import * as QueryStream from 'pg-query-stream';
const query = new QueryStream('SELECT true', []);

This produces the following errors:

file.ts (1,1): A namespace-style import cannot be called or constructed, and will cause a failure at runtime. (7038)
file.ts (2,14): Cannot use 'new' with an expression whose type lacks a call or construct signature. (2351)

I even tried installing @types/pg-query-stream, but that didn't help.

But the import method recommended in the README does work in TypeScript, even if the rest of your imports use the es6 style.

const QueryStream = require('pg-query-stream');
const query = new QueryStream('SELECT true', []);

Event handling

Trying to add support for this library to pg-promise, just wanted to ask...

Is it correct to use the following two event traps to handle all situations?

stream.once('end', finishHandler);
stream.once('error', errorHandler);

Is once sufficient when using this library? Or is it possible that we need the handlers more than once?
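
For comparison, Node's own `stream.finished` helper covers the same two outcomes with a single callback, and it also reports a stream that closes before ending. A minimal sketch with a stand-in stream follows; `whenDone` is a hypothetical wrapper, and nothing here is specific to pg-query-stream.

```javascript
const { Readable, finished } = require('stream')

// Resolves when the stream has ended, rejects on 'error' or a premature close.
function whenDone (stream) {
  return new Promise((resolve, reject) => {
    finished(stream, (err) => (err ? reject(err) : resolve()))
  })
}

// Stand-in for client.query(new QueryStream(...)).
const stream = Readable.from([{ id: 1 }, { id: 2 }])
stream.resume() // discard rows; a real consumer would pipe or listen for 'data'

whenDone(stream).then(
  () => console.log('finished without error'),
  (err) => console.error('failed:', err.message)
)
```

A readable emits 'end' at most once, so `once` is enough for that handler.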

Have the option to rely on PG's 'row' event rather than polling with cursor

I would like to stream a large number of rows from a database without the stream being powered behind the scenes by multiple queries against a cursor. Currently this is very slow: about 5 times slower than just loading 100K rows and processing them once they are ready.

The pg library already emits a 'row' event for each row. In fact, if there's no need for the whole set of rows at the end, it won't even buffer them in memory (which is a big plus).

Initially reported at: knex/knex#2535

Thanks

End Event Before Last Record Of Data

Hi Man,

I want to report that this issue is back:
#3

This is my sample code:

var pg = require('pg');
var QueryStream = require('pg-query-stream');

// DB Settings
var conString = "postgres://..........";
var client = new pg.Client(conString);

client.connect(function(err) {
  if(err) 
  {
    return console.error('could not connect to database', err);
  }

  var querySelect = "SELECT * from test LIMIT 5";

  var queryStream = new QueryStream(querySelect);
  var queryObj = client.query(queryStream)
  var counter=0

  queryObj
  .on('error',function(err){
    console.log("Error")
    console.log(err)
  })

  .on('data',function(rec){

    queryObj.pause()

    var sleep = function() {
        counter +=1
        console.log("Data: " +counter)
        queryObj.resume();
      }

    setTimeout(sleep,1000);
  })

  .on('end',function(){
    console.log("End")
    client.end()
  })
});

This is the output:

Data: 1
Data: 2
Data: 3
Data: 4
End
Data: 5

Operating System:
CentOS 6.5 Final

Node Version:
v0.10.26

Can you help me with this issue as soon as possible please ?

Regards,
Roberto

How do I use multiple queries with pg-query-stream?

Dear @brianc, first of all congratulations on the superb job you all have been doing creating these modules. It looks amazing and it's working quite well.

I now want to use multiple queries with pg-query-stream.

My environment:
PostgreSQL 9.3.5
node-pg 3.4.0
pg-query-stream 0.6.0
(Linux Fedora 20 & Mac OSX 10.9.x)

In this example, I want to use the results obtained from the original stream internally; the code below is reduced to help isolate the problem.

var PgSQL = require('pg');
var QueryStream = require('pg-query-stream');
var through2 = require('through2');
var fs = require('fs');

var connection = {
    host: '127.0.0.1',
    port: '5432',
    user: 'postgres',
    password: null,
    database: 'postgres'
}

var db1 = new PgSQL.Client(connection);
db1.connect();

var db2 = new PgSQL.Client(connection);
db2.connect();

var db3 = new PgSQL.Client(connection);
db3.connect();
/*
var db4 = new PgSQL.Client(connection);
db4.connect();
*/

var writestream = fs.createWriteStream('test.txt', {flags: 'a'});

var query1 = new QueryStream('SELECT * FROM generate_series(0, $1) num', [10]);
var stream1 = db1.query(query1);

stream1.on('end', function(){
    console.log('stream1 end');
});

stream1.pipe(through2.obj(function (chunk1, encoding, callback1) {

        writestream.write('stream1:' + chunk1.num + '\n');

        var query2 = new QueryStream('SELECT * FROM generate_series(0, $1) num', [5]);
        var stream2 = db2.query(query2);

        stream2.pipe(through2.obj(function (chunk2, encoding, callback2) {

                writestream.write('stream2' + '\n');

                var query3 = new QueryStream('SELECT * FROM generate_series(0, $1) num', [5]);
                var stream3 = db3.query(query3);

                stream3.pipe(through2.obj(function (chunk3, encoding, callback3) {

                        writestream.write('stream3' + '\n');
                        callback3()
                    })
                );

                stream3.on('end', function(){
                    console.log('stream3 end');
                    callback2();
                });

                stream3.on('error', function(e){
                    console.log('stream3 error');
                    console.log(e);
                })

            })
        );

        stream2.on('end', function(){
            console.log('stream2 end');
            /*
            var query4 = new QueryStream('SELECT * FROM generate_series(30, $1) num', [35]);
            var stream4 = db4.query(query4);

            stream4.pipe(through2.obj(function (chunk4, encoding, callback4) {

                    writestream.write(JSON.stringify(chunk4)  + '\n');
                    callback4()
                })
            );

            stream4.on('end', function(){
                console.log('stream3 end');
                callback1();
            });
            */
            callback1();
        });

        stream2.on('error', function(e){
            console.log('stream2 error');
            console.log(e);
        })
    })
);

stream1 loop count: 10 (total 10)
stream2 loop count: 5 (total 50)
stream3 loop count: 5 (total 250)

But the resulting test.txt has 495 rows.

I think the correct total number of rows is 310.

Is that correct? How should I handle this? Also, please tell me about any caveats when using multiple streams.

Thank you.
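
A quick check of the expected count, as a sketch. `generate_series(start, stop)` is inclusive at both ends, so `generate_series(0, 10)` yields 11 rows and `generate_series(0, 5)` yields 6. If every stream delivered each row exactly once, the code above would write 473 lines per run, which matches neither 310 nor the observed 495. The file is also opened with the `'a'` flag, so lines from earlier runs accumulate.

```javascript
// generate_series(start, stop) is inclusive at both ends.
function seriesLength (start, stop) {
  return stop - start + 1
}

const outer = seriesLength(0, 10) // rows from stream1
const middle = seriesLength(0, 5) // rows from each stream2
const inner = seriesLength(0, 5) // rows from each stream3

// One line per stream1 row, per stream2 row and per stream3 row.
const expectedLines = outer + outer * middle + outer * middle * inner
console.log(expectedLines)
```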

Cannot read property 'sync' of undefined

I sometimes receive the error below, which causes the process to exit. I am not able to reproduce it consistently.

TypeError: Cannot read property 'sync' of undefined
    at PostgresQuery.QueryStream.handleError (/opt/cnmaestro-server/deploy/node_modules/pg-query-stream/index.js:105:19)
    at PostgresQuery.handleError (/opt/cnmaestro-server/deploy/node_modules/any-db-postgres/index.js:84:27)
    at process.nextTick (/opt/cnmaestro-server/deploy/node_modules/pg/lib/client.js:464:13)
    at args.(anonymous function) (/usr/lib/node_modules/pm2/node_modules/event-loop-inspector/index.js:138:29)
    at _combinedTickCallback (internal/process/next_tick.js:132:7)
    at process._tickDomainCallback (internal/process/next_tick.js:219:9)

Is it a known issue? Any help would be great. The version of pg-query-stream which we are using is 0.2.0 (dependency of [email protected])

multiple streams and stream needs to be piped to writable

Hi,

I am struggling to see why the test below does not work. Maybe I am getting something wrong, but right now I assume something quirky is happening in the stream implementation. Maybe some events are not being passed on.

In any case, the need to pipe to a writable in order to get the end event is somewhat weird.

I am running on Node 8.

The use case here is to have an array of queries and run them recursively. I am creating new instances of the client and streams wherever possible.

var helper = require('./helper')
var QueryStream = require('../')
var concat = require('concat-stream')
var pg = require('pg')

var Transform = require('stream').Transform

var mapper = new Transform({ objectMode: true })

mapper._transform = function (obj, enc, cb) {
  console.log('will see data a couple of times')

  this.push(obj)
  cb(null)
}

helper('mapper', function (client) {
  it('works', function (done) {
    exec(client, () => {
      // // commenting that in works
      // return done()
      const client1 = new pg.Client()

      client1.connect(() => {
        exec(client1, () => {
          done()
        })
      })
    })
  })
})

function exec (client, cb) {
  var stream = new QueryStream('SELECT * FROM generate_series(0, 201) num', [], { highWaterMark: 100, batchSize: 50 })
  stream.on('end', function () {
    // console.log('stream end')
    return cb()
  })
  client.query(stream)
  // stream.pipe(mapper).pipe(concat(function (res) {
  //   cb()
  // }))

  // lib actually needs to pipe to a writable stream
  stream.pipe(mapper)
}

any help appreciated.

Is it necessary to manually release connections?

I noticed in the example use that done() is never called to explicitly release clients back to the pool:

pg.connect(function(err, client, done) {
  if(err) throw err;
  var query = new QueryStream('SELECT * FROM generate_series(0, $1) num', [1000000])
  var stream = client.query(query)
  stream.pipe(JSONStream.stringify()).pipe(process.stdout)
})

Is this correct use? Will the connections be collected under the covers?
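
Other snippets on this page release the client explicitly, for example with `stream.on('end', done)`. A minimal sketch of that pattern, extended to cover errors and guarded so the callback runs only once, is shown below. `releaseWhenFinished` is a hypothetical helper and the stream and `done` callback are stand-ins.

```javascript
const { Readable } = require('stream')

// Calls `done` exactly once, whichever of 'end', 'error' or 'close' comes first.
function releaseWhenFinished (stream, done) {
  let released = false
  const release = () => {
    if (released) return
    released = true
    done()
  }
  stream.once('end', release)
  stream.once('error', release)
  stream.once('close', release)
  return stream
}

// Stand-ins: a fake query stream and a fake pool `done` callback.
const fakeStream = Readable.from([{ num: 0 }, { num: 1 }])
releaseWhenFinished(fakeStream, () => console.log('client released'))
fakeStream.resume()
```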

TypeError: self.activeQuery.handleEmptyQuery is not a function

It appears that QueryStream cannot handle queries that only return a few rows (less than 100). Here is the full stack trace:

TypeError: self.activeQuery.handleEmptyQuery is not a function
    at Connection.<anonymous> (/Users/user/github/proj/server/app/node_modules/pg/lib/client.js:227:22)
    at emitOne (events.js:116:13)
    at Connection.emit (events.js:211:7)
    at Socket.<anonymous> (/Users/user/github/proj/server/app/node_modules/pg/lib/connection.js:118:12)
    at emitOne (events.js:116:13)
    at Socket.emit (events.js:211:7)
    at addChunk (_stream_readable.js:263:12)
    at readableAddChunk (_stream_readable.js:250:11)
    at Socket.Readable.push (_stream_readable.js:208:10)
    at TCP.onread (net.js:594:20)

I have queries that I want to always be streaming, but sometimes the submitted parameters limit the result set to only a few records

connection terminated

When I use the library to stream data from pg, the connection is terminated before all of the data has been extracted. The error is caught by the "error" event handler. What could be the problem?

My pg database version is 11.3
My node-pg-query-stream version is 2.0.0
my node pg version is 7.6.1

Does not work for me in Redshift

I'm not sure of the reason, or whether query streams are simply not supported by Redshift. Any ideas?

My code:

const query = new QueryStream(info.sql);
const stream = client.query(query);

stream.on('end', () => {

})

stream.on('error', (err) => {

})

stream.on('data', (x) => {

});

If my query (info.sql) returns 1,000,000 records, it doesn't work:

FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - JavaScript heap out of memory

In this case, no log output from inside the data listener is ever shown.

It works perfectly when pointed at a local Postgres DB.

Maybe related to vitaly-t's question in #32.

Using:

"pg-query-stream": "1.1.1",
node: 8.9.1

get fields at the start of the stream

This is how I currently consume the stream:

const query = new QueryStream('SELECT * FROM foo');

const queryStream = connect.query(query);

queryStream.on('data', (row) => {
  console.log(row);
});

However, is there a way to get the field description at the beginning of the stream?

Something similar to what pg provides for query results, i.e.

type FieldType = {|
  +columnID: number,
  +dataTypeID: number,
  +dataTypeModifier: number,
  +dataTypeSize: number,
  +format: string,
  +name: string,
  +tableID: number
|};

How to handle errors on queries?

I'm curious how to handle an error generated while running the query in the pipe.

stream = client.query(q)
stream.on 'end', done
stream
  .pipe(JSONStream.stringify())
  .pipe(through2.obj((result)->
     ....

await:ing in for-await loops short-circuits the stream

Hi,

I'm trying my luck using async iterators, with the end goal to use for-await.
However, a peculiar behaviour occurred while awaiting within the loop.

I am not sure if this is an internal bug, or if a change (which occurred in v11.5) altered the behaviour of the async iterator in a way that pg-cursor or pg-query-stream did not follow.

I posted a bug report in the node repo because I wanted to understand the change (intended or not), but I'll cross-post the bug here for clarity. Perhaps it's an easy fix outside of internals?

Here's a reproduction:

const assert = require('assert')
const { Client } = require('pg')
const QueryStream = require('pg-query-stream')

async function main () {
  const client = new Client()
  await client.connect()

  const sql = 'select * from generate_series(1, $1)'
  const n = 10 // num rows to generate
  const query = new QueryStream(sql, [n])
  const stream = client.query(query)

  let i = 0
  for await (const row of stream) {
    console.log(stream._readableState.length)

    i += 1
    await new Promise((resolve) => setTimeout(() => resolve(), 0))
    // ^-- This await breaks it all in v10.16 and v11.5+, but works from v11.0 to v11.4
  }

  await client.end()
  assert.strictEqual(i, n)
}

if (!module.parent) {
  main()
}

$ node -v
v11.4.0
$ node what.js
9
8
7
6
5
4
3
2
1
0
$ node -v
v11.5.0
$ node what.js
9
8
(node:21383) UnhandledPromiseRejectionWarning: AssertionError [ERR_ASSERTION]: Expected values to be strictly equal:

2 !== 10

    at main (/Users/fl0w/async-iterator-stream/what.js:24:10)
    at process.internalTickCallback (internal/process/next_tick.js:77:7)

Refs: nodejs/node#29410

Can you combine QueryStream & Prepared Statements?

Is this possible?

When I do something like:

var pg          = require('pg'),
    QueryStream = require('pg-query-stream'),
    credUrl     = {...};

pg.connect(credUrl, function(err, client, done) {
    var query       = new QueryStream({text : 'SELECT 10 as ten;', name : 'select10'}, []),
        stream      = client.query(query);
});

I get "TypeError: Argument must be a string". Am I missing something?

Non-blocking streaming with pg-query-stream

I am using a Node module to connect to an SQL database, but while it is streaming, it blocks Node by design: I cannot serve a simple GET request until the streaming has ended.

I am thinking about changing to PostgreSQL db and using pg-query-stream to stream.

Before I continue, I have to ask : is pg-query-stream non-blocking while streaming? Also does it give options to pause and resume the stream, in order to process data and not exhaust memory?

Thank you

Piping same sqlstream into multiple writable streams vs creating multiple identical sqlstream

I need to pipe the same sqlstream into multiple writable streams.
I've read that you can do it (with a generic readable stream) but consider this simple code

var source = fs.createReadStream('source.txt');
var dest1 = fs.createWriteStream('dest1.txt');
var dest2 = fs.createWriteStream('dest2.txt');

source.pipe(dest1);
source.pipe(dest2);

This works. Also I read that if you pipe the same stream into multiple streams like here, the operations will be synchronized according to slowest pipeline. Furthermore, if i add a delay before piping to "dest2" it doesn't work anymore

source.pipe(dest1);
setTimeout(function(){source.pipe(dest2)},3000)

The second file results in an empty file.

The setTimeout here simply simulates my flow. In fact, I need to:

  1. Perform a query with this awesome lib
  2. Do some async work before piping the source into several different writable streams

pg.connect(function(err, client, done) {
  if(err) throw err;
  var query = new QueryStream('SELECT * FROM hugetable')
  var stream = client.query(query)
  
  asyncStuffs(function(){
     stream.pipe(somedest)
  })
  asyncStuffs2(function(){
     stream.pipe(somedest2)
  })
  asyncStuffs3(function(){
     stream.pipe(somedest3)
  })
})

So my question is: is there a way to pipe the same SQL stream into multiple writable streams without worrying about inconsistencies caused by the async (non-deterministic timing) flow?

I think that this is not possible. If not, consider this solution:

pg.connect(function(err, client, done) {
  if(err) throw err;
  var streams = [];
  for (var i = 0; i < 3; i++) {
       var query = new QueryStream('SELECT * FROM hugetable')
       streams.push(client.query(query))
  }
  
  
  asyncStuffs(function(){
     streams[0].pipe(somedest)
  })
  asyncStuffs2(function(){
     streams[1].pipe(somedest2)
  })
  asyncStuffs3(function(){
     streams[2].pipe(somedest3)
  })
})

It's not very elegant, but it should work. The question is: could I run into performance issues? Obviously, piping n times in parallel will slow things down, but can the code inside the for loop be considered safe and efficient? Are "new QueryStream()" and "client.query()" heavy operations?
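
One possible way around the timing problem with a single query, sketched here with generic object-mode streams rather than a real database: attach one PassThrough per consumer immediately, then pipe each PassThrough into its real destination whenever the async work finishes. The helper names fanOut and collect are hypothetical, and Readable.from stands in for client.query(new QueryStream(...)).

```javascript
const { Readable, PassThrough, Writable } = require('stream');

// Attach one PassThrough per consumer right away, so every branch receives
// every row even if its final destination is connected later.
function fanOut(source, count) {
  const branches = [];
  for (let i = 0; i < count; i++) {
    const branch = new PassThrough({ objectMode: true });
    source.pipe(branch);
    branches.push(branch);
  }
  return branches;
}

// Hypothetical destination: collects rows in memory and resolves on finish.
function collect(readable) {
  return new Promise((resolve, reject) => {
    const rows = [];
    const sink = new Writable({
      objectMode: true,
      write(row, _encoding, callback) {
        rows.push(row);
        callback();
      },
    });
    sink.on('finish', () => resolve(rows));
    sink.on('error', reject);
    readable.pipe(sink);
  });
}

// The second consumer is attached 50 ms late, imitating the async work.
function demo() {
  const source = Readable.from([{ id: 1 }, { id: 2 }, { id: 3 }]);
  const [a, b] = fanOut(source, 2);
  const first = collect(a);
  const second = new Promise((resolve) => setTimeout(resolve, 50))
    .then(() => collect(b));
  return Promise.all([first, second]);
}
```

A branch that has no destination yet buffers rows only up to its highWaterMark; after that the source is paused until that branch is consumed, so a large result set stalls rather than losing rows.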

Cursor not being closed when stream ends?

When I use pg-stream, the QueryStream.close function never gets called.

I'm using pg-stream within a deferred function:

var pg = require('pg');
var Q = require('q');
var QueryStream = require('pg-query-stream');

var sql = ...;
var connStr = ...;

function getStream() {
    var deferred = Q.defer();

    pg.connect(connStr, function(err, client, done) {
        if (err) {
            // Could not obtain a client from the pool
            return deferred.reject(err);
        }

        var qstream = new QueryStream(sql.text, sql.values, {highWaterMark: 10000, batchSize: 10000});
        var stream = client.query(qstream);

        stream.on('end', function() {
            //console.log('Manually closing cursor');
            //qstream.close();
            done();
        });

        stream.on('error', function(err) {
            // An error occurred, return the client to the connection pool
            done();
        });

        deferred.resolve(stream);
    });

    return deferred.promise;
}

Then I call that function and use the resolved stream:

getStream()
    .then(function(stream) {

        stream.on('end', function() {
            console.log('pg-stream end!');
        });

        var jsoncsvstream = ...some json to csv transformer...

        jsoncsvstream.on('end', function() {
            console.log('jsoncsv end!');
        });

        res.on('finish', function() {
            console.log('response finish!');
        });

        stream
            .pipe(jsoncsvstream)
            .pipe(res);
    });

I first noticed this because I would get a huge memory footprint when I used the stream. I placed some console.log messages within QueryStream.close and never saw them (even though I see pg-stream, jsoncsv and response end messages).

My workaround is to explicitly call QueryStream.close on the stream's end event (commented out above). But is this expected?

P.S.: @brianc thanks for all the great pg libraries!
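
A minimal sketch of one way to keep the cleanup in a single place, using Node's stream.finished so the callback runs once whether the stream ends, errors, or is destroyed early. This does not change what the library does with its cursor; releaseWhenDone is a hypothetical helper, and the release callback is where done() (and, if needed, the manual close from the workaround) would go. Readable.from stands in for the query stream.

```javascript
const { Readable, finished } = require('stream');

// Run `release` once when the stream is finished, whatever the reason.
// In the snippet above, `release` would call done() to return the client
// to the pool (an assumption about how it would be wired up).
function releaseWhenDone(stream, release) {
  finished(stream, (err) => release(err));
  return stream;
}

// Stand-in usage: count how many times the cleanup runs.
function demoRelease() {
  return new Promise((resolve) => {
    let calls = 0;
    const stream = releaseWhenDone(Readable.from([1, 2, 3]), () => {
      calls += 1;
    });
    stream.resume(); // consume the data so the stream can end
    setTimeout(() => resolve(calls), 50);
  });
}
```

Compared with separate 'end' and 'error' handlers, this also covers the case where the consumer goes away before the stream has ended.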

Consider adding a Promise for the 'done' state

It would be great if the .read method returned a Promise instead of void, rejecting on any error and resolving when the Cursor reaches its 'done' state. I can maybe add this on the weekend if you think it's a valuable addition. Changing the current method might be considered a breaking API change, but no one should be relying on its return value anyway, so the existing method could be reused.
