Code Monkey home page Code Monkey logo

kafka-connect-bigquery's Introduction

Kafka Connect BigQuery Connector

Build Status Code Coverage

This is an implementation of a sink connector from Apache Kafka to Google BigQuery, built on top of Apache Kafka Connect. For a comprehensive list of configuration options, see the Connector Configuration Wiki.

History

This connector was originally developed by WePay. In late 2020 the project moved to Confluent, with both companies taking on maintenance duties. All new activity such as filing issues and opening pull requests should now target the Confluent fork of the project.

Download

The latest releases are available in the GitHub release tab, or via Confluent Hub.

Standalone Quickstart

NOTE: You must have the Confluent Platform installed in order to run the example.

Configuration Basics

Firstly, you need to specify configuration settings for your connector. These can be found in the kcbq-connector/quickstart/properties/connector.properties file. Look for this section:

########################################### Fill me in! ###########################################
# The name of the BigQuery project to write to
project=
# The name of the BigQuery dataset to write to (leave the '.*=' at the beginning, enter your
# dataset after it)
datasets=.*=
# The location of a BigQuery service account or user JSON credentials file
# or service account credentials or user credentials in JSON format (non-escaped JSON blob)
keyfile=
# 'FILE' if keyfile is a credentials file, 'JSON' if it's a credentials JSON
keySource=FILE

You'll need to choose a BigQuery project to write to, a dataset from that project to write to, and provide the location of a JSON key file that can be used to access a BigQuery service account that can write to the project/dataset pair. Once you've decided on these properties, fill them in and save the properties file.

Once you get more familiar with the connector, you might want to revisit the connector.properties file and experiment with tweaking its settings.

Migrating to 2.x.x

In accordance with the introduction of schema unionization in version 2.0.0, the following changes to configs have been introduced and should be made when migrating:

  1. autoUpdateSchemas has been removed
  2. allowNewBigQueryFields and allowBigQueryRequiredFieldRelaxation have been introduced
  3. allowSchemaUnionization has been introduced

Setting allowNewBigQueryFields and allowBigQueryRequiredFieldRelaxation to true while setting allowSchemaUnionization to false results in the same behavior that setting autoUpdateSchemas to true used to.

Building and Extracting a Confluent Hub archive

If you haven't already, move into the repository's top-level directory:

$ cd /path/to/kafka-connect-bigquery/

Begin by creating Confluent Hub archive of the connector with the Confluent Schema Retriever included:

$ mvn clean package -DskipTests

And then extract its contents:

