Code Monkey home page Code Monkey logo

kafka-exactly-once's People

Contributors

iron9light avatar joonsun-baek avatar koeninger avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

kafka-exactly-once's Issues

few questions about non transactional DB, end-to-end semantics

hi,

I have few questions:

*** sorry from advance if that is not the correct place to ask it

is it possible to have the exactly once semantics with DB such as cassandra ?
(maybe to use atomic batches to keep offsets in another table, but that is also only eventually consistence)

In the approach of saving offsets in same transaction, that will indeed guarantee exactly once reading from kafka, but when using foreach operations (especially those that have side effects such as DB writes), is it possible to achieve exactly once end-to-end ? in case the foreach operation would be re-executed, it can cause to duplications in the DB (especially in cases where some count logic involved and when it is impossible to have idempotent writes)

if I have the following pattern:
for each RDD in Dstream:
for each kafka record in RDD:
currValue = read value from c* with key= kafka record.key
newValue = create new value based on the kafka record value and currValue
write newValue back to c* where key=record.key

Is there way to have in such pattern exactly once ?
I thought to use at-least-once reading from kafka and to store in c* with each newValue the partition/offset of the last processed kafka record corresponding to that c* key.
In case of some failure (whether its committing the offset back to kafka or re-execution of the RDD) which would require data reprocessing, since part of my pattern is first to read data from c*, I can check if the offset of the kafka record being processed is larger then the offset stored with the c* record)

any possible pitfalls with such design ?

Processing Backlog Of Direct Stream

Hi Cody,

  • Spark 1.6.0
  • Kafka 0.9.0 (using 0.8.2.2 API and client libraries in Spark)

Excellent blog post, I've been playing around with direct streaming for a little while now and I have some questions. My use case is to consume all partitions from one or more topic using a custom value decoder which creates a stream of Point objects that I then bulk write to an Influx DB cluster.

I wrote a utility, Korpse, that stores/fetches offsets in Kafka. When there is no backlog, this solution works really well and I process my events in 30 second batches.

    val fromOffsets = korpse.getFromOffsets()
    if (!fromOffsets.isDefined) {
      LOG.error("Unable to determine starting offsets")
      System.exit(1)
    }

    var offsetRanges = Array[OffsetRange]()

    val stream = KafkaUtils.createDirectStream[String, Point, StringDecoder, PointDecoder, (Point)](ssc, kafkaParams, fromOffsets.get,
      (mmd: MessageAndMetadata[String, Point]) => (
        mmd.message()))
      .transform { rdd =>
        offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd
      }

    stream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        val influxdb = InfluxDB.connect("influxdb-1.lab, 8086)
        val database = influxdb.selectDatabase("rsyslog-stats-test1")
        val points = partitionOfRecords.toList
        println("there are " + points.size + " points in the total list")
        val batchSize = 4500
        var x = 0
        for (x <- 0 to points.size by batchSize) {
          val end = if (x + batchSize < (points.size - 1)) x + batchSize else points.size
          println("about to send batch " + x + " to " + end)
          database.bulkWrite(points.slice(x, end).toList, precision = Precision.SECONDS)
            .onComplete {
              case Success(b) => println(s"sent batch $x to $end")
              case Failure(t) => println(s"failed to sent batch $x to $end: " + t.getMessage)
            }
        }        
      }
    }

    stream.foreachRDD { rdd =>
      korpse.storeOffsetRanges(offsetRanges)
    }    

