akka-rabbitmq's Introduction

Akka RabbitMQ client

This small library lets you use the RabbitMQ client via Akka actors. Its main goal is to survive the loss of the connection to the RabbitMQ server.

It gives you two actors: ConnectionActor and ChannelActor.

ConnectionActor

  • handles connection failures and notifies its children
  • keeps trying to reconnect if the connection is lost
  • provides its children with new channels when needed
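The "keep trying to reconnect" behaviour can be pictured with a plain retry loop. The following is only a minimal sketch of the idea, using nothing but the Scala standard library: `ReconnectSketch` and `connectWithRetry` are hypothetical names, not part of akka-rabbitmq, and the loop blocks the calling thread, whereas the real ConnectionActor is an actor that schedules its retries.

```scala
import scala.annotation.tailrec
import scala.concurrent.duration.FiniteDuration
import scala.util.{Failure, Success, Try}

// Hypothetical illustration only; not the library's implementation.
object ReconnectSketch {

  // Calls `connect` until it succeeds, sleeping `delay` between attempts.
  // Returns the connection together with the number of attempts it took.
  @tailrec
  def connectWithRetry[A](connect: () => A, delay: FiniteDuration, attempt: Int = 1): (A, Int) =
    Try(connect()) match {
      case Success(connection) =>
        (connection, attempt)
      case Failure(error) =>
        println(s"attempt $attempt failed: ${error.getMessage}; retrying in $delay")
        Thread.sleep(delay.toMillis)
        connectWithRetry(connect, delay, attempt + 1)
    }
}
```

With the plain RabbitMQ client, `connect` could be something like `() => factory.newConnection()`; the delay plays the role that `reconnectionDelay` plays for the ConnectionActor.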

ChannelActor

  • may store messages in memory if the channel is lost
  • sends stored messages as soon as a new channel is received
  • retrieves a new channel if the current one is broken
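The store-and-resend behaviour listed above can be modelled without any broker. This is a rough sketch under stated assumptions: `FakeChannel` and `BufferingSender` are hypothetical stand-ins written for illustration, and the real ChannelActor is an Akka actor whose internals may differ.

```scala
import scala.collection.immutable.Queue
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a broker channel; records what was published.
final class FakeChannel {
  val sent: ArrayBuffer[String] = ArrayBuffer.empty[String]
  def publish(msg: String): Unit = { sent += msg }
}

// Hypothetical model of the buffering idea, not the library's code.
final class BufferingSender {
  private var channel: Option[FakeChannel] = None
  private var pending: Queue[String] = Queue.empty

  // Publish immediately if a channel is available, otherwise keep the message in memory.
  def send(msg: String): Unit = channel match {
    case Some(ch) => ch.publish(msg)
    case None     => pending = pending.enqueue(msg)
  }

  // The current channel broke; later messages are buffered again.
  def channelLost(): Unit = { channel = None }

  // A new channel arrived: flush buffered messages in their original order.
  def channelReceived(ch: FakeChannel): Unit = {
    channel = Some(ch)
    pending.foreach(ch.publish)
    pending = Queue.empty
  }

  def pendingCount: Int = pending.size
}
```

Messages held this way live only in memory, which is one reason the library cannot guarantee delivery on its own.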

Please note that while this library transparently reconnects when a connection fails, it cannot guarantee that no messages will be lost. If you want to make sure every message is delivered, you have to use acknowledgements and confirms. This is documented in the RabbitMQ Reliability Guide. An example program using confirms can be found in this project under ConfirmsExample.scala.
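As a rough illustration of publisher confirms, the sketch below combines the actors described in this README with the `confirmSelect` and `waitForConfirmsOrDie` methods of the RabbitMQ Java client's `Channel`. It is an assumption-laden sketch, not a copy of ConfirmsExample.scala: the object name, the queue name, the actor name and the 5 second timeout are made up for illustration, and it needs a reachable RabbitMQ server to run.

```scala
import akka.actor.{ActorRef, ActorSystem}
import com.newmotion.akka.rabbitmq._

// Hypothetical example; see ConfirmsExample.scala for the project's own version.
object ConfirmsSketch extends App {
  implicit val system: ActorSystem = ActorSystem()
  val connectionActor: ActorRef =
    system.actorOf(ConnectionActor.props(new ConnectionFactory()), "confirms-connection")

  // Runs each time the ChannelActor receives a channel, so confirm mode
  // is switched on again after every reconnect.
  def setupChannel(channel: Channel, self: ActorRef): Unit = {
    channel.queueDeclare("queue_name", false, false, false, null)
    channel.confirmSelect()
  }
  val channelActor: ActorRef = connectionActor.createChannel(ChannelActor.props(setupChannel))

  def publishAndWait(channel: Channel): Unit = {
    channel.basicPublish("", "queue_name", null, "Hello world".getBytes("UTF-8"))
    // Blocks for up to 5 seconds; throws if the broker nacks or the wait times out.
    channel.waitForConfirmsOrDie(5000L)
  }
  channelActor ! ChannelMessage(publishAndWait, dropIfNoChannel = false)
}
```

Waiting for the confirm inside the callback blocks the ChannelActor while it waits, so this form is only suitable for low publish rates; asynchronous confirm listeners avoid the blocking at the cost of more bookkeeping.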

Setup

Sbt

Since version 3.0.0:

libraryDependencies += "com.newmotion" %% "akka-rabbitmq" % "6.0.2"

Maven

Since version 6.0.0

<dependency>
    <groupId>com.newmotion</groupId>
    <artifactId>akka-rabbitmq_{2.12/2.13}</artifactId>
    <version>6.0.2</version>
</dependency>

Since version 4.0.0

<dependency>
    <groupId>com.newmotion</groupId>
    <artifactId>akka-rabbitmq_{2.12/2.13}</artifactId>
    <version>5.0.4-beta</version>
</dependency>

Since version 3.0.0

<dependency>
    <groupId>com.thenewmotion</groupId>
    <artifactId>akka-rabbitmq_{2.11/2.12}</artifactId>
    <version>3.0.0</version>
</dependency>

Tutorial in comparisons

Before you start, add the import statement:

    import com.newmotion.akka.rabbitmq._

Create connection

Default approach:

    val factory = new ConnectionFactory()
    val connection: Connection = factory.newConnection()

Actor style:

    val factory = new ConnectionFactory()
    val connectionActor: ActorRef = system.actorOf(ConnectionActor.props(factory))

Let's name it:

    system.actorOf(ConnectionActor.props(factory), "my-connection")

How often will it reconnect?

    import concurrent.duration._
    system.actorOf(ConnectionActor.props(factory, reconnectionDelay = 10.seconds), "my-connection")

Create channel

The plain option:

    val channel: Channel = connection.createChannel()

But we can do better. Asynchronously:

    connectionActor ! CreateChannel(ChannelActor.props())

Synchronously:

    val channelActor: ActorRef = connectionActor.createChannel(ChannelActor.props())

Maybe give it a name:

    connectionActor.createChannel(ChannelActor.props(), Some("my-channel"))

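What about a custom actor?

    // requires: import akka.actor.{Actor, Props}
    connectionActor.createChannel(Props(new Actor {
      def receive = {
        // the actor is sent a Channel whenever a new one is available
        case channel: Channel => // use the channel here
      }
    }))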

Setup channel

    channel.queueDeclare("queue_name", false, false, false, null)

Actor style:

    // this function will be called each time a new channel is received
    def setupChannel(channel: Channel, self: ActorRef) = {
      channel.queueDeclare("queue_name", false, false, false, null)
    }
    val channelActor: ActorRef = connectionActor.createChannel(ChannelActor.props(setupChannel))

Use channel

    channel.basicPublish("", "queue_name", null, "Hello world".getBytes)

Using our channelActor:

    def publish(channel: Channel) = {
      channel.basicPublish("", "queue_name", null, "Hello world".getBytes)
    }
    channelActor ! ChannelMessage(publish)

But I don't want to lose messages when the connection is lost:

    channelActor ! ChannelMessage(publish, dropIfNoChannel = false)

Close channel

    channel.close()

VS

    system stop channelActor

Close connection

    connection.close()

VS

    system stop connectionActor // will close all channels associated with this connection

You can also shut down the ActorSystem, which closes all connections as well as channels:

    system.terminate()

Examples:

Publish/Subscribe

Here is the RabbitMQ Publish/Subscribe tutorial in actor style:

import akka.actor.{ActorRef, ActorSystem}
import com.newmotion.akka.rabbitmq._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object PublishSubscribe extends App {
  implicit val system: ActorSystem = ActorSystem()
  val factory = new ConnectionFactory()
  val connection = system.actorOf(ConnectionActor.props(factory), "akka-rabbitmq")
  val exchange = "amq.fanout"

  // called each time the publisher's ChannelActor receives a channel
  def setupPublisher(channel: Channel, self: ActorRef) = {
    val queue = channel.queueDeclare().getQueue
    channel.queueBind(queue, exchange, "")
  }
  connection ! CreateChannel(ChannelActor.props(setupPublisher), Some("publisher"))

  // called each time the subscriber's ChannelActor receives a channel
  def setupSubscriber(channel: Channel, self: ActorRef) = {
    val queue = channel.queueDeclare().getQueue
    channel.queueBind(queue, exchange, "")
    val consumer = new DefaultConsumer(channel) {
      override def handleDelivery(consumerTag: String, envelope: Envelope, properties: BasicProperties, body: Array[Byte]): Unit = {
        println("received: " + fromBytes(body))
      }
    }
    channel.basicConsume(queue, true, consumer)
  }
  connection ! CreateChannel(ChannelActor.props(setupSubscriber), Some("subscriber"))

  // publish an increasing counter once per second on a background thread
  Future {
    def loop(n: Long): Unit = {
      val publisher = system.actorSelection("/user/akka-rabbitmq/publisher")

      def publish(channel: Channel) = {
        channel.basicPublish(exchange, "", null, toBytes(n))
      }
      publisher ! ChannelMessage(publish, dropIfNoChannel = false)

      Thread.sleep(1000)
      loop(n + 1)
    }
    loop(0)
  }

  def fromBytes(x: Array[Byte]) = new String(x, "UTF-8")
  def toBytes(x: Long) = x.toString.getBytes("UTF-8")
}

Testing Note

Tests can be run against a RabbitMQ server on the local machine by starting a Docker container with the following command. The RabbitMQ management console is then also accessible at http://localhost:8080, using guest as both the login and the password.

  docker run -d --hostname my-rabbit --name some-rabbit -p 8080:15672 -p 5672:5672 rabbitmq:3-management
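Before running the tests it can help to confirm that the container's AMQP port is reachable. The following is a small sketch using only JDK classes; `PortCheck` and `isPortOpen` are hypothetical names, and port 5672 is taken from the docker command above. A successful TCP connect only shows that something is listening, not that RabbitMQ has finished starting.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

// Hypothetical helper, not part of akka-rabbitmq or its test suite.
object PortCheck {

  // Returns true if a TCP connection to host:port succeeds within the timeout.
  def isPortOpen(host: String, port: Int, timeoutMillis: Int = 1000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMillis)
      true
    } catch {
      // refused connections, timeouts and unresolved hosts are all IOExceptions
      case _: IOException => false
    } finally {
      socket.close()
    }
  }
}
```

For the container started above, the check would be `PortCheck.isPortOpen("localhost", 5672)`.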

Changelog

6.0.3-SNAPSHOT

Code Updates

  • Fully qualified the package directory structures as com.newmotion...

  • Merged PR #66 with Scala 3 compatibility changes

  • Cleaned up many warning messages that were found with IntelliJ 2021.2

  • Updated the Props constructors

  • Updated deprecated setTimer() calls to startSingleTimer() in ConnectionActor.scala

  • Upgraded to Scala 2.13.6

  • Upgraded to SBT 1.5.5 and cleaned deprecated issues

  • Updated to latest dependencies:

    • amqp-client: 5.9.0 -> 5.13.1
    • com.typesafe.config: 1.4.0 -> 1.4.1
    • org.specs2.specs2-mock: 4.10.3 -> 4.13.0

6.0.0

  • Drop support of Scala 2.11

  • Update dependencies:

    • amqp-client: 5.7.3 -> 5.9.0
    • akka: 2.5.+ -> 2.6.+

5.1.2

  • Update to latest dependencies:

    • amqp-client: 5.7.1 -> 5.7.3
    • Typesafe Config: 1.3.4 -> 1.4.0
    • Specs2: 4.5.1 -> 4.8.1
    • SBT: 1.2.8 -> 1.3.4
    • sbt-build-seed: 5.0.1 -> 5.0.4
    • sbt-sonatype: 2.3 -> 3.8.1

5.0.4-beta

  • Fix: proper error handling of close channel and create channel

  • Fix: proper error handling of setup connection/channel callbacks

  • Fix: if callback exception is uncaught, close connection/channel

  • Fix: take into account blocking nature of new connection/channel

  • Fix: close channel if the channel actor never got it (deadletter)

  • Fix: channel actor shouldn't ask for channel after a connection shutdown

  • If unexpectedly received a new channel, close it and use the old instead

  • Log warning when a message isn't retried any longer + more debug logging

  • Update to latest dependencies:

    • Akka: 2.5.8 -> 2.5.+ (provided)
    • amqp-client: 5.1.1 -> 5.4.2
    • Typesafe Config: 1.3.2 -> 1.3.3
    • Specs2: 4.0.2 -> 4.3.4
    • SBT: 1.0.3 -> 1.2.3
    • sbt-build-seed: 4.0.2 -> 4.1.2
    • sbt-sonatype: 2.0 -> 2.3

5.0.2

  • Supersedes version 5.0.1 which has been withdrawn to investigate some unforeseen issues

5.0.0

  • Update to latest dependencies:

    • Akka: 2.4.14 -> 2.5.8
    • amqp-client: 4.0.0 -> 5.1.1
    • Typesafe Config: 1.3.1 -> 1.3.2
    • Specs2: 3.8.6 -> 4.0.2
    • SBT: 0.13.13 -> 1.0.3
    • sbt-build-seed: 2.1.0 -> 4.0.2
    • sbt-scalariform: 1.3.0 -> 1.8.2
    • sbt-sonatype: 1.1 -> 2.0
    • sbt-pgp: 1.0.0 -> 1.1.0

4.0.0

  • Change organization from com.thenewmotion to com.newmotion

Other Libraries

Akka-RabbitMQ is a low-level library, and leaves it to the coder to manually wire consumers, serialize messages, etc. If you'd like a higher-level abstraction library, look at Op-Rabbit (which uses this library).
