Code Monkey home page Code Monkey logo

amq-client's People

Contributors

alebsack avatar eliaslevy avatar iconara avatar markiz avatar michaelklishin avatar pje avatar zobar avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar

amq-client's Issues

Exception when sending binary data

The following code will run for a while and then cause an exception. I was doing a bit of "stress" testing by sending random binary data as my AMQP payload. Not sure if this is amq-client or RabbitMQ failing. I'm using MRI 1.9.2 on OSX with RabbitMQ 2.4.1 installed via HomeBrew. Because of the 1.9.2 string issues, I even tried forcing a binary encoding on the string to no avail.

#!/usr/bin/env ruby
# encoding: utf-8

require "bundler"
Bundler.setup

$:.unshift(File.expand_path("../../../lib", __FILE__))

require 'amqp'

puts "Running amqp gem #{AMQP::VERSION}"

EM.run do 

  AMQP.connect do |conn|
    @amqp_connection = conn

    AMQP::Channel.new(@amqp_connection) do |channel|
      @amqp_channel  = channel
      @amqp_exchange = @amqp_channel.topic

      EventMachine.add_periodic_timer(0) do
        random_binary_string = Array.new(rand(1000)) { rand(256) }.pack('c*')
        random_binary_string.force_encoding('BINARY')
        p random_binary_string
        @amqp_exchange.publish(random_binary_string, :routing_key => "a.my.pattern")
      end
    end
  end

end

The code runs for a while and publishes a bunch of messages but ultimately ends up with the following failure (offending binary string inspect also included):

