Code Monkey home page Code Monkey logo

amqp-client's Introduction

Simple Scala AMQP client

Simple AMQP client in Scala/Akka based on the RabbitMQ java client.

Build Status

Overview

This client provides a simple API for

  • publishing and consuming messages over AMQP
  • setting up RPC clients and servers
  • automatic reconnection

It is based on the Akka 2.0 framework.

Limitations and compatibility issues

  • This client is compatible with AMQP 0.9.1, not AMQP 1.0.
  • This client is most probably not easily usable from Java

"Production" status

This very simple library is being used in production in a few projects now, either directly or through the Akka AMQP Proxies pattern, and so far so good.... So it kind of works and will be maintained for some time :-)

Configuring maven/sbt

  • releases and milestones are pushed to maven central
  • snapshots are pushed to the sonatype snapshot repository
 <repositories>
    <repository>
        <id>sonatype snapshots</id>
        <url>https://oss.sonatype.org/content/repositories/snapshots/</url>
    </repository>
</repositories>

<dependencies>
  <dependency>
    <groupId>com.github.sstone</groupId>
    <artifactId>amqp-client_SCALA-VERSION</artifactId>
    <version>1.5</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-actor</artifactId> <!-- for Akka 2.0.X -->
    <artifactId>akka-actor_SCALA-VERSION</artifactId> <!-- from Akka 2.1.X on -->
    <version>AKKA-VERSION</version>
  </dependency>
</dependencies>

Please note that the Akka dependency is now in the "provided" scope which means that you'll have to define it explicitly in your maven/sbt projects.

The latest snapshot (development) version is 1.6-SNAPSHOT, the latest released version is 1.5

  • amqp-client 1.0 is compatible with Scala 2.9.2 and Akka 2.0.3
  • amqp-client 1.1 is compatible with Scala 2.9.2 and Akka 2.0.5
  • amqp-client 1.1 is compatible with Scala 2.10.0 and Akka 2.1.0
  • amqp-client 1.2 is compatible with Scala 2.10 and Akka 2.1
  • amqp-client 1.3 is compatible with Scala 2.10 and Akka 2.2
  • amqp-client 1.4 is compatible with Scala 2.10, Scala 2.11 and Akka 2.3.2
  • amqp-client 1.5 is compatible with Scala 2.10, Scala 2.11 and Akka 2.3.11
  • amqp-client 1.6-SNAPSHOT is compatible with Scala 2.10, Scala 2.11 and Akka 2.4.3

Library design

