stream-loader's Introduction

Stream Loader

Stream loader is a collection of libraries providing the means to load data from Kafka into arbitrary storages such as Iceberg, HDFS, S3, ClickHouse or Vertica using exactly-once semantics. Users can easily implement highly customized loaders by combining out-of-the-box components for record formatting and encoding, data storage, stream grouping and so on, implementing their own or extending the existing ones if needed.

Getting Started

Stream loader is currently available for Scala 2.13 only. To get started, add the following to your build.sbt:

libraryDependencies += "com.adform" %% "stream-loader-core" % "(version)"

Storage-specific parts are split into separate artifacts to reduce dependencies and coupling. The following artifacts are currently available:

  • stream-loader-core: core functionality (required).
  • stream-loader-clickhouse: ClickHouse storage and binary file format encoders.
  • stream-loader-hadoop: HDFS storage, components for working with parquet files.
  • stream-loader-iceberg: Iceberg table sink.
  • stream-loader-s3: S3 compatible storage.
  • stream-loader-vertica: Vertica storage and native file format encoders.

As an example we will be implementing an S3 loader, hence we will additionally require the stream-loader-s3 dependency. Any loader is an ordinary JVM application that simply runs an instance of a StreamLoader (or multiple instances in separate threads if higher throughput is needed):

def main(args: Array[String]): Unit = {
  val loader = new StreamLoader(source, sink)
  sys.addShutdownHook { loader.stop() }
  loader.start()
}
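
If higher throughput is needed, several loader instances can run in separate threads, each with its own source and sink. A minimal sketch, where newSource() and newSink() are hypothetical factories constructing a fresh source and sink per instance (Kafka consumers are not thread-safe, so a source cannot be shared between loaders):

def main(args: Array[String]): Unit = {
  // newSource() and newSink() are hypothetical factories, shown here only to
  // illustrate that every loader instance gets its own source and sink.
  val loaders = (1 to 4).map(_ => new StreamLoader(newSource(), newSink()))
  val threads = loaders.map(l => new Thread(() => l.start()))

  sys.addShutdownHook { loaders.foreach(_.stop()) }

  threads.foreach(_.start())
  threads.foreach(_.join())
}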

All loaders require a source and a sink, where the source is always Kafka:

val source = KafkaSource
  .builder()
  .consumerProperties(consumerProps)
  .pollTimeout(Duration.ofSeconds(1))
  .topics(Seq("topic"))
  .build()
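
The consumerProps above are standard Kafka consumer properties. A minimal sketch, with placeholder broker addresses and group id (auto-commit is disabled here, since offsets are committed by the loader itself, as described under Delivery Guarantees):

import java.util.Properties

val consumerProps = new Properties()
consumerProps.put("bootstrap.servers", "kafka-1:9092,kafka-2:9092") // placeholder broker list
consumerProps.put("group.id", "my-s3-loader")                       // placeholder consumer group
consumerProps.put("enable.auto.commit", "false")                    // the loader commits offsets itself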

The sink, on the other hand, can be arbitrary; in the case of S3 we define it as follows:

val sink = RecordBatchingSink
  .builder()
  .recordBatcher(
    PartitioningFileRecordBatcher
      .builder()
      .recordFormatter(r => Seq(new String(r.consumerRecord.value(), "UTF-8")))     // (1)
      .recordPartitioner((r, _) => Timestamp(r.consumerRecord.timestamp()).toDate)  // (2)
      .fileBuilderFactory(_ => new CsvFileBuilder(Compression.ZSTD))                // (3)
      .fileCommitStrategy(
        MultiFileCommitStrategy.anyFile(ReachedAnyOf(recordsWritten = Some(1000)))  // (4)
      )
      .build()
  )
  .batchStorage(
    S3FileStorage          // (5)
      .builder()
      .s3Client(s3Client)  // (6)
      .bucket("test")      // (7)
      .filePathFormatter(
        new TimePartitioningFilePathFormatter(
          Some("'dt='yyyyMMdd"),                 // (8)
          Some("zst")                            // (9)
        ))
      .build()
  )
  .build()

Reading this from top to bottom we see that this loader will interpret the bytes of incoming Kafka messages as UTF-8 strings (1), partition them by day (2), construct Zstd-compressed CSV files for each day partition containing these strings (3) and close and commit them once 1000 records have been written to at least one file (4). The files will be stored in S3 (5) using the specified s3Client instance (6) to the test bucket (7). The stored files will be prefixed with their formatted day partition, e.g. dt=20200313/ (8), and will have a zst extension (9), with the names being random UUIDs.

Every part of this sink definition can be customized: your Kafka messages might contain JSON which you want to encode to Avro and write to parquet files, you might decide to switch to a different storage such as HDFS, and so on.

For further complete examples you can explore the loaders used for integration testing; a reference loader is available for every supported storage. For more details about implementing custom loaders refer to the API reference.

Delivery Guarantees

The message processing semantics of any system reading Kafka depend on the way the consumer handles offsets. For applications storing messages to another system, i.e. stream loaders, the question boils down to whether offsets are stored before, after or atomically together with the data, which determines whether the loader is at-most-once, at-least-once or exactly-once.

All the currently bundled storage implementations strive for exactly-once semantics by loading data and offsets together in a single atomic transaction. There are various strategies for achieving this, the most obvious solution being storing the offsets in the data itself, e.g. as extra columns in a database table or in the names of files being transferred to a distributed file system. This is the approach taken by the Iceberg, Vertica and ClickHouse storages.
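
To make the first strategy concrete, here is a minimal sketch using plain JDBC (not the actual code of any bundled storage; the events table and its _topic/_partition/_offset columns are assumptions) that writes a batch of rows together with the offsets they came from in a single atomic transaction:

import java.sql.Connection

// Rows are paired with the Kafka offset they originate from and inserted in one
// transaction, so the data and the offsets become visible together or not at all.
def storeBatch(connection: Connection, topic: String, partition: Int, rows: Seq[(String, Long)]): Unit = {
  connection.setAutoCommit(false)
  try {
    val insert = connection.prepareStatement(
      "INSERT INTO events (payload, _topic, _partition, _offset) VALUES (?, ?, ?, ?)"
    )
    rows.foreach { case (payload, offset) =>
      insert.setString(1, payload)
      insert.setString(2, topic)
      insert.setInt(3, partition)
      insert.setLong(4, offset)
      insert.addBatch()
    }
    insert.executeBatch()
    connection.commit()
  } catch {
    case e: Exception =>
      connection.rollback()
      throw e
  }
}

On startup the loader would then query the maximum stored _offset per topic partition and seek the Kafka consumer to the position right after it.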

An alternative approach is to store data in a two-phase commit, which is what the HDFS and S3 storage implementations do. The prepare step consists of staging a file (uploading it to a temporary location in HDFS or starting a multi-part upload in S3) and committing the new offsets together with the staging information to the Kafka offset commit metadata, while retaining the original offsets. The commit step stores the staged file (moves it to its final destination in HDFS or completes the multi-part upload in S3) and commits the new offsets to Kafka, at the same time clearing the staging metadata. With this approach the transaction is rolled back if the loader crashes before completing the staging phase, or replayed if it crashes after staging.
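
The resulting crash recovery decision can be sketched as follows (StagingInfo, completeStagedFile and the exact metadata shape are hypothetical names used purely for illustration):

// Hypothetical shape of the staging information kept in the Kafka commit metadata.
final case class StagingInfo(stagedPath: String, newOffset: Long)

sealed trait StagingState
case object NotStaged extends StagingState                       // crashed before the prepare phase finished
final case class Staged(info: StagingInfo) extends StagingState  // prepare finished, but the commit did not

// Hypothetical hook: move the staged file to its final HDFS path or complete the S3 multi-part upload.
def completeStagedFile(info: StagingInfo): Unit = ???

// On startup the staging metadata read back from Kafka decides whether the
// interrupted transaction is rolled back or replayed:
def recover(state: StagingState): Unit = state match {
  case NotStaged    => () // roll back: simply consume again from the last committed offsets
  case Staged(info) => completeStagedFile(info) // replay: finish the staged file, then commit the new offsets
}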

One possible caveat is the scenario where data committing queues up: in that case the loader can suspend Kafka consumption, which could eventually lead to a rebalance event. The sink will try to cancel all pending commits and clear the queue, but if the in-progress commit does not react to the cancellation and silently succeeds in the background, the result is data duplication. Setting the max.poll.interval.ms Kafka consumer property to be much larger than the storage timeout helps alleviate the problem, but in theory it can still happen.
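
For example, assuming the storage commit is bounded by a timeout of about a minute (an assumed figure), the consumer can be given an order of magnitude more slack:

import java.time.Duration

// Assumed storage commit timeout: ~1 minute. Allowing up to 10 minutes between polls
// makes it very unlikely that a slow storage commit triggers a consumer group rebalance.
consumerProps.put("max.poll.interval.ms", Duration.ofMinutes(10).toMillis.toString)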

Performance

The stream loader library adds basically no overhead compared to a stand-alone Kafka consumer: it actually is just a Kafka consumer that provides tools and components to sink the stream to storage. The overall performance therefore depends on the configuration of the consumer (number of threads, buffer sizes, poll duration, etc.) and the implementation of the sinking part.

For file-based sinks, performance can be roughly broken down into:

  • Kafka consumer record decoding and processing, which is completely user defined,
  • Record encoding and writing to files, which can be provided by stream-loader components,
  • Committing files to storage, which is largely storage dependent.

Stream loader comes bundled with builders for common file formats and macro-based encoder derivations that implement case class serialization to CSV, parquet, native Vertica and row binary ClickHouse file formats. These auto-derived encoders provide performance equivalent to hand-written code, as they generate straightforward JVM code with little to no object allocation.
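
As a sketch (the record type is hypothetical and the implicit encoder resolution is only hinted at in the comment; see the API reference for the exact type class names), a flat case class is typically all the derivation needs:

// Hypothetical record type; the S3 example above used plain strings instead.
case class PageView(userId: Long, url: String, durationMs: Int)

// The derived CSV encoder for PageView would then be picked up implicitly when
// constructing a file builder, along the lines of (exact signature not spelled out here):
//   .fileBuilderFactory(_ => new CsvFileBuilder[PageView](Compression.ZSTD))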

Comparison to Alternatives

There are other options for loading data from Kafka out in the wild, e.g.

  • Kafka Connect: conceptually the stream loader library is an alternative to the sink part of the Connect framework. The advantage of Kafka Connect is the enormous ecosystem around it, with tons of available sinks to choose from, some of which also provide exactly-once semantics. The main disadvantage (for some, at least) is the need to maintain a cluster of workers; deployment is also problematic and requires an ad-hoc flow. For a more detailed discussion see KIP-304. In contrast, stream loader is a library, meaning that you can use existing orchestrators such as Kubernetes to deploy and run your loaders.
  • Built-in database Kafka ingesters: some databases such as Vertica and ClickHouse support direct data ingestion from Kafka. The main disadvantage of this approach is the coupling of data processing to the database, which can have a performance impact. These mechanisms also usually provide limited options for parsing and transforming the data being loaded. On the other hand, a built-in ingest mechanism can be very easy to configure and deploy for simple data flows.

In summary, stream loader aims to be a correct, flexible, highly customizable and efficient solution for loading data from Kafka, provided as a set of components for users to assemble and run themselves. Some of these design choices can be considered either advantages or disadvantages, depending on the use case.

License

Stream Loader is licensed under the Mozilla Public License Version 2.0.

stream-loader's People

Contributors

fregante, huberttatar, ppiotrow, rok21, sauliusvl, shivam247, szasiii


stream-loader's Issues

ClickHouse exactly once?

  1. On a system with several replicas committedPositions can return different values on different replicas (due to eventual consistency). So it may work only on a single replica or with insert_quorum/sequential_consistency?

def committedPositions(connection: Connection): TrieMap[TopicPartition, StreamPosition] = {

  2. Similarly, it looks like the code relies on a strict linear order of inserts (no parallel inserts?). Does that mean single replica only + insert_quorum + insert_quorum_parallel=0?

  3. Idempotent retries? In case of an insert failure (or worse: a quorum failure), is there any guarantee that during the next retry you will repeat exactly the same block?

records.written metric issue

RecordBatchingSinker records the number of written records in write, but it doesn't actually represent what was written to the batch.

write calls builder.add(record) on RecordBatchBuilder, which is overridden by ExternalOffsetVerticaFileBatcher, FileRecordBatcher and PartitioningFileRecordBatcher. Each of these classes applies recordFormatter before actually passing the record to the batch.
And here is the catch: the formatter can return an empty Seq, which means that we don't actually load anything.
I personally think it's misleading, and we had a few situations where this metric caused confusion.
I think stream-loader should reflect in its metrics that not every record read from Kafka is actually written to a batch.
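
For example (a hypothetical formatter, not code from the library), a filtering formatter makes the metric diverge from what actually ends up in the batch:

// Hypothetical filtering formatter passed to .recordFormatter(...) in a batcher builder:
// records that fail validation produce an empty Seq, so nothing is added to the batch,
// yet the records.written metric is still incremented for them.
.recordFormatter { r =>
  val value = new String(r.consumerRecord.value(), "UTF-8")
  if (value.nonEmpty) Seq(value) else Seq.empty // empty Seq => the record is silently dropped
}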

lzo-java uses GPL-3 license

Currently stream-loader uses org.anarres.lzo:lzo-commons, which is licensed under GPL-3 (shevek/lzo-java#9). This means that stream-loader and any software using it should be released under GPL-3.

Can you change your license to reflect that and avoid misinformation?

Kafka offset commit behaviour

While migrating stream-loader to a new version we stumbled upon a change in behaviour in reporting offsets to Kafka.
Old behaviour:

builder.extendRange(tp, StreamPosition(consumerRecord.offset(), watermark))

The offset is reported even if the app doesn't write the record to a file.

New behaviour: https://github.com/adform/stream-loader/blob/master/stream-loader-core/src/main/scala/com/adform/streamloader/sink/file/PartitioningFileRecordBatcher.scala#L69
The offset is reported only if the app writes the record to a file.

I think both solutions are viable, and their use depends on user needs. It would be nice to be able to choose how you want to operate.
