3.2. 部署 Kafka Connect
部署 MySQL 数据库后,使用 Apache Kafka 的 Streams 来构建包含 Debezium MySQL 连接器插件的 Kafka Connect 容器镜像。在部署过程中,您要创建和使用以下自定义资源(CR):
-
定义 Kafka Connect 实例的
KafkaConnect
CR,并包含有关要包含在镜像中的 MySQL 连接器工件的信息。 -
提供详情的
KafkaConnector
CR,其中包含 MySQL 连接器用来访问源数据库的信息。在 Apache Kafka 的 Streams 启动 Kafka Connect pod 后,您可以通过应用KafkaConnector
CR 来启动连接器。
在构建过程中,Apache Kafka Operator 的 Streams 将 KafkaConnect
自定义资源中的输入参数(包括 Debezium 连接器定义)转换为 Kafka Connect 容器镜像。构建会从 Red Hat Maven 存储库下载必要的工件,并将它们合并到镜像中。新创建的容器被推送到 .spec.build.output
中指定的容器 registry,并用于部署 Kafka Connect pod。
容器镜像可以存储在外部容器注册表中,如 quay.io
或 OpenShift ImageStream 中。因为 ImageStreams 不会被自动创建,以便在 ImageStream 中存储容器镜像,因此您必须在部署 Kafka Connect 前创建 ImageStream。
在 Apache Kafka 构建并存储 Kafka Connect 镜像后,使用 KafkaConnector
自定义资源启动连接器。
先决条件
- Apache Kafka 的流在 OpenShift 集群上运行。
- Apache Kafka Cluster Operator 的 Streams 已安装到 OpenShift 集群。
- 如果您希望将 KafkaConnect 容器镜像存储在 OpenShift ImageStream 中,则提供 ImageStream。
- Apache Kafka 和 Kafka Connect 在 Apache Kafka 的 Streams 上运行。
流程
-
登录 OpenShift 集群,并创建或打开项目,如
debezium
。 为连接器创建 Debezium
KafkaConnect
自定义资源(CR),或修改现有的资源。
以下示例显示了描述KafkaConnect
自定义资源的dbz-connect.yaml
文件摘录。metadata.annotations
和spec.build
属性是必需的。例 3.1. 定义包含 Debezium 连接器的
KafkaConnect
自定义资源的dbz-connect.yaml
文件apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster annotations: strimzi.io/use-connector-resources: "true" 1 spec: replicas: 1 version: 3.6.0 build: 2 output: 3 type: imagestream 4 image: debezium-streams-connect:latest plugins: 5 - name: debezium-connector-mysql artifacts: - type: zip 6 url: https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-mysql/2.7.3.Final-redhat-00001/debezium-connector-mysql-2.7.3.Final-redhat-00001-plugin.zip 7 bootstrapServers: my-cluster-kafka-bootstrap:9093 ...
表 3.1. Kafka Connect 配置设置的描述 项 描述 1
将
strimzi.io/use-connector-resources
注解设置为"true"
,以便 Cluster Operator 使用KafkaConnector
资源在此 Kafka Connect 集群中配置连接器。2
spec.build
配置指定存储构建镜像的位置,并列出镜像中包含的插件,以及插件工件的位置。3
build.output
指定存储新构建的镜像的 registry。4
指定镜像输出的名称和镜像名称。
output.type
的有效值为docker
,可推送到 Docker Hub 或 Quay 等容器 registry 中,用于将镜像推送到内部 OpenShift ImageStream。要使用 ImageStream,必须将 ImageStream 资源部署到集群中。有关在 KafkaConnect 配置中指定
build.output
的更多信息,请参阅 Apache Kafka Build schema 参考文档。5
插件配置
列出了您要包含在 Kafka Connect 镜像中的所有连接器。对于列表中的每个条目,指定一个插件名称
,以及有关构建连接器所需的工件的信息。另外,对于每个连接器插件,您可以包括要用于连接器的其他组件。例如,您可以添加 Service Registry 工件或 Debezium 脚本组件。6
artifacts.type
的值指定artifacts.url
中指定的工件的文件类型。有效类型是zip
、tgz
、或jar
。Debezium 连接器存档以.zip
文件格式提供。JDBC 驱动程序文件采用.jar
格式。type
值必须与url
字段中引用的文件类型匹配。7
artifacts.url
的值指定 HTTP 服务器的地址,如 Maven 存储库,用于存储连接器工件的文件。OpenShift 集群必须有权访问指定的服务器。输入以下命令将
KafkaConnect
构建规格应用到 OpenShift 集群:oc create -f dbz-connect.yaml
根据自定义资源中指定的配置,Apache Kafka Operator 的 Streams 准备要部署的 Kafka Connect 镜像。
构建完成后,Operator 将镜像推送到指定的 registry 或 ImageStream,并启动 Kafka Connect 集群。您在配置中列出的连接器工件在集群中可用。创建
KafkaConnector
资源来定义 MySQL 连接器的实例。
例如,创建以下KafkaConnector
CR,并将它保存为debezium-inventory-connector.yaml
例 3.2. 为 Debezium 连接器定义
KafkaConnector
自定义资源的mysql-inventory-connector.yaml
文件apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: labels: strimzi.io/cluster: my-connect-cluster name: inventory-connector 1 spec: class: io.debezium.connector.mysql.MySqlConnector 2 tasksMax: 1 3 config: 4 database.hostname: mysql 5 database.port: 3306 6 database.user: debezium 7 database.password: dbz 8 database.server.id: 184054 topic.prefix: dbserver1 9 table.include.list: inventory.* 10 schema.history.internal.kafka.bootstrap.servers: 'my-cluster-kafka-bootstrap:9092' 11 schema.history.internal.kafka.topic: schema-changes.inventory 12
表 3.2. 连接器配置设置的描述 项 描述 1
要注册到 Kafka Connect 集群的连接器名称。
2
连接器类的名称。
3
只在任何时候只能运行一个任务。使用单一连接器任务来确保正确顺序和事件处理,因为 MySQL 连接器读取 MySQL 服务器的
binlog
。Kafka Connect 服务使用连接器启动一个或多个任务来完成工作。它会在 Kafka Connect 服务集群中自动分发正在运行的任务。如果服务停止或崩溃,任务将重新分发到运行的服务。4
连接器的配置。
5
MySQL 数据库实例的主机名或地址。
6
数据库实例的端口号。
7
Debezium 连接到数据库的用户帐户的名称。
8
Debezium 用来连接到数据库用户帐户的密码。
9
MySQL 服务器或集群的主题前缀。这个字符串作为连接器将事件记录发送到的每个 Kafka 主题的前缀。
10
连接器捕获更改事件的表列表。连接器仅在
清单
表中发生时才会检测更改。11
连接器用来写入和恢复 DDL 语句到数据库 schema 历史记录主题的 Kafka 代理列表。这与连接器将更改事件记录发送到的代理相同。重启后,当连接器恢复读取时,连接器会恢复 binlog 上存在的数据库模式。
12
数据库架构历史记录主题的名称。本主题仅用于内部使用,不应供消费者使用。
运行以下命令来创建连接器资源:
oc create -n <namespace> -f <kafkaConnector>.yaml
例如,
oc create -n debezium -f mysql-inventory-connector.yaml
连接器注册到 Kafka Connect 集群,并开始针对
KafkaConnector
CR 中的spec.config.database.dbname
指定的数据库运行。连接器 pod 就绪后,Debezium 正在运行。
现在,您可以验证连接器是否已创建, 并开始捕获 inventory
数据库中的更改。