Code Monkey home page Code Monkey logo

node-amqp-connection-manager's Introduction

amqp-connection-manager

NPM version Build Status semantic-release

Connection management for amqplib. This is a wrapper around amqplib which provides automatic reconnects.

Features

  • Automatically reconnect when your amqplib broker dies in a fire.
  • Round-robin connections between multiple brokers in a cluster.
  • If messages are sent while the broker is unavailable, queues messages in memory until we reconnect.
  • Supports both promises and callbacks (using promise-breaker)
  • Very un-opinionated library - a thin wrapper around amqplib.

Installation

npm install --save amqplib amqp-connection-manager

Basics

The basic idea here is that, usually, when you create a new channel, you do some setup work at the beginning (like asserting that various queues or exchanges exist, or binding to queues), and then you send and receive messages and you never touch that stuff again.

amqp-connection-manager will reconnect to a new broker whenever the broker it is currently connected to dies. When you ask amqp-connection-manager for a channel, you specify one or more setup functions to run; the setup functions will be run every time amqp-connection-manager reconnects, to make sure your channel and broker are in a sane state.

Before we get into an example, note this example is written using Promises, however much like amqplib, any function which returns a Promise will also accept a callback as an optional parameter.

Here's the example:

var amqp = require('amqp-connection-manager');

// Create a new connection manager
var connection = amqp.connect(['amqp://localhost']);

// Ask the connection manager for a ChannelWrapper.  Specify a setup function to
// run every time we reconnect to the broker.
var channelWrapper = connection.createChannel({
  json: true,
  setup: function (channel) {
    // `channel` here is a regular amqplib `ConfirmChannel`.
    // Note that `this` here is the channelWrapper instance.
    return channel.assertQueue('rxQueueName', { durable: true });
  },
});

// Send some messages to the queue.  If we're not currently connected, these will be queued up in memory
// until we connect.  Note that `sendToQueue()` and `publish()` return a Promise which is fulfilled or rejected
// when the message is actually sent (or not sent.)
channelWrapper
  .sendToQueue('rxQueueName', { hello: 'world' })
  .then(function () {
    return console.log('Message was sent!  Hooray!');
  })
  .catch(function (err) {
    return console.log('Message was rejected...  Boo!');
  });

Sometimes it's handy to modify a channel at run time. For example, suppose you have a channel that's listening to one kind of message, and you decide you now also want to listen to some other kind of message. This can be done by adding a new setup function to an existing ChannelWrapper:

channelWrapper.addSetup(function (channel) {
  return Promise.all([
    channel.assertQueue('my-queue', { exclusive: true, autoDelete: true }),
    channel.bindQueue('my-queue', 'my-exchange', 'create'),
    channel.consume('my-queue', handleMessage),
  ]);
});

addSetup() returns a Promise which resolves when the setup function is finished (or immediately, if the underlying connection is not currently connected to a broker.) There is also a removeSetup(setup, teardown) which will run teardown(channel) if the channel is currently connected to a broker (and will not run teardown at all otherwise.) Note that setup and teardown must either accept a callback or return a Promise.

See a complete example in the examples folder.

API

connect(urls, options)

Creates a new AmqpConnectionManager, which will connect to one of the URLs provided in urls. If a broker is unreachable or dies, then AmqpConnectionManager will try the next available broker, round-robin.

Options:

  • options.heartbeatIntervalInSeconds - Interval to send heartbeats to broker. Defaults to 5 seconds.
  • options.reconnectTimeInSeconds - The time to wait before trying to reconnect. If not specified, defaults to heartbeatIntervalInSeconds.
  • options.findServers(callback) is a function which returns one or more servers to connect to. This should return either a single URL or an array of URLs. This is handy when you're using a service discovery mechanism. such as Consul or etcd. Instead of taking a callback, this can also return a Promise. Note that if this is supplied, then urls is ignored.
  • options.connectionOptions is passed as options to the amqplib connect method.

AmqpConnectionManager events

  • connect({connection, url}) - Emitted whenever we successfully connect to a broker.
  • connectFailed({err, url}) - Emitted whenever we attempt to connect to a broker, but fail.
  • disconnect({err}) - Emitted whenever we disconnect from a broker.
  • blocked({reason}) - Emitted whenever a connection is blocked by a broker
  • unblocked - Emitted whenever a connection is unblocked by a broker

AmqpConnectionManager#createChannel(options)

Create a new ChannelWrapper. This is a proxy for the actual channel (which may or may not exist at any moment, depending on whether or not we are currently connected.)

Options:

  • options.name - Name for this channel. Used for debugging.
  • options.setup(channel, [cb]) - A function to call whenever we reconnect to the broker (and therefore create a new underlying channel.) This function should either accept a callback, or return a Promise. See addSetup below. Note that this inside the setup function will the returned ChannelWrapper. The ChannelWrapper has a special context member you can use to store arbitrary data in.
  • options.json - if true, then ChannelWrapper assumes all messages passed to publish() and sendToQueue() are plain JSON objects. These will be encoded automatically before being sent.
  • options.confirm - if true (default), the created channel will be a ConfirmChannel
  • options.publishTimeout - a default timeout for messages published to this channel.

AmqpConnectionManager#isConnected()

Returns true if the AmqpConnectionManager is connected to a broker, false otherwise.

AmqpConnectionManager#close()

Close this AmqpConnectionManager and free all associated resources.

ChannelWrapper events

  • connect - emitted every time this channel connects or reconnects.
  • error(err, {name}) - emitted if an error occurs setting up the channel.
  • close - emitted when this channel closes via a call to close()

ChannelWrapper#addSetup(setup)

Adds a new 'setup handler'.

setup(channel, [cb]) is a function to call when a new underlying channel is created - handy for asserting exchanges and queues exists, and whatnot. The channel object here is a ConfirmChannel from amqplib. The setup function should return a Promise (or optionally take a callback) - no messages will be sent until this Promise resolves.

If there is a connection, setup() will be run immediately, and the addSetup Promise/callback won't resolve until setup is complete. Note that in this case, if the setup throws an error, no 'error' event will be emitted, since you can just handle the error here (although the setup will still be added for future reconnects, even if it throws an error.)

Setup functions should, ideally, not throw errors, but if they do then the ChannelWrapper will emit an 'error' event.

ChannelWrapper#removeSetup(setup, teardown)

Removes a setup handler. If the channel is currently connected, will call teardown(channel), passing in the underlying amqplib ConfirmChannel. teardown should either take a callback or return a Promise.

ChannelWrapper#publish and ChannelWrapper#sendToQueue

These work exactly like their counterparts in amqplib's Channel, except that they return a Promise (or accept a callback) which resolves when the message is confirmed to have been delivered to the broker. The promise rejects if either the broker refuses the message, or if close() is called on the ChannelWrapper before the message can be delivered.

Both of these functions take an additional option when passing options:

  • timeout - If specified, if a messages is not acked by the amqp broker within the specified number of milliseconds, the message will be rejected. Note that the message may still end up getting delivered after the timeout, as we have no way to cancel the in-flight request.

ChannelWrapper#ack and ChannelWrapper#nack

These are just aliases for calling ack() and nack() on the underlying channel. They do nothing if the underlying channel is not connected.

ChannelWrapper#queueLength()

Returns a count of messages currently waiting to be sent to the underlying channel.

ChannelWrapper#close()

Close a channel, clean up resources associated with it.

node-amqp-connection-manager's People

Contributors

andarist avatar borales avatar brettbedarf avatar cs-bturner avatar dependabot-preview[bot] avatar dependabot[bot] avatar devjerry avatar diegog avatar ericwastakenmjd avatar fvictorio avatar greenkeeper[bot] avatar greenkeeperio-bot avatar jwalton avatar kekkokk avatar kpman avatar lededje avatar luddd3 avatar megadix avatar microwavekonijn avatar nekufa avatar radu-c avatar rhsobr avatar semantic-release-bot avatar skaegi avatar sw271 avatar treble-snake avatar tstapleton avatar tw0517tw avatar whs avatar xfd1387 avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

node-amqp-connection-manager's Issues

An in-range update of sinon is breaking the build 🚨

Version 5.0.10 of sinon was just published.

Branch Build failing 🚨
Dependency sinon
Current Version 5.0.9
Type devDependency

This version is covered by your current version range and after updating it in your project the build failed.

sinon is a devDependency of this project. It might not break your production code or affect downstream projects, but probably breaks your build or test tools, which may prevent deploying or publishing.

Status Details
  • continuous-integration/travis-ci/push The Travis CI build could not complete due to an error Details
  • coverage/coveralls First build on greenkeeper/sinon-5.0.10 at 100.0% Details

Commits

The new version differs by 9 commits.

  • 4313d2d Update docs/changelog.md and set new release id in docs/_config.yml
  • e21c92a Add release documentation for v5.0.10
  • 41d0dcb 5.0.10
  • 928379c Update History.md and AUTHORS for new release
  • 5ca48d3 Merge pull request #1802 from ifrost/feature/restore-default-sandbox-fake-timers
  • 087bc1c Cache reverse function
  • d19a4c0 Add issue number
  • 6799e1c Use fakeTimers in the default sandbox
  • 8b6f8a8 Revert spied fakeTimers to original state

See the full diff

FAQ and help

There is a collection of frequently asked questions. If those don’t help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

Need help in implementing this for RPC pattern

Hi,
I want to implement following ;

amqp.connect('amqp://localhost', function(err, conn) {
  conn.createChannel(function(err, ch) {
    ch.assertQueue('', {exclusive: true}, function(err, q) {
      var corr = generateUuid();
      var num = parseInt(args[0]);
      console.log(' [x] Requesting fib(%d)', num);
      ch.consume(q.queue, function(msg) {
        if (msg.properties.correlationId === corr) {
          console.log(' [.] Got %s', msg.content.toString());
          setTimeout(function() { conn.close(); process.exit(0) }, 500);
        }
      }, {noAck: true});
      ch.sendToQueue('rpc_queue',
        new Buffer(num.toString()),
        { correlationId: corr, replyTo: q.queue });
    });
  });
});

Full source link : https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/javascript-nodejs/src/rpc_client.js

In your example pattern like ;

const channelWrapper = amqpConnection.createChannel({
  json: true,
  setup(channel) {
    // channel here is a regular amqplib ConfirmChannel.
    return channel.assertQueue('', { exclusive: true });
  }
});

const sendMessage = function (num, id) {
  channelWrapper.sendToQueue(QUEUE_NAME, new Buffer(num.toString()),
    { correlationId: id, **replyTo: q.queue** })
    .then(() => {
      console.log('Message sent');
      return;
    }).catch((err) => {
      console.log('Message was rejected:', err.stack);
      channelWrapper.close();
      amqpConnection.close();
    });
};

How can I get reference to assertQueue ? (repyTo: **"q".**queue)

Please help me in modifying the code.

Support for per URL connection options

Some of the connectionOptions we use are per URL (for example "servername" which boils down to TLSSocket.servername and used as part of the TLS Server Name Indication). Could connect be altered to accept an array of {url:... , options:...} or something along those lines.

How to use createChannel with callback

Hi there,

I'm trying to use this lib but I still use callbacks in the project I'm currently working on, and I can't get to use the callback version of createChannel. Is there an example I can look at?

json option in the examples

