
kafka-connect-ably's Introduction

Ably Kafka Connector

Ably is a realtime experiences infrastructure platform. Developers use our SDKs and APIs to build, extend and deliver realtime capabilities like Chat, Data Broadcast, Data Synchronization, Multiplayer Collaboration, and Notifications. Each month, Ably powers digital experiences in realtime for more than 350 million devices across 80 countries. Organizations like HubSpot, Genius Sports, Verizon, Webflow, and Mentimeter depend on Ably's platform to offload the growing complexity of business-critical realtime infrastructure at a global scale. For more information, see the Ably documentation.

Overview

The Ably Kafka Connector is a sink connector used to publish data from Apache Kafka into Ably and is available on Confluent Hub. It has also been tested with AWS MSK.

The connector will publish data from one or more Kafka topics into one or more Ably channels.

The connector is built on top of Apache Kafka Connect and can be run locally with Docker, installed into an instance of Confluent Platform or attached to an AWS MSK cluster through MSK Connect.

Install

Install the connector using the Confluent Hub Client or manually on Confluent Platform. Alternatively, deploy it locally using Docker. If installing in Amazon MSK, see the AWS MSK section below.

Confluent Hub installation

To install the connector on a local installation of Confluent using the Confluent Hub Client:

  1. Ensure that the Confluent Hub Client is installed. See the Confluent instructions for steps to complete this.

  2. Run the following command to install the Ably Kafka Connector:

    confluent-hub install ably/kafka-connect-ably:<version>

    Where <version> is the latest version of the connector.

  3. Configure the connector.

Manual installation

To manually install the connector on a local installation of Confluent:

  1. Obtain the .zip of the connector from Confluent Hub or this repository:

    From Confluent Hub:

    Visit the Ably Kafka Connector page on Confluent Hub and click the Download button.

    From this repository:

    1. Clone the repository:

      git clone git@github.com:ably/kafka-connect-ably.git

    2. Build the connector using Maven:

      mvn clean package

    3. A .zip file will be produced in the /target/components/packages/ folder after the process has run.

  2. Extract the .zip into a directory specified in the plugin.path of your connect worker's configuration properties file. See the Confluent instructions for further information on this step.
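    For example, if you extracted the .zip to /usr/local/share/kafka/plugins (an illustrative path), your worker configuration would include:

      plugin.path=/usr/local/share/kafka/plugins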

  3. Configure the connector.

Confluent Cloud Custom Connector

The connector can be used as a Custom Connector plugin on Confluent Cloud. These steps assume that you have created a Confluent Cloud account and configured your cluster.

Important

In order to run the Ably connector, your Kafka cluster must reside in a supported cloud provider and region.

  1. Obtain the .zip of the connector as per the manual installation guide.
  2. Inside the cluster on your Confluent Cloud account, add a new Connector.
  3. Instead of selecting Ably Kafka Connector from the Hub, click Add Plugin.
  4. Give the plugin a name, and set the class to com.ably.kafka.connect.ChannelSinkConnector.
  5. Upload the .zip file you obtained in step 1.
  6. In the plugin config, insert the following, replacing the placeholders with your topics and Ably API key:
{
  "connector.class": "com.ably.kafka.connect.ChannelSinkConnector",
  "tasks.max": "3",
  "group.id": "ably-connect-cluster",
  "topics": "<topic1>,<topic2>",
  "client.id": "Ably-Kafka-Connector",
  "channel": "#{topic}",
  "message.name": "#{topic}_message",
  "client.key": "<YOUR_ABLY_API_KEY>",
  "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "value.converter.schemas.enable": "false"
}
  7. When asked for an endpoint, enter rest.ably.io:443:TCP. If you are using Kafka Schema Registry, also add <SCHEMA_REGISTRY_URL>:443:TCP.

AWS MSK

See the getting started instructions and example deployment in the examples section.

Docker

This repository includes a docker-compose.yml file for standalone mode and an alternative docker-compose-distributed.yml for distributed mode, either of which can be used to run the connector locally using Docker Compose. The Docker Compose files are based on the Confluent Platform Docker images.

  1. Create and configure a configuration file, ensuring you have set at least the basic properties. Note: in distributed mode, connector properties must be provided when starting the connector. An example cURL command to start the connector in distributed mode is:
 curl -X POST -H "Content-Type: application/json" --data '{"name": "ably-channel-sink",
     "config": {"connector.class":"com.ably.kafka.connect.ChannelSinkConnector", "tasks.max":"3", 
     "group.id":"ably-connect-cluster",
     "topics":"topic1,topic2","client.id":"Ably-Kafka-Connector","channel":"#{topic}","message.name": "#{topic}_message",
     "client.key":"<Put your API key here>" }}' http://localhost:8083/connectors
  2. Start the cluster using:

    docker-compose up -d for standalone mode or

    docker-compose -f docker-compose-distributed.yml up -d for distributed mode.

Note: You can view the logs using docker-compose logs connector.

Note 2: When using distributed mode, you must start your connectors using the Connect REST interface.

  3. Once the containers have started, you can test the connector by subscribing to your Ably channel using SSE in a new terminal window. Replace <channel-name> with the channel set in your configuration file and <ably-api-key> with an API key with the capability to subscribe to the channel.

    curl -s -u "<ably-api-key>" "https://realtime.ably.io/sse?channel=<channel-name>&v=1.1"

    Note: SSE is only used as an example. An Ably SDK can also be used to subscribe to the channel.

  4. Produce a set of test messages in Kafka using the Kafka CLI tool. Replace <kafka-topic-name> with one of the topics set in your configuration file.

    docker-compose exec -T kafka kafka-console-producer --topic <kafka-topic-name> --broker-list kafka:9092 <<EOF
    message 1
    message 2
    message 3
    EOF
    
  5. In the terminal window where you subscribed to the Ably channel, you will receive messages similar to the following:

    id: e026fVvywAz6Il@1623496744539-0
    event: message
    data: {"id":"1543960661:0:0","clientId":"kafka-connect-ably-example","connectionId":"SuJTceISnT","timestamp":1623496744538,"encoding":"base64", "channel":"kafka-connect-ably-example","data":"bWVzc2FnZSAx","name":"sink"}
    
    id: e026fVvywAz6Il@1623496744539-1
    event: message
    data: {"id":"1543960661:0:1","clientId":"kafka-connect-ably-example","connectionId":"SuJTceISnT","timestamp":1623496744538,"encoding":"base64", "channel":"kafka-connect-ably-example","data":"bWVzc2FnZSAy","name":"sink"}
    
    id: e026fVvywAz6Il@1623496744539-2
    event: message
    data: {"id":"1543960661:0:2","clientId":"kafka-connect-ably-example","connectionId":"SuJTceISnT","timestamp":1623496744538,"encoding":"base64", "channel":"kafka-connect-ably-example","data":"bWVzc2FnZSAz","name":"sink"}
    

Publishing messages with a Push Notification

Messages can be delivered to end user devices as Push Notifications by setting a Kafka message header named com.ably.extras.push with a notification payload, for example:

{
  "notification": {
    "title": "Notification title",
    "body": "This is the body of notification"
  },
  "data": {
    "foo": "foo",
    "bar": "bar"
  }
}

Extra Ably configuration is also required to enable push notifications; see the Push Notification documentation.
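As an illustration, the sketch below attaches such a header from a plain Java producer. The broker address, topic, key and payload here are all illustrative; the connector itself only requires that the header is named com.ably.extras.push.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;
import java.util.Properties;

public class PushHeaderProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        String pushPayload = "{\"notification\":{\"title\":\"Notification title\","
                + "\"body\":\"This is the body of notification\"}}";

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("topic1", "key1", "message 1");
            // The connector reads this header and uses it as the push payload.
            record.headers().add("com.ably.extras.push",
                    pushPayload.getBytes(StandardCharsets.UTF_8));
            producer.send(record);
        }
    }
}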

Publishing messages with schema

The Ably Kafka Connector supports messages which contain schema information by converting them to JSON before publishing them to Ably. To learn how to use a schema registry and the supported converters, see Using Kafka Connect with Schema Registry. For example, if messages on the Kafka topic are serialized using Avro, with schemas registered in a schema registry, set the following configuration so that those messages are converted from Avro to JSON:

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=https://<your-schema-registry-host>

If you're running the Ably Kafka Connector locally using Docker Compose as outlined above, you can use the kafka-avro-console-producer CLI to test producing Avro-serialized messages by running the following:

 docker-compose exec -T schema-registry kafka-avro-console-producer \
   --topic topic1 \
   --broker-list kafka:9092 \
   --property key.schema='{"type":"string"}' \
   --property parse.key=true \
   --property key.separator=":" \
   --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"count","type":"int"}]}' \
   --property schema.registry.url=http://schema-registry:8081 <<EOF
"key1":{"count":1}
EOF

You should receive the following Ably message where you subscribed. The Avro-formatted key is included, base64 encoded, in the message extras.

{
  "clientId": "Ably-Kafka-Connector",
  "connectionId": "VSuDXysgaz",
  "data": {
    "count": 1
  },
  "extras": {
    "kafka": {
      "key": "AAAAAKEIa2V5MQ=="
    }
  },
  "id": "-868034334:0:351",
  "name": "topic1_message",
  "timestamp": 1653923422360
}

Note that configuring the schema registry and appropriate key or value converters also enables referencing of record field data within Dynamic Channel Configuration.

Breaking API Changes in Version 2.0.0

Please see our Upgrade / Migration Guide for notes on changes you need to make to your configuration to update it with changes introduced by version 2.0.0 of the connector.

Configuration

Configuration is handled differently depending on how the connector is installed:

  • Docker: Create a docker-compose-connector.properties file in the /config directory. An example file already exists.
  • Single connect worker: Provide a configuration file as a command line argument.
  • Distributed connect workers: Use the Confluent REST API /connectors endpoint to pass the configuration as JSON.

Configuration properties

The basic properties that must be configured for the connector are:

  • channel (String): The name of the Ably channel to publish to. See also: Dynamic Channel Configuration.
  • client.key (String): An API key from your Ably dashboard to use for authentication. This must have the publish capability for the channel being published to by the connector.
  • client.id (String; default: kafka-connect-ably-example): The Ably client ID to use for the connector.
  • name (String; default: ably-channel-sink): A globally unique name for the connector.
  • topics (String): A comma-separated list of Kafka topics to publish from.
  • tasks.max (Integer; default: 1): The maximum number of tasks to use for the connector.
  • connector.class (String; default: com.ably.kafka.connect.ChannelSinkConnector): The name of the connector class. This must be a subclass of org.apache.kafka.connect.connector.Connector.

The advanced properties that can be configured for the connector are:

  • message.name (String): The Ably message name to publish. Can be a pattern, as per Dynamic Channel Configuration below.
  • batchExecutionThreadPoolSize (Integer; default: 10): The maximum number of parallel outgoing REST API requests used to publish content to Ably.
  • batchExecutionMaxBufferSize (Integer; default: 100): The maximum number of records to publish in a single batch to Ably. The size of these batches must be less than the maximum batch publish size.
  • batchExecutionMaxBufferSizeMs (Integer; default: 100): The maximum amount of time (in milliseconds) to wait for the batch publishing buffer to fill before publishing anyway.
  • onFailedRecordMapping (String; default: stop): The action to take if a dynamic channel mapping fails for a record. See Dynamic Channel Configuration for full details.
  • client.async.http.threadpool.size (Integer; default: 64): The size of the asyncHttp thread pool.
  • client.fallback.hosts (List): A list of custom fallback hosts. This will override the default fallback hosts.
  • client.http.max.retry.count (Integer; default: 3): The maximum number of fallback hosts to use when an HTTP request to the primary host is unreachable or indicates that it is unserviceable.
  • client.http.open.timeout (Integer; default: 4000): The timeout period for opening an HTTP connection.
  • client.http.request.timeout (Integer; default: 15000): The timeout period for any single HTTP request and response.
  • client.proxy (Boolean): Sets whether the configured proxy options are used.
  • client.proxy.host (String): The proxy host to use. Requires client.proxy to be set to true.
  • client.proxy.non.proxy.hosts (List): A list of hosts excluded from using the proxy. Requires client.proxy to be set to true.
  • client.proxy.username (String): The client proxy username. Requires client.proxy to be set to true.
  • client.proxy.password (String): The client proxy password. Requires client.proxy to be set to true.
  • client.proxy.port (Integer): The client proxy port. Requires client.proxy to be set to true.
  • client.proxy.pref.auth.type (String; default: Basic): The authentication type to use with the client proxy. Must be one of BASIC, DIGEST or X_ABLY_TOKEN. Requires client.proxy to be set to true.
  • client.push.full.wait (Boolean): Sets whether Ably should wait for all the effects of push REST requests before responding.
  • client.tls (Boolean; default: true): Sets whether TLS is used for all connection types.
  • client.loglevel (Integer; default: 0): Sets the verbosity of logging.
  • client.environment (String): Custom Ably environment (https://ably.com/docs/platform-customization).

Buffering Records

The Ably Kafka connector buffers records locally so that larger batches can be sent to Ably in parallel for improved throughput. This behaviour is configurable using these settings:

  • batchExecutionMaxBufferSizeMs - the maximum amount of time to wait (in milliseconds) for record data to accumulate before submitting as many records as have been buffered so far to Ably.
  • batchExecutionMaxBufferSize - the maximum number of records to buffer before submitting to Ably
  • batchExecutionThreadPoolSize - the size of the thread pool used to submit buffered batches to Ably, and therefore the maximum number of concurrent submissions to Ably per sink task.

Some consideration should be given to your workload and requirements:

  • batchExecutionMaxBufferSizeMs is effectively the minimum latency for outgoing records. Increasing this value gives records more time to accumulate so that more can be sent in each batch to Ably for improved efficiency, but if your workload requires that latency between Kafka and the end user is low, you may need to decrease this value.
  • batchExecutionMaxBufferSize can be used to cap the number of records sent to Ably in a single batch. You should consider your typical outgoing Ably message sizes and ensure that this is not set so high that batch submissions to Ably may exceed your account limits.
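
As an illustration, a configuration tuned for throughput over latency might look like the following (the values are illustrative, not recommendations):

batchExecutionThreadPoolSize = 20
batchExecutionMaxBufferSize = 500
batchExecutionMaxBufferSizeMs = 500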

Message Ordering

If your workload requires that messages are sent to Ably channels in the order that they were published to Kafka topic partitions, you will need to disable parallel submissions within the connector's sink tasks as follows:

  • Set batchExecutionThreadPoolSize=1 to prevent parallel submissions per task
  • Likely also increase tasks.max to the desired number of separate task instances required to achieve parallelism across all topic partitions (see the example below)
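
For example, with four topic partitions (an illustrative count):

batchExecutionThreadPoolSize = 1
tasks.max = 4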

Dynamic Channel Configuration

Ably Channels are very lightweight, and it's idiomatic to use very high numbers of distinct channels to send messages between users. In many use cases, it makes sense to create an Ably channel per user or per session, meaning there could be millions in total. Contrast this with a typical Kafka deployment, where you're more likely to be putting records related to all users, but of some common type, through a single topic.

To enable "fan-out" to high numbers of channels from your Kafka topic, the Ably Kafka Connector supports Dynamic Channel Configuration, whereby you configure a template string to substitute data from incoming Kafka records into the outgoing Ably Channel name. The same functionality is also supported for the Message name field, if required.

To make use of this feature, set the channel and/or message.name settings in your connector properties file to reference data from the incoming Kafka record key or value, or metadata from the Kafka topic, using the #{...} placeholder. Referencing data within a record key or value is only possible when making use of Kafka Connector Schema Support, though topic metadata can always be referenced without a schema. References to unstructured key values are also supported, assuming they can be converted to a string.

For example, the following configuration references a field within the record value and the topic name:

channel = user:#{value.userId}
message.name = #{topic.name}

This assumes that a schema registry is available and contains a schema for record values, and that an appropriate converter has been configured for values. If the value schema contains a userId field, the configuration above will substitute its value into the outgoing channel names as per the template. In this example, the Ably message name will also be set to the Kafka topic name.

Fields can be nested within other Structs; for example, value.someStruct.userId would also be valid if someStruct has STRUCT type and userId can be converted to a string. Given that both message and channel names are ultimately strings, the referenced fields must be reasonably interpretable as a string. The supported conversions are:

  • String
  • Integer (any precision)
  • Boolean
  • Bytes (assumed to be UTF-8 encoded string data)

The substitutions that can be made within a #{} placeholder are summarised below:

  • #{topic.name}: the Kafka topic name
  • #{topic.partition}: the Kafka topic partition
  • #{topic}: alias for topic.name
  • #{key}: the record key; must be convertible to a string
  • #{key.nestedField}: if the key is a struct with a schema, the value of nestedField
  • #{value.nestedField}: if the value is a struct with a schema, the value of nestedField
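
For example, with channel = user:#{value.userId} as configured above, a record whose value contains a userId field with value abc123 (an illustrative value) would be published to the Ably channel user:abc123.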

Handling Failed Mappings

Dynamic channel mapping can fail at runtime if:

  • The template references a field that a record is missing
  • The referenced field cannot be converted to a string

In these situations, it's possible to configure the desired error handling behaviour by setting the onFailedRecordMapping property to one of the following values:

  • stop (default) - Treat failed mappings as fatal and stop the Sink Task completely.
  • skip - Silently ignore records that cannot be mapped to the required template. Use this only if you're sure that the Kafka topic contains irrelevant records you'd like to filter out this way.
  • dlq - Send records with failed mappings along with the error (as a header) to a configured dead-letter queue.

Dead Letter Queues

The Ably Kafka Connector can forward records to a dead-letter queue topic using Kafka Connect's dead-letter queue support. See the Kafka Connect documentation to learn more about dead-letter queues.

As an example, adding the following configuration to your connector properties file will cause all failed records to be sent to a dlq_ably_sink topic with a replication factor of 1, with headers attached giving full exception details for each record.

errors.tolerance = all
errors.deadletterqueue.topic.name = dlq_ably_sink
errors.deadletterqueue.topic.replication.factor=1
errors.deadletterqueue.context.headers.enable=true

Situations in which the Ably connector will forward records to a dead-letter queue include:

  • Errors submitting to Ably, perhaps due to insufficient permissions to publish to a specific destination channel
  • Serious Ably service outages, after retrying with fallback endpoints
  • Failed dynamic channel mappings, if onFailedRecordMapping = dlq
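
If you are running the Docker Compose setup described above, one way to inspect dead-lettered records and their error headers is with the console consumer (assuming the image's kafka-console-consumer is from Kafka 2.7 or later, which supports printing headers):

docker-compose exec -T kafka kafka-console-consumer \
  --topic dlq_ably_sink \
  --bootstrap-server kafka:9092 \
  --from-beginning \
  --property print.headers=true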

Contributing

For guidance on how to contribute to this project, see CONTRIBUTING.md.

kafka-connect-ably's People

Contributors

andytwf, ikbalkaya, jaley, kacperkluka, lmars, m-hulbert, matt-esch, mclark-ably, owenpearson, quintinwillison, stmoreau, subkanthi, tomczoink, tomkirbygreen, ttypic


kafka-connect-ably's Issues

Offset tracking and committing

We need to keep track of SinkRecord offsets somewhere centrally - perhaps create a new injectable component with an interface a bit like this?

interface OffsetRegistry {
  void update(TopicPartition partition, long offset);
  long latestOffset(TopicPartition partition);
}

The flush() implementation would then make use of this component to commit progress back to Kafka when needed.
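
A minimal in-memory sketch of such a component might look like this (hypothetical; the class name and the -1 sentinel are illustrative):

import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class InMemoryOffsetRegistry implements OffsetRegistry {
    private final Map<TopicPartition, Long> offsets = new ConcurrentHashMap<>();

    @Override
    public void update(TopicPartition partition, long offset) {
        // Keep the highest offset seen so far for each partition.
        offsets.merge(partition, offset, Math::max);
    }

    @Override
    public long latestOffset(TopicPartition partition) {
        // -1 signals that no offset has been recorded for this partition yet.
        return offsets.getOrDefault(partition, -1L);
    }
}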

Error with retrieving key of SinkRecords

Currently, if the key is a byte array and it is not UTF-8 encoded, the key is not retrieved.

    private String getKeyString(SinkRecord record) {
        String keyString = null;
        final Object recordKey = record.key();
        if (recordKey != null) {
            if (recordKey instanceof byte[] && ByteArrayUtils.isUTF8Encoded((byte[]) recordKey)) {
                keyString = new String((byte[]) recordKey, StandardCharsets.UTF_8);
            } else if (recordKey instanceof String) {
                keyString = (String) recordKey;
            }
        }
        return keyString;
    }
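
One possible fix (a hypothetical sketch, not the current implementation) would be to fall back to a base64 representation for binary keys instead of dropping them:

import java.nio.charset.StandardCharsets;
import java.util.Base64;

    private String getKeyString(SinkRecord record) {
        final Object recordKey = record.key();
        if (recordKey instanceof byte[]) {
            final byte[] keyBytes = (byte[]) recordKey;
            // Fall back to base64 rather than returning null for non-UTF-8 keys.
            return ByteArrayUtils.isUTF8Encoded(keyBytes)
                    ? new String(keyBytes, StandardCharsets.UTF_8)
                    : Base64.getEncoder().encodeToString(keyBytes);
        }
        return (recordKey instanceof String) ? (String) recordKey : null;
    }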


Republish messages that were canceled after connection suspension

Connection suspension will cause queued messages (if any) to fail.

Upon connection resume, we need to resend those canceled messages (before any other subsequent messages) to maintain continuity of published messages.

For this, create an intermediary queue and add the missing sink records so that they can be resent when the connection is re-established. Make sure the order of publishes does not change.

Integration Testing

We need an end-to-end test to cover:

  • Submitting messages to Kafka
  • Configuring the Connector to forward messages to channels based on key or other fields
  • Subscribing to relevant target channels in Ably to confirm that the messages are received in the expected order (if using single-threaded Connector, at least)

Write integration tests for data conversions for schema

Currently we use EmbeddedConnectCluster to run our integration tests. This provides a way to run tests without having to provision an external cluster for tests.

However, it looks like it is currently not possible to exchange data with the schema registry, and it doesn't seem to be possible to produce data with a schema (an Avro schema, in the case I tried).

While trying to find a way to write unit tests, I found myself using classes from https://github.com/confluentinc/schema-registry. It is easy to serialize, deserialize and convert messages with Avro using this repo, as it contains the AvroConverter itself. It also has a MockSchemaRegistryClient that can be used to exchange schemas between producers and consumers in unit tests.

I think this particular repo can be checked further to see whether there are embedded classes / utilities that provide the ability to wire up schemas with producers and connectors, and also utilities that provide a way to send data with a schema to Kafka.

It is also worth checking whether we can use MockSchemaRegistryClient in our current test setup.


Migrate kafka-connect-ably Java packages from io.ably to com.ably

Discussion from #6 (comment):

paddybyers: Why is this all in the io.ably package? It should be com.ably

lmars: I assumed there was some historic reason (e.g. we're registered under io.ably in some packaging system); if that's not the case then yes, we should change it. Tracked in https://ably.atlassian.net/browse/INT-8

paddybyers: Historically that was true for Maven Central, but nonetheless we're now building stuff in com.ably (e.g. https://github.com/ably/ably-asset-tracking-android)


Provide ability to skip a record when a key is absent and channel is configured with a key

Currently, when users provide a patterned configuration that includes #{key} and a record doesn't include a key, the connector throws an exception and stops. Below is the code block which does that:

  if (keyString == null && pattern.contains(KEY_TOKEN)) {
      throw new IllegalArgumentException("Key is null or not a string type but pattern contains #{key}");
  }

We want to tolerate this by skipping the record, based on another configuration value the user provides. To do this:

  • Create an optional configuration named skipOnKeyAbsence, which defaults to false
  • Skip the record and provide an info log if the key is not provided and the new configuration is set to true (see the example below)
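
For example (hypothetical, as proposed above):

channel = channel_#{key}
skipOnKeyAbsence = true

With this configuration, a record arriving without a key would be skipped with an info log rather than stopping the connector.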

Errors in javadocs

There are a lot of errors in the javadocs that need to be fixed; disabling doc generation for now.

Error:  Failed to execute goal org.apache.maven.plugins:maven-javadoc-plugin:3.3.0:jar (attach-javadocs) on project kafka-connect-ably: MavenReportException: Error while generating Javadoc: 
Error:  Exit code: 1 - /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/ChannelSinkConnector.java:36: warning: no comment
Error:  public class ChannelSinkConnector extends SinkConnector {
Error:         ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/ChannelSinkTask.java:28: warning: no comment
Error:  public class ChannelSinkTask extends SinkTask {
Error:         ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/ChannelSinkTask.java:39: warning: no comment
Error:      public ChannelSinkTask() {}
Error:             ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/batch/BatchProcessingThread.java:21: warning: no comment
Error:      public BatchProcessingThread(List<SinkRecord> sinkRecords, DefaultAblyBatchClient ablyBatchClient) {
Error:             ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/client/AblyClient.java:12: warning: no description for @param
Error:       * @param records
Error:         ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/client/AblyClient.java:13: warning: no description for @throws
Error:       * @throws ConnectException
Error:         ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/client/AblyClient.java:14: warning: no description for @throws
Error:       * @throws AblyException
Error:         ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/client/AblyClient.java:9: warning: no comment
Error:  public interface AblyClient {
Error:         ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/client/AblyClientFactory.java:17: warning: no @throws for io.ably.lib.types.AblyException
Error:      AblyClient create(Map<String, String> settings) throws ChannelSinkConnectorConfig.ConfigException, AblyException;
Error:                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/client/AblyClientFactory.java:8: warning: no comment
Error:  public interface AblyClientFactory {
Error:         ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/client/BatchSpec.java:9: warning: no comment
Error:  final public class BatchSpec {
Error:               ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/client/BatchSpec.java:12: warning: no comment
Error:      public BatchSpec(Set<String> channels, List<Message> messages) {
Error:             ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/client/BatchSpec.java:16: warning: no comment
Error:      public Set<String> getChannels() {
Error:                         ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/client/BatchSpec.java:19: warning: no comment
Error:      public List<Message> getMessages() {
Error:                           ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/client/DefaultAblyBatchClient.java:52: warning: no description for @throws
Error:       * @throws ConnectException
Error:         ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/client/DefaultAblyBatchClient.java:74: warning: no description for @param
Error:       * @param sinkRecords
Error:         ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/client/DefaultAblyBatchClient.java:75: warning: no description for @return
Error:       * @return
Error:         ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/client/DefaultAblyBatchClient.java:120: warning: no description for @param
Error:       * @param batches
Error:         ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/client/DefaultAblyBatchClient.java:121: warning: no description for @throws
Error:       * @throws AblyException
Error:         ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/client/DefaultAblyBatchClient.java:33: warning: no comment
Error:      protected final ChannelSinkMapping channelSinkMapping;
Error:                                         ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/client/DefaultAblyBatchClient.java:35: warning: no comment
Error:      protected final ChannelSinkConnectorConfig connectorConfig;
Error:                                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/client/DefaultAblyBatchClient.java:34: warning: no comment
Error:      protected final MessageSinkMapping messageSinkMapping;
Error:                                         ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/client/DefaultAblyBatchClient.java:40: warning: no comment
Error:      public DefaultAblyBatchClient(ChannelSinkConnectorConfig connectorConfig, ChannelSinkMapping channelSinkMapping,
Error:             ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/client/DefaultAblyBatchClient.java:99: warning: no comment
Error:      protected boolean shouldSkip(SinkRecord record) {
Error:                        ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/client/DefaultAblyClientFactory.java:19: warning: no comment
Error:  public class DefaultAblyClientFactory implements AblyClientFactory {
Error:         ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelConfig.java:5: warning: no comment
Error:  public interface ChannelConfig {
Error:         ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelConfig.java:6: warning: no comment
Error:      String getName();
Error:             ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelConfig.java:8: warning: no comment
Error:      ChannelOptions getOptions() throws ChannelSinkConnectorConfig.ConfigException;
Error:                     ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:29: warning: no comment
Error:  public class ChannelSinkConnectorConfig extends AbstractConfig {
Error:         ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:224: warning: no comment
Error:      public static class ConfigException extends Exception {
Error:                    ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:197: warning: no comment
Error:      public static final String BATCH_EXECUTION_FLUSH_TIME = "batchExecutionFlushTime";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:199: warning: no comment
Error:      public static final String BATCH_EXECUTION_FLUSH_TIME_DEFAULT = "5000";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:205: warning: no comment
Error:      public static final String BATCH_EXECUTION_MAX_BUFFER_SIZE = "batchExecutionMaxBufferSize";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:207: warning: no comment
Error:      public static final String BATCH_EXECUTION_MAX_BUFFER_SIZE_DEFAULT = "1000";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:183: warning: no comment
Error:      public static final String BATCH_EXECUTION_THREAD_POOL_SIZE = "batchExecutionThreadPoolSize";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:185: warning: no comment
Error:      public static final String BATCH_EXECUTION_THREAD_POOL_SIZE_DEFAULT = "10";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:31: warning: no comment
Error:      public static final String CHANNEL_CONFIG = "channel";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:163: warning: no comment
Error:      public static final String CLIENT_ASYNC_HTTP_THREADPOOL_SIZE = "client.async.http.threadpool.size";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:70: warning: no comment
Error:      public static final String CLIENT_AUTO_CONNECT = "client.auto.connect";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:171: warning: no comment
Error:      public static final String CLIENT_CHANNEL_CIPHER_KEY = "client.channel.cipher.key";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:175: warning: no comment
Error:      public static final String CLIENT_CHANNEL_PARAMS = "client.channel.params";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:155: warning: no comment
Error:      public static final String CLIENT_CHANNEL_RETRY_TIMEOUT = "client.channel.retry.timeout";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:79: warning: no comment
Error:      public static final String CLIENT_ECHO_MESSAGES = "client.echo.messages";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:110: warning: no comment
Error:      public static final String CLIENT_ENVIRONMENT = "client.environment";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:134: warning: no comment
Error:      public static final String CLIENT_FALLBACK_HOSTS = "client.fallback.hosts";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:124: warning: no comment
Error:      public static final String CLIENT_HTTP_MAX_RETRY_COUNT = "client.http.max.retry.count";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:118: warning: no comment
Error:      public static final String CLIENT_HTTP_OPEN_TIMEOUT = "client.http.open.timeout";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:121: warning: no comment
Error:      public static final String CLIENT_HTTP_REQUEST_TIMEOUT = "client.http.request.timeout";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:40: warning: no comment
Error:      public static final String CLIENT_ID = "client.id";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:114: warning: no comment
Error:      public static final String CLIENT_IDEMPOTENT_REST_PUBLISHING = "client.idempotent.rest";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:36: warning: no comment
Error:      public static final String CLIENT_KEY = "client.key";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:46: warning: no comment
Error:      public static final String CLIENT_LOG_LEVEL = "client.loglevel";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:62: warning: no comment
Error:      public static final String CLIENT_PORT = "client.port";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:83: warning: no comment
Error:      public static final String CLIENT_PROXY = "client.proxy";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:86: warning: no comment
Error:      public static final String CLIENT_PROXY_HOST = "client.proxy.host";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:102: warning: no comment
Error:      public static final String CLIENT_PROXY_NON_PROXY_HOSTS = "client.proxy.non.proxy.hosts";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:98: warning: no comment
Error:      public static final String CLIENT_PROXY_PASSWORD = "client.proxy.password";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:90: warning: no comment
Error:      public static final String CLIENT_PROXY_PORT = "client.proxy.port";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:106: warning: no comment
Error:      public static final String CLIENT_PROXY_PREF_AUTH_TYPE = "client.proxy.pref.auth.type";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:94: warning: no comment
Error:      public static final String CLIENT_PROXY_USERNAME = "client.proxy.username";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:167: warning: no comment
Error:      public static final String CLIENT_PUSH_FULL_WAIT = "client.push.full.wait";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:74: warning: no comment
Error:      public static final String CLIENT_QUEUE_MESSAGES = "client.queue.messages";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:58: warning: no comment
Error:      public static final String CLIENT_REALTIME_HOST = "client.realtime.host";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:128: warning: no comment
Error:      public static final String CLIENT_REALTIME_REQUEST_TIMEOUT = "client.realtime.request.timeout";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:54: warning: no comment
Error:      public static final String CLIENT_REST_HOST = "client.rest.host";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:50: warning: no comment
Error:      public static final String CLIENT_TLS = "client.tls";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:66: warning: no comment
Error:      public static final String CLIENT_TLS_PORT = "client.tls.port";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:138: warning: no comment
Error:      public static final String CLIENT_TOKEN_PARAMS = "client.token.params";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:145: warning: no comment
Error:      public static final String CLIENT_TOKEN_PARAMS_CAPABILITY = "client.token.params.capability";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:150: warning: no comment
Error:      public static final String CLIENT_TOKEN_PARAMS_CLIENT_ID = "client.token.params.client.id";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:141: warning: no comment
Error:      public static final String CLIENT_TOKEN_PARAMS_TTL = "client.token.params.ttl";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:158: warning: no comment
Error:      public static final String CLIENT_TRANSPORT_PARAMS = "client.transport.params";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:222: warning: no comment
Error:      public final ClientOptions clientOptions;
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:33: warning: no comment
Error:      public static final String MESSAGE_CONFIG = "message.name";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:187: warning: no comment
Error:      public static final String MESSAGE_PAYLOAD_SIZE_MAX = "messagePayloadSizeMax";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:190: warning: no comment
Error:      public static final int MESSAGE_PAYLOAD_SIZE_MAX_DEFAULT = 64 * 1024;
Error:                              ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:179: warning: no comment
Error:      public static final String SKIP_ON_KEY_ABSENCE = "skipOnKeyAbsence";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:236: warning: no comment
Error:      public ChannelSinkConnectorConfig(Map<?, ?> originals) {
Error:             ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:314: warning: no comment
Error:      public static ConfigDef createConfig() {
Error:                              ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:227: warning: no comment
Error:          public ConfigException(String message) {
Error:                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ChannelSinkConnectorConfig.java:231: warning: no comment
Error:          public ConfigException(String message, Exception cause) {
Error:                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ConfigValueEvaluator.java:8: warning: no comment
Error:  public class ConfigValueEvaluator {
Error:         ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ConfigValueEvaluator.java:13: warning: no comment
Error:      public static class Result {
Error:                    ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ConfigValueEvaluator.java:30: warning: no comment
Error:      public static final String KEY_TOKEN = "#{key}";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ConfigValueEvaluator.java:31: warning: no comment
Error:      public static final String TOPIC_TOKEN = "#{topic}";
Error:                                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ConfigValueEvaluator.java:16: warning: no comment
Error:          public Result(boolean skip, String value) {
Error:                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ConfigValueEvaluator.java:21: warning: no comment
Error:          public String getValue() {
Error:                        ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/ConfigValueEvaluator.java:25: warning: no comment
Error:          public boolean shouldSkip() {
Error:                         ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/DefaultChannelConfig.java:17: warning: no comment
Error:  public class DefaultChannelConfig implements ChannelConfig {
Error:         ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/DefaultChannelConfig.java:20: warning: no comment
Error:      public DefaultChannelConfig(final ChannelSinkConnectorConfig sinkConnectorConfig) {
Error:             ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/KafkaRecordErrorReporter.java:5: warning: no comment
Error:  public interface KafkaRecordErrorReporter {
Error:         ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/config/KafkaRecordErrorReporter.java:6: warning: no comment
Error:      void reportError(SinkRecord record, Exception e);
Error:           ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/mapping/ChannelSinkMapping.java:14: warning: no description for @param
Error:       * @param sinkRecord
Error:         ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/mapping/ChannelSinkMapping.java:15: warning: no description for @return
Error:       * @return
Error:         ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/mapping/ChannelSinkMapping.java:16: error: exception not thrown: io.ably.lib.types.AblyException
Error:       * @throws AblyException
Error:                 ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/mapping/ChannelSinkMapping.java:16: warning: no description for @throws
Error:       * @throws AblyException
Error:         ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/mapping/ChannelSinkMapping.java:11: warning: no comment
Error:  public interface ChannelSinkMapping {
Error:         ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/mapping/DefaultChannelSinkMapping.java:9: warning: no comment
Error:  public class DefaultChannelSinkMapping implements ChannelSinkMapping {
Error:         ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/mapping/DefaultChannelSinkMapping.java:13: warning: no comment
Error:      public DefaultChannelSinkMapping(ConfigValueEvaluator configValueEvaluator, ChannelConfig channelConfig) {
Error:             ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/mapping/DefaultMessageSinkMapping.java:20: warning: no comment
Error:  public class DefaultMessageSinkMapping implements MessageSinkMapping {
Error:         ^
Error:  /home/runner/work/kafka-connect-ably/kafka-connect-ably/src/main/java/com/ably/kafka/connect/mapping/DefaultMessageSinkMapping.java:26: warning: no comment
Error:      public DefaultMessageSinkMapping(@Nonnull ChannelSinkConnectorConfig config, @Nonnull ConfigValueEvaluator configValueEvaluator) {
Error:             ^
Error:  
Error:  Command line was: /opt/hostedtoolcache/Java_Adopt_jdk/16.0.2-7/x64/bin/javadoc @options @packages
Error:  
Error:  Refer to the generated Javadoc files in '/home/runner/work/kafka-connect-ably/kafka-connect-ably/target/apidocs' dir.
Error:  -> [Help 1]
Error:  
Error:  To see the full stack trace of the errors, re-run Maven with the -e switch.
Error:  Re-run Maven using the -X switch to enable full debug logging.
Error:  
Error:  For more information about the errors and possible solutions, please read the following articles:
Error:  [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
Error: Process completed with exit code 1.

┆Issue is synchronized with this Jira Task by Unito

DLQ Error Handling

Initially, forward all messages with non-retryable errors to the DLQ. That is, if the Batch API response has an error code starting with a 4.

Later we might want to reduce the volume of messages forwarded to the DLQ by avoiding certain limits more proactively. For now, we'll get things working by just attempting submissions and forwarding failed messages to the DLQ.

See failure response types here: https://ably.com/docs/rest/batch#failure

NullPointerException when proxy password is not specified

Ably Kafka connector version: 1.0.2

I tried to configure a proxy for the connector without any auth being used, so I left the username and password configuration options unspecified:

client.proxy = true
client.proxy.port = 3128
client.proxy.host = 10.1.1.1

After connector start I got the following exception:

[Worker-0fcc9b0fb72f3264d] java.lang.NullPointerException
[Worker-0fcc9b0fb72f3264d] at com.ably.kafka.connect.ChannelSinkConnectorConfig.getAblyClientOptions(ChannelSinkConnectorConfig.java:269)
[Worker-0fcc9b0fb72f3264d] at com.ably.kafka.connect.ChannelSinkConnectorConfig.<init>(ChannelSinkConnectorConfig.java:234)
[Worker-0fcc9b0fb72f3264d] at com.ably.kafka.connect.ChannelSinkTask.start(ChannelSinkTask.java:55)
[Worker-0fcc9b0fb72f3264d] at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:308)
[Worker-0fcc9b0fb72f3264d] at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
[Worker-0fcc9b0fb72f3264d] at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
[Worker-0fcc9b0fb72f3264d] at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
[Worker-0fcc9b0fb72f3264d] at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
[Worker-0fcc9b0fb72f3264d] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[Worker-0fcc9b0fb72f3264d] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[Worker-0fcc9b0fb72f3264d] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[Worker-0fcc9b0fb72f3264d] at java.base/java.lang.Thread.run(Thread.java:829)

I checked the source code and found the following line:

proxyOpts.password = getPassword(CLIENT_PROXY_PASSWORD).value();

If a password is not specified, getPassword will return null and calling value() will throw the exception.

UPDATE: The same applies to client.proxy.non.proxy.hosts.
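
A possible fix (a hypothetical sketch) is to guard each optional option before dereferencing it; a similar guard would apply to client.proxy.non.proxy.hosts:

// org.apache.kafka.common.config.types.Password
final Password proxyPassword = getPassword(CLIENT_PROXY_PASSWORD);
if (proxyPassword != null) {
    proxyOpts.password = proxyPassword.value();
}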


Take complete control of our Maven POM

Since the initial commit in this repository, our POM has inherited from a parent POM.

I believe this was in order to bootstrap this project quickly, based on this quick start:
https://github.com/jcustenborder/kafka-connect-archtype

Described to me as producing:

a scaffolded project with default opinions

Looking at the sources involved, in the best case they've not been worked on in a year, and in the worst case it's more like five years (the checkstyle config 🙄; see also #74). On top of this, I'm pretty sure that none of these sources are being actively maintained or otherwise cared for (perhaps further evidenced by the lack of activity on jcustenborder/kafka-connect-archtype#13, a pull request submitted to fix up the aforementioned 'quick start' but effectively ignored by its creator).

I would much rather get rid of external dependency specification and configuration injection like this and start controlling the full domain ourselves. This might be as simple as replacing our POM with the output of help:effective-pom and thinning / improving from there. But, I'll admit, I'm not a Maven expert.

These foundations are like walking around in concrete boots for developers working on this repository, especially those less familiar with Kafka, or even Maven. We need to:

  • take back control of all project dependencies
  • start preferring Maven convention over configuration
  • only deviate from Maven convention when that is required to make the solution work

┆Issue is synchronized with this Jira Uncategorised by Unito

Publish to Confluent Hub

Publish the kafka-connect-ably ZIP archive to Confluent Hub.

Part of this ticket is figuring out how to do that.

We may need to update the packaging configuration in pom.xml.

Note:

A package meeting this specification, https://docs.confluent.io/current/connect/managing/confluent-hub/component-archive.html, is required to publish the Ably connector to the hub (https://www.confluent.io/hub/).

I am attaching a sample archive for a plugin-type connector.

┆Issue is synchronized with this Jira Story by Unito
┆Attachments: sap-sapintegrationsuite-kafka-adapter-latest.zip

Query about functionality

Hello, I'm considering using this for our platform (we're currently moving our websocket implementation over to Ably).

I saw at the bottom of the README.md that you're able to use the topic and the key to modify the name of the channel.

However, is it possible to use part of the JSON message to change the channel id? I appreciate that there may be a cost to this in parsing the JSON schema'd message itself, but I'm wondering whether it's possible, or whether there are any plans to implement something like this.

For instance, if there were a topic called "notifications" with a set of varying keys, and the data in the message was like {"userId":"x"...}, then construct a channel like "notifications:x", which we could then lock down with tokens so that only user x could listen to that channel in the browser.
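If I've read the README right, today's interpolation can only reference the record's topic and key, along the lines of:

    channel = notifications:#{key}

but there's nothing that can reach into the message body itself.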

It's a long shot, I know, but I thought I'd ask.

┆Issue is synchronized with this Jira Uncategorised by Unito

Connector cannot JSonify Date object

Raised by our customer: it looks like when an Avro schema with

"type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }

is provided and a record like

{"timeAt":1671413240210}

is given, the ably-kafka-connector raises an exception. An excerpt from the top of the stack trace:

by: java.lang.ClassCastException: class java.util.Date cannot be cast to class java.lang.Long (java.util.Date and java.lang.Long are in module java.base of loader 'bootstrap')\n\tat org.apache.kafka.connect.data.Struct.getInt64(Struct.java:130)\n\tat com.ably.kafka.connect.utils.StructToJsonConverter.structJsonMap(StructToJsonConverter.java:71)\n\tat com.ably.kafka.connect.utils.StructToJsonConverter.toJsonString(StructToJsonConverter.java:27)\n\tat

So this issue seems to be a bug in the Kafka Connect data framework, as its internals cannot properly produce the data type given by the schema. In fact, a Date type isn't even defined as a primitive in the Kafka Connect framework. Or possibly a version compatibility issue... Needs investigating.
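For context: Connect materialises the timestamp-millis logical type as java.util.Date at runtime, so any converter calling Struct.getInt64 directly will hit this cast. A sketch of the kind of guard the converter could add (StructToJsonConverter internals here are assumed, not quoted):

    import java.util.Date;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.Struct;
    import org.apache.kafka.connect.data.Timestamp;

    // Sketch: read an INT64 field, accounting for the timestamp-millis logical
    // type, whose runtime representation in Connect is java.util.Date.
    static long readInt64(Struct struct, String fieldName) {
        Schema fieldSchema = struct.schema().field(fieldName).schema();
        if (Timestamp.LOGICAL_NAME.equals(fieldSchema.name())) {
            return Timestamp.fromLogical(fieldSchema, (Date) struct.get(fieldName));
        }
        return struct.getInt64(fieldName);
    }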

┆Issue is synchronized with this Jira Task by Unito

Update ably-java version

A bug in ably-java can lead to connections suspending as soon as the transport is unavailable (rather than instant retry) if they're in the connected state for > 2 minutes.

This is fixed in this ably-java PR: ably/ably-java#928

Once the above is released, we should update the ably-java version here.

Issue when installing the connector

Describe the problem you have found:

As part of the Ably Kafka Sink Connector verification process, I started working on verifying the connector resources. While building the connector, I am seeing the following issue. I tried different permutations, like modifying settings.xml and pom.xml, but without any success, and eventually found this URL: https://dl.bintray.com/palantir/releases/ . It seems they wound up the business and now run under a different name, JFrog. Could you please suggest any workarounds, or share the approach your team follows for building the connector?

(screenshot of the build error attached)

How to reproduce:

  • Setup:
    • OS: Windows 10
    • JDK: both 1.8 and 11
    • Apache Maven 3.8.1
  • Run mvn clean package

┆Issue is synchronized with this Jira Server Task by Unito

Use OIDC to publish from GitHub workflow runners to AWS S3 for `sdk.ably.com` deployments

Overview

To improve and standardise our means of accessing AWS resources across Ably, we have recently started using OpenID Connect to authenticate with Amazon Web Services.

Requirement

This necessitates changing workflows in this repository that use our ably/sdk-upload-action so that they:

  1. Stop using s3AccessKeyId and s3AccessKey action inputs (no longer required)
  2. Add a preceding step to use AWS' aws-actions/configure-aws-credentials to configure AWS credentials in the current runner's environment, using our ABLY_AWS_ACCOUNT_ID_SDK org-level GitHub secret

For an example of how this is done, see ably/ably-flutter@6f7ed4f and ably/ably-flutter@818933c, which resulted in these workflow steps and this permission addition.
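A hedged sketch of the resulting step (the region and role name below are placeholders; the real values are in the linked examples):

    permissions:
      id-token: write
      contents: read

    steps:
      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v2
        with:
          aws-region: us-east-1 # placeholder region
          role-to-assume: arn:aws:iam::${{ secrets.ABLY_AWS_ACCOUNT_ID_SDK }}:role/example-oidc-role # placeholder role name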

Once this work has landed on the default branch (main), and open pull requests have incorporated that change too, the SDK_S3_ACCESS_KEY_ID and SDK_S3_ACCESS_KEY repository-level secrets should be deleted.

Additional Information

This additional detail should not be necessary to complete the work required for this issue, but may help to add additional context if required. Internal links:


Consider rewriting the project in Kotlin

Kotlin is a modern language that's fully compatible with Java. It should be possible to rewrite the project in Kotlin, and the result would be much cleaner. There are automatic tools for converting Java to Kotlin, but they have flaws, so someone with Kotlin knowledge would have to review the code and fix it where needed.

However, this might not be a popular choice in this domain, as almost all projects use Java.

Use record headers to add push payload

Currently there is no way for connector users to publish messages as push notifications.

We want to add the ability for users to attach a push payload on a per-record basis. For this, use a record header value (header key: com.ably.extras.push) to pass as extras.push for the Ably message, as illustrated below.
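For illustration, the header value would carry the raw JSON push payload (the shape follows Ably's push-publish format; the field values here are examples):

    {
      "notification": {
        "title": "Notification title",
        "body": "This is the body of the notification"
      },
      "data": {
        "foo": "bar"
      }
    }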

Refactor (rename and move) the way we get Ably channel name and options

There's some inconsistency in the method names in this class. While I recognise some are private, I still can't understand why getAblyChannelName() and getAblyChannelOptions() both have Ably in their name, whereas getChannel(SinkRecord, AblyRealtime) doesn't. Plus, of course, convertChannelParams(List<String>) doesn't have Ably in its name.

I generally try to go for "less is more" with naming, taking context into account, i.e.:

  • we already know this is Ably because package name is com.ably.kafka.connect, so safe to drop our name from names if possible
  • we already know this relates to channels because the class has Channel in its name, as well as the name of the interface it implements

Originally posted by @QuintinWillison in #40 (comment)

┆Issue is synchronized with this Jira Task by Unito

Add support for String keys

Currently we expect keys to be byte[] when we use keys to:

  • set up the extras header
  • do interpolation

We should support String keys too.
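A sketch of the kind of helper that could normalise both (the method name is illustrative):

    import java.nio.charset.StandardCharsets;

    // Sketch of a hypothetical helper accepting both byte[] and String keys.
    static String keyAsString(Object key) {
        if (key instanceof byte[]) {
            return new String((byte[]) key, StandardCharsets.UTF_8);
        }
        if (key instanceof String) {
            return (String) key;
        }
        return null; // unsupported key type; caller decides how to handle
    }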

Dependabot raises vulnerability alert for jackson-mapper dependency

Alert : https://github.com/ably/kafka-connect-ably/security/dependabot/1

XML external entity (XXE) vulnerability in XmlMapper in the Data format extension for Jackson (aka jackson-dataformat-xml) allows attackers to have unspecified impact via unknown vectors.

There doesn't seem to be a fixed version for now, but this dependency should either be replaced or removed completely.

┆Issue is synchronized with this Jira Task by Unito

Refactor record / message mapping

Currently the message mapping logic happens inside the task class.

We want to move the message mapping logic into its own interface/class so that we are able to:

  • make it easy to test sink record conversions to Ably messages, by separating the logic from the task
  • make message name, id etc. configurable: currently they are static, and we will most likely change that as we did for channels

TODOs (interface sketched after this list):

  • Create an interface that creates a message based on a sink record
  • Create an implementation and move the current logic into it
  • Write some unit tests for the implementation
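A hedged sketch of the interface (the name MessageConverter is a placeholder, not settled):

    import io.ably.lib.types.Message;
    import org.apache.kafka.connect.sink.SinkRecord;

    // Hypothetical interface: one place to map a Kafka sink record to an Ably message.
    public interface MessageConverter {
        Message toAblyMessage(SinkRecord record);
    }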

┆Issue is synchronized with this Jira Task by Unito

Proxy settings are not used in the code

Ably Kafka connector version: 1.0.2

I tried to find the root cause of my issues with the Ably Kafka connector and found that the proxy config is never applied to the client options object:

    opts.recover = getString(CLIENT_RECOVER);
    if (getBoolean(CLIENT_PROXY)) {
      ProxyOptions proxyOpts = new ProxyOptions();
      proxyOpts.host = getString(CLIENT_PROXY_HOST);
      proxyOpts.port = getInt(CLIENT_PROXY_PORT);
      proxyOpts.username = getString(CLIENT_PROXY_USERNAME);
      proxyOpts.password = getPassword(CLIENT_PROXY_PASSWORD).value();
      proxyOpts.nonProxyHosts = getList(CLIENT_PROXY_NON_PROXY_HOSTS).toArray(new String[0]);
      proxyOpts.prefAuthType = HttpAuth.Type.valueOf(getString(CLIENT_PROXY_PREF_AUTH_TYPE));
    }
    opts.environment = getString(CLIENT_ENVIRONMENT);

I expect a line like

opts.proxy = proxyOpts;

at the end of the if block above, but unfortunately I don't see it.

┆Issue is synchronized with this Jira Uncategorised by Unito

Add configuration validators

While working on #52 I realised that we do not have validators for any of our configs. That means the connector configuration won't fail unless we provide a custom validator for each of our configs (especially the required ones).
There is a good post explaining how to do that:
https://www.confluent.io/blog/write-a-kafka-connect-connector-with-configuration-handling/#validators

We should write validators for the different configs (a sketch follows), and write integration tests to make sure the connector can only be configured with valid config values.
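A sketch of a reusable validator for required, non-empty string configs (the shape follows the standard ConfigDef.Validator contract; which config keys it gets attached to is illustrative):

    import org.apache.kafka.common.config.ConfigDef;
    import org.apache.kafka.common.config.ConfigException;

    // Rejects null, non-string, and blank values for the named config key.
    static final ConfigDef.Validator NON_EMPTY_STRING = (name, value) -> {
        if (!(value instanceof String) || ((String) value).trim().isEmpty()) {
            throw new ConfigException(name, value, "must be a non-empty string");
        }
    };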

┆Issue is synchronized with this Jira Task by Unito

Make message name configurable and interpolable

Message name is currently a constant called "sink".

We need to add a new configuration property for the message name that supports pattern-based mapping (see the sketch after the todos).

Todos

  • Add optional configuration property
  • Implement mapping
  • Add some tests
  • Update README
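A hypothetical shape for the new property, mirroring the channel mapping syntax (the key name message.name is an assumption until this lands and the README documents it):

    message.name = #{topic}-message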

┆Issue is synchronized with this Jira Task by Unito

How to set the encoding of Ably messages?

Hello! I use Ably-Kafka Connector, and I want to apply GraphQL Subscriptions schema to my deployment using https://github.com/ably-labs/graphql-ably-pubsub. So I want to do the following: event published to Kafka topic -> message added to Ably channel by Ably-Kafka Connector -> message consumed by Node.js backend using graphql-ably-pubsub library -> message published to frontend using GraphQL Subscriptions.

I've almost set everything up, but I face a problem with serialization. It seems like messages are consumed from Ably differently when I use graphql-ably-pubsub to publish to Ably vs. when I use the Kafka Connector.

Below you can see two messages: the upper one is published via the Kafka connector, and the lower one by graphql-ably-pubsub. As far as I can see, the only difference is the "encoding" field. My hypothesis is that graphql-ably-pubsub isn't compatible with the message from the Kafka connector because of this field. How can I set it for the Ably Kafka Connector?

(screenshot: the two messages compared)

Also maybe there is another cause of the problem? I would be glad to hear your opinion.

Add support for data with schema with non-struct top level

We need to consider converting data with a schema where schema.type != STRUCT at the top level. Currently, messages with top-level schemas other than STRUCT won't be JSONified: BYTES and STRING values are passed as-is as the data of the Ably message, while other top-level types are not supported and the connector will throw an exception. While this seems good enough for most cases, we should consider supporting all top-level Kafka Connect schemas.

I am not sure how likely it is that a top-level schema would be a primitive type, but I think we should still address it. For the complex types we already support BYTES and STRUCT; we just need to address the ARRAY and MAP types.

This issue is created from conversation below

If I understand correctly, this will fail if valueSchema != null and valueSchema != STRUCT, i.e. there is a schema set, but it's not a Struct type, such as Array, String, Integer, etc, as we'll still be passing a Connect data type to ably-java, which it won't know how to process?

Have we made a deliberate decision to only support Struct as the top-level schema? If so, we should probably add a more explicit error, else it'll be raised as a bug should somebody try to send an array or similar. I'm not sure it's really necessary to require this? Are not 1, "foo", 3.145, [1, 2, 3] etc all valid JSON values? The docs suggest we at least support arrays.

If others agree, I'm happy with us just raising a ticket to add support for other top-level types later rather than delaying this PR.

Originally posted by jaley in #73 (comment)
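A hedged sketch of what explicit top-level dispatch could look like (the helper names are hypothetical; only the STRUCT/BYTES/STRING behaviour reflects what the connector does today):

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.errors.ConnectException;

    // Sketch: dispatch on the top-level schema type instead of assuming STRUCT.
    static Object convertTopLevel(Schema schema, Object value) {
        switch (schema.type()) {
            case STRUCT:
                return structToJson(value);     // existing behaviour (hypothetical helper name)
            case BYTES:
            case STRING:
                return value;                   // passed through as-is today
            case ARRAY:
            case MAP:
                return collectionToJson(value); // to be implemented (hypothetical helper)
            default:
                // primitives (INT64, FLOAT64, BOOLEAN, ...) would also be valid JSON values
                throw new ConnectException("Unsupported top-level schema type: " + schema.type());
        }
    }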

┆Issue is synchronized with this Jira Uncategorised by Unito


Test on Confluent

Run the integ_v3 branch on Confluent, and ensure the docs on how to run the Ably Connector on Confluent are up to date.