"^:\x0E\xB8\x1Fr3?6r\xB4\xAD\x9E\\+Q&\xDE\xAE\xB2\x87\xD7\n\xE8$u\xDA\xF1\xE8gD\xA5\x11\xF7e\xA30\xE1\xDB\f\x85l\t\ev6\x15B\x02\xCE\x81|B\xA7#\x92\xE5\x05\x97\x06R\x15?L\x11\xA5Lf\x9D/f\xDF2[!\xD4\xBF\xB27\xC5\x86\x00\x99=\xDC\x88\xEA\x8B\xEA\xC2\xE9\xADE\xF27F\xED\xE8\x80\x0E^\xF2H0&\xB2\xC4\xBE\xC0\xB4^\xC1(\x8A]\x86-{3y\xB8Rz\xDF\xC8\xD83.\xE7\x8B\xB1FI$\x1A\e\xEC\x1A\x10\x8A3\xB4\xAEk\xC8Tl\f\x0E\xBD_\aU\xFA\xBBO\x88\x17\xB6\xEC\xC1Z\xEF\xE6$~r?\x8E\x17\x00\xF1\xCE\x11\x8E\x1E\x9E\xC5\xB1$\xBA\xB9\xB7\xA7\xC7\x83r\xCB_\x858\e\xC6\xAC\x14\x14g8\xB9\bi\xF3u\x02\x8D\xD8+K\xBB\x1E\x15\xA0Q\xBF\xB7?\x84|\x13\xFE%h\xDE)\xEB\x97\x89\x7F\x92C\xA2\x0F\xFE0EeLlgR\x0F'E\x00|\xA6>\xC5\x8E`{t\x8Be\xBD^i\x86:\xD3A\x840\x9A\x93i\x8CE\x82\xD9\xDD\xE1`\xF3\xBA\xDC31\x98^9\x83\xFD\xCE@\xD0\xD4\xFFIF\xBE\xD6\x82E\xB5\tRdg={\x99t\x9B\xFCG\xD4\xE2\xAF\xF3`&!\xA5 \xC1v\xC2\x1C\xF7Q,r\xF6\xC7\x1D\xE26\xE8p\xDF*j\x12P,\xA3\xCF\\g\xE0dQ&\xA1\xBFz=I\xEAk\an\xA1\xD7}\xF9\xA1\x8E\xA0+\xFE\x8F \x9A\x19\xE0\xB5X~\xEArG\xB9\xBE#\xDF\xFB\x91\r`\x1C\xA6\v>\xA3m\xE6\xD9\x85\x10\x9Fy\xA4\x19+Wr2\v\xF2\xCBY\xBB1\x8A\x16\xE5\xE9\x90\xE6ru#\xEE\xFA\xA1\xC1\xE9\b,\x02\xF4\xEC\x15\xAE\xEA\x98Z\x00\xA1\xE9\xEAgI\x10\xF9q6\f\eW7\xBC\x85\x02\x9A=\xBCt\x8A\xF3\xE3\x96\x11\x84\x98\xD2b\x12\xA4\x7F\xDB\xDE`n\xE9\x0E\xF2\xFE\xFA\x8BM\xD3\xD8\xDA4A\xB1\xF1\vD\xD6'^\xBF\x96J\xE31\x9D-\x06\x18\x8Ao\xFB\x84\x9F\x83c\x94\bj\xF9\\C\x80\b\x8E\x04Q\x17z\x1D>\xB7\xBC\x05\xD1\x1C\x97NE\x7Fg\xC2\x1E\x92N\xDC\x89w\x8E=<lJ\xCF\x19\xB7\xDB\xEE?m1\xFFB\x93\xE3\xB4\x80\xAE\xF4\xE8uA\xD5/\xF6\xC8[\xEF\xF2\x98-\xA7P9\xBC\xA8\xED\xF2I\xCAa*#6\xCEt\xC2^9m{\x05\xC1\x03\x9F\xD4\xC1\x82\x1E\b\x895\x11)\xA4\fG\\\x98n\x8F/\xA4\\.\xEAv\x19F\x87)I\xC4\xC1\xF5$\xED\xE6\x12\x1C>\x81\x94Y7-\xD6Y\xFA\x02\xC7\xFC.S\xF9&/\xEB\xB4\x80=P\n\xE3M\xF3*\xF9\x807U\x90\x88w9\xE0c;\eTH\xCF\xFDUV9\a\xC7\x80\xBA\x84\x10;\xF6%m\xA1\xD7\xE7\xBE\\\x00\x19\x9Ft\xBD\xEDJ\xB70\xC7\x9FK\a\xF2\xF3}U\x1E\x1F\xF7\r\xC4\x02\x94\xD7\x1A?\\c\x17C\x8FDWg\t8\xAB0%\x0E\xE8\f-\xDEW\x02Uipu\xF1*\x80\xF3H\x84sH-\xA7\x14\xE7i-\x8E\xD8B\\\x83o`\xA3\xB7$\x10&"
/Users/mlartz/.rvm/gems/ruby-1.9.2-p180@rflow-component-devel/gems/amq-client-0.7.0.alpha27/lib/amq/client/entity.rb:68:in `error': undefined method `callbacks' for #<Hash:0x00000102870430> (NoMethodError)
    from /Users/mlartz/.rvm/gems/ruby-1.9.2-p180@rflow-component-devel/gems/amq-client-0.7.0.alpha27/lib/amq/client/adapter.rb:412:in `handle_close'
    from /Users/mlartz/.rvm/gems/ruby-1.9.2-p180@rflow-component-devel/gems/amq-client-0.7.0.alpha27/lib/amq/client/adapters/event_machine.rb:333:in `block in <class:EventMachineClient>'
    from /Users/mlartz/.rvm/gems/ruby-1.9.2-p180@rflow-component-devel/gems/amq-client-0.7.0.alpha27/lib/amq/client/adapter.rb:340:in `call'
    from /Users/mlartz/.rvm/gems/ruby-1.9.2-p180@rflow-component-devel/gems/amq-client-0.7.0.alpha27/lib/amq/client/adapter.rb:340:in `receive_frameset'
    from /Users/mlartz/.rvm/gems/ruby-1.9.2-p180@rflow-component-devel/gems/amq-client-0.7.0.alpha27/lib/amq/client/adapter.rb:319:in `receive_frame'
    from /Users/mlartz/.rvm/gems/ruby-1.9.2-p180@rflow-component-devel/gems/amq-client-0.7.0.alpha27/lib/amq/client/adapters/event_machine.rb:287:in `receive_data'
    from /Users/mlartz/.rvm/gems/ruby-1.9.2-p180@rflow-component-devel/gems/eventmachine-1.0.0.beta.3/lib/eventmachine.rb:199:in `run_machine'
    from /Users/mlartz/.rvm/gems/ruby-1.9.2-p180@rflow-component-devel/gems/eventmachine-1.0.0.beta.3/lib/eventmachine.rb:199:in `run'

ConnectionClosedError exception error

I think this should be an unless instead of an if

https://github.com/ruby-amqp/amq-client/blob/master/lib/amq/client/exceptions.rb#L62

