Code Monkey home page Code Monkey logo

java-async-util's Introduction

async-util

Introduction

async-util is a library for working with Java 8 CompletionStages. Its primary goal is to provide tools for asynchronous coordination, including iterative production/consumption of CompletionStages and non-blocking asynchronous mutual exclusion support.

The library is broken up into three packages:

To get started, you can browse the javadocs or walk through some example code.

Downloading

To add a dependency on asyncutil

<dependency>
    <groupId>com.ibm.async</groupId>
    <artifactId>asyncutil</artifactId>
    <version>0.1.0</version>
</dependency>

To get support for Flow (JDK9+ only)

<dependency>
    <groupId>com.ibm.async</groupId>
    <artifactId>asyncutil-flow</artifactId>
    <version>0.1.0</version>
</dependency>

Locks

The locks package provides asynchronous analogs of familiar synchronization primitives, all with efficient non-blocking implementations. Imagine we again have some source of asynchronity (say asynchronous network requests), and we'd like to implement an asynchronous method that makes a request and generates a result based on the request's response and some state that requires access under mutual exclusion.

class MyAsyncClass {
  // not thread safe
  private MutableState mutableState;
  
  private CompletionStage<Response> asyncNetworkOperation(Request request) {...}


  CompletionStage<Result> makeRequest(Request request) {
    return asyncNetworkOperation(request)
      .thenApply(response -> {
        // unsafe!
        mutableState.update(response);
        return mutableState.produceResult();
      });
  }
}

If we wrap the mutableState operations in a synchronized block, we'll end up blocking the thread pool that runs our network operations. This is especially undesirable if this threadpool is possibly serving other interests in our application. We could solve that by creating our own thread pool just to do the locking + state manipulation using thenApplyAsync but that has a number of downsides

  • We've added more threads to our application for little benefit
  • If there's lock contention, we'll also be incurring a lot of additional context switching on these threads
  • If many locations in our application solve similar problems, they'll also have to create their own thread pools which is not scalable.

Instead we can use AsyncLock to provide exclusive access to the MutableState. We will try to acquire the lock on the thread that completes the network operation stage, and if it is not available we'll receive a CompletionStage that will notify us when it becomes available.

...
private AsyncLock lock = AsyncLock.create();

CompletionStage<Result> makeRequest(Request request) {
  return asyncNetworkOperation(request)
    .thenCompose(response ->
      lock.acquireLock().thenApply(token -> {
        try {
          mutableState.update(response);
          return mutableState.produceResult();
        } finally {
          token.release();
        }
      })
    });
}

for cleanliness, we can use StageSupport.tryWith for try-with-resources emulation:

CompletionStage<Result> makeRequest(Request request) {
  return asyncNetworkOperation(request)
    .thenCompose(response ->
      StageSupport.tryWith(lock.acquireLock(), ignored -> {
          mutableState.update(response);
          return mutableState.produceResult();
      })
    );
}

The package provides asynchronous versions of read/write locks, stamped locks, semaphores and named locks. The full locks javadoc contains more information.

Iteration

The classes in this package provide ways to generate and consume results asynchronously. The main mechanism is AsyncIterator interface, which can be considered an asynchronous analog of the Stream API. The full iteration javadocs contains more information on AsyncIterator as well as other asynchronous iteration constructs.

Consider the following example from the Stream documentation

int sum = widgets.stream()
  .filter(w -> w.getColor() == RED)
  .mapToInt(w -> w.getWeight())
  .sum();

Say widgets was not a concrete collection, but instead generating a widget involved an asynchronous network request (or an expensive CPU computation, etc). If we instead make the source of widgets an AsyncIterator we can asynchronously apply the pipeline every time a widget becomes available, and return a CompletionStage which will be complete when the pipeline has finished. In this example, let's say we are only interested in the first 100 red widgets.

// make an asynchronous network request that yields a widget
CompletionStage<Widget> getWidget();

CompletionStage<Integer> sum = AsyncIterator
  .generate(() -> getWidget())
  .filter(w -> w.getColor() == RED)
  .take(100)
  .thenApply(w -> w.getWeight())
  .collect(Collectors.summingInt(i -> i));

This will make one getWidget request at a time, running the rest of the pipeline operations each time a widget is generated on whatever thread processes the response required to generate the widget. When the widget stream is finished (in this case, after receiving 100 red widgets), the CompletionStage sum will complete with the result of the reduction operation. AsyncIterators have many other capabilities; if getting the weight required asynchronity we could use thenCompose instead of thenApply, if we needed to collect the weights into a collection we could use collect(Collector), etc.

It's often limiting to only be able to produce results for consumption iteratively. AsyncQueue provides ways to produce these values in parallel without any blocking synchronization:

