spingo / op-rabbit
The Opinionated RabbitMQ Library for Scala and Akka
License: Other
Currently, binding to a queue (or exchange) that doesn't exist will create it. In some cases, though, it is preferable to fail if it doesn't exist. This can be accomplished by using queueDeclarePassive instead of queueDeclare.
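The passive declaration mentioned above can be sketched directly against the RabbitMQ Java client. This is a minimal illustration, not op-rabbit's implementation: the object name `PassiveDeclare` and the helper `queueExists` are made up for the example, and it assumes an already-open `Connection`. Note that a failed passive declare closes the channel it ran on, so the check uses a throwaway channel.

```scala
import com.rabbitmq.client.{Channel, Connection}
import java.io.IOException

object PassiveDeclare {
  // Hypothetical helper: returns true if the queue already exists and never creates it.
  def queueExists(connection: Connection, queueName: String): Boolean = {
    // Use a throwaway channel: the broker closes the channel when a passive declare fails.
    val probe: Channel = connection.createChannel()
    try {
      probe.queueDeclarePassive(queueName) // throws IOException if the queue is missing
      true
    } catch {
      case _: IOException => false
    } finally {
      if (probe.isOpen) probe.close()
    }
  }
}
```

A caller that prefers to fail fast could throw when `queueExists` returns false instead of falling back to `queueDeclare`.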
(As a side note, I figured I'd try opening issues here instead of communicating through the user group topic. Which do you prefer?)
My application.conf is in the current working directory, in the same place as the jar. If I load the configuration manually with ConfigFactory as below, rabbitmq.topic-exchange-name is not used; instead, it is read by a lazy val in RabbitControl.scala:18. See the source code, configuration, and error below:
val config = ConfigFactory.parseFile(new File("application.conf")).getConfig("rabbitmq")
val rabbitMq = actorSystem.actorOf(Props(new RabbitControl(ConnectionParams.fromConfig(config))))
rabbitmq {
topic-exchange-name = "name"
connection-timeout = 3000
virtual-host = "/"
hosts = ["rabbit.test.com"]
username = "user"
password = "pass"
port = 5672
timeout = 3s
}
[info] Running RabbitTest
[info] [ERROR] [07/31/2015 14:50:47.636] [such-system-akka.actor.default-dispatcher-3] [akka://such-system/user/$a] No configuration setting found for key 'rabbitmq.topic-exchange-name'
[info] com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'rabbitmq.topic-exchange-name'
[info] at com.typesafe.config.impl.SimpleConfig.findKeyOrNull(SimpleConfig.java:152)
[info] at com.typesafe.config.impl.SimpleConfig.findOrNull(SimpleConfig.java:170)
[info] at com.typesafe.config.impl.SimpleConfig.findOrNull(SimpleConfig.java:176)
[info] at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:184)
[info] at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:189)
[info] at com.typesafe.config.impl.SimpleConfig.getString(SimpleConfig.java:246)
[info] at com.spingo.op_rabbit.RabbitControl$.topicExchangeName$lzycompute(RabbitControl.
How would I set the virtual host to use?
ConnectionParams.scala supports VirtualHost, but fromConfig does not set it.
Use a RabbitMQ TTL to schedule how fast a message will be redelivered. This will provide better visibility when messages are failing.
I'm using the core module of op-rabbit:
build.sbt
resolvers += "SpinGo OSS" at "http://spingo-oss.s3.amazonaws.com/repositories/releases"
val opRabbitVersion = "1.0.0-M11"
val sprayVersion = "1.3.2"
val akkaVersion = "2.3.9"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-agent" % akkaVersion,
"io.spray" %% "spray-can" % sprayVersion,
"io.spray" %% "spray-routing" % sprayVersion,
"com.spingo" %% "op-rabbit-core" % opRabbitVersion,
...
I get a list of missing dependencies errors. One of them is:
Error:scalac: missing or invalid dependency detected while loading class file 'FieldDefMagnet2.class'.
Could not access type LeftFolder in package shapeless,
because it (or its dependencies) are missing. Check your build definition for
missing or conflicting dependencies. (Re-run with `-Ylog-classpath` to see the problematic classpath.)
A full rebuild may help if 'FieldDefMagnet2.class' was compiled against an incompatible version of shapeless.
It looks like the use of shapeless is causing some conflict with Spray's directive system.
Wondering if there is a way for us to specify the exchange type on the producing side? We have figured out how to do it on the consumer side via consume(OurBinding(..)), where OurBinding() sets up a different type of exchange via channel.exchangeDeclare(.., TYPE, ..).
drop - Nack the message, do not reschedule for delivery.
abort - Shuts down the consumer, exposing the exception through SubscriptionRef.closed.
Using op-rabbit 2.0-M2, Akka 2.4.1
2015-12-16 01:31:09,591 [xxx-akka.actor.default-dispatcher-5] INFO akka.actor.LocalActorRef - Message [com.thenewmotion.akka.rabbitmq.AmqpShutdownSignal] from Actor[akka://xxx/user/$a/connection/$a#-1532455610] to Actor[akka://xxx/user/$a/connection/$a#-1532455610] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
Hi! It seems there is no way to bind a queue to a direct exchange with this library (we can do this using the RabbitMQ client); currently we can only bind queues to topic, fanout, and headers exchanges. We can work around this by using a topic exchange, but I guess this will hurt performance because of the more complex routing logic. Is this feature on the to-do list?
I'm serializing my classes with protobuf, so I don't intend to use any of the provided JSON tools.
I have:
implicit val actorSystem = ActorSystem("controller-system")
val rabbitControl = actorSystem.actorOf(Props[RabbitControl])
val subscriptionRef = Subscription.run(rabbitControl) {
import Directives._
channel(qos = 3) { // Process up to 3 concurrent messages at once
consume(topic(queue("controller-message-queue"), List("actions-topic.#"))) {
(body(as[Action]) & routingKey) { (action, key) =>
print(s"Action '${action.value}' was received over '${key}'.\n")
ack
}
}
}
}
where Action is my protobuf-generated class. On compile, I get:
MessagingControllerModule.scala:24: could not find implicit value for parameter um: com.spingo.op_rabbit.RabbitUnmarshaller[ai.osaro.traits.Types.Action]
(body(as[Action]) & routingKey) { (action, key) =>
^
What's necessary for it to be able to handle a custom class?
Unrelated, where does op-rabbit look for the application.config?
Hi Tim,
I have a (private) lib I am developing that uses op-rabbit. I still want it to cross-build for Scala 2.10 and 2.11, so I set up my sbt build accordingly. I was hoping that the v1.0.x branch of op-rabbit would cross-build for Scala 2.10, and it seems to be set up to cross-build in build.sbt. However, I don't see a published release of 1.0.3 for Scala 2.10, and when I tried to compile it myself there was a dependency problem. So my question is, do you still plan to support Scala 2.10 in the v1.0.x branch? :)
Thanks
Dan
The only way to set the topicExchangeName is by having an op-rabbit.topic-exchange-name config in application.resources. This should be made more flexible by allowing clients to pass in a config or the string itself. All of our configuration is scoped by environment, so we will never have a top-level op-rabbit.topic-exchange-name. We will have dev.op-rabbit.topic-exchange-name, prod.op-rabbit.topic-exchange-name, etc.
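For illustration, here is how the environment-scoped value described above could be resolved on the caller's side with Typesafe Config. This is only a sketch: the exchange names and the object name `ScopedConfigExample` are invented, and whether op-rabbit can accept the resulting string or sub-config is exactly what this issue asks for.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object ScopedConfigExample {
  // Hypothetical layout: every setting lives under an environment prefix.
  val root: Config = ConfigFactory.parseString(
    """
      |dev.op-rabbit.topic-exchange-name = "dev-events"
      |prod.op-rabbit.topic-exchange-name = "prod-events"
      |""".stripMargin)

  // Select the environment subtree, then read the key relative to it.
  def topicExchangeName(env: String): String =
    root.getConfig(env).getString("op-rabbit.topic-exchange-name")
}
```

With this layout, `ScopedConfigExample.topicExchangeName("dev")` reads the dev-scoped value without requiring any top-level op-rabbit key.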
getting this error from rabbit logs
=ERROR REPORT==== 10-Feb-2016::07:54:10 ===
Channel error on connection <0.24850.3867> (x.x.x.x:51049 -> x.x.x.x:5672, vhost: 'servo', user: 'servo_mq'), channel 21:
{amqp_error,not_found,
"no queue 'op-rabbit.retry.ads-push-queue' in vhost 'servo'",
'queue.declare'}
Presently, rejections (i.e., extracting a property that is not defined) are handled with a nack, reschedule.
Leaning towards a single RecoveryStrategy to decide them all (and, simplifying how recovery strategies are created).
See Title
There are several methods that "plain" streams have, but acked streams don't.
Some examples:
Source.via / Flow.via
Source.mapMaterializedValue
There are probably more I'm missing, these are just the ones I encountered.
here's a short test case
implicit val actorSystem = ActorSystem("rabbitactor")
val rabbitControl = actorSystem.actorOf(Props[RabbitControl])
Subscription.run(rabbitControl) {
import Directives._
channel() {
consume(Binding.direct(Queue("somegoodqueue"), Exchange.direct("somegoodexchange"), List("somegoodroutingkey"))) {
body(as[JsObject]) {
data =>
println(s"got $data")
ack
}
}
}
}
rabbitControl ! Message.exchange("hi", "somebadexchange", "somebadroutingkey")
Thread.sleep(10000)
for(i <- 1 to 10) rabbitControl ! Message.exchange(Json.obj("id" -> i), "somegoodexchange", "somegoodroutingkey")
Thread.sleep(10000)
In this case, the 10 messages to the good exchange are never logged. If I comment out the line where the message is sent to the bad exchange, then all 10 good messages are logged.
Hi
Can you please also add example for Java users? I'm not really sure how to use Subscription.run(...).
Edit: previous question was for Java Play, now only Java example.
Hi there,
I've been tracing the code and trying to see what qos does under the hood. The docs are a little sparse on the details (rightfully so). The reason I need to know is that I am trying to create a wrapper for this which closely resembles another NodeJS-based RMQ lib we use. What I want to know is what happens in Akka when I define a qos of, let's say, 10 (channel(qos = 10) {})? Details around that would be very helpful. Thanks in advance!
When conversion from JSON to case class in the PlayJsonSupport unmarshall function fails, the error is not handled very well:
java.util.NoSuchElementException: JsError.get
Is there any way to handle the error properly, and for example retrieve the contents of the JsError?
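As a point of comparison, Play JSON itself exposes the failure details when `validate` is used instead of calling `.get` on the result. The sketch below is not op-rabbit's PlayJsonSupport code; `SafeJsonDecode`, `decode`, and the `Person` payload type are hypothetical names for illustration. Malformed JSON still makes `Json.parse` throw, which a caller would need to handle separately.

```scala
import play.api.libs.json.{JsError, JsSuccess, Json, Reads}

object SafeJsonDecode {
  // Hypothetical payload type used only for this example.
  case class Person(name: String, age: Int)
  implicit val personReads: Reads[Person] = Json.reads[Person]

  // Returns Right(value) on success, or Left with the validation errors rendered as JSON text.
  def decode[T: Reads](bytes: Array[Byte]): Either[String, T] =
    Json.parse(bytes).validate[T] match {
      case JsSuccess(value, _) => Right(value)
      case e: JsError          => Left(Json.stringify(JsError.toJson(e)))
    }
}
```

The `Left` side carries the contents of the JsError, so a consumer can log it or decide to nack the message rather than crash with `JsError.get`.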
How do you set Recovery Strategy options such as retry limit, delay, onAbandon, name of the abandon queue, etc.?
I was not able to find much on this.
Would there be autoack support in the future?
Hi,
is it possible to react on broken connections (as asked here: ShellRechargeSolutionsEU/akka-rabbitmq#13)
And, even more interesting: is it possible to react on a blocked connection (https://www.rabbitmq.com/connection-blocked.html) ?
Thanks,
Sebastian
I've run into two issues.
[warn] ==== SpinGo OSS: tried
[warn] http://spingo-oss.s3.amazonaws.com/repositories/releases/com/spingo/scoped-fixtures_2.11/0.4.0/scoped-fixtures_2.11-0.4.0.pom
[warn] ==== Sonatype Releases: tried
[warn] http://oss.sonatype.org/content/repositories/releases/com/spingo/scoped-fixtures_2.11/0.4.0/scoped-fixtures_2.11-0.4.0.pom
[info] Resolving jline#jline;2.12.1 ...
[warn] ::::::::::::::::::::::::::::::::::::::::::::::
[warn] :: UNRESOLVED DEPENDENCIES ::
[warn] ::::::::::::::::::::::::::::::::::::::::::::::
[warn] :: com.spingo#scoped-fixtures_2.11;0.4.0: not found
[warn] ::::::::::::::::::::::::::::::::::::::::::::::
[warn]
[warn] Note: Unresolved dependencies path:
[warn] com.spingo:scoped-fixtures_2.11:0.4.0 (/home/rob/git/op-rabbit/project/Build.scala#L16)
[warn] +- com.spingo:op-rabbit-core_2.11:1.0.0-SNAPSHOT
sbt.ResolveException: unresolved dependency: com.spingo#scoped-fixtures_2.11;0.4.0: not found
In the latter case, I try to follow the link to the pom file and get a NoSuchKey error from AWS.
[info] Resolving jline#jline;2.12.1 ...
[warn] ::::::::::::::::::::::::::::::::::::::::::::::
[warn] :: UNRESOLVED DEPENDENCIES ::
[warn] ::::::::::::::::::::::::::::::::::::::::::::::
[warn] :: com.thenewmotion.akka#akka-rabbitmq_2.11;2.2: not found
[warn] ::::::::::::::::::::::::::::::::::::::::::::::
[warn]
[warn] Note: Unresolved dependencies path:
[warn] com.thenewmotion.akka:akka-rabbitmq_2.11:2.2
[warn] +- com.spingo:op-rabbit-core_2.11:1.2.1
........
.
.
.
.
error sbt.ResolveException: unresolved dependency: com.thenewmotion.akka#akka-rabbitmq_2.11;2.2: not found
[error] Total time: 3 s, completed Feb 10, 2016 4:24:18 AM
my config file looks like this
resolvers ++= Seq(
"SpinGo OSS" at "http://spingo-oss.s3.amazonaws.com/repositories/releases"
)
val opRabbitVersion = "1.2.1"
libraryDependencies ++= Seq(
"com.spingo" %% "op-rabbit-core" % opRabbitVersion,
"com.spingo" %% "op-rabbit-play-json" % opRabbitVersion,
"com.spingo" %% "op-rabbit-json4s" % opRabbitVersion,
"com.spingo" %% "op-rabbit-airbrake" % opRabbitVersion,
"com.spingo" %% "op-rabbit-akka-stream" % opRabbitVersion
)
At first I was using val opRabbitVersion = "1.2.0", so I updated to 1.2.1 as shown above, but I still get this sbt assembly error in Codeship. Any ideas why this would happen?
I have a project that uses gradle. I'd expect to be able to install it as a dependency just as any other. In build.gradle:
repositories {
maven {
url "http://spingo-oss.s3.amazonaws.com/repositories/releases"
}
}
dependencies {
compile "com.spingo:op-rabbit-core:1.0.0" // rabbitMQ client
}
I get:
Could not resolve all dependencies for configuration ':common:compile'.
> Could not find com.spingo:op-rabbit-core:1.0.0.
Searched in the following locations:
https://repo1.maven.org/maven2/com/spingo/op-rabbit-core/1.0.0/op-rabbit-core-1.0.0.pom
https://repo1.maven.org/maven2/com/spingo/op-rabbit-core/1.0.0/op-rabbit-core-1.0.0.jar
file:/Users/admin/.m2/repository/com/spingo/op-rabbit-core/1.0.0/op-rabbit-core-1.0.0.pom
file:/Users/admin/.m2/repository/com/spingo/op-rabbit-core/1.0.0/op-rabbit-core-1.0.0.jar
https://jcenter.bintray.com/com/spingo/op-rabbit-core/1.0.0/op-rabbit-core-1.0.0.pom
https://jcenter.bintray.com/com/spingo/op-rabbit-core/1.0.0/op-rabbit-core-1.0.0.jar
http://spingo-oss.s3.amazonaws.com/repositories/releases/com/spingo/op-rabbit-core/1.0.0/op-rabbit-core-1.0.0.pom
http://spingo-oss.s3.amazonaws.com/repositories/releases/com/spingo/op-rabbit-core/1.0.0/op-rabbit-core-1.0.0.jar
I'm surprised this doesn't work. The error message itself appears to show exactly the files it should be looking for. I suspect I've simply gotten something small wrong in my gradle build settings.
In the Java API I can call myChannel.queueDeclare(qName,...) to be sure a queue exists before trying to use it. I typically have a short bit of code at the beginning of my program that rolls through and does this for all the queues I'm going to need.
What's the equivalent in this world? I've tried something like this but it doesn't build:
Subscription.run(rabbitControl) {
channel() {
queue("testQ") // Declare the queue
}
}
(Note this isn't my actual "using" of the queue in a stream--for which I plan to use Akka Streams. This is just pre-work to declare the queues.)
I tried to call queue("testQ").declare(???), but declare needs a Channel, so how do I get that?
Honestly for this pre-work it doesn't even have to be a fancy Directive-based thing. If there's not already a way to do this, perhaps rabbitControl could receive a QueueDeclare message that tweaks the right things internally?
Ideas appreciated...
How would I connect to 2 different vhosts?
If using the RabbitMQ Java lib, I would just set up 2 different connection factories. How can I do something like that with op-rabbit?
Is there some kind of Connection Factory so that I can connect to Rabbit without using a conf file.
Thanks
It would be useful to have access to the properties in order to get the reply-to property when handling messages.
Unless there is a different way to do the request/response pattern?
Hi !
I was trying to run several AckedFlows against the same RabbitMQ server (with different queues, though).
RabbitSource materializes to a SubscriptionRef, and when I call close on it, the top-level RabbitControl actor is killed, which in turn destroys the parallel subscription that I made with the same RabbitControl actor ref.
My question is: according to the RabbitControl.scala code, the actor maintains a list of subscriptions, so is it really meant to support parallel subscriptions (with their independent connections and channels) or not?
Thanks in advance !
Example usecase - support for https://github.com/rabbitmq/rabbitmq-consistent-hash-exchange.
Two suggested solutions I could think of:
- Exchange enumeration;
- Exchange and ExchangeImpl; maintain the Exchange.{topic,fanout,...} but also provide an additional method that allows users of op-rabbit to pass custom strings as the kind parameter
It would be nice to be able to customize the names of the queues that are used to retry / abandon messages.
Right now, it's implemented at a publisher level, so each unique instance of a publisher bound will declare the entity to which it's publishing on first try.
This is suboptimal since publishers enclose the topic, and a common use case is that a publisher will publish to the same exchange but a variety of topics.
If the "i've already been declared" logic were moved from Publisher to the defining entity, then this problem is resolved (and the potential for surprise is greatly reduced).
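The proposal above can be modelled in a few lines of plain Scala. Everything here is a hypothetical stand-in (`ExchangeDef`, `Publisher`, and `ensureDeclared` are not op-rabbit's classes): the point is only that the shared definition, not each publisher, remembers whether it has been declared. A real implementation would presumably also have to track this per connection so that entities are redeclared after a reconnect.

```scala
object DeclareOnceSketch {
  // Hypothetical model: the exchange definition remembers that it was declared.
  final class ExchangeDef(val name: String) {
    private var declared = false

    // Runs `declare` only the first time; returns true if it ran.
    def ensureDeclared(declare: String => Unit): Boolean = synchronized {
      if (declared) false
      else {
        declare(name)
        declared = true
        true
      }
    }
  }

  // A publisher only pairs a shared exchange definition with a routing key.
  final case class Publisher(exchange: ExchangeDef, routingKey: String)
}
```

Two publishers that share one `ExchangeDef` but use different routing keys then trigger a single declaration between them.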
I was trying to see if I can add support for #7 when I encountered something that looks like a bug in TopicBinding. I don't use topics/exchanges, so I'm not sure about this, but it looks to me like it always binds to the default exchange (RabbitControl.topicExchangeName) instead of using the provided name.
Hey, thanks for the great library!
What's the recommended way of declaring an exchange using RabbitMQ's Java driver methods through the RabbitControl interface? Couldn't find any mention of this usecase.
To clarify - this is a publisher usecase; I don't require subscription/consumer functionality.
SBT cannot resolve op-rabbit-akka-stream from SpinGo OSS using the provided SBT build config.
Using "SpinGo OSS" at "http://spingo-oss.s3.amazonaws.com/repositories/releases" does not work.
Actually, this works: mapTo[ConfirmResponse], which makes sense.
So I think the docs are wrong?!
Creating a stream that tries writing to a non-existent queue doesn't report an error, and still somehow acknowledges messages (causing message loss).
I haven't tested this using plain (non-stream) actors, but it might affect them as well.
There are 2 issues here:
(I don't use exchanges/topics, so I'm not sure if this is relevant for them or not)
Replying with true is less than useful when one actor is publishing a message via !.
Modify it to respond with Message.Confirmation(msg.uuid) (and add a Message UUID property to every Message).
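To show why a UUID-bearing confirmation is more useful than a bare true, here is a plain-Scala sketch of a sender matching replies to messages. `Message`, `Confirmation`, and `Pending` below are hypothetical stand-ins written for this example, not op-rabbit's actual classes.

```scala
import java.util.UUID

object ConfirmationSketch {
  // Hypothetical stand-ins for the proposed message and reply types.
  final case class Message(body: String, uuid: UUID = UUID.randomUUID())
  final case class Confirmation(uuid: UUID)

  // Tracks in-flight messages so a reply can be matched to what was sent.
  final class Pending {
    private var inFlight = Map.empty[UUID, Message]

    def sent(msg: Message): Unit = inFlight += (msg.uuid -> msg)

    // Returns the confirmed message (if it was pending) and forgets it.
    def confirmed(c: Confirmation): Option[Message] = {
      val msg = inFlight.get(c.uuid)
      inFlight -= c.uuid
      msg
    }
  }
}
```

With a plain true reply, an actor that fires several messages with ! cannot tell which one was confirmed; with the UUID it can look the message up and, for example, retry only the ones that never came back.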
Define a new directive, forward, which delivers the message (with, optionally, additional properties set), and then acks the message.
Define a directive, nack(false), which nacks the message and does not reschedule it for redelivery.
Consider adding a "delay" directive?
Forward directive can take a QueueBinding; define a "ifNotExists" wrapper QueueBinding.
There is a missing closing parenthesis in the subscription example at https://github.com/SpinGo/op-rabbit#set-up-a-consumer-topic-subscription
topics = List("some-topic.#")),
val opRabbitVersion = "1.1.2"
Scenario:
val subscriptionRef = Subscription.run(rabbitControl) {
channel(qos = 1) {
consume(
topic("foo", List("#"), exchange = "some_exchange")
) {....
If the queue does not exist, op-rabbit creates it, and all is well. Creating the queue manually (via the management plugin) works too.
However, creating the same queue manually with x-max-length:10000 and then starting the Subscription results in messages not being consumed. No errors are reported.
Is there any way to control the logs being spit out into the console? There are numerous lines printed directly, obviously with println, not using the logger. Things like: "handleDelivery 1", "I have ack 1", or something like "You jerk! 312 NO_ROUTE". I almost suspect the third one should be an exception of some sort. I tried to find it in the code, but no luck.
I would like to publish messages to a direct exchange with a specific routing key, and have that exchange created if it doesn't exist, but I can't work out the syntax. My guess is something like:
rabbitControl ! Message("message", "key", Publisher.exchange(Exchange.direct("test")))
But that doesn't compile, and I can't work it out by digging in the code even. Any chance you could help me out? Thanks!