Chapter 2. Starting the services
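Using Debezium requires AMQ Streams with Kafka and Kafka Connect, a database, and the Debezium connector service. To run the services for this tutorial, you must:
- Deploy a MySQL database.
- Deploy Kafka Connect with the Debezium MySQL connector.
- Verify the connector deployment.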
2.1. Deploying a MySQL database
Deploy a MySQL database server that includes an example inventory database with several tables that are pre-populated with data. The Debezium MySQL connector will capture changes that occur in the sample tables and transmit the change event records to an Apache Kafka topic.
Procedure
Start a MySQL database by running the following command, which starts a MySQL database server configured with an example inventory database:
$ oc new-app --name=mysql quay.io/debezium/example-mysql:latest
Configure credentials for the MySQL database by running the following command, which updates the deployment configuration for the MySQL database to add the user name and password:
$ oc set env dc/mysql MYSQL_ROOT_PASSWORD=debezium MYSQL_USER=mysqluser MYSQL_PASSWORD=mysqlpw
Verify that the MySQL database is running by invoking the following command. The output that follows the command shows that the MySQL database is running and that the pod is ready:
$ oc get pods -l app=mysql
NAME            READY   STATUS    RESTARTS   AGE
mysql-1-2gzx5   1/1     Running   1          23s
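The readiness check in this step can also be scripted. The following minimal Python sketch parses the text output shown above and reports whether every listed pod is ready and running. The function name `pod_is_ready` is hypothetical, and the sketch assumes the default five-column layout (NAME, READY, STATUS, RESTARTS, AGE) that the sample output uses.

```python
def pod_is_ready(line: str) -> bool:
    """Return True if one data row of `oc get pods` shows a ready, running pod."""
    fields = line.split()
    # Assumed column order: NAME READY STATUS RESTARTS AGE
    if len(fields) < 3:
        return False
    ready, _, total = fields[1].partition("/")
    counts_match = ready.isdigit() and ready == total and int(total) > 0
    return counts_match and fields[2] == "Running"


# Sample output copied from this step; a real script would capture it from the CLI.
sample_output = """NAME READY STATUS RESTARTS AGE
mysql-1-2gzx5 1/1 Running 1 23s"""

# Skip the header row and check every pod row.
rows = sample_output.splitlines()[1:]
all_ready = all(pod_is_ready(row) for row in rows)
print(all_ready)
```

Parsing text output is fragile if the column layout changes, so treat this as an illustration of what to look for in the READY and STATUS columns rather than as a robust health check.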
Open a new terminal and log in to the sample inventory database. This command opens a MySQL command line client in the pod that is running the MySQL database. The client uses the user name and password that you previously configured:
List the tables in the inventory database:
Explore the database and view the data that it contains, for example, view the customers table:
2.2. Deploying Kafka Connect
After you deploy the MySQL database, use AMQ Streams to build a Kafka Connect container image that includes the Debezium MySQL connector plug-in. During the deployment process, you create and use the following custom resources (CRs):
- A KafkaConnect CR that defines your Kafka Connect instance and includes information about the MySQL connector artifacts to include in the image.
- A KafkaConnector CR that provides details that include information that the MySQL connector uses to access the source database. After AMQ Streams starts the Kafka Connect pod, you start the connector by applying the KafkaConnector CR.
During the build process, the AMQ Streams Operator transforms input parameters in the KafkaConnect custom resource, including Debezium connector definitions, into a Kafka Connect container image. The build downloads the necessary artifacts from the Red Hat Maven repository, and incorporates them into the image. The newly created container image is pushed to the container registry that is specified in .spec.build.output, and is used to deploy a Kafka Connect pod. After AMQ Streams builds the Kafka Connect image, use the KafkaConnector custom resource to start the connector.
Procedure
Log in to the OpenShift cluster and create or open a project, for example debezium.
Create a Debezium KafkaConnect custom resource (CR) for the connector, or modify an existing one. For example, create a KafkaConnect CR that specifies the metadata.annotations and spec.build properties, as shown in the following example. Save the file with a name such as dbz-connect.yaml.
Example 2.1. A dbz-connect.yaml file that defines a KafkaConnect custom resource that includes a Debezium connector
Table 2.1. Descriptions of Kafka Connect configuration settings
Item | Description
1 | Sets the strimzi.io/use-connector-resources annotation to "true" to enable the Cluster Operator to use KafkaConnector resources to configure connectors in this Kafka Connect cluster.
2 | The spec.build configuration specifies where to store the build image and lists the plug-ins to include in the image, along with the location of the plug-in artifacts.
3 | The build.output specifies the registry in which the newly built image is stored.
4 | Specifies the name and image name for the image output. Valid values for output.type are docker to push into a container registry like Docker Hub or Quay, or imagestream to push the image to an internal OpenShift ImageStream. To use an ImageStream, an ImageStream resource must be deployed to the cluster. For more information about specifying the build.output in the KafkaConnect configuration, see the AMQ Streams Build schema reference documentation.
5 | The plugins configuration lists all of the connectors that you want to include in the Kafka Connect image. For each entry in the list, specify a plug-in name, and information about the artifacts that are required to build the connector. Optionally, for each connector plug-in, you can include other components that you want to be available for use with the connector. For example, you can add Service Registry artifacts, or the Debezium scripting component.
6 | The value of artifacts.type specifies the file type of the artifact specified in the artifacts.url. Valid types are zip, tgz, or jar. Debezium connector archives are provided in .zip file format. JDBC driver files are in .jar format. The type value must match the type of the file that is referenced in the url field.
7 | The value of artifacts.url specifies the address of an HTTP server, such as a Maven repository, that stores the file for the connector artifact. The OpenShift cluster must have access to the specified server.
Apply the KafkaConnect build specification to the OpenShift cluster by entering the following command:
oc create -f dbz-connect.yaml
Based on the configuration specified in the custom resource, the AMQ Streams Operator prepares a Kafka Connect image to deploy.
After the build completes, the Operator pushes the image to the specified registry or ImageStream, and starts the Kafka Connect cluster. The connector artifacts that you listed in the configuration are available in the cluster.
Create a KafkaConnector resource to define an instance of the MySQL connector. For example, create the following KafkaConnector CR, and save it as mysql-inventory-connector.yaml.
Example 2.2. A mysql-inventory-connector.yaml file that defines the KafkaConnector custom resource for a Debezium connector
Table 2.2. Descriptions of connector configuration settings
Item | Description
1 | The name of the connector to register with the Kafka Connect cluster.
2 | The name of the connector class.
3 | Only one task should operate at any one time. Use a single connector task to ensure proper order and event handling as the MySQL connector reads the MySQL server's binlog. The Kafka Connect service uses connectors to start one or more tasks to complete the work. It automatically distributes the running tasks across the cluster of Kafka Connect services. If services stop or crash, tasks are redistributed to running services.
4 | The connector's configuration.
5 | The database host, which is the name of the container that runs the MySQL server (mysql).
6 | The port number of the database instance.
7 | The name of the user account through which Debezium connects to the database.
8 | The password for the database user account.
9 | The name of the database to capture changes from.
10 | The logical name of the database instance or cluster. The server name is the logical identifier for the MySQL server or cluster of servers. This name is used as the prefix for all Kafka topics.
11 | The list of tables from which the connector captures change events. The connector detects changes in the inventory database only.
12 | Specifies the Kafka broker and topic that the connector uses to store the history of the database schemas (the same broker to which you are sending events). After a restart, the connector recovers the database schemas that existed at the point in the binlog when the connector resumes reading.
Create the connector resource by running the following command:
oc create -n <namespace> -f <kafkaConnector>.yaml
For example,
oc create -n debezium -f mysql-inventory-connector.yaml
The connector is registered to the Kafka Connect cluster and starts to run against the database that is specified by spec.config.database.dbname in the KafkaConnector CR. After the connector pod is ready, Debezium is running.
You are now ready to verify that the connector was created and has started to capture changes in the inventory database.
2.3. Verifying the connector deployment
If the connector starts correctly without errors, it creates a topic for each table that the connector is configured to capture. Downstream applications can subscribe to these topics to retrieve information about events that occur in the source database.
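To make the one-topic-per-table relationship concrete, the following Python sketch derives the topic names that you might expect for a few tables. It assumes the pattern that the example topic names in this section follow, that is, the logical server name, the database name, and the table name joined by periods. The helper `topic_name` is hypothetical, and the logical server name `inventory_connector` is taken from the sample change event later in this section.

```python
def topic_name(server_name: str, database: str, table: str) -> str:
    """Build a topic name as <logical server name>.<database>.<table>."""
    return f"{server_name}.{database}.{table}"


# Tables named in this tutorial; the full inventory database contains more.
tables = ["customers", "addresses", "products_on_hand"]

expected_topics = [topic_name("inventory_connector", "inventory", t) for t in tables]
for topic in expected_topics:
    print(topic)
```

A list like this can be compared against the topics that the verification steps report, to spot tables for which no topic was created.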
To verify that the connector is running, you perform the following operations from the OpenShift Container Platform web console, or through the OpenShift CLI tool (oc):
- Verify the connector status.
- Verify that the connector generates topics.
- Verify that topics are populated with events for read operations ("op":"r") that the connector generates during the initial snapshot of each table.
Prerequisites
- A Debezium connector is deployed to AMQ Streams on OpenShift.
- The OpenShift oc CLI client is installed.
- You have access to the OpenShift Container Platform web console.
Procedure
Check the status of the KafkaConnector resource by using one of the following methods:
From the OpenShift Container Platform web console:
- Navigate to Home > Search.
- On the Search page, click Resources to open the Select Resource box, and then type KafkaConnector.
- From the KafkaConnectors list, click the name of the connector that you want to check, for example inventory-connector.
- In the Conditions section, verify that the values in the Type and Status columns are set to Ready and True.
From a terminal window:
Enter the following command:
oc describe KafkaConnector <connector-name> -n <project>
For example,
oc describe KafkaConnector inventory-connector -n debezium
The command returns status information that is similar to the following output:
Example 2.3. KafkaConnector resource status
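The Ready/True check from this step can be expressed in a few lines of Python. The sketch below assumes that the resource is available as JSON, for example from `oc get kafkaconnector inventory-connector -n debezium -o json`, and that its status carries a conditions list with type and status fields, matching the Type and Status columns shown in the web console. The function name `connector_is_ready` and the sample document are illustrative and hand-written, not real cluster output.

```python
import json


def connector_is_ready(resource: dict) -> bool:
    """Return True if any status condition has type Ready and status True."""
    conditions = resource.get("status", {}).get("conditions", [])
    return any(
        c.get("type") == "Ready" and c.get("status") == "True"
        for c in conditions
    )


# Hand-written sample, trimmed to the fields that the check reads.
sample = json.loads("""
{
  "kind": "KafkaConnector",
  "metadata": {"name": "inventory-connector"},
  "status": {"conditions": [{"type": "Ready", "status": "True"}]}
}
""")

print(connector_is_ready(sample))
```

The function returns False when the status or its conditions are missing, which is also what you would see briefly while the resource is still being reconciled.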
Verify that the connector created Kafka topics:
From the OpenShift Container Platform web console:
- Navigate to Home > Search.
- On the Search page, click Resources to open the Select Resource box, and then type KafkaTopic.
- From the KafkaTopics list, click the name of the topic that you want to check, for example, inventory-connector.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d.
- In the Conditions section, verify that the values in the Type and Status columns are set to Ready and True.
From a terminal window:
Enter the following command:
oc get kafkatopics
The command returns status information that is similar to the following output:
Example 2.4. KafkaTopic resource status
Check topic content.
- From a terminal window, enter the following command:
oc exec -n <project> -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --from-beginning \
  --property print.key=true \
  --topic=<topic-name>
For example,
oc exec -n debezium -it my-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --from-beginning \
  --property print.key=true \
  --topic=inventory_connector.inventory.products_on_hand
Specify the topic name in the same format that the oc describe command returns in Step 1, for example, inventory_connector.inventory.addresses.
For each event in the topic, the command returns information that is similar to the following output:
Example 2.5. Content of a Debezium change event
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory_connector.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory_connector.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory_connector.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory_connector.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"1.7.2.Final-redhat-00001","connector":"mysql","name":"inventory_connector","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}
In the preceding example, the payload value shows that the connector snapshot generated a read ("op":"r") event from the table inventory.products_on_hand. The "before" state of the product_id record is null, indicating that no previous value exists for the record. The "after" state shows a quantity of 3 for the item with product_id 101.
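The same reading of the event can be done programmatically. The following Python sketch extracts the operation, the source table, and the before and after states from the value part of a change event. The function name `summarize` is hypothetical. The sample string is a trimmed copy of the payload from Example 2.5 with the schema portion omitted for brevity. Only the "r" code appears in this section; the other operation codes in the lookup table are the ones that Debezium documents for create, update, and delete events.

```python
import json

# "r" appears in Example 2.5; "c", "u", and "d" are included for completeness.
OPERATIONS = {"r": "read (snapshot)", "c": "create", "u": "update", "d": "delete"}


def summarize(value_json: str) -> dict:
    """Summarize the payload of a Debezium change event value."""
    payload = json.loads(value_json)["payload"]
    source = payload["source"]
    return {
        "operation": OPERATIONS.get(payload["op"], payload["op"]),
        "table": f'{source["db"]}.{source["table"]}',
        "before": payload["before"],
        "after": payload["after"],
    }


# Trimmed from Example 2.5: schema omitted, payload reduced to the fields used here.
event_value = (
    '{"payload":{"before":null,"after":{"product_id":101,"quantity":3},'
    '"source":{"db":"inventory","table":"products_on_hand"},"op":"r"}}'
)

summary = summarize(event_value)
print(summary)
```

For the sample event, the summary reports a snapshot read from inventory.products_on_hand with no before state and an after state of product 101 with quantity 3, matching the explanation above.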
You are now ready to view change events that the Debezium connector captures from the inventory database.