7.6. 部署 Debezium PostgreSQL 连接器
您可以使用以下任一方法部署 Debezium PostgreSQL 连接器:
7.6.1. 使用 AMQ Streams 部署 PostgreSQL 连接器
从 Debezium 1.7 开始,部署 Debezium 连接器的首选方法是使用 AMQ Streams 来构建包含连接器插件的 Kafka Connect 容器镜像。
在部署过程中,您可以创建并使用以下自定义资源(CR):
-
定义 Kafka Connect 实例的
KafkaConnect
CR,并包含有关镜像中需要包含连接器工件的信息。 -
提供连接器用来访问源数据库的信息的
KafkaConnector
CR。在 AMQ Streams 启动 Kafka Connect pod 后,您可以通过应用KafkaConnector
CR 来启动连接器。
在 Kafka Connect 镜像的构建规格中,您可以指定可用于部署的连接器。对于每个连接器插件,您还可以指定您的部署可以使用的其他组件。例如,您可以添加 Service Registry 工件或 Debezium 脚本组件。当 AMQ Streams 构建 Kafka Connect 镜像时,它会下载指定的工件,并将其合并到镜像中。
KafkaConnect
CR 中的 spec.build.output
参数指定存储生成的 Kafka Connect 容器镜像的位置。容器镜像可以存储在 Docker registry 中,也可以存储在 OpenShift ImageStream 中。要将镜像存储在 ImageStream 中,您必须先创建 ImageStream,然后才能部署 Kafka Connect。镜像流不会被自动创建。
如果使用 KafkaConnect
资源创建集群,之后您无法使用 Kafka Connect REST API 创建或更新连接器。您仍然可以使用 REST API 来检索信息。
其他资源
- 在 OpenShift 上使用 AMQ Streams 配置 Kafka 连接。
- 在 OpenShift 中部署和升级 AMQ Streams 中的使用 AMQ Streams 自动创建新容器镜像。
7.6.2. 使用 AMQ Streams 部署 Debezium PostgreSQL 连接器
使用早期版本的 AMQ Streams 时,要在 OpenShift 上部署 Debezium 连接器,首先需要为连接器构建 Kafka Connect 镜像。在 OpenShift 上部署连接器的当前首选的方法是使用 AMQ Streams 中的构建配置来自动构建 Kafka Connect 容器镜像,其中包含您要使用的 Debezium 连接器插件。
在构建过程中,AMQ Streams Operator 会将 KafkaConnect
自定义资源中的输入参数(包括 Debezium 连接器定义)转换为 Kafka Connect 容器镜像。构建从 Red Hat Maven 存储库或其他配置的 HTTP 服务器下载必要的工件。
新创建的容器被推送到 .spec.build.output
中指定的容器 registry,用于部署 Kafka Connect 集群。AMQ Streams 构建 Kafka Connect 镜像后,您可以创建 KafkaConnector
自定义资源来启动构建中包含的连接器。
先决条件
- 您可以访问安装了集群 Operator 的 OpenShift 集群。
- AMQ Streams Operator 正在运行。
- 部署 Apache Kafka 集群,如在 OpenShift 中部署和升级 AMQ Streams 所述。
- Kafka Connect 部署在 AMQ Streams 上
- 您有一个 Red Hat Integration 许可证。
-
已安装 OpenShift
oc
CLI 客户端,或者您可以访问 OpenShift Container Platform Web 控制台。 根据您要存储 Kafka Connect 构建镜像的方式,您需要 registry 权限或您必须创建 ImageStream 资源:
- 要将构建镜像存储在镜像 registry 中,如 Red Hat Quay.io 或 Docker Hub
- 在 registry 中创建和管理镜像的帐户和权限。
- 将构建镜像存储为原生 OpenShift ImageStream
- ImageStream 资源已部署到集群中,以存储新的容器镜像。您必须为集群明确创建 ImageStream。镜像流默认不可用。如需有关 ImageStreams 的更多信息,请参阅在 OpenShift Container Platform 上管理镜像流。
流程
- 登录 OpenShift 集群。
为连接器创建 Debezium
KafkaConnect
自定义资源(CR),或修改现有的资源。例如,创建一个名为dbz-connect.yaml
的KafkaConnect
CR,用于指定metadata.annotations
和spec.build
属性。以下示例显示了来自dbz-connect.yaml
文件的摘录,该文件描述了KafkaConnect
自定义资源。
例 7.1.
dbz-connect.yaml
文件,该文件定义包含 Debezium 连接器的KafkaConnect
自定义资源在以下示例中,自定义资源被配置为下载以下工件:
- Debezium PostgreSQL 连接器存档。
- Service Registry 归档。Service Registry 是一个可选组件。只有在打算在连接器中使用 Avro 序列化时,才添加 Service Registry 组件。
- Debezium 脚本 SMT 归档以及您要与 Debezium 连接器一起使用的关联脚本引擎。SMT 归档和脚本语言依赖项是可选组件。只有在打算使用 Debezium 是 基于内容的路由 SMT 或 过滤 SMT 时,才添加这些组件。
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: debezium-kafka-connect-cluster annotations: strimzi.io/use-connector-resources: "true" 1 spec: version: 3.3.1 build: 2 output: 3 type: imagestream 4 image: debezium-streams-connect:latest plugins: 5 - name: debezium-connector-postgres artifacts: - type: zip 6 url: https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-postgres/2.1.4.Final-redhat-00001/debezium-connector-postgres-2.1.4.Final-redhat-00001-plugin.zip 7 - type: zip url: https://maven.repository.redhat.com/ga/io/apicurio/apicurio-registry-distro-connect-converter/2.3.0.Final-redhat-<build-number>/apicurio-registry-distro-connect-converter-2.3.0.Final-redhat-<build-number>.zip 8 - type: zip url: https://maven.repository.redhat.com/ga/io/debezium/debezium-scripting/2.1.4.Final-redhat-00001/debezium-scripting-2.1.4.Final-redhat-00001.zip 9 - type: jar url: https://repo1.maven.org/maven2/org/codehaus/groovy/groovy/3.0.11/groovy-3.0.11.jar 10 - type: jar url: https://repo1.maven.org/maven2/org/codehaus/groovy/groovy-jsr223/3.0.11/groovy-jsr223-3.0.11.jar - type: jar url: https://repo1.maven.org/maven2/org/codehaus/groovy/groovy-json3.0.11/groovy-json-3.0.11.jar bootstrapServers: debezium-kafka-cluster-kafka-bootstrap:9093 ...
表 7.23. Kafka Connect 配置设置的描述 项 描述 1
将
strimzi.io/use-connector-resources
注解设置为"true"
,以便 Cluster Operator 使用KafkaConnector
资源在此 Kafka Connect 集群中配置连接器。2
spec.build
配置指定构建镜像的位置,并列出要在镜像中包含的插件,以及插件工件的位置。3
build.output
指定存储新构建镜像的 registry。4
指定镜像输出的名称和镜像名称。
output.type
的有效值是docker
,可推送到容器 registry,如 Docker Hub 或 Quay,或将镜像推送到内部 OpenShift ImageStream的镜像流
。要使用 ImageStream,必须将 ImageStream 资源部署到集群中。有关在 KafkaConnect 配置中指定build.output
的更多信息,请参阅在 OpenShift 中配置 AMQ Streams 中的 AMQ Streams Build schema 参考。5
插件配置
列出了您要包含在 Kafka Connect 镜像中的所有连接器。对于列表中的每个条目,指定一个插件名称
,以及有关构建连接器所需的工件的信息。另外,对于每个连接器插件,您还可以包含您要与连接器一起使用的其他组件。例如,您可以添加 Service Registry 工件或 Debezium 脚本组件。6
artifacts.type
的值指定artifacts.url
中指定的工件的文件类型。有效类型是zip
、tgz
或jar
。Debezium 连接器存档以.zip
文件格式提供。type
值必须与url
字段中引用的文件类型匹配。7
artifacts.url
的值指定 HTTP 服务器的地址,如 Maven 存储库,用于存储连接器工件的文件。Debezium 连接器工件位于 Red Hat Maven 存储库中。OpenShift 集群必须有权访问指定的服务器。8
(可选)指定下载 Service Registry 组件的工件
类型和
url
。包括 Service Registry 工件,只有在您希望连接器使用 Apache Avro 与 Service Registry 序列化事件键和值时,而不是使用默认的 JSON converter。9
(可选)指定 Debezium 脚本 SMT 归档的工件
类型和
url
,以用于 Debezium 连接器。只有在打算使用 Debezium 基于内容的路由 SMT 或 过滤 SMT 时使用脚本 SMT 时,才包含脚本 SMT,您必须部署 JSR 223 兼容脚本实施,如 groovy。10
(可选)为 JSR 223 兼容脚本实现的 JAR 文件指定工件
类型和
url
,这是 Debezium 脚本 SMT 所需的。重要如果您使用 AMQ Streams 将连接器插件合并到 Kafka Connect 镜像中,每个所需脚本语言组件
artifacts.url
必须指定 JAR 文件的位置,而artifacts.type
的值还必须设置为jar
。无效的值会导致连接器在运行时失败。要启用使用带有脚本 SMT 的 Apache Groovy 语言,示例中的自定义资源会检索以下库的 JAR 文件:
-
groovy
-
Groovy-jsr223
(协调代理) -
Groovy-json
(用于解析 JSON 字符串的模块)
作为替代方案,Debebe 脚本 SMT 还支持使用 GraalVM JavaScript 的 JSR 223 实现。
输入以下命令将
KafkaConnect
构建规格应用到 OpenShift 集群:oc create -f dbz-connect.yaml
根据自定义资源中指定的配置,Streams Operator 会准备要部署的 Kafka Connect 镜像。
构建完成后,Operator 将镜像推送到指定的 registry 或 ImageStream,并启动 Kafka Connect 集群。您配置中列出的连接器工件在集群中可用。创建一个
KafkaConnector
资源来定义您要部署的每个连接器的实例。
例如,创建以下KafkaConnector
CR,并将它保存为postgresql-inventory-connector.yaml
例 7.2. 为 Debezium 连接器定义
KafkaConnector
自定义资源的postgresql-inventory-connector.yaml
文件apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: labels: strimzi.io/cluster: debezium-kafka-connect-cluster name: inventory-connector-postgresql 1 spec: class: io.debezium.connector.postgresql.PostgresConnector 2 tasksMax: 1 3 config: 4 database.hostname: postgresql.debezium-postgresql.svc.cluster.local 5 database.port: 5432 6 database.user: debezium 7 database.password: dbz 8 database.dbname: mydatabase 9 topic.prefix: inventory-connector-postgresql 10 table.include.list: public.inventory 11 ...
表 7.24. 连接器配置设置的描述 项 描述 1
使用 Kafka Connect 集群注册的连接器名称。
2
连接器类的名称。
3
可同时运行的任务数量。
4
连接器的配置。
5
主机数据库实例的地址。
6
数据库实例的端口号。
7
Debezium 用于连接到数据库的帐户名称。
8
Debezium 用于连接到数据库用户帐户的密码。
9
要从中捕获更改的数据库名称。
10
数据库实例或集群的主题前缀。
指定的名称只能从字母数字字符或下划线括起。
由于主题前缀用作从这个连接器接收更改事件的任何 Kafka 主题的前缀,所以名称必须在集群中的连接器之间唯一。
此命名空间也用于相关的 Kafka Connect 模式的名称,如果您将连接器与 Avro 连接器集成,则对应的 Avro 模式的命名空间也用于。11
连接器从中捕获更改事件的表列表。
运行以下命令来创建连接器资源:
oc create -n <namespace> -f <kafkaConnector>.yaml
例如,
oc create -n debezium -f {context}-inventory-connector.yaml
连接器注册到 Kafka Connect 集群,并开始针对
KafkaConnector
CR 中的spec.config.database.dbname
指定的数据库运行。连接器 pod 就绪后,Debezium 正在运行。
现在,您已准备好 验证 Debezium PostgreSQL 部署。
7.6.3. 通过从 Dockerfile 构建自定义 Kafka Connect 容器镜像来部署 Debezium PostgreSQL 连接器
要部署 Debezium PostgreSQL 连接器,您需要构建包含 Debezium 连接器存档的自定义 Kafka Connect 容器镜像,并将此容器镜像推送到容器 registry。然后,您需要创建两个自定义资源(CR):
-
定义 Kafka Connect 实例的
KafkaConnect
CR。CR 中的image
属性指定您创建用来运行 Debezium 连接器的容器镜像的名称。您可以将此 CR 应用到部署了 Red Hat AMQ Streams 的 OpenShift 实例。AMQ Streams 提供将 Apache Kafka 引入到 OpenShift 的 operator 和镜像。 -
定义 Debezium Db2 连接器的
KafkaConnector
CR。将此 CR 应用到应用KafkaConnect
CR 的相同 OpenShift 实例。
先决条件
- PostgreSQL 正在运行,并执行 了设置 PostgreSQL 来运行 Debezium 连接器 的步骤。
- AMQ Streams 部署在 OpenShift 上,并运行 Apache Kafka 和 Kafka Connect。如需更多信息,请参阅在 OpenShift 中部署和升级 AMQ Streams。
- podman 或 Docker 已安装。
-
您有在容器 registry 中创建和管理容器(如
quay.io
或docker.io
)的帐户和权限,您要向其添加将运行 Debezium 连接器的容器。
流程
为 Kafka Connect 创建 Debezium PostgreSQL 容器:
创建一个 Dockerfile,它使用
registry.redhat.io/amq7/amq-streams-kafka-32-rhel8:2.2.0-12
作为基础镜像。例如,在终端窗口中输入以下命令:cat <<EOF >debezium-container-for-postgresql.yaml 1 FROM registry.redhat.io/amq7/amq-streams-kafka-32-rhel8:2.2.0-12 USER root:root RUN mkdir -p /opt/kafka/plugins/debezium 2 RUN cd /opt/kafka/plugins/debezium/ \ && curl -O https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-postgres/2.1.4.Final-redhat-00001/debezium-connector-postgres-2.1.4.Final-redhat-00001-plugin.zip \ && unzip debezium-connector-postgres-2.1.4.Final-redhat-00001-plugin.zip \ && rm debezium-connector-postgres-2.1.4.Final-redhat-00001-plugin.zip RUN cd /opt/kafka/plugins/debezium/ USER 1001 EOF
项 描述 1
您可以指定您想要的任何文件名。
2
指定 Kafka Connect 插件目录的路径。如果您的 Kafka Connect 插件目录位于不同的位置,请将这个路径替换为您的目录的实际路径。
该命令在当前目录中创建一个名为
debezium-container-for-postgresql.yaml
的 Dockerfile。从您在上一步中创建的
debezium-container-for-postgresql.yaml
Docker 文件中构建容器镜像。在包含该文件的目录中,打开终端窗口并输入以下命令之一:podman build -t debezium-container-for-postgresql:latest .
docker build -t debezium-container-for-postgresql:latest .
build
命令构建名为debezium-container-for-postgresql
的容器镜像。将自定义镜像推送到容器 registry,如
quay.io
或内部容器 registry。容器 registry 必须可供部署镜像的 OpenShift 实例使用。输入以下命令之一:podman push <myregistry.io>/debezium-container-for-postgresql:latest
docker push <myregistry.io>/debezium-container-for-postgresql:latest
创建新的 Debezium PostgreSQL
KafkaConnect
自定义资源(CR)。例如,创建一个名为dbz-connect.yaml
的KafkaConnect
CR,用于指定注解和
镜像
属性。以下示例显示了来自dbz-connect.yaml
文件的摘录,该文件描述了KafkaConnect
自定义资源。
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster annotations: strimzi.io/use-connector-resources: "true" 1 spec: image: debezium-container-for-postgresql 2 ...
项 描述 1
metadata.annotations
表示KafkaConnector
资源用于配置在这个 Kafka Connect 集群中使用的 Cluster Operator。2
spec.image
指定您为运行 Debezium 连接器而创建的镜像名称。此属性覆盖 Cluster Operator 中的STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE
变量。运行以下命令,将
KafkaConnect
CR 应用到 OpenShift Kafka 实例:oc create -f dbz-connect.yaml
这会更新 OpenShift 中的 Kafka Connect 环境,以添加 Kafka Connector 实例,用于指定您为运行 Debezium 连接器而创建的镜像名称。
创建一个
KafkaConnector
自定义资源来配置 Debezium PostgreSQL 连接器实例。您可以在
.yaml
文件中配置 Debezium PostgreSQL 连接器,该文件指定连接器的配置属性。连接器配置可能会指示 Debezium 为 schema 和表的子集生成事件,或者可能会设置属性,以便 Debezium 忽略敏感、太大或不需要的指定栏中的值。有关您可以为 Debezium PostgreSQL 连接器设置的配置属性的完整列表,请参阅 PostgreSQL 连接器属性。以下示例显示了一个自定义资源的摘录,该资源在端口
5432
上配置 Debezium 连接器,连接到 PostgreSQL 服务器主机192.168.99.100
。此主机有一个名为sampledb
的数据库,名为public
的模式,inventory-connector-postgresql
是服务器的逻辑名称。inventory-connector.yaml
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: inventory-connector-postgresql 1 labels: strimzi.io/cluster: my-connect-cluster spec: class: io.debezium.connector.postgresql.PostgresConnector tasksMax: 1 2 config: 3 database.hostname: 192.168.99.100 4 database.port: 5432 database.user: debezium database.password: dbz database.dbname: sampledb topic.prefix: inventory-connector-postgresql 5 schema.include.list: public 6 plugin.name: pgoutput 7 ...
- 1 1 1 1 1
- 连接器的名称。
- 2 2 2 2 2
- 任何时候只能有一个任务。由于 PostgreSQL 连接器读取 PostgreSQL 服务器的
binlog
,因此使用单一连接器任务可以确保正确的顺序和事件处理。Kafka Connect 服务使用连接器启动一个或多个可以正常工作的任务,并在 Kafka Connect 服务集群中自动分发运行的任务。如果有任何服务停止或崩溃,这些任务将重新分发到运行的服务。 - 3 3 3
- 连接器的配置。
- 4 4 4
- 运行 PostgreSQL 服务器的数据库主机的名称。在本例中,数据库主机名为
192.168.99.100
。 - 5 5 5
- 唯一的主题前缀。服务器名称是 PostgreSQL 服务器或服务器集群的逻辑标识符。此名称用作接收更改事件记录的所有 Kafka 主题的前缀。
- 6 6 6
- 连接器只捕获
公共
模式中的更改。可以配置连接器来捕获您选择的表中的更改。如需更多信息,请参阅table.include.list
。 - 7 7 7
- 在 PostgreSQL 服务器上安装 PostgreSQL 逻辑解码插件的名称。虽然 PostgreSQL 10 及更新版本唯一支持的值是
pgoutput
,但您必须将plugin.name
明确设置为pgoutput
。
使用 Kafka Connect 创建连接器实例。例如,如果您将
KafkaConnector
资源保存到inventory-connector.yaml
文件中,您将运行以下命令:oc apply -f inventory-connector.yaml
这会注册
inventory-connector
,连接器开始针对KafkaConnector
CR 中定义的sampledb
数据库运行。
结果
连接器启动后,它会对配置了连接器的 PostgreSQL 服务器数据库 执行一致的快照。然后,连接器开始为行级操作生成数据更改事件,并将事件记录流传输到 Kafka 主题。
7.6.4. 验证 Debezium PostgreSQL 连接器是否正在运行
如果连接器正确启动且没有错误,它会为每个连接器配置为捕获的表创建一个主题。下游应用程序可以订阅这些主题以检索源数据库中发生的信息事件。
要验证连接器是否正在运行,您可以从 OpenShift Container Platform Web 控制台或 OpenShift CLI 工具(oc)执行以下操作:
- 验证连接器状态。
- 验证连接器是否生成主题。
- 验证主题是否填充了每个表初始快照过程中生成的读操作("op":"r")的事件。
先决条件
- Debezium 连接器部署到 OpenShift 上的 AMQ Streams。
-
已安装 OpenShift
oc
CLI 客户端。 - 访问 OpenShift Container Platform web 控制台。
流程
使用以下方法之一检查
KafkaConnector
资源的状态:在 OpenShift Container Platform Web 控制台中:
-
导航到 Home
Search。 -
在 Search 页面中,点 Resources 以打开 Select Resource 复选框,然后键入
KafkaConnector
。 - 在 KafkaConnectors 列表中,点您要检查的连接器的名称,如 inventory-connector-postgresql。
- 在 Conditions 部分中,验证 Type 和 Status 列中的值是否已设置为 Ready 和 True。
-
导航到 Home
在一个终端窗口中:
使用以下命令:
oc describe KafkaConnector <connector-name> -n <project>
例如,
oc describe KafkaConnector inventory-connector-postgresql -n debezium
该命令返回类似以下输出的状态信息:
例 7.3.
KafkaConnector
资源状态Name: inventory-connector-postgresql Namespace: debezium Labels: strimzi.io/cluster=debezium-kafka-connect-cluster Annotations: <none> API Version: kafka.strimzi.io/v1beta2 Kind: KafkaConnector ... Status: Conditions: Last Transition Time: 2021-12-08T17:41:34.897153Z Status: True Type: Ready Connector Status: Connector: State: RUNNING worker_id: 10.131.1.124:8083 Name: inventory-connector-postgresql Tasks: Id: 0 State: RUNNING worker_id: 10.131.1.124:8083 Type: source Observed Generation: 1 Tasks Max: 1 Topics: inventory-connector-postgresql.inventory inventory-connector-postgresql.inventory.addresses inventory-connector-postgresql.inventory.customers inventory-connector-postgresql.inventory.geom inventory-connector-postgresql.inventory.orders inventory-connector-postgresql.inventory.products inventory-connector-postgresql.inventory.products_on_hand Events: <none>
验证连接器是否已创建 Kafka 主题:
通过 OpenShift Container Platform Web 控制台。
-
导航到 Home
Search。 -
在 Search 页面中,点 Resources 打开 Select Resource 复选框,然后键入
KafkaTopic
。 -
在 KafkaTopics 列表中,点您要检查的主题的名称,例如
inventory-connector-postgresql.inventory.orders--ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d
。 - 在 Conditions 部分中,验证 Type 和 Status 列中的值是否已设置为 Ready 和 True。
-
导航到 Home
在一个终端窗口中:
使用以下命令:
oc get kafkatopics
该命令返回类似以下输出的状态信息:
例 7.4.
KafkaTopic
资源状态NAME CLUSTER PARTITIONS REPLICATION FACTOR READY connect-cluster-configs debezium-kafka-cluster 1 1 True connect-cluster-offsets debezium-kafka-cluster 25 1 True connect-cluster-status debezium-kafka-cluster 5 1 True consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a debezium-kafka-cluster 50 1 True inventory-connector-postgresql--a96f69b23d6118ff415f772679da623fbbb99421 debezium-kafka-cluster 1 1 True inventory-connector-postgresql.inventory.addresses---1b6beaf7b2eb57d177d92be90ca2b210c9a56480 debezium-kafka-cluster 1 1 True inventory-connector-postgresql.inventory.customers---9931e04ec92ecc0924f4406af3fdace7545c483b debezium-kafka-cluster 1 1 True inventory-connector-postgresql.inventory.geom---9f7e136091f071bf49ca59bf99e86c713ee58dd5 debezium-kafka-cluster 1 1 True inventory-connector-postgresql.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d debezium-kafka-cluster 1 1 True inventory-connector-postgresql.inventory.products---df0746db116844cee2297fab611c21b56f82dcef debezium-kafka-cluster 1 1 True inventory-connector-postgresql.inventory.products_on_hand---8649e0f17ffcc9212e266e31a7aeea4585e5c6b5 debezium-kafka-cluster 1 1 True schema-changes.inventory debezium-kafka-cluster 1 1 True strimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55 debezium-kafka-cluster 1 1 True strimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b debezium-kafka-cluster 1 1 True
检查主题内容。
- 在终端窗口中输入以下命令:
oc exec -n <project> -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=<topic-name>
例如,
oc exec -n debezium -it debezium-kafka-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=inventory-connector-postgresql.inventory.products_on_hand
指定主题名称的格式与
oc describe
命令的格式在第 1 步中返回,例如inventory-connector-postgresql.inventory.addresses
。对于主题中的每个事件,命令会返回类似以下输出的信息:
例 7.5. Debezium 更改事件的内容
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory-connector-postgresql.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-postgresql.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-postgresql.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory-connector-postgresql.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"2.1.4.Final-redhat-00001","connector":"postgresql","name":"inventory-connector-postgresql","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"postgresql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}
在上例中,
有效负载
值显示连接器快照从表inventory.products_on_hand
中生成一个读取("op" ="r"
)事件。product_id
记录的"before"
状态为null
,这表示记录没有之前的值。"after"
状态对于product_id
为101
的项目的quantity
显示为3
。
7.6.5. Debezium PostgreSQL 连接器配置属性的描述
Debezium PostgreSQL 连接器有许多配置属性,您可以使用它们来实现应用程序的正确连接器行为。许多属性具有默认值。有关属性的信息按如下方式进行组织:
除非默认值可用 , 否则需要以下配置属性。
属性 | 默认 | 描述 |
---|---|---|
没有默认值 | 连接器的唯一名称。尝试再次使用相同的名称注册将失败。所有 Kafka Connect 连接器都需要此属性。 | |
没有默认值 |
连接器的 Java 类的名称。对于 PostgreSQL 连接器,始终使用 | |
| 应该为此连接器创建的最大任务数量。PostgreSQL 连接器始终使用单个任务,因此不要使用这个值,因此始终可以接受默认值。 | |
| 在 PostgreSQL 服务器上安装 PostgreSQL 逻辑解码插件的名称。
唯一支持的值是 | |
| 为特定数据库/架构的特定插件创建的 PostgreSQL 逻辑解码插槽的名称。服务器使用此插槽将事件流传输到您要配置的 Debezium 连接器。 插槽名称必须符合 PostgreSQL 复制插槽命名规则,它的状态:"每个复制插槽都有一个名称,可以包含小写字母、数字和下划线字符"。 | |
| 当连接器以安全、预期的方式停止时,是否删除逻辑复制插槽。默认行为是,当连接器停止时,复制插槽仍然为连接器配置。当连接器重启时,具有相同的复制插槽可让连接器在离开的地方开始处理。
仅在测试或开发环境中设置为 | |
|
使用 如果尚未存在且 包含所有表,则在启动时会创建此发布。然后,Debezium 应用自己的 include/exclude 列表过滤(如果已配置),以限制发布以更改感兴趣的事件。连接器用户必须有超级用户权限才能创建此发布,因此通常会在首次启动连接器前创建发布。 如果发布已存在,对于所有表,或配置了表子集,Debezium 会使用其定义发布。 | |
没有默认值 | PostgreSQL 数据库服务器的 IP 地址或主机名。 | |
| PostgreSQL 数据库服务器的整数端口号。 | |
没有默认值 | 用于连接到 PostgreSQL 数据库服务器的 PostgreSQL 数据库用户的名称。 | |
没有默认值 | 连接到 PostgreSQL 数据库服务器时要使用的密码。 | |
没有默认值 | 要从中流传输更改的 PostgreSQL 数据库的名称。 | |
没有默认值 |
为 Debezium 捕获更改的特定 PostgreSQL 数据库服务器或集群提供命名空间的主题前缀。前缀在所有其他连接器中应该是唯一的,因为它用作从这个连接器接收记录的所有 Kafka 主题的主题名称前缀。数据库服务器逻辑名称中只能使用字母数字字符、连字符、句点和下划线。 警告 不要更改此属性的值。如果您更改了 name 值,重启后,而不是继续向原始主题发出事件,连接器会将后续事件发送到名称基于新值的主题。 | |
没有默认值 |
可选的、以逗号分隔的正则表达式列表,与您要 捕获更改的模式的名称匹配。任何未包含在 schema.
要匹配架构的名称,Debezium 应用您指定的正则表达式,以 替代 的正则表达式。也就是说,指定的表达式与架构的完整标识符匹配,它与 schema 名称中可能存在的子字符串不匹配。 | |
没有默认值 |
可选的、以逗号分隔的正则表达式列表,与 您不想 捕获更改的模式的名称匹配。任何名称没有包含在 schema.
要匹配架构的名称,Debezium 应用您指定的正则表达式,以 替代 的正则表达式。也就是说,指定的表达式与架构的完整标识符匹配,它与 schema 名称中可能存在的子字符串不匹配。 | |
没有默认值 |
可选的、以逗号分隔的正则表达式列表,与您要捕获更改的表的完全限定表标识符匹配。当设置此属性时,连接器只从指定的表中捕获更改。每个标识符的格式都是 schemaName。tableName。默认情况下,连接器捕获捕获更改的每个模式中的每个非系统表中的更改。
要匹配表的名称,Debebe 应用您指定的正则表达式。也就是说,指定的表达式与表的整个标识符匹配;它与表名称中可能存在的子字符串不匹配。 | |
没有默认值 |
可选的、以逗号分隔的正则表达式列表,与您不想捕获更改的表的完全限定表标识符匹配。每个标识符的格式都是 schemaName。tableName。当设置此属性时,连接器会捕获您指定的每个表中的更改。
要匹配表的名称,Debebe 应用您指定的正则表达式。也就是说,指定的表达式与表的整个标识符匹配;它与表名称中可能存在的子字符串不匹配。 | |
没有默认值 |
可选的、以逗号分隔的正则表达式列表,与应包含在更改事件记录值中的列的完全限定名称匹配。列的完全限定域名格式为 schemaName。tableName.columnName。
要匹配列的名称,Debebe 应用您指定的正则表达式。也就是说,表达式用于匹配列的整个名称字符串;它与列名称中可能存在的子字符串不匹配。 | |
没有默认值 |
可选的、以逗号分隔的正则表达式列表,与应该从更改事件记录值中排除的列的完全限定名称匹配。列的完全限定域名格式为 schemaName。tableName.columnName。
要匹配列的名称,Debebe 应用您指定的正则表达式。也就是说,表达式用于匹配列的整个名称字符串;它与列名称中可能存在的子字符串不匹配。 | |
|
时间、日期和时间戳可以通过不同类型的精度表示: | |
|
指定连接器应该如何处理 | |
|
指定连接器应该如何处理 | |
|
指定连接器应该如何处理 | |
|
是否使用加密连接到 PostgreSQL 服务器。选项包括: | |
没有默认值 | 包含客户端的 SSL 证书的文件的路径。如需更多信息 ,请参阅 PostgreSQL 文档。 | |
没有默认值 | 包含客户端 SSL 私钥的文件的路径。如需更多信息 ,请参阅 PostgreSQL 文档。 | |
没有默认值 |
从 | |
没有默认值 | 包含服务器验证的根证书的文件路径。如需更多信息 ,请参阅 PostgreSQL 文档。 | |
| 启用 TCP keep-alive 探测以验证数据库连接是否仍然处于活动状态。如需更多信息 ,请参阅 PostgreSQL 文档。 | |
|
控制 删除 事件是否随后是 tombstone 事件。 | |
不适用 |
一个可选的、以逗号分隔的正则表达式列表,与基于字符的列的完全限定名称匹配。如果您要在一组列中超过属性名称中指定的字符数时,设置此属性。将
列的完全限定域名会观察以下格式:< 您可以在单个配置中指定多个长度不同的属性。 | |
不适用 |
一个可选的、以逗号分隔的正则表达式列表,与基于字符的列的完全限定名称匹配。如果您希望连接器屏蔽一组列的值,例如,如果它们包含敏感数据,则设置此属性。将 列的完全限定域名会观察以下格式: schemaName.tableName.columnName。要匹配列的名称,Debebe 应用您指定的正则表达式。也就是说,指定的表达式与列的整个名称字符串匹配;表达式不匹配列名称中可能存在的子字符串。 您可以在单个配置中指定多个长度不同的属性。 | |
| 不适用 |
一个可选的、以逗号分隔的正则表达式列表,与基于字符的列的完全限定名称匹配。列的完全限定域名格式为 < schemaName>。<tableName & gt; . <columnName>。
一个 pseudonym,它包括了通过应用指定的 hashAlgorithm 和 salt 的结果的哈希值。根据使用的 hash 功能,会维护引用完整性,而列值则替换为伪nyms。支持的哈希功能在 Java Cryptography 架构标准算法名称文档中的 MessageDigest 部分 进行了描述。 column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = inventory.orders.customerName, inventory.shipment.customerName
如有必要,伪的nym 会自动缩短到列的长度。连接器配置可以包含多个指定不同哈希算法和 salt 的属性。 |
不适用 | 可选的、以逗号分隔的正则表达式列表,它与您希望连接器发送代表列元数据的完全限定名称匹配。当设置此属性时,连接器会将以下字段添加到事件记录的架构中:
这些参数分别传播列的原始类型和长度(用于变量带宽类型)。
列的完全限定域名会观察以下格式之一: databaseName.tableName.columnName, 或 databaseName.schemaName.tableName.columnName. | |
不适用 | 可选的、以逗号分隔的正则表达式列表,用于指定为数据库列定义的数据类型的完全限定名称。当设置此属性时,对于具有匹配数据类型的列,连接器会发出事件记录,该记录在 schema 中包含以下额外字段:
这些参数分别传播列的原始类型和长度(用于变量带宽类型)。
列的完全限定域名会观察以下格式之一: databaseName.tableName.typeName, 或 databaseName.schemaName.tableName.typeName. 有关 PostgreSQL 特定数据类型名称的列表,请查看 PostgreSQL 数据类型映射。 | |
空字符串 | 指定连接器用来组成自定义消息键的表达式列表,以更改它发布到指定表的 Kafka 主题的事件记录。
默认情况下,Debezium 使用表的主键列作为它发出的记录的消息键。对于缺少主密钥的表,或者指定缺少主密钥的表的密钥,您可以根据一个或多个列配置自定义消息密钥。
每个完全限定表名称都是正则表达式,格式为: 对您用来创建自定义消息键的列数没有限制。但是,最好使用指定唯一密钥所需的最小数量。 | |
all_tables |
仅在使用 | |
bytes |
指定二进制( | |
none |
指定如何调整架构名称,以便与连接器使用的消息转换器兼容。可能的设置:
| |
|
指定在将 Postgres 领导类型转换为 | |
没有默认值 | 可选的、以逗号分隔的正则表达式列表,与您希望连接器捕获的逻辑解码消息前缀的名称匹配。默认情况下,连接器捕获所有逻辑解码信息。当设置此属性时,连接器只捕获具有属性指定的前缀的逻辑解码消息。所有其他逻辑解码信息都会被排除。 要匹配消息前缀的名称,Debebe 应用您指定的正则表达式。也就是说,指定的表达式与整个消息前缀长度匹配;表达式与前缀中可能存在的子字符串不匹配。
如果您在配置中包含此属性,不要设置 有关 消息 事件结构及其排序语义的信息,请参考 消息 事件。 | |
没有默认值 |
可选的、以逗号分隔的正则表达式列表,它与您不想连接器捕获的逻辑解码消息前缀的名称匹配。当设置此属性时,连接器不会捕获使用指定前缀的逻辑解码消息。所有其他消息都会被捕获。 要匹配消息前缀的名称,Debebe 应用您指定的正则表达式。也就是说,指定的表达式与整个消息前缀长度匹配;表达式与前缀中可能存在的子字符串不匹配。
如果您在配置中包含此属性,不要设置 有关 消息 事件结构及其排序语义的信息,请参考 消息 事件。 |
以下 高级配置 属性具有在大多数情况下工作的默认值,因此很少需要在连接器配置中指定。
属性 | 默认 | 描述 |
---|---|---|
没有默认值 |
枚举连接器可以使用 的自定义转换器 实例的符号名称的逗号分隔列表。例如,
您必须设置
对于您为连接器配置的每个转换器,还必须添加一个
例如, isbn.type: io.debezium.test.IsbnConverter
如果要进一步控制配置的转换器的行为,您可以添加一个或多个配置参数将值传递给转换器。要将任何其他配置参数与转换器关联,请将参数名称与转换器的符号链接名称添加前缀。 isbn.schema.name: io.debezium.postgresql.type.Isbn | |
|
指定在连接器启动时执行快照的条件: | |
|
可选的、以逗号分隔的正则表达式列表,与表的完全限定名称(< 要匹配表的名称,Debebe 应用您指定的正则表达式。也就是说,指定的表达式与表的整个名称字符串匹配,它与表名称中可能存在的子字符串不匹配。 | |
| 正整数值,用于指定在执行快照时等待获取表锁定的最长时间(以毫秒为单位)。如果连接器无法在这个时间段内获取表锁定,则快照会失败。连接器如何提供快照 提供详情。 | |
没有默认值 | 指定要包含在快照中的表行。如果您希望快照仅在表中包括行的子集,请使用此属性。此属性仅影响快照。它不适用于连接器从日志中读取的事件。
属性包含一个以逗号分隔的表名称列表,格式为 <
在包含 soft-delete 列 "snapshot.select.statement.overrides": "customer.orders", "snapshot.select.statement.overrides.customer.orders": "SELECT * FROM [customers].[orders] WHERE delete_flag = 0 ORDER BY id DESC"
在生成的快照中,连接器仅包含 | |
|
指定连接器在处理事件过程中应如何响应异常: | |
| 正整数值,用于指定连接器进程每个批处理的最大大小。 | |
|
正整数值,用于指定阻塞队列可以保存的最大记录数。当 Debezium 从数据库读取事件时,它会将事件放置在阻塞队列中,然后再将它们写入 Kafka。当连接器将消息写入 Kafka 或 Kafka 不可用时,阻塞队列可以提供从数据库读取更改事件的后端。当连接器定期记录偏移时,队列中保存的事件会被忽略。始终将 | |
|
较长的整数值,指定块队列的最大卷(以字节为单位)。默认情况下,不会为阻塞队列指定卷限制。要指定队列可以使用的字节数,请将此属性设置为正长值。 | |
| 正整数值,用于指定连接器在处理批处理事件前应该等待出现新更改事件的毫秒数。默认值为 500 毫秒。 | |
|
当连接器遇到数据类型未知的字段时,指定连接器行为。默认行为是连接器从更改事件省略字段并记录警告。 注意
当 | |
没有默认值 |
连接器建立与数据库的 JDBC 连接时执行的、以逗号分隔的 SQL 语句列表。要将分号用作字符而不是分隔符,请指定两个连续分号 | |
|
以毫秒为单位向服务器发送复制连接状态更新的频率。 | |
|
控制连接器将心跳信息发送到 Kafka 主题的频率。默认行为是连接器不会发送心跳信息。 | |
没有默认值 |
指定连接器发送心跳消息时连接器在源数据库上执行的查询。 | |
|
指定为表触发刷新内存模式的条件。 | |
没有默认值 | 连接器在连接器启动时应等待的时间(毫秒)。如果您要在集群中启动多个连接器,此属性对于避免快照中断非常有用,这可能会导致连接器重新平衡。 | |
| 在快照中,连接器以行批处理形式读取表内容。此属性指定批处理中的最大行数。 | |
没有默认值 |
用于传递给配置的逻辑解码插件的参数的分号分隔列表。例如, | |
如果连接器配置将
如果没有,则为 | 指明是否清理字段名称以遵循 Avro 命名要求。 | |
| 如果连接到复制插槽失败,这是连续尝试连接的最大次数。 | |
| 连接器无法连接到复制插槽时重试尝试之间等待的时间。 | |
|
指定连接器提供的常量值,以指示原始值是不是由数据库提供的粘贴值。如果 | |
|
确定连接器是否生成带有事务边界的事件,并使用事务元数据增强更改事件。如果您希望连接器进行此操作,请指定 | |
|
确定连接器是否应该提交源 postgres 数据库中已处理的记录的 LSN,以便删除 WAL 日志。如果您不希望连接器进行此操作,请指定 | |
10000 (10 秒) | 在发生可分配错误后重启连接器前要等待的毫秒数量。 | |
|
以逗号分隔的操作类型列表,这些类型将在流期间跳过。操作包括: | |
没有默认值 |
用于向连接器发送信号的数据收集的完全限定名称。 | |
1024 | 连接器在增量快照块期间获取并读取内存的最大行数。增加块大小可提高效率,因为快照会运行更大的快照查询。但是,较大的块大小还需要更多内存来缓冲快照数据。将块大小调整为在您的环境中提供最佳性能的值。 | |
|
从复制插槽中读取 XMIN 的频率(以毫秒为单位)。XMIN 值提供从其开始新复制插槽的下限。默认值为 | |
|
应该用来决定数据更改的主题名称、模式更改、事务、心跳事件等的 TopicNamingStrategy 类的名称,默认为 | |
|
指定主题名称的分隔符,默认为 | |
| 用于在绑定并发哈希映射中保存主题名称的大小。此缓存有助于确定与给定数据收集对应的主题名称。 | |
|
控制连接器向发送心跳消息的主题名称。主题名称具有此模式: | |
|
控制连接器向发送事务元数据消息的主题名称。主题名称具有此模式: |
直通连接器配置属性
连接器还支持在创建 Kafka producer 和消费者时使用的 直通 配置属性。
请务必查阅 Kafka 文档,了解 Kafka 生成者和消费者的所有配置属性。PostgreSQL 连接器 使用新的消费者配置属性。