// implements AsyncIterator
AsyncQueue widgets = AsyncQueues.unbounded();

// dedicate NUM_THREADS threads to producing widgets
for (int i = 0; i < NUM_THREADS; i++) {
  executor.submit(() -> {
    // send returns whether the queue is still accepting values
    while (widgets.send(expensiveComputeWidget());
  });
}

// create a pipeline the same way as before
CompletionStage<Integer> sum = widgets.filter(...)...;

// once we get our sum, we can terminate the queue, stopping widget production
sum.thenRun(() -> widgets.terminate());

Util

The util package contains interfaces and static functions for commonly reimplemented CompletionStage patterns. The best way to discover them all is to browse the javadoc.

StageSupport

StageSupport contains miscellaneous utility functions, including methods to create already completed exceptional stages, common transformations, and methods for working with resources that need to be asynchronously released.

AsyncCloseable

An interface analogous to AutoCloseable for objects that hold resources that must be relinquished asynchronously. By implementing AsyncCloseable with your own objects, you can take advantage of the try* methods on StageSupport to safely relinquish resources after performing asynchronous actions.

Combinators

Static functions for combining collections of stages into a single result stage. The Java standard library provides two such methods with CompletableFuture.allOf / CompletableFuture.anyOf . Unfortunately, these only work with arrays of CompletableFuture, not CompletionStage, so you must first convert using toCompletableFuture if you wish to use these methods. Collections must be converted to arrays well. The methods on Combinators work on CompletionStage and collections directly, furthermore several additional useful combinators are added. For example, to get the results of multiple stages with CompletableFuture.allOf:

CompletableFuture<Integer>[] arr = ...
CompletionStage<List<Integer>> listFuture = CompletableFuture.allOf(arr).thenApply(ignore -> {
  final List<Integer> ints = new ArrayList<>();
  for (CompletableFuture<Integer> stage : arr) {
    return ints.add(stage.join());
  }
  return ints;
}

Instead, you can use collect on Combinators:

Collection<CompletionStage<Integer>> stages = ...
CompletionStage<List<Integer>> listFuture = Combinators.collect(stages, Collectors.toList());

Contributing

Contributions welcome! See Contributing for details.

java-async-util's People

Contributors

dmor01 avatar kant avatar khadiwala avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

java-async-util's Issues

Affected from CVE-2021-43138

CVE-2021-43138

A vulnerability exists in Async through 3.2.1 (fixed in 3.2.2) , which could let a malicious user obtain privileges via the mapValues() method.

CWE-1321

CVSSv2:
Base Score: MEDIUM (6.8)
Vector: /AV:N/AC:M/Au:N/C:P/I:P/A:P
CVSSv3:
Base Score: HIGH (7.8)
Vector: CVSS:3.1/AV:L/AC:L/PR:N/UI:R/S:U/C:H/I:H/A:H

References:
MISC - https://github.com/caolan/async/blob/master/lib/internal/iterator.js
MISC - https://github.com/caolan/async/blob/master/lib/mapValuesLimit.js
MISC - caolan/async@e1ecdbf
MISC - https://jsfiddle.net/oz5twjd9/
Vulnerable Software & Versions:

cpe:2.3:a:async_project:async:::::::: versions up to (excluding) 3.2.2

Should we defend against sneaky completion?

Porting issue 37 from the enterprise repo

rkhadiwa commented on Nov 29, 2017

To what extent do we need to defend against a user who backdoors past the "read only" CompletionStage with cs.toCompletableFuture().complete(t) (or the more sinister cs.toCompletableFuture.obtrudeValue) ?

For example, AsyncFunnel gives out the same CompletableFuture to many distinct users. Not sure if there are any other APIs where this matters.

In JDK9 they added CompletableFuture.copy()/CompletableFuture.minimalStage presumably because they realized this is a problem. Should we defensively copy any CompletableFuture we hand out that could be reused? Seems pretty costly to defend against something that no sane user would actually do.

See http://cs.oswego.edu/pipermail/concurrency-interest/2016-July/015299.html

FairAsyncSemaphore link skipping

Porting issue 23 from the enterprise repo

rnarubin commented on Aug 23, 2017

FairAsyncSemaphore currently suffers from 2 (loosely related) shortcomings in its linked queue implementation: excessive head/tail updates, and GC nepotism

The first issue is that the head and tail pointers are updated with every push and poll (i.e. with acquires that queue and releases that dequeue). Stale pointers are tolerable, however, with the only drawback being link traversal to find the true head and tail nodes, so the pointers need not be updated at every opportunity. j.u.c.ConcurrentLinkedQueue uses such a strategy of skipping every other update (and incurring the smaller traversal cost) to reduce the amortized cost of insertion and removal. The same idea can be applied to FairAsyncSemaphore's queue.

The second issue is that forward links from dead nodes in the FAS queue are never cleared. Although a node becomes unreachable after being released (by advancing head), their links can still point to live nodes, which can have GC implications -- namely premature promotion to oldgen if the dead nodes were themselves long-lived. As part of the refactoring necessary to solve the first issue, this GC unlinking can also be addressed (using the same principles as j.u.c.CLQ).

These problems only appear in the FAS queue implementation; neither exist in FALock or FAReadWriteLock. (1) FALock always maintains a strict view of the true tail. FARWLock would require stronger consistency in updating its links to allow concurrent traversal, the cost of which likely isn't worth the benefit. (2) Both of their queues unlink completely during release.

Shouldn't use toCompletableFuture in Combinators

Porting issue 36 from the enterprise repo

rkhadiwa commented on Nov 29, 2017

From CompletionStage javadoc, toCompletableFuture is optional.

     * A CompletionStage
     * implementation that does not choose to interoperate with others
     * may throw {@code UnsupportedOperationException}.
     *
     * @return the CompletableFuture
     * @throws UnsupportedOperationException if this implementation
     * does not interoperate with CompletableFuture
  public static CompletionStage<Void> allOf(
      final Collection<? extends CompletionStage<?>> stages) {

    final Iterator<? extends CompletionStage<?>> backingIt = stages.iterator();
    final int size = stages.size();
    final Iterator<? extends CompletionStage<?>> it =
        size > MAX_DEPENDANT_DEPTH
            ? new Iterator<CompletionStage<?>>() {
              @Override
              public boolean hasNext() {
                return backingIt.hasNext();
              }

              @Override
              public CompletionStage<?> next() {
                return backingIt.next().toCompletableFuture();
              }
            }
            : backingIt;
    return allOfImpl(it);
  }

Just a rant, toCompletableFuture is so stupid.
Because it exists, CompletionStage isn't actually read only since CompletableFuture.toCompletableFuture returns this (and worse than a user completing a stage, the user can also use obtrude*). But because it's optional you can't depend on it for what little use it actually did have (interop).

Iterator Pipelining

Porting Issue 20 from the enterprise repo

rnarubin commented on Aug 17, 2017

This proposal is more of an aspiration or a roadmap goal than it is a current issue, and would be fitting for a 2.0 release rather than the initial drop, but i want to at least bring it some attention.

The current async iterator design has completely independent intermediate methods, which create new and opaque iterators for downstream consumption. This has its benefits (relatively simple implementation, well defined isolation and separation of concerns) but some notable drawbacks, largely in performance. I call it the "futures everywhere" problem, where every layer of transformation adds possibly several future operations to every element in the iterator, even for plain synchronous operations. It is difficult for the JVM to optimize async code in the same way that it can synchronous code, because of inlining challenges with the way that closures are applied to values with many steps in between, and reordering restrictions around atomic fields.

I propose changing the library iterators to use an integrated pipeline design, similar to that of j.u.Stream. Every intermediate method will be stored as an operation in the pipeline, then composed and executed at certain uncollapsible points (terminal operations, "real"/unavoidable async boundaries, manual user iteration). Efficient terminal operations will then depend on an underlying implementation of forEach which can apply the composed operations and possibly terminate early.
For example, some iterators traverse elements in batches where every Nth operation is actually async, and the rest are immediate stages over elements of some buffered collection in memory. A terminal method over this iterator (with an appropriate forEach implementation) could then apply most of its pipeline transformations in a plain loop over these collections -- where HotSpot is great at optimizing loops over virtual calls -- with only occasional async breaks and substantially less overhead.

This solution isn't a substantial improvement in all cases. Notably iterators where every element is accessed asynchronously, or where many transformations are async, could not leverage a collapsed terminal method; though they would see minor benefits from composed intermediate sync methods. It would be no worse in these cases however, and would greatly improve cases where synchronous iteration is possible.

Importantly, these changes can all be done within the library and don't require changes in user code, so there is no compatibility issue. Although a user's iterator would benefit from implementing such a forEach, everything would still work under the hood by falling back to the current implementation when necessary. User controlled iteration would also require a fall-back, where the pipeline must be composed for every poll.

Combinators and fail-fast exceptions

Porting issue 46 from the enterprise repo

rnarubin commented May 2, 2018

Currently the operations in Combinators wait for all of their constituent stages to complete before completing the resulting stage. There could be cases, however, when a user wants to complete as soon as an exception is encountered (analogous to throwing an exception in the middle of a loop). We should add such fail-fast methods

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.