5.5. 部署 Debezium MongoDB 连接器


您可以使用以下任一方法部署 Debezium MongoDB 连接器:

5.5.1. 使用 AMQ Streams 部署 MongoDB 连接器

从 Debezium 1.7 开始,部署 Debezium 连接器的首选方法是使用 AMQ Streams 构建包含连接器插件的 Kafka Connect 容器镜像。

在部署过程中,您可以创建并使用以下自定义资源(CR):

  • 定义 Kafka Connect 实例的 KafkaConnect CR,并包含有关镜像中需要包含连接器工件的信息。
  • KafkaConnector CR,提供包括连接器用来访问源数据库的信息。在 AMQ Streams 启动 Kafka Connect pod 后,您可以通过应用 KafkaConnector CR 来启动连接器。

在 Kafka Connect 镜像的构建规格中,您可以指定可用于部署的连接器。对于每个连接器插件,您还可以指定您的部署可以使用的其他组件。例如,您可以添加 Service Registry 工件或 Debezium 脚本组件。当 AMQ Streams 构建 Kafka Connect 镜像时,它会下载指定的工件,并将其合并到镜像中。

KafkaConnect CR 中的 spec.build.output 参数指定存储生成的 Kafka Connect 容器镜像的位置。容器镜像可以存储在 Docker registry 中,也可以存储在 OpenShift ImageStream 中。要将镜像存储在 ImageStream 中,您必须在部署 Kafka Connect 前创建 ImageStream。镜像流不会被自动创建。

注意

如果使用 KafkaConnect 资源来创建集群,之后无法使用 Kafka Connect REST API 创建或更新连接器。您仍然可以使用 REST API 来检索信息。

其他资源

5.5.2. 使用 AMQ Streams 部署 Debezium MongoDB 连接器

使用早期版本的 AMQ Streams 时,要在 OpenShift 上部署 Debezium 连接器,您需要首先为连接器构建 Kafka Connect 镜像。在 OpenShift 上部署连接器的当前首选方法是使用 AMQ Streams 中的构建配置来构建 Kafka Connect 容器镜像,其中包含您要使用的 Debezium 连接器插件。

在构建过程中,AMQ Streams Operator 将 KafkaConnect 自定义资源(包括 Debezium 连接器定义)中的输入参数转换为 Kafka Connect 容器镜像。构建会从 Red Hat Maven 存储库或其他配置的 HTTP 服务器下载必要的工件。

新创建的容器被推送到在 .spec.build.output 中指定的容器 registry,用于部署 Kafka Connect 集群。在 AMQ Streams 构建 Kafka Connect 镜像后,您可以创建 KafkaConnector 自定义资源来启动构建中包含的连接器。

先决条件

  • 您可以访问安装了集群 Operator 的 OpenShift 集群。
  • AMQ Streams Operator 正在运行。
  • OpenShift 中部署和升级 AMQ Streams 所述,会部署 Apache Kafka 集群。
  • Kafka Connect 在 AMQ Streams 上部署
  • 您有一个 Red Hat Integration 许可证。
  • 已安装 OpenShift oc CLI 客户端,或者您可以访问 OpenShift Container Platform Web 控制台。
  • 根据您要存储 Kafka Connect 构建镜像的方式,您需要 registry 权限,或者您必须创建 ImageStream 资源:

    将构建镜像存储在镜像 registry 中,如 Red Hat Quay.io 或 Docker Hub
    • 在 registry 中创建和管理镜像的帐户和权限。
    将构建镜像存储为原生 OpenShift ImageStream

