
Comments (8)

james-johnston-thumbtack commented on August 16, 2024

We also saw a similar situation here several months back. It's extremely rare, but it did happen, and as a workaround a wrapping BigQuery view was unfortunately created that uses the PostgreSQL LSN to keep only the newest version of each row.
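
A minimal sketch of that kind of wrapping view, with made-up table and column names rather than our actual schema, looks roughly like this:

    -- Keep only the newest version of each row, using the PostgreSQL LSN
    -- carried in the Debezium payload to decide which version is newest.
    -- All names below are illustrative.
    CREATE OR REPLACE VIEW `my_project.my_dataset.transactions_latest` AS
    SELECT * EXCEPT (row_rank)
    FROM (
      SELECT
        t.*,
        ROW_NUMBER() OVER (
          PARTITION BY transaction_pk      -- table primary key
          ORDER BY source_lsn DESC         -- higher LSN = newer row version
        ) AS row_rank
      FROM `my_project.my_dataset.transactions_raw` t
    )
    WHERE row_rank = 1;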

This was for sinking a Kafka topic originating from the Debezium PostgreSQL connector, stored in the simple UPSERT topic format (that is, Kafka key == table primary key, Kafka value == new row value, or a tombstone when deleting). The Debezium connector snapshots the PG table into Kafka and then follows the PG write-ahead log to stream subsequent updates. Thus we can safely assume that the most recent Kafka message with a particular key represents the latest row version in the underlying PG table.

The BigQuery sink configuration at the time of the record duplication was as follows:

  • Installed on self-managed Kafka Connect with confluent-hub install --no-prompt wepay/kafka-connect-bigquery:2.4.0
    "connector.class"                      = "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"
    "name"                                 = var.connector_name
    "tasks.max"                            = "1"
    "topics"                               = <exactly one Kafka topic>
    "project"                              = var.gcp_project
    "defaultDataset"                       = var.gcp_dataset
    "schemaRetriever"                      = "com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever"
    "keyfile"                              = var.keyfile_location
    "autoCreateTables"                     = "true"
    "sanitizeTopics"                       = "false"
    "allBQFieldsNullable"                  = "true"
    "allowSchemaUnionization"              = "true"
    "allowBigQueryRequiredFieldRelaxation" = "true"
    "allowNewBigQueryFields"               = "true"
    "upsertEnabled"                        = "true"
    "deleteEnabled"                        = "true"
    "kafkaKeyFieldName"                    = "kafkakey"
    "mergeIntervalMs"                      = "60000"
    "mergeRecordsThreshold"                = "-1"
    "bigQueryRetry"                        = "10"
    "topic2TableMap"                       = <mapping for exactly one Kafka topic to one BigQuery table>

This was before exactly-once support in Debezium, so obviously some duplication can be expected there. But I think that is not a problem, because the duplicates would all have the same Kafka key, and only the most recent message with a given key should count. @b-goyal also correctly points out that BigQuery streaming inserts are normally at-least-once, so the initial BigQuery inserts could be subject to even more duplication.

However, the key here is that the connector was configured with both upsertEnabled and deleteEnabled. Thus, any duplicate incoming records should be treated as UPDATEs: while the intermediate temporary table might have duplicates, the final output table generated with the MERGE BigQuery SQL statement should not have any. Here is the relevant MERGE statement from connector version 2.4.0 when we saw this issue: https://github.com/confluentinc/kafka-connect-bigquery/blob/v2.4.0/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/MergeQueries.java#L216-L242

Notice the key USING block when SELECTing from the temporary intermediate table, which seems (to me) to clearly select at most one row per key from the source - the newest row. The remainder of the MERGE statement seems to clearly use it to update any existing row in the target table rather than insert a new one.

        + "USING ("
          + "SELECT * FROM ("
            + "SELECT ARRAY_AGG("
              + "x ORDER BY " + i + " DESC LIMIT 1"
            + ")[OFFSET(0)] src "
            + "FROM " + table(intermediateTable) + " x "
            + "WHERE " + batch + "=" + batchNumber + " "
            + "GROUP BY " + String.join(", ", keyFields)
          + ")"
        + ") "

Thus, even though the intermediate table might have duplicates due to the multiple issues mentioned above, they shouldn't make it into the final target table due to the de-duplication that the MERGE statement does.

Yet unfortunately it did happen:

[Screenshot, 2023-06-26: BigQuery target table showing two rows with the same transaction_pk]

In this example, transaction_pk is the primary key for the table and is the kafkakey that the MERGE statement uses for deduplication. Yet the output target table had two rows for the same primary key, as shown in the screenshot above. In this case, the row was first created with a transaction status of transaction_created. Approximately 3 minutes later, the same row was updated with a new transaction status of transaction_success. The primary key is the same, so the newer row should have superseded the earlier one when the MERGE statement ran. Yet it did not.
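
A query along these lines (hypothetical table name and key layout) is enough to surface the offending primary keys in the target table:

    -- List primary keys that appear more than once in the target table,
    -- which should be impossible if the MERGE deduplication worked.
    -- Table name and column layout are assumptions, not our real schema.
    SELECT
      kafkakey.transaction_pk AS transaction_pk,
      COUNT(*) AS row_count
    FROM `my_project.my_dataset.transactions`
    GROUP BY kafkakey.transaction_pk
    HAVING COUNT(*) > 1
    ORDER BY row_count DESC;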

This happened some time ago, so I can't look up the exact logs any more. But the engineer on call did note at the time that the BigQuery sink was being rate-limited for an extended duration around the same time, with a bunch of errors like: com.google.cloud.bigquery.BigQueryException: Exceeded rate limits: too many concurrent queries for this project_and_region. For more information, see http://<snip>. If the connector or any of its tasks got stuck in a failure state, it would have been auto-restarted by an in-house watchdog service. (This was because the errors around BigQuery write serialization were not being retried - that fix wasn't yet in v2.4.0.) So, generally speaking, I believe that whatever edge cases / race conditions exist in the BigQuery sink connector and/or BigQuery itself would have had a higher chance of showing up in this scenario, with a high error rate, potential connector restarts, and ongoing rate limiting.


OneCricketeer commented on August 16, 2024

Are you sure you don't have equivalent (not duplicate) data in the topic? For example, is your producer sending the same record more than once, at different offsets? Maybe you can use the InsertField transform to include offset/partition information.
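
Something like this in the sink configuration (an untested sketch; the transform alias and field names are arbitrary) would stamp each row with its source partition and offset:

    "transforms"                                   = "insertKafkaCoords"
    "transforms.insertKafkaCoords.type"            = "org.apache.kafka.connect.transforms.InsertField$Value"
    "transforms.insertKafkaCoords.partition.field" = "kafka_partition"
    "transforms.insertKafkaCoords.offset.field"    = "kafka_offset"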


spd123 commented on August 16, 2024

Yes. Initially I had the same concern that records might be duplicated on the producer's side, but upon further investigation I found that this is not the case. Each affected record is duplicated exactly once (it appears twice, never more), which suggests the duplication is being caused by the connector.


OneCricketeer commented on August 16, 2024

Does the console consumer show the same? Can you get offset information for each record via Connect transforms?


spd123 commented on August 16, 2024

Do you mean in the Kafka Connect logs? How do we get that?


OneCricketeer commented on August 16, 2024
  1. kafka-console-consumer - just read the topic. Are you sure the connector is causing duplication vs just data in the topic?

  2. If you insert the offset field, is it duplicated, or always incremental?


spd123 commented on August 16, 2024

Yes, no duplicates exist in the records produced to the topic. I have verified this by using Kafdrop and by writing a separate consumer that logs record_id (the same set of record_ids that were duplicated in BigQuery) and offset values. Both methods showed no duplicates. When the producer creates records, each record receives a unique timestamp and record_id. However, I have noticed that the duplicated records are exactly identical, including the timestamp. It seems like the same record is being inserted into BigQuery twice.
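
In BigQuery terms, a grouping query along these lines (hypothetical table and column names) would show that pattern: two rows per duplicated record_id but only one distinct timestamp:

    -- For each duplicated record_id, count rows and distinct timestamps.
    -- Exact duplicates show row_count = 2 but distinct_timestamps = 1.
    -- Names are illustrative only.
    SELECT
      record_id,
      COUNT(*) AS row_count,
      COUNT(DISTINCT record_timestamp) AS distinct_timestamps
    FROM `my_project.my_dataset.my_table`
    GROUP BY record_id
    HAVING COUNT(*) > 1;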


b-goyal commented on August 16, 2024

@spd123 BigQuery is an at-least-once connector, which means it guarantees to ingest records at least once into the destination table. If you have verified that the topic contains unique data and there is only one BigQuery connector/writer consuming the topic, then duplication can happen if record ingestion is re-attempted. A re-attempt happens when the same set of records is sent again because it was not committed on the first attempt. Could you check if there are warn/error logs which indicate commit failures?

