6.2. 配置 Debezium 连接器以使用 Avro 序列化
Debezium 连接器在 Kafka Connect 框架中工作,通过生成更改事件记录捕获数据库中的每一行级更改。对于每个更改事件记录,Debebe 连接器完成以下操作:
- 应用配置的转换。
- 使用配置的 Kafka Connect 转换器 将记录键和值序列化为二进制形式。
- 将记录写入正确的 Kafka 主题。
您可以为每个 Debezium 连接器实例指定转换器。Kafka Connect 提供了一个 JSON 转换程序,它会将记录键和值序列化为 JSON 文档。默认行为是 JSON 转换程序包含记录的消息模式,从而使每个记录非常详细。Debezium 入门指南 显示了在包括 payload 和 schema 时记录是什么样子。如果要使用 JSON 序列化记录,请考虑将以下连接器配置属性设置为 false
:
-
key.converter.schemas.enable
-
value.converter.schemas.enable
将这些属性设置为 false
会排除每个记录中的详细模式信息。
或者,您可以使用 Apache Avro 序列化记录键和值。Avro 二进制格式是紧凑和效率。avro 模式可以确保每个记录都有正确的结构。avro 的模式演进机制使架构能够演变。这对 Debezium 连接器至关重要,它动态生成每个记录的模式,以匹配更改的数据库表的结构。随着时间的推移,更改写入同一 Kafka 主题的事件记录可能具有相同的模式的不同版本。avro 序列化使更改事件记录的消费者更容易适应更改的记录模式。
要使用 Apache Avro serialization,您必须部署一个管理 Avro 消息 schema 及其版本的 schema registry。有关设置此 registry 的详情,请参考在 OpenShift 上安装和部署红帽构建的 Apicurio Registry 文档。
6.2.1. 关于 Apicurio Registry
红帽构建的 Apicurio Registry
红帽构建的 Apicurio Registry 提供以下与 Avro 搭配使用的组件:
- 您可以在 Debezium 连接器配置中指定的 Avro converter。此转换器将 Kafka Connect 模式映射到 Avro 模式。然后,转换器使用 Avro 模式将记录键和值序列化为 Avro 的紧凑二进制格式。
跟踪的 API 和模式 registry:
- Kafka 主题中使用的 Avro 模式。
- Avro converter 发送生成的 Avro 模式。
由于 Avro 模式存储在此 registry 中,因此每个记录只需要包含小型 模式标识符。这使得每个记录变得更小。对于如 Kafka 的 I/O 绑定系统,这意味着生产者和消费者的总吞吐量。
- Kafka producer 和消费者的 avro Serdes (serializers 和 deserializers)。您写入的 Kafka 消费者应用程序消耗更改事件记录可以使用 Avro Serdes 来反序列化更改事件记录。
要将 Apicurio Registry 与 Debezium 搭配使用,请将 Apicurio Registry 转换器及其依赖项添加到用于运行 Debezium 连接器的 Kafka Connect 容器镜像中。
Apicurio Registry 项目还提供 JSON 转换程序。此转换器将不太详细消息的好处与人类可读的 JSON 相结合。消息本身不包含架构信息,而只包含 schema ID。
要使用 Apicurio Registry 提供的转换器,您需要提供 apicurio.registry.url
。
6.2.2. 部署使用 Avro 序列化的 Debezium 连接器概述
要部署使用 Avro 序列化的 Debezium 连接器,您必须完成三个主要任务:
- 按照在 OpenShift 上安装和部署红帽构建的 Apicurio Registry 的说明,部署红帽构建的 Apicurio Registry 实例。
- 通过下载 Debezium Service Registry Kafka Connect zip 文件并将其提取到 Debezium 连接器的目录中来安装 Avro converter。
通过设置配置属性,将 Debezium 连接器实例配置为使用 Avro serialization,如下所示:
key.converter=io.apicurio.registry.utils.converter.AvroConverter key.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2 key.converter.apicurio.registry.auto-register=true key.converter.apicurio.registry.find-latest=true value.converter=io.apicurio.registry.utils.converter.AvroConverter value.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2 value.converter.apicurio.registry.auto-register=true value.converter.apicurio.registry.find-latest=true schema.name.adjustment.mode=avro
在内部,Kafka Connect 始终使用 JSON 键/值转换器来存储配置和偏移量。
6.2.3. 部署在 Debezium 容器中使用 Avro 的连接器
在您的环境中,您可能想要使用提供的 Debezium 容器来部署使用 Avro 序列化的 Debezium 连接器。完成以下步骤,为 Debezium 构建自定义 Kafka Connect 容器镜像,并将 Debezium 连接器配置为使用 Avro converter。
先决条件
- 已安装 Docker 并有足够的权限来创建和管理容器。
- 您下载了要使用 Avro 序列化部署的 Debezium 连接器插件。
流程
部署 Apicurio Registry 实例。请参阅在 OpenShift 上安装和部署红帽构建的 Apicurio Registry,这提供了以下说明:
- 安装 Apicurio Registry
- 安装 AMQ Streams
- 设置 AMQ Streams 存储
提取 Debezium 连接器存档,为连接器插件创建目录结构。如果您为多个 Debezium 连接器下载并提取了存档,则生成的目录结构类似以下示例:
tree ./my-plugins/ ./my-plugins/ ├── debezium-connector-mongodb | ├── ... ├── debezium-connector-mysql │ ├── ... ├── debezium-connector-postgres │ ├── ... └── debezium-connector-sqlserver ├── ...
将 Avro converter 添加到包含您要配置为使用 Avro 序列化的 Debezium 连接器的目录:
- 进入 软件下载 并下载 Apicurio Registry Kafka Connect zip 文件。
- 将存档提取到所需的 Debezium 连接器目录中。
要将多个 Debezium 连接器配置为使用 Avro serialization,请将存档提取到每个相关连接器类型的目录中。虽然将存档提取到每个目录会复制文件,但这样做可以消除冲突依赖项的可能性。
创建并发布自定义镜像,以运行配置为使用 Avro converter 的 Debezium 连接器:
使用
registry.redhat.io/amq-streams-kafka-35-rhel8:2.5.0
作为基础镜像创建新Dockerfile
。在以下示例中,将 my-plugins 替换为您的插件目录的名称:FROM registry.redhat.io/amq-streams-kafka-35-rhel8:2.5.0 USER root:root COPY ./my-plugins/ /opt/kafka/plugins/ USER 1001
在 Kafka Connect 开始运行连接器前,Kafka Connect 会加载
/opt/kafka/plugins
目录中任何第三方插件。构建 docker 容器镜像。例如,如果您将上一步中创建的 docker 文件保存为
debezium-container-with-avro
,则您将运行以下命令:docker build -t debezium-container-with-avro:latest
将自定义镜像推送到容器 registry 中,例如:
docker push <myregistry.io>/debezium-container-with-avro:latest
指向新的容器镜像。执行以下操作之一:
编辑
KafkaConnect
自定义资源的KafkaConnect.spec.image
属性。如果设置,此属性会覆盖 Cluster Operator 中的STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE
变量。例如:apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster spec: #... image: debezium-container-with-avro
-
在
install/cluster-operator/050-Deployment-strimzi-cluster-operator.yaml
文件中,编辑STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE
变量以指向新的容器镜像并重新安装 Cluster Operator。如果编辑这个文件,则需要将它应用到 OpenShift 集群。
部署配置为使用 Avro converter 的每个 Debezium 连接器。对于每个 Debezium 连接器:
创建 Debezium 连接器实例。以下
inventory-connector.yaml
文件示例会创建一个KafkaConnector
自定义资源,用于定义配置为使用 Avro converter 的 MySQL 连接器实例:apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnector metadata: name: inventory-connector labels: strimzi.io/cluster: my-connect-cluster spec: class: io.debezium.connector.mysql.MySqlConnector tasksMax: 1 config: database.hostname: mysql database.port: 3306 database.user: debezium database.password: dbz database.server.id: 184054 topic.prefix: dbserver1 database.include.list: inventory schema.history.internal.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092 schema.history.internal.kafka.topic: schema-changes.inventory schema.name.adjustment.mode: avro key.converter: io.apicurio.registry.utils.converter.AvroConverter key.converter.apicurio.registry.url: http://apicurio:8080/api key.converter.apicurio.registry.global-id: io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy value.converter: io.apicurio.registry.utils.converter.AvroConverter value.converter.apicurio.registry.url: http://apicurio:8080/api value.converter.apicurio.registry.global-id: io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy
应用连接器实例,例如:
oc apply -f inventory-connector.yaml
这会注册
inventory-connector
,连接器开始针对inventory
数据库运行。
验证连接器是否已创建并已启动,以跟踪指定数据库中的更改。您可以通过观察 Kafka Connect 日志输出来验证连接器实例,如
inventory-connector
启动。显示 Kafka Connect 日志输出:
oc logs $(oc get pods -o name -l strimzi.io/name=my-connect-cluster-connect)
检查日志输出,以验证最初的快照已经执行。您应该看到类似以下行的内容:
... 2020-02-21 17:57:30,801 INFO Starting snapshot for jdbc:mysql://mysql:3306/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&connectTimeout=30000 with user 'debezium' with locking mode 'minimal' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:30,805 INFO Snapshot is using user 'debezium' with these MySQL grants: (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] ...
执行快照涉及几个步骤:
... 2020-02-21 17:57:30,822 INFO Step 0: disabling autocommit, enabling repeatable read transactions, and setting lock wait timeout to 10 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:30,836 INFO Step 1: flush and obtain global read lock to prevent writes to database (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:30,839 INFO Step 2: start transaction with consistent snapshot (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:30,840 INFO Step 3: read binlog position of MySQL primary server (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:30,843 INFO using binlog 'mysql-bin.000003' at position '154' and gtid '' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] ... 2020-02-21 17:57:34,423 INFO Step 9: committing transaction (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:34,424 INFO Completed snapshot in 00:00:03.632 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] ...
完成快照后,Debezium 开始跟踪更改,例如,用于更改事件的
inventory
数据库的binlog
:... 2020-02-21 17:57:35,584 INFO Transitioning from the snapshot reader to the binlog reader (io.debezium.connector.mysql.ChainedReader) [task-thread-inventory-connector-0] 2020-02-21 17:57:35,613 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [task-thread-inventory-connector-0] 2020-02-21 17:57:35,630 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [blc-mysql:3306] Feb 21, 2020 5:57:35 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect INFO: Connected to mysql:3306 at mysql-bin.000003/154 (sid:184054, cid:5) 2020-02-21 17:57:35,775 INFO Connected to MySQL binlog at mysql:3306, starting at binlog file 'mysql-bin.000003', pos=154, skipping 0 events plus 0 rows (io.debezium.connector.mysql.BinlogReader) [blc-mysql:3306] ...
6.2.4. 关于 Avro 名称要求
如 Avro 文档中所述,名称必须遵循以下规则:
-
从
[A-Za-z_]
开始 -
因此,仅包含
[A-Za-z0-9_]
字符
Debezium 使用列的名称作为对应 Avro 字段的基础。如果列名称也不符合 Avro 命名规则,这可能会导致序列化期间出现问题。每个 Debezium 连接器都提供一个配置属性 field.name.adjustment.mode
,如果您有一个列不遵循 Avro 规则的名称,则可以将其设置为 avro
。将 field.name.adjustment.mode
设置为 avro
允许序列化非格式字段,而无需实际修改您的架构。