This is a thin wrapper over the RabbitMQ java client, which tries to take advantage of the nice actor model provided by the Akka library. There is no effort to "hide/encapsulate" the RabbitMQ library (and I don't really see the point anyway since AMQP is a binary protocol spec, not an API spec). So to make the most of this library you should first check the documentation for the RabbitMQ client, and learn a bit about AMQP 0.9.1. There are very nice tutorial on the RabbitMQ website, and also there, and probably many other...

Connection and channel management

  • AMQP connections are equivalent to "physical" connections. They are managed by ConnectionOwner objects. Each ConnectionOwner object manages a single connection and will try and reconnect when the connection is lost.
  • AMQP channels are multiplexed over AMQP connections. You use channels to publish and consume messages. Channels are managed by ChannelOwner objects.

ConnectionOwner and ChannelOwner are implemened as Akka actors:

  • channel owners are created by connection owners
  • when a connection is lost, the connection owner will create a new connection and provide each of its children with a new channel
  • connection owners and channel owners are implemented as Finite State Machines, with 2 possible states: Connected and Disconnected
  • For a connection owner, "connected" means that it owns a valid connection to the AMQP broker
  • For a channel owner, "connected" means that it owns a valid AMQP channel

YMMV, but using few connections (one per JVM) and many channels per connection (one per thread) is a common practice.

Wrapping the RabbitMQ client

As explained above, this is an actor-based wrapper around the RabbitMQ client, with 2 main classes: ConnectionOwner and ChannelOwner. Instead of calling the RabbitMQ Channel interface, you send a message to a ChannelOwner actor, which replies with whatever the java client returned wrapped in an Amqp.Ok() message if the call was successful, or an Amqp.Error if it failed.

For example, to declare a queue you could write:

  val connFactory = new ConnectionFactory()
  connFactory.setUri("amqp://guest:guest@localhost/%2F")
  val conn = system.actorOf(ConnectionOwner.props(connFactory, 1 second))
  val channel = ConnectionOwner.createChildActor(conn, ChannelOwner.props())

  channel ! DeclareQueue(QueueParameters("my_queue", passive = false, durable = false, exclusive = false, autodelete = true))

Or, if you want to check the number of messages in a queue:

  val connFactory = new ConnectionFactory()
  connFactory.setUri("amqp://guest:guest@localhost/%2F")
  val conn = system.actorOf(ConnectionOwner.props(connFactory, 1 second))
  val channel = ConnectionOwner.createChildActor(conn, ChannelOwner.props())

  val Amqp.Ok(_, Some(result: Queue.DeclareOk)) = Await.result(
    (channel ? DeclareQueue(QueueParameters(name = "my_queue", passive = true))).mapTo[Amqp.Ok],
    5 seconds
  )
  println("there are %d messages in the queue named %s".format(result.getMessageCount, result.getQueue))

Initialization and failure handling

If the connection to the broker is lost, ConnectionOwner actors will try and reconnect, and once they are connected again they will send a new AMQP channel to each of their ChannelOwner children.

Likewise, if the channel owned by a ChannelOwner is shut down because of an error it will request a new one from its parent.

In this case you might want to "replay" some of the messages that were sent to the ChannelOwner actor before it lost its channel, like queue declarations and bindings.

For this, you have 2 options:

  • initialize the ChannelOwner with a list of requests
  • wrap requests inside a Record message

Here, queues and bindings will be gone if the connection is lost and restored:

  val connFactory = new ConnectionFactory()
  connFactory.setUri("amqp://guest:guest@localhost/%2F")
  val conn = system.actorOf(ConnectionOwner.props(connFactory, 1 second))

  // create an actor that will receive AMQP deliveries
  val listener = system.actorOf(Props(new Actor {
    def receive = {
      case Delivery(consumerTag, envelope, properties, body) => {
        println("got a message: " + new String(body))
        sender ! Ack(envelope.getDeliveryTag)
      }
    }
  }))

  // create a consumer that will route incoming AMQP messages to our listener
  // it starts with an empty list of queues to consume from
  val consumer = ConnectionOwner.createChildActor(conn, Consumer.props(listener, channelParams = None, autoack = false))

  // wait till everyone is actually connected to the broker
  Amqp.waitForConnection(system, consumer).await()

  // create a queue, bind it to a routing key and consume from it
  // here we don't wrap our requests inside a Record message, so they won't replayed when if the connection to
  // the broker is lost: queue and binding will be gone

  // create a queue
  val queueParams = QueueParameters("my_queue", passive = false, durable = false, exclusive = false, autodelete = true)
  consumer ! DeclareQueue(queueParams)
  // bind it
  consumer ! QueueBind(queue = "my_queue", exchange = "amq.direct", routing_key = "my_key")
  // tell our consumer to consume from it
  consumer ! AddQueue(QueueParameters(name = "my_queue", passive = false))

We can initialize our consumer with a list of messages that will be replayed each time its receives a new channel:

 val consumer = ConnectionOwner.createChildActor(conn, Consumer.props(
    listener = Some(listener),
    init = List(AddBinding(Binding(StandardExchanges.amqDirect, queueParams, "my_key")))
  ), name = Some("consumer"))

Or can can wrap our initialization messages with Record to make sure they will be replayed each time its receives a new channel:

  consumer ! Record(AddBinding(Binding(StandardExchanges.amqDirect, QueueParameters("my_queue", passive = false, durable = false, exclusive = false, autodelete = true), "my_key")))

If you have a reason to add a heartbeat (for instance, to keep your load balancer from dropping the connection), you can easily do so:

  val connFactory = new ConnectionFactory()
  connFactory.setRequestedHeartbeat(5) // seconds

RPC patterns

Typical RPC with AMQP follows this pattern:

  1. client sets up a private, exclusive response queue
  2. client sends message and set their 'replyTo' property to the name of this response queue
  3. server processes the message and replies to its 'replyTo' queue by publishing the response to the default exchange using the queue name as routing key (all queues are bound to their name on the default exchange)

Distributed Worker Pattern

This is one of the simplest but most useful pattern: using a shared queue to distributed work among consumers. The broker will load-balance messages between these consumers using round-robin distribution, which can be combined with 'prefetch' channel settings. Setting 'prefetch' to 1 is very useful if you need resource-based (CPU, ...) load-balancing. You will typically use explicit acknowledgments and ack messages once they have been processed and the response has been sent. This way, if your consumer fails to process the request or is disconnected, the broker will re-send the same request to another consumer.

  // typical "work queue" pattern, where a job can be picked up by any running node
  implicit val system = ActorSystem("mySystem")

  // create an AMQP connection
  val connFactory = new ConnectionFactory()
  connFactory.setUri("amqp://guest:guest@localhost/%2F")
  val conn = system.actorOf(ConnectionOwner.props(connFactory, 1 second))

  val queueParams = QueueParameters("my_queue", passive = false, durable = false, exclusive = false, autodelete = true)

  // create 2 equivalent servers
  val rpcServers = for (i <- 1 to 2) yield {
    // create a "processor"
    // in real life you would use a serialization framework (json, protobuf, ....), define command messages, etc...
    // check the Akka AMQP proxies project for examples
    val processor = new IProcessor {
      def process(delivery: Delivery) = {
        // assume that the message body is a string
        val response = "response to " + new String(delivery.body)
        Future(ProcessResult(Some(response.getBytes)))
      }
      def onFailure(delivery: Delivery, e: Throwable) = ProcessResult(None) // we don't return anything
    }
    ConnectionOwner.createChildActor(conn, RpcServer.props(queueParams, StandardExchanges.amqDirect,  "my_key", processor, ChannelParameters(qos = 1)))
  }

  val rpcClient = ConnectionOwner.createChildActor(conn, RpcClient.props())

  // wait till everyone is actually connected to the broker
  Amqp.waitForConnection(system, rpcServers: _*).await()
  Amqp.waitForConnection(system, rpcClient).await()

  implicit val timeout: Timeout = 2 seconds

  for (i <- 0 to 5) {
    val request = ("request " + i).getBytes
    val f = (rpcClient ? Request(List(Publish("amq.direct", "my_key", request)))).mapTo[RpcClient.Response]
    f.onComplete {
      case Success(response) => println(new String(response.deliveries.head.body))
      case Failure(error) => println(error)
    }
  }
  // wait 10 seconds and shut down
  Thread.sleep(10000)
  system.shutdown()

One request/several responses

If your process is "sharded" and one request should result in several responses (one per shard for example) you can use private exclusive queues which are all bound to the same key. In this case, each server will receive the same request and will send back a response.

This is very useful if you want to break a single operation into multiple, parallel steps.

  // one request/several responses pattern
  implicit val system = ActorSystem("mySystem")

  // create an AMQP connection
  val connFactory = new ConnectionFactory()
  connFactory.setUri("amqp://guest:guest@localhost/%2F")
  val conn = system.actorOf(ConnectionOwner.props(connFactory, 1 second))

  // typical "reply queue"; the name if left empty: the broker will generate a new random name
  val privateReplyQueue = QueueParameters("", passive = false, durable = false, exclusive = true, autodelete = true)

  // we have a problem that can be "sharded", we create one server per shard, and for each request we expect one
  // response from each shard

  // create one server per shard
  val rpcServers = for (i <- 0 to 2) yield {
    // create a "processor"
    // in real life you would use a serialization framework (json, protobuf, ....), define command messages, etc...
    // check the Akka AMQP proxies project for examples
    val processor = new IProcessor {
      def process(delivery: Delivery) = {
        // assume that the message body is a string
        val response = "response to " + new String(delivery.body) + " from shard " + i
        Future(ProcessResult(Some(response.getBytes)))
      }
      def onFailure(delivery: Delivery, e: Throwable) = ProcessResult(None) // we don't return anything
    }
    ConnectionOwner.createChildActor(conn, RpcServer.props(privateReplyQueue, StandardExchanges.amqDirect,  "my_key", processor, ChannelParameters(qos = 1)))
  }

  val rpcClient = ConnectionOwner.createChildActor(conn, RpcClient.props())

  // wait till everyone is actually connected to the broker
  Amqp.waitForConnection(system, rpcServers: _*).await()
  Amqp.waitForConnection(system, rpcClient).await()

  implicit val timeout: Timeout = 2 seconds

  for (i <- 0 to 5) {
    val request = ("request " + i).getBytes
    val f = (rpcClient ? Request(List(Publish("amq.direct", "my_key", request)), 3)).mapTo[RpcClient.Response]
    f.onComplete {
      case Success(response) => {
        response.deliveries.foreach(delivery => println(new String(delivery.body)))
      }
      case Failure(error) => println(error)
    }
  }
  // wait 10 seconds and shut down
  Thread.sleep(10000)
  system.shutdown()

Workflow Pattern

This could be further extended with a simple 'workflow' pattern where each server publishes its results to the shared queue used by the next step. For example, if you want to chain steps A, B and C, set up a shared queue for each step, have 'A' processors publish to queue 'B', 'B' processors publish to queue 'C' ....

Samples

You can check either samples src/main/scala/com/github/sstone/amqp/samples or spec tests src/test/scala/com/github/sstone/amqp for examples of how to use the library.

amqp-client's People

Contributors

adam-zacharski avatar baton-rw avatar cornelf avatar dvorobiov avatar entropydown avatar janm avatar melan avatar papaver avatar slorion avatar sstone avatar wulftone avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

amqp-client's Issues

Jar contains samples

I just noticed that the .jar amqp-client_2.11-1.5.jar contains a package "samples" with examples.
This should not be included in the Jar - I would suggest moving that folder (either to "src/test" so that it is automatically compiled or to a top-level samples-folder).

Error creating consumer using 1.1, 1.2-SNAPSHOT and Akka 2.2

I'm migrating my project to Akka 2.2 and whenever I try to create a consumer from a RabbitMQ connection, I get this error :

Exception in thread "main" java.lang.NoSuchMethodError: akka.pattern.AskSupport.ask(Lakka/actor/ActorRef;)Lakka/pattern/AskSupport$AskableActorRef;
at com.github.sstone.amqp.RabbitMQConnection.createChild(ConnectionOwner.scala:103)
at com.github.sstone.amqp.RabbitMQConnection.createConsumer(ConnectionOwner.scala:114)
.....

shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[NormalizationActorSystem]
java.lang.NoSuchMethodError: akka.actor.ActorContext.children()Lscala/collection/Iterable;
at com.github.sstone.amqp.ConnectionOwner$$anonfun$4.applyOrElse(ConnectionOwner.scala:227)
at com.github.sstone.amqp.ConnectionOwner$$anonfun$4.applyOrElse(ConnectionOwner.scala:196)

It seems that the ActorRef is not supporting the ask method for retrieving futures.
This is strange because I can't find anything regarding a change in the way Akka handles futures in 2.2.

Thanks.

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error

Folks,

we use this amqp-client, and the following error is filling up our logs.

Question - Does this have a potential for memory leak ?

_DD

ERROR com.github.sstone.amqp.Consumer - null
java.io.IOException: null
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106) ~[amqp-client-3.0.1.jar:na]
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102) ~[amqp-client-3.0.1.jar:na]
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124) ~[amqp-client-3.0.1.jar:na]
at com.rabbitmq.client.impl.ChannelN.exchangeDeclarePassive(ChannelN.java:693) ~[amqp-client-3.0.1.jar:na]
at com.rabbitmq.client.impl.ChannelN.exchangeDeclarePassive(ChannelN.java:61) ~[amqp-client-3.0.1.jar:na]
at com.github.sstone.amqp.Amqp$.declareExchange(Amqp.scala:55) ~[amqp-client_2.10-1.1.jar:na]
at com.github.sstone.amqp.Consumer.com$github$sstone$amqp$Consumer$$setupBinding(Consumer.scala:32) ~[amqp-client_2.10-1.1.jar:na]
at com.github.sstone.amqp.Consumer$$anonfun$onChannel$1.apply(Consumer.scala:44) ~[amqp-client_2.10-1.1.jar:na]
at com.github.sstone.amqp.Consumer$$anonfun$onChannel$1.apply(Consumer.scala:44) ~[amqp-client_2.10-1.1.jar:na]
at scala.collection.immutable.List.foreach(List.scala:309) ~[scala-library.jar:na]
at com.github.sstone.amqp.Consumer.onChannel(Consumer.scala:44) ~[amqp-client_2.10-1.1.jar:na]
at com.github.sstone.amqp.ChannelOwner.setup(ChannelOwner.scala:81) ~[amqp-client_2.10-1.1.jar:na]
at com.github.sstone.amqp.ChannelOwner$$anonfun$3.applyOrElse(ChannelOwner.scala:86) ~[amqp-client_2.10-1.1.jar:na]
at com.github.sstone.amqp.ChannelOwner$$anonfun$3.applyOrElse(ChannelOwner.scala:84) ~[amqp-client_2.10-1.1.jar:na]
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33) ~[scala-library.jar:na]
at akka.actor.FSM$class.processEvent(FSM.scala:576) ~[akka-actor_2.10-2.1.4.jar:na]
at com.github.sstone.amqp.ChannelOwner.processEvent(ChannelOwner.scala:41) ~[amqp-client_2.10-1.1.jar:na]
at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:570) ~[akka-actor_2.10-2.1.4.jar:na]
at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:564) ~[akka-actor_2.10-2.1.4.jar:na]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:425) [akka-actor_2.10-2.1.4.jar:na]
at akka.actor.ActorCell.invoke(ActorCell.scala:386) [akka-actor_2.10-2.1.4.jar:na]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:230) [akka-actor_2.10-2.1.4.jar:na]
at akka.dispatch.Mailbox.run(Mailbox.scala:212) [akka-actor_2.10-2.1.4.jar:na]
at akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:506) [akka-actor_2.10-2.1.4.jar:na]
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:262) [scala-library.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:975) [scala-library.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1478) [scala-library.jar:na]
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104) [scala-library.jar:na]
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'myexchange' in vhost '/', class-id=40, method-id=10), null, ""}
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) ~[amqp-client-3.0.1.jar:na]
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33) ~[amqp-client-3.0.1.jar:na]
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343) ~[amqp-client-3.0.1.jar:na]
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216) ~[amqp-client-3.0.1.jar:na]
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118) ~[amqp-client-3.0.1.jar:na]
... 25 common frames omitted
com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'myexchange' in vhost '/', class-id=40, method-id=10), null, ""}
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:473) ~[amqp-client-3.0.1.jar:na]
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:313) ~[amqp-client-3.0.1.jar:na]
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144) ~[amqp-client-3.0.1.jar:na]
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91) ~[amqp-client-3.0.1.jar:na]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:533) ~[amqp-client-3.0.1.jar:na]

