
op-rabbit's People

Contributors

aaabramov, alejandromarin, antonnaumov, arnostv, danslapman, dmexe, dpratt, dstranger, enelson, freakman, kthompson, mturlo, muub, notbobthebuilder, pjfanning, t3hnar, talpr, theshortcut, timcharper, unthingable, vic, vivri, whitehead1415

op-rabbit's Issues

Add option to bind queue/exchange passively

Currently, binding to a queue (or exchange) that doesn't exist will create it. In some cases, though, it is preferable to fail if it doesn't exist. This can be accomplished by using queueDeclarePassive instead of queueDeclare.
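For context, the difference between the two calls can be sketched with the plain RabbitMQ Java client from Scala. This is a minimal sketch, not op-rabbit's API: the `queueExists` helper, the host, and the queue name are made up for illustration, and it assumes the `amqp-client` library is on the classpath and a broker is reachable on localhost. A failed passive declare closes the channel it ran on, so the check uses a throwaway channel.

```scala
import java.io.IOException
import com.rabbitmq.client.{Connection, ConnectionFactory}

object PassiveDeclareSketch {
  // Hypothetical helper: returns true if the queue already exists.
  // queueDeclarePassive never creates the queue; it fails if it is missing.
  def queueExists(connection: Connection, queueName: String): Boolean = {
    val channel = connection.createChannel()
    try {
      channel.queueDeclarePassive(queueName) // throws IOException if the queue is missing
      channel.close()
      true
    } catch {
      // Caveat: other I/O failures on this channel also land here.
      // The broker has already closed the channel in the not-found case.
      case _: IOException => false
    }
  }

  def main(args: Array[String]): Unit = {
    val factory = new ConnectionFactory()
    factory.setHost("localhost") // assumed broker location
    val connection = factory.newConnection()
    try println(queueExists(connection, "some-queue")) // illustrative queue name
    finally connection.close()
  }
}
```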

(As a side note, I figured I'd try opening issues here instead of communicating through the user group topic. Which do you prefer?)

Modify config object; `topic-exchange-name` should not be a part of the connection config

My application.conf is in the current working directory, alongside the jar. I want to load the configuration manually with ConfigFactory, as below. However, rabbitmq.topic-exchange-name is not picked up from it; instead, it is set as a lazy val in RabbitControl.scala:18. The code, configuration, and resulting error follow:

val config = ConfigFactory.parseFile(new File("application.conf")).getConfig("rabbitmq")
val rabbitMq = actorSystem.actorOf(Props(new RabbitControl(ConnectionParams.fromConfig(config))))


rabbitmq {
  topic-exchange-name = "name"
  connection-timeout = 3000
  virtual-host = "/"
  hosts = ["rabbit.test.com"]
  username = "user"
  password = "pass"
  port = 5672
  timeout = 3s
}

[info] Running RabbitTest 
[info] [ERROR] [07/31/2015 14:50:47.636] [such-system-akka.actor.default-dispatcher-3]         [akka://such-system/user/$a] No configuration setting found for key 'rabbitmq.topic-exchange-name'
[info] com.typesafe.config.ConfigException$Missing: No configuration setting found for key     'rabbitmq.topic-exchange-name'
[info]  at com.typesafe.config.impl.SimpleConfig.findKeyOrNull(SimpleConfig.java:152)
[info]  at com.typesafe.config.impl.SimpleConfig.findOrNull(SimpleConfig.java:170)
[info]  at com.typesafe.config.impl.SimpleConfig.findOrNull(SimpleConfig.java:176)
[info]  at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:184)
[info]  at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:189)
[info]  at com.typesafe.config.impl.SimpleConfig.getString(SimpleConfig.java:246)
[info]  at com.spingo.op_rabbit.RabbitControl$.topicExchangeName$lzycompute(RabbitControl.

Conflict with Spray's directives

I'm using the core module of op-rabbit:
build.sbt

resolvers += "SpinGo OSS" at "http://spingo-oss.s3.amazonaws.com/repositories/releases"

val opRabbitVersion = "1.0.0-M11"
val sprayVersion = "1.3.2"
val akkaVersion = "2.3.9"

libraryDependencies ++= Seq(
    "com.typesafe.akka" %% "akka-actor" % akkaVersion,
    "com.typesafe.akka" %% "akka-agent"  % akkaVersion,
    "io.spray" %% "spray-can" % sprayVersion,
    "io.spray" %% "spray-routing" % sprayVersion,
    "com.spingo" %% "op-rabbit-core" % opRabbitVersion,
...

I get a list of missing-dependency errors. One of them is:

Error:scalac: missing or invalid dependency detected while loading class file 'FieldDefMagnet2.class'.
Could not access type LeftFolder in package shapeless,
because it (or its dependencies) are missing. Check your build definition for
missing or conflicting dependencies. (Re-run with `-Ylog-classpath` to see the problematic classpath.)
A full rebuild may help if 'FieldDefMagnet2.class' was compiled against an incompatible version of shapeless.

It looks like the use of shapeless is causing some conflict with Spray's directive system.

Add ability to change the exchange type on producer

Is there a way to specify the exchange type on the producing side? We have figured out how to do it on the consumer side via consume(OurBinding(..)), where OurBinding() sets up a different type of exchange via channel.exchangeDeclare(.., TYPE, ..).

Implement missing RecoveryStrategies

  • drop - Nack the message, do not reschedule for delivery.
  • abort - shuts down the consumer, exposing the exception through SubscriptionRef.closed.

Dead letters on subscription.close()

Using op-rabbit 2.0-M2, Akka 2.4.1

2015-12-16 01:31:09,591 [xxx-akka.actor.default-dispatcher-5] INFO  akka.actor.LocalActorRef - Message [com.thenewmotion.akka.rabbitmq.AmqpShutdownSignal] from Actor[akka://xxx/user/$a/connection/$a#-1532455610] to Actor[akka://xxx/user/$a/connection/$a#-1532455610] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

Binding queues to direct exchange

Hi! It seems there is no way to bind a queue to a direct exchange with this library (we can do this with the RabbitMQ client); currently we can only bind queues to topic, fanout, and headers exchanges. We can work around this by using a topic exchange, but I guess that will hurt performance because of the more complex routing logic. Is this feature on the to-do list?

Some Setup Questions

I'm serializing my classes with protobuf, so I don't intend to use any of the provided JSON tools.

I have:

  implicit val actorSystem = ActorSystem("controller-system")
  val rabbitControl = actorSystem.actorOf(Props[RabbitControl])

  val subscriptionRef = Subscription.run(rabbitControl) {
    import Directives._
    channel(qos = 3) {  // Process up to 3 concurrent messages at once
      consume(topic(queue("controller-message-queue"), List("actions-topic.#"))) {
        (body(as[Action]) & routingKey) { (action, key) =>
          print(s"Action '${action.value}' was received over '${key}'.\n")
          ack
        }
      }
    }
  }

where Action is my protobuf generated class. On compile, I get:

MessagingControllerModule.scala:24: could not find implicit value for parameter um: com.spingo.op_rabbit.RabbitUnmarshaller[ai.osaro.traits.Types.Action]
        (body(as[Action]) & routingKey) { (action, key) =>
                ^

What's necessary for it to be able to handle a custom class?
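The compiler error above says that no implicit instance of the unmarshaller type class is in scope for `Action`. The pattern can be sketched in plain Scala as follows. Note the assumptions: `Unmarshaller` is a simplified, hypothetical stand-in for `RabbitUnmarshaller` (the real trait's method signature may differ and should be checked against the library source), and `Action` here is a plain case class standing in for the protobuf-generated class.

```scala
object UnmarshallerSketch {
  // Hypothetical stand-in for a library type class such as RabbitUnmarshaller;
  // the real trait's signature may differ.
  trait Unmarshaller[T] {
    def unmarshall(bytes: Array[Byte]): T
  }

  // Stand-in for the protobuf-generated class from the question.
  final case class Action(value: String)

  // The instance must be in implicit scope at the call site
  // (defined locally, as here, or brought in with an import).
  implicit val actionUnmarshaller: Unmarshaller[Action] = new Unmarshaller[Action] {
    // With a real protobuf class this would typically delegate to the
    // generated parser, e.g. Action.parseFrom(bytes).
    def unmarshall(bytes: Array[Byte]): Action = Action(new String(bytes, "UTF-8"))
  }

  // Mirrors the shape of `as[T]`: compiles only if an instance is found.
  def as[T](bytes: Array[Byte])(implicit um: Unmarshaller[T]): T = um.unmarshall(bytes)

  def main(args: Array[String]): Unit = {
    val action = as[Action]("start".getBytes("UTF-8"))
    println(s"Action '${action.value}' was decoded.")
  }
}
```

Removing `actionUnmarshaller` from this sketch reproduces the same kind of "could not find implicit value" error at the `as[Action]` call.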

Unrelated: where does op-rabbit look for application.conf?

Scala 2.10 cross build

Hi Tim,

I have a (private) lib I am developing that uses op-rabbit. I still want it to cross-build for Scala 2.10 and 2.11, so I set up my sbt build accordingly. I was hoping that the v1.0.x branch of op-rabbit would cross-build for Scala 2.10, and it seems to be set up to do so in build.sbt. However, I don't see a published release of 1.0.3 for Scala 2.10, and when I tried to compile it myself there was a dependency problem. So my question is: do you still plan to support Scala 2.10 in the v1.0.x branch? :)

Thanks
Dan

topicExchangeName needs to be more configurable

The only way to set the topicExchangeName is by having an op-rabbit.topic-exchange-name config in application.resources. This should be more flexible, allowing clients to pass in a config or the string itself. All of our configuration is scoped by environment, so we will never have a top-level op-rabbit.topic-exchange-name. We will have dev.op-rabbit.topic-exchange-name, prod.op-rabbit.topic-exchange-name, etc.

See https://github.com/SpinGo/op-rabbit/blob/master/core/src/main/scala/com/spingo/op_rabbit/RabbitControl.scala#L18
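To illustrate the environment-scoped layout described here, the following minimal sketch uses only the Typesafe Config library (assumed to be on the classpath). It lifts one environment's subtree to the top level so that a top-level `op-rabbit.topic-exchange-name` lookup succeeds. The `effectiveConfig` helper and the exchange names are hypothetical, and whether op-rabbit would pick up such a config depends on where it reads its settings from, which is exactly what this issue asks to make configurable.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object ScopedConfigSketch {
  // Illustrative config with per-environment scoping, as in the issue.
  private val raw: Config = ConfigFactory.parseString(
    """
      |dev.op-rabbit.topic-exchange-name = "dev-exchange"
      |prod.op-rabbit.topic-exchange-name = "prod-exchange"
      |""".stripMargin)

  // Hypothetical helper: settings under `env` win, everything else falls back
  // to the unscoped config.
  def effectiveConfig(env: String): Config =
    raw.getConfig(env).withFallback(raw)

  def main(args: Array[String]): Unit = {
    val config = effectiveConfig("dev")
    println(config.getString("op-rabbit.topic-exchange-name")) // prints "dev-exchange"
  }
}
```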

Can't declare retry queue

I'm getting this error in the RabbitMQ logs:

=ERROR REPORT==== 10-Feb-2016::07:54:10 ===
Channel error on connection <0.24850.3867> (x.x.x.x:51049 -> x.x.x.x:5672, vhost: 'servo', user: 'servo_mq'), channel 21:
{amqp_error,not_found,
           "no queue 'op-rabbit.retry.ads-push-queue' in vhost 'servo'",
           'queue.declare'}

Add missing stream operations

There are several methods that "plain" streams have, but acked streams don't.
Some examples:
Source.via / Flow.via
Source.mapMaterializedValue

There are probably more that I'm missing; these are just the ones I encountered.

Sending a message to a non-existent exchange also causes later messages to an existing exchange to disappear

Here's a short test case:

    implicit val actorSystem = ActorSystem("rabbitactor")
    val rabbitControl = actorSystem.actorOf(Props[RabbitControl])

    Subscription.run(rabbitControl) {
      import Directives._
      channel() {
        consume(Binding.direct(Queue("somegoodqueue"), Exchange.direct("somegoodexchange"), List("somegoodroutingkey"))) {
          body(as[JsObject]) {
            data =>
              println(s"got $data")
              ack
          }
        }
      }
    }

    rabbitControl ! Message.exchange("hi", "somebadexchange", "somebadroutingkey")
    Thread.sleep(10000)

    for(i <- 1 to 10) rabbitControl ! Message.exchange(Json.obj("id" -> i), "somegoodexchange", "somegoodroutingkey")

    Thread.sleep(10000)

In this case, the 10 messages to the good exchange are never logged. If I comment out the line where the message is sent to the bad exchange, then all 10 good messages are logged.

Example for Java users

Hi,

Can you please also add an example for Java users? I'm not really sure how to use Subscription.run(...).

Edit: my previous question was about Java Play; now I'm only asking for a plain Java example.

Question about QOS

Hi there,
I've been tracing the code to see what qos does under the hood. The docs are a little sparse on the details (rightfully so). The reason I need to know is that I am trying to create a wrapper for this which closely resembles another Node.js-based RMQ lib we use. What happens in Akka when I define a qos of, let's say, 10 (channel(qos = 10) {})? Details around that would be very helpful. Thanks in advance!
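As background for this question, the broker-level meaning of a prefetch setting can be sketched with the plain RabbitMQ Java client from Scala. This does not describe what op-rabbit does inside Akka, which the source does not cover; it only shows the underlying `basicQos` call. The queue name and host are made up for illustration, and the sketch assumes `amqp-client` is on the classpath and a broker is running on localhost.

```scala
import com.rabbitmq.client.{AMQP, ConnectionFactory, DefaultConsumer, Envelope}

object PrefetchSketch {
  def main(args: Array[String]): Unit = {
    val factory = new ConnectionFactory()
    factory.setHost("localhost") // assumed broker location
    val connection = factory.newConnection()
    val channel = connection.createChannel()

    // Illustrative non-durable, auto-delete queue.
    channel.queueDeclare("qos-demo", false, false, true, null)

    // Ask the broker to keep at most 10 unacknowledged deliveries
    // outstanding for a consumer on this channel.
    channel.basicQos(10)

    val consumer = new DefaultConsumer(channel) {
      override def handleDelivery(consumerTag: String, envelope: Envelope,
          properties: AMQP.BasicProperties, body: Array[Byte]): Unit = {
        println(s"processing delivery ${envelope.getDeliveryTag}")
        // Acknowledging frees one prefetch slot, so the broker may send another message.
        channel.basicAck(envelope.getDeliveryTag, false)
      }
    }

    // autoAck = false, so the prefetch limit actually applies.
    channel.basicConsume("qos-demo", false, consumer)
    // The connection stays open and keeps consuming until the process is stopped.
  }
}
```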

Error handling in play json unmarshall

When conversion from JSON to case class in the PlayJsonSupport unmarshall function fails, the error is not handled very well:
java.util.NoSuchElementException: JsError.get
Is there any way to handle the error properly, and for example retrieve the contents of the JsError?
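The `JsError.get` exception is what Play JSON raises when `.get` is called on a failed validation result. A minimal sketch of handling the failure explicitly with Play JSON alone is shown below. It is not op-rabbit's `PlayJsonSupport` code: the `Person` type and the `decode` helper are hypothetical, and the sketch assumes `play-json` is on the classpath.

```scala
import play.api.libs.json._

object JsonErrorSketch {
  final case class Person(name: String, age: Int) // hypothetical payload type
  implicit val personReads: Reads[Person] = Json.reads[Person]

  // Returns either a list of readable validation problems or the decoded value.
  // Note: Json.parse itself still throws on malformed (non-JSON) input.
  def decode(bytes: Array[Byte]): Either[Seq[String], Person] =
    Json.parse(bytes).validate[Person] match {
      case JsSuccess(person, _) => Right(person)
      case JsError(errors) =>
        // Each entry pairs a JSON path with the problems found at that path.
        Left(errors.map { case (path, problems) =>
          s"$path: ${problems.map(_.message).mkString(", ")}"
        }.toSeq)
    }

  def main(args: Array[String]): Unit = {
    println(decode("""{"name":"Ada","age":36}""".getBytes("UTF-8")))
    println(decode("""{"name":"Ada"}""".getBytes("UTF-8"))) // missing "age" yields a Left
  }
}
```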

Document RecoveryStrategy

How do you configure the RecoveryStrategy: retry limit, delay, onAbandon, name of the abandon queue, etc.?

I was not able to find much on this.

Difficulty with build

I've run into two issues.

  • The build.properties file has quotes around the sbt version number. With them, the build fails. Without, it works as expected.
  • I'm not able to resolve the scoped-fixtures dependency:
    [warn] ==== SpinGo OSS: tried
    [warn]   http://spingo-oss.s3.amazonaws.com/repositories/releases/com/spingo/scoped-fixtures_2.11/0.4.0/scoped-fixtures_2.11-0.4.0.pom
    [warn] ==== Sonatype Releases: tried
    [warn]   http://oss.sonatype.org/content/repositories/releases/com/spingo/scoped-fixtures_2.11/0.4.0/scoped-fixtures_2.11-0.4.0.pom
    [info] Resolving jline#jline;2.12.1 ...
    [warn]  ::::::::::::::::::::::::::::::::::::::::::::::
    [warn]  ::          UNRESOLVED DEPENDENCIES         ::
    [warn]  ::::::::::::::::::::::::::::::::::::::::::::::
    [warn]  :: com.spingo#scoped-fixtures_2.11;0.4.0: not found
    [warn]  ::::::::::::::::::::::::::::::::::::::::::::::
    [warn]  
    [warn]  Note: Unresolved dependencies path:
    [warn]      com.spingo:scoped-fixtures_2.11:0.4.0 (/home/rob/git/op-rabbit/project/Build.scala#L16)
    [warn]        +- com.spingo:op-rabbit-core_2.11:1.0.0-SNAPSHOT
sbt.ResolveException: unresolved dependency: com.spingo#scoped-fixtures_2.11;0.4.0: not found

In the latter case, I tried to follow the link to the pom file and got a NoSuchKey error from AWS.

codeship dependency not found - sbt assembly

[info] Resolving jline#jline;2.12.1 ...
[warn] ::::::::::::::::::::::::::::::::::::::::::::::
[warn] :: UNRESOLVED DEPENDENCIES ::
[warn] ::::::::::::::::::::::::::::::::::::::::::::::
[warn] :: com.thenewmotion.akka#akka-rabbitmq_2.11;2.2: not found
[warn] ::::::::::::::::::::::::::::::::::::::::::::::
[warn]
[warn] Note: Unresolved dependencies path:
[warn] com.thenewmotion.akka:akka-rabbitmq_2.11:2.2
[warn] +- com.spingo:op-rabbit-core_2.11:1.2.1

...

error sbt.ResolveException: unresolved dependency: com.thenewmotion.akka#akka-rabbitmq_2.11;2.2: not found
[error] Total time: 3 s, completed Feb 10, 2016 4:24:18 AM

My config file looks like this:

resolvers ++= Seq(
"SpinGo OSS" at "http://spingo-oss.s3.amazonaws.com/repositories/releases"
)

val opRabbitVersion = "1.2.1"

libraryDependencies ++= Seq(
"com.spingo" %% "op-rabbit-core" % opRabbitVersion,
"com.spingo" %% "op-rabbit-play-json" % opRabbitVersion,
"com.spingo" %% "op-rabbit-json4s" % opRabbitVersion,
"com.spingo" %% "op-rabbit-airbrake" % opRabbitVersion,
"com.spingo" %% "op-rabbit-akka-stream" % opRabbitVersion
)

At first I was using val opRabbitVersion = "1.2.0", so I updated to 1.2.1 as shown above, but I still get this sbt assembly error in Codeship. Any ideas why this would happen?

Installing with gradle?

I have a project that uses gradle. I'd expect to be able to install it as a dependency just as any other. In build.gradle:

    repositories {
        maven {
            url "http://spingo-oss.s3.amazonaws.com/repositories/releases"
        }
    }

    dependencies {
        compile "com.spingo:op-rabbit-core:1.0.0"                  // rabbitMQ client
    }

I get:

Could not resolve all dependencies for configuration ':common:compile'.
> Could not find com.spingo:op-rabbit-core:1.0.0.
  Searched in the following locations:
      https://repo1.maven.org/maven2/com/spingo/op-rabbit-core/1.0.0/op-rabbit-core-1.0.0.pom
      https://repo1.maven.org/maven2/com/spingo/op-rabbit-core/1.0.0/op-rabbit-core-1.0.0.jar
      file:/Users/admin/.m2/repository/com/spingo/op-rabbit-core/1.0.0/op-rabbit-core-1.0.0.pom
      file:/Users/admin/.m2/repository/com/spingo/op-rabbit-core/1.0.0/op-rabbit-core-1.0.0.jar
      https://jcenter.bintray.com/com/spingo/op-rabbit-core/1.0.0/op-rabbit-core-1.0.0.pom
      https://jcenter.bintray.com/com/spingo/op-rabbit-core/1.0.0/op-rabbit-core-1.0.0.jar
      http://spingo-oss.s3.amazonaws.com/repositories/releases/com/spingo/op-rabbit-core/1.0.0/op-rabbit-core-1.0.0.pom
      http://spingo-oss.s3.amazonaws.com/repositories/releases/com/spingo/op-rabbit-core/1.0.0/op-rabbit-core-1.0.0.jar

I'm surprised this doesn't work. The error message itself appears to show exactly the files it should be looking for. I suspect I've simply gotten something small wrong in my gradle build settings.

How do I declare queues?

In the Java API I can call myChannel.queueDeclare(qName,...) to be sure a queue exists before trying to use it. I typically have a short bit of code at the beginning of my program that rolls through and does this for all the queues I'm going to need.

What's the equivalent in this world? I've tried something like this but it doesn't build:

    Subscription.run(rabbitControl) {
        channel() { 
            queue("testQ")  // Declare the queue
        }
    }

(Note this isn't my actual "using" of the queue in a stream--for which I plan to use Akka Streams. This is just pre-work to declare the queues.)

I tried to call queue("testQ").declare(???), but declare needs a Channel, so how do I get that?
Honestly for this pre-work it doesn't even have to be a fancy Directive-based thing. If there's not already a way to do this, perhaps rabbitControl could receive a QueueDeclare message that tweaks the right things internally?

Ideas appreciated...
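One way to do this pre-work outside of op-rabbit is to open a short-lived connection with the plain RabbitMQ Java client and declare the queues there. This is only a sketch of that workaround, not an op-rabbit feature: the queue names, host, and durability settings are assumptions, and it requires `amqp-client` on the classpath and a reachable broker.

```scala
import com.rabbitmq.client.ConnectionFactory

object DeclareQueuesSketch {
  def main(args: Array[String]): Unit = {
    val queueNames = List("testQ", "otherQ") // illustrative queue names

    val factory = new ConnectionFactory()
    factory.setHost("localhost") // assumed broker location
    val connection = factory.newConnection()
    try {
      val channel = connection.createChannel()
      queueNames.foreach { name =>
        // durable = true, exclusive = false, autoDelete = false, no extra arguments.
        // Declaring is idempotent as long as the settings match an existing queue.
        channel.queueDeclare(name, true, false, false, null)
      }
      channel.close()
    } finally connection.close()
  }
}
```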

Connecting to Rabbit

Is there some kind of connection factory so that I can connect to Rabbit without using a conf file?

Thanks

Does the RabbitControl actor support several subscriptions?

Hi!

I was trying to run several AckedFlows against the same RabbitMQ server (with different queues, though).
RabbitSource materializes to a SubscriptionRef, and when I call close on it, the top-level RabbitControl actor is killed, which in turn destroys the parallel subscription that I made with the same RabbitControl actor ref.

My question is: according to the RabbitControl.scala code, the actor maintains a list of subscriptions, so is it really meant to support parallel subscriptions (with their independent connections and channels) or not?

Thanks in advance !

Move "already declared" caching logic to exchange / queue definitions themselves

Right now, it's implemented at a publisher level, so each unique instance of a publisher bound will declare the entity to which it's publishing on first try.

This is suboptimal since publishers enclose the topic, and a common use case is that a publisher will publish to the same exchange but a variety of topics.

If the "I've already been declared" logic were moved from Publisher to the defining entity, then this problem is resolved (and the potential for surprise is greatly reduced).

TopicBinding always binds to the default exchange

I was trying to see if I can add support for #7 when I encountered something that looks like a bug in TopicBinding. I don't use topics/exchanges, so I'm not sure about this, but it looks to me like it always binds to the default exchange (RabbitControl.topicExchangeName) instead of using the provided name.

Declaring an exchange through op-rabbit

Hey, thanks for the great library!

What's the recommended way of declaring an exchange using RabbitMQ's Java driver methods through the RabbitControl interface? I couldn't find any mention of this use case.

To clarify: this is a publisher use case; I don't require subscription/consumer functionality.
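For reference, the Java driver calls in question look like this when used directly from Scala, outside of RabbitControl. This is a hedged sketch rather than the library's recommended approach: the exchange name, routing key, payload, and host are illustrative, and it assumes `amqp-client` is on the classpath and a broker is running on localhost.

```scala
import com.rabbitmq.client.ConnectionFactory

object DeclareExchangeSketch {
  def main(args: Array[String]): Unit = {
    val factory = new ConnectionFactory()
    factory.setHost("localhost") // assumed broker location
    val connection = factory.newConnection()
    try {
      val channel = connection.createChannel()

      // Declare a durable direct exchange; this is a no-op if it already
      // exists with the same type and settings.
      channel.exchangeDeclare("test", "direct", true)

      // Publish with a routing key; with no queue bound for "key",
      // the broker simply drops the message.
      channel.basicPublish("test", "key", null, "message".getBytes("UTF-8"))

      channel.close()
    } finally connection.close()
  }
}
```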

ask pattern mapTo[Boolean]

Does not work.

Actually, this works: mapTo[ConfirmResponse], which makes sense.

So I think the documentation is wrong?

QueuePublisher/ConfirmedPublisherSink connecting to non-existent queue

Creating a stream that tries writing to a non-existent queue doesn't report an error, and still somehow acknowledges messages (causing message loss).
I haven't tested this using plain (non-stream) actors, but it might affect them as well.

There are 2 issues here:

  1. There is no check at any point to see if the queue exists or not. I suggest trying to "bind" to the queue during materialization, similar to the way it's done for the source. This can be an active binding (queueDeclare) or a passive one (queueDeclarePassive), but in either case there should be a way to specify an error handling strategy or be notified of errors.
  2. Where is the ack coming from? If the message didn't end up on the destination queue, why is it removed from the source queue? I need to investigate this more thoroughly, though. It's possible that the message was actually nacked and moved to a dead letter exchange or something.

(I don't use exchanges/topics, so I'm not sure if this is relevant for them or not)

Reply with `message.uuid`

Replying with true is less than useful when one actor is publishing a message via !.

Modify it to respond with Message.Confirmation(msg.uuid) (and add a Message UUID property to every Message)

Simplify declaration of recovery strategy using Directives

Define a new directive, forward, which delivers the message (with, optionally, additional properties set), and then acks the message.

Define directive, nack(false), which nacks the message and does not re-schedule for redelivery.

Consider adding a "delay" directive?

The forward directive can take a QueueBinding; define an "ifNotExists" wrapper QueueBinding.

Silent failures with an existing queue

val opRabbitVersion = "1.1.2"

Scenario:

  val subscriptionRef = Subscription.run(rabbitControl) {
    channel(qos = 1) {
      consume(
        topic("foo", List("#"), exchange = "some_exchange")
      ) {....

If the queue does not exist, op-rabbit creates it, all is well. Creating the queue manually (via the management plugin) works too.

However, creating the same queue manually with x-max-length:10000 and then starting the Subscription results in messages not being consumed. No errors reported.

More controllable logging

Is there any way to control the logs being spit out into the console? There are numerous lines printed directly, obviously with println, not using the logger. Things like: "handleDelivery 1", "I have ack 1", or something like "You jerk! 312 NO_ROUTE". I almost suspect the third one should be an exception of some sort. I tried to find it in the code, but no luck.

Question about publishing to a direct exchange

I would like to publish messages to a direct exchange with a specific routing key, and have that exchange created if it doesn't exist, but I can't work out the syntax. My guess is something like:

  rabbitControl ! Message("message", "key", Publisher.exchange(Exchange.direct("test")))

But that doesn't compile, and I can't work it out even by digging through the code. Any chance you could help me out? Thanks!
