11.5. Debezium コネクターへのシグナル送信
Debezium のシグナルメカニズムを使用すると、コネクターの動作を変更したり、テーブルの アドホックスナップショット を起動するなどの 1 回限りのアクションをトリガーする方法が可能になります。シグナルを使用してコネクターをトリガーし、指定されたアクションを実行するには、次のチャネルの 1 つ以上を使用するようにコネクターを設定できます。
- SourceSignalChannel
- SQL コマンドを発行して、特殊なシグナリングデータコレクションにシグナルメッセージを追加できます。ソースデータベース上に作成するシグナリングデータコレクションは、Debezium との通信専用に設計されています。
- KafkaSignalChannel
- シグナルメッセージを設定可能な Kafka トピックに送信します。
- JmxSignalChannel
-
JMX
signal
操作でシグナル を送信します。Debezium は、新しい ログレコード または アドホックスナップショットレコード がチャネルに追加されたことを検出すると、シグナルを読み取り、要求された操作を開始します。
シグナリングは、以下の Debezium コネクターで使用可能です。
- Db2
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
signal.enabled.channels
設定プロパティーを設定して、有効にするチャネルを指定できます。このプロパティーには、有効になっているチャネルの名前がリストされます。デフォルトでは、Debezium は source
および kafka
のチャネルを提供します。source
チャネルは、増分スナップショットシグナルに必要なため、デフォルトで有効になっています。
11.5.1. Debezium ソースシグナリングチャネルの有効化
デフォルトでは、Debezium ソースシグナリングチャネルが有効になっています。
使用するコネクターごとにシグナリングを明示的に設定する必要があります。
手順
- ソースデータベースで、コネクターへのシグナル送信用にデータコレクションテーブルを作成します。シグナリングデータコレクションの必要な構造については、シグナリングデータコレクションの構造を参照してください。
- Db2 や SQL Server など、ネイティブな変更データキャプチャ (CDC) メカニズムを実装しているソースデータベースでは、信号テーブルの CDC を有効にします。
Debezium コネクターの設定にシグナリングデータコレクションの名前を追加します。
コネクター設定で、プロパティーsignal.data.collection
を追加し、その値を手順 1 で作成したシグナルデータコレクションの完全修飾名に設定します。
例:signal.data.collection = inventory.debezium_signals
.
シグナリングコレクションの完全修飾名のフォーマットは、コネクターによって異なります。
次の例では、各コネクターに使用する名前のフォーマットを示しています。- Db2
-
<schemaName>.<tableName>
- MongoDB
-
<databaseName>.<collectionName>
- MySQL
-
<databaseName>.<tableName>
- Oracle
-
<databaseName>.<schemaName>.<tableName>
- PostgreSQL
-
<schemaName>.<tableName>
- SQL Server
-
<databaseName>.<schemaName>.<tableName>
signal.data.collection
プロパティーの設定の詳細は、お使いのコネクターの設定プロパティーの表を参照してください。
11.5.1.1. デベージアムシグナリングデータ収集の必須構造
シグナルデータコレクションまたはシグナルテーブルは、コネクターに送信して指定の操作をトリガーするシグナルを保存します。シグナリングテーブルの構造は、以下の標準フォーマットに準拠する必要があります。
- 3 つのフィールド (列) があります。
- フィールドは、表 1 のように決まった順序で配置されています。
フィールド | タイプ | 説明 |
---|---|---|
|
|
シグナルのインスタンスを識別する任意のユニークな文字列です。 |
|
|
送信する信号の種類を指定します。 |
|
|
シグナルアクションに渡す、JSON 形式のパラメーターを指定します。 |
データコレクションのフィールド名は任意です。前述の表には、推奨される名称が記載されています。異なる命名規則を使用している場合は、各フィールドの値が期待される内容と一致していることを確認してください。
11.5.1.2. Debezium シグナルのデータコレクションの作成
標準 SQL の DDL クエリーをソースデータベースに送信して、シグナリングテーブルを作成します。
前提条件
- ソースデータベースでのテーブル作成に十分なアクセス権限がある。
手順
-
SQL クエリーをソースデータベースに送信して、必須の構造 と合致するテーブルを作成します。例:
CREATE TABLE <tableName> (id VARCHAR(<varcharValue>) PRIMARY KEY, type VARCHAR(<varcharValue>) NOT NULL, data VARCHAR(<varcharValue>) NULL);
id
変数の VARCHAR
パラメーターに割り当てる容量は、シグナリングテーブルに送信されるシグナルの ID 文字列のサイズを考慮して、十分に確保する必要があります。
ID のサイズが使用可能なスペースを超える場合、コネクターは信号を処理できません。
次の例は、3 列の debezium_signal
テーブルを作成する CREATE TABLE
コマンドです。
CREATE TABLE debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);
11.5.2. Debezium Kafka シグナリングチャネルの有効化
Kafka シグナリングチャネルを有効にするには、有効にするチャネルを signal.enabled.channels
設定プロパティーに追加し、シグナルを受信するトピックの名前を signal.kafka.topic
プロパティーに追加します。シグナリングチャネルを有効にすると、シグナルを使用して、設定したシグナルトピックに送信されるように、kafka コンシューマーが作成されます。
コンシューマーに使用できる追加設定
Kafka シグナリングを使用してコネクターのアドホック増分スナップショットをトリガーするには、まずコネクター設定で ソース
シグナリングチャネルを有効にする 必要があります。ソースチャネルには、透かしメカニズムが実装されており、増分スナップショットでキャプチャーされ、ストリーミングの再開後に再度キャプチャーされる可能性のあるイベントが重複しないようにします。
メッセージの形式
Kafka メッセージのキーは、topic.prefix
コネクター設定オプションの値と一致する必要があります。
値は、type
フィールドと data
フィールドが含まれる JSON オブジェクトです。
シグナルタイプが execute-snapshot
に設定されている場合、data
フィールドには次の表にリストされているフィールドが含まれている必要があります。
フィールド | デフォルト | 値 |
---|---|---|
|
|
実行するスナップショットのタイプ。現在、Debezium は |
| 該当なし |
スナップショットに含めるテーブルの完全修飾名と一致する、コンマ区切りの正規表現の配列。 |
| 該当なし | コネクターがスナップショットに含めるレコードのサブセットを指定するために評価する条件を指定するオプションの文字列。 |
以下の例は、典型的な execute-snapshot
Kafka メッセージを示しています。
Key = `test_connector` Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`
11.5.3. Debezium JMX シグナリングチャネルの有効化
JMX シグナリングを有効にするには、コネクター設定の signal.enabled.channels
プロパティーに jmx
を追加し、JMX MBean サーバーがシグナリング Bean を公開できる ようにします。
手順
- 任意の JMX クライアントを使用して (例:JConsole または JDK Mission Control) MBean サーバーに接続します。
Mbean
debezium.<connector-type>.management.signals.<server>
を検索します。Mbean は、以下の入力パラメーターを受け入れるsignal
操作を公開します。- p0
- シグナルの ID。
- p1
-
シグナルのタイプ (例:
execute-snapshot)
。 - P2
- 指定されたシグナルタイプに関する追加情報を含む JSON データフィールド。
入力パラメーターの値を指定して
execute-snapshot
シグナルを送信します。
JSON data フィールドに、以下の表に記載されている情報を含めます。表11.6 スナップショットデータフィールドの実行 フィールド デフォルト 値 type
incremental
実行するスナップショットのタイプ。現在、Debezium は
incremental
型のみをサポートしています。data-collections
該当なし
スナップショットに含めるテーブルの完全修飾名と一致する、コンマ区切りの正規表現の配列。
signal.data.collection 設定オプションに必要な形式と同じ形式を使用して名前を指定します。additional-condition
該当なし
コネクターがスナップショットに含めるレコードのサブセットを指定するために評価する条件を指定するオプションの文字列。
以下の図は、JConsole を使用してシグナルを送信する方法の例を示しています。
11.5.4. Debezium シグナルアクションの種類
シグナリングを使用して、以下のアクションを起こすことができます。
一部の信号はすべてのコネクターに対応していません。
11.5.4.1. ロギング信号
log
シグナルタイプのシグナリングテーブルエントリーを作成することで、ログにエントリーを追加するようコネクターに要求できます。シグナルの処理後、コネクターは指定されたメッセージをログに出力します。オプションとして、結果として得られるメッセージにストリーミング座標が含まれるようにシグナルを設定することもできます。
列 | 値 | 説明 |
---|---|---|
id |
| |
type |
| シグナルのアクションタイプです。 |
data |
|
|
11.5.4.2. アドホックスナップショットシグナル
execute-snapshot
シグナルタイプのシグナルを作成すると、アドホックスナップショットの開始をコネクターに要求できます。信号を処理した後、コネクターは要求されたスナップショットオペレーションを実行します。
コネクターが最初に起動したときに実行される最初のスナップショットとは異なり、アドホックスナップショットは、コネクターがすでにデータベースからの変更イベントのストリーミングを開始した後のランタイム中に発生します。いつでもアドホックなスナップショットを開始することができます。
アドホックスナップショットは、以下の Debezium コネクターで利用可能です。
- Db2
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
列 | 値 |
---|---|
id |
|
type |
|
data |
|
キー | 値 |
---|---|
test_connector |
|
アドホックスナップショットの詳細については、お使いのコネクターのドキュメントのスナップショットのトピックを参照してください。
関連情報
アドホックスナップショット停止シグナル
stop-snapshot
シグナルタイプでシグナルテーブルエントリーを作成することにより、進行中のアドホックスナップショットを停止するようコネクターに要求できます。シグナルを処理した後、コネクターは現在進行中のスナップショット操作を停止します。
次の Debezium コネクターのアドホックスナップショットを停止できます。
- Db2
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
列 | 値 |
---|---|
id |
|
type |
|
data |
|
シグナルの type
を指定する必要があります。data-collections
フィールドはオプションです。現在のスナップショットのすべてのアクティビティーを停止するようコネクターに要求するには、data-collections
フィールドを空白のままにします。増分スナップショットを続行したいが、スナップショットから特定のコレクションを除外したい場合は、除外するコレクションまたは正規表現の名前のコンマ区切りリストを指定します。コネクターが信号を処理した後、増分スナップショットが続行されますが、指定したコレクションからデータが除外されます。
11.5.4.3. 増分スナップショット
増分スナップショットはアドホックスナップショットの一種です。増分スナップショットでは、初期スナップショットと同様に、指定したテーブルのベースライン状態をキャプチャします。しかし、初期スナップショットとは異なり、増分スナップショットでは、一度にすべてのテーブルをキャプチャーするのではなく、チャンク単位でテーブルをキャプチャします。このコネクターは、スナップショットの進捗状況を追跡するために、電子透かしを使用しています。
増分スナップショットは、指定されたテーブルの初期状態を単一のモノリシックな操作ではなくチャンクでキャプチャーすることにより、初期スナップショットのプロセスに比べて以下のような利点があります。
- コネクターが指定されたテーブルのベースライン状態をキャプチャしている間、トランザクションログからのほぼリアルタイムのイベントのストリーミングは中断することなく継続されます。
- 増分スナップショットの処理が中断されても、中断した時点から再開することができます。
- 増分スナップショットはいつでも開始できます。
増分スナップショットの一時停止シグナル
pause-snapshot
シグナルタイプを使用してシグナルテーブルエントリーを作成することにより、進行中の増分スナップショットを一時停止するようコネクターに要求できます。信号を処理した後、コネクターは現在進行中のスナップショット操作の一時停止を停止します。そのため、信号の処理中の位置でスナップショット処理が一時停止されるため、データ収集を指定することはできません。
次の Debezium コネクターの増分スナップショットを一時停止できます。
- Db2
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
列 | 値 |
---|---|
id |
|
type |
|
シグナルの type
を指定する必要があります。data
フィールドは無視されます。
増分スナップショットの一時停止シグナル
一時停止した増分スナップショットを再開するようコネクターに要求するには、resume-snapshot
シグナルタイプでシグナルテーブルエントリーを作成します。信号を処理した後、コネクターは以前に一時停止したスナップショット操作を再開します。
次の Debezium コネクターの増分スナップショットを再開できます。
- Db2
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
列 | 値 |
---|---|
id |
|
type |
|
シグナルの type
を指定する必要があります。data
フィールドは無視されます。
増分スナップショットの詳細については、お使いのコネクターのドキュメントのスナップショットのトピックを参照してください。