3.2. 部署 Kafka Connect
部署 MySQL 数据库后,使用 AMQ Streams 构建包含 Debezium MySQL 连接器插件的 Kafka Connect 容器镜像。在部署过程中,您可以创建并使用以下自定义资源(CR):
-
定义 Kafka Connect 实例的
KafkaConnect
CR,并包含有关要在镜像中包含的 MySQL 连接器工件的信息。 -
KafkaConnector
CR 提供了包括 MySQL 连接器用来访问源数据库的信息。在 AMQ Streams 启动 Kafka Connect pod 后,您可以通过应用KafkaConnector
CR 来启动连接器。
在构建过程中,AMQ Streams Operator 将 KafkaConnect
自定义资源(包括 Debezium 连接器定义)中的输入参数转换为 Kafka Connect 容器镜像。构建会从 Red Hat Maven 存储库下载所需的工件,并将它们合并到镜像中。新创建的容器被推送到在 .spec.build.output
中指定的容器 registry,用于部署 Kafka Connect pod。在 AMQ Streams 构建 Kafka Connect 镜像后,使用 KafkaConnector
自定义资源来启动连接器。
先决条件
- AMQ Streams 在 OpenShift 集群上运行。
- AMQ Streams Cluster Operator 已安装到 OpenShift 集群。
- Apache Kafka 和 Kafka Connect 在 AMQ Streams 上运行。
流程
-
登录 OpenShift 集群,再创建或打开一个项目,如
debezium
。 为连接器创建 Debezium
KafkaConnect
自定义资源(CR),或修改现有的资源。
以下示例显示了一个dbz-connect.yaml
文件的摘录,该文件描述了KafkaConnect
自定义资源。metadata.annotations
和spec.build
属性是必需的。例 3.1. 定义包含 Debezium 连接器的
KafkaConnect
自定义资源的dbz-connect.yaml
文件apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster annotations: strimzi.io/use-connector-resources: "true" 1 spec: replicas: 1 version: 3.5.0 build: 2 output: 3 type: imagestream 4 image: debezium-streams-connect:latest plugins: 5 - name: debezium-connector-mysql artifacts: - type: zip 6 url: https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-mysql/2.3.4.Final-redhat-00001/debezium-connector-mysql-2.3.4.Final-redhat-00001-plugin.zip 7 bootstrapServers: my-cluster-kafka-bootstrap:9093 ...
表 3.1. Kafka Connect 配置设置的描述 项 描述 1
将
strimzi.io/use-connector-resources
注解设置为"true"
,使 Cluster Operator 使用KafkaConnector
资源在此 Kafka Connect 集群中配置连接器。2
spec.build
配置指定在镜像中存储构建镜像的位置,并列出要在镜像中包含的插件,以及插件工件的位置。3
build.output
指定存储新构建镜像的 registry。4
指定镜像输出的名称和镜像名称。
output.type
的有效值是要推送到 Docker Hub 或 Quay 等容器 registry 的有效值,或镜像流
以将镜像推送到内部 OpenShift ImageStream。要使用 ImageStream,必须将 ImageStream 资源部署到集群中。有关在 KafkaConnect 配置中指定
build.output
的更多信息,请参阅 AMQ Streams Build schema 文档。5
plugins
配置列出了您要包含在 Kafka Connect 镜像中的所有连接器。对于列表中的每个条目,指定一个插件名称
,以及有关构建连接器所需的工件的信息。另外,对于每个连接器插件,您还可以包含可用于连接器的其他组件。例如,您可以添加 Service Registry 工件或 Debezium 脚本组件。6
artifacts.type
的值指定在artifacts.url
中指定的工件类型。有效类型为zip
、tgz
或jar
。Debezium 连接器存档以.zip
文件格式提供。JDBC 驱动程序文件采用.jar
格式。类型
值必须与url
字段中引用的文件类型匹配。7
artifacts.url
的值指定 HTTP 服务器的地址,如 Maven 存储库,用于存储连接器工件的文件。OpenShift 集群必须有权访问指定的服务器。输入以下命令将
KafkaConnect
构建规格应用到 OpenShift 集群:oc create -f dbz-connect.yaml
根据自定义资源中指定的配置,AMQ Streams Operator 准备要部署的 Kafka Connect 镜像。
构建完成后,Operator 将镜像推送到指定的 registry 或 ImageStream,并启动 Kafka Connect 集群。集群中提供了您在配置中列出的连接器工件。创建一个
KafkaConnector
资源来定义 MySQL 连接器的实例。
例如,创建以下KafkaConnector
CR,并将它保存为debezium-inventory-connector.yaml
例 3.2. 为 Debezium 连接器定义
KafkaConnector
自定义资源的mysql-inventory-connector.yaml
文件apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: labels: strimzi.io/cluster: my-connect-cluster name: inventory-connector 1 spec: class: io.debezium.connector.mysql.MySqlConnector 2 tasksMax: 1 3 config: 4 database.hostname: mysql 5 database.port: 3306 6 database.user: debezium 7 database.password: dbz 8 database.server.id: 184054 topic.prefix: dbserver1 9 table.include.list: inventory.* 10 schema.history.internal.kafka.bootstrap.servers: 'my-cluster-kafka-bootstrap:9092' 11 schema.history.internal.kafka.topic: schema-changes.inventory 12
表 3.2. 连接器配置设置的描述 项 描述 1
使用 Kafka Connect 集群注册的连接器名称。
2
连接器类的名称。
3
任何时候都只能运行一个任务。使用单一连接器任务来确保正确的顺序和事件处理,因为 MySQL 连接器读取 MySQL 服务器的
binlog
。Kafka Connect 服务使用连接器来启动一个或多个任务来完成工作。它会在 Kafka Connect 服务集群中自动分发正在运行的任务。如果服务停止或崩溃,任务将重新分发到运行的服务。4
连接器的配置。
5
MySQL 数据库实例的主机名或地址。
6
数据库实例的端口号。
7
Debezium 通过它连接到数据库的用户帐户名称。
8
Debezium 用于连接到数据库用户帐户的密码。
9
MySQL 服务器或集群的主题前缀。此字符串前缀连接器将事件记录发送到的每个 Kafka 主题的名称。
10
连接器捕获更改事件的表列表。连接器仅在
清单
表中发生时才会检测更改。11
连接器用来写入和恢复 DDL 语句到数据库 schema 历史记录主题的 Kafka 代理列表。这与连接器发送更改事件记录的代理相同。重启后,连接器会在连接器恢复读取时恢复 binlog 中存在的数据库模式。
12
数据库架构历史记录主题的名称。本主题仅用于内部使用,不应供消费者使用。
运行以下命令来创建连接器资源:
oc create -n <namespace> -f <kafkaConnector>.yaml
例如,
oc create -n debezium -f mysql-inventory-connector.yaml
连接器注册到 Kafka Connect 集群,并开始针对
KafkaConnector
CR 中的spec.config.database.dbname
指定的数据库运行。连接器 pod 就绪后,Debebe 正在运行。
现在,您已准备好 验证连接器是否已创建, 并已开始捕获 inventory
数据库中的更改。