Chapter 3. Starting the services
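Using Debezium requires Streams for Apache Kafka with Kafka and Kafka Connect, a database, and the Debezium connector service. To run the services for this tutorial, you must deploy a MySQL database, deploy Kafka Connect with the Debezium MySQL connector, and then verify the connector deployment, as described in the following sections.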
3.1. Deploying a MySQL database
Deploy a MySQL database server that includes an example inventory database with several tables that are pre-populated with data. The Debezium MySQL connector captures changes that occur in the sample tables and transmits the change event records to an Apache Kafka topic.
Procedure
Start a MySQL database by running the following command, which starts a MySQL database server configured with an example inventory database:
    $ oc new-app -l app=mysql --name=mysql quay.io/debezium/example-mysql:latest
Configure credentials for the MySQL database by running the following command, which updates the deployment configuration for the MySQL database to add the user name and password:
$ oc set env deployment/mysql MYSQL_ROOT_PASSWORD=debezium MYSQL_USER=mysqluser MYSQL_PASSWORD=mysqlpw
Verify that the MySQL database is running by entering the following command. The output that follows the command shows that the MySQL database is running and that the pod is ready:
    $ oc get pods -l app=mysql
    NAME            READY   STATUS    RESTARTS   AGE
    mysql-1-2gzx5   1/1     Running   1          23s
Open a new terminal and log in to the sample inventory database. The following command opens a MySQL command line client in the pod that is running the MySQL database. The client uses the user name and password that you previously configured:
    $ oc exec mysql-1-2gzx5 -it -- mysql -u mysqluser -pmysqlpw inventory
    mysql: [Warning] Using a password on the command line interface can be insecure.
    Reading table information for completion of table and column names
    You can turn off this feature to get a quicker startup with -A
    Welcome to the MySQL monitor. Commands end with ; or \g.
    Your MySQL connection id is 7
    Server version: 5.7.29-log MySQL Community Server (GPL)
    Copyright (c) 2000, 2020, Oracle and/or its affiliates. All rights reserved.
    Oracle is a registered trademark of Oracle Corporation and/or its
    affiliates. Other names may be trademarks of their respective
    owners.
    Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
    mysql>
List the tables in the inventory database:
    mysql> show tables;
    +---------------------+
    | Tables_in_inventory |
    +---------------------+
    | addresses           |
    | customers           |
    | geom                |
    | orders              |
    | products            |
    | products_on_hand    |
    +---------------------+
    6 rows in set (0.00 sec)
Explore the database and view the data that it contains. For example, view the customers table:
    mysql> select * from customers;
    +------+------------+-----------+-----------------------+
    | id   | first_name | last_name | email                 |
    +------+------------+-----------+-----------------------+
    | 1001 | Sally      | Thomas    | sally.thomas@acme.com |
    | 1002 | George     | Bailey    | gbailey@foobar.com    |
    | 1003 | Edward     | Walker    | ed@walker.com         |
    | 1004 | Anne       | Kretchmar | annek@noanswer.org    |
    +------+------------+-----------+-----------------------+
    4 rows in set (0.00 sec)
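The readiness check from the verification step in this procedure can also be scripted. The following Python sketch is illustrative only: parse_pod_table and is_ready are hypothetical helper names, and the parsing assumes the plain column layout of the sample `oc get pods` output in this section, in which no column value contains spaces.

```python
def parse_pod_table(text):
    """Parse the whitespace-separated table printed by `oc get pods` into dicts."""
    lines = [line for line in text.strip().splitlines() if line.strip()]
    header = lines[0].split()
    return [dict(zip(header, line.split())) for line in lines[1:]]


def is_ready(pod):
    """Treat a pod as ready when it is Running and every container is ready."""
    ready, total = pod["READY"].split("/")
    return pod["STATUS"] == "Running" and ready == total and int(total) > 0


# Sample output copied from the verification step in this section.
sample = """\
NAME            READY   STATUS    RESTARTS   AGE
mysql-1-2gzx5   1/1     Running   1          23s
"""

pods = parse_pod_table(sample)
for pod in pods:
    print(pod["NAME"], "ready" if is_ready(pod) else "not ready")
```

In practice, the text would come from running the `oc get pods -l app=mysql` command rather than from a hard-coded string.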
3.2. Deploying Kafka Connect
After you deploy the MySQL database, use Streams for Apache Kafka to build a Kafka Connect container image that includes the Debezium MySQL connector plug-in. During the deployment process, you create and use the following custom resources (CRs):
- A KafkaConnect CR that defines your Kafka Connect instance and includes information about the MySQL connector artifacts to include in the image.
- A KafkaConnector CR that provides the details that the MySQL connector uses to access the source database. After Streams for Apache Kafka starts the Kafka Connect pod, you start the connector by applying the KafkaConnector CR.
During the build process, the Streams for Apache Kafka Operator transforms input parameters in the KafkaConnect custom resource, including Debezium connector definitions, into a Kafka Connect container image. The build downloads the necessary artifacts from the Red Hat Maven repository, and incorporates them into the image. The newly created container image is pushed to the container registry that is specified in .spec.build.output, and is used to deploy a Kafka Connect pod.
Container images can be stored in an external container registry, such as quay.io, or in an OpenShift ImageStream. Because ImageStreams are not created automatically, to store container images in an ImageStream, you must create the ImageStream before you deploy Kafka Connect.
After Streams for Apache Kafka builds and stores the Kafka Connect image, use the KafkaConnector custom resource to start the connector.
Prerequisites
- Streams for Apache Kafka is running on an OpenShift cluster.
- The Streams for Apache Kafka Cluster Operator is installed to the OpenShift cluster.
- If you prefer to store the KafkaConnect container image in an OpenShift ImageStream, an ImageStream is available.
- Apache Kafka and Kafka Connect are running on Streams for Apache Kafka.
Procedure
Log in to the OpenShift cluster and create or open a project, for example debezium.
Create a Debezium KafkaConnect custom resource (CR) for the connector, or modify an existing one.
The following example shows an excerpt from a dbz-connect.yaml file that describes a KafkaConnect custom resource.
The metadata.annotations and spec.build properties are required.
Example 3.1. A dbz-connect.yaml file that defines a KafkaConnect custom resource that includes a Debezium connector
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: my-connect-cluster
      annotations:
        strimzi.io/use-connector-resources: "true"  # (1)
    spec:
      replicas: 1
      version: 3.6.0
      build:  # (2)
        output:  # (3)
          type: imagestream  # (4)
          image: debezium-streams-connect:latest
        plugins:  # (5)
          - name: debezium-connector-mysql
            artifacts:
              - type: zip  # (6)
                url: https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-mysql/2.7.3.Final-redhat-00001/debezium-connector-mysql-2.7.3.Final-redhat-00001-plugin.zip  # (7)
      bootstrapServers: my-cluster-kafka-bootstrap:9093
      ...
Table 3.1. Descriptions of Kafka Connect configuration settings
Item  Description
1     Sets the strimzi.io/use-connector-resources annotation to "true" to enable the Cluster Operator to use KafkaConnector resources to configure connectors in this Kafka Connect cluster.
2     The spec.build configuration specifies where to store the build image and lists the plug-ins to include in the image, along with the location of the plug-in artifacts.
3     The build.output specifies the registry in which the newly built image is stored.
4     Specifies the type and image name for the image output. Valid values for output.type are docker, to push the image to a container registry like Docker Hub or Quay, or imagestream, to push the image to an internal OpenShift ImageStream. To use an ImageStream, an ImageStream resource must be deployed to the cluster. For more information about specifying the build.output in the KafkaConnect configuration, see the Streams for Apache Kafka Build schema reference documentation.
5     The plugins configuration lists all of the connectors that you want to include in the Kafka Connect image. For each entry in the list, specify a plug-in name and information about the artifacts that are required to build the connector. Optionally, for each connector plug-in, you can include other components that you want to be available for use with the connector. For example, you can add Service Registry artifacts, or the Debezium scripting component.
6     The value of artifacts.type specifies the file type of the artifact specified in the artifacts.url. Valid types are zip, tgz, or jar. Debezium connector archives are provided in .zip file format. JDBC driver files are in .jar format. The type value must match the type of the file that is referenced in the url field.
7     The value of artifacts.url specifies the address of an HTTP server, such as a Maven repository, that stores the file for the connector artifact. The OpenShift cluster must have access to the specified server.
Apply the KafkaConnect build specification to the OpenShift cluster by entering the following command:
    oc create -f dbz-connect.yaml
Based on the configuration specified in the custom resource, the Streams for Apache Kafka Operator prepares a Kafka Connect image to deploy.
After the build completes, the Operator pushes the image to the specified registry or ImageStream, and starts the Kafka Connect cluster. The connector artifacts that you listed in the configuration are available in the cluster.
Create a KafkaConnector resource to define an instance of the MySQL connector.
For example, create the following KafkaConnector CR, and save it as mysql-inventory-connector.yaml.
Example 3.2. A mysql-inventory-connector.yaml file that defines the KafkaConnector custom resource for a Debezium connector
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      labels:
        strimzi.io/cluster: my-connect-cluster
      name: inventory-connector  # (1)
    spec:
      class: io.debezium.connector.mysql.MySqlConnector  # (2)
      tasksMax: 1  # (3)
      config:  # (4)
        database.hostname: mysql  # (5)
        database.port: 3306  # (6)
        database.user: debezium  # (7)
        database.password: dbz  # (8)
        database.server.id: 184054
        topic.prefix: dbserver1  # (9)
        table.include.list: inventory.*  # (10)
        schema.history.internal.kafka.bootstrap.servers: 'my-cluster-kafka-bootstrap:9092'  # (11)
        schema.history.internal.kafka.topic: schema-changes.inventory  # (12)
Table 3.2. Descriptions of connector configuration settings
Item  Description
1     The name of the connector to register with the Kafka Connect cluster.
2     The name of the connector class.
3     Only one task should operate at any one time. Use a single connector task to ensure proper order and event handling as the MySQL connector reads the MySQL server's binlog. The Kafka Connect service uses connectors to start one or more tasks to complete the work. It automatically distributes the running tasks across the cluster of Kafka Connect services. If services stop or crash, tasks are redistributed to running services.
4     The connector's configuration.
5     The hostname or address of the MySQL database instance.
6     The port number of the database instance.
7     The name of the user account through which Debezium connects to the database.
8     The password that Debezium uses to connect to the database user account.
9     Topic prefix for the MySQL server or cluster. This string prefixes the names of every Kafka topic that the connector sends event records to.
10    The list of tables from which the connector captures change events. The connector detects changes only if they occur in a table in the inventory database.
11    List of Kafka brokers that the connector uses to write and recover DDL statements to the database schema history topic. This is the same broker that the connector sends change event records to. After a restart, the connector recovers the database schemas that existed at the point in the binlog when the connector resumes reading.
12    Name of the database schema history topic. This topic is for internal use only and should not be used by consumers.
Create the connector resource by running the following command:
oc create -n <namespace> -f <kafkaConnector>.yaml
For example,
oc create -n debezium -f mysql-inventory-connector.yaml
The connector is registered to the Kafka Connect cluster and starts to run against the database server that is specified by spec.config.database.hostname in the KafkaConnector CR. After the connector pod is ready, Debezium is running.
You are now ready to verify that the connector was created and has started to capture changes in the inventory database.
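Before you apply a KafkaConnect CR, it can help to check it against the requirements that this section describes: the required annotation and spec.build properties, the output type, and the rule that each artifact type must match the file that its URL references. The following Python sketch shows one way to do that on a CR that is already loaded as a dictionary. It is not part of Streams for Apache Kafka. The check_kafka_connect function and its messages are hypothetical, it checks only the values that Table 3.1 names, and it assumes that a tgz artifact can end in either .tgz or .tar.gz.

```python
from urllib.parse import urlparse

# Artifact and output types named in Table 3.1. The ".tar.gz" suffix for tgz
# is an assumption of this sketch, not a statement from the documentation.
ARTIFACT_SUFFIXES = {"zip": (".zip",), "tgz": (".tgz", ".tar.gz"), "jar": (".jar",)}
OUTPUT_TYPES = {"docker", "imagestream"}


def check_kafka_connect(cr):
    """Return a list of problems found in a KafkaConnect CR given as a dict."""
    problems = []
    annotations = cr.get("metadata", {}).get("annotations", {})
    if annotations.get("strimzi.io/use-connector-resources") != "true":
        problems.append('annotation strimzi.io/use-connector-resources is not "true"')
    build = cr.get("spec", {}).get("build")
    if not build:
        return problems + ["spec.build is missing"]
    output_type = build.get("output", {}).get("type")
    if output_type not in OUTPUT_TYPES:
        problems.append(f"unexpected output.type: {output_type!r}")
    for plugin in build.get("plugins", []):
        for artifact in plugin.get("artifacts", []):
            kind = artifact.get("type")
            filename = urlparse(artifact.get("url", "")).path.rsplit("/", 1)[-1]
            suffixes = ARTIFACT_SUFFIXES.get(kind)
            if suffixes is None:
                problems.append(f"{plugin.get('name')}: unexpected artifact type {kind!r}")
            elif not filename.endswith(suffixes):
                problems.append(f"{plugin.get('name')}: type {kind!r} does not match file {filename}")
    return problems


# The relevant parts of Example 3.1, written as a Python dictionary.
connect_cr = {
    "metadata": {
        "name": "my-connect-cluster",
        "annotations": {"strimzi.io/use-connector-resources": "true"},
    },
    "spec": {
        "build": {
            "output": {"type": "imagestream", "image": "debezium-streams-connect:latest"},
            "plugins": [{
                "name": "debezium-connector-mysql",
                "artifacts": [{
                    "type": "zip",
                    "url": "https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-mysql/2.7.3.Final-redhat-00001/debezium-connector-mysql-2.7.3.Final-redhat-00001-plugin.zip",
                }],
            }],
        },
    },
}

print(check_kafka_connect(connect_cr) or "no problems found")
```

A check like this catches only the mismatches described in Table 3.1. The operator performs its own validation when the CR is applied.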
3.3. Example: A simple OpenShift ImageStream object definition
The following example shows a simple ImageStream object definition:
    apiVersion: image.openshift.io/v1
    kind: ImageStream
    metadata:
      name: kafka-connect-dbz-mysql
    spec:
      lookupPolicy:
        local: true
3.4. Verifying the connector deployment
If the connector starts correctly without errors, it creates a topic for each table that the connector is configured to capture. Downstream applications can subscribe to these topics to retrieve information about events that occur in the source database.
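The per-table topic names follow the pattern that is visible in the topic lists later in this section: the topic.prefix value, the database name, and the table name, joined by periods. The following Python sketch builds the expected names for the tutorial's inventory tables. The table_topics function is a hypothetical helper for illustration, and the pattern is inferred from the tutorial output rather than taken from a connector API.

```python
def table_topics(topic_prefix, database, tables):
    """Build per-table topic names in the form <prefix>.<database>.<table>."""
    return [f"{topic_prefix}.{database}.{table}" for table in tables]


# Tables listed by `show tables` in the inventory database.
tables = ["addresses", "customers", "geom", "orders", "products", "products_on_hand"]

for topic in table_topics("dbserver1", "inventory", tables):
    print(topic)
```

The tutorial output also lists a topic that is named after the prefix alone (dbserver1), which this sketch does not generate.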
To verify that the connector is running, perform the following operations from the OpenShift Container Platform web console, or through the OpenShift CLI tool (oc):
- Verify the connector status.
- Verify that the connector generates topics.
- Verify that topics are populated with events for read operations ("op":"r") that the connector generates during the initial snapshot of each table.
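The first check in the preceding list can be automated against the JSON form of the KafkaConnector resource, for example the output of `oc get kafkaconnector inventory-connector -n debezium -o json`. The following Python sketch is a minimal illustration. The connector_is_healthy function is hypothetical, and the camel-case field names (conditions, connectorStatus, tasks) are assumed to be the JSON equivalents of the Conditions, Connector Status, and Tasks fields that `oc describe` prints.

```python
def connector_is_healthy(resource):
    """Check a KafkaConnector resource (parsed JSON) for a Ready condition
    and for RUNNING connector and task states."""
    status = resource.get("status", {})
    ready = any(
        condition.get("type") == "Ready" and condition.get("status") == "True"
        for condition in status.get("conditions", [])
    )
    connector_status = status.get("connectorStatus", {})
    connector_running = connector_status.get("connector", {}).get("state") == "RUNNING"
    tasks = connector_status.get("tasks", [])
    # Require at least one task, and require every task to be RUNNING.
    tasks_running = bool(tasks) and all(task.get("state") == "RUNNING" for task in tasks)
    return ready and connector_running and tasks_running


# Hand-written sample that mirrors the status values shown in this tutorial.
sample = {
    "status": {
        "conditions": [{"type": "Ready", "status": "True"}],
        "connectorStatus": {
            "connector": {"state": "RUNNING", "worker_id": "10.131.1.124:8083"},
            "name": "inventory-connector",
            "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "10.131.1.124:8083"}],
            "type": "source",
        },
    }
}

print(connector_is_healthy(sample))
```

To check a live connector, the sample dictionary would be replaced by the parsed command output, for example by reading it with json.load from standard input.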
Prerequisites
- A Debezium connector is deployed to Streams for Apache Kafka on OpenShift.
- The OpenShift oc CLI client is installed.
- You have access to the OpenShift Container Platform web console.
Procedure
Check the status of the KafkaConnector resource by using one of the following methods:
From the OpenShift Container Platform web console:
- Navigate to Home > Search.
- On the Search page, click Resources to open the Select Resource box, and then type KafkaConnector.
- From the KafkaConnectors list, click the name of the connector that you want to check, for example inventory-connector.
- In the Conditions section, verify that the values in the Type and Status columns are set to Ready and True.
From a terminal window:
Enter the following command:
oc describe KafkaConnector <connector-name> -n <project>
For example,
oc describe KafkaConnector inventory-connector -n debezium
The command returns status information that is similar to the following output:
Example 3.3. KafkaConnector resource status
    Name:         inventory-connector
    Namespace:    debezium
    Labels:       strimzi.io/cluster=my-connect-cluster
    Annotations:  <none>
    API Version:  kafka.strimzi.io/v1beta2
    Kind:         KafkaConnector
    ...
    Status:
      Conditions:
        Last Transition Time:  2021-12-08T17:41:34.897153Z
        Status:                True
        Type:                  Ready
      Connector Status:
        Connector:
          State:      RUNNING
          worker_id:  10.131.1.124:8083
        Name:         inventory-connector
        Tasks:
          Id:               0
          State:            RUNNING
          worker_id:        10.131.1.124:8083
        Type:               source
      Observed Generation:  1
      Tasks Max:            1
      Topics:
        dbserver1
        dbserver1.inventory.addresses
        dbserver1.inventory.customers
        dbserver1.inventory.geom
        dbserver1.inventory.orders
        dbserver1.inventory.products
        dbserver1.inventory.products_on_hand
    Events:  <none>
Verify that the connector created Kafka topics:
From the OpenShift Container Platform web console:
- Navigate to Home > Search.
- On the Search page, click Resources to open the Select Resource box, and then type KafkaTopic.
- From the KafkaTopics list, click the name of the topic that you want to check, for example, dbserver1.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d.
- In the Conditions section, verify that the values in the Type and Status columns are set to Ready and True.
From a terminal window:
Enter the following command:
oc get kafkatopics
The command returns status information that is similar to the following output:
Example 3.4. KafkaTopic resource status
    NAME   CLUSTER   PARTITIONS   REPLICATION FACTOR   READY
    connect-cluster-configs   my-cluster   1   1   True
    connect-cluster-offsets   my-cluster   25   1   True
    connect-cluster-status   my-cluster   5   1   True
    consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a   my-cluster   50   1   True
    dbserver1---a96f69b23d6118ff415f772679da623fbbb99421   my-cluster   1   1   True
    dbserver1.inventory.addresses---1b6beaf7b2eb57d177d92be90ca2b210c9a56480   my-cluster   1   1   True
    dbserver1.inventory.customers---9931e04ec92ecc0924f4406af3fdace7545c483b   my-cluster   1   1   True
    dbserver1.inventory.geom---9f7e136091f071bf49ca59bf99e86c713ee58dd5   my-cluster   1   1   True
    dbserver1.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d   my-cluster   1   1   True
    dbserver1.inventory.products---df0746db116844cee2297fab611c21b56f82dcef   my-cluster   1   1   True
    dbserver1.inventory.products-on-hand---8649e0f17ffcc9212e266e31a7aeea4585e5c6b5   my-cluster   1   1   True
    schema-changes.inventory   my-cluster   1   1   True
    strimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55   my-cluster   1   1   True
    strimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b   my-cluster   1   1   True
Check topic content.
- From a terminal window, enter the following command:
    oc exec -n <project> -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \
      --bootstrap-server localhost:9092 \
      --from-beginning \
      --property print.key=true \
      --topic=<topic-name>
For example,
    oc exec -n debezium -it my-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \
      --bootstrap-server localhost:9092 \
      --from-beginning \
      --property print.key=true \
      --topic=dbserver1.inventory.products_on_hand
Specify the topic name in the same format that the oc describe command returns in the first step of this procedure, for example, dbserver1.inventory.addresses.
For each event in the topic, the command returns information that is similar to the following output:
Example 3.5. Content of a Debezium change event
    {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"dbserver1.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"dbserver1.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"dbserver1.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"2.7.3.Final-redhat-00001","connector":"mysql","name":"dbserver1","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}
In the preceding example, the payload value shows that the connector snapshot generated a read ("op":"r") event from the products_on_hand table in the inventory database. The "before" state of the record is null, indicating that no previous value exists for the record. The "after" state shows a quantity of 3 for the item with product_id 101.
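A consumer application would normally read these fields from the event in code rather than by eye. The following Python sketch extracts them from the value part of an event. The summarize_event function is a hypothetical helper, and the sketch assumes that events use the JSON format shown in Example 3.5, in which the data is wrapped in a "payload" field. The sample keeps only the payload and omits the "schema" part for brevity.

```python
import json

# Operation codes that Debezium uses in the "op" field of a change event.
OPERATIONS = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)"}


def summarize_event(value_json):
    """Extract the operation, source table, and row states from an event value."""
    payload = json.loads(value_json)["payload"]
    source = payload["source"]
    return {
        "operation": OPERATIONS.get(payload["op"], payload["op"]),
        "table": f"{source['db']}.{source['table']}",
        "before": payload["before"],
        "after": payload["after"],
    }


# Payload values copied from Example 3.5, with some source fields left out.
event_value = json.dumps({
    "payload": {
        "before": None,
        "after": {"product_id": 101, "quantity": 3},
        "source": {"connector": "mysql", "name": "dbserver1", "snapshot": "true",
                   "db": "inventory", "table": "products_on_hand"},
        "op": "r",
        "ts_ms": 1638985247805,
        "transaction": None,
    }
})

print(summarize_event(event_value))
```

Operation codes that are not in the OPERATIONS mapping are passed through unchanged, so the function does not fail on codes that the sketch does not list.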
You are now ready to view change events that the Debezium connector captures from the inventory database.