ruby-amqp / amq-client
THIS PROJECT IS NOW DEPRECATED (part of the amqp gem codebase)
License: MIT License
The following code runs for a while and then raises an exception. I was doing a bit of "stress" testing by sending random binary data as my AMQP payload. I'm not sure whether amq-client or RabbitMQ is failing. I'm using MRI 1.9.2 on OS X with RabbitMQ 2.4.1 installed via Homebrew. Because of the 1.9.2 string encoding issues, I even tried forcing a binary encoding on the string, to no avail.
#!/usr/bin/env ruby
# encoding: utf-8
require "bundler"
Bundler.setup
$:.unshift(File.expand_path("../../../lib", __FILE__))
require 'amqp'
puts "Running amqp gem #{AMQP::VERSION}"
EM.run do
  AMQP.connect do |conn|
    @amqp_connection = conn
    AMQP::Channel.new(@amqp_connection) do |channel|
      @amqp_channel = channel
      @amqp_exchange = @amqp_channel.topic
      EventMachine.add_periodic_timer(0) do
        random_binary_string = Array.new(rand(1000)) { rand(256) }.pack('c*')
        random_binary_string.force_encoding('BINARY')
        p random_binary_string
        @amqp_exchange.publish(random_binary_string, :routing_key => "a.my.pattern")
      end
    end
  end
end
The code runs for a while and publishes a bunch of messages but ultimately ends up with the following failure (offending binary string inspect also included):
"^:\x0E\xB8\x1Fr3?6r\xB4\xAD\x9E\\+Q&\xDE\xAE\xB2\x87\xD7\n\xE8$u\xDA\xF1\xE8gD\xA5\x11\xF7e\xA30\xE1\xDB\f\x85l\t\ev6\x15B\x02\xCE\x81|B\xA7#\x92\xE5\x05\x97\x06R\x15?L\x11\xA5Lf\x9D/f\xDF2[!\xD4\xBF\xB27\xC5\x86\x00\x99=\xDC\x88\xEA\x8B\xEA\xC2\xE9\xADE\xF27F\xED\xE8\x80\x0E^\xF2H0&\xB2\xC4\xBE\xC0\xB4^\xC1(\x8A]\x86-{3y\xB8Rz\xDF\xC8\xD83.\xE7\x8B\xB1FI$\x1A\e\xEC\x1A\x10\x8A3\xB4\xAEk\xC8Tl\f\x0E\xBD_\aU\xFA\xBBO\x88\x17\xB6\xEC\xC1Z\xEF\xE6$~r?\x8E\x17\x00\xF1\xCE\x11\x8E\x1E\x9E\xC5\xB1$\xBA\xB9\xB7\xA7\xC7\x83r\xCB_\x858\e\xC6\xAC\x14\x14g8\xB9\bi\xF3u\x02\x8D\xD8+K\xBB\x1E\x15\xA0Q\xBF\xB7?\x84|\x13\xFE%h\xDE)\xEB\x97\x89\x7F\x92C\xA2\x0F\xFE0EeLlgR\x0F'E\x00|\xA6>\xC5\x8E`{t\x8Be\xBD^i\x86:\xD3A\x840\x9A\x93i\x8CE\x82\xD9\xDD\xE1`\xF3\xBA\xDC31\x98^9\x83\xFD\xCE@\xD0\xD4\xFFIF\xBE\xD6\x82E\xB5\tRdg={\x99t\x9B\xFCG\xD4\xE2\xAF\xF3`&!\xA5 \xC1v\xC2\x1C\xF7Q,r\xF6\xC7\x1D\xE26\xE8p\xDF*j\x12P,\xA3\xCF\\g\xE0dQ&\xA1\xBFz=I\xEAk\an\xA1\xD7}\xF9\xA1\x8E\xA0+\xFE\x8F \x9A\x19\xE0\xB5X~\xEArG\xB9\xBE#\xDF\xFB\x91\r`\x1C\xA6\v>\xA3m\xE6\xD9\x85\x10\x9Fy\xA4\x19+Wr2\v\xF2\xCBY\xBB1\x8A\x16\xE5\xE9\x90\xE6ru#\xEE\xFA\xA1\xC1\xE9\b,\x02\xF4\xEC\x15\xAE\xEA\x98Z\x00\xA1\xE9\xEAgI\x10\xF9q6\f\eW7\xBC\x85\x02\x9A=\xBCt\x8A\xF3\xE3\x96\x11\x84\x98\xD2b\x12\xA4\x7F\xDB\xDE`n\xE9\x0E\xF2\xFE\xFA\x8BM\xD3\xD8\xDA4A\xB1\xF1\vD\xD6'^\xBF\x96J\xE31\x9D-\x06\x18\x8Ao\xFB\x84\x9F\x83c\x94\bj\xF9\\C\x80\b\x8E\x04Q\x17z\x1D>\xB7\xBC\x05\xD1\x1C\x97NE\x7Fg\xC2\x1E\x92N\xDC\x89w\x8E=<lJ\xCF\x19\xB7\xDB\xEE?m1\xFFB\x93\xE3\xB4\x80\xAE\xF4\xE8uA\xD5/\xF6\xC8[\xEF\xF2\x98-\xA7P9\xBC\xA8\xED\xF2I\xCAa*#6\xCEt\xC2^9m{\x05\xC1\x03\x9F\xD4\xC1\x82\x1E\b\x895\x11)\xA4\fG\\\x98n\x8F/\xA4\\.\xEAv\x19F\x87)I\xC4\xC1\xF5$\xED\xE6\x12\x1C>\x81\x94Y7-\xD6Y\xFA\x02\xC7\xFC.S\xF9&/\xEB\xB4\x80=P\n\xE3M\xF3*\xF9\x807U\x90\x88w9\xE0c;\eTH\xCF\xFDUV9\a\xC7\x80\xBA\x84\x10;\xF6%m\xA1\xD7\xE7\xBE\\\x00\x19\x9Ft\xBD\xEDJ\xB70\xC7\x9FK\a\xF2\xF3}U\x1E\x1F\xF7\r\xC4\x02\x94\xD7\x1A?\\c\x17C\x8FDWg\t8\xAB0%\x
0E\xE8\f-\xDEW\x02Uipu\xF1*\x80\xF3H\x84sH-\xA7\x14\xE7i-\x8E\xD8B\\\x83o`\xA3\xB7$\x10&"
/Users/mlartz/.rvm/gems/ruby-1.9.2-p180@rflow-component-devel/gems/amq-client-0.7.0.alpha27/lib/amq/client/entity.rb:68:in `error': undefined method `callbacks' for #<Hash:0x00000102870430> (NoMethodError)
from /Users/mlartz/.rvm/gems/ruby-1.9.2-p180@rflow-component-devel/gems/amq-client-0.7.0.alpha27/lib/amq/client/adapter.rb:412:in `handle_close'
from /Users/mlartz/.rvm/gems/ruby-1.9.2-p180@rflow-component-devel/gems/amq-client-0.7.0.alpha27/lib/amq/client/adapters/event_machine.rb:333:in `block in <class:EventMachineClient>'
from /Users/mlartz/.rvm/gems/ruby-1.9.2-p180@rflow-component-devel/gems/amq-client-0.7.0.alpha27/lib/amq/client/adapter.rb:340:in `call'
from /Users/mlartz/.rvm/gems/ruby-1.9.2-p180@rflow-component-devel/gems/amq-client-0.7.0.alpha27/lib/amq/client/adapter.rb:340:in `receive_frameset'
from /Users/mlartz/.rvm/gems/ruby-1.9.2-p180@rflow-component-devel/gems/amq-client-0.7.0.alpha27/lib/amq/client/adapter.rb:319:in `receive_frame'
from /Users/mlartz/.rvm/gems/ruby-1.9.2-p180@rflow-component-devel/gems/amq-client-0.7.0.alpha27/lib/amq/client/adapters/event_machine.rb:287:in `receive_data'
from /Users/mlartz/.rvm/gems/ruby-1.9.2-p180@rflow-component-devel/gems/eventmachine-1.0.0.beta.3/lib/eventmachine.rb:199:in `run_machine'
from /Users/mlartz/.rvm/gems/ruby-1.9.2-p180@rflow-component-devel/gems/eventmachine-1.0.0.beta.3/lib/eventmachine.rb:199:in `run'
I think this should be an unless instead of an if:
https://github.com/ruby-amqp/amq-client/blob/master/lib/amq/client/exceptions.rb#L62
/usr/lib64/ruby/gems/1.8/gems/amq-client-0.9.2/lib/amq/client/exceptions.rb:65:in `initialize': undefined method `method_class' for AMQ::Protocol::HeartbeatFrame:Class (NoMethodError)
from /usr/lib64/ruby/gems/1.8/gems/amq-client-0.9.2/lib/amq/client/async/adapter.rb:247:in `new'
from /usr/lib64/ruby/gems/1.8/gems/amq-client-0.9.2/lib/amq/client/async/adapter.rb:247:in `send_frame'
from /usr/lib64/ruby/gems/1.8/gems/amq-client-0.9.2/lib/amq/client/async/adapter.rb:563:in `send_heartbeat'
from /usr/lib/ruby/site_ruby/1.8/rubygems/custom_require.rb:36:in `to_proc'
from /usr/lib64/ruby/gems/1.8/gems/eventmachine-0.12.10/lib/em/timers.rb:51:in `call'
from /usr/lib64/ruby/gems/1.8/gems/eventmachine-0.12.10/lib/em/timers.rb:51:in `fire'
from /usr/lib64/ruby/gems/1.8/gems/eventmachine-0.12.10/lib/eventmachine.rb:256:in `call'
from /usr/lib64/ruby/gems/1.8/gems/eventmachine-0.12.10/lib/eventmachine.rb:256:in `run_machine'
from /usr/lib64/ruby/gems/1.8/gems/eventmachine-0.12.10/lib/eventmachine.rb:256:in `run'
amq-client's heartbeat does not work: the RabbitMQ server can send heartbeat frames to the client, but the client never sends heartbeat frames to the server. As a result, after approximately 3 * heartbeat_interval the server closes the client's connection.
I found that the client tries to start sending heartbeat frames when the connection is initialized and when the connection completes, but at that point the instance variable heartbeat_interval is still nil, so no heartbeat frames are ever sent.
The client only sends heartbeat frames when heartbeat_interval is not nil, and heartbeat_interval is set in handle_tune. So why not start the heartbeat sender in handle_tune, rather than at connection initialization and completion? This is what I did:
def handle_tune(connection_tune)
  @channel_max = connection_tune.channel_max.freeze
  @frame_max = connection_tune.frame_max.freeze
  client_heartbeat = @settings[:heartbeat] || @settings[:heartbeat_interval] || 0
  @heartbeat_interval = negotiate_heartbeat_value(client_heartbeat, connection_tune.heartbeat)
  # start sending heartbeat frames once heartbeats are enabled
  self.initialize_heartbeat_sender if self.heartbeats_enabled?
  self.send_frame(Protocol::Connection::TuneOk.encode(@channel_max, [settings[:frame_max], @frame_max].min, @heartbeat_interval))
end
With this change, the heartbeat works!
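To make the ordering problem concrete, here is a minimal toy model (not amq-client code). ToyConnection, its timers array and the negotiation rule (take the larger value if either side says 0, otherwise the smaller) are assumptions for illustration only.

```ruby
# Toy model, not amq-client code: shows why starting the heartbeat sender
# before Connection.Tune arrives does nothing.
class ToyConnection
  attr_reader :heartbeat_interval, :timers

  def initialize(settings = {})
    @settings = settings
    @heartbeat_interval = nil # unknown until the broker sends Connection.Tune
    @timers = []              # stands in for EventMachine periodic timers
  end

  def heartbeats_enabled?
    !@heartbeat_interval.nil? && @heartbeat_interval > 0
  end

  # Registers a "timer" only when an interval has already been negotiated.
  def initialize_heartbeat_sender
    @timers << @heartbeat_interval if heartbeats_enabled?
  end

  # Assumed rule: if either side says 0, take the larger value,
  # otherwise take the smaller one.
  def negotiate_heartbeat_value(client_value, server_value)
    if client_value == 0 || server_value == 0
      [client_value, server_value].max
    else
      [client_value, server_value].min
    end
  end

  def handle_tune(server_heartbeat)
    client_heartbeat = @settings[:heartbeat] || @settings[:heartbeat_interval] || 0
    @heartbeat_interval = negotiate_heartbeat_value(client_heartbeat, server_heartbeat)
    initialize_heartbeat_sender
  end
end

conn = ToyConnection.new(:heartbeat => 10)
conn.initialize_heartbeat_sender # too early: the interval is still nil
early_timers = conn.timers.size
conn.handle_tune(30)             # interval negotiated, sender starts
late_timers = conn.timers.size
```

In this model the early call registers nothing, and only the call made from handle_tune starts a timer.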
Once a queue is deleted, subsequent queue declarations by the same name are not executed. How to reproduce, using the amqp gem:
AMQP.channel.queue('foo', :durable => false) do |queue, declare_ok|
  puts "first declare succeeded"
  puts declare_ok.inspect
  queue.delete do
    puts "first delete succeeded"
    AMQP.channel.queue('foo', :durable => false) do |queue, declare_ok|
      puts "second declare succeeded"
      puts declare_ok.inspect
      queue.delete do
        puts "second delete succeeded"
      end
    end
  end
end
Output, using a channel-level exception handler:
first declare succeeded
0
first delete succeeded
second declare succeeded
nil
Handling a channel-level exception.
AMQP class id : 50
AMQP method id: 40
Status code : 404
Error message : NOT_FOUND - no queue 'foo' in vhost '/'
Looking at a tcpdump trace of the communication, I can see that a second declare/declare-ok never happens. I suspect the amqp client to be at fault. Also interesting:
# from lib/amq/client/async/queue.rb
def delete(if_unused = false, if_empty = false, nowait = false, &block)
  nowait = true unless block
  @connection.send_frame(Protocol::Queue::Delete.encode(@channel.id, @name, if_unused, if_empty, nowait))
  if !nowait
    self.append_callback(:delete, &block)
    # TODO: delete itself from queues cache
    @channel.queues_awaiting_delete_ok.push(self)
  end
  self
end # delete(channel, queue, if_unused, if_empty, nowait, &block)
If I use queue! instead of queue on the second declaration (bypassing the queue cache), the test succeeds.
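The suspected mechanism can be sketched with a toy cache. This is a hypothetical model, not the amqp gem's implementation: ToyChannel, ToyQueue, forget_queue and the evict flag are made-up names, and the counter stands in for queue.declare frames on the wire.

```ruby
# Toy model of a per-channel queue cache; all names are illustrative only.
class ToyChannel
  attr_reader :declares_sent

  def initialize
    @queues = {}       # name => queue object, like a channel-level cache
    @declares_sent = 0 # counts simulated queue.declare frames
  end

  # Cached lookup: a declare is only "sent" when the name is not cached.
  def queue(name)
    @queues[name] ||= begin
      @declares_sent += 1
      ToyQueue.new(self, name)
    end
  end

  def forget_queue(name)
    @queues.delete(name)
  end
end

class ToyQueue
  attr_reader :name

  def initialize(channel, name)
    @channel = channel
    @name = name
  end

  # evict = false mimics a delete that leaves the stale cache entry behind.
  def delete(evict = true)
    @channel.forget_queue(@name) if evict
    self
  end
end

stale = ToyChannel.new
stale.queue("foo").delete(false)
stale.queue("foo")                   # served from the cache, no declare
stale_declares = stale.declares_sent

fixed = ToyChannel.new
fixed.queue("foo").delete
fixed.queue("foo")                   # cache entry gone, declared again
fixed_declares = fixed.declares_sent
```

With the stale entry left in place only one declare is counted, which matches the missing second declare/declare-ok in the tcpdump trace.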
Greetings,
The current stable version of amq-client still has the stray puts 5 months after it was fixed in master. (It's been merged into master as part of f9cd800, but not into the release branch.)
Could you please git cherry-pick f2c8598 in the 0.9.x-stable branch? (f2c8598 is the commit that was merged as part of the f9cd800 pull request.)
Thanks muchly!
-- Morgan
If a channel does not have publisher confirmations on when the #on_ack method is called, #on_ack will attempt to turn confirmations on by calling #use_publisher_confirmations!. As that method does not exist, it will cause a NoMethodError exception.
The problem goes back 2 years. 5653b07 renamed #confirmations to #use_publisher_confirmations!. It also added the line to #on_ack to call #use_publisher_confirmations! if confirmations were not on. The same day you renamed #use_publisher_confirmations! to #confirm_select, but failed to change the reference in #on_ack (a092ae0).
Since then the code got moved around, but it has remained otherwise the same. Your last commit related to it moved the code into AMQ::Client::Async::Channel.
This bug has been around for 2 years and we never triggered it since we always call #confirm_select on the channel before using it. But something in the latest commits must have broken how we were using #confirm_select, leaving the channel with confirms off by the time we call #on_ack, thus triggering this old bug.
I am going to dig further to find out why the channel is reaching #on_ack with confirms off in the latest code.
5102> ps aux | grep rabbit # no results
5103> bundle exec spec spec # SRS, private project, it's using the AMQP gem (Git HEAD)
INFO -- : [authentication] Credentials are guest/***** # It's trying to authenticate, why?
We need to collect frames per-channel. Otherwise with large message payloads it is possible that frame sequence will be incorrectly deconstructed.
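A rough sketch of what per-channel collection could look like. The Frame struct and FramesetCollector class are hypothetical, not AMQ::Protocol or amq-client types, and the model only covers content-carrying framesets (method, header, then body frames).

```ruby
# Illustrative only: frames are modelled as small structs.
Frame = Struct.new(:channel, :type, :payload, :body_size)

class FramesetCollector
  def initialize
    # One pending frame list per channel id.
    @pending = Hash.new { |hash, channel| hash[channel] = [] }
  end

  # Returns the complete frameset for the frame's channel, or nil
  # while that channel is still waiting for more body frames.
  def push(frame)
    frames = @pending[frame.channel]
    frames << frame
    return nil unless complete?(frames)
    @pending.delete(frame.channel)
  end

  private

  # Complete once the body bytes seen reach the size announced in the header.
  def complete?(frames)
    header = frames.find { |f| f.type == :header }
    return false unless header
    received = frames.select { |f| f.type == :body }.
                      map { |f| f.payload.bytesize }.inject(0, :+)
    received >= header.body_size
  end
end

collector = FramesetCollector.new
results = [
  Frame.new(1, :method, "basic.deliver", nil),
  Frame.new(2, :method, "basic.deliver", nil),
  Frame.new(1, :header, "", 6),
  Frame.new(2, :header, "", 3),
  Frame.new(1, :body, "abc", nil),
  Frame.new(2, :body, "xyz", nil),
  Frame.new(1, :body, "def", nil)
].map { |frame| collector.push(frame) }.compact

bodies = results.map do |frames|
  frames.select { |f| f.type == :body }.map(&:payload).join
end
```

Because each channel has its own list, frames interleaved across channels 1 and 2 are reassembled without the two payloads being mixed.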
If amq/settings is loaded before amq/client, then AMQ::Client::Settings.parse_amqp_url will raise a NoMethodError exception.
This is caused by an unqualified reference to Ruby's built-in URI module at line 139, which resolves instead to AMQ::URI defined in amq-protocol (which is not compatible with Ruby's URI module).
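The shadowing itself is easy to reproduce outside the gems. In this sketch the Toy namespace, Toy::URI and both Settings methods are stand-ins I made up for illustration; only ::URI is the real standard library module.

```ruby
require "uri"

module Toy
  # Stand-in for a library-defined URI that returns a Hash, not a URI object.
  module URI
    def self.parse(string)
      { :scheme => string[/\A[a-z]+/] }
    end
  end

  module Settings
    # Unqualified constant lookup walks the lexical scope and finds Toy::URI.
    def self.shadowed(string)
      URI.parse(string)
    end

    # A leading :: forces the top-level (standard library) URI.
    def self.qualified(string)
      ::URI.parse(string)
    end
  end
end

shadowed  = Toy::Settings.shadowed("amqp://localhost:5672")
qualified = Toy::Settings.qualified("amqp://localhost:5672")
```

Calling scheme on the shadowed result would fail in the same way as the report, since it is a Hash. Writing ::URI is one possible fix under these assumptions.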
Steps to reproduce:
I. Create an empty project with the following Gemfile and run bundle install:
source 'https://rubygems.org'
gem 'amq-client', '1.0.2'
Produced Gemfile.lock is:
GEM
remote: https://rubygems.org/
specs:
amq-client (1.0.2)
amq-protocol (>= 1.2.0)
eventmachine
amq-protocol (1.8.0)
eventmachine (1.0.3)
PLATFORMS
ruby
DEPENDENCIES
amq-client (= 1.0.2)
II. Run bundle exec irb and follow these steps:
1.9.3p392 :001 > require 'amq/settings'
=> true
1.9.3p392 :002 > require 'amq/client'
=> true
1.9.3p392 :003 > AMQ::Client::Settings.parse_amqp_url('amqp://localhost:5672')
NoMethodError: undefined method `scheme' for {:scheme=>"amqp", :host=>"localhost", :port=>5672, :ssl=>nil}:Hash
from /Users/tema/.rvm/gems/ruby-1.9.3-p392/gems/amq-client-1.0.2/lib/amq/client/settings.rb:140:in `parse_amqp_url'
from (irb):3
from /Users/tema/.rvm/rubies/ruby-1.9.3-p392/bin/irb:16:in `<main>'
1.9.3p392 :004 >
Let me know if you need any additional info about the issue. I'm not sure if it makes sense to send a pull request (since the lib is not supported anymore), but the current stable version of the amqp gem relies on both amq-client and amq-protocol, so this might cause issues for others (as it did for us).
Though not a functionality bug it is a bit irritating to see this every time I run my amqp-agent:
<snip>/gems/amq-client-0.8.2/lib/amq/client/extensions/rabbitmq/confirm.rb:9: warning: already initialized constant Extensions
I don't know if it is fixable by checking for an already defined constant since I guess it actually SHOULD overwrite the old value. Then the only fix would be to delete the constant before defining it again.
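For reference, deleting the constant before redefining it would look roughly like this. ToyClient and the values assigned to Extensions are placeholders, not the real contents of confirm.rb.

```ruby
module ToyClient
  Extensions = [:basic]
end

module ToyClient
  # Reassigning directly would print "already initialized constant".
  # Removing the old constant first makes the overwrite explicit and silent.
  remove_const(:Extensions) if const_defined?(:Extensions, false)
  Extensions = [:basic, :confirm]
end

current_extensions = ToyClient::Extensions
```

Whether overwriting is really the intended behaviour in amq-client is a separate question; this only shows the mechanics.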
The current heartbeat implementation is incomplete and doesn't conform to the AMQP 0.9.1 specification.
For example, any octet sent via the wire serves as a heartbeat indicator.
This means heartbeat frames need only be sent if nothing else gets sent.
The code as is assumes heartbeat frames are sent by the broker at the specified heartbeat rate, which is not true.
I will start working on a fix, because we need a correct heartbeat implementation.
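As a starting point, the bookkeeping could be sketched like this. HeartbeatMonitor is a hypothetical class, not amq-client's API, and the exact thresholds (send after one idle interval, give up on the peer after two silent intervals) are assumptions that a real fix should check against the specification.

```ruby
# Toy heartbeat bookkeeping with an injectable clock so it can be tested.
class HeartbeatMonitor
  def initialize(interval, clock = lambda { Time.now.to_f })
    @interval = interval
    @clock = clock
    @last_sent = @last_received = @clock.call
  end

  # Call whenever any bytes are written to the socket.
  def data_sent
    @last_sent = @clock.call
  end

  # Call whenever any bytes arrive, heartbeat frame or not.
  def data_received
    @last_received = @clock.call
  end

  # A heartbeat frame is only needed if nothing else was sent for an interval.
  def heartbeat_due?
    @clock.call - @last_sent >= @interval
  end

  # Assumed threshold: the peer is considered gone after two silent intervals.
  def peer_timed_out?
    @clock.call - @last_received > 2 * @interval
  end
end

now = 0.0
monitor = HeartbeatMonitor.new(10, lambda { now })
now = 9.0
monitor.data_sent                         # regular traffic at t=9
now = 12.0
due_at_12 = monitor.heartbeat_due?        # only 3s since the last write
now = 19.0
due_at_19 = monitor.heartbeat_due?        # 10s of outbound silence
timed_out_at_19 = monitor.peer_timed_out?
now = 21.0
timed_out_at_21 = monitor.peer_timed_out? # nothing received for over 20s
```

The point is that both timestamps are updated by any traffic, so regular frames count as heartbeats in both directions.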
amq-client does not currently provide TLS support. I did not look at whether cool.io has TLS support but EventMachine definitely does.
I'm not sure if this is a limitation of EventMachine, so feel free to let me know if so, but currently there is no way to send the client certificate to the peer for verification when using an SSL connection.
You end up seeing something like this:
Apr 2 13:51:05 ubuntu stunnel: LOG5[21813:3073223488]: Service amqp-server accepted connection from 192.168.1.10:44798
Apr 2 13:51:05 ubuntu stunnel: LOG3[21813:3073223488]: SSL_accept: 140890C7: error:140890C7:SSL routines:SSL3_GET_CLIENT_CERTIFICATE:peer did not return a certificate
Apr 2 13:51:05 ubuntu stunnel: LOG5[21813:3073223488]: Connection reset: 0 bytes sent to SSL, 0 bytes sent to socket
On a somewhat related note, if you enable verify_peer: true in the SSL configuration hash, this also fails, as amq-client does not implement the ssl_verify_peer() method that EventMachine will call on the EventMachine::Connection class. This causes the ruby-amqp end to disconnect early right now, I guess because nothing is handling the verification.