Requests dropped during re-connect

Hi,

When a ChannelOwner is disconnected, AMQP Requests are unhandled by the Actor, effective getting dropped. This can happen on high message throughput when a connection is dropped and re-connected.

I've put together a fix using a Stash in this commit: ljcoomber@e801efb

If you're happy with the approach, I'll write some tests and submit a PR next week.

Cheers,

Lee

Update to RabbitMQ Java Client 3.3.0

More of a request for enhancement than an issue - but, is there a plan to move the milestone release to the latest RabbitMQ Java client - 3.3.0?
It seems a bunch of security fixes have gone in the for RabbitMQ from 3.2.1 to 3.3.0 - it would be nice if the amqp-client is updated with the java client aswell to take advantages of those fixes.

wrong sha1 for amqp-client_2.11-1.5.pom on maven central repo

Here's a snippet from a sbt project that has this package as a dep:

[info] Resolving com.github.sstone#amqp-client_2.11;1.5 ... [warn] problem while downloading module descriptor: https://repo1.maven.org/maven2/com/github/sstone/amqp-client_2.11/1.5/amqp-client_2.11-1.5.pom: invalid sha1: expected=42e068568008f5994a4d5ef3f425470be7c0fe35 computed=5b68b5b75889e94c2c7892fcecfa7f5010d27dcc (460ms)

