Code Monkey home page Code Monkey logo

node-amqp's Introduction

MAINTAINER NEEDED

This project is now unmaintained. Please reach out if you'd like to fix that.

build status

node-amqp

This is a client for RabbitMQ (and maybe other servers?). It partially implements the 0.9.1 version of the AMQP protocol.

Table of Contents

Installation

npm install amqp

Synopsis

IMPORTANT: This module only works with node v0.4.0 and later.

An example of connecting to a server and listening on a queue.

var amqp = require('amqp');

var connection = amqp.createConnection({ host: 'dev.rabbitmq.com' });

// add this for better debuging
connection.on('error', function(e) {
  console.log("Error from amqp: ", e);
});

// Wait for connection to become established.
connection.on('ready', function () {
  // Use the default 'amq.topic' exchange
  connection.queue('my-queue', function (q) {
      // Catch all messages
      q.bind('#');
    
      // Receive messages
      q.subscribe(function (message) {
        // Print messages to stdout
        console.log(message);
      });
  });
});

Connection

new amqp.Connection() Instantiates a new connection. Use connection.connect() to connect to a server.

amqp.createConnection() returns an instance of amqp.Connection, which contains an instance of net.Socket at its socket property. All events and methods which work on net.Socket can also be used on an amqp.Connection instance. (e.g., the events 'connect' and 'close'.)

Connection options and URL

amqp.createConnection([options, [implOptions]]) takes two options objects as parameters. The first options object has these defaults:

{ host: 'localhost'
, port: 5672
, login: 'guest'
, password: 'guest'
, connectionTimeout: 10000
, authMechanism: 'AMQPLAIN'
, vhost: '/'
, noDelay: true
, ssl: { enabled : false
       }
}

An example options object for creating an SSL connection has these properties:

{ host: 'localhost'
, port: 5671
, login: 'guest'
, password: 'guest'
, authMechanism: 'AMQPLAIN'
, vhost: '/'
, ssl: { enabled : true
       , keyFile : '/path/to/key/file'
       , certFile : '/path/to/cert/file'
       , caFile : '/path/to/cacert/file'
       , rejectUnauthorized : true
       }
}

The key, certificate, and certificate authority files must be in pem format. Alternatively, pfxFile can be used to read key and certificate from a single file. If port is not specified, the default AMQPS port 5671 is used. If rejectUnauthorized is not specified, it defaults to true.

Options can also be passed in a single URL of the form

amqp[s]://[user:password@]hostname[:port][/vhost]

AMQPLAIN is assumed for the auth mechanism.

Note that the vhost must be URL-encoded and appear as the only segment of the path, i.e., the only unencoded slash is that leading; leaving the path entirely empty indicates that the vhost /, as above, should be used (it could also be supplied as the path /%2f).

The heartbeat setting sets the heartbeat interval (in seconds) for the connection. There is no default for this option meaning no heartbeating is taking place.

This URL is supplied as the field url in the options; for example

var conn =
  amqp.createConnection({url: "amqp://guest:guest@localhost:5672"});

Options provided as individual fields will override values given in the URL.

You can also specify additional client properties for your connection by setting the clientProperties field on the options object.

{ clientProperties: { applicationName: 'myApplication'
                    , capabilities: { consumer_cancel_notify: true
                                    }
                    }
}

If the consumer_cancel_notify capability is set to true (as above), then RabbitMQ's Consumer Cancel Notification feature will be enabled.

By default the following client properties are set

{ product: 'node-amqp'
, platform: 'node-' + process.version
, version: nodeAMQPVersion
}

The second options are specific to the node AMQP implementation. It has the default values:

{ defaultExchangeName: ''
, reconnect: true
, reconnectBackoffStrategy: 'linear'
, reconnectExponentialLimit: 120000
, reconnectBackoffTime: 1000
}

The defaultExchangeName is the default exchange to which connection.publish will publish. In the past, the default exchange was amq.topic, which is not ideal. To emulate this behaviour, one can create a connection like:

var conn =
  amqp.createConnection({url: "amqp://guest:guest@localhost:5672"},
                        {defaultExchangeName: "amq.topic"});

If the reconnect option is true, then the driver will attempt to reconnect using the configured strategy any time the connection becomes unavailable. If this is not appropriate for your application, set this option to false.

If you would like this option, you can set parameters controlling how aggressively the reconnections will be attempted. Valid strategies are "linear" and "exponential".

Backoff times are in milliseconds. Under the "linear" strategy, the driver will pause reconnectBackoffTime ms before the first attempt, and between each subsequent attempt. Under the "exponential" strategy, the driver will pause reconnectBackoffTime ms before the first attempt, and will double the previous pause between each subsequent attempt until a connection is reestablished.

After a connection is established the 'connect' event is fired as it is with any net.Connection instance. AMQP requires a 7-way handshake which must be completed before any communication can begin. net.Connection does the handshake automatically and emits the 'ready' event when the handshaking is complete.

For backward compatibility, two additional options are available. Older versions of this library placed the routingKey and deliveryTag for incoming messages into the JSON payload received. This module was changed to leave inbound JSON payloads pristine. Some applications may need the old behaviour. If the key routingKeyInPayload is set to true in the connection options, the messages resulting from a subscribe call will include a 'routingKey' key in the JSON payload. If the key deliveryTagInPayload is set to true in the connection options, the deliveryTag of the incoming message will be placed in the JSON payload.

connection.publish(routingKey, body, options, callback)

Publishes a message to the default exchange; if the defaultExchange is left as '', this effectively publishes the message on the routing key named.

This method proxies to the default exchange's publish method and parameters are passed through untouched.

connection.disconnect()

Cleanly disconnect from the server, the socket will not be closed until the server responds to the disconnection request.

connection.on('tag.change', callback)

Fired when an existing consumer tag has changed. Use this event to update your consumer tag references.

When an error or reconnection occurs, any existing consumers will be automatically replaced with new ones. If your application is holding onto a reference to a consumer tag (e.g. to unsubscribe later) and reconnects, the held tag will no longer be valid, preventing the application from gracefully unsubscribing.

The callback function takes one parameter, event, which contains two properties: oldConsumerTag and consumerTag.

var connection = amqp.createConnection({ host: 'dev.rabbitmq.com' });

// Local references to the exchange, queue and consumer tag
var _exchange = null;
var _queue = null;
var _consumerTag = null;

// Report errors
connection.on('error', function(err) { 
    console.error('Connection error', err); 
});

// Update our stored tag when it changes
connection.on('tag.change', function(event) {
    if (_consumerTag === event.oldConsumerTag) {
        _consumerTag = event.consumerTag;
        // Consider unsubscribing from the old tag just in case it lingers
        _queue.unsubscribe(event.oldConsumerTag);
    }
});