$ mkdir -p bin/jar/ && cp kcbq-connector/target/components/packages/wepay-kafka-connect-bigquery-*/wepay-kafka-connect-bigquery-*/lib/*.jar bin/jar/

Setting-Up Background Processes

Then move into the quickstart directory:

$ cd kcbq-connector/quickstart/

After that, if your Confluent Platform installation isn't in a sibling directory to the connector, specify its location (and do so before starting each of the subsequent processes in their own terminal):

$ export CONFLUENT_DIR=/path/to/confluent

Then, initialize the background processes necessary for Kafka Connect (one terminal per script): (Taken from http://docs.confluent.io/3.0.0/quickstart.html)

$ ./zookeeper.sh

(wait a little while for it to get on its feet)

$ ./kafka.sh

(wait a little while for it to get on its feet)

$ ./schema-registry.sh

(wait a little while for it to get on its feet)

Initializing the Avro Console Producer

Next, initialize the Avro Console Producer (also in its own terminal):

$ ./avro-console-producer.sh

Give it some data to start off with (type directly into the Avro Console Producer instance):

{"f1":"Testing the Kafka-BigQuery Connector!"}

Running the Connector

Finally, initialize the BigQuery connector (also in its own terminal):

$ ./connector.sh

Piping Data Through the Connector

Now you can enter Avro messages of the schema {"f1": "$SOME_STRING"} into the Avro Console Producer instance, and the pipeline instance should write them to BigQuery.

If you want to get more adventurous, you can experiment with different schemas or topics by adjusting flags given to the Avro Console Producer and tweaking the config settings found in the kcbq-connector/quickstart/properties directory.

Integration Testing the Connector

Configuring the tests

You must supply the following environment variables in order to run the tests:

  • $KCBQ_TEST_PROJECT: The name of the BigQuery project to use for the test
  • $KCBQ_TEST_DATASET: The name of the BigQuery dataset to use for the test
  • $KCBQ_TEST_KEYFILE: The key file used to authenticate with BigQuery during the test
  • $KCBQ_TEST_BUCKET: The name of the GCS bucket to use (for testing the GCS batch loading feature)

The $KCBQ_TEST_FOLDER variable can be supplied to specify which subfolder of the GCS bucket should be used when testing the GCS batch loading feature; if not supplied, the top-level folder will be used.

Adding new GCP Credentials & BigQuery DataSet

This section is optional in case one wants to use a different GCP project and generate new creds for that

gcloud iam service-accounts create kcbq-test --description="service account key for bigquery sink integration test" --display-name="kcbq-test"
gcloud iam service-accounts keys create /tmp/creds.json --iam-account=kcbq-test@<GCP_PROJECT_NAME>.iam.gserviceaccount.com
  • Give BigQuery & Storage Admin Permissions to Service Account:
    • Open https://console.cloud.google.com/iam-admin/iam?project=<GCP_PROJECT_NAME>
    • Click on Add and enter New Principal as created above e.g. kcbq-test@<GCP_PROJECT_NAME>.iam.gserviceaccount.com
    • Add following 2 roles from "Select a role" drop down menu:
      • BigQuery -> BigQuery Admin
      • Cloud Storage -> Storage Admin
  • Add a BigQuery DataSet into the Project:

Running the Integration Tests

# (Re)builds the project and runs the integration tests, skipping unit tests to save a bit of time
mvn clean package integration-test -Dskip.unit.tests=true

How Integration Testing Works

Integration tests run by creating embedded instances for Zookeeper, Kafka, Schema Registry, and the BigQuery Connector itself, then verifying the results using a JUnit test.

They use schemas and data that can be found in the kcbq-connector/src/test/resources/integration_test_cases/ directory, and rely on a user-provided JSON key file (like in the quickstart example) to access BigQuery.

Data Corruption Concerns

In order to ensure the validity of each test, any table that will be written to in the course of integration testing is preemptively deleted before the connector is run. This will only be an issue if you have any tables in your dataset whose names begin with kcbq_test_ and match the sanitized name of any of the test_schema subdirectories. If that is the case, you should probably consider writing to a different project/dataset.

Kafka, Schema Registry, Zookeeper, and Kafka Connect are all run as temporary embedded instances, so there is no risk that running integration tests will corrupt any existing data that is already on your machine, and there is also no need to free up any of your ports that might currently be in use by instances of the services that are brought up in the process of testing.

Adding New Integration Tests

Adding an integration test is a little more involved, and consists of two major steps: specifying Avro data to be sent to Kafka, and specifying via JUnit test how to verify that such data made it to BigQuery as expected.

To specify input data, you must create a new directory in the kcbq-connector/src/test/resources/integration_test_cases/ directory with whatever name you want the Kafka topic of your test to be named, and whatever string you want the name of your test's BigQuery table to be derived from. Then, create two files in that directory:

  • schema.json will contain the Avro schema of the type of data the new test will send through the connector.

  • data.json will contain a series of JSON objects, each of which should represent an Avro record that matches the specified schema. Each JSON object must occupy its own line, and each object cannot occupy more than one line (this inconvenience is due to limitations in the Avro Console Producer, and may be addressed in future commits).

To specify data verification, add to the test cases present in the kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/integration/BigQuerySinkConnectorIT.java

NOTE: Because the order of rows is not guaranteed when reading test results from BigQuery, you must include a numeric column named "row" number in all of your test schemas, and every row of test data must have a unique value for its row number. When data is read back from BigQuery to verify its accuracy, it will be returned in ascending order based on that "row" column.

Implementation details of different modes

Upsert/Delete with Legacy InsertAll API

Click here to read the implementation details of upsert/delete mode with Legacy InsertAll API

kafka-connect-bigquery's People

Contributors

amitr17 avatar aniketshrimal avatar apoorvmittal10 avatar b-goyal avatar c0urante avatar confluentjenkins avatar confluentsemaphore avatar cprovencher avatar criccomini avatar cyril-engels avatar danidelvalle avatar frecap avatar gharris1727 avatar gkstechie avatar ivanyu avatar kapilchhajer avatar kenji-h avatar liukrimhrim avatar makearl avatar manasjyotisharma avatar mtagle avatar ncliang avatar nicolasguyomar avatar sagarrao12 avatar skyzyx avatar sp-gupta avatar stoynov96 avatar whynick1 avatar xuan616 avatar ypmahajan avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

kafka-connect-bigquery's Issues

2.0.0 Release to Confluent Hub

We're working on cutting a 2.0.0 to Confluent Hub, which includes breaking changes from 1.6. In line with this we're also updating the confluent docs for the connector. I'm just opening this issue in case, folks have things to add, questions to ask and so on.

We've documented the following breaking changes:

  1. datasets has been removed, and defaultDataset has been introduced. The connector now infers the dataset from the topic name if the topic is in the form <dataset>:<tableName>; if the topic name is in the form <tablename>, it defaults to defaultDataset.

  2. topicsToTables has been removed. SMTs should be used route topics to tables.

  3. autoUpdateSchemas has been replaced by allowNewBigQueryFields and allowBigQueryRequiredFieldRelaxation.

And we've documented the following new configuration added:

  1. upsertEnabled and deleteEnabled
  2. intermediateTableSuffix
  3. mergeIntervalMs and mergeRecordsThreshold
  4. autoCreateBucket

questions
The confluent documentation states that enableBatchLoad is a beta feature. Is this still true?

enabling upsert logic creating column name as struct type.

Hi @C0urante,

I have successfully tested upsert logic on Bigquery sink connector. But Bigquery table created by bigquery sink connector, creating column name which mentioned in kafkaKeyFieldName as columnname.columname.

Example :
kafkaKeyFieldName = log_id

Column name in table look like - log_id.log_id integer.

Is there any way i can get only column name. as (log_id) instead of (log_id.log_id )

Disable schema unionization by default

(Copied from wepay#291 (comment))

The prior behavior of the connector was to basically send a single record's schema to BigQuery and let validation happen there; the only permitted operations were (and still are) adding new columns to a table, and relaxing existing columns from REQUIRED to NULLABLE. This meant that it was possible to relax required fields to nullable, but only if there was a corresponding upstream schema change.

The new behavior of the connector still catches this case, but also automatically relaxes REQUIRED fields in the existing table schema to NULLABLE if they're missing from the most recent upstream schema.

This is risky, since it means that a single misplaced record with a completely disjoint schema from the existing table schema can cause permanent modifications to be made to the BigQuery table schema. Granted, this would require allowNewBQFields and allowBQRequiredFieldRelaxation to both be set to true, but it's not unreasonable for people to want to enable both with the expectation that they would cause the connector to act in the same way as it would have with autoUpdateSchemas.

I think we might still want to add a third config property, allowSchemaUnionization, that toggles the schema unionization behavior. If it's set to false and both allowNewBQFields and allowBQRequiredFieldRelaxation are set to true, then the prior behavior of the autoUpdateSchemas property should be preserved effectively for users who still want that.

Implement KIP-610 in BigQuery connector

Implementing https://cwiki.apache.org/confluence/display/KAFKA/KIP-610%3A+Error+Reporting+in+Sink+Connectors would enable us to see which messages on a topic failed from the BigQuery point of view.

The scenario is that a message might be accepted by the broker, but be deemed unacceptable by BigQuery. This record then causes the connector to fail with an exception which requires human intervention to restart (in the absence of any user-developed monitoring using the REST API).

It would be considerably more user- and developer-friendly to have these messages sent to a DLQ with information about what caused the failure, what time it occurred and which topic/partition/offset were involved.

Add support for BigQuery Storage Write API

BigQuery have release a new feature (currently in preview) called Storage Write API, which improves the streaming inserts API. The features that apply to Kafka Connect would be exactly-once delivery, stream-level transactions, schema update detection amongst other features.

It would be great if Kafka Connect could support this API

Jenkins integration tests cannot be run in parallel

We run the integration tests on Jenkins against a fixed table and dataset; this causes issues when two different builds run the tests at the same time, since they both end up deleting and then writing to the same table.

I think we can fix this by:

  1. Setting a conservative retention policy on the dataset we use for integration testing the connector (I'm thinking 24 hours)
  2. Altering the integration tests for the connector to use an optional $KCBQ_TEST_TABLE_SUFFIX environment variable which, if present, is appended to the end of each table created during integration tests
  3. Modifying either the Jenkinsfile or the jenkins profile in the top-level pom to populate that environment variable and $KCBQ_TEST_FOLDER with the branch name, git commit hash, and/or epoch timestamp for the run, so that we can easily identify which test runs correspond to which tables in the event that there's a failure we'd like to investigate

Tasks fail for records containing values of connect Time data type

TimeConverter for Time data type is currently missing in KafkaLogicalConverters.

This leads to kafka-connect-bigquery connector trying to insert java.util.Date object into INTEGER column which fails and stops the task.

Exception in thread "pool-4-thread-1" com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: table insertion failed for the following rows:
        [row index 0]: invalid: This field is not a record.
        [row index 1]: invalid: This field is not a record.
        at com.wepay.kafka.connect.bigquery.write.row.BigQueryWriter.writeRows(BigQueryWriter.java:131)
        at com.wepay.kafka.connect.bigquery.write.batch.TableWriter.run(TableWriter.java:96)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
        at java.base/java.lang.Thread.run(Thread.java:832)

bigQueryRetry and bigQueryRetryWait don't seem to take effect

In version 2.1.0, if there is a temporary connection issue with BigQuery (connection reset) the task fails immediately and doesn't retry.

This is the exception reported in the failing task:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:591)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Some write threads encountered unrecoverable errors: com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Failed to write to table
Caused by: Connection reset; See logs for more detail
	at com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.maybeThrowEncounteredErrors(KCBQThreadPoolExecutor.java:108)
	at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.put(BigQuerySinkTask.java:233)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:563)
	... 10 more

We have configured our connector with the following parameters:

bigQueryRetry
bigQueryRetryWait
errors.retry.timeout
errors.retry.delay.max.ms

and still the tasks fail on first connection reset.

Is this a known issue or are we missing something?

For v2.0.X, can you provide sample config for streaming insert/update?

Hi,

I tried to run a sample insert/update pipeline using the DataGen connector data and kcbqv2. Sink connector only allows the JSON data, or else throwing Top-level Kafka Connect schema must be of type 'struct' error.

When I use the JSON data, it expects a map object as Error: Only Map objects supported in absence of schema for record conversion to BigQuery format.. What kind of map object I need to provide? Is there any way to use the Avro format for easier mapping? If so, can you provide an example config file? Thank you.

I provide my config files for both DataGen and kcbqv2 connectors. (running on local docker env)

DataGen Config:

{
"name": "users",
"config": {
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"kafka.topic": "users",
"max.interval": "1000",
"schema.keyfield": "userid",
"quickstart": "users",
"key.converter.enhanced.avro.schema.support": "true",
"value.converter.enhanced.avro.schema.support": "true",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "true",
"key.converter.schema.registry.url": "http://schema-registry:8081"
}
}

Bigquery Sink Config:

{
"name": "kcbqv2",
"config": {
"connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"topics": "users",
"errors.deadletterqueue.topic.name": "users_err",
"project": "-----",
"defaultDataset": "-----",
"schemaRetriever": "com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever",
"keyfile": "-----",
"keySource": "FILE",
"kafkaKeyFieldName": "userid",
"kafkaDataFieldName": "",
"autoCreateTables": "true",
"upsertEnabled": "true",
"deleteEnabled": "false",
"intermediateTableSuffix": "_intermediate",
"schemaRegistryLocation": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "true",
"key.converter.schema.registry.url": "http://schema-registry:8081"
}
}

Topics to tables mapping config removed in 2.0?

We were trying to upgrade our connectors from version 1.6.x to 2.0, and we noticed the mapping configurations have been removed.

If I am not wrong, the change is here:
image

So if I understand correctly, in order to map a topic to a BigQuery dataset and table we need to name the topic like dataset:tablename?

So we are coupling the topic name that can have some standards to the naming of the BigQuery dataset and table? That kills any room for flexibility and impedes us upgrading.

Is there a chance we can look into this and add this feature back? What has been the reason to remove it?

Write errors do not result in the kafka connect task failing

We've noticed that when a BigQuery schema update fails, the kafka-connector and the task itself do not enter a failed state.

For example we see the following error:

[2020-11-13 10:16:12,507] ERROR Task failed with com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException error: Failed to write rows after BQ schema update within 5 attempts for: GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=some_dataset, tableId=some_table_name}} (com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor:68)

However the status for this task continues to show it as Running. There is no indication of an issue unless you delve into the logs.

Is this expected?

Null inner values cause NPEs with field sanitization enabled

This is easiest to outline with a test case. Added to FieldNameSanitizerTest, the following test case:

  /**
   * Verifies that null values are acceptable while sanitizing keys.
   */
  @Test
  public void testNullValue() {
    assertEquals(
        Collections.singletonMap("abc", null),
        FieldNameSanitizer.replaceInvalidKeys(Collections.singletonMap("abc", null)));
  }

