搜索

11.2. 配置 Debezium 连接器以使用 Avro 序列化

download PDF

Debezium 连接器在 Kafka Connect 框架中的工作原理,通过生成更改事件记录来捕获数据库中的每个行级更改。对于每个更改事件记录,Debezium 连接器完成以下操作:

  1. 应用配置的转换。
  2. 使用配置的 Kafka Connect 转换器 将记录键和值序列化为二进制形式。
  3. 将记录写入正确的 Kafka 主题。

您可以为每个单独的 Debezium 连接器实例指定转换器。Kafka Connect 提供了一个 JSON 转换程序,可将记录键和值序列化为 JSON 文档。默认行为是 JSON 转换程序包含记录的消息模式,这使得每个记录非常详细。Debezium 入门指南 显示了当包含有效负载和模式时记录的内容。如果您希望记录使用 JSON 序列化,请考虑将以下连接器配置属性设置为 false

  • key.converter.schemas.enable
  • value.converter.schemas.enable

将这些属性设置为 false 可排除每个记录的详细模式信息。

或者,您可以使用 Apache Avro 来序列化记录键和值。Avro 二进制格式是紧凑有效的。通过 avro 模式,可以确保每个记录都有正确的结构。avro 的模式演进机制使模式能够演变。这对 Debezium 连接器至关重要,它会动态生成每个记录的模式,以匹配已更改的数据库表的结构。随着时间的推移,更改写入同一 Kafka 主题的事件记录可能具有相同的 schema 版本。avro serialization 可让更改事件记录的消费者更容易适应更改的记录模式。

要使用 Apache Avro serialization,您必须部署一个管理 Avro 消息模式及其版本的 schema registry。有关设置此 registry 的详情,请参考在 OpenShift 上安装和部署红帽构建的 Apicurio Registry 的文档。

11.2.1. 关于 Apicurio Registry

红帽构建的 Apicurio Registry

Red Hat build of Apicurio Registry 提供以下与 Avro 一起工作的组件:

  • 您可以在 Debezium 连接器配置中指定 Avro 转换程序。这个转换器将 Kafka Connect 模式映射到 Avro 模式。然后,转换器使用 Avro 模式将记录键和值序列化为 Avro 的紧凑二进制形式。
  • 一个 API 和 schema registry,用于跟踪:

    • Kafka 主题中使用的 Avro 模式。
    • 其中 Avro converter 发送生成的 Avro 模式。

    由于 Avro 模式存储在此 registry 中,因此每个记录需要仅包含 tiny 模式标识符。这使得每个记录更小。对于 Kafka 等 I/O 绑定系统,这意味着生产者和消费者的总吞吐量。

  • Kafka 生成者和消费者的 avro Serdes (序列化器和反序列化器)。您编写以消耗更改事件记录的 Kafka 消费者应用程序可以使用 Avro Serdes 来反序列化更改事件记录。

要将 Apicurio Registry 与 Debezium 搭配使用,请将 Apicurio Registry 转换器及其依赖项添加到用于运行 Debezium 连接器的 Kafka Connect 容器镜像。

注意

Apicurio Registry 项目还提供 JSON 转换程序。这个转换器将不太详细消息的好处与人类可读的 JSON 合并。消息不包含自身模式信息,而仅包含一个模式 ID。

注意

要使用 Apicurio Registry 提供的转换器,您需要提供 apicurio.registry.url

11.2.2. 部署使用 Avro 序列化的 Debezium 连接器概述

要部署使用 Avro 序列化的 Debezium 连接器,您必须完成三个主要任务:

  1. 按照在 OpenShift 上安装和部署 Apicurio Registry 中的说明,部署红帽构建的 Apicurio Registry 实例。
  2. 通过下载 Debezium Service Registry Kafka Connect zip 文件并将其提取到 Debezium 连接器的目录中,来安装 Avro 转换。
  3. 通过设置配置属性,将 Debezium 连接器实例配置为使用 Avro 序列化,如下所示:

    key.converter=io.apicurio.registry.utils.converter.AvroConverter
    key.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2
    key.converter.apicurio.registry.auto-register=true
    key.converter.apicurio.registry.find-latest=true
    value.converter=io.apicurio.registry.utils.converter.AvroConverter
    value.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2
    value.converter.apicurio.registry.auto-register=true
    value.converter.apicurio.registry.find-latest=true
    schema.name.adjustment.mode=avro