// Initialize the exchange, queue and subscription
connection.on('ready', function() {
    connection.exchange('exchange-name', function(exchange) {
        _exchange = exchange;
        
        connection.queue('queue-name', function(queue) {
            _queue = queue;
            
            // Bind to the exchange
            queue.bind('exchange-name', 'routing-key');
            
            // Subscribe to the queue
            queue
                .subscribe(function(message) {
                    // Handle message here
                    console.log('Got message', message);
                    queue.shift(false, false);
                })
                .addCallback(function(res) {
                    // Hold on to the consumer tag so we can unsubscribe later
                    _consumerTag = res.consumerTag;
                })
            ;
        });
    });
});

// Some time in the future, you'll want to unsubscribe or shutdown 
setTimeout(function() {
    if (_queue) {
        _queue
            .unsubscribe(_consumerTag)
            .addCallback(function() {
                // unsubscribed
            })
        ;
    } else {
        // unsubscribed
    }
}, 60000);

Queue

Events: A queue will call the callback given to the connection.queue() method once it is usable. For example:

var q = connection.queue('my-queue', function (queue) {
  console.log('Queue ' + queue.name + ' is open');
});

Declaring a queue with an empty name will make the server generate a random name.

connection.queue(name[, options][, openCallback])

Returns a reference to a queue. The name parameter is required, unlike pika which defaults the name to ''. The options are

  • passive: boolean, default false. If set, the server will not create the queue. The client can use this to check whether a queue exists without modifying the server state.
  • durable: boolean, default false. Durable queues remain active when a server restarts. Non-durable queues (transient queues) are purged if/when a server restarts. Note that durable queues do not necessarily hold persistent messages, although it does not make sense to send persistent messages to a transient queue.
  • exclusive: boolean, default false. Exclusive queues may only be consumed from by the current connection. Setting the 'exclusive' flag always implies 'autoDelete'.
  • autoDelete: boolean, default true. If set, the queue is deleted when all consumers have finished using it. Last consumer can be cancelled either explicitly or because its channel is closed. If there was no consumer ever on the queue, it won't be deleted.
  • noDeclare: boolean, default false. If set, the queue will not be declared, this will allow a queue to be deleted if you don't know its previous options.
  • arguments: a map of additional arguments to pass in when creating a queue.
  • closeChannelOnUnsubscribe : a boolean when true the channel will close on unsubscribe, default false.

queue.subscribe([options,] listener)

An easy subscription command. It works like this

q.subscribe(function (message, headers, deliveryInfo, messageObject) {
  console.log('Got a message with routing key ' + deliveryInfo.routingKey);
});
    

It will automatically acknowledge receipt of each message.

There are several options available. Setting the options argument to { ack: true } (which defaults to false) will make it so that the AMQP server only delivers a single message at a time. When you want the next message, call q.shift(). When ack is false then you will receive messages as fast as they come in.

You can also use the prefetchCount option to increase the window of how many messages the server will send you before you need to ack (quality of service). { ack: true, prefetchCount: 1 } is the default and will only send you one message before you ack. Setting prefetchCount to 0 will make that window unlimited. If this option is used q.shift() should not be called. Instead the listener function should take four parameters (message, headers, deliveryInfo, ack) and ack.acknowledge() should be called to ack a single message.

The routingKeyInPayload and deliveryKeyInPayload options determine if the reception process will inject the routingKey and deliveryKey, respectively, into the JSON payload received. These default to unset thus adopting the parent connection's values (which default to false). Setting these to true provide backward compatibility for older applications.

The exclusive option will subscribe to the queue in exclusive mode. Only one subscriber is allowed at a time, and subsequent attempts to subscribe to the same queue will result in an exception. This option differs from the exclusive option passed when creating in a queue in that the queue itself is not exclusive, only the consumers. This means that long lived durable queues can be used as exclusive queues.

The messageObject can be used to acknowledge a given message using:

messageObject.acknowledge(false); // use true if you want to acknowledge all previous messages of the queue

If the consumer_cancel_notify capability was enabled when the connection was created, the queue will emit basicCancel upon receiving a consumer cancel notification from the server. The queue's channel will be automatically closed. In a clustered environment, developers may want to consider automatically re-subscribing to the queue on this event.

This method will emit 'basicQosOk' when ready.

queue.subscribeRaw([options,] listener)

Subscribes to a queue. The listener argument should be a function which receives a message. This is a low-level interface - the message that the listener receives will be a stream of binary data. You probably want to use subscribe instead. For now this low-level interface is left undocumented. Look at the source code if you need to do this.

This method will emit 'basicConsumeOk' when ready.

queue.unsubscribe(consumerTag)

Unsubscribe from a queue, given the consumer tag. The consumer tag is supplied to the promise callback of Queue.subscribeRaw or Queue.subscribe:

connection.queue('foo', function(queue) {
  var ctag;
  queue.subscribe(function(msg) {...})
    .addCallback(function(ok) { ctag = ok.consumerTag; });
  // ... and in some other callback
  queue.unsubscribe(ctag);
});

Note that Queue.unsubscribe will not requeue messages that have not been acknowledged. You need to close the queue or connection for that to happen. You may also receive messages after calling unsubscribe; you will not receive messages from the queue after the unsubscribe promise callback has been invoked, however.

queue.shift([reject[, requeue]])

For use with subscribe({ack: true}, fn). Acknowledges the last message if no arguments are provided or if reject is false. If reject is true then the message will be rejected and put back onto the queue if requeue is true, otherwise it will be discarded.

queue.bind([exchange,] routing[, callback])

This method binds a queue to an exchange. Until a queue is bound it will not receive any messages, unless they are sent through the unnamed exchange (see defaultExchangeName above).

If the exchange argument is left out 'amq.topic' will be used.

This method will emit 'queueBindOk' when complete.

If callback is provided it will also be triggered when complete, note that if you perform multiple bindings, only the last callback will be called.

queue.unbind([exchange,] routing)

This method unbinds a queue from an exchange.

If the exchange argument is left out 'amq.topic' will be used.

This method will emit 'queueUnbindOk' when complete.

queue.bind_headers([exchange,] routing)

This method binds a queue to an exchange. Until a queue is bound it will not receive any messages.

This method is to be used on an "headers"-type exchange. The routing argument must contain the routing keys and the x-match value (all or any).

If the exchange argument is left out 'amq.headers' will be used.

queue.unbind_headers([exchange,] routing)

This method unbinds a queue from an exchange. Whilst a queue is bound it will continue receive messages that have matching headers.

This method is to be used on an "headers"-type exchange. The routing argument must contain the routing keys and the x-match value (all or any).

If the exchange argument is left out 'amq.headers' will be used.

