Code Monkey home page Code Monkey logo

pulsar-client-reactive's Introduction

Reactive client for Apache Pulsar

Reactive client for Apache Pulsar which is compatible with the Reactive Streams specification. This uses Project Reactor as the Reactive Streams implementation.

Getting it

This library requires Java 8 or + to run.

With Gradle:

dependencies {
    implementation "org.apache.pulsar:pulsar-client-reactive-adapter:0.5.4"
}

With Maven:

<dependencies>
    <dependency>
        <groupId>org.apache.pulsar</groupId>
        <artifactId>pulsar-client-reactive-adapter</artifactId>
        <version>0.5.4</version>
    </dependency>
</dependencies>

Usage

Initializing the library

In standalone application

Using an existing PulsarClient instance:

ReactivePulsarClient reactivePulsarClient = AdaptedReactivePulsarClientFactory.create(pulsarClient);

Sending messages

ReactiveMessageSender<String> messageSender = reactivePulsarClient
        .messageSender(Schema.STRING)
        .topic(topicName)
        .maxInflight(100)
        .build();
Mono<MessageId> messageId = messageSender
        .sendOne(MessageSpec.of("Hello world!"));
// for demonstration
messageId.subscribe(System.out::println);

Sending messages with cached producer

By default, a ConcurrentHashMap based cache is used. It’s recommended to use a more advanced cache based on Caffeine. The cache will get used as the default implementation when it is on the classpath.

Adding Caffeine based producer cache with Gradle:

dependencies {
    implementation "org.apache.pulsar:pulsar-client-reactive-adapter:0.5.4"
    implementation "org.apache.pulsar:pulsar-client-reactive-producer-cache-caffeine:0.5.4"
}

Adding Caffeine based producer cache with Maven:

<dependencies>
    <dependency>
        <groupId>org.apache.pulsar</groupId>
        <artifactId>pulsar-client-reactive-adapter</artifactId>
        <version>0.5.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.pulsar</groupId>
        <artifactId>pulsar-client-reactive-producer-cache-caffeine</artifactId>
        <version>0.5.4</version>
    </dependency>
</dependencies>

Usage example of cache

ReactiveMessageSender<String> messageSender = reactivePulsarClient
        .messageSender(Schema.STRING)
        .cache(AdaptedReactivePulsarClientFactory.createCache())
        .topic(topicName)
        .maxInflight(100)
        .build();
Mono<MessageId> messageId = messageSender
        .sendOne(MessageSpec.of("Hello world!"));
// for demonstration
messageId.subscribe(System.out::println);

It is recommended to use a cached producer in most cases. The cache enables reusing the Pulsar Producer instance and related resources across multiple message sending calls. This improves performance since a producer won’t have to be created and closed before and after sending a message.

The adapter library implementation together with the cache implementation will also enable reactive backpressure for sending messages. The maxInflight setting will limit the number of messages that are pending from the client to the broker. The solution will limit reactive streams subscription requests to keep the number of pending messages under the defined limit. This limit is per-topic and impacts the local JVM only.

Shaded version of Caffeine

A version of the provider is available that shades it usage of Caffeine. This is useful in scenarios where there is another version of Caffeine required in your application or if you do not want Caffeine on the classpath.

Adding shaded Caffeine based producer cache with Gradle:

dependencies {
    implementation "org.apache.pulsar:pulsar-client-reactive-adapter:0.5.4"
    implementation "org.apache.pulsar:pulsar-client-reactive-producer-cache-caffeine-shaded:0.5.4"
}

Adding shaded Caffeine based producer cache with Maven:

<dependencies>
    <dependency>
        <groupId>org.apache.pulsar</groupId>
        <artifactId>pulsar-client-reactive-adapter</artifactId>
        <version>0.5.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.pulsar</groupId>
        <artifactId>pulsar-client-reactive-producer-cache-caffeine-shaded</artifactId>
        <version>0.5.4</version>
    </dependency>
</dependencies>

Reading messages

Reading all messages for a topic:

    ReactiveMessageReader<String> messageReader =
            reactivePulsarClient.messageReader(Schema.STRING)
                    .topic(topicName)
                    .build();
    messageReader.readMany()
            .map(Message::getValue)
            // for demonstration
            .subscribe(System.out::println);

By default, the stream will complete when the tail of the topic is reached.

Example: poll for up to 5 new messages and stop polling when a timeout occurs

With .endOfStreamAction(EndOfStreamAction.POLL) the Reader will poll for new messages when the reader reaches the end of the topic.

    ReactiveMessageReader<String> messageReader =
            reactivePulsarClient.messageReader(Schema.STRING)
                    .topic(topicName)
                    .startAtSpec(StartAtSpec.ofLatest())
                    .endOfStreamAction(EndOfStreamAction.POLL)
                    .build();
    messageReader.readMany()
            .take(Duration.ofSeconds(5))
            .take(5)
            // for demonstration
            .subscribe(System.out::println);

Consuming messages

    ReactiveMessageConsumer<String> messageConsumer=
        reactivePulsarClient.messageConsumer(Schema.STRING)
        .topic(topicName)
        .subscriptionName("sub")
        .build();
    messageConsumer.consumeMany(messageFlux ->
                    messageFlux.map(message ->
                            MessageResult.acknowledge(message.getMessageId(), message.getValue())))
        .take(Duration.ofSeconds(2))
        // for demonstration
        .subscribe(System.out::println);

Consuming messages using a message handler component with auto-acknowledgements

ReactiveMessagePipeline reactiveMessagePipeline =
    reactivePulsarClient
        .messageConsumer(Schema.STRING)
        .subscriptionName("sub")
        .topic(topicName)
        .build()
        .messagePipeline()
        .messageHandler(message -> Mono.fromRunnable(()->{
            System.out.println(message.getValue());
        }))
        .build()
        .start();
// for demonstration
// the reactive message handler is running in the background, delay for 10 seconds
Thread.sleep(10000L);
// now stop the message handler component
reactiveMessagePipeline.stop();

License

Reactive client for Apache Pulsar is Open Source Software released under the Apache Software License 2.0.

How to Contribute

The library is Apache 2.0 licensed.

Contributions are welcome. Please discuss larger changes on the Apache Pulsar dev mailing list. There’s a contributing guide with more details.

Bugs and Feature Requests

If you detect a bug or have a feature request or a good idea for Reactive client for Apache Pulsar, please open a GitHub issue.

Questions

Please use [reactive-pulsar] tag on Stackoverflow. Ask a question now.

pulsar-client-reactive's People

Contributors

cbornet avatar lhotari avatar onobc avatar nicoloboschi avatar tisonkun avatar

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.