在内部,Kafka Connect 始终使用 JSON 键/值转换器来存储配置和偏移。

11.2.3. 在 Debezium 容器中部署使用 Avro 的连接器

在您的环境中,您可能想要使用提供的 Debezium 容器来部署使用 Avro 序列化的 Debezium 连接器。完成以下步骤,为 Debezium 构建自定义 Kafka Connect 容器镜像,并将 Debezium 连接器配置为使用 Avro converter。

先决条件

  • 已安装 Docker 并有足够的权限来创建和管理容器。
  • 您下载了您要使用 Avro 序列化部署的 Debezium 连接器插件。

流程

  1. 部署 Apicurio Registry 实例。请参阅在 OpenShift 上安装和部署红帽构建的 Apicurio Registry,该 registry 提供了以下说明:

    • 安装 Apicurio Registry
    • 安装 AMQ Streams
    • 设置 AMQ Streams 存储
  2. 提取 Debezium 连接器存档,以便为连接器插件创建目录结构。如果您为多个 Debezium 连接器下载并提取存档,则生成的目录结构类似以下示例:

    tree ./my-plugins/
    ./my-plugins/
    ├── debezium-connector-mongodb
    |   ├── ...
    ├── debezium-connector-mysql
    │   ├── ...
    ├── debezium-connector-postgres
    │   ├── ...
    └── debezium-connector-sqlserver
        ├── ...
  3. 将 Avro converter 添加到包含您要配置为使用 Avro 序列化的 Debezium 连接器的目录中:

    1. 访问 红帽构建的 Debezium 下载站点,并下载 Apicurio Registry Kafka Connect zip 文件。
    2. 将存档提取到所需的 Debezium 连接器目录中。

    要将多个 Debezium 连接器配置为使用 Avro 序列化,请将存档提取到每个相关连接器类型的目录中。虽然提取存档到每个目录会复制文件,但这样做消除了冲突依赖项的可能性。

  4. 创建并发布自定义镜像,以运行 Debezium 连接器,该连接器配置为使用 Avro 转换程序:

    1. 使用 registry.redhat.io/amq-streams-kafka-35-rhel8:2.5.0 作为基础镜像来创建一个新的 Dockerfile。在以下示例中,将 my-plugins 替换为插件目录的名称:

      FROM registry.redhat.io/amq-streams-kafka-35-rhel8:2.5.0
      USER root:root
      COPY ./my-plugins/ /opt/kafka/plugins/
      USER 1001

      在 Kafka Connect 开始运行连接器前,Kafka Connect 会加载 /opt/kafka/plugins 目录中任何第三方插件。

    2. 构建 docker 容器镜像。例如,如果您将您在上一步中创建的 docker 文件保存为 debezium-container-with-avro,则您将运行以下命令:

      docker build -t debezium-container-with-avro:latest

    3. 将自定义镜像推送到容器 registry 中,例如:

      docker push <myregistry.io>/debezium-container-with-avro:latest

    4. 指向新容器镜像。执行以下操作之一:

      • 编辑 KafkaConnect 自定义资源的 KafkaConnect.spec.image 属性。如果设置,此属性覆盖 Cluster Operator 中的 STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE 变量。例如:

        apiVersion: kafka.strimzi.io/v1beta2
        kind: KafkaConnect
        metadata:
          name: my-connect-cluster
        spec:
          #...
          image: debezium-container-with-avro
      • install/cluster-operator/050-Deployment-strimzi-cluster-operator.yaml 文件中,编辑 STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE 变量以指向新的容器镜像并重新安装 Cluster Operator。如果编辑此文件,则需要将其应用到 OpenShift 集群。
  5. 部署配置为使用 Avro converter 的每个 Debezium 连接器。对于每个 Debezium 连接器:

    1. 创建 Debezium 连接器实例。以下 inventory-connector.yaml 文件示例会创建一个 KafkaConnector 自定义资源,该资源定义配置为使用 Avro converter 的 MySQL 连接器实例:

      apiVersion: kafka.strimzi.io/v1beta1
      kind: KafkaConnector
      metadata:
        name: inventory-connector
        labels:
          strimzi.io/cluster: my-connect-cluster
      spec:
        class: io.debezium.connector.mysql.MySqlConnector
        tasksMax: 1
        config:
          database.hostname: mysql
          database.port: 3306
          database.user: debezium
          database.password: dbz
          database.server.id: 184054
          topic.prefix: dbserver1
          database.include.list: inventory
          schema.history.internal.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
          schema.history.internal.kafka.topic: schema-changes.inventory
          schema.name.adjustment.mode: avro
          key.converter: io.apicurio.registry.utils.converter.AvroConverter
          key.converter.apicurio.registry.url: http://apicurio:8080/api
          key.converter.apicurio.registry.global-id: io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy
          value.converter: io.apicurio.registry.utils.converter.AvroConverter
          value.converter.apicurio.registry.url: http://apicurio:8080/api
          value.converter.apicurio.registry.global-id: io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy
    2. 应用连接器实例,例如:

      oc apply -f inventory-connector.yaml

      这会注册 inventory-connector,连接器开始针对 inventory 数据库运行。

  6. 验证连接器是否已创建并已启动,以跟踪指定数据库中的更改。您可以通过观察 Kafka Connect 日志输出来验证连接器实例,例如 inventory-connector 启动。

    1. 显示 Kafka Connect 日志输出:

      oc logs $(oc get pods -o name -l strimzi.io/name=my-connect-cluster-connect)
    2. 检查日志输出,以验证初始快照是否已执行。您应该看到类似如下的行:

      ...
      2020-02-21 17:57:30,801 INFO Starting snapshot for jdbc:mysql://mysql:3306/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&connectTimeout=30000 with user 'debezium' with locking mode 'minimal' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:30,805 INFO Snapshot is using user 'debezium' with these MySQL grants: (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      ...

      执行快照涉及多个步骤:

      ...
      2020-02-21 17:57:30,822 INFO Step 0: disabling autocommit, enabling repeatable read transactions, and setting lock wait timeout to 10 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:30,836 INFO Step 1: flush and obtain global read lock to prevent writes to database (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:30,839 INFO Step 2: start transaction with consistent snapshot (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:30,840 INFO Step 3: read binlog position of MySQL primary server (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:30,843 INFO 	 using binlog 'mysql-bin.000003' at position '154' and gtid '' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      ...
      2020-02-21 17:57:34,423 INFO Step 9: committing transaction (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:34,424 INFO Completed snapshot in 00:00:03.632 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      ...

      完成快照后,Debebe 开始跟踪更改,例如,库存 数据库的 binlog 用于更改事件:

      ...
      2020-02-21 17:57:35,584 INFO Transitioning from the snapshot reader to the binlog reader (io.debezium.connector.mysql.ChainedReader) [task-thread-inventory-connector-0]
      2020-02-21 17:57:35,613 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [task-thread-inventory-connector-0]
      2020-02-21 17:57:35,630 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [blc-mysql:3306]
      Feb 21, 2020 5:57:35 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect
      INFO: Connected to mysql:3306 at mysql-bin.000003/154 (sid:184054, cid:5)
      2020-02-21 17:57:35,775 INFO Connected to MySQL binlog at mysql:3306, starting at binlog file 'mysql-bin.000003', pos=154, skipping 0 events plus 0 rows (io.debezium.connector.mysql.BinlogReader) [blc-mysql:3306]
      ...

11.2.4. 关于 Avro 名称要求

如 Avro 文档中所述,名称必须遵循以下规则:

  • [A-Za-z_]开始。
  • 因此,仅包含 [A-Za-z0-9_] 字符

Debezium 使用列的名称作为对应 Avro 字段的基础。如果列名称不遵循 Avro 命名规则,这可能会导致序列化过程中出现问题。每个 Debezium 连接器都提供一个配置属性 field.name.adjustment.mode,如果您有没有遵循 Avro 规则的列,则可以将其设置为 avro。将 field.name.adjustment.mode 设置为 avro 允许对非格式字段进行序列化,而无需实际修改您的模式。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.