queue.purge()

This method purges a queue.

This method will emit 'queuePurgeOk' when complete.

queue.destroy(options)

Delete the queue. Without options, the queue will be deleted even if it has pending messages or attached consumers. If +options.ifUnused+ is true, then the queue will only be deleted if there are no consumers. If +options.ifEmpty+ is true, the queue will only be deleted if it has no messages.

Note: the successful destruction of a queue will cause a consumer cancel notification to be emitted (for clients who have enabled the consumer_cancel_notify option when creating the connection).

Exchange

Events: An exchange will call the callback given to the connection.exchange() method once it is usable. For example:

var exc = connection.exchange('my-exchange', function (exchange) {
  console.log('Exchange ' + exchange.name + ' is open');
});

exchange.on('open', callback)

The open event is emitted when the exchange is declared and ready to be used. This interface is considered deprecated.

connection.exchange()

connection.exchange(name, options={}, openCallback)

An exchange can be created using connection.exchange(). The method returns an amqp.Exchange object.

Without any arguments, this method returns the default exchange. Otherwise a string, name, is given as the first argument and an options object for the second. The options are

  • type: the type of exchange 'direct', 'fanout', or 'topic' (default).
  • passive: boolean, default false. If set, the server will not create the exchange. The client can use this to check whether an exchange exists without modifying the server state.
  • durable: boolean, default false. If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges remain active when a server restarts. Non-durable exchanges (transient exchanges) are purged if/when a server restarts.
  • autoDelete: boolean, default true. If set, the exchange is deleted when there are no longer queues bound to it.
  • noDeclare: boolean, default false. If set, the exchange will not be declared, this will allow the exchange to be deleted if you dont know its previous options.
  • confirm: boolean, default false. If set, the exchange will be in confirm mode, and you will get a 'ack'|'error' event emitted on a publish, or the callback on the publish will be called.
  • arguments: a map of additional arguments to pass in when creating an exchange.

An exchange will emit the 'open' event when it is finally declared.

exchange.publish(routingKey, message, options, callback)

Publishes a message to the exchange. The routingKey argument is a string which helps routing in topic and direct exchanges. The message can be either a Buffer or Object. A Buffer is used for sending raw bytes; an Object is converted to JSON.

options is an object with any of the following

  • mandatory: boolean, default false. This flag tells the server how to react if the message cannot be routed to a queue. If this flag is set, the server will return an unroutable message with a Return method. If this flag is false, the server silently drops the message.
  • immediate: boolean, default false. This flag tells the server how to react if the message cannot be routed to a queue consumer immediately. If this flag is set, the server will return an undeliverable message with a Return method. If this flag is false, the server will queue the message, but with no guarantee that it will ever be consumed.
  • contentType: default 'application/octet-stream'
  • contentEncoding: default null.
  • headers: default {}. Arbitrary application-specific message headers.
  • deliveryMode: Non-persistent (1) or persistent (2)
  • priority: The message priority, 0 to 9.
  • correlationId: string, default null. Application correlationย identifier
  • replyTo: Usually used to name a reply queue for a request message.
  • expiration: default null. Messageย expirationย specification
  • messageId: default null. Applicationย messageย identifier
  • timestamp: default null. Messageย timestamp
  • type: default null. Messageย typeย name
  • userId: default null. Creatingย userย id
  • appId: default null. Creatingย applicationย id

callback is a function that will get called if the exchange is in confirm mode, the value sent will be true or false, this is the presense of a error so true, means an error occured and false, means the publish was successfull

exchange.destroy(ifUnused = true)

Deletes an exchange. If the optional boolean second argument is set, the server will only delete the exchange if it has no queue bindings. If the exchange has queue bindings the server does not delete it but raises a channel exception instead.

exchange.bind(srcExchange, routingKey [, callback])

Binds the exchange (destination) to the given source exchange (srcExchange). When one exchange is bound to another, the destination (or receiving) exchange will receive all messages published to the source exchange that match the given routingKey.

This method will emit 'exchangeBindOk' when complete.

Please note that Exchange to Exchange Bindings (E2E) are an extension to the AMQP spec introduced by RabbitMQ, and that by using this feature, you will be reliant on RabbitMQ's AMQP implementation. For more information on E2E Bindings with RabbitMQ see:

http://www.rabbitmq.com/e2e.html

exchange.unbind(srcExchange, routingKey [, callback])

Unbinds the exchange (destination) from the given source exchange (srcExchange). This is the reverse of the exchange.bind method above, and will stop messages from srcExchange/routingKey from being sent to the destination exchange.

This method will emit 'exchangeUnbindOk' when complete.

exchange.bind_headers(exchange, routing [, bindCallback])

This method is to be used on an "headers"-type exchange. The routing argument must contain the routing keys and the x-match value (all or any).

Debugging

The NODE_DEBUG_AMQP=1 environment variable enables built-in low-level debugging support.

node-amqp's People

Contributors

barshow avatar bkw avatar carlhoerberg avatar chriswiggins avatar drewww avatar fester avatar flashingpumpkin avatar fritz-gerneth avatar glenjamin avatar humanchimp avatar ifraixedes avatar jamescarr avatar kfitzgerald avatar khrome avatar kschzt avatar mattbornski avatar mfloryan avatar ofere avatar postwait avatar radu-c avatar ry avatar sam-github avatar shimaore avatar skrat avatar squaremo avatar ssafejava avatar thedeveloper avatar timbeyer avatar xaviershay avatar yevhenlukomskyi avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

node-amqp's Issues

Ack specific message

Hi,

I'm using ack: true and prefetchCount: 0 in my queue. I know that i have to use queue.shift() to ack my messages but from i've read this will ack the last message.

Is there a way to ack a specific message? Example: using message variable in the callback of queue.subscribe.

Queues not created automatically

amqp v0.1
node v0.4.11
rabbitmq v2.6.1

When creating a new Queue, it is never registered in RabbitMQ. I have the web mgmt console running and I can see amqp create the Exchange properly, but the Queue does't get created after the Exchange. When profiling the library, I can see that the calls to create a Queue and Exchange are nearly identity, so I'm not sure why one would be created but not the other. I do see that the code continues through fine and doesn't skip the Queue creation (no exceptions that I can see). I can alternatively create the queue via the web mgmt console, without problem. I have also tried using rabbit.js (https://github.com/squaremo/rabbit.js) with [email protected], and it DID seem to make the queue. I thought that maybe it wouldn't be created until a message came through, so I tried from a client sending to the Exchange, but again, no Queue was created.

Here's an example:

