搜索

11.4. 配置通知以报告连接器状态

download PDF

Debezium 通知提供了一种机制来获取连接器的状态信息。通知可以发送到以下频道:

SinkNotificationChannel
通过 Connect API 向配置的主题发送通知。
LogNotificationChannel
通知附加到日志中。
JmxNotificationChannel
通知作为 JMX bean 中的属性公开。
有关 Debezium 通知的详情,请查看以下主题

11.4.1. Debezium 通知的格式描述

通知消息包含以下信息:

属性Description

id

分配给通知的唯一标识符。对于增量快照通知,idexecute-snapshot 信号一同发送。

aggregate_type

与通知相关的聚合根的数据类型。在域驱动的设计中,导出的事件应始终引用聚合。

type

提供有关 aggregate_type 字段中指定的事件的状态信息。

additional_data

一个 Map<String,String>,其中包含有关通知的详细信息。例如,请参阅 Debezium 通知有关增量快照的进度

timestamp

通知创建的时间。该值代表自 UNIX epoch 起的毫秒数。

11.4.2. Debezium 通知的类型

Debezium 通知提供有关 初始快照 或增量快照 进度的信息。

有关初始快照状态的 Debezium 通知

以下示例显示了提供初始快照状态的典型通知:

{
    "id": "5563ae14-49f8-4579-9641-c1bbc2d76f99",
    "aggregate_type": "Initial Snapshot",
    "type": "COMPLETED", 1
    "additional_data" : {
        "connector_name": "myConnector"
    },
    "timestamp": "1695817046353"
}
描述

1

type 字段可以包含以下值之一:

  • 完成
  • 中止
  • 跳过

下表显示了在通知中报告初始快照状态的不同有效负载示例:

Statuspayload

STARTED

  {
      "id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
      "aggregate_type":"Initial Snapshot",
      "type":"STARTED",
      "additional_data":{
         "connector_name":"my-connector"
      },
      "timestamp": "1695817046353"
}

IN_PROGRESS

{
   "id":"6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
   "aggregate_type":"Initial Snapshot",
   "type":"IN_PROGRESS",
   "additional_data":{
      "connector_name":"my-connector",
      "data_collections":"table1, table2",
      "current_collection_in_progress":"table1"
   },
   "timestamp": "1695817046353"
}

MongoDB 连接器目前不支持 field data_collection

TABLE_SCAN_COMPLETED

{
   "id":"6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
   "aggregate_type":"Initial Snapshot",
   "type":"TABLE_SCAN_COMPLETED",
   "additional_data":{
      "connector_name":"my-connector",
      "data_collection":"table1, table2",
      "scanned_collection":"table1",
      "total_rows_scanned":"100",
      "status":"SUCCEEDED"
   },
   "timestamp": "1695817046353"
}

在上例中,additional_data.status 字段可以包含以下值之一:

SQL_EXCEPTION
执行快照时发生 SQL 异常。
SUCCEEDED
快照成功完成。

MongoDB 连接器目前不支持字段 total_rows_scanneddata_collection

完成

  {
      "id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
      "aggregate_type":"Initial Snapshot",
      "type":"COMPLETED",
      "additional_data":{
         "connector_name":"my-connector"
      },
      "timestamp": "1695817046353"
}

ABORTED

  {
      "id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
      "aggregate_type":"Initial Snapshot",
      "type":"ABORTED",
      "additional_data":{
         "connector_name":"my-connector"
      },
      "timestamp": "1695817046353"
}

跳过

  {
      "id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
      "aggregate_type":"Initial Snapshot",
      "type":"SKIPPED",
      "additional_data":{
         "connector_name":"my-connector"
      },
      "timestamp": "1695817046353"
}

11.4.2.1. 示例: Debezium 通知,该通知报告增量快照的进度

下表显示了在通知中报告增量快照状态的不同有效负载示例:

Statuspayload