Keepalive Support

Hi guys,

I'm using amqp-client in quite a large Amazon deployment, one of the things i see most regularly in my logs is big ugly connection lost messages where the amazon load balancers have killed off my connection to RabbitMQ (my MQ of choice). The connections get reestablished of course but it's probably not good for overall efficiency (or disk space on my logging platform :p).

The Pika for python libraries support a 'keepalive' parameter which stops this happening when i use them to listen, do you have any plans to support keepalive in amqp-client?

Cheers,

Henri

Alternate constructor for RabbitMQConnection

I'm planning on using the RabbitMQ Bigwig add-on for Heroku. Bigwig uses the amqp URI scheme for its configuration parameters, but there's no ctor for RabbitMQConnection that takes an AMQP URI.

I worked around this by using the underlying Java client's ConnectionFactory.setUri method and then calling the RabbitMQConnection ctor with the values from the factory, but it'd be nice to avoid this.

Consumer Cancellation Notifications

SStone AMQP Library needs to recognize and handle Consumer Cancellation Notifications as mentioned in

http://www.rabbitmq.com/ha.html
http://www.rabbitmq.com/consumer-cancel.html

Without this, it is hard to use this library in clustered production scenarios.

Quoting from ha.html:

Clients that were consuming from the mirrored-queue and support our Consumer Cancellation Notifications extension will receive a notification that their subscription to the mirrored-queue has been abruptly cancelled. At this point they should re-consume from the queue, which will pick up the new master. The reason for sending this notification is that informing clients of the loss of the master is essential: otherwise the client may continue to issue acknowledgements for messages they were sent by the old, failed master, and not expect that they might be about to see the same messages again, this time sent by the new master. Of course, clients that were connected to the failed node will find their connections failed, and will need to reconnect to a surviving node of the cluster.

Spring AMQP had a similar bug that was fixed:
https://jira.spring.io/browse/AMQP-209

Actor doesn't get Garbage Collected

I've got a publisher and multiple consumer objects (singletons) that create the actor system and instantiate the actor per your examples. However, I'm finding that as messages go through the system, there is no garbage collection done on the actor. Therefore memory eventually fills up and it comes to a screeching halt. What can I do to enable GC of vals that are out of scope? I want everything inside the receive function to be GC'd.

Here is my publisher:

object Publisher  {
  implicit val system = ActorSystem(LibContext.akkaConfig.system)
  val connFactory = new ConnectionFactory()
  connFactory.setUri(LibContext.rabbitConfig.uri)
  val conn = system.actorOf(ConnectionOwner.props(connFactory, 5 second))

