6.4. 配置通知以报告连接器状态


Debezium 通知提供了一种获取有关连接器的状态信息的机制。通知可发送到以下频道:

SinkNotificationChannel
通过 Connect API 将通知发送到配置的主题。
LogNotificationChannel
通知附加到日志中。
JmxNotificationChannel
通知作为 JMX Bean 中的属性公开。
有关 Debezium 通知的详情,请查看以下主题

6.4.1. Debezium 通知格式的描述

通知消息包含以下信息:

Expand
属性Description

id

分配给通知的唯一标识符。对于增量快照通知,idexecute-snapshot 信号发送相同。

aggregate_type

与通知相关的聚合根的数据类型。在域驱动的设计中,导出的事件应始终引用聚合。

type

提供有关 aggregate_type 字段中指定的事件的状态信息。

additional_data

a Map<String,String>,其中包含有关通知的详细信息。例如,请参阅 Debezium 通知增量快照的进度

timestamp

通知创建的时间。该值表示自 UNIX 时期起的毫秒数。

6.4.2. Debezium 通知类型

Debezium 通知提供有关 初始快照 进度的信息 或增量快照

有关初始快照状态的 Debezium 通知

以下示例显示了提供初始快照状态的典型通知:

{
    "id": "5563ae14-49f8-4579-9641-c1bbc2d76f99",
    "aggregate_type": "Initial Snapshot",
    "type": "COMPLETED", 
1

    "additional_data" : {
        "connector_name": "myConnector"
    },
    "timestamp": "1695817046353"
}
Expand
描述

1

type 字段可以包含以下值之一:

  • 完成
  • ABORTED
  • SKIPPED

下表显示了报告初始快照状态的通知中可能出现的不同有效负载的示例:

Expand
Statuspayload

STARTED

  {
      "id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
      "aggregate_type":"Initial Snapshot",
      "type":"STARTED",
      "additional_data":{
         "connector_name":"my-connector"
      },
      "timestamp": "1695817046353"
}

IN_PROGRESS

{
   "id":"6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
   "aggregate_type":"Initial Snapshot",
   "type":"IN_PROGRESS",
   "additional_data":{
      "connector_name":"my-connector",
      "data_collections":"table1, table2",
      "current_collection_in_progress":"table1"
   },
   "timestamp": "1695817046353"
}

MongoDB 连接器目前不支持字段 data_collection

TABLE_SCAN_COMPLETED

{
   "id":"6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
   "aggregate_type":"Initial Snapshot",
   "type":"TABLE_SCAN_COMPLETED",
   "additional_data":{
      "connector_name":"my-connector",
      "data_collection":"table1, table2",
      "scanned_collection":"table1",
      "total_rows_scanned":"100",
      "status":"SUCCEEDED"
   },
   "timestamp": "1695817046353"
}

在前面的示例中,additional_data.status 字段可以包含以下值之一:

SQL_EXCEPTION
执行快照时发生 SQL 异常。
SUCCEEDED
快照成功完成。

MongoDB 连接器目前不支持 fields total_rows_scanneddata_collection

完成

  {
      "id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
      "aggregate_type":"Initial Snapshot",
      "type":"COMPLETED",
      "additional_data":{
         "connector_name":"my-connector"
      },
      "timestamp": "1695817046353"
}

ABORTED

  {
      "id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
      "aggregate_type":"Initial Snapshot",
      "type":"ABORTED",
      "additional_data":{
         "connector_name":"my-connector"
      },
      "timestamp": "1695817046353"
}

跳过

  {
      "id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
      "aggregate_type":"Initial Snapshot",
      "type":"SKIPPED",
      "additional_data":{
         "connector_name":"my-connector"
      },
      "timestamp": "1695817046353"
}

下表显示了可能出现在通知中报告增量快照状态的不同有效负载示例:

Expand
Statuspayload

