2.2. AMQ Streams 上的 Debezium 部署


要在 Red Hat OpenShift Container Platform 上为 Debezium 设置连接器,请使用 AMQ Streams 为您要使用的连接器构建 Kafka Connect 容器镜像,其中包括 connector 插件。在连接器启动后,它将连接到已配置的数据库,并为插入、更新和删除行或文档生成更改事件记录。

从 Debezium 1.7 开始,部署 Debezium 连接器的首选方法是使用 AMQ Streams 来构建包含连接器插件的 Kafka Connect 容器镜像。

在此过程中,您可以创建并使用以下自定义资源(CR):

  • 一个 KafkaConnect CR,用于定义 Kafka Connect 实例,并包含连接器工件需要包含在镜像中的信息。
  • KafkaConnector CR,它提供详情,其中包括连接器用于访问源数据库的信息。在 AMQ Streams 启动 Kafka Connect pod 后,通过应用 KafkaConnector CR 来启动连接器。

在 Kafka Connect 镜像的构建规格中,您可以指定可用于部署的连接器。对于每个连接器插件,您还可以指定您希望用于部署的其他组件。例如,您可以添加 Apicurio Registry 工件或 Debezium 脚本组件。当 AMQ Streams 构建 Kafka Connect 镜像时,它会下载指定的工件,并将其合并到镜像中。

KafkaConnect CR 中的 spec.build.output 参数指定存储生成的 Kafka Connect 容器镜像的位置。容器镜像可以存储在 Docker 注册表中,也可以存储在 OpenShift ImageStream 中。要将镜像存储在 ImageStream 中,必须在部署 Kafka Connect 前创建 ImageStream。镜像流不会被自动创建。

注意

如果您使用 KafkaConnect 资源创建集群,则之后您无法使用 Kafka Connect REST API 创建或更新连接器。您仍然可以使用 REST API 检索信息。

2.2.1. 使用 AMQ Streams 部署 Debezium

您遵循相同的步骤来部署每种类型的 Debezium 连接器。下面的部分论述了如何部署 Debezium MySQL 连接器。

随着 AMQ Streams 的早期版本,在 OpenShift 中部署 Debezium 连接器,首先需要为连接器构建 Kafka 连接镜像。在 OpenShift 上部署连接器的当前首选方法是使用 AMQ Streams 中的构建配置,自动构建包含您要使用的 Debezium 连接器插件的 Kafka Connect 容器镜像。

在构建过程中,AMQ Streams Operator 将 KafkaConnect 自定义资源中的输入参数(包括 Debezium 连接器定义)转换为 Kafka Connect 容器镜像。构建从 Red Hat Maven 存储库或其他配置的 HTTP 服务器下载所需的工件。

新创建的容器被推送到在 .spec.build.output 中指定的容器 registry,用于部署 Kafka Connect 集群。在 AMQ Streams 构建 Kafka Connect 镜像后,您可以创建 KafkaConnector 自定义资源来启动构建中包含的连接器。

前提条件

  • 您可以访问安装集群 Operator 的 OpenShift 集群。
  • AMQ Streams Operator 正在运行。
  • OpenShift 上部署了 Apache Kafka 集群,如部署和升级 AMQ Streams 中所述。
  • Kafka Connect 部署在 AMQ Streams 上
  • 有红帽构建的 Debezium 许可证。
  • 已安装 OpenShift oc CLI 客户端,或您可以访问 OpenShift Container Platform Web 控制台。
  • 根据您要存储 Kafka Connect 构建镜像的方式,您需要 registry 权限,或者您必须创建 ImageStream 资源:

    将构建镜像存储在镜像 registry 中,如 Red Hat Quay.io 或 Docker Hub
    • 在 registry 中创建和管理镜像的帐户和权限。
    将构建镜像存储为原生 OpenShift ImageStream
    • ImageStream 资源部署到集群中。您必须为集群明确创建 ImageStream。默认情况下,镜像流不可用。

