11.2. 配置 Debezium 连接器以使用 Avro 序列化
Debezium 连接器在 Kafka Connect 框架中工作,通过生成更改事件记录捕获数据库中的每个行级别更改。对于每个更改事件记录,Debezium 连接器完成以下操作:
- 应用配置的转换。
- 使用配置的 Kafka Connect 转换器 将记录键和值序列化为二进制形式。
- 将记录写入正确的 Kafka 主题。
您可以为每个独立 Debezium 连接器实例指定转换器。Kafka Connect 提供了一个 JSON 转换器,它将记录键和值序列化到 JSON 文档中。默认行为是 JSON 转换器包含记录的 message schema,使得每个记录都非常详细。Debezium 入门指南显示了当包含 有效负载和模式时记录是什么。如果要使用 JSON 序列化记录,请考虑将以下连接器配置属性设置为 false
:
-
key.converter.schemas.enable
-
value.converter.schemas.enable
将这些属性设置为 false
可排除每个记录的详细模式信息。
或者,您可以使用 Apache Avro 序列化记录键和值。Avro 二进制格式压缩效率更高。Avro 模式可以确保每个记录都有正确的结构。Avro 的架构演进机制使架构能够演进。这对 Debezium 连接器至关重要,它动态生成每个记录的模式,以匹配更改的数据库表的结构。随着时间的推移,更改写入同一 Kafka 主题的事件记录可能会有不同版本的相同模式。Avro 序列化使更改事件记录的使用者更容易地适应更改记录模式。
要使用 Apache Avro 序列化,您必须部署一个模式 registry,用于管理 Avro 消息模式及其版本。有关设置此 registry 的详情,请参考在 OpenShift 上安装和部署 Service Registry 的文档。
11.2.1. 关于 Service Registry
Service Registry
Service Registry 提供以下与 Avro 搭配使用的组件:
- 您可以在 Debezium 连接器配置中指定的 Avro converter。这个转换器将 Kafka Connect 模式映射到 Avro 模式。然后,转换器使用 Avro 模式将记录键和值序列化为 Avro 的紧凑二进制形式。
跟踪的 API 和模式 registry:
- Kafka 主题中使用的 Avro 模式。
- 其中 Avro converter 发送生成的 Avro 模式。
由于 Avro 模式存储在此 registry 中,因此每个记录只需要仅包含 tiny 模式标识符。这使得每个记录更小。对于 Kafka 等 I/O 绑定系统,这意味着生产者和消费者的总吞吐量。
- 用于 Kafka 生成者和消费者的 Avro Serdes (序列化器和反序列化器)。您编写用于消耗更改事件记录的 Kafka 消费者应用程序可以使用 Avro Serdes 来反序列化更改事件记录。
要将 Service Registry 与 Debezium 搭配使用,请将 Service Registry 转换器及其依赖项添加到您用于运行 Debezium 连接器的 Kafka Connect 容器镜像中。
Service Registry 项目还提供 JSON 转换器。此转换器将不详细消息与人类可读的 JSON 的优点相结合。消息本身不包含架构信息,但只包含一个 schema ID。
要使用 Service Registry 提供的转换器,您需要提供 apicurio.registry.url
。
11.2.2. 部署使用 Avro 序列化的 Debezium 连接器概述
要部署使用 Avro 序列化的 Debezium 连接器,您必须完成三个主要任务:
- 按照在 OpenShift 上安装和部署 Service Registry 中的说明部署 Service Registry 实例。
- 通过下载 Debezium Service Registry Kafka Connect zip 文件并将其提取到 Debezium 连接器的目录中来安装 Avro converter。
通过设置配置属性将 Debezium 连接器实例配置为使用 Avro serialization,如下所示:
key.converter=io.apicurio.registry.utils.converter.AvroConverter key.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2 key.converter.apicurio.registry.auto-register=true key.converter.apicurio.registry.find-latest=true value.converter=io.apicurio.registry.utils.converter.AvroConverter value.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2 value.converter.apicurio.registry.auto-register=true value.converter.apicurio.registry.find-latest=true schema.name.adjustment.mode=avro
在内部,Kafka Connect 始终使用 JSON 键/值转换器来存储配置和偏移。
11.2.3. 在 Debezium 容器中部署使用 Avro 的连接器
在您的环境中,您可能想要使用提供的 Debezium 容器来部署使用 Avro 序列化的 Debezium 连接器。完成以下步骤,为 Debezium 构建自定义 Kafka Connect 容器镜像,并将 Debezium 连接器配置为使用 Avro converter。
先决条件
- 已安装 Docker 并有足够的权限来创建和管理容器。
- 您下载了您要使用 Avro 序列化部署的 Debezium 连接器插件。
流程
部署 Service Registry 实例。请参阅 在 OpenShift 上安装和部署 Service Registry,其提供了以下说明:
- 安装 Service Registry
- 安装 AMQ Streams
- 设置 AMQ Streams 存储
提取 Debezium 连接器存档,以便为连接器插件创建目录结构。如果您下载并提取多个 Debezium 连接器的存档,生成的目录结构类似以下示例:
tree ./my-plugins/ ./my-plugins/ ├── debezium-connector-mongodb | ├── ... ├── debezium-connector-mysql │ ├── ... ├── debezium-connector-postgres │ ├── ... └── debezium-connector-sqlserver ├── ...
将 Avro converter 添加到包含您要配置为使用 Avro 序列化的 Debezium 连接器的目录中:
- 进入 Red Hat Integration 下载站点 并下载 Service Registry Kafka Connect zip 文件。
- 将存档提取到所需的 Debezium 连接器目录中。
要将多种 Debezium 连接器配置为使用 Avro 序列化,请将存档提取到每个相关连接器类型的目录中。虽然将存档提取到每个目录中会复制文件,您可以删除可能冲突的依赖项。
创建并发布自定义镜像,以运行配置为使用 Avro converter 的 Debezium 连接器:
使用
registry.redhat.io/amq7/amq-streams-kafka-32-rhel8:2.2.0-12
作为基础镜像来创建新的Dockerfile
。在以下示例中,将 my-plugins 替换为插件目录的名称:FROM registry.redhat.io/amq7/amq-streams-kafka-32-rhel8:2.2.0-12 USER root:root COPY ./my-plugins/ /opt/kafka/plugins/ USER 1001
在 Kafka Connect 开始运行连接器前,Kafka Connect 会加载
/opt/kafka/plugins
目录中的任何第三方插件。构建 docker 容器镜像。例如,如果您将您在上一步中创建的 docker 文件保存为
debezium-container-with-avro
,则可运行以下命令:docker build -t debezium-container-with-avro:latest
将自定义镜像推送到容器 registry,例如:
docker push <myregistry.io>/debezium-container-with-avro:latest
指向新容器镜像。执行以下操作之一:
编辑
KafkaConnect
自定义资源的KafkaConnect.spec.image
属性。如果设置,此属性会覆盖 Cluster Operator 中的STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE
变量。例如:apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster spec: #... image: debezium-container-with-avro
-
在
install/cluster-operator/050-Deployment-strimzi-cluster-operator.yaml
文件中,编辑STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE
变量以指向新容器镜像并重新安装 Cluster Operator。如果编辑此文件,则需要将其应用到 OpenShift 集群。
部署配置为使用 Avro converter 的每个 Debezium 连接器。对于每个 Debezium 连接器:
创建 Debezium 连接器实例。以下
inventory-connector.yaml
文件示例会创建一个KafkaConnector
自定义资源,用于定义配置为使用 Avro converter 的 MySQL 连接器实例:apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnector metadata: name: inventory-connector labels: strimzi.io/cluster: my-connect-cluster spec: class: io.debezium.connector.mysql.MySqlConnector tasksMax: 1 config: database.hostname: mysql database.port: 3306 database.user: debezium database.password: dbz database.server.id: 184054 topic.prefix: dbserver1 database.include.list: inventory schema.history.internal.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092 schema.history.internal.kafka.topic: schema-changes.inventory schema.name.adjustment.mode: avro key.converter: io.apicurio.registry.utils.converter.AvroConverter key.converter.apicurio.registry.url: http://apicurio:8080/api key.converter.apicurio.registry.global-id: io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy value.converter: io.apicurio.registry.utils.converter.AvroConverter value.converter.apicurio.registry.url: http://apicurio:8080/api value.converter.apicurio.registry.global-id: io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy
应用连接器实例,例如:
oc apply -f inventory-connector.yaml
这会注册
inventory-connector
,连接器开始针对清单
数据库运行。
验证连接器是否已创建,并已开始跟踪指定数据库中的更改。您可以通过观察 Kafka Connect 日志输出来验证连接器实例,例如
inventory-connector
启动。显示 Kafka Connect 日志输出:
oc logs $(oc get pods -o name -l strimzi.io/name=my-connect-cluster-connect)
检查日志输出,以验证是否已执行初始快照。您应该看到类似以下行的内容:
... 2020-02-21 17:57:30,801 INFO Starting snapshot for jdbc:mysql://mysql:3306/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&connectTimeout=30000 with user 'debezium' with locking mode 'minimal' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:30,805 INFO Snapshot is using user 'debezium' with these MySQL grants: (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] ...
创建快照涉及几个步骤:
... 2020-02-21 17:57:30,822 INFO Step 0: disabling autocommit, enabling repeatable read transactions, and setting lock wait timeout to 10 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:30,836 INFO Step 1: flush and obtain global read lock to prevent writes to database (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:30,839 INFO Step 2: start transaction with consistent snapshot (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:30,840 INFO Step 3: read binlog position of MySQL primary server (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:30,843 INFO using binlog 'mysql-bin.000003' at position '154' and gtid '' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] ... 2020-02-21 17:57:34,423 INFO Step 9: committing transaction (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] 2020-02-21 17:57:34,424 INFO Completed snapshot in 00:00:03.632 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot] ...
完成快照后,Debebe 开始跟踪更改,例如,
清单
数据库的binlog
以更改事件:... 2020-02-21 17:57:35,584 INFO Transitioning from the snapshot reader to the binlog reader (io.debezium.connector.mysql.ChainedReader) [task-thread-inventory-connector-0] 2020-02-21 17:57:35,613 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [task-thread-inventory-connector-0] 2020-02-21 17:57:35,630 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [blc-mysql:3306] Feb 21, 2020 5:57:35 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect INFO: Connected to mysql:3306 at mysql-bin.000003/154 (sid:184054, cid:5) 2020-02-21 17:57:35,775 INFO Connected to MySQL binlog at mysql:3306, starting at binlog file 'mysql-bin.000003', pos=154, skipping 0 events plus 0 rows (io.debezium.connector.mysql.BinlogReader) [blc-mysql:3306] ...
11.2.4. 关于 Avro 名称要求
如 Avro 文档所述,名称必须遵循以下规则:
-
以
[A-Za-z_]
开始 -
之后仅包含
[A-Za-z0-9_]
字符
Debezium 使用列的名称作为对应 Avro 字段的基础。如果列名称也不符合 Avro 命名规则,这可能会导致序列化过程中出现问题。每个 Debezium 连接器都提供一个配置属性 sanitize.field.names
,如果您有没有遵循名称的 Avro 规则的列,则可以将其设置为 true
。将 sanitize.field.names
设置为 true
允许序列化非格式字段,而无需实际修改您的模式。