
nodejs-pubsub's Introduction


Cloud Pub/Sub is a fully-managed real-time messaging service that allows you to send and receive messages between independent applications.

This document contains links to an API reference, samples, and other resources useful to developing Node.js applications. For additional help developing Pub/Sub applications, in Node.js and other languages, see our Pub/Sub quickstart, publisher, and subscriber guides.

A comprehensive list of changes in each version may be found in the CHANGELOG.

Read more about the client libraries for Cloud APIs, including the older Google APIs Client Libraries, in Client Libraries Explained.


Quickstart

Before you begin

  1. Select or create a Cloud Platform project.
  2. Enable billing for your project.
  3. Enable the Google Cloud Pub/Sub API.
  4. Set up authentication with a service account so you can access the API from your local workstation.

Installing the client library

npm install @google-cloud/pubsub

Using the client library

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

async function quickstart(
  projectId = 'your-project-id', // Your Google Cloud Platform project ID
  topicNameOrId = 'my-topic', // Name for the new topic to create
  subscriptionName = 'my-sub' // Name for the new subscription to create
) {
  // Instantiates a client
  const pubsub = new PubSub({projectId});

  // Creates a new topic
  const [topic] = await pubsub.createTopic(topicNameOrId);
  console.log(`Topic ${topic.name} created.`);

  // Creates a subscription on that new topic
  const [subscription] = await topic.createSubscription(subscriptionName);

  // Receive callbacks for new messages on the subscription
  subscription.on('message', message => {
    console.log('Received message:', message.data.toString());
    process.exit(0);
  });

  // Receive callbacks for errors on the subscription
  subscription.on('error', error => {
    console.error('Received error:', error);
    process.exit(1);
  });

  // Send a message to the topic
  topic.publishMessage({data: Buffer.from('Test message!')});
}
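
Note that the snippet defines quickstart() but never invokes it; a minimal invocation with the same placeholder arguments might look like:

quickstart('your-project-id', 'my-topic', 'my-sub').catch(err => {
  console.error(err);
  process.exitCode = 1;
});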

Running gRPC C++ bindings

For some workflows and environments it might make sense to use the C++ gRPC implementation instead of the default one (see: #770).

To configure @google-cloud/pubsub to use an alternative grpc transport:

  1. Run npm install grpc to add grpc as a dependency.

  2. Instantiate @google-cloud/pubsub with the grpc module:

    const {PubSub} = require('@google-cloud/pubsub');
    const grpc = require('grpc');
    const pubsub = new PubSub({grpc});

Samples

Samples are in the samples/ directory. Each sample's README.md has instructions for running its sample.

Each sample below has source code in the repository and an "Open in Cloud Shell" launcher:

  • Commit an Avro-Based Schema
  • Commit a Proto-Based Schema
  • Create an Avro based Schema
  • Create BigQuery Subscription
  • Create a Proto based Schema
  • Create Push Subscription
  • Create Push Subscription With No Wrapper
  • Create Subscription
  • Create a Cloud Storage subscription
  • Create Subscription With Dead Letter Policy
  • Create an exactly-once delivery subscription
  • Create Subscription With Filtering
  • Create Subscription with ordering enabled
  • Create Subscription With Retry Policy
  • Create Topic
  • Create Topic With Kinesis Ingestion
  • Create Topic With Schema
  • Create Topic With Schema Revisions
  • Delete a previously created schema
  • Delete a Schema Revision
  • Delete Subscription
  • Delete Topic
  • Detach Subscription
  • Get a previously created schema
  • Get a previously created schema revision
  • Get Subscription
  • Get Subscription Policy
  • Get Topic Policy
  • List All Topics
  • List Revisions on a Schema
  • List schemas on a project
  • List Subscriptions
  • List Subscriptions On a Topic
  • Listen For Avro Records
  • Listen For Avro Records With Revisions
  • Listen For Errors
  • Listen For Messages
  • Listen with exactly-once delivery
  • Listen For Protobuf Messages
  • Listen For Messages With Custom Attributes
  • Modify Push Configuration
  • OpenTelemetry Tracing
  • Publish Avro Records to a Topic
  • Publish Batched Messages
  • Publish Message
  • Publish Message With Custom Attributes
  • Publish Ordered Message
  • Publish Protobuf Messages to a Topic
  • Publish with flow control
  • Publish With Retry Settings
  • Quickstart
  • Remove Dead Letter Policy
  • Resume Publish
  • Rollback a Schema
  • Set Subscription IAM Policy
  • Set Topic IAM Policy
  • Subscribe With Flow Control Settings
  • Synchronous Pull
  • Synchronous Pull with delivery attempt
  • Synchronous Pull With Lease Management
  • Test Subscription Permissions
  • Test Topic Permissions
  • Update Dead Letter Policy
  • Update Topic Ingestion Type
  • Update Topic Schema
  • Validate a schema definition

The Google Cloud Pub/Sub Node.js Client API Reference documentation also contains samples.

Supported Node.js Versions

Our client libraries follow the Node.js release schedule. Libraries are compatible with all current active and maintenance versions of Node.js. If you are using an end-of-life version of Node.js, we recommend that you update as soon as possible to an actively supported LTS version.

Google's client libraries support legacy versions of Node.js runtimes on a best-efforts basis with the following warnings:

  • Legacy versions are not tested in continuous integration.
  • Some security patches and features cannot be backported.
  • Dependencies cannot be kept up-to-date.

Client libraries targeting some end-of-life versions of Node.js are available, and can be installed through npm dist-tags. The dist-tags follow the naming convention legacy-(version). For example, npm install @google-cloud/pubsub@legacy-8 installs client libraries for versions compatible with Node.js 8.

Versioning

This library follows Semantic Versioning.

This library is considered to be stable. The code surface will not change in backwards-incompatible ways unless absolutely necessary (e.g. because of critical security issues) or with an extensive deprecation period. Issues and requests against stable libraries are addressed with the highest priority.

More Information: Google Cloud Platform Launch Stages

Contributing

Contributions welcome! See the Contributing Guide.

Please note that this README.md, the samples/README.md, and a variety of configuration files in this repository (including .nycrc and tsconfig.json) are generated from a central template. To edit one of these files, make an edit to its corresponding template in the central templates directory.

License

Apache Version 2.0

See LICENSE

nodejs-pubsub's People

Contributors

alexander-fenster, anguillanneuf, bcoe, callmehiphop, crwilcox, dpebot, feywind, fhinkel, gcf-owl-bot[bot], greenkeeper[bot], hongalex, jkwlui, jmdobry, jmuk, justinbeckwith, kamalaboulhosn, laljikanjareeya, maxday, mkamioner, praveenqlogic, release-please[bot], renovate-bot, renovate[bot], rhodgkins, stephenplusplus, summer-ji-eng, swcloud, vijay-qlogic, weyert, yoshi-automation


nodejs-pubsub's Issues

Pub/Sub retry settings: needs common api between data & admin ops, better docs.

Copied from original issue: googleapis/google-cloud-node#2788

@kir-titievsky
January 9, 2018 4:42 PM

This is a bug to track progress on an email discussion about setting custom retry settings.

Previous suggestion:
In src/v1/publisher_client.js the gapicConfig contains the contents of the config file you mentioned, and it is passed down to gaxGrpc:

var defaults = gaxGrpc.constructSettings(
  'google.pubsub.v1.Publisher',
  gapicConfig,
  opts.clientConfig,
  {'x-goog-api-client': clientHeader.join(' ')}
);

Here the third parameter, opts.clientConfig, gives configOverrides to gaxGrpc.constructSettings, and, given the name, I guess it might be able to override options.
opts.clientConfig comes from the PublisherClient constructor parameter, so you might try setting opts.clientConfig to override any default configuration.

If it does not work for you, please let me know - better with code example - and I'll create an issue for pubsub package.
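
For reference, a minimal sketch of that suggestion, passing overrides via clientConfig when constructing the low-level client (the override keys follow the structure of the generated client config file and are assumptions here, not verified settings):

const {v1} = require('@google-cloud/pubsub');

// clientConfig is merged over the bundled gapicConfig by gaxGrpc.constructSettings().
const publisherClient = new v1.PublisherClient({
  clientConfig: {
    interfaces: {
      'google.pubsub.v1.Publisher': {
        methods: {
          Publish: {
            timeout_millis: 60000, // hypothetical timeout/retry override
          },
        },
      },
    },
  },
});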

An in-range update of eslint-plugin-prettier is breaking the build 🚨

Version 2.4.0 of eslint-plugin-prettier was just published.

Branch Build failing 🚨
Dependency eslint-plugin-prettier
Current Version 2.3.1
Type devDependency

This version is covered by your current version range and after updating it in your project the build failed.

eslint-plugin-prettier is a devDependency of this project. It might not break your production code or affect downstream projects, but probably breaks your build or test tools, which may prevent deploying or publishing.

Status Details
  • ✅ ci/circleci: node8 Your tests passed on CircleCI! Details
  • ✅ ci/circleci: node7 Your tests passed on CircleCI! Details
  • ✅ ci/circleci: node9 Your tests passed on CircleCI! Details
  • ✅ ci/circleci: node6 Your tests passed on CircleCI! Details
  • ❌ ci/circleci: lint Your tests are queued behind your running builds Details
  • ✅ ci/circleci: node4 Your tests passed on CircleCI! Details
  • ❌ ci/circleci: docs Your tests are queued behind your running builds Details
  • ❌ continuous-integration/appveyor/branch AppVeyor build failed Details

Commits

The new version differs by 3 commits.

  • a728868 Build: update package.json and changelog for v2.4.0
  • e529b60 New: Add 'recommended' configuration (#73)
  • 4335b08 Docs: Create ISSUE_TEMPLATE.md

See the full diff

FAQ and help

There is a collection of frequently asked questions. If those don’t help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

AckDeadline does not work even after subscription creation with v1.SubscriptionClient

In my app I really need to configure the message ackDeadline. Because of that issue I am not able to do it using createSubscription. So I found a way to create a subscription with a specific ackDeadlineSeconds using v1.SubscriptionClient. But the client library does not honor that setting.

Environment details

  • OS: Ubuntu 16.04
  • Node.js version: 8.9.4
  • npm version: 5.5.1
  • @google-cloud/pubsub version: 0.16.4

I created a repository with code to reproduce this issue.

Steps to reproduce

  1. Clone repository https://github.com/muryginm/google-cloud-pubsub-issues
  2. npm install
  3. GOOGLE_PROJECT=your-project-id GOOGLE_APPLICATION_CREDENTIALS=path-to-credentials npm run ack-deadline
  4. In most cases the message was resent after ~60s, even though ackDeadlineSeconds was set to 180s.

In source code I did the following:

  1. Created topic
  2. Created subscription with ackDeadlineSeconds using v1.SubscriptionClient
  3. After step 2 I really had a subscription with custom ackDeadlineSeconds (I checked it using console.cloud.google.com and the command gcloud pubsub subscriptions describe name)
  4. Tracked the time between message events.
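
For reference, a minimal sketch of step 2 (the low-level client name varies by release: the reporter's version exposes v1.SubscriptionClient, newer releases v1.SubscriberClient; project, topic, and subscription names below are placeholders):

const {v1} = require('@google-cloud/pubsub');

async function createSubWithDeadline(projectId) {
  const client = new v1.SubscriberClient();
  const [subscription] = await client.createSubscription({
    name: `projects/${projectId}/subscriptions/my-sub`,
    topic: `projects/${projectId}/topics/my-topic`,
    ackDeadlineSeconds: 180, // the setting this report says is not honored
  });
  return subscription;
}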

How can I achieve the correct behaviour of ackDeadlineSeconds feature?

Thanks!

Resolve subscription.close() once streams have been flushed.

After using the pub/sub client in tests, I found that I wanted to know when the queue was flushed before exiting. The comment in the code states that a promise is returned when no callback is provided: https://github.com/googleapis/nodejs-pubsub/blob/master/src/subscription.js#L377 but the code does not return a promise.

Environment details

  • OS: OSX
  • Node.js version: 8.6.0
  • npm version: 5.3.0
  • @google-cloud/pubsub version: 0.16.2

Steps to reproduce

  1. Open a subscription
  2. Close a subscription with no callback.
  3. Execute .then() on the promise from the subscription close method.

Expected: the callback in the .then() is executed.
Actual: Error calling .then on undefined.
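
For reference, the expected usage per the code comment (a sketch; the subscription is assumed to be open with pending messages):

subscription.close().then(() => {
  // should resolve once buffered acks/nacks have been flushed
  process.exit(0);
});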

I have completed a fix with a test here: STeveShary@2bf9c33 . Your contribution guide doesn't recommend a PR yet, but I can create one quickly.

Can't access reference documentation - keeps directing to overview page

Hi there,

I've been trying to dig a little deeper into the documentation of PubSub lately, specifically to find out how to extend the ack deadline upon receiving a message, but I can't seem to access the library reference.

As seen here, which according to the url should probably be the library reference:
https://cloud.google.com/nodejs/docs/reference/pubsub/0.16.x/

It's the overview, and if you click on the link to "Cloud Pub/Sub Node.js Client API Reference" it just loads the same page again.

EDIT: I've just realized that the various classes and such are in the menu on the left side, so it seems like I can find what I was looking for; it just wasn't very obvious at first.

add Subscription#setOptions()

From @tonila on November 5, 2017 6:40

We are upgrading pubsub from 0.13.x to 0.14.x and current API documentation does not seem to answer our questions.

Documentation states that "subscription options do not persist across multiple instances".

With my current knowledge, I assume it means that the subscription you see in the cloud console does not store subscription options, so you need to set them for each instance you receive from the cloud.

But how do you set options, since Subscription does not seem to have a function for that?

Currently we just always use topic.createSubscription() for both new and existing subscriptions, and it seems to work fine, but I am wondering what the intended way of doing this is.
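
For reference, a sketch of the per-instance pattern the docs seem to imply (options are applied when obtaining the Subscription object, not persisted server-side; names are placeholders):

const subscription = pubsub.topic('my-topic').subscription('my-sub', {
  flowControl: {maxMessages: 10}, // applies to this instance only
});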

Copied from original issue: googleapis/google-cloud-node#2723

Receiving lots of duplicate messages

Hi

I've been getting a lot of duplicate messages recently, specifically when running with a largish backlog of messages, limited maxMessages, and delayed acking. I downgraded to v0.13 (i.e. before StreamingPull) and the dupes disappeared.

This could possibly be related to #68, googleapis/google-cloud-java#2465 and googleapis/google-cloud-go#778

Environment details

  • OS: ubuntu linux (running under GKE)
  • Node.js version: 9.4.0
  • @google-cloud/pubsub version: 0.16.5
  • subscription ack deadline: 600s

Steps to reproduce

The following reproduces the issue fairly reliably in a container on GKE, though the number of duplicates we were seeing in our actual usage was higher than this seems to produce.

publisher

const NOTIFICATION_TOPIC = 'test';
const pubsub = require('@google-cloud/pubsub')()
const PQueue = require('p-queue');

const publisher = pubsub.topic(NOTIFICATION_TOPIC).publisher();

// The queue ensures there are never more than 500 publishes in flight at
// any one time
const queue = new PQueue({ concurrency: 500 });

function publish() {
  return publisher.publish(Buffer.from('test'));
}

const proms = [];
for (let ii = 0; ii < 2000; ii++) {
  proms.push(queue.add(publish));
}

Promise.all(proms).then(() => console.log('done')).catch(err => console.error(err));

subscriber

const NOTIFICATION_TOPIC = 'test';
const NOTIFICATION_SUBSCRIPTION = 'test-test';

const pubsub = require('@google-cloud/pubsub')();
const ackHistory = new Map();

const topic = pubsub.topic(NOTIFICATION_TOPIC);
const subscription = topic.subscription(NOTIFICATION_SUBSCRIPTION, {
  flowControl: {
    maxMessages: 25
  }
});

var count = 0;
subscription.on('message', message => {
    var ackCount = ackHistory.get(message.ackId) || 0;
    ackCount++;
    ackHistory.set(message.ackId, ackCount);
    if (ackCount > 1) {
      console.log('Already seen this ackId',  message.ackId, ackCount);
    }

    // In our code we went and called some internal services. Simulating this
    // by using a timeout
    setTimeout(() => {
      message.ack();
      console.log('ACKED', count++);
    }, 300);
});

Let me know if I can provide any more information

Message functions don't work with destructuring

On version 0.15 of the pubsub library, destructuring a message makes the ack and nack functions nonfunctional, because of the different this parameter passed to them once destructured.

Environment details

  • OS: Debian Jessie 8
  • Node.js version: 8.9.1
  • npm version: 5.5.1
  • @google-cloud/pubsub version: 0.15

Steps to reproduce

  1. The following code reproduces the problem. This is TypeScript, but it should be enough to illustrate the repro.
import * as PubSub from "@google-cloud/pubsub";
import { inspect } from "util";

(async () => {
  let pubsub = PubSub();
  let [topic] = await pubsub.createTopic("testTopic");
  let [sub] = await topic.createSubscription("testSub");
  sub.on("message", async ({ id, ack }) => {
    console.log("testing ack");
    await ack();
  });
  topic.publisher().publish(Buffer.from("Hi!"));
})();
process.on("unhandledRejection", (err) => console.log("err", inspect(err)))
  2. The log spits out:
err TypeError: Cannot read property 'ackId' of undefined
    at Subscription.breakLease_ (/XXX/node_modules/@google-cloud/pubsub/src/subscription.js:355:59)
    at Subscription.ack_ (/XXX/node_modules/@google-cloud/pubsub/src/subscription.js:289:8)
    at ack (/XXX/node_modules/@google-cloud/pubsub/src/connection-pool.js:306:25)
    at Object.<anonymous> (/XXX/build/test/test-pubsub.js:19:15)
    at Generator.next (<anonymous>)
    at /XXX/build/test/test-pubsub.js:7:71
    at Promise (<anonymous>)
    at __awaiter (/XXX/build/test/test-pubsub.js:3:12)
    at Subscription.sub.on (/XXX/build/test/test-pubsub.js:17:40)
    at emitOne (events.js:115:13)

Workaround

Don't destructure the messages :(

Proposed fix

Changing the createMessage function in connection-pool.js to be the following fixes the problem, because we capture the reference to the message in the calls to subscription.ack_ and subscription.nack_:

ConnectionPool.prototype.createMessage = function(connectionId, resp) {
  var self = this;

  var pt = resp.message.publishTime;
  var milliseconds = parseInt(pt.nanos, 10) / 1e6;
  var originalDataLength = resp.message.data.length;

  var message = {
    connectionId: connectionId,
    ackId: resp.ackId,
    id: resp.message.messageId,
    attributes: resp.message.attributes,
    publishTime: new Date(parseInt(pt.seconds, 10) * 1000 + milliseconds),
    received: Date.now(),
    data: resp.message.data,
    // using get here to prevent user from overwriting data
    get length() {
      return originalDataLength;
    },
  };
  message.ack = function() {
    self.subscription.ack_(message);
  };
  message.nack = function() {
    self.subscription.nack_(message);
  };
  return message;
};

Samples do not show best practices

The samples that demonstrate flow control use it in a CreateSubscription RPC. This is not the recommended pattern, as users have only a 1 QPS limit. It is preferred that users attempt to subscribe, and use the error handler to create the subscription if they receive NOT_FOUND from the server (see the sketch below). Please correct the samples to show this, since users running this pattern in production have hit their limits when restarting or scaling up their jobs.
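
A sketch of that preferred pattern (the NOT_FOUND check uses gRPC status code 5; names and the re-attach strategy are assumptions):

const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

function listen(topicName, subName) {
  const subscription = pubsub.subscription(subName);
  subscription.on('message', message => message.ack());
  subscription.on('error', async err => {
    if (err.code === 5 /* NOT_FOUND */) {
      await pubsub.topic(topicName).createSubscription(subName);
      listen(topicName, subName); // re-attach to the newly created subscription
    }
  });
}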

Cloud function and pub sub performance

I have a cloud function that takes in a text file from Google storage, re-formats each line into a json object and publishes the json object into a pub sub topic.

My test input file is just over 400 lines (54KB), and yet the cloud function with 2GB of memory times out after 180 seconds (the configured limit). It then runs to completion. However, it takes just on 10 minutes to process the 442 lines of input text. That's roughly 44 simple lines of text per minute! Processing this file on my Apple laptop takes just 4 seconds. The performance of this combination seems really bad.

In case it is something wrong with my code, I include the cloud-function below:


// Imports implied by the snippet below (added for completeness; the exact
// module versions correspond to this report's era):
const N3 = require('n3');
const PubSub = require('@google-cloud/pubsub');
const Datastore = require('@google-cloud/datastore');
const storage = require('@google-cloud/storage')();

helloGCS = (event, callback) => {
    const file = event.data;
  
    if (file.resourceState === 'not_exists') {
      console.log(`File ${file.name} deleted.`);
      callback(null, 'ok - not found');
    } else if (file.metageneration === '1') {
      // metageneration attribute is updated on metadata changes.
      // on create value is 1
        let streamParser = N3.StreamParser();

        const bucket = storage.bucket('woburn-advisory-ttl');
        const remoteFile = bucket.file(file.name);
        let rdfStream = remoteFile.createReadStream();

        const projectId = 'rdf-graphql';
        const pubsub = new PubSub();
        const topic = pubsub.topic('raw_triple');
        const publisher = topic.publisher();

        const datastore = new Datastore({});
        let number_of_rows = 0;

        function DatastoreWriter () {
            let writer = new require('stream').Writable({objectMode: true});
            writer._write = (triple, encoding, done) => {
                    publisher.publish(Buffer.from(JSON.stringify(triple)))
                        .then(()=>{
                            number_of_rows += 1;
                            console.log(number_of_rows);
                            done();
                        })
                        .catch((e)=>{
                            console.log('error ', e);
                            done();
                        });
            }
            return writer
        };
        rdfStream.pipe(streamParser);
        streamParser.pipe(new DatastoreWriter())
        .on('finish', ()=>{
            console.log('records written: ', number_of_rows);
            callback(null, 'ok')
        });
    } else {
        console.log(`File ${file.name} metadata updated.`);
        callback(null, 'ok');
    }
};

possible memory leak?

From @JoshFerge on November 27, 2017 19:15

Environment details

  • OS: Debian GNU/Linux 9.2 (stretch)
  • Node.js version: v8.9.1
  • npm version: 5.5.1
  • google-cloud-node version: pubsub-0.15.0
"@google-cloud/pubsub": {
      "resolved": "https://registry.npmjs.org/@google-cloud/pubsub/-/pubsub-0.15.0.tgz",
        "@google-cloud/common": "0.13.6",
        "google-auto-auth": "0.7.2",
        "google-gax": "0.13.5",
        "google-proto-files": "0.13.1",

Steps to reproduce

'use strict';
let pubsub = require( '@google-cloud/pubsub' )();

let topic = pubsub.topic( 'my-topic' );
let subscription = topic.subscription( 'test', { flowControl: { maxMessages : 20 } } );
let c = 0;
subscription.on( 'error', function( err ) { console.error( err ); } );
// Register a listener for `message` events.
function onMessage( message ) {
    message.ack(); 
    if ( c++ % 1000 === 0 ) {
        printMemoryUsage();
    }
}

function printMemoryUsage() {
    let obj = process.memoryUsage();
    console.log( 'Date: ' + new Date() + ', Memory: ' + Math.ceil(  obj.heapUsed / 1000 / 1000 , 2 ) + 'Mb' );
}

subscription.on( 'message', onMessage );

Memory usage quickly climbs and the process eventually runs out of memory. Is there something I'm missing here, or is there an internal problem?

(I have thousands of messages in this topic that are being processed, so that is the scale)

Copied from original issue: googleapis/google-cloud-node#2757

setting promise implementation does not seem to work

If I understand the docs correctly, the following should work:

const PubSub = require('@google-cloud/pubsub');
const Promise = require('bluebird');

const pubsubClient = PubSub({
  projectId: 'some-project-1234',
  promise: Promise
});

pubsubClient.createTopic('test')
  .tap(console.log);

... but it throws TypeError: pubsubClient.createTopic(...).tap is not a function, since the returned promise is a native one and not a Bluebird one.

Setting global.Promise to Bluebird makes this example work, but that's beside the point.

I also tried setting it in the option object of the call to createTopic(), but that also fails.

Looking at common/utils, the promise option does not make it into the context of promisify() there.

Environment details

  • OS: macOs 10.13.2
  • Node.js version: 9.2.1
  • npm version: 5.6.0
  • @google-cloud/pubsub version: 0.16.0

Oldest Unacknowledged Message > 2days

Environment details

  • OS: Kubernetes Engine (1.8.5)
  • Node.js version: 0.9.5
  • npm version: –
  • @google-cloud/pubsub version: 0.16.4

Steps to reproduce

I'm running a single nodejs subscriber which reads messages from one pubsub topic in streaming pull mode at a rate of ~500 messages/s. Messages are usually being acked within below a second.

I'm reading from the same topic on a different subscription with a Dataflow-based subscriber which still uses the non-streaming interface and which doesn't show the issue described here, so I have a benchmark to compare my node-subscriber with.

The node subscriber works quite reliably, except that, from time to time, a few messages (fewer than 10) seem to get stuck in Pub/Sub for a very long time, often longer than a day.

I've seen values as high as 24h, 29h or even 54h (<- and counting) for the "Oldest Unacknowledged Message" graph in stackdriver.

Eventually, those few messages seem to get consumed. However I have not found a reliable way of triggering that pull, it just happens eventually. Restarting the pods that consume the subscription doesn't seem to help.

I'm not quite sure if this could be related to #11, because in my case, restarting the pods doesn't have any effect.

Any idea where to start debugging?

[Stackdriver graphs: "Oldest Unacknowledged Message" at 29h and 53h]

Edit 2018-02-20:

The subscription shown in the the second graph above has finally delivered/acked the stuck message, after 70h:

[Stackdriver graph: the stuck message acked after 70h]

Under normal operations the oldest unacked message is typically below 2s old, until at some point some messages get stuck:

[Stackdriver graph: normal operation, oldest unacked message below 2s]

Getting GOAWAY error

I have been getting the following error a lot after upgrading to 0.15.0

Received a GOAWAY with error code ENHANCE_YOUR_CALM and debug data equal to "too_many_pings"

It seems that the grpc module is printing it (it is the only place in my code base where this string exists), and pubsub is the only component using grpc. Is anyone else having this problem? What is the impact, and how can we stop it?

Thanks!
Mo

Environment details

  • OS: Debian 8.10, x86_64 GNU/Linux
  • Node.js version: 6.12.2
  • npm version: 3.10.10
  • @google-cloud/pubsub version: 0.16.1

Steps to reproduce

  1. Unsure, we are subscribing with multiple instances to a very active subscription

Documentation is misleading - Topic.createSubscription Subscription name is not optional

I was trying to create a subscription with no name after reading some older threads in the old github (googleapis/google-cloud-node#1257 & googleapis/google-cloud-node#1799), especially about allowing subscription to get random names if no name was provided.

The documentation seems to be wrong on that particular point: https://cloud.google.com/nodejs/docs/reference/pubsub/0.16.x/Topic#createSubscription

In the table it says that the subscription name is an optional parameter, but in reality it will throw an error (which is actually explicitly specified in the Throws section of the documentation).

It's the same in the Subscription documentation: https://cloud.google.com/nodejs/docs/reference/pubsub/0.16.x/Subscription

Also, just to be sure, could you point me towards the PR that made the subscription name mandatory, or explain the reasoning behind that? In my scenario it would be nice to be able to create random / unique subscriptions to a topic; from what I read it was possible at some point, so I'm curious why it was removed (a client-side workaround is sketched below). Thank you.
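
For reference, a sketch of generating a unique name client-side (the uuid dependency and naming scheme are assumptions):

const {v4: uuidv4} = require('uuid');

async function createEphemeralSubscription(topic) {
  const [subscription] = await topic.createSubscription(`ephemeral-${uuidv4()}`);
  return subscription;
}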

Environment details

  • OS: Ubuntu 16.04
  • Node.js version: 8.9.4
  • npm version: 5.5.1
  • @google-cloud/pubsub version: 0.16.2

messages sit in queue until GKE pod with subscriber gets reset

From @ShahNewazKhan on October 1, 2017 9:3

Environment details

  • OS: Debian GNU/Linux 8.9 (jessie) [K8s pod based on dockerfile gcr.io/google_appengine/base]
  • Node.js version: 6.11.3
  • npm version: 5.4.2
  • google-cloud/pubsub version: 0.14.2

Steps to reproduce

  1. Spin up nodejs pubsub publisher to topic1 in GKE pod 1
  2. Spin up nodejs pubsub subscriber to subscription to topic1 in GKE pod 2
  3. Publish messages to topic1

I am facing an intermittent issue where pubsub messages sit in the queue and are not delivered to the subscriber in GKE pod 2. Only when I delete the GKE pod 2 subscriber and restart the pod does the message get delivered.

Copied from original issue: googleapis/google-cloud-node#2640

Remove setTimeout() in system tests

The first two system tests (system-tests/pubsub.js, "Topic: should be listed", "should list topics in a stream") are flaky, possibly because the 5 sec timeout in the before() block that waits for the topics to be actually created is, in some cases, not enough.

Let's make a small refactor of the test to avoid using setTimeout in the before() block: poll the service instead and wait until the three topics are there. We can cancel polling and fail if the topics were not created within some unreasonably long amount of time (say, 2 or 3 minutes).
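
A sketch of what that polling could look like (helper name, polling interval, and timeout are arbitrary choices):

async function waitForTopics(pubsub, expectedCount, timeoutMs = 3 * 60 * 1000) {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    const [topics] = await pubsub.getTopics();
    if (topics.length >= expectedCount) return;
    await new Promise(resolve => setTimeout(resolve, 2000)); // poll every 2s
  }
  throw new Error('topics were not created in a reasonable amount of time');
}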

FlowControl MaxMessages not minded

Reported this first on Stack Overflow and waited a couple of days.

https://stackoverflow.com/questions/48326756/google-cloud-pubsub-client-does-not-mind-flowcontrol-settings-as-advertised

Environment details

  • OS: macOS 10.13 High sierra
  • Node.js version: 8.1.4
  • npm version: 5.0.3
  • @google-cloud/pubsub version: 0.16.2

Steps to reproduce

  1. Clone nodejs bookshelf example app
  2. Run local, then deploy, see that all seems to work as expected
  3. Change background.js so that topic.createSubscription is done with {flowControl: {maxMessages: 1}}
  4. Submit a hundred requests
  5. See that the worker tries to handle a hundred requests at once despite maxMessages: 1 having been set.

My understanding is that one message should be let through, and a subsequent message should not be processed until one of these conditions is met: acked/nacked, timeout, or error. This is not the case, however.
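
For reference, a sketch of the step-3 change to the bookshelf sample's background.js (the surrounding names are assumptions about that sample):

topic.createSubscription(subscriptionName, {
  flowControl: {maxMessages: 1},
}, (err, subscription) => {
  if (err) throw err;
  // with maxMessages: 1, at most one unacked message should be in flight
  subscription.on('message', handleMessage);
});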

Lease messages do not work for long-running jobs

Hi,

Environment details

  • OS: Ubuntu 16.04
  • Node.js version: 8.9.4
  • npm version: 5.6.0
  • @google-cloud/pubsub version: 0.16.4

In that issue @callmehiphop mentioned the following:

Even if we allowed the deadline to be set, the current code would overwrite it and your message would not get redelivered until your subscription object was closed.

But the message gets redelivered even when subscription.close and message.ack are not called.

Steps to reproduce

  1. Create topic and subscription
  2. Subscribe to message event
  3. Publish message
  4. Do not call ack or nack in the message handler. Count the number of times the message handler is called during at least 120 seconds.
it('should not redeliver message until subscription is closed', function (done) {
    subscription.on('error', done);

    const onMessage = sinon.spy();
    subscription.on('message', onMessage);

    topic
        .publisher()
        .publish(Buffer.from(uuid()))
        .catch(done);

    setTimeout(() => {
        try {
            sinon.assert.calledOnce(onMessage);
            done();
        } catch (error) {
            done(error);
        }
    }, 120000);
});

If you want to run this test you could clone this repository.

The test fails almost every time I run it. Does this mean that message leasing does not work?

Subscriber stops acknowledging messages after running for a few minutes

I have sort of a weird issue and explaining it is somewhat difficult so please bear with me. It sounds very similar to an issue I read on the python pubsub github page (googleapis/google-cloud-python#4274).

I have a worker service on App Engine that I'm using to pull messages off of a PubSub topic, it is just listening to the topic and when it receives a message, it pushes it to an ElasticSearch instance that I have. I have set the minimum number of instances of the worker to a high number (20) to attempt to process a large number of records in the queue. The issue is that it seems on initial startup of the instances, they process and acknowledge a bunch of records but after a while stop acknowledging them (see the images below, the large spikes in the acknowledge graph seem to correspond to deployments/restarts of the worker instances).

[screenshots: Stackdriver acknowledge-request graphs from 2018-01-03; the large spikes correspond to deployments/restarts]

I have attached my code; the subscription listener is invoked by calling the subscribe method of PubSubService.js in worker.js. I've attached the relevant portions of worker.js.

Any help is greatly appreciated!

// worker.js

pubSubService.subscribe('driver-log-sync', 'driver-log-sync', pubSubService.syncDriverLog)
pubSubService.subscribe('driver-log-sync-delete', 'driver-log-sync-delete', pubSubService.syncDriverLogDelete)
pubSubService.subscribe('hos-event-history-sync', 'hos-event-history-sync', pubSubService.syncHosEventHistory)

// PubSubService.js

const Pubsub = require('@google-cloud/pubsub')
const LoggingService = require('./loggingService')
const DriverLogsService = require('./driverLogsService')
const path = require('path')
const PQueue = require('p-queue')

let config = {
  projectId: process.env.GCLOUD_PROJECT
}

const pubsub = Pubsub(config)

const queue = new PQueue({concurrency: 4})

function subscribe (topicName = '', subscriptionName = '', cb) {
  function handleMessage (message) {
    message.data = message.data.toString('utf-8')
    cb(null, message)
  }
  function handleError (err) {
    console.log('ERR')
    LoggingService.error(err)
  }
  if (subscriptionName !== '' && topicName !== '') {
    let topic = pubsub.topic(topicName)
    let subscription = topic.subscription(subscriptionName)
    subscription.on('message', handleMessage)
    subscription.on('error', handleError)
    LoggingService.info(`Listening to topic ${topicName} via subscription ${subscriptionName}`)
  } else {
    cb(new Error('Missing topic name and/or subscription name.'))
  }
}

async function syncDriverLog (err, message) {
  try {
    if (err) {
      throw new Error(err.message)
    }
    let { dotNumber, logDate, driverId } = message.attributes
    if (!dotNumber) {
      throw new Error('Missing DOT number for driver-log-sync')
    }
    if (!logDate) {
      throw new Error('Missing Log Date for driver-log-sync')
    }
    if (!driverId) {
      throw new Error('Missing Driver Id for driver-log-sync')
    }
    queue.add(async () => {
      try {
        await delay(25)
        await DriverLogsService.syncDriverLogToElasticSearch(dotNumber, logDate, driverId, (new Date(message.publishTime).getTime()))
        message.ack()
        LoggingService.log(`Successfully synced log for driver: ${driverId}, dotNumber: ${dotNumber}, logDate: ${logDate}`)
      } catch (err) {
        message.ack()
        LoggingService.error(`Error syncing log to ElasticSearch for driver: ${driverId}, dotNumber: ${dotNumber}, logDate: ${logDate}`, err)
      }
    })
  } catch (err) {
    message.ack()
    LoggingService.error(`Error syncing log to ElasticSearch for message: ${message.attributes}`, err)
  }
}

async function syncDriverLogDelete (err, message) {
  try {
    if (err) {
      throw new Error(err.message)
    }
    let { dotNumber, logDate, driverId } = message.attributes
    if (!dotNumber) {
      throw new Error('Missing DOT number for driver-log-sync')
    }
    if (!logDate) {
      throw new Error('Missing Log Date for driver-log-sync')
    }
    if (!driverId) {
      throw new Error('Missing Driver Id for driver-log-sync')
    }
    queue.add(async () => {
      try {
        await delay(25)
        await DriverLogsService.deleteDriverLogFromElasticSearch(dotNumber, logDate, driverId)
        message.ack()
        LoggingService.log(`Successfully deleted log for driver: ${driverId}, dotNumber: ${dotNumber}, logDate: ${logDate}`)
      } catch (err) {
        message.ack()
        LoggingService.error(`Error deleting log for driver: ${driverId}, dotNumber: ${dotNumber}, logDate: ${logDate}`, err)
      }
    })
  } catch (err) {
    message.ack()
    LoggingService.error(`Error syncing log to ElasticSearch for message: ${message.attributes}`, err)
  }
}

async function syncHosEventHistory (err, message) {
  try {
    if (err) {
      throw new Error(err.message)
    }
    let { hosEventHistoryNodeId, driverId } = message.attributes
    let { hosEvent } = JSON.parse(message.data)
    if (!hosEventHistoryNodeId) {
      throw new Error('Missing HOS Event History Node Id for hos-event-history-sync')
    }
    if (!hosEvent) {
      throw new Error('Missing HOS Event for hos-event-history-sync')
    }
    if (!driverId) {
      throw new Error('Missing User Id for hos-event-history-sync')
    }
    queue.add(async () => {
      try {
        await delay(25)
        await DriverLogsService.syncHosEventHistoryToElasticSearch(hosEventHistoryNodeId, hosEvent, driverId, (new Date(message.publishTime).getTime()))
        message.ack()
        LoggingService.log(`Successfully synced history for driver: ${driverId}, historyNodeId: ${hosEventHistoryNodeId}`)
      } catch (err) {
        message.ack()
        LoggingService.error(`Error syncing log to ElasticSearch for message: ${message.attributes}`, err)
      }
    })
  } catch (err) {
    message.ack()
    LoggingService.error('Error syncing log to ElasticSearch', err)
  }
}

function delay (dur) {
  return new Promise((resolve) => {
    setTimeout(() => {
      resolve()
    }, dur)
  })
}

module.exports = {
  subscribe,
  syncDriverLog,
  syncDriverLogDelete,
  syncHosEventHistory
}

Environment details

  • OS: App Engine (Linux Debian Jessie)
  • Node.js version: 8.4.0
  • npm version: 5.3.0
  • @google-cloud/pubsub version: 0.14.2

v0.16.3 comes with incompatible grpc versions

When using yarn, v0.16.3 introduced the following error for me:

TypeError: Channel's second argument must be a ChannelCredentials
    at ServiceClient.Client (/app/node_modules/grpc/src/client.js:472:19)
    at new ServiceClient (/app/node_modules/grpc/src/client.js:884:12)
    at /app/node_modules/google-gax/lib/grpc.js:251:12

The reason is that @google-cloud/pubsub now defines grpc@^1.8.4 as a direct dependency as well as google-gax@^0.14.2.

google-gax@^0.14.2 however currently pins its grpc dependency to grpc@~1.7.2, see googleapis/gax-nodejs#178

This altogether breaks v0.16.3 of @google-cloud/pubsub and provokes the above error. Pinning grpc back to ~1.7.2 in @google-cloud/pubsub works as a workaround for now.

Related:

Not receiving all messages

  • OS: Google Container Engine (Kubernetes)
  • Node.js version: gcr.io/google_appengine/nodejs
  • npm version:
  • @google-cloud/pubsub version: 0.16.2

I have a very simple subscription on a topic that runs in Container Engine. The code is the same as your examples:

const pubsub = require('@google-cloud/pubsub')();

const projectId = process.env.GOOGLE_CLOUD_PROJECT;
const topic = pubsub.topic('projects/' + projectId + '/topics/security-logs');
const subscription = topic.subscription('projects/' + projectId + '/subscriptions/securitySubscription');

subscription.on('error', function (err) {
    "use strict";
    console.error("Subscription error: " + err);
});

function onMessage(message) {
    "use strict";
    // Called every time a message is received.
    logger.log("Subscription message received: " + JSON.stringify(message));
    const data = Buffer.from(message.data).toString('utf-8');
    logger.log("message.data: " + data);
    message.ack();
}

subscription.on('message', onMessage);

The problem is that not all messages are received. I would say a little more than 50% are.

I see in my logs that messages are successfully sent by publishers (I get messageIds) but the subscription does not get them all. I have only one subscriber.

Any idea what might be wrong ?

subscription.on('message') stops receiving messages after 10 mins or more

From @QTGate on September 7, 2017 17:42

  • OS: Debian
  • Node.js version: 8.4.0
  • npm version: 5.4.0
  • google-cloud-node version: 0.56.0
  • @google-cloud/pubsub version: 0.14.0

Steps to reproduce

const pubsub = require ( '@google-cloud/pubsub' )( connectOption )

const subscription = pubsub.topic ( topicName ).subscription ( subScription )

subscription.on ('message', message => {
    message.ack()
    ....
})

subscription.once('error', err => {
    ....
})

I received all previously undelivered messages when I restarted it.
I checked the connectionPool object while it was not receiving messages:
connectionPool.isPaused [false]
connectionPool.isOpen [true]
connectionPool.connections [5]

Thank you.

Copied from original issue: googleapis/google-cloud-node#2598

Project ID not automatically determined from key file

Copied from original issue: googleapis/google-cloud-node#2782

@victorandree
December 21, 2017 8:30 AM

Pub/Sub authentication documentation suggests that the project ID will be determined automatically if you provide a keyFilename or GOOGLE_APPLICATION_CREDENTIALS

If you wish, you can set an environment variable (GCLOUD_PROJECT) in place of specifying this inline. Or, if you have provided a service account JSON key file as the config.keyFilename property explained above, your project ID will be detected automatically.

Source: https://googlecloudplatform.github.io/google-cloud-node/#/docs/pubsub/0.15.0/guides/authentication

I can't get the "your project ID will be detected automatically" bit to work, and still need to provide projectId explicitly (either as the GCLOUD_PROJECT env variable or in the configuration call). This feature isn't really mentioned anywhere else, so maybe it's unsupported? Seems like a good idea though!
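
For reference, the explicit configuration that does work (a sketch; values are placeholders):

const pubsub = require('@google-cloud/pubsub')({
  projectId: 'some-project-1234',
  keyFilename: '/path/to/service-account.json',
});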

This may be related to this issue: googleapis/google-cloud-node#2576

Environment details

  • OS: macOS 10.13.2
  • Node.js version: 8.9.1
  • npm version: 5.6.0
  • @google-cloud/pubsub version: 0.16.0

Steps to reproduce

  1. Let pubsub be configured using only keyFilename or GOOGLE_APPLICATION_CREDENTIALS
  2. Try to subscribe to some topic

Sample code:

const pubsub = require('@google-cloud/pubsub')();

const subscription = pubsub.subscription('test-subscription');
subscription.on('message', message => {
  console.log('message received');
  message.ack();
});

Error message:

events.js:183
      throw er; // Unhandled 'error' event
      ^

Error: Requested project not found or user does not have access to it (project={{projectId}}). Make sure to specify the unique project identifier and not the Google Cloud Console display name.
    at StreamProxy.onConnectionStatus (/code/pubsub/node_modules/@google-cloud/pubsub/src/connection-pool.js:264:21)
    at emitOne (events.js:116:13)
    at StreamProxy.emit (events.js:211:7)
    at ClientDuplexStream.<anonymous> (/code/pubsub/node_modules/google-gax/lib/streaming.js:130:17)
    at emitOne (events.js:116:13)
    at ClientDuplexStream.emit (events.js:211:7)
    at ClientDuplexStream._emitStatusIfDone (/code/pubsub/node_modules/grpc/src/client.js:260:10)
    at ClientDuplexStream._receiveStatus (/code/pubsub/node_modules/grpc/src/client.js:233:8)
    at /code/pubsub/node_modules/grpc/src/client.js:757:12

Broken flow control in rare cases

In production I have a memory-consuming task, so if I get two messages at a time my app runs out of memory. I found out that the flow control settings do not always work.

Environment details

  • OS: Ubuntu 16.04
  • Node.js version: 8.9.4
  • npm version: 5.5.1
  • @google-cloud/pubsub version: 0.16.4

I created a repository with code to reproduce this issue.

Steps to reproduce

  1. Clone repository https://github.com/muryginm/google-cloud-pubsub-issues
  2. npm install
  3. GOOGLE_PROJECT=your-project-id GOOGLE_APPLICATION_CREDENTIALS=path-to-credentials npm run flow-control
  4. Repeat step 3 a couple of times. At least one of 5 tests will fail.

In the source code I create a topic and a subscription, and track the messages that are currently in process (sketched below). Am I doing something wrong?
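
A sketch of that kind of in-flight tracking (the shape is assumed; maxMessages mirrors the subscription's flowControl setting):

let inFlight = 0;
subscription.on('message', message => {
  inFlight++;
  if (inFlight > maxMessages) {
    console.warn('flow control exceeded; messages in process:', inFlight);
  }
  doMemoryHeavyWork(message).then(() => { // stand-in for the real task
    message.ack();
    inFlight--;
  });
});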

Thanks!

axios library throws error on createTopic

  • Environment details
    OS: GKE standard Google container OS
    Node.js version: 8.9.1
    npm version: 5.6.0
    google-cloud-node version: 0.16.4

  • Steps to reproduce
    Note: I ran it locally on my macbook it works however on GKE it fails with the following error when running the exact same code.

  1. require google-cloud
  2. Do PubSub.createTopic("a-Topic", aCallback)
  3. It throws the exception below. From our logs, it seems to be the response of the authentication call to the Google API that causes this error; from the stack trace, it seems to be data passed to the axios library that fires the error.

I get the following full stack trace:
buffer.js:444
throw new TypeError(kConcatErrMsg);
^
TypeError: "list" argument must be an Array of Buffer or Uint8Array instances
at Function.Buffer.concat (buffer.js:444:13)
at IncomingMessage.handleStreamEnd (/var/components/live-meeting-session/node_modules/axios/lib/adapters/http.js:186:37)
at emitNone (events.js:111:20)
at IncomingMessage.emit (events.js:208:7)
at endReadableNT (_stream_readable.js:1056:12)
at _combinedTickCallback (internal/process/next_tick.js:138:11)
at process._tickCallback (internal/process/next_tick.js:180:9)
Logs from 3/3/18 4:13 AM to 3/3/18 4:13 AM UTC

published messages with more than 5000 characters truncated

From @emilyplusplus on November 30, 2017 4:15

Environment details

  • OS: macOS High Sierra
  • Node.js version: 8.1.3
  • npm version:5.5.1
  • google-cloud-node version: 0.15.0

Steps to reproduce

  1. require @google-cloud/pubsub
  2. authenticate properly
  3. publish a message to an existing topic; messages with > 5000 characters will be truncated as if there were a character limit of 5K

Copied from original issue: googleapis/google-cloud-node#2760

Auth error: ETIMEDOUT

Environment details

Note: App is running external from GCP

  • OS: Ubuntu 16.04.3 LTS
  • Node.js version: 8.9.1
  • npm version: 5.5.1
  • @google-cloud/pubsub version: 0.14.5

Steps to reproduce

UNKNOWN

Auth error:Error: connect ETIMEDOUT 172.217.4.109:443
Auth error:Error: connect ETIMEDOUT 172.217.4.109:443
Auth error:Error: connect ETIMEDOUT 172.217.4.109:443
Auth error:Error: connect ETIMEDOUT 172.217.4.109:443
Dec 31, 2017 7:05 AM ERROR  { message: 'Getting metadata from plugin failed with error: con
nect ETIMEDOUT 172.217.4.109:443',
  stack: 'Error: Getting metadata from plugin failed with error: connect ETIMEDOUT 172.217.4.109:44
3\n    at ClientDuplexStream.onConnectionStatus (/home/jholcomb/ridealert.server/node_modules/@goog
le-cloud/pubsub/src/connection-pool.js:270:21)\n    at emitOne (events.js:116:13)\n    at ClientDup
lexStream.emit (events.js:211:7)\n    at ClientDuplexStream._emitStatusIfDone (/home/jholcomb/ridea
lert.server/node_modules/grpc/src/node/src/client.js:260:10)\n    at ClientDuplexStream._receiveSta
tus (/home/jholcomb/ridealert.server/node_modules/grpc/src/node/src/client.js:233:8)\n    at /home/
jholcomb/ridealert.server/node_modules/grpc/src/node/src/client.js:757:12',
  code: 16 }

This error occurred after the server app had been connected and processing messages for several days. Is it expected that the pubsub lib automatically reconnects (eventually), or does a reconnect in this situation have to be handled by the client?

An in-range update of @google-cloud/nodejs-repo-tools is breaking the build 🚨

Version 2.2.1 of @google-cloud/nodejs-repo-tools was just published.

Branch Build failing 🚨
Dependency @google-cloud/nodejs-repo-tools
Current Version 2.2.0
Type devDependency

This version is covered by your current version range and after updating it in your project the build failed.

@google-cloud/nodejs-repo-tools is a devDependency of this project. It might not break your production code or affect downstream projects, but probably breaks your build or test tools, which may prevent deploying or publishing.

Status Details
  • ❌ continuous-integration/appveyor/branch Waiting for AppVeyor build to complete Details
  • ❌ ci/circleci: node7 CircleCI is running your tests Details
  • ❌ ci/circleci: node4 CircleCI is running your tests Details
  • ❌ ci/circleci: node6 CircleCI is running your tests Details
  • ✅ ci/circleci: node9 Your tests passed on CircleCI! Details
  • ❌ ci/circleci: node8 Your tests failed on CircleCI Details

Commits

The new version differs by 8 commits.

  • 6834ab2 2.2.1
  • 13b006d Replace missing links (#103)
  • 78d1f39 fix(package): update got to version 8.2.0 (#105)
  • ae9e035 chore(package): update eslint-plugin-node to version 6.0.0 (#100)
  • b36b6ec fix(package): update lodash to version 4.17.5 (#99)
  • 3f47971 chore(package): update eslint to version 4.18.1 (#104)
  • 635dbc7 chore(package): update eslint-plugin-prettier to version 2.6.0 (#97)
  • 1d30f4c fix(package): update sinon to version 4.2.2 (#96)

See the full diff

FAQ and help

There is a collection of frequently asked questions. If those don’t help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

subscription.close() leads to "Request payload size exceeds the limit"

Environment details

  • OS: Kubernetes Engine
  • Node.js version: 9.3.0
  • npm version: –
  • @google-cloud/pubsub version: 0.16.2

Steps to reproduce

I have a consumer running on Kubernetes Engine which reads messages from a PubSub subscription and writes them to BigQuery.

I have implemented a clean shutdown logic when my pods get terminated (e.g. during a rolling updated). In that shutdown logic I call subscription.close().

Occasionally calling subscription.close() leads to the following error being emitted in the subscription.on('error') event:

Error: Request payload size exceeds the limit: 524288 bytes.
    at /app/node_modules/grpc/src/client.js:554:15

If I inspect the error that I receive in the subscription.on('error') handler, I can see that the stack trace contains either:

    at Subscription.emit (events.js:159:13)
    at /app/node_modules/@google-cloud/pubsub/src/subscription.js:346:10
    at <anonymous>"    

or:

    at Subscription.emit (events.js:159:13)
    at /app/node_modules/@google-cloud/pubsub/src/subscription.js:869:10
    at <anonymous>

I could trace down the problem as follows:

When subscription.close() is called, this in turn calls flushQueues_():

Subscription.prototype.close = function(callback) {
  var self = this;
  this.userClosed_ = true;
  var inventory = this.inventory_;
  inventory.lease.length = inventory.bytes = 0;
  clearTimeout(this.leaseTimeoutHandle_);
  this.leaseTimeoutHandle_ = null;
  this.flushQueues_().then(function() {
    self.closeConnection_(callback);
  });
};

Within flushQueues_(), in case there are pending acks or nacks, acknowledge_() or modifyAckDeadline_() is called with all pending ackIds/nackIds respectively:

if (acks.length) {
  requests.push(
    this.acknowledge_(acks).then(function() {
      self.inventory_.ack = [];
    })
  );
}

if (nacks.length) {
  requests.push(
    this.modifyAckDeadline_(nacks, 0).then(function() {
      self.inventory_.nack = [];
    })
  );
}

return Promise.all(requests);

Both these methods are rather similar. In any case they will simply make a call to the API with all the ackIds/nackIds that were passed from flushQueues_().

Here's acknowledge_():

connection.write({ackIds}, resolve);

And here's modifyAckDeadline_():

connection.write(
  {
    modifyDeadlineAckIds: ackIds,
    modifyDeadlineSeconds: Array(ackIds.length).fill(deadline),
  },
  resolve
);

Apparently these API calls are too big, since the corresponding promises reject with an error that then gets emitted and that I see in my application code:

return promise.catch(function(err) {
  self.emit('error', err);
});

The error message mentions 524288 bytes which is precisely 512 KiB.

My suspicion is that the acks/nacks must be sent in batches in case they exceed the maximum payload size for acknowledge/modifyAckDeadline requests?

Unfortunately the REST API docs don't say anything about limits.

In any case I would expect that subscription.close() allows for a clean shutdown, even if there are many pending acks/nacks. The current behavior leads to potential duplicates on my end since pending acks are not correctly flushed: because sending the pending acks itself triggers an error and a subsequent crash in my application code, the messages will be re-consumed after my application restarts, even though they have already been processed.
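
A sketch of the batching this suggests (the chunk size is an arbitrary assumption; the real bound would depend on ackId sizes and the 512 KiB payload limit):

function chunk(ackIds, size) {
  const batches = [];
  for (let i = 0; i < ackIds.length; i += size) {
    batches.push(ackIds.slice(i, i + size));
  }
  return batches;
}

// e.g. inside flushQueues_():
// chunk(acks, 2500).forEach(ids => requests.push(this.acknowledge_(ids)));
// chunk(nacks, 2500).forEach(ids => requests.push(this.modifyAckDeadline_(ids, 0)));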


An in-range update of protobufjs is breaking the build 🚨

☝️ Greenkeeper’s updated Terms of Service will come into effect on April 6th, 2018.

Version 6.8.6 of protobufjs was just published.

Branch Build failing 🚨
Dependency protobufjs
Current Version 6.8.5
Type dependency

This version is covered by your current version range and after updating it in your project the build failed.

protobufjs is a direct dependency of this project, and it is very likely causing it to break. If other packages depend on yours, this update is probably also breaking those in turn.

Status Details
  • ❌ continuous-integration/appveyor/branch Waiting for AppVeyor build to complete Details
  • ❌ ci/circleci: node4 CircleCI is running your tests Details
  • ❌ ci/circleci: node6 CircleCI is running your tests Details
  • βœ… ci/circleci: node8 Your tests passed on CircleCI! Details
  • ❌ ci/circleci: node9 Your tests failed on CircleCI Details

Release Notes 6.8.6

This is a security patch:

  • Fixes typeRefRe used in the parser being vulnerable to ReDoS as reported by James Davis. Relevant where a user is allowed to provide .proto sources for parsing. Applications using trusted .proto definitions exclusively, JSON descriptors or static code are not affected.
Commits

The new version differs by 2 commits.

  • 918ff01 Update dist files for 6.8.6
  • 2ee1028 Security: Fix typeRefRe being vulnerable to ReDoS

See the full diff

FAQ and help

There is a collection of frequently asked questions. If those don’t help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

subscription keeps receiving messages after subscription.close() was called

Environment details

  • OS: Kubernetes Engine
  • Node.js version: 9.3.0
  • npm version: –
  • @google-cloud/pubsub version: 0.16.2

Steps to reproduce

During my investigations on #62 I cluttered the library code with console.log() statements (see this branch for reference) and found an interesting pattern:

After subscription.close() is called, the subscription can still receive many new messages, all of which need to be nacked. For the investigation I've placed a debug statement in the modifyAckDeadline_() method.

Here's the output of this log statement:

{
  "message": "modifyAckDeadline_ non-streaming",
  "acknowledgeInstanceId": "230edb80-718a-4b0a-8e9f-083da5da3fd2",
  "deadline": 0,
  "count": 2202,
  "bytes": 359042
}
{
  "message": "modifyAckDeadline_ non-streaming",
  "acknowledgeInstanceId": "3d749b9f-0156-4120-aee5-fa9c1ce455b0",
  "deadline": 0,
  "count": 5068,
  "bytes": 826668
}
{
  "message": "modifyAckDeadline_ non-streaming",
  "acknowledgeInstanceId": "2b2b66b9-723b-4cb6-a654-8ebad6ff298a",
  "deadline": 0,
  "count": 7889,
  "bytes": 1287063
}
{
  "message": "modifyAckDeadline_ non-streaming",
  "acknowledgeInstanceId": "c7b13ac3-5440-438e-9eac-f6bda0f59bc6",
  "deadline": 0,
  "count": 9270,
  "bytes": 1512777
}
{
  "message": "modifyAckDeadline_ non-streaming",
  "acknowledgeInstanceId": "aa3de1e8-55bd-432f-b9b0-8b1c057a9b76",
  "deadline": 0,
  "count": 11806,
  "bytes": 1926374
}
{
  "message": "modifyAckDeadline_ non-streaming",
  "acknowledgeInstanceId": "eed24ea6-2bce-4137-8ad8-c49dc16e528b",
  "deadline": 0,
  "count": 280,
  "bytes": 45585
}
{
  "message": "modifyAckDeadline_ non-streaming",
  "acknowledgeInstanceId": "9c8653a5-2117-4454-9e96-fd154e75bdf0",
  "deadline": 0,
  "count": 2861,
  "bytes": 466398
}

Or looking just at the ackId-counts:

  "count": 2202,
  "count": 5068,
  "count": 7889,
  "count": 9270,
  "count": 11806,
  "count": 280,
  "count": 2861,

As you can see, the counts keep increasing for a while before they eventually drop. Please note that this all happens after the subscription.close() call.

Would it somehow be possible to stop receiving new messages as soon as .close() has been called? Or is the observed behavior intended?
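
In the meantime I'm papering over it with a user-land guard along these lines (a sketch only, not library behavior; handle() stands in for my own message processing):

// User-land guard sketch: nack anything that arrives after close() was
// requested, so late deliveries go straight back to the server.
let closing = false;

subscription.on('message', message => {
  if (closing) {
    message.nack();
    return;
  }
  handle(message);
});

function shutdown() {
  closing = true;
  return subscription.close();
}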

using "topic.exists()" raises "TypeError: Channel's second argument must be a ChannelCredentials"

Environment details

  • OS: Ubuntu 16.04
  • Node.js version: 9.3.0
  • npm version: 5.5.1
  • @google-cloud/pubsub version: 0.16.1

Steps to reproduce

  1. Set up the PubSub emulator and set PUBSUB_EMULATOR_HOST
  2. Run topic.exists()

This issue does not happen with PubSub version 0.15.0; after upgrading to 0.16.* and calling topic.exists(), the following error occurs:

[0] (node:48) UnhandledPromiseRejectionWarning: TypeError: Channel's second argument must be a ChannelCredentials
[0]     at ServiceClient.Client (/usr/src/app/node_modules/google-gax/node_modules/grpc/src/client.js:472:19)
[0]     at new ServiceClient (/usr/src/app/node_modules/google-gax/node_modules/grpc/src/client.js:884:12)
[0]     at /usr/src/app/node_modules/google-gax/lib/grpc.js:245:12
[0]     at <anonymous>
[0]     at process._tickCallback (internal/process/next_tick.js:160:7)
[0]     at Function.Module.runMain (module.js:703:11)
[0]     at startup (bootstrap_node.js:194:16)
[0]     at bootstrap_node.js:618:3
[0] (node:48) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 1)
[0] (node:48) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.

Thanks!

Batching settings is ignored

Hi,

I am running a nodejs-pubsub client on Windows and Linux and am seeing this bug on both platforms.

The batching parameter of topic.publisher() is ignored when messages are published, despite being visible and correctly set up when I read it back from publisher.settings.batching.

I tried setting the maxMessages parameter to 1, but my messages are still published in batches. (A possible stopgap is sketched after the repro below.)

I tried to set BatchSettings(max_messages=1) with Python (with the same subscribers) and it worked.

Environment details

  • OS: Ubuntu 17.10 , 64 bits
  • Node.js version: 6.11.4
  • npm version: 3.5.2
  • google-cloud/pubsub version: 0.16.4

Steps to reproduce

const PubSub = require('@google-cloud/pubsub');

const pubsub = new PubSub({});

const topic = pubsub.topic('mytopic');

const publisher = topic.publisher({
  batching: {
    maxMessages: 1
  }
})

const callback = function (err, messageId) {
  if (err) {
    console.log(err)
  }
};

for (var i = 1; i <= 3; i++) {
  publisher.publish(Buffer.from('This is a message'), { myattribute: 'attributeValue'}, callback);
} // These messages are sent in one batch rather than as 3 independent messages
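
The stopgap I have in mind, assuming the publish callback only fires once a message has actually been sent, is to serialize publishes so that each batch can hold at most one in-flight message (a workaround sketch, not a fix for the underlying bug):

// Workaround sketch: chain publishes so batches never accumulate messages.
// Assumes the publish callback fires after the message is actually sent.
function publishSequentially(messages) {
  return messages.reduce(function(chain, data) {
    return chain.then(function() {
      return new Promise(function(resolve, reject) {
        publisher.publish(Buffer.from(data), function(err, messageId) {
          if (err) return reject(err);
          resolve(messageId);
        });
      });
    });
  }, Promise.resolve());
}

publishSequentially(['message 1', 'message 2', 'message 3']).catch(console.log);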

Thanks for your help!

Make ackDeadline editable

This is a request originally from @mkamioner in this PR.

Love the new changes, but I miss the ability to specify my ackDeadline -- sometimes I have processes with long-running jobs and I want to be able to change it in one place.
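
Something along these lines would cover it (a hypothetical sketch of the requested option; the option name and placement are assumptions, not an existing API):

// Hypothetical API sketch: ackDeadline is the requested option, not
// something the library currently exposes here.
const subscription = topic.subscription('my-sub', {
  ackDeadline: 600, // seconds; generous enough for long-running jobs
});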

Duplicate messages exactly every 15 minutes

Environment details

  • OS: Kubernetes Engine
  • Node.js version: 6.3.0
  • npm version: –
  • @google-cloud/pubsub version: 0.16.2

Steps to reproduce

This is most likely related to the discussion in #2 (comment) (and following comments in that thread). However since the discussion there somewhat faded out I want to report my findings in a new issue.

My subscriber is consuming messages at a rate of roughly 500/s and it is receiving small batches of duplicate messages exactly every 15 minutes. Those batches typically contain between 100 and 400 duplicate messages. Here's a plot of the number of duplicates over time:

[figure: batches, showing the number of duplicates over time]

Most of the duplicates are delivered to my subscriber within less than a second. Here's a histogram of the durations between redeliveries in milliseconds:

[figure: redelivery, a histogram of redelivery intervals in milliseconds]

As you can see, the batches of duplicates coincide with spikes in the Stackdriver graphs for StreamingPull Operations and StreamingPull Acknowledge Requests (please note that Stackdriver shows Berlin time while the above graph shows UTC, hence the 1h difference):

[figure: stackdriver, showing StreamingPull Operations and StreamingPull Acknowledge Requests]

From the comments in the other thread I did not really understand whether the behavior we see is actually expected. What's the reason for this to happen precisely every 15 minutes?

Even though the absolute number of duplicates is well below 1%, this still looks pretty odd, unexpected and unnecessary. I'd love to understand better what's causing this issue and how it could potentially be fixed.

/cc @kir-titievsky @callmehiphop @rossj

unacked messages being redelivered

From @rhodgkins on November 23, 2017 15:19

In previous versions of pubsub (I think v0.12 and lower), messages were auto-acked upon receipt (or, if auto-ack was turned off, had to be manually acked). As I understood it, this had to happen within the ackDeadlineSeconds time, which could be modified with subscription.modifyAckDeadline().

Now the newer version (v0.15), from looking at the code, uses timers to "lease" out the message (using the ackDeadlineSeconds time as the initial lease interval), automatically extending the ack deadline until either message.ack() or message.nack() is called?
What happens if you don't do this for a long period of time? Does the lease timer keep extending the ack deadline indefinitely?
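
For context, my mental model of the lease loop is roughly the following (a conceptual sketch only, not the library's actual code; modifyAckDeadline_ is the internal method quoted elsewhere in this tracker):

// Conceptual sketch of lease renewal as I understand it (not library code):
// every ~90% of the deadline, re-up the ack deadline for unsettled messages.
function startLease(subscription, ackIds, ackDeadlineSeconds) {
  const handle = setInterval(function() {
    subscription.modifyAckDeadline_(ackIds, ackDeadlineSeconds);
  }, ackDeadlineSeconds * 900); // 90% of the deadline, converted to ms

  return function stopLease() {
    clearInterval(handle);
  };
}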

The reason I'm asking for clarification is that I've seen unacked messages (that is, messages on which neither .ack() nor .nack() has been called) being delivered again after a period of time.

So the following would happen:

  1. Message A published
  2. Message A received and not acked or nacked
  3. Period of time passes - say 3 mins
  4. Message A is received again

I've also had it where I've nacked a message and the current message I'm processing is delivered again.

  1. Message A published
  2. Message A received and not acked or nacked
  3. Message B published and nacked
  4. Message A is received again

I'll try and post some replication steps for the latter issue (it's not consistent when I have seen it), but if anyone can confirm my questions above, that would be great! Cheers!

Copied from original issue: googleapis/google-cloud-node#2756

Not working in cloud function

I am unable to get the client to work in a Cloud Function. Sometimes messages are never acknowledged (even though ack() is being called), and the Cloud Function can get "stuck" so that no newly published messages are fetched until the function is redeployed.

According to the Cloud Functions docs, background activities should not be started.

Is the Node.js pubsub client incompatible with Cloud Functions because it sends acknowledgments in the background and re-uses connections from a pool? I have a StackOverflow question asking the same thing in more detail. (A sketch of the event-driven alternative follows the repro steps below.)

Thanks!

Environment details

  • OS: Google Cloud Function
  • Node.js version: 6.11.5
  • npm version: not documented by Google
  • @google-cloud/pubsub version: 0.16.2

Steps to reproduce

  1. Create a Google Cloud Function using the pubsub client to pull messages from a subscription
  2. Deploy the function
  3. Publish messages, execute the function, and repeat
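
For what it's worth, the event-driven model sidesteps the pull client entirely: a Pub/Sub-triggered background function receives each message as its invocation payload, so nothing runs in the background between invocations. A minimal sketch, assuming the Node 6-era background function signature (the export name is arbitrary):

// Sketch of a Pub/Sub-triggered background function (Node 6-era signature).
// The platform delivers one message per invocation; no pull client needed.
exports.onMessage = function(event, callback) {
  const pubsubMessage = event.data;
  const text = Buffer.from(pubsubMessage.data, 'base64').toString();
  console.log('Received:', text);
  callback(); // signals success, which acknowledges the message
};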

Constructor does not normalize arguments in all cases

The PubSub constructor function contains

if (!(this instanceof PubSub)) {
  options = common.util.normalizeArguments(this, options);
  return new PubSub(options);
}

but that code is only activated when PubSub is used as a factory function. This is a problem when PubSub is used as a constructor, i.e. new PubSub(): common.util.normalizeArguments performs inference of the projectId, so when doing new PubSub(), your GCLOUD_PROJECT environment variable is no longer recognized.
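
Concretely (a small illustration, assuming GCLOUD_PROJECT is set in the environment):

const PubSub = require('@google-cloud/pubsub');

const viaFactory = PubSub();         // normalizeArguments runs: projectId is inferred
const viaConstructor = new PubSub(); // normalizeArguments is skipped: projectId is missing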

The problem of not calling normalizeArguments for all cases can be found in a bunch of Google Cloud Node libraries.

An in-range update of google-gax is breaking the build 🚨

Version 0.14.5 of google-gax was just published.

Branch Build failing 🚨
Dependency google-gax
Current Version 0.14.4
Type dependency

This version is covered by your current version range and after updating it in your project the build failed.

google-gax is a direct dependency of this project, and it is very likely causing it to break. If other packages depend on yours, this update is probably also breaking those in turn.

Status Details
  • βœ… ci/circleci: node4 Your tests passed on CircleCI! Details
  • βœ… ci/circleci: node9 Your tests passed on CircleCI! Details
  • βœ… ci/circleci: node8 Your tests passed on CircleCI! Details
  • βœ… ci/circleci: node7 Your tests passed on CircleCI! Details
  • βœ… ci/circleci: node6 Your tests passed on CircleCI! Details
  • ❌ ci/circleci: lint Your tests are queued behind your running builds Details
  • βœ… continuous-integration/appveyor/branch AppVeyor build succeeded Details
  • ❌ ci/circleci: docs Your tests failed on CircleCI Details

Commits

The new version differs by 1 commits.

  • 2d52e53 fix: downgrade grpc back to 1.7.2, bump version to 0.14.5 (#178)

See the full diff

FAQ and help

There is a collection of frequently asked questions. If those don’t help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

An in-range update of eslint-plugin-prettier is breaking the build 🚨

Version 2.5.0 of eslint-plugin-prettier was just published.

Branch Build failing 🚨
Dependency eslint-plugin-prettier
Current Version 2.4.0
Type devDependency

This version is covered by your current version range and after updating it in your project the build failed.

eslint-plugin-prettier is a devDependency of this project. It might not break your production code or affect downstream projects, but probably breaks your build or test tools, which may prevent deploying or publishing.

Status Details
  • ❌ continuous-integration/appveyor/branch Waiting for AppVeyor build to complete Details
  • ❌ ci/circleci: node6 CircleCI is running your tests Details
  • βœ… ci/circleci: node9 Your tests passed on CircleCI! Details
  • βœ… ci/circleci: node7 Your tests passed on CircleCI! Details
  • βœ… ci/circleci: node8 Your tests passed on CircleCI! Details
  • ❌ ci/circleci: node4 Your tests failed on CircleCI Details

Commits

The new version differs by 3 commits.

  • eba622e Build: update package.json and changelog for v2.5.0
  • 0b6ab55 Fix: pass filepath to prettier (#76)
  • 804ead7 Update: Add URL to rule documentation to the metadata (#75)

See the full diff

FAQ and help

There is a collection of frequently asked questions. If those don’t help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴
