8.6. 部署 Debezium PostgreSQL 连接器


您可以使用以下任一方法部署 Debezium PostgreSQL 连接器:

8.6.1. 使用 AMQ Streams 部署 PostgreSQL 连接器

从 Debezium 1.7 开始,部署 Debezium 连接器的首选方法是使用 AMQ Streams 构建包含连接器插件的 Kafka Connect 容器镜像。

在部署过程中,您可以创建并使用以下自定义资源(CR):

  • 定义 Kafka Connect 实例的 KafkaConnect CR,并包含有关镜像中需要包含连接器工件的信息。
  • KafkaConnector CR,提供包括连接器用来访问源数据库的信息。在 AMQ Streams 启动 Kafka Connect pod 后,您可以通过应用 KafkaConnector CR 来启动连接器。

在 Kafka Connect 镜像的构建规格中,您可以指定可用于部署的连接器。对于每个连接器插件,您还可以指定您的部署可以使用的其他组件。例如,您可以添加 Service Registry 工件或 Debezium 脚本组件。当 AMQ Streams 构建 Kafka Connect 镜像时,它会下载指定的工件,并将其合并到镜像中。

KafkaConnect CR 中的 spec.build.output 参数指定存储生成的 Kafka Connect 容器镜像的位置。容器镜像可以存储在 Docker registry 中,也可以存储在 OpenShift ImageStream 中。要将镜像存储在 ImageStream 中,您必须在部署 Kafka Connect 前创建 ImageStream。镜像流不会被自动创建。

注意

如果使用 KafkaConnect 资源来创建集群,之后无法使用 Kafka Connect REST API 创建或更新连接器。您仍然可以使用 REST API 来检索信息。

其他资源

8.6.2. 使用 AMQ Streams 部署 Debezium PostgreSQL 连接器

使用早期版本的 AMQ Streams 时,要在 OpenShift 上部署 Debezium 连接器,您需要首先为连接器构建 Kafka Connect 镜像。在 OpenShift 上部署连接器的当前首选方法是使用 AMQ Streams 中的构建配置来构建 Kafka Connect 容器镜像,其中包含您要使用的 Debezium 连接器插件。

在构建过程中,AMQ Streams Operator 将 KafkaConnect 自定义资源(包括 Debezium 连接器定义)中的输入参数转换为 Kafka Connect 容器镜像。构建会从 Red Hat Maven 存储库或其他配置的 HTTP 服务器下载必要的工件。

新创建的容器被推送到在 .spec.build.output 中指定的容器 registry,用于部署 Kafka Connect 集群。在 AMQ Streams 构建 Kafka Connect 镜像后,您可以创建 KafkaConnector 自定义资源来启动构建中包含的连接器。

先决条件

  • 您可以访问安装了集群 Operator 的 OpenShift 集群。
  • AMQ Streams Operator 正在运行。
  • OpenShift 中部署和升级 AMQ Streams 所述,会部署 Apache Kafka 集群。
  • Kafka Connect 在 AMQ Streams 上部署
  • 您有一个 Red Hat Integration 许可证。
  • 已安装 OpenShift oc CLI 客户端,或者您可以访问 OpenShift Container Platform Web 控制台。
  • 根据您要存储 Kafka Connect 构建镜像的方式,您需要 registry 权限,或者您必须创建 ImageStream 资源:

    将构建镜像存储在镜像 registry 中,如 Red Hat Quay.io 或 Docker Hub
    • 在 registry 中创建和管理镜像的帐户和权限。
    将构建镜像存储为原生 OpenShift ImageStream