It seems that the json option in the examples is in the wrong place.

In the example it is on the amqp.connect but it should be on the createChannel options.

Reconnect missing when broker closes connection gracefully

Hi,

My CoffeeScript skills are not exactly brilliant - but after investigating this part of the code I would guess that a reconnection attempt is only initiated if the broker went away due to an unexpected error.
However, besides the error event there's also a close event which can be fired from the connection. This happens if you e.g. manually shutdown the RabbitMQ server. The underlying amqplib then generates a CONNECTION_FORCED error and emits a close event for the connection.

As far as I can tell there just needs to be another event handler for connections' close events implemented which also tries to do a reconnect. However as mentioned, I haven't been doing anything in CoffeeScript yet and that's why I didn't want to do a pull request right away.

EDIT: I just saw the (last) test which makes sure that there's no reconnection attempt after a connection close. So furthermore there should be a distinction between close events fired by us manually closing the connection in the client and those fired by the server as described in the above example.

Feel free to correct me if I'm wrong.

exception handling into setup function - reconnection retry interval

Hi there,

in the following test case, an exception will be raised by the RabbitMq server due to the lack of authorization in assertExchange.

Disconnected. Error: Channel closed by server: 403 (ACCESS-REFUSED) with message "ACCESS_REFUSED - access to exchange 'assertFail' in vhost '/' refused for user 'test'"
    at ConfirmChannel.C.accept (/home/test/lib/node_modules/amqplib/lib/channel.js:403:24)
    at Connection.mainAccept [as accept] (/home/test/lib/node_modules/amqplib/lib/connection.js:62:33)
    at Socket.go (/home/test/availability-agent/lib/node_modules/amqplib/lib/connection.js:465:48)
    at Socket.emit (events.js:92:17)
    at emitReadable_ (_stream_readable.js:427:10)
    at emitReadable (_stream_readable.js:423:5)
    at readableAddChunk (_stream_readable.js:166:9)
    at Socket.Readable.push (_stream_readable.js:128:10)
    at TCP.onread (net.js:529:21)

The final outcome is a tight recconection loop that will not respect the reconnectTimeInSeconds directive. I expeted to see retry attempts every reconnectTimeInSeconds

var amqp = require('amqp-connection-manager');

var connection = amqp.connect(['amqp://test:[email protected]:5672'], {reconnectTimeInSeconds: 5, json: true});
connection.on('connect', function() {
    console.log('Connected!');
});
connection.on('disconnect', function(params) {
    console.log('Disconnected.', params.err.stack);
});

// Set up a channel listening for messages in the queue.
var channelWrapper = connection.createChannel({
    setup: function(channel) {
        return  channel.assertExchange("assertFail");
    }
});

channelWrapper.waitForConnect().then(function() {
    console.log("Listening for messages");
});

channelWrapper.on('connect', function(){console.log("connect")})
channelWrapper.on('error', function(){console.log("error")})
channelWrapper.on('drop', function(){console.log("drop")})
channelWrapper.on('close', function(){console.log("close")})

An in-range update of eslint is breaking the build 🚨

The devDependency eslint was updated from 5.13.0 to 5.14.0.

🚨 View failing branch.

This version is covered by your current version range and after updating it in your project the build failed.

eslint is a devDependency of this project. It might not break your production code or affect downstream projects, but probably breaks your build or test tools, which may prevent deploying or publishing.

Status Details
  • continuous-integration/travis-ci/push: The Travis CI build could not complete due to an error (Details).

