4.4. 重启 Kafka Connect 服务
现在,您已看到 Debezium MySQL 连接器如何捕获 create、update 和 delete 事件,现在您会看到它如何捕获更改事件,即使它没有运行。
Kafka Connect 服务自动管理其注册连接器的任务。因此,如果它离线,当它重启时,它将启动任何非运行的任务。这意味着,即使 Debezium 没有运行,它仍然可以报告数据库中的更改。
在此过程中,您将停止 Kafka Connect,更改数据库中的一些数据,然后重启 Kafka Connect 以查看更改事件。
流程
停止 Kafka Connect 服务。
打开 Kafka Connect 部署的配置:
$ oc edit deployment/my-connect-cluster-connect
部署配置将打开:
apiVersion: apps.openshift.io/v1 kind: Deployment metadata: ... spec: replicas: 1 ...
-
将
spec.replicas
值更改为0
。 - 保存配置。
验证 Kafka Connect 服务是否已停止。
此命令显示 Kafka Connect 服务已完成,且没有运行 pod:
$ oc get pods -l strimzi.io/name=my-connect-cluster-connect NAME READY STATUS RESTARTS AGE my-connect-cluster-connect-1-dxcs9 0/1 Completed 0 7h
当 Kafka Connect 服务停机时,切换到运行 MySQL 客户端的终端,并在数据库中添加新记录。
mysql> INSERT INTO customers VALUES (default, "Sarah", "Thompson", "kitt@acme.com");
重启 Kafka Connect 服务。
打开 Kafka Connect 服务的部署配置。
$ oc edit deployment/my-connect-cluster-connect
部署配置将打开:
apiVersion: apps.openshift.io/v1 kind: Deployment metadata: ... spec: replicas: 0 ...
-
将
spec.replicas
值更改为1
。 - 保存部署配置。
验证 Kafka Connect 服务是否已重启。
此命令显示 Kafka Connect 服务正在运行,并且 pod 已就绪:
$ oc get pods -l strimzi.io/name=my-connect-cluster-connect NAME READY STATUS RESTARTS AGE my-connect-cluster-connect-2-q9kkl 1/1 Running 0 74s
-
切换到正在运行
kafka-console-consumer.sh
的终端。新事件到达时弹出。 检查您在 Kafka Connect 离线时创建的记录(用于可读性):
{ ... "payload":{ "id":1005 } } { ... "payload":{ "before":null, "after":{ "id":1005, "first_name":"Sarah", "last_name":"Thompson", "email":"kitt@acme.com" }, "source":{ "version":"2.3.7.Final", "connector":"mysql", "name":"dbserver1", "ts_ms":1582581502000, "snapshot":"false", "db":"inventory", "table":"customers", "server_id":223344, "gtid":null, "file":"mysql-bin.000004", "pos":364, "row":0, "thread":5, "query":null }, "op":"c", "ts_ms":1582581502317 } }