jhalterman / lyra
High availability RabbitMQ client
License: Apache License 2.0
Hi,
My recovery process was terminated with the following stack trace:
java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:378)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:516)
at net.jodah.lyra.internal.ConnectionHandler$3.call(ConnectionHandler.java:225)
at net.jodah.lyra.internal.ConnectionHandler$3.call(ConnectionHandler.java:220)
at net.jodah.lyra.internal.RetryableResource.callWithRetries(RetryableResource.java:44)
at net.jodah.lyra.internal.ConnectionHandler.createConnection(ConnectionHandler.java:220)
at net.jodah.lyra.internal.ConnectionHandler.recoverConnection(ConnectionHandler.java:243)
at net.jodah.lyra.internal.ConnectionHandler.access$100(ConnectionHandler.java:41)
at net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1.run(ConnectionHandler.java:90)
at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1061)
at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:575)
at java.lang.Thread.run(Thread.java:722)
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; reason: java.io.EOFException
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:321)
... 11 more
Caused by: java.io.EOFException
at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)
at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:131)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:533)
In this case, the ShutdownSignalException is extracted and since 'recovery' is true, the IOException is re-thrown from callWithRetries. I have changed the code so that it looks like this:
if (sse != null && (recovery || !recoverable)) {
  if (sse.getCause() == null) {
    // DJM: only exit here if there is no cause
    log.log(Level.SEVERE, "Rethrowing, as not recoverable: " + recovery + ", " + recoverable, e);
    throw e;
  }
}
but I wonder what cases I'm messing with by making this change.
Best, Dan.
Exchanges/queues/bindings should be scoped by connection.
When a channel is killed by a call in a ChannelListener or ConsumerListener (such as the result of a 404 error), subsequent listeners will be invoked on the channel. This should be handled gracefully.
We can handle this by checking whether a recovery is pending.
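A minimal sketch of such a pending-recovery guard (class and method names are invented here, not Lyra's internals): listeners race to set a flag, and only the first one proceeds with recovery work, so subsequent listeners invoked on the dead channel become no-ops.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class RecoveryGuard {
    private final AtomicBoolean recoveryPending = new AtomicBoolean(false);

    /** Returns true only for the first caller; later listeners skip their work. */
    boolean tryStartRecovery() {
        return recoveryPending.compareAndSet(false, true);
    }

    /** Called once the channel has been fully recovered (or abandoned). */
    void onRecoveryComplete() {
        recoveryPending.set(false);
    }

    public static void main(String[] args) {
        RecoveryGuard guard = new RecoveryGuard();
        System.out.println(guard.tryStartRecovery());  // first listener schedules recovery
        System.out.println(guard.tryStartRecovery());  // later listeners see it pending and bail out
    }
}
```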
In the current version of Lyra, instances of PossibleAuthenticationFailureException
are, understandably, treated as non-retryable. While this makes sense in most cases, I've got a case where I would like to retry after such exceptions. My question is: would it be possible (and acceptable!) to make this behaviour in Lyra configurable?
The need for retrying auth exceptions arises from a situation where new services are deployed using Puppet scripts that set up the service as well as configure the necessary access credentials in RabbitMQ. Because there's no easy way to guarantee the exact timing of these steps (as they're run on different servers), it may happen that the auth config isn't available at the point in time when the service that needs it starts. Hence retrying would be very useful.
If you think this is reasonable, I'd be happy to have a go at coming up with a PR for this.
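As a stopgap until this is configurable in Lyra, connection creation can be wrapped in an external retry loop. Below is a minimal sketch with a simulated action standing in for Connections.create; the retry helper and all names are hypothetical, and real code would catch PossibleAuthenticationFailureException specifically rather than Exception.

```java
import java.util.concurrent.Callable;

public class RetryOnAuth {
    // Hypothetical workaround: retry an action (e.g. connection creation) a
    // bounded number of times, sleeping between attempts, so auth failures that
    // Lyra treats as non-retryable are retried externally until credentials
    // have been provisioned.
    static <T> T retry(Callable<T> action, int maxAttempts, long delayMillis) throws Exception {
        Exception last = null;
        for (int i = 0; i < maxAttempts; i++) {
            try {
                return action.call();
            } catch (Exception e) { // real code: catch PossibleAuthenticationFailureException
                last = e;
                Thread.sleep(delayMillis);
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        // Simulated action that fails twice before succeeding, standing in for
        // Connections.create(options, config) while Puppet provisions credentials.
        int[] attempts = {0};
        String result = retry(() -> {
            if (attempts[0]++ < 2) throw new IllegalStateException("auth not yet provisioned");
            return "connected";
        }, 5, 10);
        System.out.println(result + " after " + attempts[0] + " attempts");
    }
}
```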
Hi,
I am now testing using Lyra in my project.
The client side looks great, but when I intentionally kill the erl process and bring it back up (quickly), I lose some messages on the producer side.
I don't get any exception when messages are lost.
How do I know that?
Only after all messages have been produced (the broker having been killed and reloaded in the middle) do I load my consumer and see that around the time of the crash I lost about 10 messages.
How do I configure my producer?
I use
Config config = new Config().withRecoveryPolicy(new RecoveryPolicy().withMaxAttempts(20).withInterval(Duration.seconds(1)).withMaxDuration(Duration.minutes(5)));
Questions:
Thanks.
This might be a duplicate of #20 but I'm not certain.
To reproduce:
Make a bunch of channels, then for each channel, queueName = channel.queueDeclare(someQueue, false, true, true, null).getQueue() (queueDeclare returns a DeclareOk, so getQueue() is needed to get the name), then channel.queueBind(queueName, someExchange, someTopic).
Then channel.close() each channel, and then force a recovery to happen.
Observed behavior: Lyra attempts to recover every queue and binding, even though amqp-client long ago discarded those queues because the connections closed (they're marked exclusive and auto-delete, and it can be observed using rabbitmqctl list_queues that they came and went).
Expected behavior: Lyra forgets about queues that are gone because their channels got closed and they were marked for autodeletion.
Setup:
Trying to update ChannelConfig after creating the channel.
...
ConfigurableConnection configurableConnection = Connections.create(options, rootConfig);
...
ConfigurableChannel channel = Config.of(configurableConnection.createChannel());
...
// This fails with the exception as shown below
channel.withConsumerListeners(new MyConsumerListener());
Exception seen:
ERROR 10:45:47,446 Invocation of ConsumerConfig.withConsumerListeners() failed.
java.lang.IllegalArgumentException: object is not an instance of declaring class
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
Exception in thread "main" java.lang.IllegalArgumentException: object is not an instance of declaring class
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at java.lang.reflect.Method.invoke(Method.java:601)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at net.jodah.lyra.internal.util.Reflection.invoke(Reflection.java:11)
at java.lang.reflect.Method.invoke(Method.java:601)
at net.jodah.lyra.internal.ChannelHandler$1.call(ChannelHandler.java:60)
at net.jodah.lyra.internal.util.Reflection.invoke(Reflection.java:11)
at net.jodah.lyra.internal.RetryableResource.callWithRetries(RetryableResource.java:58)
at net.jodah.lyra.internal.ChannelHandler$1.call(ChannelHandler.java:60)
at net.jodah.lyra.internal.ChannelHandler.invoke(ChannelHandler.java:49)
at net.jodah.lyra.internal.RetryableResource.callWithRetries(RetryableResource.java:58)
at $Proxy21.withConsumerListeners(Unknown Source)
at net.jodah.lyra.internal.ChannelHandler.invoke(ChannelHandler.java:49)
<Snip>
In our environment, it is entirely possible for systems to be trying to connect to our RabbitMQ server, while the server itself is down. Trying a new connection to a system that is powered off results in a NoRouteToHostException. In our case it would certainly make sense for that to be a retryable error.
In a situation where the RabbitMQ connection drops while a consumer/publisher is using a Lyra-managed connection with a RetryPolicy, calling "channel.getNextPublishSeqNo()" can return the sequence number for the old Channel before the Channel is recovered. The code typically then goes on to call "channel.basicPublish(...)", which blocks and eventually publishes, but under a sequence number different from the one previously acquired.
Since in my scenario the issue was being driven by the consumer being recovered first and immediately feeding a publisher that hadn't been recovered, I inverted the Channel declarations so the publisher always recovered first.
However, there is a question here of whether "channel.getNextPublishSeqNo()" should block with a RetryPolicy, as the intent of the programmer is almost certainly to publish a message subsequently and utilise the sequence number.
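The interleaving can be modeled with a plain counter standing in for the real Channel. This is a hedged sketch of one possible client-side mitigation, holding a single lock across sequence-number acquisition and publish so a recovery reset cannot slip between the two calls; all class and method names here are invented, not Lyra or amqp-client API.

```java
import java.util.concurrent.locks.ReentrantLock;

public class SeqNoGuard {
    private long nextPublishSeqNo = 1; // stands in for channel.getNextPublishSeqNo()
    private final ReentrantLock lock = new ReentrantLock();

    /** Acquires the sequence number and "publishes" atomically; returns the seq no used. */
    long publish(String message) {
        lock.lock();
        try {
            long seqNo = nextPublishSeqNo; // channel.getNextPublishSeqNo()
            nextPublishSeqNo++;            // channel.basicPublish(...) advances the counter
            return seqNo;                  // guaranteed to match this publish
        } finally {
            lock.unlock();
        }
    }

    /** Stands in for recovery: the replacement channel starts its sequence over. */
    void onRecovery() {
        lock.lock();
        try {
            nextPublishSeqNo = 1;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        SeqNoGuard channel = new SeqNoGuard();
        System.out.println(channel.publish("a")); // 1
        System.out.println(channel.publish("b")); // 2
        channel.onRecovery();
        System.out.println(channel.publish("c")); // 1 again on the "new" channel
    }
}
```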
Hi, and thank you so much for Lyra,
I have a question about the queue, exchange and binding recovery. Using the cookbook example at https://github.com/jhalterman/lyra/wiki/Lyra-Cookbook#wiki-publish--subscribe and Lyra 0.3.2 I get:
13:08:21.947 [lyra-recovery-1] INFO n.j.lyra.internal.ConnectionHandler - Recovered connection cxn-1 to amqp://127.0.0.1:5672/
13:08:21.949 [lyra-recovery-1] INFO n.jodah.lyra.internal.ChannelHandler - Recovering channel-1 on cxn-1
13:08:21.951 [lyra-recovery-1] INFO n.jodah.lyra.internal.ChannelHandler - Recovering consumer-amq.ctag-oycCJ-6aw7D8NXKK370Llg via channel-1 on cxn-1
13:08:21.953 [AMQP Connection 127.0.0.1:5672] ERROR n.jodah.lyra.internal.ChannelHandler - Channel channel-1 on cxn-1 was closed unexpectedly
13:08:21.962 [lyra-recovery-1] ERROR n.jodah.lyra.internal.ChannelHandler - Failed to recover consumer-amq.ctag-oycCJ-6aw7D8NXKK370Llg via channel-1 on cxn-1
java.io.IOException: null
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106) ~[amqp-client-3.2.1.jar:na]
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102) ~[amqp-client-3.2.1.jar:na]
at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:976) ~[amqp-client-3.2.1.jar:na]
at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:943) ~[amqp-client-3.2.1.jar:na]
at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:935) ~[amqp-client-3.2.1.jar:na]
at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:928) ~[amqp-client-3.2.1.jar:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_08-ea]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[na:1.7.0_08-ea]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.7.0_08-ea]
at java.lang.reflect.Method.invoke(Method.java:601) ~[na:1.7.0_08-ea]
at net.jodah.lyra.internal.util.Reflection.invoke(Reflection.java:11) ~[lyra-0.3.2.jar:na]
at net.jodah.lyra.internal.ChannelHandler.recoverConsumers(ChannelHandler.java:292) [lyra-0.3.2.jar:na]
at net.jodah.lyra.internal.ChannelHandler.recoverChannel(ChannelHandler.java:225) [lyra-0.3.2.jar:na]
at net.jodah.lyra.internal.ConnectionHandler.recoverConnection(ConnectionHandler.java:249) [lyra-0.3.2.jar:na]
at net.jodah.lyra.internal.ConnectionHandler.access$100(ConnectionHandler.java:34) [lyra-0.3.2.jar:na]
at net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1.run(ConnectionHandler.java:81) [lyra-0.3.2.jar:na]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) [na:1.7.0_08-ea]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) [na:1.7.0_08-ea]
at java.lang.Thread.run(Thread.java:722) [na:1.7.0_08-ea]
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'foo-queue' in vhost '/', class-id=60, method-id=20), null, ""}
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) ~[amqp-client-3.2.1.jar:na]
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33) ~[amqp-client-3.2.1.jar:na]
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343) ~[amqp-client-3.2.1.jar:na]
at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:974) ~[amqp-client-3.2.1.jar:na]
... 16 common frames omitted
com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'foo-queue' in vhost '/', class-id=60, method-id=20), null, ""}
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:474) ~[amqp-client-3.2.1.jar:na]
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315) ~[amqp-client-3.2.1.jar:na]
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144) ~[amqp-client-3.2.1.jar:na]
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91) ~[amqp-client-3.2.1.jar:na]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:551) ~[amqp-client-3.2.1.jar:na]
13:08:21.963 [lyra-recovery-2] INFO n.jodah.lyra.internal.ChannelHandler - Recovering channel-1 on cxn-1
I went through the code and I don't see where the calls to queueDeclare, etc. are captured by the proxy.
What's my mistake?
Very best, Dan.
During my experiments with two RabbitMQ nodes behind a load balancer, I ran into a situation where both nodes were down for a short period. In this case the connection recovery hangs indefinitely, although both nodes came back up after a few seconds:
Name: main
State: WAITING on net.jodah.lyra.internal.util.concurrent.ReentrantCircuit$Sync@2224ea85
Total blocked: 4 Total waited: 53
Stack trace:
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
net.jodah.lyra.internal.util.concurrent.ReentrantCircuit.await(ReentrantCircuit.java:63)
net.jodah.lyra.internal.RetryableResource.callWithRetries(RetryableResource.java:76)
net.jodah.lyra.internal.ChannelHandler.invoke(ChannelHandler.java:167)
com.sun.proxy.$Proxy1.basicPublish(Unknown Source)
de.na.messaging.rabbitmq.TestLoadBalancer.impl(TestLoadBalancer.java:124)
Here is my configuration:
RecoveryPolicy recoveryPolicy = new RecoveryPolicy().withBackoff(Duration.seconds(2), Duration.seconds(20))
.withMaxDuration(Duration.seconds(20));
RetryPolicy retryPolicy = new RetryPolicy().withBackoff(Duration.seconds(3), Duration.seconds(60));
Shouldn't the recovery attempt at least timeout after 20 seconds so a retry will occur or have I misunderstood something?
Channel closures that occur without a connection closure can result in a failed channel recovery since the channel number may have yet to be released by the client. Pending a potential change in the amqp-client to address this, let's switch to using ShutdownListeners for recovering all channel failures.
Some complexity here involves channel recovery failures. If a channel is being recovered (in a recovery thread) and recovery fails with the channel being closed, the subsequent ShutdownListener call results in another recovery thread. Not ideal.
It would be nice if, in the event that Lyra will not attempt a recovery/retry (e.g. because the exception thrown is not among the recoverable/retryable exceptions), we could get a callback in the Channel, Consumer, and/or Connection listeners. That way, if there is specific program logic we wish to perform on an unrecoverable failure, we can run it. Otherwise it can be hard to discern from the client library side whether a recovery is in progress or things are never going to recover.
It might also be nice to have an onRecoveryStarted hook so internal checks in the app (eg, for health polling) can take recovery into consideration.
Hi,
I have the following policy configured while creating a connection :
Config config = new Config().withRecoveryPolicy(RecoveryPolicies.recoverAlways().withInterval(Duration.seconds(5)));
connection = Connections.create(options, config);
Recovery works when RabbitMQ is stopped and restarted. However, when the machine where RabbitMQ is running is shut down and restarted, recovery fails and the client never reconnects.
2015-07-14 15:22:10.417 [AMQP Connection 10.2.15.155:5672] [] [] [] [] [] [ShutdownListener:25] [ERROR] connection error; reason: java.net.SocketException: Connection reset
com.rabbitmq.client.ShutdownSignalException: connection error; reason: java.net.SocketException: Connection reset
at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:678)
at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:668)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
Caused by: java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:168)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:218)
at java.io.BufferedInputStream.read(BufferedInputStream.java:237)
at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:271)
at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)
at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:131)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:515)
2015-07-14 15:20:46.853 [AMQP Connection 10.2.15.155:5672] [] [] [] [] [] [ShutdownListener:25] [ERROR] connection error; reason: com.rabbitmq.client.MissedHeartbeatException: Heartbeat missing with heartbeat = 580 seconds
com.rabbitmq.client.ShutdownSignalException: connection error; reason: com.rabbitmq.client.MissedHeartbeatException: Heartbeat missing with heartbeat = 580 seconds
at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:678)
at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:668)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
Caused by: com.rabbitmq.client.MissedHeartbeatException: Heartbeat missing with heartbeat = 580 seconds
at com.rabbitmq.client.impl.AMQConnection.handleSocketTimeout(AMQConnection.java:578)
at com.rabbitmq.client.impl.AMQConnection.access$500(AMQConnection.java:59)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:541)
Please help
Steps to reproduce:
Example stack:
ERROR [2014-11-21 12:09:25,521] ChannelHandler - Failed to recover consumer-amq.ctag-AYLaSLLWuEpFd0UkR-f6Qg via channel-1 on cxn-1
java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:836)
at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:61)
at net.jodah.lyra.internal.RetryableResource.recoverQueueBindings(RetryableResource.java:193)
at net.jodah.lyra.internal.ChannelHandler.recoverQueue(ChannelHandler.java:482)
at net.jodah.lyra.internal.ChannelHandler.recoverConsumers(ChannelHandler.java:417)
at net.jodah.lyra.internal.ChannelHandler.recoverChannel(ChannelHandler.java:229)
at net.jodah.lyra.internal.ChannelHandler$ChannelShutdownListener$1.run(ChannelHandler.java:90)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'x' in vhost '/', class-id=50, method-id=20)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 10 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'x' in vhost '/', class-id=50, method-id=20)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
WARN [2014-11-21 12:09:25,523] AmqpConnectionConfig - Consumer for channel net.jodah.lyra.config.Config@47da83a4 was recovered
Expected behavior: onRecoveryFailure called with the channel and the IOException.
Actual behavior: onConsumerRecovery called with the channel (which is now defunct because the exchange went away).
If we set a requested heartbeat > 0 and the connection goes down, Lyra will only try to recover for the heartbeat duration.
In the example below, handleShutdownSignal will be called 10 seconds after the network goes down; once that has happened, Lyra seems to stop trying.
If the network goes up after 7 seconds the consumer recovers without any problem. If we remove the heartbeat the consumer is able to recover after 30 seconds.
Config config = new Config()
.withConnectionRecoveryPolicy(RecoveryPolicies.recoverAlways())
.withConsumerRecovery(true)
.withChannelRetryPolicy(RetryPolicies.retryAlways())
.withRecoveryPolicy(RecoveryPolicies.recoverAlways());
final ConnectionOptions connectionOptions = new ConnectionOptions()
.withUsername(username)
.withPassword(password)
.withHost(url)
.withRequestedHeartbeat(Duration.seconds(10))
.withVirtualHost(virtualHost);
try {
final ConfigurableConnection connection = Connections.create(connectionOptions, config);
final Channel channel = connection.createChannel();
channel.basicConsume("foo", new DefaultConsumer(channel) {
@Override
public void handleCancel(final String consumerTag) throws IOException {
super.handleCancel(consumerTag);
log.info("handleCancel");
}
@Override
public void handleCancelOk(final String consumerTag) {
super.handleCancelOk(consumerTag);
log.info("handleCancelOk");
}
@Override
public void handleConsumeOk(final String consumerTag) {
super.handleConsumeOk(consumerTag);
log.info("handleConsumeOk");
}
@Override
public void handleDelivery(final String consumerTag, final Envelope envelope, final AMQP.BasicProperties properties, final byte[] body) throws IOException {
final String message = new String(body);
onCloudMessageHandler.onCloudMessage(message);
log.info("Got a message");
}
@Override
public void handleRecoverOk(final String consumerTag) {
super.handleRecoverOk(consumerTag);
log.info("handleRecoverOk");
}
@Override
public void handleShutdownSignal(final String consumerTag, final ShutdownSignalException sig) {
super.handleShutdownSignal(consumerTag, sig);
log.info("handleShutdownSignal");
}
});
} catch (IOException e) {
e.printStackTrace();
}
Currently channel state is migrated prior to calling the listeners and migrating consumers. Consider migrating channel state, or at least shutdown listeners, after the channel is fully recovered. This way a user's shutdown listeners aren't called on a channel that is only partially recovered.
Hi,
I want to use a thread factory so that connection threads are started as daemon threads. When creating a new connection with Connections.create(ConnectionOptions options, Config config), the options parameter contains the ConnectionFactory, which contains the ThreadFactory.
ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("foo").build();
connectionFactory.setThreadFactory(threadFactory);
ConnectionOptions connectionOptions = new ConnectionOptions(connectionFactory);
Connection connection = Connections.create(connectionOptions, getConnectionConfig());
On creation of the ConnectionHandler a copy of the options is used by calling options.copy().
ConnectionHandler handler = new ConnectionHandler(options.copy(), new Config(config));
The copy method calls a constructor of ConnectionOptions which creates a complete new ConnectionFactory and copies some of the attributes. Unfortunately the ThreadFactory is not copied.
private ConnectionOptions(ConnectionOptions options) {
factory = new ConnectionFactory();
factory.setClientProperties(options.factory.getClientProperties());
factory.setConnectionTimeout(options.factory.getConnectionTimeout());
factory.setHost(options.factory.getHost());
factory.setPort(options.factory.getPort());
factory.setUsername(options.factory.getUsername());
factory.setPassword(options.factory.getPassword());
factory.setVirtualHost(options.factory.getVirtualHost());
factory.setRequestedChannelMax(options.factory.getRequestedChannelMax());
factory.setRequestedFrameMax(options.factory.getRequestedFrameMax());
factory.setRequestedHeartbeat(options.factory.getRequestedHeartbeat());
factory.setSaslConfig(options.factory.getSaslConfig());
factory.setSocketFactory(options.factory.getSocketFactory());
hosts = options.hosts;
addresses = options.addresses;
name = options.name;
executor = options.executor;
}
In the end, the AMQConnection threads do not run as daemon threads, which makes it hard to shut down my Tomcat properly.
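A minimal pure-JDK model of the bug pattern (a copy routine that forgets one field) plus the one-line fix. In Lyra itself the fix would presumably be one extra line in the private ConnectionOptions copy constructor, factory.setThreadFactory(options.factory.getThreadFactory()), assuming the amqp-client version in use exposes getThreadFactory(); the class below is only an illustration, not Lyra code.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class OptionsCopy {
    ThreadFactory threadFactory = Executors.defaultThreadFactory();
    String host = "localhost";

    static OptionsCopy copyOf(OptionsCopy src) {
        OptionsCopy dst = new OptionsCopy();
        dst.host = src.host;
        dst.threadFactory = src.threadFactory; // the line the original copy constructor omits
        return dst;
    }

    public static void main(String[] args) {
        OptionsCopy original = new OptionsCopy();
        OptionsCopy copy = copyOf(original);
        // With the fix, the copy keeps the caller's (e.g. daemon) thread factory
        System.out.println(copy.threadFactory == original.threadFactory);
    }
}
```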
Regards
Claus
Add support for handling consumer cancellations. These can occur separate from channel closures. Recovery could perform a single attempt. Cancellations could occur if, for example, a bound queue were deleted. The recovery could involve re-creating the queue/binding as part of the event listener.
Lyra throws AlreadyClosedException when calling isOpen on a closed channel.
ConnectionOptions#withRequestedHeartbeat calls factory.setRequestedHeartbeat((int) requestedHeartbeat.toMillis()); (line 218 of my copy).
However, the unit expected by factory.setRequestedHeartbeat is seconds, not milliseconds. The symptom is that using a small value (like 15) results in a larger value negotiated by the server (like 580) being used, because the comparison in the AmqpConnection picks the lower of 15000 and 580.
The call should be factory.setRequestedHeartbeat((int) requestedHeartbeat.toSeconds()) instead.
A workaround is to call options.getConnectionFactory().setRequestedHeartbeat(15);
Hi, my simulation of frequently dropping connections never exercises the ConnectionTimeout case. I wonder if you can think of any way of achieving this? As I write this I have one idea: I'll test opening the proxy and sleeping the job tomorrow :)
Many thanks, Dan.
If retries and recovery have been exhausted, and no further communication via AMQP is going to be possible, Lyra can be left in a state where there is still a thread running and blocked on a condition, which in turn can prevent the JVM from being able to exit. A thread dump showing an example of this:
"Thread-3" prio=5 tid=0x00007fc3a600b000 nid=0x6403 waiting on condition [0x000000011ac4c000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000007d81d2e08> (a net.jodah.lyra.internal.util.concurrent.ReentrantCircuit$Sync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
at net.jodah.lyra.internal.util.concurrent.ReentrantCircuit.await(ReentrantCircuit.java:63)
at net.jodah.lyra.internal.RetryableResource.callWithRetries(RetryableResource.java:73)
at net.jodah.lyra.internal.ChannelHandler.invoke(ChannelHandler.java:168)
at com.sun.proxy.$Proxy25.basicPublish(Unknown Source)
at x.sendReply(x.java:111)
at x.run(x.java:80)
at org.springbyexample.bean.scope.thread.ThreadScopeRunnable.run(ThreadScopeRunnable.java:46)
at java.lang.Thread.run(Thread.java:745)
Manually sending the thread (which appears to be waiting on a retry that will never occur) an interrupt() will cause it to finally unblock with an InterruptedException, but it might be nice to handle this kind of occurrence gracefully.
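The interrupt workaround can be demonstrated with plain JDK primitives; in the sketch below LockSupport.park() stands in for Lyra's ReentrantCircuit.await(), and the names are illustrative only.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

public class InterruptDemo {
    /** Parks a worker thread, interrupts it, and reports whether it unblocked. */
    static boolean parkAndInterrupt() throws InterruptedException {
        CountDownLatch unblocked = new CountDownLatch(1);
        Thread blocked = new Thread(() -> {
            // park() returns once the interrupt status is set; loop to guard
            // against spurious wakeups
            while (!Thread.currentThread().isInterrupted()) {
                LockSupport.park(); // stands in for ReentrantCircuit.await()
            }
            unblocked.countDown();
        });
        blocked.start();
        Thread.sleep(100);   // give the worker time to park
        blocked.interrupt(); // the manual workaround described above
        return unblocked.await(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(parkAndInterrupt() ? "unblocked" : "still blocked");
    }
}
```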
To reproduce:
It may be easier to reproduce than this, but this is the way I've managed to do it.
Else the failure is rethrown, so there's no need to log it.
Lyra version 0.5.2 (also tested 0.5.3-SNAPSHOT with the same result)
amqp-client 3.5.5
Broker version 3.5.6
We have an automated test suite for a Java RabbitMQ library built on top of Lyra and amqp-client.
We use Docker and make the JUnit tests start and stop a real RabbitMQ broker on the same machine that runs the tests.
One of the tests simulates a broker crash by running the docker kill command on the broker container while we are consuming from a queue on the broker.
We use persistent messages, so when the broker starts up again after a few seconds the messages are still in the queue, BUT it seems that Lyra does not reconnect to the broker in all cases.
On my colleague's machine, which is a Mac, Lyra correctly reconnects when the broker starts up again. But on my Linux machine only one reconnection attempt is made, then everything freezes.
The difference we can see is that on the Mac we get a socket 'connection refused' exception, but on Linux a java.io.EOFException is thrown.
This is a snippet from the logs in the test suite we have:
19:58:08.322 [main] INFO c.m.d.test.RabbitConnectionTests - Killing the rabbitMQ broker
19:58:08.375 [AMQP Connection 127.0.0.1:5672] ERROR n.jodah.lyra.internal.ChannelHandler - Channel channel-1 on test-app-consume was closed unexpectedly
19:58:08.377 [AMQP Connection 127.0.0.1:5672] ERROR n.jodah.lyra.internal.ChannelHandler - Channel channel-1 on test-app-publish was closed unexpectedly
19:58:08.379 [AMQP Connection 127.0.0.1:5672] ERROR n.j.lyra.internal.ConnectionHandler - Connection test-app-publish was closed unexpectedly
19:58:08.385 [AMQP Connection 127.0.0.1:5672] ERROR n.j.lyra.internal.ConnectionHandler - Connection test-app-consume was closed unexpectedly
19:58:08.385 [lyra-recovery-1] INFO n.j.lyra.internal.ConnectionHandler - Recovering connection test-app-publish to [localhost:5672]
19:58:08.387 [lyra-recovery-2] INFO n.j.lyra.internal.ConnectionHandler - Recovering connection test-app-consume to [localhost:5672]
19:58:08.388 [rabbitmq-test-app-consume-consumer] ERROR c.m.d.c.impl.SingleChannelConsumer - The rabbit connection was unexpectedly disconnected. [ localPort=5672, queue="test-queue", consumerTag="test-app-consumer-1" ]
com.rabbitmq.client.ShutdownSignalException: connection error
at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:723) ~[amqp-client-3.5.5.jar:na]
at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:713) ~[amqp-client-3.5.5.jar:na]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:567) ~[amqp-client-3.5.5.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]
Caused by: java.io.EOFException: null
at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290) ~[na:1.8.0_60]
at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95) ~[amqp-client-3.5.5.jar:na]
at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139) ~[amqp-client-3.5.5.jar:na]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:536) ~[amqp-client-3.5.5.jar:na]
... 1 common frames omitted
Setup:
Problem:
The size of the RabbitMQ cluster can scale up and down. But once a connection has been set up, the list of addresses cannot be updated without a restart, or without discarding the connection and re-establishing an entirely new one.
Proposal:
Provide a dereferencing mechanism to obtain the list of Addresses by invoking a callback instead of maintaining a static list of Addresses.
Doing so solves the following use-cases:
Let me know your thoughts; I am willing to contribute to this in any way I can.
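The proposed callback could be sketched roughly as below. This is an illustrative, self-contained sketch, not Lyra's actual API: the `Address` stand-in, the `AddressResolver` interface, and the `DynamicAddressSource` class are all hypothetical names introduced here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch (names are illustrative, not Lyra's actual API): obtain the
// broker address list via a callback at each recovery attempt instead of capturing
// a fixed Address[] when the connection is first configured.
class DynamicAddressSource {

    // Stand-in for com.rabbitmq.client.Address, to keep the sketch self-contained.
    static final class Address {
        final String host;
        final int port;
        Address(String host, int port) { this.host = host; this.port = port; }
        @Override public String toString() { return host + ":" + port; }
    }

    // The proposed dereferencing mechanism: invoked on every (re)connect.
    interface AddressResolver extends Supplier<List<Address>> {}

    private final AddressResolver resolver;

    DynamicAddressSource(AddressResolver resolver) { this.resolver = resolver; }

    // Each recovery attempt sees the cluster's current membership, so nodes
    // added or removed since the last attempt are picked up automatically.
    List<Address> addressesForNextAttempt() { return resolver.get(); }

    public static void main(String[] args) {
        // The callback could query DNS, a load balancer, or a service registry;
        // here it returns a fixed list purely for demonstration.
        DynamicAddressSource source = new DynamicAddressSource(
            () -> Arrays.asList(new Address("rabbit-1", 5672), new Address("rabbit-2", 5672)));
        System.out.println(source.addressesForNextAttempt());
    }
}
```

Because the list is re-resolved on every attempt rather than cached, a cluster that has scaled since connection setup is handled without restarting the client.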
While the structure of retry and recovery policies is the same, their purposes differ, so let's give them distinct names in the API.
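One way the distinction could look is sketched below. This is an assumption about the shape of the API, not Lyra's actual classes: structurally identical policies get distinct types, so the compiler (rather than a comment) enforces which one governs per-operation retries and which governs connection/channel recovery.

```java
import java.time.Duration;

// Illustrative sketch only (not Lyra's actual classes): two structurally identical
// policies given distinct types to make their different purposes explicit.
class PolicyNamingSketch {

    // Governs retries of an individual failed operation (e.g. a channel invocation).
    static final class RetryPolicy {
        final int maxAttempts;
        final Duration interval;
        RetryPolicy(int maxAttempts, Duration interval) {
            this.maxAttempts = maxAttempts;
            this.interval = interval;
        }
    }

    // Governs recovery of connections and channels after an unexpected shutdown.
    static final class RecoveryPolicy {
        final int maxAttempts;
        final Duration interval;
        RecoveryPolicy(int maxAttempts, Duration interval) {
            this.maxAttempts = maxAttempts;
            this.interval = interval;
        }
    }

    // Distinct types mean passing one policy where the other is expected
    // is a compile-time error, not a silent misconfiguration.
    static final class Config {
        RetryPolicy retryPolicy;
        RecoveryPolicy recoveryPolicy;
        Config withRetryPolicy(RetryPolicy p) { this.retryPolicy = p; return this; }
        Config withRecoveryPolicy(RecoveryPolicy p) { this.recoveryPolicy = p; return this; }
    }

    public static void main(String[] args) {
        Config config = new Config()
            .withRetryPolicy(new RetryPolicy(3, Duration.ofSeconds(1)))
            .withRecoveryPolicy(new RecoveryPolicy(-1, Duration.ofSeconds(5))); // -1: attempt forever
        System.out.println(config.retryPolicy.maxAttempts);
    }
}
```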
Hi,
I wrote a test (not a unit test, yet) in which the connection is broken during recovery.
It resulted in:
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1 run
SEVERE: Failed to recover connection cxn-1
java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:391)
at java.net.Socket.connect(Socket.java:579)
at com.rabbitmq.client.ConnectionFactory.createFrameHandler(ConnectionFactory.java:445)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:504)
at net.jodah.lyra.internal.ConnectionHandler$3.call(ConnectionHandler.java:225)
at net.jodah.lyra.internal.ConnectionHandler$3.call(ConnectionHandler.java:220)
at net.jodah.lyra.internal.RetryableResource.callWithRetries(RetryableResource.java:44)
at net.jodah.lyra.internal.ConnectionHandler.createConnection(ConnectionHandler.java:220)
at net.jodah.lyra.internal.ConnectionHandler.recoverConnection(ConnectionHandler.java:243)
at net.jodah.lyra.internal.ConnectionHandler.access$100(ConnectionHandler.java:41)
at net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1.run(ConnectionHandler.java:90)
at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1061)
at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:575)
at java.lang.Thread.run(Thread.java:722)
No more connection recovery attempts take place. Can you suggest a fix for this case?
More information:
My application has around 10 Consumers on one Channel. Each one declares an exchange and a queue with 5-10 bindings (for various routing keys). It runs in a rather old, embedded environment, where it can take a non-trivial amount of time to remake the connection, the channel, and all of its bindings. In my test, the connection goes down again while the exchanges, queues, and bindings are being remade. This causes some of those bindings to fail with e.g. AlreadyClosedException. The reconnection then fails with the above stack trace, and no more reconnection attempts are made.
A fuller log is:
INFO: Recovering connection cxn-1 to [Lcom.rabbitmq.client.Address;@29d9a32a
Jan 29, 2014 4:39:13 PM net.jodah.lyra.internal.ConnectionHandler$3 call
INFO: Recovered connection cxn-1 to amqp://192.168.1.103:5672/
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovered queue OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovered queue DeviceManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovered queue amq.gen-laBXjopaipQUx8PsvT4VQQ as amq.gen-rtdJ0UhDsQryLhwentPvTQ via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovered queue C2DeviceManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovered queue Snmp4jManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovered queue DiscoveryManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovered queue OsgiShellManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to DeviceManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with deviceManagement.modifyLlrpDevicePortAddress.useUuidTcpPortIpAddressToModifyDevice.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to DeviceManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with meta.deviceInformation.getDeviceInfo.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to DeviceManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with deviceManagement.removeLlrpDeviceFromAgent.useUuid.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiShellManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with osgi.shell.execute.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Discovery Exchange to Discovery Queue with via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with osgi.bundleContext.getBundles.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with osgi.bundle.stop.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with osgi.bundle.get.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with osgi.bundleContext.installBundle.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with osgi.bundleContext.refresh.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with osgi.bundle.uninstall.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with osgi.bundleContext.resolve.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with osgi.bundle.update.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with deviceManagement.rebootAgentOsgiFramework.rebootAgentNow.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with osgi.bundle.start.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to C2DeviceManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with c2.message.send.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:14 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to C2DeviceManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with deviceManagement.addC2Device.useIpAddressAndPortToAddDevice.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:15 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to DiscoveryManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with meta.discovery.getDevices.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:15 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to DiscoveryManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with meta.discovery.getVersion.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:15 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to DiscoveryManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with meta.discovery.setVersion.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:15 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to DiscoveryManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with meta.discovery.getAgent.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:15 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to Snmp4jManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with meta.snmp.getSnmpStats.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:15 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Shell Commands to amq.gen-rtdJ0UhDsQryLhwentPvTQ with 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via cxn-1
Jan 29, 2014 4:39:15 PM net.jodah.lyra.internal.ChannelHandler$2 call
INFO: Recovering channel-1 on cxn-1
Jan 29, 2014 4:39:15 PM net.jodah.lyra.internal.ChannelHandler$2 call
INFO: Recovered channel-1 on cxn-1
Jan 29, 2014 4:39:15 PM net.jodah.lyra.internal.ChannelHandler recoverConsumers
INFO: Recovering consumer-amq.ctag-et5y9XKtn8Bj6aRxQR9yMw of C2DeviceManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via channel-1 on cxn-1
Jan 29, 2014 4:39:15 PM net.jodah.lyra.internal.ChannelHandler recoverConsumers
INFO: Recovering consumer-amq.ctag-pWWdE-PT8jNJKfSvUmR34g of amq.gen-rtdJ0UhDsQryLhwentPvTQ via channel-1 on cxn-1
Jan 29, 2014 4:39:15 PM net.jodah.lyra.internal.ChannelHandler recoverConsumers
INFO: Recovering consumer-amq.ctag-ED8tQC4054rH-iXX3W0xyQ of OsgiShellManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via channel-1 on cxn-1
Jan 29, 2014 4:39:15 PM net.jodah.lyra.internal.ChannelHandler recoverConsumers
INFO: Recovering consumer-amq.ctag-fBAxD8mEawK6qq9PGmhW-w of DiscoveryManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via channel-1 on cxn-1
Jan 29, 2014 4:39:15 PM net.jodah.lyra.internal.ChannelHandler recoverConsumers
INFO: Recovering consumer-amq.ctag-5bkRcclo5wYvCuh40M7L7g of OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via channel-1 on cxn-1
Jan 29, 2014 4:39:15 PM net.jodah.lyra.internal.ChannelHandler recoverConsumers
INFO: Recovering consumer-amq.ctag-HuzXlSx7ypV79PUim9qhkg of Snmp4jManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via channel-1 on cxn-1
Jan 29, 2014 4:39:15 PM net.jodah.lyra.internal.ChannelHandler recoverConsumers
INFO: Recovering consumer-amq.ctag-_2HT7LugX-9F6VAdGUAcWA of DeviceManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via channel-1 on cxn-1
Jan 29, 2014 4:39:18 PM net.jodah.lyra.internal.ChannelHandler$ChannelShutdownListener shutdownCompleted
SEVERE: Channel channel-1 on cxn-1 was closed unexpectedly
Jan 29, 2014 4:39:18 PM net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener shutdownCompleted
SEVERE: Connection cxn-1 was closed unexpectedly
Jan 29, 2014 4:39:18 PM net.jodah.lyra.internal.ConnectionHandler$3 call
INFO: Recovering connection cxn-1 to [Lcom.rabbitmq.client.Address;@4f541332
Jan 29, 2014 4:39:23 PM net.jodah.lyra.internal.ConnectionHandler$3 call
INFO: Recovering connection cxn-1 to [Lcom.rabbitmq.client.Address;@6d6506ae
Jan 29, 2014 4:39:23 PM net.jodah.lyra.internal.ConnectionHandler$3 call
INFO: Recovered connection cxn-1 to amqp://192.168.1.103:5672/
Jan 29, 2014 4:39:23 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovered queue OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via cxn-1
Jan 29, 2014 4:39:23 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovered queue DeviceManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via cxn-1
Jan 29, 2014 4:39:23 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovered queue C2DeviceManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via cxn-1
Jan 29, 2014 4:39:23 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovered queue Snmp4jManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via cxn-1
Jan 29, 2014 4:39:23 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovered queue DiscoveryManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via cxn-1
Jan 29, 2014 4:39:23 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovered queue amq.gen-rtdJ0UhDsQryLhwentPvTQ as amq.gen-Gin0LH8UfSzy8L_jcy4q-Q via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovered queue OsgiShellManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to DeviceManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with deviceManagement.modifyLlrpDevicePortAddress.useUuidTcpPortIpAddressToModifyDevice.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to DeviceManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with meta.deviceInformation.getDeviceInfo.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to DeviceManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with deviceManagement.removeLlrpDeviceFromAgent.useUuid.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiShellManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with osgi.shell.execute.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Discovery Exchange to Discovery Queue with via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with osgi.bundleContext.getBundles.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with osgi.bundle.stop.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with osgi.bundle.get.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with osgi.bundleContext.installBundle.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with osgi.bundleContext.refresh.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with osgi.bundle.uninstall.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with osgi.bundleContext.resolve.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with osgi.bundle.update.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with deviceManagement.rebootAgentOsgiFramework.rebootAgentNow.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to OsgiManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with osgi.bundle.start.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to C2DeviceManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with c2.message.send.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to C2DeviceManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with deviceManagement.addC2Device.useIpAddressAndPortToAddDevice.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to DiscoveryManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with meta.discovery.getDevices.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
SEVERE: Failed to recover queue binding from Maintenance to DiscoveryManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with meta.discovery.getDevices.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:825)
at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:61)
at net.jodah.lyra.internal.ConnectionHandler.recoverQueues(ConnectionHandler.java:355)
at net.jodah.lyra.internal.ConnectionHandler.recoverExchangesAndQueues(ConnectionHandler.java:289)
at net.jodah.lyra.internal.ConnectionHandler.recoverConnection(ConnectionHandler.java:258)
at net.jodah.lyra.internal.ConnectionHandler.access$100(ConnectionHandler.java:41)
at net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1.run(ConnectionHandler.java:90)
at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1061)
at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:575)
at java.lang.Thread.run(Thread.java:722)
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; reason: java.io.EOFException
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 10 more
Caused by: java.io.EOFException
at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)
at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:131)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:533)
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener shutdownCompleted
SEVERE: Connection cxn-1 was closed unexpectedly
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to DiscoveryManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with meta.discovery.getVersion.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler$3 call
INFO: Recovering connection cxn-1 to [Lcom.rabbitmq.client.Address;@6ebe738c
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
SEVERE: Failed to recover queue binding from Maintenance to DiscoveryManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with meta.discovery.getVersion.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
com.rabbitmq.client.AlreadyClosedException: clean connection shutdown; reason: Attempt to use closed channel
at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:190)
at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:223)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:209)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:825)
at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:61)
at net.jodah.lyra.internal.ConnectionHandler.recoverQueues(ConnectionHandler.java:355)
at net.jodah.lyra.internal.ConnectionHandler.recoverExchangesAndQueues(ConnectionHandler.java:289)
at net.jodah.lyra.internal.ConnectionHandler.recoverConnection(ConnectionHandler.java:258)
at net.jodah.lyra.internal.ConnectionHandler.access$100(ConnectionHandler.java:41)
at net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1.run(ConnectionHandler.java:90)
at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1061)
at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:575)
at java.lang.Thread.run(Thread.java:722)
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to DiscoveryManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with meta.discovery.setVersion.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
SEVERE: Failed to recover queue binding from Maintenance to DiscoveryManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with meta.discovery.setVersion.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
com.rabbitmq.client.AlreadyClosedException: clean connection shutdown; reason: Attempt to use closed channel
at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:190)
at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:223)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:209)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:825)
at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:61)
at net.jodah.lyra.internal.ConnectionHandler.recoverQueues(ConnectionHandler.java:355)
at net.jodah.lyra.internal.ConnectionHandler.recoverExchangesAndQueues(ConnectionHandler.java:289)
at net.jodah.lyra.internal.ConnectionHandler.recoverConnection(ConnectionHandler.java:258)
at net.jodah.lyra.internal.ConnectionHandler.access$100(ConnectionHandler.java:41)
at net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1.run(ConnectionHandler.java:90)
at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1061)
at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:575)
at java.lang.Thread.run(Thread.java:722)
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to DiscoveryManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with meta.discovery.getAgent.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
SEVERE: Failed to recover queue binding from Maintenance to DiscoveryManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with meta.discovery.getAgent.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
com.rabbitmq.client.AlreadyClosedException: clean connection shutdown; reason: Attempt to use closed channel
at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:190)
at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:223)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:209)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:825)
at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:61)
at net.jodah.lyra.internal.ConnectionHandler.recoverQueues(ConnectionHandler.java:355)
at net.jodah.lyra.internal.ConnectionHandler.recoverExchangesAndQueues(ConnectionHandler.java:289)
at net.jodah.lyra.internal.ConnectionHandler.recoverConnection(ConnectionHandler.java:258)
at net.jodah.lyra.internal.ConnectionHandler.access$100(ConnectionHandler.java:41)
at net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1.run(ConnectionHandler.java:90)
at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1061)
at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:575)
at java.lang.Thread.run(Thread.java:722)
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Maintenance to Snmp4jManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with meta.snmp.getSnmpStats.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
SEVERE: Failed to recover queue binding from Maintenance to Snmp4jManagementAgent for 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 with meta.snmp.getSnmpStats.#.736ee3a6-f14f-4389-87e2-b0a69fe6cc24.# via cxn-1
com.rabbitmq.client.AlreadyClosedException: clean connection shutdown; reason: Attempt to use closed channel
at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:190)
at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:223)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:209)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:825)
at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:61)
at net.jodah.lyra.internal.ConnectionHandler.recoverQueues(ConnectionHandler.java:355)
at net.jodah.lyra.internal.ConnectionHandler.recoverExchangesAndQueues(ConnectionHandler.java:289)
at net.jodah.lyra.internal.ConnectionHandler.recoverConnection(ConnectionHandler.java:258)
at net.jodah.lyra.internal.ConnectionHandler.access$100(ConnectionHandler.java:41)
at net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1.run(ConnectionHandler.java:90)
at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1061)
at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:575)
at java.lang.Thread.run(Thread.java:722)
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
INFO: Recovering queue binding from Shell Commands to amq.gen-Gin0LH8UfSzy8L_jcy4q-Q with 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via cxn-1
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler recoverQueues
SEVERE: Failed to recover queue binding from Shell Commands to amq.gen-Gin0LH8UfSzy8L_jcy4q-Q with 736ee3a6-f14f-4389-87e2-b0a69fe6cc24 via cxn-1
com.rabbitmq.client.AlreadyClosedException: clean connection shutdown; reason: Attempt to use closed channel
at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:190)
at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:223)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:209)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:825)
at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:61)
at net.jodah.lyra.internal.ConnectionHandler.recoverQueues(ConnectionHandler.java:355)
at net.jodah.lyra.internal.ConnectionHandler.recoverExchangesAndQueues(ConnectionHandler.java:289)
at net.jodah.lyra.internal.ConnectionHandler.recoverConnection(ConnectionHandler.java:258)
at net.jodah.lyra.internal.ConnectionHandler.access$100(ConnectionHandler.java:41)
at net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1.run(ConnectionHandler.java:90)
at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1061)
at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:575)
at java.lang.Thread.run(Thread.java:722)
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1 run
SEVERE: Failed to recover connection cxn-1
com.rabbitmq.client.AlreadyClosedException: clean connection shutdown; reason: Attempt to use closed channel
at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(AMQChannel.java:265)
at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(ChannelN.java:261)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:561)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:501)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:494)
at net.jodah.lyra.internal.ConnectionHandler.recoverExchangesAndQueues(ConnectionHandler.java:295)
at net.jodah.lyra.internal.ConnectionHandler.recoverConnection(ConnectionHandler.java:258)
at net.jodah.lyra.internal.ConnectionHandler.access$100(ConnectionHandler.java:41)
at net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1.run(ConnectionHandler.java:90)
at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1061)
at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:575)
at java.lang.Thread.run(Thread.java:722)
Jan 29, 2014 4:39:24 PM net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1 run
SEVERE: Failed to recover connection cxn-1
java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:391)
at java.net.Socket.connect(Socket.java:579)
at com.rabbitmq.client.ConnectionFactory.createFrameHandler(ConnectionFactory.java:445)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:504)
at net.jodah.lyra.internal.ConnectionHandler$3.call(ConnectionHandler.java:225)
at net.jodah.lyra.internal.ConnectionHandler$3.call(ConnectionHandler.java:220)
at net.jodah.lyra.internal.RetryableResource.callWithRetries(RetryableResource.java:44)
at net.jodah.lyra.internal.ConnectionHandler.createConnection(ConnectionHandler.java:220)
at net.jodah.lyra.internal.ConnectionHandler.recoverConnection(ConnectionHandler.java:243)
at net.jodah.lyra.internal.ConnectionHandler.access$100(ConnectionHandler.java:41)
at net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1.run(ConnectionHandler.java:90)
at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1061)
at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:575)
at java.lang.Thread.run(Thread.java:722)
Best, Dan.
17:26:20.422 [lyra-recovery-1] INFO n.j.lyra.internal.ConnectionHandler - Recovering connection cxn-1 to [localhost:5672]
17:26:20.491 [lyra-recovery-1] INFO n.j.lyra.internal.ConnectionHandler - Recovered connection cxn-1 to amqp://127.0.0.1:5672/
17:26:20.495 [lyra-recovery-1] INFO n.j.lyra.internal.ConnectionHandler - Recovered queue amq.gen-VEMRx35Rd79EjSgPWzylfw as amq.gen-AVxW2kGA7LRLbE-nRSPy1g via cxn-1
17:26:20.501 [lyra-recovery-1] INFO n.jodah.lyra.internal.ChannelHandler - Recovering channel-1 on cxn-1
17:26:20.501 [lyra-recovery-1] INFO n.jodah.lyra.internal.ChannelHandler - Recovered channel-1 on cxn-1
17:26:20.502 [lyra-recovery-1] INFO n.jodah.lyra.internal.ChannelHandler - Recovering consumer-amq.ctag-nLsqKrC-yFg1vywXyu31wg of amq.gen-AVxW2kGA7LRLbE-nRSPy1g via channel-1 on cxn-1
java.io.EOFException: RpcClient is closed
at com.rabbitmq.client.RpcClient.checkConsumer(RpcClient.java:113)
at com.rabbitmq.client.RpcClient.primitiveCall(RpcClient.java:183)
at com.rabbitmq.client.RpcClient.primitiveCall(RpcClient.java:218)
at com.rabbitmq.client.RpcClient.stringCall(RpcClient.java:238)
at me.chao.test.rabbit.TestRpcClient.main(TestRpcClient.java:27)
Hi jhalterman, I found that Lyra does not support RabbitMQ RPC: the channel is recovered, but the RPC client and server remain closed.
This one is probably a little tricky to reproduce, but I was able to get a stack out with jstack. It appears that we had called exchangeDeclare immediately before a recovery happened, or possibly during it, and the RabbitMQ client then blocked forever waiting for a reply from the server.
Here's the stack we got out with jstack:
Thread 1652: (state = BLOCKED)
In our log file, we got a stream of messages like "ChannelHandler - Channel channel-5 on cxn-1 was closed unexpectedly" about 1.7 seconds after we would have called exchangeDeclare, so it would appear that it was the exchangeDeclare that happened first, before recovery began. We see the first instance of a ShutdownSignalException roughly 1.8 seconds later when a different exchangeDeclare call happened. We do not see an exception on the first exchangeDeclare, however, perhaps because it was already blocking prior to the channels being detected as closing unexpectedly.
It looks like probably what should have happened is the blocking get() should have been interrupted somehow when recovery took place so it could throw an exception and bail.
We're seeing this with RabbitMQ's amqp-client-3.3.1, Lyra 0.5.0, and RabbitMQ 3.3.4.
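One possible workaround for this class of hang, sketched below: run the potentially-blocking channel operation through a Future with a timeout, so an RPC left hanging by a mid-call recovery surfaces as a TimeoutException instead of blocking forever. This helper is illustrative and not part of Lyra; exchangeDeclare is the real amqp-client call that was observed blocking.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class TimedCall {
    // Wraps a potentially-blocking channel operation (such as exchangeDeclare)
    // so that a caller stuck on a dead RPC fails with TimeoutException
    // instead of blocking indefinitely. Illustrative workaround, not Lyra API.
    static <T> T callWithTimeout(Callable<T> op, long timeoutMs) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<T> future = executor.submit(op);
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } finally {
            executor.shutdownNow(); // interrupts the worker if it is still blocked
        }
    }
}
```

Usage would look like `callWithTimeout(() -> channel.exchangeDeclare(name, "topic"), 5000)`; the trade-off is an extra thread per in-flight call.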
I really appreciate this library, but I wonder whether and how it might be possible to implement automatic failover of a producer to a running node, given a two-node RabbitMQ cluster where it should be possible to reboot each node without data loss. Any pointers are welcome. Thanks!
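For failover across cluster nodes, Lyra can be pointed at multiple hosts so recovery attempts rotate through them. A minimal configuration sketch; the method names here (withHosts, RecoveryPolicies.recoverAlways) are taken from my reading of the Lyra README and should be verified against your version. Note that durable queues/exchanges, mirrored queues, and publisher confirms are still needed to actually avoid data loss across a node reboot:

```java
// Assumed Lyra API -- verify ConnectionOptions/Config against your version.
ConnectionOptions options = new ConnectionOptions()
    .withHosts("rabbit-node1", "rabbit-node2") // both cluster nodes
    .withPort(5672);
Config config = new Config()
    .withRecoveryPolicy(RecoveryPolicies.recoverAlways())
    .withRetryPolicy(new RetryPolicy()
        .withBackoff(Duration.seconds(1), Duration.seconds(30)));
Connection connection = Connections.create(options, config);
```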
I'm seeing this error right after the rabbit server goes back to life:
DefaultExceptionHandler: Consumer com.rabbitmq.client.QueueingConsumer@271d2d04 (amq.ctag-cYQcdbtkNo_ZIAoqHEU52A) method handleDelivery for channel AMQChannel(amqp://[email protected]:5672/,1) threw an exception for channel AMQChannel(amqp://[email protected]:5672/,1):
com.rabbitmq.client.ShutdownSignalException: connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - broker forced connection closure with reason 'shutdown', class-id=0, method-id=0)
at com.rabbitmq.client.QueueingConsumer.checkShutdown(QueueingConsumer.java:172)
at com.rabbitmq.client.QueueingConsumer.handleDelivery(QueueingConsumer.java:124)
at net.jodah.lyra.internal.ConsumerDelegate.handleDelivery(ConsumerDelegate.java:53)
any idea of what might be going on?
config:
config = config.withRecoveryPolicy(new RecoveryPolicy()
    .withBackoff(Duration.seconds(1), Duration.minutes(10)));
ConnectionOptions options = new ConnectionOptions()
    .withHost(hostname)
    .withPort(port)
    .withVirtualHost(virtualHost)
    .withUsername(username)
    .withPassword(password);
Hi,
Let's say I have configured a recovery policy with max attempts, an interval, and a max duration, and a message eventually fails to deliver.
Can I capture that failure via a listener, an exception, or some other mechanism, so I can log it or handle it differently?
Thank you.
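Lyra's listener interfaces may already surface this; independent of Lyra's exact API, the pattern of capturing the final failure after retries are exhausted looks like the sketch below (all names are illustrative, not Lyra's):

```java
import java.util.concurrent.Callable;

public class RetryWithHandoff {
    // Retries an operation up to maxAttempts times. If every attempt fails,
    // the LAST exception is rethrown so the caller can log it or divert the
    // message elsewhere. Illustrative pattern, not Lyra API.
    static <T> T retry(Callable<T> op, int maxAttempts, long intervalMs) throws Exception {
        Exception last = null;
        for (int i = 0; i < maxAttempts; i++) {
            try {
                return op.call();
            } catch (Exception e) {
                last = e;            // remember the failure for the caller
                Thread.sleep(intervalMs);
            }
        }
        throw last;                  // surfaces the final failure
    }
}
```

The caller wraps the publish in `retry(...)` and catches the rethrown exception as its "delivery ultimately failed" signal.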
I'm looking to use Lyra in the RabbitMQ integration of Druid (http://druid.io/). Druid uses Jackson (https://github.com/FasterXML/jackson) and it would greatly simplify the integration of Lyra if it did as well.
Add support for recovering from AlreadyClosedExceptions by inspecting the closed resource's shutdown cause.
As above.
The log messages from Lyra are too verbose. I tried adding "log4j.logger.net.jodah.lyra=OFF" to log4j-server.properties, to no avail; any suggestions?
Thanks.
Does Lyra support publisher-side recovery?
What about data loss when the publisher-side connection fails?
I can't see any reference in the code.
If it does, can someone point me to it?
BTW:
With the new factory.setAutomaticRecoveryEnabled feature in the RabbitMQ Java client, is there still a reason to use Lyra on the client side?
Thanks.
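Publisher-side data loss is usually addressed with publisher confirms: `channel.confirmSelect()` plus a ConfirmListener in amqp-client, with unconfirmed messages republished after a connection failure. The tracker below is an illustrative sketch of the bookkeeping side of that pattern (it is not part of Lyra or amqp-client):

```java
import java.util.Collection;
import java.util.concurrent.ConcurrentSkipListMap;

public class ConfirmTracker {
    // Tracks messages published under publisher confirms, keyed by the
    // broker-assigned sequence number. Entries still present after a
    // connection failure are candidates for republishing, which is how
    // publisher-side data loss is normally avoided. Illustrative sketch.
    private final ConcurrentSkipListMap<Long, byte[]> unconfirmed = new ConcurrentSkipListMap<>();

    void published(long seqNo, byte[] body) {
        unconfirmed.put(seqNo, body);
    }

    // Mirrors ConfirmListener.handleAck(deliveryTag, multiple):
    // a "multiple" ack confirms every sequence number up to and including seqNo.
    void confirmed(long seqNo, boolean multiple) {
        if (multiple) unconfirmed.headMap(seqNo, true).clear();
        else unconfirmed.remove(seqNo);
    }

    Collection<byte[]> pendingForResend() {
        return unconfirmed.values();
    }
}
```

After Lyra recovers the connection, everything in `pendingForResend()` would be republished on the new channel.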
Hi.
I use Lyra in a Service and create the Connection on a new thread. But when the following code executes,
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
connection = Connections.create(options, config);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
getConsumerChannel();
}
});
thread.setPriority(android.os.Process.THREAD_PRIORITY_BACKGROUND);
thread.run();
it shows me this error:
android.os.NetworkOnMainThreadException
What is the reason?
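A likely cause, assuming the snippet above is representative: `Thread.run()` executes the Runnable on the calling thread, so `thread.run()` performs the network I/O on Android's main thread; only `Thread.start()` spawns a new thread. A minimal demonstration of the difference:

```java
public class StartVsRun {
    // Returns the name of the thread the Runnable actually ran on.
    // run() executes on the CALLING thread; start() creates a new thread.
    static String threadNameVia(boolean useStart) throws InterruptedException {
        final String[] ranOn = new String[1];
        Thread t = new Thread(() -> ranOn[0] = Thread.currentThread().getName());
        if (useStart) {
            t.start();
            t.join();
        } else {
            t.run(); // no new thread: the Runnable runs right here
        }
        return ranOn[0];
    }
}
```

So the fix is to call `thread.start()` instead of `thread.run()`.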
Consider throwing RecoveryInProgressException or something similar if an invocation fails while recovery is in progress. This is perhaps a better alternative to AlreadyClosedException, which implies that things are hosed for good.
In line 235 of ConnectionHandler.java
final String amqpAddress = String.format("amqp://%s:%s/%s", connection.getAddress()
.getHostAddress(), connection.getPort(), "/".equals(cxnFactory.getVirtualHost()) ? ""
: cxnFactory.getVirtualHost());
This will always build the address with the amqp protocol, even for SSL connections. You could change it to
final String amqpAddress = String.format("%s://%s:%s/%s", cxnFactory.isSSL() ? "amqps" : "amqp", connection.getAddress()
.getHostAddress(), connection.getPort(), "/".equals(cxnFactory.getVirtualHost()) ? ""
: cxnFactory.getVirtualHost());
so that the scheme is chosen based on cxnFactory.isSSL().
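The suggested change can be checked in isolation; a small helper mirroring that format string (illustrative, not actual Lyra code):

```java
public class AmqpAddress {
    // Mirrors the String.format call in ConnectionHandler, choosing the
    // scheme from the SSL flag as the issue suggests.
    static String format(boolean ssl, String hostAddress, int port, String virtualHost) {
        return String.format("%s://%s:%s/%s", ssl ? "amqps" : "amqp",
            hostAddress, port, "/".equals(virtualHost) ? "" : virtualHost);
    }
}
```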
The com.rabbitmq.client.QueueingConsumer is never recovered when using Lyra.
The class uses a blocking queue internally, and when it receives a ShutdownSignal it places a POISON message into this queue, with no way to recover from that state.
This implementation of Consumer is used in most tutorials, usually like this:
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true) {
    try {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" [x] Received '" + message + "'");
    } catch (Exception e) {
        e.printStackTrace();
    }
}
It would be great if you could clearly point out in the readme/wiki that the com.rabbitmq.client.QueueingConsumer should not be used in connection with Lyra.
Once the connection is interrupted the QueueingConsumer won't recover even if the connection is re-established (via Lyra) and it will enter an infinite loop of printing the exception.
Thanks!
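A minimal model of the failure mode described above (an illustrative sketch, not the actual QueueingConsumer source): once the poison marker has been seen, every subsequent call fails, regardless of what is delivered afterwards. The usual fix is a callback-based consumer, e.g. extending DefaultConsumer and overriding handleDelivery, which Lyra can re-register after recovery.

```java
import java.util.concurrent.LinkedBlockingQueue;

public class PoisonQueue {
    // Models why QueueingConsumer cannot be recovered: after the internal
    // queue sees the poison marker, next() fails forever, even if Lyra
    // re-establishes the connection and new deliveries arrive.
    private static final Object POISON = new Object();
    private final LinkedBlockingQueue<Object> q = new LinkedBlockingQueue<>();
    private volatile boolean poisoned = false;

    void deliver(String msg) { q.add(msg); }

    void shutdown() { q.add(POISON); } // what a ShutdownSignal does internally

    String next() throws InterruptedException {
        if (poisoned) throw new IllegalStateException("consumer is dead");
        Object o = q.take();
        if (o == POISON) {
            poisoned = true;
            throw new IllegalStateException("consumer is dead");
        }
        return (String) o;
    }
}
```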
If a user does not specify a consumer threadpool, we create a named threadpool on their behalf, which amqp-client considers user-owned and which must therefore be shut down by us when we're done with it. We should either close or recycle this thread pool when recovering connections.
Hey
You have the source and target versions set to 1.6 in the pom.xml, which naturally results in Java 6 classes. I just tried changing it to 1.5: the project compiled successfully and all tests pass.
RabbitMQ Java client is also compiled for Java 5.
Is there a particularly strong reason to require Java 6?
I am still bound to Java 5 in my project, and I need to use Lyra as it fits my use case awesomely. I'd very much like to consume it from Maven Central rather than having to build my own copy.
Would you consider that, please?
Thanks.
Cheers
Mladen
Hi,
when the connection listener is called with onCreate the connection param is always null. This seems logical because the callback is performed before the proxy is set.
/Herman
Setup
Recovery steps
How do we achieve the recovery?
If the recovery steps are done in a ChannelListener.onResume, basicConsume is called on the channel before the anonymous queue is declared and/or bound, because the ChannelHandler only notifies the ChannelListener after consumer recovery.
If the recovery steps are done in a ConsumerListener, the recovery steps will get invoked on each consumer which is incorrect.
Question:
Is there a reason why the ChannelListener only receives a post recovery event and no pre recovery event?