ksml's Introduction

Axual KSML

KSML is a wrapper around Kafka Streams that allows for the development of low-code stream processing applications. It was developed by Axual in early 2021 and released as open source in May 2021.

Introduction

Kafka Streams has captured the hearts and minds of many developers who want to develop streaming applications on top of Kafka. But as powerful as the framework is, Kafka Streams has had a hard time getting around the requirement of writing Java code and setting up build pipelines. There were some attempts to rebuild Kafka Streams, but up until now popular languages like Python did not receive equally powerful (and maintained) stream processing frameworks. KSML provides a new declarative approach to unlock Kafka Streams for a wider audience. Using only a few simple rules and Python snippets, you will be able to write streaming applications in very little time.

Language

To quickly jump to the KSML specification, use this link: https://axual.github.io/ksml/

Project Overview

The project is divided into modules based on functionality, so that each module can be included separately depending on the use case.

The submodules are as follows:

Module                     Description
graalpy-module-collection  contains dependencies to all GraalVM runtime modules for Python.
ksml-data                  contains core data type logic, including BINARY and JSON types, used in Kafka Streams topologies and by Producers.
ksml-data-avro             extension to the data library for AVRO support.
ksml-data-csv              extension to the data library for CSV support.
ksml-data-soap             extension to the data library for SOAP support.
ksml-data-xml              extension to the data library for XML support.
ksml                       the core component that parses KSML definitions and converts them to a Kafka Streams topology.
ksml-kafka-clients         the set of Kafka clients for KSML, injected into Kafka Streams, allowing for namespaced Kafka installations.
ksml-query                 allows an active KSML application to be queried via REST for its internal state stores.
ksml-runner                standalone Java application for running KSML definitions.

Building KSML

Building and running KSML requires an installation of GraalVM and the corresponding Python module. There are two ways to do this:

  1. Use the supplied multistage Docker build file
  2. Install GraalVM locally

See the paragraphs below for details.

Using the multistage Docker build

You can build either the standard KSML runner, or the runner for the Axual platform using one of the following commands:

# Create the BuildX builder for KSML 
docker buildx create --name ksml
# Build KSML Runner
docker buildx --builder ksml build --load --platform linux/amd64,linux/arm64 -t axual/ksml:local --target ksml -f Dockerfile .
# Remove the BuildX builder for KSML
docker buildx rm ksml

If you get the following error it means that your setup cannot build for multiple platforms yet.

ERROR: docker exporter does not currently support exporting manifest lists

You can perform a build for just your platform by removing the --platform linux/amd64,linux/arm64 argument from the commands above:

# Create the BuildX builder for KSML 
docker buildx create --name ksml
# Build KSML Runner
docker buildx --builder ksml build --load -t axual/ksml:local --target ksml -f Dockerfile .
# Remove the BuildX builder for KSML
docker buildx rm ksml

Install GraalVM locally

Download GraalVM for Java 21 or later from this page and install it for your platform as explained.

Once installed, select GraalVM as your default JVM. Then you can build KSML using the normal Maven commands:

mvn clean package

Running KSML

To run the KSML demo, we provide a Docker Compose file that starts Kafka, creates the demo topics, and starts a container with a demo producer. You can then start the runner you built in the previous step, passing in a KSML configuration of your choice. See Getting started or Runners for details.

To run the demo, Docker 19.x is required.

Contributing

Axual is interested in building the community; we would welcome any thoughts or patches. You can reach us here.

See Contributing.

ksml's People

Contributors

jeroenvandisseldorp, jorisjumanne, netgio, richard-axual, tonvanbart

ksml's Issues

Dynamic output to topics by value?

I'm quite familiar with Kafka and Kafka Connect, but new to KSML.
Is there a way to calculate the output topic from the value of a message (in the schema), or with some function?
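
Kafka Streams itself supports this kind of routing through its TopicNameExtractor interface, which computes the destination topic for each record. Whether and how KSML exposes this is not stated here, so the Python sketch below only illustrates the kind of function that would be needed. The function name, the "type" field and the "sensor_<type>" naming scheme are all hypothetical.

```python
def extract_topic_name(key, value, default_topic="sensor_unknown"):
    """Return the output topic for a record based on a field in its value.

    Hypothetical sketch: the "type" field and the "sensor_<type>" naming
    scheme are assumptions made for illustration only.
    """
    # Records without a usable value go to a fallback topic.
    if not isinstance(value, dict):
        return default_topic
    sensor_type = value.get("type")
    if not sensor_type:
        return default_topic
    # Normalize the field so it forms a predictable topic name.
    return "sensor_" + str(sensor_type).strip().lower()
```

The fallback topic keeps records with a missing or malformed value from failing the routing step.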

"NoClassDefFoundError: javax/ws/rs/core/UriBuilder" when running the demo runner

I followed the Getting Started tutorial to run the first example YAML, but I get the following error when executing the run.sh file.

2022-04-26T14:03:09,753Z [system] [pool-4-thread-1] INFO  i.a.k.r.backend.kafka.KafkaBackend -
                Starting Kafka Backend
2022-04-26T14:03:10,053Z [system] [io.ksml.example.processor-00d39059-38f3-4371-b2f6-09c570d8005d-StreamThread-1] ERROR o.apache.kafka.streams.KafkaStreams -
                stream-client [io.ksml.example.processor-00d39059-38f3-4371-b2f6-09c570d8005d] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.
java.lang.NoClassDefFoundError: javax/ws/rs/core/UriBuilder
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:656)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:642)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:217)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:291)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaById(CachedSchemaRegistryClient.java:276)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaFromRegistry(AbstractKafkaAvroDeserializer.java:273)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:97)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:76)
        at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
        at io.axual.ksml.notation.AvroNotation$AvroSerde$2.deserialize(AvroNotation.java:77)
        at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
        at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:58)
        at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
        at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:176)
        at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
        at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:185)
        at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:957)
        at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:1009)
        at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:907)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:720)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)
Caused by: java.lang.ClassNotFoundException: javax.ws.rs.core.UriBuilder
        at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
        at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
        ... 22 common frames omitted
2022-04-26T14:03:10,053Z [system] [io.ksml.example.processor-00d39059-38f3-4371-b2f6-09c570d8005d-StreamThread-1] ERROR o.apache.kafka.streams.KafkaStreams -
                stream-client [io.ksml.example.processor-00d39059-38f3-4371-b2f6-09c570d8005d] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.
java.lang.NoClassDefFoundError: javax/ws/rs/core/UriBuilder
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:656)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:642)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:217)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:291)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaById(CachedSchemaRegistryClient.java:276)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaFromRegistry(AbstractKafkaAvroDeserializer.java:273)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:97)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:76)
        at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
        at io.axual.ksml.notation.AvroNotation$AvroSerde$2.deserialize(AvroNotation.java:77)
        at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
        at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:58)
        at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
        at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:176)
        at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
        at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:185)
        at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:957)
        at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:1009)
        at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:907)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:720)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)
Caused by: java.lang.ClassNotFoundException: javax.ws.rs.core.UriBuilder
        at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
        at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
        ... 22 common frames omitted
2022-04-26T14:03:10,773Z [system] [pool-4-thread-1] INFO  i.a.k.r.backend.kafka.KafkaBackend -
                Streams implementation has stopped, stopping Kafka Backend

Environment

  • Running on WSL2 (Ubuntu) on Windows 10.
  • Same behaviour on Java 11 and Java 8 (1.8).

Problem

The problem seems to come from the runner, as the producer logs look correct according to the Getting started guide.

Thank you for your help

Example 10-demo-queryabletable is missing a topic

Upon building the runner image and running 10-demo-queryabletable as per the instructions in the README, the run fails with the following stacktrace:

2022-05-13T15:35:20,452Z [system] [io.ksml.example.processor-eccaa6cf-4139-49ea-ad4b-33ba260763c3-StreamThread-1] ERROR o.a.k.c.c.i.ConsumerCoordinator -
                [Consumer clientId=io.ksml.example.processor-eccaa6cf-4139-49ea-ad4b-33ba260763c3-StreamThread-1-consumer, groupId=io.ksml.example.processor] User provided listener org.apache.kafka.streams.processor.internals.StreamsRebalanceListener failed on invocation of onPartitionsAssigned for partitions []
org.apache.kafka.streams.errors.MissingSourceTopicException: One or more source topics were missing during rebalance
	at org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:58)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:293)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:430)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:449)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:365)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:508)
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1261)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
	at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:925)
	at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:885)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:720)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)
2022-05-13T15:35:20,455Z [system] [io.ksml.example.processor-eccaa6cf-4139-49ea-ad4b-33ba260763c3-StreamThread-1] ERROR o.apache.kafka.streams.KafkaStreams -
                stream-client [io.ksml.example.processor-eccaa6cf-4139-49ea-ad4b-33ba260763c3] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.

Looking in the yaml file, I suspect that topic ksml_sensordata_table is missing.

Multistage Dockerfile build fails on Windows

As reported by @hclbaur, building with the multistage Dockerfile does not work on Windows due to carriage return versus line feed issues. See #44.
Example of an error:

#13 196.1 [ERROR] Tests run: 10, Failures: 5, Errors: 0, Skipped: 0, Time elapsed: 56.805 s <<< FAILURE! - in io.axual.ksml.KSMLTopologyGeneratorBasicTest
#13 196.1 [ERROR] parseAndCheckOuput{int}[1]  Time elapsed: 55.643 s  <<< FAILURE!
#13 196.1 java.lang.AssertionError:
#13 196.1
#13 196.1 Expected: is "Topologies:\r\n   Sub-topology: 0\r\n    Source: sensor_stream (topics: [ksml_sensordata_avro])\r\n      --> for_each_001\r\n    Processor: for_each_001 (stores: [])\r\n      --> none\r\n      <-- sensor_stream\r\n\r\n"
#13 196.1      but: was "Topologies:\n   Sub-topology: 0\n    Source: sensor_stream (topics: [ksml_sensordata_avro])\n      --> for_each_001\n    Processor: for_each_001 (stores: [])\n      --> none\n      <-- sensor_stream\n\n"
#13 196.1       at io.axual.ksml.KSMLTopologyGeneratorBasicTest.parseAndCheckOuput(KSMLTopologyGeneratorBasicTest.java:76)
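
The assertion above compares a string with Windows line endings (\r\n) against one with Unix line endings (\n). One common remedy is to normalize line endings on both sides before comparing. The KSML test itself is written in Java; the Python sketch below only illustrates the idea, and the helper name is hypothetical.

```python
def normalize_newlines(text: str) -> str:
    """Convert Windows (\\r\\n) and old Mac (\\r) line endings to \\n."""
    return text.replace("\r\n", "\n").replace("\r", "\n")


# Shortened versions of the two strings from the failing assertion.
expected = "Topologies:\r\n   Sub-topology: 0\r\n"
actual = "Topologies:\n   Sub-topology: 0\n"

# The raw strings differ, but they match once line endings are normalized.
strings_differ_raw = expected != actual
strings_match_normalized = normalize_newlines(expected) == normalize_newlines(actual)
```

Replacing \r\n before the lone \r matters: the other order would turn each Windows line ending into two newlines.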

Update to use latest LTS Java (17)

It might be a good idea to move KSML from Java 11 to the latest LTS release, Java 17.
This new release provides new language features, as well as optimisations to the runtime.

Docker image, GraalPy venv, install and GU commands fail

Several errors occur when trying to install a Python framework using the GraalVM instructions.

I've described the steps below.

  1. Start an interactive container with docker run --rm -ti --entrypoint /bin/bash axual/ksml:0.1.0
  2. Initialize a venv in the /opt/ksml directory using the command graalpy -m venv /opt/ksml/
  3. The command fails with graalerror.log

Running the image as root gets me further

  1. Start an interactive container with docker run --rm -ti -u root --entrypoint /bin/bash axual/ksml:0.1.0
  2. Initialize a venv in the /opt/ksml directory using the command graalpy -m venv /opt/ksml/
  3. The command completes without error, only with the message We're not using symlinks in a Graal Python venv
  4. I activate venv by going into the /opt/ksml directory and running source ./bin/activate
  5. The prompt changes to (ksml) [root@14db75e59cc9 ksml]# as expected
  6. Installing NumPy with graalpy -m ginstall install numpy fails with graalpyinstall_error.log

It looks like there are two issues with the image:

  1. The normal user is not allowed to execute graalpy or gu, which gives a similar error
  2. Installing Python packages requires them to be built directly for that image, but no build tools are installed

Requested fix:

  1. Update the image so that the normal non-root user can run all GraalVM commands
  2. Provide a build image which can be used to create custom images with Python packages installed

Investigate GitHub Actions

Instead of using Travis-CI, we could use GitHub Actions to set up a CI workflow, including building and pushing Docker images, and running code analysis on SonarCloud. We should investigate the possibilities and consider whether a migration would be useful.

Multistage docker build fails when Maven updates

Whenever the Maven download site moves to a new release, the multistage Docker build fails.
For example, at the time of writing, the build file contains this:

RUN curl -O https://downloads.apache.org/maven/maven-3/3.8.4/binaries/apache-maven-3.8.4-bin.tar.gz \

However, Maven has since been updated to 3.8.5, so this command fails.
It would be better to download from https://archive.apache.org/dist/maven/maven-3/, as the intended release will remain available there.
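
As an illustration of the suggested change, the pinned download URL could be derived from the archive base mentioned above. This sketch assumes the archive uses the same <version>/binaries/ layout as the URL in the current build file; the function and constant names are hypothetical.

```python
ARCHIVE_BASE = "https://archive.apache.org/dist/maven/maven-3/"


def maven_archive_url(version: str) -> str:
    """Build the archive download URL for a pinned Maven 3 release."""
    return f"{ARCHIVE_BASE}{version}/binaries/apache-maven-{version}-bin.tar.gz"


# The version currently pinned in the build file.
url = maven_archive_url("3.8.4")
```

Because the version appears only as a parameter, a later upgrade changes one value rather than two places in the URL.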

Running 09-demo-aggregate.yaml using script results in Linux ownership issues

Found this while trying to reproduce #35 on my private (Linux) laptop; the run fails with yet another exception:

Platform: Ubuntu 20.04 LTS
Steps:

  • build ksml runner image: docker build -t axual/ksml -f Dockerfile-build-runner . --build-arg runner=ksml-runner
  • start local environment: docker-compose up -d
  • modify examples/ksml-runner.yml so that it points to 09-demo-aggregate.yaml
  • start the local run: ./examples/run.sh

Expected result: same exception as #35
Actual result: the runner fails with the following stacktrace:

2022-04-22T20:55:17,973Z [system] [main] INFO  i.a.k.g.TopologyGeneratorImpl -
                  owner_count: retention=PT3M, key=BINARY:String, value=BINARY:Long, caching=false
Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: org.apache.kafka.streams.errors.ProcessorStateException: state directory [/ksml/io.ksml.example.processor] doesn't exist and couldn't be created
	at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:807)
	at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:782)
	at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:692)
	at io.axual.ksml.runner.backend.kafka.KafkaBackend.<init>(KafkaBackend.java:82)
	at io.axual.ksml.runner.backend.kafka.KafkaBackendProvider.create(KafkaBackendProvider.java:44)
	at io.axual.ksml.runner.config.KSMLRunnerConfig.getConfiguredBackend(KSMLRunnerConfig.java:80)
	at io.axual.ksml.runner.KSMLRunner.main(KSMLRunner.java:57)
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: state directory [/ksml/io.ksml.example.processor] doesn't exist and couldn't be created
	at org.apache.kafka.streams.processor.internals.StateDirectory.<init>(StateDirectory.java:134)
	at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:804)
	... 6 more

KSML crashes when providing Null data

When a message with a null key or value is read and processed, the PythonFunction fails with a wrong parameter message.
This can be reproduced by running the Peek operation example and producing a record with a null key or value.

The error was located in the PythonFunction class, in the validation loop of the call method (lines 88-92).

Update:
When testing joins with null data, exceptions occur as well, so null handling needs to be checked in general.
This join exception can be reproduced by running the join example without a data generator and providing a string/null value on the ksml_sensoralert_settings topic.

Caused by: io.axual.ksml.data.exception.ExecutionException: KSML Execution error: Incorrect type passed in: expected=SensorAlertSettings, got Null
	at io.axual.ksml.data.serde.DataObjectSerializer.serialize(DataObjectSerializer.java:60)
	at io.axual.ksml.serde.StoreSerde.lambda$new$0(StoreSerde.java:57)
	at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:83)
	at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:74)
	at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:31)
	at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:195)
	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$6(MeteredKeyValueStore.java:331)
	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:331)
	at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:131)
	at org.apache.kafka.streams.state.internals.KeyValueStoreWrapper.put(KeyValueStoreWrapper.java:102)
	at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:151)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
	at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:810)
	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
	at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:810)
	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:741)
	... 6 common frames omitted
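
Independent of the fix needed in KSML itself, user-defined Python code can avoid failing on missing data by checking for None explicitly. This is a minimal sketch, assuming a null Kafka key or value reaches the Python function as None. The reports above suggest the failure can occur before user code runs, in which case such a guard does not help. The function name and message format are hypothetical.

```python
def describe_record(key, value):
    """Format a record for logging without failing on a missing key or value."""
    key_text = "<null key>" if key is None else str(key)
    value_text = "<null value>" if value is None else str(value)
    return "key=" + key_text + ", value=" + value_text
```

Testing with "is None" rather than truthiness keeps legitimate values such as 0 or an empty string from being treated as missing.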

Stream-Stream join issues

Hi!
I'm trying to implement a stream-stream join with values of type Avro and am getting the following error:

2024-01-19T10:18:45,977Z ERROR io.axual.ksml.runner.KSMLRunner      An exception occurred while running KSML
io.axual.ksml.exception.KSMLTopologyException: Topology generation error: Error in topology: Join stream keyType is expected to be of type String
        at io.axual.ksml.operation.BaseOperation.topologyError(BaseOperation.java:101)
        at io.axual.ksml.operation.BaseOperation.checkType(BaseOperation.java:220)
        at io.axual.ksml.operation.BaseOperation.checkType(BaseOperation.java:215)
        at io.axual.ksml.operation.BaseOperation.checkType(BaseOperation.java:211)
        at io.axual.ksml.operation.JoinOperation.apply(JoinOperation.java:82)
        at io.axual.ksml.stream.KStreamWrapper.apply(KStreamWrapper.java:38)
        at io.axual.ksml.generator.TopologyGeneratorImpl.lambda$generate$0(TopologyGeneratorImpl.java:199)
        at java.base/java.util.HashMap.forEach(HashMap.java:1429)
        at io.axual.ksml.generator.TopologyGeneratorImpl.generate(TopologyGeneratorImpl.java:192)
        at io.axual.ksml.generator.TopologyGeneratorImpl.create(TopologyGeneratorImpl.java:96)
        at io.axual.ksml.KSMLTopologyGenerator.create(KSMLTopologyGenerator.java:63)
        at io.axual.ksml.runner.backend.KafkaBackend.<init>(KafkaBackend.java:100)
        at io.axual.ksml.runner.KSMLRunner.main(KSMLRunner.java:84)

My ksml pipeline:

streams:
  left_source:
    topic: ksml_sensordata_avro
    keyType: string
    valueType: avro:SensorData
  right_source:
    topic: ksml_sensordata_avro
    keyType: string
    valueType: avro:SensorData 
  join_result:
    topic: join_result
    keyType: string
    valueType: json
    


functions:
  joiner_func:
    type: valueJoiner
    code: |
      print('JOINING\n\t value1=' + str(value1) + '\n\t value2=' + str(value2))

      new_value={"left": value1, "right": value2 }
      print('JOINED sensordata=' + str(new_value))
    expression: new_value
    resultType: json

pipelines:
  join:
    from: left_source
    via:
      - name: joiner_funcing
        type: join
        stream: right_source
        valueJoiner: joiner_func
        window: 10s
    to: join_result

When I changed the source topics to 'ksml_sensordata_json' and the value types to 'string', the pipeline started and worked without error.

KSML was built from the main branch. I'm using the Kafka backend.

Is KSML able to join streams with values of Avro type?

P.S.
There seems to be a typo in the documentation of the join operation: https://axual.github.io/ksml/operations.html#join.
Instead of the parameter 'duration' I had to use the parameter 'window'. When using 'duration' I got the following error:
2024-01-19T10:42:22,889Z ERROR io.axual.ksml.execution.FatalError Description: Invalid value for parameter "timeDifference" (value was: null). It shouldn't be null.

Add Maven dependency convergence check

To prevent issues like #37, it would be helpful to add the Maven Enforcer plugin with the dependency convergence rule. This will break the build if dependency conflicts are present.

Error regarding "configDirectory" when running examples/run.sh

When running the run.sh (or run.cmd) script I get the error below.
I believe the problem has to do with the annotation:

[ksml\ksml-runner\src\main\java\io\axual\ksml\runner\config\KSMLConfig.java]

@JsonProperty("configDirectory")
private String configurationDirectory;

Shouldn't the annotation be on the getter method rather than on the field?

2022-05-27T09:37:28,447Z [system] [main] ERROR io.axual.ksml.runner.KSMLRunner - An exception occurred while reading the configuration
com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "configDirectory" (class io.axual.ksml.runner.config.KSMLConfig), not marked as ignorable (3 known properties: "definitions", "application.server", "workingDirectory"])
 at [Source: (File); line: 3, column: 25] (through reference chain: io.axual.ksml.runner.config.KSMLRunnerConfig["ksml"]->io.axual.ksml.runner.config.KSMLConfig["configDirectory"])
        at com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:61)
        at com.fasterxml.jackson.databind.DeserializationContext.handleUnknownProperty(DeserializationContext.java:855)
        at com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:1212)
        at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1604)
        at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownVanilla(BeanDeserializerBase.java:1582)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:299)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:156)
        at com.fasterxml.jackson.databind.deser.impl.MethodProperty.deserializeAndSet(MethodProperty.java:129)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:293)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:156)
        at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4482)
        at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3299)
        at io.axual.ksml.runner.KSMLRunner.main(KSMLRunner.java:51)

Allow specifying different security settings in runners and data generators

The current data generator and runner are limited in their configuration options for SASL and TLS:

  • Truststores AND keystores are both required
  • Passwords are required for keystores and truststores
  • No PEM file support for private keys and certificates
  • No PEM file support for trusted certificate authorities
  • No PEM string support for private keys and certificates
  • No PEM string support for trusted certificate authorities

Setting SASL credentials is currently not consistently possible with KSML.
This might be harder to add because the backing Axual Streams implementation lacks SASL support, so it is not considered a required part of the fix for this issue. If that is the case, we should create a new issue to allow SASL to be used with Axual runners and data generators.
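
For context, Apache Kafka clients from version 2.7 on accept PEM-formatted key material directly through standard client properties, without keystore files or keystore passwords. The sketch below builds such a property set in Python. Whether the KSML runner and data generator pass these properties through to the Kafka clients unchanged is an assumption, not something confirmed here; the function name is hypothetical.

```python
def pem_ssl_config(certificate_chain_pem, private_key_pem, trusted_ca_pem=None):
    """Build Kafka client properties for TLS using PEM strings.

    The property names are standard Kafka client settings. The truststore
    entries are optional and only added when trusted CAs are supplied.
    """
    config = {
        "security.protocol": "SSL",
        "ssl.keystore.type": "PEM",
        "ssl.keystore.certificate.chain": certificate_chain_pem,
        "ssl.keystore.key": private_key_pem,
    }
    if trusted_ca_pem is not None:
        config["ssl.truststore.type"] = "PEM"
        config["ssl.truststore.certificates"] = trusted_ca_pem
    return config
```

Making the truststore optional mirrors the first item in the list above: a client that relies on the JVM's default trusted certificates would not need one.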

KSML lacking ARM64 compatibility

The Docker image for KSML is built for x86-64 only, which causes a considerable slowdown on Apple Silicon machines, for example.

The Docker Compose file for running examples also uses images without support for Apple Silicon.

Update the workflows to release multi-arch images, and use images with multiple architecture support in the Docker Compose file.

SchemaLoader uses hard coded Unix path separator

Class SchemaLoader has a hard-coded constant DIRECTORY_SEPARATOR set to /, which will probably fail if someone tries to run KSML on a Windows system. Replacing this with File.separator or similar would be more robust.
I did not check if there are other locations in the code with similar constants.
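
Two different separators are easy to confuse here: the separator between directories within one path, and the separator between entries in a list of paths. In Java these are File.separator and File.pathSeparator; Python has the same pair as os.sep and os.pathsep. The sketch below uses Python's posixpath and ntpath modules to show both values for Unix and Windows on any machine. It illustrates the distinction only and is not KSML code; the schema file name is hypothetical.

```python
import ntpath
import posixpath
from pathlib import PurePosixPath, PureWindowsPath

# Separator between directories within a single path.
unix_dir_sep = posixpath.sep        # "/"
windows_dir_sep = ntpath.sep        # "\\"

# Separator between entries in a list of paths (a different thing).
unix_list_sep = posixpath.pathsep   # ":"
windows_list_sep = ntpath.pathsep   # ";"

# Joining path parts through a path API avoids hard-coding either one.
unix_schema = str(PurePosixPath("schemas") / "SensorData.avsc")
windows_schema = str(PureWindowsPath("schemas") / "SensorData.avsc")
```

Building paths through a path API, rather than by string concatenation with a constant, is usually the more robust choice in either language.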

Running the standard demo 09-demo-aggregate.yaml against the demo environment results in a KSMLParseException

When I run 09-demo-aggregate.yaml against the local build, I get the following stacktrace:

2022-04-21T13:39:23,730Z [system] [main] INFO  io.axual.ksml.runner.KSMLRunner - Using backed of type kafka
2022-04-21T13:39:23,735Z [system] [main] INFO  i.a.k.runner.config.KSMLRunnerConfig - Found provider io.axual.ksml.runner.backend.kafka.KafkaBackendProvider for type kafka
2022-04-21T13:39:23,771Z [system] [main] INFO  i.a.k.r.backend.kafka.KafkaBackend - Constructing Kafka Backend
2022-04-21T13:39:23,788Z [system] [main] INFO  i.a.k.g.TopologyGeneratorImpl - Reading KSML from source file(s): [09-demo-aggregate.yaml]
2022-04-21T13:39:24,155Z [system] [main] INFO  io.axual.ksml.user.UserFunction - Registered function 'ksml_pipelines_main_step_1_groupBy_mapper(key:?, value:?) ==> String'
2022-04-21T13:39:29,555Z [system] [main] INFO  io.axual.ksml.user.UserFunction - Registered function 'ksml_pipelines_main_step_3_aggregate_initializer() ==> Long'
2022-04-21T13:39:29,560Z [system] [main] INFO  io.axual.ksml.user.UserFunction - Registered function 'ksml_pipelines_main_step_3_aggregate_aggregator(key:?, value:?, aggregatedValue:?) ==> Long'
Exception in thread "main" io.axual.ksml.exception.KSMLParseException: Error in YAML node 'ksml->pipelines->main->step->3->aggregate': User function definition not found, add 'merger' to specification
        at io.axual.ksml.parser.ContextAwareParser.parseFunction(ContextAwareParser.java:53)
        at io.axual.ksml.parser.ContextAwareParser.parseFunction(ContextAwareParser.java:41)
        at io.axual.ksml.operation.parser.AggregateOperationParser.parse(AggregateOperationParser.java:51)
        at io.axual.ksml.operation.parser.AggregateOperationParser.parse(AggregateOperationParser.java:35)
        at io.axual.ksml.operation.parser.PipelineOperationParser.parse(PipelineOperationParser.java:83)
        at io.axual.ksml.operation.parser.PipelineOperationParser.parse(PipelineOperationParser.java:65)
        at io.axual.ksml.parser.ListParser.parse(ListParser.java:40)
        at io.axual.ksml.definition.parser.PipelineDefinitionParser.parse(PipelineDefinitionParser.java:52)
        at io.axual.ksml.definition.parser.PipelineDefinitionParser.parse(PipelineDefinitionParser.java:43)
        at io.axual.ksml.definition.parser.PipelineDefinitionParser.parse(PipelineDefinitionParser.java:36)
        at io.axual.ksml.parser.MapParser.parse(MapParser.java:40)
        at io.axual.ksml.generator.TopologyGeneratorImpl.generate(TopologyGeneratorImpl.java:156)
        at io.axual.ksml.generator.TopologyGeneratorImpl.create(TopologyGeneratorImpl.java:134)
        at io.axual.ksml.KSMLTopologyGenerator.create(KSMLTopologyGenerator.java:54)
        at io.axual.ksml.runner.backend.kafka.KafkaBackend.<init>(KafkaBackend.java:77)
        at io.axual.ksml.runner.backend.kafka.KafkaBackendProvider.create(KafkaBackendProvider.java:44)
        at io.axual.ksml.runner.config.KSMLRunnerConfig.getConfiguredBackend(KSMLRunnerConfig.java:80)
        at io.axual.ksml.runner.KSMLRunner.main(KSMLRunner.java:54)

I did not change anything in the 09-demo config YAML.

Error processing a message with key NULL.

I am not 100% sure what is causing this error in the ksml/runner, nor why it refers to a JSON deserialiser:

Caused by: io.axual.ksml.exception.KSMLExecutionException: Error while executing the Kafka Streams topology: Json serde not found for data type null

I believe it has to do with the fact that the message has no key (or a key with a null value).
The message looks like this (in the message browser of the Axual Self-service environment):

"message":{
"partition":4
"offset":951
"timestamp":1652912264843
"headers":[...]
"value":
"0000000911020e43524541544544021a4d6574657252656164696e6773021470352d70..."
"key":NULL
}

The key is NULL and the value is a binary string. I configured the KSML app like this, with keyType none and valueType bytes:

streams:
  p5meterresponse:
    topic: APP9329_LianderP5MeterResponse
    keyType: none
    valueType: bytes

functions:
  print_message:
    type: forEach
    code: "print('key='+(key if isinstance(key,str) else str(key))+', value='+(value if isinstance(value,str) else str(value)))"

pipelines:
  main:
    from: p5meterresponse
    forEach: print_message

I also tried keyType string but then I see a null pointer exception when reading the message.

The message we are trying to read is actually an Avro message from the Liander platform, but for demo purposes I would like to read it as a binary type.
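
To help narrow this down, the sketch below runs the same formatting expression as print_message in plain CPython, with a None key and only the visible part of the value shown above (the bytes hidden behind the "..." are not included). It suggests that the Python expression itself copes with a missing key, so the failure is more likely raised before the function is called. This is an assumption: inside KSML the function runs in an embedded interpreter and may receive different object types.

```python
def format_message(key, value):
    # Same expression as the print_message function, returned instead of printed.
    return ('key=' + (key if isinstance(key, str) else str(key))
            + ', value=' + (value if isinstance(value, str) else str(value)))

# Only the visible prefix of the value from the message browser;
# the remainder of the payload is unknown and deliberately left out.
VISIBLE_VALUE_HEX = (
    "0000000911020e43524541544544021a"
    "4d6574657252656164696e6773021470352d70"
)

value = bytes.fromhex(VISIBLE_VALUE_HEX)
line = format_message(None, value)
print(line)
```

The readable fragments CREATED and MeterReadings show up in the decoded bytes, which fits the remark that the payload is really an Avro message.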

Add stream configuration options to KSML Runners

The current KSML runners differ considerably in the configuration options they expose for the backing Kafka Streams implementations.

A uniform configuration approach is needed to allow for easy and scalable deployments of KSML.
See Kafka Streams Configs for the available configurations.
Several of these configurations are required for proper scaling and behaviour in production.

Initially, I see these items as needed for deployments, based on past experience with cloud and Kubernetes deployments:

KSML gives a java.lang.IllegalArgumentException: Illegal base64 character 7b when running against the Axual trial env

I built the KSML container to run against an Axual environment. I tried to run it against my current trial environment and a working stream. I used one of the examples, which just prints the message (the demo inspect KSML). After I start the container with the working config it starts, but eventually it fails with the following exception:

docker run --name ksml-example --rm -ti -v /Users/frenkochse/src/ksml/examples:/ksml -w /ksml axual/ksml:latest

2022-04-07T09:09:04,524Z [system] [pool-1-thread-1] ERROR io.axual.common.tools.SslUtil -
                Key with alias "dst.root.ca.x3.cert.pem": CN=DST Root CA X3, O=Digital Signature Trust Co. became invalid at Thu Sep 30 14:01:15 UTC 2021
2022-04-07T09:09:04,946Z [system] [pool-1-thread-1] INFO  io.axual.ksml.user.UserFunction -
                Registered function 'ksml_pipelines_main_forEach(key:?, value:?)'
2022-04-07T09:09:11,035Z [system] [pool-1-thread-1] INFO  i.a.k.g.TopologyGeneratorImpl -
                Activating Kafka Streams topology:
2022-04-07T09:09:11,035Z [system] [pool-1-thread-1] INFO  i.a.k.g.TopologyGeneratorImpl -
                KStream[key=BINARY:String, value=AVRO:io.axual.client.example.schema.Application]
2022-04-07T09:09:11,047Z [system] [pool-1-thread-1] INFO  i.a.k.g.TopologyGeneratorImpl -
                    ==> for_each_001 operation ForEach
2022-04-07T09:09:11,068Z [system] [pool-1-thread-1] INFO  i.a.k.g.TopologyGeneratorImpl -
 
...
...
...

Topologies:
   Sub-topology: 0
    Source: application_source (topics: [applicationlogevents])
      --> for_each_001
    Processor: for_each_001 (stores: [])
      --> none
      <-- application_source


2022-04-07T09:49:46,339Z [system] [streams-loganalyser-1-StreamThread-1] ERROR i.a.k.r.backend.axual.AxualBackend -
                Caught serious exception in thread!
java.lang.IllegalArgumentException: Illegal base64 character 7b
        at java.base/java.util.Base64$Decoder.decode0(Base64.java:746)
        at java.base/java.util.Base64$Decoder.decode(Base64.java:538)
        at java.base/java.util.Base64$Decoder.decode(Base64.java:561)
        at org.apache.kafka.streams.processor.internals.StreamTask.decodeTimestamp(StreamTask.java:1097)
        at org.apache.kafka.streams.processor.internals.StreamTask.initializeTaskTime(StreamTask.java:906)
        at org.apache.kafka.streams.processor.internals.StreamTask.resetOffsetsIfNeededAndInitializeMetadata(StreamTask.java:882)
        at org.apache.kafka.streams.processor.internals.StreamTask.completeRestoration(StreamTask.java:244)
        at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:461)
        at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:849)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:731)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)

I got some feedback suggesting that the Kafka registry is not able to process the metadata?
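
One observation that may help with diagnosis: 0x7b is the ASCII code for "{", which is not part of the Base64 alphabet, and the stack trace shows the failure inside StreamTask.decodeTimestamp, which calls a Base64 decoder. That would be consistent with the stored value containing JSON-like text where Base64 was expected, although this is a guess and not confirmed by the logs. The sketch below reproduces the character-level failure in Python; the sample strings are hypothetical.

```python
import base64
import binascii


def is_strict_base64(text: str) -> bool:
    """Return True if text decodes as Base64 without any foreign characters."""
    try:
        base64.b64decode(text, validate=True)
        return True
    except (binascii.Error, ValueError):
        return False


# The character reported in the exception message.
offending_char = chr(0x7B)

# Hypothetical samples: JSON-like text versus a Base64-encoded 8-byte number.
json_like = '{"timestamp": 1649324384524}'
encoded = base64.b64encode((1649324384524).to_bytes(8, "big")).decode("ascii")

print(repr(offending_char), is_strict_base64(json_like), is_strict_base64(encoded))
```

If the guess holds, checking what is stored as metadata alongside the committed offsets for this application would be a reasonable next step.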
