Chapter 2. Starting the services
Using Debezium requires AMQ Streams with Kafka and Kafka Connect, a database, and the Debezium connector service. To run the services for this tutorial, you must:
- Deploy a MySQL database.
- Deploy Kafka Connect with the Debezium MySQL connector.
- Verify the connector deployment.
2.1. Deploying a MySQL database
Deploy a MySQL database server that includes an example inventory database with several tables that are pre-populated with data. The Debezium MySQL connector will capture changes that occur in the sample tables and transmit the change event records to an Apache Kafka topic.
Procedure
Start a MySQL database by running the following command, which starts a MySQL database server configured with an example inventory database:
$ oc new-app --name=mysql quay.io/debezium/example-mysql:latest
Configure credentials for the MySQL database by running the following command, which updates the deployment configuration for the MySQL database to add the user name and password:
$ oc set env dc/mysql MYSQL_ROOT_PASSWORD=debezium MYSQL_USER=mysqluser MYSQL_PASSWORD=mysqlpw
Verify that the MySQL database is running by invoking the following command, which is followed by the output that shows that the MySQL database is running, and that the pod is ready:
$ oc get pods -l app=mysql
NAME            READY   STATUS    RESTARTS   AGE
mysql-1-2gzx5   1/1     Running   1          23s
Open a new terminal and log in to the sample inventory database.
This command opens a MySQL command line client in the pod that is running the MySQL database. The client uses the user name and password that you previously configured.
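A command similar to the following opens the client. The pod name mysql-1-2gzx5 is taken from the oc get pods output in the previous step; adjust the pod name and the credentials to match your environment:
$ oc exec -it mysql-1-2gzx5 -- mysql -u mysqluser -pmysqlpw inventory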
List the tables in the inventory database.
Explore the database and view the data that it contains, for example, view the customers table.
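For example, you might run statements similar to the following in the MySQL command line client. These statements are illustrative; the inventory database is the sample database that you deployed earlier:
mysql> show tables;
mysql> SELECT * FROM customers;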
2.2. Deploying Kafka Connect
After you deploy the MySQL database, use AMQ Streams to build a Kafka Connect container image that includes the Debezium MySQL connector plug-in. During the deployment process, you create and use the following custom resources (CRs):
- A KafkaConnect CR that defines your Kafka Connect instance and includes information about the MySQL connector artifacts to include in the image.
- A KafkaConnector CR that provides details that include information that the MySQL connector uses to access the source database. After AMQ Streams starts the Kafka Connect pod, you start the connector by applying the KafkaConnector CR.
During the build process, the AMQ Streams Operator transforms input parameters in the KafkaConnect custom resource, including Debezium connector definitions, into a Kafka Connect container image. The build downloads the necessary artifacts from the Red Hat Maven repository, and incorporates them into the image. The newly created container is pushed to the container registry that is specified in .spec.build.output, and is used to deploy a Kafka Connect pod. After AMQ Streams builds the Kafka Connect image, use the KafkaConnector custom resource to start the connector.
Procedure
- Log in to the OpenShift cluster and create or open a project, for example debezium.
- Create a Debezium KafkaConnect custom resource (CR) for the connector, or modify an existing one. For example, create a KafkaConnect CR that specifies the metadata.annotations and spec.build properties, as shown in the following example. Save the file with a name such as dbz-connect.yaml.
Example 2.1. A dbz-connect.yaml file that defines a KafkaConnect custom resource that includes a Debezium connector
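The following is a minimal sketch of such a KafkaConnect CR. The cluster name, Kafka version, bootstrap server address, output image name, and artifact URL shown here are illustrative placeholders that you must adapt to your environment; the numbered comments refer to the items in Table 2.1.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: debezium-kafka-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"               # Item 1
spec:
  version: 3.0.0                                             # placeholder Kafka version
  replicas: 1
  bootstrapServers: debezium-cluster-kafka-bootstrap:9093    # placeholder bootstrap address
  build:                                                     # Item 2
    output:                                                  # Items 3 and 4
      type: imagestream
      image: debezium-streams-connect:latest
    plugins:                                                 # Item 5
      - name: debezium-connector-mysql
        artifacts:
          - type: zip                                        # Item 6
            url: <URL of the Debezium MySQL connector archive in the Red Hat Maven repository>  # Item 7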
Table 2.1. Descriptions of Kafka Connect configuration settings
Item 1: Sets the strimzi.io/use-connector-resources annotation to "true" to enable the Cluster Operator to use KafkaConnector resources to configure connectors in this Kafka Connect cluster.
Item 2: The spec.build configuration specifies where to store the build image and lists the plug-ins to include in the image, along with the location of the plug-in artifacts.
Item 3: The build.output specifies the registry in which the newly built image is stored.
Item 4: Specifies the name and image name for the image output. Valid values for output.type are docker to push into a container registry like Docker Hub or Quay, or imagestream to push the image to an internal OpenShift ImageStream. To use an ImageStream, an ImageStream resource must be deployed to the cluster. For more information about specifying the build.output in the KafkaConnect configuration, see the AMQ Streams Build schema reference documentation.
Item 5: The plugins configuration lists all of the connectors that you want to include in the Kafka Connect image. For each entry in the list, specify a plug-in name, and information about the artifacts that are required to build the connector. Optionally, for each connector plug-in, you can include other components that you want to be available for use with the connector. For example, you can add Service Registry artifacts, or the Debezium scripting component.
Item 6: The value of artifacts.type specifies the file type of the artifact specified in the artifacts.url. Valid types are zip, tgz, or jar. Debezium connector archives are provided in .zip file format. JDBC driver files are in .jar format. The type value must match the type of the file that is referenced in the url field.
Item 7: The value of artifacts.url specifies the address of an HTTP server, such as a Maven repository, that stores the file for the connector artifact. The OpenShift cluster must have access to the specified server.
Apply the KafkaConnect build specification to the OpenShift cluster by entering the following command:
oc create -f dbz-connect.yaml
Based on the configuration specified in the custom resource, the Streams Operator prepares a Kafka Connect image to deploy.
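You can follow the progress of the build and the deployment, for example, by watching the pods in the project. The command below assumes the debezium project used in this tutorial:
oc get pods -n debezium -w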
After the build completes, the Operator pushes the image to the specified registry or ImageStream, and starts the Kafka Connect cluster. The connector artifacts that you listed in the configuration are available in the cluster.
Create a KafkaConnector resource to define an instance of the MySQL connector. For example, create the following KafkaConnector CR, and save it as mysql-inventory-connector.yaml.
Example 2.2. A mysql-inventory-connector.yaml file that defines the KafkaConnector custom resource for a Debezium connector
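The following is a minimal sketch of such a KafkaConnector CR. The Kafka Connect cluster label, database credentials, and schema history settings shown here are illustrative values based on the example inventory database; adapt them to your environment. The numbered comments refer to the items in Table 2.2.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: inventory-connector                                   # Item 1
  labels:
    strimzi.io/cluster: debezium-kafka-connect-cluster        # placeholder Kafka Connect cluster name
spec:
  class: io.debezium.connector.mysql.MySqlConnector           # Item 2
  tasksMax: 1                                                 # Item 3
  config:                                                     # Item 4
    database.hostname: mysql                                  # Item 5
    database.port: 3306                                       # Item 6
    database.user: debezium                                   # Item 7, placeholder user name
    database.password: dbz                                    # Item 8, placeholder password
    database.dbname: inventory                                # Item 9
    database.server.name: inventory_connector                 # Item 10
    table.include.list: inventory.*                           # Item 11
    database.history.kafka.bootstrap.servers: debezium-cluster-kafka-bootstrap:9092  # Item 12
    database.history.kafka.topic: schema-changes.inventory                           # Item 12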
Table 2.2. Descriptions of connector configuration settings
Item 1: The name of the connector to register with the Kafka Connect cluster.
Item 2: The name of the connector class.
Item 3: Only one task should operate at any one time. Use a single connector task to ensure proper order and event handling as the MySQL connector reads the MySQL server’s binlog. The Kafka Connect service uses connectors to start one or more tasks to complete the work. It automatically distributes the running tasks across the cluster of Kafka Connect services. If services stop or crash, tasks are redistributed to running services.
Item 4: The connector’s configuration.
Item 5: The database host, which is the name of the container that runs the MySQL server (mysql).
Item 6: The port number of the database instance.
Item 7: The name of the user account through which Debezium connects to the database.
Item 8: The password for the database user account.
Item 9: The name of the database to capture changes from.
Item 10: The logical name of the database instance or cluster. The server name is the logical identifier for the MySQL server or cluster of servers. This name is used as the prefix for all Kafka topics.
Item 11: The list of tables from which the connector captures change events. The connector detects changes in the inventory database only.
Item 12: Specifies the Kafka broker and topic that the connector uses to store the history of the database schemas (the same broker to which you are sending events). After a restart, the connector recovers the database schemas that existed at the point in the binlog when the connector resumes reading.
Create the connector resource by running the following command:
oc create -n <namespace> -f <kafkaConnector>.yaml
For example,
oc create -n debezium -f mysql-inventory-connector.yaml
The connector is registered to the Kafka Connect cluster and starts to run against the database that is specified by spec.config.database.dbname in the KafkaConnector CR. After the connector pod is ready, Debezium is running.
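For example, to check that the pod is ready, you can list the pods in the project. The command below assumes the debezium project used in this tutorial:
oc get pods -n debezium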
You are now ready to verify that the connector was created and has started to capture changes in the inventory database.
2.3. Verifying the connector deployment
If the connector starts correctly without errors, it creates a topic for each table that the connector is configured to capture. Downstream applications can subscribe to these topics to retrieve information about events that occur in the source database.
To verify that the connector is running, you perform the following operations from the OpenShift Container Platform web console, or through the OpenShift CLI tool (oc):
- Verify the connector status.
- Verify that the connector generates topics.
- Verify that topics are populated with events for read operations ("op":"r") that the connector generates during the initial snapshot of each table.
Prerequisites
- A Debezium connector is deployed to AMQ Streams on OpenShift.
- The OpenShift oc CLI client is installed.
- You have access to the OpenShift Container Platform web console.
Procedure
Check the status of the KafkaConnector resource by using one of the following methods:
From the OpenShift Container Platform web console:
- Navigate to Home → Search.
- On the Search page, click Resources to open the Select Resource box, and then type KafkaConnector.
- From the KafkaConnectors list, click the name of the connector that you want to check, for example inventory-connector.
- In the Conditions section, verify that the values in the Type and Status columns are set to Ready and True.
From a terminal window:
Enter the following command:
oc describe KafkaConnector <connector-name> -n <project>
For example,
oc describe KafkaConnector inventory-connector -n debezium
The command returns status information that is similar to the following output:
Example 2.3. KafkaConnector resource status
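The exact output depends on your environment; an abridged status for a healthy connector generally resembles the following sketch. The resource name, topics, and other values shown here are illustrative:
Name:         inventory-connector
Namespace:    debezium
API Version:  kafka.strimzi.io/v1beta2
Kind:         KafkaConnector
...
Status:
  Conditions:
    Status:               True
    Type:                 Ready
  Connector Status:
    Connector:
      State:              RUNNING
    Name:                 inventory-connector
    Tasks:
      Id:                 0
      State:              RUNNING
    Type:                 source
  Tasks Max:              1
  Topics:
    inventory_connector
    inventory_connector.inventory.addresses
    inventory_connector.inventory.customers
    inventory_connector.inventory.geom
    inventory_connector.inventory.orders
    inventory_connector.inventory.products
    inventory_connector.inventory.products_on_hand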
Verify that the connector created Kafka topics:
From the OpenShift Container Platform web console:
- Navigate to Home → Search.
- On the Search page, click Resources to open the Select Resource box, and then type KafkaTopic.
- From the KafkaTopics list, click the name of the topic that you want to check, for example, inventory-connector.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d.
- In the Conditions section, verify that the values in the Type and Status columns are set to Ready and True.
From a terminal window:
Enter the following command:
oc get kafkatopics
The command returns status information that is similar to the following output:
Example 2.4. KafkaTopic resource status
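The listing depends on your cluster and AMQ Streams version; an abridged, illustrative example follows. The cluster name and the hash suffixes that are appended to topic resource names vary, and internal Kafka and Kafka Connect topics are omitted here:
NAME                                                      CLUSTER            PARTITIONS   REPLICATION FACTOR   READY
inventory-connector.inventory.addresses---<hash>          debezium-cluster   1            1                    True
inventory-connector.inventory.customers---<hash>          debezium-cluster   1            1                    True
inventory-connector.inventory.orders---<hash>             debezium-cluster   1            1                    True
inventory-connector.inventory.products---<hash>           debezium-cluster   1            1                    True
inventory-connector.inventory.products-on-hand---<hash>   debezium-cluster   1            1                    True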
Check topic content.
- From a terminal window, enter the following command:
oc exec -n <project> -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --from-beginning \
    --property print.key=true \
    --topic=<topic-name>
For example,
oc exec -n debezium -it my-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --from-beginning \
    --property print.key=true \
    --topic=inventory_connector.inventory.products_on_hand
The format for specifying the topic name is the same as the format that the oc describe command returns in Step 1, for example, inventory_connector.inventory.addresses.
For each event in the topic, the command returns information that is similar to the following output:
Example 2.5. Content of a Debezium change event
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory_connector.inventory.products_on_hand.Key"},"payload":{"product_id":101}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory_connector.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory_connector.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory_connector.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"1.7.2.Final-redhat-00001","connector":"mysql","name":"inventory_connector","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}
In the preceding example, the payload value shows that the connector snapshot generated a read ("op" = "r") event from the table inventory.products_on_hand. The "before" state of the product_id record is null, indicating that no previous value exists for the record. The "after" state shows a quantity of 3 for the item with product_id 101.
You are now ready to view change events that the Debezium connector captures from the inventory database.