Description:
This issue happens on a 2-node minimum-HA DAS 3.1.0 setup.
Consider a scenario where the QueueInputEventDispatcher.onEvent method acquires the thread barrier lock of QueueInputEventDispatcher and then waits at BlockingEventQueue.put because the queue is full.
Current code:
@Override
public void onEvent(Event event) {
    try {
        threadBarrier.lock();
        eventQueue.put(event); // blocks here, while still holding the barrier lock, if the queue is full
        threadBarrier.unlock();
    } catch (InterruptedException e) {
        log.error("Interrupted while waiting to put the event to queue.", e);
    }
}
In such a scenario, you will see a thread stack like the following:
"JMSThreads3f6df5b2-0a40-479a-82d0-da2c0c05fae9-1" #929 prio=5 os_prio=0 tid=0x00007fb7e0288000 nid=0x297c waiting on condition [0x00007fb69ccb2000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000004cf79fc48> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
at org.wso2.carbon.event.receiver.core.internal.management.BlockingEventQueue.put(BlockingEventQueue.java:66)
at org.wso2.carbon.event.receiver.core.internal.management.QueueInputEventDispatcher.onEvent(QueueInputEventDispatcher.java:68)
at org.wso2.carbon.event.receiver.core.internal.EventReceiver.sendEvent(EventReceiver.java:298)
at org.wso2.carbon.event.receiver.core.internal.EventReceiver.processMappedEvent(EventReceiver.java:222)
at org.wso2.carbon.event.receiver.core.internal.EventReceiver$MappedEventSubscription.onEvent(EventReceiver.java:355)
at org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime.onEvent(InputAdapterRuntime.java:110)
at org.wso2.carbon.event.input.adapter.jms.internal.util.JMSMessageListener.onMessage(JMSMessageListener.java:61)
at org.wso2.carbon.event.input.adapter.jms.internal.util.JMSTaskManager$MessageListenerTask.handleMessage(JMSTaskManager.java:643)
at org.wso2.carbon.event.input.adapter.jms.internal.util.JMSTaskManager$MessageListenerTask.run(JMSTaskManager.java:542)
at org.apache.axis2.transport.base.threads.NativeWorkerPool$1.run(NativeWorkerPool.java:172)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Now, in a 2-node HA cluster, a periodic state sync takes place. When the state sync starts, it first pauses all the receivers, then pauses all the processors, and then asks all the snapshotables to return their current state. The QueueInputEventDispatcher.getState method looks like this:
@Override
public byte[] getState() {
    threadBarrier.lock();
    byte[] state = objectToBytes(eventQueue);
    threadBarrier.unlock();
    return state;
}
Now, since all the processors are paused, nothing consumes from the BlockingEventQueue, which is already full. The onEvent thread therefore stays blocked at eventQueue.put while still holding the thread barrier lock, so the state sync thread waits at the threadBarrier.lock() call of the getState method. The following is a sample thread state:
"pool-26-thread-4" #357683 prio=5 os_prio=0 tid=0x00007fdee80ee800 nid=0x7d15 waiting on condition [0x00007fdda58da000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000004db14b310> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
at org.wso2.carbon.event.receiver.core.internal.management.QueueInputEventDispatcher.getState(QueueInputEventDispatcher.java:83)
at org.wso2.carbon.event.receiver.core.internal.CarbonEventReceiverManagementService.getState(CarbonEventReceiverManagementService.java:50)
at org.wso2.carbon.event.processor.manager.core.internal.HAManager.getState(HAManager.java:184)
at org.wso2.carbon.event.processor.manager.core.internal.CarbonEventManagementService.getState(CarbonEventManagementService.java:258)
at org.wso2.carbon.event.processor.manager.core.internal.thrift.ManagementServiceImpl.takeSnapshot(ManagementServiceImpl.java:35)
at org.wso2.carbon.event.processor.manager.core.internal.thrift.service.ManagementService$Processor$takeSnapshot.getResult(ManagementService.java:174)
at org.wso2.carbon.event.processor.manager.core.internal.thrift.service.ManagementService$Processor$takeSnapshot.getResult(ManagementService.java:158)
at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:285)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Hence, a deadlock arises: the onEvent thread waits for queue space, which only a processor can free; the processors stay paused until the state sync completes; and the state sync cannot proceed past threadBarrier.lock() until the onEvent thread releases the barrier.
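One conceivable direction for avoiding this (a minimal sketch only, not a proposed patch; it assumes BlockingEventQueue exposes a timed offer in the style of java.util.concurrent.BlockingQueue, which may not be the case) is to never hold the barrier lock across a blocking queue operation:

@Override
public void onEvent(Event event) {
    // Sketch: hold the barrier only around a bounded attempt to enqueue,
    // so getState() can always acquire the lock within a short time.
    boolean queued = false;
    while (!queued) {
        threadBarrier.lock();
        try {
            // Timed offer instead of a blocking put; if the queue stays
            // full, the barrier is released and the attempt is retried.
            queued = eventQueue.offer(event, 100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.error("Interrupted while waiting to put the event to queue.", e);
            Thread.currentThread().interrupt();
            return;
        } finally {
            threadBarrier.unlock();
        }
    }
}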
Suggested Labels:
Suggested Assignees:
Affected Product Version:
DAS 3.1.0
CEP 4.2.0
OS, DB, other environment details and versions:
2 node HA
Steps to reproduce:
- Make the event queue full and let the periodic state sync run while the queue is still full (see the standalone sketch below).
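For illustration, the following is a minimal standalone Java program (independent of the DAS codebase; all names are hypothetical) that reproduces the same pattern: one thread blocks on put() into a full queue while holding a ReentrantLock, and a second thread, standing in for the state sync, blocks forever trying to acquire the same lock.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.ReentrantLock;

public class BarrierDeadlockDemo {
    public static void main(String[] args) throws InterruptedException {
        ReentrantLock threadBarrier = new ReentrantLock();
        BlockingQueue<String> eventQueue = new ArrayBlockingQueue<>(1);
        eventQueue.put("existing-event"); // fill the queue so the next put() blocks

        // Stands in for the JMS receiver thread running onEvent().
        Thread receiver = new Thread(() -> {
            threadBarrier.lock();
            try {
                eventQueue.put("new-event"); // blocks: queue is full and nothing consumes it
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                threadBarrier.unlock();
            }
        }, "receiver");
        receiver.start();
        Thread.sleep(500); // let the receiver grab the lock and block on put()

        // Stands in for the state sync thread calling getState(). With the
        // processors paused (no consumer here at all), this never returns.
        Thread stateSync = new Thread(threadBarrier::lock, "state-sync");
        stateSync.start();
        stateSync.join(2000);
        System.out.println("state-sync thread state: " + stateSync.getState()); // expected: WAITING
        System.exit(0); // both threads are stuck for good; force the JVM to exit
    }
}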
Related Issues: