9.4. 部署 Debezium SQL Server 连接器
您可以使用以下任一方法部署 Debezium SQL Server 连接器:
9.4.1. 使用 AMQ Streams 的 SQL Server 连接器部署
从 Debezium 1.7 开始,部署 Debezium 连接器的首选方法是使用 AMQ Streams 构建包含连接器插件的 Kafka Connect 容器镜像。
在部署过程中,您可以创建并使用以下自定义资源(CR):
-
定义 Kafka Connect 实例的
KafkaConnect
CR,并包含有关镜像中需要包含连接器工件的信息。 -
KafkaConnector
CR,提供包括连接器用来访问源数据库的信息。在 AMQ Streams 启动 Kafka Connect pod 后,您可以通过应用KafkaConnector
CR 来启动连接器。
在 Kafka Connect 镜像的构建规格中,您可以指定可用于部署的连接器。对于每个连接器插件,您还可以指定您的部署可以使用的其他组件。例如,您可以添加 Apicurio Registry 工件或 Debezium 脚本组件。当 AMQ Streams 构建 Kafka Connect 镜像时,它会下载指定的工件,并将其合并到镜像中。
KafkaConnect
CR 中的 spec.build.output
参数指定存储生成的 Kafka Connect 容器镜像的位置。容器镜像可以存储在 Docker registry 中,也可以存储在 OpenShift ImageStream 中。要将镜像存储在 ImageStream 中,您必须在部署 Kafka Connect 前创建 ImageStream。镜像流不会被自动创建。
如果使用 KafkaConnect
资源来创建集群,之后无法使用 Kafka Connect REST API 创建或更新连接器。您仍然可以使用 REST API 来检索信息。
其他资源
- 在 OpenShift 中部署和管理 AMQ Streams 中的配置 Kafka 连接。
- 在 OpenShift 中部署和管理 AMQ Streams 中自动构建新容器镜像。
9.4.2. 使用 AMQ Streams 部署 Debezium SQL Server 连接器
使用早期版本的 AMQ Streams 时,要在 OpenShift 上部署 Debezium 连接器,您需要首先为连接器构建 Kafka Connect 镜像。在 OpenShift 上部署连接器的当前首选方法是使用 AMQ Streams 中的构建配置来构建 Kafka Connect 容器镜像,其中包含您要使用的 Debezium 连接器插件。
在构建过程中,AMQ Streams Operator 将 KafkaConnect
自定义资源(包括 Debezium 连接器定义)中的输入参数转换为 Kafka Connect 容器镜像。构建会从 Red Hat Maven 存储库或其他配置的 HTTP 服务器下载必要的工件。
新创建的容器被推送到在 .spec.build.output
中指定的容器 registry,用于部署 Kafka Connect 集群。在 AMQ Streams 构建 Kafka Connect 镜像后,您可以创建 KafkaConnector
自定义资源来启动构建中包含的连接器。
先决条件
- 您可以访问安装了集群 Operator 的 OpenShift 集群。
- AMQ Streams Operator 正在运行。
- 在 OpenShift 中部署和管理 AMQ Streams 所述,Apache Kafka 集群会被部署。
- Kafka Connect 在 AMQ Streams 上部署
- 您有红帽构建的 Debezium 许可证。
-
已安装 OpenShift
oc
CLI 客户端,或者您可以访问 OpenShift Container Platform Web 控制台。 根据您要存储 Kafka Connect 构建镜像的方式,您需要 registry 权限,或者您必须创建 ImageStream 资源:
- 将构建镜像存储在镜像 registry 中,如 Red Hat Quay.io 或 Docker Hub
- 在 registry 中创建和管理镜像的帐户和权限。
- 将构建镜像存储为原生 OpenShift ImageStream
- ImageStream 资源已部署到集群中,以存储新的容器镜像。您必须为集群显式创建 ImageStream。默认无法使用镜像流。如需有关 ImageStreams 的更多信息,请参阅 OpenShift Container Platform 文档中的管理镜像流。
流程
- 登录 OpenShift 集群。
为连接器创建 Debezium
KafkaConnect
自定义资源(CR),或修改现有的资源。例如,创建一个名为dbz-connect.yaml
的KafkaConnect
CR,用于指定metadata.annotations
和spec.build
属性。以下示例显示了一个dbz-connect.yaml
文件的摘录,该文件描述了KafkaConnect
自定义资源。
例 9.1. 定义包含 Debezium 连接器的
KafkaConnect
自定义资源的dbz-connect.yaml
文件在以下示例中,自定义资源被配置为下载以下工件:
- Debezium SQL Server 连接器存档。
- 红帽构建的 Apicurio Registry 存档。Apicurio Registry 是一个可选组件。只有在打算将 Avro 序列化与连接器搭配使用时,才添加 Apicurio Registry 组件。
- Debezium 脚本 SMT 归档以及您要与 Debezium 连接器一起使用的关联脚本引擎。SMT 归档和脚本语言依赖项是可选组件。只有在打算使用 Debezium 的基于内容的路由 SMT 或 过滤 SMT 时,才添加这些组件。
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: debezium-kafka-connect-cluster annotations: strimzi.io/use-connector-resources: "true" 1 spec: version: 3.5.0 build: 2 output: 3 type: imagestream 4 image: debezium-streams-connect:latest plugins: 5 - name: debezium-connector-sqlserver artifacts: - type: zip 6 url: https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-sqlserver/2.3.7.Final-redhat-00001/debezium-connector-sqlserver-2.3.7.Final-redhat-00001-plugin.zip 7 - type: zip url: https://maven.repository.redhat.com/ga/io/apicurio/apicurio-registry-distro-connect-converter/2.4.4.Final-redhat-<build-number>/apicurio-registry-distro-connect-converter-2.4.4.Final-redhat-<build-number>.zip 8 - type: zip url: https://maven.repository.redhat.com/ga/io/debezium/debezium-scripting/2.3.7.Final-redhat-00001/debezium-scripting-2.3.7.Final-redhat-00001.zip 9 - type: jar url: https://repo1.maven.org/maven2/org/codehaus/groovy/groovy/3.0.11/groovy-3.0.11.jar 10 - type: jar url: https://repo1.maven.org/maven2/org/codehaus/groovy/groovy-jsr223/3.0.11/groovy-jsr223-3.0.11.jar - type: jar url: https://repo1.maven.org/maven2/org/codehaus/groovy/groovy-json3.0.11/groovy-json-3.0.11.jar bootstrapServers: debezium-kafka-cluster-kafka-bootstrap:9093 ...
表 9.16. Kafka Connect 配置设置的描述 项 描述 1
将
strimzi.io/use-connector-resources
注解设置为"true"
,使 Cluster Operator 使用KafkaConnector
资源在此 Kafka Connect 集群中配置连接器。2
spec.build
配置指定在镜像中存储构建镜像的位置,并列出要在镜像中包含的插件,以及插件工件的位置。3
build.output
指定存储新构建镜像的 registry。4
指定镜像输出的名称和镜像名称。
output.type
的有效值是要推送到
容器 registry (如 Docker Hub 或 Quay)或镜像流
的有效值,以将镜像推送到内部 OpenShift ImageStream。要使用 ImageStream,必须将 ImageStream 资源部署到集群中。有关在 KafkaConnect 配置中指定build.output
的更多信息,请参阅 {Name configuration StreamsOpenShift} 中的 AMQ Streams Build schema 参考。5
plugins
配置列出了您要包含在 Kafka Connect 镜像中的所有连接器。对于列表中的每个条目,指定一个插件名称
,以及有关构建连接器所需的工件的信息。另外,对于每个连接器插件,您还可以包含可用于连接器的其他组件。例如,您可以添加 Service Registry 工件或 Debezium 脚本组件。6
artifacts.type
的值指定在artifacts.url
中指定的工件类型。有效类型为zip
、tgz
或jar
。Debezium 连接器存档以.zip
文件格式提供。类型
值必须与url
字段中引用的文件类型匹配。7
artifacts.url
的值指定 HTTP 服务器的地址,如 Maven 存储库,用于存储连接器工件的文件。Debezium 连接器工件在 Red Hat Maven 存储库中提供。OpenShift 集群必须有权访问指定的服务器。8
(可选)指定用于下载 Apicurio Registry 组件的工件
类型和
url
。包含 Apicurio Registry 工件,只有在您希望连接器使用 Apache Avro 来序列化带有红帽构建的 Apicurio Registry 的值,而不是使用默认的 JSON 转换程序时。9
(可选)指定 Debezium 脚本 SMT 归档的工件
类型和
url
,以用于 Debezium 连接器。只有在打算使用 Debezium 的基于内容的路由 SMT 或 过滤 SMT 时才包括脚本 SMT。要使用脚本 SMT,您必须部署 JSR 223 兼容脚本实现,如 groovy。10
(可选)指定 JSR 223 兼容脚本实施的 JAR 文件的工件
类型和
url
,这是 Debezium 脚本 SMT 所需的。重要如果使用 AMQ Streams 将连接器插件合并到 Kafka Connect 镜像中,每个所需的脚本语言
工件。url
必须指定 JAR 文件的位置,并且artifacts.type
的值也必须设置为jar
。无效的值会导致连接器在运行时失败。要启用带有脚本 SMT 的 Apache Groovy 语言,示例中的自定义资源会为以下库检索 JAR 文件:
-
groovy
-
Groovy-jsr223
(指定代理) -
groovy-json
(解析 JSON 字符串的模块)
作为替代方案,Debebe Debezium 脚本 SMT 也支持使用 JSR 223 实现 GraalVM JavaScript。
输入以下命令将
KafkaConnect
构建规格应用到 OpenShift 集群:oc create -f dbz-connect.yaml
根据自定义资源中指定的配置,Streams Operator 准备要部署的 Kafka Connect 镜像。
构建完成后,Operator 将镜像推送到指定的 registry 或 ImageStream,并启动 Kafka Connect 集群。集群中提供了您在配置中列出的连接器工件。创建一个
KafkaConnector
资源来定义您要部署的每个连接器的实例。
例如,创建以下KafkaConnector
CR,并将它保存为sqlserver-inventory-connector.yaml
例 9.2.
sqlserver-inventory-connector.yaml
文件,为 Debezium 连接器定义KafkaConnector
自定义资源apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: labels: strimzi.io/cluster: debezium-kafka-connect-cluster name: inventory-connector-sqlserver 1 spec: class: io.debezium.connector.sqlserver.SqlServerConnector 2 tasksMax: 1 3 config: 4 schema.history.internal.kafka.bootstrap.servers: debezium-kafka-cluster-kafka-bootstrap.debezium.svc.cluster.local:9092 schema.history.internal.kafka.topic: schema-changes.inventory database.hostname: sqlserver.debezium-sqlserver.svc.cluster.local 5 database.port: 1433 6 database.user: debezium 7 database.password: dbz 8 topic.prefix: inventory-connector-sqlserver 9 table.include.list: dbo.customers 10 ...
表 9.17. 连接器配置设置的描述 项 描述 1
使用 Kafka Connect 集群注册的连接器名称。
2
连接器类的名称。
3
可以同时操作的任务数量。
4
连接器的配置。
5
主机数据库实例的地址。
6
数据库实例的端口号。
7
Debezium 用于连接到数据库的帐户名称。
8
Debezium 用于连接到数据库用户帐户的密码。
9
数据库实例或集群的主题前缀。
指定的名称只能由字母数字字符或下划线组成。
因为主题前缀被用作从这个连接器接收更改事件的任何 Kafka 主题的前缀,所以该名称在集群中的连接器之间必须是唯一的。
如果连接器与 Avro 连接器集成,则此命名空间也用于相关 Kafka Connect 模式的名称,以及相应 Avro 模式的命名空间。10
连接器捕获更改事件的表列表。
运行以下命令来创建连接器资源:
oc create -n <namespace> -f <kafkaConnector>.yaml
例如,
oc create -n debezium -f sqlserver-inventory-connector.yaml
连接器注册到 Kafka Connect 集群,并开始针对
KafkaConnector
CR 中的spec.config.database.dbname
指定的数据库运行。连接器 pod 就绪后,Debebe 正在运行。
9.4.3. 通过从 Dockerfile 构建自定义 Kafka Connect 容器镜像来部署 Debezium SQL Server 连接器
要部署 Debezium SQL Server 连接器,您必须构建包含 Debezium 连接器归档的自定义 Kafka Connect 容器镜像,然后将此容器镜像推送到容器 registry。然后,您需要创建以下自定义资源(CR):
-
定义 Kafka Connect 实例的
KafkaConnect
CR。CR 中的image
属性指定您创建的容器镜像的名称,以运行 Debezium 连接器。您可以将此 CR 应用到部署 Red Hat AMQ Streams 的 OpenShift 实例。AMQ Streams 提供将 Apache Kafka 带到 OpenShift 的 operator 和镜像。 -
定义 Debezium SQL Server 连接器的
KafkaConnector
CR。将此 CR 应用到应用KafkaConnect
CR 的同一 OpenShift 实例。
先决条件
- SQL Server 正在运行,您完成了 设置 SQL Server 的步骤,以便使用 Debezium 连接器。
- AMQ Streams 部署在 OpenShift 中,并运行 Apache Kafka 和 Kafka Connect。如需更多信息,请参阅在 OpenShift 中部署和管理 AMQ Streams
- podman 或 Docker 已安装。
-
您有一个在容器 registry 中创建和管理容器(如
quay.io
或docker.io
)的帐户和权限,您要添加将运行 Debezium 连接器的容器。
流程
为 Kafka Connect 创建 Debezium SQL Server 容器:
创建一个使用
registry.redhat.io/amq-streams-kafka-35-rhel8:2.5.0
的 Dockerfile 作为基础镜像。例如,在终端窗口中输入以下命令:cat <<EOF >debezium-container-for-sqlserver.yaml 1 FROM registry.redhat.io/amq-streams-kafka-35-rhel8:2.5.0 USER root:root RUN mkdir -p /opt/kafka/plugins/debezium 2 RUN cd /opt/kafka/plugins/debezium/ \ && curl -O https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-sqlserver/2.3.7.Final-redhat-00001/debezium-connector-sqlserver-2.3.7.Final-redhat-00001-plugin.zip \ && unzip debezium-connector-sqlserver-2.3.7.Final-redhat-00001-plugin.zip \ && rm debezium-connector-sqlserver-2.3.7.Final-redhat-00001-plugin.zip RUN cd /opt/kafka/plugins/debezium/ USER 1001 EOF
项 描述 1
您可以指定您想要的任何文件名。
2
指定 Kafka Connect 插件目录的路径。如果您的 Kafka Connect 插件目录位于不同的位置,请将此路径替换为目录的实际路径。
该命令在当前目录中创建一个名为
debezium-container-for-sqlserver.yaml
的 Dockerfile。从您在上一步中创建的
debezium-container-for-sqlserver.yaml
Docker 文件中构建容器镜像。在包含文件的目录中,打开终端窗口并输入以下命令之一:podman build -t debezium-container-for-sqlserver:latest .
docker build -t debezium-container-for-sqlserver:latest .
前面的命令使用名称
debezium-container-for-sqlserver
构建容器镜像。将自定义镜像推送到容器 registry,如 quay.io 或内部容器 registry。容器 registry 必须可供您要部署镜像的 OpenShift 实例使用。输入以下命令之一:
podman push <myregistry.io>/debezium-container-for-sqlserver:latest
docker push <myregistry.io>/debezium-container-for-sqlserver:latest
创建新的 Debezium SQL Server KafkaConnect 自定义资源(CR)。例如,创建一个名为
dbz-connect.yaml
的KafkaConnect
CR,用于指定注解和
镜像
属性。以下示例显示了一个dbz-connect.yaml
文件的摘录,该文件描述了KafkaConnect
自定义资源。
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster annotations: strimzi.io/use-connector-resources: "true" 1 spec: #... image: debezium-container-for-sqlserver 2 ...
项 描述 1
metadata.annotations
表示KafkaConnector
资源用于配置在这个 Kafka Connect 集群中使用的 Cluster Operator。2
spec.image
指定您创建的镜像的名称,以运行 Debezium 连接器。此属性覆盖 Cluster Operator 中的STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE
变量。输入以下命令将
KafkaConnect
CR 应用到 OpenShift Kafka Connect 环境:oc create -f dbz-connect.yaml
该命令添加了一个 Kafka Connect 实例,用于指定您为运行 Debezium 连接器而创建的镜像的名称。
创建一个
KafkaConnector
自定义资源来配置 Debezium SQL Server 连接器实例。您可以在
.yaml
文件中配置 Debezium SQL Server 连接器,该文件指定连接器的配置属性。连接器配置可能指示 Debezium 为 schema 和表的子集生成事件,或者可能会设置属性,以便 Debezium 忽略、掩码或截断敏感、太大或不需要的指定列中的值。以下示例配置了一个 Debezium 连接器,它连接到端口
1433
上的 SQL 服务器主机192.168.99.100
。此主机有一个名为testDB
的数据库,名为customer
的表,inventory-connector-sqlserver
是服务器的逻辑名称。SQL Server
inventory-connector.yaml
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: inventory-connector-sqlserver 1 labels: strimzi.io/cluster: my-connect-cluster annotations: strimzi.io/use-connector-resources: 'true' spec: class: io.debezium.connector.sqlserver.SqlServerConnector 2 config: database.hostname: 192.168.99.100 3 database.port: 1433 4 database.user: debezium 5 database.password: dbz 6 database.names: testDB1,testDB2 7 topic.prefix: inventory-connector-sqlserver 8 table.include.list: dbo.customers 9 schema.history.internal.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092 10 schema.history.internal.kafka.topic: schemahistory.fullfillment 11 database.ssl.truststore: path/to/trust-store 12 database.ssl.truststore.password: password-for-trust-store 13
表 9.18. 连接器配置设置的描述 项 描述 1
在我们使用 Kafka Connect 服务注册时连接器的名称。
2
此 SQL Server 连接器类的名称。
3
SQL Server 实例的地址。
4
SQL Server 实例的端口号。
5
SQL Server 用户的名称。
6
SQL Server 用户的密码。
7
要从中捕获更改的数据库名称。
8
SQL Server 实例/集群的主题前缀,它组成一个命名空间,并在使用 Avro converter 时用于连接器写入的 Kafka 主题、Kafka Connect 模式名称和对应 Avro 模式的命名空间。
9
连接器只捕获
dbo.customers
表中的更改。10
此连接器将用来写入和恢复 DDL 语句到数据库 schema 历史记录主题的 Kafka 代理列表。
11
连接器要写入和恢复 DDL 语句的数据库模式历史记录主题的名称。本主题仅用于内部使用,不应供消费者使用。
12
存储服务器的签名证书的 SSL 信任存储的路径。除非禁用了数据库加密(
database.encrypt=false
),否则需要此属性。13
SSL 信任存储密码。除非禁用了数据库加密(
database.encrypt=false
),否则需要此属性。使用 Kafka Connect 创建连接器实例。例如,如果您将
KafkaConnector
资源保存在inventory-connector.yaml
文件中,您将运行以下命令:oc apply -f inventory-connector.yaml
前面的命令注册
inventory-connector
,连接器开始针对KafkaConnector
CR 中定义的testDB
数据库运行。
验证 Debezium SQL Server 连接器是否正在运行
如果连接器正确启动且没有错误,它会为每个连接器配置为捕获的表创建一个主题。下游应用程序可以订阅这些主题,以检索源数据库中发生的信息事件。
要验证连接器是否正在运行,您可以从 OpenShift Container Platform Web 控制台或 OpenShift CLI 工具(oc)执行以下操作:
- 验证连接器状态。
- 验证连接器是否生成主题。
- 验证主题是否填充了读取操作("op":"r")的事件,连接器在每个表的初始快照中生成。
先决条件
- Debezium 连接器部署到 OpenShift 上的 AMQ Streams。
-
已安装 OpenShift
oc
CLI 客户端。 - 访问 OpenShift Container Platform web 控制台。
流程
使用以下方法之一检查
KafkaConnector
资源的状态:在 OpenShift Container Platform Web 控制台中:
-
导航到 Home
Search。 -
在 Search 页面中,点 Resources 打开 Select Resource 框,然后键入
KafkaConnector
。 - 在 KafkaConnectors 列表中,点您要检查的连接器的名称,如 inventory-connector-sqlserver。
- 在 Conditions 部分,验证 Type 和 Status 列中的值是否已设置为 Ready 和 True。
-
导航到 Home
在终端窗口中:
使用以下命令:
oc describe KafkaConnector <connector-name> -n <project>
例如,
oc describe KafkaConnector inventory-connector-sqlserver -n debezium
该命令返回类似以下示例的状态信息:
例 9.3.
KafkaConnector
资源状态Name: inventory-connector-sqlserver Namespace: debezium Labels: strimzi.io/cluster=debezium-kafka-connect-cluster Annotations: <none> API Version: kafka.strimzi.io/v1beta2 Kind: KafkaConnector ... Status: Conditions: Last Transition Time: 2021-12-08T17:41:34.897153Z Status: True Type: Ready Connector Status: Connector: State: RUNNING worker_id: 10.131.1.124:8083 Name: inventory-connector-sqlserver Tasks: Id: 0 State: RUNNING worker_id: 10.131.1.124:8083 Type: source Observed Generation: 1 Tasks Max: 1 Topics: inventory-connector-sqlserver.inventory inventory-connector-sqlserver.inventory.addresses inventory-connector-sqlserver.inventory.customers inventory-connector-sqlserver.inventory.geom inventory-connector-sqlserver.inventory.orders inventory-connector-sqlserver.inventory.products inventory-connector-sqlserver.inventory.products_on_hand Events: <none>
验证连接器是否创建了 Kafka 主题:
通过 OpenShift Container Platform Web 控制台。
-
导航到 Home
Search。 -
在 Search 页面中,点 Resources 打开 Select Resource 框,然后键入
KafkaTopic
。 -
在 KafkaTopics 列表中,点您要检查的主题名称,例如
inventory-connector-sqlserver.inventory.orders--ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d
。 - 在 Conditions 部分,验证 Type 和 Status 列中的值是否已设置为 Ready 和 True。
-
导航到 Home
在终端窗口中:
使用以下命令:
oc get kafkatopics
该命令返回类似以下示例的状态信息:
例 9.4.
KafkaTopic
资源状态NAME CLUSTER PARTITIONS REPLICATION FACTOR READY connect-cluster-configs debezium-kafka-cluster 1 1 True connect-cluster-offsets debezium-kafka-cluster 25 1 True connect-cluster-status debezium-kafka-cluster 5 1 True consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a debezium-kafka-cluster 50 1 True inventory-connector-sqlserver--a96f69b23d6118ff415f772679da623fbbb99421 debezium-kafka-cluster 1 1 True inventory-connector-sqlserver.inventory.addresses---1b6beaf7b2eb57d177d92be90ca2b210c9a56480 debezium-kafka-cluster 1 1 True inventory-connector-sqlserver.inventory.customers---9931e04ec92ecc0924f4406af3fdace7545c483b debezium-kafka-cluster 1 1 True inventory-connector-sqlserver.inventory.geom---9f7e136091f071bf49ca59bf99e86c713ee58dd5 debezium-kafka-cluster 1 1 True inventory-connector-sqlserver.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d debezium-kafka-cluster 1 1 True inventory-connector-sqlserver.inventory.products---df0746db116844cee2297fab611c21b56f82dcef debezium-kafka-cluster 1 1 True inventory-connector-sqlserver.inventory.products_on_hand---8649e0f17ffcc9212e266e31a7aeea4585e5c6b5 debezium-kafka-cluster 1 1 True schema-changes.inventory debezium-kafka-cluster 1 1 True strimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55 debezium-kafka-cluster 1 1 True strimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b debezium-kafka-cluster 1 1 True
检查主题内容。
- 在终端窗口中输入以下命令:
oc exec -n <project> -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=<topic-name>
例如,
oc exec -n debezium -it debezium-kafka-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=inventory-connector-sqlserver.inventory.products_on_hand
指定主题名称的格式与
oc describe
命令返回的格式与第 1 步中返回,例如inventory-connector-sqlserver.inventory.addresses
。对于主题中的每个事件,命令会返回类似以下示例的信息:
例 9.5. Debezium 更改事件的内容
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory-connector-sqlserver.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-sqlserver.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-sqlserver.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.sqlserver.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory-connector-sqlserver.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"2.3.7.Final-redhat-00001","connector":"sqlserver","name":"inventory-connector-sqlserver","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"sqlserver-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}
在前面的示例中,
有效负载
值显示连接器快照从表inventory.products_on_hand
生成读取(op" ="r"
)事件。product_id
记录的"before"
状态为null
,表示该记录不存在之前的值。"after"
状态对于product_id
为101
的项目的quantity
显示为3
。
有关您可以为 Debezium SQL Server 连接器设置的配置属性的完整列表,请参阅 SQL Server 连接器属性。
结果
当连接器启动时,它会 对连接器进行配置的 SQL Server 数据库执行一致的快照。然后,连接器开始为行级操作生成数据更改事件,并将更改事件记录流传输到 Kafka 主题。
9.4.4. Debezium SQL Server 连接器配置属性的描述
Debezium SQL Server 连接器具有大量配置属性,您可以使用它来实现应用程序的正确连接器行为。许多属性都有默认值。
有关属性的信息组织如下:
- 所需的连接器配置属性
- 高级连接器配置属性
数据库模式历史记录连接器配置属性,用于控制 Debezium 如何处理从数据库 schema 历史记录主题读取的事件。
- 控制 数据库驱动程序行为的直通数据库驱动程序属性。
所需的 Debezium SQL Server 连接器配置属性
除非默认值可用 , 否则需要以下配置属性。
属性 | 默认 | 描述 |
---|---|---|
没有默认值 | 连接器的唯一名称。尝试使用相同的名称再次注册将失败。(所有 Kafka Connect 连接器都需要此属性。) | |
没有默认值 |
连接器的 Java 类的名称。始终为 SQL Server 连接器使用 | |
| 指定连接器可用于从数据库实例中捕获数据的最大任务数量。 | |
没有默认值 | SQL Server 数据库服务器的 IP 地址或主机名。 | |
| SQL Server 数据库服务器的整数端口号。 | |
没有默认值 | 连接到 SQL Server 数据库服务器时要使用的用户名。使用 Kerberos 身份验证时可以省略,可以使用 pass-through 属性进行配置。 | |
没有默认值 | 连接到 SQL Server 数据库服务器时要使用的密码。 | |
没有默认值 | 指定 SQL 服务器名称实例的实例名称。 | |
没有默认值 |
为您希望 Debezium 捕获的 SQL Server 数据库服务器提供命名空间的主题前缀。前缀应该在所有其他连接器中唯一,因为它被用作从这个连接器接收记录的所有 Kafka 主题名称的前缀。数据库服务器逻辑名称中只能使用字母数字字符、连字符、句点和下划线。 警告 不要更改此属性的值。如果您重启后更改了 name 值,而不是继续向原始主题发出事件,连接器会将后续事件发送到名称基于新值的主题。连接器也无法恢复其数据库架构历史记录主题。 | |
没有默认值 |
可选的、以逗号分隔的正则表达式列表,与您要 捕获更改的模式名称匹配。不包括在 schema.
要匹配 schema 的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与 schema 的整个名称字符串匹配;它与 schema 名称中可能存在的子字符串匹配。 | |
没有默认值 |
可选的、以逗号分隔的正则表达式列表,与 您不想 捕获更改的模式名称匹配。任何名称不包含在 schema.
要匹配 schema 的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与 schema 的整个名称字符串匹配;它与 schema 名称中可能存在的子字符串匹配。 | |
没有默认值 |
可选的、以逗号分隔的正则表达式列表,与您希望 Debezium 捕获的表的完全限定表标识符匹配。默认情况下,连接器为指定模式捕获所有非系统表。当设置此属性时,连接器只捕获指定表中的更改。每个标识符都是 schemaName.tableName。
要匹配表的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与表的整个名称字符串匹配;它与表名称中可能存在的子字符串不匹配。 | |
没有默认值 |
可选的、以逗号分隔的正则表达式列表,与您要从捕获中排除的表的完全限定表标识符匹配。Debezium 捕获
要匹配表的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与表的整个名称字符串匹配;它与表名称中可能存在的子字符串不匹配。 | |
空字符串 |
可选的、以逗号分隔的正则表达式列表,与更改事件消息值中包含的列的完全限定域名匹配。列的完全限定域名格式为 schemaName.tableName.columnName。请注意,主键列始终包含在事件的键中,即使没有包含在值中。
要匹配列的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与列的整个名称字符串匹配;它与列中可能出现的子字符串匹配。 | |
空字符串 |
可选的、以逗号分隔的正则表达式列表,与更改事件消息值中排除的列的完全限定域名匹配。列的完全限定域名格式为 schemaName.tableName.columnName。请注意,如果从值中排除了主键列,则始终包含在事件的键中。
要匹配列的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与列的整个名称字符串匹配;它与列中可能出现的子字符串匹配。 | |
|
指定在包含列中没有更改时是否跳过发布消息。如果列中没有包括每个 | |
| 不适用 |
可选的、以逗号分隔的正则表达式列表,与基于字符列的完全限定名称匹配。列的完全限定域名格式为 `<schemaName>.<tableName>.<columnName>`.
一个 pseudonym,它包括了通过应用指定的 hashAlgorithm 和 salt 的结果的哈希值。根据所使用的哈希函数,会维护引用完整性,而列值则替换为 pseudonyms。支持的哈希功能在 Java Cryptography 架构标准 Algorithm Name 文档的 MessageDigest 部分中 进行了描述。 column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = inventory.orders.customerName, inventory.shipment.customerName
如有必要,pseudonym 会自动缩短为列的长度。连接器配置可以包含多个属性,用于指定不同的哈希算法和 salt。 |
|
时间、日期和时间戳可以以不同的精度类型代表: | |
|
指定连接器如何处理 | |
|
布尔值,指定连接器是否应该将数据库模式中的更改发布到与数据库服务器 ID 的名称相同的 Kafka 主题。每个架构更改都使用包含数据库名称和一个 JSON 结构的键记录,该键描述了 schema 更新。这独立于连接器内部记录数据库架构历史记录。默认值是 | |
|
控制 delete 事件是否后跟一个 tombstone 事件。 | |
不适用 |
可选的、以逗号分隔的正则表达式列表,与基于字符列的完全限定名称匹配。如果在列中的数据超过了在属性名中的 length 指定的字符长度时删节数据,设置此属性。将
列的完全限定域名会观察以下格式:< 您可以在单个配置中指定多个长度不同的属性。 | |
列的 N/a Fully-qualified 名称格式为 schemaName.tableName.columnName。 |
可选的、以逗号分隔的正则表达式列表,与基于字符列的完全限定名称匹配。如果您希望连接器屏蔽一组列的值,例如,如果它们包含敏感数据,则设置此属性。将 列的完全限定域名观察以下格式: schemaName.tableName.columnName。要匹配列的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与列的整个名称字符串匹配;表达式与列名称中可能存在的子字符串不匹配。 您可以在单个配置中指定多个长度不同的属性。 | |
不适用 | 可选的、以逗号分隔的正则表达式列表,与您希望连接器发出代表列元数据的额外参数的完全限定名称匹配。当设置此属性时,连接器会将以下字段添加到事件记录的 schema 中:
这些参数会分别传播列的原始类型名称和长度(用于变量宽度类型)。
列的完全限定域名观察以下格式: schemaName.tableName.columnName。 | |
不适用 | 可选的、以逗号分隔的正则表达式列表,用于指定为数据库中列定义的数据类型的完全限定名称。当设置此属性时,对于具有匹配数据类型的列,连接器会发出在 schema 中包含以下额外字段的事件记录:
这些参数会分别传播列的原始类型名称和长度(用于变量宽度类型)。
列的完全限定域名观察以下格式: schemaName.tableName.typeName。 有关 SQL Server 特定数据类型名称的列表,请参阅 SQL Server 数据类型映射。 | |
不适用 | 指定连接器用来组成自定义消息键的表达式列表,用于更改它发布到指定表的 Kafka 主题的事件记录。
默认情况下,Debezium 使用表的主键列作为它发出的记录的消息键。在默认位置,或者为缺少主密钥的表指定一个键,您可以根据一个或多个列配置自定义消息密钥。
每个完全限定表名称都是以下格式的一个正则表达式: 对于用来创建自定义消息键的列数量没有限制。但是,最好使用指定唯一密钥所需的最小数量。 | |
bytes |
指定二进制( | |
none |
指定应如何调整模式名称以与连接器使用的消息转换器兼容。可能的设置:
| |
none |
指定应如何调整字段名称以与连接器使用的消息转换器兼容。可能的设置:
如需更多信息,请参阅 Avro 命名。 |
高级 SQL Server 连接器配置属性
以下 高级配置 属性具有很好的默认值,这些默认值在大多数情况下将可以正常工作,因此很少需要在连接器的配置中指定。
属性 | 默认 | 描述 |
---|---|---|
没有默认值 |
枚举连接器可以使用的 自定义转换器 实例的符号链接列表。例如,
您必须设置
对于您为连接器配置的每个转换器,您还必须添加一个
例如, isbn.type: io.debezium.test.IsbnConverter
如果要进一步控制配置的转换器的行为,您可以添加一个或多个配置参数将值传递给转换器。要将任何其他配置参数与转换器关联,请为参数名称加上转换器的符号名作为前缀。例如, isbn.schema.name: io.debezium.sqlserver.type.Isbn | |
初始 | 一个模式,用于获取捕获表的结构的初始快照和可选数据。快照完成后,连接器将继续从数据库的 redo 日志中读取更改事件。支持以下值:
| |
|
一个可选的、以逗号分隔的正则表达式列表,匹配表的完全限定名 ( 要匹配表的名称,Debebe 会使用正则表达式,它由您作为 anchored 正则表达式指定。也就是说,指定的表达式与表的整个名称字符串匹配;它与表名称中可能存在的子字符串不匹配。 | |
repeatable_read | 控制使用哪个事务隔离级别以及为捕获指定连接器锁定表的模式。支持以下值:
模式选择也会影响数据一致性。只有 | |
|
指定连接器在处理事件时应如何响应异常。 | |
| 正整数值,指定连接器在每个迭代过程中应等待的毫秒数,以便出现新更改事件。默认值为 500 毫秒,或 0.5 秒。 | |
|
正整数值,用于指定阻塞队列可以保存的最大记录数。当 Debezium 从数据库读取事件时,它会将事件放置在阻塞队列中,然后再将它们写入 Kafka。阻塞队列可以提供从数据库读取更改事件时,连接器最快于将其写入 Kafka 的信息,或者在 Kafka 不可用时从数据库读取更改事件。当连接器定期记录偏移时,队列中保存的事件会被忽略。始终将 | |
|
一个长的整数值,用于指定阻塞队列的最大卷(以字节为单位)。默认情况下,不会为阻塞队列指定卷限制。要指定队列可以消耗的字节数,请将此属性设置为正长值。 | |
| 正整数值,指定每个应在此连接器迭代过程中处理的事件的最大大小。 | |
|
控制发送心跳消息的频率。 | |
没有默认值 |
在启动后,连接器在进行快照前应等待的间隔为 milli-seconds; | |
| 指定在拍摄快照时每个表中读取的最大行数。连接器将在这个大小的多个批处理中读取表内容。默认值为 2000。 | |
没有默认值 | 指定将针对给定查询的每个数据库往返获取的行数。默认为 JDBC 驱动程序的默认获取大小。 | |
|
整数值,用于指定执行快照时要等待的最大时间(以毫秒为单位)。如果此时间间隔内无法获取表锁定,则快照将失败(同时看到 快照)。 | |
没有默认值 | 指定要包含在快照中的表行。如果您希望快照只包含表中的行的子集,请使用属性。此属性仅影响快照。它不适用于连接器从日志中读取的事件。
该属性包含以逗号分隔的、完全限定表名称列表,格式为 <
在包含 soft-delete 列 "snapshot.select.statement.overrides": "customer.orders", "snapshot.select.statement.overrides.customer.orders": "SELECT * FROM [customers].[orders] WHERE delete_flag = 0 ORDER BY id DESC"
在生成的快照中,连接器只包括 | |
|
当设置为 | |
10000 (10 秒) | 在发生可分配错误后重启连接器前要等待的 milli-seconds 数量。 | |
|
在流过程中将跳过的操作类型的逗号分隔列表。操作包括:用于 inserts/create、 | |
没有默认值 |
用于向连接器发送信号的数据收集的完全限定名称。https://access.redhat.com/documentation/zh-cn/red_hat_build_of_debezium/2.3.7/html-single/debezium_user_guide/index#debezium-signaling-enabling-source-signaling-channel | |
source | 为连接器启用的信号频道名称列表。默认情况下,以下频道可用:
| |
没有默认值 | 为连接器启用的通知频道名称列表。默认情况下,以下频道可用:
| |
|
允许在增量快照期间更改模式。启用后,连接器将在增量快照期间检测模式更改,并重新选择当前块以避免锁定 DDL。 | |
| 连接器在增量快照块期间获取并读取内存的最大行数。增加块大小可提高效率,因为快照会运行更多大小的快照查询。但是,较大的块大小还需要更多内存来缓冲快照数据。将块大小调整为提供环境中最佳性能的值。 | |
0 |
指定在从数据库中的多个表流更改时,使用每个迭代的最大事务数来减少内存占用量。当设置为 | |
| 将 OPTION (RECOMPILE)查询选项用于增量快照期间使用的所有 SELECT 语句。这有助于解决可能发生的参数嗅探问题,但可能会导致源数据库的 CPU 负载增加,具体取决于查询执行的频率。 | |
|
应该用来确定数据更改、模式更改、事务、心跳事件等的 TopicNamingStrategy 类的名称,默认为 | |
|
指定主题名称的分隔符,默认为 | |
| 在绑定的并发哈希映射中用于保存主题名称的大小。此缓存将有助于确定与给定数据收集对应的主题名称。 | |
|
控制连接器向其发送心跳信息的主题名称。主题名称具有此模式: | |
|
控制连接器向其发送事务元数据消息的主题名称。主题名称具有此模式: 如需更多信息,请参阅 事务元数据。 | |
| 指定连接器执行初始快照时使用的线程数量。要启用并行初始快照,请将属性设置为大于 1 的值。在并行初始快照中,连接器会同时处理多个表。 重要 并行初始快照只是一个技术预览功能。技术预览功能不受红帽产品服务等级协议(SLA)支持,且功能可能并不完整。红帽不推荐在生产环境中使用它们。这些技术预览功能可以使用户提早试用新的功能,并有机会在开发阶段提供反馈意见。有关红帽技术预览功能支持范围的更多信息,请参阅技术预览功能支持范围。 | |
| 在失败前,retriable 错误(如连接错误)的最大重试次数(-1 = no limit, 0 = disabled, > 0 = num of retries)。 |
Debezium SQL Server 连接器数据库模式历史记录配置属性
Debezium 提供了一组 schema.history.internal.*
属性,用于控制连接器如何与 schema 历史记录主题进行交互。
下表描述了用于配置 Debezium 连接器的 schema.history.internal
属性。
属性 | 默认 | 描述 |
---|---|---|
没有默认值 | 连接器存储数据库 schema 历史记录的 Kafka 主题的全名。 | |
没有默认值 | 连接器用来建立到 Kafka 集群的初始连接的主机/端口对列表。此连接用于检索之前由连接器存储的数据库架构历史记录,以及用于从源数据库读取的每个 DDL 语句。每个对都应指向 Kafka Connect 进程使用的相同 Kafka 集群。 | |
| 整数值,用于指定连接器在启动/恢复期间应等待的最大毫秒数,同时轮询持久数据。默认值为 100ms。 | |
| 一个整数值,用于指定连接器在使用 Kafka admin 客户端获取集群信息时应等待的最大毫秒数。 | |
| 一个整数值,用于指定连接器在使用 Kafka admin 客户端创建 kafka 历史记录主题时应等待的最大毫秒数。 | |
|
连接器在连接器恢复失败前应尝试读取持久性历史记录数据的次数上限,并显示错误。接收数据后等待的最大时间为 restore. | |
|
指定连接器是否应忽略格式或未知数据库语句的布尔值,或者停止处理,以便人可以解决这个问题。安全默认值为 | |
|
一个布尔值,用于指定连接器是否记录来自 schema 或数据库中的所有表的模式结构,还是仅从为捕获的表中指定的表。
| |
|
一个布尔值,用于指定连接器是否记录来自数据库实例中的所有逻辑数据库的架构结构。
注意
MySQL Connector 的默认值为 |
配置制作者和消费者客户端的直通数据库架构历史记录属性
Debezium 依赖于 Kafka producer 将模式更改写入数据库架构历史记录主题。同样,它依赖于 Kafka 使用者在连接器启动时从数据库 schema 历史记录主题中读取。您可以通过将值分配给以 schema.history.internal.producer 和 schema.history.internal.consumer ruby 前缀开头的 pass-through 配置属性来定义 Kafka producer
和 消费者
客户端的配置。直通生成者和消费者数据库模式历史记录属性控制一系列行为,如这些客户端与 Kafka 代理的连接的方式,如下例所示:
schema.history.internal.producer.security.protocol=SSL schema.history.internal.producer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks schema.history.internal.producer.ssl.keystore.password=test1234 schema.history.internal.producer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks schema.history.internal.producer.ssl.truststore.password=test1234 schema.history.internal.producer.ssl.key.password=test1234 schema.history.internal.consumer.security.protocol=SSL schema.history.internal.consumer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks schema.history.internal.consumer.ssl.keystore.password=test1234 schema.history.internal.consumer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks schema.history.internal.consumer.ssl.truststore.password=test1234 schema.history.internal.consumer.ssl.key.password=test1234
Debezium 从属性名称中剥离前缀,然后再将属性传递给 Kafka 客户端。
如需有关 Kafka producer 配置属性和 Kafka 使用者配置属性的更多详情,请参阅 Kafka 文档。
Debezium 连接器 Kafka 信号配置属性
Debezium 提供了一组 signal.*
属性,用于控制连接器如何与 Kafka 信号主题进行交互。
下表描述了 Kafka 信号
属性。
属性 | 默认 | 描述 |
---|---|---|
<topic.prefix>-signal | 连接器监控用于临时信号的 Kafka 主题的名称。 注意 如果禁用了 自动主题创建,您必须手动创建所需的信号主题。需要信号主题来保留信号排序。信号主题必须具有单个分区。 | |
kafka-signal | Kafka 用户使用的组 ID 的名称。 | |
没有默认值 | 连接器用来建立到 Kafka 集群的初始连接的主机/端口对列表。每个对都引用 Debezium Kafka Connect 进程使用的 Kafka 集群。 | |
| 一个整数值,用于指定连接器在轮询信号时等待的最大毫秒数。 |
Debezium 连接器传递信号 Kafka 使用者客户端配置属性
Debezium 连接器为信号 Kafka 使用者提供直通配置。透传信号属性以 signals.consumer.*
前缀开始。例如,连接器将 signal.consumer.security.protocol=SSL
等属性传递给 Kafka 消费者。
Debezium 从属性中剥离前缀,然后再将属性传递给 Kafka 信号消费者。
Debezium 连接器接收器通知配置属性
下表描述了 通知
属性。
属性 | 默认 | 描述 |
---|---|---|
没有默认值 |
从 Debezium 接收通知的主题名称。当您将 |
Debezium SQL Server 连接器直通数据库驱动程序配置属性
Debezium 连接器为数据库驱动程序的直通配置提供。直通数据库属性以前缀 driver metric 开头
。例如,连接器将 driver.foobar=false
等属性传递给 JDBC URL。
与 数据库架构历史记录客户端通过直通属性 一样,Debebe 会在将前缀传递给数据库驱动程序之前从属性中剥离前缀。