11.5. Debezium コネクターへのシグナル送信


Debezium のシグナルメカニズムを使用すると、コネクターの動作を変更したり、テーブルの アドホックスナップショット を起動するなどの 1 回限りのアクションをトリガーする方法が可能になります。シグナルを使用してコネクターをトリガーし、指定されたアクションを実行するには、次のチャネルの 1 つ以上を使用するようにコネクターを設定できます。

SourceSignalChannel
SQL コマンドを発行して、特殊なシグナリングデータコレクションにシグナルメッセージを追加できます。ソースデータベース上に作成するシグナリングデータコレクションは、Debezium との通信専用に設計されています。
KafkaSignalChannel
シグナルメッセージを設定可能な Kafka トピックに送信します。
JmxSignalChannel
JMX signal 操作でシグナルを送信します。
FileSignalChannel
ファイルを使用してシグナルを送信できます。Debezium は、新しい ログレコード または アドホックスナップショットレコード がチャネルに追加されたことを検出すると、シグナルを読み取り、要求された操作を開始します。

シグナリングは、以下の Debezium コネクターで使用可能です。

  • Db2
  • MongoDB
  • MySQL
  • Oracle
  • PostgreSQL
  • SQL Server

signal.enabled.channels 設定プロパティーを設定して、有効にするチャネルを指定できます。このプロパティーには、有効になっているチャネルの名前がリストされます。デフォルトでは、Debezium は source および kafka のチャネルを提供します。source チャネルは、増分スナップショットシグナルに必要なため、デフォルトで有効になっています。

11.5.1. Debezium ソースシグナリングチャネルの有効化

デフォルトでは、Debezium ソースシグナリングチャネルが有効になっています。

使用するコネクターごとにシグナリングを明示的に設定する必要があります。

手順

  1. ソースデータベースで、コネクターへのシグナル送信用にデータコレクションテーブルを作成します。シグナリングデータコレクションの必要な構造については、シグナリングデータコレクションの構造を参照してください。
  2. Db2 や SQL Server など、ネイティブな変更データキャプチャ (CDC) メカニズムを実装しているソースデータベースでは、信号テーブルの CDC を有効にします。
  3. Debezium コネクターの設定にシグナリングデータコレクションの名前を追加します。
    コネクター設定で、プロパティー signal.data.collection を追加し、その値を手順 1 で作成したシグナルデータコレクションの完全修飾名に設定します。

    例: signal.data.collection = inventory.debezium_signals.

    シグナリングコレクションの完全修飾名のフォーマットは、コネクターによって異なります。
    次の例では、各コネクターに使用する名前のフォーマットを示しています。

    Db2
    <schemaName>.<tableName>
    MongoDB
    <databaseName>.<collectionName>
    MySQL
    <databaseName>.<tableName>
    Oracle
    <databaseName>.<schemaName>.<tableName>
    PostgreSQL
    <schemaName>.<tableName>
    SQL Server
    <databaseName>.<schemaName>.<tableName>

    signal.data.collection プロパティーの設定の詳細は、お使いのコネクターの設定プロパティーの表を参照してください。

11.5.1.1. Debezium シグナリングデータ収集の必須構造

シグナルデータコレクションまたはシグナルテーブルは、コネクターに送信して指定の操作をトリガーするシグナルを保存します。シグナリングテーブルの構造は、以下の標準フォーマットに準拠する必要があります。

  • 3 つのフィールド (列) があります。
  • フィールドは、表 1 のように決まった順序で配置されています。
表11.7 シグナリングデータ収集の必須構造
フィールドタイプ説明

id
(必須)

string

シグナルのインスタンスを識別する任意のユニークな文字列です。
シグナリングテーブルに登録するシグナルには、それぞれ ID を割り当てます。
通常、ID は UUID 文字列です。
シグナルインスタンスは、ロギング、デバッグ、デデュープなどに使用できます。
シグナルによって Debezium が増分スナップショットを実行すると、任意の id 文字列を持つシグナルメッセージが生成されます。生成されたメッセージに含まれる id 文字列が、送信されたシグナルの id 文字列と一致しません。

type
(必須)

string

送信する信号の種類を指定します。
信号の種類によっては、信号が利用可能なすべてのコネクターで使用できますが、他の信号の種類は特定のコネクターでのみ使用できます。

data
(オプション)

string

シグナルアクションに渡す、JSON 形式のパラメーターを指定します。
それぞれの信号タイプには、特定のデータセットが必要です。

