Chapter 2. Starting the services
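Using Debezium requires AMQ Streams with Kafka and Kafka Connect, a database, and the Debezium connector service. To run the services for this tutorial, you must deploy a MySQL database, deploy Kafka Connect with the Debezium MySQL connector, and verify the connector deployment, as described in the following sections.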
2.1. Deploying a MySQL database
Deploy a MySQL database server that includes an example inventory database with several tables that are pre-populated with data. The Debezium MySQL connector captures changes that occur in the sample tables and transmits the change event records to an Apache Kafka topic.
			
Procedure
- Start a MySQL database by running the following command, which starts a MySQL database server configured with an example inventory database:
      $ oc new-app --name=mysql quay.io/debezium/example-mysql:latest
- Configure credentials for the MySQL database by running the following command, which updates the deployment configuration for the MySQL database to add the user name and password:
      $ oc set env dc/mysql MYSQL_ROOT_PASSWORD=debezium MYSQL_USER=mysqluser MYSQL_PASSWORD=mysqlpw
- Verify that the MySQL database is running by invoking the following command, which is followed by the output that shows that the MySQL database is running, and that the pod is ready:
      $ oc get pods -l app=mysql
      NAME            READY   STATUS    RESTARTS   AGE
      mysql-1-2gzx5   1/1     Running   1          23s
- Open a new terminal and log in to the sample inventory database. The login command opens a MySQL command line client in the pod that is running the MySQL database. The client uses the user name and password that you previously configured.
- List the tables in the inventory database.
- Explore the database and view the data that it contains. For example, view the customers table.
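The commands for the last three steps are not reproduced above. As a rough sketch only, the queries for listing the tables and viewing the customers table could be collected in a file and sent to the MySQL client in the pod. The file name explore-inventory.sql is hypothetical, and the sketch assumes the mysqluser credentials that were set with oc set env.

```shell
# Write the exploration queries to a file (hypothetical file name).
cat > explore-inventory.sql <<'EOF'
-- List the tables in the inventory database.
SHOW TABLES;
-- View the contents of the customers table.
SELECT * FROM customers;
EOF

# Show the queries that would be sent to the MySQL client.
cat explore-inventory.sql
```

Assuming that the pod is named mysql-1-2gzx5, as in the sample output, the file could then be piped to the client with `oc exec -i mysql-1-2gzx5 -- mysql -u mysqluser -pmysqlpw inventory < explore-inventory.sql`. For an interactive session, `oc exec -it mysql-1-2gzx5 -- mysql -u mysqluser -pmysqlpw inventory` should open the client prompt instead.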
2.2. Deploying Kafka Connect
After you deploy the MySQL database, use AMQ Streams to build a Kafka Connect container image that includes the Debezium MySQL connector plug-in. During the deployment process, you create and use the following custom resources (CRs):
- A KafkaConnect CR that defines your Kafka Connect instance and includes information about the MySQL connector artifacts to include in the image.
- A KafkaConnector CR that provides the details that the MySQL connector uses to access the source database. After AMQ Streams starts the Kafka Connect pod, you start the connector by applying the KafkaConnector CR.
During the build process, the AMQ Streams Operator transforms input parameters in the KafkaConnect custom resource, including Debezium connector definitions, into a Kafka Connect container image. The build downloads the necessary artifacts from the Red Hat Maven repository, and incorporates them into the image. The newly created image is pushed to the container registry that is specified in .spec.build.output, and is used to deploy a Kafka Connect pod. After AMQ Streams builds the Kafka Connect image, use the KafkaConnector custom resource to start the connector.
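To make the build inputs concrete, the following is a minimal sketch of a KafkaConnect CR, written to dbz-connect.yaml from the shell. It is not the original example from this guide. The resource name, replica count, bootstrap address, image name, and plug-in name are assumptions for illustration, and the artifact URL is deliberately left as a placeholder that you must replace.

```shell
# Write a KafkaConnect CR to dbz-connect.yaml.
# Assumed values (not taken from this guide): resource name, replicas,
# bootstrap address, image name, and plug-in name.
cat > dbz-connect.yaml <<'EOF'
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: debezium-kafka-connect-cluster
  annotations:
    # Lets the Cluster Operator manage connectors through KafkaConnector CRs.
    strimzi.io/use-connector-resources: "true"
spec:
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  build:
    output:
      # Push the built image to an OpenShift ImageStream, which must already exist.
      type: imagestream
      image: debezium-streams-connect:latest
    plugins:
      - name: debezium-connector-mysql
        artifacts:
          # The type must match the file type that the url points to.
          - type: zip
            url: <connector-plugin-archive-url>
EOF
```

Writing the file with a quoted heredoc keeps the YAML free of shell expansion, so the placeholder and the quoted "true" annotation value reach the file unchanged.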
			
Procedure
- Log in to the OpenShift cluster and create or open a project, for example debezium.
- Create a Debezium KafkaConnect custom resource (CR) for the connector, or modify an existing one. For example, create a KafkaConnect CR that specifies the metadata.annotations and spec.build properties, as shown in the following example. Save the file with a name such as dbz-connect.yaml.
  Example 2.1. A dbz-connect.yaml file that defines a KafkaConnect custom resource that includes a Debezium connector
  Table 2.1. Descriptions of Kafka Connect configuration settings
  | Item | Description |
  |---|---|
  | 1 | Sets the strimzi.io/use-connector-resources annotation to "true" to enable the Cluster Operator to use KafkaConnector resources to configure connectors in this Kafka Connect cluster. |
  | 2 | The spec.build configuration specifies where to store the build image and lists the plug-ins to include in the image, along with the location of the plug-in artifacts. |
  | 3 | The build.output specifies the registry in which the newly built image is stored. |
  | 4 | Specifies the name and image name for the image output. Valid values for output.type are docker to push into a container registry like Docker Hub or Quay, or imagestream to push the image to an internal OpenShift ImageStream. To use an ImageStream, an ImageStream resource must be deployed to the cluster. For more information about specifying the build.output in the KafkaConnect configuration, see the AMQ Streams Build schema reference documentation. |
  | 5 | The plugins configuration lists all of the connectors that you want to include in the Kafka Connect image. For each entry in the list, specify a plug-in name, and information about the artifacts that are required to build the connector. Optionally, for each connector plug-in, you can include other components that you want to be available for use with the connector. For example, you can add Service Registry artifacts, or the Debezium scripting component. |
  | 6 | The value of artifacts.type specifies the file type of the artifact specified in the artifacts.url. Valid types are zip, tgz, or jar. Debezium connector archives are provided in .zip file format. JDBC driver files are in .jar format. The type value must match the type of the file that is referenced in the url field. |
  | 7 | The value of artifacts.url specifies the address of an HTTP server, such as a Maven repository, that stores the file for the connector artifact. The OpenShift cluster must have access to the specified server. |
- Apply the KafkaConnect build specification to the OpenShift cluster by entering the following command:
      $ oc create -f dbz-connect.yaml
  Based on the configuration specified in the custom resource, the AMQ Streams Operator prepares a Kafka Connect image to deploy. After the build completes, the Operator pushes the image to the specified registry or ImageStream, and starts the Kafka Connect cluster. The connector artifacts that you listed in the configuration are available in the cluster.
- Create a KafkaConnector resource to define an instance of the MySQL connector. For example, create the following KafkaConnector CR, and save it as mysql-inventory-connector.yaml.
  Example 2.2. A mysql-inventory-connector.yaml file that defines the KafkaConnector custom resource for a Debezium connector
  Table 2.2. Descriptions of connector configuration settings
  | Item | Description |
  |---|---|
  | 1 | The name of the connector to register with the Kafka Connect cluster. |
  | 2 | The name of the connector class. |
  | 3 | Only one task should operate at any one time. Use a single connector task to ensure proper order and event handling as the MySQL connector reads the MySQL server's binlog. The Kafka Connect service uses connectors to start one or more tasks to complete the work. It automatically distributes the running tasks across the cluster of Kafka Connect services. If services stop or crash, tasks are redistributed to running services. |
  | 4 | The connector's configuration. |
  | 5 | The database host, which is the name of the container that runs the MySQL server (mysql). |
  | 6 | The port number of the database instance. |
  | 7 | The name of the user account through which Debezium connects to the database. |
  | 8 | The password for the database user account. |
  | 9 | The name of the database to capture changes from. |
  | 10 | The logical name of the database instance or cluster. The server name is the logical identifier for the MySQL server or cluster of servers. This name is used as the prefix for all Kafka topics. |
  | 11 | The list of tables from which the connector captures change events. The connector detects changes in the inventory database only. |
  | 12 | Specifies the Kafka broker and topic that the connector uses to store the history of the database schemas (the same broker to which you are sending events). After a restart, the connector recovers the database schemas that existed at the point in the binlog when the connector resumes reading. |
- Create the connector resource by running the following command:
      $ oc create -n <namespace> -f <kafkaConnector>.yaml
  For example,
      $ oc create -n debezium -f mysql-inventory-connector.yaml
  The connector is registered to the Kafka Connect cluster and starts to run against the database that is specified by spec.config.database.dbname in the KafkaConnector CR. After the connector pod is ready, Debezium is running.
				You are now ready to verify that the connector was created and has started to capture changes in the inventory database.
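For orientation, the following is a minimal sketch of a KafkaConnector CR for the MySQL connector, written to mysql-inventory-connector.yaml from the shell. It is not the original example from this guide. The property names follow the Debezium 1.9 MySQL connector. The cluster label, the debezium/dbz account (which the upstream example image is understood to provide), the server ID, the bootstrap address, and the history topic name are all assumptions that you should replace with values from your environment.

```shell
# Write a KafkaConnector CR to mysql-inventory-connector.yaml.
# Assumed values (not taken from this guide): cluster label, database
# account, server ID, bootstrap address, and history topic name.
cat > mysql-inventory-connector.yaml <<'EOF'
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: inventory-connector
  labels:
    # Must match the name of the KafkaConnect CR that runs the connector.
    strimzi.io/cluster: debezium-kafka-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  # A single task preserves the order of events read from the binlog.
  tasksMax: 1
  config:
    database.hostname: mysql
    database.port: 3306
    database.user: debezium
    database.password: dbz
    # Any numeric ID that is unique among clients of the MySQL server.
    database.server.id: 184054
    # Logical server name, used as the prefix for all topic names.
    database.server.name: inventory_connector
    database.include.list: inventory
    database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
    database.history.kafka.topic: schema-changes.inventory
EOF
```

The account that the connector uses needs replication privileges on the MySQL server, which is why the sketch does not reuse the mysqluser account that was configured for exploring the database.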
			
2.3. Verifying the connector deployment
If the connector starts correctly without errors, it creates a topic for each table that the connector is configured to capture. Downstream applications can subscribe to these topics to retrieve information about events that occur in the source database.
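The topic names used in this chapter, such as inventory_connector.inventory.products_on_hand, combine the logical server name, the database name, and the table name. As a small illustration of that pattern, the following sketch prints the topic names that you would expect for a few of the sample tables. The topic_name helper is hypothetical and is not part of any Debezium or OpenShift tooling.

```shell
# Hypothetical helper: build a topic name from the logical server name,
# the database name, and the table name.
topic_name() {
  printf '%s.%s.%s\n' "$1" "$2" "$3"
}

# Print the expected topic for a few tables in the inventory database.
for table in customers orders addresses products_on_hand; do
  topic_name inventory_connector inventory "$table"
done
```

A list like this can be compared against the topics that the cluster reports when you verify the deployment.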
To verify that the connector is running, you perform the following operations from the OpenShift Container Platform web console, or through the OpenShift CLI tool (oc):
- Verify the connector status.
- Verify that the connector generates topics.
- Verify that topics are populated with events for read operations ("op":"r") that the connector generates during the initial snapshot of each table.
Prerequisites
- A Debezium connector is deployed to AMQ Streams on OpenShift.
- The OpenShift oc CLI client is installed.
- You have access to the OpenShift Container Platform web console.
Procedure
- Check the status of the KafkaConnector resource by using one of the following methods:
  - From the OpenShift Container Platform web console:
    - Navigate to Home > Search.
    - On the Search page, click Resources to open the Select Resource box, and then type KafkaConnector.
    - From the KafkaConnectors list, click the name of the connector that you want to check, for example inventory-connector.
    - In the Conditions section, verify that the values in the Type and Status columns are set to Ready and True.
  - From a terminal window:
    - Enter the following command:
          $ oc describe KafkaConnector <connector-name> -n <project>
      For example,
          $ oc describe KafkaConnector inventory-connector -n debezium
      The command returns status information that is similar to the following output:
      Example 2.3. KafkaConnector resource status
- Verify that the connector created Kafka topics:
  - From the OpenShift Container Platform web console:
    - Navigate to Home > Search.
    - On the Search page, click Resources to open the Select Resource box, and then type KafkaTopic.
    - From the KafkaTopics list, click the name of the topic that you want to check, for example, inventory-connector.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d.
    - In the Conditions section, verify that the values in the Type and Status columns are set to Ready and True.
  - From a terminal window:
    - Enter the following command:
          $ oc get kafkatopics
      The command returns status information that is similar to the following output:
      Example 2.4. KafkaTopic resource status
- Check topic content.
  - From a terminal window, enter the following command:
        $ oc exec -n <project> -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \
            --bootstrap-server localhost:9092 \
            --from-beginning \
            --property print.key=true \
            --topic=<topic-name>
    For example,
        $ oc exec -n debezium -it my-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \
            --bootstrap-server localhost:9092 \
            --from-beginning \
            --property print.key=true \
            --topic=inventory_connector.inventory.products_on_hand
    The format for specifying the topic name is the same as the format that the oc describe command returns in Step 1, for example, inventory_connector.inventory.addresses.
    For each event in the topic, the command returns information that is similar to the following output:
    Example 2.5. Content of a Debezium change event
        {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory_connector.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory_connector.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory_connector.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory_connector.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"1.9.7.Final-redhat-00001","connector":"mysql","name":"inventory_connector","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}
    In the preceding example, the payload value shows that the connector snapshot generated a read ("op":"r") event from the table inventory.products_on_hand. The "before" state of the product_id record is null, indicating that no previous value exists for the record. The "after" state shows a quantity of 3 for the item with product_id 101.
				You are now ready to view change events that the Debezium connector captures from the inventory database.
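As a quick way to confirm that a topic contains snapshot read events, captured consumer output can be searched for the "op":"r" marker. The following sketch runs that check against a trimmed copy of the payload from Example 2.5, with the schema portion omitted for brevity. The file name sample-event.json is hypothetical. With real data, you would save the consumer output to a file first.

```shell
# Trimmed copy of the payload from Example 2.5 (schema part omitted).
cat > sample-event.json <<'EOF'
{"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"connector":"mysql","name":"inventory_connector","snapshot":"true","db":"inventory","table":"products_on_hand"},"op":"r","ts_ms":1638985247805,"transaction":null}}
EOF

# Count the lines that contain a snapshot read event.
read_events=$(grep -c '"op":"r"' sample-event.json)
echo "read events: $read_events"
```

Because the consumer prints each event on a single line, the line count from grep -c corresponds to the number of read events in the captured output. This is a plain text match rather than a JSON parse, so it is only suitable as a rough check.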