2.2. 部署 Kafka 连接
部署 MySQL 数据库后,使用 AMQ Streams 构建包括 Debezium MySQL Connector 插件的 Kafka Connect 容器镜像。在部署过程中,您要创建并使用以下自定义资源(CR):
-
一个
KafkaConnect
CR,用于定义 Kafka Connect 实例,并包含镜像中要包含的 MySQL 连接器工件的信息。 -
KafkaConnector
CR 提供了包括 MySQL 连接器用于访问源数据库的信息。在 AMQ Streams 启动 Kafka Connect pod 后,您可通过应用KafkaConnector
CR 来启动连接器。
在构建过程中,AMQ Streams Operator 会将 KafkaConnect
自定义资源(包括 Debezium connector 定义)中的输入参数转换为 Kafka Connect 容器镜像。构建会从红帽 Maven 存储库下载所需的工件,并将它们合并到镜像中。新创建的容器被推送到 .spec.build.output
中指定的容器 registry 中,用于部署 Kafka Connect pod。在 AMQ Streams 构建 Kafka Connect 镜像后,使用 KafkaConnector
自定义资源启动连接器。
流程
-
登录 OpenShift 集群并创建或创建项目,例如
debezium
。 为连接器创建 Debezium
KafkaConnect
自定义资源(CR),或修改现有资源。例如,创建一个KafkaConnect
CR,用于指定metadata.annotations
和spec.build
属性,如下例所示。使用名称(如dbz-connect.yaml
)保存文件。例 2.1. 一个
dbz-connect.yaml
文件,它定义了一个KafkaConnect
自定义资源,其中包含一个 Debezium 连接器apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster annotations: strimzi.io/use-connector-resources: "true" 1 spec: version: 3.00 build: 2 output: 3 type: imagestream 4 image: debezium-streams-connect:latest plugins: 5 - name: debezium-connector-mysql artifacts: - type: zip 6 url: https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-mysql/1.9.5.Final-redhat-<build_number>/debezium-connector-mysql-1.9.5.Final.zip 7 bootstrapServers: my-cluster-kafka-bootstrap:9093
表 2.1. Kafka Connect 配置设置的描述 项 描述 1
将
strimzi.io/use-connector-resources
注解设置为"true"
,使 Cluster Operator 使用KafkaConnector
资源在此 Kafka Connect 集群中配置连接器。2
spec.build
配置指定存储构建镜像的位置,并列出镜像中包含的插件以及插件工件的位置。3
build.output
指定在其中存储新构建的镜像的 registry。4
指定镜像输出的名称和镜像名称。
output.type
的有效值为docker
,可推送到容器 registry,如 Docker Hub 或 Quay,或将镜像推送到内部 OpenShift ImageStream 的镜像流。要使用 ImageStream,必须将 ImageStream 资源部署到集群中。有关在 KafkaConnect 配置中指定
build.output
的更多信息,请参阅 AMQ Streams Build schema 参考文档。5
插件配置
列出了您要包含在 Kafka Connect 镜像中的所有连接器。对于列表中的每个条目,请指定插件名称,以及构建连接器所需的工件的信息。另外,对于每个连接器插件,您可以包括您要与连接器搭配使用的其他组件。例如,您可以添加 Service Registry 工件或 Debezium 脚本组件。
6
artifacts.type
的值指定artifacts.url
中指定的构件的文件名。有效类型为zip
、tgz
或jar
。Debezium 连接器存档以.zip
文件格式提供。JDBC 驱动程序文件采用.jar
格式。type
值必须与url
字段中引用的文件类型匹配。7
artifacts.url
的值指定 HTTP 服务器的地址,如 Maven 存储库,用于存储连接器工件的文件。OpenShift 集群必须有权访问指定的服务器。输入以下命令将
KafkaConnect
构建规格应用到 OpenShift 集群:oc create -f dbz-connect.yaml
根据自定义资源中指定的配置,AMQ Streams Operator 会准备要部署的 Kafka Connect 镜像。
构建完成后,Operator 会将镜像推送到指定的 registry 或 ImageStream,并启动 Kafka Connect 集群。集群中提供了您在配置中列出的连接器工件。创建一个
KafkaConnector
资源来定义 MySQL 连接器实例。
例如,创建以下KafkaConnector
CR,并将其保存为debezium-inventory-connector.yaml
例 2.2.
mysql-inventory-connector.yaml
文件,为 Debezium 连接器定义KafkaConnector
自定义资源apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: labels: strimzi.io/cluster: my-connect-cluster name: inventory-connector 1 spec: class: io.debezium.connector.mysql.MySqlConnector 2 tasksMax: 1 3 config: 4 database.hostname: mysql 5 database.port: 3306 6 database.user: debezium 7 database.password: dbz 8 database.server.id: 184054 database.dbname: mydatabase 9 database.server.name: dbserver1 10 database.include.list: inventory 11 database.history.kafka.bootstrap.servers: 'my-cluster-kafka-bootstrap:9092' 12 database.history.kafka.topic: schema-changes.inventory
表 2.2. 连接器配置设置描述 项 描述 1
要注册到 Kafka Connect 集群的连接器名称。
2
连接器类的名称。
3
只有一个任务应该在任何一个时间运行。使用单一连接器任务来确保订单和事件处理,因为 MySQL 连接器可读取 MySQL 服务器的
binlog
。Kafka Connect 服务使用连接器启动一个或多个任务来完成工作。它会在 Kafka Connect 服务的集群中自动分发正在运行的任务。如果服务停止或崩溃,则会将任务重新分发到运行的服务。4
连接器的配置。
5
数据库主机,这是运行 MySQL 服务器(mysql)的容器的名称。
6
数据库实例的端口号。
7
Debezium 连接到数据库的用户帐户的名称。
8
数据库用户帐户的密码。
9
捕获更改的数据库的名称。
10
数据库实例或集群的逻辑名称。服务器名称是 MySQL 服务器或服务器集群的逻辑标识符。此名称用作所有 Kafka 主题的前缀。
11
连接器捕获更改事件的表列表。连接器只会检测 inventory 数据库中的更改。
12
指定 Kafka 代理和主题,连接器用来存储数据库模式的历史记录(您要将事件发送到的同一代理)。重启后,当连接器恢复读取时,连接器会恢复在 binlog 存在的数据库模式。
运行以下命令来创建连接器资源:
oc create -n <namespace> -f <kafkaConnector>.yaml
例如,
oc create -n debezium -f mysql-inventory-connector.yaml
连接器被注册到 Kafka Connect 集群,然后开始针对由
KafkaConnector
CR 中的spec.config.database.dbname
指定的数据库运行。在连接器 Pod 就绪后,Dbezium 正在运行。
您现在已准备好 验证连接器是否已创建 并已开始捕获 inventory
数据库中的更改。