
node-stomp-client's Introduction

Stomp Client


A node.js STOMP client. Props go to Russell Haering for doing the initial legwork.

The following enhancements have been added:

  • Unit tests
  • Ability to support different protocol versions (1.0 or 1.1) - more work needed
  • Inbound frame validation (required / regex'able header values)
  • Support for UNSUBSCRIBE frames in client
  • Ability to add custom headers to SUBSCRIBE/UNSUBSCRIBE frames
  • ACK and NACK support

Installation

npm install stomp-client

Super basic example

var Stomp = require('stomp-client');
var destination = '/queue/someQueueName';
var client = new Stomp('127.0.0.1', 61613, 'user', 'pass');

client.connect(function(sessionId) {
    client.subscribe(destination, function(body, headers) {
      console.log('This is the body of a message on the subscribed queue:', body);
    });

    client.publish(destination, 'Oh herrow');
});

The client comes in two forms: a standard client and a secure client. The example above uses the standard client. To use the secure client, simply change StompClient to SecureStompClient.

API

Queue Names

The meaning of queue names is not defined by the STOMP spec, but by the broker. With ActiveMQ, however, they should begin with "/queue/" or "/topic/"; see STOMP 1.0 for more detail.

Stomp = require('stomp-client')

Require returns a constructor for STOMP client instances.

For backwards compatibility, require('stomp-client').StompClient is also supported.

Stomp(address, [port], [user], [pass], [protocolVersion], [vhost], [reconnectOpts], [tls])

  • address: address to connect to, default is "127.0.0.1"
  • port: port to connect to, default is 61613
  • user: user to authenticate as, default is ""
  • pass: password to authenticate with, default is ""
  • protocolVersion: see below, defaults to "1.0"
  • vhost: see below, defaults to null
  • reconnectOpts: see below, defaults to {}
  • tls: Establish a tls/ssl connection. If an object is passed for this argument it will be passed as options to the tls module.

Protocol version negotiation is not currently supported and version "1.0" is the only supported version.

ReconnectOpts should contain an integer retries specifying the maximum number of reconnection attempts, and a delay specifying the reconnection delay in milliseconds. Reconnection timings are calculated using exponential backoff: the first reconnection happens immediately, the second at +delay ms, the third at +2*delay ms, etc.
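As a rough illustration of that schedule, the sketch below lists the wait before each attempt, taking the description literally (immediate, then +delay, then +2*delay). The helper name reconnectDelays is hypothetical and is not part of stomp-client; the timings the library actually uses may differ.

```javascript
// Hypothetical helper, not part of stomp-client: lists the wait (in ms)
// before each reconnection attempt, following the README text literally.
function reconnectDelays(retries, delay) {
  var delays = [];
  for (var attempt = 0; attempt < retries; attempt++) {
    delays.push(attempt * delay);
  }
  return delays;
}

// Shape of the options object described above.
var reconnectOpts = { retries: 4, delay: 500 };
console.log(reconnectDelays(reconnectOpts.retries, reconnectOpts.delay));
```

Note that delay is given in milliseconds, so a value such as 3 means three milliseconds, not three seconds.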

Stomp(options)

  • options: Properties are named the same as the positional parameters above. The property 'host' is accepted as an alias for 'address'.
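Because the positional form is easy to get wrong (several optional arguments in a fixed order), the options form may be the safer one. The sketch below builds such an object and fills in the defaults documented above. The helper withDocumentedDefaults and the host name are made up for illustration; this is not the library's own code.

```javascript
// Hypothetical helper, not the library's own code: fills in the defaults
// documented for the positional parameters and honours the 'host' alias.
function withDocumentedDefaults(options) {
  options = options || {};
  return {
    address: options.address || options.host || '127.0.0.1',
    port: options.port || 61613,
    user: options.user || '',
    pass: options.pass || '',
    protocolVersion: options.protocolVersion || '1.0',
    vhost: options.vhost || null,
    reconnectOpts: options.reconnectOpts || {},
    tls: options.tls
  };
}

var options = withDocumentedDefaults({
  host: 'broker.example.test', // placeholder host name
  reconnectOpts: { retries: 10, delay: 3000 }
});
console.log(options);
// With the library installed, an object of this shape could be passed as
// new Stomp(options), per the signature above.
```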

stomp.connect([callback, [errorCallback]])

Connect to the STOMP server. If the callbacks are provided, they will be attached on the 'connect' and 'error' event, respectively.

virtualhosts

If using virtualhosts to namespace your queues, you must pass a version header of '1.1'; otherwise the vhost is ignored.

stomp.disconnect(callback)

Disconnect from the STOMP server. The callback will be executed when disconnection is complete. No reconnections should be attempted, nor errors thrown as a result of this call.

stomp.subscribe(queue, [headers,] callback)

  • queue: queue to subscribe to
  • headers: headers to add to the SUBSCRIBE message
  • callback: will be called with message body as first argument, and header object as the second argument

stomp.unsubscribe(queue, [headers])

  • queue: queue to unsubscribe from
  • headers: headers to add to the UNSUBSCRIBE message

stomp.publish(queue, message, [headers])

  • queue: queue to publish to
  • message: message to publish, a string or buffer
  • headers: headers to add to the PUBLISH message

stomp.ack(messageId, subscription, [transaction])

stomp.nack(messageId, subscription, [transaction])

  • messageId: the id of the message to ack/nack
  • subscription: the id of the subscription
  • transaction: optional transaction name

Property: stomp.publishable (boolean)

Returns whether or not the connection is currently writable. During normal operation this should be true, however if the client is in the process of reconnecting, this will be false.
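One way this property might be used is to hold messages back while the client is reconnecting. The following is a minimal sketch under that assumption; createBufferedPublisher is a hypothetical helper, and client can be any object exposing the documented publishable property and publish() method.

```javascript
// Sketch: publish immediately when the client is writable, otherwise keep
// the message in a local backlog until flush() is called.
function createBufferedPublisher(client) {
  var backlog = [];
  return {
    publish: function (queue, message) {
      if (client.publishable) {
        client.publish(queue, message);
      } else {
        backlog.push({ queue: queue, message: message });
      }
    },
    // Intended to be called from a 'connect' or 'reconnect' listener.
    flush: function () {
      while (client.publishable && backlog.length > 0) {
        var item = backlog.shift();
        client.publish(item.queue, item.message);
      }
    },
    pending: function () {
      return backlog.length;
    }
  };
}
```

The backlog lives in memory only, so messages held this way are lost if the process exits before flush() runs.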

Event: 'connect'

Emitted on successful connect to the STOMP server.

Event: 'error'

Emitted on an error at either the TCP or STOMP protocol layer. An Error object will be passed. All error objects have a .message property; STOMP protocol errors may also have a .details property.

If the error was caused by a failure to reconnect after exceeding the number of reconnection attempts, the error object will have a reconnectionFailed property.
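A handler might branch on these properties roughly as follows. This is only a sketch: describeStompError is a hypothetical helper, and a plain EventEmitter stands in for a real client so the snippet runs without a broker.

```javascript
var EventEmitter = require('events').EventEmitter;

// Hypothetical helper: turns an error from the 'error' event into a log line
// using only the properties documented above.
function describeStompError(err) {
  if (err.reconnectionFailed) {
    return 'gave up reconnecting: ' + err.message;
  }
  if (err.details) {
    return 'STOMP error: ' + err.message + ' (' + err.details + ')';
  }
  return 'error: ' + err.message;
}

// Stand-in emitter; a client created with new Stomp(...) would be used here.
var client = new EventEmitter();
client.on('error', function (err) {
  console.error(describeStompError(err));
});
client.emit('error', new Error('connection refused'));
```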

Event: 'reconnect'

Emitted when the client has successfully reconnected. The event arguments are the new sessionId and the reconnection attempt number.

Event: 'reconnecting'

Emitted when the client has been disconnected for whatever reason, but is going to attempt to reconnect.

Event: 'message' (body, headers)

Emitted for each message received. This can be used as a simple way to receive messages for wildcard destination subscriptions that would otherwise not trigger the subscription callback.
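The idea can be sketched as a small dispatcher that filters 'message' events by destination. The helpers matchesWildcard and onWildcard are hypothetical, and the matching rule ('*' stands for exactly one dot-separated segment) is an assumption; brokers define their own wildcard syntax. A plain EventEmitter stands in for a connected client.

```javascript
var EventEmitter = require('events').EventEmitter;

// Assumption: '*' matches exactly one dot-separated segment.
function matchesWildcard(pattern, destination) {
  var escaped = pattern
    .split('*')
    .map(function (part) {
      // Escape regular-expression metacharacters in the literal parts.
      return part.replace(/[.+?^${}()|[\]\\]/g, '\\$&');
    })
    .join('[^.]+');
  return new RegExp('^' + escaped + '$').test(destination);
}

// Calls handler for every 'message' event whose destination header matches.
function onWildcard(client, pattern, handler) {
  client.on('message', function (body, headers) {
    if (matchesWildcard(pattern, headers.destination)) {
      handler(body, headers);
    }
  });
}

// Stand-in for a connected client, so the sketch runs without a broker.
var client = new EventEmitter();
var received = [];
onWildcard(client, '/topic/foo.*', function (body) {
  received.push(body);
});
client.emit('message', 'hello', { destination: '/topic/foo.bar' });
client.emit('message', 'ignored', { destination: '/topic/other' });
console.log(received);
```

With a real client, the wildcard destination would still need to be subscribed to so that the broker delivers the messages in the first place.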

LICENSE

MIT

node-stomp-client's People

Contributors

blakmatrix, bmocm, dev0x10, easternbloc, gknapp, grenzr, jangie, kgoerlitz, nicksellen, russellhaering, sam-github, shaneisrael, storkme, tepafoo


node-stomp-client's Issues

Messages published multiple times

Hi! We built a wrapper over stomp-client. We basically want to provide onMessage and putMessage capabilities.
This is what our code looks like:

var Stomp = require('stomp-client');
module.exports = function (config) {
  var client = new Stomp(config.address, config.port, config.user, config.pass);
  return {
    onMessage: function (onMessage) {
      client.connect(function (sessionId) {
        client.subscribe(config.queueName, onMessage);
      });
    },
    putMessage: function (message) {
      client.connect(function (sessionId) {
        client.publish(config.queueName, message);
      });
    }
  }
}

The problem we are having is that the line client.publish(config.queueName, message); is often called a random number of times... yes, random. The flow is:

  1. We receive a request in our API
  2. We do something
  3. Call putMessage function

When I debug, the line client.publish(config.queueName, message); is sometimes called up to 10 times.
Do any of you have an idea of what may be happening?
I will probably try a different STOMP client to compare its behavior. This is really odd to me.

I know that on the other side of the queue there is a subscriber that converts messages into files and processes them, but the problem seems to be on this side of the queue (publish), since when I debug I can see the code going through it multiple times for a single request.

Any thoughts?
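One possible explanation, going by the README: callbacks passed to connect() are attached to the 'connect' event, so calling connect() inside every putMessage would add another listener each time, and each later 'connect' event would run all of them. A minimal sketch of a wrapper that connects once and reuses the connection is shown below. The name createQueueClient is hypothetical, and this has not been checked against the reporter's setup.

```javascript
// Sketch: connect at most once and run queued actions when the connection
// is ready. 'client' is any object with connect/subscribe/publish methods.
function createQueueClient(client, queueName) {
  var connected = false;
  var connecting = false;
  var pending = [];

  function whenConnected(action) {
    if (connected) {
      action();
      return;
    }
    pending.push(action);
    if (!connecting) {
      connecting = true;
      client.connect(function () {
        connected = true;
        // Run everything that was requested before the connection was up.
        pending.splice(0).forEach(function (queued) {
          queued();
        });
      });
    }
  }

  return {
    onMessage: function (handler) {
      whenConnected(function () {
        client.subscribe(queueName, handler);
      });
    },
    putMessage: function (message) {
      whenConnected(function () {
        client.publish(queueName, message);
      });
    }
  };
}
```

The sketch does not deal with reconnection or connection errors; those would need their own handling.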

Can't resolve 'tls', how to resolve it?

ERROR in ./node_modules/stomp-client/lib/client.js
Module not found: Error: Can't resolve 'tls' in 'C:\projects\demos\STOMP\node_modules\stomp-client\lib'
@ ./node_modules/stomp-client/lib/client.js 3:10-24
@ ./src/requireComponent.js
@ ./src/index.js
@ multi (webpack)-dev-server/client?http://localhost:8686 ./src/index.js

Large messages break subscribe functionality.

It appears that messages above a certain size break subscription handling.

Tried a code snippet along the lines of:

socketClient.connect(function(){
    logger.info("Subscribed");
    socketClient.subscribe("/queue/connector", function(body, header){
        console.log("Something Happened!")
    });
});

When I publish messages into /queue/connector, it seems to handle messages of at least length 3509, but it breaks for messages with length 119858. I haven't zeroed in on the magic number yet.

Additionally, when it fails once at processing the large message, subsequent short messages also start failing.

I added some print statements to the frameEmitter's 'MESSAGE' handler in the library's lib/client.js, and those don't even get triggered when I publish a large message, so I'm guessing the problem is happening upstream from there.

Add possibility to connect/disconnect to a queue @ runtime

If you use a queue for sending measurements, for example, and a microservice delivers these to a database, you want to be able to stop receiving messages from the queue if something is wrong with the database. Otherwise you will lose measurements. This is just one use case.
In the end it would be great to be able to enable/disable the connection to the STOMP server, or to halt the subscription, at runtime.
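With the documented subscribe() and unsubscribe() calls, something close to this can perhaps be approximated in user code. The sketch below is one way to do it; createPausableSubscription is a hypothetical helper, and whether the broker holds messages while unsubscribed depends on the broker and destination type.

```javascript
// Sketch: toggle a subscription at runtime using the documented
// subscribe(queue, callback) and unsubscribe(queue) methods.
function createPausableSubscription(client, queue, handler) {
  var active = false;
  return {
    resume: function () {
      if (!active) {
        client.subscribe(queue, handler);
        active = true;
      }
    },
    pause: function () {
      if (active) {
        client.unsubscribe(queue);
        active = false;
      }
    },
    isActive: function () {
      return active;
    }
  };
}
```

A service could call pause() when its database health check fails and resume() once it recovers.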

memory leak in eventemitter

I was testing the package by publishing and subscribing on my own. I was publishing data every 2000 milliseconds, and then the following warning appeared on the console.

(node) warning: possible EventEmitter memory leak detected. 11 MESSAGE listeners added. Use emitter.setMaxListeners() to increase limit.
Trace
at StompFrameEmitter.addListener (events.js:179:15)
at StompClient.onConnect (/Users/deniz/WebstormProjects/crm-sync/node_modules/stomp-client/lib/client.js:199:16)
at Socket.emit (events.js:104:17)
at TCPConnectWrap.afterConnect as oncomplete

I inspected the memory usage with the "top" command and realized that the memory leak was real.

Error Callback

How does the client get notified when the server connection goes down? I have defined the "errorCallback" in:

stomp.connect([callback, [errorCallback]])

but it never gets called if the server goes down.

Cannot see reconnect event

I'm testing the reconnection capability, but I couldn't see a reconnection after I restarted my ActiveMQ server.
Here is my code:

var activemqClient = new Stomp(
  Config.get('ActiveMQ.host'), Config.get('ActiveMQ.port'),
  Config.get('ActiveMQ.username'), Config.get('ActiveMQ.password'),
  '1.0', null, {'retries':10, 'delay':3}
);

activemqClient.on('reconnect', function () {
  logger.info('ActiveMQ reconnected successfully');
});

In the documentation, the Stomp parameters are missing the virtual host (vhost) argument that I see inside client.js:

function StompClient(address, port, user, pass, protocolVersion, vhost, reconnectOpts) {...}

Module fails if messages contain utf8

If a message contains UTF-8 characters, the StompFrameEmitter.prototype.parseBody function and the StompFrame.prototype.send function don't work properly. I managed to patch it (will fork) in a somewhat inelegant manner:

StompFrameEmitter.prototype.parseBody = function () {
  var bufferBuffer = new Buffer(this.buffer);

  if (this.frame.contentLength > -1) {
    var remainingLength = this.frame.contentLength - this.frame.body.length;

    this.frame.appendToBody(bufferBuffer.slice(0, remainingLength).toString());
    this.buffer = bufferBuffer.slice(remainingLength, bufferBuffer.length).toString();

    if (this.frame.contentLength == Buffer.byteLength(this.frame.body)) {
      this.frame.contentLength = -1;
    } else {
      return;
    }
  }

  var index = this.buffer.indexOf('\0');

  if (index == -1) {
    this.frame.appendToBody(this.buffer);
    this.buffer = '';
  } else {
    // The end of the frame has been identified, finish creating it
    this.frame.appendToBody(this.buffer.slice(0, index));

    var frameValidation = this.getFrameValidation(this.frame.command);

    if (frameValidation.isValid) {
      // Emit the frame and reset
      this.emit('frame', this.frame);             // Event emit to catch any frame emission
      this.emit(this.frame.command, this.frame);  // Specific frame emission
    } else {
      this.emit('parseError', { message: frameValidation.message });
    }

    this.frame = new StompFrame();
    this.incrementState();
    this.buffer = this.buffer.substr(index + 1);
  }
};

StompFrame.prototype.send = function(stream) {
  stream.write(this.command + '\n');
  for (var key in this.headers) {
    stream.write(key + ':' + this.headers[key] + '\n');
  }
  if (this.body.length > 0) {
    stream.write('content-length:' + Buffer.byteLength(this.body) + '\n');
  }
  stream.write('\n');
  if (this.body.length > 0) {
    stream.write(this.body);
  }
  stream.write('\0');
};
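The underlying point of the patch is that content-length counts bytes, while a JavaScript string's length counts UTF-16 code units. The short sketch below shows the difference and why slicing a Buffer by content-length is safe; takeBody is a hypothetical helper, not code from the library.

```javascript
var body = '消灭';
console.log(body.length);                     // 2 UTF-16 code units
console.log(Buffer.byteLength(body, 'utf8')); // 6 bytes in UTF-8

// Slicing the raw bytes by content-length keeps multi-byte characters
// intact; slicing the decoded string by the same number would overrun.
function takeBody(buffer, contentLength) {
  return {
    body: buffer.subarray(0, contentLength).toString('utf8'),
    rest: buffer.subarray(contentLength)
  };
}

var wire = Buffer.from(body + '\0NEXT', 'utf8');
var parts = takeBody(wire, Buffer.byteLength(body, 'utf8'));
console.log(parts.body === body);
```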

Documentation @ https://www.npmjs.com/package/stomp-client#stompaddress-port-user-pass-protocolversion-reconnectopts is slightly wrong

The documentation https://www.npmjs.com/package/stomp-client#stompaddress-port-user-pass-protocolversion-reconnectopts

mentions "reconnectOptions" as the 6th argument, but in reality it is the 7th argument. I had to go through the code to figure out this discrepancy, as no obvious errors are thrown when it is passed as the 6th argument. Kindly correct the documentation.

parse message-id error

The message-id includes ":", for example: xxxxxxxxx04-37571-1369787148937-2:209:-1:1:3

So splitting it on ":" is an error:

var kv = line.split(':', 2); //from parser.js
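In JavaScript, split(':', 2) does not keep the remainder in the second element; it simply discards everything after the second piece. A minimal sketch of splitting on the first colon only is shown below. parseHeaderLine is a hypothetical helper, not the parser's actual code.

```javascript
// Split a header line into [key, value] at the first ':' only, so that
// values containing further colons survive intact.
function parseHeaderLine(line) {
  var index = line.indexOf(':');
  if (index === -1) {
    return null; // not a key:value line
  }
  return [line.slice(0, index), line.slice(index + 1)];
}

var line = 'message-id:xxxxxxxxx04-37571-1369787148937-2:209:-1:1:3';
console.log(line.split(':', 2));    // value is cut at the second colon
console.log(parseHeaderLine(line)); // value keeps its embedded colons
```

The same approach would apply to any header whose value contains colons, such as a reply-to header pointing at an ActiveMQ temporary queue.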

Body is empty when message pushed from log4j appender

The topic message (body) is empty.
ActiveMQ version: 5.11.0

I am using the log4j JMS appender to push log messages to a topic, and running the stomp client in a Node.js app.

When a new log message arrives, the stomp client app responds, but the body is empty:

var Stomp = require('stomp-client');
var destination = '/topic/ic';
var client = new Stomp('127.0.0.1', 61613, '', '');

client.connect(function(sessionId) {
    client.subscribe(destination, function(body, headers) {
      console.log('Message');
      console.log(body); // This is empty
    });
});

(net.createConnection is not a function) error in React JS

Hi, I'm using React and got this error right at the start. (The options are not right, but at least I would expect a callback error instead of a net error.)

const Stomp = require("stomp-client");

const destination = "/queue/someQueueName";
const client = new Stomp("127.0.0.1", 61613, "user", "pass");

export default function StompTest() {
  client.connect(
    function (sessionId) {
      client.subscribe(destination, function (body, headers) {
        console.log(
          "This is the body of a message on the subscribed queue:",
          body
        );
      });

      client.publish(destination, "Oh herrow");
    },
    function (err) {
      console.log(err);
    }
  );
}

Wildcard subscriptions unsupported

Hi,

I've noticed that due to the way the client works, wildcard subscriptions are not going to function as expected, e.g. subscribing to /topic/foo.* is never going to get triggered as the destination header will likely be something like /topic/foo.bar and thus won't match any registered callback

Might be worth chucking something into the docs saying it's unsupported.

Cheers.

(disclaimer: only tested against RabbitMQ, other brokers may implement the destination header differently to the way I've described.)

Frame is null, causing an error on write

For some reason I cannot call 'subscribe'.
I checked, and on first connecting to ActiveMQ the stream is null, so the write method cannot be called.
I guess it is related to the other npm libraries I use:

"dependencies": {
    "bluebird": "^2.9.24",
    "cassandra-driver": "^2.0.1",
    "config": "^1.12.0",
    "good": "^5.1.2",
    "good-console": "^4.1.0",
    "good-file": "^4.0.2",
    "hapi": "^8.4.0",
    "heapdump": "^0.3.5",
    "hiredis": "^0.3.0",
    "iron_mq": "^0.2.4",
    "lodash": "^3.6.0",
    "moment": "^2.9.0",
    "mysql": "git://github.com/felixge/node-mysql",
    "path": "^0.11.14",
    "poop": "^1.2.5",
    "redis": "^0.12.1",
    "stomp-client": "^0.6.2",
    "swig": "^1.4.2",
    "tv": "^4.1.0",
    "winston": "^0.9.0",
    "wreck": "^5.4.0"
  }

check whether if stream error before write #42

Topic Problem

Hi,
Does it work with topics?
I'm trying to connect to a topic (JBoss HornetQ) but it shows an error:
"{ '0': [Error: Error creating subscription subscription/jms.topic.bcssSaidaMonitoramento], '1': 'HQ119032: User: bcssjmsuser doesnt have permission=CREATE_NON_DURABLE_QUEUE on address {2}' }".
Is there any special configuration for topics?

When I try a queue, it works (jms.queue.bcssSaidaMonitoramento).

Thank you

Header parsing error with reply-to temporary queue from ActiveMQ

When a temporary queue is passed from ActiveMQ in the reply-to header, it contains embedded colons (':') like "/remote-temp-queue/ID:SYSTEM-NAME-49739-1421170578125-2:10:1". The parser does not handle this properly. It ends up stopping at the first embedded ':' and dropping the rest of the value, so the reply-to header ends up only containing "/remote-temp-queue/ID".

StompClient holding state after disconnected

Hi,

Similar to #19 I'm looking into a way of handling re-connections if disconnected due to a socket error. My current solution is to register an error event listener, and simply call connect() (with no arguments).

The only issue I'm having with that is the client isn't designed to be used in this way, as the subscriptions and _stompFrameEmitter objects are re-used between connections, causing some side effects (messages being delivered multiple times, not re-subscribing to queues).

Something like this https://gist.github.com/storkme/43dec2d73bef5d804375 seems to do the trick, but it's hardly elegant. Perhaps it would make sense to expose a reset function or similar?

TLS support

The STOMP server we are connecting to only supports TLS/SSL connections,
but the client does not seem to support that.

Is that feature planned, or is there a fork that already supports encrypted connections?

Limit number of messages received at once

Hi, I'm using stomp-client over an update stream VirtualTopic and sometimes the queue can grow too quickly. While connected, the client can handle it, but when it disconnects and reconnects, too many messages arrive at once without interruption.

I think stomp-client could have a limit on how many messages are handled simultaneously (while not yet ack'ed), or even a setting to handle only 1 message at a time. What do you think?
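On ActiveMQ, something along these lines may already be achievable from user code by combining client acknowledgement with the broker's prefetch limit. The sketch below assumes an ActiveMQ broker (activemq.prefetchSize is an ActiveMQ extension header on SUBSCRIBE) and assumes that each MESSAGE frame carries message-id and subscription headers. subscribeOneAtATime is a hypothetical helper and has not been tested against a live broker.

```javascript
// Sketch: ask the broker to dispatch one unacknowledged message at a time
// and acknowledge each message only after the handler reports completion.
function subscribeOneAtATime(client, queue, handler) {
  var headers = {
    ack: 'client',                // messages must be acked explicitly
    'activemq.prefetchSize': '1'  // ActiveMQ-specific; other brokers differ
  };
  client.subscribe(queue, headers, function (body, messageHeaders) {
    handler(body, messageHeaders, function done() {
      client.ack(messageHeaders['message-id'], messageHeaders.subscription);
    });
  });
}
```

The handler receives a done callback as its third argument and should call it once processing has finished, at which point the broker can dispatch the next message.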

Connection Timeout?

If the stomp server is down when stomp-client tries to connect, it won't time out. It will just sit there waiting for a connection. If the server comes up, it won't connect.
There really should be a timeout or retry interval or something.

I apologize if I missed a timeout setting that is there, I wasn't able to find it.

You can test this by trying to connect to a server that doesn't exist.

Also there are like 4 stomp clients out there for Node. It would be nice if we had 1 really mature implementation.

Heart-beat not utilized. Made my own for now.

I forked the repo here to implement a crude TCP heartbeat when connecting to an ActiveMQ server. I do not like the overall implementation/use case so I am not making a pull request, but if you would like to see it in action, check out my fork.

var Stomp = require('stomp-client');
var client = new Stomp('localhost', 61613, '', '', '1.1', undefined, undefined, '1000,1000'); // One second heartbeats.

https://github.com/ahudak/node-stomp-client.git

I found the activemq connection state in my app much easier to manage with this change now that the error handler gets called in a timely manner.

Connecting to existing queue

I am using the following code which is mentioned in the ReadMe for subscribing and publishing to the RabbitMQ Queue:

var Stomp = require('stomp-client');
var destination = '/queue/someQueueName';
var client = new Stomp('127.0.0.1', 61613, 'user', 'pass');

client.connect(function(sessionId) {
    client.subscribe(destination, function(body, headers) {
      console.log('This is the body of a message on the subscribed queue:', body);
    });

    client.publish(destination, 'Oh herrow');
});

However, each time I start my Node.js service I see a new queue created with a random name like stomp-subscription-dmAADeGXr6fWuOgLSo13zA. I already have the queue created in RabbitMQ, and I would like to connect to that same queue each time instead of always creating a new one, since the created queue is temporary and gets deleted when the Node.js service stops. Is there any way to connect to a persistent queue?

Queue message repetition

code:

var Stomp = require('stomp-client');
var dest2 = '/queue/st5';
var client = new Stomp('127.0.0.1', 61613, 'admin', 'admin');
client.connect(function(sessionId) {
    console.log(sessionId);
    client.publish(dest2, JSON.stringify({name: '消灭', age: 23}));
    client.publish(dest2, JSON.stringify({name: 'xiaohua', age: 3}));
    client.subscribe(dest2, function(data) {
        console.log('from queue: ' + data);
    });
    client.subscribe(dest2, function(data) {
        console.log('from queue2: ' + data);
    });
});

console :
ID:WIN-KMVDG7DGGD4-49230-1558420488751-5:
from queue: {"name":"消灭","age":23}
from queue2: {"name":"消灭","age":23}
from queue: {"name":"xiaohua","age":3}
from queue2: {"name":"xiaohua","age":3}

StompFrame.send BUG

StompFrame.prototype.send = function(stream) {
  // Avoid small writes, they get sent in their own tcp packet, which
  // is not efficient (and v8 does fast string concat).
  var frame = this.command + '\n';
  for (var key in this.headers) {
    frame += key + ':' + this.headers[key] + '\n';
  }
  if (this.body.length > 0) {
    frame += 'content-length:' + Buffer.byteLength(this.body) + '\n';
  }
  frame += '\n';
  if (this.body.length > 0) {
    frame += this.body;
  }
  frame += '\0';
  stream.write(frame);
};

As the source code of StompFrame.send above shows, every property in 'headers' will be sent, including functions. In my project I had already extended Object's prototype with some functions, so this function sends their source as StompFrame headers to the queue, and the subscriber receives an unexpected message (which may then be an error in the header parser).
Could you add a filter on the header properties? Asserting that the property value is not a function would be nice, and please also consider header values that contain '\n'.
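A filter along those lines could look roughly like the sketch below. serializeHeaders is a hypothetical helper, not the library's code; it iterates own properties only, skips functions and empty values, and rejects embedded newlines rather than trying to escape them.

```javascript
// Sketch: serialize only own, non-function header properties and refuse
// keys or values that would break the frame by containing a newline.
function serializeHeaders(headers) {
  var out = '';
  Object.keys(headers).forEach(function (key) {
    var value = headers[key];
    if (typeof value === 'function' || value === undefined || value === null) {
      return; // skip functions and missing values
    }
    value = String(value);
    if (key.indexOf('\n') !== -1 || value.indexOf('\n') !== -1) {
      throw new Error('Header "' + key + '" contains a newline');
    }
    out += key + ':' + value + '\n';
  });
  return out;
}
```

Object.keys() returns only an object's own enumerable properties, so functions added to Object.prototype are never visited, unlike with a bare for...in loop.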

What about sending TextMessages?

ActiveMQ uses the content-length header to decide whether to create a TextMessage or a BytesMessage for the message consumer in Java.

If content-length is missing then a TextMessage is created. Any thoughts on treating a null content-length passed when publishing as meaning that no content-length header should be sent, so that a Java client will get a TextMessage?

Singleton of stomp-client as parameter

Hey guys,
I posted my issue at https://stackoverflow.com/questions/49303927/create-a-singleton-of-stomp-client-and-pass-it-as-parameter
If someone has a solution, please feel free to answer.

I simply created a connection with
const client = new Stomp(config);
client.connect(function(sessionId, err) {});
and tried to pass it as a parameter to a manager, for instance:
const myManager = new MyManager(client);
and use it inside MyManager:

function MyManager(client) {
  client.subscribe();
}

But I got the error "Not connected" on the MyManager side.

What did I do wrong?
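A likely cause is ordering: connect() is asynchronous, so a manager created right after calling it may subscribe before the connection exists. One way to make the ordering explicit is to wrap connect() in a Promise and create the manager only once it resolves. The sketch below assumes the documented connect(callback, errorCallback) signature; connectAsync is a hypothetical helper, and this MyManager is a simplified stand-in for the one in the question.

```javascript
// Sketch: resolve with the session id once connected, reject on error.
function connectAsync(client) {
  return new Promise(function (resolve, reject) {
    client.connect(resolve, reject);
  });
}

// Simplified manager: subscribes as soon as it is constructed, so it must
// only be constructed after the client has connected.
function MyManager(client, queue) {
  client.subscribe(queue, function (body) {
    console.log('received:', body);
  });
}

// Usage with a real client (requires stomp-client and a broker):
//   connectAsync(client).then(function () {
//     return new MyManager(client, '/queue/someQueueName');
//   });
```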

Support for nack on Stomp 1.1 needs a change

When I use the nack method, I keep getting the following error:

NACK received but connection is in v1.0 mode.

There's not too much information about that specific issue on Google, but I could solve it by editing the file lib/client.js to include an accept-version header when using the CONNECT command. So the headers in the file end up like this:

var headers = { 'login': self.user, 'passcode': self.pass, 'accept-version' : this.version };

Indefinite retry?

Is there any way to introduce an indefinite retry into this project? We have a requirement where we'd never really want to stop trying to reconnect; it simply wouldn't make sense to stop trying.

We could hack it and make the number huge, but that feels a bit dirty. Is there functionality we are missing, or is it a PR you'd like us to raise?

Reconnect event does not work after a connection has been established.

To reproduce:

  • start message queue server (tested with ActiveMQ.)
  • connect and subscribe to a queue.
  • stop the message queue server.
  • start the message queue server.

in client.js, line 215:
self.emit('reconnect', frame.headers.session, self._retryNumber);
has no effect.

If I instead emit:
self.emit('connect', frame.headers.session);

Then the client successfully connects again.

No way to set client-id in the CONNECT headers

In the connect function there's no way to set the client-id; as can be clearly seen, it is not included in the CONNECT frame headers.

It should be possible to add it.

Update README

Update the README and remove:
"The client comes in two forms, a standard or secure client. The example below is using the standard client. To use the secure client simply change StompClient to SecureStompClient"

According to #46, it is not used.
