Getting Started with Debezium
For use with Red Hat build of Debezium 2.7.3
Preface
This tutorial demonstrates how to use Debezium to capture updates in a MySQL database. As the data in the database changes, you can see the resulting event streams.
Providing feedback on Red Hat documentation
We appreciate your feedback on our documentation.
To propose improvements, open a Jira issue and describe your suggested changes. Provide as much detail as possible to enable us to address your request quickly.
Prerequisite
- You have a Red Hat Customer Portal account. This account enables you to log in to the Red Hat Jira Software instance. If you do not have an account, you will be prompted to create one.
Procedure
- Click the following link: Create issue.
- In the Summary text box, enter a brief description of the issue.
- In the Description text box, provide the following information:
  - The URL of the page where you found the issue.
  - A detailed description of the issue.
  You can leave the information in any other fields at their default values.
- Click Create to submit the Jira issue to the documentation team.
Thank you for taking the time to provide feedback.
Chapter 1. About this tutorial
The tutorial includes the following steps:
- Deploy a MySQL database server with a simple example database to OpenShift.
- Apply a custom resource in Streams for Apache Kafka to automatically build a Kafka Connect container image that includes the Debezium MySQL connector plug-in.
- Create the Debezium MySQL connector resource to capture changes in the database.
- Verify the connector deployment.
- View the change events that the connector emits to a Kafka topic from the database.
Prerequisites
- You are familiar with OpenShift and Streams for Apache Kafka.
- You have access to an OpenShift cluster on which the Cluster Operator is installed.
- The Streams for Apache Kafka Operator is running.
- An Apache Kafka cluster is deployed as documented in Deploying and Managing Streams for Apache Kafka on OpenShift.
- You have a Red Hat build of Debezium license.
- You know how to use OpenShift administration tools. The OpenShift oc CLI client is installed, or you have access to the OpenShift Container Platform web console.
- Depending on how you intend to store the Kafka Connect build image, you must either have permission to access a container registry, or you must create an ImageStream resource on OpenShift:
  - To store the build image in an image registry, such as Red Hat Quay.io or Docker Hub: An account and permissions to create and manage images in the registry.
  - To store the build image as a native OpenShift ImageStream: An ImageStream resource is deployed to the cluster for storing new container images. You must explicitly create an ImageStream for the cluster. ImageStreams are not available by default.
Chapter 2. Introduction to Debezium
Debezium is a distributed platform that converts information from your existing databases into event streams, enabling applications to detect and immediately respond to row-level changes in the databases.
Debezium is built on top of Apache Kafka and provides a set of Kafka Connect compatible connectors. Each of the connectors works with a specific database management system (DBMS). Connectors record the history of data changes in the DBMS by detecting changes as they occur, and streaming a record of each change event to a Kafka topic. Consuming applications can then read the resulting event records from the Kafka topic.
By taking advantage of Kafka’s reliable streaming platform, Debezium makes it possible for applications to consume changes that occur in a database correctly and completely. Even if your application stops unexpectedly, or loses its connection, it does not miss events that occur during the outage. After the application restarts, it resumes reading from the topic from the point where it left off.
The tutorial that follows shows you how to deploy and use the Debezium MySQL connector with a simple configuration. For more information about deploying and using Debezium connectors, see the connector documentation.
Chapter 3. Starting the services
Using Debezium requires Streams for Apache Kafka with Kafka and Kafka Connect, a database, and the Debezium connector service. To run the services for this tutorial, you must complete the steps in the following sections:
3.1. Deploying a MySQL database
Deploy a MySQL database server that includes an example inventory database with several tables that are pre-populated with data. The Debezium MySQL connector will capture changes that occur in the sample tables and transmit the change event records to an Apache Kafka topic.
Procedure
- Start a MySQL database by running the following command, which starts a MySQL database server configured with an example inventory database:

  $ oc new-app -l app=mysql --name=mysql quay.io/debezium/example-mysql:latest

- Configure credentials for the MySQL database by running the following command, which updates the deployment configuration for the MySQL database to add the user name and password:

  $ oc set env deployment/mysql MYSQL_ROOT_PASSWORD=debezium MYSQL_USER=mysqluser MYSQL_PASSWORD=mysqlpw

- Verify that the MySQL database is running by invoking the following command, which is followed by output that shows that the MySQL database is running and that the pod is ready:

  $ oc get pods -l app=mysql
  NAME            READY   STATUS    RESTARTS   AGE
  mysql-1-2gzx5   1/1     Running   1          23s

- Open a new terminal and log in to the sample inventory database.
  This command opens a MySQL command line client in the pod that is running the MySQL database. The client uses the user name and password that you previously configured:
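  The exact command depends on the name of your MySQL pod. As a sketch, using the pod name from the example output above and the credentials that you configured earlier:

  $ oc exec -it mysql-1-2gzx5 -- mysql -u mysqluser -pmysqlpw inventory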
- List the tables in the inventory database:
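  For example (the statement below is a sketch; the sample database includes the addresses, customers, orders, products, and products_on_hand tables, among others):

  mysql> SHOW TABLES;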
- Explore the database and view the data that it contains. For example, view the customers table:
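  A simple query such as the following returns the rows of the table (a sketch; the data that you see comes from the pre-populated sample database):

  mysql> SELECT * FROM customers;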
3.2. Deploying Kafka Connect
After you deploy the MySQL database, use Streams for Apache Kafka to build a Kafka Connect container image that includes the Debezium MySQL connector plug-in. During the deployment process, you create and use the following custom resources (CRs):
- A KafkaConnect CR that defines your Kafka Connect instance and includes information about the MySQL connector artifacts to include in the image.
- A KafkaConnector CR that provides details, including the information that the MySQL connector uses to access the source database. After Streams for Apache Kafka starts the Kafka Connect pod, you start the connector by applying the KafkaConnector CR.
During the build process, the Streams for Apache Kafka Operator transforms input parameters in the KafkaConnect custom resource, including Debezium connector definitions, into a Kafka Connect container image. The build downloads the necessary artifacts from the Red Hat Maven repository, and incorporates them into the image. The newly created container is pushed to the container registry that is specified in .spec.build.output, and is used to deploy a Kafka Connect pod.
Container images can be stored in an external container registry, such as quay.io, or in an OpenShift ImageStream. Because ImageStreams are not created automatically, to store container images in an ImageStream, you must create the ImageStream before you deploy Kafka Connect.
After Streams for Apache Kafka builds and stores the Kafka Connect image, use the KafkaConnector custom resource to start the connector.
Prerequisites
- Streams for Apache Kafka is running on an OpenShift cluster.
- The Streams for Apache Kafka Cluster Operator is installed to the OpenShift cluster.
- If you prefer to store the KafkaConnect container image in an OpenShift ImageStream, an ImageStream is available.
- Apache Kafka and Kafka Connect are running on Streams for Apache Kafka.
Procedure
- Log in to the OpenShift cluster and create or open a project, for example debezium.
- Create a Debezium KafkaConnect custom resource (CR) for the connector, or modify an existing one.
  The following example shows an excerpt from a dbz-connect.yaml file that describes a KafkaConnect custom resource. The metadata.annotations and spec.build properties are required.
  Example 3.1. A dbz-connect.yaml file that defines a KafkaConnect custom resource that includes a Debezium connector
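  A minimal sketch of such a CR is shown below. The cluster name, bootstrap server address, output type, and image name are placeholders for your environment, and the artifact URL must point to the Debezium MySQL connector archive for your release; the numbered comments correspond to the items in Table 3.1.

  apiVersion: kafka.strimzi.io/v1beta2
  kind: KafkaConnect
  metadata:
    name: my-connect-cluster
    annotations:
      strimzi.io/use-connector-resources: "true"    # 1
  spec:
    replicas: 1
    bootstrapServers: my-cluster-kafka-bootstrap:9093
    build:                                          # 2
      output:                                       # 3
        type: imagestream                           # 4
        image: debezium-streams-connect:latest
      plugins:                                      # 5
        - name: debezium-connector-mysql
          artifacts:
            - type: zip                             # 6
              url: <url-of-debezium-mysql-connector-archive>    # 7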
  Table 3.1. Descriptions of Kafka Connect configuration settings
  1. Sets the strimzi.io/use-connector-resources annotation to "true" to enable the Cluster Operator to use KafkaConnector resources to configure connectors in this Kafka Connect cluster.
  2. The spec.build configuration specifies where to store the build image and lists the plug-ins to include in the image, along with the location of the plug-in artifacts.
  3. The build.output specifies the registry in which the newly built image is stored.
  4. Specifies the name and image name for the image output. Valid values for output.type are docker, to push into a container registry such as Docker Hub or Quay, or imagestream, to push the image to an internal OpenShift ImageStream. To use an ImageStream, an ImageStream resource must be deployed to the cluster. For more information about specifying the build.output in the KafkaConnect configuration, see the Streams for Apache Kafka Build schema reference documentation.
  5. The plugins configuration lists all of the connectors that you want to include in the Kafka Connect image. For each entry in the list, specify a plug-in name and information about the artifacts that are required to build the connector. Optionally, for each connector plug-in, you can include other components that you want to be available for use with the connector. For example, you can add Service Registry artifacts, or the Debezium scripting component.
  6. The value of artifacts.type specifies the file type of the artifact specified in artifacts.url. Valid types are zip, tgz, or jar. Debezium connector archives are provided in .zip file format. JDBC driver files are in .jar format. The type value must match the type of the file that is referenced in the url field.
  7. The value of artifacts.url specifies the address of an HTTP server, such as a Maven repository, that stores the file for the connector artifact. The OpenShift cluster must have access to the specified server.
- Apply the KafkaConnect build specification to the OpenShift cluster by entering the following command:

  oc create -f dbz-connect.yaml

  Based on the configuration specified in the custom resource, the Streams for Apache Kafka Operator prepares a Kafka Connect image to deploy. After the build completes, the Operator pushes the image to the specified registry or ImageStream, and starts the Kafka Connect cluster. The connector artifacts that you listed in the configuration are available in the cluster.
- Create a KafkaConnector resource to define an instance of the MySQL connector. For example, create the following KafkaConnector CR, and save it as mysql-inventory-connector.yaml.
  Example 3.2. A mysql-inventory-connector.yaml file that defines the KafkaConnector custom resource for a Debezium connector
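  A minimal sketch of such a CR follows. The connector name and topic prefix match the values used later in this tutorial, while the database credentials, the table include list, and the schema history settings are placeholders to adapt to your environment; the numbered comments correspond to the items in Table 3.2.

  apiVersion: kafka.strimzi.io/v1beta2
  kind: KafkaConnector
  metadata:
    name: inventory-connector                               # 1
    labels:
      strimzi.io/cluster: my-connect-cluster
  spec:
    class: io.debezium.connector.mysql.MySqlConnector       # 2
    tasksMax: 1                                             # 3
    config:                                                 # 4
      database.hostname: mysql                              # 5
      database.port: 3306                                   # 6
      database.user: <user>                                 # 7
      database.password: <password>                         # 8
      topic.prefix: dbserver1                               # 9
      table.include.list: inventory.*                       # 10
      schema.history.internal.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092    # 11
      schema.history.internal.kafka.topic: schema-changes.inventory                       # 12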
  Table 3.2. Descriptions of connector configuration settings
  1. The name of the connector to register with the Kafka Connect cluster.
  2. The name of the connector class.
  3. Only one task should operate at any one time. Use a single connector task to ensure proper order and event handling as the MySQL connector reads the MySQL server's binlog. The Kafka Connect service uses connectors to start one or more tasks to complete the work, and it automatically distributes the running tasks across the cluster of Kafka Connect services. If services stop or crash, tasks are redistributed to running services.
  4. The connector's configuration.
  5. The hostname or address of the MySQL database instance.
  6. The port number of the database instance.
  7. The name of the user account through which Debezium connects to the database.
  8. The password that Debezium uses to connect to the database user account.
  9. The topic prefix for the MySQL server or cluster. This string prefixes the names of every Kafka topic that the connector sends event records to.
  10. The list of tables from which the connector captures change events. The connector detects changes only if they occur in the tables of the inventory database.
  11. The list of Kafka brokers that the connector uses to write and recover DDL statements to the database schema history topic. This is the same broker that the connector sends change event records to. After a restart, the connector recovers the database schemas that existed at the point in the binlog when the connector resumes reading.
  12. The name of the database schema history topic. This topic is for internal use only and should not be used by consumers.
- Create the connector resource by running the following command:

  oc create -n <namespace> -f <kafkaConnector>.yaml

  For example,

  oc create -n debezium -f mysql-inventory-connector.yaml

  The connector is registered to the Kafka Connect cluster and starts to run against the database that is specified by spec.config.database.dbname in the KafkaConnector CR. After the connector pod is ready, Debezium is running.
You are now ready to verify that the connector was created and has started to capture changes in the inventory database.
3.3. Example: A simple OpenShift ImageStream object definition
The following example shows a simple ImageStream object definition.
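A minimal sketch, assuming that the image name and project match the build.output configuration shown earlier; substitute the names for your environment:

  apiVersion: image.openshift.io/v1
  kind: ImageStream
  metadata:
    name: debezium-streams-connect
    namespace: debezium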
3.4. Verifying the connector deployment
If the connector starts correctly without errors, it creates a topic for each table that the connector is configured to capture. Downstream applications can subscribe to these topics to retrieve information about events that occur in the source database.
To verify that the connector is running, you perform the following operations from the OpenShift Container Platform web console, or through the OpenShift CLI tool (oc):
- Verify the connector status.
- Verify that the connector generates topics.
- Verify that topics are populated with events for read operations ("op":"r") that the connector generates during the initial snapshot of each table.
Prerequisites
- A Debezium connector is deployed to Streams for Apache Kafka on OpenShift.
- The OpenShift oc CLI client is installed.
- You have access to the OpenShift Container Platform web console.
Procedure
- Check the status of the KafkaConnector resource by using one of the following methods:
  From the OpenShift Container Platform web console:
  - Navigate to Home → Search.
  - On the Search page, click Resources to open the Select Resource box, and then type KafkaConnector.
  - From the KafkaConnectors list, click the name of the connector that you want to check, for example inventory-connector.
  - In the Conditions section, verify that the values in the Type and Status columns are set to Ready and True.
  From a terminal window:
  - Enter the following command:

    oc describe KafkaConnector <connector-name> -n <project>

    For example,

    oc describe KafkaConnector inventory-connector -n debezium

    The command returns status information that is similar to the following output:
    Example 3.3. KafkaConnector resource status
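    The output below is abridged and illustrative; the exact fields, values, and topic list depend on your environment and connector configuration:

    Name:         inventory-connector
    Namespace:    debezium
    Labels:       strimzi.io/cluster=my-connect-cluster
    API Version:  kafka.strimzi.io/v1beta2
    Kind:         KafkaConnector
    ...
    Status:
      Conditions:
        Status:  True
        Type:    Ready
      Connector Status:
        Connector:
          State:  RUNNING
        Name:     inventory-connector
        Tasks:
          Id:     0
          State:  RUNNING
        Type:     source
      Tasks Max:  1
      Topics:
        dbserver1
        dbserver1.inventory.addresses
        dbserver1.inventory.customers
        dbserver1.inventory.orders
        dbserver1.inventory.products
        dbserver1.inventory.products_on_hand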
- Verify that the connector created Kafka topics:
  From the OpenShift Container Platform web console:
  - Navigate to Home → Search.
  - On the Search page, click Resources to open the Select Resource box, and then type KafkaTopic.
  - From the KafkaTopics list, click the name of the topic that you want to check, for example, dbserver1.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d.
  - In the Conditions section, verify that the values in the Type and Status columns are set to Ready and True.
  From a terminal window:
  - Enter the following command:

    oc get kafkatopics

    The command returns status information that is similar to the following output:
    Example 3.4. KafkaTopic resource status
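    Abridged, illustrative output; KafkaTopic resource names include a generated hash suffix, shown here as a placeholder:

    NAME                                             CLUSTER      PARTITIONS   REPLICATION FACTOR   READY
    dbserver1---<hash>                               my-cluster   1            1                    True
    dbserver1.inventory.addresses---<hash>           my-cluster   1            1                    True
    dbserver1.inventory.customers---<hash>           my-cluster   1            1                    True
    dbserver1.inventory.orders---<hash>              my-cluster   1            1                    True
    dbserver1.inventory.products---<hash>            my-cluster   1            1                    True
    dbserver1.inventory.products_on_hand---<hash>    my-cluster   1            1                    True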
- Check topic content.
  - From a terminal window, enter the following command:

    oc exec -n <project> -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \
        --bootstrap-server localhost:9092 \
        --from-beginning \
        --property print.key=true \
        --topic=<topic-name>

    For example,

    oc exec -n debezium -it my-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \
        --bootstrap-server localhost:9092 \
        --from-beginning \
        --property print.key=true \
        --topic=dbserver1.inventory.products_on_hand

    The format for specifying the topic name is the same as the oc describe command returns in Step 1, for example, dbserver1.inventory.addresses.
    For each event in the topic, the command returns information that is similar to the following output:
Example 3.5. Content of a Debezium change event
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"dbserver1.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"dbserver1.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"dbserver1.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"2.7.3.Final-redhat-00001","connector":"mysql","name":"dbserver1","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"dbserver1.inventory.products_on_hand.Key"},"payload":{"product_id":101}} 
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"dbserver1.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"dbserver1.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"2.7.3.Final-redhat-00001","connector":"mysql","name":"dbserver1","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}Copy to Clipboard Copied! Toggle word wrap Toggle overflow In the preceding example, the
In the preceding example, the payload value shows that the connector snapshot generated a read ("op" = "r") event from the table dbserver1.products_on_hand. The "before" state of the product_id record is null, indicating that no previous value exists for the record. The "after" state shows a quantity of 3 for the item with product_id 101.
You are now ready to view change events that the Debezium connector captures from the inventory database.
Chapter 4. Viewing change events
After you deploy the Debezium MySQL connector, it starts capturing changes to the inventory database.
When the connector starts, it writes events to a set of Apache Kafka topics, each of which represents one of the tables in the MySQL database. The name of each topic begins with the name of the database server, dbserver1.
The connector writes to the following Kafka topics:
dbserver1
  The schema change topic to which DDL statements that apply to the tables for which changes are being captured are written.
dbserver1.inventory.products
  Receives change event records for the products table in the inventory database.
dbserver1.inventory.products_on_hand
  Receives change event records for the products_on_hand table in the inventory database.
dbserver1.inventory.customers
  Receives change event records for the customers table in the inventory database.
dbserver1.inventory.orders
  Receives change event records for the orders table in the inventory database.
The remainder of this tutorial examines the dbserver1.inventory.customers Kafka topic. As you look more closely at the topic, you’ll see how it represents different types of change events, and find information about how the connector captured each event.
The tutorial contains the following sections:
- Section 4.1, "Viewing a create event"
- Section 4.2, "Updating the database and viewing the update event"
- Section 4.3, "Deleting a record in the database and viewing the delete event"
- Section 4.4, "Restarting the Kafka Connect service"
4.1. Viewing a create event
By viewing the dbserver1.inventory.customers topic, you can see how the MySQL connector captured create events in the inventory database. In this case, the create events capture new customers being added to the database.
Procedure
- Open a new terminal and use kafka-console-consumer to consume the dbserver1.inventory.customers topic from the beginning of the topic.
  This command runs a simple consumer (kafka-console-consumer.sh) in the pod that is running Kafka (my-cluster-kafka-0):

  $ oc exec -it my-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \
      --bootstrap-server localhost:9092 \
      --from-beginning \
      --property print.key=true \
      --topic dbserver1.inventory.customers

  The consumer returns four messages (in JSON format), one for each row in the customers table. Each message contains the event records for the corresponding table row.
  There are two JSON documents for each event: a key and a value. The key corresponds to the row’s primary key, and the value shows the details of the row (the fields that the row contains, the value of each field, and the type of operation that was performed on the row).
- For the last event, review the details of the key.
  Here are the details of the key of the last event (formatted for readability):
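  A representative rendering of that key is shown below; the exact field order can vary, but the schema name, field, and value match the description that follows:

  {
    "schema": {
      "type": "struct",
      "fields": [
        {
          "type": "int32",
          "optional": false,
          "field": "id"
        }
      ],
      "optional": false,
      "name": "dbserver1.inventory.customers.Key"
    },
    "payload": {
      "id": 1004
    }
  }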
  The event has two parts: a schema and a payload. The schema contains a Kafka Connect schema describing what is in the payload. In this case, the payload is a struct named dbserver1.inventory.customers.Key that is not optional and has one required field (id of type int32).
  The payload has a single id field, with a value of 1004.
  By reviewing the key of the event, you can see that this event applies to the row in the inventory.customers table whose id primary key column had a value of 1004.
- Review the details of the same event’s value.
  The event’s value shows that the row was created, and describes what it contains (in this case, the id, first_name, last_name, and email of the inserted row).
  Here are the details of the value of the last event (formatted for readability):
  This portion of the event is much longer, but like the event’s key, it also has a schema and a payload. The schema contains a Kafka Connect schema named dbserver1.inventory.customers.Envelope (version 1) that can contain five fields:
  op
    A required field that contains a string value describing the type of operation. Values for the MySQL connector are c for create (or insert), u for update, d for delete, and r for read (in the case of a snapshot).
  before
    An optional field that, if present, contains the state of the row before the event occurred. The structure will be described by the dbserver1.inventory.customers.Value Kafka Connect schema, which the dbserver1 connector uses for all rows in the inventory.customers table.
  after
    An optional field that, if present, contains the state of the row after the event occurred. The structure is described by the same dbserver1.inventory.customers.Value Kafka Connect schema used in before.
  source
    A required field that contains a structure describing the source metadata for the event, which in the case of MySQL, contains several fields: the connector name, the name of the binlog file where the event was recorded, the position in that binlog file where the event appeared, the row within the event (if there is more than one), the names of the affected database and table, the MySQL thread ID that made the change, whether this event was part of a snapshot, and, if available, the MySQL server ID, and the timestamp in seconds.
  ts_ms
    An optional field that, if present, contains the time (using the system clock in the JVM running the Kafka Connect task) at which the connector processed the event.
  Note: The JSON representations of the events are much longer than the rows they describe. This is because, with every event key and value, Kafka Connect ships the schema that describes the payload. Over time, this structure may change. However, having the schemas for the key and the value in the event itself makes it much easier for consuming applications to understand the messages, especially as they evolve over time.
The Debezium MySQL connector constructs these schemas based upon the structure of the database tables. If you use DDL statements to alter the table definitions in the MySQL databases, the connector reads these DDL statements and updates its Kafka Connect schemas. In this way, each event is structured exactly like the table from which it originated at the time the event occurred. However, the Kafka topic containing all of the events for a single table might have events that correspond to each state of the table definition.
The JSON converter includes the key and value schemas in every message, so it does produce very verbose events.
- Compare the event’s key and value schemas to the state of the inventory database. In the terminal that is running the MySQL command line client, run the statement shown below. The results show that the event records you reviewed match the records in the database.
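  A query such as the following lists the current rows so that you can compare them with the event payloads (a sketch; any statement that returns the customers rows works):

  mysql> SELECT * FROM customers;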
4.2. Updating the database and viewing the update event
Now that you have seen how the Debezium MySQL connector captured the create events in the inventory database, change one of the records and see how the connector captures it.
By completing this procedure, you will learn how to find details about what changed in a database commit, and how you can compare change events to determine when the change occurred in relation to other changes.
Procedure
- In the terminal that is running the MySQL command line client, run the following statement:

  mysql> UPDATE customers SET first_name='Anne Marie' WHERE id=1004;
  Query OK, 1 row affected (0.05 sec)
  Rows matched: 1  Changed: 1  Warnings: 0

- View the updated customers table:
- Switch to the terminal running kafka-console-consumer to see a new fifth event.
  By changing a record in the customers table, the Debezium MySQL connector generated a new event. You should see two new JSON documents: one for the event’s key, and one for the new event’s value.
  Here are the details of the key for the update event (formatted for readability):
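  Because the update applies to the same row, the key has the same structure and value as the key of the earlier events; a representative rendering:

  {
    "schema": {
      "type": "struct",
      "fields": [
        {
          "type": "int32",
          "optional": false,
          "field": "id"
        }
      ],
      "optional": false,
      "name": "dbserver1.inventory.customers.Key"
    },
    "payload": {
      "id": 1004
    }
  }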
  This key is the same as the key for the previous events.
  Here is that new event’s value. There are no changes in the schema section, so only the payload section is shown (formatted for readability):
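  A representative sketch of the payload follows. The before and after first_name values and the op field follow the descriptions in Table 4.1; the last_name, email, server ID, binlog position, and timestamps are placeholders:

  {
    "before": {
      "id": 1004,
      "first_name": "Anne",
      "last_name": "<last_name>",
      "email": "<email>"
    },
    "after": {
      "id": 1004,
      "first_name": "Anne Marie",
      "last_name": "<last_name>",
      "email": "<email>"
    },
    "source": {
      "version": "2.7.3.Final-redhat-00001",
      "connector": "mysql",
      "name": "dbserver1",
      "ts_ms": "<timestamp>",
      "snapshot": "false",
      "db": "inventory",
      "table": "customers",
      "server_id": "<server_id>",
      "file": "mysql-bin.000003",
      "pos": "<position>",
      "row": 0
    },
    "op": "u",
    "ts_ms": "<timestamp>",
    "ts_us": "<timestamp>",
    "ts_ns": "<timestamp>"
  }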
  Table 4.1. Descriptions of fields in the payload of an update event value
  1. The before field shows the values present in the row before the database commit. The original first_name value is Anne.
  2. The after field shows the state of the row after the change event. The first_name value is now Anne Marie.
  3. The source field structure has many of the same values as before, except that the ts_sec and pos fields have changed (the file might have changed in other circumstances).
  4. The op field value is now u, signifying that this row changed because of an update.
  5. The ts_ms, ts_us, and ts_ns fields show timestamps that indicate when Debezium processed this event.
  By viewing the payload section, you can learn several important things about the update event:
payloadsection, you can learn several important things about the update event:-
By comparing the
beforeandafterstructures, you can determine what actually changed in the affected row because of the commit. -
By reviewing the
sourcestructure, you can find information about MySQL’s record of the change (providing traceability). -
By comparing the
payloadsection of an event to other events in the same topic (or a different topic), you can determine whether the event occurred before, after, or as part of the same MySQL commit as another event.
-
By comparing the
4.3. Deleting a record in the database and viewing the delete event
Now that you have seen how the Debezium MySQL connector captured the create and update events in the inventory database, delete one of the records and see how the connector captures it.
By completing this procedure, you will learn how to find details about delete events, and how Kafka uses log compaction to reduce the number of delete events while still enabling consumers to get all of the events.
Procedure
- In the terminal that is running the MySQL command line client, run the following statement:

  mysql> DELETE FROM customers WHERE id=1004;
  Query OK, 1 row affected (0.00 sec)

  Note: If the above command fails with a foreign key constraint violation, then you must remove the reference to the customer address from the addresses table by using the following statement:

  mysql> DELETE FROM addresses WHERE customer_id=1004;

- Switch to the terminal running kafka-console-consumer to see two new events.
  By deleting a row in the customers table, the Debezium MySQL connector generated two new events.
Here are the details of the key for the first new event (formatted for readability):
  This key is the same as the key in the previous two events you looked at.
  Here is the value of the first new event (formatted for readability):
  Table 4.2. Descriptions of fields in an event value
  1. The before field now has the state of the row that was deleted with the database commit.
  2. The after field is null because the row no longer exists.
  3. The source field structure has many of the same values as before, except that the values of the ts_sec and pos fields have changed. In some circumstances, the file value might also change.
  4. The op field value is now d, indicating that the record was deleted.
  5. The ts_ms, ts_us, and ts_ns fields show timestamps that indicate when Debezium processed the event.
  Thus, this event provides a consumer with the information that it needs to process the removal of the row. The old values are also provided, because some consumers might require them to properly handle the removal.
- Review the key and value for the second new event.
  Here is the key for the second new event (formatted for readability):
  Once again, this key is exactly the same key as in the previous three events you looked at.
Here is the value of that same event (formatted for readability):
{ "schema": null, "payload": null }{ "schema": null, "payload": null }Copy to Clipboard Copied! Toggle word wrap Toggle overflow If Kafka is set up to be log compacted, it will remove older messages from the topic if there is at least one message later in the topic with same key. This last event is called a tombstone event, because it has a key and an empty value. This means that Kafka will remove all prior messages with the same key. Even though the prior messages will be removed, the tombstone event means that consumers can still read the topic from the beginning and not miss any events.
4.4. Restarting the Kafka Connect service
Now that you have seen how the Debezium MySQL connector captures create, update, and delete events, you will see how it can capture change events even when it is not running.
The Kafka Connect service automatically manages tasks for its registered connectors. Therefore, if it goes offline, when it restarts, it will start any non-running tasks. This means that even if Debezium is not running, it can still report changes in a database.
In this procedure, you will stop Kafka Connect, change some data in the database, and then restart Kafka Connect to see the change events.
Procedure
- Stop the Kafka Connect service.
  - Open the configuration for the Kafka Connect deployment:

    $ oc edit deployment/my-connect-cluster-connect

    The deployment configuration opens:
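    An abridged sketch of the configuration that opens; only the spec.replicas field is relevant here, and the other fields are omitted:

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: my-connect-cluster-connect
      # ...
    spec:
      replicas: 1
      # ...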
  - Change the spec.replicas value to 0.
  - Save the configuration.
- Verify that the Kafka Connect service has stopped.
  This command shows that the Kafka Connect service is completed, and that no pods are running:

  $ oc get pods -l strimzi.io/name=my-connect-cluster-connect
  NAME                                 READY   STATUS      RESTARTS   AGE
  my-connect-cluster-connect-1-dxcs9   0/1     Completed   0          7h
- While the Kafka Connect service is down, switch to the terminal running the MySQL client, and add a new record to the database.

  mysql> INSERT INTO customers VALUES (default, "Sarah", "Thompson", "kitt@acme.com");

- Restart the Kafka Connect service.
  - Open the deployment configuration for the Kafka Connect service:

    $ oc edit deployment/my-connect-cluster-connect

    The deployment configuration opens:
  - Change the spec.replicas value to 1.
  - Save the deployment configuration.
- Verify that the Kafka Connect service has restarted.
  This command shows that the Kafka Connect service is running, and that the pod is ready:

  $ oc get pods -l strimzi.io/name=my-connect-cluster-connect
  NAME                                 READY   STATUS    RESTARTS   AGE
  my-connect-cluster-connect-2-q9kkl   1/1     Running   0          74s
- Switch to the terminal that is running kafka-console-consumer.sh. New events pop up as they arrive. Examine the record that you created when Kafka Connect was offline (formatted for readability):
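  The value of the new event should resemble the following payload (a sketch; the id assumes the next auto-increment value after 1004, the source block is omitted for brevity, and the timestamp is a placeholder):

  {
    "before": null,
    "after": {
      "id": 1005,
      "first_name": "Sarah",
      "last_name": "Thompson",
      "email": "kitt@acme.com"
    },
    "op": "c",
    "ts_ms": "<timestamp>"
  }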
Chapter 5. Next steps
After completing the tutorial, consider the following next steps:
Explore the tutorial further.
Use the MySQL command line client to add, modify, and remove rows in the database tables, and see the effect on the topics. Keep in mind that you cannot remove a row that is referenced by a foreign key.
Plan a Debezium deployment.
You can install Debezium in OpenShift or on Red Hat Enterprise Linux. For more information, see the Debezium installation documentation for OpenShift and for Red Hat Enterprise Linux.
Revised on 2024-10-09 02:25:11 UTC