var amqp=require('amqp');
var connection=amqp.createConnection({host:'localhost'});
connection.on('ready', function(){
    var exchange = connection.exchange('echo-exchange'); // will create the exchange
    var queue = connection.queue('echo-queue'); // will NOT create the queue
    queue.bind('echo-exchange', 'foo.*');
    queue.subscribe( {ack:true}, function(message){
        console.log(message); // never called because the queue is never created
        queue.shift();
    });

});

no callback from exchange.publish?

Sorry if this is more a question than a bug.

I'm publishing to a topic exchange from a 'fire and forget' type script, and cant find a way to cleanup when the publish has occurred. Making do with a setTimeout block, but that feels hacky (and error prone).

A lot of the API methods seem to take a callback that fires after the operation, but exchange.publish doesn't seem to.
What's the best way to know when the publish method has completed? Thanks.

Null exception [parser.execute()]

I'm getting occasional crashes inside amqp.js in the following place (around line 832):

  self.addListener('data', function (data) {
    parser.execute(data);
  });

self.addListener('end', function () {
    self.end();
    // in order to allow reconnects, have to clear the
    // state.
    parser = null;
  });

This is my stack trace:

/usr/local/marquee-web/node_modules/amqp/amqp.js:851
    parser.execute(data);
           ^
TypeError: Cannot call method 'execute' of null
    at Connection.<anonymous> (/usr/local/marquee-web/node_modules/amqp/amqp.js:851:12)
    at Connection.emit (events.js:67:17)
    at TCP.onread (net.js:347:14)

Just before that I'm also getting an 'error' event from AMQP connection object. As you can see from the code above the parser is indeed set to null when connection ends. Is this a race condition of some sort?

passive option not working in queue function

I am opening my queue like this:
var q = connection.queue('MyQueue, {passive: true},function(queue) { console.log("Queue " + queue.name + " is open");});

Whether I set passive to true or false I get the error:

Error: NOT_FOUND - no queue 'MyQueue' in vhost '/'

It is my understanding from the readme that if I set the passive to true the queue will be created if it does not exist. But this does not seem to be the case.

subscribe() versus subscribeRaw()

Hello,

I tried to use the subscribe() method and when I received the message in a variable called m, I tried to use the methods m.addListener() and m.acknowledge()

But I had an error saying that these 2 methods did not exist. The only way I found to use these methods was to use them into "subscribeRaw()" and not "subscribe()".

So I needed to make the object being a string then to parse it into a json object.

We should be able to acknowledge() or use the other methods available for subscribeRaw() into the subscribe() method to be able to acknowledge simple json/application object type.

because subscribeRaw() sends messages in some type like octet/stream which I didn't need but I was obliged to use it in order to access to the needed method "m.acknowledge"

Catching basicReturn when publish options contain {mandatory: true}

I'd like to use {mandatory: true} when publishing a message so that I can prevent my messages from being dropped. However, I'm unsure how this is supposed to be done. In the example below, I catch an event ('basicReturn') when a message isn't going to be delivered anywhere, but I also get this error:

Warning: Uncaught basicReturn: {"replyCode":312,"replyText":"NO_ROUTE","exchange":"","routingKey":"example3"}

This leads me to believe that I'm using the library incorrectly. What is the proper way to do this?

Example:

var amqp = require('amqp');

var connOpts = { ... };

var connection = amqp.createConnection(connOpts);

connection.on('ready', function () { 
  var q = connection.queue('example2', function (queue) { 
    var exchange = connection.exchange('', {});
    exchange.on('open', function(){
      var publishReturn = exchange.publish('example3', {txt: 'hello, world'}, {mandatory: true});
      exchange.on('basicReturn', function(x) {
        console.log('handling basic return');
        console.log(x);
      });
    });
  });
});

Thanks

JavaScript doesn't do bitshifts > 31 or integers larger than 2^53

Not sure what to do about this one, or even whether something needs to be done: JavaScript can only represent integers exactly up to 2^53, while AMQP has two types that are integers encoded in 64 bits (longlong and timestamp).

A timestamp is used only once in the protocol, as a timestamp for messages; this has no formal meaning, but taking it as say, Unix time, 2^53 is probably well after the technological singularity.

The type longlong is only used for delivery tags, in a few places, which are in effect serial numbers and (in RabbitMQ at least) scoped to channels. Reaching 2^53 messages on a single channel would be impressive.

Possibly more of a problem is that the parsing and serialisation code use bitshifts with operands outside that defined in JavaScript. Putting the 64-bit integer problem aside, these should really use straight multiplication and division (yes even for 32-bit numbers; bitshifts are defined on twos-complement integers).

Promise returned from Exchange.publish() never has events emitted

When I publish a message using Exchange.publish(), it returns a promise. That promise never has success emitted, even if the message is delivered successfully to the server. In fact, it is never called even when my consumer process on the other end logs that the message was delivered.

To recreate:

// assuming you have a connection and exchange is a valid exchange object
var promise = exchange.publish('everyone', 'hello world');

promise.on('success', function(){
console.log('you will never see this');
});

promise.addCallback(function(){
console.log('...or this');
});

Default JSON content type

The default content type for JSON is text/json. According to RFC 4627 application/json is the correct content type. Can the default be changed, or does that break backwards compatibility?

Message not sent using setTimeout()

Hi,

I made a class named Queue.js that is using node-amqp and I do a "require" of this class into a client.js and a server.js

The class Queue.js contains a constructor and 2 methods :

  • addTaskToQueue(hereIsTheObjectToSend)
  • subscribeToQueue() // this method returns an object

In this way I am able to use node-amqp messaging more easily.


here is client.js :

var Queue = require('./Queue.js'); // the class that I use to manage easily message sending/received
var MyClass = require('./MyClass.js'); // a simple class that has only a constructor that has the attribute MyClass::message
var myObject = new MyClass("Hi, here is my message"); // Now I have myObject.message containing some text

var queue = new Queue("localhost", "salut"); // creation of the queue on localhost, to queue named "salut"
queue.addTaskToQueue(myObject); // Here I send 4 times the same messages
queue.addTaskToQueue(myObject);
queue.addTaskToQueue(myObject);
queue.addTaskToQueue(myObject);

// this timeout waits 3 seconds, then send another message
setTimeout(function () {
// wait 3 sec to send another message
queue.addTaskToQueue(myObject); // this message should be sent, but it never displays when I execute the program
console.log("It should have 5 messages sent now"); // this console.log is correctly displayed after 3 seconds
}, 3000);


In the linux terminal, I start the server in listening, then the client, so the client send this

$ node client.js
publishing the json message...
publishing the json message...
publishing the json message...
publishing the json message...
#3 seconds after it adds the line :

It should have 5 messages sent now


And at the linux terminal using server.js :

$ node server.js
{ message: 'Hi, here is my message' }
{ message: 'Hi, here is my message' }
{ message: 'Hi, here is my message' }
{ message: 'Hi, here is my message' }

but I don't see the 5th message. The message is not send while it is in a setTimeout function. But I need to use setTimeout because I need to implement the possibility when a message is not received to the server (so, if the server doesn't ACKnowledge the message), the client should wait 5 minutes to send the message again.

