
kafka-connector-extension's Introduction

Kafka Connector Extension


The Exasol Kafka Connector Extension provides UDF scripts for accessing Apache Kafka and importing data from a Kafka topic into an Exasol table.

Features

  • Imports Apache Avro formatted data from Apache Kafka clusters
  • Imports JSON formatted data from Apache Kafka clusters
  • Allows selecting record fields when importing
  • Allows secure connection to Apache Kafka clusters

Information for Users

Additional resources:

Information for Contributors

kafka-connector-extension's People

Contributors

baunz, ilikutle, jwarlander, kaklakariada, millin, morazow, pj-spoelders, shmuma


kafka-connector-extension's Issues

Remove or replace `avro4s` dependency

Situation

The avro4s project is a macro-based library we use to generate Avro schemas for classes at compile time. Unfortunately, it is now in maintenance mode only, so we should replace or remove this dependency.

If we can't find a replacement, we may have to write some boilerplate code to generate the Avro schemas. Please note that it is only used in tests.

AC

  • Replace or remove the dependency

Fix logging bug

Situation

Currently, logging does not work because the slf4j logging dependencies are missing.

Acceptance Criteria

  • Add missing library

Update Kafka Client version to 2.6.0

Situation

The Kafka Avro Serializer and Kafka Schema Registry versions should also be updated to 6.0.0.

This includes some breaking changes, but only for the integration tests.

Acceptance Criteria

  • Versions updated
  • Tests are fixed

Add Option to import JSON as-is

Having an option to import the JSON from the Kafka message as-is into a VARCHAR(2000000) field will provide more flexibility to read the JSON data downstream using the JSON functions JSON_VALUE and JSON_EXTRACT. We are currently implementing this and would like to provide a pull request once finished.
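As a rough illustration of the intended downstream usage (the column and table names follow the examples elsewhere on this page, the JSON paths are made up):

SELECT JSON_VALUE(json_doc_col, '$.product') AS product
FROM my_schema.my_table
WHERE JSON_VALUE(json_doc_col, '$.status') = 'ACTIVE';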

Add support for custom krb5.conf

In https://exasol.atlassian.net/browse/L3-1417, a client needs to specify their own krb5.conf, which we currently do not support.

We should implement support for this, as it might be needed in complex setups. It will allow the user to specify the path to the file in BucketFS and set up the Java configuration accordingly (a sketch follows the checklist below).

Need to implement

  • parameter for the user to set the file path
  • check that file indeed exists
  • set configuration options to take the file into account
  • implement integration tests for this
  • update documentation
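A minimal sketch of what the last points could look like, assuming a hypothetical KRB5_CONFIG_PATH user parameter pointing to a file in BucketFS (the parameter name, path, and helper are illustrative, not the connector's actual API):

// Hypothetical sketch: apply a user-supplied krb5.conf before creating the Kafka consumer.
// KRB5_CONFIG_PATH and the BucketFS path are assumptions for illustration only.
def applyCustomKrb5Config(properties: Map[String, String]): Unit =
  properties.get("KRB5_CONFIG_PATH").foreach { path =>
    require(new java.io.File(path).exists(), s"The krb5.conf file '$path' does not exist in BucketFS")
    // Standard JVM system property for pointing Kerberos at a custom configuration file.
    System.setProperty("java.security.krb5.conf", path)
  }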

CVE-2023-6378: pkg:maven/ch.qos.logback/[email protected]

Refactor Kafka properties class

Situation

The class that manages Kafka properties is more than 500 lines long. It contains both validation and property settings. We should refactor and decouple these.

Acceptance Criteria

  • Refactored Kafka properties manager class

VM error: Internal error: VM crashed

Hello @morazow!

Sometimes I get the following error, but I can't find the reason.
Could you help? Maybe you know a possible reason for such a crash.

W-UDF-CL-SL-JAVA-1075: Skipping init, because init method cannot be found.
[main] INFO com.exasol.cloudetl.kafka.KafkaTopicDataImporter$ - Starting Kafka consumer for partition '0' at next offset '906128505' for node '0' and vm '140604604606536'.
[main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
	allow.auto.create.topics = true
	auto.commit.interval.ms = 5000
	auto.offset.reset = earliest
	bootstrap.servers = [kafka-01:9092, kafka-02:9092]
	check.crcs = true
	client.dns.lookup = use_all_dns_ips
	client.id = consumer-ExaDWH-1
	client.rack =
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 209715200
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = ExaDWH
	group.instance.id = null
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	internal.throw.on.fetch.stable.offset.unsupported = false
	isolation.level = read_uncommitted
	key.deserializer = class com.exasol.cloudetl.kafka.deserialization.IgnoreKeyDeserializer$
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = [hidden]
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = PLAIN
	security.protocol = SASL_SSL
	security.providers = null
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	socket.connection.setup.timeout.max.ms = 30000
	socket.connection.setup.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = [hidden]
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.certificate.chain = null
	ssl.keystore.key = null
	ssl.keystore.location = /buckets/bfsdefault/kafka/ca.jks
	ssl.keystore.password = [hidden]
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.3
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.certificates = null
	ssl.truststore.location = /buckets/bfsdefault/kafka/ca.jks
	ssl.truststore.password = [hidden]
	ssl.truststore.type = JKS
	value.deserializer = class com.exasol.cloudetl.kafka.deserialization.AsStringDeserializer

[main] INFO org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully logged in.
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 6.2.0-ccs
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 1a5755cf9401c84f
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1634206621469
[main] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-ExaDWH-1, groupId=ExaDWH] Subscribed to partition(s): order_attributes-0
[main] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-ExaDWH-1, groupId=ExaDWH] Seeking to offset 906128505 for partition order_attributes-0
[main] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-ExaDWH-1, groupId=ExaDWH] Cluster ID: aVhFD8kQRFevL6md4ocEbQ
[main] INFO com.exasol.cloudetl.kafka.consumer.KafkaRecordConsumer - The last record offset for partition '0' is '906188812'.
[main] INFO com.exasol.cloudetl.kafka.consumer.KafkaRecordConsumer - Polled '500' records, total '500' records for partition '0' in node '0' and vm '140604604606536'.
[main] INFO com.exasol.cloudetl.kafka.consumer.KafkaRecordConsumer - Polled '106' records, total '606' records for partition '0' in node '0' and vm '140604604606536'.
[main] INFO com.exasol.cloudetl.kafka.consumer.KafkaRecordConsumer - Polled '500' records, total '1106' records for partition '0' in node '0' and vm '140604604606536'.
[main] INFO com.exasol.cloudetl.kafka.consumer.KafkaRecordConsumer - Polled '37' records, total '1143' records for partition '0' in node '0' and vm '140604604606536'.
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
[main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
[main] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.consumer for consumer-ExaDWH-1 unregistered
F-UDF-CL-LIB-1000:exaudfclient aborting ... cannot access socket file /tmp/zmqvmcontainer_conn_9703630733253653609.
#
# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x00007f09c31b3a10, pid=1, tid=2
#
# JRE version: OpenJDK Runtime Environment (11.0.10+9) (build 11.0.10+9-Ubuntu-0ubuntu1.18.04)
# Java VM: OpenJDK 64-Bit Server VM (11.0.10+9-Ubuntu-0ubuntu1.18.04, mixed mode, sharing, tiered, compressed oops, serial gc, linux-amd64)
# Problematic frame:
# C  [libc.so.6+0x40a10]  abort+0x230
#
# No core dump will be written. Core dumps have been disabled. To enable core dumping, try "ulimit -c unlimited" before starting Java again
#
# An error report file with more information is saved as:
# /tmp/hs_err_pid1.log
#
# If you would like to submit a bug report, please visit:
#   https://bugs.launchpad.net/ubuntu/+source/openjdk-lts
#

[error occurred during error reporting (), id 0xb, SIGSEGV (0xb) at pc=0x00007f09c8426a10]

Add example of JAAS config in docs

File jaas.conf should not contain groups, just login module(s) definition. We should have a small example in docs to illustrate this.
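A minimal example of such a file, assuming Kerberos authentication with a keytab stored in BucketFS (the module options and paths below are illustrative):

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/buckets/bfsdefault/kafka/kafka-client.keytab"
    principal="kafka-client@EXAMPLE.COM";
};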

CVE-2023-51775: org.bitbucket.b_c:jose4j:jar:0.9.3:test

Summary

The jose4j component before 0.9.4 for Java allows attackers to cause a denial of service (CPU consumption) via a large p2c (aka PBES2 Count) value.

Sonatype's research suggests that this CVE's details differ from those defined at NVD. See https://ossindex.sonatype.org/vulnerability/CVE-2023-51775 for details

CVE: CVE-2023-51775
CWE: CWE-400

References

Optionally check keystore and truststore file when SASL_SSL protocol used

Situation

At the moment, we require the keystore and truststore JKS files to be stored in a BucketFS bucket when using the security protocols SSL and SASL_SSL.

For SASL_SSL this should be optional, since a JAAS configuration can also be used.

Acceptance Criteria

  • Skip the keystore and truststore location checks when using the SASL_SSL protocol.
  • Set the location files if they are provided.
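A rough sketch of the intended validation logic, assuming the usual property names from the user guide (the helper and its signature are made up for illustration):

// Hypothetical sketch of the relaxed validation: only SSL strictly requires the
// keystore/truststore files; for SASL_SSL they are checked only when provided.
def validateStores(securityProtocol: String, props: Map[String, String]): Unit = {
  val storeKeys = Seq("SSL_KEYSTORE_LOCATION", "SSL_TRUSTSTORE_LOCATION")
  securityProtocol match {
    case "SSL" =>
      storeKeys.foreach { key =>
        require(props.contains(key), s"Parameter '$key' is required for the SSL protocol")
      }
    case "SASL_SSL" =>
      // Optional: a JAAS configuration may be used instead, so only check files that were provided.
      storeKeys.flatMap(props.get).foreach { location =>
        require(new java.io.File(location).exists(), s"File '$location' does not exist")
      }
    case _ => ()
  }
}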

CVE-2024-29025: io.netty:netty-codec-http:jar:4.1.107.Final:test

Summary

Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients. The HttpPostRequestDecoder can be tricked into accumulating data. While the decoder can store items on the disk if configured so, there are no limits to the number of fields the form can have; an attacker can send a chunked post consisting of many small fields that will be accumulated in the bodyListHttpData list. The decoder cumulates bytes in the undecodedChunk buffer until it can decode a field, and this field can cumulate data without limits. This vulnerability is fixed in 4.1.108.Final.

CVE: CVE-2024-29025
CWE: CWE-770

References

Kafka record consumer fails to recognize when it's *already* at the last offset

If I run an import with CONSUME_ALL_OFFSETS = 'true' on a topic where I'm already caught up, the polling loop will keep spinning until a new message appears on the topic, because lastRecordOffset is initialized to 0 instead of the current partitionEndOffset. Then, if no records appeared during the poll, 0 is compared to the partitionEndOffset in shouldContinue():

private[this] def emitRecords(
    iterator: ExaIterator,
    r: ConsumerRecords[Map[FieldSpecification, Seq[Any]], Map[FieldSpecification, Seq[Any]]]
): Long = {
  var lastRecordOffset = 0L
  r.asScala.foreach { record =>
    lastRecordOffset = record.offset()
    val metadata: Seq[Object] = Seq(
      record.partition().asInstanceOf[AnyRef],
      record.offset().asInstanceOf[AnyRef]
    )
    val columnsCount = tableColumnCount - metadata.size
    val rowValues = RowBuilder.buildRow(recordFieldSpecifications, record, columnsCount)
    val rows: Seq[Any] = rowValues ++ metadata
    iterator.emit(rows: _*)
  }
  lastRecordOffset
}

private[this] def shouldContinue(
    recordOffset: Long,
    recordCount: Int,
    totalRecordCount: Long
): Boolean =
  (properties.isConsumeAllOffsetsEnabled() && recordOffset < partitionEndOffset) ||
    (recordCount >= minRecordsPerRun && totalRecordCount < maxRecordsPerRun)

The fix would probably be to just initialize lastRecordOffset to the value of partitionEndOffset before the polling loop.

However, I am not sure what happens if the topic partition is actually empty. Would the Kafka consumer return the end offset as 0? In that case partitionEndOffset would become -1, lastRecordOffset would also be initialized to -1, and all would be fine.
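A sketch of the suggested fix, reduced to the offset-tracking part (Scala 2.13 collection converters assumed; the helper is illustrative, not the actual method):

import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.ConsumerRecords

// Start from the known end offset instead of 0, so that a poll returning no records
// lets shouldContinue() terminate the loop for an already-caught-up partition.
def lastOffsetOrEnd[K, V](records: ConsumerRecords[K, V], partitionEndOffset: Long): Long = {
  var lastRecordOffset = partitionEndOffset // previously initialized to 0L
  records.asScala.foreach { record =>
    lastRecordOffset = record.offset()
  }
  lastRecordOffset
}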

Import raw JSON from Kafka

Hi,

I wonder if this library would be the right place for importing raw JSON from Kafka into a single column. This is a common use case, as a lot of engineering teams place their messages on Kafka in this format, and working with JSON data in Exasol is easy and convenient nowadays.

This would require changes to the deserialization (no schema registry, but a plain StringDeserializer); the rest stays pretty much the same, and one could omit the schema registry URL.

Greetings,
Johannes
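For illustration of the deserialization change described above, a consumer reading raw JSON values could be configured with Kafka's plain StringDeserializer roughly like this (a sketch, not the connector's actual code):

import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer

// Sketch: a consumer that reads record values as raw JSON strings,
// without involving a schema registry.
def createRawJsonConsumer(bootstrapServers: String): KafkaConsumer[String, String] = {
  val props = new Properties()
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
  new KafkaConsumer[String, String](props)
}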

Error when trying to consume a topic with NULL values

When running a local test in my dev schema after upgrading the Kafka Connector to 1.0.0, I ran into the following issue:

[main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-EXASOL_KAFKA_UDFS_CONSUMERS-1, groupId=EXASOL_KAFKA_UDFS_CONSUMERS] Fetch position FetchPosition{offset=117013, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[<IP>:<PORT> (id: 22 rack: null)], epoch=0}} is out of range for partition public.pro-seller.offer-0, resetting offset
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-EXASOL_KAFKA_UDFS_CONSUMERS-1, groupId=EXASOL_KAFKA_UDFS_CONSUMERS] Resetting offset for partition public.pro-seller.offer-0 to position FetchPosition{offset=139566, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[<IP>:<PORT> (id: 22 rack: null)], epoch=0}}.
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
[main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
[main] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.consumer for consumer-EXASOL_KAFKA_UDFS_CONSUMERS-1 unregistered
F-UDF-CL-SL-JAVA-1080: Exception during run
com.exasol.cloudetl.kafka.KafkaConnectorException: Error consuming Kafka topic 'public.pro-seller.offer' data. It occurs for partition '0' in node '0' and vm '140681706548736' Cause: null
com.exasol.cloudetl.kafka.KafkaTopicDataImporter$.run(KafkaTopicDataImporter.scala:99)
com.exasol.cloudetl.kafka.KafkaTopicDataImporter.run(KafkaTopicDataImporter.scala)
com.exasol.ExaWrapper.run(ExaWrapper.java:196)
Caused by: java.lang.NullPointerException
com.exasol.cloudetl.kafka.KafkaTopicDataImporter$.$anonfun$run$1(KafkaTopicDataImporter.scala:84)
com.exasol.cloudetl.kafka.KafkaTopicDataImporter$.$anonfun$run$1$adapted(KafkaTopicDataImporter.scala:71)
scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
scala.collection.AbstractIterable.foreach(Iterable.scala:919)
com.exasol.cloudetl.kafka.KafkaTopicDataImporter$.run(KafkaTopicDataImporter.scala:71)
	... 6 more
 
W-UDF.CL.SL.JAVA-1082: Skipping init, because cleanup method cannot be found.

This seems to relate to the following line in KafkaTopicDataImporter.scala:

          val exasolRow: Seq[Any] = recordValue ++ metadata

Is there a possibility that the above construct will fail if the recordValue is null? That happens quite frequently in the topic I'm reading here, as a way to indicate that a record should be deleted.

Previously, that particular line looked like:

          val exasolRow: Seq[Object] = getAvroRow(singleColumnJson, recordValue) ++ metadata

..where getAvroRow() was defined like:

  private def getAvroRow(singleJson: Boolean, recordValue: GenericData.Record): Seq[Object] =
    if (singleJson) {
      Seq(s"$recordValue")
    } else {
      AvroRow(recordValue).getValues().map(_.asInstanceOf[AnyRef])
    }

Apparently the null conversion would pass by just fine as part of the string interpolation above (I'm extracting value as JSON).
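A possible null-safe variant of that row construction, as a sketch only (tombstone records with a null value would then produce a SQL NULL instead of a NullPointerException):

// Sketch: guard against records whose value is null before appending the metadata columns.
def buildExasolRow(recordValue: Seq[Any], metadata: Seq[Any]): Seq[Any] =
  Option(recordValue).getOrElse(Seq(null)) ++ metadata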

extension v0.2.1 on exasol v7.0.4 crashes VM

I am trying to make the extension v0.2.1 work with an exasol v7.0.4.
When running the import statement the client receives a short note that the VM has crashed.
Please see the attached files with commands/results and server-side exception log.
commands.txt
exasol_exception.log

I asked my colleague @ilikutle to take a look at the error message and the setup together with me.
He confirmed the correct setup of the UDFs & target table and the installation of the jars (as expected, since this step is automated and version controlled). While developing the AS_JSON_DOC feature he never observed such an exception (development was done on Exasol v7.0.3).

Are there any known issues with v7.0.4?
Why is the VM crashing?
What do I have to do to make this work?

Thank you in advance

CVE-2024-27309: org.apache.kafka:kafka-server-common:jar:3.6.0:test

Summary

While an Apache Kafka cluster is being migrated from ZooKeeper mode to KRaft mode, in some cases ACLs will not be correctly enforced.

Two preconditions are needed to trigger the bug:

  1. The administrator decides to remove an ACL
  2. The resource associated with the removed ACL continues to have two or more other ACLs associated with it after the removal.

When those two preconditions are met, Kafka will treat the resource as if it had only one ACL associated with it after the removal, rather than the two or more that would be correct.

The incorrect condition is cleared by removing all brokers in ZK mode, or by adding a new ACL to the affected resource. Once the migration is completed, there is no metadata loss (the ACLs all remain).

The full impact depends on the ACLs in use. If only ALLOW ACLs were configured during the migration, the impact would be limited to availability impact. If DENY ACLs were configured, the impact could include confidentiality and integrity impact depending on the ACLs configured, as the DENY ACLs might be ignored due to this vulnerability during the migration period.

CVE: CVE-2024-27309
CWE: CWE-863

References

CVE-2024-22201: org.eclipse.jetty.http2:http2-common:jar:9.4.53.v20231009:test

Summary

Jetty is a Java based web server and servlet engine. An HTTP/2 SSL connection that is established and TCP congested will be leaked when it times out. An attacker can cause many connections to end up in this state, and the server may run out of file descriptors, eventually causing the server to stop accepting new connections from valid clients. The vulnerability is patched in 9.4.54, 10.0.20, 11.0.20, and 12.0.6.

CVE: CVE-2024-22201
CWE: CWE-400

References

Uploaded jar file and checksum mismatches

Situation

There is a mismatch between the uploaded jar file and the sha256sum file. The uploaded jar is the stripped one, but the checksum file is for the regular jar.

Acceptance Criteria

  • Correctly upload jar and checksum file

Rename UDF Java class names

Situation

Acceptance Criteria

  • UDF Script names are changed
  • UDF referenced Java class names are changed to more descriptive names
  • Documentation is updated

Kafka consumer fails to break polling loop when it has to reset the offset, but can't read any new records

If we try to import from a Kafka topic using CONSUME_ALL_OFFSETS = 'true', and the topic has had messages in it that have all expired (e.g. through retention time), we will be stuck in a loop (at least until some new messages appear).

This may sound reasonable (waiting for messages), but in our case we're trying to set up a scheduled import from a pre-production topic, which may or may not get new messages on any given day (or week). We just want the import to bail out after the first poll attempt if there's nothing to be found.

Here's what happens, more or less:

  1. We start with current offset = 0 (start offset), as table in Exasol is empty
  2. We do poll, consumer returns empty records
  3. And meanwhile consumer resets offset to the topic start offset (which is = end offset in this case due to expired records)
  4. Since no records were returned, we compare our current offset with -1 and 0 (the start offset) and stay at 0 again
  5. And loop back to (2)

Check for SCHEMA_REGISTRY_URL during IMPORT fails to recognize connection config

I'm defining a Kafka connection like this:

CREATE OR REPLACE CONNECTION MY_CONNECTION TO '' USER ''
IDENTIFIED BY 'BOOTSTRAP_SERVERS=<host:port>;SCHEMA_REGISTRY_URL=<url>;SSL_ENABLED=true;...';

Then run imports with only the connection as parameter:

IMPORT INTO my_schema.my_table (json_doc_col, kafka_partition, kafka_offset)
FROM SCRIPT ETL.KAFKA_CONSUMER WITH
  CONNECTION_NAME         = 'MY_CONNECTION'
  TOPIC_NAME              = 'my.topic'
  TABLE_NAME              = 'MY_SCHEMA.MY_TABLE'
  AS_JSON_DOC             = 'true'
;

As of the 1.0 release, though, I get an error when doing this:

VM error: F-UDF-CL-LIB-1127: F-UDF-CL-SL-JAVA-1002: F-UDF-CL-SL-JAVA-1013: 
  com.exasol.ExaUDFException: F-UDF-CL-SL-JAVA-1080: Exception during run 
  com.exasol.cloudetl.kafka.KafkaConnectorException: SCHEMA_REGISTRY_URL must be provided for record type 'avro'
  com.exasol.cloudetl.kafka.deserialization.AvroDeserialization$.getSingleColumnJsonDeserializer(AvroDeserialization.scala:29)
  com.exasol.cloudetl.kafka.KafkaTopicDataImporter$.run(KafkaTopicDataImporter.scala:52)
  com.exasol.cloudetl.kafka.KafkaTopicDataImporter.run(KafkaTopicDataImporter.scala)
  com.exasol.ExaWrapper.run(ExaWrapper.java:196)

If I add the SCHEMA_REGISTRY_URL parameter to the IMPORT itself, too, it works (so a workaround is available).
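For reference, the workaround looks like this (simply repeating the parameter that is already part of the connection string):

IMPORT INTO my_schema.my_table (json_doc_col, kafka_partition, kafka_offset)
FROM SCRIPT ETL.KAFKA_CONSUMER WITH
  CONNECTION_NAME         = 'MY_CONNECTION'
  TOPIC_NAME              = 'my.topic'
  TABLE_NAME              = 'MY_SCHEMA.MY_TABLE'
  AS_JSON_DOC             = 'true'
  SCHEMA_REGISTRY_URL     = '<url>'
;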

Improve integration tests

Situation

Since moving to Scala 2.12, it is now possible to use the Java 11 environment. Thus, Exasol Testcontainers should be used for the integration tests.

Acceptance Criteria

  • Find similar docker based test containers for Kafka stack (zookeeper, kafka, schema registry)
  • Add docker based integration tests for data import
  • Obtain coverage results from docker containers (similar setup as in DynamoDB VS)

CVE-2024-23080: joda-time:joda-time:jar:2.10.8:test

CVE-2024-25710: org.apache.commons:commons-compress:jar:1.21:compile

Summary

Loop with Unreachable Exit Condition ('Infinite Loop') vulnerability in Apache Commons Compress. This issue affects Apache Commons Compress: from 1.3 through 1.25.0.

Users are recommended to upgrade to version 1.26.0 which fixes the issue.

CVE: CVE-2024-25710
CWE: CWE-835

References

Add option for consuming ALL events as of start-of-import

I'd like to be able to simply fetch all events from a topic, up until the latest offset in each partition that existed when the IMPORT started.

This option should override any record count triggers etc, and fully catch up on the topic from where the last import left off (or from the beginning of the topic if no previous data exists in the target table).
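An import using such an option could look like this (the property name CONSUME_ALL_OFFSETS is the one referenced in the related issues; the rest of the statement mirrors the existing examples on this page):

IMPORT INTO my_schema.my_table (json_doc_col, kafka_partition, kafka_offset)
FROM SCRIPT ETL.KAFKA_CONSUMER WITH
  CONNECTION_NAME       = 'MY_CONNECTION'
  TOPIC_NAME            = 'my.topic'
  TABLE_NAME            = 'MY_SCHEMA.MY_TABLE'
  CONSUME_ALL_OFFSETS   = 'true'
;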

Add support for extension manager

In order to support the extension manager we need to add an extension that manages the Kafka connector.

Steps for installation

See user guide:

  • Upload adapter jar to bucketfs ✅
  • Create three UDF scripts ✅
  • Create Exasol table with columns KAFKA_PARTITION DECIMAL(18, 0), KAFKA_OFFSET DECIMAL(36, 0) ❌
  • Run import with IMPORT INTO <schema_name>.<table_name> FROM SCRIPT KAFKA_CONSUMER WITH ... ❌
  • If required: create connection with IDENTIFIED BY 'SSL_KEY_PASSWORD=<PASSWORD>;SSL_KEYSTORE_PASSWORD=<SSLPASSWORD>;SSL_KEYSTORE_LOCATION=/buckets/bfsdefault/<BUCKET>/keystore.jks;SSL_TRUSTSTORE_PASSWORD=<TRUSTSTOREPASS>;SSL_TRUSTSTORE_LOCATION=/buckets/bfsdefault/<BUCKET>/truststore.jks' ❌

Migrate to Github Actions

Situation

We are moving our build infrastructure to GitHub Actions. This repository should also be built in GHA.

Acceptance Criteria

  • Updated build files
  • Added sonar settings
  • Updated readme badges

CVE-2024-27309: org.apache.kafka:kafka-clients:jar:3.6.0:compile

Summary

While an Apache Kafka cluster is being migrated from ZooKeeper mode to KRaft mode, in some cases ACLs will not be correctly enforced.

Two preconditions are needed to trigger the bug:

  1. The administrator decides to remove an ACL
  2. The resource associated with the removed ACL continues to have two or more other ACLs associated with it after the removal.

When those two preconditions are met, Kafka will treat the resource as if it had only one ACL associated with it after the removal, rather than the two or more that would be correct.

The incorrect condition is cleared by removing all brokers in ZK mode, or by adding a new ACL to the affected resource. Once the migration is completed, there is no metadata loss (the ACLs all remain).

The full impact depends on the ACLs in use. If only ALLOW ACLs were configured during the migration, the impact would be limited to availability impact. If DENY ACLs were configured, the impact could include confidentiality and integrity impact depending on the ACLs configured, as the DENY ACLs might be ignored due to this vulnerability during the migration period.

CVE: CVE-2024-27309
CWE: CWE-863

References

Secure SSL credentials should be provided from connection object

Situation

Currently, users can provide SSL-related properties (SSL key password, keystore location) both as key-value parameters and via an Exasol named connection object.

They should only be provided using the connection object.

Acceptance Criteria

  • Remove user parameters and only accept the connection object
  • Update the user guide

Update documentation

Acceptance Criteria

  • Move the Kafka documentation from the other repository
  • Update the links
  • Update user guide

Add latest common library

It includes parsing of nested Avro records and logical types.

Acceptance Criteria

  • Library version updated
  • User guide with Avro datatype mapping description added

Import values from two level nested Avro records

Situation

Sometimes Avro records are encoded using a top-level record, for example payload, that wraps all the data fields. Currently, such records are imported into a single table column as JSON strings.

This is a usual Avro record in Kafka (in JSON representation):

{
  "type": "record",
  "name": "KafkaExasolAvroRecord",
  "fields": [
    { "name": "product", "type": "string" },
    { "name": "price", "type": { "type": "bytes", "precision": 4, "scale": 2, "logicalType": "decimal" }}
    { "name": "sale_time", "type": { "type": "long", "logicalType": "timestamp-millis" }}
  ]
}

which could map to this Exasol table:

CREATE OR REPLACE TABLE <schema_name>.<table_name> (
    PRODUCT     VARCHAR(500),
    PRICE       DECIMAL(4, 2),
    SALE_TIME   TIMESTAMP,

    KAFKA_PARTITION DECIMAL(18, 0),
    KAFKA_OFFSET DECIMAL(36, 0)
);

The last two columns are only for keeping metadata.

But unfortunately, sometimes the data is stored with one more top-level field, such as payload.

{
   "type":"record",
   "name":"KafkaExasolAvroRecord",
   "fields":[
      {  "name":"payload",
         "type":{
            "name": "PayloadRecord",
            "type": "record",
            "fields": [
              { "name":"product", "type":"string" },
              { "name":"price", "type":{ "type":"bytes", "precision":4, "scale":2, "logicalType":"decimal" }}
              { "name":"sale_time", "type":{ "type":"long", "logicalType":"timestamp-millis" }}
            ]
         }
      }
   ]
}

This will map to a single column in an Exasol table and be imported into a VARCHAR column as a JSON string. It is still possible to use Exasol JSON functions to extract fields. However, it can cause issues if the JSON string exceeds 2 million characters, which is the limit of the Exasol VARCHAR type.

It would be a nice feature to be able to specify the nested fields to be imported as separate columns.

Acceptance Criteria

  • Added field specification for second level fields.
  • Unified field specifications for JSON and Avro
  • Unit and integration tests are added for the feature

Convert Kafka long values to timestamps

Situation

Longs stored in Kafka should be converted to timestamps if the destination column has a timestamp type (a conversion sketch follows the acceptance criteria).

Acceptance Criteria

  • Longs converted to timestamps
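A minimal sketch of such a conversion, assuming the long holds epoch milliseconds (as Kafka's timestamp-millis logical type and the record timestamp do):

import java.sql.Timestamp

// Convert an epoch-millisecond long from a Kafka record into a value
// suitable for an Exasol TIMESTAMP column.
def longToTimestamp(epochMillis: Long): Timestamp = new Timestamp(epochMillis)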

Add ability to access all fields from a kafka record

Hi,

Currently the table contains only the record value plus the offset and partition of the record. There are at least two core elements missing (key and timestamp), and headers are also available.

  • The key can be very important because often, deletions in the source system are tracked with a key containing the entity id and a null record value - this is valuable information
  • For the JSON-Case it can be helpful to have a separate key that can be used without unwrapping the json
  • The timestamp from the record is very valuable information for restricting queries on the table (e.g. for further daily processing) to a certain amount of time. Maybe this is driven a little bit by the JSON code path, because when working with Avro records one would expect to have a true timestamp in the record, but nonetheless the timestamp is a first-class citizen in Kafka records.

First I started with a boolean config setting EMIT_RECORD_KEY, but it feels wrong to have EMIT_RECORD_KEY and EMIT_RECORD_TIMESTAMPS flags because the table DDL would be hard to figure out.

The big solution would be to somehow be able to specify something with template variables:

ADDITIONAL_FIELDS=${partition}, ${offset}, ${timestamp}, ${key}, ${headers.sourceSystem}

(being able to remove the necessary partition and offset fields makes no sense, just for clarification)

Greetings,
Johannes
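For reference, all of the fields listed above are already exposed by the Kafka client's ConsumerRecord, so collecting them could look roughly like this (Scala 2.13 converters assumed; the emitted map shape is illustrative, not a proposed API):

import java.nio.charset.StandardCharsets
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.ConsumerRecord

// Sketch: pull the additional metadata fields out of a consumed record.
def additionalFields(record: ConsumerRecord[String, String]): Map[String, Any] =
  Map(
    "partition" -> record.partition(),
    "offset" -> record.offset(),
    "timestamp" -> record.timestamp(),
    "key" -> record.key()
  ) ++ record.headers().asScala.map { header =>
    s"headers.${header.key()}" -> new String(header.value(), StandardCharsets.UTF_8)
  }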

CVE-2023-6378: pkg:maven/ch.qos.logback/[email protected]
