aol / cyclops
An advanced, but easy to use, platform for writing functional applications in Java 8.
License: Apache License 2.0
Currently always uses an Executor for Sequential Elastic Pools.
Something like
bind(Function<T,CompletableFuture<R>> mapper);
That flattens the returned future into the internal operating Stream of futures. By comparison
map(t->CompletableFuture.supplyAsync(()->process(t)))
which would nest the returned Futures inside the current internal Stream of Futures.
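The distinction can be sketched over plain JDK types. This is an illustrative sketch, not the simple-react API; the class and method names are hypothetical:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Stream;

// Hypothetical sketch contrasting map (nesting) with bind (flattening).
public class BindSketch {

    // map: the returned future is nested, giving a Stream of futures of futures.
    static <T, R> Stream<CompletableFuture<CompletableFuture<R>>> map(
            Stream<CompletableFuture<T>> futures,
            Function<T, CompletableFuture<R>> mapper) {
        return futures.map(f -> f.thenApply(mapper));
    }

    // bind: thenCompose flattens the returned future back into the Stream of futures.
    static <T, R> Stream<CompletableFuture<R>> bind(
            Stream<CompletableFuture<T>> futures,
            Function<T, CompletableFuture<R>> mapper) {
        return futures.map(f -> f.thenCompose(mapper));
    }
}
```

`thenCompose` is the CompletableFuture equivalent of a monadic bind, which is why the flattened variant stays a single level of futures deep.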
Current behaviour is to wait for a result rather than timing out when the batch max time is reached.
Using fromStream forces these methods to execute synchronously.
This occasionally produces inconsistent results.
Such as
thenAsync
peekAsync
filterAsync
Overloaded versions that allow a different Executor to be supplied should also be provided. This could be especially useful for Streams which fan out and in at different stages, e.g. using async() to fan out and distribute work across threads before continuing with sync methods. Advanced operators force a fan-in via a Many Producer / Single Consumer Queue, and user code currently has to fan back out, again by switching between async() and sync(). Being able to write
lazyReact.react(()->service1,()->service2,()->service3)
.thenSync(this::process)
.flatMap(Collection::stream)
.thenAsync(this::save)
.run();
is a little bit simpler than
lazyReact.react(()->service1,()->service2,()->service3)
.sync() // continue on calling thread
.map(this::process)
.flatMap(Collection::stream)
.async() //resubmit to an Executor
.map(this::save)
.run();
Cyclops issue : #85
LazyFutureStream.range(1,10);
LazyFutureStream.range(1L,10L);
LazyFutureStream.rangeBetween(1.00d,10.00d).withStep(0.05);
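A stepped double range of the kind proposed above could be sketched over plain JDK streams. The helper below is illustrative only; `rangeBetween` and the step parameter are hypothetical names, not an existing API:

```java
import java.util.stream.DoubleStream;

// Hypothetical sketch of a half-open double range [start, end) with a step,
// built on DoubleStream.iterate plus a computed element count.
public class Ranges {
    static DoubleStream rangeBetween(double start, double end, double step) {
        long count = (long) Math.ceil((end - start) / step); // elements in [start, end)
        return DoubleStream.iterate(start, d -> d + step).limit(count);
    }
}
```

Counting elements up front (rather than filtering on an upper bound) avoids floating-point drift deciding whether the final element is included.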
Terminal operations typically run on a single thread (excluding parallel reduction options -which make use of JDK parallel Streams).
Any processing in terminal ops (such as anyMatch, xMatch, noneMatch) that can be moved onto the Futures should be, where it isn't already (it's currently available for forEach).
capture(e->log.error(e.getMessage(),e))
Errors are not captured when runOnCurrent and other run methods are used.
Allow grouping by up to a maximum time or size.
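The size-or-time bound could work along these lines. This is a sketch of the general technique, not the simple-react implementation; `nextBatch` is a hypothetical helper:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch: drain a queue into a batch capped by both size and elapsed time.
public class Batcher {
    static <T> List<T> nextBatch(BlockingQueue<T> queue, int maxSize, long maxNanos) {
        List<T> batch = new ArrayList<>();
        long deadline = System.nanoTime() + maxNanos;
        while (batch.size() < maxSize) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) break;               // max time reached: return the partial batch
            try {
                T next = queue.poll(remaining, TimeUnit.NANOSECONDS);
                if (next == null) break;             // timed out waiting for more elements
                batch.add(next);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();  // preserve interrupt status
                break;
            }
        }
        return batch;
    }
}
```

The timed `poll` is what makes the batch return when the time limit is reached, rather than blocking indefinitely for a full batch (the behaviour the issue above describes).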
NonBlockingQueues implement the BlockingQueue interface, which is confusing.
Create new interface that wraps both and break out into separate top level classes.
FuturePool is written to support single-threaded reads and writes, but multiple threads can write to the pool. The main pathway is where the thread used by the underlying Stream both reads and writes to the pool. When a toQueue operation is completed, however, a Future can return itself to the pool on completion - this happens on a different thread.
FuturePool needs to use a thread-safe structure such as an AtomicReferenceArray to store pooled objects.
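A minimal sketch of what thread-safe slot storage over AtomicReferenceArray could look like, assuming CAS-per-slot semantics. This is illustrative, not the FuturePool code; the class and method names are hypothetical:

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

// Sketch: lock-free pooled-object storage where any thread may return
// an object (offer) and any thread may take one (poll).
public class SlotPool<T> {
    private final AtomicReferenceArray<T> slots;

    SlotPool(int size) {
        slots = new AtomicReferenceArray<>(size);
    }

    // Return an object to the pool; safe to call from any thread.
    boolean offer(T value) {
        for (int i = 0; i < slots.length(); i++)
            if (slots.compareAndSet(i, null, value)) return true;
        return false; // pool full
    }

    // Take an object from the pool, or null if it is empty.
    T poll() {
        for (int i = 0; i < slots.length(); i++) {
            T value = slots.get(i);
            if (value != null && slots.compareAndSet(i, value, null)) return value;
        }
        return null;
    }
}
```

The compareAndSet on each slot is what makes the off-thread "Future returns itself" pathway safe without locking the main single-threaded path.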
This is on master, but not on any released version of simple-react.
LazyReact builder = new LazyReact(100,100);
Currently, to process a generative data supply we have to simulate generate-with-limit using range functionality, e.g.
builder.range(0,max)
.map(i->dataPool.next())
.forEach(data -> work(data));
Generate with a limit would be more intuitive:
builder.generate(()->dataPool.next())
.limit(max)
.forEach(data -> work(data));
Currently simple-react is built on top of a JDK 8 Stream of CompletableFutures. This involves the Stream creating or managing a CompletableFuture for each element in the Stream. On top of this the Stream passes the execution pipeline to each element (via the CompletableFuture interface and methods such as thenApply or thenApplyAsync). CompletableFuture is an immutable functional class, and returns a new instance each time, so we create an additional set of Objects proportional to the number of operations to be applied per element in the Stream. This is ripe for optimisation.
A simple benchmark measuring repeated application of the identity function (on a MacBook Pro) shows throughput dropping from 330 million applications per second to less than half that (137 million applications per second) once the process runs long enough for the garbage collector to kick in. A large portion of the Objects that build up in memory are CompletableFuture instances.
The execution pipeline is (logically) shared, it need only be defined once. This change alone could reduce the number of new Objects to 1 per element.
There are a finite number of Future tasks active per Stream at any given point, so recycling Future instances via Object pooling would also significantly reduce the number of Objects created. For a large Stream of data the total number of Objects (Futures + elements) would trend towards the number of elements alone (as the number of elements dwarfs the number of reused Future objects).
Implementing the reactive-streams spec will allow interoperability with other streams implementations.
If Streams are above a certain threshold in size, and the autoParallelReduction flag is selected, LazyFutureStreams can switch from single-threaded reduction to parallel reduction.
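The switch itself is straightforward. A minimal sketch, assuming a fixed threshold constant (the class, method, and threshold value are illustrative, not the simple-react implementation):

```java
import java.util.List;

// Sketch: choose sequential or JDK parallel reduction based on input size.
public class AutoReduce {
    static final int THRESHOLD = 10_000; // assumed cut-over point

    static int sum(List<Integer> data) {
        return data.size() >= THRESHOLD
                ? data.parallelStream().mapToInt(Integer::intValue).sum() // parallel reduction
                : data.stream().mapToInt(Integer::intValue).sum();        // single-threaded
    }
}
```

In practice the threshold would need tuning per operation, since the fork/join overhead of parallel streams only pays off for sufficiently large or expensive reductions.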
Related to #30
Offer EagerFutureStream future-based operator methods on LazyFutureStream where it makes sense.
LazyFutureStreams could automatically use better performing Agrona based Queues for all operators except for flatMap (which may need to use an unbounded Queue).
LazyFutureStream can perform complex operations on results asynchronously, because evaluation is lazy (e.g. map (CompletableFuture::completedFuture) doesn't have to be completed immediately before continuing).
Because EagerFutureStream materializes a collection at each phase, any operation that requires mapping concrete results back to CompletableFutures needs to be completed before the phase definition can be complete. This creates a bottleneck where EagerFutureStream behaves synchronously.
xxxxFutures methods should become the default behaviour for EagerFutureStream by swapping with their results-based equivalents. The methods can be called xxxxOnResults.
simple-react v0.99 will introduce a new operator that returns a set of asynchronous terminal operations (the futureOperations operator).
A similar set of organisational operators could be used to organise access to other operators - in particular to separate access to operators that act on bare metal futures and those that operate on results.
operateOnFutures : switch to a mode where all available operators act directly on underlying futures
standardOperation : return to the standard view from operateOnFutures view
Producers of NonBlocking Queues should be able to either
Consumers should be able to sleep for a configurable time period or spin until data available.
It should be possible in LazyReact to select an option that will result in all operations being memoized. It should also be possible to plugin the memoization method, so that it could be backed by LRU or TTL based caching.
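The pluggable part follows naturally if the cache is exposed as a plain Map, since LRU or TTL behaviour then comes from the Map implementation supplied. A minimal sketch (illustrative names, not the LazyReact API):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Sketch: memoize a function against a caller-supplied cache, so the
// backing store can be swapped for an LRU or TTL map implementation.
public class Memoize {
    static <T, R> Function<T, R> memoize(Function<T, R> fn, Map<T, R> cache) {
        return t -> cache.computeIfAbsent(t, fn);
    }

    // Default: unbounded thread-safe cache.
    static <T, R> Function<T, R> memoize(Function<T, R> fn) {
        return memoize(fn, new ConcurrentHashMap<>());
    }
}
```

Passing, say, a Caffeine or Guava cache's `asMap()` view as the second argument would give LRU/TTL-backed memoization without changing the memoize code.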
LazyFutureStream should extend (cyclops-streams) SequenceM, which extends (jool) Seq, which extends the JDK Stream - taking advantage of functionality available across the Streams ecosystem in 3 other projects. This will add all the SequenceM operators to LazyFutureStream.
SequenceM should also be made available as a top level Stream.
SequenceM - fast sequential Stream, can execute asynchronously on a separate (single thread), supports Reactive Streams and connectable hotstreams.
LazyFutureStream - advanced stream functionality particularly useful for multi-threaded I/O - based on FastFuture. Supports Reactive Streams and connectable hotstreams. Can be used as a sequential Stream, but SequenceM would be much faster (conversely LazyFutureStream much more performant for multi-threaded blocking I/O).
SimpleReactStream - merged SimpleReact and EagerFutureStreams - simpler API, eager behaviour - for blocking I/O - based on CompletableFuture (allOf/ anyOf also exposed)
Adds new operators
and more!
This works with SimpleReact, but with LazyReact it doesn't:
new SimpleReact().from(
this.restClient.postForEntity(apiURL, new HttpEntity(query,headers), Result.class))
.peek(System.out::println)
.then(action->asyncResponse.resume(action))
.peek(System.out::println)
.onFail(error->{ error.printStackTrace(); return asyncResponse.resume(error.getCause());})
.peek( status->bus.post(RequestEvents.finish(query, correlationId)));
Fixed on performance-reliability branch
As of 0.99.3 it is
SequenceM<T> shuffle()
Should be
LazyFutureStream<T> shuffle()
Make sure suitable react methods exist in SimpleReact, LazyReact and EagerReact, and suitable of methods in LazyFutureStream, EagerFutureStream and SimpleReactStream.
The output of onFail should be the same as the then/map/retry phase.
Currently Capture is implemented by checking the result of a completed future, it should be implemented as an event instead - which may speed up some terminal operations.
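An event-based capture could hang the error handler off the future's completion, so it fires as soon as the future fails rather than being checked afterwards. A minimal sketch, assuming a free-standing helper (not the simple-react implementation):

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Sketch: register the error handler as a completion event on the future.
public class Capture {
    static <T> CompletableFuture<T> capture(CompletableFuture<T> future,
                                            Consumer<Throwable> errorHandler) {
        future.whenComplete((result, error) -> {
            if (error != null) errorHandler.accept(error); // fires on failure, as an event
        });
        return future;
    }
}
```

Because `whenComplete` runs when the future completes, terminal operations no longer need a separate pass over completed futures to harvest errors.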
Agrona Many-to-one Concurrent Queue seems a good fit.
LazyFutureStreams with multi-threaded executors can 'fan-out' operations across all executors. Subsequent operations should occur on the calling thread for speed. Some more advanced operations require data sharing across threads and make use of a Many Producer Single Consumer Queue. Fan out should occur again once data is collected from the queue.
An autoOptimise option should manage automatic fanOut
Include javadoc for each operator (can't rely on @see to cyclops-sequence-api for example). Fix formatting on each operator.
e.g.
EagerFutureStream.parallelCommonBuilder()
.fromPrimitiveStream(IntStream.range(0, 1000000))
.map(it -> it*100)
.jitter(10L)
.peek(System.out::println)
.block();
It works with run, runOnCurrent etc. The issue occurs when filtering in a stream that includes an operation that makes use of queues (e.g. flatMap):
LazyReact
.parallelCommonBuilder()
.iterateInfinitely("", last -> nextFile())
.limit(100)
.map(this::readFileToString)
.map(this::parseJson)
.batchByTime(1, TimeUnit.MICROSECONDS)
.peek(batch -> System.out.println("batched : " + batch))
.filter(c->!c.isEmpty())
.map(this::processOrders)
.forEach(next -> count2 = count2 + next.size());
Eager Streams (EagerFutureStream and SimpleReactStream) perform a terminal operation on the underlying Stream at each phase. This means that any (more complex) operators that require data to flow through a queue (see the list of operators that are batched for EagerFutureStream) need to pull all data in from that Queue before the subsequent stages can be started - in other words, data must flow through the queue to the next stage as a batch.
This makes time-based operators such as batchByTime, jitter, debounce, chunked etc. less useful than for LazyFutureStreams. Reducing the operator set for EagerFutureStream would reduce the gap between it and SimpleReactStream - and perhaps they should be merged, leaving 2 streams:
LazyFutureStream for infinite async streaming
and
EagerFutureStream, simpler API, for processing of small batches
Currently it will return your nanoseconds * 10000000, but it should divide instead:
public final long getElapsedMillis(){
return elapsedNanos / 1000000;
}
Although, this will give you a rounding error, which may or may not matter.
Create a single Eager-based 'stream' - not a JDK Stream. Passing data to a queue for aggregation doesn't work as well with Eager Streams. The new Stream should not use operations that result in batching - a small, simple API primarily for handling blocking I/O asynchronously and in parallel.
Zip currently relies on the Seq implementation which takes two iterators on the top level Streams. This means that Zip populates the new Zipped Stream as results from those Streams flow in.
We should keep this functionality but make it available under a different name.
I think it makes more sense when zipping two Streams if the original order of the data is maintained. We can do this by Zipping the underlying Streams of CompletableFutures, the new Stream can be populated asynchronously.
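Zipping the underlying futures pairwise can be sketched with `thenCombine`, which preserves positional order while each pair still completes asynchronously. This is illustrative, not the Seq-based implementation described above:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Sketch: zip two lists of futures by index; element i of the result
// completes when element i of both inputs has completed.
public class FutureZip {
    static <A, B, R> List<CompletableFuture<R>> zip(
            List<CompletableFuture<A>> left,
            List<CompletableFuture<B>> right,
            BiFunction<A, B, R> zipper) {
        int size = Math.min(left.size(), right.size());
        return IntStream.range(0, size)
                .mapToObj(i -> left.get(i).thenCombine(right.get(i), zipper))
                .collect(Collectors.toList());
    }
}
```

Order is fixed by index at zip time, so slow elements delay only their own pair rather than reordering the zipped Stream.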
Hi
First, congratulations on this great tool.
I want to run a batch tool, processing each file of the Files.walk stream in parallel - how do I mix them?
like
LazyFutureStream.parallel(Files.walk(dir))
.forEach(id -> { //do work here
System.out.println(id + "\t" + Thread.currentThread());
});
Any ideas?
Best regards
Bruno