Start

  {
      "id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
      "aggregate_type":"Incremental Snapshot",
      "type":"STARTED",
      "additional_data":{
         "connector_name":"my-connector",
         "data_collections":"table1, table2"
      },
      "timestamp": "1695817046353"
}

paused

{
      "id":"068d07a5-d16b-4c4a-b95f-8ad061a69d51",
      "aggregate_type":"Incremental Snapshot",
      "type":"PAUSED",
      "additional_data":{
         "connector_name":"my-connector",
         "data_collections":"table1, table2"
      },
      "timestamp": "1695817046353"
}

resumed

 {
   "id":"a9468204-769d-430f-96d2-b0933d4839f3",
   "aggregate_type":"Incremental Snapshot",
   "type":"RESUMED",
   "additional_data":{
      "connector_name":"my-connector",
      "data_collections":"table1, table2"
   },
   "timestamp": "1695817046353"
}

已停止

{
   "id":"83fb3d6c-190b-4e40-96eb-f8f427bf482c",
   "aggregate_type":"Incremental Snapshot",
   "type":"ABORTED",
   "additional_data":{
      "connector_name":"my-connector"
   },
   "timestamp": "1695817046353"
}

处理块

{
   "id":"d02047d6-377f-4a21-a4e9-cb6e817cf744",
   "aggregate_type":"Incremental Snapshot",
   "type":"IN_PROGRESS",
   "additional_data":{
      "connector_name":"my-connector",
      "data_collections":"table1, table2",
      "current_collection_in_progress":"table1",
      "maximum_key":"100",
      "last_processed_key":"50"
   },
   "timestamp": "1695817046353"
}

为表完成快照

{
   "id":"6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
   "aggregate_type":"Incremental Snapshot",
   "type":"TABLE_SCAN_COMPLETED",
   "additional_data":{
      "connector_name":"my-connector",
      "data_collection":"table1, table2",
      "scanned_collection":"table1",
      "total_rows_scanned":"100",
      "status":"SUCCEEDED"
   },
   "timestamp": "1695817046353"
}

在前面的示例中,additional_data.status 字段可以包含以下值之一:

EMPTY
表不包含任何值。
NO_PRIMARY_KEY
无法完成快照;表没有主密钥。
SKIPPED
无法为这类表完成快照。详情请参考日志。
SQL_EXCEPTION
执行快照时发生 SQL 异常。
SUCCEEDED
快照成功完成。
UNKNOWN_SCHEMA
无法找到表的模式。检查日志中的已知表列表。

完成

{
   "id":"6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
   "aggregate_type":"Incremental Snapshot",
   "type":"COMPLETED",
   "additional_data":{
      "connector_name":"my-connector"
   },
   "timestamp": "1695817046353"
}

6.4.3. 启用 Debezium 向通知频道发送事件

要启用 Debezium 来发出通知,请通过设置 notification.enabled.channels 配置属性来指定通知频道列表。默认情况下,以下通知频道可用:

  • sink
  • log
  • jmx
重要

要使用 sink 通知频道,还必须将 notification.sink.topic.name 配置属性设置为您要 Debezium 发送通知的主题名称。

要启用 Debezium 来报告通过 JMX Bean 公开的事件,请完成以下配置步骤:

  1. 启用 JMX MBean 服务器 以公开通知 Bean。
  2. jmx 添加到连接器配置中的 notification.enabled.channels 属性中。
  3. 将您的首选 JMX 客户端连接到 MBean 服务器。

通知通过名为 debezium 的 bean 的 Notifications 属性公开。<connector-type& gt; .management.notifications. <server>

下图显示了报告增量快照开始的通知:

JMX 'Notifications' 属性中的字段

要丢弃通知,请在 bean 上调用 reset 操作。

通知也作为 JMX 通知公开,类型为 debezium.notification。要让应用程序侦听 MBean 发出的 JMX 通知,请将 应用程序订阅到通知

Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2026 Red Hat
返回顶部