How should I do ?

Trying to open/declare queue gives "not found" on server

I have the following code for a consumer:

var amqp = require('amqp');

var conn = amqp.createConnection({host: 'localhost'});

conn.on('ready', function() {
  var exc = conn.exchange('exchange-name');
  var q = conn.queue('queue-name', function(queue) {
    console.log(q.name);
  });
  q.bind('exchange-name', 'routingkey');
});

The queue is not durable and should be created by this consumer.

This fails on the server with the following error in the rabbitmq log:

"connection <0.1173.0>, channel 2 - error:
{amqp_error,not_found,"no queue 'queue-name' in vhost '/'",'queue.bind'}"

RabbitMQ version is 2.4.1

Following sample code desn not work -

Hello,

I just npm install amqp and when I do npm list I get following modules [email protected]
I tried to run following code which comes bundled with source ...

I get following error ..

q.bind.addCallback undefined...

Any idea ? Is it supposed to work ?

Any sample code which works with this package or I should install some other package

Thanks
Rajan

var amqp = require('./amqp');

var connection = amqp.createConnection({host: 'localhost'});

connection.addListener('close', function (e) {
if (e) {
throw e;
} else {
console.log('connection closed.');
}
});

connection.addListener('ready', function () {
console.log("connected to " + connection.serverProperties.product);

var exchange = connection.exchange('clock', {type: 'fanout'});

var q = connection.queue('my-events-receiver');

q.bind(exchange, "*").addCallback(function () {
console.log("publishing message");
exchange.publish("message.json", {hello: 'world', foo: 'bar'});
exchange.publish("message.text", 'hello world', {contentType: 'text/plain'});
});

q.subscribe(function (m) {
console.log("--- Message (" + m.deliveryTag + ", '" + m.routingKey + "') ---");
console.log("--- contentType: " + m.contentType);

m.addListener('data', function (d) {
  console.log(d);
});

m.addListener('end', function () {
  m.acknowledge();
  console.log("--- END (" + m.deliveryTag + ", '" + m.routingKey + "') ---");
});

});
});

Is there a memory issue?

If i setup a connection and just publish a lot of messages. All works fine -> messages get published to the queue.

But if i look at memory usage it goes up and stays at the high level. I didn't found out where the problem is.

Sample Code i used:

var amqp = require('amqp')
  , connection = amqp.createConnection({url: "amqp://guest:guest@localhost:5672"});

connection.on('ready', function () {

    for (i = 1; i <= 300000; i++) {
        connection.publish('sample', {msg: i + ' hello world'});
    }

});

Allow For Easy Queue Binding

Currently when binding to a queue you MUST match the queue parameters exactly or it will not work at all. For example, if I have queue "a" that is durable and not auto-delete, the following code will never work:

var q = conn.queue('a');

q.subscribe(function(msg){
  console.log(msg);
});

But if I match what it really is:

var q = conn.queue('a', {durable:true, autoDelete:false});

q.subscribe(function(msg){
  console.log(msg);
});

It works.

I'd like to be able to just listen on a queue without knowing the explicit details of it. Or, if I get the options wrong it'd be nice to make a lot of noise rather than silently failing. :)

Thanks,
James

Message is no sending in queue

Hi, I am using RabbitMQ for my application.
For testing purpose I have put the code for publishing messages in forever while loop. But its not working.
While I am sending one message alone, its working well.

My code for 1st problem is:

var sys = require('sys');
var amqp = require('amqp');

var connection = amqp.createConnection({ host: 'ip_address',vhost:'/' });

connection.addListener('close',function(){

if(e){
    throw e;
}else{
    sys.put("Connection Closed..");
}

});

// Wait for connection to become established.
connection.on('ready', function () {

var q = connection.queue('anand_queue',{durable: true,autoDelete:false});
q.bind("*");

while(true){        
            //if i comment while loop then it works well
    connection.publish('my_queue',"hello",{contentType: 'text/plain'});
}//end of while         

});

I have also observe that amqp uses buffer to store messages and then send messages in queue.
How can i overcome with this problems?

Error: Cannot find module 'amqp'

Hi,

I can install amqp normally:

$ sudo npm install amqp -g
Password:
npm http GET https://registry.npmjs.org/amqp
npm http 304 https://registry.npmjs.org/amqp
[email protected] /usr/local/lib/node_modules/amqp

But it doesn't work when i try to start with node.js

node.js:201
        throw e; // process.nextTick error, or 'error' event on first tick
              ^
Error: Cannot find module 'amqp'
    at Function._resolveFilename (module.js:332:11)

Any ideas of what i'm doing wrong?

I'm running node.js v0.6.12 on Lion.

Thanks.

"Default exchange" confusion

In AMQP, the exchange know as the "default exchange" is that referenced by not supplying an exchange name (usually interpreted in client libraries as an empty string). Its purpose is to allow messages to be published directly to queues.

node-amqp, from its early days, has used the exchange "amq.topic" as the target when an exchange name is not supplied to procedures.

"amq.topic" is something of an odd choice. It is not guaranteed to exist -- the spec asks that it be initially present in each vhost (and so it is in rabbitmq), but it is otherwise not special, and may be deleted for example.

Ideally, node-amqp would follow the semantics as indicated by the specification, and a procedure that doesn't use an exchange name would publish to the actual default exchange. I'm thinking of Connection.publish specifically. However, it may be that applications rely on the current behaviour.

The choices are at this point:

  1. Correct the behaviour of Connection.publish
  2. Keep the current behaviour of Connection.publish, and
    a. Supply queue.publish, for publishing directly to a queue
    b. Supply Connection.publishToQueue
    c. Require people to use Connection.exchange('').publish(queueName, ...)

I've implemented 2a, but it's unsatisfactory as it requires a channel creation to get the queue, which is not actually needed for the publish. Of the 2s, 2b is probably best. But 1 is better still from my point of view.

In any case, the README ought to be fixed up -- when my changeset doing 1 above was merged, the behaviour was reverted, but not the README apparently.

I'm happy to do the work for this, but I need to know which option is going to be acceptable.

Unsubscribing

While writing a new example for rabbit.js, I ran into this problem: there's no way to unsubscribe from a queue, meaning that messages will keep being delivered even if you have removed the listener.