```usr/lib64/ruby/gems/1.8/gems/amq-client-0.9.2/lib/amq/client/exceptions.rb:65:ininitialize': undefined method method_class' for AMQ::Protocol::HeartbeatFrame:Class (NoMethodError)
from /usr/lib64/ruby/gems/1.8/gems/amq-client-0.9.2/lib/amq/client/async/adapter.rb:247:in`new'
from /usr/lib64/ruby/gems/1.8/gems/amq-client-0.9.2/lib/amq/client/async/adapter.rb:247:in `send_frame'
from /usr/lib64/ruby/gems/1.8/gems/amq-client-0.9.2/lib/amq/client/async/adapter.rb:563:in`send_heartbeat'
from /usr/lib/ruby/site_ruby/1.8/rubygems/custom_require.rb:36:in `to_proc'
from /usr/lib64/ruby/gems/1.8/gems/eventmachine-0.12.10/lib/em/timers.rb:51:in`call'
from /usr/lib64/ruby/gems/1.8/gems/eventmachine-0.12.10/lib/em/timers.rb:51:in `fire'
from /usr/lib64/ruby/gems/1.8/gems/eventmachine-0.12.10/lib/eventmachine.rb:256:in`call'
from /usr/lib64/ruby/gems/1.8/gems/eventmachine-0.12.10/lib/eventmachine.rb:256:in `run_machine'
from /usr/lib64/ruby/gems/1.8/gems/eventmachine-0.12.10/lib/eventmachine.rb:256:in`run

amqp-client's heartbeat does not work, the client doesn't send heartbeat frame to the server

Amqp-client's heartbeat does not work, the rabbitmq server can send heartbeat frame to the client, but client can't send heartbeat frame to the server. So after approximately 3*heartbeat_interval, the client's connection is disconnected by the server.
And i find that the client send heartbeat frame when connection initialize and connection complete, but at that time the instance variable heartbeat_interval is null, so the client does not send heartbeat frame to the server.
The client only send heartbeat frame when instance variable heartbeat_interval is not null, and in method handle_tune heartbeat_interval is set. So why not just send heartbeat frame in method handle_tune. And do not send heartbeat frame when connection initialize and connection complete. So I do as follows,
def handle_tune(connection_tune)
@channel_max = connection_tune.channel_max.freeze
@frame_max = connection_tune.frame_max.freeze
client_heartbeat = @settings[:heartbeat] || @settings[:heartbeat_interval] || 0
@heartbeat_interval = negotiate_heartbeat_value(client_heartbeat, connection_tune.heartbeat)
#send heartbeat frame when heartbeats_enabled
self.initialize_heartbeat_sender if self.heartbeats_enabled?
self.send_frame(Protocol::Connection::TuneOk.encode(@channel_max, [settings[:frame_max], @frame_max].min, @heartbeat_interval))
end

And this just make the heartbeat work!

queue cache not maintained properly, channel crashes

Once a queue is deleted, subsequent queue declarations by the same name are not executed. How to reproduce, using the amqp gem:

AMQP.channel.queue('foo', :durable => false) do |queue, declare_ok|                   
  puts "first declare succeeded"                                                      
  puts declare_ok.inspect                                                             
  queue.delete do                                                                     
    puts "first delete succeeded"                                                     
    AMQP.channel.queue('foo', :durable => false) do |queue, declare_ok|               
      puts "second declare succeeded"                                                 
      puts declare_ok.inspect                                                         
      queue.delete do                                                                 
        puts "second delete succeeded"                                                
      end                                                                             
    end                                                                               
  end                                                                                 
end          

output, using a channel level exception handler:

first declare succeeded
0
first delete succeeded
second declare succeeded
nil
Handling a channel-level exception.

AMQP class id : 50
AMQP method id: 40
Status code   : 404
Error message : NOT_FOUND - no queue 'foo' in vhost '/'

Looking at a tcpdump trace of the communication, I can see that a second declare/declare-ok never happens. I suspect the amqp client to be at fault. Also interesting:

# from lib/amq/client/async/queue.rb
        def delete(if_unused = false, if_empty = false, nowait = false, &block)
          nowait = true unless block
          @connection.send_frame(Protocol::Queue::Delete.encode(@channel.id, @name, if_unused, if_empty, nowait))

          if !nowait
            self.append_callback(:delete, &block)

            # TODO: delete itself from queues cache
            @channel.queues_awaiting_delete_ok.push(self)
          end

          self
        end # delete(channel, queue, if_unused, if_empty, nowait, &block)

If i use queue! instead of queue on the second declaration (bypassing the queue cache), the test succeeds.

Never merged f9cd800 into 0.9.x-stable?

Greetings,
The current stable version of amq-client still has the stray puts 5 months after it's been fixed in master. (It's been merged into master as part of f9cd800, but not into the release branch.)

Could you please

git cherry-pick f2c8598

in the 0.9.x-stable branch? (f2c8598 is the commit that was merged as part of the f9cd800 pull request.)

Thanks muchly!

-- Morgan

AMQ::Client::Async::Channel#on_ack calls non-existent #use_publisher_confirmations!

If a channel is does not have publisher confirmation on when the #on_ack method is called, #on_ack will attempt to turn confirmations on by calling #use_publisher_confirmations!. As that method does not exist, it will cause a NoMethodError exception.

