ruby-amqp / amqp Goto Github PK
View Code? Open in Web Editor NEWEventMachine-based RabbitMQ client. Prefer Bunny: http://rubybunny.info. See documentation guides at http://ruby-amqp.github.io/amqp/.
Home Page: http://ruby-amqp.github.io/amqp/
EventMachine-based RabbitMQ client. Prefer Bunny: http://rubybunny.info. See documentation guides at http://ruby-amqp.github.io/amqp/.
Home Page: http://ruby-amqp.github.io/amqp/
We need to be able to specify an errback if something goes south.
See also http://groups.google.com/group/ruby-amqp/browse_thread/thread/9702b907f6c0e33f?hl=en
I'm working with Windows 7, Jruby 1.6.
When i gem install amqp -v '0.8.0***'
It comes out this error:
E:>gem install amqp -v '0.8.0.rc1'
System.java:-2:in arraycopy': java.lang.ArrayIndexOutOfBoundsException from DefaultResolver.java:111:in
makeTime'
from DefaultResolver.java:277:in create' from DefaultResolver.java:317:in
handleScalar'
from DefaultResolver.java:435:in orgHandler' from DefaultResolver.java:455:in
node_import'
from DefaultResolver$s$1$0$node_import.gen:65535:in `call'
<...rest of those output...>
But version 0.7 is okay.
BTW: Could you please update some uptodate examples? Some of those including in the package can not run properly.
I am having trouble getting AMQP to reconnect if a connection fails. Automatic reconnects seem to happen okay after an initial connection, but I can't find a way to make the script recover from an initial failure and reconnect until it works:
loop do
begin
AMQP.start do
# do stuff
end
rescue Exception => e
log "Failed to connect, #{e}"
# uncommenting this makes no difference
#AMQP.stop { EM.stop }
end
sleep 5 # try again in 5 seconds
end
On ruby 1.9.2, this hangs forever the second time I get to AMQP.start if I have not yet started the broker. Without catching the exception, the script of course exits, and if I catch it and do nothing (no loop), it continues to block forever. Am I missing something?
Currently can't use the AMQP gem in any gemfile / environment which is using the latest beta builds of eventmachine (at beta3). Example:
Bundler could not find compatible versions for gem "eventmachine":
In Gemfile:
amqp depends on
eventmachine (~> 0.12.10)
goliath depends on
eventmachine (1.0.0.beta.3)
Can you guys change or remove the version lock? There is no reason why the current gem should not work with more recent versions of EM.
I'm posting this here in hopes that my work might be of some use for the future development of the amqp gem. So go ahead and rip my (hopefully not too horrible) code apart. Hopefully a failover/fallback feature will make it back into the amqp gem itself sometime soon :)
https://github.com/jimeh/amqp-failover
For some background; We needed failover and fallback capabilities in our AMQP workers. I originally achieved it through a quite messy and untestable hacked version of AMQP::BasicClient. I just finished rewriting all of it to a proper gem, with proper specs to make sure everything works.
I decided to make a gem rather than fork amqp due to the rapid progress of development here as of late, and also to simplify our production deployment process. It currently works with amqp 0.7.0, but I have not tested it against the master branch yet.
https://github.com/ruby-amqp/rabbitmq-tutorials
Cross-reference to RabbitMQ bugzilla (not public, sorry) https://bugzilla.rabbitmq.com/show_bug.cgi?id=23703
An old issue from the tmm1 queue, sending multibyte characters fails on Ruby 1.9.x. A fix is at kasperbn@279dd11; we've been using it for a month with no issues.
Something is very wrong with amqp + builder 2.1.2 (ALOT of stuff depends on that version, among them rails 3.0.x). I haven't tested ruby 1.8.7, I'm running 1.9.2.
Just check this out:
require 'rubygems'
require 'mq'
require 'builder'
builder = Builder::XmlMarkup.new
xml = builder.person { |b| b.name("Jim"); b.phone("555-1234") }
puts xml
Gives:
NameError: uninitialized constant Builder::XmlBase::Symbol
method method_missing in xmlbase.rb at line 40
method <main> in untitled at line 7
Hi,
In mq.rb this line can create connection to AMQP
@connection = connection || AMQP.start
But they are no way to close it.
Thanks
From the mailing list, "Blocking while publishing", by Theo:
What happens is that when you run
@large_set_of_ids.each do | id |
q.publish(id)
end
you're blocking the EventMachine reactor. Blocking the reactor means
that EventMachine can't do anything else, like checking for incoming
data, or trigger timers. It takes some time to loop over all the IDs
(not very long in the grand scheme of things, but it's not
instantaneous if large means thousands or hundreds of thousands), and
it takes (comparatively) a very long time for the AMQP code to encode
each message and send it off to the connection. I assume the reason
your consumer does not see the messages is that since the reactor is
blocked by all the calls to #publish, EventMachine hasn't had the
opportunity to send anything over the connection socket.
I made the same mistake myself when I started using the AMQP gem, I
didn't understand why I could publish 1000 messages per second when my
consumer didn't get more than 10 or so. As the others have mentioned
the solution is to publish only one message per tick.
Solution: http://rdoc.info/github/eventmachine/eventmachine/master/EventMachine/Iterator
We should put a note about it to the README.
Some other operations have similar issues. E.g. the close method of
MQ does a channel.close - a synchronous operation, but it is not
exposed as such in the API. The whole channel/connection close
business is pretty ugly, e.g. the way that closing the last channel
implicitly closes the connection.
When calling the MQ declaraction methods (queue, fanout, topic,
etc.), if you supply a callback, and the queue or exchange is not known,
everything is fine - the callback is called as expected. But if the
queue or exchange is already known, the callback is never called; it
simply gets ignored. All callbacks should get called (when the reply
arrives, or immediately if we have already got the reply).
When the broker sends Channel.Close, the responding MQ instance should not let user to use it anymore.
See the discussion on 75e5407
So we will use the official AMQP terminology, this way it's confusing.
I have seen this issue with EM.run and JRuby myself a few times. In my case I was able to move reactor run into the main thread but this is a big deal for web apps that don't run on Thin.
Now, this looks more like an EM issue but I think we can investigate it. It definitely affects amqp gem users.
It was suggested to add :prefetch option to AMQP::Channel constructor. Even thought 0.6/0.7 do not seem to support this, it seems that there is code out there that looks like this:
ch = AMQP::Channel.new(connection)
ch.prefetch(10)
so we need to wrap #prefetch implementation into AMQP::Channel#once_open and making it a constructor option seems natural.
Server-assigned queue names are not supported by MQ.queue.
Right now the init_heartbeat method (lib/amqp/client.rb) looks something like this:
def init_heartbeat
@last_server_heartbeat = Time.now
@timer ||= EM::PeriodicTimer.new(@settings[:heartbeat]) do
if connected?
if @last_server_heartbeat < (Time.now - (@settings[:heartbeat] * 2))
log "Reconnecting due to missing server heartbeats"
reconnect(true)
else
send AMQP::Frame::Heartbeat.new
end
end
end
end
However, when you send the hearbeat frame you don't update the @last_server_heartbeat variable. That causes delivery tags for long running jobs to be reset, and all sorts of other problems. You can fix this by updating the @last_server_heartbeat time when you send the heartbeat frame. Like so:
def init_heartbeat
@last_server_heartbeat = Time.now
@timer ||= EM::PeriodicTimer.new(@settings[:heartbeat]) do
if connected?
if @last_server_heartbeat < (Time.now - (@settings[:heartbeat] * 2))
log "Reconnecting due to missing server heartbeats"
reconnect(true)
else
@last_server_heartbeat = Time.now
send AMQP::Frame::Heartbeat.new
end
end
end
end
If you try to pass a nil queue name to AMQP::Queue.new, the initialize method bombs with nil.empty? not being a valid method. This is on version 0.7.1 of the gem. That line is setting the name ivar to nil if the passed in name parameter is an empty string. It seems that it should check that name is not nil first.
@name = name unless name.empty?
NoMethodError: You have a nil object when you didn't expect it! You might have expected an instance of Array. The error occurred while evaluating nil.empty?
[GEM_ROOT]/gems/amqp-0.7.1/lib/amqp/queue.rb:72:in `initialize'
[GEM_ROOT]/gems/amqp-0.7.1/lib/amqp/channel.rb:619:in `new'
[GEM_ROOT]/gems/amqp-0.7.1/lib/amqp/channel.rb:619:in `queue'
Mainly applies to when reconnections without application restarts occur. This manifests for us in our monitoring systems that are looking for N consumers of a particular queue in RabbitMQ to be present, and N starts growing as transient network errors happen (due to heartbeat being enabled).
Solution is to Queue#unsubscribe and Queue#cancelled inside Queue#reset.
Small patch here: cloudcrowd@417700e
Some examples for MQ.headers time out. See them disabled in the spec suite. Need to investigate what is up.
https://gist.github.com/783038, works in Ruby 1.9.2.
Possibility of Qpid compatibility is not clear (Qpid has 2 versions/implementations, one of them implementation of AMQP 0.9.1 and is in Java and the other one is AMQP 0.10.0 implementation in C++). But we will see what we can do (given there is a maintained Qpid version for 0.9.1).
I could fix problems with amqp-spec but I still can't get spec suite to pass on rbx for 0.7.x.
I keep getting something like this in error reports:
=ERROR REPORT==== 20-Apr-2011::00:00:32 ===
exception on TCP connection <0.27501.3> from 127.0.0.1:57057
{bad_payload,1,0,8,<<0,10,0,40,1,47,0,0,0>>}
Spent two hours trying to figure out, no dice. Probably something is wrong with serialization.
I disabled travis rbx builds for 0.7.x for now.
master right now does not support TLS because amq-client does not
With amqp:// connection string support contributor decided to pull in cgi from standard Ruby library. It is slow, unmaintained and has all kinds of bad reputation.
We need to switch to addressible gem: it has very stable API, is efficient and pulls no other dependencies with it.
Right now when exchange is created with :passive option, InvalidOptionError kicks in even though :passive parameter on Exchange.Declare is not supposed to declare anything. We need to handle this in MQ.topic, MQ.direct and so on.
Initial spec coverage for MQ.headers
Repro:
Rough code:
channel = ::AMQP::Channel.new
queue = channel.queue("foobar") # Protocol::Queue::Declare sent
# using these to simulate synchronous call
m = Mutex.new
cv = ConditionVariable.new
msgs = 0
queue.status { |msgs, _| cv.signal } # Another Protocol::Queue::Declare sent
m.synchronize { cv.wait(m) }
puts "queue #{queue.name} has #{msgs} msgs"
Currently the whole library is loaded to the memory, that's not good at all. User should require what he wants to use explicitly. Michael mentioned Kernel#autoload
, but as it isn't thread-safe, I'd rather leave it to user to do it manually โ it's more transparent anyway.
Example that demonstraits option mismatch on re-declaration for MQ.fanout currently fails. Does not happen with direct and fanout exchanges.
I am trying to get Ruby AMQP to connect to RabbitMQ over SSL to no avail.
I am using the RabbitMQ settings and Ruby code block listed here:
https://gist.github.com/839690
I get the following error when running the code block in my Rails app's console:
AMQP::Error: Could not connect to server 127.0.0.1:5671 from /Users/baron/.rvm/gems/ruby-1.9.2-p136@sl-profiles/bundler/gems/amqp-38622e1bc721/lib/amqp/client.rb:26:in `block in initialize' from /Users/baron/.rvm/gems/ruby-1.9.2-p136@sl-profiles/bundler/gems/amqp-38622e1bc721/lib/amqp/client.rb:87:in `call' from /Users/baron/.rvm/gems/ruby-1.9.2-p136@sl-profiles/bundler/gems/amqp-38622e1bc721/lib/amqp/client.rb:87:in `block in unbind' from /Users/baron/.rvm/gems/ruby-1.9.2-p136@sl-profiles/gems/eventmachine-0.12.10/lib/eventmachine.rb:996:in `call' from /Users/baron/.rvm/gems/ruby-1.9.2-p136@sl-profiles/gems/eventmachine-0.12.10/lib/eventmachine.rb:996:in `block in run_deferred_callbacks' from /Users/baron/.rvm/gems/ruby-1.9.2-p136@sl-profiles/gems/eventmachine-0.12.10/lib/eventmachine.rb:996:in `each' from /Users/baron/.rvm/gems/ruby-1.9.2-p136@sl-profiles/gems/eventmachine-0.12.10/lib/eventmachine.rb:996:in `run_deferred_callbacks' from /Users/baron/.rvm/gems/ruby-1.9.2-p136@sl-profiles/gems/eventmachine-0.12.10/lib/eventmachine.rb:256:in `run_machine' from /Users/baron/.rvm/gems/ruby-1.9.2-p136@sl-profiles/gems/eventmachine-0.12.10/lib/eventmachine.rb:256:in `run' from /Users/baron/.rvm/gems/ruby-1.9.2-p136@sl-profiles/bundler/gems/amqp-38622e1bc721/lib/amqp/connection.rb:83:in `start
I am running ruby-amqp edge as of Feb 22, 2011. That's commit 38622e1.
I actually need to be able to connect using client-side SSL cert as well, but I am just trying to get classic SSL to work to start with.
Here is a minimalistic case I can reproduce it with. Happens to me with two production apps, I have to kill -9 the JVM to stop them.
We might want to use nake rather than rake, as rake utterly sucks in using command line arguments.
Initial spec coverage for MQ.queue
We need to come up with a migration guide for 0.8.0. Even if it ends up being a drop-in replacement for most people, there are certain practices in old examples that we should stop promoting. And it overlaps with other guides: AMQP 0.8.0 vs. 0.9.1, old RabbitMQ versions that Debian and Ubuntu ship with are just two examples.
Question: what is the API for re-establishing a severed connection? Looking through the source, it looks like reconnect
method is raising a "not implemented" exception. Is the plan to add that as a flag? How do we, currently, go about re-establishing a broken connection?
This is work-in-progress, the 0.9.1 support will come with (finishing and) integration of https://github.com/botanicus/amq-protocol It should be also faster than the current amqp/spec.rb and amqp/buffer.rb. This was originally tmm1#2.
Support :passive option on MQ.queue
Cf http://rubydoc.info/github/ruby-amqp/amqp/master/AMQP/Exchange:publish & the actual documentation in source code. One of missing parts is:
# @option [Boolean] :immediate (false) This flag tells the server how to react if the message cannot be
# routed to a queue consumer immediately. If this flag is set, the
# server will return an undeliverable message with a Return method.
# If this flag is zero, the server will queue the message, but with
# no guarantee that it will ever be consumed.
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.