2.2. 部署 Kafka 连接


部署 MySQL 数据库后,使用 AMQ Streams 构建 Kafka Connect 容器镜像,其中包括 Debezium MySQL connector 插件。在此过程中,您可以创建并使用以下自定义资源(CR):

  • 一个 KafkaConnect CR,用于定义 Kafka Connect 实例,并包含有关要在镜像中包括的 MySQL 连接器工件的信息。
  • KafkaConnector CR 提供详情,其中包括 MySQL 连接器用于访问源数据库的信息。在 AMQ Streams 启动 Kafka Connect pod 后,通过应用 KafkaConnector CR 来启动连接器。

在构建过程中,AMQ Streams Operator 将 KafkaConnect 自定义资源中的输入参数(包括 Debezium 连接器定义)转换为 Kafka Connect 容器镜像。构建从 Red Hat Maven 存储库下载必要的工件,并将它们合并到镜像中。新创建的容器被推送到在 .spec.build.output 中指定的容器 registry,用于部署 Kafka Connect pod。在 AMQ Streams 构建 Kafka Connect 镜像后,使用 KafkaConnector 自定义资源启动连接器。

流程

  1. 登录 OpenShift 集群,创建或打开一个项目,如 debezium
  2. 为连接器创建 Debezium KafkaConnect 自定义资源(CR),或修改现有资源(CR)。例如,创建一个 KafkaConnect CR,用于指定 metadata.annotationsspec.build 属性,如下例所示。使用名称(如 dbz-connect.yaml )保存文件。

    例 2.1. 一个 dbz-connect.yaml 文件,该文件定义了一个 KafkaConnect 自定义资源,其中包含 Debezium 连接器

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: my-connect-cluster
      annotations:
        strimzi.io/use-connector-resources: "true" 1
    spec:
      version: 3.00
      build: 2
        output: 3
          type: imagestream  4
          image: debezium-streams-connect:latest
        plugins: 5
          - name: debezium-connector-mysql
            artifacts:
              - type: zip 6
                url: https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-mysql/1.9.7.Final-redhat-<build_number>/debezium-connector-mysql-1.9.7.Final.zip  7
    
      bootstrapServers: my-cluster-kafka-bootstrap:9093
    表 2.1. Kafka Connect 配置设置的描述
    Description

    1

    strimzi.io/use-connector-resources 注解设置为 "true" 来启用 Cluster Operator 以使用 KafkaConnector 资源在此 Kafka Connect 集群中配置连接器。

    2

    spec.build 配置指定存储构建镜像的位置,并列出了要包含在镜像中的插件,以及插件工件的位置。

    3

    build.output 指定存储新构建的镜像的 registry。

    4

    指定镜像输出的名称和镜像名称。output.type 的有效值为 docker,用于推送到 Docker Hub 或 Quay 等容器注册表,或者 镜像流 将镜像推送到内部 OpenShift ImageStream。要使用 ImageStream,必须将 ImageStream 资源部署到集群中。有关在 KafkaConnect 配置中指定 build.output 的更多信息,请参阅 AMQ Streams Build schema 参考文档

    5

    插件配置 列出了您希望在 Kafka Connect 镜像中包含的所有连接器。对于列表中的每个条目,请指定插件名称,以及构建连接器所需的工件的信息。另外,对于每个连接器插件,您可以包括您希望用于连接器的其他组件。例如,您可以添加 Service Registry 工件或 Debezium 脚本组件。

    6

    artifacts.type 的值指定 artifacts.url 中指定的工件的文件类型。有效类型为 ziptgzjar。Debezium 连接器存档以 .zip 文件格式提供。JDBC 驱动程序文件采用 .jar 格式。类型 值必须与 url 字段中引用的文件类型匹配。

    7

    artifacts.url 指定 HTTP 服务器的地址,如 Maven 存储库,用于存储连接器工件的文件。OpenShift 集群必须有权访问指定的服务器。

  3. 输入以下命令将 KafkaConnect 构建规格应用到 OpenShift 集群:

    oc create -f dbz-connect.yaml

    根据自定义资源中指定的配置,AMQ Streams Operator 准备要部署的 Kafka Connect 镜像。
    构建完成后,Operator 会将镜像推送到指定的 registry 或 ImageStream,并启动 Kafka Connect 集群。集群中提供了您在配置中列出的连接器工件。

  4. 创建 KafkaConnector 资源,以定义 MySQL 连接器的实例。
    例如,创建以下 KafkaConnector CR,并将它保存为 debezium-inventory-connector.yaml

    例 2.2. mysql-inventory-connector.yaml 文件,该文件定义 Debezium 连接器的 KafkaConnector 自定义资源

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      labels:
        strimzi.io/cluster: my-connect-cluster
      name: inventory-connector 1
    spec:
      class: io.debezium.connector.mysql.MySqlConnector 2
      tasksMax: 1  3
      config:  4
        database.hostname: mysql 5
        database.port: 3306   6
        database.user: debezium  7
        database.password: dbz  8
        database.server.id: 184054
        database.dbname: mydatabase 9
        database.server.name: dbserver1 10
        database.include.list: inventory  11
        database.history.kafka.bootstrap.servers: 'my-cluster-kafka-bootstrap:9092' 12
        database.history.kafka.topic: schema-changes.inventory
    表 2.2. 连接器配置设置描述
    Description

    1

    使用 Kafka Connect 集群注册的连接器名称。

    2

    连接器类的名称。

    3

    在同一时间仅应操作一个任务。使用单一连接器任务来确保作为 MySQL 连接器正确顺序和事件处理,读取 MySQL 服务器的 binlog。Kafka Connect 服务使用连接器启动一个或多个任务来完成工作。它会在 Kafka Connect 服务的集群中自动分发正在运行的任务。如果服务停止或崩溃,则会将任务重新分发到运行的服务。

    4

    连接器的配置。

    5

    数据库主机,即运行 MySQL 服务器(mysql)的容器的名称。

    6

    数据库实例的端口号。

    7

    Debezium 连接到数据库的用户帐户的名称。

    8

    数据库用户帐户的密码。

    9

    要从中捕获更改的数据库名称。

    10

    数据库实例或集群的逻辑名称。服务器名称是 MySQL 服务器或服务器集群的逻辑标识符。这个名称被用作所有 Kafka 主题的前缀。

    11

    连接器从中捕获更改事件的表列表。连接器仅检测 inventory 数据库中的更改。

    12

    指定连接器用于存储数据库模式历史记录的 Kafka 代理和主题(您要发送事件的同一代理)。重启后,连接器会在连接器恢复读取时恢复 binlog 中存在的数据库模式。

  5. 运行以下命令来创建连接器资源:

    oc create -n <namespace> -f <kafkaConnector>.yaml

    例如,

    oc create -n debezium -f mysql-inventory-connector.yaml

    连接器注册到 Kafka Connect 集群,并开始针对 KafkaConnector CR 中的 spec.config.database.dbname 指定的数据库运行。在连接器 Pod 就绪后,Debezium 已在运行。

您现在已准备好 验证连接器是否已创建,并开始使用捕获 清单 数据库中的更改。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.