produces this NullPointerException:

java.lang.NullPointerException
	at java.util.HashMap.merge(HashMap.java:1225)
	at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
	at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
	at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1696)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
	at com.wepay.kafka.connect.bigquery.utils.FieldNameSanitizer.replaceInvalidKeys(FieldNameSanitizer.java:25)
	at com.wepay.kafka.connect.bigquery.utils.FieldNameSanitizerTest.testNullValue(FieldNameSanitizerTest.java:88)

The data used in the test is clearly valid; the NPE comes from some quirks of the Java 8 streams library when attempting to use Collectors::toMap with data that has null map values.

We can address this by adding the above test case (and possibly additional test cases to handle more deeply-nested null values) to the connector, and then replacing the Java 8 streams logic with a vanilla for-loop.

Replace Docker-based integration tests with embedded integration tests

(Migrated from wepay#285)

When we wrote the Docker-based integration tests, there were no options for running an embedded instance of the Connect framework in a unit test. Now, there's the EmbeddedConnectCluster class provided and supported by Kafka that we can leverage for integration tests, and Confluent also provides an option for bringing up an embedded instance of Schema Registry.

Using both of these, it should be possible to recreate the existing tests very closely but with an improved workflow for running and expanding them that doesn't require shell scripts or Docker image building, and also enables more flexibility in the test cases covered. We can build on this to add specific tests for upsert/delete, for example.

Switch to Maven for build

We (Confluent) have CI/CD infrastructure that's geared towards Maven, and we'd like to use it with this connector. Among other things, switching to Maven will allow us to:

  • Automatically test PRs with our Jenkins server
  • Automatically test and publish new releases to Confluent Hub from any feature branch (1.1.x, 2.0.x, etc.)
  • Automatically manage version bumping for the connector on the appropriate branch after a release has been published

New records are processed and written to BigQuery table, but offset is unchanged

Connector Plugin Version:

Issue Description:

  • New records processed after a failure (exception when writing to bigquery table because of invalid data) doesn't update offset.
  • Found this issue when testing GH:68

Test Precondition:

Max.poll.records = 5
Number of topic partitions = 1
Number of connect tasks: 1

Test Steps:

Publish Message 1 = Correct Data. Lag = 0, Current Offset = 1, End Offset = 1 (1 message in BQ table)
Publish Message 2 = Correct Data, Lag = 0, Current Offset = 2, End Offset = 2 (2 messages in BQ table)
Publish Message 3 = Correct Data, Lag = 0, Current Offset = 3, End Offset = 3 (3 messages in BQ table)
Publish Message 4 = Correct Data, Lag = 0, Current Offset = 4, End Offset = 4 (4 messages in BQ table)
Publish Message 5 = Correct Data, Lag = 0, Current Offset = 5, End Offset = 5 (5 messages in BQ table)

Publish Message 6 = Invalid Data, Lag = 1, Current Offset = 5, End Offset = 6 (5 messages in BQ table)
(Invalid Data: Kafka topic accepts it but bigquery cannot insert record)

Connector: Running, Connect Task: Failed

Publish Message 7 = Correct Data, Lag = 2, Current Offset = 5, End Offset = 7 (5 messages in BQ table)
Publish Message 8 = Correct Data, Lag = 3, Current Offset = 5, End Offset = 8 (5 messages in BQ table)
Publish Message 9 = Correct Data, Lag = 4, Current Offset = 5, End Offset = 9 (5 messages in BQ table)
Publish Message 10 = Correct Data, Lag = 5, Current Offset = 5, End Offset = 10 (5 messages in BQ table)
Publish Message 11 = Correct Data, Lag = 6, Current Offset = 5, End Offset = 11 (5 messages in BQ table)
Publish Message 12 = Correct Data, Lag = 7, Current Offset = 5, End Offset = 12 (5 messages in BQ table)
Publish Message 13 = Correct Data, Lag = 8, Current Offset = 5, End Offset = 13 (5 messages in BQ table)

Restart Connect Task:
curl -X POST localhost:8083/connectors/edw-bigquery-sink-connector-test/tasks/0/restart

Lag = 8, Current Offset = 5, End Offset = 13 (8 messages in BQ table)

Connector: Running, Connect Task: Failed

Publish Message 14 = Correct Data, Lag = 9, Current Offset = 5, End Offset = 14 (8 messages in BQ table)
Publish Message 15 = Correct Data, Lag = 10, Current Offset = 5, End Offset = 15 (8 messages in BQ table)
Publish Message 16 = Correct Data, Lag = 11, Current Offset = 5, End Offset = 16 (8 messages in BQ table)

Restart Connect Task:
curl -X POST localhost:8083/connectors/edw-bigquery-sink-connector-test/tasks/0/restart

Lag = 11, Current Offset = 5, End Offset = 16 (11 messages in BQ table)

Expected Result:
Offset should change when records are written to bigquery. Following screen-prints shows the end state of test result.

Offset
Number_Of_Records_BigQuery

Messaging enhancements to promote debugging

I've been working with this connector for some internal clients, and spent considerable time working through the code to determine where our clients' input was breaking the connector.

There are a number of places where the messaging could be improved to help users validate their input. One such place is when throwOnCycle() throws its exception. Our clients schemata have many hundreds elements, and determining where in the schema the cycle occurs helps us to clarify problems for our clients so that they can refactor their schema.

Allow creation of non-partitioned tables

The connector is currently hardcoded to create time-partitioned tables. It'd be nice if we could allow users to opt out of this and create non-partitioned tables with the connector.

One way this could be implemented is to add a special NONE option for the timePartitioningType property which, when specified, will cause the connector to create non-partitioned tables.

Enable upsert with multiple key colum

Hi @C0urante,

Is upsert logic support for multiple key column.
I am getting error message, when i try to give multiple column on kafkaKeyFieldName.

Sample Kafka message:

{"po_nbr":"3772504775","load_id":87226103,"charge_type_code":1,"batch_nbr":16748,"acctg_type_code":5}{"business_type_code":"PPU ","trans_mode_code":3,"db_acctg_map_id":12,"cr_acctg_map_id":27,"db_acctg_bu_id":8,"cr_acctg_bu_id":31,"estimated_cost_amt":"0ยฎ","rate_factor_percent":"\u0000","last_change_ts":1620093248000,"ship_service_code":null,"receiver_id":{"int":504775},"received_ts":{"long":1619913600000},"pnl_vendor_nbr":{"int":476312956},"pnl_vendor_name":{"string":"REFRESCO BEVERAGES US INC "},"last_change_userid":"trsys ","journal_ind":"Y","invc_ex_rsn_code":{"int":121},"fleet_ind":"Y","create_userid":"trsys ","create_ts":1620093248000}

Sample Configuration properties :

connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
autoUpdateSchemas=true
sanitizeTopics=true
autoCreateTables=true
tasks.max=1
topics=acctg_load_po
allowSchemaUnionization=true
upsertEnabled=true
project=***
maxWriteSize=10000
defaultDataset=***
value.converter.schema.registry.url=****
schema.registry.url=****
kafkaKeyFieldName=po_nbr,load_id,charge_type_code,batch_nbr,acctg_type_code
allowNewBigQueryFields=true
keyfile=****
schemaRetriever=com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
value.converter=io.confluent.connect.avro.AvroConverter
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=***
tableWriteWait=500
intermediateTableSuffix=_tmp
bufferSize=100000

Error message:

Caused by: Invalid field name "po_nbr,load_id,charge_type_code,batch_nbr,acctg_type_code". Fields must contain only letters, numbers, and underscores, start with a letter or underscore, and be at most 300 characters long.
at com.wepay.kafka.connect.bigquery.write.row.UpsertDeleteBigQueryWriter.attemptTableCreate(UpsertDeleteBigQueryWriter.java:89)
at com.wepay.kafka.connect.bigquery.write.row.AdaptiveBigQueryWriter.performWriteRequest(AdaptiveBigQueryWriter.java:115)
at com.wepay.kafka.connect.bigquery.write.row.BigQueryWriter.writeRows(BigQueryWriter.java:118)
at com.wepay.kafka.connect.bigquery.write.batch.TableWriter.run(TableWriter.java:96)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Please do let me know whether upsert logic support for multiple key column.

Schema for kafka-key :

{
"id": 5886,
"subject": "acctg_load_po-key",
"version": 1,
"schema": "{"type":"record","name":"ConnectDefault","namespace":"io.confluent.connect.avro","fields":[{"name":"po_nbr","type":"string"},{"name":"load_id","type":"int"},{"name":"charge_type_code","type":{"type":"int","connect.type":"int16"}},{"name":"batch_nbr","type":"int"},{"name":"acctg_type_code","type":{"type":"int","connect.type":"int16"}}]}",
"compatibilityLevel": null
}

Add partition expiration

GDPR requires us not to keep Personally Identifiable Information (PII) any longer than necessary. Therefore we need to be able to delete the data ingested from Kafka into BigQuery after a given amount of time. BigQuery offers partition expiration to help us do this. Could we have a new configuration option to be able to set partition expiration on the sink connector's destination table? We would be willing to contribute to add this feature.

Table name and topic name high coupling ?

Hi guys,

As far as I have seen/understood, the only way to set the bigquery table name is via topic name, am I right ?

I've read the documentation, and connector pages, and it seems really strange to me that it's not possible to set the table name on the connector configuration. If so (if didn't miss this part), Is there any reason for it ?

Many thanks

Failed to deserialize data for topic mycluster.tlp_stress.keyvalue to Avro

Hello,
I'm using Debezium for CDC from Cassandra and polling into a kafka topic.
Converter used:
value.converter=org.apache.kafka.connect.json.JsonConverter

When I run the bq-sink, i get the following error:


| [2020-11-23 09:23:26,847] ERROR WorkerSinkTask{id=kcbq-connect1-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
connect_1         | org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
connect_1         |     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
connect_1         |     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
connect_1         |     at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:488)
connect_1         |     at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:468)
connect_1         |     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:324)
connect_1         |     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
connect_1         |     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:200)
connect_1         |     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:178)
connect_1         |     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:228)
connect_1         |     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect_1         |     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect_1         |     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
connect_1         |     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
connect_1         |     at java.lang.Thread.run(Thread.java:748)
connect_1         | Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic mycluster.tlp_stress.keyvalue to Avro:

