Comments (8)
@timothybasanov Does this test describe the scenario you're seeing?
public void test() throws Throwable {
    Bulkhead<Object> bulkhead = Bulkhead.of(1);
    List<CompletableFuture<Object>> stages = new ArrayList<>();
    Waiter waiter = new Waiter();
    for (int i = 0; i < 3; i++) {
        stages.add(Failsafe.with(bulkhead).getStageAsync(() -> CompletableFuture.supplyAsync(() -> {
            Testing.sleep(10);
            waiter.resume();
            return null;
        })));
        Testing.sleep(200); // Needed to avoid BulkheadFullException
    }
    waiter.await(1, TimeUnit.MINUTES, 3);
    for (CompletableFuture<Object> stage : stages)
        assertTrue(stage.isDone());
}
In my testing, the only time a task is not run is when it's rejected with BulkheadFullException, which will happen if the previous task is still running.
from failsafe.
Yep, you're right. I was not specific enough in the original issue. It only happens if a Bulkhead has a wait time set up. With 3 requests 1 second each and a max wait time of 10 seconds I expect all of them to succeed.
This reproduces it as a JUnit test.
package com.example.failsafe;

import dev.failsafe.*;
import java.time.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import org.junit.*;
import static org.junit.Assert.*;

/**
 * See https://github.com/failsafe-lib/failsafe/issues/365
 */
public class Issue365 {
    static final int NUM_TASKS = 3;
    static final long MAX_WAIT_SECS = 10;
    static final long TASK_SLEEP_SECS = 1;
    static final int CONCURRENCY = 1;

    Bulkhead<Integer> BULKHEAD = Bulkhead.<Integer>builder(CONCURRENCY)
        .withMaxWaitTime(Duration.ofSeconds(MAX_WAIT_SECS))
        .build();
    FailsafeExecutor<Integer> EXEC = Failsafe.with(BULKHEAD);

    @Test public void threeTasks() {
        AtomicInteger count = new AtomicInteger(NUM_TASKS);
        try {
            CompletableFuture<?>[] tasks = new CompletableFuture<?>[NUM_TASKS];
            for (int id = 0; id < NUM_TASKS; ++id) {
                int ID = id;
                tasks[ID] = EXEC.getStageAsync(() ->
                    CompletableFuture.supplyAsync(() -> sleepThenReturn(count, ID)));
            }
            CompletableFuture.allOf(tasks).get();
        } catch (InterruptedException ex) {
            System.out.println("interrupted get");
        } catch (ExecutionException ex) {
            if (ex.getCause() instanceof BulkheadFullException) {
                System.out.printf("Bulkhead full: %s%n", ex.getCause());
            }
        }
        assertEquals(0, count.get());
    }

    private Integer sleepThenReturn(AtomicInteger count, int id) {
        try {
            TimeUnit.SECONDS.sleep(TASK_SLEEP_SECS);
            int remaining = count.decrementAndGet();
            System.out.printf("task %d done sleeping, %d remaining%n", id, remaining);
            return id;
        } catch (InterruptedException ex) {
            System.out.println("interrupted sleep");
            return -1;
        }
    }
}
The assertEquals check fails, and the output is:
task 0 done sleeping, 2 remaining
task 1 done sleeping, 1 remaining
Bulkhead full: dev.failsafe.BulkheadFullException
I recently came across this issue and think I have found the culprit. In BulkheadImpl, the releasePermit method looks like this:
public synchronized void releasePermit() {
    if (permits < maxPermits) {
        permits += 1;
        CompletableFuture<Void> future = futures.pollFirst();
        if (future != null)
            future.complete(null);
    }
}
Which I think needs to be changed to this:
public synchronized void releasePermit() {
    if (permits < maxPermits) {
        permits += 1;
        CompletableFuture<Void> future = futures.pollFirst();
        if (future != null) {
            permits -= 1;
            future.complete(null);
        }
    }
}
This ensures that a permit is claimed on behalf of the queued task that is about to run. What is currently happening is that each release increments permits and completes the next queued future, but the task behind that future never decrements the count. When that task later releases, permits already equals maxPermits, so the initial if-statement is skipped and any tasks still in the queue are never started. With more queued jobs than maxPermits, the later ones wait until they time out with BulkheadFullException. Claiming a permit before running the queued task should keep the count in order.
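To make the accounting concrete, here is a minimal single-threaded sketch of it. It is a simplified model, not Failsafe's actual class: the name PermitAccountingSketch, the reclaimForWaiter flag, and the stuckWaiters helper are hypothetical, and the acquire side is an assumption, since only releasePermit is quoted above. It assumes acquire takes a permit when one is free and otherwise queues a future.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified model of the permit accounting discussed above.
class PermitAccountingSketch {
    private final int maxPermits;
    private final boolean reclaimForWaiter; // true = proposed change, false = quoted behaviour
    private int permits;
    private final Deque<CompletableFuture<Void>> waiters = new ArrayDeque<>();

    PermitAccountingSketch(int maxPermits, boolean reclaimForWaiter) {
        this.maxPermits = maxPermits;
        this.permits = maxPermits;
        this.reclaimForWaiter = reclaimForWaiter;
    }

    // Assumed acquire side: take a free permit, otherwise queue a pending future.
    synchronized CompletableFuture<Void> acquireAsync() {
        if (permits > 0) {
            permits -= 1;
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> waiter = new CompletableFuture<>();
        waiters.addLast(waiter);
        return waiter;
    }

    // Mirrors the two releasePermit variants quoted above.
    synchronized void release() {
        if (permits < maxPermits) {
            permits += 1;
            CompletableFuture<Void> waiter = waiters.pollFirst();
            if (waiter != null) {
                if (reclaimForWaiter)
                    permits -= 1; // hand the permit straight to the queued task
                waiter.complete(null);
            }
        }
    }

    // Submits three tasks to a bulkhead of size 1 and counts those never started.
    static int stuckWaiters(boolean reclaimForWaiter) {
        PermitAccountingSketch bulkhead = new PermitAccountingSketch(1, reclaimForWaiter);
        List<CompletableFuture<Void>> tasks = new ArrayList<>();
        for (int i = 0; i < 3; i++)
            tasks.add(bulkhead.acquireAsync());
        int stuck = 0;
        for (CompletableFuture<Void> task : tasks) {
            if (task.isDone())
                bulkhead.release(); // the task "ran" and gives its permit back
            else
                stuck++;            // its future was never completed
        }
        return stuck;
    }

    public static void main(String[] args) {
        System.out.println("quoted release:   " + stuckWaiters(false) + " task(s) stuck");
        System.out.println("proposed release: " + stuckWaiters(true) + " task(s) stuck");
    }
}
```

With the quoted release logic the third task is left waiting, which matches the test output above (two tasks finish, the third is rejected once the wait time runs out). With the proposed change all three are started.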
Thanks for the test @Tembrel.
@nicky9door Well spotted! Would you like to submit a PR for this?
@jhalterman PR #366 submitted. I added a test case based on @Tembrel's comment. I wasn't too sure about the correct way to write the test, but it seems to work OK.
Fixed by #366
This fix has been released in 3.3.2.