
kafka-connect-storage-common's Introduction

Kafka Connect Common Modules for Storage Connectors


Shared software modules among Kafka Connectors that target distributed filesystems and cloud storage.

Development

To build a development version you'll need a recent version of Kafka. You can build kafka-connect-storage-common with Maven using the standard lifecycle phases.

Contribute

License

This project is licensed under the Confluent Community License.


kafka-connect-storage-common's People

Contributors

amitr17, andrewegel, aniketshrimal, arihant-confluent, blueeldur, confluentjenkins, confluentsemaphore, cyrusv, dnozay, ewencp, garrix-fan, kkonstantine, kpatelatwork, maxzheng, naveenmall11, norwood, patrick-premont, pbadani, prathibha-m, rhauch, sambhav-jain-16, shaikzakiriitm, sidd1809, snehashisp, subhashiyer9, sudeshwasnik, vbalani002, venkatteki, wicknicks, xiangxin72


kafka-connect-storage-common's Issues

Interface Partitioner is inefficient

The Partitioner interface design is inefficient. The generatePartitionedPath takes a topic, which is immutable per task, plus an encodedPartition, which is per-record. That leads to things such as confluentinc/kafka-connect-hdfs#224, in which generatePartitionedPath is ignored and a separate method is called that reproduces the behavior of DefaultPartitioner's implementation, minus the partition information.

Instead, a base method should take only the topic, and the per-record method should take that static result together with the SinkRecord.
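A minimal sketch of what the proposed split might look like (the interface and method names here are hypothetical, not the existing API):

import java.util.Map;
import org.apache.kafka.connect.sink.SinkRecord;

// Hypothetical redesign sketch: the per-topic part is computed once per task,
// and the per-record method reuses that cached result.
public interface TopicAwarePartitioner {
  void configure(Map<String, Object> config);

  // Base method: depends only on the topic, so a task can call it once and cache the result.
  String topicDirectory(String topic);

  // Per-record method: combines the cached per-topic result with the record itself.
  String encodePartition(String topicDirectory, SinkRecord sinkRecord);
}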

When flush.size > 1, a timeout parameter is needed to force a flush regardless of whether new records are read

I set up a Kafka S3 sink connector to consume messages from a Kafka topic and dump them into MinIO in Parquet format. Finally, I query from Dremio to verify the integrity of the data pipeline.

The Kafka topic has 12 partitions, and each partition contains a varying number of records.

Here is what I've found:

If I set flush.size=1, I get all the records, one per Parquet file, in MinIO, and a Dremio query returns the correct number of records.

If I set flush.size > 1, I cannot get the exact total number of records in MinIO or from the Dremio query; I always get fewer. The larger flush.size is, the more records are skipped, and if flush.size is set large enough, entire partitions are skipped as well.

I understand that it's probably not skipping records.

The connector is waiting for more new records to fill up the buffer before flushing to S3. That won't work: if the data is end-of-day, I would have to wait 24 hours for yesterday's data to be dumped to MinIO.

I am looking for a parameter that triggers a timeout and then forces a flush to S3. I tried rotate.interval.ms and rotate.schedule.interval.ms, but they only check the timestamp span between the first record and/or the next record. They do not trigger a timeout and force a flush if no new record is produced to Kafka.

Is there any parameter to trigger a timeout and force a flush to S3? It seems that all the rotate interval parameters expect a new record to trigger the evaluation of the flush condition, whether span-based or scheduled. That does not serve the purpose I mentioned: we want to time out and force a flush without depending on a new record being processed.

RecordFieldTimestampExtractor does not tolerate string timestamps without milliseconds

Our application emits ISO 8601 timestamps using java.time.Instant, and our Kafka Connect HDFS sink chokes when it hits a timestamp without milliseconds:

java.lang.IllegalArgumentException: Invalid format: "2015-04-02T01:00:00Z" is malformed at "Z

Joda-Time's ISODateTimeFormat.dateTime() does not support timestamps without millis, but ISODateTimeFormat.dateTimeParser() supports both with and without millis; here's a discussion about it.
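For reference, a small sketch of the difference with Joda-Time (assuming the joda-time library is on the classpath):

import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;

public class Iso8601ParseDemo {
  public static void main(String[] args) {
    // dateTimeParser() accepts ISO-8601 timestamps with or without a millisecond part.
    DateTimeFormatter lenient = ISODateTimeFormat.dateTimeParser();
    DateTime withMillis = lenient.parseDateTime("2015-04-02T01:00:00.123Z");
    DateTime withoutMillis = lenient.parseDateTime("2015-04-02T01:00:00Z");
    System.out.println(withMillis + " / " + withoutMillis);

    // dateTime() requires the millisecond part and throws IllegalArgumentException here:
    // ISODateTimeFormat.dateTime().parseDateTime("2015-04-02T01:00:00Z");
  }
}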

Conflict between partition fields and the data source

We are using DailyPartitioner for our HDFS sink connector with Hive integration. The topic is sourced from Debezium, which captures changes from a source table containing fields called month and year.

This leads to a situation where we have ambiguous fields (two year columns in the Hive table: one created by the partitioner and another from the source).

Is there any way to work around this at the moment?
Shouldn't we use a prefix for the generated partition fields to avoid the conflict, e.g. _year, _month, _day for DailyPartitioner?
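One possible workaround, sketched here as an assumption rather than a confirmed fix: DailyPartitioner is essentially TimeBasedPartitioner with a daily duration and a 'year'=YYYY/'month'=MM/'day'=dd path format, so configuring TimeBasedPartitioner directly with underscore-prefixed names should avoid the collision.

import java.util.HashMap;
import java.util.Map;

public class PrefixedDailyPartitionConfig {
  public static Map<String, String> connectorOverrides() {
    Map<String, String> config = new HashMap<>();
    config.put("partitioner.class",
        "io.confluent.connect.storage.partitioner.TimeBasedPartitioner");
    config.put("partition.duration.ms", "86400000"); // one day, matching DailyPartitioner
    // Underscore-prefixed directory names so they no longer clash with the source's
    // year/month columns when the Hive table is created.
    config.put("path.format", "'_year'=YYYY/'_month'=MM/'_day'=dd");
    config.put("locale", "en-US");
    config.put("timezone", "UTC");
    return config;
  }
}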

Field partitioner does not support multiple event type feature with schema references

As of version 5.5+, Confluent supports multiple event types in the same topic. Based on the official documentation and a blog post, I tried to use this feature. I wrote a Kafka Streams application that produces Avro messages with different schemas using Schema Registry schema references. I also consumed these messages with another Kafka Streams application to test the multiple event type functionality and got a successful result.
This is the union Avro schema I use in my tests; the eventA and eventB schemas are also registered in Schema Registry:

[
    "com.xxx.xxx.eventA",
    "com.xxx.xxx.eventB"
]

Everything was fine up to this point. Then I tried to sink these messages to HDFS with the Kafka Connect HDFS sink connector and FieldPartitioner, and set the relevant configuration properties in the connector settings. This is the field I want to use for partitioning:

"partition.field.name" : "field1"

The connector started successfully and read records from Kafka, but I got errors during the partitioning step. It seems the field partitioner was looking for field1, but it is not at the root: because of the multiple event type functionality there is a wrapper root field named eventA. (I think this is produced by the toConnectSchema method in the AvroData class of Confluent's kafka-schema-registry-parent repository.)
Struct{eventA=Struct{field1=val1,field2=val2,field3=val3}}

So partition.field.name must be set to "eventA.field1". But this is not an appropriate approach, since the root field name always changes with the event type name. I think we can say the multiple event type feature breaks field partitioning in Kafka Connect.

As a workaround, should I implement a custom field partitioner, or is there a consistent solution that I missed?
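If you do go the custom-partitioner route, a rough sketch could look like the following (assumptions: a recent kafka-connect-storage-common where FieldPartitioner is generic, the wrapper Struct created for schema references always has exactly one field holding the inner event, and re-wrapping the record is acceptable):

import java.util.List;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import io.confluent.connect.storage.partitioner.FieldPartitioner;

public class UnwrappingFieldPartitioner<T> extends FieldPartitioner<T> {
  @Override
  public String encodePartition(SinkRecord sinkRecord) {
    Object value = sinkRecord.value();
    if (value instanceof Struct) {
      Struct outer = (Struct) value;
      List<Field> fields = outer.schema().fields();
      // Unwrap the single event wrapper (e.g. Struct{eventA=Struct{field1=...}}) so the
      // stock FieldPartitioner sees field1 at the root, whatever the event type name is.
      if (fields.size() == 1 && outer.get(fields.get(0)) instanceof Struct) {
        Struct inner = (Struct) outer.get(fields.get(0));
        sinkRecord = new SinkRecord(sinkRecord.topic(), sinkRecord.kafkaPartition(),
            sinkRecord.keySchema(), sinkRecord.key(),
            inner.schema(), inner, sinkRecord.kafkaOffset());
      }
    }
    return super.encodePartition(sinkRecord);
  }
}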

Custom 'path.format' of 'FieldPartitioner'

I'm new to the HDFS connector. In my case, I want to use Kafka to do something like log collection. I'm using the HDFS connector to put my data from Kafka into HDFS, and I'm using FieldPartitioner with field names.

For example , here is my sample data:

{"type":"type1", "time":"2018-12-13", "msg":"anything"}
{"type":"type2", "time":"2018-12-13", "msg":"anything"}
I want to store them in HDFS under a path like "path-to/type/time", for example "/tmp/type1/2018-12-13".

However, it produced "/tmp/type=type1/time=2018-12-13". Here is my HDFS connector configuration:

{
  "name": "hdfs-sink-f1",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "1",
    "topics": "t1",
    "hdfs.url": "hdfs://:/",
    "flush.size": "1",
    "format.class": "io.confluent.connect.hdfs.json.JsonFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.FieldPartitioner",
    "partition.field.name": "type,time"
  }
}

I tried to use "path.format", but it didn't work. The "HDFS Connector Configuration Options" documentation says:

path.format
This configuration is used to set the format of the data directories when partitioning with TimeBasedPartitioner.
The format set in this configuration converts the Unix timestamp to proper directories strings.
For example, if you set path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH, the data directories will have the format /year=2015/month=12/day=07/hour=15/.
Type: string
Default: “”
Importance: medium

So is it not possible to set the format of FieldPartitioner through "path.format"?

Is there any easy way to configure the connector to achieve my goal?
see also: confluentinc/kafka-connect-hdfs#397 (comment)
Thank you!
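There is no path.format support in FieldPartitioner as far as I can tell, so one option is a small custom partitioner. A minimal sketch, assuming a recent generic FieldPartitioner, a Struct record value, the default "/" directory delimiter, and field names hard-coded purely for illustration:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import io.confluent.connect.storage.partitioner.FieldPartitioner;

public class ValueOnlyFieldPartitioner<T> extends FieldPartitioner<T> {
  // Hypothetical hard-coded fields for this example; a real version would read them
  // from partition.field.name in configure().
  private static final List<String> FIELDS = Arrays.asList("type", "time");

  @Override
  public String encodePartition(SinkRecord sinkRecord) {
    Struct value = (Struct) sinkRecord.value();
    List<String> parts = new ArrayList<>();
    for (String fieldName : FIELDS) {
      // Emit only the value, so the path becomes /tmp/type1/2018-12-13
      // instead of /tmp/type=type1/time=2018-12-13.
      parts.add(String.valueOf(value.get(fieldName)));
    }
    return String.join("/", parts); // assumes the default directory delimiter
  }
}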

Please don't use project.version for dependencies in parent pom

doing things like:

<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-connect-avro-converter</artifactId>
  <version>${project.version}</version>
</dependency>

can lead to really weird errors in projects that declare kafka-connect-storage-common-parent as the parent in the pom.

Because project.version gets resolved to the version of the child project. For example, I was working on my own version of kafka-connect-hdfs, so I changed the version in the pom, a reasonable enough thing to do, and suddenly it tried to download a whole bunch of dependencies with the version I specified, which makes no sense and will fail.

FieldPartition with lowercase field names

Hi.

I am using the HDFS sink connector to put my data into Hadoop from Kafka, and I am using FieldPartitioner with a field name. Kafka Connect creates the field name in uppercase, since the field name from the source is uppercase. The problem is that when I create a Hive table over the data, Hive cannot recognize the uppercase partitions, since all fields in Hive are converted to lowercase when stored in the metastore. I don't want to use the Hive sink, since streaming properties are not enabled in hive-site.xml and I don't want my table to be in ORC format. Is there any way I could change my field names to lowercase, or any way to create a custom field partitioner?
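A possible direction for such a custom partitioner, as a rough sketch (assumptions: a recent generic FieldPartitioner, the default "/" directory delimiter, and that lowercasing only the directory names in the encoded partition is enough for the Hive metastore to match them):

import java.util.Locale;
import org.apache.kafka.connect.sink.SinkRecord;
import io.confluent.connect.storage.partitioner.FieldPartitioner;

public class LowerCaseFieldPartitioner<T> extends FieldPartitioner<T> {
  @Override
  public String encodePartition(SinkRecord sinkRecord) {
    // The stock FieldPartitioner produces segments like "FIELDNAME=value"; lowercase the
    // name part of each segment and leave the value untouched.
    String[] segments = super.encodePartition(sinkRecord).split("/");
    StringBuilder out = new StringBuilder();
    for (int i = 0; i < segments.length; i++) {
      if (i > 0) {
        out.append('/');
      }
      int eq = segments[i].indexOf('=');
      out.append(eq < 0 ? segments[i]
          : segments[i].substring(0, eq).toLowerCase(Locale.ROOT) + segments[i].substring(eq));
    }
    return out.toString();
  }
}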

TimestampExtractor fails to extract timestamp with RecordField from a protobuf event

Event Schema

syntax = "proto3";

import "google/protobuf/wrappers.proto";

option java_package = "com.bagi.protobuf";
option java_outer_classname = "TrackingEvent";

/**
 * single tracking event
*/
message Event {
    string build = 1; // build
    uint64 event_ts = 2; // epoch seconds at the time of event
    string user_id = 3; // user id
    EventType eventType = 4; // event type

    enum EventType {
            UNKNOWN = 0; // the default value
            IMPRESSION = 1;
            CLICK = 2;
            PLAY = 3;
    }

    google.protobuf.Int32Value vertical_index = 5; // vertical index on the grid
    google.protobuf.Int32Value horizontal_index = 6; // horizontal index on the grid
}

I am using kafka-connect-s3 to sink events to an S3 bucket. Here is worker.properties:

bootstrap.servers = localhost:9092
key.converter = org.apache.kafka.connect.storage.StringConverter
value.converter = com.blueapron.connect.protobuf.ProtobufConverter
value.converter.protoClassName = com.bagi.protobuf.TrackingEvent$Event
plugin.path = /usr/local/share/java/
offset.storage.file.filename = /etc/kafka-offsets

and s3-sink.properties

name=s3-sink
connector.class=io.confluent.connect.s3.S3SinkConnector

tasks.max=1
topics=PLAY

format.class=io.confluent.connect.s3.format.parquet.ParquetFormat
flush.size=100000
rotate.interval.ms=900000

s3.region=us-east-1
s3.bucket.name=dev-bagi
s3.part.size=5242880

storage.class=io.confluent.connect.s3.storage.S3Storage
file.delim=_

partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
partition.duration.ms=900000
path.format='dt'=YYYY-MM-dd
locale=en-US
timezone=UTC
timestamp.extractor=RecordField
timestamp.field=event_ts

schema.compatibility=NONE

Stacktrace from logs

[2020-01-24 13:24:20,469] ERROR Unsupported type 'bytes' for user-defined timestamp field. (io.confluent.connect.storage.partitioner.TimeBasedPartitioner:295)
[2020-01-24 13:24:20,469] ERROR WorkerSinkTask{id=s3-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:559)
io.confluent.connect.storage.errors.PartitionException: Error extracting timestamp from record field: event_ts
	at io.confluent.connect.storage.partitioner.TimeBasedPartitioner$RecordFieldTimestampExtractor.extract(TimeBasedPartitioner.java:299)
	at io.confluent.connect.storage.partitioner.TimestampExtractor.extract(TimestampExtractor.java:41)
	at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:206)
	at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:188)
	at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:190)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Is using uint64 for event_ts in the protocol buffer schema the reason for this failure?
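I can't say for sure that uint64 is the cause, but the log does show the field arriving with an unsupported schema type for the built-in extractor. One possible workaround is a custom extractor, configured by setting timestamp.extractor to its fully qualified class name. A rough sketch, under the assumption that the converter surfaces event_ts as a numeric value holding epoch seconds:

import java.util.Map;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct;
import io.confluent.connect.storage.errors.PartitionException;
import io.confluent.connect.storage.partitioner.TimestampExtractor;

// Sketch only: if the converter really emits raw bytes for uint64, this would need
// converter-specific decoding instead of the Number branch below.
public class EpochSecondsFieldExtractor implements TimestampExtractor {
  private String fieldName;

  @Override
  public void configure(Map<String, Object> config) {
    fieldName = (String) config.get("timestamp.field");
  }

  @Override
  public Long extract(ConnectRecord<?> record) {
    Object ts = ((Struct) record.value()).get(fieldName);
    if (ts instanceof Number) {
      return ((Number) ts).longValue() * 1000L; // epoch seconds -> milliseconds
    }
    throw new PartitionException("Unexpected type for timestamp field " + fieldName);
  }
}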

Partitioning with key

How can I partition by key? If I use FieldPartitioner, what value should partition.field.name be set to?
And if I want to partition my data by key and keep only the latest 10 minutes or the latest 10 records for each key, what should I do?

Partitioner classes misuse PartitionerConfig

The classes extending Partitioner misuse PartitionerConfig as a simple storage for configuration key constants, which prevents it from being extended by third parties in a way that requires other configuration parameters.

Because of the way it is currently used, DataWriter in kafka-connect-hdfs (and, presumably, other users) needs to provide the "parsed" configuration, which, for example, handles conversion of numeric values into Long. Parsed configurations, however, do not include unknown keys, and there's no way of "injecting" keys.

Instead, all of the classes extending Partitioner should instantiate a PartitionerConfig from the properties received, and get their properties through it, so that unfiltered properties can be used to instantiate those classes.

RecordField w/ timestamp.field should support nested timestamps inside maps

Similarly to #47, I have a nested timestamp field. However, it's nested inside a map, for example like this:

Schema schema = SchemaBuilder.struct()
    .field("headers", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA))
    .field("data", Schema.BYTES_SCHEMA)
    .build();

So, when using headers.timestamp it fails with:

org.apache.kafka.connect.errors.DataException: Unable to get field 'headers.timestamp' from schema Schema{STRUCT}.

	at io.confluent.connect.storage.util.DataUtils.getNestedField(DataUtils.java:87)
	at io.confluent.connect.storage.partitioner.TimeBasedPartitioner$RecordFieldTimestampExtractor.extract(TimeBasedPartitioner.java:282)
	at io.confluent.connect.storage.partitioner.TimeBasedPartitioner.encodePartition(TimeBasedPartitioner.java:162)
	at io.confluent.connect.storage.partitioner.TimeBasedPartitionerTest.testNestedRecordFieldTimeExtractor2(TimeBasedPartitionerTest.java:553)
        ...
Caused by: org.apache.kafka.connect.errors.DataException: Cannot look up fields on non-struct type
	at org.apache.kafka.connect.data.SchemaBuilder.field(SchemaBuilder.java:349)
	at io.confluent.connect.storage.util.DataUtils.getNestedField(DataUtils.java:82)
	... 28 more

Reason: DataUtils.getNestedField doesn't expect Maps, only Structs. It's possible to create a similar helper method to return Schema instead of Field and adjust it to check valueSchema() first. Any thoughts? Happy to prepare a PR.
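For discussion, a rough sketch of a helper along the lines proposed above (returning the value rather than a Field, and descending through both Structs and Maps):

import java.util.Map;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;

public final class NestedValues {
  // Walk a dot-notated path (e.g. "headers.timestamp") across Structs and Maps.
  public static Object getNestedValue(Object value, String dottedPath) {
    Object current = value;
    for (String part : dottedPath.split("\\.")) {
      if (current instanceof Struct) {
        current = ((Struct) current).get(part);
      } else if (current instanceof Map) {
        current = ((Map<?, ?>) current).get(part);
      } else {
        throw new DataException("Cannot look up '" + part + "' on a non-struct, non-map value");
      }
    }
    return current;
  }

  private NestedValues() {
  }
}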

Kafka Connect REST API Security

Hi, regarding the documentation here, I have tried adding the following configuration parameters to my connect-distributed.properties file:

listeners=https://myhost:8443
rest.advertised.listener=https
rest.advertised.host.name=<localhost>
rest.advertised.host.port=8083
listeners.https.ssl.client.auth=requested
listeners.https.ssl.truststore.location=/var/ssl/private/kafka.server.truststore.jks
listeners.https.ssl.truststore.password=test1234
listeners.https.ssl.keystore.location=/var/ssl/private/kafka.server.keystore.jks
listeners.https.ssl.keystore.password=test1234
listeners.https.ssl.key.password=test1234

What's weird is that no matter how I change these values, for example listeners.https.ssl.client.auth from requested to none or to required, I always get the same handshake failures when accessing through both curl and a Java client. When I tested the connection using openssl, it showed that no certificate was found. I am just curious whether the listeners.https.* parameters are being picked up when the Kafka Connect cluster is launched? Thanks.

[Request] Add support to FieldPartitioner class for handling JSON

Kafka Connect is known not to have much built-in functionality for JSON at the moment. However, it would be really nice to allow stringified JSON records to use the FieldPartitioner. Currently only Struct-type records (Avro) are supported. If the FieldPartitioner could parse a JSON string blob as a Map and get the value at a specified key, this could work!
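A tiny sketch of the idea (assuming Jackson is available on the connector classpath): parse the stringified JSON into a Map and read the key a FieldPartitioner-style implementation would partition on.

import java.util.Map;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonFieldLookupDemo {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    // Parse a stringified JSON record into a Map...
    Map<String, Object> parsed = mapper.readValue(
        "{\"type\":\"type1\",\"time\":\"2018-12-13\",\"msg\":\"anything\"}",
        new TypeReference<Map<String, Object>>() { });
    // ...and read the value at the configured partition field.
    String partitionValue = String.valueOf(parsed.get("type")); // "type1"
    System.out.println(partitionValue);
  }
}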

Non-resolvable parent POM for io.confluent:kafka-connect-storage-common-parent:6.0.0?

I imported the code into Eclipse as a Maven project and got the following error for all the projects. Any thoughts on how to solve this?
Is there a way to run this in Eclipse, or do I have to run it via the terminal?

Project build error: Non-resolvable parent POM for io.confluent:kafka-connect-storage-common-parent:6.0.0-SNAPSHOT: Failure to transfer io.confluent:common:pom:6.0.0-SNAPSHOT from ${confluent.maven.repo} was cached in the local repository, resolution will not be reattempted until the update interval of confluent has elapsed or updates are forced. Original error: Could not transfer artifact io.confluent:common:pom:6.0.0-SNAPSHOT from/to confluent (${confluent.maven.repo}): Cannot access ${confluent.maven.repo} with type default using the available connector factories: AetherRepositoryConnectorFactory, BasicRepositoryConnectorFactory and 'parent.relativePath' points at wrong local POM

RecordField w/ timestamp.field should support nested timestamps

Currently for the RecordField configurations, I see that most use cases are for a top-level timestamp.field.

There should be support for dot-notated nested fields, without having to go through an ExtractValueFromStruct transformation.

For example,

"timestamp.field": "header.time"

Kafka S3 sink connector in distributed mode: NOT_ENOUGH_REPLICAS error


I'm setting up the Kafka S3 sink connector, and everything worked when I tried connect-standalone.sh.

Now that I've switched to connect-distributed.sh, I run into the following error:

WARN [Producer clientId=producer-3] Got error produce response with correlation id 40 on topic-partition connect-configs-0, retrying (2147483611 attempts left). Error: NOT_ENOUGH_REPLICAS (org.apache.kafka.clients.producer.internals.Sender:637)
I tried to change

offset.storage.replication.factor=5
config.storage.replication.factor=5
status.storage.replication.factor=5
which is the actual number of replicas; however, this error does not go away.

I also tried changing all the values to match min.insync.replicas, but it still does not work.

Topic: myTopic PartitionCount: 12 ReplicationFactor: 5 Configs: compression.type=producer,min.insync.replicas=2,segment.bytes=1073741824,max.message.bytes=2097152,index.interval.bytes=4096,retention.bytes=-1,segment.index.bytes=10485760
Topic: myTopic Partition: 0 Leader: 43 Replicas: 43,11,9,42,10 Isr: 10,42,9,11,43
Topic: myTopic Partition: 1 Leader: 11 Replicas: 11,42,10,43,9 Isr: 10,42,9,11,43
Topic: myTopic Partition: 2 Leader: 9 Replicas: 9,42,10,43,11 Isr: 10,42,9,11,43
Topic: myTopic Partition: 3 Leader: 42 Replicas: 42,10,43,11,9 Isr: 10,42,9,11,43
Topic: myTopic Partition: 4 Leader: 10 Replicas: 10,43,11,9,42 Isr: 10,42,9,11,43
Topic: myTopic Partition: 5 Leader: 43 Replicas: 43,10,11,9,42 Isr: 10,42,9,11,43
Topic: myTopic Partition: 6 Leader: 11 Replicas: 11,43,9,42,10 Isr: 10,42,9,11,43
Topic: myTopic Partition: 7 Leader: 9 Replicas: 9,43,11,42,10 Isr: 10,42,9,11,43
Topic: myTopic Partition: 8 Leader: 42 Replicas: 42,11,9,10,43 Isr: 10,42,9,11,43
Topic: myTopic Partition: 9 Leader: 10 Replicas: 10,42,43,11,9 Isr: 10,42,9,11,43
Topic: myTopic Partition: 10 Leader: 43 Replicas: 43,11,9,42,10 Isr: 10,42,9,11,43
Topic: myTopic Partition: 11 Leader: 11 Replicas: 11,42,10,43,9 Isr: 10,42,9,11,43

root@9c4c4d97dcd6:/opt/bitnami/kafka/bin# grep replication.factor= /plugins/worker.properties
offset.storage.replication.factor=2
config.storage.replication.factor=2
status.storage.replication.factor=2
Is there anything I misconfigured? Thanks.

How to configure a different URL than default AWS S3 URL

I would like to configure a different S3 URL than the default (s3.amazonaws.com), but I could not find a configuration directive for it, like s3.bucket or s3.region. Could you provide details on how to configure that?
Our company fronts the default S3 URL with a different URL that internally talks to AWS S3.

Support all primitive types in FieldPartitioner

Currently FieldPartitioner can partition on only some schema types (INT, STRING, BOOLEAN, STRUCT), not all of Schema.Type (FLOAT, BYTES, ...).

IMO, there is no issue with letting FieldPartitioner partition on all primitive types.

Improve the storage partitioning scheme to be flexible and configurable at runtime

Currently, the partitioning scheme includes the topic name by default in every storage partitioning scheme. It would be more flexible to make this configurable and let users choose.

Along the same lines, the partitioning scheme also assumes the 'timestamp' column is in seconds by default. Instead, it would be more flexible to make this configurable and let users specify a scale factor that converts their units to the seconds scale, supporting units at different scales.
This is similar to #122, but a little more generic.

KafkaConnect issue with SASL/Kerberos

I'm trying to set up Confluent 4.0.0 with SASL/Kerberos. I configured ZooKeeper, Kafka, Schema Registry and Kafka REST with SASL/Kerberos, but I'm facing an issue when running Connect.

Below is the connect-avro-distributed.properties config:
bootstrap.servers=SASL_PLAINTEXT://localhost:9092

group.id=connect-cluster

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses

config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

plugin.path=share/java
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
producer.sasl.mechanism=GSSAPI
producer.security.protocol=SASL_PLAINTEXT

consumer.sasl.mechanism=GSSAPI
consumer.security.protocol=SASL_PLAINTEXT

producer.confluent.monitoring.interceptor.sasl.mechanism=GSSAPI
producer.confluent.monitoring.interceptor.security.protocol=SASL_PLAINTEXT

consumer.confluent.monitoring.interceptor.sasl.mechanism=GSSAPI
consumer.confluent.monitoring.interceptor.security.protocol=SASL_PLAINTEXT

sasl.kerberos.service.name=kafka
sasl.jaas.config=[/opt/packages/confluent-4.0.0/etc/kafka/kafka_server_jaas.conf]

Below is the Connect log:
[2018-01-04 10:03:42,553] INFO DistributedConfig values:
access.control.allow.methods =
access.control.allow.origin =
bootstrap.servers = [SASL_PLAINTEXT://localhost:9092]
client.id =
config.storage.replication.factor = 1
config.storage.topic = connect-configs
connections.max.idle.ms = 540000
group.id = connect-cluster
heartbeat.interval.ms = 3000
internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
key.converter = class io.confluent.connect.avro.AvroConverter
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
offset.flush.interval.ms = 60000
offset.flush.timeout.ms = 5000
offset.storage.partitions = 25
offset.storage.replication.factor = 1
offset.storage.topic = connect-offsets
plugin.path = [/opt/packages/confluent-4.0.0/share/java]
rebalance.timeout.ms = 60000
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 40000
rest.advertised.host.name = null
rest.advertised.port = null
rest.host.name = null
rest.port = 8083
retry.backoff.ms = 100
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = kafka
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = SASL_PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
status.storage.partitions = 5
status.storage.replication.factor = 1
status.storage.topic = connect-statuses
task.shutdown.graceful.timeout.ms = 5000
value.converter = class io.confluent.connect.avro.AvroConverter
worker.sync.timeout.ms = 3000
worker.unsync.backoff.ms = 300000
(org.apache.kafka.connect.runtime.distributed.DistributedConfig:223)
[2018-01-04 10:03:42,711] DEBUG Logging to org.slf4j.impl.Log4jLoggerAdapter(org.eclipse.jetty.util.log) via org.eclipse.jetty.util.log.Slf4jLog (org.eclipse.jetty.util.log:176)
[2018-01-04 10:03:42,713] INFO Logging initialized @35598ms (org.eclipse.jetty.util.log:186)
[2018-01-04 10:03:42,721] DEBUG org.eclipse.jetty.server.Server@62ee5b2a added {qtp789707827{STOPPED,8<=0<=200,i=0,q=0},AUTO} (org.eclipse.jetty.util.component.ContainerLifeCycle:324)
[2018-01-04 10:03:42,733] DEBUG HttpConnectionFactory@549ae373{HTTP/1.1} added {HttpConfiguration@4f349ae0{32768/8192,8192/8192,https://:0,[]},POJO} (org.eclipse.jetty.util.component.ContainerLifeCycle:324)
[2018-01-04 10:03:42,736] DEBUG ServerConnector@7f60e4{null}{0.0.0.0:0} added {org.eclipse.jetty.server.Server@62ee5b2a,UNMANAGED} (org.eclipse.jetty.util.component.ContainerLifeCycle:324)
[2018-01-04 10:03:42,736] DEBUG ServerConnector@7f60e4{null}{0.0.0.0:0} added {qtp789707827{STOPPED,8<=0<=200,i=0,q=0},AUTO} (org.eclipse.jetty.util.component.ContainerLifeCycle:324)
[2018-01-04 10:03:42,736] DEBUG ServerConnector@7f60e4{null}{0.0.0.0:0} added {org.eclipse.jetty.util.thread.ScheduledExecutorScheduler@55c07ec8,AUTO} (org.eclipse.jetty.util.component.ContainerLifeCycle:324)
[2018-01-04 10:03:42,737] DEBUG ServerConnector@7f60e4{null}{0.0.0.0:0} added {org.eclipse.jetty.io.ArrayByteBufferPool@56a2c988,POJO} (org.eclipse.jetty.util.component.ContainerLifeCycle:324)
[2018-01-04 10:03:42,737] DEBUG ServerConnector@7f60e4{null}{0.0.0.0:0} added {HttpConnectionFactory@549ae373{HTTP/1.1},AUTO} (org.eclipse.jetty.util.component.ContainerLifeCycle:324)
[2018-01-04 10:03:42,739] DEBUG ServerConnector@7f60e4{HTTP/1.1}{0.0.0.0:0} added {org.eclipse.jetty.server.ServerConnector$ServerConnectorManager@42f32b2,MANAGED} (org.eclipse.jetty.util.component.ContainerLifeCycle:324)
[2018-01-04 10:03:42,739] DEBUG org.eclipse.jetty.server.Server@62ee5b2a added {ServerConnector@7f60e4{HTTP/1.1}{0.0.0.0:8083},AUTO} (org.eclipse.jetty.util.component.ContainerLifeCycle:324)
[2018-01-04 10:03:42,908] DEBUG Registering Connect metrics with JMX for worker '10.0.4.93:8083' (org.apache.kafka.connect.runtime.ConnectMetrics:78)
[2018-01-04 10:03:42,909] INFO Kafka version : 1.0.0-cp1 (org.apache.kafka.common.utils.AppInfoParser:109)
[2018-01-04 10:03:42,909] INFO Kafka commitId : bedb2a8697fecd0d (org.apache.kafka.common.utils.AppInfoParser:110)
[2018-01-04 10:03:42,918] DEBUG Added sensor with name connect-sensor-group: connect-worker-metrics;connector-startup-results (org.apache.kafka.common.metrics.Metrics:404)
[2018-01-04 10:03:42,918] DEBUG Added sensor with name connect-sensor-group: connect-worker-metrics;connector-startup-attempts (org.apache.kafka.common.metrics.Metrics:404)
[2018-01-04 10:03:42,919] DEBUG Added sensor with name connect-sensor-group: connect-worker-metrics;connector-startup-successes (org.apache.kafka.common.metrics.Metrics:404)
[2018-01-04 10:03:42,919] DEBUG Added sensor with name connect-sensor-group: connect-worker-metrics;connector-startup-failures (org.apache.kafka.common.metrics.Metrics:404)
[2018-01-04 10:03:42,919] DEBUG Added sensor with name connect-sensor-group: connect-worker-metrics;task-startup-results (org.apache.kafka.common.metrics.Metrics:404)
[2018-01-04 10:03:42,920] DEBUG Added sensor with name connect-sensor-group: connect-worker-metrics;task-startup-attempts (org.apache.kafka.common.metrics.Metrics:404)
[2018-01-04 10:03:42,920] DEBUG Added sensor with name connect-sensor-group: connect-worker-metrics;task-startup-successes (org.apache.kafka.common.metrics.Metrics:404)
[2018-01-04 10:03:42,920] DEBUG Added sensor with name connect-sensor-group: connect-worker-metrics;task-startup-failures (org.apache.kafka.common.metrics.Metrics:404)
[2018-01-04 10:03:42,940] DEBUG Added sensor with name connect-sensor-group: connect-worker-rebalance-metrics;completed-rebalance-count (org.apache.kafka.common.metrics.Metrics:404)
[2018-01-04 10:03:42,940] DEBUG Added sensor with name connect-sensor-group: connect-worker-rebalance-metrics;rebalance-time (org.apache.kafka.common.metrics.Metrics:404)
[2018-01-04 10:03:42,947] DEBUG Updated cluster metadata version 1 to Cluster(id = null, nodes = [localhost:9092 (id: -1 rack: null)], partitions = []) (org.apache.kafka.clients.Metadata:270)
[2018-01-04 10:03:42,950] DEBUG [Worker clientId=connect-1, groupId=connect-cluster] The Connect group member has stopped. (org.apache.kafka.connect.runtime.distributed.WorkerGroupMember:177)

After this, Connect gets terminated.

Any advice on this would be appreciated!

Deprecated HiveFactory Support

Hello all,

I'm just curious why the hiveFactory method is deprecated in the Format class. I'm not able to find any explanation or a replacement, even though it's used by the kafka-connect-hdfs connector.

I'm asking because we are thinking about adding Hive partitions from the Kafka S3 connector in order to access the data whenever it's transferred to S3. Currently we handle this requirement with a scheduled workflow that checks the partition folders in S3 and adds the corresponding Hive partitions if they don't exist, but we would like to handle it through Kafka Connect.

To sum up, we just want to check the project roadmap before implementing this feature. If Hive support is no longer planned for the future, we don't want to lock ourselves into specific Kafka Connect versions.

Thank you in advance

S3 source custom partitioner causes NoSuchMethodException after creation

Hi, I implemented a custom partitioner for the S3 sink and source connectors.
Partitioner:
package io.confluent.connect.storage.partitioner;

import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.confluent.connect.storage.common.StorageCommonConfig;
import io.confluent.connect.storage.errors.PartitionException;
import io.confluent.connect.storage.util.DataUtils;

public class FieldAndTimeBasedPartitioner extends TimeBasedPartitioner {

private static final String PARTITION_FIELD_FORMAT_PATH_CONFIG = "partition.field.format.path";
private static final boolean PARTITION_FIELD_FORMAT_PATH_DEFAULT = true;
private static final Logger log = LoggerFactory.getLogger(FieldAndTimeBasedPartitioner.class);
private PartitionFieldExtractor partitionFieldExtractor;

public FieldAndTimeBasedPartitioner() {
}

protected void init(long partitionDurationMs, String pathFormat, Locale locale, DateTimeZone timeZone, Map<String, Object> config) {
    super.init(partitionDurationMs, pathFormat, locale, timeZone, config);

    final List<String> fieldNames = (List<String>) config.get(PartitionerConfig.PARTITION_FIELD_NAME_CONFIG);
    // option value is parse as string all other type is cast as string by kafka connect need to parse by ourselves
    boolean formatPath =
        Boolean.parseBoolean((String) config.getOrDefault(PARTITION_FIELD_FORMAT_PATH_CONFIG, PARTITION_FIELD_FORMAT_PATH_DEFAULT));

    this.partitionFieldExtractor = new PartitionFieldExtractor(fieldNames, formatPath);
}

public String encodePartition(final SinkRecord sinkRecord, final long nowInMillis) {
    final String partitionsForTimestamp = super.encodePartition(sinkRecord, nowInMillis);
    final String partitionsForFields = this.partitionFieldExtractor.extract(sinkRecord);
    final String partition = String.join(this.delim, partitionsForFields, partitionsForTimestamp);

    log.info("Encoded partition : {}", partition);

    return partition;
}

public String encodePartition(final SinkRecord sinkRecord) {
    final String partitionsForTimestamp = super.encodePartition(sinkRecord);
    final String partitionsForFields = this.partitionFieldExtractor.extract(sinkRecord);
    final String partition = String.join(this.delim, partitionsForFields, partitionsForTimestamp);

    log.info("Encoded partition : {}", partition);

    return partition;
}

public static class PartitionFieldExtractor {
private static final String DELIMITER_EQ = "=";

private final boolean formatPath;
private final List<String> fieldNames;
private static final Logger log = LoggerFactory.getLogger(PartitionFieldExtractor.class);


PartitionFieldExtractor(final List<String> fieldNames, final boolean formatPath) {
    this.fieldNames = fieldNames;
    this.formatPath = formatPath;
}

public String extract(final ConnectRecord<?> record) {
    Object value = record.value();
    StringBuilder builder = new StringBuilder();
    for (final String fieldName : this.fieldNames) {
        if (builder.length() != 0) {
            builder.append(StorageCommonConfig.DIRECTORY_DELIM_DEFAULT);
        }
        if (value instanceof Struct || value instanceof Map) {
            final String partitionField = (String) DataUtils.getNestedFieldValue(value, fieldName);
            if (formatPath) {
                builder.append(String.join(DELIMITER_EQ, fieldName, partitionField));
            } else {
                builder.append(partitionField);
            }
        } else {
            log.error("Value is not of Struct or Map type. type {}", value.getClass());
            throw new PartitionException("Error encoding partition.");
        }
    }
    return builder.toString();
}

}

}
When I create the S3 sink connector, everything is fine and it works correctly:
{
  "name": "s3-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "3",
    "topics": "test-s3-sink",
    "topic.creation.default.replication.factor": "1",
    "topic.creation.default.partitions": "20",
    "s3.region": "us-west-2",
    "s3.proxy.url": "http://localstack:4566",
    "s3.bucket.name": "confluent-kafka-connect-s3-testing",
    "s3.part.size": "5242880",
    "flush.size": "3",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "schema.compatibility": "NONE",
    "value.converter.schemas.enable": "true",
    "partitioner.class": "com.kafka.connect.extention.FieldAndTimeBasedPartitioner",
    "partition.field.name": "customerId,channelType,channelId",
    "partition.field.format.path": true,
    "partition.duration.ms": 86400000,
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd",
    "locale": "US",
    "timezone": "UTC",
    "timestamp.extractor": "RecordField",
    "timestamp.field": "messageDate"
  }
}
But when I try to create the S3 source connector with this partitioner, I get an exception after creation:
{
  "name": "s3-source",
  "config": {
    "confluent.license": "",
    "connector.class": "io.confluent.connect.s3.source.S3SourceConnector",
    "confluent.topic.bootstrap.servers": "kafka01.internal-service:9092",
    "s3.region": "us-west-2",
    "s3.proxy.url": "http://localstack:4566",
    "s3.bucket.name": "confluent-kafka-connect-s3-testing",
    "confluent.topic.replication.factor": 1,
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "schema.compatibility": "NONE",
    "value.converter.schemas.enable": "true",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.FieldAndTimeBasedPartitioner",
    "partition.field.name": "customerId,channelType,channelId",
    "partition.field.format.path": true,
    "partition.duration.ms": 86400000,
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd",
    "locale": "US",
    "timezone": "UTC",
    "timestamp.extractor": "RecordField",
    "timestamp.field": "messageDate"
  }
}
Exception:
[2022-05-30 21:33:00,018] ERROR WorkerConnector{id=s3-source} Error while starting connector (org.apache.kafka.connect.runtime.WorkerConnector)
2022-05-30T21:33:00.023992047Z org.apache.kafka.common.config.ConfigException: Invalid value java.lang.NoSuchMethodException: io.confluent.connect.storage.partitioner.FieldAndTimeBasedPartitioner.(io.confluent.connect.cloud.storage.source.StorageSourceConnectorConfig, io.confluent.connect.cloud.storage.source.SourceStorage) for configuration Failed to instantiate partitioner class io.confluent.connect.storage.partitioner.FieldAndTimeBasedPartitioner
2022-05-30T21:33:00.024020005Z at io.confluent.connect.cloud.storage.source.StorageSourceConnectorConfig.getPartitioner(StorageSourceConnectorConfig.java:667)
2022-05-30T21:33:00.024032949Z at io.confluent.connect.cloud.storage.source.RestoreStorageSourceConnector.doStart(RestoreStorageSourceConnector.java:90)
2022-05-30T21:33:00.024044101Z at io.confluent.connect.cloud.storage.source.RestoreStorageSourceConnector.start(RestoreStorageSourceConnector.java:83)
2022-05-30T21:33:00.024054699Z at io.confluent.connect.cloud.storage.source.CompositeSourceConnector.start(CompositeSourceConnector.java:72)
2022-05-30T21:33:00.024118151Z at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:184)
2022-05-30T21:33:00.024226395Z at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:209)
2022-05-30T21:33:00.024239719Z at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:348)
2022-05-30T21:33:00.024250041Z at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:331)
2022-05-30T21:33:00.024260690Z at org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:140)
2022-05-30T21:33:00.024271458Z at org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:117)
2022-05-30T21:33:00.024283104Z at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
2022-05-30T21:33:00.024293380Z at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2022-05-30T21:33:00.024303275Z at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
2022-05-30T21:33:00.024313394Z at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
2022-05-30T21:33:00.024323897Z at java.base/java.lang.Thread.run(Thread.java:829)
2022-05-30T21:33:00.029909345Z [2022-05-30 21:33:00,028] ERROR [Worker clientId=connect-1, groupId=kafka-connect] Failed to start connector 's3-source' (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
2022-05-30T21:33:00.029993514Z org.apache.kafka.connect.errors.ConnectException: Failed to start connector: s3-source
2022-05-30T21:33:00.030013367Z at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$startConnector$25(DistributedHerder.java:1461)
2022-05-30T21:33:00.030028022Z at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:334)
2022-05-30T21:33:00.030039736Z at org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:140)
2022-05-30T21:33:00.030051017Z at org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:117)
2022-05-30T21:33:00.030061715Z at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
2022-05-30T21:33:00.030072480Z at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2022-05-30T21:33:00.030084369Z at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
2022-05-30T21:33:00.030095914Z at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
2022-05-30T21:33:00.030107039Z at java.base/java.lang.Thread.run(Thread.java:829)
2022-05-30T21:33:00.030118727Z Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to transition connector s3-source to state STARTED
2022-05-30T21:33:00.030130358Z ... 8 more
2022-05-30T21:33:00.030140662Z Caused by: org.apache.kafka.common.config.ConfigException: Invalid value java.lang.NoSuchMethodException: io.confluent.connect.storage.partitioner.FieldAndTimeBasedPartitioner.(io.confluent.connect.cloud.storage.source.StorageSourceConnectorConfig, io.confluent.connect.cloud.storage.source.SourceStorage) for configuration Failed to instantiate partitioner class io.confluent.connect.storage.partitioner.FieldAndTimeBasedPartitioner
2022-05-30T21:33:00.030181168Z at io.confluent.connect.cloud.storage.source.StorageSourceConnectorConfig.getPartitioner(StorageSourceConnectorConfig.java:667)
2022-05-30T21:33:00.030193293Z at io.confluent.connect.cloud.storage.source.RestoreStorageSourceConnector.doStart(RestoreStorageSourceConnector.java:90)
2022-05-30T21:33:00.030204530Z at io.confluent.connect.cloud.storage.source.RestoreStorageSourceConnector.start(RestoreStorageSourceConnector.java:83)
2022-05-30T21:33:00.030218397Z at io.confluent.connect.cloud.storage.source.CompositeSourceConnector.start(CompositeSourceConnector.java:72)
2022-05-30T21:33:00.030229561Z at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:184)
2022-05-30T21:33:00.030240106Z at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:209)
2022-05-30T21:33:00.030250930Z at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:348)
2022-05-30T21:33:00.030261898Z at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:331)
2022-05-30T21:33:00.030272834Z ... 7 more

I don't know why I get this exception, because the constructor is present for the partitioner.

How to modify the filename of the S3 object?

Hi, I've been using the S3 connector for a couple of weeks now, and I want to change the way the connector names each file. I am using the HourlyBasedPartition, so the path to each file is already enough for me to find each file, and I want the filenames to be something generic for all the files, like just 'Data.json.gzip' (with the respective path from the partitioner).

For example, I want to go from this:
<prefix>/<topic>/<HourlyBasedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>

To this:
<prefix>/<topic>/<HourlyBasedPartition>/Data.<format>

The objective of this is to only make one call to S3 to download the files later, instead of having to look for the filename first and then download it.

[Question]: S3 Sink Connector

Hi,

I am using the S3 sink connector with the relevant config defined as follows:

	
	"storage.class": "io.confluent.connect.s3.storage.S3Storage",
	"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
	"key.converter": "org.apache.kafka.connect.storage.StringConverter",
	"value.converter": "org.apache.kafka.connect.storage.StringConverter",
	
	"key.converter.schemas.enable": "false",
	"value.converter.schemas.enable": "false",
	
	"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
	
	"partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
	"partition.duration.ms": 3600000,
	"timestamp.extractor": "RecordField",
	"timestamp.field": "timestamp",
	"locale": "en",
	"timezone": "UTC",

However, the sink task throws the following exception:

[2017-09-15 02:40:05,830] ERROR Task s3-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:455)
io.confluent.connect.storage.errors.PartitionException: Error encoding partition.
        at io.confluent.connect.storage.partitioner.TimeBasedPartitioner$RecordFieldTimestampExtractor.extract(TimeBasedPartitioner.java:289)
        at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:174)
        at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:180)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:435)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

The Kafka messages are in JSON format, and there is a field named timestamp which holds the epoch time of the event.

Could someone help me figure out the issue?

Thanks in Advance!!

StorageSinkConnectorConfig#avroDataConfig() uses wrong AvroDataConfig keys

When creating an AvroDataConfig out of the values present in StorageSinkConnectorConfig, the avroDataConfig() method uses the StorageSinkConnectorConfig keys in the map and thus ends up not passing the schema cache size, because the key differs between the two classes: schema.cache.size on the storage side, schemas.cache.config on the AvroData side.
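A minimal sketch of the fix direction (using the string keys quoted in this issue rather than guessing at constant names): remap the storage-side key to the key AvroDataConfig actually reads before constructing it.

import java.util.HashMap;
import java.util.Map;
import io.confluent.connect.avro.AvroDataConfig;

public class AvroDataConfigMapping {
  public static AvroDataConfig avroDataConfig(Map<String, ?> connectorProps) {
    Map<String, Object> avroProps = new HashMap<>(connectorProps);
    Object cacheSize = connectorProps.get("schema.cache.size");   // storage-side key
    if (cacheSize != null) {
      avroProps.put("schemas.cache.config", cacheSize);           // AvroData-side key
    }
    return new AvroDataConfig(avroProps);
  }
}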

Support configuring multiple partitioners and encoding them sequentially

Hi All :)

TL;DR

When the S3 partitioning is too coarse, Athena queries take a long time.


In my environment, the S3 sink connector encodes partitions with FieldPartitioner. As a result, one of the S3 partitions contains a very large number of objects, which causes long query times when using Athena. If the S3 sink connector could be configured with more than one partitioner (like FieldPartitioner + DailyPartitioner), the S3 objects would be partitioned in more detail.

So partitioning by more than one scheme is needed. Please support this 🙏

Version of commons.io used has a high-severity vulnerability

Found when scanning an image with JFrog XRAY that pulls in kafka-connect-storage-common.

XRAY-125253
Severity: High
Type: Security
Summary: Apache Commons IO input/InfiniteCircularInputStream.java InfiniteCircularInputStream::read() Function Buffer Handling Divide-by-zero DoS
Description: Apache Commons IO contains a divide-by-zero condition in the InfiniteCircularInputStream::read() function in input/InfiniteCircularInputStream.java that is triggered when the input buffer is of size 0. This may allow a context-dependent attacker to crash a process linked against the library.
Version: 2.7
Fix version: 2.8.0

https://issues.apache.org/jira/browse/IO-675

kafka-connect-storage-common is using version 2.7 of commons.io, as seen here: https://github.com/confluentinc/kafka-connect-storage-common/blob/master/pom.xml#L70

Solution: Upgrade dependency version to 2.8.0 or later.

Build failure due to missing pentaho-aggdesigner-algorithm

Similarly to what was reported in #60, the build on a fresh .m2 repo (after following the steps in the FAQ for Confluent S3) doesn't work:

[INFO] kafka-connect-storage-hive ......................... FAILURE [  6.034 s]
[INFO] kafka-connect-storage-common-htrace-core4-shaded ... SKIPPED
[INFO] kafka-connect-storage-common-avatica-shaded ........ SKIPPED
[INFO] Kafka Connect Storage Common Source's version of Apache Hadoop's third-party shaded Protobuf SKIPPED
[INFO] kafka-connect-storage-common-package ............... SKIPPED
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  40.428 s
[INFO] Finished at: 2022-07-12T19:05:17+01:00
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal on project kafka-connect-storage-hive: Could not resolve dependencies for project io.confluent:kafka-connect-storage-hive:jar:11.1.0-SNAPSHOT: Could not find artifact org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde in confluent (https://packages.confluent.io/maven/) -> [Help 1]

I think the Pentaho libraries are no longer available in Maven Central or the other configured repositories; at least I couldn't find them manually...

After adding the following Maven repo, it works:
https://public.nexus.pentaho.org/content/groups/omni/

You can navigate and see the package is there, but not in any other repositories:
https://public.nexus.pentaho.org/#browse/browse:omni:org%2Fpentaho%2Fpentaho-aggdesigner-algorithm

Any idea whether adding this repository would be the proper fix? It seems people get this error somewhat randomly, probably depending on whether they already have those libraries installed locally!

io.confluent.connect.storage.errors.PartitionException: Error encoding partition

I added my custom partitioner, and when I write to a topic I see the following error:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:559)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:315)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:218)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:186)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: io.confluent.connect.storage.errors.PartitionException: Error encoding partition.
	at com.accelerator.kafka.connect.FieldAndTimeBasedPartitioner$PartitionFieldExtractor.extract(FieldAndTimeBasedPartitioner.java:153)
	at com.accelerator.kafka.connect.FieldAndTimeBasedPartitioner.encodePartition(FieldAndTimeBasedPartitioner.java:97)
	at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:205)
	at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:176)
	at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:195)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:537)
	... 10 more

Config of json file:

for i in "${!KCONNECT_NODES[@]}"; do
    curl ${KCONNECT_NODES[$i]}/connectors -XPOST -H 'Content-type: application/json' -H 'Accept: application/json' -d '{
        "name": "connect-s3-sink-'$i'",
        "config": {
            "topics": "events",
            "connector.class": "io.confluent.connect.s3.S3SinkConnector",
            "tasks.max" : 4,
            "flush.size": 100,
            "rotate.schedule.interval.ms": "-1",
            "rotate.interval.ms": "-1",
            "s3.region" : "eu-west-1",
            "s3.bucket.name" : "byob-raw",
            "s3.compression.type": "gzip",
            "topics.dir": "topics",
            "storage.class" : "io.confluent.connect.s3.storage.S3Storage",
            "partitioner.class": "com.accelerator.kafka.connect.FieldAndTimeBasedPartitioner",
            "partition.duration.ms" : "3600000",
            "path.format": "YYYY-MM-dd",
            "locale" : "US",
            "timezone" : "UTC",
            "schema.compatibility": "NONE",
            "format.class" : "io.confluent.connect.s3.format.json.JsonFormat",
            "timestamp.extractor": "Record",
            "partition.field" : "appId"
        }
    }'
done

Build Failure when I try to install using Maven: Failed to execute goal on project kafka-connect-storage-hive

I'm following these instructions to eventually build the hdfs connector. But I get the following error when I try to build "kafka-connect-storage-common":

[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] kafka-connect-storage-common-parent ................ SUCCESS [  3.537 s]
[INFO] kafka-connect-storage-common ....................... SUCCESS [  2.201 s]
[INFO] kafka-connect-storage-core ......................... SUCCESS [  3.698 s]
[INFO] kafka-connect-storage-wal .......................... SUCCESS [  0.597 s]
[INFO] kafka-connect-storage-partitioner .................. SUCCESS [  2.561 s]
[INFO] kafka-connect-storage-hive ......................... FAILURE [  8.483 s]
[INFO] kafka-connect-storage-format ....................... SKIPPED
[INFO] kafka-connect-storage-common-package ............... SKIPPED
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 21.584 s
[INFO] Finished at: 2018-03-06T16:43:52-08:00
[INFO] Final Memory: 44M/572M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal on project kafka-connect-storage-hive: Could not resolve dependencies for project io.confluent:kafka-connect-storage-hive:jar:5.0.0-SNAPSHOT: Failure to find org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde in http://mvn.fanops.net/nexus/content/groups/public was cached in the local repository, resolution will not be reattempted until the update interval of nexus has elapsed or updates are forced -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/DependencyResolutionException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR]   mvn <goals> -rf :kafka-connect-storage-hive

I was able to successfully build kafka, schema-registry, and rest-utils. Also, I'm building all of these in the ".m2" directory.

How could I persist Object Meta Data along with Object into S3 using the Connector

I have a requirement to persist object metadata along with the object itself, so that we can later use it in Amazon Athena for queries and also let applications pull only the metadata instead of the entire object. Is there any support in the connector for persisting metadata (which the AWS S3 SDK supports)?
I have seen great provisions for dynamically creating the S3 object key by deriving it from object fields etc., but I couldn't find a way to derive the metadata and persist it along with the object.
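
For reference, this is what the AWS SDK for Java (v1) itself supports; the sketch below is not something the connector currently exposes, and the bucket name, key, and metadata keys are made-up examples:

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;

public class PutWithUserMetadata {
  public static void main(String[] args) {
    AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
    byte[] body = "{\"example\":true}".getBytes(StandardCharsets.UTF_8);

    // User-defined metadata is stored with the object as x-amz-meta-* headers.
    ObjectMetadata metadata = new ObjectMetadata();
    metadata.setContentLength(body.length);
    metadata.addUserMetadata("source-topic", "events");        // hypothetical key/value pairs
    metadata.addUserMetadata("schema-version", "1");

    s3.putObject(new PutObjectRequest(
        "my-bucket",                                            // placeholder bucket
        "topics/events/partition=0/record.json",                // placeholder key
        new ByteArrayInputStream(body), metadata));
  }
}

So the SDK side is there; supporting this would mean letting the connector map record fields (or static values) onto user metadata when it uploads each object.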

google-cloud-storage Library Requires Update Due to Issue With Lifecycles

I received the below email from Google Cloud regarding the GCS connector needing to upgrade its client library to version 1.114.0 in order to work with certain lifecycle conditions. I checked, and version 5.5.8 of the connector uses version 1.112.0 of the google-cloud-storage library.

Update your Java Libraries for them to continue working with certain Google Cloud Storage Object Lifecycle Management conditions.

Dear Google Cloud Storage Customer,

We're writing to let you know that as of August 23, 2021, when completing a read, list, update or patch, Cloud Storage buckets with the customTimeBefore, daysSinceCustomTime, daysSinceNoncurrentTime and noncurrentTimeBefore lifecycle conditions will require an update of your Java client library to version 1.114.0 or higher.

In addition, if you consume the library through the Google Cloud Java libraries-bom, you will need to upgrade these Java client libraries to 20.5.0 or higher as soon as possible.

What do I need to know?

We have identified an issue with older versions of the Java Client Libraries and Object Lifecycle Management. Versions 1.50.0-1.111.2 of the Cloud Storage client library and 1.0.0-10.0.0 of the Google Cloud Java libraries-bom do not support the customTimeBefore, daysSinceCustomTime, daysSinceNoncurrentTime and noncurrentTimeBefore lifecycle conditions.

For Java client version < 1.114.0 and Google Cloud Java libraries-bom version < 20.5.0, the Cloud Storage API has the following client-side behaviors:

  • If a user is attempting to update bucket metadata that contains new lifecycle conditions, this will result in a failure with a 400 HTTP error code.
  • For read-only operations (e.g. get or list), new lifecycle conditions are removed from the response.

What do I need to do?

To be able to use Cloud Storage with those lifecycle conditions, update your Java client library to version 1.114.0 or higher as soon as possible. Maven Central Repository details and an Overview of the latest Storage Parent will assist you in integrating your Java client library to work with the Cloud Storage Object Lifecycle Management conditions mentioned above.

What if I’m unable to upgrade my Java version?

If it's not possible to upgrade your Java client library version, you can alternatively select specific bucket fields that are needed. Note, however, that you will not be able to modify lifecycle rules with the old version of the library this way. Instead, you will first need to upgrade the library to do so.

The following example gets a bucket by selecting all bucket fields except the lifecycle rules:

Bucket bucket = storage.get("bucket-name",
    Storage.BucketGetOption.fields(Storage.BucketField.ID,
    Storage.BucketField.SELF_LINK,
    Storage.BucketField.NAME,
    Storage.BucketField.TIME_CREATED,
    Storage.BucketField.METAGENERATION,
    Storage.BucketField.ACL,
    Storage.BucketField.DEFAULT_OBJECT_ACL,
    Storage.BucketField.OWNER,
    Storage.BucketField.LABELS,
    Storage.BucketField.LOCATION,
    Storage.BucketField.LOCATION_TYPE,
    Storage.BucketField.WEBSITE,
    Storage.BucketField.VERSIONING,
    Storage.BucketField.CORS,
    Storage.BucketField.STORAGE_CLASS,
    Storage.BucketField.ETAG,
    Storage.BucketField.ENCRYPTION,
    Storage.BucketField.BILLING,
    Storage.BucketField.DEFAULT_EVENT_BASED_HOLD,
    Storage.BucketField.RETENTION_POLICY,
    Storage.BucketField.IAMCONFIGURATION,
    Storage.BucketField.LOGGING,
    Storage.BucketField.UPDATED));

The following example lists buckets by selecting all bucket fields except the lifecycle rules:

storage.list(Storage.BucketListOption.fields(Storage.BucketField.ID,
    Storage.BucketField.SELF_LINK,
    Storage.BucketField.NAME,
    Storage.BucketField.TIME_CREATED,
    Storage.BucketField.METAGENERATION,
    Storage.BucketField.ACL,
    Storage.BucketField.DEFAULT_OBJECT_ACL,
    Storage.BucketField.OWNER,
    Storage.BucketField.LABELS,
    Storage.BucketField.LOCATION,
    Storage.BucketField.LOCATION_TYPE,
    Storage.BucketField.WEBSITE,
    Storage.BucketField.VERSIONING,
    Storage.BucketField.CORS,
    Storage.BucketField.STORAGE_CLASS,
    Storage.BucketField.ETAG,
    Storage.BucketField.ENCRYPTION,
    Storage.BucketField.BILLING,
    Storage.BucketField.DEFAULT_EVENT_BASED_HOLD,
    Storage.BucketField.RETENTION_POLICY,
    Storage.BucketField.IAMCONFIGURATION,
    Storage.BucketField.LOGGING,
    Storage.BucketField.UPDATED));

changelog missing

There is no changelog, so it's very hard to know what changed between versions.

There is nothing in the tags or releases, and the git history is full of "Merge branch 'X' into X" commits.

Extend list of basic partitioner: FieldAndTimeBasedPartitioner.java & HeaderAndTimeBasedPartitioner.java

We use Kafka Connect to dump topics to AWS S3. Analyzing the data is pretty simple with Athena + AWS Glue (crawlers) + S3, which seems to be a common setup for AWS users.

Problem
The base problem happens when we partition by fields from the Kafka message: Athena cannot create a table because parts of the S3 subpath become partition columns while all of the JSON keys become columns as well, and two columns with the same name are not allowed.

Solution
It would be useful to add a partitioner based on a header field & time.
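
A minimal sketch of what such a partitioner could look like, assuming the TimeBasedPartitioner shipped in this repo and a "/" directory delimiter; the header name "appId" is just an example and would ideally be configurable:

import io.confluent.connect.storage.partitioner.TimeBasedPartitioner;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.sink.SinkRecord;

public class HeaderAndTimeBasedPartitioner<T> extends TimeBasedPartitioner<T> {
  private static final String HEADER_NAME = "appId"; // example header name

  @Override
  public String encodePartition(SinkRecord sinkRecord) {
    Header header = sinkRecord.headers().lastWithName(HEADER_NAME);
    String headerValue = (header == null || header.value() == null)
        ? "unknown"
        : header.value().toString();
    // Prefix the usual time-based path with the header value, e.g.
    // appId=123/year=2021/month=07/day=01/hour=00
    return HEADER_NAME + "=" + headerValue + "/" + super.encodePartition(sinkRecord);
  }
}

Because the header value never appears inside the JSON payload, Athena would no longer see duplicate column names.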

Extra
There is a good custom partitioner, FieldAndTimeBasedPartitioner, which could also be shipped as a default in this repo.

Add ability to write multiple topics into single output path

From StackOverflow

Naturally, one might try to use RegexRouter to send multiple topics to a single directory. Say, for data coming from the JDBC source connector:

    "topics": "SQLSERVER-TEST-TABLE_TEST",

    "transforms":"dropPrefix",      
    "transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",  
    "transforms.dropPrefix.regex":"SQLSERVER-TEST-(.*)",  
    "transforms.dropPrefix.replacement":"$1"

But this will throw an NPE:

Caused by: java.lang.NullPointerException
    at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:188)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:564)
    ... 10 more

Debugging the S3 connector specifically shows that the data needed to generate the top-level folder is available, but the storage writer cannot access it from the map.

There is a HashMap keyed by the original topic name (SQLSERVER_TEST_TABLE_TEST-0), but the transform has already been applied to the record (TABLE-TEST-0), so looking up the "new" topic name fails to find the S3 writer for that TopicPartition.

I think adding a separate config in the storage-common module to perform the RegexRouter logic outside of the SMT pipeline would solve this problem, and it could then be picked up by the Hadoop, S3, and other storage connectors.
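
Until such a config exists, one possible workaround (my own sketch, not an official feature) is to strip the prefix at path-generation time inside a custom partitioner instead of using RegexRouter, so the sink's internal topic-partition map keeps the original topic name. The regex mirrors the SMT config above:

import java.util.regex.Matcher;
import java.util.regex.Pattern;
import io.confluent.connect.storage.partitioner.DefaultPartitioner;

public class RegexPathPartitioner<T> extends DefaultPartitioner<T> {
  // Same pattern as the dropPrefix SMT above, but applied only to the output path.
  private static final Pattern PREFIX = Pattern.compile("SQLSERVER-TEST-(.*)");

  @Override
  public String generatePartitionedPath(String topic, String encodedPartition) {
    Matcher m = PREFIX.matcher(topic);
    String strippedTopic = m.matches() ? m.group(1) : topic;
    return super.generatePartitionedPath(strippedTopic, encodedPartition);
  }
}

Note that this only changes the directory layout; the object file names written by the sink would still contain the original topic name.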

kafka s3 sink connect complains Error retrieving Avro value schema version for id 12345

I'm trying to set up a kafka s3 sink connector that will consume messages in avro format and dump to s3 compatible storage (minio) in parquet format.

This pipeline works for certain topics but fails for others. After some investigation, I've found that if a topic has a corresponding schema entry under the same name in the schema registry, the Kafka connector can successfully deserialize the messages and convert them to Parquet.

If a topic has no schema entry of the same name in the schema registry (the schema itself exists, since Offset Explorer and a NiFi flow can still deserialize messages from such a topic), the connector fails with:

Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic <topic name> to Avro:
        at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:124)
        at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:86)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:488)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
        ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro value schema version for id 12345

I tried to dig deeper in the hope of finding a solution but got stuck.

Finding the schema based on the schema ID, which might be related to io.confluent.kafka.serializers.subject.TopicNameStrategy. In our case, schema entries have been published to the schema registry over a long period of time and not always consistently: some entries follow topic names, others do not. How do I deal with this situation? (A rough illustration of the failing lookup is sketched at the end of this question.)
and

I did take a look at https://schema.registry.url/schemas/12345, where 12345 is the schema ID shown in the exception message. I did see a default schema version declared there:

{\"type\":\"record\",\"name\":\"schema_name\",\"namespace\":\"name_space\",\"fields\":
... ... [{"name":"schemaVersion","type":"string","default":"1.47"},

Finding the schema version based on the schema ID, which might be related to use.latest.version=true and auto.register.schemas=false, but setting these does not help; the connector complains that these options are not recognized. I tried value.converter.use.latest.version=true as well, with no difference.

How do we force it to use the latest version of the schema regardless of the default version?

Your help is greatly appreciated.
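
For context, here is a rough illustration of the lookup that fails; this is not the connector's exact code, and the URL, subject, and schema ID are placeholders taken from the report. The Avro converter fetches the writer schema by ID (which succeeds) and then asks the registry which version of that schema is registered under the subject derived from the topic name ("<topic>-value" with TopicNameStrategy), which fails if the schema was registered under a different subject:

import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

public class VersionLookupSketch {
  public static void main(String[] args) throws Exception {
    SchemaRegistryClient client =
        new CachedSchemaRegistryClient("https://schema.registry.url", 100);

    // Succeeds: the ID exists globally in the registry.
    ParsedSchema writerSchema = client.getSchemaById(12345);

    // Throws if this exact schema was never registered under the subject "my-topic-value";
    // the converter wraps that into the "Error retrieving Avro value schema version" message.
    int version = client.getVersion("my-topic-value", writerSchema);
    System.out.println("version = " + version);
  }
}

If that is what is happening here, registering the existing schema under the "<topic>-value" subject is usually a simpler fix than changing converter options.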

How to add a custom partitioner

We have created a custom partitioner whose behaviour is the same as FieldPartitioner, but we pass a timestamp field from which we derive the current year, month, date, and hour and add them to the partition fields. For this we extended the existing FieldPartitioner and added our changes. Now, on pointing partitioner.class to our new extended FieldPartitioner, we are getting a NullPointerException. Can someone explain how to add a custom partitioner?
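
Without seeing the code it is hard to say where the NullPointerException comes from, but a frequent cause is the partitioner not being configured: Connect instantiates it via its no-arg constructor and then calls configure(), so an overridden configure() must call super.configure() before any parent fields are used, and the class (plus its dependencies) must be on the connector's plugin path. A minimal sketch of such a partitioner, assuming the FieldPartitioner in this repo and record-level timestamps:

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import io.confluent.connect.storage.partitioner.FieldPartitioner;
import org.apache.kafka.connect.sink.SinkRecord;

public class FieldAndTimePartitioner<T> extends FieldPartitioner<T> {
  private static final DateTimeFormatter TIME_PATH = DateTimeFormatter
      .ofPattern("'year'=yyyy/'month'=MM/'day'=dd/'hour'=HH")
      .withZone(ZoneOffset.UTC);

  @Override
  public String encodePartition(SinkRecord sinkRecord) {
    // Field-based part (e.g. appId=123) comes from the parent class.
    String fieldPath = super.encodePartition(sinkRecord);

    // The record timestamp may be null (older message formats); fall back to wall-clock time.
    Long ts = sinkRecord.timestamp();
    long millis = (ts == null) ? System.currentTimeMillis() : ts;

    return fieldPath + "/" + TIME_PATH.format(Instant.ofEpochMilli(millis));
  }
}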