It does mean that eh sink doesn't support the JSON format when reading from Kafka ?
Thanks,

Upgrade bigquery to version 1.119.0

Hi,

I am currently experiencing an issue whereby version 1.79.0 of the google-cloud-bigquery artefact only supports partitioning against the DAY partitioning type, this has been resolved in version 1.119.0 of the artefact.

Is there a way to upgrading the version to 1.119.0 to get the support all of the partitioning types? Are there concerns that I need to be aware of that has prevented the upgrade to later version of this library?

Thanks in advance.

`mvn clean package -DskipTests` failed

I'm trying to follow your doc to build the project: https://github.com/confluentinc/kafka-connect-bigquery#building-and-extracting-a-confluent-hub-archive but got the following error:

[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for kafka-connect-bigquery-parent 2.2.0-SNAPSHOT:
[INFO]
[INFO] kafka-connect-bigquery-parent ...................... SUCCESS [  0.078 s]
[INFO] kafka-connect-bigquery-api ......................... SUCCESS [  1.798 s]
[INFO] kafka-connect-bigquery ............................. FAILURE [  0.210 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  2.166 s
[INFO] Finished at: 2021-04-06T11:08:40-07:00
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal on project kcbq-connector: Could not resolve dependencies for project com.wepay.kcbq:kcbq-connector:jar:2.2.0-SNAPSHOT: Failed to collect dependencies at io.confluent:kafka-schema-registry:jar:5.5.1: Failed to read artifact descriptor for io.confluent:kafka-schema-registry:jar:5.5.1: Could not transfer artifact io.confluent:kafka-schema-registry:pom:5.5.1 from/to maven-default-http-blocker (http://0.0.0.0/): Blocked mirror for repositories: [confluent (http://packages.confluent.io/maven/, default, releases+snapshots)] -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/DependencyResolutionException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR]   mvn <args> -rf :kcbq-connector

It looks like io.confluent:kafka-schema-registry:pom:5.5.1 is not publicly accessible? Could you help? Thanks!

Modify and delete columns in source database

Is dropping and modifying columns allowed using this sink connector (since BigQuery doesn't allow dropping and modifying columns in an existing table). I've tested the edge case of adding columns in my database and that gets reflected in my BigQuery warehouse, however I'm not able to delete and modify columns since the BigQuery connector is throwing an error which says that the provided schema isn't correct (after I drop/modify the columns).

I'm using Confluent platform 5.5.1 and the sink connector that is provided by Counfluent Hub

Support configurable time partitioning types for created tables

Follow-up to discussion on #61.

Since the connector now has support for writing to tables with the HOUR, MONTH, and YEAR time partitioning types (in addition to the DAY type, which it has supported for a few years now), users should be able to control the time partitioning type it uses for tables it creates. This will likely involve a new configuration property such as timePartitioningType with permitted case-insensitive values HOUR, DAY (default to preserve backwards compatibility), MONTH, and `YEAR.

Offsets are not managed correctly when the connector fails to process some of the records in a batch

Connector was processing 500 records in batch. When one of the records threw an exception, connector stopped processing (Connector task status: FAILED). When connector task was restarted, connector resumed processing skipping the failed record. Maybe the connector falsely committed the offset for all records in the batch.

Connector Plugin Version:

  • 2.0.x

Steps to reproduce:

  • Publish records to a topic with some records having invalid data
  • Start the connector to process records in a batch
  • One of the records failed to process and throws an exception because of invalid data (Example: [row index 487]: invalid: Timestamp field value is out of range:1441393020755000000)
  • Restart the connector task
  • Connector starts running

Expected Result:

  • Connector should try to reprocess the failed records after the task restart and no records in the batch should be skipped

Actual Result:

  • Connector skipped the failed record and continued processing.

Error log snippet:

2021-01-14 23:14:22.882 Z ERROR WorkerSinkTask{id=edw-bigquery-sink-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:567)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:325)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:200)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Some write threads encountered unrecoverable errors: com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: table insertion failed for the following rows:
        [row index 0]: stopped:
        [row index 1]: stopped:
        .
        .        
        [row index 5]: stopped:
        [row index 6]: stopped:
        [row index 7]: invalid: Timestamp field value is out of range:1783986724881000000
        [row index 8]: stopped:
        [row index 9]: stopped:
        .
        .
        [row index 17]: stopped:
        [row index 18]: stopped:
        [row index 19]: invalid: Timestamp field value is out of range:1783986724881000000
        [row index 20]: stopped:
        .
        .
        [row index 24]: stopped:
        [row index 25]: stopped: , com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: table insertion failed for the following rows:
        [row index 0]: stopped:
        [row index 1]: stopped:
        .
        .
        [row index 9]: stopped:
        [row index 10]: invalid: Timestamp field value is out of range:1783986724881000000
        [row index 11]: invalid: Timestamp field value is out of range:1783986724881000000
        [row index 12]: stopped:
        [row index 13]: stopped:
        .
        .
        [row index 26]: stopped:
        [row index 27]: stopped:
        [row index 28]: stopped: ; See logs for more detail
        at com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.maybeThrowEncounteredErrors(KCBQThreadPoolExecutor.java:107)
        at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.put(BigQuerySinkTask.java:218)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:545)
        ... 10 more
2021-01-14 23:14:22.882 Z ERROR WorkerSinkTask{id=edw-bigquery-sink-connector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)

Update README

The README is out of data in several places and there's room for improvement in others. This has come up so far in #45 and #86, and is likely to continue coming up until addressed.

At the least, we should:

  • Update the installation instructions to refer to Confluent Hub instead of Maven Central
  • Update any sample configuration files to use the properties recognized by the latest release of the connector
  • Verify that the quickstart still works on a fresh machine and, if not, either fix it or remove it
  • Verify that the integration testing instructions are still accurate and remove all excessive information that won't be useful to most users/developers
  • Call out all serious changes made with 2.x.x (including removal of the topicsToTables and datasets configuration properties and the SchemaRegistrySchemaRetriever, and no longer packaging the AvroConverter` with the connector)

As a stretch goal, we could:

  • Modernize and/or optimize the quickstart: bump Confluent Platform versions (3.0.0 is EOL at this point), replace shell scripts with a docker-compose setup and a single entrypoint script, maybe even include an example with the Apache 2.0-licensed datagen connector
  • Add instructions on performing a smooth rolling upgrade to 2.x.x with a running connector, including how to work with the backwards-incompatible configuration property changes and a note on installing the Avro converter separately

Array of strings isn't properly casted as repeated field in big query

version: confluent-hub install wepay/kafka-connect-bigquery:2.1.0

Hi!

There is a problem with casting record with given schema:

{
      "name": "my_array",
      "type": {
        "items": "string",
        "type": "array"
      }
}

During table creation in big query, connector sets this field schema as String
image.

Importing rows ends with error:

[row index 495]: invalid: Array specified for non-repeated field.

I think schema for that field, should be created with "repeated" option in BigQuery.

Upsert is not working on 2.0.0-alpha

Hi @C0urante,

Upsert is not happening on 2.0.0-alpha, Please do let us know whether this is supported in this version.

If so, i am using below config for bigquery sink connector, please let us know what makes wrong on this config properties.

autoUpdateSchemas=true
sanitizeTopics=true
autoCreateTables=true
tasks.max=1
topics=kafkaconnectinformixv6
allowSchemaUnionization=true
upsertEnabled=true
project=****
maxWriteSize=10000
defaultDataset=****
value.converter.schema.registry.url=*****
schema.registry.url=***
kafkaKeyFieldName=rcv_unit_id
allowNewBigQueryFields=true
keyfile=***
schemaRetriever=com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
value.converter=io.confluent.connect.avro.AvroConverter
key.converter=io.confluent.connect.avro.AvroConverter
tableWriteWait=500
key.converter.schema.registry.url=****
intermediateTableSuffix=_tmp
bufferSize=100000

Also in Bigquery table, my kafka key message is always creating column name as "rcv_unit_id.rcv_unit_id", is there any way to change the name from "rcv_unit_id.rcv_unit_id" to " "rcv_unit_id".

Pre-emptive table existence checks cause rate limits to be exceeded

The maybeEnsureExistingTable method is invoked for every single record, and unconditionally fetches metadata for the table from BigQuery. This can cause rate limits to be exceeded for the connector fairly easily and is ultimately an unnecessary check, since if table creation is disabled and the connector tries to write to a table that doesn't exist, it'll still fail. It may provide a slightly better UX to do the check before attempting to write to BigQuery, and it is possible to add some kind of caching layer so that we don't check on tables that we already know exist, but the additional complexity and performance degradation may not be worth a slightly more informative and sooner-thrown error message.

Publish artifacts back to Maven central

#34 made jars and other artifacts be published exclusively to ConfluentHub. It would be a great help, and more standardized, if the build-process could reintroduce publishing artifacts back to maven central. Currently the README in this repo still points people to the maven published artifacts which are basically unchanged since the time this repo was forked from wepay.

batch loading sometimes missing a records

Hi

I setup 2 connector that sink data into bigquery for the same topic, which a producer produce several messages from a files.

this is the connector configuration using stream insert

connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
autoUpdateSchemas=true
sanitizeTopics=true
autoCreateTables=true
tasks.max=1
topics=sometopic
schemaRegistryLocation=http://schema-registry:8081
topicsToTables=sometopic=sometopic_stream
project=some_project
maxWriteSize=10000
datasets=.*=somedataset
keyfile=somekey.json
name=sink-bigquery-stream
schemaRetriever=com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever
key.converter=org.apache.kafka.connect.storage.StringConverter
tableWriteWait=1000
bufferSize=100000

this is the connector configuration using batch load
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
gcsBucketName=some_bucket
autoUpdateSchemas=true
sanitizeTopics=true
autoCreateTables=true
tasks.max=4
topics=sometopic
schemaRegistryLocation=http://schema-registry:8081
project=some_project
maxWriteSize=10000
datasets=.*=somedataset
enableBatchLoad=sometopic
keyfile=somekey.json
name=sink-bigquery
schemaRetriever=com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever
key.converter=org.apache.kafka.connect.storage.StringConverter
tableWriteWait=1000
bufferSize=100000

I create another sink to bigquery for number of message I produce to kafka for each given filename
as a result 6 out of 336 files have a missing records for batch load and no missing records for streaming insert.
could someone give a hint how to debug this? or maybe something wrong with my connector configuration .
I'm using this version of connector: kafka-connect-bigquery:1.6.6

image

Kafka Connect Avro Converter jars not ship in v2.0.0

Hi there!
When upgrading from 1.6.6 to 2.0.0 using confluent-hub CLI, I faced the issue :
io.confluent.connect.avro.AvroConverter Class not found
I realize that the jars for Avro Converter (avro-1.9.2.jar, kafka-avro-serializer-5.5.0.jar,...) were not anymore ship through kafka connect bigquery plugin.

I solved it by installing the kafka connect plugin kafka-connect-avro-converter:5.5.3.

It sounds definitely cleaner to have it this way but would be nice I guess to update the documentation somewhere? I followed this section .

Side note : I also had to change schemaRetriever to com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever

High CPU Utilization Kafka Connect Bigquery

My Kafka cluster Streaming CDC data into Bigquery has suddenly started giving below errors and the CPU utilization has increased to >80% than normally 20-30%. I am not able to figure out anything from stack trace logs. Any suggestions? thanks in advance!

`
Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: [2021-02-01 07:36:29,050] ERROR WorkerSinkTask{id=cdc-bq-sink-all-1} Commit of offsets threw an unexpected exception for sequence number 99: null (org.apache.kafka.connect.runtime.WorkerSinkTask:264)
Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Some write threads encountered unrecoverable errors: com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException; See logs for more detail
Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: at com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.awaitCurrentTasks(KCBQThreadPoolExecutor.java:97)
Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.flush(BigQuerySinkTask.java:127)
Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:125)
Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:389)
Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:213)
Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: at java.util.concurrent.FutureTask.run(FutureTask.java:266)
Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: at java.lang.Thread.run(Thread.java:748)
Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: [2021-02-01 07:36:29,219] INFO Putting 0 records in the sink. (com.wepay.kafka.connect.bigquery.BigQuerySinkTask:187)
Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: [2021-02-01 07:36:29,221] INFO Putting 2 records in the sink. (com.wepay.kafka.connect.bigquery.BigQuerySinkTask:187)
Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: [2021-02-01 07:36:29,225] INFO Putting 1 records in the sink. (com.wepay.kafka.connect.bigquery.BigQuerySinkTask:187)
Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: [2021-02-01 07:36:29,239] INFO Putting 194 records in the sink. (com.wepay.kafka.connect.bigquery.BigQuerySinkTask:187)
Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: [2021-02-01 07:36:29,493] INFO Putting 500 records in the sink. (com.wepay.kafka.connect.bigquery.BigQuerySinkTask:187)

Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: [2021-02-01 07:36:29,770] INFO Putting 500 records in the sink. (com.wepay.kafka.connect.bigquery.BigQuerySinkTask:187)
Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: [2021-02-01 07:36:29,801] ERROR Task failed with com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException error: table insertion failed for the following rows:
Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: [row index 0]: stopped:
Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: [row index 1]: stopped:
...
Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: [row index 14]: stopped:
Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: [row index 15]: invalid: Missing required field: Msg_0_CLOUD_QUERY_TABLE.qr_filename.
Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: [row index 16]: stopped:
Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: [row index 17]: stopped:
...
Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: [row index 41]: stopped: (com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor:68)
Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: Exception in thread "pool-10-thread-101" com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: table insertion failed for the following rows:
Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: [row index 0]: stopped:
Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: [row index 1]: stopped:
...
Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: [row index 14]: stopped:
Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: [row index 15]: invalid: Missing required field: Msg_0_CLOUD_QUERY_TABLE.qr_filename.
Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: [row index 16]: stopped:
Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: [row index 17]: stopped:
...
Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: [row index 41]: stopped:
Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: at com.wepay.kafka.connect.bigquery.write.row.BigQueryWriter.writeRows(BigQueryWriter.java:131)
Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: at com.wepay.kafka.connect.bigquery.write.batch.TableWriter.run(TableWriter.java:80)
Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
Feb 01 07:36:29 kafka-cluster-2 connect-distributed[17558]: at java.lang.Thread.run(Thread.java:748)
`

java.lang.NoClassDefFoundError: com/google/cloud/bigquery/BigQuery

Hello,
I am trying to run kafka-connect-bigquery and getting the following error:

2020-11-19 21:49:29,361] ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectDistributed)
connect_1         | java.lang.NoClassDefFoundError: com/google/cloud/bigquery/BigQuery
connect_1         |     at java.lang.Class.getDeclaredConstructors0(Native Method)
connect_1         |     at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
connect_1         |     at java.lang.Class.getConstructor0(Class.java:3075)
connect_1         |     at java.lang.Class.newInstance(Class.java:412)
connect_1         |     at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.versionFor(DelegatingClassLoader.java:380)
connect_1         |     at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:350)
connect_1         |     at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:330)

