
clickhouse-kafka-connect's Introduction

ClickHouse Kafka Connect Sink

About

clickhouse-kafka-connect is the official Kafka Connect sink connector for ClickHouse.

The Kafka connector delivers data from a Kafka topic to a ClickHouse table.

Documentation

See the ClickHouse website for the full documentation entry.

Design

For a full overview of the design and how exactly-once delivery semantics are achieved, see the design document.

Help

For additional help, please file an issue in the repository or raise a question in the ClickHouse public Slack.

clickhouse-kafka-connect's People

Contributors

abdelhakimbendjabeur, abhishekgahlot2, actions-user, alexshanabrook, arnay-allen, askmyhat, auvred, blinkov, ddtkey, dependabot[bot], genzgd, gingerwizard, hamsterready, harpaj, hekike, jirislav, junegunn, mcascallares, mlivirov, mshustov, mzitnik, ne1r0n, nicolae-gorgias, paultagoras, raphala, ryado, sanadhis, tiakon, ygrzjh


clickhouse-kafka-connect's Issues

Cloud integration test

Currently, the integration tests only cover reading from a single topic.
We should also add a test that reads from multiple topics into multiple tables.

Connect Monitoring infrastructure

Since Kafka Connect is a JVM application, we can monitor Connect in a standard way via JMX (a sketch of enabling it follows the links below).

We would like to build infrastructure that will help us quickly expose different application-level metrics.

First candidates for metrics implementation

  • Total inserts/failures to ClickHouse
  • Inserts/failures rate

Already built-in metrics:
https://docs.confluent.io/platform/current/connect/monitoring.html
Connect Monitoring
https://kafka.apache.org/documentation.html#connect_monitoring
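
A minimal sketch of exposing JMX on a Connect worker started from the Kafka distribution scripts (the port number is an arbitrary choice, and whether authentication/SSL should be enabled depends on the environment):

export JMX_PORT=9999
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
bin/connect-distributed.sh config/connect-distributed.properties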

DB::Exception: KeeperMap is disabled because 'keeper_map_path_prefix' config is not defined. (BAD_ARGUMENTS)

Hi,
I have installed the Kafka Connect sink connector (https://github.com/ClickHouse/clickhouse-kafka-connect/releases/download/v0.0.8-beta/clickhouse-kafka-connect-0.0.8.zip)

and configured my connect-sink.properties file as follows:

name = kafka-sink
connector.class=com.clickhouse.kafka.connect.ClickHouseSinkConnector
bootstrap.servers=localhost:9092
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.sink
rest.port=10082
rest.host.name=localhost
rest.advertised.port=10082
rest.advertised.host.name=localhost

key.converter=org.apache.kafka.connect.storage.StringConverter

tasks.max=1
topics=mysql1.myDB.test
#ssl=true
#security.protocol=SSL
hostname=localhost
database=myDB
username=default
password=P#ssw0rd
#ssl.truststore.location=/tmp/kafka.client.truststore.jks
port=8123
value.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
exactlyOnce=true
schemas.enable=false

When I try to start Kafka Connect in standalone mode to sink the Kafka data into ClickHouse, I get this error:

[2023-02-13 08:10:57,267] WARN [kafka-sink|task-0] Query retry 1 out of 3 (com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient:101)
com.clickhouse.client.ClickHouseException: Code: 36. DB::Exception: KeeperMap is disabled because 'keeper_map_path_prefix' config is not defined. (BAD_ARGUMENTS) (version 23.1.3.5 (official build))
, server ClickHouseNode [uri=http://localhost:8123/myDB]
        at com.clickhouse.client.ClickHouseException.of(ClickHouseException.java:148)
        at com.clickhouse.client.AbstractClient.lambda$execute$0(AbstractClient.java:291)
        at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: Code: 36. DB::Exception: KeeperMap is disabled because 'keeper_map_path_prefix' config is not defined. (BAD_ARGUMENTS) (version 23.1.3.5 (official build))

        at com.clickhouse.client.http.HttpUrlConnectionImpl.checkResponse(HttpUrlConnectionImpl.java:187)
        at com.clickhouse.client.http.HttpUrlConnectionImpl.post(HttpUrlConnectionImpl.java:278)
        at com.clickhouse.client.http.ClickHouseHttpClient.send(ClickHouseHttpClient.java:101)
        at com.clickhouse.client.AbstractClient.sendAsync(AbstractClient.java:156)
        at com.clickhouse.client.AbstractClient.lambda$execute$0(AbstractClient.java:289)
        ... 4 more

Any idea what I need to do to fix this issue?
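
The error indicates that exactlyOnce=true relies on the KeeperMap table engine for its state store, and that engine stays disabled on the server until keeper_map_path_prefix is defined. A hedged sketch of enabling it in the ClickHouse server configuration (the prefix path is an arbitrary example; alternatively, exactlyOnce=false avoids KeeperMap altogether):

<clickhouse>
    <keeper_map_path_prefix>/keeper_map_tables</keeper_map_path_prefix>
</clickhouse>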

Exactly once guarantee

Are the exactly-once semantics not respected here, or am I misunderstanding?

  1. Process 1 reads the state
    StateRecord stateRecord = stateProvider.getStateRecord(topic, partition);
  2. Process 1 gets stuck on
    LOGGER.debug(String.format("before drop %d after drop %d state %s", bpBeforeDrop, bpAfterDrop, stateRecord.getOverLappingState(rangeContainer)));
  3. Process 2 starts and reads the state
    StateRecord stateRecord = stateProvider.getStateRecord(topic, partition);
  4. Process 2 processes enough new blocks to exceed replicated_deduplication_window
  5. Process 1 recovers and performs its insert
  6. The inserted data is duplicated

This could be avoided if the insert and the state update were performed atomically.

Exception when running multiple connect instances `state is not synced for topic [ ] partition [ ]`

Describe the bug

We're running the sink connector inside k8s.
When we increase the number of worker instances, all the tasks are crashing w/ this exception:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
...
Caused by: java.lang.RuntimeException: state is not synced for topic [events] partition [2]
	at com.clickhouse.kafka.connect.sink.processing.Processing.doLogic(Processing.java:165)
	at com.clickhouse.kafka.connect.sink.ProxySinkTask.put(ProxySinkTask.java:80)
	at com.clickhouse.kafka.connect.sink.ClickHouseSinkTask.put(ClickHouseSinkTask.java:60)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)

Steps to reproduce

  1. Create a Docker image like:
FROM --platform=linux/arm64 confluentinc/cp-kafka-connect:7.4.0
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
RUN confluent-hub install --no-prompt clickhouse/clickhouse-kafka-connect:0.0.10-beta
  2. Deploy to k8s
  3. Configure the worker using the config:
{
  "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
  "tasks.max": "1",
  "topics": "events",
  "key.ignore": "true",
  "ssl": "true",
  "hostname": "<redacted>",
  "database": "<redacted>",
  "password": "<redacted>",
  "port": "8443",
  "consumer.override.max.poll.records": "100000",
  "value.converter.schemas.enable": "false",
  "consumer.override.max.partition.fetch.bytes": "102400000",
  "exactlyOnce": "true",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "username": "default"
}
  4. Scale up the deployment to more than one instance

Expected behaviour

Error log

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:618)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: state is not synced for topic [events] partition [2]
	at com.clickhouse.kafka.connect.sink.processing.Processing.doLogic(Processing.java:165)
	at com.clickhouse.kafka.connect.sink.ProxySinkTask.put(ProxySinkTask.java:80)
	at com.clickhouse.kafka.connect.sink.ClickHouseSinkTask.put(ClickHouseSinkTask.java:60)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
	... 11 more

Configuration

Environment

  • Kafka-Connect version: 7.4.0
  • Kafka Connect configuration: default configs
  • Kafka version: -
  • Kafka environment: cloud (Confluent)
  • OS: -

ClickHouse server

  • ClickHouse Server version: 23.3 (cloud)
  • ClickHouse Server non-default settings, if any:

validate exactly-once works correctly

To officially release exactly-once support, we need to test the connector under extreme conditions.

One of the approaches to achieve this is to use the jepsen framework.

A few mandatory scenarios

  • Run a multi-instance Kafka Connect sink
  • The topic needs to have at least three partitions
  • Run ClickHouse as a multi-node cluster
  • Crash/stop the Kafka Connect sink randomly

For the test itself:
Push about 10M records to Kafka using the trips dataset.
The test should run for at least 10 minutes; this can be tuned with consumer.override.max.poll.records (see the sketch below).
Test with both a local and a cloud version of ClickHouse.
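
A hedged example of the relevant consumer override on the sink connector; the values are arbitrary and would be tuned per test run (the same overrides appear in the multi-instance configuration above):

consumer.override.max.poll.records=100000
consumer.override.max.partition.fetch.bytes=102400000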

Update Version Number

We should automatically increment the version number when we release a new version - it'll make updating less troublesome (since it would be folded into the PR for whatever issue is being solved).

Index 0 out of bounds for length 0 on empty arrays

The connector fails with an exception when sinking an empty array, because of this check.

Traceback:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IndexOutOfBoundsException: Index 0 out of bounds for length 0
	at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
	at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
	at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
	at java.base/java.util.Objects.checkIndex(Objects.java:372)
	at java.base/java.util.ArrayList.get(ArrayList.java:459)
	at com.clickhouse.kafka.connect.sink.data.StructToJsonMap.toJsonMap(StructToJsonMap.java:58)
	at com.clickhouse.kafka.connect.sink.data.convert.SchemaRecordConvertor.convert(SchemaRecordConvertor.java:25)
	at com.clickhouse.kafka.connect.sink.data.Record.convert(Record.java:78)
	at com.clickhouse.kafka.connect.sink.ProxySinkTask.lambda$put$0(ProxySinkTask.java:73)
	at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
	at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
	at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
	at com.clickhouse.kafka.connect.sink.ProxySinkTask.put(ProxySinkTask.java:74)
	at com.clickhouse.kafka.connect.sink.ClickHouseSinkTask.put(ClickHouseSinkTask.java:60)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
	... 10 more

Workaround: use nullable or send at least one element in the array
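
A hedged illustration of the failing case (the field names are hypothetical); the first value trips the IndexOutOfBoundsException in StructToJsonMap, the second one works:

{"id": 1, "tags": []}
{"id": 1, "tags": ["placeholder"]}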

Small Issues from pre-beta testing

  • I couldn't find an actual zip file created by the ./gradlew command. I eventually got it to work by copying the full contents of the confluentArchive directory to ~/confluent-7.2.2/share/confluent-hub-components
  • In the Confluent Control Center UI, there are dots at the end of the labels for the configuration values
  • Similarly, there is an * after the hostname label in the Confluent Control Center UI

Exception when try to save kafka avro data into clickhouse

I tried to use this connector to save Kafka topic data into ClickHouse and encountered the exception below.

Caused by: java.lang.NullPointerException
	at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.validateDataSchema(ClickHouseWriter.java:150)
	at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsertRawBinary(ClickHouseWriter.java:266)
	at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsert(ClickHouseWriter.java:128)
	at com.clickhouse.kafka.connect.sink.processing.Processing.doInsert(Processing.java:45)
	at com.clickhouse.kafka.connect.sink.processing.Processing.doLogic(Processing.java:102)
	at com.clickhouse.kafka.connect.sink.ProxySinkTask.put(ProxySinkTask.java:80)
	at com.clickhouse.kafka.connect.sink.ClickHouseSinkTask.put(ClickHouseSinkTask.java:60)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:584)

kafka-avro-console-consumer can consume the data from the topic, and the data looks like:
{"entity":"Random1","v":0.5467754725010544}

My Kafka connector config is:

{
  "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
  "tasks.max": "1",
  "topics": "simulation",
  "ssl": "false",
  "hostname": "clickhouse-db",
  "database": "sampledatabase",
  "port": "8123",
  "exactlyOnce": "false",
  "username": "default",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081"
}

Did I miss something in the overall setup?

Implement load testing

Implement load testing to understand the performance limitations of the solution and how different settings affect it.

  • Define the types of workloads to simulate, and the performance metrics to measure.
  • Create a load-testing plan
  • Set up the test environment
  • Collect data for further analysis

"NullPointerException when processing data with missing values"

Describe the bug

A NullPointerException occurs in the SchemalessRecordConvertor when processing data from the Kafka topic with ClickHouse Kafka Connect Sink. This exception is caused by a missing value in the data, which the connector does not handle properly.

Steps to reproduce

  1. Set up a Kafka Connect instance with ClickHouse Kafka Connect Sink.
  2. Configure the connector to use SchemalessRecordConvertor.
  3. Publish data to the Kafka topic that includes null or missing values.

Expected behaviour

The ClickHouse Kafka Connect Sink should handle missing or null values properly and not throw a NullPointerException during data processing.

Error log

[2023-04-05 05:30:04,093] ERROR WorkerSinkTask{id=clickhouse-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187)
...
Caused by: java.lang.NullPointerException
        at com.clickhouse.kafka.connect.sink.data.convert.SchemalessRecordConvertor.lambda$convert$0(SchemalessRecordConvertor.java:33)
        at java.base/java.util.HashMap.forEach(HashMap.java:1337)
        at com.clickhouse.kafka.connect.sink.data.convert.SchemalessRecordConvertor.convert(SchemalessRecordConvertor.java:31)
...

Configuration

Environment

  • Kafka-Connect version: 2.6.1
  • Kafka version: - 2.6.1
  • Kafka environment: standalone
  • OS: Ubuntu

ClickHouse server

  • ClickHouse Server version: 22.3

NullPointerException when there are some null columns in the schema

Describe the bug

Clickhouse-kafka-connect fails with NullPointerException when there are some null columns in the schema.

Full traceback:

java.lang.NullPointerException
	at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doWritePrimitive(ClickHouseWriter.java:174)
	at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doWriteCol(ClickHouseWriter.java:210)
	at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsertRawBinary(ClickHouseWriter.java:291)
	at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsert(ClickHouseWriter.java:128)
	at com.clickhouse.kafka.connect.sink.processing.Processing.doInsert(Processing.java:45)
	at com.clickhouse.kafka.connect.sink.processing.Processing.doLogic(Processing.java:102)
	at com.clickhouse.kafka.connect.sink.ProxySinkTask.put(ProxySinkTask.java:80)
	at com.clickhouse.kafka.connect.sink.ClickHouseSinkTask.put(ClickHouseSinkTask.java:60)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
java.lang.RuntimeException
	at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsertRawBinary(ClickHouseWriter.java:307)
	at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsert(ClickHouseWriter.java:128)
	at com.clickhouse.kafka.connect.sink.processing.Processing.doInsert(Processing.java:45)
	at com.clickhouse.kafka.connect.sink.processing.Processing.doLogic(Processing.java:102)
	at com.clickhouse.kafka.connect.sink.ProxySinkTask.put(ProxySinkTask.java:80)
	at com.clickhouse.kafka.connect.sink.ClickHouseSinkTask.put(ClickHouseSinkTask.java:60)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
[2023-02-24 15:52:59,493] ERROR [tracking-events-clickhouse-sink|task-2]  WorkerSinkTask{id=tracking-events-clickhouse-sink-2} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: null (org.apache.kafka.connect.runtime.WorkerSinkTask:609)
java.lang.RuntimeException
	at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsertRawBinary(ClickHouseWriter.java:312)
	at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsert(ClickHouseWriter.java:128)
	at com.clickhouse.kafka.connect.sink.processing.Processing.doInsert(Processing.java:45)
	at com.clickhouse.kafka.connect.sink.processing.Processing.doLogic(Processing.java:102)
	at com.clickhouse.kafka.connect.sink.ProxySinkTask.put(ProxySinkTask.java:80)
	at com.clickhouse.kafka.connect.sink.ClickHouseSinkTask.put(ClickHouseSinkTask.java:60)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
[2023-02-24 15:52:59,497] ERROR [tracking-events-clickhouse-sink|task-2]  WorkerSinkTask{id=tracking-events-clickhouse-sink-2} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:195)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException
	at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsertRawBinary(ClickHouseWriter.java:312)
	at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsert(ClickHouseWriter.java:128)
	at com.clickhouse.kafka.connect.sink.processing.Processing.doInsert(Processing.java:45)
	at com.clickhouse.kafka.connect.sink.processing.Processing.doLogic(Processing.java:102)
	at com.clickhouse.kafka.connect.sink.ProxySinkTask.put(ProxySinkTask.java:80)
	at com.clickhouse.kafka.connect.sink.ClickHouseSinkTask.put(ClickHouseSinkTask.java:60)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
	... 10 more

Steps to reproduce

  1. Create a table with Nullable columns
  2. Declare a schema that supports null values
  3. Produce a message with null value

Expected behavior

Insert null values if the table and schema support null values.

The solution should be similar to #55 for schemaless.

Workaround

Don't use Nullable columns, in line with ClickHouse best practices.

Optimize inserts to ClickHouse

Currently in the POC stage.

We have implemented the inserts as string-based inserts; we would like to change them to a more efficient format, taking into consideration:

  • Memory footprint
  • Fewer data conversions

Currently, we are working directly with the ClickHouse HTTP protocol rather than the JDBC API.

Single stream with multiple destination databases

We have a multi-tenant environment where certain events are streamed along the same topic but need to be sunk into different databases. Example event:

{
  "userName": "someUser",
  "tenantId": "tenantA"
}

We are required to store each tenant's data in its own database. We would like to do this dynamically, without having to create a new connector per tenant, to simplify onboarding of new tenants and reduce maintenance.

AWS MSK Connect support

It would be great to be able to use this plugin with AWS MSK Connect. MSK Connect currently uses Kafka Connect 2.7.1, while this connector requires 3.0+. Is it possible to relax the version requirement?

Accent characters (and emojis) are all replaced with '?'

Hello, we have run into an issue with non-ASCII characters (we are not sure of the full extent of the impacted characters, but we have definitely seen this behaviour with accented characters and emojis).

It seems like something is converting these characters from UTF-8 to '?' literals somewhere in the sink process (not sure, but the gson encoder might be a suspect?).


Example:

This message in Kafka gets converted to this row in the target ClickHouse table:


Kafka message value plain-text
{
	"event_day": 2,
	"event_month": 2,
	"event_year": 2023,
	"hist_trait_slots": [
		-1
	],
	"hist_trait_vals": [
		""
	],
	"object_configuration_id": "kris-test-obj-🐤",
	"pocus_id": "kris-accent-test-3",
	"refreshed_at": "2023-02-02 18:10:47",
	"trait_slots": [
		0
	],
	"trait_vals": [
		"I am a string with accents: à á â ã ä ç è é ê ë ì í î ï ñ ò ó ô õ ö š ù ú û ü ý ÿ ž À Á Â Ã Ä Ç È É Ê Ë Ì Í Î Ï Ñ Ò Ó Ô Õ Ö Š Ú Û Ü Ù Ý Ÿ Ž Ÿ Ž 😀 🙀 🐤"
	],
	"workspace_id": "kris-test-workspace-😀"
}

Value added to the target clickhouse table

pocus_id          |workspace_id         |object_configuration_id|event_year|event_month|event_day|refreshed_at       |trait_slots|trait_vals                                                                                                                                               |hist_trait_slots|hist_trait_vals|
------------------+---------------------+-----------------------+----------+-----------+---------+-------------------+-----------+---------------------------------------------------------------------------------------------------------------------------------------------------------+----------------+---------------+
kris-accent-test-3|kris-test-workspace-?|kris-test-obj-?        |      2023|          2|        2|2023-02-02 18:10:47|[0]        |['I am a string with accents: ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?']|[-1]            |['']           |

We confirmed that we can manually insert as JSONEachRow into the table and emojis and accents are preserved:

Manual Insert and results

Insert

insert into pocus_development.`pocus.cdc.structured-trait-updates.v0` format JSONEachRow
{
	"event_day": 2,
	"event_month": 2,
	"event_year": 2023,
	"hist_trait_slots": [
		-1
	],
	"hist_trait_vals": [
		""
	],
	"object_configuration_id": "kris-test-obj-🐤",
	"pocus_id": "kris-insert-test-1",
	"refreshed_at": "2023-02-02 18:10:47",
	"trait_slots": [
		0
	],
	"trait_vals": [
		"I am a string with accents: à á â ã ä ç è é ê ë ì í î ï ñ ò ó ô õ ö š ù ú û ü ý ÿ ž À Á Â Ã Ä Ç È É Ê Ë Ì Í Î Ï Ñ Ò Ó Ô Õ Ö Š Ú Û Ü Ù Ý Ÿ Ž Ÿ Ž 😀 🙀 🐤"
	],
	"workspace_id": "kris-test-workspace-😀"
}

Results

pocus_id          |workspace_id          |object_configuration_id|event_year|event_month|event_day|refreshed_at       |trait_slots|trait_vals                                                                                                                                                  |hist_trait_slots|hist_trait_vals|
------------------+----------------------+-----------------------+----------+-----------+---------+-------------------+-----------+------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------+---------------+
kris-insert-test-1|kris-test-workspace-😀|kris-test-obj-🐤       |      2023|          2|        2|2023-02-02 18:10:47|[0]        |['I am a string with accents: à á â ã ä ç è é ê ë ì í î ï ñ ò ó ô õ ö š ù ú û ü ý   ž À Á Â Ã Ä Ç È É Ê Ë Ì Í Î Ï Ñ Ò Ó Ô Õ Ö Š Ú Û Ü Ù Ý Ÿ Ž Ÿ Ž 😀 🙀 🐤']|[-1]            |['']           |

Additional details:

Table DDL
CREATE TABLE pocus_development.`pocus.cdc.structured-trait-updates.v0`
(

    `pocus_id` String,

    `workspace_id` String,

    `object_configuration_id` String,

    `event_year` Int32,

    `event_month` Int32,

    `event_day` Int32,

    `refreshed_at` DateTime,

    `trait_slots` Array(Int32),

    `trait_vals` Array(String),

    `hist_trait_slots` Array(Int32),

    `hist_trait_vals` Array(String)
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{uuid}/{shard}',
 '{replica}')
PRIMARY KEY pocus_id
ORDER BY pocus_id
SETTINGS index_granularity = 8192;
Clickhouse Sink Connector Config
  config:
    hostname: <redacted>
    port: <redacted>
    username: <redacted>
    password: <redacted>
    database: pocus_development
    topics: "pocus.cdc.structured-trait-updates.v0"
    key.converter: "org.apache.kafka.connect.json.JsonConverter"
    value.converter: "org.apache.kafka.connect.json.JsonConverter"
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false
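
One diagnostic worth trying, assuming the worker JVM's default charset (rather than the converter) is what replaces the characters: force UTF-8 on the Connect worker process and restart it. This is a hedged guess, not a confirmed fix:

export KAFKA_OPTS="-Dfile.encoding=UTF-8"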

Provide documentation

https://clickhouse.com/docs/en/integrations/kafka/self-managed/connect-sink contains some basic information that must be extended with additional details.

Add documentation on the company website:

  • Requirements for the environment
  • Version compatibility matrix
  • Support policy
  • Installation instructions.
    • self-managed
    • on the Confluent platform
    • on Confluent Cloud
  • Configuration options: list all options
  • Provide usage examples
    • exactly-once semantics: in-memory vs. KeeperMap
    • multiple topics
  • Advanced usage:
    • supported Data types
    • Logging
    • Monitoring
    • Usage with MSK
    • [GA] Scaling
  • Architecture
    • overview
    • exactly once implementation
  • Limitations

Provide CONTRIBUTING doc:

  • Requirements for the dev environment
  • How to run tests

Special characters in table name cause exception

2023-01-14 00:17:35,842 ERROR [clickhouse-sink.cdc.structured-trait-updates.v0|task-0] Got exception when running DESCRIBE TABLE default.pocus.cdc.structured-trait-updates.v0 (com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient) [task-thread-clickhouse-sink.cdc.structured-trait-updates.v0-0]
com.clickhouse.client.ClickHouseException: Code: 62. DB::Exception: Syntax error: failed at position 44 ('-'): -trait-updates.v0. Expected one of: token, Dot. (SYNTAX_ERROR) (version 22.9.3.18 (official build))
, server ClickHouseNode [uri=http://clickhouse:8123/default]
	at com.clickhouse.client.ClickHouseException.of(ClickHouseException.java:148)
	at com.clickhouse.client.AbstractClient.lambda$execute$0(AbstractClient.java:291)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: Code: 62. DB::Exception: Syntax error: failed at position 44 ('-'): -trait-updates.v0. Expected one of: token, Dot. (SYNTAX_ERROR) (version 22.9.3.18 (official build))

	at com.clickhouse.client.http.HttpUrlConnectionImpl.checkResponse(HttpUrlConnectionImpl.java:187)
	at com.clickhouse.client.http.HttpUrlConnectionImpl.post(HttpUrlConnectionImpl.java:278)
	at com.clickhouse.client.http.ClickHouseHttpClient.send(ClickHouseHttpClient.java:101)
	at com.clickhouse.client.AbstractClient.sendAsync(AbstractClient.java:156)
	at com.clickhouse.client.AbstractClient.lambda$execute$0(AbstractClient.java:289)
	... 4 more

It looks like the DESCRIBE TABLE call is not back-quoting the table name, which is causing a SQL syntax error when the table name is not strictly alphanumeric.
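
A hedged illustration of the difference, using the table name from the log above:

-- fails: the parser stops at the first '-' in the unquoted identifier
DESCRIBE TABLE default.pocus.cdc.structured-trait-updates.v0;

-- works: the identifier is back-quoted
DESCRIBE TABLE default.`pocus.cdc.structured-trait-updates.v0`;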

Configurable serialisation format for keys

During BYOC testing, we discovered that the connector fails on keys containing invalid JSON (a simple string, for example).
Kafka Connect takes a default converter configuration at the worker level. We should allow users to explicitly specify the serialization format for both keys and values in the sink connector configuration.

Format/etc write options

Is your feature request related to a problem? Please describe.
I need to provide the necessary options in order to write the JSON row into ClickHouse correctly. One example could be "input_format_import_nested_json=1".

Describe the solution you'd like
I'd expect to have a comma-separated parameters string in the connector configuration file.

Describe alternatives you've considered

  • modifying messages with an SMT
  • changing the table schema
  • changing the JSON row format
    but that's not always an option. I can see scenarios where the problem is unsolvable due to legacy limitations.

Additional context
None.
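
For reference, a hedged sketch of the server-side setting the request refers to; what the issue asks for is a connector-side way to pass such settings, which does not exist yet:

-- per-session / per-query, e.g. when testing an insert manually
SET input_format_import_nested_json = 1;

-- or as an HTTP query parameter on the insert request
-- http://clickhouse-host:8123/?input_format_import_nested_json=1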

Caused by: java.net.SocketException: Unexpected end of file from server

Describe the bug

This error happened hours after I had started the process, with no other visible issues before it, and it causes a crash afterwards. I am using these settings:

tasks.max=4
topics=ethereum_blocks,ethereum_logs,ethereum_transactions,ethereum_traces
ssl=false
hostname=
database=ethereum
password=
port=8123
value.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
exactlyOnce=false
username=
schemas.enable=false

Steps to reproduce

  1. Start Kafka Connect with the ClickHouse connector and wait a few hours

Expected behaviour

Error log

Configuration

Environment

  • Kafka-Connect version:
  • Kafka Connect configuration:
  • Kafka version:
  • Kafka environment:
  • OS:

ClickHouse server

ClickHouse server version 23.3.1 revision 54462

CREATE TABLE ethereum.ethereum_traces
(
    type LowCardinality(String),
    transaction_index UInt16,
    from_address String,
    to_address String,
    value Decimal(38, 0),
    input String,
    output String,
    trace_type Enum8('call' = 1, 'create' = 2, 'suicide' = 3, 'reward' = 4, 'genesis' = 5, 'daofork' = 6),
    call_type Enum8('' = 0, 'call' = 1, 'callcode' = 2, 'delegatecall' = 3, 'staticcall' = 4),
    reward_type Enum8('' = 0, 'block' = 1, 'uncle' = 2),
    gas UInt32 CODEC(Delta(4), ZSTD(1)),
    gas_used UInt32,
    subtraces UInt32,
    trace_address Array(UInt32),
    error String,
    status Bool,
    transaction_hash String,
    block_number UInt64,
    trace_id String,
    trace_index UInt16,
    block_timestamp UInt64,
    block_hash String,
    item_id String,
    item_timestamp String,
    chain_id UInt32
)
ENGINE = MergeTree
ORDER BY (trace_type, block_number, block_timestamp)
TTL now() + INTERVAL 1 DAY

Implement Retry tests

We have developed a retry mechanism for our Kafka Connect sink.
It uses the built-in retry mechanism of the Kafka Connect framework.

Tests for this implementation are missing.

Add sslrootcert host URL option

That's not allowed at the moment, and it is required in the cloud environment I'm using. It is the only thing which stops me from migrating from the JDBC sink.

The parameter needs to be added in the same manner 'ssl' is appended in the current version.

Support for Schemaless

When retrieving a SinkRecord from the Connect API, the value type can be:

  • Struct -> Schema with types
  • Map -> Map<String,Objects> without types
  • String -> unhandled
  • bytes[] -> unhandled

Currently, support for the Map type is partial; we would like to refactor it into a more complete and stable solution.

Config error connecting to clickhouse on port 80

Hello, we are trying to connect to our ClickHouse cluster via chproxy, which we have exposed on port 80.

However when we try to do that, we are seeing the following error:

        PUT /connectors/clickhouse-sink.cdc.structured-trait-updates.v0/config
        returned 400 (Bad Request): Connector configuration is invalid and
        contains the following 1 error(s):

        Invalid value 80 for configuration port: Value must be at least 1024

        You can also find the above list of errors at the endpoint
        `/connector-plugins/{connectorType}/config/validate`

Seems like the port number is restricted here:

Is there a reason for these bounds or can the range be expanded?

In the meantime, we will build from source, bumping that range from 1 to Integer.MAX_VALUE.

Thanks!

Kafka Connect basic Validation and Full configuration for Connection

Getting Connect Configuration from External Source

We need to validate all incoming parameters passed to the connector (a sketch follows the list below):

  • ClickHouse timeout: the default is 30; check that it is not negative and is less than 10 minutes
  • ClickHouse port: defaults to the HTTP port; needs to be within the unsigned port range
  • ClickHouse database name: check that it contains only valid characters
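
A minimal sketch of such validation using Kafka's ConfigDef validators; the key names, defaults, and ranges below are illustrative assumptions, not the connector's actual definitions:

// Hypothetical validation sketch, not the connector's real ConfigDef.
configDef.define("timeout", ConfigDef.Type.INT, 30,
        ConfigDef.Range.between(0, 600),    // non-negative and under 10 minutes (seconds)
        ConfigDef.Importance.MEDIUM, "ClickHouse timeout in seconds.");
configDef.define("port", ConfigDef.Type.INT, 8123,
        ConfigDef.Range.between(1, 65535),  // valid TCP port range
        ConfigDef.Importance.HIGH, "ClickHouse port.");
configDef.define("database", ConfigDef.Type.STRING, "default",
        ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars(),
        ConfigDef.Importance.HIGH, "ClickHouse database name.");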

Support Delete mode

@cwurm requested support for tombstone messages to delete records from the storage.
The use case is essential for customers sharing the same message pipeline among several databases and expecting records to be removed from every destination.

Depends on lightweight deletes ClickHouse/ClickHouse#42126

How to insert DateTime values?

I need to insert a Unix timestamp as a DateTime.
Kafka Connect does not have a DateTime type. How should one insert a DateTime value using the ClickHouse connector?

One way would be to insert string/int/long timestamps:

create table `dev.thilo.timestamp_str` (timestamp DateTime) ENGINE = TinyLog;

insert into `dev.thilo.timestamp_str` (timestamp) values ('1676283600');

SELECT * FROM `dev.thilo.timestamp_str`

Query id: f247748f-0178-47b9-a25d-fb9e06071f7e

┌───────────timestamp─┐
│ 2023-02-13 10:20:00 │
└─────────────────────┘

However, this does not work with the ClickHouse connector; the following error is logged when trying to map a string/int field to a DateTime column:

loc_clickhouse_sink  | [2023-02-13 10:15:23,434] ERROR [clickhouse-sink-13|task-1] Table column name [timestamp] type [NONE] is not matching data column type [STRING] (com.clickhouse.kafka.connect.sink.db.ClickHouseWriter:154)

The connector could allow mapping ints/longs/strings to DateTime columns. What are your thoughts on this?

Workaround: I'm aware that with the current version of the connector (v0.0.8) I can use MATERIALIZED columns to map timestamps to a DateTime column:

create table `dev.thilo.materialize_test` (timestamp Int64, date DateTime MATERIALIZED toDateTime(timestamp), also_timestamp Int64 ALIAS timestamp) ENGINE = TinyLog;

More details:

Full Error:

loc_clickhouse_sink  | [2023-02-13 10:15:23,434] ERROR [clickhouse-sink-13|task-1] Table column name [timestamp] type [NONE] is not matching data column type [STRING] (com.clickhouse.kafka.connect.sink.db.ClickHouseWriter:154)
...
loc_clickhouse_sink  | org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
loc_clickhouse_sink  |  at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
loc_clickhouse_sink  |  at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
loc_clickhouse_sink  |  at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
loc_clickhouse_sink  |  at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
loc_clickhouse_sink  |  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
loc_clickhouse_sink  |  at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
loc_clickhouse_sink  |  at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
loc_clickhouse_sink  |  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
loc_clickhouse_sink  |  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
loc_clickhouse_sink  |  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
loc_clickhouse_sink  |  at java.base/java.lang.Thread.run(Thread.java:829)
loc_clickhouse_sink  | Caused by: java.lang.RuntimeException
loc_clickhouse_sink  |  at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsertRawBinary(ClickHouseWriter.java:267)
loc_clickhouse_sink  |  at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsert(ClickHouseWriter.java:128)
loc_clickhouse_sink  |  at com.clickhouse.kafka.connect.sink.processing.Processing.doInsert(Processing.java:45)
loc_clickhouse_sink  |  at com.clickhouse.kafka.connect.sink.processing.Processing.doLogic(Processing.java:102)
loc_clickhouse_sink  |  at com.clickhouse.kafka.connect.sink.ProxySinkTask.put(ProxySinkTask.java:80)
loc_clickhouse_sink  |  at com.clickhouse.kafka.connect.sink.ClickHouseSinkTask.put(ClickHouseSinkTask.java:60)
loc_clickhouse_sink  |  at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)

Avro schema:

{
  "schema": {
    "type": "record",
    "name": "timestamp_str",
    "namespace": "dev.thilo",
    "fields": [
      {
        "name": "timestamp",
        "type": "string"
      }
    ]
  }
}

Connector:

{
    "name": "clickhouse-sink",
    "config": {
        "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
        "topics": "dev.thilo.timestamp_str",
        "exactlyOnce": false,
        "hostname": "clickhouse",
        "port": "8123",
        "database": "default",
        "ssl": false,
        "username": "default",
        "tasks.max": "2",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081"
    }
}

Schema Changes and Exactly Once

Having read the design doc and the source code, I'm not sure I understand how schema changes interact with exactly-once semantics. For example, consider a situation where the sink writes BEGIN, submits a batch, and I forcefully kill it (or it crashes) before it writes END. In the meantime I introduce an SMT, e.g. I start dropping some field in the source data, and restart the connector. In this case, if I understand correctly, a batch would be formed with the same exact offsets but with a different schema and thus a different hash, meaning it would not get deduplicated in ClickHouse?

If that's the case, would it make sense to store and compare hashes in the state store and fail if they do not match?

Add support for proxy server

We are trying to send data from an on-premise Kafka Connect cluster to a ClickHouse Cloud environment. Our infrastructure requires us to use a proxy server.
This cannot be configured in the connector at the moment.

Local Cluster integration test

Our goal is to create a local ClickHouse cluster for integration tests.

  • Spin up Kafka (Confluent Platform / Amazon MSK)
  • Spin up a ClickHouse cluster
  • Tests
    • single topic
    • multi-topic
  • Schema validation

Schema insert doesn't handle missing fields when a column has default value

Describe the bug

I am trying to remove a field from the schema registry. After successfully deploying my changes, I attempted to remove the corresponding column from ClickHouse. I expected that once the field was missing and a new insert occurred, it would be populated with its default value. However, the connector crashed instead.

Steps to reproduce

  1. Create a table with two NOT NULL columns and with a default value
  2. Declare a schema that contains only one field
  3. Try to insert a row

Expected behavior

The missing field should be populated with its default value, as happens for Nullable columns. I'm not sure about the correct approach here; probably populating with the default value should happen on the ClickHouse side, so that we can simply send null for columns which have a default value but are missing from the schema.

Error log

[2023-04-17 15:26:06,519] ERROR [tracking-events-clickhouse-sink|task-1]  Record is missing field integrationId (com.clickhouse.kafka.connect.sink.db.ClickHouseWriter:320)
java.lang.RuntimeException
	at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doWriteCol(ClickHouseWriter.java:321)
	at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsertRawBinary(ClickHouseWriter.java:366)
	at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsert(ClickHouseWriter.java:131)
	at com.clickhouse.kafka.connect.sink.processing.Processing.doInsert(Processing.java:45)
	at com.clickhouse.kafka.connect.sink.processing.Processing.doLogic(Processing.java:102)
	at com.clickhouse.kafka.connect.sink.ProxySinkTask.put(ProxySinkTask.java:80)
	at com.clickhouse.kafka.connect.sink.ClickHouseSinkTask.put(ClickHouseSinkTask.java:60)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
java.lang.RuntimeException
	at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsertRawBinary(ClickHouseWriter.java:385)
	at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsert(ClickHouseWriter.java:131)
	at com.clickhouse.kafka.connect.sink.processing.Processing.doInsert(Processing.java:45)
	at com.clickhouse.kafka.connect.sink.processing.Processing.doLogic(Processing.java:102)
	at com.clickhouse.kafka.connect.sink.ProxySinkTask.put(ProxySinkTask.java:80)
	at com.clickhouse.kafka.connect.sink.ClickHouseSinkTask.put(ClickHouseSinkTask.java:60)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Configuration

Example of a table:

create table test_table ON CLUSTER '{cluster}'
(
    accountId           Int64,
    integrationId       Int64  DEFAULT 0
)
    engine = ReplicatedMergeTree('/clickhouse/{cluster}/tables/{shard}/{database}/{table}', '{replica}') ORDER BY (accountId);

Schema

{
    type: 'record',
    namespace: 'test',
    name: 'TestTable',
    fields: [
        {
            name: 'accountId',
            type: 'long',
        }
    ],
}

My SQL expectation in this case

INSERT INTO test_table (accountId) values (2);

or

INSERT INTO test_table (accountId, integrationId) values (2, null);

Materialized views in database cause NPE on start

Full stack trace:

2023-01-07 02:45:34,987 ERROR [###|task-1] WorkerSinkTask{id=###} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-###]
java.lang.NullPointerException
	at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.start(ClickHouseWriter.java:84)
	at com.clickhouse.kafka.connect.sink.ProxySinkTask.<init>(ProxySinkTask.java:46)
	at com.clickhouse.kafka.connect.sink.ClickHouseSinkTask.start(ClickHouseSinkTask.java:54)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:312)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

From following the above stack trace, it looks like the connector attempts to execute SHOW TABLES and then calls DESCRIBE TABLE {database}.${tableName} for each table in the database (link).

This fails because materialized views create backing tables with names like .inner_id.659b0a9b-b939-4c95-9631-0d007656df21, which throws an error that goes uncaught. We confirmed that the NPE does not occur after all materialized views are dropped from the database.
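
A hedged sketch of how the table-listing step could skip those backing tables (filtering on the .inner prefix is an assumption about a sufficient fix, not a tested patch):

SELECT name
FROM system.tables
WHERE database = 'default'
  AND name NOT LIKE '.inner%';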

Sink crashes when inserting required LowCardinality columns

Describe the bug

When a table has a required LowCardinality column, the sink fails because of some byte mismatch (see below for log message).
It works fine if the column is nullable.

I have started working on it and managed to reproduce the error using a unit test.

If anybody has an idea on how to fix this quickly, that would be great. In the meantime, I'll see what I can do.

Steps to reproduce

Check the Unit Test in this PR #104

Expected behaviour

Data should be inserted correctly

Error log

Caused by: java.util.concurrent.ExecutionException: com.clickhouse.client.ClickHouseException: Code: 33. DB::Exception: Cannot read all data. Bytes read: 66. Bytes expected: 98.: (at row 7)

👇 This is the line that throws the exception

ClickHouseResponse response = future.get()

Configuration

Environment

  • Kafka-Connect version:
  • Kafka Connect configuration:
  • Kafka version:
  • Kafka environment:
  • OS:

ClickHouse server

  • ClickHouse Server version:
  • ClickHouse Server non-default settings, if any:
  • CREATE TABLE statements for tables involved:
  • Sample data for all these tables, use clickhouse-obfuscator if necessary

Expose virtual columns

It would be nice if the plugin supported virtual columns in a similar way as the Kafka Table Engine. I need the key information as well as timestamp, offset and partition. As it stands currently, these values don't seem to be available.
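
For illustration, a hedged sketch of what the request amounts to, modeled on the Kafka table engine's virtual columns (_topic, _key, _offset, _partition, _timestamp); the table and column layout below is hypothetical:

CREATE TABLE events_with_metadata
(
    payload      String,
    _topic       String,
    _partition   UInt32,
    _offset      UInt64,
    _key         String,
    _timestamp   DateTime
)
ENGINE = MergeTree
ORDER BY (_topic, _partition, _offset);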

Config > PORT: Allow ports below 1024 (e.g. http or https)

I'm running ClickHouse in a Kubernetes setup where I use a cluster-internal service to redirect clickhouse:80 -> clickhouse-server-ip:8123.

When I try to configure the connector to use port 80, I receive an error:

{
  "error_code": 400,
  "message": "Connector configuration is invalid and contains the following 1 error(s):\nInvalid value 80 for configuration port: Value must be at least 1024\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"
}

The cause of this error is the connect config def:

configDef.define(PORT,
  ConfigDef.Type.INT,
  portDefault,
  ConfigDef.Range.between(1024,Integer.MAX_VALUE),
  ConfigDef.Importance.HIGH,
  "port",
  group,
  ++orderInGroup,
  ConfigDef.Width.SHORT,
"ClickHouse Port.");

I would like to run the service on 80 or 443 - or is there another reason why this is not possible?
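
A hedged sketch of the one-line change that the build-from-source workaround mentioned in the chproxy issue above implies; whether the project wants to relax the bound is exactly what this issue asks:

configDef.define(PORT,
  ConfigDef.Type.INT,
  portDefault,
  ConfigDef.Range.between(1, Integer.MAX_VALUE),  // was between(1024, Integer.MAX_VALUE)
  ConfigDef.Importance.HIGH,
  "port",
  group,
  ++orderInGroup,
  ConfigDef.Width.SHORT,
"ClickHouse Port.");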

Aws Msk connector config

Hi, thank you for releasing the new version 0.0.5 to support MSK. Could you share an example or tutorial for configuring it on AWS MSK? I would really appreciate the help. I tried it with my config, but it doesn't seem to work. This is my connector config:

connector.class=com.clickhouse.kafka.connect.sink
tasks.max=2
topics=Events_Test
schema.enable=false
key.converter.schemas.enable=false
hostname=https://demo.us-east-2.aws.clickhouse.cloud
password=demopass
database=demopassword
aws.region=us-east-1
port=8443
value.converter=org.apache.kafka.connect.storage.StringConverter
errors.log.enable=true
key.converter=org.apache.kafka.connect.storage.StringConverter
username=default
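
A hedged comparison against the other configurations in this document: elsewhere connector.class is com.clickhouse.kafka.connect.ClickHouseSinkConnector (not com.clickhouse.kafka.connect.sink), and hostname is given as a bare host without the https:// scheme. A corrected sketch, with credentials and database name left as placeholders:

connector.class=com.clickhouse.kafka.connect.ClickHouseSinkConnector
tasks.max=2
topics=Events_Test
hostname=demo.us-east-2.aws.clickhouse.cloud
port=8443
ssl=true
database=<database-name>
username=default
password=<password>
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
errors.log.enable=true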

Allow sink to a table with a different name than the source topic

Hello,

As mentioned in the Target Tables section of the docs, the table name must match the topic name. This can be a problem when we have a naming convention for topics that may include dots and hyphens, e.g. foo.bar.table-name.

One improvement I can suggest is to add a mapping that matches each topic name to its target table name:

target.tables.mapping="foo.bar.table-name:foo_bar_table_name;topic-one:topic_one;..."

Thank you
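
Until such a mapping option exists, one workaround available in stock Kafka Connect is the RegexRouter SMT, which rewrites the topic name the sink sees (and therefore the table it targets). A hedged sketch for the example topic above; whether this interacts cleanly with the connector's exactly-once state tracking would need to be verified:

transforms=renameTopic
transforms.renameTopic.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.renameTopic.regex=foo.bar.table-name
transforms.renameTopic.replacement=foo_bar_table_name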
