
kafkaflow's Introduction

KafkaFlow

Introduction

⚡️ KafkaFlow was designed to build .NET applications on top of Apache Kafka in a simple and maintainable way.

🏗 Built on top of Confluent Kafka Client.

🔌 Extensible by design.

Want to give it a try? Check out our Quickstart!

Installation

Read the docs for any further information.

Documentation

Learn more about using KafkaFlow on the site!

Contributing

Read our contributing guidelines to learn about our development process, how to propose bugfixes and improvements, and how to build and test your changes.

Get in touch

You can find us at:

License

KafkaFlow is a free and open source project, released under the permissive MIT license.

kafkaflow's People

Contributors

3cpt, ailtonguitar, dependabot[bot], diogosgibson, erik-catalao, esskar, fasazevedo, filipecunhaoliveira, filipeesch, gnjack, ievgenii-shepeliuk, joaorodriguesgithub, joelfoliveira, jose-sousa-8, kikofps, kyann, lpcouto, marcio-azevedo, martinhonovais, massada, miguelcosta, nigma143, pmfernandes, pvoliveira, ricardofelgueiras, robertcoltheart, ruiqbarbosa, simaoribeiro, slang25, sonicgd

kafkaflow's Issues

Create a BatchConsumeMiddleware

Today every message is delivered to the middlewares one by one, but sometimes it would be better to process messages in batches, since some databases and applications work better with batches.

The idea is to accumulate a number of messages, or wait for some time, to build a collection of messages and deliver it to the next middleware to be processed as if it were a single message. The message offsets will be committed only after the entire batch has been processed.
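The size-or-time accumulation described above can be sketched as a small helper. This is not KafkaFlow code. `BatchAccumulator<T>`, `Add` and `Tick` are hypothetical names, and the caller is assumed to pass in the current time. That assumption keeps the logic deterministic and easy to test.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of KafkaFlow): collects items until either MaxSize
// items have arrived or MaxWait has elapsed since the first item of the current batch.
public sealed class BatchAccumulator<T>
{
    private readonly List<T> buffer = new List<T>();
    private DateTime? batchStartedAt;

    public BatchAccumulator(int maxSize, TimeSpan maxWait)
    {
        if (maxSize <= 0) throw new ArgumentOutOfRangeException(nameof(maxSize));
        MaxSize = maxSize;
        MaxWait = maxWait;
    }

    public int MaxSize { get; }
    public TimeSpan MaxWait { get; }

    // Adds an item; returns the completed batch when the size limit is reached, otherwise null.
    public IReadOnlyList<T> Add(T item, DateTime now)
    {
        if (buffer.Count == 0) batchStartedAt = now;
        buffer.Add(item);
        return buffer.Count >= MaxSize ? Flush() : null;
    }

    // Meant to be called periodically (e.g. from a timer); returns the pending batch
    // once MaxWait has elapsed since its first item, otherwise null.
    public IReadOnlyList<T> Tick(DateTime now)
    {
        if (buffer.Count > 0 && now - batchStartedAt.Value >= MaxWait) return Flush();
        return null;
    }

    private IReadOnlyList<T> Flush()
    {
        var batch = buffer.ToArray();
        buffer.Clear();
        batchStartedAt = null;
        return batch;
    }
}
```

In a batch middleware, each completed batch would be handed to the next middleware, and offsets would be committed only after that call returns. The time trigger bounds latency on low-traffic topics, where the size limit alone might never be reached.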

Enable clients to use MessageHeader class

Issue Report

Allow users to create their own headers when publishing

Expected Behavior

Create an instance of MessageHeader to send it with the payload when publishing a message

Actual Behavior

MessageHeader class has internal visibility, so users can't instantiate it

Steps to Reproduce the Issue

Protobuf/Json serialization with schema registry support

Nowadays, only Apache Avro serialization supports integration with schema registry.

It is necessary to extend the schema registry support to Protobuf and Json serializations.

Indeed, the sources of these serializers (here and here) seem to have a more complete implementation than the Avro serializer, accepting more configuration options than those currently exposed.

fix licenseUrl element deprecation in project files

Issue Report

Fix the licenseUrl element deprecation in project files when pushing to NuGet

Expected Behavior

Use the license element instead

Actual Behavior

We are using the licenseUrl element

Steps to Reproduce the Issue

N/A

Change TransformMessage() method to create a new MessageContext

Today, when TransformMessage() is called, we change the message object inside the MessageContext. This behaviour causes some problems and requires a clone of the MessageContext to be passed to the next middleware (to maintain the "immutability" of the object between middlewares). This solution creates many unnecessary MessageContext instances.

We plan to change this behaviour. TransformMessage() will be marked as obsolete, and a new method will be created to transform the message and the partition key. This new method will return a new MessageContext with the transformations applied, without changing the original object, which avoids the cloning process.
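The planned "return a new context instead of mutating it" approach can be sketched with a simplified, immutable context type. `SimpleMessageContext` and `Transform` are hypothetical names. They do not reproduce KafkaFlow's real IMessageContext or the method the issue plans to add.

```csharp
using System;

// Hypothetical, simplified stand-in for a message context (not KafkaFlow's IMessageContext).
public sealed class SimpleMessageContext
{
    public SimpleMessageContext(object message, byte[] partitionKey, string topic)
    {
        Message = message;
        PartitionKey = partitionKey;
        Topic = topic;
    }

    public object Message { get; }
    public byte[] PartitionKey { get; }
    public string Topic { get; }

    // Returns a new context carrying the transformed message and partition key.
    // The current instance is never modified, so no defensive clone is needed
    // before handing it to the next middleware.
    public SimpleMessageContext Transform(object message, byte[] partitionKey) =>
        new SimpleMessageContext(message, partitionKey, Topic);
}
```

Each middleware that transforms the message passes the returned instance to `next`. Earlier middlewares keep their own, unchanged reference.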

question: middleware dependency lifetime

Hi guys,

Is it possible to have middleware instances with a scoped or transient dependency lifetime? I've seen that the typed handlers framework allows for dependency lifetime customization, but I haven't found the same option for middleware.

To justify the need, I want to have a scoped instance that holds some management properties about the current consumption flow, but I want it to be as generic as possible, therefore I'd rather use middleware than typed handlers.

Regards.

Use TransactionScope on producer

Hi,
I use one producer, but send messages to several topics.
Is it possible to use TransactionScope to group multiple sends (IProducerAccessor.Produce(...))?

example:

  1. open transaction scope
  2. read data from db
  3. write data on db
  4. produce async on topic 1: IProducerAccessor.Produce("topic 1")
  5. produce async on topic 2: IProducerAccessor.Produce("topic 2")
  6. produce async on topic 3: IProducerAccessor.Produce("topic 3")
  7. .................................................
  8. if no error occurred, complete the transaction

feat: add support for statistics handler

Issue Report

Add support for statistics reporting on KafkaFlow.
This can be useful, for example, for monitoring metrics.

Expected Behavior

When creating producers/consumers, allow adding handlers to report statistics and setting the statistics emit interval.

Actual Behavior

Currently, there is no way to declare a handler for the consumer/producer statistics.

fix: paused consumers are triggering partition rebalance

Expected behaviour

Paused consumers should not trigger a partition rebalance process after poll interval timeout

Current behaviour

Paused consumers are triggering partition rebalance process after poll interval timeout

Proposed solution

Consumers should keep calling the Consume() method internally, even while paused.

Log Kafka errors according to severity

Currently we are logging all consumer and producer Kafka errors with severity 'Error'. Some non-fatal errors, such as a consumer being disconnected, are also logged as 'Error'. These are normal, expected "errors" that occur when the consumer times out after 10 minutes of inactivity (the default).

We should set the log severity according to the error being fatal or non-fatal.
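Mapping severity from the fatal/non-fatal distinction can be sketched as follows. `KafkaLogLevel` and `KafkaErrorLogging` are hypothetical local types used only for illustration. Confluent.Kafka's `Error` type exposes an `IsFatal` flag that a real implementation could feed into `LevelFor`.

```csharp
using System;

// Hypothetical local log level used only for this sketch.
public enum KafkaLogLevel { Warning, Error }

public static class KafkaErrorLogging
{
    // Fatal errors leave the client instance unusable, so they stay at Error.
    // Non-fatal ones (for example an idle connection being closed) drop to Warning.
    public static KafkaLogLevel LevelFor(bool isFatal) =>
        isFatal ? KafkaLogLevel.Error : KafkaLogLevel.Warning;

    // Builds a log line prefixed with the chosen level.
    public static string Format(string reason, bool isFatal) =>
        $"[{LevelFor(isFatal)}] Kafka error: {reason}";
}
```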

Difficulty with Admin / Admin.WebApi

Hello,

I am very new to Kafka and trying to implement a simple consumer with your library (and btw, this is the best one I've seen so far for dotnet so kudos and thanks!). I'm able to get my consumer up and running, and it is consuming messages; however, when I issue admin REST requests through Swagger for reset-offsets, I continually receive:

KafkaFlow: Error executing consumer | Data: {"Message":{"ConsumerName":"mastery-commissions"},"Topic":"kafka-flow.admin","PartitionKey":"M2I3Yjg3ZDQtMzFmMy00ZGJhLTk4MjYtYTAwNmI0NTM2NDdm","ConsumerName":"5cc8c82c-a8ed-4603-abd3-617e4bbabfec"} | Exception: {"Type":"System.ObjectDisposedException","Message":"The CancellationTokenSource has been disposed.","StackTrace":"   at System.Threading.CancellationTokenSource.ThrowObjectDisposedException()\n   at System.Threading.CancellationTokenSource.Cancel()\n   at KafkaFlow.Consumers.WorkerPoolFeeder.StopAsync()\n   at KafkaFlow.Consumers.MessageConsumer.OverrideOffsetsAndRestartAsync(IReadOnlyCollection\u00601 offsets)\n   at KafkaFlow.TypedHandler.TypedHandlerMiddleware.Invoke(IMessageContext context, MiddlewareDelegate next)\n   at KafkaFlow.Consumers.ConsumerWorker.\u003CStartAsync\u003Eb__13_0()"}

I've gone back through the docs a few times and I'm not seeing what I am missing here. I created the "kafka-flow.admin" topic in my local Kafka instance, but no dice. Pause, Resume, and Restart seem to work just fine until I issue a reset-offsets; do you have any ideas on what I could be doing wrong? Many thanks in advance.

IProducerAccessor is overridden when there are multiple clusters

When we have multiple Kafka clusters, the IProducerAccessor is overridden and returns only the producers registered on the last configured cluster.

public ClusterConfiguration Build(KafkaConfiguration kafkaConfiguration)
{
    var configuration = new ClusterConfiguration(
        kafkaConfiguration,
        this.brokers.ToList());

    configuration.AddProducers(this.producers.Select(x => x.Build(configuration)));
    configuration.AddConsumers(this.consumers.Select(x => x.Build(configuration)));

    // Registered once per cluster: with several clusters, each registration replaces
    // the previous IProducerAccessor, so only the last cluster's producers are resolvable.
    this.DependencyConfigurator.AddSingleton<IProducerAccessor>(
        resolver => new ProducerAccessor(
            configuration.Producers.Select(producer => new MessageProducer(resolver, producer))));

    return configuration;
}
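One way to avoid the override is to build a single accessor from the producers of every cluster, rather than registering one accessor per cluster. The sketch below shows that idea with simplified types. `ProducerInfo` and `AggregatedProducerAccessor` are hypothetical and do not reflect KafkaFlow's actual classes.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, simplified producer descriptor: a name plus the cluster it belongs to.
public sealed class ProducerInfo
{
    public ProducerInfo(string name, string cluster) { Name = name; Cluster = cluster; }
    public string Name { get; }
    public string Cluster { get; }
}

// A single accessor built from the producers of *all* clusters,
// instead of one singleton registration per cluster where the last one wins.
public sealed class AggregatedProducerAccessor
{
    private readonly Dictionary<string, ProducerInfo> producers;

    public AggregatedProducerAccessor(IEnumerable<IEnumerable<ProducerInfo>> producersPerCluster)
    {
        // ToDictionary throws on duplicate names, surfacing configuration conflicts at startup.
        producers = producersPerCluster.SelectMany(p => p).ToDictionary(p => p.Name);
    }

    // Returns the producer with the given name, or null when none is registered.
    public ProducerInfo GetProducer(string name) =>
        producers.TryGetValue(name, out var producer) ? producer : null;

    public IEnumerable<ProducerInfo> All => producers.Values;
}
```

The design choice is to collect producers during configuration and register the accessor once, after all clusters have been built.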

Producer transaction support

Hi there,

currently KafkaFlow doesn't expose the producer transaction APIs that are available in the Confluent .NET client.
Is there any plan to support it?

Thanks,
Imerio

feat: Implement named producers

Issue Report

Allow producers to be configured by name, and create an accessor to retrieve a producer by its name

Expected Behavior

Support named and typed configuration

Actual Behavior

Today, producer configuration can only be bound to a type

Monitoring dashboard

Create a dashboard page showing relevant data related to:

  • Consumers
  • Producers
  • Lag
  • Throughput
  • Searching data
  • etc

AdminMessage doesn't work in more than one cluster

It is not possible to enable admin operations (EnableAdminMessages) on more than one cluster.

Because every EnableAdminMessages call adds the same producer, cluster.AddProducer<AdminProducer>(...), the error One or more errors occurred. (An item with the same key has already been added. Key: KafkaFlow.Admin.Producers.AdminProducer) is thrown.

Every cluster should be able to enable its admin messages separately. This leads to another issue: a cluster with EnableAdminMessages can manage all consumers, even those in clusters without EnableAdminMessages.

I can see 2 options:

1 - EnableAdminMessages must be configured outside cluster level, which means you are configuring it to all clusters and you will be able to manage all consumers.

2 - EnableAdminMessages must still be configured at cluster level, but it is necessary to fix this bug (An item with the same key has already been added.), and each cluster must be able to manage only its own consumers.

ci: update workflow to use makefile commands

Issue Report

Use makefile commands to execute integration and unit tests with a single command

Expected Behavior

Simpler and cleaner GitHub workflow files

Actual Behavior

IntegrationTest and UnitTest steps are separated and IntegrationTest has 3 commands

Steps to Reproduce the Issue

Run GitHub Actions

feat: allow middleware to be registered at the beginning

Issue Report

Integrating KafkaFlow with other services may require more control over middleware registration.
Being able to register middleware that runs before other middleware has some use cases.

Use Case

For example, a monitoring extension package for KafkaFlow may register a middleware to collect metrics about consumer message processing time. The middleware needs to run before other middlewares in order to measure how long messages take to be processed.
Without this, the monitoring extension package is limited by where the developer registers the monitoring middleware.

Expected Behavior

When adding middleware, provide a method besides Add() that registers the middleware to run before other middlewares.

Actual Behavior

The middleware is always registered last.
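The requested behaviour amounts to prepending to the middleware list instead of appending. Below is a minimal pipeline sketch. `MiddlewarePipeline`, `AddAtBeginning`, and the `Middleware`/`Next` delegates are hypothetical names, not KafkaFlow's API, and a `List<string>` trace stands in for the message context.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical middleware shape: a context (here a trace list) plus the next delegate.
public delegate Task Next(List<string> trace);
public delegate Task Middleware(List<string> trace, Next next);

public sealed class MiddlewarePipeline
{
    private readonly List<Middleware> middlewares = new List<Middleware>();

    // Appends: runs after everything registered so far (the current behaviour).
    public MiddlewarePipeline Add(Middleware middleware)
    {
        middlewares.Add(middleware);
        return this;
    }

    // Prepends: runs before everything registered so far (the requested behaviour).
    public MiddlewarePipeline AddAtBeginning(Middleware middleware)
    {
        middlewares.Insert(0, middleware);
        return this;
    }

    public Task RunAsync(List<string> trace) => Invoke(0, trace);

    // Calls middleware[index], giving it a delegate that continues with index + 1.
    private Task Invoke(int index, List<string> trace) =>
        index == middlewares.Count
            ? Task.CompletedTask
            : middlewares[index](trace, t => Invoke(index + 1, t));
}
```

Registered last but prepended, a timing middleware wraps the whole chain. It can therefore measure the full processing time, regardless of where the developer registers it.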

feat: Simplify the type resolution for consuming topic with one fixed message type

Issue Report

A common scenario is consuming a topic that carries only one fixed message type. Please consider creating a convenience class that users can use directly:

    public class SimpleMessageTypeResolver<TMessage> : IMessageTypeResolver
        where TMessage: class
    {
        public Type OnConsume(IMessageContext context)
        {
            return typeof(TMessage);
        }

        public void OnProduce(IMessageContext context)
        {
        }
    }

How to use it:

services.AddKafka(kafka => kafka
            .UseConsoleLog() 
            .AddCluster(cluster => cluster
                .WithBrokers(new[] { "localhost:9092" })
                .AddConsumer(consumer => consumer
                    .Topic("test-topic")
                    .WithGroupId("print-console-handler")
                    .WithBufferSize(100)
                    .WithWorkersCount(10)
                    .WithAutoOffsetReset(AutoOffsetReset.Latest)
                    .AddMiddlewares(middlewares => middlewares
                        // Replace TMessage with the concrete message type published to "test-topic"
                        .AddSerializer<NewtonsoftJsonMessageSerializer, SimpleMessageTypeResolver<TMessage>>()
                        .AddTypedHandlers(handlers => handlers
                            .WithHandlerLifetime(InstanceLifetime.Singleton)
                            .AddHandler<PrintConsoleHandler>())
                    )
                )
            )
        );

Support for Outbox pattern

Hi,

Is an implementation of the outbox pattern on the roadmap? With any large distributed app it is almost essential, so I would love to see it implemented in this library as well.

Thanks

Refactor samples

Today we have only three samples. We need samples more focused on specific features.

Dashboard - Consumers management

Dashboard with all the partitions assignment of each consumer, allowing to manage a consumer in all instances/machines.

It must have the following features:

  • Pause the consumption
  • Restart the consumption
  • Resume the consumption
  • Reset the partition offset
  • Rewind the partition offset
  • Change the number of workers

Standardize serializers classes and packages name

Today our serializer packages and classes follow different naming strategies.

We should standardize the names to follow the same naming strategy.

Proposal:

  • ProtobufNetSerializer
  • NewtonsoftJsonSerializer
  • CoreJsonSerializer (System.Text.Json)
  • ConfluentAvroSerializer

All serializers should live directly under the KafkaFlow.Serializer namespace.

Remove constraint from AddHandlersFromAssemblyOf<T>

Currently the AddHandlersFromAssemblyOf<T> method in the TypedHandler constrains the type T to implement IMessageHandler. We can extend the builder configuration so it accepts any class as a marker type.

Currently we need to:

class SomeMessageHandler : IMessageHandler { ... }

builder.AddHandlersFromAssemblyOf<SomeMessageHandler>();

With a new method:

builder.AddHandlersFromAssemblyOf(typeof(Program));

builder.AddHandlersFromAssemblyOf(typeof(ServiceCollectionExtensions), typeof(SomeExternalPackage));
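Marker-based scanning needs only each marker's assembly, so any type works as a marker. The sketch below illustrates this with reflection. `IMessageHandler<TMessage>`, `SampleHandler` and `HandlerScanner` are hypothetical stand-ins, not KafkaFlow's TypedHandler internals.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical handler interface standing in for KafkaFlow's IMessageHandler<T>.
public interface IMessageHandler<TMessage> { }

// Example handler used only to demonstrate the scan.
public sealed class SampleHandler : IMessageHandler<string> { }

public static class HandlerScanner
{
    // Any type can serve as a marker: only the assembly it lives in matters.
    public static IReadOnlyList<Type> FindHandlers(params Type[] markerTypes) =>
        markerTypes
            .Select(marker => marker.Assembly)
            .Distinct() // several markers from one assembly scan it only once
            .SelectMany(assembly => assembly.GetTypes())
            .Where(type => type.IsClass && !type.IsAbstract && type.GetInterfaces().Any(i =>
                i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IMessageHandler<>)))
            .ToList();
}
```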

Support native compression

Allow the use of the native compression available in the Confluent driver.

On the consumer, clients shouldn't be able to combine native compression with middleware message compressors.

As "the message.max.bytes limit is checked on each produce() for the uncompressed size" (link), it makes sense to keep the possibility of using middleware message compressor on the producers.

WorkerCount removed from MessageConsumer

The WorkerCount property was removed from MessageConsumer in PR #123, in version 1.5.1.

This property is useful when using KafkaFlow.Admin.WebApi to know how many workers are assigned to each consumer.

Allow multiple TypedHandlers handling the same MessageType

Actual Behavior

Consumers can configure only one typed handler per message type.

For example, the code below would throw an exception:

... Bootstrapper
.AddMiddlewares(middlewares => middlewares.AddTypedHandlers(
                      handlers => handlers
                              .AddHandler<MessageHandler>()
                              .AddHandler<OtherMessageHandler>()))
...
public class MessageHandler : IMessageHandler<TestMessage1>
{
   public Task Handle(IMessageContext context, TestMessage1 message){ return Task.CompletedTask;}
}

public class OtherMessageHandler: IMessageHandler<TestMessage1>
{
   public Task Handle(IMessageContext context, TestMessage1 message){ return Task.CompletedTask;}
}

Expected Behavior

Adapt KafkaFlow to allow consumers to configure more than one typed handler for the same message type.
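The core change is to keep a list of handlers per message type instead of a single entry. This sketch shows that idea in isolation. `MultiHandlerDispatcher` is a hypothetical class, and handlers are plain `Func<TMessage, Task>` delegates rather than KafkaFlow's `IMessageHandler<T>`.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical dispatcher: keeps a *list* of handlers per message type,
// so registering a second handler for the same type does not throw.
public sealed class MultiHandlerDispatcher
{
    private readonly Dictionary<Type, List<Func<object, Task>>> handlers =
        new Dictionary<Type, List<Func<object, Task>>>();

    public MultiHandlerDispatcher AddHandler<TMessage>(Func<TMessage, Task> handler)
    {
        if (!handlers.TryGetValue(typeof(TMessage), out var list))
        {
            list = new List<Func<object, Task>>();
            handlers[typeof(TMessage)] = list;
        }
        list.Add(message => handler((TMessage)message));
        return this;
    }

    // Invokes every handler registered for the message's runtime type, in registration order.
    public async Task DispatchAsync(object message)
    {
        if (!handlers.TryGetValue(message.GetType(), out var list)) return;
        foreach (var handler in list)
            await handler(message);
    }
}
```

Handlers run sequentially here, so a failing handler stops the later ones. Running them concurrently with `Task.WhenAll` is an alternative with different error semantics.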

Dashboard - Data

Dashboard section allowing to search data and publish messages in a topic.

feat: resilience strategy

Issue Report

Create a resilience strategy to prevent message loss.

Expected Behavior

When some expected exceptions (such as SQLTimeout, KafkaTimeout, etc.) are thrown, the worker must stop processing and retry later.
Clients should be able to easily define which exceptions stop processing and the retry intervals.

Actual Behavior

Currently, there is no resilience strategy.
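A retry strategy of the kind described could look like the sketch below. It retries only configured exception types, at configured intervals. `RetryPolicy` is a hypothetical name, not a KafkaFlow API. The sketch also assumes the message's offset stays uncommitted while it is being retried, which is what would prevent message loss.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical retry policy: retries only the configured "expected" exception types,
// waiting the given intervals between attempts; any other exception, or one raised
// after the intervals are exhausted, propagates to the caller.
public sealed class RetryPolicy
{
    private readonly Type[] retryableExceptions;
    private readonly TimeSpan[] intervals;

    public RetryPolicy(IEnumerable<Type> retryableExceptions, params TimeSpan[] intervals)
    {
        this.retryableExceptions = retryableExceptions.ToArray();
        this.intervals = intervals;
    }

    private bool IsRetryable(Exception ex) =>
        retryableExceptions.Any(type => type.IsInstanceOfType(ex));

    public async Task ExecuteAsync(Func<Task> action)
    {
        for (var attempt = 0; ; attempt++)
        {
            try
            {
                await action();
                return;
            }
            catch (Exception ex) when (IsRetryable(ex) && attempt < intervals.Length)
            {
                // Wait before the next attempt; the interval grows with the attempt index.
                await Task.Delay(intervals[attempt]);
            }
        }
    }
}
```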

Dashboard - Broker management

Dashboard section allowing operations to be executed at the broker level, such as:

  • Create a topic
  • Update a topic configuration
  • Manage schema registry configurations

ProtobufMessageSerializer can't serialize a class generated using a file .proto

Issue Report

When you try to serialize a class generated from a .proto file, the ProtobufMessageSerializer throws an exception: System.InvalidOperationException Type is not expected, and no contract can be inferred: ...

The current serializer appears to work only for .NET classes decorated with the DataContract and DataMember attributes.

Expected Behavior

It should be able to serialize a class generated from a .proto file.
For reference: https://developers.google.com/protocol-buffers/docs/csharptutorial#parsing_and_serialization

Actual Behavior

When you try to serialize a class generated from a .proto file, the ProtobufMessageSerializer throws an exception: System.InvalidOperationException Type is not expected, and no contract can be inferred: ...

Steps to Reproduce the Issue

  • Design a simple Protobuf file (from the Protobuf documentation)

syntax = "proto3";

import "google/protobuf/timestamp.proto";

message Person {
  string name = 1;
  int32 id = 2;  // Unique ID number for this person.
  string email = 3;

  enum PhoneType {
    MOBILE = 0;
    HOME = 1;
    WORK = 2;
  }

  message PhoneNumber {
    string number = 1;
    PhoneType type = 2;
  }

  repeated PhoneNumber phones = 4;

  google.protobuf.Timestamp last_updated = 5;
}

// Our address book file is just one of these.
message AddressBook {
  repeated Person people = 1;
}

  • Generate classes

protoc -I=$SRC_DIR --csharp_out=$DST_DIR $SRC_DIR/addressbook.proto

  • Create a test

Person msg = new Person
{
    Id = 1234,
    Name = "John Doe",
    Email = "jdoe@example.com",
    Phones = { new Person.Types.PhoneNumber { Number = "555-4321", Type = Person.Types.PhoneType.Home } }
};

var serializer = new ProtobufMessageSerializer();
var byteArray1 = serializer.Serialize(msg); // The exception is thrown here

Partitions Revoked/Assigned do not support async operations

Partitions revoked and partitions assigned handlers don't execute async operations in a safe way.

This can easily be seen by putting a Console.Write() after a Task.Delay(1000) in the partitions revoked handler: if you stop the bus, this Console.Write() is never executed.

It is necessary to change the signature of these handlers from Action<T> to Func<T, Task>.
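The difference between the two signatures is easiest to see in a small registry. With `Action<T>`, an async lambda compiles to an `async void` method that the caller cannot await. With `Func<T, Task>`, the caller awaits each handler before continuing. `PartitionsRevokedHandlers<T>` is a hypothetical name used only for this sketch.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical registry for partitions-revoked handlers using Func<T, Task>.
// Awaiting each handler means shutdown cannot proceed until the handlers finish,
// unlike Action<T>, where an async lambda becomes fire-and-forget "async void".
public sealed class PartitionsRevokedHandlers<T>
{
    private readonly List<Func<T, Task>> handlers = new List<Func<T, Task>>();

    public void Add(Func<T, Task> handler) => handlers.Add(handler);

    // Runs the handlers one after another, awaiting each to completion.
    public async Task InvokeAsync(T partitions)
    {
        foreach (var handler in handlers)
            await handler(partitions);
    }
}
```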

Referenced schemas: multiple event types in the same topic using ApacheAvroMessageSerializer

Hi there,

we are currently evaluating kafka-flow in a scenario where multiple event types are published with Avro serialization, using schemas that are already properly defined in the schema registry. To keep the set of schemas associated with a topic bounded, we are trying to use the schema reference support added in Confluent Platform 5.5.0 (https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#referenced-schemas).

Since AvroSerializerConfig does not directly expose the "use.latest.version" property, we tried setting it in the producer config, apparently without success.

What we are experiencing is that publishing fails after a subject lookup failure on the schema registry. Apparently, the software is still trying to create a new subject after first removing a subject named with the conventional [topic-name]-value scheme.

The exception captured on the schema registry is quite revealing:
"POST /subjects/-value?deleted=False HTTP/1.1" 404 60 29 (io.confluent.rest-utils.requests)
[2021-04-08 10:51:21,970] INFO Deleting subject -value (io.confluent.kafka.schemaregistry.rest.resources.SubjectsResource)
[2021-04-08 10:51:21,971] ERROR Request Failed with exception (io.confluent.rest.exceptions.DebuggableExceptionMapper)
io.confluent.rest.exceptions.RestNotFoundException: Subject '-value' not found.

As you can see, the composed subject name is incorrectly "-value": the topic name "topic1" is missing.

Overall, the expected behaviour is to access an existing subject using the topic strategy and to select the latest schema version of the record value being published.

Our environment is:

  1. Schema registry: 6.1.1
  2. kafka-flow: 1.5.2

Here is the relevant producer configuration:

.AddProducer("domain-event-union.publisher", producer => producer
       .DefaultTopic("topic1")
       .WithAcks(KafkaFlow.Acks.All)
       .WithProducerConfig(new ProducerConfig(new Dictionary<string, string> { ["use.latest.version"] = "true" }))
       .AddMiddlewares(middlewares => middlewares
           .AddSerializer(resolver => new ApacheAvroMessageSerializer(resolver,
                new AvroSerializerConfig
                {
                    AutoRegisterSchemas = false,
                    SubjectNameStrategy = SubjectNameStrategy.Topic
                }))))

Thank you very much for your time,
Imerio

refactor: Rewrite integration tests

Currently, the integration tests project has room for improvement, such as:

  • Put the case tests into specific classes
  • Get producers by name instead of by type
  • Rewrite the Bootstrapper class
  • Create Kafka topics somewhere other than the makefile
