3.2. 部署 Kafka Connect
部署 MySQL 数据库后,使用 AMQ Streams 构建包含 Debezium MySQL 连接器插件的 Kafka Connect 容器镜像。在部署过程中,您可以创建并使用以下自定义资源(CR):
-
定义 Kafka Connect 实例的
KafkaConnectCR,并包含有关要在镜像中包含的 MySQL 连接器工件的信息。 -
KafkaConnectorCR 提供了包括 MySQL 连接器用来访问源数据库的信息。在 AMQ Streams 启动 Kafka Connect pod 后,您可以通过应用KafkaConnectorCR 来启动连接器。
在构建过程中,AMQ Streams Operator 将 KafkaConnect 自定义资源(包括 Debezium 连接器定义)中的输入参数转换为 Kafka Connect 容器镜像。构建会从 Red Hat Maven 存储库下载所需的工件,并将它们合并到镜像中。新创建的容器被推送到在 .spec.build.output 中指定的容器 registry,用于部署 Kafka Connect pod。
容器镜像可以存储在外部容器 registry 中,如 quay.io,或存储在 OpenShift ImageStream 中。因为 ImageStreams 不会被自动创建,因此若要将容器镜像存储在 ImageStream 中,因此您必须在部署 Kafka Connect 前创建 ImageStream。
在 AMQ Streams 构建并存储 Kafka Connect 镜像后,使用 KafkaConnector 自定义资源来启动连接器。
先决条件
- AMQ Streams 在 OpenShift 集群上运行。
- AMQ Streams Cluster Operator 已安装到 OpenShift 集群。
- 如果您希望将 KafkaConnect 容器镜像存储在 OpenShift ImageStream 中,则提供了一个 ImageStream。
- Apache Kafka 和 Kafka Connect 在 AMQ Streams 上运行。
步骤
-
登录 OpenShift 集群,再创建或打开一个项目,如
debezium。 为连接器创建 Debezium
KafkaConnect自定义资源(CR),或修改现有的资源。
以下示例显示了一个dbz-connect.yaml文件的摘录,该文件描述了KafkaConnect自定义资源。metadata.annotations和spec.build属性是必需的。例 3.1. 定义包含 Debezium 连接器的
KafkaConnect自定义资源的dbz-connect.yaml文件Copy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 表 3.1. Kafka Connect 配置设置的描述 项 描述 1
将
strimzi.io/use-connector-resources注解设置为"true",使 Cluster Operator 使用KafkaConnector资源在此 Kafka Connect 集群中配置连接器。2
spec.build配置指定在镜像中存储构建镜像的位置,并列出要在镜像中包含的插件,以及插件工件的位置。3
build.output指定存储新构建镜像的 registry。4
指定镜像输出的名称和镜像名称。
output.type的有效值是要推送到 Docker Hub 或 Quay 等容器 registry 的有效值,或镜像流以将镜像推送到内部 OpenShift ImageStream。要使用 ImageStream,必须将 ImageStream 资源部署到集群中。有关在 KafkaConnect 配置中指定build.output的更多信息,请参阅 AMQ Streams Build schema 文档。5
plugins配置列出了您要包含在 Kafka Connect 镜像中的所有连接器。对于列表中的每个条目,指定一个插件名称,以及有关构建连接器所需的工件的信息。另外,对于每个连接器插件,您还可以包含可用于连接器的其他组件。例如,您可以添加 Service Registry 工件或 Debezium 脚本组件。6
artifacts.type的值指定在artifacts.url中指定的工件类型。有效类型为zip、tgz或jar。Debezium 连接器存档以.zip文件格式提供。JDBC 驱动程序文件采用.jar格式。类型值必须与url字段中引用的文件类型匹配。7
artifacts.url的值指定 HTTP 服务器的地址,如 Maven 存储库,用于存储连接器工件的文件。OpenShift 集群必须有权访问指定的服务器。输入以下命令将
KafkaConnect构建规格应用到 OpenShift 集群:oc create -f dbz-connect.yaml
oc create -f dbz-connect.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow 根据自定义资源中指定的配置,AMQ Streams Operator 准备要部署的 Kafka Connect 镜像。
构建完成后,Operator 将镜像推送到指定的 registry 或 ImageStream,并启动 Kafka Connect 集群。集群中提供了您在配置中列出的连接器工件。创建一个
KafkaConnector资源来定义 MySQL 连接器的实例。
例如,创建以下KafkaConnectorCR,并将它保存为debezium-inventory-connector.yaml例 3.2. 为 Debezium 连接器定义
KafkaConnector自定义资源的mysql-inventory-connector.yaml文件Copy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 表 3.2. 连接器配置设置的描述 项 描述 1
使用 Kafka Connect 集群注册的连接器名称。
2
连接器类的名称。
3
任何时候都只能运行一个任务。使用单一连接器任务来确保正确的顺序和事件处理,因为 MySQL 连接器读取 MySQL 服务器的
binlog。Kafka Connect 服务使用连接器来启动一个或多个任务来完成工作。它会在 Kafka Connect 服务集群中自动分发正在运行的任务。如果服务停止或崩溃,任务将重新分发到运行的服务。4
连接器的配置。
5
MySQL 数据库实例的主机名或地址。
6
数据库实例的端口号。
7
Debezium 通过它连接到数据库的用户帐户名称。
8
Debezium 用于连接到数据库用户帐户的密码。
9
MySQL 服务器或集群的主题前缀。此字符串前缀连接器将事件记录发送到的每个 Kafka 主题的名称。
10
连接器捕获更改事件的表列表。连接器仅在
清单表中发生时才会检测更改。11
连接器用来写入和恢复 DDL 语句到数据库 schema 历史记录主题的 Kafka 代理列表。这与连接器发送更改事件记录的代理相同。重启后,连接器会在连接器恢复读取时恢复 binlog 中存在的数据库模式。
12
数据库架构历史记录主题的名称。本主题仅用于内部使用,不应供消费者使用。
运行以下命令来创建连接器资源:
oc create -n <namespace> -f <kafkaConnector>.yaml
oc create -n <namespace> -f <kafkaConnector>.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow 例如,
oc create -n debezium -f mysql-inventory-connector.yaml
oc create -n debezium -f mysql-inventory-connector.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow 连接器注册到 Kafka Connect 集群,并开始针对
KafkaConnectorCR 中的spec.config.database.dbname指定的数据库运行。连接器 pod 就绪后,Debebe 正在运行。
现在,您已准备好 验证连接器是否已创建, 并已开始捕获 inventory 数据库中的更改。