Code Monkey home page Code Monkey logo

reactive-kafka's Introduction

Reactive Streams for Kafka

Maven Central
If you have questions or are working on a pull request or just curious, please feel welcome to join the chat room: Join the chat at https://gitter.im/softwaremill/reactive-kafka

Reactive Streams wrapper for Apache Kafka.

Initiated by SoftwareMill

Supports Kafka 0.9.0.0
For Kafka 0.8 see this branch.

Available at Maven Central for Scala 2.10 and 2.11:

resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots"

libraryDependencies += "com.softwaremill.reactivekafka" %% "reactive-kafka-core" % "0.9.0"

Example usage

Scala

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import com.softwaremill.react.kafka.KafkaMessages._
import org.apache.kafka.common.serialization.{StringSerializer, StringDeserializer}
import com.softwaremill.react.kafka.{ConsumerProperties, ProducerProperties, ReactiveKafka}

implicit val actorSystem = ActorSystem("ReactiveKafka")
implicit val materializer = ActorMaterializer()

val kafka = new ReactiveKafka()
val publisher: Publisher[StringConsumerRecord] = kafka.consume(ConsumerProperties(
 bootstrapServers = "localhost:9092",
 topic = "lowercaseStrings",
 groupId = "groupName",
 valueDeserializer = new StringDeserializer()
))
val subscriber: Subscriber[StringProducerMessage] = kafka.publish(ProducerProperties(
  bootstrapServers = "localhost:9092",
  topic = "uppercaseStrings",
  valueSerializer = new StringSerializer()
))

Source.fromPublisher(publisher).map(m => ProducerMessage(m.value().toUpperCase))
  .to(Sink.fromSubscriber(subscriber)).run()

Java

import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

public void run() {
String brokerList = "localhost:9092";

ReactiveKafka kafka = new ReactiveKafka();
ActorSystem system = ActorSystem.create("ReactiveKafka");
ActorMaterializer materializer = ActorMaterializer.create(system);

StringDeserializer deserializer = new StringDeserializer();
ConsumerProperties<String> cp =
   new PropertiesBuilder.Consumer(brokerList, "topic", "groupId", deserializer)
      .build();

Publisher<ConsumerRecord<String, String>> publisher = kafka.consume(cp, system);

StringSerializer serializer = new StringSerializer();
ProducerProperties<String, String> pp = new PropertiesBuilder.Producer(
   brokerList,
   "topic",
   serializer,
   serializer).build();

Subscriber<ProducerMessage<String, String>> subscriber = kafka.publish(pp, system);
Source.fromPublisher(publisher).map(this::toProdMessage)
  .to(Sink.fromSubscriber(subscriber)).run(materializer);
}

private ProducerMessage<String, String> toProdMessage(ConsumerRecord<String, String> record) {
  return KeyValueProducerMessage.apply(record.key(), record.value());
}

Passing configuration properties to Kafka

In order to set your own custom Kafka parameters, you can construct ConsumerProperties and ProducerProperties using some of their provided methods in a builder-pattern-style DSL, for example:

import org.apache.kafka.common.serialization.StringDeserializer
import com.softwaremill.react.kafka.ConsumerProperties

val consumerProperties = ConsumerProperties(
  "localhost:2181",
  "topic",
  "groupId",
  new StringDeserializer()
)
  .readFromEndOfStream()
  .consumerTimeoutMs(300)
  .commitInterval(2 seconds)
  .setProperty("some.kafka.property", "value") 

The ProducerProperties class offers a similar API.

Controlling consumer start offset

By default a new consumer will start reading from the beginning of a topic, fetching all uncommitted messages. If you want to start reading from the end, you can specify this on your ConsumerProperties:

  val consumerProperties = ConsumerProperties(...).readFromEndOfStream()

Working with actors

Since we are based upon akka-stream, the best way to handle errors is to leverage Akka's error handling and lifecycle management capabilities. Producers and consumers are in fact actors.

Obtaining actor references

ReactiveKafka comes with a few methods allowing working on the actor level. You can let it create Props to let your own supervisor create these actor as children, or you can directly create actors at the top level of supervision. Here are a some examples:

import akka.actor.{Props, ActorRef, Actor, ActorSystem}
import akka.stream.ActorMaterializer
import org.apache.kafka.common.serialization.{StringSerializer, StringDeserializer}
import com.softwaremill.react.kafka.{ReactiveKafka, ProducerProperties, ConsumerProperties}

// inside an Actor:
implicit val materializer = ActorMaterializer()

