spring-attic / reactive-streams-commons
A joint research effort for building highly optimized Reactive-Streams compliant operators.
License: Apache License 2.0
Everything that can be done pre-assembly or during assembly
From @dsyer
Interesting. Do you mean Mono.fromCallable() (because when I looked at the source code it seemed to me that the Callable.call() only happens when there is a subscribe())? In general, what's a good way to assert or inspect statements like that? What did I misunderstand?
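One way to check a laziness claim like this is to count invocations of the Callable and compare the count right after assembly with the count after subscribing. The sketch below uses a hypothetical LazySource stand-in (not a Reactor class) so that it runs on the JDK alone; with Reactor on the classpath, the same counting Callable could presumably be handed to Mono.fromCallable() instead.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

final class LazyCallableCheckSketch {

    /** Hypothetical stand-in for a lazy source: it only stores the Callable until subscribe(). */
    static final class LazySource<T> {
        private final Callable<T> callable;

        LazySource(Callable<T> callable) {
            this.callable = callable;
        }

        T subscribe() {
            try {
                return callable.call();
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        }
    }

    /** Returns {calls counted after assembly, calls counted after subscribe}. */
    static int[] callsBeforeAndAfterSubscribe() {
        AtomicInteger calls = new AtomicInteger();
        LazySource<String> source = new LazySource<>(() -> {
            calls.incrementAndGet();
            return "value";
        });
        int afterAssembly = calls.get(); // nothing has subscribed yet
        source.subscribe();
        return new int[] { afterAssembly, calls.get() };
    }
}
```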
Backpressurable#getCapacity can be a useful tool to detect whether the source Publisher uses a Completable (0), Single (1), Unbounded (Long.MAX), Bounded (N > 1 && < Integer.MAX) or Mixed (-1) backpressure strategy.
Some operators might adapt their supplied Queue, while others can choose to short-circuit a more expensive inner Subscriber.
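The mapping from capacity value to strategy can be sketched as a small classifier. The enum name and the decision to reject values that the list above does not cover are assumptions made for illustration; only the numeric ranges come from the description.

```java
/** Hypothetical classification of the value returned by Backpressurable#getCapacity. */
enum CapacityKind {
    COMPLETABLE, SINGLE, BOUNDED, UNBOUNDED, MIXED;

    static CapacityKind of(long capacity) {
        if (capacity == -1L) {
            return MIXED;
        }
        if (capacity == 0L) {
            return COMPLETABLE;
        }
        if (capacity == 1L) {
            return SINGLE;
        }
        if (capacity == Long.MAX_VALUE) {
            return UNBOUNDED;
        }
        if (capacity > 1L && capacity < Integer.MAX_VALUE) {
            return BOUNDED;
        }
        // Assumption: anything else is not covered by the description, so it is rejected here.
        throw new IllegalArgumentException("Unclassified capacity: " + capacity);
    }
}
```

An operator could switch on the result, for example to pick a single-slot queue for SINGLE and a linked queue for UNBOUNDED.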
Currently GroupBy uses a Map to store grouped sequence references. We probably use 4 interactions with the Map that could eventually be translated into as many java.util.function or java.lang callbacks. The reason is that some repositories might actually offer more refined strategies to create and fetch group references. We could use a pooled registry of groups, for instance, attach some behavior to a fresh group (timeout) or just increase our route resolution possibilities.
Currently this library uses some queue implementations lifted in spirit, and to some extent in code, from JCTools. The 2 projects share a contributor, @akarnokd.
I would recommend that this project shade JCTools directly and use the originals. Versioned dependencies allow for clear release boundaries and a well-understood state of the code, while also enabling future improvements and corrections to be fed in easily. This is why everyone does it...
It would be great if Reactor supported throttling on both the leading and the trailing edge. This is quite a common use case when working on applications that provide data to some kind of reactive user interface, but it may be an interesting feature in general.
Disclaimer: This ticket describes throttling requirements from the perspective of a UI. Various kinds of consumers may have the same requirements, even though they are not explicitly mentioned.
The popular JS library lodash implements this throttling behavior as a default (as mentioned before, quite a common use case for UI applications). Instead of trying to explain this with words, please check out the following JS Bin which shows the behavior in action:
For easier reference, here is the example logic and the resulting output:
const windowSize = 100;
const fn = _.throttle(v => console.log(v), windowSize);
fn('a'); // start of first window
setTimeout(() => fn('b'), windowSize * 0.5); // in first window, but dropped due to successive message
setTimeout(() => fn('c'), windowSize * 0.9); // last message in first window
setTimeout(() => fn('d'), windowSize * 1.2); // first message in new window
// expected output:
// a
// c
// d
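The same scenario can be modelled in Java without timers by replaying timestamped calls. This is a simplified window model written for illustration, not lodash's actual algorithm and not an existing Reactor operator; the class, the Call type and the rule that a trailing emission opens the next window are all assumptions.

```java
import java.util.ArrayList;
import java.util.List;

final class ThrottleSketch {

    /** A timestamped call, mirroring fn('a') invoked at timeMs. */
    static final class Call {
        final long timeMs;
        final String value;

        Call(long timeMs, String value) {
            this.timeMs = timeMs;
            this.value = value;
        }
    }

    /** Replays calls (sorted by time) and returns the values emitted on leading and trailing edges. */
    static List<String> throttle(List<Call> calls, long windowMs) {
        List<String> out = new ArrayList<>();
        boolean windowOpen = false;
        long windowEnd = 0L;
        String pending = null;
        for (Call c : calls) {
            // Close every window that ended at or before this call.
            while (windowOpen && c.timeMs >= windowEnd) {
                if (pending != null) {
                    out.add(pending);      // trailing edge: latest value of the window
                    pending = null;
                    windowEnd += windowMs; // assumption: a trailing emission opens the next window
                } else {
                    windowOpen = false;
                }
            }
            if (!windowOpen) {
                out.add(c.value);          // leading edge
                windowOpen = true;
                windowEnd = c.timeMs + windowMs;
            } else {
                pending = c.value;         // replaces any earlier value in this window
            }
        }
        if (pending != null) {
            out.add(pending);              // trailing edge of the last window
        }
        return out;
    }
}
```

Replaying a at 0 ms, b at 50 ms, c at 90 ms and d at 120 ms with a 100 ms window yields a, c, d, matching the expected output above.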
In reactive UIs, throttling logic which emits only on a single edge, i.e. either leading or trailing edge, is insufficient as:
Signatures:
PublisherBase<Long> : <T> PublisherBase#findIndex(T)
PublisherBase<Long> : <T> PublisherBase#findIndex(T, long)
Returns a position relative to the observed sequence when the element is found. If the element is not found, the operator might signal onError(NoSuchElementException) or use the fallback long argument, e.g. -1L.
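The intended semantics can be sketched synchronously over an Iterable. This is only an analogue of the proposed operator: the class name is hypothetical, and throwing stands in for signalling onError.

```java
import java.util.NoSuchElementException;
import java.util.Objects;

final class FindIndexSketch {

    /** Returns the zero-based position of element, or throws if it never appears. */
    static <T> long findIndex(Iterable<T> source, T element) {
        long index = scan(source, element);
        if (index < 0) {
            throw new NoSuchElementException("Element not found: " + element);
        }
        return index;
    }

    /** Returns the zero-based position of element, or the fallback if it never appears. */
    static <T> long findIndex(Iterable<T> source, T element, long fallback) {
        long index = scan(source, element);
        return index < 0 ? fallback : index;
    }

    // Shared scan; -1 is an internal "not found" marker, never a valid position.
    private static <T> long scan(Iterable<T> source, T element) {
        long index = 0L;
        for (T item : source) {
            if (Objects.equals(item, element)) {
                return index;
            }
            index++;
        }
        return -1L;
    }
}
```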
The Scheduler.Worker#schedule javadoc says:
/**
* Creates a worker of this Scheduler that executed task in a strict
* FIFO order, guaranteed non-concurrently with each other.
* <p>
* Once the Worker is no longer in use, one should call shutdown() on it to
* release any resources the particular Scheduler may have used.
*
* <p>Tasks scheduled with this worker run in FIFO order and strictly non-concurrently, but
* there are no ordering guarantees between different Workers created from the same
* Scheduler.
*
* @return the Worker instance.
*/
The FIFO order seems to be preserved in all of the code examples but I don't understand the semantics being used for "strictly non-concurrently".
For example it appears ExecutorServiceScheduler will have Workers that will run tasks concurrently unless a single thread executor is used.
In other Reactive Streams implementations that have a scheduler, such as RxJava, there is no mention of that guarantee.
Is there some implementation logic I'm missing or am I misunderstanding the doc?
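For reference, one common way to get "FIFO and strictly non-concurrent" on top of a multi-threaded Executor is the queue-drain pattern: tasks go into a queue and a work-in-progress counter ensures that at most one thread drains it at a time. The sketch below is a hypothetical illustration of that pattern, not the code of ExecutorServiceScheduler.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class SerialWorkerSketch {

    /** Hypothetical worker that serializes tasks over any Executor. */
    static final class SerialWorker {
        private final Executor executor;
        private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
        private final AtomicInteger wip = new AtomicInteger();

        SerialWorker(Executor executor) {
            this.executor = executor;
        }

        void schedule(Runnable task) {
            queue.offer(task);
            // Only the caller that moves wip from 0 to 1 starts a drain.
            if (wip.getAndIncrement() == 0) {
                executor.execute(this::drain);
            }
        }

        private void drain() {
            do {
                // Each wip increment follows an offer, so poll() is non-null here.
                // A task that throws would stall this sketch; real code needs error handling.
                queue.poll().run();
            } while (wip.decrementAndGet() != 0);
        }
    }

    /** Schedules numbered tasks on a 4-thread pool and returns the order in which they ran. */
    static List<Integer> runDemo(int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(tasks);
        SerialWorker worker = new SerialWorker(pool);
        for (int i = 0; i < tasks; i++) {
            final int id = i;
            worker.schedule(() -> {
                order.add(id);
                done.countDown();
            });
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return new ArrayList<>(order);
    }
}
```

Without such a layer, tasks submitted directly to a multi-threaded executor can run concurrently, which is the behavior the question describes.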
Distribute the sequence onto N transformable Publishers that can eventually be joined back.
Example:
stream.partition(6).dispatchOn(ForkJoinPool.commonPool()).map(service::blockingTask).merge()
Supported PartitionPublisher operator :
Partition resolution could be provided via key extractor and unlike GroupBy would use a fixed-delimited list of keys.
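The partition, process, merge idea can be sketched with plain JDK types. This is an eager, list-based analogue rather than a reactive operator; the method name, the modulo-based key resolution and the rail-by-rail merge order are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Function;
import java.util.function.ToIntFunction;

final class PartitionSketch {

    /** Splits source into a fixed number of rails by key, maps each rail on the common pool, then merges. */
    static <T, R> List<R> partitionMapMerge(List<T> source, int rails,
                                            ToIntFunction<T> key, Function<T, R> task) {
        List<List<T>> partitions = new ArrayList<>();
        for (int i = 0; i < rails; i++) {
            partitions.add(new ArrayList<>());
        }
        // Unlike GroupBy, the set of keys is fixed up front: 0 .. rails - 1.
        for (T item : source) {
            partitions.get(Math.floorMod(key.applyAsInt(item), rails)).add(item);
        }
        List<CompletableFuture<List<R>>> futures = new ArrayList<>();
        for (List<T> partition : partitions) {
            futures.add(CompletableFuture.supplyAsync(() -> {
                List<R> out = new ArrayList<>();
                for (T item : partition) {
                    out.add(task.apply(item)); // may block; runs on a pool thread
                }
                return out;
            }, ForkJoinPool.commonPool()));
        }
        // Merge: concatenate the rails in rail order once each one completes.
        List<R> merged = new ArrayList<>();
        for (CompletableFuture<List<R>> future : futures) {
            merged.addAll(future.join());
        }
        return merged;
    }
}
```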
The unbounded flatMap has terrible performance if a consumer, such as observeOn, processes more slowly.
The main reasons:
For example, this test runs in about 30s:
range(1, 100_000).flatMap(v -> range(1, 10)).subscribe(new TestSubscriber<>(0L));
(then it takes 12s to drain by requesting 96 elements in a loop, see PublisherFlatMapTest.slowDrain().)
The cleanup can be short-circuited: once one finds a non-empty source, there is no need to continue removing any potentially finished and empty sources after that, because the request-based drain will take care of those entries eventually.
The tracking overhead can be reduced via a free slot queue and power-of-2 allocations, but at some cost because of synchronized and no compaction as of now.
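A free-slot tracker along these lines might look as follows. It is a hypothetical sketch that only reflects the properties named above (free slot reuse, power-of-2 growth, synchronized, no compaction); it is not the code that was benchmarked.

```java
import java.util.Arrays;

final class FreeListTrackerSketch<T> {
    private Object[] slots = new Object[8]; // power-of-2 capacity
    private int[] free = new int[8];        // stack of released slot indices
    private int freeCount;
    private int size;                       // high-water mark of handed-out slots

    /** Stores item in a released slot if one exists, otherwise in a fresh slot; returns the slot index. */
    synchronized int add(T item) {
        int index;
        if (freeCount > 0) {
            index = free[--freeCount];
        } else {
            if (size == slots.length) {
                // Grow by doubling so the capacity stays a power of 2.
                slots = Arrays.copyOf(slots, size << 1);
                free = Arrays.copyOf(free, size << 1);
            }
            index = size++;
        }
        slots[index] = item;
        return index;
    }

    /** Releases the slot so a later add() can reuse it; the arrays are never compacted. */
    synchronized void remove(int index) {
        if (slots[index] != null) {
            slots[index] = null;
            free[freeCount++] = index;
        }
    }

    @SuppressWarnings("unchecked")
    synchronized T get(int index) {
        return (T) slots[index];
    }
}
```

Compared with copy-on-write tracking, adding and removing an inner source here does not copy the whole array, at the price of a lock on every call.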
The benchmark results (i7 4790, Windows 7 x64, Java 8u72):
Where COW is the original copy-on-write tracking, FL is a free-list based solution which already shows some promise; COW-2 is COW where the cleanup is short-circuited and FL-2 is FL with the cleanup short-circuited as well.
FL-2 seems to be promising with a bunch of +/- 5% loss/gain, probably due to noise.
I'm looking into non-synchronized options with the FL solution.
Unbounded Operators should at least implement Backpressurable#getCapacity() : Long.MAX_VALUE.
Some Unbounded Operators might also benefit from new accounting strategies. Let's find possible candidates and extract the relevant issues.
This issue, similar to #21, collects some interesting cases and pitfalls in the form of Java Puzzlers, giving a scenario and offering 4-6 answers.
Review micro fusion path for more dropping back-fusion.
In this issue, we should collect tips and tricks with reactive systems and dataflows.
These are not particularly advanced topics but the markdown support on GitHub makes it easier to write them up.
Once we run out of ideas, we may tidy it up and release it together (maybe a free ebook?).
Please post only gems here and open discussion about them in separate issues. Thanks.
There are a few decisions to be made:
Who should do this? I have a gradle setup I usually copy into new projects that can upload to maven based on local settings.
What should the base package name be? In addition, we may need an internal package.
Most serializing operators can be implemented in both ways but lock-free usually requires lock-free queues (i.e., JCTools).
The project states it wants to avoid queue-use so we don't lock in on Disruptor or JCTools but is it possible to abstract away the queue operations, i.e., is the j.u.c.Queue interface an adequate abstraction?
We can also have implementations for both cases in separate subpackages lockfree and locking.
Java 8 allows nicer code but Java 6 allows Android use.
Do we want to provide these as well? Note that the operator redo, which unifies retry and repeat behavior through the signal of a Publisher, requires a PublishProcessor at least (or a BehaviorProcessor at most), although these may be inlined.
Should the library provide the operators via factory methods, i.e., Publisher<R> map(Publisher<T> source, Function<T, R> mapper); or as named classes: class PublisherMap<T, R>?
The latter case helps discovering what's in a chain.
I'm in a relatively early stage of describing and prototyping operator fusion. What's definitely necessary is that each operator has a distinct class which also exposes its input parameters (i.e., the source and mapper in 6))
Do we want to copy the RxJava way of introducing new operators/classes?
Do we want JMH benchmarks in the repo?
I have two problems with it: a) it requires TestNG, which is quite invasive in Eclipse, and b) it is designed to test either just and a replaying-type Processor implementation. It doesn't work properly for range, for example, because the test wants to verify whether the output of the Publisher is the same as what the test generated.
Certain internal operator optimizations, such as reusing a Subscriber instance, may violate the specification even though they are completely safe.
The spec says that you can't subscribe the same instance multiple times to the same or different Publishers and some implementations may actually check for this (such as RxJavaReactiveStreams).
To avoid allocations, some operators may build upon existing classes or implement interfaces that leak into the API. For example, an auto-sharing operator PublisherAutoShare may extend AtomicInteger directly to save on an atomic counter field. However, this class now has a bunch of other methods that leaked in and requires extra care from the users.
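The trade-off can be made concrete with a small sketch. SubscriberCount is a hypothetical stand-in for the state of such an operator, not the actual PublisherAutoShare code.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class AutoShareCounterSketch {

    /** Hypothetical operator state that saves one field by being the counter itself. */
    static final class SubscriberCount extends AtomicInteger {
        private static final long serialVersionUID = 1L;

        /** Returns true only for the first subscriber, i.e. when the source should be connected. */
        boolean subscriberArrived() {
            return getAndIncrement() == 0;
        }
    }

    /** Shows the leak: inherited methods such as set(int) are callable by anyone holding the instance. */
    static boolean[] demo() {
        SubscriberCount count = new SubscriberCount();
        boolean first = count.subscriberArrived();
        boolean second = count.subscriberArrived();
        count.set(0); // leaked AtomicInteger API silently resets the operator state
        boolean afterReset = count.subscriberArrived();
        return new boolean[] { first, second, afterReset };
    }
}
```

The saving is one object and one field per operator instance; the cost is that set, incrementAndGet and the rest become part of the visible type.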
There are a few frequent building blocks useful for operators (i.e., arbiters, terminal atomics tools). Do we want to officially support them as well or keep them hidden in internal?
Which technique do we want? Note that Java 8 is about to receive performance optimizations targeting field updaters so they get very close to Unsafe.
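For comparison, the field-updater style keeps a plain volatile field on the instance and shares one static updater per class, so no separate atomic object is allocated per operator. A minimal sketch with a hypothetical class name:

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

final class OnceFlagSketch {
    // The updater requires a volatile int field that is accessible to the class creating it.
    private volatile int once;

    private static final AtomicIntegerFieldUpdater<OnceFlagSketch> ONCE =
            AtomicIntegerFieldUpdater.newUpdater(OnceFlagSketch.class, "once");

    /** Returns true for exactly one caller, e.g. the one allowed to emit the terminal signal. */
    boolean tryTerminate() {
        return ONCE.compareAndSet(this, 0, 1);
    }
}
```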
In Reactor I have created a TestSubscriber inspired by the one in RSC, and I have a question.
I am not sure whether we want (on the Reactor side) to keep the constructors that take a delegate Subscriber, because I am not sure about their use cases. Could you please give me some hints about which use cases they fit?
Thanks in advance for your help.
Hi,
I only skimmed the README, but shouldn't the Transformations be named "Processor" rather than "Publisher"? (https://github.com/reactor/reactive-streams-commons#supported-transformations)
@akarnokd has written in several places that the classic case of:
publisher.map(x -> someExpensiveComputation(x))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(x -> someFn(x))
means that map and observeOn cannot be fused together. I understand this point well. However, I'm interested in a variant of this pipeline:
publisher.map(x -> someExpensiveComputation(x))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(x -> someFn(x))
I might be misunderstanding the matrix, but it seems to suggest that subscribeOn cannot be fused with the map which as far as I can tell is possible? Moreover, the matrix suggests that subscribeOn and observeOn can be fused together which I am failing to understand.
Also a related question, is the async fusion in subscribeOn currently implemented? From the code it doesn't look like it (I'm looking at https://github.com/reactor/reactive-streams-commons/blob/master/src/main/java/rsc/publisher/PublisherSubscribeOn.java) but I'm not sure if it's been worked on locally.
Finally is there a place where @smaldini and @akarnokd discuss what is happening in this repo? I would be very interested in following along!
A timed microbatch, currently written with a combination of 2 or 3 operators, such as stream.window(10, () -> PublisherBase.timer(....)) or stream.buffer(10, () -> PublisherBase.timer(....)), would be useful.
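The requested "size or time, whichever comes first" behavior can be modelled by replaying timestamped elements. The sketch assumes that the timer starts when the first element of a batch arrives, which the request does not specify; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class MicroBatchSketch {

    /** Groups values into batches closed by maxSize or by timespanMs since the batch's first element. */
    static List<List<String>> batch(long[] timesMs, String[] values, int maxSize, long timespanMs) {
        List<List<String>> out = new ArrayList<>();
        List<String> current = null;
        long deadline = 0L;
        for (int i = 0; i < values.length; i++) {
            if (current != null && timesMs[i] >= deadline) {
                out.add(current);          // closed by the timer
                current = null;
            }
            if (current == null) {
                current = new ArrayList<>();
                deadline = timesMs[i] + timespanMs;
            }
            current.add(values[i]);
            if (current.size() == maxSize) {
                out.add(current);          // closed by size
                current = null;
            }
        }
        if (current != null) {
            out.add(current);              // flush on completion
        }
        return out;
    }
}
```

With elements a, b, c, d, e at 0, 10, 20, 150 and 160 ms, a maximum size of 2 and a 100 ms timespan, the batches are [a, b], [c] and [d, e].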
Hi, I am from a financial organization. We have multiple use cases for stream processing to send alerts and notifications to customers. We are interested in reactor-streams, which led me to this project. I understand it is the second generation of Reactor Streams. Please confirm, and let me know the targeted release date for production use of this project.
Please share any documentation you have, such as a user guide; that would be helpful.
Thanks and Regards
Karthik
IndexedQueue usually works with volatile consumer indices. We could define a convention to "batch" increment this index by doing it once at the end of drain loops instead of using queue.poll(), which increments the index every time.
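A single-producer, single-consumer sketch of that convention follows. The class is hypothetical and only illustrates publishing the consumer index once per drain; it is not an existing RSC or JCTools queue.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Consumer;

final class BatchedIndexQueueSketch<T> {
    private final AtomicReferenceArray<T> buffer;
    private final int mask;
    private final AtomicLong producerIndex = new AtomicLong();
    private final AtomicLong consumerIndex = new AtomicLong();

    /** capacity is assumed to be a power of 2. */
    BatchedIndexQueueSketch(int capacity) {
        buffer = new AtomicReferenceArray<>(capacity);
        mask = capacity - 1;
    }

    /** Producer side: fails if the queue looks full from the last published consumer index. */
    boolean offer(T item) {
        long pi = producerIndex.get();
        if (pi - consumerIndex.get() == buffer.length()) {
            return false;
        }
        buffer.lazySet((int) pi & mask, item);
        producerIndex.lazySet(pi + 1);
        return true;
    }

    /** Consumer side: hands every available item to the consumer and publishes the index once. */
    int drain(Consumer<? super T> consumer) {
        long start = consumerIndex.get();
        long ci = start;
        for (;;) {
            int offset = (int) ci & mask;
            T item = buffer.get(offset);
            if (item == null) {
                break;
            }
            buffer.lazySet(offset, null); // free the slot before the index is published
            consumer.accept(item);
            ci++;
        }
        consumerIndex.lazySet(ci);        // one index write per drain instead of one per poll()
        return (int) (ci - start);
    }
}
```

The trade-off is that the producer may see the queue as full for longer, because freed slots only become visible to it when the drain loop ends.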