Using the following versions:

image: confluentinc/cp-kafka:5.3.1
FROM confluentinc/cp-kafka-connect:5.4.3

Any idea please ?
thanks

Could you please add kafka record timestamp from kafka message as well as system time ?

public static Map<String, Object> buildKafkaDataRecord(SinkRecord kafkaConnectRecord) {
HashMap<String, Object> kafkaData = new HashMap<>();
kafkaData.put(KAFKA_DATA_TOPIC_FIELD_NAME, kafkaConnectRecord.topic());
kafkaData.put(KAFKA_DATA_PARTITION_FIELD_NAME, kafkaConnectRecord.kafkaPartition());
kafkaData.put(KAFKA_DATA_OFFSET_FIELD_NAME, kafkaConnectRecord.kafkaOffset());
kafkaData.put(KAFKA_DATA_INSERT_TIME_FIELD_NAME, System.currentTimeMillis() / 1000.0);
return kafkaData;
}

Will upsert work on changing kafka topic property

Please do let me know, upsert logic on bigquery table will work on changing Kafka topic property?

properties:

topics=transportation.acctg_load_po

on changing above topic using SMT REGEX from transportation.acctg_load_po to acctg_load_po, offset is not committing to consumer group_id.

Add PR template

We removed the PR template in #29 since it was added when this repository was a Confluent-only fork of the project. However, a PR template could be helpful for keeping community contributions at a high level of quality.

