reactor / .github
Template repository for reactor projects, includes typical README + labels
License: Apache License 2.0
In my project I had to add entries to the Reactor context, and I had to duplicate the helper method: once for Mono and once for Flux.
It would be convenient to have a shared interface, such as CoreContext:
public interface CoreContext {
    CoreContext contextCapture();
    CoreContext contextWrite(ContextView contextToAppend);
    CoreContext contextWrite(Function<Context, Context> contextModifier);
}
Mono would then become:
public abstract class Mono<T> implements CorePublisher<T>, CoreContext {
    //...
    public final Mono<T> contextCapture() {
        if (!ContextPropagationSupport.isContextPropagationAvailable()) {
            return this;
        }
        if (ContextPropagationSupport.propagateContextToThreadLocals) {
            return onAssembly(new MonoContextWriteRestoringThreadLocals<>(
                this, ContextPropagation.contextCapture()
            ));
        }
        return onAssembly(new MonoContextWrite<>(this, ContextPropagation.contextCapture()));
    }

    public final Mono<T> contextWrite(ContextView contextToAppend) {
        return contextWrite(c -> c.putAll(contextToAppend));
    }

    public final Mono<T> contextWrite(Function<Context, Context> contextModifier) {
        if (ContextPropagationSupport.shouldPropagateContextToThreadLocals()) {
            return onAssembly(new MonoContextWriteRestoringThreadLocals<>(
                this, contextModifier
            ));
        }
        return onAssembly(new MonoContextWrite<>(this, contextModifier));
    }
    //...
}
And Flux:
public abstract class Flux<T> implements CorePublisher<T>, CoreContext {
    ...
    public final Flux<T> contextCapture() {
        if (!ContextPropagationSupport.isContextPropagationAvailable()) {
            return this;
        }
        if (ContextPropagationSupport.propagateContextToThreadLocals) {
            return onAssembly(new FluxContextWriteRestoringThreadLocals<>(
                this, ContextPropagation.contextCapture()
            ));
        }
        return onAssembly(new FluxContextWrite<>(this, ContextPropagation.contextCapture()));
    }

    public final Flux<T> contextWrite(ContextView contextToAppend) {
        return contextWrite(c -> c.putAll(contextToAppend));
    }

    public final Flux<T> contextWrite(Function<Context, Context> contextModifier) {
        if (ContextPropagationSupport.shouldPropagateContextToThreadLocals()) {
            return onAssembly(new FluxContextWriteRestoringThreadLocals<>(
                this, contextModifier
            ));
        }
        return onAssembly(new FluxContextWrite<>(this, contextModifier));
    }
    ...
}
The motivation, for a developer who uses Reactor, is to go from:
public static <T> Mono<T> addStuffInContext(Mono<T> mono) {
    return mono.contextWrite(context -> context.putAllMap(Map.of("key", "value")));
}

public static <T> Flux<T> addStuffInContext(Flux<T> flux) {
    return flux.contextWrite(context -> context.putAllMap(Map.of("key", "value")));
}
to a single method:
public static <T> CoreContext<T> addStuffInContext(CoreContext<T> publisher) {
    return publisher.contextWrite(context -> context.putAllMap(Map.of("key", "value")));
}
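One detail worth settling before contributing: the interface is declared above without a type parameter, while the helper uses CoreContext<T>, and a helper that returns the interface type would hand callers a CoreContext rather than a Mono or Flux. The following is only a sketch of one possible shape, a self-bounded type parameter, using toy stand-ins instead of the real Reactor classes. The names ToyMono, ToyFlux, and the map-based context are hypothetical and exist only for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

public class CoreContextSketch {

    // Hypothetical variant of the proposed interface. SELF is the concrete
    // publisher type, so contextWrite keeps returning that same type.
    interface CoreContext<SELF extends CoreContext<SELF>> {
        SELF contextWrite(UnaryOperator<Map<String, String>> contextModifier);

        Map<String, String> context();
    }

    // Toy stand-in for Mono: it only carries a context map.
    static final class ToyMono<T> implements CoreContext<ToyMono<T>> {
        private final Map<String, String> context;

        ToyMono(Map<String, String> context) {
            this.context = context;
        }

        @Override
        public ToyMono<T> contextWrite(UnaryOperator<Map<String, String>> contextModifier) {
            return new ToyMono<>(contextModifier.apply(context));
        }

        @Override
        public Map<String, String> context() {
            return context;
        }
    }

    // Toy stand-in for Flux, identical in shape to ToyMono.
    static final class ToyFlux<T> implements CoreContext<ToyFlux<T>> {
        private final Map<String, String> context;

        ToyFlux(Map<String, String> context) {
            this.context = context;
        }

        @Override
        public ToyFlux<T> contextWrite(UnaryOperator<Map<String, String>> contextModifier) {
            return new ToyFlux<>(contextModifier.apply(context));
        }

        @Override
        public Map<String, String> context() {
            return context;
        }
    }

    // One helper for both publisher kinds; the caller gets its own type back.
    static <P extends CoreContext<P>> P addStuffInContext(P publisher) {
        return publisher.contextWrite(ctx -> {
            Map<String, String> copy = new HashMap<>(ctx); // do not mutate the input
            copy.put("key", "value");
            return copy;
        });
    }

    public static void main(String[] args) {
        ToyMono<String> mono = addStuffInContext(new ToyMono<String>(new HashMap<>()));
        ToyFlux<Integer> flux = addStuffInContext(new ToyFlux<Integer>(new HashMap<>()));
        System.out.println(mono.context());
        System.out.println(flux.context());
    }
}
```

Whether a self-bounded parameter is acceptable in the public API of Mono and Flux is a design question for the maintainers; the sketch only shows that a single helper can preserve the caller's type.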
I could contribute it if we agree on which package CoreContext should live in (e.g. reactor.core) and on what to name it.
Hi, I am encountering a somewhat weird behavior when using bufferUntilChanged with a hot publisher.
When an element comes in, it is compared to the previous element and if the keyComparator returns false, the previous element is added to the buffer and the buffer is emitted.
A new buffer is created containing the current element and it is left hanging until a new element arrives or the publisher completes.
If no new element arrives, this new buffer is not emitted.
More specifically, I am trying to implement a session window as defined by Kafka Streams and Flink, so I am actually comparing the event times of the elements.
This window is defined based on the time between two consecutive messages.
If the time between two consecutive messages is less than the specified session gap, then the messages are considered to belong to the same session.
If the gap is larger than the session gap, the window is emitted and a new window is started.
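To make the session-window rule concrete, here is a minimal sketch in plain Java that splits an already-collected list of event times into sessions. It is not Reactor code; the class name SessionWindows and the method split are hypothetical. It assumes the event times are sorted and treats a gap exactly equal to the session gap as starting a new window, which matches the `diff < 500L` comparison used in the test further down.

```java
import java.util.ArrayList;
import java.util.List;

public class SessionWindows {

    // Splits sorted event times into session windows: a new window starts
    // whenever the gap to the previous event is at least sessionGapMillis.
    static List<List<Long>> split(List<Long> eventTimes, long sessionGapMillis) {
        List<List<Long>> windows = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long time : eventTimes) {
            if (!current.isEmpty()
                    && time - current.get(current.size() - 1) >= sessionGapMillis) {
                windows.add(current);          // gap too large: close the window
                current = new ArrayList<>();
            }
            current.add(time);
        }
        if (!current.isEmpty()) {
            windows.add(current);              // the last window closes at end of input
        }
        return windows;
    }

    public static void main(String[] args) {
        // Gaps of 400, 400 and 600 ms with a 500 ms session gap.
        System.out.println(split(List.of(0L, 400L, 800L, 1400L), 500L));
        // prints [[0, 400, 800], [1400]]
    }
}
```

On a finite list the last window is always closed by the end of the input. On a hot publisher there is no such end, which is why the last buffer stays pending.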
I created a test that shows this behavior: the last element is emitted in its own buffer only when the publisher completes.
void testBufferUntilChanged() {
    var testPublisher = TestPublisher.<Pair<Integer, Long>>create();
    var flux = testPublisher.flux().bufferUntilChanged(Function.identity(), (pair1, pair2) -> {
        var diff = pair2.getRight() - pair1.getRight();
        System.out.println("Diff " + diff);
        return diff < 500L;
    });

    StepVerifier
        .create(flux)
        .then(() -> testPublisher.next(Pair.of(1, System.currentTimeMillis())))
        .thenAwait(Duration.ofMillis(400L))
        .then(() -> testPublisher.next(Pair.of(2, System.currentTimeMillis())))
        .thenAwait(Duration.ofMillis(400L))
        .then(() -> testPublisher.next(Pair.of(3, System.currentTimeMillis())))
        .thenAwait(Duration.ofMillis(600L))
        .then(() -> testPublisher.next(Pair.of(4, System.currentTimeMillis())))
        .assertNext(buffer -> assertThat(buffer.stream().map(Pair::getKey).collect(Collectors.toList()), containsInAnyOrder(1, 2, 3)))
        .thenAwait(Duration.ofMillis(1000L))
        .then(testPublisher::complete)
        .assertNext(buffer -> assertThat(buffer.stream().map(Pair::getKey).collect(Collectors.toList()), containsInAnyOrder(4)))
        .verifyComplete();
}
As an enhancement to the bufferUntilChanged methods, do you think it would be useful to create a new signature with a new Duration parameter?
It would be similar to other buffer methods that have Duration parameters.
If a new element does not arrive in the specified Duration interval, the buffer will be emitted no matter what.
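The intended semantics can be sketched outside Reactor as a small push-style buffer with an explicit clock. This is only an illustration of the proposed behavior, not an existing Reactor operator: the class IdleFlushBuffer and its methods onNext and onTick are hypothetical, and for simplicity the same millisecond value serves as both the session gap and the idle timeout.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class IdleFlushBuffer<T> {
    private final long gapMillis;
    private List<T> buffer = new ArrayList<>();
    private long lastArrivalMillis;

    IdleFlushBuffer(long gapMillis) {
        this.gapMillis = gapMillis;
    }

    // Adds an element. If the gap to the previous element is at least
    // gapMillis, the previous buffer is returned and a new one is started.
    Optional<List<T>> onNext(T element, long nowMillis) {
        Optional<List<T>> emitted = Optional.empty();
        if (!buffer.isEmpty() && nowMillis - lastArrivalMillis >= gapMillis) {
            emitted = Optional.of(buffer);
            buffer = new ArrayList<>();
        }
        buffer.add(element);
        lastArrivalMillis = nowMillis;
        return emitted;
    }

    // Called periodically by some timer. Flushes the pending buffer once no
    // element has arrived for at least gapMillis, even if none ever follows.
    Optional<List<T>> onTick(long nowMillis) {
        if (buffer.isEmpty() || nowMillis - lastArrivalMillis < gapMillis) {
            return Optional.empty();
        }
        List<T> emitted = buffer;
        buffer = new ArrayList<>();
        return Optional.of(emitted);
    }

    public static void main(String[] args) {
        IdleFlushBuffer<Integer> sessions = new IdleFlushBuffer<>(500L);
        sessions.onNext(1, 0L);
        sessions.onNext(2, 400L);
        sessions.onNext(3, 800L);
        System.out.println(sessions.onNext(4, 1400L)); // closes the session holding 1, 2, 3
        System.out.println(sessions.onTick(1600L));    // only 200 ms idle: nothing yet
        System.out.println(sessions.onTick(1900L));    // 500 ms idle: flushes the session holding 4
    }
}
```

In an operator, the tick would come from a timer on a Scheduler rather than from the caller, so element 4 would be emitted about 500 ms after it arrived instead of waiting for completion.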
As a workaround, I am thinking of adding some kind of scheduled heartbeat elements to the publisher.
I am using Flux.create, so I can emit items from a scheduled service.
But that is not ideal; I believe it would complicate the processing pipeline unnecessarily.
Thanks.