val kafka = new ReactiveKafka()
// consumer
val consumerProperties = ConsumerProperties(
  brokerList = "localhost:9092",
  topic = "lowercaseStrings",
  groupId = "groupName",
  valueDeserializer = new StringDeserializer()
)
val consumerActorProps: Props = kafka.consumerActorProps(consumerProperties)
val publisherActor: ActorRef = context.actorOf(consumerActorProps)
// or:
val topLevelPublisherActor: ActorRef = kafka.consumerActor(consumerActorProps)

// subscriber
val producerProperties = ProducerProperties(
  brokerList = "localhost:9092",
  topic = "uppercaseStrings",
  new StringSerializer()
)
val producerActorProps: Props = kafka.producerActorProps(producerProperties)
val subscriberActor: ActorRef = context.actorOf(producerActorProps)
// or:
val topLevelSubscriberActor: ActorRef = kafka.producerActor(producerProperties)

Handling errors

When a consumer or a producer fail to read/write from Kafka, the error is unrecoverable and it requires that the connection get terminated. This will be performed automatically and the KafkaActorSubscriber / KafkaActorPublisher which failed will be stopped. You can use DeathWatch to detect such failures in order to restart your stream. Additionally, when a producer fails, it will signal onError() to its inputs in order to stop the rest of stream graph elements.

Example of monitoring routine:

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.stream.ActorMaterializer
import com.softwaremill.react.kafka.KafkaMessages._
import com.softwaremill.react.kafka.{ConsumerProperties, ProducerProperties, ReactiveKafka}

class Handler extends Actor {
  implicit val materializer = ActorMaterializer()

  def createSupervisedSubscriberActor() = {
    val kafka = new ReactiveKafka()

    // subscriber
    val subscriberProperties = ProducerProperties(
      bootstrapServers = "localhost:9092",
      topic = "uppercaseStrings",
      valueSerializer = new StringSerializer()
    )
    val subscriberActorProps: Props = kafka.producerActorProps(subscriberProperties)
    val subscriberActor = context.actorOf(subscriberActorProps)
    context.watch(subscriberActor)
  }

  override def receive: Receive = {
    case Terminated(actorRef) => // your custom handling
  }
  
  // Rest of the Actor's body
}

Cleaning up

If you want to manually stop a publisher or a subscriber, you have to send an appropriate message to the underlying actor. KafkaActorPublisher must receive a ActorPublisherMessage.Cancel, where KafkaActorSubscriber must receive a ActorSubscriberMessage.OnComplete.
If you're using a PublisherWithCommitSink returned from ReactiveKafka.consumeWithOffsetSink(), you must call its cancel() method in order to gracefully close all underlying resources.

Manual Commit (version 0.8 and above)

In order to be able to achieve "at-least-once" delivery, you can use following API to obtain an additional Sink, when you can stream back messages that you processed. An underlying actor will periodically flush offsets of these messages as committed. Example:

import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import com.softwaremill.react.kafka.KafkaMessages._
import akka.stream.scaladsl.Source
import com.softwaremill.react.kafka.{ConsumerProperties, ReactiveKafka}

implicit val actorSystem = ActorSystem("ReactiveKafka")
implicit val materializer = ActorMaterializer()

val kafka = new ReactiveKafka()
val consumerProperties = ConsumerProperties(
  brokerList = "localhost:9092",
  topic = "lowercaseStrings",
  groupId = "groupName",
  valueDeserializer = new StringDeserializer())
.commitInterval(5 seconds) // flush interval
    
val consumerWithOffsetSink = kafka.consumeWithOffsetSink(consumerProperties)
Source.fromPublisher(consumerWithOffsetSink.publisher)
  .map(processMessage(_)) // your message processing
  .to(consumerWithOffsetSink.offsetCommitSink) // stream back for commit
  .run()

Tuning

KafkaActorSubscriber and KafkaActorPublisher have their own thread pools, configured in reference.conf. You can tune them by overriding kafka-publisher-dispatcher.thread-pool-executor and kafka-subscriber-dispatcher.thread-pool-executor in your application.conf file.
Alternatively you can provide your own dispatcher name. It can be passed to appropriate variants of factory methods in ReactiveKafka: publish(), producerActor(), producerActorProps() or consume(), consumerActor(), consumerActorProps().

Testing

Tests require Apache Kafka and Zookeeper to be available on localhost:9092 and localhost:2181

reactive-kafka's People

Contributors

amorfis avatar andrioni avatar dragisak avatar gitter-badger avatar jamesmorgan avatar kazuhiro avatar kciesielski avatar ktoso avatar manub avatar markglh avatar note avatar suiryc avatar

Watchers

 avatar

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.