流程

  1. 登录 OpenShift 集群。
  2. 为连接器创建 Debezium KafkaConnect 自定义资源(CR),或修改现有的资源。例如,创建一个名为 dbz-connect.yamlKafkaConnect CR,用于指定 metadata.annotationsspec.build 属性。以下示例显示了一个 dbz-connect.yaml 文件的摘录,该文件描述了 KafkaConnect 自定义资源。

    例 5.1. 定义包含 Debezium 连接器的 KafkaConnect 自定义资源的 dbz-connect.yaml 文件

    在以下示例中,自定义资源被配置为下载以下工件:

    • Debezium MongoDB 连接器存档。
    • Service Registry 归档。Service Registry 是一个可选组件。只有在打算将 Avro 序列化与连接器搭配使用时,才添加 Service Registry 组件。
    • Debezium 脚本 SMT 归档以及您要与 Debezium 连接器一起使用的关联脚本引擎。SMT 归档和脚本语言依赖项是可选组件。只有在打算使用 Debezium 的基于内容的路由 SMT 或 过滤 SMT 时,才添加这些组件。
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: debezium-kafka-connect-cluster
      annotations:
        strimzi.io/use-connector-resources: "true" 1
    spec:
      version: 3.5.0
      build: 2
        output: 3
          type: imagestream  4
          image: debezium-streams-connect:latest
        plugins: 5
          - name: debezium-connector-mongodb
            artifacts:
              - type: zip 6
                url: https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-mongodb/2.3.4.Final-redhat-00001/debezium-connector-mongodb-2.3.4.Final-redhat-00001-plugin.zip  7
              - type: zip
                url: https://maven.repository.redhat.com/ga/io/apicurio/apicurio-registry-distro-connect-converter/2.4.4.Final-redhat-<build-number>/apicurio-registry-distro-connect-converter-2.4.4.Final-redhat-<build-number>.zip  8
              - type: zip
                url: https://maven.repository.redhat.com/ga/io/debezium/debezium-scripting/2.3.4.Final-redhat-00001/debezium-scripting-2.3.4.Final-redhat-00001.zip 9
              - type: jar
                url: https://repo1.maven.org/maven2/org/codehaus/groovy/groovy/3.0.11/groovy-3.0.11.jar  10
              - type: jar
                url: https://repo1.maven.org/maven2/org/codehaus/groovy/groovy-jsr223/3.0.11/groovy-jsr223-3.0.11.jar
              - type: jar
                url: https://repo1.maven.org/maven2/org/codehaus/groovy/groovy-json3.0.11/groovy-json-3.0.11.jar
    
      bootstrapServers: debezium-kafka-cluster-kafka-bootstrap:9093
    
      ...
    表 5.12. Kafka Connect 配置设置的描述
    描述

    1

    strimzi.io/use-connector-resources 注解设置为 "true",使 Cluster Operator 使用 KafkaConnector 资源在此 Kafka Connect 集群中配置连接器。

    2

    spec.build 配置指定在镜像中存储构建镜像的位置,并列出要在镜像中包含的插件,以及插件工件的位置。

    3

    build.output 指定存储新构建镜像的 registry。

    4

    指定镜像输出的名称和镜像名称。output.type 的有效值是 要推送到 容器 registry (如 Docker Hub 或 Quay)或 镜像流 的有效值,以将镜像推送到内部 OpenShift ImageStream。要使用 ImageStream,必须将 ImageStream 资源部署到集群中。有关在 KafkaConnect 配置中指定 build.output 的更多信息,请参阅在 OpenShift 中配置 AMQ Streams 中的 AMQ Streams Build schema 参考

    5

    plugins 配置列出了您要包含在 Kafka Connect 镜像中的所有连接器。对于列表中的每个条目,指定一个插件名称,以及有关构建连接器所需的工件的信息。另外,对于每个连接器插件,您还可以包含可用于连接器的其他组件。例如,您可以添加 Service Registry 工件或 Debezium 脚本组件。

    6

    artifacts.type 的值指定在 artifacts.url 中指定的工件类型。有效类型为 ziptgzjar。Debezium 连接器存档以 .zip 文件格式提供。类型 值必须与 url 字段中引用的文件类型匹配。

    7

    artifacts.url 的值指定 HTTP 服务器的地址,如 Maven 存储库,用于存储连接器工件的文件。Debezium 连接器工件在 Red Hat Maven 存储库中提供。OpenShift 集群必须有权访问指定的服务器。

    8

    (可选)指定用于下载 Service Registry 组件的工件 类型和 url。包含 Service Registry 工件,只有在您希望连接器使用 Apache Avro 来序列化带有 Service Registry 的事件键和值时,而不是使用默认的 JSON 转换程序。

    9

    (可选)指定 Debezium 脚本 SMT 归档的工件 类型和 url,以用于 Debezium 连接器。只有在打算使用 Debezium 的基于内容的路由 SMT 或 过滤 SMT 时才包括脚本 SMT。要使用脚本 SMT,您必须部署 JSR 223 兼容脚本实现,如 groovy。

    10

    (可选)指定 JSR 223 兼容脚本实施的 JAR 文件的工件 类型和 url,这是 Debezium 脚本 SMT 所需的。

    重要

    如果使用 AMQ Streams 将连接器插件合并到 Kafka Connect 镜像中,每个所需的脚本语言 工件。url 必须指定 JAR 文件的位置,并且 artifacts.type 的值也必须设置为 jar。无效的值会导致连接器在运行时失败。

    要启用带有脚本 SMT 的 Apache Groovy 语言,示例中的自定义资源会为以下库检索 JAR 文件:

    • groovy
    • Groovy-jsr223 (指定代理)
    • groovy-json (解析 JSON 字符串的模块)

    作为替代方案,Debebe Debezium 脚本 SMT 也支持使用 JSR 223 实现 GraalVM JavaScript。

  3. 输入以下命令将 KafkaConnect 构建规格应用到 OpenShift 集群:

    oc create -f dbz-connect.yaml

    根据自定义资源中指定的配置,Streams Operator 准备要部署的 Kafka Connect 镜像。
    构建完成后,Operator 将镜像推送到指定的 registry 或 ImageStream,并启动 Kafka Connect 集群。集群中提供了您在配置中列出的连接器工件。

  4. 创建一个 KafkaConnector 资源来定义您要部署的每个连接器的实例。
    例如,创建以下 KafkaConnector CR,并将它保存为 mongodb-inventory-connector.yaml

    例 5.2. mongodb-inventory-connector.yaml 文件,该文件为 Debezium 连接器定义 KafkaConnector 自定义资源

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      labels:
        strimzi.io/cluster: debezium-kafka-connect-cluster
      name: inventory-connector-mongodb 1
    spec:
      class: io.debezium.connector.mongodb.MongoDbConnector 2
      tasksMax: 1  3
      config:  4
        mongodb.hosts: rs0/192.168.99.100:27017 5
        mongodb.user: debezium  6
        mongodb.password: dbz  7
        topic.prefix: inventory-connector-mongodb 8
        collection.include.list: inventory[.]*  9
    表 5.13. 连接器配置设置的描述
    描述

    1

    使用 Kafka Connect 集群注册的连接器名称。

    2

    连接器类的名称。

    3

    可以同时操作的任务数量。

    4

    连接器的配置。

    5

    主机数据库实例的地址和端口号。

    7

    Debezium 用于连接到数据库的帐户名称。

    8

    Debezium 用于连接到数据库用户帐户的密码。

    8

    数据库实例或集群的主题前缀。
    指定的名称只能由字母数字字符或下划线组成。
    因为主题前缀被用作从这个连接器接收更改事件的任何 Kafka 主题的前缀,所以该名称在集群中的连接器之间必须是唯一的。
    如果连接器与 Avro 连接器集成,则此命名空间也用于相关 Kafka Connect 模式的名称,以及相应 Avro 模式的命名空间。

    9

    连接器捕获更改的集合名称。

  5. 运行以下命令来创建连接器资源:

    oc create -n <namespace> -f <kafkaConnector>.yaml

    例如,

    oc create -n debezium -f {context}-inventory-connector.yaml

    连接器注册到 Kafka Connect 集群,并开始针对 KafkaConnector CR 中的 spec.config.database.dbname 指定的数据库运行。连接器 pod 就绪后,Debebe 正在运行。