In general this is OK if not ideal -- the messages just get dropped on the floor if there's no listener. The case in which it matters is when you have more than one subscriber. RabbitMQ (and all those other AMQP brokers that there are) will deliver messages round-robin to the subscribers. If you stop handling some messages they'll just get lost -- what you really want is for those messages to go to one of the other subscribers.

Impossible to publish and to consume empty string messages

If you want to compute messages only depending on their routing key, we are used to publish an empty string in the message.
I have looked at AMQP v0-9-1, and I didn't see anywhere that we shouldn't do that.
In addition, it is possible in Python with pika, and RabbitMQ handles such messages, but they are not consumed by node-amqp. So if I set the ack option to true in the declaration of my queue, the consumption of messages stop as no callback is triggered on the reception of such messages, and so I don't have possibility to ack those messages.

Of course, I can change my applications to avoid the publication of empty string messages, but I would like to know if is it a choice or if there is a reason for that impossibility.

Cheers,
Gabriel

Decoded message on reciept of subscribed message?

Perhaps a silly question but: when I get a message back from a subscribed queue it looks like:

{ data: <Buffer 54 65 73 74 20 54 69 63 6b 6c 65>,
contentType: 'text/plain' }

printing just message.data is:
<Buffer 54 65 73 74 20 54 69 63 6b 6c 65>

How do I get that data back as actual plain text?

npm installs old version

When running npm install amqp (using node 0.6.0)

I get installed this;

{ "name" : "amqp", "version" : "0.0.2" }

I guess you have to update your package info to include node engine 0.6 , and to future proof it, add >0.4

Direct messages not received from queue

I'm trying to set up a round-robin style worker queue and am not seeing any direct messages arrive at any subscribers. In the spirit of creating a simple test case, I modified the test-simple.js sample to create a new direct exchange: https://gist.github.com/ab3e637c0717a7a8e096. I'm not seeing any messages show up.

Fanout messages seem to propagate just fine.

Is there something missing here?

Thanks,

Matt

Layered API to expose channel operations

This is a problem resulting from caching queues and hiding channels: message prefetch is defined per channel, but a queue (= one channel) may have more than one consumer. If queue.subscribe and the option autoAck: false are used, it sets the prefetch to 1, meaning that another message won't be sent until the previous has been acknowledged.

If there's more than one consumer, then it's easy for one of them to unjustly starve the others by not acknowledging a message. Say, for example, each consumer is piping messages through a stream, and only acknowledging them once the message has successfully written; if the stream is paused, the acknowledgment won't be made, and no other consumer will receive a message either, because they are all sharing the channel and thus the prefetch.

One possible solution to this is to unhide channels, so to allow a channel per consumer. The API as is can be kept as a convenience, and channel exposed for when more fine-grained control is needed. Thoughts? (or would you prefer to wait for a pull request ..)

Getting a null parser after an error

...\node_modules\amqp\amqp.js:833
parser.execute(data);
^
TypeError: Cannot call method 'execute' of null
at Connection. (C:\Users\tshrestha\GSMarquee\node-web\main\node_modules\amqp\amqp.js:833:12)
at Connection.emit (events.js:67:17)
at TCP.onread (net.js:347:14)

Looking at amqp.js
parser.onError = function(e) {
self.end();
self.emit("error", e);
self.emit("close");
parser = null;
};

self.addListener('end', function () {
self.end();
// in order to allow reconnects, have to clear the
// state.
parser = null;
});

So, the parser is reset to null on end or error. But somehow a 'data' event arrived afterwards:
self.addListener('data', function (data) {
parser.execute(data); //<--this is line #833
});

Manual acknowledgments in Node.js using node-amqp module

I am using the module node-amqp by postwait : https://github.com/postwait/node-amqp

I am able to publish/subscribe to a queue some messages, but I would like to acknowledge the messages manually.

For example, I want the message to be read, and not acknowledged, then to execute some calculation on this message, and THEN send a q.shift() to acknowledge the message and go through the second message.

I want the acknowledges to be manually used.

Actually I have a function which permits to subscribe with the flag {ack: false} then I have a function to manually make a q.shift() (as said in the doc).

But when I publish 2 messages, and then I read 2 messages, without acknowledging, I see the 2 messages. But I would like the subrscribe to repeat the same message that was received at first, until I didn't acknowledge it.

How can I use the methods ?

cannot get queue from non default virtual host

using 0.1.1

I've created virtual host "dev" on my rabbit instance. I cannot connect to queue for anything other than default "/" virtual host. I do not get an error if I connect to any queue on "/" vhost.

var queueName = "devqueue";
 var connection = amqp.createConnection({'host': 'localhost', 'port': 5672, 'vhost' : 'dev'});

 connection.on('ready', function() {
        var args = {'exclusive': false, 'autoDelete': false, 'durable' : true};
        var q = connection.queue(queueName , args);
        q.bind('#');
 });

yields the error

node.js:201
        throw e; // process.nextTick error, or 'error' event on first tick
              ^
Error: INTERNAL_ERROR
    at Connection._onMethod (/opt/apps/amqp/scripts/node_modules/amqp/amqp.js:995:15)
    at AMQPParser.onMethod (/opt/apps/amqp/scripts/node_modules/amqp/amqp.js:811:12)
    at AMQPParser._parseMethodFrame (/opt/apps/amqp/scripts/node_modules/amqp/amqp.js:456:10)
    at AMQPParser.execute (/opt/apps/amqp/scripts/node_modules/amqp/amqp.js:206:20)
    at Connection.<anonymous> (/opt/apps/amqp/scripts/node_modules/amqp/amqp.js:851:12)
    at Connection.emit (events.js:67:17)
    at TCP.onread (net.js:329:14)

Exchange and Queue callbacks aren't executed

When calling "connection.exchange(...)" or "connection.queue(...)" the callback isn't being executed. I can profile the amqp.js file and see it in the Exchange and Queue objects, but it's not being executed. It's only invoked if the Exchange or Queue names already exist.

Frame size issue on connection

Hi!

I was getting started with Node.Js and interest in testing the speed of queuing with RabbitMQ. However i got stuck with this issue connecting. I will be investigating more into it, but perhaps it is something i overlooked or something else in the code.

Basically i get this error which shows a fairly large frame size. I also have a PHP implementation and the frame size is set to the default rabbitMQ size of 131072 and it seems fine.

execute: AMQP

got frame: [65,19793,1342243080]


events.js:47
        throw new Error("Uncaught, unspecified 'error' event.");
              ^
Error: Uncaught, unspecified 'error' event.
    at Connection.emit (events.js:47:15)
    at AMQPParser.onError (/nodeJS_stuff/node_modules/amqp/amqp.js:839:12)
    at AMQPParser.throwError (/nodeJS_stuff/node_modules/amqp/amqp.js:145:25)
    at AMQPParser.execute (/nodeJS_stuff/node_modules/amqp/amqp.js:181:18)
    at Connection.<anonymous> (/nodeJS_stuff/node_modules/amqp/amqp.js:851:12)
    at Connection.emit (events.js:64:17)
    at Connection._onReadable (net.js:672:14)
    at IOWatcher.onReadable [as callback] (net.js:177:10)