Release Notes for v5.14.0
  • 85a04b3 Fix: adds conditional for separateRequires in one-var (fixes #10179) (#10980) (Scott Stern)
  • 0c02932 Upgrade: [email protected] (#11401) (Ilya Volodin)
  • 104ae88 Docs: Update governance doc with reviewers status (#11399) (Nicholas C. Zakas)
  • ab8ac6a Fix: Support boundary spread elements in sort-keys (#11158) (Jakub Rożek)
  • a23d197 New: add allowSingleLineBlocks opt. to padded-blocks rule (fixes #7145) (#11243) (richie3366)
  • e25e7aa Fix: comma-spacing ignore comma before closing paren (fixes #11295) (#11374) (Pig Fang)
  • a1f7c44 Docs: fix space-before-blocks correct code for "classes": "never" (#11391) (PoziWorld)
  • 14f58a2 Docs: fix grammar in object-curly-spacing docs (#11389) (PoziWorld)
  • d3e9a27 Docs: fix grammar in “those who says” (#11390) (PoziWorld)
  • ea8e804 Docs: Add note about support for object spread (fixes #11136) (#11395) (Steven Thomas)
  • 95aa3fd Docs: Update README team and sponsors (ESLint Jenkins)
  • 51c4972 Update: Behavior of --init (fixes #11105) (#11332) (Nicholas C. Zakas)
  • ad7a380 Docs: Update README team and sponsors (ESLint Jenkins)
  • 550de1e Update: use default keyword in JSON schema (fixes #9929) (#11288) (Pig Fang)
  • 983c520 Update: Use 'readonly' and 'writable' for globals (fixes #11359) (#11384) (Nicholas C. Zakas)
  • f1d3a7e Upgrade: some deps (fixes #11372) (#11373) (薛定谔的猫)
  • 3e0c417 Docs: Fix grammar in “there’s nothing prevent you” (#11385) (PoziWorld)
  • de988bc Docs: Fix grammar: Spacing improve -> Spacing improves (#11386) (PoziWorld)
  • 1309dfd Revert "Build: fix test failure on Node 11 (#11100)" (#11375) (薛定谔的猫)
  • 1e56897 Docs: “the function actually use”: use -> uses (#11380) (PoziWorld)
  • 5a71bc9 Docs: Update README team and sponsors (ESLint Jenkins)
  • 82a58ce Docs: Update README team and sponsors (ESLint Jenkins)
  • 546d355 Docs: Update README with latest sponsors/team data (#11378) (Nicholas C. Zakas)
  • c0df9fe Docs: ... is not an operator (#11232) (Felix Kling)
  • 7ecfdef Docs: update typescript parser (refs #11368) (#11369) (薛定谔的猫)
  • 3c90dd7 Update: remove prefer-spread autofix (fixes #11330) (#11365) (薛定谔的猫)
  • 5eb3121 Update: add fixer for prefer-destructuring (fixes #11151) (#11301) (golopot)
  • 173eb38 Docs: Clarify ecmaVersion doesn't imply globals (refs #9812) (#11364) (Keith Maxwell)
  • 84ce72f Fix: Remove extraneous linefeeds in one-var fixer (fixes #10741) (#10955) (st-sloth)
  • 389362a Docs: clarify motivation for no-prototype-builtins (#11356) (Teddy Katz)
  • 533d240 Update: no-shadow-restricted-names lets unassigned vars shadow undefined (#11341) (Teddy Katz)
  • d0e823a Update: Make --init run js config files through linter (fixes #9947) (#11337) (Brian Kurek)
  • 92fc2f4 Fix: CircularJSON dependency warning (fixes #11052) (#11314) (Terry)
  • 4dd19a3 Docs: mention 'prefer-spread' in docs of 'no-useless-call' (#11348) (Klaus Meinhardt)
  • 4fd83d5 Docs: fix a misleading example in one-var (#11350) (薛定谔的猫)
  • 9441ce7 Chore: update incorrect tests to fix build failing (#11354) (薛定谔的猫)
Commits

The new version differs by 38 commits.

  • af9688b 5.14.0
  • 0ce3ac7 Build: changelog update for 5.14.0
  • 85a04b3 Fix: adds conditional for separateRequires in one-var (fixes #10179) (#10980)
  • 0c02932 Upgrade: [email protected] (#11401)
  • 104ae88 Docs: Update governance doc with reviewers status (#11399)
  • ab8ac6a Fix: Support boundary spread elements in sort-keys (#11158)
  • a23d197 New: add allowSingleLineBlocks opt. to padded-blocks rule (fixes #7145) (#11243)
  • e25e7aa Fix: comma-spacing ignore comma before closing paren (fixes #11295) (#11374)
  • a1f7c44 Docs: fix space-before-blocks correct code for "classes": "never" (#11391)
  • 14f58a2 Docs: fix grammar in object-curly-spacing docs (#11389)
  • d3e9a27 Docs: fix grammar in “those who says” (#11390)
  • ea8e804 Docs: Add note about support for object spread (fixes #11136) (#11395)
  • 95aa3fd Docs: Update README team and sponsors
  • 51c4972 Update: Behavior of --init (fixes #11105) (#11332)
  • ad7a380 Docs: Update README team and sponsors

There are 38 commits in total.

See the full diff

FAQ and help

There is a collection of frequently asked questions. If those don’t help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

Nonexistent drop event

In the documentation, it states that the drop event is "called when a JSON message was dropped because it could not be encoded." I can't seem to produce this result (for logging purposes only), and when I look at the code there doesn't seem to be any code that fires it (I could be wrong, my coffeescript is nonexistent). What is the event actually supposed to do?

make "send queue" optionnal

Thanks you for this lib.

I have the following use case:
-receive MSG
-work X
-work Y
-send MSG2
-ack MSG

Since ack must be done on the same channel then reception, I need for this use case to "send or fail" all the task.

Please let me know if it's possible
Regards

Cancel consumer

Hi,

Am using the AMQP Connection Manager in a micro service deployed in containers on Kubernetes. As K8s manages scaling and such it starts and stops containers at its own digression.

I'm trying to make it so that my MicroService shuts down gracefully, meaning that any inflight messages are properly handled, but the service isn't send new messages from RMQ.

AFAIK, in plain amqp I should basic.cancel the consumer, before calling .close() on the channel.

However, AMQP Connection Manager doesn't do this nor does it seem to allow me to do so. The .close() function on the ChannelWrapper doesn't do basic.cancel before the calling .close() on the channel.

I tried hacking it, by accessing the _channel property on the ChannelWrapper and calling .cancel() on it directly, but it seems this triggers auto reconnect by the AMQP Connection Manager.

Is my thinking flawed? Am I missing something? Or is this indeed not supported? If the later, could it be added or won't that work within the goals of the Connection Manager?

Publishing / SendToQueue On a Cluster

I've got it set up and publishing to the cluster. If a node dies the target channel changes, which is great.

However I'd like to spread the load across multiple nodes since I'm firing a ton of messages, how can I do this?

I'd expect to be able to pass an argument to sendToQueue to signal which node to send the message to, or is what I'm after not possible with this plugin?

json:true benefits

Is there any benefit (like performance) of using json:true instead of sending JS object directly?

when channel error occur then connection is closed

this is not a node-amqp-connection-manager bug but

amqplib has bug when channel error occur then connection is closed

(check this issues amqp-node/amqplib#175)

squaremo said just attach error handler at channel and prevent unhanded error bubbled up to the connection

i set up all exchange and queue and send some publish message using channel wrapper

if exchange dose not exist then error occur and all channel closed!

i don't want to close connection

i want to the message is rejected or close that only error occurred channel

so could you please add handler at channel and if error occur just reopen that channel..

thank you

Missing basic.reject

Hi,

would be nice to implement basic.reject. This is easier then publish somewhere else and ack message and also for redelivery tracing ("x-death" header).

Thanks

An in-range update of sinon is breaking the build 🚨

The devDependency sinon was updated from 7.3.0 to 7.3.1.

🚨 View failing branch.

This version is covered by your current version range and after updating it in your project the build failed.

sinon is a devDependency of this project. It might not break your production code or affect downstream projects, but probably breaks your build or test tools, which may prevent deploying or publishing.

Status Details
  • continuous-integration/travis-ci/push: The Travis CI build could not complete due to an error (Details).

Commits

The new version differs by 6 commits.

  • 3812a7d Update docs/changelog.md and set new release id in docs/_config.yml
  • 93bef55 Add release documentation for v7.3.1
  • e02c192 7.3.1
  • 8ee1d35 Update CHANGELOG.md and AUTHORS for new release
  • bc53d82 Fix security issues
  • 1a09166 Update @sinonjs/samsam to v3.3.1

See the full diff

FAQ and help

There is a collection of frequently asked questions. If those don’t help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

🐛 a promise was created in a handler at src/ChannelWrapper.js:236:18 but was not returned from it

Description:

After creating a channel using connection.createChannel and then using channelWrapper.sendToQueue, This error is printed.

(node:42976) Warning: a promise was created in a handler at /Users/User/.../ProjectName/node_modules/amqp-connection-manager/src/ChannelWrapper.js:236:18 but was not returned from it, see http://goo.gl/rRqMUw
    at Function.Promise.cast (/Users/User/.../ProjectName/node_modules/bluebird/js/release/promise.js:196:13)

Environment:

  • Node.js: v10.10.0

Note:

Using Bluebird

reconnection retry interval

Hi,

I think it could be very useful to specify a retry interval between reconnection attempts.
What are your thoughts on this point?

Regards

Empty Error on disconnect: {"err":{"cause":{},"isOperational":true}}

I got the following code:

const amqp = require('amqp-connection-manager');

const connectionURLs = [
 `amqp://${connection1User}:${connection1Password}@${host1}`,
 `amqp://${connection2User}:${connection2Password}@${host2}`,
];

const connectionManager = amqp.connect(connectionURLs, { json: true });
connectionManager.on('disconnect', (error) => {
  console.warn(`amqp disconnect reason: ${JSON.stringify(error)}`);
});

The error object given to the disconnect handler is

{"err":{"cause":{},"isOperational":true}}

with no further information. What could it be?

option to disable inner _messages queue

I am well aware of that this repo is about maintaining the data flows.

However in the edge environment, where the broker may not be accessible and robust as cloud environment, accumulating data at client side would consume all the memory and cause the system to break.

I wish there is some strategies can be added to limit the memory usage.

406 (PRECONDITION-FAILED) error leads to too many connection attempts

When a 406 is triggered, I'd expect to see an attempt at reestablishing a connection in reconnectTimeInSeconds seconds, but I'm seeing them happen more frequently.

One way to reproduce:

  1. clone the repository / npm install
  2. run receiver.js (in examples)
  3. in sender.js, edit the line that asserts the queue by setting durable to false (i.e. set up a 406)
  4. in a new terminal, run sender.js

Note how fast connection attempts are happening.

An in-range update of semantic-release is breaking the build 🚨

Version 15.9.7 of semantic-release was just published.

Branch Build failing 🚨
Dependency semantic-release
Current Version 15.9.6
Type devDependency

This version is covered by your current version range and after updating it in your project the build failed.

semantic-release is a devDependency of this project. It might not break your production code or affect downstream projects, but probably breaks your build or test tools, which may prevent deploying or publishing.

Status Details
  • continuous-integration/travis-ci/push: The Travis CI build could not complete due to an error (Details).
  • coverage/coveralls: First build on greenkeeper/semantic-release-15.9.7 at 100.0% (Details).

Release Notes v15.9.7

15.9.7 (2018-08-10)

Reverts

  • "fix: do not convert ssh repositoryUrl to https" (93377eb)
Commits

The new version differs by 1 commits.

  • 93377eb revert: "fix: do not convert ssh repositoryUrl to https"

See the full diff

FAQ and help

There is a collection of frequently asked questions. If those don’t help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

Option to send messages in bulk

I see your idea in _messages on ChannelWrapper is data preserving. But i realized that the performance is low (in my system only ~300msgs/s rate from producer to RabbitMQ) - One message can only be sent after previous message has callback return. Any option to send message in bulk asynchorous without waiting others. Or you can suggest me some properly way to do this. Thank you so much.

Resolve `publish` and `sendToQueue` by sending back the data

I use this library with highland streams. It would be very convenient if the promises returned by publish and sendToQueue would resolve with the data, instead of true. Case in point, here's how you have to use publish now in a highland stream:

.flatMap( msg => {
  return h( channelWrapper.publish( msg, ... ) ).map( x => msg )
})
...

A reject propagates the error down the stream as expected, but a resolve replaces the thing on the stream with true instead of with something useful. What would you say to resolving it with msg.content? I'd be happy to open a PR for review.

Property 'waitForConnect' does not exist on type 'ChannelWrapper'.

I'm using the typescript on my project and it seems that waitForConnect doesn't exist in the typedefinition. I'm following this example:

https://github.com/benbria/node-amqp-connection-manager/blob/4b09b76063c58f6aaaceca1ba5032d3dfa653cee/examples/receiver.js

My Package.json

{
  
  "private": true,
  "main": "bin/main.js",
  "scripts": {
    "start": "npm run compile",
    "clean": "rm -rf bin/ node_modules/ package-lock.json",
    "compile": "tsc-watch -p tsconfig.json --preserveWatchOutput --skipLibCheck --onSuccess \"npm run execute\" ",
    "execute": "node ./bin/main",
    "compile-prod": "tsc -p tsconfig.json  --preserveWatchOutput --skipLibCheck",
    "execute-prod": "node ./bin/main",
    "test": "echo 'TODO: add test if needed' && exit 0",
    "lint": "tslint -c ./tslint.json -p .",
    "lint-fix": "tslint --fix -c ./tslint.json -p ."
  },
  "dependencies": {
    "amqp-connection-manager": "^2.3.2",
    "amqplib": "^0.5.3",
    "axios": "^0.18.0",
    "dotenv": "^8.0.0",
    "https": "^1.0.0",
    "shelljs": "^0.8.3",
    "uuid": "^3.3.2",
    "yaml": "^1.6.0"
  },
  "devDependencies": {
    "@types/amqp-connection-manager": "^2.0.4",
    "@types/amqplib": "^0.5.12",
    "@types/dotenv": "^6.1.1",
    "@types/node": "^11.13.12",
    "@types/shelljs": "^0.8.5",
    "install": "^0.12.2",
    "npm": "^6.9.0",
    "tsc-watch": "^2.1.2",
    "tslint": "^5.14.0",
    "tslint-config-prettier": "^1.16.0",
    "typescript": "3.4.2"
  }
}

Any Ideas why?

Stop/break the queue

In case of disconnection, channels can still be created and these can also publish and queue messages.

Is there a way to "abort" or cancel previously queued channels after a specific delay?
I think the answer is NO because these are non cancelable Promises but just in case I'm missing something, please let me know if that's possible.

An in-range update of semantic-release is breaking the build 🚨

The devDependency semantic-release was updated from 15.12.4 to 15.12.5.

🚨 View failing branch.

This version is covered by your current version range and after updating it in your project the build failed.

semantic-release is a devDependency of this project. It might not break your production code or affect downstream projects, but probably breaks your build or test tools, which may prevent deploying or publishing.

Status Details
  • continuous-integration/travis-ci/push: The Travis CI build could not complete due to an error (Details).

Release Notes for v15.12.5

15.12.5 (2018-12-11)

Bug Fixes

  • allow to set ci option via API and config file (862ec4c)
Commits

The new version differs by 8 commits.

  • 649b530 docs: mention that debug option is CLI only
  • 862ec4c fix: allow to set ci option via API and config file
  • 6b110b6 docs: switch to spectrum.chat
  • e4c6649 docs: syntax fixes in plugins list
  • 6220641 docs: add @semantic-release/apm to plugins list
  • a45273e docs: add maven-semantic-release to list of community plugins
  • d109113 chore(package): update nyc and sinon
  • cd69583 test: delete unused test helper file

See the full diff

FAQ and help

There is a collection of frequently asked questions. If those don’t help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

No function disconnect()

I have a problem with missed functionality - disconnect from rabbit cluster. I want graceful disconnect from my cluster and can't find this ability. If I use the close() method from AmqpConnectionManager, it's just close channel, but don't disconnect from a cluster.
Here object after close method:

AmqpConnectionManager { _events: [Object: null prototype] {}, _eventsCount: 0, _maxListeners: 0, _channels: [], _currentUrl: 1, connectionOptions: undefined, heartbeatIntervalInSeconds: 1, reconnectTimeInSeconds: 1, _findServers: [Function], _connecting: true, _closed: true, _urls: [ 'amqp://user:[email protected]:5672?heartbeat=60' ], _currentConnection: null }
As you see, connecting is true.

channel not reconnected

Hi,
I wrote a code that the channel is not reconnected! I think something is wrong!
I start the script and after 5 seconds I restart the rabbitmq, as a result the nodejs code doesn't continue and pauses!

var amqp = require('amqp-connection-manager'),
    async=require('async');

// Create a new connection manager
var connection = amqp.connect(['amqp://localhost']);

var channelWrapper = connection.createChannel({
  setup: function(channel) {
    // `channel` here is a regular amqplib `ConfirmChannel`.
    return channel.assertQueue('mysample', {durable: true});
  }
});
var i=0;
async.whilst(()=>{return true;},(callback)=>{
  i++;
  console.log('start:'+i);
  channelWrapper.sendToQueue('mysample', new Buffer(JSON.stringify({num:10})))
    .then(function(res) {
      callback();
      return console.log('Message was sent!  Hooray!'+i,res);
    }).catch(function(err) {
      callback();
      return console.log('Message was rejected...  Boo!'+i,err);
    });

});

Need to add json to options when creating channel?

When calling sendToQueue as in the readme:

channelWrapper.sendToQueue('rxQueueName', {hello: 'World'})

I'm getting an error:

TypeError: content is not a buffer

When I use this:

channelWrapper.sendToQueue('rxQueueName', new Buffer(JSON.stringify({hello: 'World'}), {contentType: 'application/json'}))

It seems to work fine.

The amqplib docs mention a buffer too. I noticed you do this in ChannelWrapper based on the options.json flag, but that option never seems to be set, unless I change this:

var channelWrapper = connection.createChannel({
    setup: function(channel) {
        // `channel` here is a regular amqplib `ConfirmChannel`.
            return channel.assertQueue('rxQueueName', {durable: true})
        }
      });

to this:

var channelWrapper = connection.createChannel({
    setup: function(channel) {
        // `channel` here is a regular amqplib `ConfirmChannel`.
            return channel.assertQueue('rxQueueName', {durable: true})
        },
        json: true // added this
      });

about “ createChannel“ not use confirm mode?

hi, the function "createChannel" use confirm mode, how to use "not confirmation mode"? thank?

the lib>ChannelWrapper.js code below:

ChannelWrapper.prototype._onConnect = function(arg) {
      var connection;
      connection = arg.connection;
      this._connection = connection;
      return connection.createConfirmChannel().then((function(_this) {
        return function(channel) {
          _this._channel = channel;
          channel.on('close', function() {
            return _this._onChannelClose(channel);
          });
          return _this._settingUp = Promise.all(_this._setups.map(function(setupFn) {
            return pb.call(setupFn, null, channel)["catch"](function(err) {
              if (_this._channel) {
                return _this.emit('error', err, {
                  name: _this.name
                });
              } else {
.............

i modify :

return connection.createChannel().then((function(_this) {

but it not work. how use no confirm mode? thanks!

consume() method is missing from channelWrapper

Looks like the channelWrapper is missing the consume method.

Only the inner AMQP channel has the consume method, but
it is not available on the wrapper, is this by design?

Looks like it can be accessed via:
channelWrapper._channel.consume

same goes for "reject" which is also missing.

An in-range update of eslint is breaking the build 🚨

The devDependency eslint was updated from 5.15.3 to 5.16.0.

🚨 View failing branch.

This version is covered by your current version range and after updating it in your project the build failed.

eslint is a devDependency of this project. It might not break your production code or affect downstream projects, but probably breaks your build or test tools, which may prevent deploying or publishing.

Status Details
  • continuous-integration/travis-ci/push: The Travis CI build could not complete due to an error (Details).

Release Notes for v5.16.0
  • dfef227 Build: gensite passes rulesMeta to formatter rendering (#11567) (Kevin Partington)
  • c06d38c Fix: Allow HTML formatter to handle no meta data (#11566) (Ilya Volodin)
  • 87a5c03 Docs: func-style: clarify when allowArrowFunctions is used (#11548) (Oliver Joseph Ash)
  • bc3e427 Update: pass rule meta to formatters RFC 10 (#11551) (Chris Meyer)
  • b452f27 Chore: Update README to pull in reviewer data (#11506) (Nicholas C. Zakas)
  • afe3d25 Upgrade: Bump js-yaml dependency to fix Denial of Service vulnerability (#11550) (Vernon de Goede)
  • 4fe7eb7 Chore: use nyc instead of istanbul (#11532) (Toru Nagashima)
  • f16af43 Chore: fix formatters/table test (#11534) (Toru Nagashima)
  • 78358a8 Docs: fix duplicate punctuation in CLI docs (#11528) (Teddy Katz)
Commits

The new version differs by 11 commits.

  • ded2f94 5.16.0
  • ea36e13 Build: changelog update for 5.16.0
  • dfef227 Build: gensite passes rulesMeta to formatter rendering (#11567)
  • c06d38c Fix: Allow HTML formatter to handle no meta data (#11566)
  • 87a5c03 Docs: func-style: clarify when allowArrowFunctions is used (#11548)
  • bc3e427 Update: pass rule meta to formatters RFC 10 (#11551)
  • b452f27 Chore: Update README to pull in reviewer data (#11506)
  • afe3d25 Upgrade: Bump js-yaml dependency to fix Denial of Service vulnerability (#11550)
  • 4fe7eb7 Chore: use nyc instead of istanbul (#11532)
  • f16af43 Chore: fix formatters/table test (#11534)
  • 78358a8 Docs: fix duplicate punctuation in CLI docs (#11528)

See the full diff

FAQ and help

There is a collection of frequently asked questions. If those don’t help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

ChannelWrapper.publish does not seem to be working

ChannelWrapper.publish does not seem to be working. Does not throw any error but messages are not getting publish on exchange

channelWrapper.publish(rabbit.exchange, routingKey, msg, { contentType: 'application/json', persistent: true });

An in-range update of @semantic-release/changelog is breaking the build 🚨

Version 2.1.1 of @semantic-release/changelog was just published.

Branch Build failing 🚨
Dependency @semantic-release/changelog
Current Version 2.1.0
Type devDependency

This version is covered by your current version range and after updating it in your project the build failed.

@semantic-release/changelog is a devDependency of this project. It might not break your production code or affect downstream projects, but probably breaks your build or test tools, which may prevent deploying or publishing.

Status Details
  • coverage/coveralls First build on greenkeeper/@semantic-release/changelog-2.1.1 at 100.0% Details
  • continuous-integration/travis-ci/push The Travis CI build could not complete due to an error Details

Release Notes v2.1.1

2.1.1 (2018-06-27)

Bug Fixes

  • factorise the verification code in a function (7300663)
Commits

The new version differs by 2 commits.

  • 9edc506 docs: mention setting option to false to override shareable config
  • 7300663 fix: factorise the verification code in a function

See the full diff

FAQ and help

There is a collection of frequently asked questions. If those don’t help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

PRECONDITION_FAILED - unknown delivery tag 1

Complete stack trace would be:

Error: Channel closed by server: 406 (PRECONDITION-FAILED) with message "PRECONDITION_FAILED - unknown delivery tag 1"
    at ConfirmChannel.C.accept (.../node_modules/amqplib/lib/channel.js:406:17)
    at Connection.mainAccept (.../node_modules/amqplib/lib/connection.js:64:33)
    at Socket.go (.../node_modules/amqplib/lib/connection.js:477:48)
    at Socket.emit (events.js:182:13)
    at Socket.EventEmitter.emit (domain.js:442:20)
    at emitReadable_ (_stream_readable.js:534:12)
    at process._tickCallback (internal/process/next_tick.js:63:19) rabbit error connection lost

I have a queue where the time to process each message takes a more than 5min, and all messages from this queue are processed 1 by 1 (prefetch 1). I think this error is thrown because channel connection dies while a message is getting processed, but not sure how or where to handle this case. When time to process the message is reduced (I force the size of the message to be smaller, which will reduce time to process as well, less data to process).

I found a similar problem mentioned here amqp-node/amqplib#271 , would it be a solution to take care of this case using the on_close handler amqp-node/amqplib#271 (comment) ?

An in-range update of semantic-release is breaking the build 🚨

Version 15.6.3 of semantic-release was just published.

Branch Build failing 🚨
Dependency semantic-release
Current Version 15.6.2
Type devDependency

This version is covered by your current version range and after updating it in your project the build failed.

semantic-release is a devDependency of this project. It might not break your production code or affect downstream projects, but probably breaks your build or test tools, which may prevent deploying or publishing.

Status Details
  • continuous-integration/travis-ci/push The Travis CI build could not complete due to an error Details
  • coverage/coveralls First build on greenkeeper/semantic-release-15.6.3 at 100.0% Details

Release Notes v15.6.3

15.6.3 (2018-07-02)

Bug Fixes

  • fetch all tags even if the repo is not shallow (45eee4a)
Commits

The new version differs by 2 commits.

  • 45eee4a fix: fetch all tags even if the repo is not shallow
  • 2d3a5e5 test: harmonize git-utils functions name

See the full diff

FAQ and help

There is a collection of frequently asked questions. If those don’t help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

channel.prefetch is not a function

Hi, I wanted to give this library a try, however some functionality over amqplib seems to be missing. For example, with amqplib I set up the channel.prefetch limit. But with the ChannelWrapper returned by this lib this throws an error (channel.prefetch is not a function).

(I also use both confirm channels and standard channels, but it seems this lib only supports confirm channels.)

Am I missing something or is this just a WIP?

Thanks ahead

channelwrapper.nackAll() is not function

When i am adding the code channelWrapper.nackAll() then it gives me the error

0|app | TypeError: channelWrapper.nackAll is not a function

So is there any way I can implement nackAll() function. Basically I am trying to put all the unacknowledged messages to the queue so that they can be recaptured.

reconnection stopped with channelWrapper event listeners

Hi,

we have a strange behaviour when connection-manager retry to reconnect to a RabbitServer and some listener is registered on channelWrapper. In the following test case, channel manager stop trying reconnecting and exit with the following trace

/home/test/lib/node_modules/amqplib/lib/channel.js:148
    throw new IllegalOperationError(msg, stack);
          ^
IllegalOperationError: Channel closed
    at ConfirmChannel.<anonymous> (/home/test/lib/node_modules/amqplib/lib/channel.js:148:11)
    at ConfirmChannel.C.closeBecause (/home/test/lib/node_modules/amqplib/lib/channel.js:200:8)
    at ConfirmChannel.C.close (/home/test/lib/node_modules/amqplib/lib/channel_model.js:77:8)
    at ChannelWrapper._onDisconnect (/home/test/lib/node_modules/amqp-connection-manager/lib/ChannelWrapper.js:81:14)
    at AmqpConnectionManager.<anonymous> (/home/test/lib/node_modules/amqp-connection-manager/lib/ChannelWrapper.js:4:59)
    at AmqpConnectionManager.emit (events.js:117:20)
    at ChannelModel.<anonymous> (/home/test/lib/node_modules/amqp-connection-manager/lib/AmqpConnectionManager.js:88:19)
    at ChannelModel.emit (events.js:95:17)
    at Connection.emit (events.js:95:17)

Removing channelWrapper registration, all works as expected.

Test case

var amqp = require('amqp-connection-manager');

var connection = amqp.connect(['amqp://test:[email protected]:5672'], {json: true});
connection.on('connect', function() {
    console.log('Connected!');
});
connection.on('disconnect', function(params) {
    console.log('Disconnected.', params.err.stack);
});

// Set up a channel listening for messages in the queue.
var channelWrapper = connection.createChannel({
    setup: function(channel) {
        return  channel.assertExchange("assertFail");
    }
});

channelWrapper.waitForConnect().then(function() {
    console.log("Listening for messages");
});

channelWrapper.on('connect', function(){console.log("connect")})
channelWrapper.on('error', function(){console.log("error")})
channelWrapper.on('drop', function(){console.log("drop")})
channelWrapper.on('close', function(){console.log("close")})

ERROR: unable to verify the first certificate

Hello,
I have had the same problem as here amqp.node #331
After change my code to:

const uri = 'amqp://localhost' // or amqps://.........
const parsedURI = url.parse(uri);

amqp.connect([parsedURI], { servername: parsedURI.hostname })

I could make the connection.

At the moment I need to use Auto-reconnect and round robin support for amqplib.

I tried:

  const parsedURI = url.parse(credentials.uri_direct_1);
  const connectionOptions = { servername: parsedURI.hostname };
  const parsedURI2 = url.parse(credentials.uri);

  const connection = amqp.connect([parsedURI, parsedURI2], { connectionOptions });

With this config app is also conecting to the server.
I'm worried that in configuration there are 2 urls parsedURI, parsedURI2, But in connectionOptions are set only servername from one Url.

How can this problem be solved correctly?

Feature Request - timeout for publish and sendToQueue

I've noticed that if the client cannot connect, calling to sendToQueue from the channel wrapper creates a promise that never resolves or rejects.
This is an acceptable behavior in a "callbacks-api", but not in a "promises-api": calling await channel.sendToQueue(...) is dangerous for memory...

I wish this feature would exist in the original amqplib package, but their github page seems to be overloaded with issues and I don't think they will add such feature soon.

p-timeout could be helpful here. Is it possible to force "canceling" of the message being sent after sendToQueue was called (i.e. when the timeout is reached)? I guess I could call "disconnect" and destroy the whole connection instance.

Thanks!

ConnectionManager.close doesn't close the connection

I can't tell if it was intentional or not and I'm really just trying to understand. Why doesn't the ConnectionManager.close function not close the connection in addition to the channels?

If I would like to close the connection, I have to do this:

connectionManager._currentConnection.connection.close()

and it feels just dirty. 🚿

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.