2.3. MongoDB の Debezium コネクター
Debezium の MongoDB コネクターは、データベースおよびコレクションにおけるドキュメントの変更に対して、MongoDB レプリカセットまたは MongoDB シャードクラスターを追跡し、これらの変更を Kafka トピックのイベントとして記録します。コネクターは、シャードクラスターにおけるシャードの追加または削除、各レプリカセットのメンバーシップの変更、各レプリカセット内の選出、および通信問題の解決待ちを自動的に処理します。
このコネクターと互換性のある MongoDB のバージョンについては、Debezium でサポートされる設定ページを参照してください。
Debezium MongoDB コネクターを使用するための情報および手順は、以下のように設定されています。
2.3.1. Debezium MongoDB コネクターの概要 リンクのコピーリンクがクリップボードにコピーされました!
MongoDB のレプリケーションメカニズムは冗長性と高可用性を提供し、実稼働環境における MongoDB の実行に推奨される方法です。MongoDB コネクターは、レプリカセットまたはシャードクラスターの変更をキャプチャーします。
MongoDB レプリカセット は、すべてが同じデータのコピーを持つサーバーのセットで構成され、レプリケーションによって、クライアントがレプリカセットの プライマリー のドキュメントに追加したすべての変更が、セカンダリー と呼ばれる別のレプリカセットのサーバーに適用されるようにします。MongoDB のレプリケーションでは、プライマリーが oplog (または操作ログ) に変更を記録した後、各セカンダリーがプライマリーの oplog を読み取って、すべての操作を順番に独自のドキュメントに適用します。新規サーバーをレプリカセットに追加すると、そのサーバーは最初にプライマリーのすべてのデータベースおよびコレクションの スナップショット を実行し、次にプライマリーの oplog を読み取り、スナップショットの開始後に加えられたすべての変更を適用します。この新しいサーバーは、プライマリーの oplog の最後に到達するとセカンダリーになり、クエリーを処理できます。
2.3.1.1. MongoDB コネクターが変更ストリームを使用してイベントレコードをキャプチャーする方法の説明 リンクのコピーリンクがクリップボードにコピーされました!
Debezium MongoDB コネクターはレプリカセットの一部ではありませんが、同様のレプリケーションメカニズムを使用して oplog データを取得します。主な違いは、コネクターが直接 oplog を読み取らない点です。代わりに、oplog データの取得およびデコードを MongoDB 変更ストリーム 機能に委譲します。変更ストリームでは、MongoDB サーバーはコレクションで発生する変更をイベントストリームとして公開します。Debezium コネクターはストリームを監視し、変更をダウンストリームに配信します。コネクターが最初にレプリカセットを検出すると、oplog を調べて最後に記録されたトランザクションを取得し、プライマリーのデータベースおよびコレクションのスナップショットを作成します。コネクターがデータのコピーを終了すると、以前に読み取られた oplog の位置から開始する変更ストリームを作成します。
MongoDB コネクターは変更を処理すると、イベントを発信する先の oplog/stream の位置を定期的に記録します。コネクターが停止したら、最後に処理した oplog ストリームの位置を記録するため、再起動後にその位置からストリーミングを再開できます。つまり、コネクターを停止、アップグレード、または維持でき、後で再起動できます。イベントを何も失うことなく、常に停止した場所を正確に特定します。当然ながら、MongoDB oplogs は通常最大サイズに制限されているため、コネクターがそれらを読み取る前に oplog の操作がパージされる可能性があります。この場合、再起動後、足りていない oplog 操作をコネクターが検出し、スナップショットを実行してから、変更をストリームします。
MongoDB コネクターは、レプリカセットのメンバーシップとリーダーシップの変更、シャードクラスター内でのシャードの追加と削除、および通信障害の原因となる可能性のあるネットワーク問題にも非常に寛容です。コネクターは常にレプリカセットのプライマリーノードを使用して変更をストリーミングします。そのため、レプリカセットの選出が行われ、他のノードがプライマリーになると、コネクターはすぐ変更のストリーミングを停止し、新しいプライマリーに接続し、新しいプライマリーを使用して変更のストリーミングを開始します。同様に、プライマリーとして設定されたレプリカとコネクターが通信できない場合は、再接続を試みます (ネットワークまたはレプリカセットを圧迫しないように指数バックオフを使用します)。接続が再確立された後、コネクターはキャプチャーした最後のイベントからの変更を引き続きストリーミングします。これにより、コネクターはレプリカセットメンバーシップの変更を動的に調整し、通信障害を自動的に処理します。
2.3.1.2. MongoDB コネクターが MongoDB 読み取り設定を使用する方法の説明 リンクのコピーリンクがクリップボードにコピーされました!
MongoDB 接続の読み取り設定を指定するには、mongodb.connection.string で readPreference パラメーターを設定します。
2.3.2. Debezium MongoDB コネクターの仕組み リンクのコピーリンクがクリップボードにコピーされました!
コネクターがサポートする MongoDB トポロジーの概要は、アプリケーションを計画するときに役立ちます。
Debezium MongoDB コネクターの仕組みの詳細は、以下を参照してください。
- 「Debezium コネクターでサポートされる MongoDB トポロジー」
- 「Debezium MongoDB コネクターでレプリカセットおよびシャードクラスターに論理名を使用する方法」
- 「Debezium MongoDB コネクターでのスナップショットの実行方法」
- 「アドホックスナップショット」
- 「増分スナップショット」
- 「Debezium MongoDB コネクターでの変更イベントレコードのストリーミング方法」
- 「Debezium MongoDB 変更イベントレコードを受信する Kafka トピックのデフォルト名」
- 「イベントキーが Debezium MongoDB コネクターのトピックパーティション設定を制御する方法」
- 「トランザクション境界を表す Debezium MongoDB コネクターによって生成されたイベント」
2.3.2.1. Debezium コネクターでサポートされる MongoDB トポロジー リンクのコピーリンクがクリップボードにコピーされました!
MongoDB コネクターは以下の MongoDB トポロジーをサポートします。
- MongoDB レプリカセット
Debezium MongoDB コネクターは単一の MongoDB レプリカセット から変更をキャプチャーできます。実稼働のレプリカセットには、少なくとも 3 つのメンバー が必要です。
レプリカセットで MongoDB コネクターを使用するには、コネクター設定の
mongodb.connection.stringプロパティーの値を レプリカセットの接続文字列 に設定する必要があります。コネクターが MongoDB 変更ストリームからの変更のキャプチャーを開始する準備ができると、接続タスクが開始されます。次に、接続タスクは、指定された接続文字列を使用して、使用可能なレプリカセットメンバーへの接続を確立します。
- MongoDB のシャードクラスター
MongoDB のシャードクラスター は以下で構成されます。
- レプリカセットとしてデプロイされる 1 つ以上の シャード。
- クラスターの 設定サーバー として動作する個別のレプリカセット。
クライアントが接続し、要求を適切なシャードにルーティングする 1 つ以上の ルーター (
mongosとも呼ばれます)。シャードクラスターで MongoDB コネクターを使用するには、コネクター設定で、
mongodb.connection.stringプロパティーの値を sharded cluster connection string に設定します。
- MongoDB スタンドアロンサーバー
- スタンドアロンサーバーには oplog がないため、MongoDB コネクターはスタンドアロン MongoDB サーバーの変更を監視できません。スタンドアロンサーバーが 1 つのメンバーを持つレプリカセットに変換されると、コネクターが動作します。
MongoDB は、実稼働でのスタンドアロンサーバーの実行を推奨しません。詳細は MongoDB のドキュメント を参照してください。
2.3.2.2. Debezium コネクターに必要なユーザーパーミッション リンクのコピーリンクがクリップボードにコピーされました!
MongoDB からデータをキャプチャーするために、Debezium は MongoDB ユーザーとしてデータベースに接続します。Debezium 用に作成する MongoDB ユーザーアカウントには、データベースから読み取るための特定のデータベースパーミッションが必要です。コネクターユーザーには次のパーミッションが必要です。
- データベースからの読み取り。
-
helloコマンドを実行します。
コネクターユーザーには次のパーミッションも必要な場合があります。
-
config.shardsシステムコレクションからの読み取り。
データベースの読み取りパーミッション
コネクターユーザーは、コネクターの capture.scope プロパティーの値に応じて、すべてのデータベースから、または特定のデータベースから読み取ることができる必要があります。capture.scope の設定に応じて、ユーザーに次のいずれかのパーミッションを割り当てます。
capture.scopeがdeploymentに設定されている- ユーザーに任意のデータベースを読み取るパーミッションを付与します。
capture.scopeがdatabaseに設定されている-
ユーザーに、コネクターの
capture.targetプロパティーで指定されたデータベースを読み取るパーミッションを付与します。 capture.scopeがcollectionに設定されている-
コネクターの
capture.targetプロパティーで指定されたコレクションを読み取る権限をユーザーに付与します。
capture.scope プロパティーの Debezium collection オプションの使用は、開発者プレビュー機能です。開発者プレビュー機能は、Red Hat ではいかなる形でもサポートされていません。また、機能的には完全ではなく、実稼働環境に対応していません。開発者プレビューのソフトウェアを実稼働ワークロードまたはビジネスクリティカルなワークロードには使用しないでください。開発者プレビューソフトウェアは、今後 Red Hat 製品サービスとして追加される可能性のある製品ソフトウェアを前もって早期に利用できます。お客様はこのソフトウェアを使用して機能をテストし、開発プロセス中にフィードバックを提供できます。Red Hat は、関連する SLA なしに、開発者プレビューソフトウェアに対するフィードバックを送信する手段を提供する場合があります。
Red Hat 開発者プレビューソフトウェアのサポート範囲の詳細は、開発者プレビューのサポート範囲 を参照してください。
MongoDB hello コマンドを使用する権限
capture.scope の設定に関係なく、ユーザーには MongoDB hello コマンドを実行する権限が必要です。
config.shards コレクションを読み取るパーミッション
Debezium 環境に応じて、コネクターがオフセット統合を実行できるようにするには、コネクターユーザーに config.shards コレクションを読み取るための明示的な権限を付与する必要があります。次のコネクター環境では、config.shards コレクションを読み取る権限が必要です。
- コネクターが Debezium 2.5 以前からアップグレードされた場合。
- コネクターがシャードされた MongoDB クラスターからの変更をキャプチャーするように設定されている場合。
2.3.2.3. Debezium MongoDB コネクターでレプリカセットおよびシャードクラスターに論理名を使用する方法 リンクのコピーリンクがクリップボードにコピーされました!
コネクター設定プロパティー topic.prefix は、MongoDB レプリカセットまたはシャードされたクラスターの 論理名 として提供されます。コネクターは論理名をさまざまな方法で使用します。すべてのトピック名の接頭辞として、各レプリカセットの変更ストリームの位置を記録する際に一意の識別子として使用されます。
各 MongoDB コネクターに、ソース MongoDB システムを意味する一意の論理名を命名する必要があります。論理名は、アルファベットまたはアンダースコアで始まり、残りの文字を英数字またはアンダースコアとすることが推奨されます。
2.3.2.4. Debezium MongoDB コネクターがオフセット統合を実行する方法 リンクのコピーリンクがクリップボードにコピーされました!
Debezium MongoDB コネクターは、シャードされた MongoDB デプロイメントへの replica_set 接続をサポートしなくなりました。その結果、replica_set 接続モードを使用したコネクターバージョンによって記録されたオフセットは、現在のバージョンと互換性がありません。
接続モードの変更の影響を最小限に抑え、コネクターが不要なスナップショットを実行しないようにするには、アップグレード後にコネクターが再起動されると、コネクターによりオフセットを統合する手順が実行されます。このオフセット統合の手順を実行中に、コネクターは次の手順を実行して、以前のコネクターバージョンによって記録されたオフセットを調整します。
- コネクターバージョン 2.5 以降で記録されたオフセットはそのまま使用されます。
-
シャードされた MongoDB デプロイメントまたは MongoDB レプリカセットデプロイメントから
sharded接続モードでキャプチャーされたイベントのオフセットは、そのまま使用されます。 次の両方の条件が当てはまる場合、コネクターバージョン 2.5.x 以前で記録されたシャード固有のオフセットはそのまま使用されます。
- オフセットが現在のすべてのデータベースシャードに存在する。
-
オフセットの無効化 が有効になっている。
オフセット無効化が無効になっている場合、コネクターは起動に失敗します。
-
コネクターは、前の手順で既存のオフセットを処理した後、ストリーミングの変更を再開し、キャプチャーした新しいイベントのオフセットをコミットします。
オフセット統合の手順で既存のオフセットが検出されない場合、コネクターは 初期スナップショットを実行します。
2.3.2.5. Debezium MongoDB コネクターでのスナップショットの実行方法 リンクのコピーリンクがクリップボードにコピーされました!
Debezium タスクがレプリカセットを使用して起動すると、コネクターの論理名とレプリカセット名を使用して、コネクターが変更の読み取りを停止した位置を示す オフセット を検出します。オフセットが検出され、oplog に存在する場合、タスクは記録されたオフセットの位置から即座に ストリームの変更 を続行します。
ただし、オフセットが見つからない場合、または oplog にその位置が含まれていない場合、タスクはまず スナップショット を実行してレプリカセットの内容の現在の状態を取得する必要があります。このプロセスは、oplog の現在の位置を記録して開始され、オフセット (スナップショットが開始されたことを示すフラグとともに) として記録します。次に、タスクは各コレクションのコピーに進み、できるだけ多くのスレッド (snapshot.max.threads 設定プロパティーの値まで) を生成して、この作業を並行して実行します。コネクターは、参照したドキュメントごとに個別の 読み取りイベント を記録します。各読み取りイベントには、オブジェクトの識別子、オブジェクトの完全な状態、およびオブジェクトが見つかった MongoDB レプリカセットに関する ソース 情報が含まれます。ソース情報には、イベントがスナップショットの作成中に生成されたことを示すフラグも含まれます。
このスナップショットは、コネクターのフィルターと一致するすべてのコレクションがコピーされるまで継続されます。タスクのスナップショットが完了する前にコネクターが停止した場合は、コネクターを再起動すると、再びスナップショットを開始します。
コネクターがレプリカセットのスナップショットを実行している間、タスクの再割り当てと再設定を回避します。コネクターは、スナップショットの進捗を報告するログメッセージを生成します。最大限の制御を行うために、コネクターごとに個別の Kafka Connect クラスターを実行します。
スナップショットの詳細は、以下のセクションを参照してください。
| 設定 | 説明 |
|---|---|
|
| コネクターは起動するたびにスナップショットを実行します。スナップショットが完了すると、コネクターは、後続のデータベース変更のに備え、イベントレコードのストリーミングを開始します。 |
|
| コネクターが起動すると、初期データベーススナップショットが実行されます。 |
|
| コネクターはデータベーススナップショットを実行します。スナップショットが完了すると、コネクターは停止し、後続のデータベース変更のイベントレコードをストリーミングしなくなります。 |
|
|
非推奨です。 |
|
|
コネクターは関連するすべてのテーブルの構造をキャプチャーしますが、コネクターの起動時点でのデータセットを表す |
|
| コネクターが起動した後、次のいずれかの状況を検出した場合にのみスナップショットが実行されます。
|
詳細は、コネクター設定プロパティーテーブルの snapshot.mode を参照してください。
2.3.2.6. アドホックスナップショット リンクのコピーリンクがクリップボードにコピーされました!
デフォルトでは、コネクターは初回スナップショット操作の開始後にのみ実行されます。通常の状況では、この最初のスナップショットが作成されると、コネクターではスナップショットプロセスは繰り返し処理されません。コネクターがキャプチャーする今後の変更イベントデータはストリーミングプロセス経由でのみ行われます。
ただし、場合によっては、最初のスナップショット中にコネクターを取得したデータが古くなったり、失われたり、または不完全となったり可能性があります。収集データを再キャプチャーするメカニズムを提供するために、Debezium はアドホックスナップショットを実行するオプションを備えています。Debezium 環境で次のいずれかの変更が発生したら、アドホックスナップショットを実行することを推奨します。
- コネクター設定が変更され、異なるコレクションのセットをキャプチャーします。
- Kafka トピックを削除して、再構築する必要があります。
- 設定エラーや他の問題が原因で、データの破損が発生します。
いわゆる アドホックスナップショット を開始することで、以前にスナップショットをキャプチャしたコレクションに対してスナップショットを再実行することができます。アドホックスナップショットでは、シグナリングコレクション を使用する必要があります。シグナルリクエストを Debezium シグナルコレクションに送信して、アドホックスナップショットを開始します。
既存のコレクションのアドホックスナップショットを開始すると、コネクターはコレクションにすでに存在するトピックにコンテンツを追加します。既存のトピックが削除された場合には、トピックの自動作成 が有効になっているのであれば、Debezium は自動的にトピックを作成できます。
アドホックのスナップショットシグナルは、スナップショットに追加するコレクションを指定します。スナップショットは、データベースの内容全体をキャプチャーしたり、データベース内のコレクションのサブセットのみをキャプチャーしたりできます。また、スナップショットは、データベース内のコレクションの内容のサブセットをキャプチャーできます。
キャプチャーするコレクションは、シグナリングコレクションに execute-snapshot メッセージを送信することで指定します。execute-snapshot シグナルのタイプを incremental または blocking に設定し、スナップショットに含めるコレクションの名前を次の表に示すように指定します。
| フィールド | デフォルト | 値 |
|---|---|---|
|
|
|
実行するスナップショットのタイプを指定します。 |
|
| 該当なし |
スナップショットに含めるコレクションの完全修飾名に一致する正規表現を含む配列。 |
|
| 該当なし |
コネクターがスナップショットに含めるレコードのサブセットを決定するために評価する追加条件のセットを指定する、オプションの配列。
|
|
| 該当なし | スナップショットプロセス中にコネクターがコレクションのプライマリーキーとして使用する列名を指定するオプションの文字列。 |
アドホック増分スナップショットのトリガー
アドホック増分スナップショットを開始するには、execute-snapshot シグナルタイプのエントリーをシグナリングコレクションに追加するか、シグナルメッセージを Kafka シグナリングトピックに送信します。コネクターがメッセージを処理した後に、スナップショット操作を開始します。スナップショットプロセスは、最初と最後のプライマリーキーの値を読み取り、これらの値を各コレクションの開始ポイントおよびエンドポイントとして使用します。コレクションのエントリー数と設定されたチャンクサイズに基づいて、Debezium はコレクションをチャンクに分割し、チャンクごとに 1 度に 1 つずつスナップショットを順番に作成していきます。
詳細は、スナップショットの増分 を参照してください。
アドホックブロッキングスナップショットのトリガー
シグナリングコレクションまたはシグナリングトピックに、execute-snapshot シグナルタイプを持つエントリーを追加することによって、アドホックブロッキングスナップショットを開始します。コネクターがメッセージを処理した後に、スナップショット操作を開始します。コネクターはストリーミングを一時的に停止し、初期スナップショットの時と同じプロセスに従って、指定されたコレクションのスナップショットを開始します。スナップショットが完了すると、コネクターはストリーミングを再開します。
詳細は、ブロッキングスナップショット を参照してください。
2.3.2.7. 増分スナップショット リンクのコピーリンクがクリップボードにコピーされました!
スナップショットを柔軟に管理するため、Debezium には 増分スナップショット と呼ばれる補助スナップショットメカニズムが含まれています。増分スナップショットは、Debezium コネクターにシグナルを送信 するための Debezium メカニズムに依存します。
増分スナップショットでは、最初のスナップショットのように、データベースの完全な状態を一度にすべてキャプチャーする代わりに、一連の設定可能なチャンクで各コレクションを段階的にキャプチャーします。スナップショットがキャプチャーするコレクションと、各チャンクのサイズ を指定できます。チャンクのサイズにより、データベース上の各フェッチ操作中にスナップショットで収集される行数が決まります。増分スナップショットのデフォルトのチャンクサイズは 1024 行です。
Debezium は、増分スナップショットの進行に伴い、その進捗を追跡するために透かしを使用し、キャプチャした各コレクション行の記録を保持します。この段階的なアプローチでは、標準の初期スナップショットプロセスと比較して、以下の利点があります。
- スナップショットが完了するまで、ストリーミングストリーミングを延期する代わりに、ストリームしたデータキャプチャーと並行して増分スナップショットを実行できます。コネクターはスナップショットプロセス全体で変更ログからのほぼリアルタイムイベントをキャプチャーし続け、他の操作はブロックしません。
- 増分スナップショットの進捗が中断された場合は、データを失うことなく再開できます。プロセス再開後、スナップショットは、最初からコレクションを再キャプチャーするのではなく、停止したポイントから開始されます。
-
いつでも増分スナップショットを実行し、必要に応じてプロセスを繰り返してデータベースの更新に適合できます。たとえば、コネクターの設定を変更してコレクションをその
collection.include.listプロパティーにコレクションを追加します。
増分スナップショットプロセス
増分スナップショットを実行する場合には、Debezium は各コレクションをプライマリーキー別に分類して、設定されたチャンクサイズ に基づいてコレクションをチャンクに分割します。チャンクごとに作業し、チャンク内の各コレクション行をキャプチャします。キャプチャーする行ごとに、スナップショットは READ イベントを出力します。そのイベントは、対象となるチャンクのスナップショットを開始する時の行の値を表します。
スナップショットが進むと、他のプロセスがデータベースにアクセスし続け、コレクションのレコードが変更される可能性があります。このような変更を反映させるように、通常通りに INSERT、UPDATE、DELETE 操作がトランザクションログにコミットされます。同様に、継続中の Debezium ストリーミングプロセスは、これらの変更イベントを検出し、対応する変更イベントレコードを Kafka に出力します。
Debezium を使用してプライマリーキーが同じレコード間での競合を解決する方法
場合によっては、ストリーミングプロセスが出力する UPDATE または DELETE イベントを順番に受信できます。つまり、スナップショットがその行の READ イベントを含むチャンクをキャプチャーする前に、ストリーミングプロセスがコレクション行を変更するイベントを発行する可能性があります。スナップショットが最終的に対象の行にあった READ イベントを出力すると、その値はすでに置き換えられています。Debezium は、シーケンスが到達する増分スナップショットイベントが正しい論理順序で処理されるように、競合を解決するためにバッファースキームを使用します。スナップショットのイベント間で競合が発生し、ストリームされたイベントが解決されてからでないと、Debezium はイベントのレコードを Kafka に送信しません。
スナップショットウィンドウ
遅れて到着した READ イベントと、同じコレクション行を変更するストリームイベント間の衝突を解決するために、Debezium はいわゆる スナップショットウィンドウ を採用しています。スナップショットウィンドウは、増分スナップショットが指定されたコレクションチャンクのデータをキャプチャーする間隔を区切ります。チャンクのスナップショットウィンドウを開く前に、Debezium は通常の動作に従い、トランザクションログから直接ターゲットの Kafka トピックにイベントをダウンストリームに出力します。ただし、特定のチャンクのスナップショットが開放された瞬間から終了するまで、Debezium は重複除去のステップを実行して、プライマリーキーが同じイベント間での競合を解決します。
データコレクションごとに、Debezium は 2 種類のイベントを出力し、それらの両方のレコードを単一の宛先 Kafka トピックに保存します。テーブルから直接キャプチャーするスナップショットレコードは、READ 操作として出力されます。その間、ユーザーはデータコレクションのレコードの更新を続け、各コミットを反映するようにトランザクションログが更新されるので、Debezium は変更ごとに UPDATE または DELETE 操作を出力します。
スナップショットウィンドウが開放され、Debezium がスナップショットチャンクの処理を開始すると、スナップショットレコードをメモリーバッファーに提供します。スナップショットウィンドウ中に、バッファー内の READ イベントのプライマリーキーは、受信ストリームイベントのプライマリーキーと比較されます。一致するものが見つからない場合、ストリーミングされたイベントレコードが Kafka に直接送信されます。Debezium が一致を検出すると、バッファーされた READ イベントを破棄し、ストリーミングされたレコードを宛先トピックに書き込みます。これは、ストリーミングされたイベントが静的スナップショットイベントよりも論理的に優先されるためです。チャンクのスナップショットウィンドウが終了すると、バッファーに含まれるのは、関連するトランザクションログイベントが存在しない READ イベントのみです。Debezium は、これらの残りの READ イベントをコレクションの Kafka トピックに発行します。
コネクターは各スナップショットチャンクにプロセスを繰り返します。
現在、増分スナップショットを開始するには、次のいずれかの方法を使用できます。
増分スナップショットでは、各テーブルのプライマリーキーを安定した順序で並べる必要があります。String フィールドには特殊文字を含めることができ、さまざまなエンコーディングの対象となるため、文字列ベースのプライマリーキーは、一貫性のある予測可能な順序で並べ替えるのに適していません。増分スナップショットを実行する場合は、プライマリーキーを String 以外のデータ型に設定することを推奨します。
MongoDB の BSON 文字列型の詳細は、MongoDB のドキュメント を参照してください。
シャードクラスターの増分スナップショット
シャードされた MongoDB クラスターで増分スナップショットを使用するには、incremental.snapshot.chunk.size を変更ストリームパイプラインの 複雑化 に対応する値に設定する必要があります。
2.3.2.7.1. 増分スナップショットのトリガー リンクのコピーリンクがクリップボードにコピーされました!
増分スナップショットを開始するには、ソースデータベースのシグナリングコレクションに アドホックスナップショットシグナル を送信します。
MongoDB の insert() メソッドを使用して、シグナルコレクションにシグナルを送信します。
Debezium は、信号コレクションの変化を検出した後、信号を読み取り、要求されたスナップショット操作を実行します。
送信するクエリーは、スナップショットに含めるコレクションを指定し、オプションでスナップショット操作の種類を指定します。現在、スナップショット操作に有効なオプションは、incremental と blocking のみです。
スナップショットに含めるテーブルを指定するには、テーブルをリストアップした data-collections 配列、またはテーブルのマッチングに使用する正規表現の配列を指定します。たとえば、{"data-collections": ["public.Collection1", "public.Collection2"]}
増分スナップショットシグナルの data-collections アレイにはデフォルト値がありません。data-collections アレイが空である場合には、アクションが不要であり、スナップショットを実行しないことが、Debezium で検出されます。
スナップショットに含めるコレクションの名前に、データベース、スキーマ、またはテーブルの名前にドット (.) が含まれている場合、そのコレクションを data-collections 配列に追加するには、名前の各パートを二重引用符でエスケープする必要があります。
たとえば、public データベースに存在し、My.Collection という名前のデータコレクションを含めるには、"public"."My.Collection" の形式を使用します。
前提条件
- ソースデータベースにシグナリングデータコレクションが存在する。
-
シグナルデータコレクションが
signal.data.collectionプロパティーで指定されている。
ソースシグナリングチャネルを使用して増分スナップショットをトリガーする
シグナリングコレクションにスナップショットシグナルドキュメントを挿入します。
<signalDataCollection>.insert({"id" : _<idNumber>,"type" : <snapshotType>, "data" : {"data-collections" ["<collectionName>", "<collectionName>"],"type": <snapshotType>, "additional-conditions" : [{"data-collections" : "<collectionName>", "filter" : "<additional-condition>"}] }});<signalDataCollection>.insert({"id" : _<idNumber>,"type" : <snapshotType>, "data" : {"data-collections" ["<collectionName>", "<collectionName>"],"type": <snapshotType>, "additional-conditions" : [{"data-collections" : "<collectionName>", "filter" : "<additional-condition>"}] }});Copy to Clipboard Copied! Toggle word wrap Toggle overflow 以下に例を示します。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow コマンド内の
id、type、およびdataパラメーターの値は、シグナリングコレクションのフィールド に対応します。以下の表では、この例のパラメーターを説明しています。
Expand 表2.57 シグナリングコレクションに増分スナップショットシグナルを送信するための MongoDB insert() コマンドのフィールドの説明 項目 値 説明 1
db.debeziumSignalソースデータベース上のシグナリングコレクションの完全修飾名を指定します。
2
null
idパラメーターは、シグナルリクエストのID識別子として割り当てられる任意の文字列を指定します。
前述の例の insert メソッドは、オプションの_idパラメーターの使用が省略されています。ドキュメントはパラメーターの値を明示的に割り当てないため、MongoDB が自動的にドキュメントに割り当てる任意の ID がシグナルリクエストのid識別子になります。
この文字列を使用して、シグナリングコレクションのエントリーにロギングメッセージを識別します。Debezium はこの識別子文字列を使用しません。代わりに、スナップショット作成中に、Debezium は独自のID文字列をウォーターマークシグナルとして生成します。3
execute-snapshottypeパラメーターを指定し、シグナルがトリガーする操作を指定します。
4
data-collectionsシグナルの
dataフィールドの必須コンポーネントで、スナップショットに含めるコレクション名の配列またはコレクション名と一致する正規表現を指定します。
この配列は、完全修飾名でテーブルをマッチさせる正規表現をリストアップします。signal.data.collection設定プロパティーでコネクターのシグナリングテーブル名を指定するのと同じ形式を使用します。5
incremental実行するスナップショット操作のタイプを指定する、シグナルの
dataフィールドのオプションのtypeコンポーネント。
現在、incrementalとblocking型をサポートしています。
値を指定しない場合には、コネクターは増分スナップショットを実行します。6
additional-conditionsコネクターがスナップショットに含めるレコードのサブセットを決定するために評価する追加条件のセットを指定する、オプションの配列。
additional-conditions配列の各要素は、次のキーを含むオブジェクトです。data-collection:: フィルターが適用されるデータコレクションの完全修飾名。filter:: スナップショットに含めるためにデータ収集レコードに存在する必要がある列値を指定します (例:"color='blue'")。
以下の例は、コネクターによってキャプチャーされる増分スナップショットイベントの JSON を示しています。
例: 増分スナップショットイベントメッセージ
| 項目 | フィールド名 | 説明 |
|---|---|---|
| 1 |
|
実行するスナップショット操作タイプを指定します。 |
| 2 |
|
イベントタイプを指定します。 |
2.3.2.7.2. Kafka シグナルチャネルを使用して増分スナップショットをトリガーする リンクのコピーリンクがクリップボードにコピーされました!
設定された Kafka トピック にメッセージを送信して、コネクターにアドホック増分スナップショットを実行するよう要求できます。
Kafka メッセージのキーは、topic.prefix コネクター設定オプションの値と一致する必要があります。
メッセージの値は、type と data フィールドが含まれる JSON オブジェクトとなっています。
シグナルタイプは execute-snapshot で、data フィールドには以下のフィールドが必要です。
| フィールド | デフォルト | 値 |
|---|---|---|
|
|
|
実行するスナップショットのタイプ。現在、Debezium は |
|
| 該当なし |
スナップショットに含めるテーブルの完全修飾名と一致する、コンマ区切りの正規表現の配列。 |
|
| 該当なし |
コネクターがスナップショットに含めるレコードのサブセットを指定するために評価する基準を指定する、オプションの追加条件の配列。 |
例2.17 execute-snapshot Kafka メッセージ
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["{collection-container}.table1", "{collection-container}.table2"], "type": "INCREMENTAL"}}`
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["{collection-container}.table1", "{collection-container}.table2"], "type": "INCREMENTAL"}}`
additional-conditions 付きのアドホック増分スナップショット
Debezium は additional-conditions フィールドを使用してコレクションの内容のサブセットを選択します。
通常、Debezium はスナップショットを実行するときに、次のような SQL クエリーを実行します。
SELECT * FROM <tableName> ….
スナップショット要求に additional-conditions プロパティーが含まれている場合、プロパティーの data-collection および filter パラメーターが SQL クエリーに追加されます。次に例を示します。
SELECT * FROM <data-collection> WHERE <filter> ….
たとえば、列 id (プライマリーキー)、color、および brand を含む products コレクションがある場合、スナップショットに color='blue' のコンテンツのみを含める場合は、スナップショットをリクエストするときに、コンテンツをフィルタリングする additional-conditions プロパティーを追加することができます。
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.products" ,"filter":"color='blue'"}]}}`
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.products" ,"filter":"color='blue'"}]}}`
また、additional-conditions プロパティーを使用して、複数の列に基づいて条件を渡すこともできます。たとえば、前の例と同じ products コレクションを使用して、color='blue' および brand='MyBrand' の products コレクションのコンテンツのみをスナップショットに含める場合は、次のリクエストを送信できます。
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.products" ,"filter":"color='blue' AND brand='MyBrand'"}]}}`
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.products" ,"filter":"color='blue' AND brand='MyBrand'"}]}}`
2.3.2.7.3. 増分スナップショットの停止 リンクのコピーリンクがクリップボードにコピーされました!
状況によっては、増分スナップショットを停止する必要がある場合があります。たとえば、スナップショットが正しく設定されていない場合や、他のデータベース操作にリソースが使用可能であるこのとの確認が必要な場合があります。ソースデータベースのコレクションにシグナルを送信することで、すでに実行中のスナップショットを停止できます。
スナップショットの停止シグナルをシグナリングコレクションに送信するには、スナップショットの停止シグナルドキュメントをシグナリングコレクションに挿入します。送信するスナップショット停止シグナルは、スナップショット操作の type を incremental として指定し、オプションで、現在実行中のスナップショットから省略するコレクションを指定します。Debezium はシグナルコレクションの変更を検出した後、シグナルを読み、増分スナップショット操作が進行中であればそれを停止します。
関連情報
また、JSON メッセージを Kafka シグナリングトピック に送信して、増分スナップショットを停止することもできます。
前提条件
- ソースデータベースにシグナリングデータコレクションが存在する。
-
シグナルデータコレクションが
signal.data.collectionプロパティーで指定されている。
ソースシグナリングチャネルを使用して増分スナップショットを停止する
シグナリングコレクションにスナップショット停止のシグナルドキュメントを挿入します。
<signalDataCollection>.insert({"id" : _<idNumber>,"type" : "stop-snapshot", "data" : {"data-collections" ["<collectionName>", "<collectionName>"],"type": "incremental"}});<signalDataCollection>.insert({"id" : _<idNumber>,"type" : "stop-snapshot", "data" : {"data-collections" ["<collectionName>", "<collectionName>"],"type": "incremental"}});Copy to Clipboard Copied! Toggle word wrap Toggle overflow 以下に例を示します。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow signal コマンドの
id、type、およびdataパラメーターの値は、シグナリングコレクションのフィールド に対応します。以下の表では、この例のパラメーターを説明しています。
Expand 表2.59 シグナリングコレクションに増分スナップショットの停止ドキュメントを送信するための insert コマンドのフィールドの説明 項目 値 説明 1
db.debeziumSignalソースデータベース上のシグナリングコレクションの完全修飾名を指定します。
2
null
前述の例の insert メソッドは、オプションの
_idパラメーターの使用が省略されています。ドキュメントはパラメーターの値を明示的に割り当てないため、MongoDB が自動的にドキュメントに割り当てる任意の ID がシグナルリクエストのid識別子になります。
この文字列を使用して、シグナリングコレクションのエントリーにロギングメッセージを識別します。Debezium はこの識別子文字列を使用しません。3
stop-snapshottypeパラメーターを指定し、シグナルがトリガーする操作を指定します。
4
data-collectionsシグナルの
dataフィールドの任意コンポーネントで、スナップショットから削除するコレクション名の配列またはコレクション名と一致する正規表現を指定します。
配列には、database.collection形式の完全修飾名でコレクションに一致する正規表現がリストされます。dataフィールドからdata-collections配列を省略すると、シグナルによって進行中の増分スナップショット全体が停止されます。5
incremental停止するスナップショット操作のタイプを指定する信号の
dataフィールドの必須コンポーネント。
現在、有効な唯一のオプションはincrementalです。typeの値を指定しない場合、シグナルは増分スナップショットの停止に失敗します。
2.3.2.7.4. Kafka シグナリングチャネルを使用して増分スナップショットを停止する リンクのコピーリンクがクリップボードにコピーされました!
設定された Kafka シグナリングトピック にシグナルメッセージを送信して、アドホック増分スナップショットを停止できます。
Kafka メッセージのキーは、topic.prefix コネクター設定オプションの値と一致する必要があります。
メッセージの値は、type と data フィールドが含まれる JSON オブジェクトとなっています。
シグナルタイプは stop-snapshot で、data フィールドには以下のフィールドが必要です。
| フィールド | デフォルト | 値 |
|---|---|---|
|
|
|
実行するスナップショットのタイプ。現在、Debezium は |
|
| 該当なし |
テーブルの完全修飾名に一致する、コンマで区切られた正規表現のオプションの配列、またはスナップショットから削除するコレクション名に一致するコレクション名または正規表現の配列。 |
次の例は、典型的な stop-snapshot の Kafka メッセージを示しています。
Key = `test_connector`
Value = `{"type":"stop-snapshot","data": {"data-collections": ["db1.table1", "db1.table2"], "type": "INCREMENTAL"}}`
Key = `test_connector`
Value = `{"type":"stop-snapshot","data": {"data-collections": ["db1.table1", "db1.table2"], "type": "INCREMENTAL"}}`
2.3.2.8. ブロッキングスナップショット リンクのコピーリンクがクリップボードにコピーされました!
スナップショットをより柔軟に管理するために、Debezium には ブロッキングスナップショット と呼ばれる補助アドホックスナップショットメカニズムが含まれています。ブロッキングスナップショットは、Debezium コネクターにシグナルを送信 するための Debezium メカニズムに依存します。
ブロッキングスナップショットは、ランタイム時にトリガーできることを除いて、初期スナップショット と同じように動作します。
次のような状況では、標準の初期スナップショットプロセスを使用するのではなく、ブロッキングスナップショットを実行する必要があります。
- 新しいコレクションを追加し、コネクターの実行中にスナップショットを完了したいと考えている。
- 大きなコレクションを追加し、増分スナップショットよりも短い時間でスナップショットを完了したいと考えている。
ブロッキングスナップショットのプロセス
ブロッキングスナップショットを実行すると、Debezium はストリーミングを停止し、初期スナップショットの時と同じプロセスに従って、指定されたコレクションのスナップショットを開始します。スナップショットが完了すると、ストリーミングが再開されます。
スナップショットの設定
シグナルの data コンポーネントでは、次のプロパティーを設定できます。
- data-collections: スナップショットする必要があるコレクションを指定します。
-
data-collections: スナップショットに含めるコレクションを指定します。
このプロパティーは、完全修飾コレクション名と一致する正規表現のコンマ区切りリストを受け入れます。プロパティーの動作は、ブロッキングスナップショットでキャプチャーするテーブルを指定するtable.include.listプロパティーの動作と似ています。 additional-conditions: コレクションごとに異なるフィルターを指定できます。
-
data-collectionプロパティーは、フィルターが適用されるコレクションの完全修飾名であり、データベースに応じて大文字と小文字を区別するか、区別しないかを指定できます。 -
filterプロパティーには、snapshot.select.statement.overridesで使用される値と同じものが設定されます。これは、大文字小文字を区別して一致させる必要があるコレクションの完全修飾名です。
-
以下に例を示します。
{"type": "blocking", "data-collections": ["schema1.table1", "schema1.table2"], "additional-conditions": [{"data-collection": "schema1.table1", "filter": "SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC"}, {"data-collection": "schema1.table2", "filter": "SELECT * FROM [schema1].[table2] WHERE column2 > 0"}]}
{"type": "blocking", "data-collections": ["schema1.table1", "schema1.table2"], "additional-conditions": [{"data-collection": "schema1.table1", "filter": "SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC"}, {"data-collection": "schema1.table2", "filter": "SELECT * FROM [schema1].[table2] WHERE column2 > 0"}]}
重複の可能性
スナップショットをトリガーするシグナルを送信した時点と、ストリーミングが停止してスナップショットが開始する時点との間に遅延が生じる可能性があります。この遅延の結果、スナップショットが完了した後、コネクターはスナップショットがキャプチャーしたレコードと重複するイベントレコードを発行する可能性があります。
2.3.2.9. Debezium MongoDB コネクターでの変更イベントレコードのストリーミング方法 リンクのコピーリンクがクリップボードにコピーされました!
レプリカセットレコードのコネクタータスクがオフセットを取得すると、オフセットを使用して変更のストリーミングを開始する oplog の位置を判断します。その後、タスクは (設定によって) レプリカセットのプライマリーノードに接続するか、レプリカセット全体の変更ストリームに接続し、その位置から変更のストリーミングを開始します。すべての作成、挿入、および削除操作を処理して Debezium の 変更イベント に変換します。各変更イベントには操作が検出された oplog の位置が含まれ、コネクターはこれを最新のオフセットとして定期的に記録します。オフセットが記録される間隔は、Kafka Connect ワーカー設定プロパティーである offset.flush.interval.ms によって制御されます。
コネクターが正常に停止されると、処理された最後のオフセットが記録され、再起動時にコネクターは停止した場所から続行されます。しかし、コネクターのタスクが予期せず終了した場合、最後にオフセットが記録された後、最後のオフセットが記録される前に、タスクによってイベントが処理および生成されることがあります。再起動時に、コネクターは最後に 記録された オフセットから開始し、クラッシュの前に生成された同じイベントを生成する可能性があります。
Kafka パイプライン内のすべてのコンポーネントが正常に動作している場合、Kafka コンシューマーはすべてのメッセージを 1 度だけ 受信します。ただし、問題が発生した場合、Kafka はコンシューマーが 少なくとも 1 回 のみすべてのメッセージを受信することを保証できます。予期しない結果を回避するには、コンシューマーは重複メッセージを処理できる必要があります。
前述のように、コネクタータスクは常にレプリカセットのプライマリーノードを使用して oplog からの変更をストリーミングし、コネクターが可能な限り最新の操作を確認できるようにし、代わりにセカンダリーが使用された場合よりも短いレイテンシーで変更をキャプチャーできるようにします。レプリカセットが新しいプライマリーを選出すると、コネクターは即座に変更のストリーミングを停止し、新しいプライマリーに接続して、同じ場所にある新しいプライマリーノードから変更のストリーミングを開始します。同様に、コネクターとレプリカセットメンバーとの通信で問題が発生した場合は、レプリカセットが過剰にならないように指数バックオフを使用して再接続を試みます。接続の確立後、停止した場所から変更のストリーミングを続行します。これにより、コネクターはレプリカセットメンバーシップの変更を動的に調整でき、通信障害を自動的に処理できます。
要約すると、MongoDB コネクターはほとんどの状況で実行を継続します。通信の問題により、問題が解決されるまでコネクターが待機する可能性があります。
2.3.2.10. Debezium 変更イベントの before フィールドに入力するための MongoDB サポート リンクのコピーリンクがクリップボードにコピーされました!
MongoDB 6.0 以降では、変更ストリームを設定して、ドキュメントのイメージ前の状態を出力し、MongoDB 変更イベントの before フィールドにデータを投入できます。MongoDB で事前のイメージを使用できるようにするには、db.createCollection()、create、または collMod を使用して、コレクションの changeStreamPreAndPostImages を設定する必要があります。Debezium MongoDB が変更イベントに事前イメージを追加できるようにするには、コネクターの capture.mode を *_with_pre_image オプションのいずれかに設定します。
MongoDB 変更イベントのサイズは 16 メガバイトに制限されます。したがって、事前イメージを使用すると、このしきい値を超過し、障害が発生する可能性があります。変更ストリームの制限を超えないようにする方法は、MongoDB のドキュメント を参照してください。
2.3.2.11. Debezium MongoDB 変更イベントレコードを受信する Kafka トピックのデフォルト名 リンクのコピーリンクがクリップボードにコピーされました!
MongoDB コネクターは、各コレクションのドキュメントに対するすべての挿入、更新、および削除操作のイベントを 1 つの Kafka トピックに書き込みます。Kafka トピックの名前は常に logicalName.databaseName.collectionName の形式を取ります。logicalName は、topic.prefix 設定プロパティーで指定されるコネクターの 論理名、databaseName は操作が発生したデータベースの名前、collectionName は影響を受けるドキュメントが存在する MongoDB コレクションの名前です。
たとえば、products, products_on_hand, customers, and orders の 4 つのコレクションで設定される inventory データベースを含む MongoDB レプリカセットについて考えてみましょう。コネクターが監視するこのデータベースの論理名が fulfillment である場合、コネクターは以下の 4 つの Kafka トピックでイベントを生成します。
-
fulfillment.inventory.products -
fulfillment.inventory.products_on_hand -
fulfillment.inventory.customers -
fulfillment.inventory.orders
トピック名には、レプリカセット名やシャード名が含まれないことに注意してください。その結果、シャード化コレクションへの変更 (各シャードにコレクションのドキュメントのサブセットが含まれる) はすべて同じ Kafka トピックに移動します。
Kafka を設定して、必要に応じてトピックを 自動作成 できます。そうでない場合は、Kafka 管理ツールを使用してコネクターを起動する前にトピックを作成する必要があります。
2.3.2.12. イベントキーが Debezium MongoDB コネクターのトピックパーティション設定を制御する方法 リンクのコピーリンクがクリップボードにコピーされました!
MongoDB コネクターは、イベントのトピックパーティションを明示的に決定しません。代わりに、Kafka はイベントキーに基づいてトピックのパーティションを作成する方法を決定できます。Kafka Connect ワーカー設定に Partitioner 実装の名前を定義することで、Kafka のパーティショニングロジックを変更できます。
Kafka は、1 つのトピックパーティションに書き込まれたイベントのみ、合計順序を維持します。キーでイベントのパーティションを行うと、同じキーを持つすべてのイベントは常に同じパーティションに移動します。これにより、特定のドキュメントのすべてのイベントが常に完全に順序付けされます。
2.3.2.13. トランザクション境界を表す Debezium MongoDB コネクターによって生成されたイベント リンクのコピーリンクがクリップボードにコピーされました!
Debezium は、トランザクションメタデータ境界を表すイベントを生成でき、データイベントメッセージを補完できます。
Debezium は、コネクターのデプロイ後に発生するトランザクションに対してのみメタデータを登録し、受信します。コネクターをデプロイする前に発生するトランザクションのメタデータは利用できません。
Debezium はすべてのトランザクションの BEGIN および END に対して、以下のフィールドが含まれるイベントを生成します。
status-
BEGINまたはEND id- 一意のトランザクション識別子の文字列表現。
event_count(ENDイベント用)- トランザクションによって出力されるイベントの合計数。
data_collections(ENDイベント用)-
指定のデータコレクションからの変更によって出力されたイベントの数を提供する
data_collectionとevent_countのペアの配列。
以下の例では、一般的なメッセージを示します。
topic.transaction オプションで上書きされない限り、トランザクションイベントは <topic.prefix>.transaction という名前のトピックに書き込まれます。
変更データイベントのエンリッチメント
トランザクションメタデータを有効にすると、データメッセージ Envelope は新しい transaction フィールドで強化されます。このフィールドは、複合フィールドの形式ですべてのイベントに関する情報を提供します。
id- 一意のトランザクション識別子の文字列表現。
total_order- トランザクションによって生成されたすべてのイベントを対象とするイベントの絶対位置。
data_collection_order- トランザクションによって出力されたすべてのイベントを対象とするイベントのデータコレクションごとの位置。
以下は、メッセージの内容の例です。
2.3.3. Debezium MongoDB コネクターのデータ変更イベントの説明 リンクのコピーリンクがクリップボードにコピーされました!
Debezium MongoDB コネクターは、データを挿入、更新、または削除する各ドキュメントレベルの操作に対してデータ変更イベントを生成します。各イベントにはキーと値が含まれます。キーと値の構造は、変更されたコレクションによって異なります。
Debezium および Kafka Connect は、イベントメッセージの継続的なストリーム を中心として設計されています。ただし、これらのイベントの構造は時間の経過とともに変化する可能性があり、コンシューマーによる処理が困難になることがあります。これに対応するために、各イベントにはコンテンツのスキーマが含まれます。スキーマレジストリーを使用している場合は、コンシューマーがレジストリーからスキーマを取得するために使用できるスキーマ ID が含まれます。これにより、各イベントが自己完結型になります。
以下のスケルトン JSON は、変更イベントの基本となる 4 つの部分を示しています。ただし、アプリケーションで使用するために選択した Kafka Connect コンバーターの設定方法によって、変更イベントのこれら 4 部分の表現が決定されます。schema フィールドは、変更イベントが生成されるようにコンバーターを設定した場合のみ変更イベントに含まれます。同様に、イベントキーおよびイベントペイロードは、変更イベントが生成されるようにコンバーターを設定した場合のみ変更イベントに含まれます。JSON コンバーターを使用し、変更イベントの基本となる 4 つの部分すべてを生成するように設定すると、変更イベントの構造は次のようになります。
| 項目 | フィールド名 | 説明 |
|---|---|---|
| 1 |
|
最初の |
| 2 |
|
最初の |
| 3 |
|
2 つ目の |
| 4 |
|
2 つ目の |
デフォルトでは、コネクターによって、変更イベントレコードがイベントの元のコレクションと同じ名前を持つトピックにストリーミングされます。トピック名 を参照してください。
MongoDB コネクターは、すべての Kafka Connect スキーマ名が Avro スキーマ名の形式 に準拠するようにします。つまり、論理サーバー名はアルファベットまたはアンダースコア (a-z、A-Z、または _) で始まる必要があります。論理サーバー名の残りの各文字と、データベース名とコレクション名の各文字は、アルファベット、数字、またはアンダースコア ( a-z、A-Z、0-9、または _) でなければなりません。無効な文字がある場合は、アンダースコアに置き換えられます。
論理サーバー名、データベース名、またはコレクション名に無効な文字が含まれ、名前を区別する唯一の文字が無効であると、無効な文字はすべてアンダースコアに置き換えられるため、予期せぬ競合が発生する可能性があります。
詳細は、以下のトピックを参照してください。
2.3.3.1. Debezium MongoDB 変更イベントのキー リンクのコピーリンクがクリップボードにコピーされました!
変更イベントのキーには、変更されたドキュメントのキーのスキーマと、変更されたドキュメントの実際のキーのスキーマが含まれます。特定のコレクションでは、スキーマとそれに対応するペイロードの両方に単一の id フィールドが含まれます。このフィールドの値は、MongoDB Extended JSON のシリアライゼーションの厳格モード から派生する文字列として表されるドキュメントの識別子です。
論理名が fulfillment のコネクター、inventory データベースが含まれるレプリカセット、および以下のようなドキュメントが含まれる customers コレクションについて考えてみましょう。
ドキュメントの例
変更イベントキーの例
customers コレクションへの変更をキャプチャーする変更イベントのすべてに、イベントキースキーマがあります。customers コレクションに前述の定義がある限り、customers コレクションへの変更をキャプチャーする変更イベントのキー構造はすべて以下のようになります。JSON では、以下のようになります。
| 項目 | フィールド名 | 説明 |
|---|---|---|
| 1 |
|
キーのスキーマ部分は、キーの |
| 2 |
|
キーのペイロードの構造を定義するスキーマの名前。このスキーマは、変更したドキュメントのキーの構造を説明します。キースキーマ名の形式は connector-name.database-name.collection-name.
|
| 3 |
|
イベントキーの |
| 4 |
|
各フィールドの名前、型、および必要かどうかなど、 |
| 5 |
|
この変更イベントが生成されたドキュメントのキーが含まれます。この例では、キーには型 |
この例では、整数の識別子を持つドキュメントを使用しますが、有効な MongoDB ドキュメント識別子は、ドキュメント識別子を含め、同じように動作します。ドキュメント識別子の場合、イベントキーの payload.id 値は、厳格モードを使用する MongoDB Extended JSON シリアライゼーションとして更新されたドキュメントの元の _id フィールドを表す文字列です。以下の表では、さまざまな型の _id フィールドを表す例を示します。
| タイプ | MongoDB _id の値 | キーのペイロード |
|---|---|---|
| Integer | 1234 |
|
| Float | 12.34 |
|
| String | "1234" |
|
| Document |
|
|
| ObjectId |
|
|
| Binary |
|
|
2.3.3.2. Debezium MongoDB 変更イベントの値 リンクのコピーリンクがクリップボードにコピーされました!
変更イベントの値はキーよりも若干複雑です。キーと同様に、値には schema セクションと payload セクションがあります。schema セクションには、入れ子のフィールドを含む、Envelope セクションの payload 構造を記述するスキーマが含まれています。データを作成、更新、または削除する操作のすべての変更イベントには、Envelope 構造を持つ値 payload があります。
変更イベントキーの例を紹介するために使用した、同じサンプルドキュメントについて考えてみましょう。
ドキュメントの例
このドキュメントへの変更に対する変更イベントの値部分には、以下の各イベントタイプについて記述されています。
create イベント
以下の例は、customers コレクションにデータを作成する操作に対して、コネクターによって生成される変更イベントの値の部分を示しています。
| 項目 | フィールド名 | 説明 |
|---|---|---|
| 1 |
| 値のペイロードの構造を記述する、値のスキーマ。変更イベントの値スキーマは、コネクターが特定のコレクションに生成するすべての変更イベントで同じになります。 |
| 2 |
|
|
| 3 |
|
|
| 4 |
|
|
| 5 |
|
値の実際のデータ。これは、変更イベントが提供する情報です。 |
| 6 |
|
イベント発生後のドキュメントの状態を指定する任意のフィールド。この例では、 |
| 7 |
| イベントのソースメタデータを記述する必須のフィールド。このフィールドには、イベントの発生元、イベントの発生順序、およびイベントが同じトランザクションの一部であるかどうかなど、このイベントと他のイベントを比較するために使用できる情報が含まれています。ソースメタデータには以下が含まれています。
|
| 8 |
|
コネクターによってイベントが生成される原因となった操作の型を記述する必須文字列。この例では、
|
| 9 |
|
コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。 |
| 10 |
| コネクターがイベントを処理した時間をマイクロ秒単位で表示するオプションフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。 |
| 9 |
| コネクターがイベントを処理した時間をナノ秒単位で表示するオプションフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。 |
変更ストリームキャプチャーモード
サンプル customers コレクションにある更新の変更イベントの値には、そのコレクションの 作成 イベントと同じスキーマがあります。同様に、イベント値のペイロードは同じ構造を持ちます。ただし、イベント値ペイロードでは 更新 イベントに異なる値が含まれます。update イベントに after 値が含まれるのは、capture.mode オプションが change_streams_update_full に設定されている場合のみです。capture.mode オプションが *_with_pre_image オプションのいずれかに設定されている場合、before 値が指定されます。この場合、新たな構造化フィールド updateDescription が追加されました。
-
updatedFieldsは、更新されたドキュメントフィールドの JSON 表現とその値を含む文字列フィールドです -
removedFieldsは、ドキュメントから削除されたフィールド名のリストです。 -
truncatedArraysは、省略されたドキュメントのアレイのリストです。
以下は、コネクターによって customers コレクションでの更新に生成されるイベントの変更イベント値の例になります。
| 項目 | フィールド名 | 説明 |
|---|---|---|
| 1 |
|
コネクターによってイベントが生成される原因となった操作の型を記述する必須文字列。この例では、 |
| 2 |
|
コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。 |
| 3 |
| 変更前の実際の MongoDB ドキュメントの JSON 文字列表現が含まれます。
キャプチャーモードが |
| 4 |
|
実際の MongoDB ドキュメントを表す JSON 文字列が含まれます。 |
| 5 |
|
ドキュメントの更新されたフィールド値の JSON 文字列表現が含まれます。この例では、更新により |
| 6 |
| イベントのソースメタデータを記述する必須のフィールド。このフィールドには、同じコレクションの 作成 イベントと同じ情報が含まれますが、oplog の異なる位置からのイベントであるため、値は異なります。ソースメタデータには以下が含まれています。
|
イベント内の after の値は、ドキュメントの at-point-of-time の値として処理される必要があります。この値は動的に計算されるのではなく、コレクションから取得される。このため、複数の更新が次々に行われる場合、すべての 更新 更新イベントには、文書に保存されている最後の値を表す同じ after 値が含まれる可能性がある。
アプリケーションが段階的な変更の進化に依存している場合は、updateDescription のみに依存する必要があります。
delete イベント
delete change イベントの値は、create や update と同じ schema 部分を持ちます。delete イベントの payload 部分には、同じコレクションの 作成 と 更新 イベントとは異なる値が含まれます。特に、delete イベントには after 値も updateDescription 値も含まれません。以下は、customers コレクションのドキュメントの 削除 イベントの例になります。
| 項目 | フィールド名 | 説明 |
|---|---|---|
| 1 |
|
操作の型を記述する必須の文字列。 |
| 2 |
|
コネクターがイベントを処理した時間を表示する任意のフィールド。この時間は、Kafka Connect タスクを実行している JVM のシステムクロックを基にします。 |
| 3 |
| 変更前の実際の MongoDB ドキュメントの JSON 文字列表現が含まれます。
キャプチャーモードが |
| 4 |
| イベントのソースメタデータを記述する必須のフィールド。このフィールドには、同じコレクションの 作成 または 更新 イベントと同じ情報が含まれますが、oplog の異なる位置からのイベントであるため、値は異なります。ソースメタデータには以下が含まれています。
|
MongoDB コネクターイベントは、Kafka ログコンパクション と動作するように設計されています。ログコンパクションにより、少なくとも各キーの最新のメッセージが保持される限り、一部の古いメッセージを削除できます。これにより、トピックに完全なデータセットが含まれ、キーベースの状態のリロードに使用できるようにするとともに、Kafka がストレージ領域を確保できるようにします。
tombstone イベント
一意に識別ドキュメントの MongoDB コネクターイベントはすべて同じキーを持ちます。ドキュメントが削除された場合でも、Kafka は同じキーを持つ以前のメッセージをすべて削除できるため、削除 イベントの値はログコンパクションで動作します。ただし、Kafka がそのキーを持つすべてのメッセージを削除するには、メッセージの値が null である必要があります。これを可能にするために、Debezium の MongoDB コネクターは 削除 イベントを出力した後に、null 値以外で同じキーを持つ特別な廃棄 (tombstone) イベントを出力します。tombstone イベントは、同じキーを持つすべてのメッセージを削除できることを Kafka に通知します。
2.3.4. Debezium コネクターと連携する MongoDB の設定 リンクのコピーリンクがクリップボードにコピーされました!
MongoDB コネクターは MongoDB の変更ストリームを使用して変更をキャプチャーするため、コネクターは MongoDB レプリカセットと、各シャードが個別のレプリカセットであるシャードクラスターとのみ動作します。レプリカセット または シャードクラスター の設定については、MongoDB ドキュメントを参照してください。また、レプリカセットで アクセス制御と認証 を有効にする方法についても理解するようにしてください。
oplog が読み取られる admin データベースを読み取るために適切なロールを持つ MongoDB ユーザーも必要です。さらに、ユーザーはシャードクラスターの設定サーバーで config データベースを読み取りできる必要もあり、listDatabases 権限も必要です。変更ストリームを使用する場合 (デフォルト)、ユーザーはクラスター全体の特権アクションである find および changeStream も持っている必要があります。
pre-image を使用して before フィールドに入力する場合は、最初に db.createCollection()、create、または collMod を使用してコレクションの changeStreamPreAndPostImages を有効にする必要があります。
最適な Oplog 設定
Debezium MongoDB コネクターは 変更ストリーム を読み取り、レプリカセットの oplog データを取得します。oplog は固定サイズの上限付きコレクションです。そのため、設定された最大サイズを超えると、最も古いエントリーを上書きしはじめます。何らかの理由でコネクターが停止した場合、再起動すると、最後の oplog ストリーム位置からストリーミングを再開しようとします。ただし、最後のストリーム位置が oplog から削除されていた場合、コネクターの snapshot.mode プロパティーに指定された値によっては、コネクターが起動に失敗し、無効な再開トークンのエラー が報告される可能性があります。障害が発生した場合は、Debezium がデータベースからレコードを引き続き取得できるように、新しいコネクターを作成する必要があります。詳細は、snapshot.mode が initial に設定されている場合、コネクターが長時間停止した後に失敗する を参照してください。
Debezium がストリーミングを再開するために必要なオフセット値を oplog が保持するようにするには、次のいずれかの方法を使用できます。
- oplog のサイズを大きくします。通常のワークロードに基づいて、oplog サイズを 1 時間あたりの oplog エントリーのピーク数よりも大きい値に設定します。
- oplog エントリーが保持される最小時間数を増やします (MongoDB 4.4 以降)。この設定は時間ベースであるため、oplog が最大設定サイズに達した場合でも、過去 n 時間のエントリーが確実に利用可能になります。これは一般的に推奨されるオプションですが、容量に近づいている高ワークロードのクラスターの場合は、最大 oplog サイズを指定してください。
oplog エントリーの欠落に関連する障害を防ぐには、レプリケーション動作を報告するメトリクスを追跡し、oplog サイズを最適化して Debezium をサポートすることが重要です。特に、Oplog GB/時間およびレプリケーション Oplog ウィンドウの値を監視する必要があります。レプリケーション oplog ウィンドウの値を超える間隔で Debezium がオフラインになり、Debezium がエントリーを消費できる速度よりも速くプライマリー oplog が増加すると、コネクター障害が発生する可能性があります。
これらのメトリクスを監視する方法については、MongoDB のドキュメント を参照してください。
oplog の最大サイズは、予想される 1 時間当たりの oplog の増加 (Oplog GB/時間) に、Debezium の障害に対処するために必要な時間を掛けた値に設定することを推奨します。
すなわち、以下のようになります。
Oplog GB/Hour X average reaction time to Debezium failure
たとえば、oplog のサイズ制限が 1 GB に設定されていて、oplog が 1 時間あたり 3 GB ずつ増加する場合、oplog エントリーは 1 時間に 3 回消去されます。この期間に Debezium で障害が発生した場合、最後の oplog の位置が削除される可能性があります。
oplog が 3 GB/時間の速度で増加し、Debezium が 2 時間オフラインの場合、oplog サイズを 3GB/時間 x 2 時間、つまり 6 GB に設定します。
2.3.5. Debezium MongoDB コネクターのデプロイメント リンクのコピーリンクがクリップボードにコピーされました!
以下の方法のいずれかを使用して Debezium MongoDB コネクターをデプロイできます。
Streams for Apache Kafka を使用して、コネクタープラグインを含むイメージを自動的に作成します。
以下は推奨される方法です。
-
Dockerfile からカスタム Kafka Connect コンテナーイメージをビルドします。
この Containerfile デプロイメント方法は非推奨となりました。この方法の手順は、ドキュメントの今後のバージョンで削除される予定です。
2.3.5.1. Streams for Apache Kafka を使用した MongoDB コネクターのデプロイメント リンクのコピーリンクがクリップボードにコピーされました!
Debezium コネクターのデプロイで推奨される方法は、Streams for Apache Kafka を使用して、コネクタープラグインを含む Kafka Connect コンテナーイメージを構築することです。
デプロイメントプロセス中に、以下のカスタムリソース (CR) を作成し、使用します。
-
Kafka Connect インスタンスを定義し、コネクターアーティファクトに関する情報をイメージに含める必要がある
KafkaConnectCR。 -
コネクターがソースデータベースにアクセスするために使用する情報を提供する
KafkaConnectorCR。Streams for Apache Kafka が Kafka Connect Pod を起動した後、KafkaConnectorCR を適用してコネクターを起動します。
Kafka Connect イメージのビルド仕様では、デプロイ可能なコネクターを指定できます。各コネクタープラグインに対して、デプロイメントに利用可能にする他のコンポーネントを指定することもできます。たとえば、Apicurio Registry アーティファクトや Debezium スクリプトコンポーネントを追加できます。Streams for Apache Kafka は、Kafka Connect イメージをビルドするときに、指定されたアーティファクトをダウンロードし、それをイメージに組み込みます。
Kafka Connect CR の spec.build.output パラメーターは、生成される KafkaConnect コンテナーイメージを格納する場所を指定します。コンテナーイメージは、quay.io などのコンテナーレジストリー、または OpenShift ImageStream に保存できます。イメージを ImageStream に保存するには、Kafka Connect をデプロイする前に ImageStream を作成する必要があります。ImageStreams は自動的に作成されません。
KafkaConnect リソースを使用してクラスターを作成する場合は、Kafka Connect REST API を使用してコネクターを作成または更新できません。ただし、REST API を使用して情報を取得できます。
関連情報
- 「Streams for Apache Kafka on OpenShift のデプロイと管理」の Kafka Connect の設定
- 「Streams for Apache Kafka on OpenShift のデプロイと管理」の 新しいコンテナーイメージの自動ビルド
2.3.5.2. Streams for Apache Kafka を使用した Debezium MongoDB コネクターのデプロイ リンクのコピーリンクがクリップボードにコピーされました!
以前のバージョンの Streams for Apache Kafka では、OpenShift に Debezium コネクターをデプロイするには、まずコネクター用の Kafka Connect イメージをビルドする必要がありました。OpenShift にコネクターをデプロイするための現在の推奨方法は、Streams for Apache Kafka のビルド設定を使用して、使用する Debezium コネクタープラグインを含む Kafka Connect コンテナーイメージを自動的にビルドすることです。
ビルドプロセス中に、Streams for Apache Kafka Operator は、Debezium コネクター定義を含む KafkaConnect カスタムリソースの入力パラメーターを Kafka Connect コンテナーイメージに変換します。このビルドは、Red Hat Maven リポジトリーまたは別の設定済みの HTTP サーバーから必要なアーティファクトをダウンロードします。
新規に作成されたコンテナーは .spec.build.output に指定されるコンテナーレジストリーにプッシュされ、Kafka Connect クラスターのデプロイに使用されます。Streams for Apache Kafka が Kafka Connect イメージをビルドした後、ビルドに含まれるコネクターを起動するための KafkaConnector カスタムリソースを作成します。
前提条件
- クラスター Operator がインストールされている OpenShift クラスターにアクセスできる。
- Streams for Apache Kafka Operator が実行されている。
- Apache Kafka クラスターが Streams for Apache Kafka on OpenShift のデプロイと管理 に記載されているとおりにデプロイされている。
- Kafka Connect が Streams for Apache Kafka にデプロイされている。
- Red Hat build of Debezium のライセンスを所有している。
-
OpenShift
ocCLI クライアントがインストールされている、または OpenShift Container Platform Web コンソールにアクセスできる。 Kafka Connect ビルドイメージの保存方法に応じて、レジストリーのパーミッションを用意するか、ImageStream リソースを作成している。
- ビルドイメージを Red Hat Quay.io または Docker Hub などのイメージレジストリーに保存する場合は、以下が必要です。
- レジストリーでイメージを作成し、管理するためのアカウントおよびパーミッション
- ビルドイメージをネイティブ OpenShift ImageStream として保存する場合は、以下を行います。
- 新規コンテナーイメージを保存するために、ImageStream リソースをクラスターにデプロイします。クラスターの ImageStream を明示的に作成する必要があります。ImageStreams は、デフォルトでは利用できません。ImageStreams の詳細は、OpenShift Container Platform でのイメージストリームの管理 を参照してください。
手順
- OpenShift クラスターにログインします。
コネクターの Debezium
KafkaConnectカスタムリソース (CR) を作成するか、既存のリソースを変更します。たとえば、metadata.annotationsおよびspec.buildプロパティーを指定するdbz-connect.yamlという名前のKafkaConnectCR を作成します。以下の例は、KafkaConnectカスタムリソースを記述するdbz-connect.yamlファイルからの抜粋を示しています。
例2.18 Debezium コネクターを含む
KafkaConnectカスタムリソースを定義したdbz-connect.yamlファイル次の例では、カスタムリソースは、次のアーティファクトをダウンロードするように設定されています。
- Debezium MongoDB コネクターアーカイブ。
- Red Hat build of Apicurio Registry アーカイブApicurio Registry はオプションのコンポーネントです。コネクターで Avro シリアル化を使用する場合にのみ、Apicurio Registry コンポーネントを追加します。
- Debezium スクリプティング SMT アーカイブと、Debezium コネクターで使用する関連スクリプティングエンジン。SMT アーカイブとスクリプト言語の依存関係はオプションのコンポーネントです。Debezium コンテンツベースのルーティング SMT または フィルター SMT を使用する場合にのみ、これらのコンポーネントを追加します。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 表2.67 Kafka Connect 設定の説明 項目 説明 1
strimzi.io/use-connector-resourcesアノテーションを"true"に設定して、クラスター Operator がKafkaConnectorリソースを使用してこの Kafka Connect クラスター内のコネクターを設定できるようにします。2
spec.build設定は、ビルドイメージの保存場所を指定し、プラグインアーティファクトの場所とともにイメージに追加するプラグインをリストします。3
build.outputは、新しくビルドされたイメージを保存するレジストリーを指定します。4
イメージ出力の名前およびイメージ名を指定します。
output.typeの有効な値は、Docker Hub や Quay などのコンテナーレジストリーにプッシュする場合はdocker、内部の OpenShift ImageStream にイメージをプッシュする場合はimagestreamです。ImageStream を使用するには、ImageStream リソースをクラスターにデプロイする必要があります。KafkaConnect 設定でbuild.outputを指定する方法の詳細は、Streams for Apache Kafka API リファレンスの スキーマ参照のビルド を参照してください。5
plugins設定は、Kafka Connect イメージに追加するすべてのコネクターをリストします。リストの各エントリーについて、プラグインnameと、コネクターのビルドに必要なアーティファクトに関する情報を指定します。必要に応じて、各コネクタープラグインに対して、コネクターと使用できる他のコンポーネントを含めることができます。たとえば、Service Registry アーティファクトまたは Debezium スクリプトコンポーネントを追加できます。6
artifacts.typeの値は、artifacts.urlで指定するアーティファクトのファイルタイプを指定します。有効なタイプはzip、tgz、またはjarです。Debezium コネクターアーカイブは、.zipファイル形式で提供されます。typeの値は、urlフィールドで参照されるファイルのタイプと一致させる必要があります。7
artifacts.urlの値は、コネクターアーティファクトのファイルを格納する Maven リポジトリーなどの HTTP サーバーのアドレスを指定します。Debezium コネクターアーティファクトは Red Hat リポジトリーで入手できます。OpenShift クラスターが指定されたサーバーにアクセスできる必要があります。8
(オプション) Apicurio Registry コンポーネントをダウンロードするためのアーティファクト
typeとurlを指定します。デフォルトの JSON コンバーターを使用する代わりに、コネクターが Apache Avro を使用して Red Hat build of Apicurio Registry でイベントのキーと値をシリアル化する場合にのみ、Apicurio Registry アーティファクトを含めます。9
(オプション) Debezium コネクターで使用する Debezium スクリプト SMT アーカイブのアーティファクト
typeとurlを指定します。Debezium コンテンツベースルーティング SMT または フィルター SMT を使用する場合にのみ、スクリプト SMT を含めます。スクリプト SMT を使用するには、groovy などの JSR 223 準拠のスクリプト実装もデプロイする必要があります。10
(オプション) JSR 223 準拠のスクリプト実装の JAR ファイルのアーティファクト
typeとurlを指定します。これは、Debezium スクリプト SMT で必要です。重要Streams for Apache Kafka を使用してコネクタープラグインを Kafka Connect イメージに組み込む場合、必要なスクリプト言語コンポーネントごとに、
artifacts.urlに JAR ファイルのロケーションを指定し、artifacts.typeの値もjarに設定する必要があります。値が無効な場合は、実行時にコネクターが失敗します。スクリプト SMT で Apache Groovy 言語を使用できるようにするために、この例のカスタムリソースは、次のライブラリーの JAR ファイルを取得します。
-
groovy -
groovy-jsr223(スクリプトエージェント) -
groovy-json(JSON 文字列を解析するためのモジュール)
別の方法として、Debezium スクリプト SMT は、GraalVM JavaScript の JSR 223 実装の使用もサポートします。
以下のコマンドを入力して、
KafkaConnectビルド仕様を OpenShift クラスターに適用します。oc create -f dbz-connect.yaml
oc create -f dbz-connect.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow Streams Operator はカスタムリソースで指定された設定に基づいて、デプロイする Kafka Connect イメージを準備します。
ビルドが完了すると、Operator はイメージを指定されたレジストリーまたは ImageStream にプッシュし、Kafka Connect クラスターを起動します。設定にリスト表示されているコネクターアーティファクトはクラスターで利用できます。KafkaConnectorリソースを作成し、デプロイする各コネクターのインスタンスを定義します。
たとえば、以下のKafkaConnectorCR を作成し、mongodb-inventory-connector.yamlとして保存します。例2.19 Debezium コネクターの
KafkaConnectorカスタムリソースを定義するmongodb-inventory-connector.yamlファイルCopy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 表2.68 コネクター設定の説明 項目 説明 1
Kafka Connect クラスターに登録するコネクターの名前。
2
コネクタークラスの名前。
3
同時に動作できるタスクの数。
4
コネクターの設定。
5
ホストデータベースインスタンスのアドレスおよびポート番号。
7
Debezium がデータベースへの接続に使用するアカウントの名前。
8
Debezium がデータベースユーザーアカウントに接続するために使用するパスワード。
8
データベースインスタンスまたはクラスターのトピック接頭辞。
指定する名前は、英数字またはアンダースコアのみで設定する必要があります。
トピック接頭辞は、このコネクターから変更イベントを受信する Kafka トピックの接頭辞として使用されるため、名前はクラスターのコネクター間で一意である必要があります。
コネクターを Avro コネクター と統合する場合、この namespace は、関連する Kafka Connect スキーマの名前や、対応する Avro スキーマの namespace でも使用されます。9
コネクターが変更をキャプチャーするコレクションの名前。
以下のコマンドを実行してコネクターリソースを作成します。
oc create -n <namespace> -f <kafkaConnector>.yaml
oc create -n <namespace> -f <kafkaConnector>.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow 以下に例を示します。
oc create -n debezium -f mongodb-inventory-connector.yaml
oc create -n debezium -f mongodb-inventory-connector.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow コネクターは Kafka Connect クラスターに登録され、
KafkaConnectorCR のspec.config.database.dbnameで指定されたデータベースに対して実行を開始します。コネクター Pod の準備ができると、Debezium が実行されます。
これで、Debezium MongoDB のデプロイメントを確認 する準備が整いました。
2.3.5.3. Dockerfile からカスタム Kafka Connect コンテナーイメージをビルドして Debezium MongoDB コネクターのデプロイ リンクのコピーリンクがクリップボードにコピーされました!
Debezium MongoDB コネクターをデプロイするには、Debezium コネクターアーカイブが含まれるカスタム Kafka Connect コンテナーイメージをビルドし、このコンテナーイメージをコンテナーレジストリーにプッシュする必要があります。次に、2 つのカスタムリソース (CR) を作成します。
-
Kafka Connect インスタンスを定義する
KafkaConnectCR。imageは Debezium コネクターを実行するために作成したイメージの名前を指定します。この CR は、Red Hat Streams for Apache Kafka がデプロイされている OpenShift インスタンスに適用します。Streams for Apache Kafka は、Apache Kafka を OpenShift に導入する Operator とイメージを提供します。 -
Debezium MongoDB コネクターを定義する
KafkaConnectorCR。この CR をKafkaConnectCR を適用するのと同じ OpenShift インスタンスに適用します。
前提条件
- MongoDB が稼働し、MongoDB を設定して Debezium コネクターと連携する 手順が完了済みである必要があります。
- Streams for Apache Kafka が OpenShift にデプロイされ、Apache Kafka および Kafka Connect が実行されている。詳細は、Streams for Apache Kafka on OpenShift のデプロイと管理 を参照してください。
- Podman または Docker がインストールされている。
-
Debezium コネクターを実行するコンテナーを追加する予定のコンテナーレジストリー (
quay.ioやdocker.ioなど) でコンテナーを作成および管理するアカウントとパーミッションを持っている。
手順
Kafka Connect の Debezium MongoDB コンテナーを作成します。
registry.redhat.io/amq-streams/kafka-39-rhel9:2.9.0をベースイメージとして使用する Dockerfile を作成します。たとえば、ターミナルウィンドウから、以下のコマンドを入力します。Copy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 項目 説明 1
任意のファイル名を指定できます。
2
Kafka Connect プラグインディレクトリーへのパスを指定します。Kafka Connect のプラグインディレクトリーが別の場所にある場合は、このパスを実際のディレクトリーのパスに置き換えてください。
このコマンドは、現在のディレクトリーに
debezium-container-for-mongodb.yamlという名前の Dockerfile を作成します。前のステップで作成した
debezium-container-for-mongodb.yamlDocker ファイルからコンテナーイメージをビルドします。ファイルが含まれるディレクトリーから、ターミナルウィンドウを開き、以下のコマンドのいずれかを入力します。podman build -t debezium-container-for-mongodb:latest .
podman build -t debezium-container-for-mongodb:latest .Copy to Clipboard Copied! Toggle word wrap Toggle overflow docker build -t debezium-container-for-mongodb:latest .
docker build -t debezium-container-for-mongodb:latest .Copy to Clipboard Copied! Toggle word wrap Toggle overflow 上記のコマンドは、
debezium-container-for-mongodbという名前のコンテナーイメージを構築します。カスタムイメージを
quay.ioなどのコンテナーレジストリーまたは内部のコンテナーレジストリーにプッシュします。コンテナーレジストリーは、イメージをデプロイする OpenShift インスタンスで利用できる必要があります。以下のいずれかのコマンドを実行します。podman push <myregistry.io>/debezium-container-for-mongodb:latest
podman push <myregistry.io>/debezium-container-for-mongodb:latestCopy to Clipboard Copied! Toggle word wrap Toggle overflow docker push <myregistry.io>/debezium-container-for-mongodb:latest
docker push <myregistry.io>/debezium-container-for-mongodb:latestCopy to Clipboard Copied! Toggle word wrap Toggle overflow 新しい Debezium MongoDB
KafkaConnectカスタムリソース (CR) を作成します。たとえば、annotationsおよびimageプロパティーを指定するdbz-connect.yamlという名前のKafkaConnectCR を作成します。以下の例は、KafkaConnectカスタムリソースを記述するdbz-connect.yamlファイルからの抜粋を示しています。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 項目 説明 1
KafkaConnectorリソースはこの Kafka Connect クラスターでコネクターを設定するために使用されることを、metadata.annotationsは Cluster Operator に示します。2
spec.imageは Debezium コネクターを実行するために作成したイメージの名前を指定します。設定された場合、このプロパティーによって Cluster Operator のSTRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE変数がオーバーライドされます。以下のコマンドを入力して、
KafkaConnectCR を OpenShift Kafka Connect 環境に適用します。oc create -f dbz-connect.yaml
oc create -f dbz-connect.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow このコマンドは、Debezium コネクターを実行するために作成したイメージの名前を指定する Kafka Connect インスタンスを追加します。
Debezium MongoDB コネクターインスタンスを設定する
KafkaConnectorカスタムリソースを作成します。通常、コネクターに使用できる設定プロパティーを使用して、
.yamlファイルに Debezium MongoDB コネクターを設定します。コネクター設定で、Debezium に指示を出して MongoDB レプリカセットまたはシャードクラスターのサブセットの変更イベントを生成する場合があります。任意で、不必要なコレクションを除外するプロパティーを設定できます。以下の例では、
192.168.99.100のポート27017で MongoDB レプリカセットrs0に接続する Debezium コネクターを設定し、inventoryで発生する変更をキャプチャーします。inventory-connector-mongodbはレプリカセットの論理名です。
MongoDB
inventory-connector.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 表2.69 MongoDB inventory-connector.yaml の例の設定の説明 項目 説明 1
コネクターを Kafka Connect に登録するために使用される名前。
2
MongoDB コネクタークラスの名前。
3
MongoDB レプリカセットへの接続に使用するホストアドレス。
4
MongoDB レプリカセットの 論理名。コネクターが書き込む Kafka トピックの名前、Kafka Connect スキーマ名、および Avro コンバーターが使用される場合に対応する Avro スキーマの namespace のすべてに使用されます。
5
監視するすべてのコレクションのコレクション namespace (例: <dbName>.<collectionName>) と一致する正規表現の任意リスト。
Kafka Connect でコネクターインスタンスを作成します。たとえば、
KafkaConnectorリソースをinventory-connector.yamlファイルに保存した場合は、以下のコマンドを実行します。oc apply -f inventory-connector.yaml
oc apply -f inventory-connector.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow 上記のコマンドは
inventory-connectorを登録し、コネクターはKafkaConnectorCR に定義されているinventoryコレクションに対して実行を開始します。
Debezium MongoDB コネクターに設定できる設定プロパティーの完全リストは、MongoDB コネクター設定プロパティー を参照してください。
結果
コネクターが起動したら、以下のアクションを完了します。
- MongoDB レプリカセットでコレクションの スナップショット 一貫性をもたせて実行する。
- レプリカセットの変更ストリームを読み取る。
- 挿入、更新、削除されたすべてのドキュメントの変更イベントを生成する。
- Kafka トピックに変更イベントレコードをストリーミングする。
2.3.5.4. Debezium MongoDB コネクターが実行していることの確認 リンクのコピーリンクがクリップボードにコピーされました!
コネクターがエラーなしで正常に起動すると、コネクターがキャプチャーするように設定された各テーブルのトピックが作成されます。ダウンストリームアプリケーションは、これらのトピックをサブスクライブして、ソースデータベースで発生する情報イベントを取得できます。
コネクターが実行されていることを確認するには、OpenShift Container Platform Web コンソールまたは OpenShift CLI ツール (oc) から以下の操作を実行します。
- コネクターのステータスを確認します。
- コネクターがトピックを生成していることを確認します。
- 各テーブルの最初のスナップショットの実行中にコネクターが生成する読み取り操作 ("op":"r") のイベントがトピックに反映されていることを確認します。
前提条件
- Debezium コネクターが Streams for Apache Kafka on OpenShift にデプロイされている。
-
OpenShift
ocCLI クライアントがインストールされている。 - OpenShift Container Platform Web コンソールにアクセスできる。
手順
以下の方法のいずれかを使用して
KafkaConnectorリソースのステータスを確認します。OpenShift Container Platform Web コンソールから以下を実行します。
-
Home
Search に移動します。 -
Search ページで Resources をクリックし、Select Resource ボックスを開き、
KafkaConnectorを入力します。 - KafkaConnectors リストから、チェックするコネクターの名前をクリックします (例: inventory-connector-mongodb)。
- Conditions セクションで、Type および Status 列の値が Ready および True に設定されていることを確認します。
-
Home
ターミナルウィンドウから以下を実行します。
以下のコマンドを実行します。
oc describe KafkaConnector <connector-name> -n <project>
oc describe KafkaConnector <connector-name> -n <project>Copy to Clipboard Copied! Toggle word wrap Toggle overflow 以下に例を示します。
oc describe KafkaConnector inventory-connector-mongodb -n debezium
oc describe KafkaConnector inventory-connector-mongodb -n debeziumCopy to Clipboard Copied! Toggle word wrap Toggle overflow このコマンドは、以下の出力のようなステータス情報を返します。
例2.20
KafkaConnectorリソースのステータスCopy to Clipboard Copied! Toggle word wrap Toggle overflow
コネクターによって Kafka トピックが作成されたことを確認します。
OpenShift Container Platform Web コンソールから以下を実行します。
-
Home
Search に移動します。 -
Search ページで Resources をクリックし、Select Resource ボックスを開き、
KafkaTopicを入力します。 -
KafkaTopics リストから確認するトピックの名前をクリックします (例:
inventory-connector-mongodb.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d)。 - Conditions セクションで、Type および Status 列の値が Ready および True に設定されていることを確認します。
-
Home
ターミナルウィンドウから以下を実行します。
以下のコマンドを実行します。
oc get kafkatopics
oc get kafkatopicsCopy to Clipboard Copied! Toggle word wrap Toggle overflow このコマンドは、以下の出力のようなステータス情報を返します。
例2.21
KafkaTopicリソースのステータスCopy to Clipboard Copied! Toggle word wrap Toggle overflow
トピックの内容を確認します。
- ターミナルウィンドウから、以下のコマンドを入力します。
oc exec -n <project> -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=<topic-name>
oc exec -n <project> -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=<topic-name>Copy to Clipboard Copied! Toggle word wrap Toggle overflow 以下に例を示します。
oc exec -n debezium -it debezium-kafka-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=inventory-connector-mongodb.inventory.products_on_hand
oc exec -n debezium -it debezium-kafka-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=inventory-connector-mongodb.inventory.products_on_handCopy to Clipboard Copied! Toggle word wrap Toggle overflow トピック名を指定する形式は、手順 1 で返された
oc describeコマンドと同じです (例:inventory-connector-mongodb.inventory.addresses)。トピックの各イベントについて、このコマンドは、以下の出力のような情報を返します。
例2.22 Debezium 変更イベントの内容
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory-connector-mongodb.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-mongodb.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-mongodb.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"int64","optional":false,"field":"ts_us"},{"type":"int64","optional":false,"field":"ts_ns"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mongodb.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory-connector-mongodb.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"3.0.8.Final-redhat-00004","connector":"mongodb","name":"inventory-connector-mongodb","ts_ms":1638985247805,"ts_us":1638985247805000000,"ts_ns":1638985247805000000,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"mongodb-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"ts_us":1638985247805102,"ts_ns":1638985247805102588,"transaction":null}}{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory-connector-mongodb.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-mongodb.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-mongodb.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"int64","optional":false,"field":"ts_us"},{"type":"int64","optional":false,"field":"ts_ns"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mongodb.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory-connector-mongodb.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"3.0.8.Final-redhat-00004","connector":"mongodb","name":"inventory-connector-mongodb","ts_ms":1638985247805,"ts_us":1638985247805000000,"ts_ns":1638985247805000000,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"mongodb-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"ts_us":1638985247805102,"ts_ns":1638985247805102588,"transaction":null}}Copy to Clipboard Copied! Toggle word wrap Toggle overflow 上記の例では、
payload値は、コネクタースナップショットがテーブルinventory.products_on_handから読み込み ("op" ="r") イベントを生成したことを示しています。product_idレコードの"before"状態はnullであり、レコードに以前の値が存在しないことを示しています。"after"状態は、product_id101を持つ項目のquantityが3であることを示しています。
2.3.5.5. Debezium MongoDB コネクターの設定プロパティーを説明します。 リンクのコピーリンクがクリップボードにコピーされました!
Debezium MongoDB コネクターには、アプリケーションに適したコネクター動作を実現するために使用できる設定プロパティーが多数あります。多くのプロパティーにはデフォルト値があります。プロパティーに関する情報は、以下のように設定されています。
以下の設定プロパティーは、デフォルト値がない場合は 必須 です。
| プロパティー | デフォルト | 説明 |
|---|---|---|
| false |
このプロパティーを 警告 このプロパティーを使用すると、現在のデフォルトの動作を変更できます。デフォルトの動作が変更され、コネクターが以前のコネクターバージョンによって記録されたオフセットを自動的に無効化して統合できるようになると、このプロパティーは今後のリリースで削除される可能性があります。 | |
| デフォルトなし | コネクターの一意名。同じ名前で再登録を試みると失敗します。(このプロパティーはすべての Kafka Connect コネクターに必要です) | |
| デフォルトなし |
コネクターの Java クラスの名前。MongoDB コネクターには、常に | |
| デフォルトなし |
コネクターが MongoDB レプリカセットに接続するために使用する 接続文字列 を指定します。このプロパティーは、MongoDB コネクターの以前のバージョンで使用できた | |
| デフォルトなし |
このコネクターが監視するコネクターや MongoDB レプリカセット、またはシャードクラスターを識別する一意の名前。このサーバー名は、MongoDB レプリカセットまたはクラスターから生成される永続化されたすべての Kafka トピックの接頭辞になるため、各サーバーは最大 1 つの Debezium コネクターによって監視される必要があります。名前を設定する文字は、英数字、ハイフン、ドット、アンダースコアのみです。論理名は、このコネクターからレコードを受信する Kafka トピックに名前を付ける際の接頭辞として使用されるため、他のすべてのコネクターで一意である必要があります。 警告 このプロパティーの値を変更しないでください。名前の値を変更すると、再起動後に、元のトピックにイベントを発行し続けるのではなく、新しい値に基づいた名前のトピックに後続のイベントを発行します。 | |
| DefaultMongoDbAuthProvider |
io.debezium.connector.mongodb.connection.MongoDbAuthProvider インターフェイスの実装である、完全な Java クラス名。このクラスは、MongoDB 接続の認証情報の設定を処理します (アプリケーションの起動ごとに呼び出されます)。デフォルトの動作では、それぞれのドキュメントに従って | |
| デフォルトなし |
デフォルトの | |
| デフォルトなし |
デフォルトの | |
|
|
デフォルトの | |
|
| コネクターは SSL を使用して MongoDB インスタンスに接続します。 | |
|
|
SSL が有効な場合、接続フェーズ中に厳密なホスト名のチェックを無効にするかどうかを制御する設定です。 | |
| regex | 追加/除外するデータベース名とコレクション名に基づいてイベントを照合するために使用されるモード。このプロパティーを以下の値のいずれかに設定します。
| |
| 空の文字列 |
監視対象のデータベース名に一致する正規表現またはリテラルの、オプションのコンマ区切りリスト。デフォルトでは、すべてのデータベースが監視されます。
データベースの名前を照合するために、Debezium は
このプロパティーを設定に含める場合は、 | |
| 空の文字列 |
監視対象から除外するデータベース名に一致する正規表現またはリテラルのオプションのコンマ区切りリスト。
データベースの名前を照合するために、Debezium は
このプロパティーを設定に含める場合は、 | |
| 空の文字列 |
監視対象の MongoDB コレクションの完全修飾名前空間に一致する正規表現またはリテラルの、オプションのコンマ区切りリスト。デフォルトでは、
名前空間の名前を照合するために、Debezium は
このプロパティーを設定に含める場合は、 | |
| 空の文字列 |
監視から除外する MongoDB コレクションの完全修飾名前空間に一致する正規表現またはリテラルの、オプションのコンマ区切りリスト。
名前空間の名前を照合するために、Debezium は
このプロパティーを設定に含める場合は、 | |
|
|
コネクターが MongoDB サーバーから
| |
|
| コネクターが開く 変更ストリームのスコープ を指定します。このプロパティーを以下のいずれかの値に設定します。
| |
|
コネクターが変更を監視するデータベースを指定します。このプロパティーは、 | ||
| 空の文字列 | 変更イベントメッセージ値から除外される必要があるフィールドの完全修飾名のコンマ区切りリスト (任意)。フィールドの完全修飾名の形式は databaseName.collectionName.fieldName.nestedFieldName で、databaseName および collectionName にはすべての文字と一致するワイルドカード (*) が含まれることがあります。 | |
| 空の文字列 | イベントメッセージ値のフィールドの名前を変更するために使用されるフィールドの完全修飾置換のコンマ区切りリスト (任意)。フィールドの完全修飾置換の形式は databaseName.collectionName.fieldName.nestedFieldName:newNestedFieldName で、databaseName および collectionName にはすべての文字と一致するワイルドカード (*) が含まれることがあります。コロン (:) は、フィールドの名前変更マッピングを決定するために使用されます。次のフィールドの置換は、リストの前のフィールド置換の結果に適用されるため、同じパスにある複数のフィールドの名前を変更する場合は、この点に注意してください。 | |
|
|
delete イベントの後に廃棄 (tombstone) イベントが続くかどうかを制御します。 | |
| none |
コネクターで使用されるメッセージコンバータとの互換性のために、スキーマ名をどのように調整するかを指定します。設定可能:
| |
| none |
コネクターで使用されるメッセージコンバータとの互換性のために、フィールド名をどのように調整するかを指定します。設定可能:
詳細は、Avro の命名 を参照してください。 |
以下の 高度な 設定プロパティーには、ほとんどの状況で機能する適切なデフォルト設定があるため、コネクターの設定で指定する必要はほとんどありません。
| プロパティー | デフォルト | 説明 |
|---|---|---|
|
|
このオプションを MongoDB 変更ストリームコレクションで使用するには、ドキュメントの事前イメージと事後イメージを返す ようにコレクションを設定する必要があります。操作の事前イメージと事後イメージは、操作が行われる前に必要な設定が利用できる場合にのみ使用できます。 このプロパティーを以下のいずれかの値に設定します。
警告
ルックアッププロセスがドキュメントの取得に失敗した場合、イベントペイロードに ルックアップの失敗は、削除操作によってドキュメントが作成直後に削除されたか、シャーディングキーの変更によってドキュメントが別の場所に移動されたために発生することがあります。キーを設定するプロパティーのいずれかを変更すると、シャーディングキーが変更される可能性があります。
| |
|
| このコネクターの反復処理中に処理される必要があるイベントの各バッチの最大サイズを指定する正の整数値。デフォルトは 2048 です。 | |
|
|
ブロッキングキューが保持できるレコードの最大数を指定する正の整数値。Debezium はデータベースからストリームされたイベントを読み込む際、Kafka に書き込む前にブロッキングキューにイベントを配置します。ブロッキングキューは、コネクターが Kafka に書き込むよりも速くメッセージを取り込む場合、または Kafka が利用できなくなった場合に、データベースから変更イベントを読み込むためのバックプレッシャーを提供することができます。コネクターがオフセットを定期的に記録すると、キューに保持されるイベントは無視されます。 | |
|
|
ブロッキングキューの最大容量をバイト単位で指定する長整数値。デフォルトでは、ブロックキューにはボリューム制限は指定されません。キューが使用できるバイト数を指定するには、このプロパティーを正の long 値に設定します。 | |
|
| 各反復処理の実行中に新しい変更イベントが表示されるまでコネクターが待機する時間 (ミリ秒単位) を指定する正の整数値。デフォルトは 500 ミリ秒 (0.5 秒) です。 | |
|
| 最初に失敗した接続試行の後またはプライマリーが利用できない場合に、プライマリーへの再接続を試行するときの最初の遅延を指定する正の整数値。デフォルトは 1 秒 (1000 ミリ秒) です。 | |
|
| 接続試行に繰り返し失敗した後またはプライマリーが利用できない場合に、プライマリーへの再接続を試行するときの最大遅延を指定する正の整数値。デフォルトは 120 秒 (120,000 ミリ秒) です。 | |
|
|
レプリカセットのプライマリーへの接続を試行する場合の最大失敗回数を指定する正の整数値。この値を越えると、例外が発生し、タスクが中止されます。デフォルトは 16。 | |
| デフォルトなし | キーストアファイルの場所を指定する任意の設定。キーストアファイルは、クライアントと MongoDB サーバー間の双方向認証に使用できます。 | |
| デフォルトなし |
キーストアファイルのパスワード。 | |
| デフォルトなし |
キーストアファイルのタイプ。 | |
| デフォルトなし | サーバー証明書検証用のトラストストアファイルの場所。 | |
| デフォルトなし |
トラストストアファイルのパスワード。トラストストアの整合性をチェックし、トラストストアのロックを解除するために使用されます。 | |
| デフォルトなし |
トラストストアファイルのタイプ。 | |
|
|
ハートビートメッセージが送信される頻度を制御します。
このプロパティーを | |
|
| ストリーミング中にコネクターがスキップする操作タイプをコンマで区切ったリスト。以下のタイプの操作をスキップするようにコネクターを設定することができます。
コネクターに操作をスキップしてほしくない場合は、値を | |
| デフォルトなし | スナップショットに含まれるコレクション項目を制御します。このプロパティーはスナップショットにのみ影響します。databaseName.collectionName の形式でコレクション名のコンマ区切りリストを指定します。
指定する各コレクションに対して、別の設定プロパティー ( | |
| デフォルトなし |
コネクターの起動後、スナップショットを取得するまで待機する間隔 (ミリ秒単位)。 | |
| 0 |
コネクターがスナップショットを完了した後、ストリーミングプロセスの開始を遅延する時間をミリ秒単位で指定します。遅延間隔を設定すると、スナップショットが完了した直後で、ストリーミングプロセスの開始前に障害が発生した場合に、コネクターがスナップショットを再開できないようにします。Kafka Connect ワーカーに設定されている | |
|
|
スナップショットの実行中に各コレクションから 1 度に読み取る必要があるドキュメントの最大数を指定します。コネクターは、このサイズの複数のバッチでコレクションの内容を読み取ります。 | |
|
|
スナップショットに含めるスキーマの完全修飾名 ( スキーマの名前を一致させるために、Debezium は指定した正規表現を アンカー 正規表現として適用します。つまり、指定した式は、スキーマ名に存在する可能性のある部分文字列とは一致しない、スキーマの名前文字列全体と照合されます。 | |
|
| レプリカセットでコレクションの最初の同期を実行するために使用されるスレッドの最大数を指定する正の整数値。デフォルトは 1 です。 | |
| Initial | コネクターの開始時にスナップショットを実行する基準を指定します。このプロパティーを以下の値のいずれかに設定します。
| |
|
|
詳細は、トランザクションメタデータ を参照してください。 | |
| 10000 (10 秒) | 再試行可能なエラーが発生した後にコネクターを再起動するまで待機する時間 (ミリ秒単位)。 | |
|
| コネクターが新規、削除、または変更したレプリカセットをポーリングする間隔。 | |
| 10000 (10 秒) | 新しい接続試行が中断されるまでドライバーが待機する時間 (ミリ秒単位)。 | |
| 10000 (10 秒) | クラスターモニターが各サーバーへのアクセスを試行する頻度。 | |
| 0 |
ソケットでの送受信がタイムアウトするまでにかかる時間 (ミリ秒単位)。 | |
| 30000 (30 秒) | ドライバーがタイムアウトし、エラーが出力される前に、サーバーが選択されるまでドライバーが待つ時間 (ミリ秒単位)。 | |
| デフォルトなし | ストリーミングが変更されると、この設定は標準の MongoDB 集約ストリームパイプラインの一部としてストリームイベントを変更する処理を適用します。パイプラインは、データをフィルタリングまたは変換するためのデータベースへの命令で構成される MongoDB 集約パイプラインです。これを使用して、コネクターが消費するデータをカスタマイズできます。このプロパティーの値は、JSON 形式で許可された aggregation pipeline stages の配列である必要があります。これは、コネクターのサポートに使用される内部パイプラインの後に追加されることに注意してください (フィルタリング操作の種類、データベース名、コレクション名など)。 | |
| internal_first | 効果的な MongoDB 集約ストリームパイプラインを構築するために使用される順序。このプロパティーを以下の値のいずれかに設定します。
| |
| fail | 指定された BSON サイズを超えるドキュメントの変更イベントを処理するために使用されるストラテジー。このプロパティーを以下の値のいずれかに設定します。
| |
| 0 | 変更イベントが処理される保存済みドキュメントの最大許容サイズ (バイト単位)。これには、データベース操作の前後両方のサイズが含まれます。具体的には、MongoDB 変更イベントの fullDocument フィールドと fullDocumentBeforeChange フィールドのサイズが制限されます。 | |
|
|
実行タイムアウトの例外を発生させる前に、oplog/change stream カーソルが結果を生成するのを待つ最大期間 (ミリ秒単位) を指定します。値 | |
| デフォルトなし |
シグナルをコネクターへの送信に使用されるデータコレクションの完全修飾名 。コレクションを指定するには、 | |
| source | コネクターに対して有効な信号チャネル名のリスト。デフォルトでは、以下のチャネルが利用可能です。
| |
| デフォルトなし | コネクターに対して有効になっている通知チャネル名のリスト。デフォルトでは、以下のチャネルが利用可能です。
| |
|
|
増分スナップショットチャンク中にコネクターがフェッチしてメモリーに読み込むドキュメントの最大数です。スナップショットは、サイズが大きいスナップショットの場合にはクエリーが少なくなるため、チャンクサイズを増やすと効率が上がります。ただし、チャンクサイズが大きい場合には、スナップショットデータのバッファーにより多くのメモリーが必要になります。チャンクサイズは、環境で最適なパフォーマンスを発揮できる値に、調整します。 | |
|
|
増分スナップショットによってキャプチャーされ、ストリーミングの再開後に再キャプチャーされる可能性のあるイベントを重複排除するために、コネクターが増分スナップショット中に使用するウォーターマークメカニズムを指定します。
| |
|
|
データ変更、スキーマ変更、トランザクション、ハートビートイベントなどのトピック名を決定するために使用する TopicNamingStrategy クラスの名前。デフォルトは | |
|
|
トピック名の区切り文字を指定します。デフォルトは | |
|
| トピック名を保持するために使用されるサイズ (bounded concurrent hash map)。このキャッシュは、与えられたデータコレクションに対応するトピック名を決定するのに役立つ。 | |
|
|
コネクターがハートビートメッセージを送信するトピックの名前を制御します。トピック名のパターンは、 | |
|
|
コネクターがトランザクションのメタデータメッセージを送信するトピックの名前を制御します。トピック名のパターンは、 | |
|
|
コンテキスト情報を提供するメタデータを追加して、MBean オブジェクト名をカスタマイズするタグを定義します。キーと値のペアのコンマ区切りリストを指定します。各キーは MBean オブジェクト名のタグを表し、対応する値はキーの値を表します。たとえば、 コネクターは、指定されたタグを基本 MBean オブジェクト名に追加します。タグは、メトリクスデータを整理および分類するのに役立ちます。特定のアプリケーションインスタンス、環境、リージョン、バージョンなどを識別するためのタグを定義できます。詳細は、カスタマイズされた MBean 名 を参照してください。 | |
|
|
接続エラーなど、再試行可能なエラーが発生する操作の後に、コネクターがどのように応答するかを指定します。
|
MongoDB コネクターが Kafka シグナリングトピックと対話する方法を設定するための Pass-through プロパティー
Debezium は、コネクターが Kafka シグナルトピックと対話する方法を制御する signal.* プロパティーのセットを提供します。
以下の表は、Kafka signal プロパティーを説明しています。
| プロパティー | デフォルト | 説明 |
|---|---|---|
| <topic.prefix>-signal | コネクターがアドホックシグナルについて監視する Kafka トピックの名前。 注記 トピックの自動作成 が無効になっている場合は、必要なシグナリングトピックを手動で作成する必要があります。シグナルの順序を維持するには、シグナリングトピックが必要です。シグナリングトピックには単一のパーティションが必要です。 | |
| kafka-signal | Kafka コンシューマーによって使用されるグループ ID の名前。 | |
| デフォルトなし | コネクターが Kafka クラスターへの初期接続を確立するために使用するホストとポートのペアのリスト。各ペアは、Debezium Kafka Connect プロセスによって使用される Kafka クラスターを参照します。 | |
|
| コネクターが信号をポーリングするときに待機する最大ミリ秒数を指定する整数値。 |
MongoDB コネクター sink notification チャネルを設定するためのパススループロパティー
次の表では、Debezium sink notification チャネルの設定に使用できるプロパティーについて説明します。
| プロパティー | デフォルト | 説明 |
|---|---|---|
| デフォルトなし |
Debezium から通知を受信するトピックの名前。このプロパティーは、有効な通知チャネルの 1 つとして |
2.3.6. Debezium MongoDB コネクターのパフォーマンスの監視 リンクのコピーリンクがクリップボードにコピーされました!
Debezium MongoDB コネクターには、Zookeeper、Kafka、および Kafka Connect にある JMX メトリクスの組み込みサポートに加えて、2 種類のメトリクスがあります。
- スナップショットメトリクス は、スナップショットの実行中にコネクター操作に関する情報を提供します。
- メトリクスのストリーミング は、コネクターが変更をキャプチャーし、変更イベントレコードをストリーミングする際のコネクター操作に関する情報を提供します。
Debezium の監視に関するドキュメント では、JMX を使用してこれらのメトリックを公開する方法の詳細を提供します。
2.3.6.1. MongoDB コネクタースナップショットとストリーミング MBean オブジェクトのカスタマイズされた名前 リンクのコピーリンクがクリップボードにコピーされました!
Debezium コネクターは、コネクターの MBean 名を介してメトリクスを公開します。これらのメトリクスは各コネクターインスタンスに固有であり、コネクターのスナップショット、ストリーミング、およびスキーマ履歴プロセスの動作に関するデータを提供します。
デフォルトでは、正しく設定されたコネクターをデプロイすると、Debezium はさまざまなコネクターメトリクスごとに一意の MBean 名を生成します。コネクタープロセスのメトリクスを表示するには、MBean を監視するように可観測性スタックを設定します。ただし、これらのデフォルトの MBean 名はコネクター設定に依存しており、設定の変更によって MBean 名が変更される場合があります。MBean 名を変更すると、コネクターインスタンスと MBean 間のリンクが切断され、監視アクティビティーが中断されます。このシナリオでは、監視を再開するには、新しい MBean 名を使用するように監視スタックを再設定する必要があります。
MBean 名の変更が原因で監視が中断されないように、カスタムメトリクスタグを設定できます。カスタムメトリクスを設定するには、コネクター設定に custom.metric.tags プロパティーを追加します。このプロパティーは、各キーが MBean オブジェクト名のタグを表し、対応する値がそのタグの値を表すキーと値のペアを受け入れます。たとえば、k1=v1,k2=v2 です。Debezium は、指定されたタグをコネクターの MBean 名に追加します。
コネクターの custom.metric.tags プロパティーを設定した後、指定されたタグに関連付けられたメトリクスを取得するように監視スタックを設定できます。可観測性スタックは、変更可能な MBean 名ではなく、指定されたタグを使用してコネクターを一意に識別します。その後、Debezium が MBean 名の構築方法を再定義したり、コネクター設定の topic.prefix が変更されたりしても、メトリクススクレイプタスクは指定されたタグパターンを使用してコネクターを識別するため、メトリクスの収集は中断されません。
カスタムタグを使用するさらなる利点は、データパイプラインのアーキテクチャーを反映するタグを使用できるため、運用上のニーズに合った方法でメトリクスを整理できることです。たとえば、コネクターアクティビティーのタイプ、アプリケーションコンテキスト、またはデータソースを宣言する値を持つタグを指定できます (例: db1-streaming-for-application-abc)。複数のキーと値のペアを指定すると、指定されたすべてのペアがコネクターの MBean 名に追加されます。
次の例は、タグがデフォルトの MBean 名を変更する方法を示しています。
例2.23 カスタムタグがコネクター MBean 名を変更する方法
デフォルトでは、MongoDB コネクターはストリーミングメトリクスに次の MBean 名を使用します。
debezium.mongodb:type=connector-metrics,context=streaming,server=<topic.prefix>
debezium.mongodb:type=connector-metrics,context=streaming,server=<topic.prefix>
custom.metric.tags の値を database=salesdb-streaming,table=inventory に設定すると、Debezium は次のカスタム MBean 名を生成します。
debezium.mongodb:type=connector-metrics,context=streaming,server=<topic.prefix>,database=salesdb-streaming,table=inventory
debezium.mongodb:type=connector-metrics,context=streaming,server=<topic.prefix>,database=salesdb-streaming,table=inventory
2.3.6.2. MongoDB スナップショット作成中の Debezium の監視 リンクのコピーリンクがクリップボードにコピーされました!
MBean は debezium.mongodb:type=connector-metrics,context=snapshot,server=<topic.prefix>,task=<task.id> です。
スナップショット操作がアクティブでない場合や、最後のコネクターの起動後にスナップショットの作成が発生した場合に、スナップショットメトリクスは公開されません。
次の表に、使用可能なスナップショットメトリクスを示します。
| 属性 | タイプ | 説明 |
|---|---|---|
|
| コネクターが読み取りした最後のスナップショットイベント。 | |
|
| コネクターが最新のイベントを読み取りおよび処理してからの経過時間 (ミリ秒単位)。 | |
|
| 前回の開始またはリセット以降にコネクターで確認されたイベントの合計数。 | |
|
| コネクターに設定された include/exclude リストのフィルタリングルールによってフィルターされたイベントの数。 | |
|
| コネクターによって取得されるテーブルのリスト。 | |
|
| snapshotter とメインの Kafka Connect ループの間でイベントを渡すために使用されるキューの長さ。 | |
|
| snapshotter とメインの Kafka Connect ループの間でイベントを渡すために使用されるキューの空き容量。 | |
|
| スナップショットに含まれているテーブルの合計数。 | |
|
| スナップショットによってまだコピーされていないテーブルの数。 | |
|
| スナップショットが起動されたかどうか。 | |
|
| スナップショットが一時停止されたかどうか。 | |
|
| スナップショットが中断されたかどうか。 | |
|
| スナップショットが完了したかどうか。 | |
|
| スナップショットが完了したかどうかに関わらず、これまでスナップショットにかかった時間 (秒単位)。スナップショットが一時停止された時間も含まれます。 | |
|
| スナップショットが一時停止された合計秒数。スナップショットが数回一時停止された場合は、一時停止時間が加算されます。 | |
|
| スナップショットの各テーブルに対してスキャンされる行数が含まれるマップ。テーブルは、処理中に増分がマップに追加されます。スキャンされた 10,000 行ごとに、テーブルの完成時に更新されます。 | |
|
|
キューの最大バッファー (バイト単位)。このメトリクスは、 | |
|
| キュー内のレコードの現在の容量 (バイト単位)。 |
Debezium MongoDB コネクターは、以下のカスタムスナップショットメトリクスも提供します。
| 属性 | タイプ | 説明 |
|---|---|---|
|
|
| データベースの切断数。 |
2.3.6.3. Debezium MongoDB コネクターレコードストリーミングの監視 リンクのコピーリンクがクリップボードにコピーされました!
MBean は debezium.mongodb:type=connector-metrics,context=streaming,server=<topic.prefix>,task=<task.id> です。
以下の表は、利用可能なストリーミングメトリクスのリストです。
| 属性 | タイプ | 説明 |
|---|---|---|
|
| コネクターが読み取られた最後のストリーミングイベント。 | |
|
| コネクターが最新のイベントを読み取りおよび処理してからの経過時間 (ミリ秒単位)。 | |
|
| コネクターを最後に起動してから、またはメトリクスをリセットしてから、ソースデータベースによって報告されたデータ変更イベントの合計数。Debezium が処理するデータ変更ワークロードを表します。 | |
|
| コネクターを最後に起動してから、またはメトリクスをリセットしてから、コネクターによって処理された作成イベントの合計数。 | |
|
| コネクターを最後に起動してから、またはメトリクスをリセットしてから、コネクターによって処理された更新イベントの合計数。 | |
|
| コネクターを最後に起動してから、またはメトリクスをリセットしてから、コネクターによって処理された削除イベントの合計数。 | |
|
| コネクターに設定された include/exclude リストのフィルタリングルールによってフィルターされたイベントの数。 | |
|
| コネクターによって取得されるテーブルのリスト。 | |
|
| ストリーマーとメイン Kafka Connect ループの間でイベントを渡すために使用されるキューの長さ。 | |
|
| ストリーマーとメインの Kafka Connect ループの間でイベントを渡すために使用されるキューの空き容量。 | |
|
| コネクターが現在データベースサーバーに接続されているかどうかを示すフラグ。 | |
|
| 最後の変更イベントのタイムスタンプとそれを処理するコネクターとの間の期間 (ミリ秒単位)。この値には、データベースサーバーとコネクターが実行されているマシンのクロックの差が組み込まれます。 | |
|
| コミットされた処理済みトランザクションの数。 | |
|
| 最後に受信したイベントの位置。 | |
|
| 最後に処理されたトランザクションのトランザクション識別子。 | |
|
|
キューの最大バッファー (バイト単位)。このメトリクスは、 | |
|
| キュー内のレコードの現在の容量 (バイト単位)。 |
Debezium MongoDB コネクターは、以下のカスタムストリーミングメトリクスも提供します。
| 属性 | タイプ | 説明 |
|---|---|---|
|
|
| データベースの切断数。 |
|
|
| プライマリーノードの選出数。 |
2.3.7. Debezium MongoDB コネクターによる障害および問題の処理方法 リンクのコピーリンクがクリップボードにコピーされました!
Debezium は、複数のアップストリームデータベースのすべての変更をキャプチャーする分散システムであり、イベントの見逃しや損失は発生しません。システムが正常に操作している場合や、慎重に管理されている場合は、Debezium は変更イベントごとに 1 度だけ 配信します。
障害が発生しても、システムからイベントがなくなることはありません。ただし、障害から復旧している間は、変更イベントが繰り返えされる可能性があります。このような状態では、Debezium は Kafka と同様に、変更イベントを 少なくとも 1 回 配信します。
以下のトピックでは、Debezium MongoDB コネクターがさまざまな種類の障害および問題を処理する方法を詳説します。
設定および起動エラー
以下の状況では、起動時にコネクターが失敗し、エラーまたは例外がログに記録され、実行が停止されます。
- コネクターの設定が無効である。
- 指定の接続パラメーターを使用してコネクターを MongoDB に接続できない。
失敗したら、コネクターは指数バックオフを使用して再接続を試みます。再接続試行の最大数を設定できます。
このような場合、エラーには問題の詳細が含まれ、場合によっては回避策が提示されます。設定が修正されたり、MongoDB の問題が解決された場合はコネクターを再起動できます。
再接の続試行は、3 つのプロパティーで制御されます。
-
connect.backoff.initial.delay.ms- 初回の再接続を試みるまでの遅延。デフォルトは 1 秒 (1000 ミリ秒) です。 -
connect.backoff.max.delay.ms- 再接続を試行するまでの最大遅延。デフォルトは 120 秒 (120,000 ミリ秒) です。 -
connect.max.attempts- エラーが生成されるまでの最大試行回数。デフォルトは 16 です。
各遅延は、最大遅延以下で、前の遅延の 2 倍です。以下の表は、デフォルト値を指定した場合の、失敗した各接続試行の遅延と、失敗前の合計累積時間を表しています。
| 再接続試行回数 | 試行までの遅延 (秒単位) | 試行までの遅延合計 (分および秒単位) |
|---|---|---|
| 1 | 1 | 00:01 |
| 2 | 2 | 00:03 |
| 3 | 4 | 00:07 |
| 4 | 8 | 00:15 |
| 5 | 16 | 00:31 |
| 6 | 32 | 01:03 |
| 7 | 64 | 02:07 |
| 8 | 120 | 04:07 |
| 9 | 120 | 06:07 |
| 10 | 120 | 08:07 |
| 11 | 120 | 10:07 |
| 12 | 120 | 12:07 |
| 13 | 120 | 14:07 |
| 14 | 120 | 16:07 |
| 15 | 120 | 18:07 |
| 16 | 120 | 20:07 |
コネクターを開始できない - InvalidResumeToken または ChangeStreamHistoryLost
長期間停止されたコネクターは起動に失敗し、次の例外を報告します。
Command failed with error 286 (ChangeStreamHistoryLost): 'PlanExecutor error during aggregation :: caused by :: Resume of change stream was not possible, as the resume point may no longer be in the oplog
Command failed with error 286 (ChangeStreamHistoryLost): 'PlanExecutor error during aggregation :: caused by :: Resume of change stream was not possible, as the resume point may no longer be in the oplog
上記の例外は、コネクターの再開トークンに対応するエントリーが oplog に存在しないことを示しています。oplog にはコネクターが処理した最後のオフセットが含まれていないため、コネクターはストリーミングを再開できません。
障害から回復するには、次のいずれかのオプションを使用します。
- 障害が発生したコネクターを削除し、同じ設定で異なるコネクター名の新しいコネクターを作成します。
- コネクターを一時停止してからオフセットを削除するか、オフセットトピックを変更します。
再開トークンの欠落に関連する障害を防ぐには、oplog の設定を最適化 します。
Kafka Connect のプロセスは正常に停止する
Kafka Connect が分散モードで実行され、Kafka Connect プロセスが正常に停止された場合は、Kafka Connect はプロセスのシャットダウン前に、すべてのプロセスのコネクタータスクをそのグループの別の Kafka Connect プロセスに移行し、新しいコネクタータスクは、以前のタスクが停止した場所で開始されます。コネクタータスクが正常に停止され、新しいプロセスで再起動されるまでの間、プロセスに短い遅延が発生します。
グループにプロセスが 1 つだけあり、そのプロセスが正常に停止された場合、Kafka Connect はコネクターを停止し、各レプリカセットの最後のオフセットを記録します。再起動時に、レプリカセットタスクは停止した場所で続行されます。
Kafka Connect プロセスのクラッシュ
Kafka Connector プロセスが予期せず停止した場合、最後に処理されたオフセットを記録せずに、実行中のコネクタータスクが終了します。Kafka Connect が分散モードで実行されている場合は、他のプロセスでこれらのコネクタータスクを再起動します。ただし、MongoDB コネクターは以前のプロセスによって 記録 された最後のオフセットから再開します。つまり、新しい代替タスクによって、クラッシュの直前に処理された同じ変更イベントが生成される可能性があります。重複するイベントの数は、オフセットのフラッシュ期間とクラッシュの直前のデータ変更の量によって異なります。
障害からの復旧中に一部のイベントが重複された可能性があるため、コンシューマーは常に一部のイベントが重複している可能性があることを想定する必要があります。Debezium の変更はべき等であるため、一連のイベントは常に同じ状態になります。
Debezium の各変更イベントメッセージには、イベントの生成元に関するソース固有の情報が含まれます。これには、MongoDB イベントの一意なトランザクション識別子 (h) やタイムスタンプ (sec and ord) が含まれます。コンシューマーはこれらの値の他の部分を追跡し、特定のイベントがすでに発生しているかどうかを確認することができます。
snapshot.mode が initial に設定されている場合、コネクターが長時間停止した後に失敗します。
コネクターが正常に停止された場合、ユーザーはレプリカセットメンバーで操作を継続する可能性があります。コネクターがオフライン時に発生する変更は、MongoDB の oplog に記録されます。ほとんどの場合、コネクターは再起動後、oplog のオフセット値を読み取って、各レプリカセットに対して最後にストリーミングした操作を特定し、その時点から変更のストリーミングを再開します。再起動後、コネクターの停止中に発生したデータベース操作は通常どおり Kafka に発行され、しばらくすると、コネクターはデータベースとの遅れを取り戻します。コネクターの操作が遅れを取り戻すのに必要な時間は、Kafka の機能とパフォーマンス、およびデータベースで発生した変更の量によって異なります。
ただし、コネクターが長時間停止したままになっていると、コネクターが非アクティブとなっている間に MongoDB が oplog をパージし、コネクターの最後の位置に関する情報が失われる可能性があります。コネクターの再起動後、コネクターが処理した最後の操作をマークする以前のオフセット値が oplog に含まれていないため、ストリーミングを再開できません。また、snapshot.mode プロパティーが initial に設定されていて、オフセット値が存在しない場合にコネクターは通常どおりにスナップショットを実行できません。この場合、oplog には前のオフセットの値が含まれていないため、不一致が存在しますが、オフセット値はコネクターの内部 Kafka オフセットトピックに存在します。エラーが発生し、コネクターが失敗します。
障害から回復するには、障害が発生したコネクターを削除し、同じ設定で異なるコネクター名で新しいコネクターを作成します。新しいコネクターを開始すると、スナップショットが実行されてデータベースの状態が取り込まれ、ストリーミングが再開されます。
MongoDB による書き込みの損失
失敗した場合の特定の状況では、MongoDB はコミットを失う可能性があり、その場合には MongoDB コネクターでは、失われた変更をキャプチャーできません。たとえば、プライマリーが変更を適用して、その oplog にその変更を記録した後に、突然クラッシュした場合には、セカンダリーノードがコンテンツを読み取るまでに oplog が利用できなくなる可能性があります。その結果、新しいプライマリーノードとして選択されるセカンダリーノードには、oplog からの最新の変更が含まれていない可能性があります。
現時点では、MongoDB のこの副次的な影響を防ぐ方法はありません。