6.5. Debezium コネクターへのシグナル送信
Debezium のシグナルメカニズムを使用すると、コネクターの動作を変更したり、テーブルの アドホックスナップショット を起動するなどの 1 回限りのアクションをトリガーする方法が可能になります。シグナルを使用してコネクターをトリガーし、指定されたアクションを実行するには、次のチャネルの 1 つ以上を使用するようにコネクターを設定できます。
- SourceSignalChannel
- SQL コマンドを発行して、特殊なシグナリングデータコレクションにシグナルメッセージを追加できます。ソースデータベース上に作成するシグナリングデータコレクションは、Debezium との通信専用に設計されています。
- KafkaSignalChannel
- シグナルメッセージを設定可能な Kafka トピックに送信します。
- JmxSignalChannel
-
JMX
signal
操作でシグナルを送信します。 - FileSignalChannel
- ファイルを使用してシグナルを送信できます。Debezium は、新しい ログレコード または アドホックスナップショットレコード がチャネルに追加されたことを検出すると、シグナルを読み取り、要求された操作を開始します。
シグナリングは、以下の Debezium コネクターで使用可能です。
- Db2
- MariaDB (テクノロジープレビュー)
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
signal.enabled.channels
設定プロパティーを設定して、有効にするチャネルを指定できます。このプロパティーには、有効になっているチャネルの名前がリストされます。デフォルトでは、Debezium は source
および kafka
のチャネルを提供します。source
チャネルは、増分スナップショットシグナルに必要なため、デフォルトで有効になっています。
6.5.1. Debezium ソースシグナリングチャネルの有効化
デフォルトでは、Debezium ソースシグナリングチャネルが有効になっています。
使用するコネクターごとにシグナリングを明示的に設定する必要があります。
手順
- ソースデータベースで、コネクターへのシグナル送信用にデータコレクションテーブルを作成します。シグナリングデータコレクションの必要な構造については、シグナリングデータコレクションの構造を参照してください。
- Db2 や SQL Server など、ネイティブな変更データキャプチャ (CDC) メカニズムを実装しているソースデータベースでは、信号テーブルの CDC を有効にします。
-
Debezium コネクターの設定にシグナリングデータコレクションの名前を追加します。
コネクター設定で、プロパティーsignal.data.collection
を追加し、その値を手順 1 で作成したシグナルデータコレクションの完全修飾名に設定します。
例:signal.data.collection = inventory.debezium_signals
.
シグナリングコレクションの完全修飾名のフォーマットは、コネクターによって異なります。
次の例では、各コネクターに使用する名前のフォーマットを示しています。
完全修飾テーブル名
- Db2
-
<schemaName>.<tableName>
- MariaDB (テクノロジープレビュー)
-
<databaseName>.<tableName>
- MongoDB
-
<databaseName>.<collectionName>
- MySQL
-
<databaseName>.<tableName>
- Oracle
-
<databaseName>.<schemaName>.<tableName>
- PostgreSQL
-
<schemaName>.<tableName>
- SQL Server
-
<databaseName>.<schemaName>.<tableName>
signal.data.collection
プロパティーの設定の詳細は、お使いのコネクターの設定プロパティーの表を参照してください。
6.5.1.1. Debezium シグナリングデータ収集の必須構造
シグナルデータコレクションまたはシグナルテーブルは、コネクターに送信して指定の操作をトリガーするシグナルを保存します。シグナリングテーブルの構造は、以下の標準フォーマットに準拠する必要があります。
- 3 つのフィールド (列) があります。
- フィールドは、表 1 のように決まった順序で配置されています。
フィールド | 型 | 説明 |
---|---|---|
|
|
シグナルのインスタンスを識別する任意のユニークな文字列です。 |
|
|
送信する信号の種類を指定します。 |
|
|
シグナルアクションに渡す、JSON 形式のパラメーターを指定します。 |
データコレクションのフィールド名は任意です。前述の表には、推奨される名称が記載されています。異なる命名規則を使用している場合は、各フィールドの値が期待される内容と一致していることを確認してください。
6.5.1.2. Debezium シグナルのデータコレクションの作成
標準 SQL の DDL クエリーをソースデータベースに送信して、シグナリングテーブルを作成します。
前提条件
- ソースデータベースでのテーブル作成に十分なアクセス権限がある。
手順
-
SQL クエリーをソースデータベースに送信して、必須の構造 と合致するテーブルを作成します。例:
CREATE TABLE <tableName> (id VARCHAR(<varcharValue>) PRIMARY KEY, type VARCHAR(<varcharValue>) NOT NULL, data VARCHAR(<varcharValue>) NULL);
id
変数の VARCHAR
パラメーターに割り当てる容量は、シグナリングテーブルに送信されるシグナルの ID 文字列のサイズを考慮して、十分に確保する必要があります。
ID のサイズが使用可能なスペースを超える場合、コネクターは信号を処理できません。
次の例は、3 列の debezium_signal
テーブルを作成する CREATE TABLE
コマンドです。
CREATE TABLE debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);
6.5.2. Debezium Kafka シグナリングチャネルの有効化
Kafka シグナリングチャネルを有効にするには、有効にするチャネルを signal.enabled.channels
設定プロパティーに追加し、シグナルを受信するトピックの名前を signal.kafka.topic
プロパティーに追加します。シグナリングチャネルを有効にすると、シグナルを使用して、設定したシグナルトピックに送信されるように、kafka コンシューマーが作成されます。
コンシューマーに使用できる追加設定
Kafka シグナリングを使用して大部分のコネクターのアドホック増分スナップショットをトリガーするには、まずコネクター設定で source
シグナリングチャネルを有効にする 必要があります。ソースチャネルには、透かしメカニズムが実装されており、増分スナップショットでキャプチャーされ、ストリーミングの再開後に再度キャプチャーされる可能性のあるイベントが重複しないようにします。シグナリングチャネルを使用して、GTID が有効な読み取り専用の MySQL データベースの増分スナップショットをトリガーする場合、ソースチャネルを有効 にする必要はありません。詳細は、MySQL の読み取り専用増分スナップショットを参照してください。
メッセージの形式
Kafka メッセージのキーは、topic.prefix
コネクター設定オプションの値と一致する必要があります。
値は、type
フィールドと data
フィールドが含まれる JSON オブジェクトです。
シグナルタイプが execute-snapshot
に設定されている場合、data
フィールドには次の表にリストされているフィールドが含まれている必要があります。
フィールド | デフォルト | 値 |
---|---|---|
|
|
実行するスナップショットのタイプ。現在、Debezium は |
| 該当なし |
スナップショットに含めるデータコレクションの完全修飾名と一致するコンマ区切りの正規表現の配列。 |
| 該当なし |
コネクターがスナップショットに含めるレコードのサブセットを決定するために評価する追加条件のセットを指定する、オプションの配列。
|
以下の例は、典型的な execute-snapshot
Kafka メッセージを示しています。
Key = `test_connector` Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`
6.5.3. Debezium JMX シグナリングチャネルの有効化
コネクター設定の signal.enabled.channels
プロパティーに jmx
を追加してから、JMX MBean Server がシグナル Bean を公開することで、JMX シグナリングを有効にできます。
手順
- 任意の JMX クライアントを使用して (例:JConsole または JDK Mission Control) MBean サーバーに接続します。
Mbean
debezium.<connector-type>.management.signals.<server>
を検索します。Mbean は、以下の入力パラメーターを受け入れるsignal
操作を公開します。- p0
- シグナルの ID。
- p1
-
シグナルのタイプ (例:
execute-snapshot)
。 - p2
- 指定されたシグナルタイプに関する追加情報を含む JSON データフィールド。
入力パラメーターの値を指定して
execute-snapshot
シグナルを送信します。
JSON data フィールドに、以下の表に記載されている情報を含めます。表6.9 スナップショットデータフィールドの実行 フィールド デフォルト 値 type
incremental
実行するスナップショットのタイプ。現在、Debezium は
incremental
型とblocking
型をサポートしています。data-collections
該当なし
スナップショットに含める テーブルの完全修飾名と一致するコンマ区切りの 正規表現の配列。
additional-conditions
該当なし
コネクターがスナップショットに含めるレコードのサブセットを決定するために評価する追加条件のセットを指定する、オプションの配列。
各追加条件は、アドホックスナップショットがキャプチャーするデータをフィルタリングする基準を指定するオブジェクトです。各追加条件には次のプロパティーを設定できます。data-collection
- フィルターが適用されるデータコレクションの完全修飾名。各データ収集に異なるフィルターを適用できます。
filter
スナップショットに含めるためにデータベースレコードに存在する必要がある列の値を指定します (例:
"color='blue'"
)。
スナップショットプロセスは、データコレクションのレコードをフィルター
値と照合し、一致する値が含まれるレコードのみをキャプチャーします。filter
プロパティーに割り当てる特定の値は、アドホックスナップショットの種類によって異なります。-
増分スナップショットの場合、スナップショットがクエリーの条件句に追加する検索条件フラグメントを指定します (例:
"color='blue'"
)。 -
ブロッキングスナップショットの場合、
snapshot.select.statement.overrides
プロパティーで設定するものなど、完全なSELECT
ステートメントを指定します。
-
増分スナップショットの場合、スナップショットがクエリーの条件句に追加する検索条件フラグメントを指定します (例:
以下の図は、JConsole を使用してシグナルを送信する方法の例を示しています。
6.5.4. Debezium シグナルアクションの種類
シグナリングを使用して、以下のアクションを起こすことができます。
一部の信号はすべてのコネクターに対応していません。
6.5.4.1. ロギング信号
log
シグナルタイプのシグナリングテーブルエントリーを作成することで、ログにエントリーを追加するようコネクターに要求できます。シグナルの処理後、コネクターは指定されたメッセージをログに出力します。オプションとして、結果として得られるメッセージにストリーミング座標が含まれるようにシグナルを設定することもできます。
列 | 値 | 説明 |
---|---|---|
id |
| |
type |
| シグナルのアクションタイプです。 |
data |
{"message": "Signal message at offset {}"} |
|
6.5.4.2. アドホックスナップショットシグナル
execute-snapshot
シグナルタイプのシグナルを作成すると、アドホックスナップショットの開始をコネクターに要求できます。信号を処理した後、コネクターは要求されたスナップショットオペレーションを実行します。
コネクターが最初に起動したときに実行される最初のスナップショットとは異なり、アドホックスナップショットは、コネクターがすでにデータベースからの変更イベントのストリーミングを開始した後のランタイム中に発生します。いつでもアドホックなスナップショットを開始することができます。
アドホックスナップショットは、以下の Debezium コネクターで利用可能です。
- Db2
- MariaDB (テクノロジープレビュー)
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
列 | 値 |
---|---|
id |
|
type |
|
data |
{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]} |
キー | 値 |
---|---|
test_connector |
{"type":"execute-snapshot","data": {"data-collections": ["public.MyFirstTable"], "type": "INCREMENTAL", "additional-conditions":[{"data-collection": "public.MyFirstTable", "filter":"color='blue' AND brand='MyBrand'"}]}} |
アドホックスナップショットの詳細は、お使いのコネクターのドキュメントの スナップショット のトピックを参照してください。
関連情報
アドホックスナップショット停止シグナル
stop-snapshot
シグナルタイプでシグナルテーブルエントリーを作成することにより、進行中のアドホックスナップショットを停止するようコネクターに要求できます。シグナルを処理した後、コネクターは現在進行中のスナップショット操作を停止します。
次の Debezium コネクターのアドホックスナップショットを停止できます。
- Db2
- MariaDB (テクノロジープレビュー)
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
列 | 値 |
---|---|
id |
|
type |
|
data |
{"type":"INCREMENTAL", "data-collections": ["public.MyFirstTable"]} |
シグナルの type
を指定する必要があります。data-collections
フィールドはオプションです。現在のスナップショットのすべてのアクティビティーを停止するようコネクターに要求するには、data-collections
フィールドを空白のままにします。増分スナップショットを続行したいが、スナップショットから特定のコレクションを除外したい場合は、除外するコレクションまたは正規表現の名前のコンマ区切りリストを指定します。コネクターが信号を処理した後、増分スナップショットが続行されますが、指定したコレクションからデータが除外されます。
6.5.4.3. 増分スナップショット
増分スナップショットはアドホックスナップショットの一種です。増分スナップショットでは、初期スナップショットと同様に、指定したテーブルのベースライン状態をキャプチャします。しかし、初期スナップショットとは異なり、増分スナップショットでは、一度にすべてのテーブルをキャプチャーするのではなく、チャンク単位でテーブルをキャプチャします。このコネクターは、スナップショットの進捗状況を追跡するために、電子透かしを使用しています。
増分スナップショットは、指定されたテーブルの初期状態を単一のモノリシックな操作ではなくチャンクでキャプチャーすることにより、初期スナップショットのプロセスに比べて以下のような利点があります。
- コネクターが指定されたテーブルのベースライン状態をキャプチャしている間、トランザクションログからのほぼリアルタイムのイベントのストリーミングは中断することなく継続されます。
- 増分スナップショットの処理が中断されても、中断した時点から再開することができます。
- 増分スナップショットはいつでも開始できます。
増分スナップショットの一時停止シグナル
pause-snapshot
シグナルタイプを使用してシグナルテーブルエントリーを作成することにより、進行中の増分スナップショットを一時停止するようコネクターに要求できます。信号を処理した後、コネクターは現在進行中のスナップショット操作の一時停止を停止します。そのため、信号の処理中の位置でスナップショット処理が一時停止されるため、データ収集を指定することはできません。
次の Debezium コネクターの増分スナップショットを一時停止できます。
- Db2
- MariaDB (テクノロジープレビュー)
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
列 | 値 |
---|---|
id |
|
type |
|
シグナルの type
を指定する必要があります。data
フィールドは無視されます。
増分スナップショットの一時停止シグナル
一時停止した増分スナップショットを再開するようコネクターに要求するには、resume-snapshot
シグナルタイプでシグナルテーブルエントリーを作成します。信号を処理した後、コネクターは以前に一時停止したスナップショット操作を再開します。
次の Debezium コネクターの増分スナップショットを再開できます。
- Db2
- MariaDB (テクノロジープレビュー)
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
列 | 値 |
---|---|
id |
|
type |
|
シグナルの type
を指定する必要があります。data
フィールドは無視されます。
増分スナップショットの詳細は、お使いのコネクターのドキュメントのスナップショットのトピックを参照してください。
6.5.4.4. ブロッキングスナップショットシグナル
execute-snapshot
シグナルタイプと、値 blocking
を持つ data.type
でシグナルを作成すると、アドホックブロッキングスナップショットの開始をコネクターに要求できます。信号を処理した後、コネクターは要求されたスナップショットオペレーションを実行します。
コネクターが最初に起動したときに実行される初期スナップショットとは異なり、アドホックブロッキングスナップショットは、コネクターがデータベースからの変更イベントのストリーミングを停止した後のランタイム中に発生します。アドホックブロッキングスナップショットはいつでも開始できます。
ブロッキングスナップショットは、以下の Debezium コネクターで利用可能です。
- Db2
- MariaDB (テクノロジープレビュー)
- MySQL
- Oracle
- PostgreSQL
- SQL Server
列 | 値 |
---|---|
id |
|
type |
|
data |
{"type": "blocking", "data-collections": ["schema1.table1", "schema1.table2"], "additional-conditions": [{"data-collection": "schema1.table1", "filter": "SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC"}, {"data-collection": "schema1.table2", "filter": "SELECT * FROM [schema1].[table2] WHERE column2 > 0"}]} |
キー | 値 |
---|---|
test_connector |
{"type":"execute-snapshot","data": {"type": "blocking"} |
ブロッキングスナップショットの詳細は、お使いのコネクターのドキュメントの スナップショット のトピックを参照してください。
6.5.4.5. カスタムシグナルアクションの定義
カスタムアクションを使用すると、Debezium シグナリングフレームワークを拡張して、デフォルトの実装では使用できないアクションをトリガーできます。カスタムアクションは複数のコネクターで使用できます。
カスタムシグナルアクションを定義するには、次のインターフェイスを定義する必要があります。
@FunctionalInterface public interface SignalAction<P extends Partition> { /** * @param signalPayload the content of the signal * @return true if the signal was processed */ boolean arrived(SignalPayload<P> signalPayload) throws InterruptedException; }
io.debezium.pipeline.signal.actions.SignalAction
は、シグナリングチャネルを介して送信されるメッセージペイロードを表す 1 つのパラメーターを持つ、単一のメソッドを公開します。
カスタムシグナリングアクションを定義した後、SPI インターフェイス io.debezium.pipeline.signal.actions.SignalActionProvider
を使用して、カスタムアクションをシグナリングメカニズムで使用できるようにします。
public interface SignalActionProvider { /** * Create a map of signal action where the key is the name of the action. * * @param dispatcher the event dispatcher instance * @param connectorConfig the connector config * @return a concrete action */ <P extends Partition> Map<String, SignalAction<P>> createActions(EventDispatcher<P, ? extends DataCollectionId> dispatcher, CommonConnectorConfig connectorConfig); }
実装では、シグナルアクションのマップを返すはずです。マップキーはアクションの名前に設定します。キーはシグナルの type
として使用されます。
6.5.4.6. Debezium コアモジュールの依存関係
カスタムアクション Java プロジェクトには、Debezium コアモジュールに対するコンパイル依存関係があります。プロジェクトの pom.xml
ファイルに次のコンパイル依存関係を含めます。
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<version>${version.debezium}</version> 1
</dependency>
META-INF/services/io.debezium.pipeline.signal.actions.SignalActionProvider
ファイルでプロバイダーの実装を宣言します。
6.5.4.7. カスタムシグナルアクションのデプロイ
前提条件
- カスタムアクション Java プログラムがある。
手順
-
Debezium コネクターでカスタムアクションを使用するには、Java プロジェクトを JAR ファイルにエクスポートし、そのファイルを使用する各 Debezium コネクターの JAR ファイルを含むディレクトリーにコピーします。
たとえば、一般的なデプロイメントでは、Debezium コネクターファイルは Kafka Connect ディレクトリーのサブディレクトリー (/kafka/connect
) に保存され、各コネクター JAR は独自のサブディレクトリー (/kafka/connect/debezium-connector-db2
、/kafka/connect/debezium-connector-mysql
など) に格納されます。
複数のコネクターでカスタムアクションを使用するには、各コネクターのサブディレクトリーにカスタムシグナリングチャネル JAR ファイルのコピーを配置する必要があります。