Diffing this file produces a strange result. I don't understand how GumTree arrives at the diff shown below.
Delete If at org.apache.camel.processor.MulticastProcessor:445
if ((future == null) && timedOut) {
break;
}else
if (future == null) {
org.apache.camel.processor.aggregate.AggregationStrategy strategy = getAggregationStrategy(null);
if (strategy instanceof org.apache.camel.processor.aggregate.TimeoutAwareAggregationStrategy) {
Exchange oldExchange = result.get();
if (oldExchange == null) {
oldExchange = original;
}
((org.apache.camel.processor.aggregate.TimeoutAwareAggregationStrategy) (strategy)).timeout(oldExchange, aggregated, total.intValue(), timeout);
}else {
org.apache.camel.processor.MulticastProcessor.LOG.warn("Parallel processing timed out after {} millis for number {}. This task will be cancelled and will not be aggregated.", timeout, aggregated);
}
org.apache.camel.processor.MulticastProcessor.LOG.debug("Timeout occurred after {} millis for number {} task.", timeout, aggregated);
timedOut = true;
if ((completion) instanceof org.apache.camel.util.concurrent.SubmitOrderedCompletionService) {
((org.apache.camel.util.concurrent.SubmitOrderedCompletionService<?>) (completion)).timeoutTask();
}
}else {
Exchange subExchange = future.get();
java.lang.Integer number = getExchangeIndex(subExchange);
boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, ("Parallel processing failed for number " + number), org.apache.camel.processor.MulticastProcessor.LOG);
if ((stopOnException) && (!continueProcessing)) {
result.set(subExchange);
stoppedOnException = true;
break;
}
org.apache.camel.processor.aggregate.AggregationStrategy strategy = getAggregationStrategy(subExchange);
doAggregate(strategy, result, subExchange);
}
Delete Block at org.apache.camel.processor.MulticastProcessor:408
{
boolean timedOut = false;
boolean stoppedOnException = false;
final StopWatch watch = new StopWatch();
int aggregated = 0;
boolean done = false;
while (!done) {
if ((allTasksSubmitted.get()) && (aggregated >= (total.get()))) {
MulticastProcessor.LOG.debug("Done aggregating {} exchanges on the fly.", aggregated);
break;
}
Future<Exchange> future;
if (timedOut) {
future = completion.poll();
MulticastProcessor.LOG.trace("Polled completion task #{} after timeout to grab already completed tasks: {}", aggregated, future);
}else
if ((timeout) > 0) {
long left = (timeout) - (watch.taken());
if (left < 0) {
left = 0;
}
MulticastProcessor.LOG.trace("Polling completion task #{} using timeout {} millis.", aggregated, left);
future = completion.poll(left, java.util.concurrent.TimeUnit.MILLISECONDS);
}else {
MulticastProcessor.LOG.trace("Polling completion task #{}", aggregated);
future = completion.poll(1, java.util.concurrent.TimeUnit.SECONDS);
if (future == null) {
continue;
}
}
if ((future == null) && timedOut) {
break;
}else
if (future == null) {
org.apache.camel.processor.aggregate.AggregationStrategy strategy = getAggregationStrategy(null);
if (strategy instanceof org.apache.camel.processor.aggregate.TimeoutAwareAggregationStrategy) {
Exchange oldExchange = result.get();
if (oldExchange == null) {
oldExchange = original;
}
((org.apache.camel.processor.aggregate.TimeoutAwareAggregationStrategy) (strategy)).timeout(oldExchange, aggregated, total.intValue(), timeout);
}else {
MulticastProcessor.LOG.warn("Parallel processing timed out after {} millis for number {}. This task will be cancelled and will not be aggregated.", timeout, aggregated);
}
MulticastProcessor.LOG.debug("Timeout occurred after {} millis for number {} task.", timeout, aggregated);
timedOut = true;
if ((completion) instanceof org.apache.camel.util.concurrent.SubmitOrderedCompletionService) {
((org.apache.camel.util.concurrent.SubmitOrderedCompletionService<?>) (completion)).timeoutTask();
}
}else {
Exchange subExchange = future.get();
java.lang.Integer number = getExchangeIndex(subExchange);
boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, ("Parallel processing failed for number " + number), MulticastProcessor.LOG);
if ((stopOnException) && (!continueProcessing)) {
result.set(subExchange);
stoppedOnException = true;
break;
}
org.apache.camel.processor.aggregate.AggregationStrategy strategy = getAggregationStrategy(subExchange);
doAggregate(strategy, result, subExchange);
}
aggregated++;
}
if (timedOut || stoppedOnException) {
if (timedOut) {
MulticastProcessor.LOG.debug("Cancelling tasks due timeout after {} millis.", timeout);
}
if (stoppedOnException) {
MulticastProcessor.LOG.debug("Cancelling tasks due stopOnException.");
}
running.set(false);
}
}
Insert While at org.apache.camel.processor.MulticastProcessor:415
while (!done) {
if ((allTasksSubmitted.get()) && (aggregated >= (total.get()))) {
MulticastProcessor.LOG.debug("Done aggregating {} exchanges on the fly.", aggregated);
break;
}
Future<Exchange> future;
if (timedOut) {
future = completion.poll();
MulticastProcessor.LOG.trace("Polled completion task #{} after timeout to grab already completed tasks: {}", aggregated, future);
}else
if ((timeout) > 0) {
long left = (timeout) - (watch.taken());
if (left < 0) {
left = 0;
}
MulticastProcessor.LOG.trace("Polling completion task #{} using timeout {} millis.", aggregated, left);
future = completion.poll(left, java.util.concurrent.TimeUnit.MILLISECONDS);
}else {
MulticastProcessor.LOG.trace("Polling completion task #{}", aggregated);
future = completion.poll(1, java.util.concurrent.TimeUnit.SECONDS);
if (future == null) {
continue;
}
}
if (future == null) {
AggregationStrategy strategy = getAggregationStrategy(null);
if (strategy instanceof TimeoutAwareAggregationStrategy) {
Exchange oldExchange = result.get();
if (oldExchange == null) {
oldExchange = original;
}
((TimeoutAwareAggregationStrategy) (strategy)).timeout(oldExchange, aggregated, total.intValue(), timeout);
}else {
MulticastProcessor.LOG.warn("Parallel processing timed out after {} millis for number {}. This task will be cancelled and will not be aggregated.", timeout, aggregated);
}
MulticastProcessor.LOG.debug("Timeout occurred after {} millis for number {} task.", timeout, aggregated);
timedOut = true;
if ((completion) instanceof SubmitOrderedCompletionService) {
((SubmitOrderedCompletionService<?>) (completion)).timeoutTask();
}
}else {
Exchange subExchange = future.get();
Integer number = getExchangeIndex(subExchange);
boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, ("Parallel processing failed for number " + number), MulticastProcessor.LOG);
if ((stopOnException) && (!continueProcessing)) {
result.set(subExchange);
stoppedOnException = true;
break;
}
AggregationStrategy strategy = getAggregationStrategy(subExchange);
doAggregate(strategy, result, subExchange);
}
aggregated++;
}
Move Block from org.apache.camel.processor.MulticastProcessor$AggregateOnTheFlyTask:415 to org.apache.camel.processor.MulticastProcessor$AggregateOnTheFlyTask:408
{
if ((allTasksSubmitted.get()) && (aggregated >= (total.get()))) {
MulticastProcessor.LOG.debug("Done aggregating {} exchanges on the fly.", aggregated);
break;
}
Future<Exchange> future;
if (timedOut) {
future = completion.poll();
MulticastProcessor.LOG.trace("Polled completion task #{} after timeout to grab already completed tasks: {}", aggregated, future);
}else
if ((timeout) > 0) {
long left = (timeout) - (watch.taken());
if (left < 0) {
left = 0;
}
MulticastProcessor.LOG.trace("Polling completion task #{} using timeout {} millis.", aggregated, left);
future = completion.poll(left, java.util.concurrent.TimeUnit.MILLISECONDS);
}else {
MulticastProcessor.LOG.trace("Polling completion task #{}", aggregated);
future = completion.poll(1, java.util.concurrent.TimeUnit.SECONDS);
if (future == null) {
continue;
}
}
if ((future == null) && timedOut) {
break;
}else
if (future == null) {
org.apache.camel.processor.aggregate.AggregationStrategy strategy = getAggregationStrategy(null);
if (strategy instanceof org.apache.camel.processor.aggregate.TimeoutAwareAggregationStrategy) {
Exchange oldExchange = result.get();
if (oldExchange == null) {
oldExchange = original;
}
((org.apache.camel.processor.aggregate.TimeoutAwareAggregationStrategy) (strategy)).timeout(oldExchange, aggregated, total.intValue(), timeout);
}else {
MulticastProcessor.LOG.warn("Parallel processing timed out after {} millis for number {}. This task will be cancelled and will not be aggregated.", timeout, aggregated);
}
MulticastProcessor.LOG.debug("Timeout occurred after {} millis for number {} task.", timeout, aggregated);
timedOut = true;
if ((completion) instanceof org.apache.camel.util.concurrent.SubmitOrderedCompletionService) {
((org.apache.camel.util.concurrent.SubmitOrderedCompletionService<?>) (completion)).timeoutTask();
}
}else {
Exchange subExchange = future.get();
java.lang.Integer number = getExchangeIndex(subExchange);
boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, ("Parallel processing failed for number " + number), MulticastProcessor.LOG);
if ((stopOnException) && (!continueProcessing)) {
result.set(subExchange);
stoppedOnException = true;
break;
}
org.apache.camel.processor.aggregate.AggregationStrategy strategy = getAggregationStrategy(subExchange);
doAggregate(strategy, result, subExchange);
}
aggregated++;
}
Move Block from org.apache.camel.processor.MulticastProcessor$AggregateOnTheFlyTask:-1 to org.apache.camel.processor.MulticastProcessor$AggregateOnTheFlyTask:415
if (future == null) {
org.apache.camel.processor.aggregate.AggregationStrategy strategy = getAggregationStrategy(null);
if (strategy instanceof org.apache.camel.processor.aggregate.TimeoutAwareAggregationStrategy) {
org.apache.camel.Exchange oldExchange = result.get();
if (oldExchange == null) {
oldExchange = original;
}
((org.apache.camel.processor.aggregate.TimeoutAwareAggregationStrategy) (strategy)).timeout(oldExchange, aggregated, total.intValue(), timeout);
}else {
org.apache.camel.processor.MulticastProcessor.LOG.warn("Parallel processing timed out after {} millis for number {}. This task will be cancelled and will not be aggregated.", timeout, aggregated);
}
org.apache.camel.processor.MulticastProcessor.LOG.debug("Timeout occurred after {} millis for number {} task.", timeout, aggregated);
timedOut = true;
if ((completion) instanceof org.apache.camel.util.concurrent.SubmitOrderedCompletionService) {
((org.apache.camel.util.concurrent.SubmitOrderedCompletionService<?>) (completion)).timeoutTask();
}
}else {
org.apache.camel.Exchange subExchange = future.get();
java.lang.Integer number = getExchangeIndex(subExchange);
boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, ("Parallel processing failed for number " + number), org.apache.camel.processor.MulticastProcessor.LOG);
if ((stopOnException) && (!continueProcessing)) {
result.set(subExchange);
stoppedOnException = true;
break;
}
org.apache.camel.processor.aggregate.AggregationStrategy strategy = getAggregationStrategy(subExchange);
doAggregate(strategy, result, subExchange);
}
Move UnaryOperator from org.apache.camel.processor.MulticastProcessor$AggregateOnTheFlyTask:415 to org.apache.camel.processor.MulticastProcessor$AggregateOnTheFlyTask:415
!done