
FsKafka

F#-friendly wrapper for Confluent.Kafka, with minimal dependencies and minimal additional abstractions (but see related repos).

FsKafka wraps Confluent.Kafka to provide efficient batched Kafka Producer and Consumer configurations with basic logging instrumentation. Depends on Confluent.Kafka [1.9.2], librdkafka.redist [1.9.2] (pinned to ensure we use a tested pairing), Serilog (but no specific Serilog sinks, i.e. you configure it to emit to NLog etc.) and Newtonsoft.Json (used internally to parse Broker-provided Statistics for logging purposes).

Usage

FsKafka is delivered as a NuGet package targeting netstandard2.0 and F# >= 4.5.

dotnet add package FsKafka

or for paket, use:

paket add FsKafka

Related repos

  • See the Propulsion repo for extended Producers and Consumers.
  • See the Jet dotnet new templates repo's proProjector template (in -k mode) for example producer logic using the BatchedProducer and the proConsumer template for examples of using the BatchedConsumer from FsKafka, alongside the extended modes in Propulsion.
  • See the Equinox QuickStart for examples of using this library to project to Kafka from Equinox.Cosmos and/or Equinox.EventStore.

CONTRIBUTING

Contributions of all sizes are warmly welcomed. See our contribution guide

TEMPLATES

The best place to start, sample-wise, is the dotnet new templates stored in a dedicated repo.

BUILDING

The templates are the best way to see how to consume it; these instructions are intended mainly for people looking to make changes.

NB The tests are reliant on a TEST_KAFKA_BROKER environment variable pointing to a Broker that has been configured to auto-create ephemeral Kafka Topics as required by the tests (each test run writes to a guid-named topic)

build, including tests on netcoreapp3.1

export TEST_KAFKA_BROKER="<server>:9092"
dotnet build build.proj -v n

FAQ

How do I get rid of all the `breaking off polling ... resuming polling` spam?

  • The BatchedConsumer implementation tries to give clear feedback as to when reading is not keeping up, for diagnostic purposes. As of #32, such messages are tagged with the type FsKafka.Core.InFlightMessageCounter, and as such can be silenced by including the following in one's LoggerConfiguration():

    .MinimumLevel.Override(FsKafka.Core.Constants.messageCounterSourceContext, Serilog.Events.LogEventLevel.Warning)
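Put together, a Serilog configuration that suppresses those messages might look like the following sketch (the console sink is illustrative and requires the Serilog.Sinks.Console package):

```fsharp
open Serilog
open Serilog.Events

// Suppress the InFlightMessageCounter diagnostics below Warning level
let log =
    LoggerConfiguration()
        .MinimumLevel.Information()
        .MinimumLevel.Override(FsKafka.Core.Constants.messageCounterSourceContext, LogEventLevel.Warning)
        .WriteTo.Console()
        .CreateLogger()
```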

What is this, why does it exist, where did it come from, is anyone using it?

This code results from building out an end-to-end batteries-included set of libraries and templates as part of the Equinox project.

Equinox places some key constraints on all components and dependencies:-

  • batteries-included examples of end-to-end functionality within the Equinox remit; samples should have clean consistent wiring
  • pick a well-established base library, try not to add new concepts
  • low dependencies, so it can work in lots of contexts without egregiously forcing you to upgrade things
  • aim to add any resilience features as patches to upstream repos
  • thorough test coverage; integration coverage for core wrapped functionality, unit tests for any non-trivial logic in the wrapper library

Minimal producer example

#r "nuget:FsKafka"
open System
open Confluent.Kafka
open FsKafka

let log = Serilog.LoggerConfiguration().CreateLogger()

let batching = Batching.Linger (System.TimeSpan.FromMilliseconds 10.)
let producerConfig = KafkaProducerConfig.Create("MyClientId", "kafka:9092", Acks.All, batching)
let producer = KafkaProducer.Create(log, producerConfig, "MyTopic")
   
let key = Guid.NewGuid().ToString()
let deliveryResult = producer.ProduceAsync(key, "Hello World!") |> Async.RunSynchronously

Minimal batched consumer example

#r "nuget:FsKafka"
open Confluent.Kafka
open FsKafka

let log = Serilog.LoggerConfiguration().CreateLogger()

let handler (messages : ConsumeResult<string,string> []) = async {
    for m in messages do
        printfn "Received: %s" m.Message.Value
} 

let cfg = KafkaConsumerConfig.Create("MyClientId", "kafka:9092", ["MyTopic"], "MyGroupId", AutoOffsetReset.Earliest)

async {
    use consumer = BatchedConsumer.Start(log, cfg, handler)
    return! consumer.AwaitShutdown()
} |> Async.RunSynchronously

Minimal batched consumer example with monitor

#r "nuget:FsKafka"
open Confluent.Kafka
open FsKafka

let log = Serilog.LoggerConfiguration().CreateLogger()

let handler (messages : ConsumeResult<string,string> []) = async {
    for m in messages do
        printfn "Received: %s" m.Message.Value
} 

let cfg = KafkaConsumerConfig.Create("MyClientId", "kafka:9092", ["MyTopic"], "MyGroupId", AutoOffsetReset.Earliest)

async {
    use consumer = BatchedConsumer.Start(log, cfg, handler)
    use _ = KafkaMonitor(log).Start(consumer.Inner, cfg.Inner.GroupId)
    return! consumer.AwaitShutdown()
} |> Async.RunSynchronously

Running (and awaiting) a pair of consumers until either throws

#r "nuget:FsKafka"
open Confluent.Kafka
open FsKafka

let log = Serilog.LoggerConfiguration().CreateLogger()

let handler (messages : ConsumeResult<string,string> []) = async {
    for m in messages do
        printfn "Received: %s" m.Message.Value
} 

let config topic = KafkaConsumerConfig.Create("MyClientId", "kafka:9092", [topic], "MyGroupId", AutoOffsetReset.Earliest)

let cfg1, cfg2 = config "MyTopicA", config "MyTopicB"

async {
    use consumer1 = BatchedConsumer.Start(log, cfg1, handler)
    use consumer2 = BatchedConsumer.Start(log, cfg2, handler)
    use _ = KafkaMonitor(log).Start(consumer1.Inner, cfg1.Inner.GroupId)
    use _ = KafkaMonitor(log).Start(consumer2.Inner, cfg2.Inner.GroupId)
    return! Async.Parallel [consumer1.AwaitWithStopOnCancellation(); consumer2.AwaitWithStopOnCancellation()]
} |> Async.RunSynchronously

fskafka's People

Contributors

bartelink, chrnola, deviousasti, eiriktsarpalis, enricosada, erichgoldman, gusty, jorgef, michaelliao5, szer, vchekan, wantastic84


fskafka's Issues

Monitor loop not terminating with exception when consumer is already disposed

When the consumer is disposed and the monitor tries to get the assignment info from the consumer, an exception is thrown but the monitor loop does not terminate. The loop continues to increment the fail count and hangs, potentially making the client code hang as well.

The following exception is thrown from FsKafka0:

Confluent.Kafka.KafkaException: Local: Broker handle destroyed
   at Confluent.Kafka.Impl.SafeKafkaHandle.GetAssignment()
   at [email protected](IEnumerable`1& next) in D:\a\1\s\src\FsKafka\Monitor.fs:line 249
   at Microsoft.FSharp.Core.CompilerServices.GeneratedSequenceBase`1.MoveNextImpl() in F:\workspace\_work\1\s\src\fsharp\FSharp.Core\seqcore.fs:line 371
   at Microsoft.FSharp.Core.CompilerServices.GeneratedSequenceBase`1.System-Collections-IEnumerator-MoveNext() in F:\workspace\_work\1\s\src\fsharp\FSharp.Core\seqcore.fs:line 403
   at Microsoft.FSharp.Collections.SetTreeModule.mkFromEnumerator[a](IComparer`1 comparer, SetTree`1 acc, IEnumerator`1 e) in F:\workspace\_work\1\s\src\fsharp\FSharp.Core\set.fs:line 499
   at Microsoft.FSharp.Collections.SetTreeModule.ofSeq[a](IComparer`1 comparer, IEnumerable`1 c) in F:\workspace\_work\1\s\src\fsharp\FSharp.Core\set.fs:line 505
   at Microsoft.FSharp.Collections.FSharpSet`1..ctor(IEnumerable`1 elements) in F:\workspace\_work\1\s\src\fsharp\FSharp.Core\set.fs:line 738
   at [email protected](Unit unitVar0) in D:\a\1\s\src\FsKafka\Monitor.fs:line 254
   at [email protected](Unit unitVar) in D:\a\1\s\src\FsKafka\Monitor.fs:line 273
   at Microsoft.FSharp.Control.AsyncPrimitives.CallThenInvoke[T,TResult](AsyncActivation`1 ctxt, TResult result1, FSharpFunc`2 part2) in F:\workspace\_work\1\s\src\fsharp\FSharp.Core\async.fs:line 398
   at <StartupCode$FSharp-Core>[email protected](AsyncActivation`1 ctxt) in F:\workspace\_work\1\s\src\fsharp\FSharp.Core\async.fs:line 1066
   at Microsoft.FSharp.Control.Trampoline.Execute(FSharpFunc`2 firstAction) in F:\workspace\_work\1\s\src\fsharp\FSharp.Core\async.fs:line 109

KafkaConsumerConfig expecting list as custom argument

Description

KafkaConsumerConfig's custom argument expects a list of key/value pairs. That means you can't pass a Map or a Dictionary there.

Repro steps

let config =
    KafkaConsumerConfig.Create(
        clientId = "",
        broker = System.Uri(""),
        topics = [],
        groupId = "",
        custom = Map [ "", "" ])

Expected behavior

it should compile!

Actual behavior

custom expects a list, so this is a compilation error.

Known workarounds

Pass a list!
custom = (Map [ "", "" ] |> Seq.toList)

Add statistics integration

Description

Confluent.Kafka publishes internal statistics (instrumentation) via the OnStatistics event, as a string in JSON format.
It would be much better if the library did the JSON parsing and exposed a strongly-typed event object.

TODO: discuss with Confluent developers the possibility of changing the C driver to publish a Protobuf-serialized statistics object. This would benefit all clients by avoiding the need to implement JSON parsing.
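For reference, the current Confluent.Kafka builder API already allows hooking the raw JSON statistics and parsing the fields of interest manually; a minimal sketch (field names follow librdkafka's STATISTICS.md; the broker address and interval are illustrative):

```fsharp
open System
open Confluent.Kafka
open Newtonsoft.Json.Linq

let config = ProducerConfig(BootstrapServers = "kafka:9092", StatisticsIntervalMs = Nullable 5000)

let producer =
    ProducerBuilder<string, string>(config)
        // json is the raw statistics payload librdkafka emits every StatisticsIntervalMs
        .SetStatisticsHandler(fun _ json ->
            let stats = JObject.Parse json
            // "msg_cnt" is the number of messages currently in the producer queue
            printfn "producer queue depth: %O" stats.["msg_cnt"])
        .Build()
```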

Solve blocking producer buffer problem

Description

The Confluent producer will block (or throw an exception) when its internal message buffer is full. This does not play well with the async nature of F#.

Consider possible modifications to librdkafka and the C# client to keep the F# wrapper thin.
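One common mitigation at the application level (a sketch, not what FsKafka does) is to catch the queue-full error, let the buffer drain, and retry:

```fsharp
open System
open Confluent.Kafka

// Sketch: retry wrapper for the case where librdkafka's local queue is full
let rec produceWithRetry (p : IProducer<string, string>) topic (msg : Message<string, string>) = async {
    try
        let! res = p.ProduceAsync(topic, msg) |> Async.AwaitTask
        return res
    with :? ProduceException<string, string> as e when e.Error.Code = ErrorCode.Local_QueueFull ->
        // Flush gives librdkafka time to drain the in-memory queue before we retry
        p.Flush(TimeSpan.FromMilliseconds 100.) |> ignore
        return! produceWithRetry p topic msg }
```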

offsetRange fails when topic does not exist

Description

If the topic does not exist, the offsetRange call will fail because Seq.head is applied after filtering by topic.

Expected behavior

A properly worded exception should be thrown when no such topic is found.
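A hypothetical fix sketch: replace the Seq.head with Seq.tryHead and fail with context (the helper name is illustrative, not FsKafka's actual code):

```fsharp
open Confluent.Kafka

// Sketch: look up a topic's metadata, failing with a descriptive message if absent
let findTopicMetadata (metadata : Metadata) (topic : string) =
    metadata.Topics
    |> Seq.filter (fun t -> t.Topic = topic)
    |> Seq.tryHead
    |> function
       | Some t -> t
       | None -> invalidOp (sprintf "Topic '%s' not found on broker" topic)
```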

Decide which tests are needed and implement

Description

Given that librdkafka and confluent-kafka-dotnet have test coverage, decide which test coverage is needed for wrapper.

  1. Sanity check for trivial code: can produce and consume.
  2. Message ordering is preserved within partition.
  3. F# specific: async cancellation works.
  4. Autocommit:
    • it works when it is on
    • commit interval works
    • it does not commit when it is off
  5. Configuration: convert Seq<string,obj> to string and compare to what is expected.
  6. Our implementation of offsetRange works
  7. Legacy produce/consume works.

Consumer without joining a group

Description

It is possible to assign TopicPartitionOffset[] to a consumer, but it will join a group anyway. This causes unnecessary delay when no offset saving is required. This happens when implementing service operations, for example reading the last message in a partition.
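With the raw Confluent.Kafka API, the group-coordination machinery can be bypassed by using Assign instead of Subscribe; a sketch (broker, topic and offset are illustrative; note that ConsumerConfig still requires a GroupId even when it is never used for coordination):

```fsharp
open System
open Confluent.Kafka

let cfg = ConsumerConfig(BootstrapServers = "kafka:9092", GroupId = "unused")
use consumer = ConsumerBuilder<string, string>(cfg).Build()
// Assign (rather than Subscribe) skips the consumer-group rebalance protocol
consumer.Assign(TopicPartitionOffset(TopicPartition("MyTopic", Partition 0), Offset 42L))
// result is null if nothing arrives within the timeout
let result = consumer.Consume(TimeSpan.FromSeconds 5.)
```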

Wrapper introduces NLog dependency

Description

CK does not have a logger dependency; it has an OnLog callback, leaving it to the application to wire up a concrete logger implementation.
By introducing an NLog dependency we may inconvenience people who use other loggers, log4net for example.

V1.7.0 checklist

  • review Async in method names; consider removing in line with modern APIs #47
  • probably remove validateBrokerUri as it's no longer an integral part of the API as of 1.4 in 1.5
  • prune less frequently used arguments in Create calls in 1.5.6
  • Complete #86 @deviousasti
  • remove FsKafka0 eed9940
  • remove net461 multi-targeting
  • remove
    member this.AwaitThreshold(ct : CancellationToken, consumer : IConsumer<_,_>) =
  • target CK 1.7.0

QueryWatermarkOffsets queries one partition at the time

Description

QueryWatermarkOffsets queries one partition at a time, which is slow and unreasonable because the Kafka protocol allows querying multiple topics and all partitions.

Expose an override which queries all partitions, and possibly a list of topics.
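A sketch of what such a helper could look like, layered over the existing per-partition call (so the per-partition round-trips remain; the function name and timeouts are illustrative):

```fsharp
open System
open Confluent.Kafka

// Sketch: query watermark offsets for every partition of a topic
let queryAllWatermarks (consumer : IConsumer<'k, 'v>) (broker : string) (topic : string) =
    use admin = AdminClientBuilder(AdminClientConfig(BootstrapServers = broker)).Build()
    let meta = admin.GetMetadata(topic, TimeSpan.FromSeconds 10.)
    [ for t in meta.Topics do
        if t.Topic = topic then
            for p in t.Partitions do
                let tp = TopicPartition(topic, Partition p.PartitionId)
                yield tp, consumer.QueryWatermarkOffsets(tp, TimeSpan.FromSeconds 10.) ]
```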

the interval for checking threshold on max inflight counter differs between FsKafka0 and FsKafka

Just a small thing I found while doing some research:
FsKafka0 has a 5ms interval for checking the threshold for max in-flight bytes, whereas FsKafka has 1ms.

FsKafka0:

counter.AwaitThreshold(fun () -> Thread.Sleep 5)

FsKafka:

counter.AwaitThreshold(fun () -> Thread.Sleep 1)

I wonder if this was intended. Otherwise, we can sync them to be the same (5ms?)