  private val channel = ConnectionOwner.createChildActor(conn, ChannelOwner.props())
 Amqp.waitForConnection(system, channel).await()

 class Producer extends Actor {
val logger = LoggerFactory.getLogger(this.getClass)
logger.info("Starting publisher")

channel ! ConfirmSelect
channel ! AddReturnListener(self)
channel ! AddConfirmListener(self)
def receive = {
  case message: QueueableMessage => {
    logger.info("publishing message from exchange:" + message.exchangeType)
    val props = new AMQP.BasicProperties("application/json", "utf-8", null, 2, 1, "", "", null, message.messageType, new DateTime(message.createdOn).toDate, message.messageType, LibContext.rabbitConfig.user, "", "")

    val mapper = new ObjectMapper
    mapper.registerModule(DefaultScalaModule)

    // Declare the exchange if it's specified in the message
    if (message.exchangeType != None) {
      channel ! DeclareExchange(ExchangeParameters(message.exchangeType.get.toString, false, QueueUtils.getExchangeType(message.exchangeType.get), durable = true))
      channel ! Publish(message.exchangeType.get.toString, message.messageType, mapper.writeValueAsBytes(message), Option(props))
    }
    else {
      channel ! DeclareQueue(QueueParameters(name = message.messageType, passive = false, durable = true, exclusive = false, autodelete = false))
      channel ! Publish("", message.messageType, mapper.writeValueAsBytes(message), Option(props))
    }
    channel ! WaitForConfirms(None)
  }

  }
}
val producer = system.actorOf(Props[Producer], "Producer")

def publishMessage(message: QueueableMessage){
  producer ! message
}
}

Erroneous Success for Publish

Hello,
I am calling Publish(...), but Publish(...) does not return Ampq.Error as expected when the RabbitMQ exchange does not exist. The expectation I have is Publish(...) returns Ampq.Error if something is wrong with the channel. One way to avoid this is to create the exchange, but I am more concerned that Publish may be returning Ampq.Ok erroneously in other situations. Any suggestions?

Code:
val publisherFuture = publisher ? Publish(RabbitMQConfig.Exchange,
routingKey,
json.getBytes,
Some(properties),
mandatory = true)

val publisherResult = Await.result(publisherFuture, timeout.duration)
logger.info("Result: " + publisherResult)

