
Debezium Starter Kit

In this repository you'll find different pre-built examples to start working with Debezium.

Features

The examples implement the following features:

  • Simple replication

    Example implementing simple replication from two sources (MySQL and PostgreSQL) to Kafka. Besides that, a simple transformation and a transformation chain are also included

  • Transformations

    Examples implementing a simple transformation and a transformation chain

  • Replication using Avro and Confluent Schema Registry

    Example of replication from two sources (MySQL and PostgreSQL) to Kafka, using Avro and the Confluent Schema Registry

  • Replication using Avro and Apicurio Schema Registry (Red Hat)

    Example of replication from two sources (MySQL and PostgreSQL) to Kafka, using Avro and the Apicurio Schema Registry

All of the examples are implemented using Docker Compose and include supporting tools such as Debezium UI, Kafka UI and Adminer.


Setting up the examples

The repository is organized around the "examples" folder. In that folder, you'll find a subfolder for each example. Each subfolder contains three files:

  • setup_infra.sh: script used to create the necessary infrastructure using Docker Compose (a rough sketch of what such a script does follows below)
  • destroy_infra.sh: script used to destroy the created infrastructure
  • register_connectors.sh: script used to register the connectors used in the example
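
You don't need to look inside these scripts to use them, but a setup/destroy pair of this kind is usually little more than a Docker Compose invocation. A hypothetical sketch (the actual scripts in this repository may do more):

# Hypothetical sketch, not the actual scripts: it assumes a docker-compose.yml
# file sitting next to the script in each example folder.
docker-compose up -d     # setup_infra.sh: start all the containers in the background
docker-compose down -v   # destroy_infra.sh: stop the containers and remove their volumes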

For instance, if you want to execute the example related to "Replication using Avro and Confluent Schema Registry", you must follow these steps:

cd examples/avro_confluent-schema-registry
sh setup_infra.sh

Once the infrastructure is up and running, to register the connectors, just execute:

cd examples/avro_confluent-schema-registry
sh register_connectors.sh
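
To verify that the connectors were registered, you can also query the Kafka Connect REST API directly (this assumes the localhost:8083 port mapping used by the curl commands throughout this document):

curl -s http://localhost:8083/connectors
# Returns a JSON array with the registered connector names, e.g. ["mysql-connector"]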

Finally, to destroy the infrastructure, execute:

cd examples/avro_confluent-schema-registry
sh destroy_infra.sh



Playing with Debezium

In the following sections we're going to walk through different examples using Debezium.

Simple replication

In this example we're going to show how Debezium works, implementing simple replication from two sources (MySQL and PostgreSQL) to Kafka. As I said in the setup section, to create the infrastructure, execute:

cd examples/simple
sh setup_infra.sh

Once the containers are up and running, you can register all the connectors using the script "examples/simple/register_connectors.sh" and skip the following section or, if you want to see in detail how to work with Debezium and how to register connectors, keep reading.

Registering connectors

If we open Debezium UI, we'll see that there are no connectors registered:

(screenshot)

If we also open Kafka UI, we'll see that there are no topics related to Debezium, only some topics related to Kafka Connect:

(screenshot)

Now, we're going to register some connectors, using two different methods:

  • By using the Kafka Connect API (Rest)
  • By using Debezium UI

MySQL connector

We'll use the API instead of Debezium UI. Connectors are defined by a JSON file in which we set some configuration properties. The file associated with our MySQL connector is located at "examples/simple/connectors/register-mysql.json":

{
  "name": "mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "mysql",
    "database.include.list": "app1",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "mysql-schema-changes.app1"
  }
}

If you want to get more info about the different properties, check the MySQL connector documentation on the Debezium site.
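
Before registering the connector, we can optionally ask Kafka Connect to validate the configuration. This uses the standard validate endpoint of the Kafka Connect REST API, passing the properties of the "config" object:

# Validate the connector configuration without registering it
curl -s -X PUT -H "Content-Type: application/json" \
  http://localhost:8083/connector-plugins/io.debezium.connector.mysql.MySqlConnector/config/validate \
  -d '{
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "mysql",
    "database.include.list": "app1",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "mysql-schema-changes.app1"
  }'
# The response contains an "error_count" field; 0 means the configuration is valid.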

Once we've defined the configuration file associated with our connector (MySQL), we register it by calling the Kafka Connect REST API, executing this command from the "examples/simple" folder:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @connectors/register-mysql.json

If the operation succeeds, we'll get a 201 status response and the location of the connector (http://localhost:8083/connectors/mysql-connector):

HTTP/1.1 201 Created
Date: Mon, 11 Oct 2021 10:14:12 GMT
Location: http://localhost:8083/connectors/mysql-connector
Content-Type: application/json
Content-Length: 479
Server: Jetty(9.4.39.v20210325)

{"name":"mysql-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","database.hostname":"mysql","database.port":"3306","database.user":"debezium","database.password":"dbz","database.server.id":"184054","database.server.name":"mysql","database.include.list":"app1","database.history.kafka.bootstrap.servers":"kafka:9092","database.history.kafka.topic":"mysql-schema-changes.app1","name":"mysql-connector"},"tasks":[],"type":"source"}

We can also check that the connector is registered and running by accessing Debezium UI:

(screenshot)

When the connector starts, it takes a snapshot of the data in the tables configured in the JSON file ("database.include.list": "app1") and sends the messages associated with that snapshot to Kafka. If we go to Kafka UI, we'll see new topics:

(screenshot)

The topic containing the snapshot is "mysql.app1.customers". If we check the messages in this topic, we'll find four messages, one generated from each row:

(screenshot)
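
If you prefer a terminal to Kafka UI, you can also read those snapshot messages with the Kafka console consumer. This is a hedged example: the container name "kafka" and the availability of the kafka-console-consumer tool inside it are assumptions about this repository's Docker Compose setup:

# Read the four snapshot messages from the topic (assumptions noted above)
docker exec -it kafka kafka-console-consumer \
  --bootstrap-server kafka:9092 \
  --topic mysql.app1.customers \
  --from-beginning --max-messages 4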


Making changes in MySQL to generate events

To check how Debezium handles changes, we're going to make some changes to the MySQL data. To perform these actions we'll use Adminer. To connect to MySQL, we'll need this information:

  • Server: mysql
  • User / Pass: user1 / user1
  • Database: testdb

(screenshot)

Once we've connected to MySQL, we're going to add a new row:

(screenshot)

and then, we'll modify another one, appending "(Updated)" to the "first_name" field:

(screenshot)
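
The same two changes could be made from a terminal instead of Adminer. This is only an illustrative sketch: the container name ("mysql"), the "app1.customers" location (inferred from the topic name), the column names and the row id are assumptions:

# Hypothetical SQL equivalents of the two Adminer actions above
docker exec -it mysql mysql -uuser1 -puser1 -e \
  "INSERT INTO app1.customers (first_name, last_name, email) \
   VALUES ('John', 'Doe', 'john.doe@example.com');"
docker exec -it mysql mysql -uuser1 -puser1 -e \
  "UPDATE app1.customers SET first_name = CONCAT(first_name, ' (Updated)') WHERE id = 1;"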

If everything is OK (sure it is), Debezium will capture those two changes and send them to Kafka. To check it, we go to Kafka UI and open the topic "mysql.app1.customers". We'll see that there are two more messages.

The first one is associated with the insert action. In the payload of the message we can check the type of operation ("op": "c") and the content of the new row (the "after" node):

(screenshot)

The second one is associated with the update action. If we examine the payload, we'll see that the operation is an update ("op": "u") and that, for update operations, we get the content before the update (the "before" node) and after it (the "after" node):

(screenshot)

Now, you can keep making changes to see how Debezium works with the different operations.



Using Debezium UI to register a connector

In this case, instead of using the Kafka Connect REST API, we're going to use Debezium UI to register a connector that replicates changes from PostgreSQL to Kafka. The first step is to open Debezium UI and click "Create a connector" to start the wizard. We select "PostgreSQL" as the connector type and click "Next":

(screenshot)

Now, we have to fill in the connector configuration data:

  • Connector name: postgresql-connector
  • Namespace: postgresql
  • Hostname: postgresql
  • Port: 5432
  • User: debezium
  • Password: dbz
  • Database: postgresql

You can also get these values from the file "examples/simple/connectors/register-postgresql.json" (if we wanted to register the connector by calling the Kafka Connect REST API, as we did with the MySQL connector, we'd use this JSON file).

We have to open "Advanced Properties" panel and choose "pgoutput" as plugin because we are using a "raw" Postgresql in this repository, without libs requested by the other plugins;

(screenshot)

Then, we click "Validate". If everything is OK, the next step is to select which schemas and tables are we going to replicate. We write "app2" for the schema because we want to capture all the tables in that schema. Then, click "Apply" to consolidate the information:

(screenshot)

Then, we have to click "Review and Finish" to go to the final step:

(screenshot)

Click "Finish" to finally create the connector. We should have now two connectors running: the previous one to replicate data from MySQL and this one, to replicate data from Postgresql:

(screenshot)

Besides that, if we go to Kafka UI, we'll find a new topic, "postgresql.app2.customers", containing four messages which, as in the MySQL case, correspond to the initial snapshot of the data in the "customers" table:

(screenshot)


Making changes in PostgreSQL

As we did in the MySQL section, we're going to use Adminer to make changes in PostgreSQL and see how Debezium handles them. We need to enter the following data to connect to PostgreSQL:

  • Server: postgresql
  • User / Pass: user2 / user2
  • Database: postgresql

(screenshot)

Then, we must select the schema "app2":

(screenshot)

Now, we're going to add a new row and modify another one. To insert a new row, click on "Nuevo Registro" ("New record"; the Adminer UI in the screenshots is in Spanish):

(screenshot)

Fill in all the fields and save:

(screenshot)

Then, we modify another one, appending "(Updated)" to "first_name":

(screenshot)
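
As with MySQL, these changes could also be made from a terminal; again, an illustrative sketch, where the container name ("postgresql"), the column names and the row id are assumptions:

# Hypothetical SQL equivalents of the two Adminer actions above
docker exec -it postgresql env PGPASSWORD=user2 psql -U user2 -d postgresql -c \
  "INSERT INTO app2.customers (first_name, last_name, email) \
   VALUES ('Jane', 'Doe', 'jane.doe@example.com');"
docker exec -it postgresql env PGPASSWORD=user2 psql -U user2 -d postgresql -c \
  "UPDATE app2.customers SET first_name = first_name || ' (Updated)' WHERE id = 1;"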

If everything is OK, Debezium will send these changes to Kafka. To check it, go to Kafka UI, open the topic "postgresql.app2.customers" and you'll find two new messages:

(screenshot)

The first one was generated by the insert action ("op": "c"). The data of the new row is in the "after" property:

(screenshot)

The second one was generated by the update action ("op": "u"), and we can find the data before the update (the "before" property) and after it (the "after" property):

(screenshot)

Transformations

Simple transformation

We're going to register a new connector that performs a simple routing transformation, sending changes in PostgreSQL to the custom topic "customers_from_legacy" instead of "postgresql.app2.customers". To perform this transformation we need to add the following lines to the previous PostgreSQL connector configuration:

"transforms": "Reroute",
"transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
"transforms.Reroute.topic.regex": "postgresql.app2.customers",
"transforms.Reroute.topic.replacement": "customers_from_legacy"

In these lines we are configuring:

  • "transforms": in this property we specify the list of the transformations we are going to do. In our case, just one: "Reroute"
  • "transforms.Reroute.type": we configure that the type of the "Reroute" transformation is "io.debezium.transforms.ByLogicalTableRouter".
  • "transforms.Reroute.topic.regex": this property is used to set the regular expression to match the current topic where changes are sent. So, all the messages sent to the topics matching the expression will be sent to the custom topic instead of the current one. In our case, "postgresql.app2.customers"
  • "transforms.Reroute.topic.replacement": this property defines the custom topic where the messages will be sent. In our case, "customers_from_legacy"

The complete JSON file is located at "examples/transformations/connectors/register-postgresql-with-topic-routing.json". To register it, we can execute the script that registers all the connectors ("examples/transformations/register_connectors.sh") or we can execute this command to register just this one (from the "examples/transformations" folder):

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @connectors/register-postgresql-with-topic-routing.json

The result should be a 201 HTTP status:

HTTP/1.1 201 Created
Date: Wed, 13 Oct 2021 15:37:39 GMT
Location: http://localhost:8083/connectors/postgresql-connector-with-topic-routing
Content-Type: application/json
Content-Length: 701
Server: Jetty(9.4.39.v20210325)

{"name":"postgresql-connector-with-topic-routing","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","tasks.max":"1","database.hostname":"postgresql","database.port":"5432","database.user":"debezium","database.password":"dbz","database.dbname":"postgresql","database.server.name":"postgresql","schema.include":"app2","plugin.name":"pgoutput","publication.autocreate.mode":"filtered","transforms":"Reroute","transforms.Reroute.type":"io.debezium.transforms.ByLogicalTableRouter","transforms.Reroute.topic.regex":"(.*)customers(.*)","transforms.Reroute.topic.replacement":"customers_from_legacy","name":"postgresql-connector-with-topic-routing"},"tasks":[],"type":"source"}%

We can also check it in Debezium UI:

(screenshot)

And, if we go to Kafka UI, we'll see that the new topic "customers_from_legacy" has been created. If we click on it to see its messages, we'll find five messages associated with the initial table snapshot:

(screenshot)

Now, if we make changes in PostgreSQL, Debezium will send the messages associated with those changes to the topic "customers_from_legacy". As the previous PostgreSQL connector is also running, messages will be sent to the topic "postgresql.app2.customers" too.

Transformation chain

Now, we're going to configure a transformation chain:

  1. Keep only the changes with "id" equal to 2
  2. Send those changes to the custom topic "customers_id_2"

To configure these transformations we need to include these lines in the first PostgreSQL connector configuration:

"transforms": "FilterById, RerouteCustomTopic",
"transforms.FilterById.type": "io.debezium.transforms.Filter",
"transforms.FilterById.language": "jsr223.groovy",
"transforms.FilterById.condition": "value.after.id == 2",
"transforms.RerouteCustomTopic.type": "io.debezium.transforms.ByLogicalTableRouter",
"transforms.RerouteCustomTopic.topic.regex": "postgresql.app2.customers",
"transforms.RerouteCustomTopic.topic.replacement": "customers_id_2"

In these lines we are configuring:

  • "transforms": we set two transformations, and we called "FilterById" and "RerouteCustomTopic"
  • "transforms.FilterById.type": we configure the type of the first one: "io.debezium.transforms.Filter".
  • "transforms.FilterById.language": in this property we say that we are going to define the filter condition using Groovy ("jsr223.groovy")
  • "transforms.FilterById.condition": this is the filter condition ("value.after.id == 2") that is take the content value and then, examine the property "id" to the "After" node
  • "transforms.RerouteCustomTopic.type": we configure that the type of the "RerouteCustomTopic" transformation is "io.debezium.transforms.ByLogicalTableRouter".
  • "transforms.RerouteCustomTopic.topic.regex": this property is used to set the regular expression to match the current topic where changes are sent. So, all the messages sent to the topics matching the expression will be sent to the custom topic instead of the current one. In this case, "postgresql.app2.customers"
  • "transforms.RerouteCustomTopic.topic.replacement": this property defines the custom topic where the messages will be sent. In this case, "customers_id_2"

The complete JSON file is located at "examples/transformations/connectors/register-postgresql-with-message-filtering-and-topic-routing.json".

To register it, we can execute the script that registers all the connectors ("examples/transformations/register_connectors.sh") or we can execute this command to register just this one (from the "examples/transformations" folder):

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @connectors/register-postgresql-with-message-filtering-and-topic-routing.json

We should receive a 201 status code:

HTTP/1.1 201 Created
Date: Thu, 14 Oct 2021 09:05:43 GMT
Location: http://localhost:8083/connectors/register-postgresql-with-message-filtering-and-topic-routing
Content-Type: application/json
Content-Length: 990
Server: Jetty(9.4.39.v20210325)

{"name":"register-postgresql-with-message-filtering-and-topic-routing","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","tasks.max":"1","database.hostname":"postgresql","database.port":"5432","database.user":"debezium","database.password":"dbz","database.dbname":"postgresql","database.server.name":"postgresql","schema.include":"app2","plugin.name":"pgoutput","publication.autocreate.mode":"filtered","slot.name":"debezium3","transforms":"FilterById, RerouteCustomTopic","transforms.FilterById.type":"io.debezium.transforms.Filter","transforms.FilterById.language":"jsr223.groovy","transforms.FilterById.condition":"value.after.id == 2","transforms.RerouteCustomTopic.type":"io.debezium.transforms.ByLogicalTableRouter","transforms.RerouteCustomTopic.topic.regex":"postgresql.app2.customers","transforms.RerouteCustomTopic.topic.replacement":"customers_id_2","name":"register-postgresql-with-message-filtering-and-topic-routing"},"tasks":[],"type":"source"}%

If we go to Debezium UI we'll find the new connector:

(screenshot)
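
We can also ask Kafka Connect for the configuration it actually stored, using the connector name from the Location header above, to confirm that both transformations were applied:

curl -s http://localhost:8083/connectors/register-postgresql-with-message-filtering-and-topic-routing/config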

And if we go to Kafka UI, we'll see that there is a new topic, "customers_id_2", containing only one message, because in the snapshot data there is only one customer with id equal to 2:

(screenshot)

If we modify the customer with this id (2) in PostgreSQL:

(screenshot)

and another customer with a different id:

(screenshot)

we'll see that Debezium only sends one message to the topic "customers_id_2": the one with id equal to 2:

(screenshot)

What do I have to keep in mind to configure transformations?

To perform transformations you need to include some extra libraries in the Kafka Connect image (for instance, Groovy). In this repository, the Kafka Connect image is a custom image that includes these libraries. You can check it in the Dockerfile (infra/docker/connect-smt/Dockerfile).

You can find more detail in the official Debezium documentation.


Using Avro

In the previous examples we've used JSON and haven't used a schema registry. If we don't configure any serializer, Debezium captures changes in the database, serializes the content using the default serializer (JSON) and sends the message to Kafka. Working with JSON messages is easier for debugging, but it's less efficient than a binary format like Avro. In some use cases, we may need that efficiency.

Avro messages are more compact because they don't include the schema in the message content; the schema is kept in the schema registry instead. In the following examples we're going to use Avro with two different schema registries: in the first example we'll use the Confluent Schema Registry and, in the second one, Apicurio.

Debezium and Confluent Schema Registry

The code associated with this example is in the folder "examples/avro_confluent-schema-registry". To create the infrastructure, execute:

cd examples/avro_confluent-schema-registry
sh setup_infra.sh

Basically, we need to specify which converter we're going to use and where the schema registry is:

"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081"

Destroying the environment

To destroy the environment, execute the script "destroy_infra.sh" located in each example folder. For instance, to destroy the infrastructure associated with the first example, execute:

sh examples/simple/destroy_infra.sh
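
If you only want to remove a single connector without tearing down the whole environment, Kafka Connect also allows deleting it through the REST API (shown here with the MySQL connector registered earlier):

curl -s -X DELETE http://localhost:8083/connectors/mysql-connector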