Start

  {
      "id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
      "aggregate_type":"Incremental Snapshot",
      "type":"STARTED",
      "additional_data":{
         "connector_name":"my-connector",
         "data_collections":"table1, table2"
      },
      "timestamp": "1695817046353"
}

paused

{
      "id":"068d07a5-d16b-4c4a-b95f-8ad061a69d51",
      "aggregate_type":"Incremental Snapshot",
      "type":"PAUSED",
      "additional_data":{
         "connector_name":"my-connector",
         "data_collections":"table1, table2"
      },
      "timestamp": "1695817046353"
}

resumed

 {
   "id":"a9468204-769d-430f-96d2-b0933d4839f3",
   "aggregate_type":"Incremental Snapshot",
   "type":"RESUMED",
   "additional_data":{
      "connector_name":"my-connector",
      "data_collections":"table1, table2"
   },
   "timestamp": "1695817046353"
}

stopped

{
   "id":"83fb3d6c-190b-4e40-96eb-f8f427bf482c",
   "aggregate_type":"Incremental Snapshot",
   "type":"ABORTED",
   "additional_data":{
      "connector_name":"my-connector"
   },
   "timestamp": "1695817046353"
}

处理块

{
   "id":"d02047d6-377f-4a21-a4e9-cb6e817cf744",
   "aggregate_type":"Incremental Snapshot",
   "type":"IN_PROGRESS",
   "additional_data":{
      "connector_name":"my-connector",
      "data_collections":"table1, table2",
      "current_collection_in_progress":"table1",
      "maximum_key":"100",
      "last_processed_key":"50"
   },
   "timestamp": "1695817046353"
}

快照为表完成

{
   "id":"6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
   "aggregate_type":"Incremental Snapshot",
   "type":"TABLE_SCAN_COMPLETED",
   "additional_data":{
      "connector_name":"my-connector",
      "data_collection":"table1, table2",
      "scanned_collection":"table1",
      "total_rows_scanned":"100",
      "status":"SUCCEEDED"
   },
   "timestamp": "1695817046353"
}

在上例中,additional_data.status 字段可以包含以下值之一:

EMPTY
表不包含值。
NO_PRIMARY_KEY
无法完成快照;表没有主键。
跳过
无法为这类表完成快照。详情请参考日志。
SQL_EXCEPTION
执行快照时发生 SQL 异常。
SUCCEEDED
快照成功完成。
UNKNOWN_SCHEMA
无法找到表的 schema。检查日志,以查看已知表的列表。

完成

{
   "id":"6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
   "aggregate_type":"Incremental Snapshot",
   "type":"COMPLETED",
   "additional_data":{
      "connector_name":"my-connector"
   },
   "timestamp": "1695817046353"
}

11.4.3. 启用 Debezium 将事件发送到通知频道

要启用 Debezium 来发出通知,请通过设置 notification.enabled.channels 配置属性来指定通知频道列表。默认情况下,提供了以下通知频道:

  • sink
  • log
  • jmx
重要

要使用 sink 通知频道,还必须将 notification.sink.topic.name 配置属性设置为您要 Debezium 发送通知的主题名称。

11.4.3.1. 启用 Debezium 通知以报告通过 JMX Bean 公开的事件

要启用 Debezium 报告通过 JMX Bean 公开的事件,请完成以下步骤:

  1. 启用 JMX MBean 服务器 以公开通知 bean。
  2. 在连接器配置中的 notification.enabled.channels 属性中添加 jmx
  3. 将您的首选 JMX 客户端连接到 MBean 服务器。

通知通过带有名称 debezium. <connector-type> .management.notifications. < server > 的 bean 的 Notifications 属性公开。

下图显示了报告增量快照开始的通知:

JMX 'Notifications' 属性中的字段

要丢弃通知,请在 bean 上调用 reset 操作。

该通知也作为 JMX 通知公开,类型为 debezium.notification。要让应用程序侦听 MBean 发出的 JMX 通知,请将 应用程序订阅到通知

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.