With a single topic/partition Kafka receives around 1000 events per second so a 20s micro batch in Spark works rather well. However, when there are a significant amount of offsets (10's of millions or more) in the backlog because either the Spark application died or hadn't been running for some time, the first Spark job processes the entirety of the backlog and often runs out of memory or takes a very long time.

Is there an out-of-the-box method to have consuming distributed across all executors even if there is only a single partition of a single topic being consumed?

Ideally, I'd like to distribute the processing of the backlog across all executors, for the first batch but also all subsequent batches.

I.e. if there are 5 executors and 3 TopicAndPartition's that receive 1000 events/sec each, each executor should consume/process 12 000 events for each micro batch.

I don't mind enhancing my Korpse util and converting to Kafka createKafkaRdd with OffsetRange accordingly but I thought I'd enquire with someone like you who knows way more about Spark before starting...

exactly once with kafka producer

Hi,

I would like to ask how and if its possible to have exactly once write to kafka topics ?
foreachRdd can be executed multiple times in case of failure, and that can cause to sending some messages to kafka topics twice.

Thanks,

numRecords cannot be negative exception

hi,
We are managing offsets externally in HBase. We haven't enabled spark checkpointing.
So everytime we want to start job, we are providing offsets, which are correct when cross checked with Kafka.

Only sometimes we get this exception "numRecords cannot be negative"
Is it the problem cause of property "spark.streaming.kafka.consumer.cache.enabled" set to true inside compute method of DirectKafkaInputDStream class or checkpointing which is done internally by spark? or some other issue.

lib: spark-streaming-kafka-0-10_2.11

Thanks in advance.

what is function fixKafkaParams in kafka010/KafkaUtils.scala?

Hi,everyone, hi koeninger.sorry about making this issue in this project,because I realy don't know how to contact koeninger.
I have some questions about kafka010:

1.     In  KafkaUtils.scala at line 200,there is a function fixKafkaParams,this function is prevent issues on executors,forcing params into that params. Is that legacy code or experimental code ? what about other  params in kafka0.10.x ?

2.     There is still have @Experimental annotation in codes.is this project stopped ?

Forgive my poor English. If someone see this issue please give me some answers.

OffsetOutOfRangeException

Hi,

in case I am getting OffsetOutOfRangeException, how can I handle it ?
should I catch it somewhere in the code and ignore it in case I just want to continue for the next offset range ?
Or perhapses any option to restart in such case to a specific offset for each partition ?
also just to verify, I see in the code that you set auto.offset.reset='none', is that only for the executers ?

Thanks,

multiple kafka clusters

Hi,

we have 2 data center, each having its own kafka cluster. I would like to have spark streaming job that consumes from both of those kafka clusters. Is that possible at all ?
I tried to create 2 kafka direct streams sources, but eventually got an error that kafka consumer is not thread safe:
Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
I assume that because each kafka direct stream creates separate consumer, and those 2 consumers are started on same worker - same thread.

Thanks,

value sql is not a member of StringContext

Hi,

I am a beginner of using kafka.

I am using Intranet , so I down the jars manual.

The I found a error in intellij idea.
20170605171928

Because i am using Intranet, so I download the jar files manually.
This is the screenshot of my jars.
20170605172642

could you please tell me what i am doing wrong?

Thank you in advance.

Best wishes,
Clark Liu

createDirectStream, multiple streams, atomicity and checkpoint failures

Very good article on idempotent and transactional exactly once.
But I am hitting the following scenario where checkpoint itself is failing and cause partial writes.
Can you take a look and suggest?

I am using spark streaming 1.3.0 directStream. I am hitting the following Scenario and I would like your suggestion on how to design atomicity.
Here are pseudo codes to describe the flow and key points.

S1=createDirectStream(kafka) ==> I have OffsetRange associated with each RDD
S1.print ==> good

S2=S1.flatMap(some transformation) ==> It does not require checkpoint
S2.print ==> good

S3=S2.updateStateByKey(require checkpoint) ==> checkpoint failed due to hdfs issue for example
S3.print ==> nothing print out

S2.foreachRDD {
SaveToElasticSearch() ==> write to Elastic Search fine
}
S3.foreachRDD {
SaveToElasticSearch() ==> nothing written to Elastic Search
}

I was hoping the batch is atomic, i.e., as long as there are errors, offsets will not change and writes will not happen.
But 2 issues I have observed:
Kafka offsets kept moving on to next batch even there are dependent stream failed, e.g. S3.
Partial writes went to Elastic Search.

We would like to see

  1. the offset stops if anything in this job failed and spark streaming will recover by itself from the right offsets.
  2. Write all streams in one unit.

Any suggestions?

Tian

Convert the Dstream to DF

Hi @koeninger,

In spark 1.6.x, I'm looking at an efficient way to convert the Kafka DStream to DF for performing a query over temp table.

In my case, the stream data is structured like csv data. I want to parse and store it in same format in tempTable. [The stream data may not have the header so I've to place default colnames such as col1 ..... colN ]

Any help would be appreciate!

Thanks!
Karthik

kafka consumer metrics

Hi,

I would like to verify small thing. In case my stream processing is bigger then the micro-batch interval, I see queuing up tasks with how much records would be consumed. The consumption would be done actually only when that task started ? (so the tasks in the queue are not consuming from kafka until they actually executed)

My workflow is quite simple, I read from kafka, do some simple processing then reshuffle the data for some further processing (reduceByKey). Usually the job is completed fast enough without causing tasks to be queued. But from time to time I am seeing quite large delay, usually only on 1-2 workers, in the stage that consume data from kafka. I start to suspicious that the bottleneck is consuming from kafka and hence I would like to somehow measure how long it is actually take to consume each topic/partition before the data processing part starts.
Any idea how to do it? maybe I can access somehow more consumers metrics ?

Thanks,

Dstream mapWithState

Hi,

according to spark docs:
https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html

If I would like to commit offsets to kafka (for at least once semantic) the way to go is as following:

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // some time later, after outputs have completed

  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

The problem is that then I have to apply all my logic for each RDD in the Dstream inside the foreachRDD block, however then I could no use mapWithState since it works only with Dstream and the type of the rdd inside the foreachRDD block is simple RDD.

Do I miss something ? is there any workaround ?

Thanks,

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.