流程

  1. 登录 OpenShift 集群。
  2. 为连接器创建 Debezium KafkaConnect 自定义资源(CR),或修改现有的资源。例如,创建一个名为 dbz-connect.yamlKafkaConnect CR,用于指定 metadata.annotationsspec.build 属性。以下示例显示了一个 dbz-connect.yaml 文件的摘录,该文件描述了 KafkaConnect 自定义资源。

    例 8.1. 定义包含 Debezium 连接器的 KafkaConnect 自定义资源的 dbz-connect.yaml 文件

    在以下示例中,自定义资源被配置为下载以下工件:

    • Debezium PostgreSQL 连接器存档。
    • Service Registry 归档。Service Registry 是一个可选组件。只有在打算将 Avro 序列化与连接器搭配使用时,才添加 Service Registry 组件。
    • Debezium 脚本 SMT 归档以及您要与 Debezium 连接器一起使用的关联脚本引擎。SMT 归档和脚本语言依赖项是可选组件。只有在打算使用 Debezium 的基于内容的路由 SMT 或 过滤 SMT 时,才添加这些组件。
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: debezium-kafka-connect-cluster
      annotations:
        strimzi.io/use-connector-resources: "true" 1
    spec:
      version: 3.5.0
      build: 2
        output: 3
          type: imagestream  4
          image: debezium-streams-connect:latest
        plugins: 5
          - name: debezium-connector-postgres
            artifacts:
              - type: zip 6
                url: https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-postgres/2.3.4.Final-redhat-00001/debezium-connector-postgres-2.3.4.Final-redhat-00001-plugin.zip  7
              - type: zip
                url: https://maven.repository.redhat.com/ga/io/apicurio/apicurio-registry-distro-connect-converter/2.4.4.Final-redhat-<build-number>/apicurio-registry-distro-connect-converter-2.4.4.Final-redhat-<build-number>.zip  8
              - type: zip
                url: https://maven.repository.redhat.com/ga/io/debezium/debezium-scripting/2.3.4.Final-redhat-00001/debezium-scripting-2.3.4.Final-redhat-00001.zip 9
              - type: jar
                url: https://repo1.maven.org/maven2/org/codehaus/groovy/groovy/3.0.11/groovy-3.0.11.jar  10
              - type: jar
                url: https://repo1.maven.org/maven2/org/codehaus/groovy/groovy-jsr223/3.0.11/groovy-jsr223-3.0.11.jar
              - type: jar
                url: https://repo1.maven.org/maven2/org/codehaus/groovy/groovy-json3.0.11/groovy-json-3.0.11.jar
    
      bootstrapServers: debezium-kafka-cluster-kafka-bootstrap:9093
    
      ...
    表 8.25. Kafka Connect 配置设置的描述
    描述

    1

    strimzi.io/use-connector-resources 注解设置为 "true",使 Cluster Operator 使用 KafkaConnector 资源在此 Kafka Connect 集群中配置连接器。

    2

    spec.build 配置指定在镜像中存储构建镜像的位置,并列出要在镜像中包含的插件,以及插件工件的位置。

    3

    build.output 指定存储新构建镜像的 registry。

    4

    指定镜像输出的名称和镜像名称。output.type 的有效值是 要推送到 容器 registry (如 Docker Hub 或 Quay)或 镜像流 的有效值,以将镜像推送到内部 OpenShift ImageStream。要使用 ImageStream,必须将 ImageStream 资源部署到集群中。有关在 KafkaConnect 配置中指定 build.output 的更多信息,请参阅在 OpenShift 中配置 AMQ Streams 中的 AMQ Streams Build schema 参考

    5

    plugins 配置列出了您要包含在 Kafka Connect 镜像中的所有连接器。对于列表中的每个条目,指定一个插件名称,以及有关构建连接器所需的工件的信息。另外,对于每个连接器插件,您还可以包含可用于连接器的其他组件。例如,您可以添加 Service Registry 工件或 Debezium 脚本组件。

    6

    artifacts.type 的值指定在 artifacts.url 中指定的工件类型。有效类型为 ziptgzjar。Debezium 连接器存档以 .zip 文件格式提供。类型 值必须与 url 字段中引用的文件类型匹配。

    7

    artifacts.url 的值指定 HTTP 服务器的地址,如 Maven 存储库,用于存储连接器工件的文件。Debezium 连接器工件在 Red Hat Maven 存储库中提供。OpenShift 集群必须有权访问指定的服务器。

    8

    (可选)指定用于下载 Service Registry 组件的工件 类型和 url。包含 Service Registry 工件,只有在您希望连接器使用 Apache Avro 来序列化带有 Service Registry 的事件键和值时,而不是使用默认的 JSON 转换程序。

    9

    (可选)指定 Debezium 脚本 SMT 归档的工件 类型和 url,以用于 Debezium 连接器。只有在打算使用 Debezium 的基于内容的路由 SMT 或 过滤 SMT 时才包括脚本 SMT。要使用脚本 SMT,您必须部署 JSR 223 兼容脚本实现,如 groovy。

    10

    (可选)指定 JSR 223 兼容脚本实施的 JAR 文件的工件 类型和 url,这是 Debezium 脚本 SMT 所需的。

    重要

    如果使用 AMQ Streams 将连接器插件合并到 Kafka Connect 镜像中,每个所需的脚本语言 工件。url 必须指定 JAR 文件的位置,并且 artifacts.type 的值也必须设置为 jar。无效的值会导致连接器在运行时失败。

    要启用带有脚本 SMT 的 Apache Groovy 语言,示例中的自定义资源会为以下库检索 JAR 文件:

    • groovy
    • Groovy-jsr223 (指定代理)
    • groovy-json (解析 JSON 字符串的模块)

    作为替代方案,Debebe Debezium 脚本 SMT 也支持使用 JSR 223 实现 GraalVM JavaScript。

  3. 输入以下命令将 KafkaConnect 构建规格应用到 OpenShift 集群:

    oc create -f dbz-connect.yaml

    根据自定义资源中指定的配置,Streams Operator 准备要部署的 Kafka Connect 镜像。
    构建完成后,Operator 将镜像推送到指定的 registry 或 ImageStream,并启动 Kafka Connect 集群。集群中提供了您在配置中列出的连接器工件。

  4. 创建一个 KafkaConnector 资源来定义您要部署的每个连接器的实例。
    例如,创建以下 KafkaConnector CR,并将它保存为 postgresql-inventory-connector.yaml

    例 8.2. 为 Debezium 连接器定义 KafkaConnector 自定义资源的 postgresql-inventory-connector.yaml 文件

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      labels:
        strimzi.io/cluster: debezium-kafka-connect-cluster
      name: inventory-connector-postgresql 1
    spec:
      class: io.debezium.connector.postgresql.PostgresConnector 2
      tasksMax: 1  3
      config:  4
        database.hostname: postgresql.debezium-postgresql.svc.cluster.local 5
        database.port: 5432   6
        database.user: debezium  7
        database.password: dbz  8
        database.dbname: mydatabase 9
        topic.prefix: inventory-connector-postgresql 10
        table.include.list: public.inventory  11
    
        ...
    表 8.26. 连接器配置设置的描述
    描述

    1

    使用 Kafka Connect 集群注册的连接器名称。

    2

    连接器类的名称。

    3

    可以同时操作的任务数量。

    4

    连接器的配置。

    5

    主机数据库实例的地址。

    6

    数据库实例的端口号。

    7

    Debezium 用于连接到数据库的帐户名称。

    8

    Debezium 用于连接到数据库用户帐户的密码。

    9

    要从中捕获更改的数据库名称。

    10

    数据库实例或集群的主题前缀。
    指定的名称只能由字母数字字符或下划线组成。
    因为主题前缀被用作从这个连接器接收更改事件的任何 Kafka 主题的前缀,所以该名称在集群中的连接器之间必须是唯一的。
    如果连接器与 Avro 连接器集成,则此命名空间也用于相关 Kafka Connect 模式的名称,以及相应 Avro 模式的命名空间。

    11

    连接器捕获更改事件的表列表。

  5. 运行以下命令来创建连接器资源:

    oc create -n <namespace> -f <kafkaConnector>.yaml

    例如,

    oc create -n debezium -f {context}-inventory-connector.yaml

    连接器注册到 Kafka Connect 集群,并开始针对 KafkaConnector CR 中的 spec.config.database.dbname 指定的数据库运行。连接器 pod 就绪后,Debebe 正在运行。

现在 ,您可以验证 Debezium PostgreSQL 部署

8.6.3. 通过从 Dockerfile 构建自定义 Kafka Connect 容器镜像来部署 Debezium PostgreSQL 连接器

要部署 Debezium PostgreSQL 连接器,您需要构建包含 Debezium 连接器存档的自定义 Kafka Connect 容器镜像,并将此容器镜像推送到容器 registry。然后,您需要创建两个自定义资源(CR):

  • 定义 Kafka Connect 实例的 KafkaConnect CR。CR 中的 image 属性指定您创建的容器镜像的名称,以运行 Debezium 连接器。您可以将此 CR 应用到部署 Red Hat AMQ Streams 的 OpenShift 实例。AMQ Streams 提供将 Apache Kafka 带到 OpenShift 的 operator 和镜像。
  • 定义 Debezium Db2 连接器的 KafkaConnector CR。将此 CR 应用到应用 KafkaConnect CR 的同一 OpenShift 实例。

先决条件