注記

データコレクションのフィールド名は任意です。前述の表には、推奨される名称が記載されています。異なる命名規則を使用している場合は、各フィールドの値が期待される内容と一致していることを確認してください。

11.5.1.2. Debezium シグナルのデータコレクションの作成

標準 SQL の DDL クエリーをソースデータベースに送信して、シグナリングテーブルを作成します。

前提条件

  • ソースデータベースでのテーブル作成に十分なアクセス権限がある。

手順

  • SQL クエリーをソースデータベースに送信して、必須の構造 と合致するテーブルを作成します。例:

    CREATE TABLE <tableName> (id VARCHAR(<varcharValue>) PRIMARY KEY, type VARCHAR(<varcharValue>) NOT NULL, data VARCHAR(<varcharValue>) NULL);
注記

id 変数の VARCHAR パラメーターに割り当てる容量は、シグナリングテーブルに送信されるシグナルの ID 文字列のサイズを考慮して、十分に確保する必要があります。
ID のサイズが使用可能なスペースを超える場合、コネクターは信号を処理できません。

次の例は、3 列の debezium_signal テーブルを作成する CREATE TABLE コマンドです。

CREATE TABLE debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);

11.5.2. Debezium Kafka シグナリングチャネルの有効化

Kafka シグナリングチャネルを有効にするには、有効にするチャネルを signal.enabled.channels 設定プロパティーに追加し、シグナルを受信するトピックの名前を signal.kafka.topic プロパティーに追加します。シグナリングチャネルを有効にすると、シグナルを使用して、設定したシグナルトピックに送信されるように、kafka コンシューマーが作成されます。

注記

Kafka シグナリングを使用して大部分のコネクターのアドホック増分スナップショットをトリガーするには、まずコネクター設定で source シグナリングチャネルを有効にする 必要があります。ソースチャネルには、透かしメカニズムが実装されており、増分スナップショットでキャプチャーされ、ストリーミングの再開後に再度キャプチャーされる可能性のあるイベントが重複しないようにします。シグナリングチャネルを使用して、GTID が有効になっている 読み取り専用 MySQL データベースの増分スナップショットをトリガーする場合は、ソースチャネルを有効にする必要はありません。詳細は、MySQL 読み取り専用増分スナップショット を参照してください。

メッセージの形式

Kafka メッセージのキーは、topic.prefix コネクター設定オプションの値と一致する必要があります。

値は、type フィールドと data フィールドが含まれる JSON オブジェクトです。

シグナルタイプが execute-snapshot に設定されている場合、data フィールドには次の表にリストされているフィールドが含まれている必要があります。

表11.8 スナップショットデータフィールドの実行
フィールドデフォルト

type

incremental

実行するスナップショットのタイプ。現在、Debezium は incremental 型と blocking 型をサポートしています。

data-collections

該当なし

スナップショットに含めるデータコレクションの完全修飾名と一致する、コンマ区切りの正規表現の配列。
signal.data.collection 設定オプションに必要な形式と同じ形式を使用して名前を指定します。

additional-condition

該当なし

コネクターがスナップショットに含めるレコードのサブセットを指定するために評価する条件を指定するオプションの文字列。

注記

このプロパティーは非推奨であり、additional-conditions プロパティーに置き換える必要があります。

additional-conditions

該当なし

コネクターがスナップショットに含めるレコードのサブセットを決定するために評価する追加条件のセットを指定する、オプションの配列。
各追加条件は、アドホックスナップショットがキャプチャーするデータをフィルタリングする基準を指定するオブジェクトです。各追加条件には次のプロパティーを設定できます。

data-collection
フィルターを適用する {data-collection} の完全修飾名。各 {data-collection} に異なるフィルターを適用できます。
filter

スナップショットに含めるためにデータベースレコードに存在する必要がある列の値を指定します (例: "color='blue'")。
スナップショットプロセスは、{data-collection} 内のレコードを filter 値に対して評価し、一致する値を含むレコードのみをキャプチャーします。

filter プロパティーに割り当てる特定の値は、アドホックスナップショットの種類によって異なります。

  • 増分スナップショットの場合、スナップショットがクエリーの条件句に追加する検索条件フラグメントを指定します (例: "color='blue'")。
  • ブロッキングスナップショットの場合、snapshot.select.statement.overrides プロパティーで設定するものなど、完全な SELECT ステートメントを指定します。