Log:
01:03:33.491 [CreateAssetsActorSpec-akka.actor.default-dispatcher-7] DEBUG com.github.sstone.amqp.ChannelOwner - publishing Publish(dsa.exchange,private.asset.create,[B@285ab390,Some(#contentHeader(content-type=null, content-encoding=null, headers=null, delivery-mode=2, priority=null, correlation-id=61d52810-1b36-11e3-a15e-0800276c71c2, reply-to=null, expiration=null, message-id=61d52810-1b36-11e3-a15e-0800276c71c2, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)),true,false)
01:03:33.501 [pool-60-thread-2] INFO c.g.d.m.s.c.v1.CreateAssetsActor - Result: Ok(Publish(dsa.exchange,private.asset.create,[B@285ab390,Some(#contentHeader(content-type=null, content-encoding=null, headers=null, delivery-mode=2, priority=null, correlation-id=61d52810-1b36-11e3-a15e-0800276c71c2, reply-to=null, expiration=null, message-id=61d52810-1b36-11e3-a15e-0800276c71c2, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)),true,false),None)
01:03:33.522 [ForkJoinPool-3-worker-1] INFO c.g.d.m.s.c.v1.CreateAssetsActor - Asset metadata for masterId(3703) successfully inserted into MessageBroker - Actor[akka://CreateAssetsActorSpec/deadLetters] -- Actor[akka://CreateAssetsActorSpec/system/testActor1#1754928134]
01:03:33.539 [CreateAssetsActorSpec-akka.actor.default-dispatcher-3] ERROR com.github.sstone.amqp.ChannelOwner - channel was shut down
com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'dsa.exchange' in vhost 'dsa', class-id=60, method-id=40), null, ""}
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:474) ~[amqp-client-3.1.3.jar:na]
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315) ~[amqp-client-3.1.3.jar:na]
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144) ~[amqp-client-3.1.3.jar:na]
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91) ~[amqp-client-3.1.3.jar:na]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:533) ~[amqp-client-3.1.3.jar:na]
01:03:33.539 [CreateAssetsActorSpec-akka.actor.default-dispatcher-3] WARN com.github.sstone.amqp.ChannelOwner - disconnected
01:03:33.539 [CreateAssetsActorSpec-akka.actor.default-dispatcher-3] INFO akka.actor.RepointableActorRef - Message [akka.actor.FSM$Transition] from Actor[akka://CreateAssetsActorSpec/user/management-connection/$a#1723353950] to Actor[akka://CreateAssetsActorSpec/user/$a#937797823] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
01:03:33.540 [CreateAssetsActorSpec-akka.actor.default-dispatcher-3] INFO com.github.sstone.amqp.ChannelOwner - connected

ChannelOwner: bad wrapping of Channel methods that do not return anything

In the rabbitmq java client, some Channl methods return something meaningful (QueueDeclare ,...), some are just void methods, and all throw IOExceptions. But they are all handled as if they returned something, so users sometimes get cryptic 'scala.runtime.BoxedUnit' messages and are not happy
=> Add some kind of Ok(request) response for these cases

Custom ExecutionContext for RpcServer

Hello,

I would like to use a custom ExecutionContext (delegating the execution to the global one after having done some preparations). If I understand your code correctly, it is not possible to do that currently because ExecutionContext.Implicits.global is used directly, but an additional constructor argument would allow that.

Please let me know if I guessed correctly and if so, I will create a pull request that will add the argument with ExecutionContext.Implicits.global as default value.

As a side note, what would be the best way to store data that is to be saved/restored with the ExecutionContext (e.g. a shared transaction context) ? I try to find how to do that, but to no avail.

Thank you!

Waiting for all messages to be sent

Recently I had a task to send more than a million messages to RabbitMQ (exporting processed data from the one system to another).

Imagine a very basic flow: read the message from S3, send it to rabbit, repeat.

I had no problems with the export implementation using ampq-client, but noticed, that publishing goes quite fast, but I need to wait for Akka system to actually send all messages to rabbit - if I call system.shutdown prematurely it will interrupt the sending process, and just 10% of messages will actually go to rabbit.

I guess I need to listen for some event from Akka, but because I had to finish things quickly I just moved to Java client for synchronous sending. If you know how to get a notification that all the messages were processed, I suggest to add it to the examples.

ConnectionOwner.scala:179: type mismatch

amqp-client-scala2.10/src/main/scala/com/github/sstone/amqp/ConnectionOwner.scala:179: type mismatch;
[error] found : Unit
[error] required: ConnectionOwner.this.State
[error](which expands to) akka.actor.FSM.State[com.github.sstone.amqp.ConnectionOwner.State,com.github.sstone.amqp.ConnectionOwner.Data]
[error] setTimer("reconnect", 'connect, reconnectionDelay, true)
[error] ^
[error] one error found
error Compilation failed

Getting future timeout exception which creating a consumer/producer

I am consistently getting the following timeout exception with ~100 producer and equal number consumers (same VM, different queues on a rmq cluster, same connection ActorRef). Is this a bug or this is expected? If yes, how can I increase the timeout?

@400000005640a96d37c7c70c 2015-11-09 19:40:43,935 - [ERROR] - akka.actor.OneForOneStrategy - Futures timed out after [5000 milliseconds]
@400000005640a96d37c7caf4 akka.actor.ActorInitializationException: exception during creation
@400000005640a96d37c7cedc at akka.actor.ActorInitializationException$.apply(Actor.scala:164) ~[dispatcher-service.jar:1.8-SNAPSHOT]
@400000005640a96d37c7cedc at akka.actor.ActorCell.create(ActorCell.scala:596) ~[dispatcher-service.jar:1.8-SNAPSHOT]
@400000005640a96d37c7d2c4 at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456) ~[dispatcher-service.jar:1.8-SNAPSHOT]
@400000005640a96d37c82c9c at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) ~[dispatcher-service.jar:1.8-SNAPSHOT]
@400000005640a96d37c83084 at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263) ~[dispatcher-service.jar:1.8-SNAPSHOT]
@400000005640a96d37c8346c at akka.dispatch.Mailbox.run(Mailbox.scala:219) ~[dispatcher-service.jar:1.8-SNAPSHOT]
@400000005640a96d37c83854 at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) [dispatcher-service.jar:1.8-SNAPSHOT]
@400000005640a96d37c84bdc at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [dispatcher-service.jar:1.8-SNAPSHOT]
@400000005640a96d37c84fc4 at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [dispatcher-service.jar:1.8-SNAPSHOT]
@400000005640a96d37c86734 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [dispatcher-service.jar:1.8-SNAPSHOT]
@400000005640a96d37c86b1c at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [dispatcher-service.jar:1.8-SNAPSHOT]
@400000005640a96d37c86f04 Caused by: java.util.concurrent.TimeoutException: Futures timed out after [5000 milliseconds]
@400000005640a96d37c87ea4 at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) ~[dispatcher-service.jar:1.8-SNAPSHOT]
@400000005640a96d37c8828c at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) ~[dispatcher-service.jar:1.8-SNAPSHOT]
@400000005640a96d37c88674 at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190) ~[dispatcher-service.jar:1.8-SNAPSHOT]
@400000005640a96d37c89614 at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169) ~[dispatcher-service.jar:1.8-SNAPSHOT]
@400000005640a96d37c899fc at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640) [dispatcher-service.jar:1.8-SNAPSHOT]
@400000005640a96d37c8ad84 at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167) ~[dispatcher-service.jar:1.8-SNAPSHOT]
@400000005640a96d37c8b16c at scala.concurrent.Await$.result(package.scala:190) ~[dispatcher-service.jar:1.8-SNAPSHOT]
@400000005640a96d37c8b554 at com.github.sstone.amqp.ConnectionOwner$.createChildActor(ConnectionOwner.scala:32) ~[dispatcher-service.jar:1.8-SNAPSHOT]
@400000005640a96d37c8c10c at com.olacabs.dispatcher.rmq.RabbitMQConnectionHelper$.createProducer(RabbitMQConnectionHelper.

Getting future timeout exception which creating a consumer/producer

Hi,

We have one ConnectionOwner. On top of that we are creating 7-8 queues. For each queue we have one producer and one consumer (on the same ConnectionOwner).

This is how we create the connection owner.

val connectionFactory = new ConnectionFactory()
connectionFactory.setAutomaticRecoveryEnabled(true)
connectionFactory.setTopologyRecoveryEnabled(true)
connectionFactory.setVirtualHost(config.vhost)
connectionFactory.setNetworkRecoveryInterval(5)
connectionFactory.setPassword(config.pass)
connectionFactory.setUsername(config.user)
connectionFactory.setSharedExecutor(Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors() * 4))
val addresses = config.hosts.map( h => {
val tokens = h.split(":")
new Address(tokens(0), tokens(1).toInt)
}).toArray
connection = system.actorOf(ConnectionOwner.props(connectionFactory, 1 second, addresses = Option(addresses)))

The trace looks like this -