流程

  1. 登录 OpenShift 集群。
  2. 为连接器创建 Debezium KafkaConnect 自定义资源(CR),或修改现有资源(CR)。例如,创建一个 KafkaConnect CR,用于指定 metadata.annotationsspec.build 属性,如下例所示。使用名称(如 dbz-connect.yaml )保存文件。

    例 2.1. 一个 dbz-connect.yaml 文件,该文件定义了一个 KafkaConnect 自定义资源,其中包含 Debezium 连接器

    在以下示例中,自定义资源被配置为下载以下工件:

    • Debezium 连接器存档。
    • 红帽构建的 Apicurio Registry 归档。Apicurio Registry 是一个可选组件。只有在打算使用连接器进行 Avro serialization 时,才添加 Apicurio Registry 组件。
    • Debezium 脚本处理 SMT 归档和您要与 Debezium 连接器关联的脚本引擎。SMT 归档和脚本语言依赖项是可选组件。只有在计划使用基于 Debezium 内容的路由 SMT 或 过滤 SMT 时才添加这些组件。
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: debezium-kafka-connect-cluster
      annotations:
        strimzi.io/use-connector-resources: "true" 1
    spec:
      version: 3.00
      build: 2
        output: 3
          type: imagestream  4
          image: debezium-streams-connect:latest
        plugins: 5
          - name: debezium-connector-mysql
            artifacts:
              - type: zip 6
                url: https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-mysql/1.9.7.Final-redhat-<build_number>/debezium-connector-mysql-1.9.7.Final-redhat-<build_number>-plugin.zip  7
              - type: zip
                url: https://maven.repository.redhat.com/ga/io/apicurio/apicurio-registry-distro-connect-converter/2.3-redhat-<build-number>/apicurio-registry-distro-connect-converter-2.3-redhat-<build-number>.zip 8
              - type: zip
                url: https://maven.repository.redhat.com/ga/io/debezium/debezium-scripting/1.9.7.Final/debezium-scripting-1.9.7.Final.zip 9
              - type: jar
                url: https://repo1.maven.org/maven2/org/codehaus/groovy/groovy/3.0.11/groovy-3.0.11.jar  10
              - type: jar
                url: https://repo1.maven.org/maven2/org/codehaus/groovy/groovy-jsr223/3.0.11/groovy-jsr223-3.0.11.jar
              - type: jar
                url: https://repo1.maven.org/maven2/org/codehaus/groovy/groovy-json3.0.11/groovy-json-3.0.11.jar
    
      bootstrapServers: debezium-kafka-cluster-kafka-bootstrap:9093
    表 2.1. Kafka Connect 配置设置的描述
    Description

    1

    strimzi.io/use-connector-resources 注解设置为 "true" 来启用 Cluster Operator 以使用 KafkaConnector 资源在此 Kafka Connect 集群中配置连接器。

    2

    spec.build 配置指定存储构建镜像的位置,并列出了要包含在镜像中的插件,以及插件工件的位置。

    3

    build.output 指定存储新构建的镜像的 registry。

    4

    指定镜像输出的名称和镜像名称。output.type 的有效值为 docker,用于推送到容器注册表(如 Docker Hub 或 Quay)或 镜像流 (push)到内部 OpenShift ImageStream。要使用 ImageStream,必须将 ImageStream 资源部署到集群中。有关在 KafkaConnect 配置中指定 build.output 的更多信息,请参阅 AMQ Streams Build schema 参考文档

    5

    插件配置 列出了您希望在 Kafka Connect 镜像中包含的所有连接器。对于列表中的每个条目,请指定插件名称,以及构建连接器所需的工件的信息。另外,对于每个连接器插件,您可以包括您希望用于连接器的其他组件。例如,您可以添加 Service Registry 工件或 Debezium 脚本组件。

    6

    artifacts.type 的值指定 artifacts.url 中指定的工件的文件类型。有效类型为 ziptgzjar。Debezium 连接器存档以 .zip 文件格式提供。类型 值必须与 url 字段中引用的文件类型匹配。

    7

    artifacts.url 指定 HTTP 服务器的地址,如 Maven 存储库,用于存储连接器工件的文件。红帽 Maven 存储库中提供了 Debezium 连接器工件。OpenShift 集群必须有权访问指定的服务器。

    8

    (可选)指定下载 Apicurio Registry 组件的工件 类型和 url。仅包含 Apicurio Registry 工件,只有在您希望连接器使用 Apache Avro 使用 Red Hat build of Apicurio Registry 而不是使用默认的 JSON 转换器来序列化事件键和值时,才包括 Apicurio Registry 工件。

    9

    (可选)指定 Debezium 脚本 SMT 归档的工件 类型和 url,以用于 Debezium 连接器。只有在打算使用 基于内容的路由 SMT 或 过滤 SMT 以使用脚本 SMT 时,才包含脚本 SMT,您必须部署 JSR 223- 兼容脚本实施,如 groovy。

    10

    (可选)指定 JSR 223 兼容脚本实施的 JAR 文件的工件 类型和 URL,这些脚本由 Debezium 脚本 SMT 是必需的。

    重要

    如果您使用 AMQ Streams 将 connector 插件合并到 Kafka Connect 镜像中,对于每个需要的脚本语言组件 artifacts.url 必须指定 JAR 文件的位置,工件.type 的值也必须设置为 jar。无效的值会导致连接器在运行时失败。

    要启用使用带有脚本 SMT 的 Apache Groovy 语言的使用,示例中的自定义资源会检索以下库的 JAR 文件:

    • groovy
    • groovy-jsr223 (scripting agent)
    • Groovy-json (解析 JSON 字符串的一个模块)

    作为替代方案,Debezium 的脚本 SMT 也支持使用 GraalVM JavaScript 的 JSR 223 实现。

  3. 输入以下命令将 KafkaConnect 构建规格应用到 OpenShift 集群:

    oc create -f dbz-connect.yaml

    根据自定义资源中指定的配置,Streams Operator 会准备要部署的 Kafka Connect 镜像。
    构建完成后,Operator 会将镜像推送到指定的 registry 或 ImageStream,并启动 Kafka Connect 集群。集群中提供了您在配置中列出的连接器工件。

  4. 创建一个 KafkaConnector 资源,以定义您要部署的每个连接器的实例。
    例如,创建以下 KafkaConnector CR,并将它保存为 mysql-inventory-connector.yaml

    例 2.2. mysql-inventory-connector.yaml 文件,该文件定义了 Debezium 连接器的 KafkaConnector 自定义资源

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      labels:
        strimzi.io/cluster: debezium-kafka-connect-cluster
      name: inventory-connector-mysql 1
    spec:
      class: io.debezium.connector.mysql.MySqlConnector 2
      tasksMax: 1  3
      config:  4
        database.history.kafka.bootstrap.servers: 'debezium-kafka-cluster-kafka-bootstrap.debezium.svc.cluster.local:9092'
        database.history.kafka.topic: schema-changes.inventory
        database.hostname: mysql.debezium-mysql.svc.cluster.local 5
        database.port: 3306   6
        database.user: debezium  7
        database.password: dbz  8
        database.dbname: mydatabase 9
        database.server.name: inventory_connector_mysql 10
        database.include.list: public.inventory  11
    表 2.2. 连接器配置设置描述
    Description

    1

    使用 Kafka Connect 集群注册的连接器名称。

    2

    连接器类的名称。

    3

    可同步运行的任务数量。

    4

    连接器的配置。

    5

    主机数据库实例的地址。

    6

    数据库实例的端口号。

    7

    Debezium 连接到数据库的用户帐户的名称。

    8

    数据库用户帐户的密码。

    9

    要从中捕获更改的数据库名称。

    10

    数据库实例或集群的逻辑名称。
    指定名称只能通过字母数字字符或下划线组成。
    由于逻辑名称用作接收来自此连接器更改事件的任何 Kafka 主题的前缀,因此该名称必须在集群的连接器之间唯一。
    如果您将连接器与 Avro 连接器集成,则命名空间也用于相关的 Kafka Connect 模式的名称,以及对应的 Avro 模式的命名空间。

    11

    连接器从中捕获更改事件的表列表。

  5. 运行以下命令来创建连接器资源:

    oc create -n <namespace> -f <kafkaConnector>.yaml

    例如,

    oc create -n debezium -f {context}-inventory-connector.yaml

    连接器注册到 Kafka Connect 集群,并开始针对 KafkaConnector CR 中的 spec.config.database.dbname 指定的数据库运行。在连接器 Pod 就绪后,Debezium 已在运行。

您现在已准备好 验证 Debezium 部署

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.