流程

  1. 为 Kafka Connect 创建 Debezium PostgreSQL 容器:

    1. 创建一个使用 registry.redhat.io/amq-streams-kafka-35-rhel8:2.5.0 的 Dockerfile 作为基础镜像。例如,在终端窗口中输入以下命令:

      cat <<EOF >debezium-container-for-postgresql.yaml 1
      FROM registry.redhat.io/amq-streams-kafka-35-rhel8:2.5.0
      USER root:root
      RUN mkdir -p /opt/kafka/plugins/debezium 2
      RUN cd /opt/kafka/plugins/debezium/ \
      && curl -O https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-postgres/2.3.4.Final-redhat-00001/debezium-connector-postgres-2.3.4.Final-redhat-00001-plugin.zip \
      && unzip debezium-connector-postgres-2.3.4.Final-redhat-00001-plugin.zip \
      && rm debezium-connector-postgres-2.3.4.Final-redhat-00001-plugin.zip
      RUN cd /opt/kafka/plugins/debezium/
      USER 1001
      EOF
      描述

      1

      您可以指定您想要的任何文件名。

      2

      指定 Kafka Connect 插件目录的路径。如果您的 Kafka Connect 插件目录位于不同的位置,请将此路径替换为目录的实际路径。

      该命令在当前目录中创建一个名为 debezium-container-for-postgresql.yaml 的 Dockerfile。

    2. 从您在上一步中创建的 debezium-container-for-postgresql.yaml Docker 文件中构建容器镜像。在包含文件的目录中,打开终端窗口并输入以下命令之一:

      podman build -t debezium-container-for-postgresql:latest .
      docker build -t debezium-container-for-postgresql:latest .

      build 命令使用名称 debezium-container-for-postgresql 构建容器镜像。

    3. 将自定义镜像推送到容器 registry,如 quay.io 或内部容器 registry。容器 registry 必须可供您要部署镜像的 OpenShift 实例使用。输入以下命令之一:

      podman push <myregistry.io>/debezium-container-for-postgresql:latest
      docker push <myregistry.io>/debezium-container-for-postgresql:latest
    4. 创建新的 Debezium PostgreSQL KafkaConnect 自定义资源(CR)。例如,创建一个名为 dbz-connect.yamlKafkaConnect CR,用于指定 注解和 镜像 属性。以下示例显示了一个 dbz-connect.yaml 文件的摘录,该文件描述了 KafkaConnect 自定义资源。

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnect
      metadata:
        name: my-connect-cluster
        annotations:
          strimzi.io/use-connector-resources: "true" 1
      spec:
        image: debezium-container-for-postgresql 2
      
        ...
      描述

      1

      metadata.annotations 表示 KafkaConnector 资源用于配置在这个 Kafka Connect 集群中使用的 Cluster Operator。

      2

      spec.image 指定您创建的镜像的名称,以运行 Debezium 连接器。此属性覆盖 Cluster Operator 中的 STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE 变量。

    5. 运行以下命令,将 KafkaConnect CR 应用到 OpenShift Kafka 实例:

      oc create -f dbz-connect.yaml

      这会更新 OpenShift 中的 Kafka Connect 环境,以添加 Kafka Connector 实例,该实例指定您为运行 Debezium 连接器而创建的镜像名称。

  2. 创建一个 KafkaConnector 自定义资源来配置 Debezium PostgreSQL 连接器实例。

    您可以在 .yaml 文件中配置 Debezium PostgreSQL 连接器,该文件指定连接器的配置属性。连接器配置可能指示 Debezium 为 schema 和表的子集生成事件,或者可能会设置属性,以便 Debezium 忽略、掩码或截断敏感、太大或不需要的指定列中的值。有关您可以为 Debezium PostgreSQL 连接器设置的配置属性的完整列表,请参阅 PostgreSQL 连接器属性

    以下示例显示了一个自定义资源的摘录,该资源在端口 5432 上配置一个 Debezium 连接器连接到 PostgreSQL 服务器主机 192.168.99.100。此主机有一个名为 sampledb 的数据库,名为 public 的 schema,inventory-connector-postgresql 是服务器的逻辑名称。

    inventory-connector.yaml

    apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnector
      metadata:
        name: inventory-connector-postgresql  1
        labels:
          strimzi.io/cluster: my-connect-cluster
      spec:
        class: io.debezium.connector.postgresql.PostgresConnector
        tasksMax: 1  2
        config:  3
          database.hostname: 192.168.99.100   4
          database.port: 5432
          database.user: debezium
          database.password: dbz
          database.dbname: sampledb
          topic.prefix: inventory-connector-postgresql   5
          schema.include.list: public   6
          plugin.name: pgoutput    7
    
          ...

    1 1 1 1 1
    连接器的名称。
    2 2 2 2 2
    任何时候都只能运行一个任务。因为 PostgreSQL 连接器使用单一连接器任务读取 PostgreSQL 服务器的 binlog 可确保正确的顺序和事件处理。Kafka Connect 服务使用连接器来启动一个或多个完成工作的任务,并在 Kafka Connect 服务集群中自动分发正在运行的任务。如果有任何服务停止或崩溃,则这些任务将重新分发到运行的服务。
    3 3 3
    连接器的配置。
    4 4 4
    运行 PostgreSQL 服务器的数据库主机的名称。在本例中,数据库主机名为 192.168.99.100
    5 5 5
    唯一的主题前缀。服务器名称是 PostgreSQL 服务器或服务器集群的逻辑标识符。此名称用作接收更改事件记录的所有 Kafka 主题的前缀。
    6 6 6
    连接器只捕获 public 模式中的更改。可以将连接器配置为仅捕获您选择的表中的更改。如需更多信息,请参阅 table.include.list
    7 7 7
    在 PostgreSQL 服务器上安装的 PostgreSQL 逻辑解码插件的名称。虽然 PostgreSQL 10 及之后的版本唯一支持的值是 pgoutput,但您必须将 plugin.name 明确设置为 pgoutput
  3. 使用 Kafka Connect 创建连接器实例。例如,如果您将 KafkaConnector 资源保存在 inventory-connector.yaml 文件中,您将运行以下命令:

    oc apply -f inventory-connector.yaml

    这会注册 inventory-connector,连接器开始针对 KafkaConnector CR 中定义的 sampledb 数据库运行。

结果

连接器启动后,它会 对配置了连接器的 PostgreSQL 服务器数据库执行一致的快照。然后,连接器开始为行级操作生成数据更改事件,并将事件记录流传输到 Kafka 主题。

8.6.4. 验证 Debezium PostgreSQL 连接器是否正在运行

如果连接器正确启动且没有错误,它会为每个连接器配置为捕获的表创建一个主题。下游应用程序可以订阅这些主题,以检索源数据库中发生的信息事件。

要验证连接器是否正在运行,您可以从 OpenShift Container Platform Web 控制台或 OpenShift CLI 工具(oc)执行以下操作:

  • 验证连接器状态。
  • 验证连接器是否生成主题。
  • 验证主题是否填充了读取操作("op":"r")的事件,连接器在每个表的初始快照中生成。

先决条件

  • Debezium 连接器部署到 OpenShift 上的 AMQ Streams。
  • 已安装 OpenShift oc CLI 客户端。
  • 访问 OpenShift Container Platform web 控制台。