My only initial guess is somehow the frame length in the header position is different thus reading the wrong length value, but i am not well versed in rabbitMQ packets yet.

Using NodeJs v0.4.12
RabbitMQ Server: 1.7.2 default config values, running on ubuntu.

Code:
var conn = amqp.createConnection({url: rabbitUrl()}); conn.on('ready', setup);

For me it seems like a code issue at least with the settings/config i am using considering the PHP client works fine.

How to send JSON objects via AMQP ?

Hi !

I am actually able to send a "Hello World" via AMQP publishing, and to the value by subscribing.

I would like, instead of sending a simple "Hello World", to send a complex object like a JSON object.
I am not sure if what I say makes sense, so I illustrate it with an example :

Here is a class :

var MyClass;
MyClass = (function() {
function MyClass(myText) {
if (myText == null) {
myText = "test";
}
this.myText = myText;
}
MyClass.prototype.display = function() {
return "Method display works : " + this.myText;
};
return MyClass;
})();

I will do this to create the object, then I send it with node-amqp :
var test = new MyClass("Hello World !!!");

And after receiving the object by subrscribing to the queue, I am able to get json.myText, but not able to call the method djson.display().

For example :

q.subscribe(function (json, headers, deliveryInfo) {
recvCount++;
assert.equal("node-json-fanout", deliveryInfo.exchange);
assert.equal("node-json-queue", deliveryInfo.queue);
assert.equal(false, deliveryInfo.redelivered);

  // Receiving a message
  switch (deliveryInfo.routingKey) {
    case 'message.json1':
      console.log("message 1 : " + json.myText + " And here is the method call: " + json.display());
      break;

    default:
      throw new Error('unexpected routing key: ' + deliveryInfo.routingKey);
  }
})

I have the error : TypeError: Object # has no method 'display'

When I did some messaging with Ruby + the RabbitMQ gem (an AMQP implementation), I was able to send objects (with attributes and methods) and use it after receiving by "marshalling" the object before sending, and unmarshalling the object at reception.

I need a method that equals the function of "Marshalling" / "Unmarshalling" or something which will permit me to send/reveive objects issued from a "new" and to be able to use its attributes (here I am already able to do it), but ALSO its METHODS ! How can I do it ?

Thanks for your help !

No way to catch 'Error: NOT_FOUND' exceptions

I stumbled upon an annoying bug trying to use the passive option of the queue.
The library doesn't properly handle the
'NOT_FOUND - no queue 'xxxxx' in vhost '/'' event, rendering passive useless.

There is no way to set prefetchCount for Queue

I need to collect messages to be processed in batch on remote service, i also want to be sure that messages safely stored in durable queue until have not been processed by service.
To implement this approach i use durable queue with prefetchsize = batchsize and manual acknowledge.

  var batch = new Array();
   var q = connection.queue('Q', {durable : true});

   // HERE I NEED TO SAY    q.basicQos(5);

    q.on('queueDeclareOk', function (args) {
        q.bind(e, "key");
        q.subscribeRaw({ack:true}, function(message) {
            batch.push(message);
            if(batch.length == BATCH_SIZE){
                self.batch.forEach(function(m){
                    m.acknowledge();
                });
                self.batch.length = 0;
            }
        });
    });

I've made temporary patch in local copy:

Queue.prototype.basicQos = function(prefetchCount){
this.connection._sendMethod(this.channel, methods.basicQos,
{ reserved1: 0
, prefetchSize: 0
, prefetchCount: prefetchCount
, global: false
});
};

occasional "sourceEnd out of bounds" exception on receipt of message

This might be related to receiving a large message, it doesn't seem to happen all the time. Looks to be related to some code that was recently committed.

buffer.js:494
    throw new Error('sourceEnd out of bounds');
          ^ 
Error: sourceEnd out of bounds
    at Buffer.copy (buffer.js:494:11)
    at frame (/smartdc/node_modules/amqp/amqp.js:169:10)
    at header (/smartdc/node_modules/amqp/amqp.js:159:14)
    at frameEnd (/smartdc/node_modules/amqp/amqp.js:204:16)
    at frame (/smartdc/node_modules/amqp/amqp.js:171:14)
    at header (/smartdc/node_modules/amqp/amqp.js:159:14)
    at frameEnd (/smartdc/node_modules/amqp/amqp.js:204:16)
    at frame (/smartdc/node_modules/amqp/amqp.js:171:14)
    at AMQPParser.header [as parse] (/smartdc/node_modules/amqp/amqp.js:159:14)
    at AMQPParser.execute (/smartdc/node_modules/amqp/amqp.js:230:21)

Problem when publishing to a queue BEFORE listening on it

When I start at first, a server in listening for messages, and THEN, a client publishing 4 messages :
I see 4 messages received at server side

BUT there is a problem, when I start a client publishing 4 messages (while the subscriber is off, but the daemon RabbitMQ is started of course), and THEN I start the server in order to subrscribe to the queue and to receive these 4 same messages, I receive only 3 messages, and every other messages are received normally.

The fact is that the FIRST message is never seen at the server side if I publish before subscribing.
How can I resolve this problem ?

Implementing reject

From what i can gather inspecting the source code this is not yet implemented? Is there a pragmatic reason for this? If not I'll see what I can wipe up this weekend now I understand a little more about this library

Uncaught basicReturn

I'm writing a small test and, when there are no subscribers in the exchange while using mandatory delivery mode, I get an an uncaught exception, despite me catching that event.

Code snippets

var msgDeliveryOptions = {
    mandatory: true,
    deliveryMode: 2          // persistent = 2, non-persistent = 1
};

var exchange = connection.exchange('acacio.topic', null, function(e) { util.puts(e.name); });
exchange.publish('test', msg, msgDeliveryOptions);
exchange.on('basicReturn', exchangeError);

function exchangeError(err) {
  util.puts(JSON.stringify(err, null, 2));
  util.puts('I got it!');
}

OUTPUT:

{
  "replyCode": 312,
  "replyText": "NO_ROUTE",
  "exchange": "acacio.topic",
  "routingKey": "test"
}
I got it!
Warning: Uncaught basicReturn: {"replyCode":312,"replyText":"NO_ROUTE","exchange":"acacio.topic","routingKey":"test"}

connectionCloseOK, queueUnbindOK, basicCancelOK missing

Also added the ability to close a connection via

