Code Monkey home page Code Monkey logo

node-rabbitmq-client's Introduction

npm version

RabbitMQ Client

Node.js client library for RabbitMQ. Publish messages, declare rules for routing those messages into queues, consume messages from queues.

Why not amqplib?

  • No dependencies
  • Automatically re-connect, re-subscribe, or retry publishing
  • Optional higher-level Consumer/Publisher API for even more robustness
  • Written in typescript and published with heavily commented type definitions
  • See here for full API documentation
  • Intuitive API with named parameters instead of positional
  • "x-arguments" like "x-message-ttl" don't have camelCase aliases

Performance

Performance is comparable to amqplib (see ./benchmark.ts). Time to publish 1 million messages in batches of 50:

TOTAL (messages)=1_000_000
BATCH_SIZE=50

time per batch (milliseconds) (transient queue)
                 total_time  mean   std     min    max
rabbitmq-client  39128       2.465  11.713  1.000  1192.000
[email protected]   42378       2.615  5.783   1.000  478.000


time per batch (milliseconds) (no queue)
                 total_time  mean   std    min    max
rabbitmq-client  23163       1.726  0.497  1.000  13.000
[email protected]   24897       1.842  0.444  1.000  17.000

Quick start

In addition to the lower-level RabbitMQ methods, this library exposes two main interfaces, a Consumer and a Publisher (which should cover 90% of uses cases), as well as a third RPCClient for request-response communication.

import {Connection} from 'rabbitmq-client'

// Initialize:
const rabbit = new Connection('amqp://guest:guest@localhost:5672')
rabbit.on('error', (err) => {
  console.log('RabbitMQ connection error', err)
})
rabbit.on('connection', () => {
  console.log('Connection successfully (re)established')
})

// Consume messages from a queue:
// See API docs for all options
const sub = rabbit.createConsumer({
  queue: 'user-events',
  queueOptions: {durable: true},
  // handle 2 messages at a time
  qos: {prefetchCount: 2},
  // Optionally ensure an exchange exists
  exchanges: [{exchange: 'my-events', type: 'topic'}],
  // With a "topic" exchange, messages matching this pattern are routed to the queue
  queueBindings: [{exchange: 'my-events', routingKey: 'users.*'}],
}, async (msg) => {
  console.log('received message (user-events)', msg)
  // The message is automatically acknowledged when this function ends.
  // If this function throws an error, then msg is NACK'd (rejected) and
  // possibly requeued or sent to a dead-letter exchange
})

sub.on('error', (err) => {
  // Maybe the consumer was cancelled, or the connection was reset before a
  // message could be acknowledged.
  console.log('consumer error (user-events)', err)
})

// Declare a publisher
// See API docs for all options
const pub = rabbit.createPublisher({
  // Enable publish confirmations, similar to consumer acknowledgements
  confirm: true,
  // Enable retries
  maxAttempts: 2,
  // Optionally ensure the existence of an exchange before we use it
  exchanges: [{exchange: 'my-events', type: 'topic'}]
})

// Publish a message to a custom exchange
await pub.send(
  {exchange: 'my-events', routingKey: 'users.visit'}, // metadata
  {id: 1, name: 'Alan Turing'}) // message content

// Or publish directly to a queue
await pub.send('user-events', {id: 1, name: 'Alan Turing'})

// Clean up when you receive a shutdown signal
async function onShutdown() {
  // Waits for pending confirmations and closes the underlying Channel
  await pub.close()
  // Stop consuming. Wait for any pending message handlers to settle.
  await sub.close()
  await rabbit.close()
}

Connection.createConsumer() vs Channel.basicConsume()

The above Consumer & Publisher interfaces are recommended for most cases. These combine a few of the lower level RabbitMQ methods (exposed on the Channel interface) and and are much safer to use since they can recover after connection loss, or after a number of other edge-cases you may not have imagined. Consider the following list of scenarios (not exhaustive):

  • Connection lost due to a server restart, missed heartbeats (timeout), forced by the management UI, etc.
  • Channel closed as a result of publishing to an exchange which does not exist (or was deleted), or attempting to acknowledge an invalid deliveryTag
  • Consumer closed from the management UI, or because the queue was deleted, or because basicCancel() was called

In all of these cases you would need to create a new channel and re-declare any queues/exchanges/bindings before you can start publishing/consuming messages again. And you're probably publishing many messages, concurrently, so you'd want to make sure this setup only runs once per connection. If a consumer is cancelled then you may be able to reuse the channel but you still need to check the queue and so on...

The Consumer & Publisher interfaces abstract all of that away by running the necessary setup as needed and handling all the edge-cases for you.

Managing queues & exchanges

The basic RabbitMQ methods are available on the Channel interface for creating/deleting queues, exchanges, or bindings, etc:

// Will wait for the connection to establish and then create a Channel
const ch = await rabbit.acquire()

// Channels can emit some events too (see documentation)
ch.on('close', () => {
  console.log('channel was closed')
})

// Create a queue for the duration of this connection
await ch.queueDeclare({queue: 'my-queue'})

// Enable publisher acknowledgements
await ch.confirmSelect()

const data = {title: 'just some object'}

// Resolves when the data has been flushed through the socket or if
// ch.confirmSelect() was called: will wait for an acknowledgement
await ch.basicPublish({routingKey: 'my-queue'}, data)

const msg = ch.basicGet('my-queue')
console.log(msg)

await ch.queueDelete('my-queue')

// It's your responsibility to close any acquired channels
await ch.close()

RPCClient: request-response communication between services

This will create a single "client" Channel on which you may publish messages and listen for direct responses. This can allow, for example, two micro-services to communicate with each other using RabbitMQ as the middleman instead of directly via HTTP.

// rpc-server.js
const rabbit = new Connection()

const rpcServer = rabbit.createConsumer({
  queue: 'my-rpc-queue'
}, async (req, reply) => {
  console.log('request:', req.body)
  await reply('pong')
})

process.on('SIGINT', async () => {
  await rpcServer.close()
  await rabbit.close()
})
// rpc-client.js
const rabbit = new Connection()

const rpcClient = rabbit.createRPCClient({confirm: true})

const res = await rpcClient.send('my-rpc-queue', 'ping')
console.log('response:', res.body) // pong

await rpcClient.close()
await rabbit.close()

node-rabbitmq-client's People

Contributors

cody-greene avatar

Watchers

 avatar

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.