现在 ,您可以验证 Debezium MongoDB 部署

5.5.3. 通过从 Dockerfile 构建自定义 Kafka Connect 容器镜像来部署 Debezium MongoDB 连接器

要部署 Debezium MongoDB 连接器,您必须构建包含 Debezium 连接器归档的自定义 Kafka Connect 容器镜像,然后将此容器镜像推送到容器 registry。然后,您创建两个自定义资源(CR):

  • 定义 Kafka Connect 实例的 KafkaConnect CR。CR 中的 image 属性指定您创建的容器镜像的名称,以运行 Debezium 连接器。您可以将此 CR 应用到部署 Red Hat AMQ Streams 的 OpenShift 实例。AMQ Streams 提供将 Apache Kafka 带到 OpenShift 的 operator 和镜像。
  • 定义 Debezium MongoDB 连接器的 KafkaConnector CR。将此 CR 应用到应用 KafkaConnect CR 的同一 OpenShift 实例。

先决条件

流程

  1. 为 Kafka Connect 创建 Debezium MongoDB 容器:

    1. 创建一个使用 registry.redhat.io/amq-streams-kafka-35-rhel8:2.5.0 的 Dockerfile 作为基础镜像。例如,在终端窗口中输入以下命令:

      cat <<EOF >debezium-container-for-mongodb.yaml 1
      FROM registry.redhat.io/amq-streams-kafka-35-rhel8:2.5.0
      USER root:root
      RUN mkdir -p /opt/kafka/plugins/debezium 2
      RUN cd /opt/kafka/plugins/debezium/ \
      && curl -O https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-mongodb/2.3.4.Final-redhat-00001/debezium-connector-mongodb-2.3.4.Final-redhat-00001-plugin.zip \
      && unzip debezium-connector-mongodb-2.3.4.Final-redhat-00001-plugin.zip \
      && rm debezium-connector-mongodb-2.3.4.Final-redhat-00001-plugin.zip
      RUN cd /opt/kafka/plugins/debezium/
      USER 1001
      EOF
      描述

      1

      您可以指定您想要的任何文件名。

      2

      指定 Kafka Connect 插件目录的路径。如果您的 Kafka Connect 插件目录位于不同的位置,请将此路径替换为目录的实际路径。

      该命令在当前目录中创建一个名为 debezium-container-for-mongodb.yaml 的 Dockerfile。

    2. 从您在上一步中创建的 debezium-container-for-mongodb.yaml Docker 文件中构建容器镜像。在包含文件的目录中,打开终端窗口并输入以下命令之一:

      podman build -t debezium-container-for-mongodb:latest .
      docker build -t debezium-container-for-mongodb:latest .

      前面的命令使用名称 debezium-container-for-mongodb 构建容器镜像。

    3. 将自定义镜像推送到容器 registry,如 quay.io 或内部容器 registry。容器 registry 必须可供您要部署镜像的 OpenShift 实例使用。输入以下命令之一:

      podman push <myregistry.io>/debezium-container-for-mongodb:latest
      docker push <myregistry.io>/debezium-container-for-mongodb:latest
    4. 创建新的 Debezium MongoDB KafkaConnect 自定义资源(CR)。例如,创建一个名为 dbz-connect.yamlKafkaConnect CR,用于指定 注解和 镜像 属性。以下示例显示了一个 dbz-connect.yaml 文件的摘录,该文件描述了 KafkaConnect 自定义资源。

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnect
      metadata:
        name: my-connect-cluster
        annotations:
          strimzi.io/use-connector-resources: "true" 1
      spec:
        #...
        image: debezium-container-for-mongodb  2
      
        ...
      描述

      1

      metadata.annotations 表示 KafkaConnector 资源用于配置在这个 Kafka Connect 集群中使用的 Cluster Operator。

      2

      spec.image 指定您创建的镜像的名称,以运行 Debezium 连接器。此属性覆盖 Cluster Operator 中的 STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE 变量。

    5. 输入以下命令将 KafkaConnect CR 应用到 OpenShift Kafka Connect 环境:

      oc create -f dbz-connect.yaml

      该命令添加了一个 Kafka Connect 实例,用于指定您为运行 Debezium 连接器而创建的镜像的名称。

  2. 创建一个 KafkaConnector 自定义资源来配置 Debezium MongoDB 连接器实例。

    您可以在 .yaml 文件中配置 Debezium MongoDB 连接器,该文件指定连接器的配置属性。连接器配置可能指示 Debezium 为 MongoDB 副本集或分片集群的子集生成更改事件。另外,您可以设置过滤不需要的集合的属性。

    以下示例配置了一个 Debezium 连接器,它在 192.168.99.100 上的端口 27017 连接到 MongoDB 副本集 rs0,并捕获 清单 集合中发生的更改。inventory-connector-mongodb 是副本集的逻辑名称。

    MongoDB inventory-connector.yaml

    apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnector
      metadata:
        name: inventory-connector-mongodb 1
        labels: strimzi.io/cluster: my-connect-cluster
      spec:
        class: io.debezium.connector.mongodb.MongoDbConnector 2
        config:
         mongodb.connection.string: mongodb://192.168.99.100:27017/?replicaSet=rs0 3
         topic.prefix: inventory-connector-mongodb 4
         collection.include.list: inventory[.]* 5

    1 1 1 1 1 1 1 1 1 1 1 1 1
    用于使用 Kafka Connect 注册连接器的名称。
    2 2 2 2 2 2 2 2 2 2 2 2 2
    MongoDB 连接器类的名称。
    3 3 3 3 3 3 3 3 3 3
    用于连接到 MongoDB 副本集的主机地址。
    4 4 4 4 4 4 4 4 4 4
    MongoDB 副本集的逻辑名称,它组成了生成事件的命名空间,并在使用 Avro converter 时,用来写入的 Kafka 主题、Kafka Connect 模式名称和相应 Avro 模式的命名空间中使用。
    5 5 5 5 5 5 5 5
    与要监控的所有集合的命名空间(如 <dbName>.<collectionName>)匹配的正则表达式列表。
  3. 使用 Kafka Connect 创建连接器实例。例如,如果您将 KafkaConnector 资源保存在 inventory-connector.yaml 文件中,您将运行以下命令:

    oc apply -f inventory-connector.yaml

    前面的命令注册 inventory-connector,连接器开始针对 KafkaConnector CR 中定义的 清单 集合运行。