Connection.prototype.closeConnection = function () {
//
// 0 is the control channel
//
this._sendMethod(0, methods.connectionClose, {'replyText': 'Goodbye from node',
'replyCode': 200,
'classId': 0,
'methodId': 0});
//this.end();
};

then on _onMethod

case methods.connectionCloseOk:
this.end();
this._onMethod(0, methods.connectionClose, args);
break;

I end the connection. I don't know how to do the social commit or things that make github great. I can push these to you if you like.

SlowBuffer not accepted as Buffer

The Protobuf C++ module for node generates SlowBuffers, rather than Buffers (I believe that most modules with a native component will do the same ... Buffers appear to be more difficult to manipulate from C++). These can't be passed along to AMQP, because of the check on amqp.js line 1135 for body instanceof Buffer. That check should probably be Buffer.isBuffer(body) instead, otherwise the check fails and AMQP regards the SlowBuffer as JSON and tries to encode it. There are a few more cases in the source where instanceof Buffer is used, and most of them should probably be changed.

passive:false catch exception

I'm using {passive:false} to test if a queue exists. As expected, if the queue doesn't exist, I get an exception. However, it's not "catchable". Shouldn't this error be caught on the connection object if connection.on('error. . .is setup?

var amqp = require('amqp');
var connection = null;

var get_connection = function(cb) {
if (connection === null) {
connection = amqp.createConnection({host: "localhost"});
connection.on('ready', function() {
console.log('ready');
connection.exchange("mog");
cb(connection);
});
connection.on('error', function(e) {
console.log("error: " + e);
log.error("---->AMQP connection closed");
connection = null;
});
connection.on('end', function() {
connection = null;
});
connection.on('close', function(){
console.log("closed");
});
}
else {
cb(connection);
}
};

get_connection(function(conn){
conn.queue("foo2", {'passive':true},function(q){
console.log(q);
});
});

process.on("uncaughtException", function(e){
// caught here
console.log(e);
});

Error: NOT_FOUND - no queue 'foo2' in vhost '/'
at Queue._onMethod (/Users/rich/dev/foo/foojs/node_modules/amqp/amqp.js:1703:15)
at Queue._onChannelMethod (/Users/rich/dev/foo/foojs/node_modules/amqp/amqp.js:1348:14)
at Connection._onMethod (/Users/rich/dev/foo/foojs/node_modules/amqp/amqp.js:917:28)
at AMQPParser.onMethod (/Users/rich/dev/foo/foojs/node_modules/amqp/amqp.js:792:12)
at AMQPParser._parseMethodFrame (/Users/rich/dev/foo/foojs/node_modules/amqp/amqp.js:441:10)
at frameEnd (/Users/rich/dev/foo/foojs/node_modules/amqp/amqp.js:186:16)
at frame (/Users/rich/dev/foo/foojs/node_modules/amqp/amqp.js:171:14)
at AMQPParser.header as parse
at AMQPParser.execute (/Users/rich/dev/foo/foojs/node_modules/amqp/amqp.js:230:21)
at Connection. (/Users/rich/dev/foo/foojs/node_modules/amqp/amqp.js:832:12)

RabbitMQ errors - can't create connection with latest module

Error -

=ERROR REPORT==== 3-Jun-2011::11:07:58 ===
exception on TCP connection <0.1026.0> from 127.0.0.1:34321
{bad_header,<<65,77,81,80,0,0,9,1>>}

Connection code -

var amqp_conn = amqp.createConnection({ host: 'localhost', login: 'guest', password: 'guest', vhost : '/' });

Great module by the way. Keep up the great work.

experation vs expiration in comments

I noticed that in the comments, you have experation,

./amqp/amqp.js:// - experation
./amqp/amqp.js:// - experation
./amqp/amqp.js:// - experation
./amqp/index.js:// - experation
./amqp/index.js:// - experation
./amqp/index.js:// - experation

but in the AMQP XML definitions it specifically ONLY has.

If you are going to list possible options of arguments in the comments for user to follow, could you please correct it if its wrong. Or do some magic and make both work ;-)

Cheers

Raul

Example in README.md has too many errors

The example could never work now.
q.bind and q.subscribe should be called when the queue is ready - on callback in connection.queue as it does in test harness test file - test-queue-creation.js
Also it says that amq.topic default exchange is used, but it's defaultExchangeName is set to '' now.

Publish performance is very slow

We have severe performance problems with node-amqp when under load in production. To reproduce I created this little publisher program:

var amqp = require('amqp');

var conn = amqp.createConnection({url: "amqp://guest:guest@localhost:5672"});
conn.on('ready', function(){

    for(var i = 0; i < 50000; i++){
        conn.publish('labs', {my: "paylaod" + i});
    }

    console.log('published ' + i + ' messages');
});

On the other end of RabbitMQ this little program reads messages as they are published and display some timing statistics:

var amqp = require('amqp');

var conn = amqp.createConnection({url: "amqp://guest:guest@localhost:5672"});

conn.on('ready', function(){
    var n = 0;
    conn.queue('labs', function(queue){
        var prev = new Date().getTime();
        queue.subscribe(function (message, headers, deliveryInfo) {
            n++;
            if(n % 1000 === 0){
                var next = new Date().getTime();
                console.log('read ' + n + ' messages in ' + (next - prev) + " ms");
                prev = next;
            }
        });
    });
});

As you can see the test run results show that performance is significantly deteriorated as more and more messages are published:

read 1000 messages in 352 ms
read 2000 messages in 395 ms
read 3000 messages in 418 ms
read 4000 messages in 500 ms
read 5000 messages in 459 ms
read 6000 messages in 479 ms
read 7000 messages in 469 ms
...
read 46000 messages in 2778 ms
read 47000 messages in 2767 ms
read 48000 messages in 2818 ms
read 49000 messages in 2814 ms
read 50000 messages in 2847 ms

I re-wrote the publisher program in Python using pika, for comparison:

import pika

def main():
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    for i in range(50000):
        channel.basic_publish(
            exchange='',
            routing_key='labs',
            body='{"my": "payload"}')

    print " [x] Sent " + str(i) + " messages"

    connection.close()


if __name__ == '__main__':
    main()

As you can see the test run with pika shows much better performance that stays constant for the whole run:

read 1000 messages in 232 ms
read 2000 messages in 235 ms
read 3000 messages in 221 ms
read 4000 messages in 223 ms
read 5000 messages in 227 ms
read 6000 messages in 224 ms
read 7000 messages in 225 ms
...
read 46000 messages in 237 ms
read 47000 messages in 218 ms
read 48000 messages in 247 ms
read 49000 messages in 225 ms
read 50000 messages in 229 ms

Would you be able to look into why the difference in performance between two technologies is so significant and if this can be addressed? Thanks!

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.