
streamthoughts / azkarra-streams

181 stars · 14 watchers · 25 forks · 19.27 MB

🚀 Azkarra is a lightweight Java framework that makes it easy to develop, deploy, and manage cloud-native streaming microservices based on Apache Kafka Streams.

Home Page: https://www.azkarrastreams.io/

License: Apache License 2.0

Java 2.55% Shell 0.01% Dockerfile 0.01% Makefile 0.01% Kotlin 0.01% HTML 96.87% Vue 0.17% SCSS 0.02% JavaScript 0.24% CSS 0.13%
kafka kafka-streams micro-framework data webui java interactive-queries apache-kafka azkarra-streams cloud-native


azkarra-streams's Issues

Allow to register a custom KafkaStreamsFactory

The internal Azkarra API is responsible for creating the KafkaStreams instances to execute. Currently, it's not possible to create a streams instance with a user-defined KafkaClientSupplier object or Time.

Developers should be able to provide a KafkaStreamsFactory implementation to delegate the instantiation of new KafkaStreams objects.

For that purpose, a new method must be added:
StreamsExecutionEnvironment#setKafkaStreamsFactory(Supplier<KafkaStreamsFactory> s)

In addition, it should also be possible to register a KafkaStreamsFactory class as a component.
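
A minimal sketch of what the proposed factory could look like. The interface name comes from this issue; the make() signature is an assumption, while the KafkaStreams constructor overload used is standard Kafka Streams API:

import java.util.Properties;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;

// Hypothetical shape of the proposed interface.
public interface KafkaStreamsFactory {
    KafkaStreams make(final Topology topology, final Properties streamsConfig);
}

// Example implementation plugging in a user-defined KafkaClientSupplier and Time.
class CustomKafkaStreamsFactory implements KafkaStreamsFactory {

    private final KafkaClientSupplier clientSupplier;
    private final Time time;

    CustomKafkaStreamsFactory(final KafkaClientSupplier clientSupplier, final Time time) {
        this.clientSupplier = clientSupplier;
        this.time = time;
    }

    @Override
    public KafkaStreams make(final Topology topology, final Properties streamsConfig) {
        // Standard KafkaStreams constructor accepting a client supplier and a clock.
        return new KafkaStreams(topology, streamsConfig, clientSupplier, time);
    }
}

Such a factory could then be wired per environment through the proposed StreamsExecutionEnvironment#setKafkaStreamsFactory(Supplier<KafkaStreamsFactory>) method.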

add classloading isolation to support the execution of multiple versions of a topology

It should be possible to deploy Azkarra as a worker executing multiple topologies, each with its own dependencies. Because dependency conflicts cannot be avoided, each topology should use a dedicated classloader.

Azkarra should also support running several versions of the same topology.

This improvement proposes to add a new configuration, azkarra.component.paths, used to specify a comma-separated list of locations from which components will be loaded.
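
For example (the paths are illustrative):

azkarra.component.paths = "/usr/share/azkarra/components,/opt/azkarra/topologies"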

Health status marked as UNKNOWN upon crash instead of DOWN

The /health endpoint returns status "UNKNOWN" in the body with status code 200 when an uncaught exception happens in the application.
The application crashes and the stacktrace is logged correctly, but since the health check still returns something that is not an error, the failure is never picked up by any orchestrator.

This became a problem in our cluster. For example, when a Kafka node fails, all threads with partitions that had it as leader throw an exception and crash, which brings the whole app down. This is the intended behaviour for Kafka Streams: the application crashes and you can restart it, manually or automatically.

What we expected

The health-check endpoint would mark a crashed application as unhealthy, Kubernetes would restart the pod, and the application would reconnect to the new leader.

What actually happened

The health-check endpoint changed to UNKNOWN, which is treated as "OK". Therefore Kubernetes, logically, did nothing.


Is this intended behaviour? Currently, the only way for us to get auto-healing applications is to inject a bash script that parses the response and treats "UNKNOWN" in the body as unhealthy. To me this seems to defeat the purpose of having a health-check endpoint: if you need a bash script, you can just check whether the process is running.
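
For reference, such a probe command could look like this (a sketch only: the endpoint path comes from this report, but the healthy status value "UP" is an assumption):

curl -sf http://localhost:8080/health | grep -q '"UP"'

The command exits non-zero when the endpoint is unreachable or the body does not report the assumed healthy status, which is something an orchestrator can act on.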

Duplicate messages

Not sure if we are doing something wrong, but in our topologies, we create a stream as follows (in Kotlin):

override fun get(): Topology {
    val builder = StreamsBuilder()
    // Consume the CDC topic and print every record that arrives.
    builder.stream<GenericRecord, GenericRecord>("billing-demo.public.shipment")
        .foreach { key, value -> println("$key: $value") }

    return builder.build(configureProperties())
}

This would seem to be pretty standard. However, we are seeing that each message is processed twice.

NPE is thrown while querying a non-existing state store

Currently a NullPointerException is thrown; instead, the REST API should return a query result with status NOT_AVAILABLE when querying a non-existing state store.

Example:

{
  "took": 1,
  "timeout": false,
  "server": "localhost:8080",
  "status": "NOT_AVAILABLE",
  "result": {
    "failure": [
      {
        "server": "oroborus:8084",
        "remote": false,
        "errors": [
          {
            "message": "no metadata found for store 'invalid'"
          }
        ]
      }
    ],
    "total": 0
  }
}

AzkarraPrincipalBuilder should be invoked after user authentication

Currently, the configured AzkarraPrincipalBuilder is used during authentication, when it should only be used for extracting the principal used for checking authorizations.

In addition, the AzkarraPrincipalBuilder cannot be easily configured by users. The ServerConfBuilder should expose a setPrincipalBuilder method.
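
A sketch of how the proposed setter could be used; only the setPrincipalBuilder method comes from this issue, the builder construction and the CustomPrincipalBuilder class are hypothetical:

// Hypothetical usage of the proposed method.
ServerConf conf = new ServerConfBuilder()
    .setPrincipalBuilder(new CustomPrincipalBuilder()) // implements AzkarraPrincipalBuilder
    .build();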

Topologies are started twice when both AzkarraApplication#setAutoStart() is enabled and a topology is registered using AzkarraContext#addTopology

Two methods are available for registering a TopologyProvider to an AzkarraContext instance:

  • addTopology: (1) allows registering a topology directly to the specified environment. (2) A component is automatically registered for the added topology if none already exists. (3) All added topologies are automatically started when the AzkarraContext starts.

  • addComponent: (1) registers a topology as a managed component. (2) Topologies registered through this method are not started when the AzkarraContext instance starts.

These two methods can be used together when an AzkarraContext is used directly to run the application (i.e. the AzkarraContext.start() method is called by user code).

AzkarraApplication.setAutoStart(true): When this option is enabled, Azkarra gets all registered components that implement the TopologyProvider interface and automatically registers them to the internal AzkarraContext through the addTopology method.

Thus, when the user code already calls addTopology and auto-start is enabled, the topology is added and run twice.

This bug was first identified in issue #56.
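
A minimal sketch of the conflicting combination. The class and method names follow the issue text; the exact bootstrap calls are illustrative and imports are omitted:

// (1) Registered directly: started when the context starts.
AzkarraContext context = DefaultAzkarraContext.create();
context.addTopology(WordCountTopology.class);

// (2) Auto-start then re-registers every TopologyProvider component,
// including the one added above, through addTopology: it runs twice.
new AzkarraApplication()
    .setContext(context)
    .setAutoStart(true)
    .run(args);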

The config and env params should be optional when creating a new streams instance

Currently, when creating a new streams instance via the REST API, we are forced to pass an empty "config": {} param. This param should be optional.

The following example fails:

curl -H "Content-Type:application/json" \
-sX POST http://localhost:8080/api/v1/streams \
--data '{"type": "azkarra.WordCountTopology", "version": "1.0",  "env": "__default"}' | jq
{
  "error_code": 500,
  "message": "Unexpected internal server error: parameters Conf cannot be null",
  "exception": "java.lang.NullPointerException",
  "path": "/api/v1/streams"
}

In addition, the env param should have __default as its default value.
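
Until then, the workaround is to pass the empty object explicitly:

curl -H "Content-Type:application/json" \
-sX POST http://localhost:8080/api/v1/streams \
--data '{"type": "azkarra.WordCountTopology", "version": "1.0", "env": "__default", "config": {}}' | jq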

add options to specify the key type or the key serializer to be used when executing a keyed interactive query

Currently, there is no way to specify the type of the key used to query a state store. We use the default configured key serializer, falling back to the StringSerializer otherwise.

We should support two additional query options:

  • key_type: The type of the key.
    This option should be optional. The valid values are [STRING, INT, LONG, UUID]. The value is used to infer the corresponding Serializer.

  • key_serializer: The serializer to be used for serializing the given key.
    This option should be optional.

When a state store is queried, only one of these options should be used.
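
For illustration, a keyed query could then carry one of the new options like this; only key_type and key_serializer come from this proposal, the payload structure around the key is an assumption:

{
  "key": "42",
  "key_type": "LONG"
}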

allow to set the version of a streams job to be used while configuring an environment

Currently, when defining a streams environment (i.e. through application.conf), we can only specify the class/alias of the topology to be run.

Example:

azkarra {
  environments = [
    {
      name = "__default"
      config = {}
      jobs = [
        {
          name = "word-count-demo"
          description = "Kafka Streams WordCount Demo"
          topology = WordCountTopology
        }
      ]
    }
  ]
}

We should also be able to configure the 'version' of the topology to be run when multiple versions are available on the classpath.
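
The job definition could then become (the version key name is an assumption):

jobs = [
  {
    name = "word-count-demo"
    description = "Kafka Streams WordCount Demo"
    topology = WordCountTopology
    version = "1.1"
  }
]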

MapConf should support type conversion

Currently, when using MapConf or ArgsConf there is no type conversion. For example, this can be an issue when passing string arguments to the main method (e.g. azkarra.port).
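
To illustrate the expected behaviour (the ArgsConf construction and the accessor name are assumptions):

// The value arrives as the String "8080"; with type conversion, reading it
// as an int should transparently yield 8080 instead of failing.
Conf conf = new ArgsConf(new String[]{"--azkarra.port", "8080"});
int port = conf.getInt("azkarra.port");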

[question] Porting to another framework

I understand this library uses Undertow and Rest-Assured.

What would you estimate the level of difficulty would be in porting to another framework?

Say, Quarkus, or getting Kafka Streams functionality added into dropwizard-kafka (JAX-RS based).

How to specify the application ID?

We'd like to always use the same application ID regardless of the current version, to ensure the application will not start processing messages from the beginning. Is there a configuration for it?
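
For context, in plain Kafka Streams this is the standard application.id property; whether Azkarra derives its own value per topology version is exactly what is being asked:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

// Standard Kafka Streams configuration: the application ID drives the consumer
// group and internal topic prefixes, so keeping it stable preserves committed offsets.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "billing-demo");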

add a new interface StreamThreadExceptionHandler to allow custom UncaughtExceptionHandler strategies

Currently, when a StreamThread abruptly terminates due to an uncaught exception, Azkarra immediately closes the KafkaStreams instance.

Azkarra should expose a new public interface StreamThreadExceptionHandler to allow implementing custom logic.

This new StreamThreadExceptionHandler interface should support the Configurable and StreamsExecutionEnvironmentAware interfaces. In addition, this feature will add a new context property: default.stream.thread.exception.handler
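
A hypothetical shape for the proposed interface; the name and the supported mix-ins come from this issue, while the handle() signature is an assumption:

// Sketch only: the method signature is an assumption.
public interface StreamThreadExceptionHandler {

    // Invoked when a StreamThread dies from an uncaught exception; implementations
    // decide how to react (e.g. close the KafkaStreams instance, replace the thread).
    void handle(final Thread thread, final Throwable throwable);
}

Per the issue, implementations could additionally implement Configurable and StreamsExecutionEnvironmentAware to receive their configuration and execution environment.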

How to handle exceptions thrown during Kafka Streams execution?

First of all, thank you for the work on this fantastic project.

I'd love to read this FAQ entry, but it's unfinished. I have a streams application that writes to Google Bigtable using gRPC and needs to produce a new message only when the record has been successfully flushed.

Does the framework provide any way to recover the stream from an uncaught error during message processing, such as a connection error?
