
rebus.kafka's Introduction

Rebus.Kafka

Provides an Apache Kafka transport implementation for Rebus.

Getting started with Rebus.Kafka:

  1. Follow the "Getting started" guide for Rebus
  2. Add the transport with UseKafka:
builder.RegisterRebus((configurer, context) => configurer
	.Transport(t => t.UseKafka("localhost:9092", "InputQueueName", "groupName"))
);

All parameters for the producer and the consumer can be specified in detail. See this example.

It is possible to configure Rebus to use Apache Kafka as a one-way client (i.e. it will not be able to receive any messages). See this example.

builder.RegisterRebus((configurer, context) => configurer
	.Transport(t => t.UseKafkaAsOneWayClient("localhost:9092"))
);

See examples and tests for other usage examples.

This provider supports the following Rebus bus functions:

Many others are probably supported too, but I haven't checked.

Additional features:

  • When using the Rebus.ServiceProvider package, to avoid deadlock, subscribe to events with await bus.Subscribe<TestMessage>() only after the bus has started, and NOT in the "onCreate" Rebus event!
  • At the request of the transport users, the Rebus.Kafka transport automatically creates topics by default. However, I do not recommend using allow.auto.create.topics=true for production! To disable allow.auto.create.topics, pass your ConsumerConfig or ConsumerAndBehaviorConfig configuration to the transport with AllowAutoCreateTopics = false.
  • The Rebus.Kafka transport has a new overload that takes a ConsumerAndBehaviorConfig parameter instead of ConsumerConfig. This new configuration type contains transport behavior settings. So far, it has a single CommitPeriod parameter that defines the period after which the offset is committed to Apache Kafka. Here is an example of using it.
  • UseAttributeOrTypeFullNameForTopicNames simplifies naming event topics: the topic name is taken from the Name in TopicAttribute({TopicName}), or otherwise set to "---Topic---.{Spacename}.{TypeName}".
  • KafkaAdmin allows you to programmatically create a topic with many partitions and to delete topics.
  • In multithreaded message processing, the order in which messages are processed may differ from the order of messages in the queue. If the order is very important, then use single-threaded processing: .Options(o => o.SetMaxParallelism(1))
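As a rough sketch of two of the points above (disabling automatic topic creation and forcing single-threaded processing), the configuration might look like the following. It assumes the UseKafka overload that takes a ProducerConfig and a ConsumerConfig, and the Autofac-style builder used in the earlier snippets; the broker address, queue name, and group name are placeholders, and whether the transport also needs BootstrapServers set on the config objects is an assumption.

```csharp
using Autofac;
using Confluent.Kafka;
using Rebus.Config;
// Assumption: the UseKafka extension may need one more using directive,
// depending on the namespace in which Rebus.Kafka declares it.

var builder = new ContainerBuilder();

// Placeholder settings; adjust to your environment.
var producerConfig = new ProducerConfig { BootstrapServers = "localhost:9092" };
var consumerConfig = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "groupName",
    // Do not let the consumer trigger automatic topic creation.
    AllowAutoCreateTopics = false
};

builder.RegisterRebus((configurer, context) => configurer
    .Transport(t => t.UseKafka("localhost:9092", "InputQueueName", producerConfig, consumerConfig))
    // Handle one message at a time so processing order follows queue order.
    .Options(o => o.SetMaxParallelism(1))
);
```

With automatic creation disabled, the topics have to exist before the bus starts, for example created by an administrator or with KafkaAdmin.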

Note:

  • Interacting with Apache Kafka requires the unmanaged "librdkafka" library, so you need to install the appropriate version of the "librdkafka.redist" package. If the unmanaged "librdkafka" is not found automatically, you must load it before using Rebus.Kafka for the first time, as follows:
if (!Confluent.Kafka.Library.IsLoaded)
	Confluent.Kafka.Library.Load(pathToLibrd);
  • Due to the way Apache Kafka works, subscribing to or unsubscribing from messages triggers a very slow rebalancing of the clients in a group, lasting several seconds or more. You should therefore avoid the scenario of dynamically subscribing to a single reply message, sending a single message to the recipient, and unsubscribing after the single reply arrives, because it will work very slowly. I recommend that you subscribe to all your messages only when the application starts and that you do not change subscriptions at runtime; then the transport will be fast.
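The recommendation to subscribe once at startup could be sketched as below. The message types OrderCreated and OrderCancelled and the helper SubscribeAllAsync are hypothetical names used only for illustration; the only library call is Rebus's IBus.Subscribe<T>().

```csharp
using System.Threading.Tasks;
using Rebus.Bus;

// Hypothetical event types, for illustration only.
public class OrderCreated { }
public class OrderCancelled { }

public static class StartupSubscriptions
{
    // Call once, after the bus has started, and do not change
    // subscriptions afterwards; every change triggers a consumer
    // group rebalance that can take several seconds.
    public static async Task SubscribeAllAsync(IBus bus)
    {
        await bus.Subscribe<OrderCreated>();
        await bus.Subscribe<OrderCancelled>();
    }
}
```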

Log of important changes:

V 3.0.1 (12.01.2024)

  1. Refactoring for Rebus version 8 with the corresponding API change;
  2. Implemented RetryStrategy - automatic retries and error handling. Confirmations of receipt of messages are now sent not after they are received, but only after successful processing of messages or sending them to the error topic;
  3. Added "transaction" support. More precisely, these are not transactions, because Apache Kafka does not support transactions, but delayed sending: all messages sent in a transaction are held back until await scope.CompleteAsync() is called, and all "sent" messages are cancelled if the transaction block ends without calling await scope.CompleteAsync(). This convenience halves the maximum send throughput for all messages, even those sent without transactions.
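A minimal sketch of the delayed-sending behavior described in item 3, using Rebus's RebusTransactionScope. The helper name PublishTogetherAsync and its parameters are hypothetical; how the transport buffers the messages internally is not shown here.

```csharp
using System.Threading.Tasks;
using Rebus.Bus;
using Rebus.Transport;

public static class TransactionSketch
{
    // Hypothetical helper: both events are handed to the transport only
    // when CompleteAsync() is awaited. If the scope is disposed without
    // that call (for example because an exception was thrown), neither
    // message is sent.
    public static async Task PublishTogetherAsync(IBus bus, object firstEvent, object secondEvent)
    {
        using (var scope = new RebusTransactionScope())
        {
            await bus.Publish(firstEvent);
            await bus.Publish(secondEvent);
            await scope.CompleteAsync();
        }
    }
}
```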

V 2.0.0 (18.08.2023)

  1. Improving data transfer efficiency;
  2. The format of transport messages has changed. The message key is now a string instead of Null. The messages are incompatible with previous versions of the transport!
  3. Message headers are now supported;
  4. Refactoring for the current version of Apache Kafka "confluentinc/cp-kafka:7.0.1";
  5. The transport forcibly creates missing topics if Consumer.Config.AllowAutoCreateTopics == true. However, I do not recommend using allow.auto.create.topics=true for production!

V 1.6.3 (1.04.2021)

  1. The Rebus.Kafka transport has a new overload that takes a ConsumerAndBehaviorConfig parameter instead of ConsumerConfig. This new configuration type contains transport behavior settings. So far, it has a single CommitPeriod parameter that defines the period after which the offset is committed to Apache Kafka. Here is an example of using it.

  2. In the summer of 2020, librdkafka v1.5.0 was released with a change that was unexpected for many users of the Rebus.Kafka transport:

    Consumer will no longer trigger auto creation of topics, allow.auto.create.topics=true may be used to re-enable the old deprecated functionality:

    At the request of the transport users, I enabled the previous transport behavior by default. The Rebus.Kafka transport now automatically creates topics by default, as before. However, I do not recommend using allow.auto.create.topics=true for production! To disable allow.auto.create.topics, pass your ConsumerConfig or ConsumerAndBehaviorConfig configuration to the transport with AllowAutoCreateTopics = false.

ToDo:

  • Schema Registry support in Kafka: Avro, JSON and Protobuf
  • In the future, the value from the message header "kafka-key" or, maybe, from the message property marked with the KafkaKey attribute will be inserted into the Apache Kafka message key. This will be useful for partitioning.
  • Start the transport from user-defined offsets for topics and partitions.

If you have any recommendations or comments, I will be glad to hear them.

rebus.kafka's People

Contributors

dependabot[bot], glazkovalex, openalex

rebus.kafka's Issues

The consumer subscribes only to the first topic entered

Hi, I use Rebus for integration events. I have to switch to Kafka for some reason and wanted to use this library.
Unfortunately, I have encountered a problem.

When I add more topics, the consumer subscribes only to the first one,
so although messages are published to many topics, only one topic is received (subscribed to).

The current setup works flawlessly with the RabbitMQ and InMemory transport implementations.

I suspect that the issue may be related to topic subscription.

I have reproduced the issue on your example:

Steps to reproduce:

Using the Examples projects available: Scaleout.Producer, Scaleout.Messages and Scaleout.Consumers,
I modified the Producer and Consumers project as follows:

  • I added TestMessage2 (a copy of TestMessage).
  • I added TestMessageHandler2 : IHandleMessages<TestMessage2> (copy of TestMessageHandler).
  • I registered the new handler (Consumers).
  • I added Rebus Routing for Assembly TestMessage2 (Producer and Consumers).
  • I subscribed to both topics: (Consumers)
    bus.Subscribe<TestMessage>().Wait();
    bus.Subscribe<TestMessage2>().Wait();
  • I published both TestMessage and TestMessage2 messages in the do-while loop in Producer.

Expected behaviour:
Console outputs messages from both topics.

Actual:
Console outputs only messages from TestMessage.

Any idea what could be the cause, or maybe I'm doing something wrong?

[Question] Multiple producers, using schema registry

Hi,
I'm trying to get started. I want to be able to produce to two topics. The two topics have different value types, and the values need to be serialized with protobuf.

var eventProvider1 = new ProducerBuilder<string, Event1>(producerConfig)
    .SetValueSerializer(....)
    .Build();
var eventProvider2 = new ProducerBuilder<string, Event2>(producerConfig)
    .SetValueSerializer(....)
    .Build();

Is this something that can be achieved? I saw that the KafkaTransport has a Producer<string, byte[]>

Working with Partitions

How do I configure the transport to work with partitions? Are there any special configurations?

Performance

I'm playing around with the Kafka transport to examine its viability. When I stand up a basic Kafka environment on Docker (Kafka 2.1) with no tweaks, just a single broker and a single ZooKeeper, the throughput I'm seeing is 10K messages per second, whereas a fully tweaked Kafka can handle up to 2 million. Is the 10K/s benchmark indicative of what others have experienced using this connector? I'm just trying to gauge whether the bottleneck is the connector, the Confluent library, or the fact that I have a generic Kafka environment. What throughput have others experienced?

How well do the saga pattern and Kafka go together?

Hello,

Does the saga pattern implemented in Rebus and Kafka go together?

I'm very happy to see that you have coded a transport layer for Kafka. However, it's not among the Rebus transports available on the Rebus GitHub. On Stack Overflow I found this answer to a question about Kafka and the saga pattern: https://stackoverflow.com/a/58748235

In case your answer is yes, please explain the conditions and the required Rebus setup. This may include, among many other settings, Kafka partitions and topics.

Regards
Jan Rou

Unable to invoke deadletter topic

I noticed that the SimpleRetryStrategy doesn't work with the Rebus.Kafka plugin. When I have a message handler that throws an exception, Rebus will log it as a warning, but the Rebus.Kafka just goes on its merry way and consumes the next message in the kafka topic as if the previous message was processed successfully. I can see where it can be difficult to handle replaying of a previous message as it requires moving the offset backwards to try reprocessing it before finally giving up and forwarding the message to an actual deadletter topic. Is there something that I'm missing that needs to be configured to turn on the retry and deadletter mechanism or is this feature not implemented in the plugin yet?

It seems that we always have the option to build out a retry mechanism and forwarding of the message to an error queue from within handler code, but this is not ideal. It leads to a lot of repetitive code and is not safe from the perspective that the message may never get completely processed if the service is restarted as the message is in the middle of its retry logic. It would be nice if that was leveraging the fallback mechanism already built into rebus as it accounts for those scenarios.

One option I can think of is to block performing a commit() until after the successful processing of a message by the handler (this would obviously require turning off the autocommit feature). If an exception is thrown, don't allow the code to determine whether a commit() should be called. Instead, try calling the handler again. Once the retry limit is exceeded, push the message to the deadletter topic defined in the SimpleRetryStrategy, and then proceed to the commit() logic and the subsequent consume() of the next message.

It seems like this would be the most straightforward way to hook into the SimpleRetryStrategy. Obviously, if SimpleRetry is not defined, the current implementation seems like it would be appropriate.

Is there a setting I'm missing somewhere or is this a legitimate limitation of the plugin?

Rebus.Kafka in .Net core Web API

I am trying to use Rebus.Kafka in a .NET Core Web API. I tried to register it in Startup.cs, but it always throws "The transport initialization procedure is still incomplete. There is no confirmation of completion of the subscription to the input queue".

The code is like this:
services.AddRebus(configure => configure
	.Transport(t => t.UseKafka("localhost:9092", "MainQueue", producerConfig, consumerConfig))
	.Routing(r =>
	{
		// The type argument was lost in the original post; SomeMessage is a placeholder.
		r.TypeBased()
			.MapAssemblyOf<SomeMessage>("MainQueue");
	}));

Usage of Confluent 1.5.3 causes topic creation issues

It seems that using Confluent 1.5.3+ causes the auto topic creation capability to fail. This seems to result in stalling the Rebus.Kafka connector to the point where calling Publish() through Rebus will eventually result in a large delay followed by an error condition.

I've tried setting the AutoCreateTopicsEnabled flag in the Confluent.Consumer structure to TRUE. Plus, I verified that the kafka brokers can also accept Auto creation of topics. It just plain doesn't work. Many people are complaining about this very problem in version 1.5.3 and higher.

Recompiling Rebus.Kafka with 1.4.4 of Confluent's library seems to straighten everything out. According to Confluent, they turned off Auto Topic creation by default in the 1.5.3. But, apparently there is no way I've found to override that and turn it back on.

Auto topic creation is not a problem for production deploys since we want to manually create/adjust topics...but when it comes to developer workflows on isolated dev boxes, forcing devs to create topics seems like a step too far....the software should just do that as a reaction to working with Kafka.

Not sure if the solution is to go back to 1.4.4 in the nuget references of Rebus.Kafka...or just track that this is going to be an issue for people to be aware of until Confluent puts out a release that doesn't exhibit this problem.