流程

  1. 使用以下方法之一检查 KafkaConnector 资源的状态:

    • 在 OpenShift Container Platform Web 控制台中:

      1. 导航到 Home Search
      2. Search 页面中,点 Resources 打开 Select Resource 框,然后键入 KafkaConnector
      3. KafkaConnectors 列表中,点您要检查的连接器的名称,如 inventory-connector-postgresql
      4. Conditions 部分,验证 TypeStatus 列中的值是否已设置为 ReadyTrue
    • 在终端窗口中:

      1. 使用以下命令:

        oc describe KafkaConnector <connector-name> -n <project>

        例如,

        oc describe KafkaConnector inventory-connector-postgresql -n debezium

        该命令返回类似以下示例的状态信息:

        例 8.3. KafkaConnector 资源状态

        Name:         inventory-connector-postgresql
        Namespace:    debezium
        Labels:       strimzi.io/cluster=debezium-kafka-connect-cluster
        Annotations:  <none>
        API Version:  kafka.strimzi.io/v1beta2
        Kind:         KafkaConnector
        
        ...
        
        Status:
          Conditions:
            Last Transition Time:  2021-12-08T17:41:34.897153Z
            Status:                True
            Type:                  Ready
          Connector Status:
            Connector:
              State:      RUNNING
              worker_id:  10.131.1.124:8083
            Name:         inventory-connector-postgresql
            Tasks:
              Id:               0
              State:            RUNNING
              worker_id:        10.131.1.124:8083
            Type:               source
          Observed Generation:  1
          Tasks Max:            1
          Topics:
            inventory-connector-postgresql.inventory
            inventory-connector-postgresql.inventory.addresses
            inventory-connector-postgresql.inventory.customers
            inventory-connector-postgresql.inventory.geom
            inventory-connector-postgresql.inventory.orders
            inventory-connector-postgresql.inventory.products
            inventory-connector-postgresql.inventory.products_on_hand
        Events:  <none>
  2. 验证连接器是否创建了 Kafka 主题:

    • 通过 OpenShift Container Platform Web 控制台。

      1. 导航到 Home Search
      2. Search 页面中,点 Resources 打开 Select Resource 框,然后键入 KafkaTopic
      3. KafkaTopics 列表中,点您要检查的主题名称,例如 inventory-connector-postgresql.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d
      4. Conditions 部分,验证 TypeStatus 列中的值是否已设置为 ReadyTrue
    • 在终端窗口中:

      1. 使用以下命令:

        oc get kafkatopics

        该命令返回类似以下示例的状态信息:

        例 8.4. KafkaTopic 资源状态

        NAME                                                                    CLUSTER               PARTITIONS   REPLICATION FACTOR   READY
        connect-cluster-configs                                                 debezium-kafka-cluster   1            1                    True
        connect-cluster-offsets                                                 debezium-kafka-cluster   25           1                    True
        connect-cluster-status                                                  debezium-kafka-cluster   5            1                    True
        consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a             debezium-kafka-cluster   50           1                    True
        inventory-connector-postgresql--a96f69b23d6118ff415f772679da623fbbb99421                               debezium-kafka-cluster   1            1                    True
        inventory-connector-postgresql.inventory.addresses---1b6beaf7b2eb57d177d92be90ca2b210c9a56480          debezium-kafka-cluster   1            1                    True
        inventory-connector-postgresql.inventory.customers---9931e04ec92ecc0924f4406af3fdace7545c483b          debezium-kafka-cluster   1            1                    True
        inventory-connector-postgresql.inventory.geom---9f7e136091f071bf49ca59bf99e86c713ee58dd5               debezium-kafka-cluster   1            1                    True
        inventory-connector-postgresql.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d             debezium-kafka-cluster   1            1                    True
        inventory-connector-postgresql.inventory.products---df0746db116844cee2297fab611c21b56f82dcef           debezium-kafka-cluster   1            1                    True
        inventory-connector-postgresql.inventory.products_on_hand---8649e0f17ffcc9212e266e31a7aeea4585e5c6b5   debezium-kafka-cluster   1            1                    True
        schema-changes.inventory                                                debezium-kafka-cluster   1            1                    True
        strimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55          debezium-kafka-cluster   1            1                    True
        strimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b  debezium-kafka-cluster  1   1    True
  3. 检查主题内容。

    • 在终端窗口中输入以下命令:
    oc exec -n <project>  -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \
    >     --bootstrap-server localhost:9092 \
    >     --from-beginning \
    >     --property print.key=true \
    >     --topic=<topic-name>

    例如,

    oc exec -n debezium  -it debezium-kafka-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \
    >     --bootstrap-server localhost:9092 \
    >     --from-beginning \
    >     --property print.key=true \
    >     --topic=inventory-connector-postgresql.inventory.products_on_hand

    指定主题名称的格式与 oc describe 命令返回的格式与第 1 步中返回,例如 inventory-connector-postgresql.inventory.addresses

    对于主题中的每个事件,命令会返回类似以下示例的信息:

    例 8.5. Debezium 更改事件的内容

    {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory-connector-postgresql.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-postgresql.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-postgresql.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory-connector-postgresql.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"2.3.4.Final-redhat-00001","connector":"postgresql","name":"inventory-connector-postgresql","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"postgresql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}

    在前面的示例中,有效负载 值显示连接器快照从表 inventory.products_on_hand 生成读取(op" ="r")事件。product_id 记录的 "before" 状态为 null,表示该记录不存在之前的值。"after" 状态对于 product_id101 的项目的 quantity 显示为 3

8.6.5. Debezium PostgreSQL 连接器配置属性的描述

Debezium PostgreSQL 连接器有许多配置属性,可用于实现应用程序的正确连接器行为。许多属性都有默认值。有关属性的信息组织如下:

除非默认值可用 否则需要以下配置属性。

表 8.27. 所需的连接器配置属性
属性默认描述

name

没有默认值

连接器的唯一名称。尝试使用相同的名称再次注册将失败。所有 Kafka Connect 连接器都需要此属性。

connector.class

没有默认值

连接器的 Java 类的名称。始终为 PostgreSQL 连接器使用 io.debezium.connector.postgresql.PostgresConnector 值。

tasks.max

1

应该为此连接器创建的最大任务数量。PostgreSQL 连接器始终使用单个任务,因此不使用这个值,因此默认值始终可以接受。

plugin.name

decoderbufs

在 PostgreSQL 服务器上安装的 PostgreSQL 逻辑解码插件的名称

唯一支持的值是 pgoutput。您必须将 plugin.name 明确设置为 pgoutput

slot.name

Debezium

为流传输特定数据库/schema 的特定插件创建的 PostgreSQL 逻辑解码插槽的名称。服务器使用此插槽将事件流传输到您要配置的 Debezium 连接器。

插槽名称必须符合 PostgreSQL 复制插槽命名规则其状态为 "每个复制插槽名称,其中可以包含小写字母、数字和下划线字符"。

slot.drop.on.stop

false

当连接器以安全、预期的方式停止时,是否删除逻辑复制插槽。默认行为是,当连接器停止时为连接器配置复制插槽。当连接器重启时,具有相同复制插槽可让连接器开始处理它的位置。

仅在测试或开发环境中设置为 true。丢弃插槽可让数据库丢弃 WAL 段。当连接器重启它执行新快照时,或者可以从 Kafka Connect offsets 主题中的持久偏移继续。

publication.name

dbz_publication

使用 pgoutput 时创建的用于流更改的 PostgreSQL 发布的名称。

如果尚未存在,则在启动时创建此发布,并且 包括所有表。然后,Debezium 应用自己的 include/exclude 列表过滤(如果已配置),以限制发布以更改感兴趣的事件。连接器用户必须具有创建此出版物的超级用户权限,因此通常最好在第一次启动连接器前创建发布。

如果发布已存在,可以是所有表,或配置了表子集,Debebe 会使用发布,因为它被定义。

database.hostname

没有默认值

PostgreSQL 数据库服务器的 IP 地址或主机名。

database.port

5432

PostgreSQL 数据库服务器的整数端口号。

database.user

没有默认值

用于连接到 PostgreSQL 数据库服务器的 PostgreSQL 数据库用户的名称。

database.password

没有默认值

连接到 PostgreSQL 数据库服务器时要使用的密码。

database.dbname

没有默认值

从中流传输更改的 PostgreSQL 数据库的名称。

topic.prefix

没有默认值

为特定 PostgreSQL 数据库服务器或集群提供命名空间的主题前缀,其中 Debezium 正在捕获更改。前缀应该在所有其他连接器中唯一,因为它被用作从这个连接器接收记录的所有 Kafka 主题的主题名称前缀。数据库服务器逻辑名称中只能使用字母数字字符、连字符、句点和下划线。

警告

不要更改此属性的值。如果您重启后更改了 name 值,而不是继续向原始主题发出事件,连接器会将后续事件发送到名称基于新值的主题。

schema.include.list

没有默认值

可选的、以逗号分隔的正则表达式列表,与您要 捕获更改的模式名称匹配。不包括在 schema. include.list 中的任何架构 名称都会从捕获其更改中排除。默认情况下,所有非系统模式都会捕获其更改。

要匹配 schema 的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与 schema 的整个标识符匹配;它与 schema 名称中可能存在的子字符串匹配。
如果您在配置中包含此属性,不要设置 schema.exclude.list 属性。

schema.exclude.list

没有默认值

可选的、以逗号分隔的正则表达式列表,与 您不想 捕获更改的模式名称匹配。任何名称不包含在 schema. exclude.list 中的模式,其更改会被捕获,但系统模式除外。

要匹配 schema 的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与 schema 的整个标识符匹配;它与 schema 名称中可能存在的子字符串匹配。
如果您在配置中包含此属性,请不要设置 schema.include.list 属性。

table.include.list

没有默认值

可选的、以逗号分隔的正则表达式列表,与您要捕获的表的完全限定表标识符匹配。当设置此属性时,连接器只捕获指定表中的更改。每个标识符都是 schemaName.tableName。默认情况下,连接器在每个捕获了其更改的每个模式中捕获每个非系统表中的更改。

要匹配表的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与表的完整标识符匹配;它与表名称中可能存在的子字符串不匹配。
如果您在配置中包含此属性,不要设置 table.exclude.list 属性。

table.exclude.list

没有默认值

可选的、以逗号分隔的正则表达式列表,与您不想捕获的表的完全限定表标识符匹配。每个标识符都是 schemaName.tableName。当设置此属性时,连接器会捕获您未指定的每个表的更改。

要匹配表的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与表的完整标识符匹配;它与表名称中可能存在的子字符串不匹配。
如果您在配置中包含此属性,请不要设置 table.include.list 属性。

column.include.list

没有默认值

可选的、以逗号分隔的正则表达式列表,与更改事件记录值中包含的列的完全限定域名匹配。列的完全限定域名格式为 schemaName.tableName.columnName

要匹配列的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,表达式用于匹配列的整个名称字符串;它与列中可能出现的子字符串不匹配。
如果您在配置中包含此属性,不要设置 column.exclude.list 属性。

column.exclude.list

没有默认值

可选的、以逗号分隔的正则表达式列表,与更改事件记录值中排除的列的完全限定域名匹配。列的完全限定域名格式为 schemaName.tableName.columnName

要匹配列的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,表达式用于匹配列的整个名称字符串;它与列中可能出现的子字符串不匹配。
如果您在配置中包含此属性,请不要设置 column.include.list 属性。

skip.messages.without.change

false

指定在包含列中没有更改时是否跳过发布消息。如果列中没有包括每个 column.include.listcolumn.exclude.list 属性的列有变化,这将过滤消息。

注:仅在表的 REPLICA IDENTITY 设置为 FULL 时有效

time.precision.mode

adaptive

时间、日期和时间戳可以通过不同类型的精度来表示:

adaptive 使用 millisecond、microsecond 或 nanosecond 精度值,根据数据库列的类型的类型来捕获日期、日期和时间戳。

adaptive_time_microseconds 使用 millisecond、microsecond 或 nanosecond 精度来捕获数据库中的日期、日期和时间戳值。一个例外是 TIME 类型字段,它总是被捕获为微秒。

connect 始终通过使用 Kafka Connect 的内置表示表示 Time,Date, 和 Timestamp 的值,无论数据库列的精度是什么。如需更多信息,请参阅 临时值

decimal.handling.mode

精确

指定连接器应该如何处理 DECIMALNUMERIC 列的值:

precise 代表使用 java.math.BigDecimal 来以二进制形式代表改变事件的值。

double 代表使用 double 值来代表值。它可能会降低一些精度,但更加容易使用。

string 以特定格式的字符串来对值进行编码。这容易使用,但其代表的真实类型的信息可能会丢失。如需更多信息,请参阅 Decimal type

hstore.handling.mode

map

指定连接器应该如何处理 hstore 列的值:

map 代表使用 MAP

json 代表使用 json 字符串 代表值。此设置对格式的字符串进行编码,如 {"key" : "val"}。如需更多信息,请参阅 PostgreSQL HSTORE 类型

interval.handling.mode

数字

指定连接器如何处理 interval 列的值:

numeric 代表使用大约微秒数的间隔。

string 代表间隔,使用 P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S 代表。例如: P1Y2M3DT4H5M6.78S。如需更多信息,请参阅 PostgreSQL 基本类型

database.sslmode

prefer

是否使用到 PostgreSQL 服务器的加密连接。options include:

disable 使用未加密的连接。

允许 首先尝试使用未加密的连接,失败,失败(安全(加密)连接。

首选 尝试先使用安全(加密)连接,失败,未加密的连接失败。

需要使用 安全(加密)连接,如果出现以下情况,则会失败。 无法建立。

verify-ca 的行为与 require 一样,但也会根据配置的证书颁发机构(CA)证书验证服务器 TLS 证书。 如果没有找到有效的匹配 CA 证书,

verify-full 的行为与 verify-ca 类似,但也验证服务器证书是否与连接器尝试连接的主机匹配。如需更多信息 ,请参阅 PostgreSQL 文档

database.sslcert

没有默认值

包含客户端的 SSL 证书的文件的路径。如需更多信息 ,请参阅 PostgreSQL 文档

database.sslkey

没有默认值

包含客户端的 SSL 私钥的文件的路径。如需更多信息 ,请参阅 PostgreSQL 文档

database.sslpassword

没有默认值

database.sslkey 指定的文件访问客户端私钥的密码。如需更多信息 ,请参阅 PostgreSQL 文档

database.sslrootcert

没有默认值

包含验证服务器的根证书的文件的路径。如需更多信息 ,请参阅 PostgreSQL 文档

database.tcpKeepAlive

true

启用 TCP keep-alive 探测,以验证数据库连接是否仍然处于活动状态。如需更多信息 ,请参阅 PostgreSQL 文档

tombstones.on.delete

true

控制 delete 事件是否后跟一个 tombstone 事件。

true - 一个 delete 操作由 delete 事件和后续 tombstone 事件表示。

false - 仅有一个 delete 事件被抛出。

删除源记录后,发出 tombstone 事件(默认行为)可让 Kafka 在为主题启用了 日志 压缩时完全删除与已删除行键相关的所有事件。

column.truncate.to.length.chars

不适用

可选的、以逗号分隔的正则表达式列表,与基于字符列的完全限定名称匹配。如果在列中的数据超过了在属性名中的 length 指定的字符长度时删节数据,设置此属性。将 length 设置为正整数值,如 column.truncate.to.20.chars

列的完全限定域名会观察以下格式:< schemaName> . <tableName&gt; . & lt;columnName&gt;。要匹配列的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与列的整个名称字符串匹配;表达式与列名称中可能存在的子字符串不匹配。

您可以在单个配置中指定多个长度不同的属性。

column.mask.with.length.chars

不适用

可选的、以逗号分隔的正则表达式列表,与基于字符列的完全限定名称匹配。如果您希望连接器屏蔽一组列的值,例如,如果它们包含敏感数据,则设置此属性。将 length 设置为一个正整数,替换在属性名称中的 length 指定的星号(*)的数量列中的数据。将 length 设置为 0 (零)将指定列中的数据替换为空字符串。

列的完全限定域名观察以下格式: schemaName.tableName.columnName。要匹配列的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与列的整个名称字符串匹配;表达式与列名称中可能存在的子字符串不匹配。

您可以在单个配置中指定多个长度不同的属性。

column.mask.hash.hashAlgorithm.with.salt.salt; column.mask.hash.v2.hashAlgorithm.with.salt.salt

不适用

可选的、以逗号分隔的正则表达式列表,与基于字符列的完全限定名称匹配。列的完全限定域名格式为 <schemaName>.<tableName>.<columnName>.
要匹配一个列的名称,Debezium 应用正则表达式,它由您指定为 anchored 正则表达式。也就是说,指定的表达式与列的整个名称字符串匹配;表达式与列名称中可能存在的子字符串不匹配。在生成的更改事件记录中,指定列的值替换为 pseudonyms。

一个 pseudonym,它包括了通过应用指定的 hashAlgorithmsalt 的结果的哈希值。根据所使用的哈希函数,会维护引用完整性,而列值则替换为 pseudonyms。支持的哈希功能在 Java Cryptography 架构标准 Algorithm Name 文档的 MessageDigest 部分中 进行了描述。

在以下示例中,CzQMA0cB5K 是一个随机选择的 salt。

column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = inventory.orders.customerName, inventory.shipment.customerName

如有必要,pseudonym 会自动缩短为列的长度。连接器配置可以包含多个属性,用于指定不同的哈希算法和 salt。

根据所用的 hashAlgorithm (选择 salt )和实际数据集,生成的数据集可能无法完全屏蔽。

应该使用哈希策略版本 2 来确保在不同的位置或系统中对值进行哈希处理。

column.propagate.source.type

不适用

可选的、以逗号分隔的正则表达式列表,与您希望连接器发出代表列元数据的额外参数的完全限定名称匹配。当设置此属性时,连接器会将以下字段添加到事件记录的 schema 中:

  • __debezium.source.column.type
  • __debezium.source.column.length
  • __debezium.source.column.scale

这些参数会分别传播列的原始类型名称和长度(用于变量宽度类型)。
启用连接器来发出这个额外数据可帮助正确调整 sink 数据库中的特定数字或基于字符的列。

列的完全限定域名会观察以下格式之一: databaseName.tableName.columnName, 或 databaseName.schemaName.tableName.columnName.
要匹配列的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与列的整个名称字符串匹配;表达式与列名称中可能存在的子字符串不匹配。

datatype.propagate.source.type

不适用

可选的、以逗号分隔的正则表达式列表,用于指定为数据库中列定义的数据类型的完全限定名称。当设置此属性时,对于具有匹配数据类型的列,连接器会发出在 schema 中包含以下额外字段的事件记录:

  • __debezium.source.column.type
  • __debezium.source.column.length
  • __debezium.source.column.scale

这些参数会分别传播列的原始类型名称和长度(用于变量宽度类型)。
启用连接器来发出这个额外数据可帮助正确调整 sink 数据库中的特定数字或基于字符的列。

列的完全限定域名会观察以下格式之一: databaseName.tableName.typeName, 或 databaseName.schemaName.tableName.typeName.
要匹配数据类型的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与数据类型的整个名称字符串匹配;表达式与类型名称中可能存在的子字符串不匹配。

有关 PostgreSQL 特定数据类型名称的列表,请参阅 PostgreSQL 数据类型映射

message.key.columns

空字符串

指定连接器用来组成自定义消息键的表达式列表,用于更改它发布到指定表的 Kafka 主题的事件记录。

默认情况下,Debezium 使用表的主键列作为它发出的记录的消息键。在默认位置,或者为缺少主密钥的表指定一个键,您可以根据一个或多个列配置自定义消息密钥。

要为表建立自定义消息键,请列出表,后跟要用作消息键的列。每个列表条目都采用以下格式:

<fully-qualified_tableName> : <keyColumn&gt; , <keyColumn>

To base a table key on multiple column name, 在列名称间插入逗号。

每个完全限定表名称都是以下格式的一个正则表达式:

<schemaName > . &lt;tableName>

属性可以包括多个表的条目。使用分号分隔列表中的表条目。

以下示例为表 inventory.customerspurchase.orders 设置消息键:

inventory.customers:pk1,pk2; (any).purchaseorders:pk3,pk4

for the table inventory.customer, 列 pk1pk2 被指定为消息键。对于任何 模式中的 购买顺序表,列 pk3pk4 服务器作为消息键。

对于用来创建自定义消息键的列数量没有限制。但是,最好使用指定唯一密钥所需的最小数量。

请注意,在表上将此属性设置并将 REPLICA IDENTITY 设置为 DEFAULT 时,如果键列不是表的主键的一部分,则会导致 tombstone 事件不会被正确创建。
REPLICA IDENTITY 设置为 FULL 是唯一解决方案。

publication.autocreate.mode

all_tables

仅在使用 pgoutput 插件 流更改时应用。此设置决定了如何创建 发布。指定以下值之一:

all_tables - 如果存在发布,则连接器会使用它。如果发布不存在,连接器会为数据库中捕获更改的所有表创建一个发布。要使连接器创建发布,它必须通过具有创建发布和执行复制权限的数据库用户帐户访问数据库。您可以使用以下 SQL 命令 CREATE PUBLICATION <publication_name> FOR ALL TABLES;.

disabled - 连接器不会尝试创建出版物来授予所需的权限。数据库管理员或配置为执行复制的用户必须在运行连接器前创建发布。如果连接器无法找到发布,连接器会抛出异常并停止。

过滤 - 如果一个发布存在,连接器会使用它。如果不存在发布,连接器会为表创建一个新的发布,该表与 schema.include.list, schema.exclude.list, and table.include.list, and table.exclude.list 配置属性指定的当前过滤器配置属性匹配。例如: CREATE PUBLICATION <publication_name> FOR TABLE <tbl1, tbl2, tbl3>。如果存在发布,连接器会更新与当前过滤器配置匹配的表的发布。例如: ALTER PUBLICATION <publication_name> SET TABLE <tbl1, tbl2, tbl3>

replica.identity.autoset.values

空字符串

此设置决定了表级别 的副本身份 的值。

这个选项将覆盖数据库中的现有值。以逗号分隔的正则表达式列表,与要在表格中使用的完全限定表和副本身份值匹配。

每个表达式必须与模式 '<fully-qualified table name>:<replica identity>' 匹配,其中表名称可以定义为(SCHEMA_NAME.TABLE_NAME),并且副本身份值为:

DEFAULT - 记录主键列的旧值(若有)。这是非系统表的默认值。

INDEX index_name - 记录指定索引涵盖的列的旧值,它必须是唯一的,而不是部分,不能延迟,且仅包含标记为 NOT NULL 的列。如果丢弃此索引,则行为与 NOTHING 相同。

FULL - 记录行中所有列的旧值。

NOTHING - 记录不有关旧行的信息。这是系统表的默认设置。

例如,

schema1.*:FULL,schema2.table2:NOTHING,schema2.table3:INDEX idx_name

binary.handling.mode

bytes

指定在更改事件中二进制(bytea)列应该代表:

bytes 代表二进制数据作为字节数组。

base64 代表二进制数据作为 base64 编码的字符串。

base64-url-safe 代表二进制数据作为 base64-url-safe-encoded 字符串。

hex 代表二进制数据以十六进制编码(base16)字符串表示。

schema.name.adjustment.mode

none

指定应如何调整模式名称以与连接器使用的消息转换器兼容。可能的设置:

  • none 不应用任何调整。
  • avro 将无法在 Avro 类型名中使用的字符替换为下划线。
  • avro_unicode 将无法在 Avro 类型名称中使用的下划线或字符替换为对应的 unicode,如 _uxxxx。注意:_ 是 Java 中反斜杠的转义序列

field.name.adjustment.mode

none

指定应如何调整字段名称以与连接器使用的消息转换器兼容。可能的设置:

  • none 不应用任何调整。
  • avro 将无法在 Avro 类型名中使用的字符替换为下划线。
  • avro_unicode 将无法在 Avro 类型名称中使用的下划线或字符替换为对应的 unicode,如 _uxxxx。注意:_ 是 Java 中反斜杠的转义序列

如需更多信息,请参阅 Avro 命名

money.fraction.digits

2

指定在将 Postgres money 类型转换为 java.math.BigDecimal(它代表更改事件中的值)时应使用多少位的十进制数字。仅在将 decimal.handling.mode 设置为 exact 时 才适用

message.prefix.include.list

没有默认值

可选的、以逗号分隔的正则表达式列表,与您希望连接器要捕获的逻辑解码消息前缀的名称匹配。默认情况下,连接器捕获所有逻辑解码信息。当设置此属性时,连接器只捕获带有属性指定的前缀的逻辑解码消息。所有其他逻辑解码信息都不包括。

要匹配消息前缀的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与整个消息前缀字符串匹配;表达式与前缀中可能存在的子字符串不匹配。

如果您在配置中包含此属性,不要设置 message.prefix.exclude.list 属性。

有关 消息 事件结构及其排序语义的详情,请参考 消息 事件

message.prefix.exclude.list

没有默认值

可选的、以逗号分隔的正则表达式列表,与您不希望连接器捕获的逻辑解码消息前缀匹配。当设置此属性时,连接器不会捕获使用指定前缀的逻辑解码消息。所有其他消息都会被捕获。
要排除所有逻辑解码信息,请将此属性的值设置为 adtrust

要匹配消息前缀的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与整个消息前缀字符串匹配;表达式与前缀中可能存在的子字符串不匹配。

如果您在配置中包含此属性,不要设置 message.prefix.include.list 属性。

有关 消息 事件结构及其排序语义的详情,请参考 消息 事件

以下 高级配置 属性在大多数情况下可以正常工作,因此很少需要在连接器的配置中指定。

表 8.28. 高级连接器配置属性
属性默认描述

converters

没有默认值

枚举连接器可以使用的 自定义转换器 实例的符号链接列表。例如,

isbn

您必须设置 converters 属性,使连接器能够使用自定义转换器。

对于您为连接器配置的每个转换器,您还必须添加一个 .type 属性,它指定了实现转换器接口的类的完整名称。.type 属性使用以下格式:

<converterSymbolicName>.type

例如,

isbn.type: io.debezium.test.IsbnConverter

如果要进一步控制配置的转换器的行为,您可以添加一个或多个配置参数将值传递给转换器。要将任何其他配置参数与转换器关联,请为参数名称加上转换器的符号名作为前缀。
例如,

isbn.schema.name: io.debezium.postgresql.type.Isbn

snapshot.mode

初始

指定在连接器启动时执行快照的条件:

initial - 连接器只有在没有为逻辑服务器名称记录偏移时才执行快照。

always - 在连接器每次启动时都执行快照。

never - 连接器永不执行快照。当以这种方式配置连接器时,其启动时的行为如下。如果 Kafka offsets 主题中存在之前存储的 LSN,则连接器将继续从该位置流更改。如果没有存储 LSN,则连接器会在服务器上创建 PostgreSQL 逻辑复制插槽时从时间点开始流更改。只有在您知道所有关注的数据仍然反映在 WAL 中时,never 快照模式很有用。

initial_only - 连接器会执行初始快照,然后停止,而无需处理任何后续更改。

导出 - 弃用的


了解更多信息,请参阅 snapshot.mode 选项表

snapshot.include.collection.list

table.include.list中指定的所有表

可选的、以逗号分隔的正则表达式列表,与要包含在快照中的表的完全限定名称(<schemaName>.<tableName>)匹配。指定的项目必须在连接器的 table.include.list 属性中命名。只有在连接器的 snapshot.mode 属性设置为除 never 的值时,此属性才会生效。
此属性不会影响增量快照的行为。

要匹配表的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与表的整个名称字符串匹配;它与表名称中可能存在的子字符串不匹配。

snapshot.lock.timeout.ms

10000

正整数值,指定执行快照时要等待的最大时间(以毫秒为单位)。如果连接器无法在这个时间间隔内获取表锁定,则快照会失败。连接器如何执行快照 提供详细信息。

snapshot.select.statement.overrides

没有默认值

指定要包含在快照中的表行。如果您希望快照只包含表中的行的子集,请使用属性。此属性仅影响快照。它不适用于连接器从日志中读取的事件。

该属性包含以逗号分隔的、完全限定表名称列表,格式为 < schemaName>.<tableName&gt;。例如,

"snapshot.select.statement.overrides": "inventory.products,customers.orders"

对于列表中的每个表,添加一个进一步的配置属性,用于指定连接器在获取快照时要在表上运行的 SELECT 语句。指定的 SELECT 语句决定了快照中包含的表行的子集。使用以下格式指定此 SELECT 语句属性的名称:

snapshot.select.statement.overrides. <schemaName> . &lt;tableName&gt;。例如,snapshot.select.statement.overrides.customers.orders

Example:

在包含 soft-delete 列 delete_flagcustomers.orders 表中,如果您希望快照只包含没有软删除的记录,请添加以下属性:

"snapshot.select.statement.overrides": "customer.orders",
"snapshot.select.statement.overrides.customer.orders": "SELECT * FROM [customers].[orders] WHERE delete_flag = 0 ORDER BY id DESC"

在生成的快照中,连接器只包括 delete_flag = 0 的记录。

event.processing.failure.handling.mode

fail

指定连接器在处理事件期间应如何响应异常:

失败 传播异常,表示有问题的事件的偏移,并导致连接器停止。

会警告 记录有问题的事件的偏移,跳过该事件,并继续处理。

跳过 有问题的事件并继续处理。

max.batch.size

2048

正整数值,用于指定连接器处理的每个批处理的最大大小。

max.queue.size

8192

正整数值,用于指定阻塞队列可以保存的最大记录数。当 Debezium 从数据库读取事件时,它会将事件放置在阻塞队列中,然后再将它们写入 Kafka。阻塞队列可以提供从数据库读取更改事件时,连接器最快于将其写入 Kafka 的信息,或者在 Kafka 不可用时从数据库读取更改事件。当连接器定期记录偏移时,队列中保存的事件会被忽略。始终将 max.queue.size 的值设置为大于 max.batch.size 的值。

max.queue.size.in.bytes

0

一个长的整数值,用于指定阻塞队列的最大卷(以字节为单位)。默认情况下,不会为阻塞队列指定卷限制。要指定队列可以消耗的字节数,请将此属性设置为正长值。
如果还设置了 max.queue.size,当队列的大小达到任一属性指定的限制时,写入队列将被阻止。例如,如果您设置了 max.queue.size=1000、和 max.queue.size.in.bytes=5000,在队列包含 1000 个记录后,或者队列中记录的卷达到 5000 字节后,写入队列会被阻止。

poll.interval.ms

500

正整数值,指定连接器在开始处理批处理事件前应等待新更改事件数的毫秒数。默认值为 500 毫秒。

include.unknown.datatypes

false

当连接器遇到数据类型未知的字段时,指定连接器行为。默认行为是,连接器从更改事件中省略字段并记录警告。

如果您希望更改事件包含字段的不透明二进制表示,请将此属性设置为 true。这可让消费者解码字段。您可以通过设置 二进制处理模式 属性来控制确切的表示。

注意

include.unknown.datatypes 设为 true 时,消费者会面临向后兼容性问题。仅可能仅在发行版本间更改数据库特定的二进制表示,但如果 Debezium 最终支持数据类型,则数据类型将在逻辑类型中发送下游,这需要用户调整。通常,当遇到不支持的数据类型时,创建一个功能请求,以便可以添加支持。

database.initial.statements

没有默认值

分号分隔的 SQL 语句列表,连接器在建立 JDBC 与数据库的连接时执行。要将分号用作字符而不是分隔符,请指定两个连续的分号 ;;

连接器可以自行决定建立 JDBC 连接。因此,此属性仅适用于配置会话参数,而不适用于执行 DML 语句。

当连接器创建用于读取事务日志的连接时,不会执行这些语句。

status.update.interval.ms

10000

向服务器发送复制连接状态更新的频率,以毫秒为单位。
属性还控制检查数据库状态在关闭数据库时检测死连接的频率。

heartbeat.interval.ms

0

控制连接器将心跳信息发送到 Kafka 主题的频率。默认行为是连接器不会发送心跳信息。

心跳消息可用于监控连接器是否从数据库接收更改事件。心跳消息有助于减少在连接器重启时需要重新更改事件的数量。要发送心跳消息,请将此属性设置为正整数,这代表心跳消息之间的毫秒数。

当数据库中有多个更新被跟踪时,需要心跳消息,但只有少量更新与连接器捕获更改的表和 schema 相关。在这种情况下,连接器照常从数据库事务日志中读取,但很少向 Kafka 发出更改记录。这意味着,没有将偏移更新提交到 Kafka,连接器没有将最新检索到的 LSN 发送到数据库的机会。数据库保留 WAL 文件,其中包含已经由连接器处理的事件。发送心跳消息可让连接器将最新检索到的 LSN 发送到数据库,这使得数据库能够回收不再需要 WAL 文件所使用的磁盘空间。

heartbeat.action.query

没有默认值

指定连接器发送心跳消息的查询,连接器在源数据库上执行。

这可用于解决 WAL 磁盘空间消耗 中描述的情况,其中从与高流量数据库在同一主机上的低流量数据库捕获更改可防止 Debezium 处理 WAL 记录,从而防止 Debezium 处理 WAL 记录,从而处理数据库。要解决这种情况,请在低流量数据库中创建一个心跳表,并将此属性设置为将记录插入到该表中,例如:

INSERT INTO test_heartbeat_table (text) VALUES ('test_heartbeat')

这允许连接器从低流量数据库中接收更改,并确认其 LSN,这可防止数据库主机上的未绑定 WAL 增长。

schema.refresh.mode

columns_diff

指定为表触发内存模式刷新的条件。

column_diff 是安全模式。它确保内存模式始终与数据库表的同步。

columns_diff_exclude_unchanged_toast 会指示连接器刷新内存模式缓存(如果从传入消息派生的模式),除非有未更改的 TOASTable 数据完全帐户用于差异。

如果经常更新的表有 TOASTed 数据,则此设置可能会显著提高连接器性能。但是,如果从表中丢弃了 TOASTable 列,则内存中模式可能会过时。

snapshot.delay.ms

没有默认值

连接器在连接器启动时执行快照前应等待的时间(以毫秒为单位)。如果您在集群中启动多个连接器,则此属性可用于避免快照中断,这可能会导致连接器的重新平衡。

snapshot.fetch.size

10240

在快照期间,连接器以每行的批处理读取表内容。此属性指定批处理中的最大行数。

slot.stream.params

没有默认值

分号分隔的参数列表,以传递给配置的逻辑解码插件。例如: add-tables=public.table,public.table2;include-lsn=true

slot.max.retries

6

如果连接到复制插槽失败,这是连续尝试的最大尝试数。

slot.retry.delay.ms

10000 (10 秒)

当连接器无法连接到复制插槽时,重试尝试之间等待的毫秒数。

unavailable.value.placeholder

__debezium_unavailable_value

指定连接器提供的常量,以指示原始值是不由数据库提供的粘贴值。如果 unavailable.value.placeholder 的设置以 hex: 前缀开头,则预期字符串的其余部分代表十六进制编码的 octets。如需更多信息,请参阅粘贴

provide.transaction.metadata

false

确定连接器是否生成带有事务边界的事件,并使用事务元数据增强更改事件信。如果您希望连接器进行此操作,请指定 true。如需更多信息,请参阅 事务元数据

flush.lsn.source

true

决定连接器是否应该提交源 postgres 数据库中已处理记录的 LSN,以便可以删除 WAL 日志。如果您不希望连接器进行此操作,请指定 false。请注意,如果设置为 false LSN 不会被 Debezium 确认,因此 WAL 日志不会被清除,从而导致磁盘空间问题。用户应该处理 Debezium 之外的 LSN 的确认。

retriable.restart.connector.wait.ms

10000 (10 秒)

在发生可检索错误后重启连接器前等待的毫秒数。

skipped.operations

t

在流过程中将跳过的操作类型的逗号分隔列表。操作包括:用于 inserts/create、u 表示更新、d 表示 delete、t 表示 truncates,none 不跳过任何操作。默认情况下会跳过 truncate 操作。

signal.data.collection

没有默认值

用于向连接器发送信号的数据收集的完全限定名称。
使用以下格式指定集合名称:
<schemaName> . < tableName>

signal.enabled.channels

source

为连接器启用的信号频道名称列表。默认情况下,以下频道可用:

  • source
  • kafka
  • file
  • jmx

notification.enabled.channels

没有默认值

为连接器启用的通知频道名称列表。默认情况下,以下频道可用:

  • sink
  • log
  • jmx

incremental.snapshot.chunk.size

1024

连接器在增量快照块期间获取并读取内存的最大行数。增加块大小可提高效率,因为快照会运行更多大小的快照查询。但是,较大的块大小还需要更多内存来缓冲快照数据。将块大小调整为提供环境中最佳性能的值。

xmin.fetch.interval.ms

0

XMIN 将从复制插槽读取的频率,以毫秒为单位。XMIN 值提供从其中开始新复制插槽的低限。默认值 0 禁用跟踪 XMIN 跟踪。

topic.naming.strategy

io.debezium.schema.SchemaTopicNamingStrategy

应该用来确定数据更改、模式更改、事务、心跳事件等的 TopicNamingStrategy 类的名称,默认为 SchemaTopicNamingStrategy

topic.delimiter

.

指定主题名称的分隔符,默认为

topic.cache.size

10000

在绑定的并发哈希映射中用于保存主题名称的大小。此缓存将有助于确定与给定数据收集对应的主题名称。

topic.heartbeat.prefix

__debezium-heartbeat

控制连接器向其发送心跳信息的主题名称。主题名称具有此模式:

topic.heartbeat.prefix.topic.prefix

例如,如果主题前缀是 fulfillment,则默认主题名称为 __debezium-heartbeat.fulfillment

topic.transaction

Transactions

控制连接器向其发送事务元数据消息的主题名称。主题名称具有此模式:

topic.prefix.topic.transaction

例如,如果主题前缀是 fulfillment,默认的主题名称为 fulfillment.transaction

snapshot.max.threads

1

指定连接器执行初始快照时使用的线程数量。要启用并行初始快照,请将属性设置为大于 1 的值。在并行初始快照中,连接器会同时处理多个表。

重要

并行初始快照只是一个技术预览功能。技术预览功能不受红帽产品服务等级协议(SLA)支持,且功能可能并不完整。红帽不推荐在生产环境中使用它们。这些技术预览功能可以使用户提早试用新的功能,并有机会在开发阶段提供反馈意见。有关红帽技术预览功能支持范围的更多信息,请参阅技术预览功能支持范围

errors.max.retries

-1

在失败前,retriable 错误(如连接错误)的最大重试次数(-1 = no limit, 0 = disabled, > 0 = num of retries)。

透传连接器配置属性

连接器还支持创建 Kafka 生成者和消费者时使用的 直通 配置属性。

确保参考 Kafka 文档了解 Kafka 生成者和消费者的所有配置属性。PostgreSQL 连接器 使用新的消费者配置属性

Debezium 连接器 Kafka 信号配置属性

Debezium 提供了一组 signal.* 属性,用于控制连接器如何与 Kafka 信号主题进行交互。

下表描述了 Kafka 信号 属性。

表 8.29. Kafka 信号配置属性
属性默认描述

signal.kafka.topic

<topic.prefix>-signal

连接器监控用于临时信号的 Kafka 主题的名称。

注意

如果禁用了 自动主题创建,您必须手动创建所需的信号主题。需要信号主题来保留信号排序。信号主题必须具有单个分区。

signal.kafka.groupId

kafka-signal

Kafka 用户使用的组 ID 的名称。

signal.kafka.bootstrap.servers

没有默认值

连接器用来建立到 Kafka 集群的初始连接的主机/端口对列表。每个对都引用 Debezium Kafka Connect 进程使用的 Kafka 集群。

signal.kafka.poll.timeout.ms

100

一个整数值,用于指定连接器在轮询信号时等待的最大毫秒数。

Debezium 连接器传递信号 Kafka 使用者客户端配置属性

Debezium 连接器为信号 Kafka 使用者提供直通配置。透传信号属性以 signals.consumer.* 前缀开始。例如,连接器将 signal.consumer.security.protocol=SSL 等属性传递给 Kafka 消费者。

Debezium 从属性中剥离前缀,然后再将属性传递给 Kafka 信号消费者。

Debezium 连接器接收器通知配置属性

下表描述了 通知 属性。

表 8.30. sink 通知配置属性
属性默认描述

notification.sink.topic.name

没有默认值

从 Debezium 接收通知的主题名称。当您将 notification.enabled.channels 属性配置为将 sink 作为启用的通知频道之一时,需要此属性。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.