以下の例は、典型的な execute-snapshot Kafka メッセージを示しています。

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`

11.5.3. Debezium JMX シグナリングチャネルの有効化

JMX シグナリングを有効にするには、コネクター設定の signal.enabled.channels プロパティーに jmx を追加し、JMX MBean サーバーがシグナリング Bean を公開できる ようにします。

手順

  1. 任意の JMX クライアントを使用して (例:JConsole または JDK Mission Control) MBean サーバーに接続します。
  2. Mbean debezium.<connector-type>.management.signals.<server> を検索します。Mbean は、以下の入力パラメーターを受け入れる signal 操作を公開します。

    p0
    シグナルの ID。
    p1
    シグナルのタイプ (例: execute-snapshot)
    p2
    指定されたシグナルタイプに関する追加情報を含む JSON データフィールド。
  3. 入力パラメーターの値を指定して execute-snapshot シグナルを送信します。
    JSON data フィールドに、以下の表に記載されている情報を含めます。

    表11.9 スナップショットデータフィールドの実行
    フィールドデフォルト

    type

    incremental

    実行するスナップショットのタイプ。現在、Debezium は incremental 型と blocking 型をサポートしています。

    data-collections

    該当なし

    スナップショットに含めるテーブルの完全修飾名と一致する、コンマ区切りの正規表現の配列。
    signal.data.collection 設定オプションに必要な形式と同じ形式を使用して名前を指定します。

    additional-condition

    該当なし

    コネクターがスナップショットに含めるレコードのサブセットを指定するために評価する条件を指定するオプションの文字列。

    注記

    このプロパティーは非推奨であり、additional-conditions プロパティーに置き換える必要があります。

    additional-conditions

    該当なし

    コネクターがスナップショットに含めるレコードのサブセットを決定するために評価する追加条件のセットを指定する、オプションの配列。
    各追加条件は、アドホックスナップショットがキャプチャーするデータをフィルタリングする基準を指定するオブジェクトです。各追加条件には次のプロパティーを設定できます。

    data-collection
    フィルターを適用する {data-collection} の完全修飾名。各 {data-collection} に異なるフィルターを適用できます。
    filter

    スナップショットに含めるためにデータベースレコードに存在する必要がある列の値を指定します (例: "color='blue'")。
    スナップショットプロセスは、{data-collection} 内のレコードを filter 値に対して評価し、一致する値を含むレコードのみをキャプチャーします。

    filter プロパティーに割り当てる特定の値は、アドホックスナップショットの種類によって異なります。

    • 増分スナップショットの場合、スナップショットがクエリーの条件句に追加する検索条件フラグメントを指定します (例: "color='blue'")。
    • ブロッキングスナップショットの場合、snapshot.select.statement.overrides プロパティーで設定するものなど、完全な SELECT ステートメントを指定します。

以下の図は、JConsole を使用してシグナルを送信する方法の例を示しています。

JConsole を使用して `execute-snapshot` シグナルを送信する

11.5.4. Debezium シグナルアクションの種類

シグナリングを使用して、以下のアクションを起こすことができます。

一部の信号はすべてのコネクターに対応していません。

11.5.4.1. ロギング信号

log シグナルタイプのシグナリングテーブルエントリーを作成することで、ログにエントリーを追加するようコネクターに要求できます。シグナルの処理後、コネクターは指定されたメッセージをログに出力します。オプションとして、結果として得られるメッセージにストリーミング座標が含まれるようにシグナルを設定することもできます。

表11.10 ログメッセージを追加するためのシグナリングレコードの例
説明

id

924e3ff8-2245-43ca-ba77-2af9af02fa07

 

type

log

シグナルのアクションタイプです。

data

{"message": "Signal message at offset {}"}

message パラメーターは、ログに出力する文字列を指定します。
メッセージにプレースホルダー ({}) を追加した場合は、ストリーミング座標に置き換えられます。

11.5.4.2. アドホックスナップショットシグナル

execute-snapshot シグナルタイプのシグナルを作成すると、アドホックスナップショットの開始をコネクターに要求できます。信号を処理した後、コネクターは要求されたスナップショットオペレーションを実行します。

コネクターが最初に起動したときに実行される最初のスナップショットとは異なり、アドホックスナップショットは、コネクターがすでにデータベースからの変更イベントのストリーミングを開始した後のランタイム中に発生します。いつでもアドホックなスナップショットを開始することができます。

アドホックスナップショットは、以下の Debezium コネクターで利用可能です。

  • Db2
  • MongoDB
  • MySQL
  • Oracle
  • PostgreSQL
  • SQL Server
表11.11 アドホックスナップショットシグナルレコードの例

id

d139b9b7-7777-4547-917d-e1775ea61d41

type

execute-snapshot

data

{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}
表11.12 アドホックスナップショットシグナルメッセージの例
キー

test_connector

{"type":"execute-snapshot","data": {"data-collections": ["public.MyFirstTable"], "type": "INCREMENTAL", "additional-conditions":[{"data-collection": "public.MyFirstTable", "filter":"color='blue' AND brand='MyBrand'"]}}

アドホックスナップショットの詳細は、お使いのコネクターのドキュメントの スナップショット のトピックを参照してください。

アドホックスナップショット停止シグナル

stop-snapshot シグナルタイプでシグナルテーブルエントリーを作成することにより、進行中のアドホックスナップショットを停止するようコネクターに要求できます。シグナルを処理した後、コネクターは現在進行中のスナップショット操作を停止します。

次の Debezium コネクターのアドホックスナップショットを停止できます。

  • Db2
  • MongoDB
  • MySQL
  • Oracle
  • PostgreSQL
  • SQL Server
表11.13 停止アドホックスナップショットシグナルレコードの例

id

d139b9b7-7777-4547-917d-e1775ea61d41

type

stop-snapshot

data

{"type":"INCREMENTAL", "data-collections": ["public.MyFirstTable"]}

シグナルの type を指定する必要があります。data-collections フィールドはオプションです。現在のスナップショットのすべてのアクティビティーを停止するようコネクターに要求するには、data-collections フィールドを空白のままにします。増分スナップショットを続行したいが、スナップショットから特定のコレクションを除外したい場合は、除外するコレクションまたは正規表現の名前のコンマ区切りリストを指定します。コネクターが信号を処理した後、増分スナップショットが続行されますが、指定したコレクションからデータが除外されます。

11.5.4.3. 増分スナップショット

増分スナップショットはアドホックスナップショットの一種です。増分スナップショットでは、初期スナップショットと同様に、指定したテーブルのベースライン状態をキャプチャします。しかし、初期スナップショットとは異なり、増分スナップショットでは、一度にすべてのテーブルをキャプチャーするのではなく、チャンク単位でテーブルをキャプチャします。このコネクターは、スナップショットの進捗状況を追跡するために、電子透かしを使用しています。

増分スナップショットは、指定されたテーブルの初期状態を単一のモノリシックな操作ではなくチャンクでキャプチャーすることにより、初期スナップショットのプロセスに比べて以下のような利点があります。

  • コネクターが指定されたテーブルのベースライン状態をキャプチャしている間、トランザクションログからのほぼリアルタイムのイベントのストリーミングは中断することなく継続されます。
  • 増分スナップショットの処理が中断されても、中断した時点から再開することができます。
  • 増分スナップショットはいつでも開始できます。

増分スナップショットの一時停止シグナル

pause-snapshot シグナルタイプを使用してシグナルテーブルエントリーを作成することにより、進行中の増分スナップショットを一時停止するようコネクターに要求できます。信号を処理した後、コネクターは現在進行中のスナップショット操作の一時停止を停止します。そのため、信号の処理中の位置でスナップショット処理が一時停止されるため、データ収集を指定することはできません。

次の Debezium コネクターの増分スナップショットを一時停止できます。

  • Db2
  • MongoDB
  • MySQL
  • Oracle
  • PostgreSQL
  • SQL Server
表11.14 一時停止増分スナップショットシグナルレコードの例

id

d139b9b7-7777-4547-917d-e1775ea61d41

type

pause-snapshot

シグナルの type を指定する必要があります。data フィールドは無視されます。

増分スナップショットの一時停止シグナル

一時停止した増分スナップショットを再開するようコネクターに要求するには、resume-snapshot シグナルタイプでシグナルテーブルエントリーを作成します。信号を処理した後、コネクターは以前に一時停止したスナップショット操作を再開します。

次の Debezium コネクターの増分スナップショットを再開できます。

  • Db2
  • MongoDB
  • MySQL
  • Oracle
  • PostgreSQL
  • SQL Server
表11.15 再開増分スナップショットシグナルレコードの例

id

d139b9b7-7777-4547-917d-e1775ea61d41

type

resume-snapshot

シグナルの type を指定する必要があります。data フィールドは無視されます。

増分スナップショットの詳細は、お使いのコネクターのドキュメントのスナップショットのトピックを参照してください。

11.5.4.4. ブロッキングスナップショットシグナル

execute-snapshot シグナルタイプと、値 blocking を持つ data.type でシグナルを作成すると、アドホックブロッキングスナップショットの開始をコネクターに要求できます。信号を処理した後、コネクターは要求されたスナップショットオペレーションを実行します。

コネクターが最初に起動したときに実行される初期スナップショットとは異なり、アドホックブロッキングスナップショットは、コネクターがデータベースからの変更イベントのストリーミングを停止した後のランタイム中に発生します。アドホックブロッキングスナップショットはいつでも開始できます。

ブロッキングスナップショットは、以下の Debezium コネクターで利用可能です。

  • Db2
  • MySQL
  • Oracle
  • PostgreSQL
  • SQL Server
表11.16 ブロッキングスナップショットシグナルレコードの例

id

d139b9b7-7777-4547-917d-e1775ea61d41

type

execute-snapshot

data

  {"type": "blocking", "data-collections": ["schema1.table1", "schema1.table2"], "additional-conditions": [{"data-collection": "schema1.table1", "filter": "SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC"}, {"data-collection": "schema1.table2", "filter": "SELECT * FROM [schema1].[table2] WHERE column2 > 0"}]}
表11.17 ブロッキングスナップショットシグナルメッセージの例
キー

test_connector

{"type":"execute-snapshot","data": {"type": "blocking"}

ブロッキングスナップショットの詳細は、お使いのコネクターのドキュメントの スナップショット のトピックを参照してください。

11.5.4.5. カスタムシグナルアクションの定義

カスタムアクションを使用すると、Debezium シグナリングフレームワークを拡張して、デフォルトの実装では使用できないアクションをトリガーできます。カスタムアクションは複数のコネクターで使用できます。

カスタムシグナルアクションを定義するには、次のインターフェイスを定義する必要があります。

@FunctionalInterface
public interface SignalAction<P extends Partition> {

    /**
     * @param signalPayload the content of the signal
     * @return true if the signal was processed
     */
    boolean arrived(SignalPayload<P> signalPayload) throws InterruptedException;
}

io.debezium.pipeline.signal.actions.SignalAction は、シグナリングチャネルを介して送信されるメッセージペイロードを表す 1 つのパラメーターを持つ、単一のメソッドを公開します。

カスタムシグナリングアクションを定義した後、SPI インターフェイス io.debezium.pipeline.signal.actions.SignalActionProvider を使用して、カスタムアクションをシグナリングメカニズムで使用できるようにします。

public interface SignalActionProvider {

    /**
     * Create a map of signal action where the key is the name of the action.
     *
     * @param dispatcher the event dispatcher instance
     * @param connectorConfig the connector config
     * @return a concrete action
     */

    <P extends Partition> Map<String, SignalAction<P>> createActions(EventDispatcher<P, ? extends DataCollectionId> dispatcher, CommonConnectorConfig connectorConfig);
}

実装では、シグナルアクションのマップを返すはずです。マップキーはアクションの名前に設定します。キーはシグナルの type として使用されます。

11.5.4.6. Debezium コアモジュールの依存関係

カスタムアクション Java プロジェクトには、Debezium コアモジュールに対するコンパイル依存関係があります。プロジェクトの pom.xml ファイルに次のコンパイル依存関係を含めます。

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-core</artifactId>
    <version>${version.debezium}</version> 1
</dependency>
1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
${version.debezium} は Debezium コネクターのバージョンを表します。

META-INF/services/io.debezium.pipeline.signal.actions.SignalActionProvider ファイルでプロバイダーの実装を宣言します。

11.5.4.7. カスタムシグナルアクションのデプロイ

前提条件

  • カスタムアクション Java プログラムがある。

手順

  • Debezium コネクターでカスタムアクションを使用するには、Java プロジェクトを JAR ファイルにエクスポートし、そのファイルを使用する各 Debezium コネクターの JAR ファイルを含むディレクトリーにコピーします。

    たとえば、一般的なデプロイメントでは、Debezium コネクターファイルは Kafka Connect ディレクトリーのサブディレクトリー (/kafka/connect) に保存され、各コネクター JAR は独自のサブディレクトリー (/kafka/connect/debezium-connector-db2/kafka/connect/debezium-connector-mysql など) に格納されます。
注記

複数のコネクターでカスタムアクションを使用するには、各コネクターのサブディレクトリーにカスタムシグナリングチャネル JAR ファイルのコピーを配置する必要があります。

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.