Code Monkey home page Code Monkey logo

java-concurrent-polling's Introduction

Concurrent Polling in Java

Once I wondered:

What's the best way to poll in an asynchronous Java application?

Intention

This article is intended to provide practical usage examples of java.concurrent classes by building different asynchronous pollers. Fully working examples and many variations can be found in the repository.

It might be useful to you if you are learning tools in Java concurrency or just need some simple and efficient poller implementation.

I worked on a service responsible for Transfer State Machine in TransferWise, one of the central services which managed the integrity of the Transfer data, was defining all possible State Transitions and maintained strict order of events processing. All Transfer actions there went through asynchronous channels, which are an interesting topic alone, but for now I'll show you a way to conveniently test asynchronous flows by polling.

Tests

Polling is best suited for integration tests. We have been launching an application, sending some events, commands or messages to the async channels and waiting for a data change in a database. On a toy example, something like:

def "eventually might generate something divisible by 4"() {
    given:
        def repo = new ListRepository<Integer>()
        def rng = new RandomIntegerPersister(repo)
        rng.start(1, 1, SECONDS)                     // Spawn our worker thread 

    expect:
        pollForIsDivisibleBy(repo, 4)                // Spawn our polling thread
            .map({ log.info("Found: {}", it); it })  // Log some info about found object when needed
            .isPresent()                             // And make the test assertion itself 
}

def <T> Optional<T> pollForIsDivisibleBy(Repository<T> repo, T n) throws InterruptedException {
    // Poll every 1 second with timeout 5 seconds:
    return poller.poll({ repo.findFirst({ it % n == 0 }) }, 1, SECONDS, 5, SECONDS)
}

True or False

Now when you see how the test might look like in the end, here is how we can implement and use the most naive version of a simple poller which returns only a boolean result:

given:
    // ...    
    def pollee = {
        repo.findFirst({ it % 4 == 0 })
            .map({ log.info("Found: {}", it); it })
            .isPresent()
    }
expect:
    poller.poll(pollee, 1, SECONDS, 5, SECONDS)
public class BooleanThreadPoller {
    public Boolean poll(Callable<Boolean> pollee,
                        long period, TimeUnit periodTimeUnit,
                        long timeout, TimeUnit timeoutTimeUnit
    ) {
        AtomicReference<Boolean> resultRef = new AtomicReference<>();
        Thread poller = new Thread(() -> {
            try {
                long startMs = System.currentTimeMillis();
                long timeoutMs = TimeUnit.MILLISECONDS.convert(timeout, timeoutTimeUnit);
                while (true) {
                    if (System.currentTimeMillis() - startMs > timeoutMs) {
                        return;                                                             // Timed out
                    }
                    if (pollee.call()) {
                        resultRef.set(true);
                        return;                                                             // Success
                    } else {
                        periodTimeUnit.sleep(period);                                       // Waiting
                    }
                }
            } catch (Exception e) { }
        });
        poller.start();
        try { poller.join(); } catch (InterruptedException e) { }                           // Waiting
        return resultRef.get();
    }
}

The first thing to notice here is AtomicReference which is used as an effectively final wrapper around the result. Without supplying a final or effectively final variable to lambda Java does not compile.

Also there is a significant difference between new Thread() .start()/.run(). Thread implements Runnable but run does not actually start execution in a new thread as one might expect, instead it just executes in a current thread, but start in a new one.

Schedulers

Now we can sacrifice a bit of flexibility for the sake of a more common approach, using ScheduledThreadPoolExecutor:

public class Poller<Answer> {

    public Optional<Answer> poll(Callable<Optional<Answer>> question,
                                 long period, TimeUnit periodTimeUnit,
                                 long timeout, TimeUnit timeoutTimeUnit
    ) throws InterruptedException {
        SynchronousQueue<Optional<Answer>> answerQueue = new SynchronousQueue<>();
        Runnable questionRunnable = () -> {
            try {
                answerQueue.put(question.call());
            } catch (Exception e) {
                log.error("Polling error", e);
            }
        };
        ScheduledFuture<?> scheduledFuture = Executors.newSingleThreadScheduledExecutor()
            .scheduleAtFixedRate(questionRunnable, 0, period, periodTimeUnit);
        log.info("Started with period of {} {}", period, periodTimeUnit);
        Optional<Answer> answer = answerQueue.poll(timeout, timeoutTimeUnit);
        scheduledFuture.cancel(false);
        return answer;
    }

}

The only problem is that the scheduled task never stops. To solve it we just use SynchronousQueue to execute and set results from the scheduler, and wait for them on the main thread. When we got a result or timed out, the task is stopped.

That's it for now.

Regards,
Dan

References

When I was looking for examples, I've found many notes about BlockingQueue but no one actually showed anything specific, it motivated me to write this, in the process of which I learned more and discovered that we don't even need any implementations of BlockingQueue, although the version with SynchronousQueue is quite clean and efficient.

Thanks to the authors of the following posts:

One of the popular polling libraries: Awaitility. It works well with JUnit and Hamcrest matchers, but at the time of writing it lacks functionality just to poll and get the result as a return value.

java-concurrent-polling's People

Stargazers

 avatar  avatar  avatar  avatar

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.