Op-Rabbit

An opinionated RabbitMQ library for Scala and Akka.

Documentation

Browse the latest API Docs online.

For announcements, discussions, etc., join the discussion:

Issues go here:

Intro

Op-Rabbit is a high-level, type-safe, opinionated, composable, fault-tolerant library for interacting with RabbitMQ; the following is a high-level feature list:

  • Recovery:
    • Consumers automatically reconnect and subscribe if the connection is lost
    • Messages published will wait for a connection to be available
  • Integration
    • Connection settings pulled from Typesafe config library
    • Asynchronous, concurrent consumption using Scala native Futures or the new Akka Streams project.
    • Common pattern for serialization allows easy integration with serialization libraries such as play-json or json4s
    • Common pattern for exception handling to publish errors to Airbrake, Syslog, or all of the above
  • Modular
    • Composition is favored over inheritance, enabling flexible composition and high code reuse.
  • Modeled
    • Queue binding, exchange binding modeled with case classes
    • Queue and exchange arguments, such as x-ttl, are modeled
    • HeaderValues are modeled; if you try to give RabbitMQ an invalid type for a header value, the compiler will let you know.
    • Publishing mechanisms also modeled
  • Reliability
    • Builds on the excellent Akka RabbitMQ client library for easy recovery.
    • Built-in consumer error recovery strategy in which messages are re-delivered to the message queue and retried (not implemented for akka-streams integration as retry mechanism affects message order)
    • With a single message, pause all consumers if a service health check fails (e.g., the database is unavailable); resume them just as easily.
  • Graceful shutdown
    • Consumers and streams can immediately unsubscribe, but stay alive long enough to wait for any messages to finish being processed.
  • Program at multiple levels of abstraction
    • If op-rabbit doesn't do what you need it to, you can either extend op-rabbit or interact directly with the Akka RabbitMQ client (akka-rabbitmq).
  • Tested
    • Extensive integration tests

Installation

Add the SpinGo OSS repository and include the dependencies of your choosing:

resolvers ++= Seq(
  "SpinGo OSS" at "http://spingo-oss.s3.amazonaws.com/repositories/releases"
)

val opRabbitVersion = "1.0.0-RC3"

libraryDependencies ++= Seq(
  "com.spingo" %% "op-rabbit-core"        % opRabbitVersion,
  "com.spingo" %% "op-rabbit-play-json"   % opRabbitVersion,
  "com.spingo" %% "op-rabbit-json4s"      % opRabbitVersion,
  "com.spingo" %% "op-rabbit-airbrake"    % opRabbitVersion,
  "com.spingo" %% "op-rabbit-akka-stream" % opRabbitVersion
)

A high-level overview of the available components:

  • op-rabbit-core API
    • Implements basic patterns for serialization and message processing.
  • op-rabbit-play-json API
    • Easily use Play Json formats to publish or consume messages; automatically sets RabbitMQ message headers to indicate content type.
  • op-rabbit-json4s API
    • Easily use Json4s to serialize messages; automatically sets RabbitMQ message headers to indicate content type.
  • op-rabbit-airbrake API
    • Report consumer exceptions to Airbrake, using the Airbrake Java library.
  • op-rabbit-akka-stream API
    • Process or publish messages using akka-stream.

Upgrade Guide

Refer to Upgrade Guide wiki page for help upgrading.

Usage

Set up RabbitMQ connection information in application.conf:

op-rabbit {
  topic-exchange-name = "amq.topic"
  connection {
    virtual-host = "/"
    hosts = ["127.0.0.1"]
    username = "guest"
    password = "guest"
    port = 5672
    timeout = 3s
  }
}

Note that hosts is an array; connection attempts are made to the hosts in the order listed, with a default timeout of 3s. This way you can specify the addresses of your RabbitMQ cluster, and if one of the instances goes down, your application will automatically reconnect to another member of the cluster.
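
The in-order failover described above can be pictured with a small sketch. This is not op-rabbit's implementation, only an illustration of trying hosts in sequence using the Scala standard library; HostFailoverSketch, connectFirst, and the connect function are hypothetical names, and any per-attempt timeout is assumed to be handled inside connect.

```scala
import scala.util.{Failure, Try}

object HostFailoverSketch {
  // Tries each host in list order and returns the first successful connection.
  // If every attempt fails, the failure from the last host is returned.
  def connectFirst[C](hosts: List[String])(connect: String => Try[C]): Try[C] =
    hosts.foldLeft[Try[C]](Failure(new IllegalArgumentException("no hosts configured"))) {
      // orElse takes its argument by name, so a later host is only tried after a failure
      (result, host) => result.orElse(connect(host))
    }
}
```

Because the fold stops calling connect once an attempt succeeds, hosts listed earlier are always preferred, which is why the order of the hosts array matters.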

topic-exchange-name is the default topic exchange to use; this can be overridden by passing exchange = "my-topic" to TopicBinding or Message.topic.

Boot up the RabbitMQ control actor:

import com.spingo.op_rabbit.RabbitControl
import akka.actor.{ActorSystem, Props}

implicit val actorSystem = ActorSystem("such-system")
val rabbitControl = actorSystem.actorOf(Props[RabbitControl])

Set up a consumer (topic subscription):

(this example uses op-rabbit-play-json)

import com.spingo.op_rabbit.PlayJsonSupport._
import com.spingo.op_rabbit._
import play.api.libs.json._

import scala.concurrent.ExecutionContext.Implicits.global
case class Person(name: String, age: Int)
implicit val personFormat = Json.format[Person] // setup play-json serializer

val subscriptionRef = Subscription.run(rabbitControl) {
  import Directives._
  // A qos of 3 will cause up to 3 concurrent messages to be processed at any given time.
  channel(qos = 3) {
    consume(topic(queue("such-message-queue"), List("some-topic.#"))) {
      (body(as[Person]) & routingKey) { (person, key) =>
        // do work; this body is executed in a separate thread, as provided by the implicit execution context
        println(s"A person named '${person.name}' with age ${person.age} was received over '${key}'.")
        ack
      }
    }
  }
}

Now, test the consumer by sending a message:

subscriptionRef.initialized.foreach { _ =>
  rabbitControl ! Message.topic(Person("Your name here", 33), "some-topic.cool")
}

Stop the consumer:

subscriptionRef.close()

Note: if your handler produces a Future, you can pass it to ack. The message is acked if the Future succeeds and nacked if it fails, in which case the configured RecoveryStrategy is applied:

  // ...
      (body(as[Person]) & routingKey) { (person, key) =>
        // do work; this body is executed in a separate thread, as provided by the implicit execution context
        val result: Future[Unit] = myApi.methodCall(person)
        ack(result)
      }
  // ...
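
When a handler performs several asynchronous steps, they have to be combined into a single Future before it is handed to ack. The following is a minimal sketch using only scala.concurrent; AckFutureSketch, saveToDb, and notifyUser are hypothetical stand-ins for your own API calls, not part of op-rabbit.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object AckFutureSketch {
  case class Person(name: String, age: Int)

  // Hypothetical asynchronous steps; replace these with your own calls.
  def saveToDb(person: Person): Future[Int] =
    Future(person.name.length) // pretend the name length is the new record id

  def notifyUser(recordId: Int): Future[Unit] =
    if (recordId > 0) Future.successful(())
    else Future.failed(new IllegalArgumentException(s"invalid record id: $recordId"))

  // Runs both steps in sequence; the resulting Future fails if either step fails.
  def handle(person: Person): Future[Unit] =
    for {
      recordId <- saveToDb(person)
      _        <- notifyUser(recordId)
    } yield ()
}
```

Inside the directive body, the combined Future would then be passed along as ack(AckFutureSketch.handle(person)), so a failure in either step leads to a nack.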

Accessing additional headers

As seen in the example above, you can extract headers in addition to the message body, using op-rabbit's Directives. You can use multiple directives via nested functions, as follows:

import com.spingo.op_rabbit.properties._

// Nested directives
// ...
      body(as[Person]) { person =>
        optionalProperty(ReplyTo) { replyTo =>
          // do work
          ack
        }
      }
// ...

Or, you can combine directives using & to form a compound directive, as follows:

// Compound directive
// ...
      (body(as[Person]) & optionalProperty(ReplyTo)) { (person, replyTo) =>
        // do work
        ack
      }
// ...

See the documentation on Directives for more details.

Shutting down a consumer

The following members of SubscriptionRef give you control over the subscription.

// stop receiving new messages from RabbitMQ immediately; shut down consumer and channel as soon as pending messages are completed. A grace period of 30 seconds is given, after which the subscription forcefully shuts down. (Default of 5 minutes used if duration not provided)
subscription.close(30 seconds)

// Shut down the subscription immediately; don't wait for messages to finish processing.
subscription.abort()

// Future[Unit] which completes once the provided binding has been applied (i.e., the queue has been created and topic bindings configured). Useful if you need to ensure you don't send a message before the message queue that should receive it has been created.
subscription.initialized

// Future[Unit] which completes when the subscription is closed.
subscription.closed
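
One way to tie close, closed, and abort together is a small helper that requests a graceful close and then waits for the closed Future. This is a sketch under assumptions, not an op-rabbit API: GracefulStopSketch, gracefulStop, and the margin parameter are hypothetical, and the helper takes the close action and the closed Future as plain arguments so that it depends only on the Scala standard library.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object GracefulStopSketch {
  // Requests a graceful close with the given grace period, then blocks until
  // `closed` completes or `grace + margin` elapses.
  // Returns true if the subscription closed in time, false on timeout.
  def gracefulStop(
      close: FiniteDuration => Unit,
      closed: Future[Unit],
      grace: FiniteDuration,
      margin: FiniteDuration = 5.seconds): Boolean = {
    close(grace)
    try { Await.ready(closed, grace + margin); true }
    catch { case _: TimeoutException => false }
  }
}

// Usage, assuming `subscription` is the SubscriptionRef from the section above:
//   sys.addShutdownHook {
//     if (!GracefulStopSketch.gracefulStop(subscription.close(_), subscription.closed, 30.seconds))
//       subscription.abort()
//   }
```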

Publish a message:

rabbitControl ! Message.topic(Person(name = "Mike How", age = 33), routingKey = "some-topic.very-interest")

rabbitControl ! Message.queue(Person(name = "Ivanah Tinkle", age = 25), queue = "such-message-queue")

By default:

  • Messages will be queued up until a connection is available

  • Messages are monitored via publisherConfirms; if a connection is lost before RabbitMQ confirms receipt of the message, then the message is published again. This means that the message may be delivered twice, the default opinion being that at-least-once is better than at-most-once. You can use UnconfirmedMessage if you'd like at-most-once delivery, instead.

  • If you would like to be notified of confirmation, use the ask pattern:

      ```scala
      import akka.pattern.ask
      import akka.util.Timeout
      import scala.concurrent.duration._
      implicit val timeout = Timeout(5 seconds)
      val received = (rabbitControl ? Message.queue(Person(name = "Ivanah Tinkle", age = 25), queue = "such-message-queue")).mapTo[Boolean]
      ```
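
Because an unconfirmed message is published again, a consumer can see the same message twice. A common way to cope with this (not something op-rabbit provides, as far as this document states) is to make the handler idempotent by tracking an application-level message id. The sketch below uses only the standard library; DedupSketch, Tagged, and Deduplicator are hypothetical names, and the in-memory set is neither persisted nor evicted, so it only illustrates the idea.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.util.control.NonFatal

object DedupSketch {
  // Hypothetical envelope: `id` is a unique identifier assigned by the publisher.
  final case class Tagged[T](id: String, payload: T)

  final class Deduplicator {
    // Thread-safe set of ids that have already been handled.
    private val seen = ConcurrentHashMap.newKeySet[String]()

    // Runs `effect` the first time an id is observed and returns true;
    // returns false without running it when the id was already handled.
    def process[T](msg: Tagged[T])(effect: T => Unit): Boolean =
      if (seen.add(msg.id)) {
        try { effect(msg.payload); true }
        catch {
          // Forget the id again so that a redelivery can retry the work.
          case NonFatal(e) => seen.remove(msg.id); throw e
        }
      } else false
  }
}
```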
    

Consuming using Akka streams

(this example uses op-rabbit-play-json and op-rabbit-akka-stream)

import Directives._
RabbitSource(
  rabbitControl,
  channel(qos = 3),
  consume(queue("such-queue", durable = true, exclusive = false, autoDelete = false)),
  body(as[Person])). // marshalling is automatically hooked up using implicits
  runForeach { person =>
    greet(person)
  } // after each successful iteration the message is acknowledged.

Note: RabbitSource yields an AckedSource, which can be combined with an AckedSink (such as ConfirmedPublisherSink). You can convert an acked stream into a normal stream by calling AckedStream.acked; once messages flow past the acked component, they are considered acknowledged, and acknowledgement tracking is no longer a concern (and thus, you are free to use the akka-stream library in its entirety).

Publishing using Akka streams

(this example uses op-rabbit-play-json and op-rabbit-akka-stream)

import com.spingo.op_rabbit._
import com.spingo.op_rabbit.stream._
import com.spingo.op_rabbit.PlayJsonSupport._
import play.api.libs.json._
implicit val workFormat = Json.format[Work] // setup play-json serializer

val sink = ConfirmedPublisherSink[Work](
  rabbitControl,
  Message.factory(Publisher.queue(queueName())))

AckedSource(1 to 15). // Each element in source will be acknowledged after publish confirmation is received
  to(sink)
  .run

The pattern here is that combining an akka-stream RabbitMQ consumer and publisher allows for guaranteed at-least-once message delivery from head to tail; in other words, the original message is not acknowledged until any and all side-effect events have been published to other queues and persisted.

Error notification

It's important to know when your consumers fail. Out of the box, op-rabbit ships with support for logging to Logback (and therefore syslog), and to Airbrake via op-rabbit-airbrake. Unless you specify otherwise, Logback is used, so errors are visible by default.

You can report errors to multiple sources by combining error logging strategies; for example, if you'd like to report to both logback and to airbrake, import / set the following implicit RabbitErrorLogging in the scope where your consumer is instantiated:

import com.spingo.op_rabbit.{LogbackLogger, AirbrakeLogger}

implicit val rabbitErrorLogging = LogbackLogger + AirbrakeLogger.fromConfig

Implementing your own error reporting strategy is simple; here's the source code for the LogbackLogger:

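import com.rabbitmq.client.AMQP.BasicProperties
import com.rabbitmq.client.Envelope
import org.slf4j.LoggerFactory

object LogbackLogger extends RabbitErrorLogging {
  def apply(name: String, message: String, exception: Throwable, consumerTag: String, envelope: Envelope, properties: BasicProperties, body: Array[Byte]): Unit = {
    // One logger per consumer name; the exception is passed along so the stack trace is logged
    val logger = LoggerFactory.getLogger(name)
    logger.error(s"${message}. Body=${bodyAsString(body, properties)}. Envelope=${envelope}", exception)
  }
}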

Notes

Shapeless dependency

Note, Op-Rabbit depends on shapeless 2.2.3; if you are using spray, then you'll need to use the version built for shapeless 2.1.0; shapeless 2.2.3 is noted to be binary compatible with 2.1.x in most cases.

Credits

Op-Rabbit was created by Tim Harper

This library builds upon the excellent Akka RabbitMQ client by Yaroslav Klymko.