java.util.concurrent.TimeoutException: Futures timed out after [5000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) ~[dispatcher-service.jar:1.14-SNAPSHOT]
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) ~[dispatcher-service.jar:1.14-SNAPSHOT]
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190) ~[dispatcher-service.jar:1.14-SNAPSHOT]
at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169) ~[dispatcher-service.jar:1.14-SNAPSHOT]
at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640) [dispatcher-service.jar:1.14-SNAPSHOT]
at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167) ~[dispatcher-service.jar:1.14-SNAPSHOT]
at scala.concurrent.Await$.result(package.scala:190) ~[dispatcher-service.jar:1.14-SNAPSHOT]
at com.github.sstone.amqp.ConnectionOwner$.createChildActor(ConnectionOwner.scala:32) ~[dispatcher-service.jar:1.14-SNAPSHOT]
at com.olacabs.dispatcher.rmq.RabbitMQConnectionHelper$.createProducer(RabbitMQConnectionHelper.scala:115) ~[dispatcher-service.jar:1.14-SNAPSHOT]

Feature request: possibility to create exclusive consumer

We would like to use consumer in exclusive mode and although com.rabbitmq.client.Channel.basicConsume() allows it, would it be possible to add relevant support to Consumer actor (by adding relevant argument or even ConsumerParameters, where to bundle (autoAck, noLocal, exclusive) arguments?

Or is there any reason why it wouldn't be good idea?

Thanks!

Publisher disconnects from queue when AskTimeoutException encountered

Hi Fabrice,

Firstly, thanks for building such a useful library. We use it in a service that publishes email requests onto a queue. Normally, we don't face any issues but sometimes we face the following exception:

akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://actor-ystem/user/$3j/$a#-1930803633]] after [5000 ms]

After we encounter this exception a service restart is required to reconnect to the queue. We'd like if we could just reconnect without having to restart the service.

A simplified example is shown below:


val publisher = createPublisher()

def sendEmail(emailRequest: EmailRequest)(implicit ec: ExecutionContext): Future[Amqp.Publish] = {
     val props = // Build props from EmailRequest
     val queue = getQueue()
     def publish(_publisher: ActorRef) = _publisher ? Publish("", queue, Array[Byte](), Option(props))
     val future = publish(publisher) map {
        case Ok(request, _) => request
        case Error(request, errorVal) => logging
        case s => throw new RuntimeException(s"$s")
     }
}

def createPublisher()(implicit ec: ExecutionContext) = {
        val publisher = ConnectionOwner.createChildActor(conn, ChannelOwner.props())
        Amqp.waitForConnection(system, publisher).await()
        info(s"Publisher connected on server $rabbitUri")
        DeclareQueue(QueueParameters("myQueue", passive = false, durable = true, exclusive = false, autodelete = false, args = queueArgs))
        // must wait for results so we don't send to queues before they're declared
        Await.result(delayQueueFutures, Timeout(5 seconds).duration)
        publisher
    }

The sendMail function is what encounters the disconnect, how would you recommend recovering and reconnecting? I considered recreating the publisher but am not sure of whether that's a good idea.

Channel actor should be created with routerconfig to enable round robin publishing

My system is being bottlenecked by the time it takes to publish messages. I'm putting millions of messages through of varying sizes and they are being stalled while waiting for the channel actor to publish previous messages. I would like the producer actor to have access to multiple channel actors to prevent this.

The actor system supports creation of actors "withRouter" which allows multiple children to be created and balances the load between them - like this: context.actorOf(props.withRouter(FromConfig))

The config would look something like this:

akka{
actor {
    deployment {
        /channel {
            router = "round-robin"
            nr-of-instances = 10
        }
    }
 }
}

I forked your repo and am trying to set this up inline like this: ConnectionOwner.createChildActor(conn, ChannelOwner.props().withRouter(RoundRobinRouter(nrOfInstances = 10)), name = Some("someChanel"))

But I'm running into a problem. The 'connect message is being sent from the 10 channel actors but no connection is ever made.

I noticed you have a test that creates multiple channel actors explicitly instead of using "withRouter" so you may have already found a way around this problem. If so, how do I hook my Producer up to the multiple channel actors? If not, can you help me figure out a way to get the round-robin effect working?

Here is my repo with the code changes I've made and a new test in ChannelOwnerSpec that fails to connect. ahaid@4e1afd4 Can you help?

you cannot consume messages from a queue without binding it

To consume messages from a queue you create a Consumer with a list of (exchange/queue/routing key) bindings. Since it is not possible to redeclare bindings to the default exchange it means that there is currently no way to just consume from a queue that already exists without creating a new binding.
This has been overlooked because on most projects that I know of amqp consumers actively create queues and bindings. But it is still a valid use case.

=> If you need to consume messages from an existing queue, the workaround is to create a new binding for the consumer (using the queue name as routing key on amq.direct for example)

The W.IP on branch wip-messages should take care of this.

Support for RabbitMQ 3.x

I'm seeing the following error from my test application using amqp-client 1.1 with RabbitMQ 3.x:

com.rabbitmq.client.ShutdownSignalException: connection error; reason: {#method<connection.close>(reply-code=540, reply-text=NOT_IMPLEMENTED - immediate=true, class-id=60, method-id=40), null, ""}

Some googling about revealed this document, which mentions that support for the 'immediate' flag has been removed in 3.x.

Downgrading to RabbitMQ 2.8.x resolves the problem.

header value when longer than 1024 bytes converted to java.io.DataInputStream

Here is a sample usage -

When we read the messageHeaders by using the properties.getHeaders of Delivery class

val messageHeaders: collection.mutable.Map[String, AnyRef] = d.properties.getHeaders.asScala

( Where d == Delivery ( in package com.github.sstone.amqp ) )

When we get the class information - we always see the ClassName as com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString

But actually it is java.io.DataInputStream when we do a toString on this Value (We can see this).

To String == java.io.DataInputStream@7563a320

This is causing lots of issues as ByteArrayLongString is a private Inner class.

http://grepcode.com/file/repo1.maven.org/maven2/com.rabbitmq/amqp-client/3.2.3/com/rabbitmq/client/impl/LongStringHelper.java?av=f

ByteArrayLongString is an private static class

So the asInstanceOf won't work on the private member.

Also the reason we see the output as java.io.DataInputStream is because of this -

https://github.com/spring-projects/spring-amqp/blob/master/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/DefaultMessagePropertiesConverter.java#L149

where it converts any String longer than 1024 to java.io.DataInputStream

Thanks much
_D

Adding multiple Bindings to a Consumer results in multiple consumer instances

We use this great wrapper for AMQP client to consume multiple routingKeys on a single Exhange.
For that, we send AddBinding to the Consumer as documented:

consumer ! Record(AddBinding(Binding(StandardExchanges.amqDirect, QueueParameters("my_queue", passive = false, durable = false, exclusive = false, autodelete = true), "my_key_1")))
consumer ! Record(AddBinding(Binding(StandardExchanges.amqDirect, QueueParameters("my_queue", passive = false, durable = false, exclusive = false, autodelete = true), "my_key_2")))

In the AMQP web-interface I now see that for the example snippet above there are 2 "Consumers".

Maybe this is the intended behavior - but what seems strange then is when there are only messages for the routingKey "my_key_1" both Consumers seem to handle this message.

It would be cool if Amqp.Binding could take more than one routingKey.
What do you think? Would that be the "correct" approach?

Consumers are not redefined after channel reconnect, resulting in ack errors: reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag

We experienced an error scenario where a channel died while our application was connected, amqp-client created another channel but didn't redefine the consumers, so as the consumers continued to attempt to ack messages it seems amqp-client was acking them via the old broken consumer tags, so this error message occurred and messages remained in RabbitMQ in an unacked state. Unfortunately I don't have the logs available anymore to show.

Seemingly relevant discussion on the problem is here:

http://rabbitmq.1065348.n5.nabble.com/reply-code-406-reply-text-PRECONDITION-FAILED-unknown-delivery-tag-tp21310p21312.html

Paraphrasing from this webpage, perhaps the following could be the issue?

"- ack'ing on a channel other than the one the messages was received on (*)

...
(*) this is a common occurrence in applications with naive error
recovery logic that re-establishes connections&channels transparently
behind the scenes. "

I'm no RabbitMQ expert, so am interested in your thoughts. Also a potential solution might be to use channel.basicCancel(consumerTag)?

Some example code that might be of use to implement this is here (credit to momania/akka-amqp for this):

https://github.com/momania/akka-amqp/blob/master/src/main/scala/akka/amqp/DurableConsumer.scala#L90

How to cleanly close a channel owner?

My application needs ephemeral queues. I delete queues once done -

consumerRef ! DeleteQueue(inputQConfig.queue)

This is having two problems -

  • < 10% of times this is not deleting the queue.
  • The consumer (channel owner) actor is not killed. Hence the channel is open.
    I don't reuse the consumer actor for other queues.

What is the clean way of ensuring the consumer and channel is closed properly?

AddConfirmListener or AddReturnListener not working as expected on 1.3

I have the following setup.

val testProbe = TestProbe()
val config = PublishConfiguration("WRONG","NOPE",false,false)
producer ! AddConfirmListener(testProbe.ref)
producer ! AddReturnListener(testProbe.ref)
producter ! Publish(....)
testProbe.receiveN(1)

I then publish to a queue that does not exist. I assumed based off issue #24 that these listeners would capture the Error message. This does not appear to be the case as the test fails with a timeout.

I need a way to pass a "context" to a request so I can receive it in an Ok or Error

I would like to pass additional data with the Publish message so I can act on it when my sending actor receives an Ok or Error message. Conceptually this data is just an object that is not part of the publish message but is important to manage the asynchronous coordination of actors.

I've come up with a nasty way of doing this (https://github.com/mhamrah/amqp-client/commit/0a145b804851f50abdab28347e3240ad7783a5e2) but I believe there to be a more elegant solution which should be part of the request trait. I believe this context field should be optional and typed appropriately, but can't figure out how to make it unobtrusive with type parameters.

Any suggestions?

To apply a specific use case, I'm using amqp-client with the Spray library. I want to pass along the current Spray RequestContext with the publish message, so my sending actor can call complete() or error() on the request object when receiving an Ok or Error message from the amqp-client.

1.3-ML4 Queries

We need to upgrade to latest Akka release in our application, and thereby also upgrade the amqp-client library accordingly.
I see that there is a milestone release of the amqp-client 1.3-ML4 that is compatible with latest Akka release.

  1. Is there any plan of having another release in the near future?
  2. Can/Should the 1.3-ML4 milestone be used in the production?

Thanks,
Rohit

Update build.sbt to rabbitmq version 3.6.1

Hi,
I noticed the scala2.11 branch the pom.xml uses the latest version of rabbitmq. Can we also update the build.sbt or are there some reservations about doing so?
Thank you.
Bryce

Removing/cancelling a consumer

Hi,
This is more of a query (or feature request) in that I've a use case where I do want to stop consuming messages off a connection. I would think that equivalent to CreateConsumer, there should be a RemoveConsumer as well so that I can gracefully stop consuming messages off a particular queue. (The motivation behind this is for me to have a FSM or Actor become transition which on receipt of one message on a queue - q1, starts listening on another queue q2. And stops getting Delivery callbacks for q1.)

RabbitMQ Java client does seem to expose basicCancel(java.lang.String consumerTag), which does the same. I was wondering if similarly a newly created CancelConsumer case class could be handled by ChannelOwner, to remove an existing consumer.

Any thoughts?

Thanks,
Soumik

How to create more than one channels?

Hi,
can I call more than one times
val consumer = ConnectionOwner.createChildActor(conn, Consumer.props(listener, channelParams = None, autoack = false))
method to create more channels.

In my use case, I need more channels, but the same channel listener.

thanks in advance!
Yi

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.