The problem goes back 2 years. 5653b07 renamed #confirmations to #use_publisher_confirmations!. It also added the line to #on_ack to call #use_publisher_confirmations! if confirmations were not on. The same day you renamed #use_publisher_confirmations! to #confirm_select, but failed to change the reference in #on_ack (a092ae0).

Since then the code got moved around, but its remained otherwise the same. You last commit related to it moved the code into AMQ::Client::Async::Channel.

This bug has been around for 2 years and we never triggered it since we always call #confirm_select in the channel before using it. But something in the latest commits must have broken how we were using #confirm_select leaving the channel with confirms off by the time we call #on_ack, thus triggering this old bug.

I am going to dig further to find out why the channel is reaching #on_ack with confirms off in the latest code.

Collect frames per-channel

We need to collect frames per-channel. Otherwise with large message payloads it is possible that frame sequence will be incorrectly deconstructed.

There is a conflict between amq-client and amq-protocol settings

If amq/settings is loaded before amq/client then AMQ::Client::Settings.parse_amqp_url will raise a NoMethodError exception.

This is caused by improperly referencing Ruby built-in URI module at line 139 which gets overridden by AMQ::URI defined in the amq-protocol (which is not compatible with Ruby URI module).

Steps to reproduce:

I. Create an empty project with following Gemfile and run bundle install:

source 'https://rubygems.org'

gem 'amq-client', '1.0.2'

Produced Gemfile.lock is:

GEM
  remote: https://rubygems.org/
  specs:
    amq-client (1.0.2)
      amq-protocol (>= 1.2.0)
      eventmachine
    amq-protocol (1.8.0)
    eventmachine (1.0.3)

PLATFORMS
  ruby

DEPENDENCIES
  amq-client (= 1.0.2)

II. Run bundle exec irb and follow these steps:

1.9.3p392 :001 > require 'amq/settings'
 => true 
1.9.3p392 :002 > require 'amq/client'
 => true 
1.9.3p392 :003 > AMQ::Client::Settings.parse_amqp_url('amqp://localhost:5672')
NoMethodError: undefined method `scheme' for {:scheme=>"amqp", :host=>"localhost", :port=>5672, :ssl=>nil}:Hash
    from /Users/tema/.rvm/gems/ruby-1.9.3-p392/gems/amq-client-1.0.2/lib/amq/client/settings.rb:140:in `parse_amqp_url'
    from (irb):3
    from /Users/tema/.rvm/rubies/ruby-1.9.3-p392/bin/irb:16:in `<main>'
1.9.3p392 :004 > 

Let me know if you need any additional info about the issue. Iโ€™m not sure if it makes sense to send a pull request (since the lib is not supported anymore), but the current stable version of the amqp gem relies on both amq-client and amq-protocol, so this might cause issues for others (as it did for us).

Warning about already initialized constant Extensions

Though not a functionality bug it is a bit irritating to see this every time I run my amqp-agent:

<snip>/gems/amq-client-0.8.2/lib/amq/client/extensions/rabbitmq/confirm.rb:9: warning: already initialized constant Extensions

I don't know if it is fixable by checking for an already defined constant since I guess it actually SHOULD overwrite the old value. Then the only fix would be to delete the constant before defining it again.

issues with heartbeat implementation

The current heartbeat implementation is incomplete and doesn't conform to the AMQP 0.9.1 specification.

For example, any octet sent via the wire serves as a heartbeat indicator.

This means heartbeat frames need only be sent if nothing else gets sent.

The code as is assumes heartbeat frames are sent by the broker at the specified heartbeat rate, which is not true.

Will start working on a fix, because we need correct heartbeat implementation.

TLS support

amq-client does not currently provide TLS support. I did not look at whether cool.io has TLS support but EventMachine definitely does.

Support sending client certificates for remote verification over SSL/TLS

I'm not sure if this is a limitation of Eventmachine so feel free to let me know if so, but currently there is no way to send the client certificate to the peer for verification when using an SSL connection.

You end up seeing something like this:

Apr  2 13:51:05 ubuntu stunnel: LOG5[21813:3073223488]: Service amqp-server accepted connection from 192.168.1.10:44798
Apr  2 13:51:05 ubuntu stunnel: LOG3[21813:3073223488]: SSL_accept: 140890C7: error:140890C7:SSL routines:SSL3_GET_CLIENT_CERTIFICATE:peer did not return a certificate
Apr  2 13:51:05 ubuntu stunnel: LOG5[21813:3073223488]: Connection reset: 0 bytes sent to SSL, 0 bytes sent to socket

On a somewhat related note, if you enable :verify_peer: true in the SSL configuration hash, this also fails as amq-client does not implement the ssl_verify_peer() method that Eventmachine will call on the EventMachine::Connection class. This causes the ruby-amqp end to disconnect early right now, I guess because nothing is handling the verification.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.