3.4. 重启 Kafka Connect 服务
现在,您已经了解了 Debezium MySQL 连接器如何捕获创建、更新和删除事件,现在您将会看到如何捕获更改事件,即使它未在运行。
Kafka Connect 服务自动管理其注册连接器的任务。因此,如果它处于离线状态,重启时会启动任何运行的任务。这意味着,即使 Debezium 没有运行,它仍然可以报告数据库中的更改。
在此过程中,您将停止 Kafka Connect,更改数据库中的一些数据,然后重新启动 Kafka Connect 以查看更改事件。
流程
停止 Kafka Connect 服务。
打开 Kafka Connect 服务的部署配置:
$ oc edit dc/my-connect-cluster-connect
部署配置将打开:
apiVersion: apps.openshift.io/v1 kind: DeploymentConfig metadata: ... spec: replicas: 1 ...
-
将
spec.replicas
值更改为 0。
- 保存部署配置。
验证 Kafka Connect 服务是否已停止。
这个命令显示 Kafka Connect 服务已完成,且没有 pod 正在运行:
$ oc get pods -l strimzi.io/name=my-connect-cluster-connect NAME READY STATUS RESTARTS AGE my-connect-cluster-connect-1-dxcs9 0/1 Completed 0 7h
在 Kafka Connect 服务关闭时,切换到运行 MySQL 客户端的终端,并在数据库中添加新记录。
mysql> INSERT INTO customers VALUES (default, "Sarah", "Thompson", "kitt@acme.com");
重启 Kafka Connect 服务。
打开 Kafka Connect 服务的部署配置。
$ oc edit dc/my-connect-cluster-connect
部署配置将打开:
apiVersion: apps.openshift.io/v1 kind: DeploymentConfig metadata: ... spec: replicas: 0 ...
-
将
spec.replicas
值更改为1
。 - 保存部署配置。
验证 Kafka Connect 服务是否已重启。
这个命令显示 Kafka Connect 服务正在运行,且 pod 已就绪:
$ oc get pods -l strimzi.io/name=my-connect-cluster-connect NAME READY STATUS RESTARTS AGE my-connect-cluster-connect-2-q9kkl 1/1 Running 0 74s
-
切换到运行
kafka-console-consumer.sh
的终端。新事件在到达时弹出。 检查您在 Kafka Connect 离线时创建的记录(为可读性而格式化):
{ ... "payload":{ "id":1005 } } { ... "payload":{ "before":null, "after":{ "id":1005, "first_name":"Sarah", "last_name":"Thompson", "email":"kitt@acme.com" }, "source":{ "version":"1.9.7.Final", "connector":"mysql", "name":"dbserver1", "ts_ms":1582581502000, "snapshot":"false", "db":"inventory", "table":"customers", "server_id":223344, "gtid":null, "file":"mysql-bin.000004", "pos":364, "row":0, "thread":5, "query":null }, "op":"c", "ts_ms":1582581502317 } }