2.2. 部署 Kafka 连接
部署 MySQL 数据库后,使用 AMQ Streams 构建 Kafka Connect 容器镜像,其中包括 Debezium MySQL connector 插件。在此过程中,您可以创建并使用以下自定义资源(CR):
-
一个
KafkaConnect
CR,用于定义 Kafka Connect 实例,并包含有关要在镜像中包括的 MySQL 连接器工件的信息。 -
KafkaConnector
CR 提供详情,其中包括 MySQL 连接器用于访问源数据库的信息。在 AMQ Streams 启动 Kafka Connect pod 后,通过应用KafkaConnector
CR 来启动连接器。
在构建过程中,AMQ Streams Operator 将 KafkaConnect
自定义资源中的输入参数(包括 Debezium 连接器定义)转换为 Kafka Connect 容器镜像。构建从 Red Hat Maven 存储库下载必要的工件,并将它们合并到镜像中。新创建的容器被推送到在 .spec.build.output
中指定的容器 registry,用于部署 Kafka Connect pod。在 AMQ Streams 构建 Kafka Connect 镜像后,使用 KafkaConnector
自定义资源启动连接器。
流程
-
登录 OpenShift 集群,创建或打开一个项目,如
debezium
。 为连接器创建 Debezium
KafkaConnect
自定义资源(CR),或修改现有资源(CR)。例如,创建一个KafkaConnect
CR,用于指定metadata.annotations
和spec.build
属性,如下例所示。使用名称(如dbz-connect.yaml
)保存文件。例 2.1. 一个
dbz-connect.yaml
文件,该文件定义了一个KafkaConnect
自定义资源,其中包含 Debezium 连接器apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster annotations: strimzi.io/use-connector-resources: "true" 1 spec: version: 3.00 build: 2 output: 3 type: imagestream 4 image: debezium-streams-connect:latest plugins: 5 - name: debezium-connector-mysql artifacts: - type: zip 6 url: https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-mysql/1.9.7.Final-redhat-<build_number>/debezium-connector-mysql-1.9.7.Final.zip 7 bootstrapServers: my-cluster-kafka-bootstrap:9093
表 2.1. Kafka Connect 配置设置的描述 项 Description 1
将
strimzi.io/use-connector-resources
注解设置为"true"
来启用 Cluster Operator 以使用KafkaConnector
资源在此 Kafka Connect 集群中配置连接器。2
spec.build
配置指定存储构建镜像的位置,并列出了要包含在镜像中的插件,以及插件工件的位置。3
build.output
指定存储新构建的镜像的 registry。4
指定镜像输出的名称和镜像名称。
output.type
的有效值为docker
,用于推送到 Docker Hub 或 Quay 等容器注册表,或者镜像流
将镜像推送到内部 OpenShift ImageStream。要使用 ImageStream,必须将 ImageStream 资源部署到集群中。有关在 KafkaConnect 配置中指定build.output
的更多信息,请参阅 AMQ Streams Build schema 参考文档。5
插件配置
列出了您希望在 Kafka Connect 镜像中包含的所有连接器。对于列表中的每个条目,请指定插件名称,以及构建连接器所需的工件的信息。另外,对于每个连接器插件,您可以包括您希望用于连接器的其他组件。例如,您可以添加 Service Registry 工件或 Debezium 脚本组件。
6
artifacts.type
的值指定artifacts.url
中指定的工件的文件类型。有效类型为zip
、tgz
或jar
。Debezium 连接器存档以.zip
文件格式提供。JDBC 驱动程序文件采用.jar
格式。类型
值必须与url
字段中引用的文件类型匹配。7
artifacts.url
指定 HTTP 服务器的地址,如 Maven 存储库,用于存储连接器工件的文件。OpenShift 集群必须有权访问指定的服务器。输入以下命令将
KafkaConnect
构建规格应用到 OpenShift 集群:oc create -f dbz-connect.yaml
根据自定义资源中指定的配置,AMQ Streams Operator 准备要部署的 Kafka Connect 镜像。
构建完成后,Operator 会将镜像推送到指定的 registry 或 ImageStream,并启动 Kafka Connect 集群。集群中提供了您在配置中列出的连接器工件。创建
KafkaConnector
资源,以定义 MySQL 连接器的实例。
例如,创建以下KafkaConnector
CR,并将它保存为debezium-inventory-connector.yaml
例 2.2.
mysql-inventory-connector.yaml
文件,该文件定义 Debezium 连接器的KafkaConnector
自定义资源apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: labels: strimzi.io/cluster: my-connect-cluster name: inventory-connector 1 spec: class: io.debezium.connector.mysql.MySqlConnector 2 tasksMax: 1 3 config: 4 database.hostname: mysql 5 database.port: 3306 6 database.user: debezium 7 database.password: dbz 8 database.server.id: 184054 database.dbname: mydatabase 9 database.server.name: dbserver1 10 database.include.list: inventory 11 database.history.kafka.bootstrap.servers: 'my-cluster-kafka-bootstrap:9092' 12 database.history.kafka.topic: schema-changes.inventory
表 2.2. 连接器配置设置描述 项 Description 1
使用 Kafka Connect 集群注册的连接器名称。
2
连接器类的名称。
3
在同一时间仅应操作一个任务。使用单一连接器任务来确保作为 MySQL 连接器正确顺序和事件处理,读取 MySQL 服务器的
binlog
。Kafka Connect 服务使用连接器启动一个或多个任务来完成工作。它会在 Kafka Connect 服务的集群中自动分发正在运行的任务。如果服务停止或崩溃,则会将任务重新分发到运行的服务。4
连接器的配置。
5
数据库主机,即运行 MySQL 服务器(mysql)的容器的名称。
6
数据库实例的端口号。
7
Debezium 连接到数据库的用户帐户的名称。
8
数据库用户帐户的密码。
9
要从中捕获更改的数据库名称。
10
数据库实例或集群的逻辑名称。服务器名称是 MySQL 服务器或服务器集群的逻辑标识符。这个名称被用作所有 Kafka 主题的前缀。
11
连接器从中捕获更改事件的表列表。连接器仅检测 inventory 数据库中的更改。
12
指定连接器用于存储数据库模式历史记录的 Kafka 代理和主题(您要发送事件的同一代理)。重启后,连接器会在连接器恢复读取时恢复 binlog 中存在的数据库模式。
运行以下命令来创建连接器资源:
oc create -n <namespace> -f <kafkaConnector>.yaml
例如,
oc create -n debezium -f mysql-inventory-connector.yaml
连接器注册到 Kafka Connect 集群,并开始针对
KafkaConnector
CR 中的spec.config.database.dbname
指定的数据库运行。在连接器 Pod 就绪后,Debezium 已在运行。
您现在已准备好 验证连接器是否已创建,并开始使用捕获 清单
数据库中的更改。