Chapter 3. Creating a connector to capture inventory database changes
After starting the Kafka, Debezium, and MySQL services, you are ready to create a connector instance that captures changes in the inventory database.
In this procedure, you define the connector instance in a KafkaConnector Custom Resource (CR) and then apply the CR. After you apply the CR, the connector instance starts capturing changes from the inventory database’s binlog. The binlog records all of the database’s transactions (such as changes to individual rows and changes to the schemas). When a row in the database changes, Debezium generates a change event.
Typically, you use Kafka tools to manually create the necessary topics, including specifying the number of replicas. However, for this tutorial, Kafka is configured to automatically create the topics with just one replica.
Procedure
Create a KafkaConnector CR that configures a Debezium MySQL connector instance for capturing changes to the inventory database. Copy the following example CR (inventory-connector.yaml):
- (1) The name of the connector.
- (2) Only one task should operate at any one time. Because the MySQL connector reads the MySQL server’s binlog, using a single connector task ensures proper order and event handling. The Kafka Connect service uses connectors to start one or more tasks that do the work, and it automatically distributes the running tasks across the cluster of Kafka Connect services. If any of the services stop or crash, those tasks will be redistributed to running services.
- (3) The connector’s configuration.
- (4) The database host, which is the name of the container running the MySQL server (mysql).
- (5, 6) A unique server ID and name. The server name is the logical identifier for the MySQL server or cluster of servers. This name will be used as the prefix for all Kafka topics.
- (7) Only changes in the inventory database will be detected.
- (8, 9) The connector will store the history of the database schemas in Kafka using this broker (the same broker to which you are sending events) and topic name. Upon restart, the connector will recover the schemas of the database that existed at the point in time in the binlog when the connector should begin reading.
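For orientation, a CR consistent with the numbered callouts might look like the following sketch. The connector name, database host (mysql), port, user (debezium), logical server name (dbserver1), and Kafka Connect cluster name (my-connect-cluster) are taken from this chapter's text, commands, and log output. Everything marked ASSUMPTION is illustrative: the apiVersion depends on your Strimzi or AMQ Streams release, and the property names follow Debezium 1.x conventions, which later releases renamed. Treat this as a starting point to compare against the example CR for your version, not as the definitive file.

```yaml
# Sketch only. Lines marked ASSUMPTION are placeholders, not values from this chapter.
apiVersion: kafka.strimzi.io/v1beta2        # ASSUMPTION: use the version your operator serves
kind: KafkaConnector
metadata:
  name: inventory-connector                 # (1)
  labels:
    strimzi.io/cluster: my-connect-cluster  # Kafka Connect cluster that runs the connector
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1                               # (2)
  config:                                   # (3)
    database.hostname: mysql                # (4)
    database.port: 3306
    database.user: debezium
    database.password: "<your-password>"    # ASSUMPTION: placeholder
    database.server.id: 184054              # (5) ASSUMPTION: any ID unique among the server's replication clients
    database.server.name: dbserver1         # (6)
    database.whitelist: inventory           # (7) Debezium 1.x property name
    database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092  # (8) ASSUMPTION: Kafka cluster named my-cluster
    database.history.kafka.topic: schema-changes.inventory                     # (9) ASSUMPTION: illustrative topic name
```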
Apply the connector instance:
$ oc apply -f inventory-connector.yaml
The inventory-connector connector is registered and starts to run against the inventory database.
Verify that inventory-connector was created and has started to capture changes in the inventory database by watching the Kafka Connect log output as inventory-connector starts:
Display the Kafka Connect log output:
$ oc logs $(oc get pods -o name -l strimzi.io/name=my-connect-cluster-connect)
Review the log output and verify that the initial snapshot has been executed. These lines show that the initial snapshot has started:
...
2020-02-21 17:57:30,801 INFO Starting snapshot for jdbc:mysql://mysql:3306/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&connectTimeout=30000 with user 'debezium' with locking mode 'minimal' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
2020-02-21 17:57:30,805 INFO Snapshot is using user 'debezium' with these MySQL grants: (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
...
The snapshot involves a number of steps:
After completing the snapshot, Debezium begins capturing updates to the inventory database’s binlog:
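Because the Kafka Connect log mixes connector messages with output from the rest of the worker, it can help to filter for the connector's reader classes when checking for the snapshot and the switch to binlog reading. The following is a minimal sketch: filter_connector_log is a hypothetical helper name, SnapshotReader is the logger class shown in the log lines above, and BinlogReader is an assumption based on Debezium 1.x class naming that may not match your connector version.

```shell
# Hypothetical helper: read Kafka Connect log text on stdin and keep only
# lines logged by the MySQL connector's snapshot or binlog reader classes.
# ASSUMPTION: the BinlogReader class name applies to Debezium 1.x only.
filter_connector_log() {
  grep -E 'io\.debezium\.connector\.mysql\.(SnapshotReader|BinlogReader)'
}

# Usage (requires a logged-in oc session; same pod selector as in this chapter):
#   oc logs $(oc get pods -o name -l strimzi.io/name=my-connect-cluster-connect) | filter_connector_log
```

If the filter prints nothing, check the unfiltered log first, since the logger class names may differ in your release.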