
lyra's People

Contributors

acogoluegnes, augi, basert, jendakol, jhalterman, jstepien, michaelklishin, platy, rickhanlonii, stfs


lyra's Issues

What is the cause of special handling for ShutdownSignalExceptions in callWithRetries?

Hi,

My recovering process was ended by the following stack trace:

java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:378)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:516)
    at net.jodah.lyra.internal.ConnectionHandler$3.call(ConnectionHandler.java:225)
    at net.jodah.lyra.internal.ConnectionHandler$3.call(ConnectionHandler.java:220)
    at net.jodah.lyra.internal.RetryableResource.callWithRetries(RetryableResource.java:44)
    at net.jodah.lyra.internal.ConnectionHandler.createConnection(ConnectionHandler.java:220)
    at net.jodah.lyra.internal.ConnectionHandler.recoverConnection(ConnectionHandler.java:243)
    at net.jodah.lyra.internal.ConnectionHandler.access$100(ConnectionHandler.java:41)
    at net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1.run(ConnectionHandler.java:90)
    at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1061)
    at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:575)
    at java.lang.Thread.run(Thread.java:722)
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; reason: java.io.EOFException
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:321)
    ... 11 more
Caused by: java.io.EOFException
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:131)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:533)

In this case, the ShutdownSignalException is extracted and since 'recovery' is true, the IOException is re-thrown from callWithRetries. I have changed the code so that it looks like this:

if (sse != null && (recovery || !recoverable)) {
  if (sse.getCause() == null) {
    // DJM, only exit here if there is no cause
    log.log(Level.SEVERE, "Rethrowing, as not recoverable: " + recovery + ", " + recoverable, e);
    throw e;
  }
}

but I wonder what cases I'm messing with by making this change.

Best, Dan.

Consider making authentication exceptions retryable

In the current version of Lyra, instances of PossibleAuthenticationFailureException are, understandably, treated as non-retryable. While this makes sense in most cases, I've got a case where I would like to retry after such exceptions. My question is: would it be possible (and acceptable!) to make this behaviour in Lyra configurable?

The need for retrying auth exceptions arises from a situation where new services are deployed using Puppet scripts that set up the service as well as configure the necessary access credentials in RabbitMQ. Because there's no easy way to guarantee the exact timing of these steps (as they're run on different servers), it may happen that the auth config isn't available at the point in time when the service that needs it starts. Hence retrying would be very useful.

If you think this is reasonable, I'd be happy to have a go at coming up with a PR for this.
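A hypothetical shape for such a configuration knob, in the spirit of the existing Config builder (withRetryableExceptions does not exist today and is named here only for illustration):

Config config = new Config()
    .withRetryPolicy(RetryPolicies.retryAlways())
    .withRetryableExceptions(PossibleAuthenticationFailureException.class); // hypothetical method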

publisher loses messages - a few questions regarding Lyra

Hi,
I am now testing Lyra in my project.
The client side looks great, but when I intentionally kill the erl process and bring it back up (quickly), I lose some messages on the producer side.
I don't get any exception when messages are lost.

How do I know that?
Only after all messages have been produced (with the broker killed and reloaded in the middle) do I load my consumer, and I see that around 10 messages were lost at the time of the crash.

How do I configure my producer?
I use
Config config = new Config().withRecoveryPolicy(new RecoveryPolicy().withMaxAttempts(20).withInterval(Duration.seconds(1)).withMaxDuration(Duration.minutes(5)));

Questions :

  1. Why are the messages lost, and is there a way to avoid it? (See the confirm-mode sketch after this list.)
  2. What happens after I have used more attempts than my maximum allows? Can I write the messages somewhere?
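For question 1, one way to detect loss on the producer side (independent of Lyra) is to enable publisher confirms, so an unconfirmed publish surfaces as an exception or timeout instead of silent loss. A minimal sketch, assuming a channel obtained from a Lyra-managed connection; names are illustrative:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

public class ConfirmedPublisher {
    // Sketch only: put the channel in confirm mode, then fail loudly if the broker
    // does not confirm the publish within 5 seconds, so the caller can re-publish
    // or persist the message elsewhere.
    public static void publishWithConfirm(Channel channel, String exchange, String routingKey,
                                          byte[] body) throws Exception {
        channel.confirmSelect(); // enable publisher confirms (once per channel)
        channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_BASIC, body);
        channel.waitForConfirmsOrDie(5000); // throws if no confirm arrives in time
    }
}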

Thanks.

Lyra recovers queues that have already been discarded from channel closures

This might be a duplicate of #20 but I'm not certain.

To reproduce:

Make a bunch of channels, then for each channel, queueName = channel.queueDeclare(someQueue, false, true, true, null), then channel.queueBind(queueName, someExchange, someTopic).

Then channel.close() each channel, and then force a recovery to happen.
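A minimal, runnable version of those steps (assuming a Lyra-created Connection; the queue, exchange and topic names are placeholders):

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class ExclusiveQueueRepro {
    // Declare an exclusive, auto-delete queue on a channel, bind it, then close the channel.
    public static void setUpAndClose(Connection connection, String someQueue,
                                     String someExchange, String someTopic) throws Exception {
        Channel channel = connection.createChannel();
        // durable=false, exclusive=true, autoDelete=true
        String queueName = channel.queueDeclare(someQueue, false, true, true, null).getQueue();
        channel.queueBind(queueName, someExchange, someTopic);
        channel.close();
        // ...then force a connection recovery; Lyra still attempts to re-declare queueName
        // and its binding even though the broker already auto-deleted them.
    }
}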

Observed behavior: Lyra attempts to recover every queue and binding, even though amqp-client long ago discarded those queues because the connections closed (they're marked exclusive and auto-delete, and it can be observed using rabbitmqctl list_queues that they came and went).

Expected behavior: Lyra forgets about queues that are gone because their channels got closed and they were marked for autodeletion.

Unable to invoke withConsumerListeners() on ConfigurableChannel

Setup:
Trying to update ChannelConfig after creating the channel.

 ...
 ConfigurableConnection configurableConnection = Connections.create(options, rootConfig);
 ...
 ConfigurableChannel channel = Config.of(configurableConnection.createChannel());
 ...
 // This fails with the exception as shown below
 channel.withConsumerListeners(new MyConsumerListener());

Exception seen:

 ERROR 10:45:47,446 Invocation of ConsumerConfig.withConsumerListeners() failed.
 java.lang.IllegalArgumentException: object is not an instance of declaring class
 Exception in thread "main" java.lang.IllegalArgumentException: object is not an instance of declaring class
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:601)
    at net.jodah.lyra.internal.util.Reflection.invoke(Reflection.java:11)
    at net.jodah.lyra.internal.ChannelHandler$1.call(ChannelHandler.java:60)
    at net.jodah.lyra.internal.RetryableResource.callWithRetries(RetryableResource.java:58)
    at net.jodah.lyra.internal.ChannelHandler.invoke(ChannelHandler.java:49)
    at $Proxy21.withConsumerListeners(Unknown Source)
  <Snip>

Retry on NoRouteToHostException

In our environment, it is entirely possible for systems to be trying to connect to our RabbitMQ server, while the server itself is down. Trying a new connection to a system that is powered off results in a NoRouteToHostException. In our case it would certainly make sense for that to be a retryable error.

Publisher confirms with RetryPolicy can return wrong sequence number

When the RabbitMQ connection drops and a consumer/publisher are using a Lyra-managed connection with a RetryPolicy, calling "channel.getNextPublishSeqNo()" can return the sequence number for the old Channel before the Channel is recovered. The code typically then goes on to "channel.basicPublish(...)", which will block, and eventually publish, but under a sequence number different from the one previously acquired.

Since in my scenario the issue was being driven by the consumer being recovered first and immediately feeding a publisher that hadn't been recovered, I inverted the Channel declarations so the publisher always recovered first.

However, there is a question here of whether "channel.getNextPublishSeqNo()" should block with a RetryPolicy, as the intent of the programmer is almost certainly to publish a message subsequently and utilise the sequence number.
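The racy pattern in question, roughly (a sketch of the usage described above, not library code; confirmTracker is hypothetical bookkeeping):

import com.rabbitmq.client.Channel;
import java.util.Map;

public class TrackedPublisher {
    public static void publishTracked(Channel channel, String exchange, String routingKey,
                                      byte[] body, Map<Long, byte[]> confirmTracker) throws Exception {
        // If the connection drops around here, this returns the next sequence number
        // of the OLD channel...
        long seqNo = channel.getNextPublishSeqNo();
        // ...while this call blocks until Lyra recovers the channel, and the message is
        // then published on the NEW channel, under a different sequence number.
        channel.basicPublish(exchange, routingKey, null, body);
        confirmTracker.put(seqNo, body);
    }
}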

Recovery for Exchange and Queue declaration

Hi, and thank you so much for Lyra,

I have a question about the queue, exchange and binding recovery. Using the cookbook example at https://github.com/jhalterman/lyra/wiki/Lyra-Cookbook#wiki-publish--subscribe and Lyra 0.3.2 I get:

13:08:21.947 [lyra-recovery-1] INFO  n.j.lyra.internal.ConnectionHandler - Recovered connection cxn-1 to amqp://127.0.0.1:5672/
13:08:21.949 [lyra-recovery-1] INFO  n.jodah.lyra.internal.ChannelHandler - Recovering channel-1 on cxn-1
13:08:21.951 [lyra-recovery-1] INFO  n.jodah.lyra.internal.ChannelHandler - Recovering consumer-amq.ctag-oycCJ-6aw7D8NXKK370Llg via channel-1 on cxn-1
13:08:21.953 [AMQP Connection 127.0.0.1:5672] ERROR n.jodah.lyra.internal.ChannelHandler - Channel channel-1 on cxn-1 was closed unexpectedly
13:08:21.962 [lyra-recovery-1] ERROR n.jodah.lyra.internal.ChannelHandler - Failed to recover consumer-amq.ctag-oycCJ-6aw7D8NXKK370Llg via channel-1 on cxn-1
java.io.IOException: null
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106) ~[amqp-client-3.2.1.jar:na]
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102) ~[amqp-client-3.2.1.jar:na]
    at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:976) ~[amqp-client-3.2.1.jar:na]
    at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:943) ~[amqp-client-3.2.1.jar:na]
    at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:935) ~[amqp-client-3.2.1.jar:na]
    at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:928) ~[amqp-client-3.2.1.jar:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_08-ea]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[na:1.7.0_08-ea]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.7.0_08-ea]
    at java.lang.reflect.Method.invoke(Method.java:601) ~[na:1.7.0_08-ea]
    at net.jodah.lyra.internal.util.Reflection.invoke(Reflection.java:11) ~[lyra-0.3.2.jar:na]
    at net.jodah.lyra.internal.ChannelHandler.recoverConsumers(ChannelHandler.java:292) [lyra-0.3.2.jar:na]
    at net.jodah.lyra.internal.ChannelHandler.recoverChannel(ChannelHandler.java:225) [lyra-0.3.2.jar:na]
    at net.jodah.lyra.internal.ConnectionHandler.recoverConnection(ConnectionHandler.java:249) [lyra-0.3.2.jar:na]
    at net.jodah.lyra.internal.ConnectionHandler.access$100(ConnectionHandler.java:34) [lyra-0.3.2.jar:na]
    at net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1.run(ConnectionHandler.java:81) [lyra-0.3.2.jar:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) [na:1.7.0_08-ea]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) [na:1.7.0_08-ea]
    at java.lang.Thread.run(Thread.java:722) [na:1.7.0_08-ea]
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'foo-queue' in vhost '/', class-id=60, method-id=20), null, ""}
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) ~[amqp-client-3.2.1.jar:na]
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33) ~[amqp-client-3.2.1.jar:na]
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343) ~[amqp-client-3.2.1.jar:na]
    at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:974) ~[amqp-client-3.2.1.jar:na]
    ... 16 common frames omitted
com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'foo-queue' in vhost '/', class-id=60, method-id=20), null, ""}
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:474) ~[amqp-client-3.2.1.jar:na]
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315) ~[amqp-client-3.2.1.jar:na]
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144) ~[amqp-client-3.2.1.jar:na]
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91) ~[amqp-client-3.2.1.jar:na]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:551) ~[amqp-client-3.2.1.jar:na]
13:08:21.963 [lyra-recovery-2] INFO  n.jodah.lyra.internal.ChannelHandler - Recovering channel-1 on cxn-1

I went through the code and I don't see where the calls to queueDeclare, etc. are captured by the proxy.

What's my mistake?

Very best, Dan.

Connection recovery hangs infinitely

During my experiments with two RabbitMQ nodes behind a load balancer, I ran into a situation where both nodes were down for a short period. In this case the connection recovery hangs infinitely, although both nodes came up again after a few seconds:

Name: main
State: WAITING on net.jodah.lyra.internal.util.concurrent.ReentrantCircuit$Sync@2224ea85
Total blocked: 4  Total waited: 53

Stack trace: 
 sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
net.jodah.lyra.internal.util.concurrent.ReentrantCircuit.await(ReentrantCircuit.java:63)
net.jodah.lyra.internal.RetryableResource.callWithRetries(RetryableResource.java:76)
net.jodah.lyra.internal.ChannelHandler.invoke(ChannelHandler.java:167)
com.sun.proxy.$Proxy1.basicPublish(Unknown Source)
de.na.messaging.rabbitmq.TestLoadBalancer.impl(TestLoadBalancer.java:124)

Here is my configuration:

RecoveryPolicy recoveryPolicy = new RecoveryPolicy().withBackoff(Duration.seconds(2), Duration.seconds(20))
                .withMaxDuration(Duration.seconds(20));

RetryPolicy retryPolicy = new RetryPolicy().withBackoff(Duration.seconds(3), Duration.seconds(60));

Shouldn't the recovery attempt at least time out after 20 seconds so that a retry will occur, or have I misunderstood something?

Channel closures (without a connection closure) can result in failed channel recovery

Channel closures that occur without a connection closure can result in a failed channel recovery since the channel number may have yet to be released by the client. Pending a potential change in the amqp-client to address this, let's switch to using ShutdownListeners for recovering all channel failures.

Some complexity here involves channel recovery failures. If a channel is being recovered (in a recovery thread) and recovery fails with the channel being closed, the subsequent ShutdownListener call results in another recovery thread. Not ideal.

Enhancement: add onUnrecoverable() and onUnretryable() methods to listeners

It would be nice if, in the event that Lyra will not attempt a recovery/retry (e.g., if the exception thrown is not among the recoverable/retryable exceptions), we could get a callback on the Channel, Consumer, and/or Connection listeners, so that any program logic we want to run on an unrecoverable failure can be executed. Otherwise it can be hard to tell from the client side whether a recovery is in progress or whether things are never going to recover.

It might also be nice to have an onRecoveryStarted hook so internal checks in the app (e.g., for health polling) can take recovery into consideration.
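Hypothetically, the listener interfaces could grow callbacks along these lines (illustrative only, not existing Lyra API):

import com.rabbitmq.client.Channel;

// Illustrative sketch - neither method exists in Lyra today.
public interface UnrecoverableAwareChannelListener {
    /** Invoked when Lyra decides it will not retry or recover the failed channel. */
    void onUnrecoverable(Channel channel, Throwable failure);

    /** Invoked when a recovery attempt begins, so health checks can report "recovering". */
    void onRecoveryStarted(Channel channel);
}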

Recovery fails if the machine where rabbitmq is running gets rebooted

Hi,

I have the following policy configured while creating a connection :

Config config = new Config().withRecoveryPolicy(RecoveryPolicies.recoverAlways().withInterval(Duration.seconds(5)));
connection = Connections.create(options, config);

Recovery works when RabbitMQ is stopped and restarted. However, when the machine where RabbitMQ is running is shut down and restarted, the recovery fails and it never reconnects.

2015-07-14 15:22:10.417 [AMQP Connection 10.2.15.155:5672] [] [] [] [] [] [ShutdownListener:25] [ERROR] connection error; reason: java.net.SocketException: Connection reset
com.rabbitmq.client.ShutdownSignalException: connection error; reason: java.net.SocketException: Connection reset
    at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:678)
    at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:668)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
Caused by: java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:168)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:218)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:237)
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:271)
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:131)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:515)

2015-07-14 15:20:46.853 [AMQP Connection 10.2.15.155:5672] [] [] [] [] [] [ShutdownListener:25] [ERROR] connection error; reason: com.rabbitmq.client.MissedHeartbeatException: Heartbeat missing with heartbeat = 580 seconds
com.rabbitmq.client.ShutdownSignalException: connection error; reason: com.rabbitmq.client.MissedHeartbeatException: Heartbeat missing with heartbeat = 580 seconds
    at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:678)
    at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:668)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
Caused by: com.rabbitmq.client.MissedHeartbeatException: Heartbeat missing with heartbeat = 580 seconds
    at com.rabbitmq.client.impl.AMQConnection.handleSocketTimeout(AMQConnection.java:578)
    at com.rabbitmq.client.impl.AMQConnection.access$500(AMQConnection.java:59)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:541)

Please help

After consumer recovery failure, onConsumerRecovery called

Steps to reproduce:

  1. In one process, declare a topic exchange and post messages to it. Declare this exchange as non-durable.
  2. In another process, listen for any messages posted to this exchange. Configure a ChannelListener to log when onConsumerRecovery is called.
  3. Stop the RabbitMQ server. Start the RabbitMQ server. This causes the connection to close and the non-durable exchange to go away.
  4. In the second process, recovery will be attempted for the consumer, which will fail, but onConsumerRecovery will be called.

Example stack:

ERROR [2014-11-21 12:09:25,521] ChannelHandler - Failed to recover consumer-amq.ctag-AYLaSLLWuEpFd0UkR-f6Qg via channel-1 on cxn-1
java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:836)
    at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:61)
    at net.jodah.lyra.internal.RetryableResource.recoverQueueBindings(RetryableResource.java:193)
    at net.jodah.lyra.internal.ChannelHandler.recoverQueue(ChannelHandler.java:482)
    at net.jodah.lyra.internal.ChannelHandler.recoverConsumers(ChannelHandler.java:417)
    at net.jodah.lyra.internal.ChannelHandler.recoverChannel(ChannelHandler.java:229)
    at net.jodah.lyra.internal.ChannelHandler$ChannelShutdownListener$1.run(ChannelHandler.java:90)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'x' in vhost '/', class-id=50, method-id=20)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 10 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'x' in vhost '/', class-id=50, method-id=20)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
    ... 1 more
WARN [2014-11-21 12:09:25,523] AmqpConnectionConfig - Consumer for channel net.jodah.lyra.config.Config@47da83a4 was recovered

Expected behavior: onRecoveryFailure called with the channel and the IOException.
Actual behavior: onConsumerRecovery called with the channel (which is now defunct because the exchange went away).

Consumers never recover if withRequestedHeartbeat() is used

If we set a requested heartbeat > 0 and the connection goes down, Lyra will only try to recover for the heartbeat duration.

In the example below, handleShutdownSignal will be called 10 seconds after the network goes down; once that has happened, Lyra seems to stop trying.

If the network comes back up after 7 seconds, the consumer recovers without any problem. If we remove the heartbeat, the consumer is able to recover even after 30 seconds.

Config config = new Config()
        .withConnectionRecoveryPolicy(RecoveryPolicies.recoverAlways())
        .withConsumerRecovery(true)
        .withChannelRetryPolicy(RetryPolicies.retryAlways())
        .withRecoveryPolicy(RecoveryPolicies.recoverAlways());

final ConnectionOptions connectionOptions = new ConnectionOptions()
        .withUsername(username)
        .withPassword(password)
        .withHost(url)
        .withRequestedHeartbeat(Duration.seconds(10))
        .withVirtualHost(virtualHost);

try {
    final ConfigurableConnection connection = Connections.create(connectionOptions, config);
    final Channel channel = connection.createChannel();
    channel.basicConsume("foo", new DefaultConsumer(channel) {
        @Override
        public void handleCancel(final String consumerTag) throws IOException {
            super.handleCancel(consumerTag);
            log.info("handleCancel");
        }

        @Override
        public void handleCancelOk(final String consumerTag) {
            super.handleCancelOk(consumerTag);
            log.info("handleCancelOk");
        }

        @Override
        public void handleConsumeOk(final String consumerTag) {
            super.handleConsumeOk(consumerTag);
            log.info("handleConsumeOk");
        }

        @Override
        public void handleDelivery(final String consumerTag, final Envelope envelope, final AMQP.BasicProperties properties, final byte[] body) throws IOException {
            final String message = new String(body);
            onCloudMessageHandler.onCloudMessage(message);
            log.info("Got a message");
        }

        @Override
        public void handleRecoverOk(final String consumerTag) {
            super.handleRecoverOk(consumerTag);
            log.info("handleRecoverOk");
        }

        @Override
        public void handleShutdownSignal(final String consumerTag, final ShutdownSignalException sig) {
            super.handleShutdownSignal(consumerTag, sig);
            log.info("handleShutdownSignal");
        }
    });
} catch (IOException e) {
    e.printStackTrace();
}

Migrate channel state after recovery is complete

Currently channel state is migrated prior to calling the listeners and migrating consumers. Consider migrating channel state, or at least shutdown listeners, after the channel is fully recovered. This way a user's shutdown listeners aren't called on a channel that is only partially recovered.

ThreadFactory missing in ConnectionOptions.copy()

Hi,

I want to use a thread factory to start connection threads as daemon threads. When creating a new connection with Connections.create(ConnectionOptions options, Config config), the parameter options contains the ConnectionFactory, which contains the ThreadFactory.

ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("foo").build();
connectionFactory.setThreadFactory(threadFactory);
ConnectionOptions connectionOptions = new ConnectionOptions(connectionFactory);
Connection connection = Connections.create(connectionOptions, getConnectionConfig());

On creation of the ConnectionHandler a copy of the options is used by calling options.copy().

 ConnectionHandler handler = new ConnectionHandler(options.copy(), new Config(config));

The copy method calls a constructor of ConnectionOptions which creates a completely new ConnectionFactory and copies some of its attributes. Unfortunately, the ThreadFactory is not copied.

private ConnectionOptions(ConnectionOptions options) {
    factory = new ConnectionFactory();
    factory.setClientProperties(options.factory.getClientProperties());
    factory.setConnectionTimeout(options.factory.getConnectionTimeout());
    factory.setHost(options.factory.getHost());
    factory.setPort(options.factory.getPort());
    factory.setUsername(options.factory.getUsername());
    factory.setPassword(options.factory.getPassword());
    factory.setVirtualHost(options.factory.getVirtualHost());
    factory.setRequestedChannelMax(options.factory.getRequestedChannelMax());
    factory.setRequestedFrameMax(options.factory.getRequestedFrameMax());
    factory.setRequestedHeartbeat(options.factory.getRequestedHeartbeat());
    factory.setSaslConfig(options.factory.getSaslConfig());
    factory.setSocketFactory(options.factory.getSocketFactory());
    hosts = options.hosts;
    addresses = options.addresses;
    name = options.name;
    executor = options.executor;
  }

As a result, the AMQConnection threads do not run as daemon threads, which makes it hard to shut down my Tomcat properly.
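A minimal fix would presumably be to carry the thread factory over in the copy as well, for example by adding this line to the private constructor shown above:

    // Also copy the ThreadFactory so AMQConnection threads keep their daemon/naming settings.
    factory.setThreadFactory(options.factory.getThreadFactory());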

Regards
Claus

Handle consumer cancellations separate from channel closures.

Add support for handling consumer cancellations, which can occur separately from channel closures. Recovery could perform a single attempt. Cancellations can occur if, for example, a bound queue is deleted; the recovery could involve re-creating the queue/binding as part of the event listener.

ConnectionOptions uses wrong units for factory.setRequestedHeartbeat

ConnectionOptions#withRequestedHeartbeat calls factory.setRequestedHeartbeat((int) requestedHeartbeat.toMillis()); (line 218 of my copy)

However, the unit expected by factory.setRequestedHeartbeat is seconds, not milliseconds. The symptom is that using a small value (like 15) results in a larger value negotiated from the server (like 580) being used, because the comparison in the AMQConnection picks the lower of the two values, 15000 and 580.

The call should be factory.setRequestedHeartbeat((int) requestedHeartbeat.toSeconds()) instead.

A workaround is to call options.getConnectionFactory().setRequestedHeartbeat(15);
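In code form, the change described above:

// Current behaviour (treats the Duration as milliseconds, so 15s becomes 15000 "seconds"):
factory.setRequestedHeartbeat((int) requestedHeartbeat.toMillis());
// Proposed fix (ConnectionFactory#setRequestedHeartbeat expects seconds):
factory.setRequestedHeartbeat((int) requestedHeartbeat.toSeconds());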

Simulating ConnectionTimeouts

Hi, my simulation of frequently dropping connections never tests the ConnectionTimeout case. I wonder if you can think of any way of achieving this? As I write this I have one idea: I'll test opening the proxy and sleeping the job tomorrow :)

Many thanks, Dan.

When recovery fails, and the connection is closed, Lyra may leave pending publishes waiting forever

If retries and recovery have been exhausted, and no further communication via AMQP is going to be possible, Lyra can be left in a state where there is still a thread running and blocked on a condition, which in turn can prevent the JVM from being able to exit. A thread dump showing an example of this:

"Thread-3" prio=5 tid=0x00007fc3a600b000 nid=0x6403 waiting on condition [0x000000011ac4c000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00000007d81d2e08> (a net.jodah.lyra.internal.util.concurrent.ReentrantCircuit$Sync) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834) at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303) at net.jodah.lyra.internal.util.concurrent.ReentrantCircuit.await(ReentrantCircuit.java:63) at net.jodah.lyra.internal.RetryableResource.callWithRetries(RetryableResource.java:73) at net.jodah.lyra.internal.ChannelHandler.invoke(ChannelHandler.java:168) at com.sun.proxy.$Proxy25.basicPublish(Unknown Source) at x.sendReply(x.java:111) at x.run(x.java:80) at org.springbyexample.bean.scope.thread.ThreadScopeRunnable.run(ThreadScopeRunnable.java:46) at java.lang.Thread.run(Thread.java:745)

Manually sending the thread (which appears to be waiting on a retry that will never occur) an interrupt() will cause it to finally unblock with an InterruptedException, but it might be nice to handle this kind of occurrence gracefully.

To reproduce:

  1. Set up a DefaultConsumer that has a handleDelivery method that spawns a thread to handle the details of the delivery. Configure Lyra to have one retry and one recovery only.
  2. Have that delivery wait for 10 seconds or so before sending a publish event.
  3. After sending a message to be handled by this DefaultConsumer, forcibly stop the RabbitMQ server.
  4. Watch Lyra attempt a recovery, and then give up after a retry.
  5. The thread that started finishes sleeping and tries to do a publish, which then blocks forever.

It may be easier to reproduce than this, but this is the way I've managed to do it.
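A rough sketch of the consumer described in step 1 (names, queue and delay are illustrative):

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

// Each delivery is handed to a new thread that sleeps and then publishes a reply.
// If recovery has been exhausted while the thread sleeps, the basicPublish below
// can block forever on Lyra's retry circuit.
class SlowReplyConsumer extends DefaultConsumer {
    SlowReplyConsumer(Channel channel) {
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) {
        new Thread(() -> {
            try {
                Thread.sleep(10_000); // the RabbitMQ server is forcibly stopped during this pause
                getChannel().basicPublish("", "reply-queue", null, body);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }
}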

Connection recovery hangs after EOFException is thrown

Lyra version 0.5.2 (also tested 0.5.3-SNAPSHOT with the same result)
amqp-client 3.5.5
Broker version 3.5.6

We have an automated test suite for a Java RabbitMQ library built on top of Lyra and amqp-client.
We use Docker and have the JUnit tests start and stop a real RabbitMQ broker on the same machine that runs the tests.
One of the tests simulates a broker crash by using the docker kill command on the broker container at the same time as we are consuming from a queue on the broker.

We use persistent messages, so when the broker starts up again after a few seconds the messages are still in the queue, BUT it seems that Lyra does not re-connect to the broker in all cases.

On my colleague's machine, which is a Mac, Lyra correctly re-connects when the broker starts up again. But on my Linux machine only one re-connection attempt is made, then everything freezes.

The difference we can see is that on the Mac we get a socket 'connection refused' exception, but on Linux a java.io.EOFException is thrown.

This is a snippet from the logs in the test suite we have:

19:58:08.322 [main] INFO  c.m.d.test.RabbitConnectionTests - Killing the rabbitMQ broker
19:58:08.375 [AMQP Connection 127.0.0.1:5672] ERROR n.jodah.lyra.internal.ChannelHandler - Channel channel-1 on test-app-consume was closed unexpectedly
19:58:08.377 [AMQP Connection 127.0.0.1:5672] ERROR n.jodah.lyra.internal.ChannelHandler - Channel channel-1 on test-app-publish was closed unexpectedly
19:58:08.379 [AMQP Connection 127.0.0.1:5672] ERROR n.j.lyra.internal.ConnectionHandler - Connection test-app-publish was closed unexpectedly
19:58:08.385 [AMQP Connection 127.0.0.1:5672] ERROR n.j.lyra.internal.ConnectionHandler - Connection test-app-consume was closed unexpectedly
19:58:08.385 [lyra-recovery-1] INFO  n.j.lyra.internal.ConnectionHandler - Recovering connection test-app-publish to [localhost:5672]
19:58:08.387 [lyra-recovery-2] INFO  n.j.lyra.internal.ConnectionHandler - Recovering connection test-app-consume to [localhost:5672]
19:58:08.388 [rabbitmq-test-app-consume-consumer] ERROR c.m.d.c.impl.SingleChannelConsumer - The rabbit connection was unexpectedly disconnected. [ localPort=5672, queue="test-queue", consumerTag="test-app-consumer-1" ]
com.rabbitmq.client.ShutdownSignalException: connection error
    at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:723) ~[amqp-client-3.5.5.jar:na]
    at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:713) ~[amqp-client-3.5.5.jar:na]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:567) ~[amqp-client-3.5.5.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]
Caused by: java.io.EOFException: null
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290) ~[na:1.8.0_60]
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95) ~[amqp-client-3.5.5.jar:na]
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139) ~[amqp-client-3.5.5.jar:na]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:536) ~[amqp-client-3.5.5.jar:na]
    ... 1 common frames omitted

Avoid static list of Addresses in ConnectionHandler

Setup:

  • an auto-scaling cluster of rabbitmq nodes

Problem:
The RabbitMQ cluster can scale up and down, but once a connection has been set up, the list of addresses cannot be updated without a restart or without discarding the connection and re-establishing it from scratch.

Proposal:
Provide a dereferencing mechanism to obtain the list of Addresses by invoking a callback instead of maintaining a static list of Addresses.
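One possible shape for such a callback (purely a sketch; Lyra has no such API today, and the names below are hypothetical):

import com.rabbitmq.client.Address;
import java.util.List;

// Hypothetical: consulted on every connection/recovery attempt instead of a fixed Address[].
public interface AddressSupplier {
    /** Returns the current broker addresses, e.g. resolved from DNS, ZooKeeper or Eureka. */
    List<Address> get();
}

// Illustrative usage - withAddressSupplier does not exist on ConnectionOptions today:
// ConnectionOptions options = new ConnectionOptions()
//     .withAddressSupplier(() -> discoveryClient.lookupAddresses("rabbitmq"));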

Doing so solves the following use-cases:

  • For long-running (days, months, etc.) AMQP clients there would be no restart required even if the entire RabbitMQ cluster was re-created on different systems. The dereference callback can load and return the latest list of addresses.
  • Balance the load across rabbitmq nodes - the callback can use different mechanisms and find the least loaded server to connect to rather than the ConnectionHandler trying them in sequence.
  • Integrate with discovery/directory services like ldap, zookeeper or netflix-eureka - each time a connection recovery is attempted, the latest state of the rabbitmq cluster can be read and returned

Let me know your thoughts and I am willing to contribute in any way on this.

Reconnection not reattempted if the connection is broken whilst recovery of Channels, etc is in progress?

Hi,

I made a test (not unit, yet) where the connection is broken during recovery.

It resulted in:

Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1 run
SEVERE: Failed to recover connection cxn-1
java.net.ConnectException: Connection refused
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:391)
    at java.net.Socket.connect(Socket.java:579)
    at com.rabbitmq.client.ConnectionFactory.createFrameHandler(ConnectionFactory.java:445)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:504)
    at net.jodah.lyra.internal.ConnectionHandler$3.call(ConnectionHandler.java:225)
    at net.jodah.lyra.internal.ConnectionHandler$3.call(ConnectionHandler.java:220)
    at net.jodah.lyra.internal.RetryableResource.callWithRetries(RetryableResource.java:44)
    at net.jodah.lyra.internal.ConnectionHandler.createConnection(ConnectionHandler.java:220)
    at net.jodah.lyra.internal.ConnectionHandler.recoverConnection(ConnectionHandler.java:243)
    at net.jodah.lyra.internal.ConnectionHandler.access$100(ConnectionHandler.java:41)
    at net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1.run(ConnectionHandler.java:90)
    at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1061)
    at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:575)
    at java.lang.Thread.run(Thread.java:722)

No more connection recovery attempts take place. Can you suggest a fix for this case?

More information:

My application has around 10 Consumers on one Channel. Each one declares an exchange and a queue with 5-10 bindings (for various routing keys) each. It runs in a rather old, embedded environment, where it can take a non-trivial amount of time to remake the connection, channel and all its bindings. In my test, the connection goes down again whilst the exchanges, queues and bindings are being remade. This results in some of those bindings, etc. failing with e.g. AlreadyClosedExceptions. Then the reconnection fails with the above stack trace, and no more attempts at reconnection are made.

A fuller log is:

INFO: Recovering connection cxn-1 to [Lcom.rabbitmq.client.Address;@29d9a32a
Jan 29, 2014 4:39:13 PM net.jodah.lyra.internal.ConnectionHandler$3 call
INFO: Recovered connection cxn-1 to amqp://192.168.1.103:5672/
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovered queue OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovered queue DeviceManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovered queue amq.gen-laBXjopaipQUx8PsvT4VQQ as amq.gen-rtdJ0UhDsQryLhwentPvTQ via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovered queue C2DeviceManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovered queue Snmp4jManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovered queue DiscoveryManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovered queue OsgiShellManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to DeviceManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with deviceManagement.modifyLlrpDevicePortAddress.useUuidTcpPortIpAddressToModifyDevice.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to DeviceManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with meta.deviceInformation.getDeviceInfo.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to DeviceManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with deviceManagement.removeLlrpDeviceFromAgent.useUuid.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiShellManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with osgi.shell.execute.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Discovery Exchange to Discovery Queue with  via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with osgi.bundleContext.getBundles.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with osgi.bundle.stop.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with osgi.bundle.get.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with osgi.bundleContext.installBundle.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with osgi.bundleContext.refresh.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with osgi.bundle.uninstall.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with osgi.bundleContext.resolve.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with osgi.bundle.update.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with deviceManagement.rebootAgentOsgiFramework.rebootAgentNow.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with osgi.bundle.start.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to C2DeviceManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with c2.message.send.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to C2DeviceManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with deviceManagement.addC2Device.useIpAddressAndPortToAddDevice.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:15 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to DiscoveryManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with meta.discovery.getDevices.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:15 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to DiscoveryManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with meta.discovery.getVersion.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:15 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to DiscoveryManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with meta.discovery.setVersion.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:15 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to DiscoveryManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with meta.discovery.getAgent.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:15 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to Snmp4jManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with meta.snmp.getSnmpStats.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:15 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Shell Commands to amq.gen-rtdJ0UhDsQryLhwentPvTQ with 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via cxn-1
Jan 29, 2014 4:39:15 PM net.jodah.lyra.internal.ChannelHandler$2 call
INFO: Recovering channel-1 on cxn-1
Jan 29, 2014 4:39:15 PM net.jodah.lyra.internal.ChannelHandler$2 call
INFO: Recovered channel-1 on cxn-1
Jan 29, 2014 4:39:15 PM net.jodah.lyra.internal.ChannelHandler recoverConsumers
INFO: Recovering consumer-amq.ctag-et5y9XKtn8Bj6aRxQR9yMw of C2DeviceManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via channel-1 on cxn-1
Jan 29, 2014 4:39:15 PM net.jodah.lyra.internal.ChannelHandler recoverConsumers
INFO: Recovering consumer-amq.ctag-pWWdE-PT8jNJKfSvUmR34g of amq.gen-rtdJ0UhDsQryLhwentPvTQ via channel-1 on cxn-1
Jan 29, 2014 4:39:15 PM net.jodah.lyra.internal.ChannelHandler recoverConsumers
INFO: Recovering consumer-amq.ctag-ED8tQC4054rH-iXX3W0xyQ of OsgiShellManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via channel-1 on cxn-1
Jan 29, 2014 4:39:15 PM net.jodah.lyra.internal.ChannelHandler recoverConsumers
INFO: Recovering consumer-amq.ctag-fBAxD8mEawK6qq9PGmhW-w of DiscoveryManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via channel-1 on cxn-1
Jan 29, 2014 4:39:15 PM net.jodah.lyra.internal.ChannelHandler recoverConsumers
INFO: Recovering consumer-amq.ctag-5bkRcclo5wYvCuh40M7L7g of OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via channel-1 on cxn-1
Jan 29, 2014 4:39:15 PM net.jodah.lyra.internal.ChannelHandler recoverConsumers
INFO: Recovering consumer-amq.ctag-HuzXlSx7ypV79PUim9qhkg of Snmp4jManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via channel-1 on cxn-1
Jan 29, 2014 4:39:15 PM net.jodah.lyra.internal.ChannelHandler recoverConsumers
INFO: Recovering consumer-amq.ctag-_2HT7LugX-9F6VAdGUAcWA of DeviceManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via channel-1 on cxn-1
Jan 29, 2014 4:39:18 PM net.jodah.lyra.internal.ChannelHandler$ChannelShutdownListener shutdownCompleted
SEVERE: Channel channel-1 on cxn-1 was closed unexpectedly
Jan 29, 2014 4:39:18 PM net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener shutdownCompleted
SEVERE: Connection cxn-1 was closed unexpectedly
Jan 29, 2014 4:39:18 PM net.jodah.lyra.internal.ConnectionHandler$3 call
INFO: Recovering connection cxn-1 to [Lcom.rabbitmq.client.Address;@4f541332
Jan 29, 2014 4:39:23 PM net.jodah.lyra.internal.ConnectionHandler$3 call
INFO: Recovering connection cxn-1 to [Lcom.rabbitmq.client.Address;@6d6506ae
Jan 29, 2014 4:39:23 PM net.jodah.lyra.internal.ConnectionHandler$3 call
INFO: Recovered connection cxn-1 to amqp://192.168.1.103:5672/
Jan 29, 2014 4:39:23 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovered queue OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via cxn-1
Jan 29, 2014 4:39:23 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovered queue DeviceManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via cxn-1
Jan 29, 2014 4:39:23 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovered queue C2DeviceManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via cxn-1
Jan 29, 2014 4:39:23 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovered queue Snmp4jManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via cxn-1
Jan 29, 2014 4:39:23 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovered queue DiscoveryManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via cxn-1
Jan 29, 2014 4:39:23 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovered queue amq.gen-rtdJ0UhDsQryLhwentPvTQ as amq.gen-Gin0LH8UfSzy8L_jcy4q-Q via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovered queue OsgiShellManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to DeviceManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with deviceManagement.modifyLlrpDevicePortAddress.useUuidTcpPortIpAddressToModifyDevice.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to DeviceManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with meta.deviceInformation.getDeviceInfo.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to DeviceManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with deviceManagement.removeLlrpDeviceFromAgent.useUuid.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiShellManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with osgi.shell.execute.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Discovery Exchange to Discovery Queue with  via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with osgi.bundleContext.getBundles.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with osgi.bundle.stop.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with osgi.bundle.get.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with osgi.bundleContext.installBundle.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with osgi.bundleContext.refresh.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with osgi.bundle.uninstall.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with osgi.bundleContext.resolve.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with osgi.bundle.update.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with deviceManagement.rebootAgentOsgiFramework.rebootAgentNow.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with osgi.bundle.start.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to C2DeviceManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with c2.message.send.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to C2DeviceManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with deviceManagement.addC2Device.useIpAddressAndPortToAddDevice.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to DiscoveryManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with meta.discovery.getDevices.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
SEVERE: Failed to recover queue binding from Maintenance to DiscoveryManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with meta.discovery.getDevices.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:825)
    at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:61)
    at net.jodah.lyra.internal.ConnectionHandler.recoverQueues(ConnectionHandler.java:355)
    at net.jodah.lyra.internal.ConnectionHandler.recoverExchangesAndQueues(ConnectionHandler.java:289)
    at net.jodah.lyra.internal.ConnectionHandler.recoverConnection(ConnectionHandler.java:258)
    at net.jodah.lyra.internal.ConnectionHandler.access$100(ConnectionHandler.java:41)
    at net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1.run(ConnectionHandler.java:90)
    at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1061)
    at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:575)
    at java.lang.Thread.run(Thread.java:722)
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; reason: java.io.EOFException
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 10 more
Caused by: java.io.EOFException
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:131)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:533)

Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener shutdownCompleted
SEVERE: Connection cxn-1 was closed unexpectedly
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to DiscoveryManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with meta.discovery.getVersion.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler$3 call
INFO: Recovering connection cxn-1 to [Lcom.rabbitmq.client.Address;@6ebe738c
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
SEVERE: Failed to recover queue binding from Maintenance to DiscoveryManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with meta.discovery.getVersion.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
com.rabbitmq.client.AlreadyClosedException: clean connection shutdown; reason: Attempt to use closed channel
    at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:190)
    at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:223)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:209)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:825)
    at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:61)
    at net.jodah.lyra.internal.ConnectionHandler.recoverQueues(ConnectionHandler.java:355)
    at net.jodah.lyra.internal.ConnectionHandler.recoverExchangesAndQueues(ConnectionHandler.java:289)
    at net.jodah.lyra.internal.ConnectionHandler.recoverConnection(ConnectionHandler.java:258)
    at net.jodah.lyra.internal.ConnectionHandler.access$100(ConnectionHandler.java:41)
    at net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1.run(ConnectionHandler.java:90)
    at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1061)
    at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:575)
    at java.lang.Thread.run(Thread.java:722)

Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to DiscoveryManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with meta.discovery.setVersion.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
SEVERE: Failed to recover queue binding from Maintenance to DiscoveryManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with meta.discovery.setVersion.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
com.rabbitmq.client.AlreadyClosedException: clean connection shutdown; reason: Attempt to use closed channel
    at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:190)
    at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:223)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:209)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:825)
    at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:61)
    at net.jodah.lyra.internal.ConnectionHandler.recoverQueues(ConnectionHandler.java:355)
    at net.jodah.lyra.internal.ConnectionHandler.recoverExchangesAndQueues(ConnectionHandler.java:289)
    at net.jodah.lyra.internal.ConnectionHandler.recoverConnection(ConnectionHandler.java:258)
    at net.jodah.lyra.internal.ConnectionHandler.access$100(ConnectionHandler.java:41)
    at net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1.run(ConnectionHandler.java:90)
    at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1061)
    at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:575)
    at java.lang.Thread.run(Thread.java:722)

Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to DiscoveryManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with meta.discovery.getAgent.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
SEVERE: Failed to recover queue binding from Maintenance to DiscoveryManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with meta.discovery.getAgent.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
com.rabbitmq.client.AlreadyClosedException: clean connection shutdown; reason: Attempt to use closed channel
    at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:190)
    at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:223)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:209)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:825)
    at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:61)
    at net.jodah.lyra.internal.ConnectionHandler.recoverQueues(ConnectionHandler.java:355)
    at net.jodah.lyra.internal.ConnectionHandler.recoverExchangesAndQueues(ConnectionHandler.java:289)
    at net.jodah.lyra.internal.ConnectionHandler.recoverConnection(ConnectionHandler.java:258)
    at net.jodah.lyra.internal.ConnectionHandler.access$100(ConnectionHandler.java:41)
    at net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1.run(ConnectionHandler.java:90)
    at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1061)
    at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:575)
    at java.lang.Thread.run(Thread.java:722)

Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to Snmp4jManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with meta.snmp.getSnmpStats.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
SEVERE: Failed to recover queue binding from Maintenance to Snmp4jManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with meta.snmp.getSnmpStats.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
com.rabbitmq.client.AlreadyClosedException: clean connection shutdown; reason: Attempt to use closed channel
    at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:190)
    at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:223)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:209)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:825)
    at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:61)
    at net.jodah.lyra.internal.ConnectionHandler.recoverQueues(ConnectionHandler.java:355)
    at net.jodah.lyra.internal.ConnectionHandler.recoverExchangesAndQueues(ConnectionHandler.java:289)
    at net.jodah.lyra.internal.ConnectionHandler.recoverConnection(ConnectionHandler.java:258)
    at net.jodah.lyra.internal.ConnectionHandler.access$100(ConnectionHandler.java:41)
    at net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1.run(ConnectionHandler.java:90)
    at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1061)
    at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:575)
    at java.lang.Thread.run(Thread.java:722)

Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Shell Commands to amq.gen-Gin0LH8UfSzy8L_jcy4q-Q with 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
SEVERE: Failed to recover queue binding from Shell Commands to amq.gen-Gin0LH8UfSzy8L_jcy4q-Q with 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via cxn-1
com.rabbitmq.client.AlreadyClosedException: clean connection shutdown; reason: Attempt to use closed channel
    at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:190)
    at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:223)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:209)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:825)
    at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:61)
    at net.jodah.lyra.internal.ConnectionHandler.recoverQueues(ConnectionHandler.java:355)
    at net.jodah.lyra.internal.ConnectionHandler.recoverExchangesAndQueues(ConnectionHandler.java:289)
    at net.jodah.lyra.internal.ConnectionHandler.recoverConnection(ConnectionHandler.java:258)
    at net.jodah.lyra.internal.ConnectionHandler.access$100(ConnectionHandler.java:41)
    at net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1.run(ConnectionHandler.java:90)
    at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1061)
    at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:575)
    at java.lang.Thread.run(Thread.java:722)

Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1 run
SEVERE: Failed to recover connection cxn-1
com.rabbitmq.client.AlreadyClosedException: clean connection shutdown; reason: Attempt to use closed channel
    at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(AMQChannel.java:265)
    at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(ChannelN.java:261)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:561)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:501)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:494)
    at net.jodah.lyra.internal.ConnectionHandler.recoverExchangesAndQueues(ConnectionHandler.java:295)
    at net.jodah.lyra.internal.ConnectionHandler.recoverConnection(ConnectionHandler.java:258)
    at net.jodah.lyra.internal.ConnectionHandler.access$100(ConnectionHandler.java:41)
    at net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1.run(ConnectionHandler.java:90)
    at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1061)
    at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:575)
    at java.lang.Thread.run(Thread.java:722)

Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1 run
SEVERE: Failed to recover connection cxn-1
java.net.ConnectException: Connection refused
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:391)
    at java.net.Socket.connect(Socket.java:579)
    at com.rabbitmq.client.ConnectionFactory.createFrameHandler(ConnectionFactory.java:445)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:504)
    at net.jodah.lyra.internal.ConnectionHandler$3.call(ConnectionHandler.java:225)
    at net.jodah.lyra.internal.ConnectionHandler$3.call(ConnectionHandler.java:220)
    at net.jodah.lyra.internal.RetryableResource.callWithRetries(RetryableResource.java:44)
    at net.jodah.lyra.internal.ConnectionHandler.createConnection(ConnectionHandler.java:220)
    at net.jodah.lyra.internal.ConnectionHandler.recoverConnection(ConnectionHandler.java:243)
    at net.jodah.lyra.internal.ConnectionHandler.access$100(ConnectionHandler.java:41)
    at net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1.run(ConnectionHandler.java:90)
    at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1061)
    at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:575)
    at java.lang.Thread.run(Thread.java:722)

Best, Dan.

RabbitMQ RpcServer and RpcClient recovery fail

17:26:20.422 [lyra-recovery-1] INFO  n.j.lyra.internal.ConnectionHandler - Recovering connection cxn-1 to [localhost:5672]
17:26:20.491 [lyra-recovery-1] INFO  n.j.lyra.internal.ConnectionHandler - Recovered connection cxn-1 to amqp://127.0.0.1:5672/
17:26:20.495 [lyra-recovery-1] INFO  n.j.lyra.internal.ConnectionHandler - Recovered queue amq.gen-VEMRx35Rd79EjSgPWzylfw as amq.gen-AVxW2kGA7LRLbE-nRSPy1g via cxn-1
17:26:20.501 [lyra-recovery-1] INFO  n.jodah.lyra.internal.ChannelHandler - Recovering channel-1 on cxn-1
17:26:20.501 [lyra-recovery-1] INFO  n.jodah.lyra.internal.ChannelHandler - Recovered channel-1 on cxn-1
17:26:20.502 [lyra-recovery-1] INFO  n.jodah.lyra.internal.ChannelHandler - Recovering consumer-amq.ctag-nLsqKrC-yFg1vywXyu31wg of amq.gen-AVxW2kGA7LRLbE-nRSPy1g via channel-1 on cxn-1
java.io.EOFException: RpcClient is closed
    at com.rabbitmq.client.RpcClient.checkConsumer(RpcClient.java:113)
    at com.rabbitmq.client.RpcClient.primitiveCall(RpcClient.java:183)
    at com.rabbitmq.client.RpcClient.primitiveCall(RpcClient.java:218)
    at com.rabbitmq.client.RpcClient.stringCall(RpcClient.java:238)
    at me.chao.test.rabbit.TestRpcClient.main(TestRpcClient.java:27)

Hi jhalterman, I found that Lyra does not support RabbitMQ RPC: the channel is recovered, but the RpcClient and RpcServer remain closed.
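As reported above, Lyra does not rebuild the RpcClient/RpcServer for you. A rough workaround sketch (a hypothetical helper, not a Lyra API; the exchange and routing-key values are whatever your application already uses) is to treat the RpcClient as disposable and recreate it on the Lyra-recovered channel when it reports itself closed:

import java.io.EOFException;
import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.RpcClient;

// Hypothetical helper: rebuilds the RpcClient on the Lyra-managed channel
// when the old instance reports itself closed after a recovery.
public class RecreatingRpcClient {
  private final Channel channel;     // a Lyra-managed channel that recovers itself
  private final String exchange;     // supplied by the caller
  private final String routingKey;
  private RpcClient delegate;

  public RecreatingRpcClient(Channel channel, String exchange, String routingKey)
      throws IOException {
    this.channel = channel;
    this.exchange = exchange;
    this.routingKey = routingKey;
    this.delegate = new RpcClient(channel, exchange, routingKey);
  }

  public synchronized String stringCall(String message) throws Exception {
    try {
      return delegate.stringCall(message);
    } catch (EOFException closed) {
      // The old RpcClient poisoned itself on shutdown; build a fresh one and retry once.
      delegate = new RpcClient(channel, exchange, routingKey);
      return delegate.stringCall(message);
    }
  }
}

The same idea would be needed on the responder side with a recreated RpcServer.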

RabbitMQ can block forever in exchangeDeclare during recovery

This one is probably a little tricky to reproduce, but I was able to get a stack trace out with jstack. It appears that we called exchangeDeclare immediately before a recovery happened, or possibly during recovery, and the RabbitMQ client then blocked forever waiting for a reply from the server.

Here's the stack we got out with jstack:

Thread 1652: (state = BLOCKED)

  • java.lang.Object.wait(long) @bci=0 (Interpreted frame)
  • java.lang.Object.wait() @bci=2, line=503 (Compiled frame)
  • com.rabbitmq.utility.BlockingCell.get() @bci=8, line=50 (Compiled frame)
  • com.rabbitmq.utility.BlockingCell.uninterruptibleGet() @bci=1, line=89 (Interpreted frame)
  • com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue() @bci=1, line=33 (Interpreted frame)
  • com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply() @bci=4, line=343 (Interpreted frame)
  • com.rabbitmq.client.impl.AMQChannel.privateRpc(com.rabbitmq.client.Method) @bci=15, line=216 (Interpreted frame)
  • com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(com.rabbitmq.client.Method) @bci=2, line=118 (Interpreted frame)
  • com.rabbitmq.client.impl.ChannelN.exchangeDeclare(java.lang.String, java.lang.String, boolean, boolean, boolean, java.util.Map) @bci=38, line=675 (Interpreted frame)
  • com.rabbitmq.client.impl.ChannelN.exchangeDeclare(java.lang.String, java.lang.String, boolean, boolean, java.util.Map) @bci=9, line=662 (Interpreted frame)
  • com.rabbitmq.client.impl.ChannelN.exchangeDeclare(java.lang.String, java.lang.String, boolean) @bci=6, line=692 (Interpreted frame)
  • com.rabbitmq.client.impl.ChannelN.exchangeDeclare(java.lang.String, java.lang.String, boolean) @bci=4, line=61 (Interpreted frame)
  • sun.reflect.GeneratedMethodAccessor71.invoke(java.lang.Object, java.lang.Object[]) @bci=79 (Interpreted frame)
  • sun.reflect.DelegatingMethodAccessorImpl.invoke(java.lang.Object, java.lang.Object[]) @bci=6, line=43 (Interpreted frame)
  • java.lang.reflect.Method.invoke(java.lang.Object, java.lang.Object[]) @bci=57, line=606 (Interpreted frame)
  • net.jodah.lyra.internal.util.Reflection.invoke(java.lang.Object, java.lang.reflect.Method, java.lang.Object[]) @bci=3, line=11 (Interpreted frame)
  • net.jodah.lyra.internal.ChannelHandler$1.call() @bci=396, line=131 (Interpreted frame)
  • net.jodah.lyra.internal.RetryableResource.callWithRetries(java.util.concurrent.Callable, net.jodah.lyra.internal.RecurringPolicy, net.jodah.lyra.internal.RecurringStats, java.util.Set, boolean, boolean) @bci=12, line=51 (Interpreted frame)
  • net.jodah.lyra.internal.ChannelHandler.invoke(java.lang.Object, java.lang.reflect.Method, java.lang.Object[]) @bci=88, line=167 (Interpreted frame)

In our log file, we got a stream of messages like "ChannelHandler - Channel channel-5 on cxn-1 was closed unexpectedly" about 1.7 seconds after we would have called exchangeDeclare, so it would appear that it was the exchangeDeclare that happened first, before recovery began. We see the first instance of a ShutdownSignalException roughly 1.8 seconds later when a different exchangeDeclare call happened. We do not see an exception on the first exchangeDeclare, however, perhaps because it was already blocking prior to the channels being detected as closing unexpectedly.

It looks like the blocking get() should have been interrupted somehow when recovery took place, so it could throw an exception and bail.

We're seeing this with RabbitMQ's amqp-client-3.3.1, Lyra 0.5.0, and RabbitMQ 3.3.4.
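A defensive sketch (a caller-side workaround, not a fix inside Lyra or amqp-client): bound how long the calling thread waits on a broker RPC by running the declare on a helper thread. Because the client's internal wait is uninterruptible in this version, the helper thread may stay parked after a timeout, so this only protects the caller from blocking forever. The exchange name and arguments are placeholders:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;

public class BoundedDeclare {
  private final ExecutorService rpcExecutor = Executors.newSingleThreadExecutor();

  /** Declares an exchange but gives up waiting after the given timeout. */
  public void exchangeDeclareWithTimeout(final Channel channel, final String exchange,
      long timeout, TimeUnit unit) throws Exception {
    Future<Void> declare = rpcExecutor.submit(new Callable<Void>() {
      public Void call() throws Exception {
        channel.exchangeDeclare(exchange, "topic", true); // type/durable are placeholder args
        return null;
      }
    });
    try {
      declare.get(timeout, unit);
    } catch (TimeoutException e) {
      // The underlying wait is uninterruptible, so the worker may remain parked;
      // this only keeps the calling thread from hanging forever.
      declare.cancel(true);
      throw e;
    }
  }
}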

How to implement automatic fail-over to a running node

I really appreciate this library, but I wonder if and how it might be possible to implement automatic fail-over of a producer to a running node, given a two-node RabbitMQ cluster in which each node can be rebooted without data loss. Any pointers are welcome. Thanks!
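Not an official recipe, just a sketch of one approach: hand Lyra all cluster nodes up front and let the recovery policy reconnect to whichever node is reachable. The host names are placeholders, and withHosts is an assumption about your Lyra version (older versions may only expose withHost or withAddresses):

Config config = new Config()
    .withRecoveryPolicy(new RecoveryPolicy()
        .withBackoff(Duration.seconds(1), Duration.minutes(1)));

// withHosts is assumed here; check the ConnectionOptions API of your Lyra version.
ConnectionOptions options = new ConnectionOptions()
    .withHosts("rabbit-node-1", "rabbit-node-2")
    .withPort(5672)
    .withVirtualHost("/");

Connection connection = Connections.create(options, config);

Avoiding data loss itself is largely a broker-side concern (durable, mirrored queues and persistent messages); the client-side piece is reconnecting and, for a producer, confirming publishes so unconfirmed messages can be resent.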

ShutdownSignalException when reconnecting

I'm seeing this error right after the RabbitMQ server comes back to life:

DefaultExceptionHandler: Consumer com.rabbitmq.client.QueueingConsumer@271d2d04 (amq.ctag-cYQcdbtkNo_ZIAoqHEU52A) method handleDelivery for channel AMQChannel(amqp://[email protected]:5672/,1) threw an exception for channel AMQChannel(amqp://[email protected]:5672/,1):
com.rabbitmq.client.ShutdownSignalException: connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - broker forced connection closure with reason 'shutdown', class-id=0, method-id=0)
    at com.rabbitmq.client.QueueingConsumer.checkShutdown(QueueingConsumer.java:172)
    at com.rabbitmq.client.QueueingConsumer.handleDelivery(QueueingConsumer.java:124)
    at net.jodah.lyra.internal.ConsumerDelegate.handleDelivery(ConsumerDelegate.java:53)

Any idea what might be going on?

config:

config = config.withRecoveryPolicy(new RecoveryPolicy()
    .withBackoff(Duration.seconds(1), Duration.minutes(10)));

ConnectionOptions options = new ConnectionOptions()
    .withHost(hostname)
    .withPort(port)
    .withVirtualHost(virtualHost)
    .withUsername(username)
    .withPassword(password)

Receive an (eventually) failed message from the client

Hi,
Let's say I have configured a recovery policy with max attempts, an interval, and a max duration, and a message eventually fails to be delivered.

Can I get that message back via a listener, an exception, or some other mechanism, so that I can log it or handle it differently?
Thank you.
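As far as I can tell there is no dedicated "failed message" callback; once the recovery/retry policy is exhausted, the original failure is rethrown to the calling thread, so the message value can be captured at the publish site. A minimal sketch (exchange and routing key are placeholders):

import com.rabbitmq.client.Channel;

public class LoggingPublisher {
  /** Publishes, and hands the payload back to the caller's error handling if retries give up. */
  static void publishOrLog(Channel channel, byte[] payload) {
    try {
      channel.basicPublish("my-exchange", "my.routing.key", null, payload);
    } catch (Exception e) {
      // Recovery/retries gave up; the message value is still in hand, so log or persist it here.
      System.err.println("Giving up on message of " + payload.length + " bytes: " + e);
    }
  }
}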

How do I disable the INFO log messages?

As above.

The log messages from Lyra are too verbose. I tried adding "log4j.logger.net.jodah.lyra=OFF" to log4j-server.properties, to no avail. Any suggestions?

Thanks.
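Lyra's messages are emitted through whichever logging backend is wired up at runtime, so the property that works depends on where they actually end up. If the log4j setting has no effect, they may be going to java.util.logging (that is the format of the JUL-style output earlier on this page). A sketch for that case, assuming a JUL backend:

import java.util.logging.Level;
import java.util.logging.Logger;

public class QuietLyra {
  // Keep a strong reference: JUL holds loggers weakly, so a level set on a temporary
  // Logger instance can be silently lost when it is garbage-collected.
  private static final Logger LYRA_LOGGER = Logger.getLogger("net.jodah.lyra");

  public static void quiet() {
    LYRA_LOGGER.setLevel(Level.WARNING); // hides the INFO-level recovery chatter
  }
}

With a Log4j or Logback backend the logger name to configure is still net.jodah.lyra.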

Does Lyra support publisher-side recovery?

Does Lyra support publisher-side recovery? What about data loss when the publisher-side connection fails?

I can't see any reference in the code. If it is supported, can someone point me to it?

BTW: with the new factory.setAutomaticRecoveryEnabled feature in the RabbitMQ Java client, is there still a reason to use Lyra on the client side?
Thanks.
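On the data-loss half of the question: a common client-side mitigation (plain amqp-client, not a Lyra feature) is publisher confirms, so a publish that is never confirmed can be re-sent after recovery. A minimal sketch; exchange and routing key are placeholders:

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;

public class ConfirmedPublisher {
  private final Channel channel;

  public ConfirmedPublisher(Channel channel) throws IOException {
    this.channel = channel;
    channel.confirmSelect(); // enable publisher confirms once per channel
  }

  /** Returns true only if the broker confirmed the message. */
  public boolean publish(byte[] payload) throws Exception {
    channel.basicPublish("my-exchange", "my.key", null, payload); // placeholder names
    try {
      return channel.waitForConfirms(5000); // false => the broker nacked the message
    } catch (TimeoutException e) {
      return false; // no confirm within 5s: treat as undelivered and re-publish later
    }
  }
}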

android.os.NetworkOnMainThreadException

Hi.
I use Lyra in a Service and create the Connection on a new thread. But when the following code executes

Thread thread = new Thread(new Runnable() {
    @Override
    public void run() {
        try {
            connection = Connections.create(options, config);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        getConsumerChannel();
    }
});

thread.setPriority(android.os.Process.THREAD_PRIORITY_BACKGROUND);
thread.run();

it shows me this error:
android.os.NetworkOnMainThreadException

What is the reason?
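A likely cause, judging only from the snippet (not confirmed in this thread): Thread.run() executes the Runnable synchronously on the calling thread, so the connection is still being opened on the main thread; only Thread.start() spawns a new thread. A minimal fix for the tail of the snippet above:

// thread construction unchanged, then:
thread.setPriority(Thread.MIN_PRIORITY); // closest Thread-API equivalent of "background";
                                         // android.os.Process constants are meant for
                                         // Process.setThreadPriority(), not Thread.setPriority()
thread.start();                          // start() runs the Runnable on the new thread;
                                         // run() executes it on the caller (the main thread)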

amqps in ConnectionHandler - createConnection()

In line 235 of ConnectionHandler.java

          final String amqpAddress = String.format("amqp://%s:%s/%s", connection.getAddress()
              .getHostAddress(), connection.getPort(), "/".equals(cxnFactory.getVirtualHost()) ? ""
              : cxnFactory.getVirtualHost());

This always formats the address with the amqp scheme, even for SSL connections. You could change it to

          final String amqpAddress = String.format("%s://%s:%s/%s", cxnFactory.isSSL() ? "amqps" : "amqp", connection.getAddress()
              .getHostAddress(), connection.getPort(), "/".equals(cxnFactory.getVirtualHost()) ? ""
              : cxnFactory.getVirtualHost());

so that it checks cxnFactory.isSSL().

com.rabbitmq.client.QueueingConsumer not recovering

The com.rabbitmq.client.QueueingConsumer is never recovered when using Lyra.
The class uses a blocking queue internally; when it receives a ShutdownSignal it places a POISON message into this queue, and there is no way to recover from that state.

As this implementation of Consumer is used in most tutorials, usually like this:

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);

while (true) {
    try {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" [x] Received '" + message + "'");
    } catch (Exception e) {
        e.printStackTrace();
    }
}

It would be great if you could clearly point out in the readme/wiki that the com.rabbitmq.client.QueueingConsumer should not be used in connection with Lyra.

Once the connection is interrupted, the QueueingConsumer won't recover even after the connection is re-established (via Lyra), and the loop above will just keep printing the exception.

Thanks!
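For readers who land here: a minimal sketch of the push-style alternative that Lyra can re-register after recovery, since deliveries are handled in a callback rather than drained from QueueingConsumer's (poisoned) internal queue. The queue name and auto-ack flag mirror the snippet above:

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class RecoverableConsumerExample {
  /** Registers a push-style consumer instead of polling a QueueingConsumer. */
  static void consume(Channel channel, String queueName) throws IOException {
    channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope,
          AMQP.BasicProperties properties, byte[] body) {
        System.out.println(" [x] Received '" + new String(body) + "'");
      }
    });
  }
}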

Either recycle or close consumer threadpools after recovering connections

If a user does not specify a consumer threadpool, we create a named threadpool on their behalf. amqp-client considers such a pool to be user-owned, so it must be shut down by us when we're done with it. We should therefore either close or recycle this thread pool when recovering connections.
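For context, a sketch of the ownership rule being referenced (illustrative only, not Lyra's internal code): amqp-client never shuts down an ExecutorService it was handed, so on recovery the pool must either be reused for the replacement connection or shut down explicitly.

import java.util.concurrent.ExecutorService;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConsumerPoolOwnership {
  static Connection reconnect(ConnectionFactory factory, Connection dead,
      ExecutorService consumerPool) throws Exception {
    dead.abort(); // abandoning the old connection does not shut down a caller-supplied pool
    // Option A: reuse the same pool for the new connection (as below).
    // Option B: create a fresh pool, but then consumerPool.shutdown() must be called,
    // otherwise its threads leak on every recovery.
    return factory.newConnection(consumerPool);
  }
}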

Compilation with target Java 5

Hey

You have the source and target versions set to 1.6 in the pom.xml, which naturally results in Java 6 class files. I just tried changing it to 1.5; the project compiled successfully and all tests pass.

RabbitMQ Java client is also compiled for Java 5.

Is there a particularly strong reason to require Java 6?
I am still bound to Java 5 in my project, and Lyra fits it very well. I'd much rather consume it from Maven Central than build my own copy.

Would you consider that, please?
Thanks.

Cheers
Mladen

onCreate contains null for connection

Hi,

when the connection listener's onCreate is called, the connection param is always null. This seems logical, because the callback is performed before the proxy is set.

/Herman

Need a pre-recovery event on ChannelListener

Setup

  • A rabbitmq cluster with >2 nodes
  • A channel is set up over a connection to one of the rabbitmq nodes
  • An anonymous queue is declared and bound to an exchange
  • The queue is being consumed by multiple consumers

Recovery steps

  • the anonymous queue needs to be re-declared
  • the anonymous queue is bound to the exchange
  • the consumers are re-registered with the channel

How do we achieve the recovery?
If the recovery steps are done in a ChannelListener.onResume, basicConsume is called on the channel before the anonymous queue is declared and/or bound, because the ChannelHandler only notifies the ChannelListener after consumer recovery.

If the recovery steps are done in a ConsumerListener, they will be invoked once for each consumer, which is incorrect.

Question:
Is there a reason why the ChannelListener only receives a post-recovery event and no pre-recovery event?
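For reference, the three recovery steps above expressed as plain channel calls; where to hook them in is exactly the open question, since a post-recovery callback runs after Lyra has already re-issued basicConsume. The exchange, routing key, and consumer are placeholders:

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;

public class AnonymousQueueRecovery {
  static void redeclareAndBind(Channel channel, String exchange, String routingKey,
      Consumer consumer) throws IOException {
    String queue = channel.queueDeclare().getQueue(); // 1. re-declare the anonymous queue
    channel.queueBind(queue, exchange, routingKey);   // 2. re-bind it to the exchange
    channel.basicConsume(queue, true, consumer);      // 3. re-register the consumer(s)
  }
}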