有关您可以为 Debezium MongoDB 连接器设置的配置属性的完整列表,请参阅 MongoDB 连接器配置属性

结果

连接器启动后,它会完成以下操作:

5.5.4. 验证 Debezium MongoDB 连接器是否正在运行

如果连接器正确启动且没有错误,它会为每个连接器配置为捕获的表创建一个主题。下游应用程序可以订阅这些主题,以检索源数据库中发生的信息事件。

要验证连接器是否正在运行,您可以从 OpenShift Container Platform Web 控制台或 OpenShift CLI 工具(oc)执行以下操作:

  • 验证连接器状态。
  • 验证连接器是否生成主题。
  • 验证主题是否填充了读取操作("op":"r")的事件,连接器在每个表的初始快照中生成。

先决条件

  • Debezium 连接器部署到 OpenShift 上的 AMQ Streams。
  • 已安装 OpenShift oc CLI 客户端。
  • 访问 OpenShift Container Platform web 控制台。

流程

  1. 使用以下方法之一检查 KafkaConnector 资源的状态:

    • 在 OpenShift Container Platform Web 控制台中:

      1. 导航到 Home Search
      2. Search 页面中,点 Resources 打开 Select Resource 框,然后键入 KafkaConnector
      3. KafkaConnectors 列表中,点您要检查的连接器的名称,如 inventory-connector-mongodb
      4. Conditions 部分,验证 TypeStatus 列中的值是否已设置为 ReadyTrue
    • 在终端窗口中:

      1. 使用以下命令:

        oc describe KafkaConnector <connector-name> -n <project>

        例如,

        oc describe KafkaConnector inventory-connector-mongodb -n debezium

        该命令返回类似以下示例的状态信息:

        例 5.3. KafkaConnector 资源状态

        Name:         inventory-connector-mongodb
        Namespace:    debezium
        Labels:       strimzi.io/cluster=debezium-kafka-connect-cluster
        Annotations:  <none>
        API Version:  kafka.strimzi.io/v1beta2
        Kind:         KafkaConnector
        
        ...
        
        Status:
          Conditions:
            Last Transition Time:  2021-12-08T17:41:34.897153Z
            Status:                True
            Type:                  Ready
          Connector Status:
            Connector:
              State:      RUNNING
              worker_id:  10.131.1.124:8083
            Name:         inventory-connector-mongodb
            Tasks:
              Id:               0
              State:            RUNNING
              worker_id:        10.131.1.124:8083
            Type:               source
          Observed Generation:  1
          Tasks Max:            1
          Topics:
            inventory-connector-mongodb.inventory
            inventory-connector-mongodb.inventory.addresses
            inventory-connector-mongodb.inventory.customers
            inventory-connector-mongodb.inventory.geom
            inventory-connector-mongodb.inventory.orders
            inventory-connector-mongodb.inventory.products
            inventory-connector-mongodb.inventory.products_on_hand
        Events:  <none>
  2. 验证连接器是否创建了 Kafka 主题:

    • 通过 OpenShift Container Platform Web 控制台。

      1. 导航到 Home Search
      2. Search 页面中,点 Resources 打开 Select Resource 框,然后键入 KafkaTopic
      3. KafkaTopics 列表中,点您要检查的主题名称,例如 inventory-connector-mongodb.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d
      4. Conditions 部分,验证 TypeStatus 列中的值是否已设置为 ReadyTrue
    • 在终端窗口中:

      1. 使用以下命令:

        oc get kafkatopics

        该命令返回类似以下示例的状态信息:

        例 5.4. KafkaTopic 资源状态

        NAME                                                                    CLUSTER               PARTITIONS   REPLICATION FACTOR   READY
        connect-cluster-configs                                                 debezium-kafka-cluster   1            1                    True
        connect-cluster-offsets                                                 debezium-kafka-cluster   25           1                    True
        connect-cluster-status                                                  debezium-kafka-cluster   5            1                    True
        consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a             debezium-kafka-cluster   50           1                    True
        inventory-connector-mongodb--a96f69b23d6118ff415f772679da623fbbb99421                               debezium-kafka-cluster   1            1                    True
        inventory-connector-mongodb.inventory.addresses---1b6beaf7b2eb57d177d92be90ca2b210c9a56480          debezium-kafka-cluster   1            1                    True
        inventory-connector-mongodb.inventory.customers---9931e04ec92ecc0924f4406af3fdace7545c483b          debezium-kafka-cluster   1            1                    True
        inventory-connector-mongodb.inventory.geom---9f7e136091f071bf49ca59bf99e86c713ee58dd5               debezium-kafka-cluster   1            1                    True
        inventory-connector-mongodb.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d             debezium-kafka-cluster   1            1                    True
        inventory-connector-mongodb.inventory.products---df0746db116844cee2297fab611c21b56f82dcef           debezium-kafka-cluster   1            1                    True
        inventory-connector-mongodb.inventory.products_on_hand---8649e0f17ffcc9212e266e31a7aeea4585e5c6b5   debezium-kafka-cluster   1            1                    True
        schema-changes.inventory                                                debezium-kafka-cluster   1            1                    True
        strimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55          debezium-kafka-cluster   1            1                    True
        strimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b  debezium-kafka-cluster  1   1    True
  3. 检查主题内容。

    • 在终端窗口中输入以下命令:
    oc exec -n <project>  -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \
    >     --bootstrap-server localhost:9092 \
    >     --from-beginning \
    >     --property print.key=true \
    >     --topic=<topic-name>

    例如,

    oc exec -n debezium  -it debezium-kafka-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \
    >     --bootstrap-server localhost:9092 \
    >     --from-beginning \
    >     --property print.key=true \
    >     --topic=inventory-connector-mongodb.inventory.products_on_hand

    指定主题名称的格式与 oc describe 命令返回的格式与第 1 步中返回,例如 inventory-connector-mongodb.inventory.addresses

    对于主题中的每个事件,命令会返回类似以下示例的信息:

    例 5.5. Debezium 更改事件的内容

    {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory-connector-mongodb.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-mongodb.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-mongodb.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mongodb.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory-connector-mongodb.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"2.3.4.Final-redhat-00001","connector":"mongodb","name":"inventory-connector-mongodb","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"mongodb-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}

    在前面的示例中,有效负载 值显示连接器快照从表 inventory.products_on_hand 生成读取(op" ="r")事件。product_id 记录的 "before" 状态为 null,表示该记录不存在之前的值。"after" 状态对于 product_id101 的项目的 quantity 显示为 3

5.5.5. Debezium MongoDB 连接器配置属性的描述

Debezium MongoDB 连接器具有大量配置属性,可用于实现应用程序的正确连接器行为。许多属性都有默认值。有关属性的信息组织如下:

除非默认值可用 否则需要以下配置属性。

表 5.14. 所需的 Debezium MongoDB 连接器配置属性
属性默认描述

name

没有默认值

连接器的唯一名称。尝试使用相同的名称再次注册将失败。(所有 Kafka Connect 连接器都需要此属性。)

connector.class

没有默认值

连接器的 Java 类的名称。始终为 MongoDB 连接器使用 io.debezium.connector.mongodb.MongoDbConnector 的值。

mongodb.connection.string

没有默认值

指定连接器用来连接到 MongoDB 副本集 的连接字符串。此属性替换了之前在 MongoDB 连接器版本中提供的 mongodb.hosts 属性。

注意

捕获分片 MongoDB 集群更改的连接器仅在 mongodb.connection.mode 设置为 replica_set 时在初始分片发现过程中使用此连接字符串。在初始发现过程后,会为每个分片生成连接字符串。

mongodb.connection.mode

replica_set

指定连接器连接到 分片 MongoDB 集群时所使用的策略。将此属性设置为以下值之一:

replica_set
连接器为每个分片建立到副本集的独立连接。
分片
连接器根据 mongodb-connection-string 的值建立与数据库的单一连接。+
注意

replica_set 选项允许连接器在多个连接器任务之间分发分片处理。但是,在这个配置中,连接器会在连接到单个分片时绕过 MongoDB 路由器,而 MongoDB 不建议这样做。

警告

在连接模式间切换无效偏移时,这会触发新快照。

topic.prefix

没有默认值

标识此连接器监控的连接器和/或 MongoDB 副本集或分片集群的唯一名称。每台服务器应由大多数 Debezium 连接器监控,因为此服务器名称会添加所有持久的 Kafka 主题,从 MongoDB 副本集或集群中重复。仅使用字母数字字符、连字符、句点和下划线来组成名称。逻辑名称在所有其他连接器之间应是唯一的,因为该名称在命名此连接器的 Kafka 主题中用作前缀。

警告

不要更改此属性的值。如果您重启后更改了 name 值,而不是继续向原始主题发出事件,连接器会将后续事件发送到名称基于新值的主题。

mongodb.user

没有默认值

连接到 MongoDB 时使用的数据库用户的名称。只有在 MongoDB 被配置为使用身份验证时才需要。

mongodb.password

没有默认值

连接到 MongoDB 时使用的密码。只有在 MongoDB 被配置为使用身份验证时才需要。

mongodb.authsource

admin

包含 MongoDB 凭证的数据库(身份验证源)。只有在 MongoDB 配置为将 MongoDB 与另一个身份验证数据库而不是 admin 进行身份验证时才需要。

mongodb.ssl.enabled

false

连接器将使用 SSL 连接到 MongoDB 实例。

mongodb.ssl.invalid.hostname.allowed

false

启用 SSL 时,此设置控制连接阶段是否禁用了严格的主机名检查。如果为 true,连接不会阻止中间人攻击。

database.include.list

空字符串

可选的、以逗号分隔的正则表达式列表,与要监控的数据库名称匹配。默认情况下,会监控所有数据库。
当设置 database.include.list 时,连接器只监控属性指定的数据库。其他数据库不包括在监控中。

要匹配数据库的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与数据库的整个名称字符串匹配;它与数据库名称中可能存在的子字符串匹配。
如果您在配置中包含此属性,不要设置 database.exclude.list 属性。

database.exclude.list

空字符串

可选的、以逗号分隔的正则表达式列表,与数据库名称匹配,以便在监控中排除。当设置 database.exclude.list 时,连接器会监控每个数据库,但属性指定的数据库除外。

要匹配数据库的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与数据库的整个名称字符串匹配;它与数据库名称中可能存在的子字符串匹配。
如果您在配置中包含此属性,请不要设置 database.include.list 属性。

collection.include.list

空字符串

可选的、以逗号分隔的正则表达式列表,与要监控 MongoDB 集合的完全限定命名空间匹配。默认情况下,连接器会监控除 本地和 admin 数据库中除它们以外的所有集合。当设置了 collection.include.list 时,连接器只监控属性指定的集合。其他集合不包括在监控中。集合标识符的格式是 databaseName.collectionName

要匹配命名空间的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与命名空间的整个名称字符串匹配,它与名称中的子字符串不匹配。
如果您在配置中包含此属性,不要设置 collection.exclude.list 属性。

collection.exclude.list

空字符串

可选的、以逗号分隔的正则表达式列表,与要从监控中排除的 MongoDB 集合的完全限定命名空间匹配。当设置了 collection.exclude.list 时,连接器会监控每个集合,但属性指定的集合除外。集合标识符的格式是 databaseName.collectionName

要匹配命名空间的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与命名空间的整个名称字符串匹配,它不与数据库名称中存在的子字符串匹配。
如果您在配置中包含此属性,请不要设置 collection.include.list 属性。

snapshot.mode

初始

指定连接器启动时执行快照的条件。将属性设置为以下值之一:

初始
当连接器启动时,如果没有检测到偏移主题中的值,它会执行数据库的快照。
never
当连接器启动时,它会跳过快照过程,并立即开始将数据库记录的操作更改事件流传输到 oplog。

capture.mode

change_streams_update_full

指定连接器用来捕获 MongoDB 服务器的 update 事件更改的方法。将此属性设置为以下值之一:

change_streams
update 事件消息不包括完整文档。消息不包含代表更改文档状态的字段。
change_streams_update_full

update 事件消息包括完整文档。消息不包含代表更新前文档状态的 before 字段。事件消息返回 after 字段中文档的完整状态。

注意

在某些情况下,当将 capture.mode 配置为返回完整文档时,更新事件消息的 updateDescriptionafter 字段可能会报告不一致的值。在将多个更新应用到快速成功的文档后,这些差异可能会导致。连接器仅在收到事件的 updateDescription 字段中描述的更新后,从 MongoDB 数据库请求完整的文档。如果后续更新在连接器可以从数据库检索源文档前修改它,则连接器会收到稍后更新修改的文档。

change_streams_update_full_with_pre_image
update 事件消息包括完整文档,并包含一个代表 更改前 文档状态的字段。
change_streams_with_pre_image
更新 事件不包括完整文档,而是包含一个代表 更改前 文档状态的字段。

snapshot.include.collection.list

collection.include.list中指定的所有集合

一个可选的、以逗号分隔的正则表达式列表,与您要包含在快照中的模式的完全限定域名(<databaseName&gt; .<collectionName>)匹配。指定的项目必须在连接器的 collection.include.list 属性中命名。只有在连接器的 snapshot.mode 属性设置为除 never 的值时,此属性才会生效。
此属性不会影响增量快照的行为。

要匹配 schema 的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与 schema 的整个名称字符串匹配;它与 schema 名称中可能存在的子字符串匹配。

field.exclude.list

空字符串

可选的、以逗号分隔的字段名称列表,这些字段应排除在更改事件消息值中。字段的完全限定域名格式为 databaseName.collectionName.fieldName.nestedFieldName,其中 databaseNamecollectionName 可能包含与任何字符匹配的通配符 jpeg。

field.renames

空字符串

可选的、以逗号分隔的字段替换列表,用于重命名更改事件消息值中的字段。字段的完全限定替换格式为 databaseName.collectionName.fieldName.nestedFieldName:newNestedFieldName,其中 databaseNamecollectionName 可以包含与任何字符匹配的通配符 packagemanifests,用于确定字段重命名映射。下一个字段替换应用于列表中之前字段替换的结果,因此在重命名同一路径中的多个字段时请注意这一点。

tasks.max

1

指定连接器用来连接到分片集群的最大任务数量。当您将连接器与单个 MongoDB 副本集搭配使用时,默认值为可以接受。但是,当集群包含多个分片时,要启用 Kafka Connect 来分发每个副本集的工作,请指定等于或大于集群中的分片数量的值。然后,MongoDB 连接器可以使用单独的任务连接到集群中每个分片的副本集。

注意

只有在连接器连接到分片 MongoDB 集群,并且 mongodb.connection.mode 属性设置为 replica_set 时,此属性才会生效。当 mongodb.connection.mode 设置为 sharded 时,或者连接器连接到未分片的 MongoDB 副本集部署时,连接器会忽略此设置,并默认使用单个任务。

snapshot.max.threads

1

正整数值,用于指定用于在副本集中执行集合空间同步的最大线程数。默认为 1。

tombstones.on.delete

true

控制 delete 事件是否后跟一个 tombstone 事件。

true - 一个 delete 操作由 delete 事件和后续 tombstone 事件表示。

false - 仅有一个 delete 事件被抛出。

删除源记录后,发出 tombstone 事件(默认行为)可让 Kafka 在为主题启用了 日志 压缩时完全删除与已删除行键相关的所有事件。

snapshot.delay.ms

没有默认值

在启动后,连接器在进行快照前应等待的时间(以毫秒为单位)。
可用于在集群中启动多个连接器时避免快照中断,这可能会导致连接器的重新平衡。

snapshot.fetch.size

0

指定在拍摄快照时每个集合中应一次读取的最大文档数。连接器将在这个大小的多个批处理中读取集合内容。
默认值为 0,这表示服务器选择合适的获取大小。

schema.name.adjustment.mode

none

指定应如何调整模式名称以与连接器使用的消息转换器兼容。可能的设置:

  • none 不应用任何调整。
  • avro 将无法在 Avro 类型名中使用的字符替换为下划线。
  • avro_unicode 将无法在 Avro 类型名称中使用的下划线或字符替换为对应的 unicode,如 _uxxxx。注意:_ 是 Java 中反斜杠的转义序列

field.name.adjustment.mode

none

指定应如何调整字段名称以与连接器使用的消息转换器兼容。可能的设置:

  • none 不应用任何调整。
  • avro 将无法在 Avro 类型名中使用的字符替换为下划线。
  • avro_unicode 将无法在 Avro 类型名称中使用的下划线或字符替换为对应的 unicode,如 _uxxxx。注意:_ 是 Java 中反斜杠的转义序列

如需了解更多详细信息,请参阅 Avro 命名

mongodb.hosts

没有默认值

以逗号分隔的主机名和端口对列表(格式为 'host' 或 'host:port')在副本集中的 MongoDB 服务器。列表中可以包含单个主机名和端口对。

注意

此属性已弃用,应该被 +mongodb.connection.string 替代。

以下 高级配置 属性具有很好的默认值,这些默认值在大多数情况下将可以正常工作,因此很少需要在连接器的配置中指定。

表 5.15. Debezium MongoDB 连接器高级配置属性
属性默认描述

max.batch.size

2048

正整数值,指定每个应在此连接器迭代过程中处理的事件的最大大小。默认值为 2048。

max.queue.size

8192

正整数值,用于指定阻塞队列可以保存的最大记录数。当 Debezium 从数据库读取事件时,它会将事件放置在阻塞队列中,然后再将它们写入 Kafka。阻塞队列可以提供从数据库读取更改事件时,连接器最快于将其写入 Kafka 的信息,或者在 Kafka 不可用时从数据库读取更改事件。当连接器定期记录偏移时,队列中保存的事件会被忽略。始终将 max.queue.size 的值设置为大于 max.batch.size 的值。

max.queue.size.in.bytes

0

一个长的整数值,用于指定阻塞队列的最大卷(以字节为单位)。默认情况下,不会为阻塞队列指定卷限制。要指定队列可以消耗的字节数,请将此属性设置为正长值。
如果还设置了 max.queue.size,当队列的大小达到任一属性指定的限制时,写入队列将被阻止。例如,如果您设置了 max.queue.size=1000、和 max.queue.size.in.bytes=5000,在队列包含 1000 个记录后,或者队列中记录的卷达到 5000 字节后,写入队列会被阻止。

poll.interval.ms

1000

正整数值,指定连接器在每个迭代过程中应等待的毫秒数,以便出现新更改事件。默认值为 500 毫秒,或 0.5 秒。

connect.backoff.initial.delay.ms

1000

正整数值,指定在第一次连接尝试或没有主可用后尝试重新连接到主时的初始延迟。默认为 1 秒(1000 ms)。

connect.backoff.max.delay.ms

1000

正整数值,指定在重复失败连接尝试或没有主可用后尝试重新连接到主时的最大延迟。默认为 120 秒(120,000 ms)。

connect.max.attempts

16

正整数值,指定发生异常和任务中止前尝试到副本集的主连接的最大失败数。默认为 16,在失败时,connect.backoff.initial.delay.msconnect.backoff.max.delay.ms 的默认值会超过 20 分钟的尝试。

heartbeat.interval.ms

0

控制发送心跳消息的频率。
此属性包含一个间隔,以毫秒为单位定义连接器将信息发送到 heartbeat 主题的频率。这可用于监控连接器是否仍然从数据库接收更改事件。在较长的时间段内,您还应利用心跳消息,以防在非捕获的集合中只更改了心跳消息。在这种情况下,连接器将继续从数据库读取 oplog/change 流,但永远不会将任何更改信息发送到 Kafka,这意味着没有将偏移更新提交到 Kafka。这将导致 oplog 文件被轮转,但连接器不会注意到它,因此在重启一些事件时,这个事件将不再需要重新执行初始快照。

将此参数设置为 0 以不发送心跳信息。
默认禁用此选项。

skipped.operations

t

在流过程中将跳过的操作类型的逗号分隔列表。操作包括:用于 inserts/create、u 表示 updates/replace、d 表示删除、t 表示截断,none 不跳过上述操作。默认情况下,为了与其他 Debezium 连接器保持一致,会跳过 truncate 操作(不会由此连接器发出)。但是,由于 MongoDB 不支持 截断更改事件,这实际上与指定 none 相同。

snapshot.collection.filter.overrides

没有默认值

控制快照中包含的集合项目。此属性仅影响快照。以 databaseName.collectionName 格式指定以逗号分隔的集合名称列表。

对于您指定的每个集合,还要指定另一个配置属性: snapshot.collection.filter.overrides.databaseName.collectionName。例如,其他配置属性的名称可能是: snapshot.collection.filter.overrides.customers.orders。将此属性设置为有效的过滤器表达式,它只检索您在快照中所需的项目。当连接器执行快照时,它只检索与过滤器表达式匹配的项目。

provide.transaction.metadata

false

当设置为 true Debezium 时,使用事务边界生成事件,并使用事务元数据增强数据事件信封。

如需了解更多详细信息,请参阅 事务元数据

retriable.restart.connector.wait.ms

10000 (10 秒)

在发生可检索错误后重启连接器前等待的毫秒数。

mongodb.poll.interval.ms

30000

连接器轮询新的、删除或更改的副本集的时间间隔。

mongodb.connect.timeout.ms

10000 (10 秒)

驱动程序在新连接尝试中止前等待的毫秒数。

mongodb.heartbeat.frequency.ms

10000 (10 秒)

集群监控器尝试访问每台服务器的频率。

mongodb.socket.timeout.ms

0

套接字上的发送/接收在超时发生前可能需要的毫秒数。0 代表禁用此行为。

mongodb.server.selection.timeout.ms

30000 (30 秒)

驱动程序在超时前等待选择服务器的毫秒数,并抛出错误。

cursor.pipeline

没有默认值

当流更改时,此设置应用处理来更改流事件,作为标准 MongoDB 聚合流管道的一部分。Pipeline 是一个 MongoDB 聚合管道,由数据库的说明组成,用于过滤或转换数据。这可用于自定义连接器使用的数据。此属性的值必须是 JSON 格式的允许 聚合管道阶段 的数组。请注意,这会在用于支持连接器的内部管道后附加(例如,过滤操作类型、数据库名称、集合名称等)。

cursor.pipeline.order

internal_first

用于构建有效 MongoDB 聚合流管道的顺序。将属性设置为以下值之一:

internal_first
连接器定义的内部阶段会首先应用。这意味着,只有由连接器捕获的事件才会被连接器捕获到用户定义的阶段(通过设置 cursor.pipeline进行配置)。
user_first
'cursor.pipeline' 属性定义的阶段会首先应用。在这个模式中,所有事件(包括未由连接器捕获的事件)被反馈到用户定义的管道阶段。如果 cursor.pipeline 的值包含复杂操作,则此模式可能会对性能造成负面影响。

cursor.max.await.time.ms

0

指定 oplog/change 流光标将在导致执行超时异常前等待服务器生成结果的最大毫秒数。值 0 表示使用 server/driver 默认等待超时。

signal.data.collection

没有默认值

用于向连接器发送信号的数据收集的完全限定名称。https://access.redhat.com/documentation/zh-cn/red_hat_integration/2023.q4/html-single/debezium_user_guide/index#debezium-signaling-enabling-source-signaling-channel使用以下格式指定集合名称:
<databaseName> . < collectionName>

signal.enabled.channels

source

为连接器启用的信号频道名称列表。默认情况下,以下频道可用:

  • source
  • kafka
  • file
  • jmx

notification.enabled.channels

没有默认值

为连接器启用的通知频道名称列表。默认情况下,以下频道可用:

  • sink
  • log
  • jmx

incremental.snapshot.chunk.size

1024

连接器在增量快照块期间获取并读取内存的最大文档数。增加块大小可提高效率,因为快照会运行更多大小的快照查询。但是,较大的块大小还需要更多内存来缓冲快照数据。将块大小调整为提供环境中最佳性能的值。
增量快照是 Debezium MongoDB 连接器的技术预览功能。

topic.naming.strategy

io.debezium.schema.DefaultTopicNamingStrategy

应该用来确定数据更改、模式更改、事务、心跳事件等的主题名称,默认为 DefaultTopicNamingStrategy

topic.delimiter

.

指定主题名称的分隔符,默认为

topic.cache.size

10000

在绑定的并发哈希映射中用于保存主题名称的大小。此缓存将有助于确定与给定数据收集对应的主题名称。

topic.heartbeat.prefix

__debezium-heartbeat

控制连接器向其发送心跳信息的主题名称。主题名称具有此模式:

topic.heartbeat.prefix.topic.prefix

例如,如果主题前缀是 fulfillment,则默认主题名称为 __debezium-heartbeat.fulfillment

topic.transaction

Transactions

控制连接器向其发送事务元数据消息的主题名称。主题名称具有此模式:

topic.prefix.topic.transaction

例如,如果主题前缀是 fulfillment,默认的主题名称为 fulfillment.transaction

errors.max.retries

-1

在失败前,retriable 错误(如连接错误)的最大重试次数(-1 = no limit, 0 = disabled, > 0 = num of retries)。

Debezium 连接器 Kafka 信号配置属性

Debezium 提供了一组 signal.* 属性,用于控制连接器如何与 Kafka 信号主题进行交互。

下表描述了 Kafka 信号 属性。

表 5.16. Kafka 信号配置属性
属性默认描述

signal.kafka.topic

<topic.prefix>-signal

连接器监控用于临时信号的 Kafka 主题的名称。

注意

如果禁用了 自动主题创建,您必须手动创建所需的信号主题。需要信号主题来保留信号排序。信号主题必须具有单个分区。

signal.kafka.groupId

kafka-signal

Kafka 用户使用的组 ID 的名称。

signal.kafka.bootstrap.servers

没有默认值

连接器用来建立到 Kafka 集群的初始连接的主机/端口对列表。每个对都引用 Debezium Kafka Connect 进程使用的 Kafka 集群。

signal.kafka.poll.timeout.ms

100

一个整数值,用于指定连接器在轮询信号时等待的最大毫秒数。

Debezium 连接器传递信号 Kafka 使用者客户端配置属性

Debezium 连接器为信号 Kafka 使用者提供直通配置。透传信号属性以 signals.consumer.* 前缀开始。例如,连接器将 signal.consumer.security.protocol=SSL 等属性传递给 Kafka 消费者。

Debezium 从属性中剥离前缀,然后再将属性传递给 Kafka 信号消费者。

Debezium 连接器接收器通知配置属性

下表描述了 通知 属性。

表 5.17. sink 通知配置属性
属性默认描述

notification.sink.topic.name

没有默认值

从 Debezium 接收通知的主题名称。当您将 notification.enabled.channels 属性配置为将 sink 作为启用的通知频道之一时,需要此属性。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.