reactive-streams-commons's Issues

Gem 22) discussion

From @dsyer

Interesting. Do you mean Mono.fromCallable()? When I looked at the source code, it seemed to me that Callable.call() only happens when there is a subscribe(). In general, what's a good way to assert or inspect statements like that? What did I misunderstand?
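
For what it's worth, one simple way to check this kind of laziness claim, assuming reactor-core's Mono.fromCallable (the class below is only an illustration):

import java.util.concurrent.atomic.AtomicBoolean;
import reactor.core.publisher.Mono;

public class FromCallableLaziness {
    public static void main(String[] args) {
        AtomicBoolean called = new AtomicBoolean();
        Mono<String> mono = Mono.fromCallable(() -> {
            called.set(true);
            return "value";
        });

        // Nothing should have run at assembly time.
        System.out.println("before subscribe: " + called.get()); // expected: false
        mono.subscribe();
        System.out.println("after subscribe:  " + called.get()); // expected: true
    }
}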

Explore Backpressurable#getCapacity optimizations

Backpressurable#getCapacity can be a useful tool to detect whether the source Publisher uses a Completable (0), Single (1), Unbounded (Long.MAX_VALUE), Bounded (1 < N < Integer.MAX_VALUE) or Mixed (-1) backpressure strategy.

Some operators might adapt their supplied Queue, while others can choose to short-circuit a more expensive inner Subscriber.
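
A rough sketch of the kind of dispatch an operator could perform on that value; the capacity thresholds follow the issue text, while the chosen strategies are placeholders only:

final class CapacityStrategies {
    // Illustrative dispatch on getCapacity(); not real RSC behavior.
    static String chooseQueueStrategy(long capacity) {
        if (capacity == -1L) {
            return "mixed: keep the generic backpressure-safe path";
        }
        if (capacity == 0L) {
            return "completable: no queue needed, only terminal signals matter";
        }
        if (capacity == 1L) {
            return "single: scalar fast path, skip the heavier inner Subscriber";
        }
        if (capacity == Long.MAX_VALUE) {
            return "unbounded: use an unbounded (linked) queue";
        }
        return "bounded: pre-size a fixed-capacity queue of " + capacity;
    }
}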

Make GroupBy and future partitioning operator store-agnostic

Currently GroupBy uses a Map to store grouped sequence references. We use roughly four interactions with that Map which could eventually be translated into as many java.util.function or java.lang callbacks. The reason is that some repositories might offer more refined strategies to create and fetch group references: we could use a pooled registry of groups, attach behavior to a fresh group (e.g. a timeout), or simply increase our route-resolution possibilities.
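
A sketch of what store-agnostic could look like, with the Map interactions expressed as standard functional callbacks; every name below is hypothetical, not existing RSC API:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical group registry: the Map-style interactions become callbacks, so a
// pooled or timeout-aware store can be plugged in instead of a plain Map.
final class GroupRegistry<K, G> {
    final Function<K, G> lookup;                   // fetch an existing group
    final BiFunction<K, Function<K, G>, G> create; // create-if-absent
    final BiConsumer<K, G> onNewGroup;             // attach behavior (e.g. a timeout)
    final Function<K, G> remove;                   // evict on completion

    GroupRegistry(Function<K, G> lookup,
                  BiFunction<K, Function<K, G>, G> create,
                  BiConsumer<K, G> onNewGroup,
                  Function<K, G> remove) {
        this.lookup = lookup;
        this.create = create;
        this.onNewGroup = onNewGroup;
        this.remove = remove;
    }

    // Default registry backed by a ConcurrentHashMap, mirroring today's behavior.
    static <K, G> GroupRegistry<K, G> mapBacked() {
        Map<K, G> map = new ConcurrentHashMap<>();
        return new GroupRegistry<>(
                map::get,
                map::computeIfAbsent,
                (k, g) -> { },
                map::remove);
    }
}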

Use shaded dependency on JCTools instead of copy and paste

Currently this library uses some queue implementations lifted, in spirit and to some extent in code, out of JCTools. The two projects share a contributor, @akarnokd.
I would recommend this project directly shade JCTools and use the originals. Versioned dependencies allow for clear release boundaries and a well-understood state of the code, while also enabling future improvements and corrections to be fed in easily. This is why everyone does it...

Throttling on leading and trailing edge

Request

It would be great if Reactor supported throttling on the leading and trailing edge. This is quite a common use case when working on applications that provide data to some kind of reactive user interface, but it may be an interesting feature in general.

Disclaimer: This ticket describes throttling requirements from the perspective of a UI. Various kinds of consumers may have the same requirements, even though they are not explicitly mentioned.

What does this mean?

The popular JS library lodash implements this throttling behavior as a default (as mentioned before, quite a common use case for UI applications). Instead of trying to explain this with words, please check out the following JS Bin which shows the behavior in action:

For easier reference, here is the example logic and the resulting output:

const windowSize = 100;
const fn = _.throttle(v => console.log(v), windowSize);

fn('a');                                     // start of first window
setTimeout(() => fn('b'), windowSize * 0.5); // in first window, but dropped due to successive message
setTimeout(() => fn('c'), windowSize * 0.9); // last message in first window

setTimeout(() => fn('d'), windowSize * 1.2); // first message in new window


// expected output:
// a
// c
// d

Why is this interesting?

In reactive UIs, throttling logic which emits only on a single edge, i.e. either leading or trailing edge, is insufficient as:

  • The UI should not have to wait for the end of a time window to receive a value when there is only a single value (read: trailing-edge-only throttling).
  • Only emitting on the leading edge is insufficient, as this can result in permanently inconsistent data (read: leading-edge-only throttling).
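
For reference, a minimal Java sketch of the combined leading+trailing behavior described above, built on a plain ScheduledExecutorService rather than any existing Reactor/RSC operator (all names are illustrative):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Emits the first value of a window immediately (leading edge) and the last value
// seen during the window when it closes (trailing edge).
final class LeadingTrailingThrottle<T> {
    private final long windowMillis;
    private final Consumer<T> downstream;
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();
    private boolean windowOpen;  // guarded by this
    private T pending;           // guarded by this
    private boolean hasPending;  // guarded by this

    LeadingTrailingThrottle(long windowMillis, Consumer<T> downstream) {
        this.windowMillis = windowMillis;
        this.downstream = downstream;
    }

    synchronized void onNext(T value) {
        if (!windowOpen) {
            windowOpen = true;
            downstream.accept(value); // leading edge: emit immediately
            timer.schedule(this::closeWindow, windowMillis, TimeUnit.MILLISECONDS);
        } else {
            pending = value;          // remember only the latest value
            hasPending = true;        // it will be emitted on the trailing edge
        }
    }

    private synchronized void closeWindow() {
        if (hasPending) {
            downstream.accept(pending); // trailing edge: emit the last value seen
            hasPending = false;
            pending = null;
            // the trailing emit opens the next window, so a burst keeps throttling
            timer.schedule(this::closeWindow, windowMillis, TimeUnit.MILLISECONDS);
        } else {
            windowOpen = false;
        }
    }
}

Fed the a/b/c/d timing from the JS example above, this sketch produces the same a, c, d output.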

Add PublisherEvery

Signatures:

  • PublisherBase<T> : <T> PublisherBase#every(int)
  • PublisherBase<T> : <T> PublisherBase#everyFirst(int)

Return the last or the first element of each batch of the given count. Allows for simple sampling with a fixed size.
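
A hypothetical usage sketch, under the assumed batch semantics (the operators do not exist yet):

PublisherBase.range(1, 9).every(3);      // emits 3, 6, 9  (last element of each batch of 3)
PublisherBase.range(1, 9).everyFirst(3); // emits 1, 4, 7  (first element of each batch of 3)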

Add PublisherIndexOf

Signatures:

  • PublisherBase<Long> : <T> PublisherBase#findIndex(T)
  • PublisherBase<Long> : <T> PublisherBase#findIndex(T, long)

Return the position, relative to the observed sequence, at which the element is found. Might onError(NoSuchElementException) or emit the long fallback argument, e.g. -1L.
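
A hypothetical usage sketch, assuming 0-based positions and the fallback overload proposed above:

PublisherBase.range(1, 5).findIndex(3);       // emits 2
PublisherBase.range(1, 5).findIndex(42);      // onError(NoSuchElementException)
PublisherBase.range(1, 5).findIndex(42, -1L); // emits -1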

Clarification on the Scheduler.Worker.schedule contract

The Scheduler.Worker#schedule javadoc says:

    /**
     * Creates a worker of this Scheduler that executed task in a strict
     * FIFO order, guaranteed non-concurrently with each other.
     * <p>
     * Once the Worker is no longer in use, one should call shutdown() on it to
     * release any resources the particular Scheduler may have used.
     * 
     * <p>Tasks scheduled with this worker run in FIFO order and strictly non-concurrently, but
     * there are no ordering guarantees between different Workers created from the same
     * Scheduler.
     * 
     * @return the Worker instance.
     */

The FIFO order seems to be preserved in all of the code examples but I don't understand the semantics being used for "strictly non-concurrently".

For example, it appears ExecutorServiceScheduler will have Workers that run tasks concurrently unless a single-threaded executor is used.

In other Reactive Streams implementations that have a scheduler (e.g. RxJava) there is no mention of that guarantee.

Is there some implementation logic I'm missing or am I misunderstanding the doc?
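
For context, one way the javadoc's guarantee can hold even on a multi-threaded pool is to trampoline each Worker's tasks through its own serial queue; a minimal sketch (not the actual ExecutorServiceScheduler code):

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// A Worker that is FIFO and non-concurrent with itself even when the backing
// Executor is a multi-threaded pool: tasks go through a serial queue and only
// one drain runs at a time (work-in-progress counter).
final class SerialWorker {
    final Executor pool;
    final ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
    final AtomicInteger wip = new AtomicInteger();

    SerialWorker(Executor pool) {
        this.pool = pool;
    }

    void schedule(Runnable task) {
        queue.offer(task);
        if (wip.getAndIncrement() == 0) {
            pool.execute(this::drain); // only the first enqueue starts a drain
        }
    }

    void drain() {
        do {
            queue.poll().run();        // tasks run one at a time, in FIFO order
        } while (wip.decrementAndGet() != 0);
    }
}

Different SerialWorker instances may still run on different pool threads, which is why there is no ordering guarantee across Workers of the same Scheduler.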

Add PartitionPublisher and PublisherPartition

Distribute the sequence onto N transformable Publishers that can eventually be joined back.
Ex :

stream.partition(6).dispatchOn(ForkJoinPool.commonPool()).map(service::blockingTask).merge()

Supported PartitionPublisher operators:

  • All PublisherBase
  • merge()
  • concat()

Partition resolution could be provided via a key extractor and, unlike GroupBy, would use a fixed, delimited list of keys.

Unbounded FlatMap with slow consumer

The unbounded flatMap has terrible performance if a consumer, such as observeOn, processes items more slowly.

The main reasons:

  • copy-on-write subscriber tracking, which creates N arrays of ever-increasing size with O(n * n) complexity, and then it takes O(n * n) to remove them one by one
  • the cleanup loop runs over all n sources when the requested amount is zero, to evict empty and completed sources, adding another O(n * n) complexity.

For example, this test runs in about 30s:

range(1, 100_000).flatMap(v -> range(1, 10)).subscribe(new TestSubscriber<>(0L));

(then it takes 12s to drain by requesting 96 elements in a loop, see PublisherFlatMapTest.slowDrain().)

The cleanup can be short-circuited: once a non-empty source is found, there is no need to keep removing any finished and empty sources after it; the request-based drain will take care of those entries eventually.

The tracking overhead can be reduced via a free-slot queue and power-of-2 allocations, though with some cost due to synchronized access and, as of now, no compaction.
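
A rough sketch of the free-slot idea (synchronized, power-of-2 growth, no compaction), to illustrate the direction rather than the actual FL code:

import java.util.Arrays;

// add() hands out a reusable index into a power-of-2 sized array instead of
// copying the whole subscriber array on every change.
final class SlotTracker<T> {
    Object[] slots = new Object[8];
    int[] free = new int[8];
    int freeCount;
    int size;

    synchronized int add(T value) {
        int index;
        if (freeCount != 0) {
            index = free[--freeCount];  // reuse a previously freed slot
        } else {
            if (size == slots.length) {
                slots = Arrays.copyOf(slots, slots.length << 1); // power-of-2 growth
                free = Arrays.copyOf(free, free.length << 1);
            }
            index = size++;
        }
        slots[index] = value;
        return index;
    }

    synchronized void remove(int index) {
        slots[index] = null;            // no compaction: the slot is simply recycled
        free[freeCount++] = index;
    }
}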

The benchmark results (i7 4790, Windows 7 x64, Java 8u72):

[benchmark results image]

COW is the original copy-on-write tracking; FL is a free-list-based solution, which already shows some promise; COW-2 is COW with the cleanup short-circuited; and FL-2 is FL with the cleanup short-circuited as well.

FL-2 seems to be promising with a bunch of +/- 5% loss/gain, probably due to noise.

I'm looking into non-synchronized options with the FL solution.

Review and Mark Unbounded Operators

Unbounded operators should at least implement Backpressurable#getCapacity() to return Long.MAX_VALUE.
Some unbounded operators might also benefit from new accounting strategies. Let's find possible candidates and extract the relevant issues.

Reactive puzzlers (working title)

This issue, similar to #21, collects some interesting cases and pitfalls in the form of Java Puzzlers, giving a scenario and offering 4-6 answers.

101 Reactive Gems (working title)

In this issue, we should collect tips and tricks with reactive systems and dataflows.

These are not particularly advanced topics but the markdown support on GitHub makes it easier to write them up.

Once we run out of ideas, we may tidy it up and release it together (maybe a free ebook?).

Please post only gems here and open discussion about them in separate issues. Thanks.

Design considerations

There are a few decisions to be made:

  1. Repo initialization with Gradle files and directories

Who should do this? I have a gradle setup I usually copy into new projects that can upload to maven based on local settings.

  2. Package naming, structure

What should the base package name be? In addition, we may need an internal package.

  3. Lock-free or not?

Most serializing operators can be implemented in both ways but lock-free usually requires lock-free queues (i.e., JCTools).

The project states it wants to avoid queue use so we don't lock in on Disruptor or JCTools, but is it possible to abstract away the queue operations, i.e., is the j.u.c.Queue interface an adequate abstraction?

We can also have implementations for both cases in separate subpackages lockfree and locking.

  4. Which Java version?

Java 8 allows nicer code but Java 6 allows Android use.

  5. Processor implementations

Do we want to provide these as well? Note that the redo operator, which unifies retry and repeat behavior through the signal of a Publisher, requires at least a PublishProcessor (or at most a BehaviorProcessor), although these may be inlined.

  6. Factory or classes

Should the library provide the operators via factory methods, i.e., Publisher<R> map(Publisher<T> source, Function<T, R> mapper); or as named classes: class PublisherMap<T, R>?

The latter case helps with discovering what's in a chain.
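
For illustration, a stripped-down version of the named-class style (a sketch, not the actual RSC PublisherMap; spec-compliant error handling is omitted):

import java.util.Objects;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

// The operator is a class exposing its inputs, which makes chains introspectable.
final class PublisherMapSketch<T, R> implements Publisher<R> {
    final Publisher<? extends T> source;
    final Function<? super T, ? extends R> mapper;

    PublisherMapSketch(Publisher<? extends T> source, Function<? super T, ? extends R> mapper) {
        this.source = Objects.requireNonNull(source);
        this.mapper = Objects.requireNonNull(mapper);
    }

    @Override
    public void subscribe(Subscriber<? super R> s) {
        source.subscribe(new Subscriber<T>() {
            @Override public void onSubscribe(Subscription sub) { s.onSubscribe(sub); }
            @Override public void onNext(T t) { s.onNext(mapper.apply(t)); }
            @Override public void onError(Throwable e) { s.onError(e); }
            @Override public void onComplete() { s.onComplete(); }
        });
    }
}

The factory-method style would then just be a one-line wrapper around such a class.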

  7. Operator fusion

I'm in a relatively early stage of describing and prototyping operator fusion. What's definitely necessary is that each operator has a distinct class which also exposes its input parameters (i.e., the source and mapper in 6)).

  8. Beta/Experimental annotation

Do we want to copy the RxJava way of introducing new operators/classes?

  9. Benchmarks

Do we want JMH benchmarks in the repo?

  10. Reactive-Streams TCK

I have two problems with it: a) it requires TestNG, which is quite invasive in Eclipse, and b) it is designed to test either a just-like Publisher or a replaying-type Processor implementation. It doesn't work properly for range, for example, because the test wants to verify that the output of the Publisher is the same as what the test generated.

  11. Optimizations vs Reactive-Streams spec

Certain internal operator optimizations, such as reusing a Subscriber instance, may violate the specification even though they are completely safe to do.

The spec says that you can't subscribe the same instance multiple times to the same or different Publishers and some implementations may actually check for this (such as RxJavaReactiveStreams).

  12. Inlining and API leaks

To avoid allocations, some operators may build upon existing classes or implement interfaces that leak into the API. For example, an auto-sharing operator PublisherAutoShare may extend AtomicInteger directly to save on an atomic counter field. However, this class now has a bunch of other methods that leak in and require extra care from users.
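
A condensed illustration of the pattern; the class name and the refcount usage below are hypothetical:

import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

// The operator itself is the atomic counter, saving an object and a field
// indirection, but AtomicInteger's public methods (set, addAndGet, ...) now
// leak into the operator's API surface.
final class RefCountingPublisher<T> extends AtomicInteger implements Publisher<T> {
    final Publisher<T> source;

    RefCountingPublisher(Publisher<T> source) {
        this.source = source;
    }

    @Override
    public void subscribe(Subscriber<? super T> s) {
        if (incrementAndGet() == 1) {
            // first subscriber: a real operator would connect the upstream here
        }
        source.subscribe(s);
    }
}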

  13. Building blocks

There are a few frequent building blocks useful for operators (e.g., arbiters, terminal atomics tools). Do we want to officially support them as well or keep them hidden in internal?

  14. Field updaters, Atomic instances or Unsafe?

Which technique do we want? Note that Java 8 is about to receive performance optimizations targeting field updaters so they get very close to Unsafe.
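
For reference, the field-updater style looks like this (a minimal sketch; real operators also cap the accumulated value at Long.MAX_VALUE):

import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// One static updater per class instead of an AtomicLong instance per operator,
// saving an allocation and an indirection.
final class RequestedHolder {
    volatile long requested;
    static final AtomicLongFieldUpdater<RequestedHolder> REQUESTED =
            AtomicLongFieldUpdater.newUpdater(RequestedHolder.class, "requested");

    void addRequest(long n) {
        REQUESTED.addAndGet(this, n);
    }
}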

TestSubscriber constructor with delegate Subscriber

In Reactor I have created a TestSubscriber inspired by the one in RSC, and I have a question.

I am not sure whether we want (on the Reactor side) to keep the constructors that take a delegate Subscriber, because I am not sure about the use cases. Could you please give me some hints about what use cases they fit?

Thanks in advance for your help.

Fusing subscribeOn with upstream

@akarnokd has written in several places that the classic case of:

publisher.map(x -> someExpensiveComputation(x))
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(x -> someFn(x))

means that map and observeOn cannot be fused together. I understand this point well. However, I'm interested in a variant of this pipeline:

publisher.map(x -> someExpensiveComputation(x))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(x -> someFn(x))

I might be misunderstanding the matrix, but it seems to suggest that subscribeOn cannot be fused with the map, which as far as I can tell should be possible? Moreover, the matrix suggests that subscribeOn and observeOn can be fused together, which I am failing to understand.

Also, a related question: is the async fusion in subscribeOn currently implemented? From the code it doesn't look like it (I'm looking at https://github.com/reactor/reactive-streams-commons/blob/master/src/main/java/rsc/publisher/PublisherSubscribeOn.java), but I'm not sure if it's been worked on locally.

Finally, is there a place where @smaldini and @akarnokd discuss what is happening in this repo? I would be very interested in following along!

Explore timed microbatch operators

Currently written as a combination of two or three operators, timed microbatches like stream.window(10, () -> PublisherBase.timer(....)) or stream.buffer(10, () -> PublisherBase.timer(....)) would be useful.

Question: Using Reactor streams

Hi, I am from a financial organization. We have multiple use cases for stream processing to send alerts and notifications to customers. We are interested in reactor-streams, which is how I came to this project. I understand it is the second generation of Reactor streams. Please confirm, and let me know the targeted release for this project to be consumable in production.

Please share any documentation, like a user guide, that would be helpful.

Thanks and Regards
Karthik

Add IndexedQueue contract to optimize draining paths

IndexedQueues usually work with volatile consumer indices. We could define a convention to "batch" increment this index by doing it once at the end of drain loops, instead of using queue.poll(), which increments the index every time.
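
A sketch of the idea; the IndexedQueue contract and its accessors below are hypothetical names, not existing RSC or JCTools API:

import java.util.function.Consumer;

// Hypothetical contract: the consumer index is exposed so drain loops can
// publish it once per pass instead of once per element.
interface IndexedQueue<T> {
    long consumerIndex();            // volatile read of the consumer index
    void consumerIndex(long value);  // single ordered write of the consumer index
    T element(long index);           // plain read of the slot at the given index
    void clearSlot(long index);      // null out the slot once consumed
}

final class DrainSupport {
    // Emits up to 'requested' elements, then updates the consumer index once,
    // avoiding one volatile write per queue.poll().
    static <T> long drain(IndexedQueue<T> q, long requested, Consumer<T> out) {
        long ci = q.consumerIndex();
        long emitted = 0L;
        while (emitted != requested) {
            T t = q.element(ci + emitted);
            if (t == null) {
                break;                       // queue exhausted for now
            }
            q.clearSlot(ci + emitted);
            out.accept(t);
            emitted++;
        }
        if (emitted != 0L) {
            q.consumerIndex(ci + emitted);   // one batched index update per pass
        }
        return emitted;
    }
}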