Some things we might address in a template include:

  • What is the problem this PR tries to address? (Can possibly request a link to a GH issue here)
  • What is a summary of the proposed change?
  • What tests were added to verify the quality of this change and help prevent regression?
  • What existing tests can be used to verify the quality of this change and help prevent regression?

Flaky upsert/delete integration tests

The integration tests in com.wepay.kafka.connect.bigquery.integration.UpsertDeleteBigQuerySinkConnectorIT fail semifrequently on Jenkins and locally. We should look into these as they're the only flaky tests at the moment and, if they're fixed, we can trust our Jenkins builds to validate fixes much more reliably.

Add a changelog

This project is great, but the lack of changelog makes it difficult to know what to expect when upgrading to the latest version.

Optional regular expression to specify topics to microbatch into GCS

At the moment, an explicit list of topics must be provided in the connector config to enable batch loading of topics into GCS. Ideally, I would like to be able to optionally provide a regular expression in place of the explicit list (much like the topics and topics.regex config options).

I'd like some feedback on whether this would be a good addition to the connector and if so, could I contribute the code to add the functionality?

Operation info.

Hi, I want to use kcbq but I have doubt that is it just perform insert data operation or also perform update data operation in big query.

for the new record it performs insert operation and for update operation is it perform update or again perform insert only.

Publish configuration documentation

When we migrated KCBQ over to the Confluent repo, the (out of date) wiki docs on the repo were left behind.

Does Confluent have any plans to publish the KCBQ docs on its website?

Write errors do not result in a fail status for tasks

We've noticed that when a BigQuery schema update fails, the kafka-connector and the task itself do not enter a failed state.

For example we see the following error:

[2020-11-13 10:16:12,507] ERROR Task failed with com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException error: Failed to write rows after BQ schema update within 5 attempts for: GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=some_dataset, tableId=some_table_name}} (com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor:68)

However the status for this task continues to show it as Running. There is no indication of an issue unless you delve into the logs.

Is this expected?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.