4.2. Debezium MongoDB コネクターの仕組み
コネクターがサポートする MongoDB トポロジーの概要は、アプリケーションを計画するときに役立ちます。
MongoDB コネクターが設定およびデプロイされると、シードアドレスの MongoDB サーバーに接続して起動し、利用可能な各レプリカセットの詳細を判断します。各レプリカセットには独立した独自の oplog があるため、コネクターはレプリカセットごとに個別のタスクの使用を試みます。コネクターは、使用するタスクの最大数を制限でき、十分なタスクが利用できない場合は、コネクターは各タスクに複数のレプリカセットを割り当てます。ただし、タスクはレプリカセットごとに個別のスレッドを使用します。
シャードクラスターに対してコネクターを実行する場合は、レプリカセットの数よりも大きい tasks.max の値を使用します。これにより、コネクターはレプリカセットごとに 1 つのタスクを作成でき、Kafka Connect が利用可能なワーカープロセス全体でタスクを調整、配布、および管理できるようにします。
Debezium MongoDB コネクターの仕組みの詳細は、以下を参照してください。
- 「Debezium コネクターでサポートされる MongoDB トポロジー」
- 「Debezium MongoDB コネクターでレプリカセットおよびシャードクラスターに論理名を使用する方法」
- 「Debezium MongoDB コネクターでのスナップショットの実行方法」
- 「Debezium MongoDB コネクターでの変更イベントレコードのストリーミング方法」
- 「Debezium MongoDB 変更イベントレコードを受信する Kafka トピックのデフォルト名」
- 「イベントキーが Debezium MongoDB コネクターのトピックパーティション設定を制御する方法」
- 「トランザクション境界を表す Debezium MongoDB コネクターによって生成されたイベント」
4.2.1. Debezium コネクターでサポートされる MongoDB トポロジー リンクのコピーリンクがクリップボードにコピーされました!
MongoDB コネクターは以下の MongoDB トポロジーをサポートします。
- MongoDB レプリカセット
Debezium MongoDB コネクターは単一の MongoDB レプリカセットから変更をキャプチャーできます。実稼働のレプリカセットには、少なくとも 3 つのメンバー が必要です。
レプリカセットで MongoDB コネクターを使用するには、コネクターの
mongodb.hostsプロパティーを使用して、1 つ以上のレプリカセットサーバーのアドレスを シードアドレス として提供します。コネクターはこれらのシードを使用してレプリカセットに接続した後、レプリカセットからメンバーの完全セットを取得し、どのメンバーがプライマリーであるかを認識します。コネクターは、プライマリーに接続するタスクを開始し、プライマリーの oplog から変更をキャプチャーします。レプリカセットが新しいプライマリーを選出すると、タスクは自動的に新しいプライマリーに切り替えます。注記MongoDB がプロキシーと面する場合 (Docker on OS X や Windows などのように)、クライアントがレプリカセットに接続し、メンバーを検出すると、MongoDB クライアントはプロキシーを有効なメンバーから除外し、プロキシーを経由せずに直接メンバーに接続しようとし、失敗します。
このような場合、コネクターのオプションの
mongodb.members.auto.discover設定プロパティーをfalseに設定して、コネクターにメンバーシップの検出を見送るように指示し、代わりに最初のシードアドレス (mongodb.hostsプロパティーによって指定) をプライマリーノードとして使用するよう指示します。これは機能する可能性がありますが、選出が行われるときに問題が発生します。
- MongoDB のシャードクラスター
MongoDB のシャードクラスター は以下で設定されます。
- レプリカセットとしてデプロイされる 1 つ以上のシャード。
- クラスターの設定サーバーとして動作する個別のレプリカセット。
クライアントが接続し、要求を適切なシャードにルーティングする 1 つ以上の ルーター (
mongosとも呼ばれます)。シャードクラスターで MongoDB コネクターを使用するには、コネクターを設定サーバーレプリカセットのホストアドレスで設定します。コネクターがこのレプリカセットに接続すると、シャードクラスターの設定サーバーとして動作していることを検出し、クラスターでシャードとして使用される各レプリカセットに関する情報を検出した後、各レプリカセットから変更をキャプチャーするために別のタスクを起動します。新しいシャードがクラスターに追加される場合または既存のシャードが削除される場合、コネクターはそのタスクを自動的に調整します。
- MongoDB スタンドアロンサーバー
- スタンドアロンサーバーには oplog がないため、MongoDB コネクターはスタンドアロン MongoDB サーバーの変更を監視できません。スタンドアロンサーバーが 1 つのメンバーを持つレプリカセットに変換されると、コネクターが動作します。
MongoDB は、実稼働でのスタンドアロンサーバーの実行を推奨しません。詳細は MariaDB のドキュメント を参照してください。
4.2.2. Debezium MongoDB コネクターでレプリカセットおよびシャードクラスターに論理名を使用する方法 リンクのコピーリンクがクリップボードにコピーされました!
コネクター設定プロパティー topic.prefix は、MongoDB レプリカセットまたはシャードされたクラスターの 論理名 として提供されます。コネクターは論理名をさまざまな方法で使用します。すべてのトピック名の接頭辞として、各レプリカセットの変更ストリームの位置を記録する際に一意の識別子として使用されます。
各 MongoDB コネクターに、ソース MongoDB システムを意味する一意の論理名を命名する必要があります。論理名は、アルファベットまたはアンダースコアで始まり、残りの文字を英数字またはアンダースコアとすることが推奨されます。
4.2.3. Debezium MongoDB コネクターでのスナップショットの実行方法 リンクのコピーリンクがクリップボードにコピーされました!
タスクがレプリカセットを使用して起動すると、コネクターの論理名とレプリカセット名を使用して、コネクターが変更の読み取りを停止した位置を示す オフセット を検出します。オフセットが検出され、oplog に存在する場合、タスクは記録されたオフセットの位置から即座に ストリームの変更 を続行します。
ただし、オフセットが見つからない場合や、oplog にその位置が含まれなくなった場合、タスクは スナップショット を実行してレプリカセットの内容の現在の状態を取得する必要があります。このプロセスは、oplog の現在の位置を記録して開始され、オフセット (スナップショットが開始されたことを示すフラグとともに) として記録します。その後、タスクは各コレクションをコピーし、できるだけ多くのスレッドを生成し (snapshot.max.threads 設定プロパティーの値まで)、この作業を並行して行います。コネクターは、確認した各ドキュメントの個別の 読み取りイベント を記録します。読み取りイベントにはオブジェクトの識別子、オブジェクトの完全な状態、およびオブジェクトが見つかった MongoDB レプリカセットの ソース 情報が含まれます。ソース情報には、スナップショット中にイベントが生成されたことを示すフラグも含まれます。
このスナップショットは、コネクターのフィルターと一致するすべてのコレクションがコピーされるまで継続されます。タスクのスナップショットが完了する前にコネクターが停止した場合は、コネクターを再起動すると、再びスナップショットを開始します。
コネクターがレプリカセットのスナップショットを実行している間、タスクの再割り当てと再設定を回避します。コネクターは、スナップショットの進捗を報告するログメッセージを生成します。最大限の制御を行うために、コネクターごとに個別の Kafka Connect クラスターを実行します。
4.2.3.1. アドホックスナップショット リンクのコピーリンクがクリップボードにコピーされました!
アドホックスナップショットは、Debezium MongoDB コネクターのテクノロジープレビュー機能です。テクノロジープレビュー機能は、Red Hat の実稼働環境のサービスレベルアグリーメント (SLA) ではサポートされません。また、機能的に完全ではない可能性があるため、Red Hat はテクノロジープレビュー機能を実稼働環境に実装することは推奨しません。テクノロジープレビューの機能は、最新の技術をいち早く提供して、開発段階で機能のテストやフィードバックの収集を可能にするために提供されます。サポート範囲の詳細は、テクノロジープレビュー機能のサポート範囲 を参照してください。
デフォルトでは、コネクターは初回スナップショット操作の開始後にのみ実行されます。通常の状況では、この最初のスナップショットが作成されると、コネクターではスナップショットプロセスは繰り返し処理されません。コネクターがキャプチャーする今後の変更イベントデータはストリーミングプロセス経由でのみ行われます。
ただし、場合によっては、最初のスナップショット中にコネクターを取得したデータが古くなったり、失われたり、または不完全となったり可能性があります。収集データを再キャプチャするメカニズムを提供するために、Debezium はアドホックスナップショットを実行するオプションを備えています。データベースで以下が変更されたことで、アドホックスナップショットが実行される場合があります。
- コネクター設定が変更され、異なるコレクションのセットをキャプチャーします。
- Kafka トピックを削除して、再構築する必要があります。
- 設定エラーや他の問題が原因で、データの破損が発生します。
いわゆる アドホックスナップショット を開始することで、以前にスナップショットをキャプチャしたコレクションに対してスナップショットを再実行することができます。アドホックスナップショットでは、シグナリングコレクション を使用する必要があります。シグナルリクエストを Debezium シグナルコレクションに送信して、アドホックスナップショットを開始します。
既存のコレクションのアドホックスナップショットを開始すると、コネクターはコレクションにすでに存在するトピックにコンテンツを追加します。既存のトピックが削除された場合には、トピックの自動作成 が有効になっているのであれば、Debezium は自動的にトピックを作成できます。
アドホックのスナップショットシグナルは、スナップショットに追加するコレクションを指定します。スナップショットは、データベースの内容全体をキャプチャーしたり、データベース内のコレクションのサブセットのみをキャプチャーしたりできます。また、スナップショットは、データベース内のコレクションの内容のサブセットをキャプチャーできます。
キャプチャするコレクションは、シグナリングコレクションに execute-snapshot メッセージを送信することで指定します。execute-snapshot シグナルのタイプを incremental に設定し、スナップショットに含めるコレクション名を次の表に示すように指定します。
| フィールド | デフォルト | 値 |
|---|---|---|
|
|
|
実行するスナップショットのタイプを指定します。 |
|
| 該当なし |
スナップショットされるコレクションの完全修飾名にマッチする正規表現を含む配列。 |
|
| 該当なし | コレクションの内容のサブセットを取得するために、コレクションの列に基づいて条件を指定するオプションの文字列。 |
アドホックスナップショットのトリガー
execute-snapshot シグナルタイプのエントリーをシグナルコレクションに追加して、アドホックスナップショットを開始します。コネクターがメッセージを処理した後に、スナップショット操作を開始します。スナップショットプロセスは、最初と最後のプライマリーキーの値を読み取り、これらの値を各コレクションの開始ポイントおよびエンドポイントとして使用します。コレクションのエントリー数と設定されたチャンクサイズに基づいて、Debezium はコレクションをチャンクに分割し、チャンクごとに 1 度に 1 つずつスナップショットを順番に作成していきます。
現在、execute-snapshot アクションタイプは 増分スナップショット のみをトリガーします。詳細は、スナップショットの増分を参照してください。
4.2.3.2. 増分スナップショット リンクのコピーリンクがクリップボードにコピーされました!
増分ナップショットは、Debezium MongoDB コネクターのテクノロジープレビュー機能です。テクノロジープレビュー機能は、Red Hat の実稼働環境のサービスレベルアグリーメント (SLA) ではサポートされません。また、機能的に完全ではない可能性があるため、Red Hat はテクノロジープレビュー機能を実稼働環境に実装することは推奨しません。テクノロジープレビューの機能は、最新の技術をいち早く提供して、開発段階で機能のテストやフィードバックの収集を可能にするために提供されます。サポート範囲の詳細は、テクノロジープレビュー機能のサポート範囲 を参照してください。
スナップショットを柔軟に管理するため、Debezium には 増分スナップショット と呼ばれる補助スナップショットメカニズムが含まれています。増分スナップショットは、Debezium コネクターにシグナルを送信するための Debezium メカニズムに依存します。
増分スナップショットでは、最初のスナップショットのように、データベースの完全な状態を一度にすべてキャプチャーする代わりに、一連の設定可能なチャンクで各コレクションを段階的にキャプチャーします。スナップショットがキャプチャーするコレクションと、各チャンクのサイズ を指定できます。チャンクのサイズにより、データベース上の各フェッチ操作中にスナップショットで収集される行数が決まります。増分スナップショットのデフォルトのチャンクサイズは 1 KB です。
Debezium は、増分スナップショットの進行に伴い、その進捗を追跡するために透かしを使用し、キャプチャした各コレクション行の記録を保持します。この段階的なアプローチでは、標準の初期スナップショットプロセスと比較して、以下の利点があります。
- スナップショットが完了するまで、ストリーミングストリーミングを延期する代わりに、ストリームしたデータキャプチャーと並行して増分スナップショットを実行できます。コネクターはスナップショットプロセス全体で変更ログからのほぼリアルタイムイベントをキャプチャーし続け、他の操作はブロックしません。
- 増分スナップショットの進捗が中断された場合は、データを失うことなく再開できます。プロセス再開後、スナップショットは、最初からコレクションを再キャプチャするのではなく、停止したポイントから開始されます。
-
いつでも増分スナップショットを実行し、必要に応じてプロセスを繰り返してデータベースの更新に適合できます。例えば、コネクターの設定を変更してコレクションをその
collection.include.listプロパティーにコレクションを追加します。
増分スナップショットプロセス
増分スナップショットを実行する場合には、Debezium は各コレクションをプライマリーキー別に分類して、設定されたチャンクサイズ に基づいてコレクションをチャンクに分割します。チャンクごとに作業し、チャンク内の各コレクション行をキャプチャします。キャプチャーする行ごとに、スナップショットは READ イベントを出力します。そのイベントは、対象となるチャンクのスナップショットを開始する時の行の値を表します。
スナップショットが進むと、他のプロセスがデータベースにアクセスし続け、コレクションのレコードが変更される可能性があります。このような変更を反映させるように、通常通りに INSERT、UPDATE、DELETE 操作がトランザクションログにコミットされます。同様に、継続中の Debezium ストリーミングプロセスは、これらの変更イベントを検出し、対応する変更イベントレコードを Kafka に出力します。
Debezium を使用してプライマリーキーが同じレコード間での競合を解決する方法
場合によっては、ストリーミングプロセスが出力する UPDATE または DELETE イベントを順番に受信できます。つまり、スナップショットがその行の READ イベントを含むチャンクをキャプチャする前に、ストリーミングプロセスがコレクション行を変更するイベントを発行する可能性があります。スナップショットが最終的に対象の行にあった READ イベントを出力すると、その値はすでに置き換えられています。Debezium は、シーケンスが到達する増分スナップショットイベントが正しい論理順序で処理されるように、競合を解決するためにバッファースキームを使用します。スナップショットのイベント間で競合が発生し、ストリームされたイベントが解決されてからでないと、Debezium はイベントのレコードを Kafka に送信しません。
スナップショットウィンドウ
遅れて到着した READ イベントと、同じコレクション行を変更するストリームイベント間の衝突を解決するために、Debezium はいわゆる スナップショットウィンドウ を採用しています。スナップショットウィンドウは、増分スナップショットが指定されたコレクションチャンクのデータをキャプチャする間隔を区切ります。チャンクのスナップショットウィンドウを開く前に、Debezium は通常の動作に従い、トランザクションログから直接ターゲットの Kafka トピックにイベントをダウンストリームに出力します。ただし、特定のチャンクのスナップショットが開放された瞬間から終了するまで、Debezium は重複除去のステップを実行して、プライマリーキーが同じイベント間での競合を解決します。
データコレクションごとに、Debezium は 2 種類のイベントを出力し、それらの両方のレコードを単一の宛先 Kafka トピックに保存します。テーブルから直接キャプチャーするスナップショットレコードは、READ 操作として出力されます。その間、ユーザーはデータコレクションのレコードの更新を続け、各コミットを反映するようにトランザクションログが更新されるので、Debezium は変更ごとに UPDATE または DELETE 操作を出力します。
スナップショットウィンドウが開放され、Debezium がスナップショットチャンクの処理を開始すると、スナップショットレコードをメモリーバッファーに提供します。スナップショットウィンドウ中に、バッファー内の READ イベントのプライマリーキーは、受信ストリームイベントのプライマリーキーと比較されます。一致するものが見つからない場合、ストリーミングされたイベントレコードが Kafka に直接送信されます。Debezium が一致を検出すると、バッファーされた READ イベントを破棄し、ストリーミングされたレコードを宛先トピックに書き込みます。これは、ストリーミングされたイベントが静的スナップショットイベントよりも論理的に優先されるためです。チャンクのスナップショットウィンドウが終了すると、バッファーに含まれるのは、関連するトランザクションログイベントが存在しない READ イベントのみです。Debezium は、これらの残りの READ イベントをコレクションの Kafka トピックに発行します。
コネクターは各スナップショットチャンクにプロセスを繰り返します。
増分スナップショットでは、プライマリーキーが安定した順序で並べる必要があります。ただし、文字列 はエンコーディングと特殊文字が予期しない動作につながる可能性があるため、順序が安定しない場合があります (Mongo sort String)。増分スナップショットを実行する場合は、プライマリーキーに他のタイプを使用することを検討してください。
増分スナップショットは、現在、単一のレプリカセットのデプロイメントに対してのみサポートされています。
4.2.3.3. 増分スナップショットのトリガー リンクのコピーリンクがクリップボードにコピーされました!
現在、増分スナップショットを開始する唯一の方法は、ソースのデータベース上のシグナリングコレクションに アドホックなスナップショットシグナル を送信することです。
MongoDB の insert() メソッドを使用して、シグナルコレクションにシグナルを送信します。
Debezium は、信号コレクションの変化を検出した後、信号を読み取り、要求されたスナップショット操作を実行します。
送信するクエリーは、スナップショットに含めるコレクションを指定し、オプションでスナップショット操作の種類を指定します。現在、スナップショット操作で唯一の有効なオプションはデフォルト値の incremental だけです。
スナップショットに含めるテーブルを指定するには、テーブルをリストアップした data-collections 配列、またはテーブルのマッチングに使用する正規表現の配列を指定します。たとえば、{"data-collections": ["public.Collection1", "public.Collection2"]}
増分スナップショットシグナルの data-collections アレイにはデフォルト値がありません。data-collections アレイが空である場合には、アクションが不要であり、スナップショットを実行しないことが、Debezium で検出されます。
スナップショットに含めるコレクションの名前に、データベース、スキーマ、またはテーブルの名前にドット (.) が含まれている場合、そのコレクションを data-collections 配列に追加するには、名前の各パートを二重引用符でエスケープする必要があります。
たとえば、以下のようなテーブルを含めるには public スキーマに存在し、その名前が MyCollection のテーブルを含めるには、"public"."MyCollection" の形式を使用します。
前提条件
- ソースデータベースにシグナリングデータコレクションが存在します。
-
シグナルデータコレクションは
signal.data.collectionプロパティーで指定されます。
手順
シグナリングコレクションにスナップショットシグナルドキュメントを挿入します。
<signalDataCollection>.insert({"id" : _<idNumber>,"type" : <snapshotType>, "data" : {"data-collections" ["<collectionName>", "<collectionName>"],"type": <snapshotType>}});<signalDataCollection>.insert({"id" : _<idNumber>,"type" : <snapshotType>, "data" : {"data-collections" ["<collectionName>", "<collectionName>"],"type": <snapshotType>}});Copy to Clipboard Copied! Toggle word wrap Toggle overflow 以下に例を示します。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow コマンドの
id、type、およびdataパラメーターの値は、シグナリングコレクションのフィールド に対応します。以下の表では、この例のパラメーターについて説明しています。
Expand 表4.2 シグナリングコレクションに増分スナップショットシグナルを送信するための MongoDB insert() コマンドのフィールドの説明 項目 値 説明 1
db.debeziumSignalソースデータベース上のシグナリングコレクションの完全修飾名を指定します。
2
null
idパラメーターは、シグナルリクエストのID識別子として割り当てられる任意の文字列を指定します。
前述の例の insert メソッドは、オプションの_idパラメーターの使用が省略されています。ドキュメントはパラメーターの値を明示的に割り当てないため、MongoDB が自動的にドキュメントに割り当てる任意の ID がシグナルリクエストのID 識別子になります。
この文字列を使用して、シグナリングコレクションのエントリーにロギングメッセージを識別します。Debezium はこの識別子文字列を使用しません。代わりに、スナップショット作成中に、Debezium は独自のID文字列をウォーターマークシグナルとして生成します。3
execute-snapshottypeパラメーターを指定し、シグナルがトリガーする操作を指定します。
4
data-collectionsシグナルの
dataフィールドの必須コンポーネントで、スナップショットに含めるコレクション名の配列またはコレクション名と一致する正規表現を指定します。
この配列は、完全修飾名でテーブルをマッチさせる正規表現をリストアップします。signal.data.collection設定プロパティーでコネクターのシグナリングテーブル名を指定するのと同じ形式を使用します。5
incremental実行するスナップショット操作の種類指定するシグナルの
dataフィールドの任意のtypeコンポーネント。
現在、唯一の有効なオプションはデフォルト値incrementalだけです。
値を指定しない場合には、コネクターは増分スナップショットを実行します。
以下の例は、コネクターによってキャプチャーされる増分スナップショットイベントの JSON を示しています。
例: 増分スナップショットイベントメッセージ
| 項目 | フィールド名 | 説明 |
|---|---|---|
| 1 |
|
実行するスナップショット操作タイプを指定します。 |
| 2 |
|
イベントタイプを指定します。 |
4.2.3.4. 増分スナップショットの停止 リンクのコピーリンクがクリップボードにコピーされました!
ソースデータベースのコレクションにシグナルを送信することで、増分スナップショットを停止することもできます。シグナリングコレクションにドキュメントを挿入して、スナップショット停止のシグナルを送信します。Debezium はシグナルコレクションの変更を検出した後、シグナルを読み、増分スナップショット操作が進行中であればそれを停止します。
送信するクエリーは、incremental のスナップショット操作を指定し、任意で、削除する実行中のスナップショットのコレクションを指定します。
前提条件
- ソースデータベースにシグナリングデータコレクションが存在します。
-
シグナルデータコレクションは
signal.data.collectionプロパティーで指定されます。
手順
シグナリングコレクションにスナップショット停止のシグナルドキュメントを挿入します。
<signalDataCollection>.insert({"id" : _<idNumber>,"type" : "stop-snapshot", "data" : {"data-collections" ["<collectionName>", "<collectionName>"],"type": "incremental"}});<signalDataCollection>.insert({"id" : _<idNumber>,"type" : "stop-snapshot", "data" : {"data-collections" ["<collectionName>", "<collectionName>"],"type": "incremental"}});Copy to Clipboard Copied! Toggle word wrap Toggle overflow 以下に例を示します。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow signal コマンドの
id、type、およびdataパラメーターの値は、シグナリングコレクションのフィールド に対応します。以下の表では、この例のパラメーターについて説明しています。
Expand 表4.3 シグナリングコレクションに増分スナップショットの停止ドキュメントを送信するための insert コマンドのフィールドの説明 項目 値 説明 1
db.debeziumSignalソースデータベース上のシグナリングコレクションの完全修飾名を指定します。
2
null
前述の例の insert メソッドは、オプションの
_idパラメーターの使用が省略されています。ドキュメントはパラメーターの値を明示的に割り当てないため、MongoDB が自動的にドキュメントに割り当てる任意の ID がシグナルリクエストのID 識別子になります。
この文字列を使用して、シグナリングコレクションのエントリーにロギングメッセージを識別します。Debezium はこの識別子文字列を使用しません。3
stop-snapshottypeパラメーターを指定し、シグナルがトリガーする操作を指定します。
4
data-collectionsシグナルの
dataフィールドの任意コンポーネントで、スナップショットから削除するコレクション名の配列またはコレクション名と一致する正規表現を指定します。
この配列は、完全修飾名でテーブルをマッチさせる正規表現をリストアップします。signal.data.collection設定プロパティーでコネクターのシグナリングテーブル名を指定するのと同じ形式を使用します。dataフィールドのこのコンポーネントを省略すると、シグナルは進行中の増分スナップショット全体を停止します。5
incremental停止させるスナップショット操作の種類を指定する信号の
dataフィールドの必須コンポーネント。
現在、有効な唯一のオプションはincrementalです。typeの値を指定しない場合、シグナルは増分スナップショットの停止に失敗します。
4.2.4. Debezium MongoDB コネクターでの変更イベントレコードのストリーミング方法 リンクのコピーリンクがクリップボードにコピーされました!
レプリカセットレコードのコネクタータスクがオフセットを取得すると、オフセットを使用して変更のストリーミングを開始する oplog の位置を判断します。その後、タスクは (設定によって) レプリカセットのプライマリーノードに接続するか、レプリカセット全体の変更ストリームに接続し、その位置から変更のストリーミングを開始します。すべての作成、挿入、および削除操作を処理して Debezium の 変更イベント に変換します。各変更イベントには操作が検出された oplog の位置が含まれ、コネクターはこれを最新のオフセットとして定期的に記録します。オフセットが記録される間隔は、Kafka Connect ワーカー設定プロパティーである offset.flush.interval.ms によって制御されます。
コネクターが正常に停止されると、処理された最後のオフセットが記録され、再起動時にコネクターは停止した場所から続行されます。しかし、コネクターのタスクが予期せず終了した場合、最後にオフセットが記録された後、最後のオフセットが記録される前に、タスクによってイベントが処理および生成されることがあります。再起動時に、コネクターは最後に 記録された オフセットから開始し、クラッシュの前に生成された同じイベントを生成する可能性があります。
Kafka パイプライン内のすべてのコンポーネントが正常に動作している場合、Kafka コンシューマーはすべてのメッセージを 1 度だけ 受信します。ただし、問題が発生した場合、Kafka はコンシューマーが 少なくとも 1 回 のみすべてのメッセージを受信することを保証できます。予期しない結果を回避するには、コンシューマーは重複メッセージを処理できる必要があります。
前述のように、コネクタータスクは常にレプリカセットのプライマリーノードを使用して oplog からの変更をストリーミングし、コネクターが可能な限り最新の操作を確認できるようにし、代わりにセカンダリーが使用された場合よりも短いレイテンシーで変更をキャプチャーできるようにします。レプリカセットが新しいプライマリーを選出すると、コネクターは即座に変更のストリーミングを停止し、新しいプライマリーに接続して、同じ場所にある新しいプライマリーノードから変更のストリーミングを開始します。同様に、コネクターとレプリカセットメンバーとの通信で問題が発生した場合は、レプリカセットが過剰にならないように指数バックオフを使用して再接続を試みます。接続の確立後、停止した場所から変更のストリーミングを続行します。これにより、コネクターはレプリカセットメンバーシップの変更を動的に調整でき、通信障害を自動的に処理できます。
要約すると、MongoDB コネクターはほとんどの状況で実行を継続します。通信の問題により、問題が解決されるまでコネクターが待機する可能性があります。
4.2.5. Debezium 変更イベントの before フィールドに入力するための MongoDB サポート リンクのコピーリンクがクリップボードにコピーされました!
MongoDB 6.0 以降では、変更ストリームを設定して、ドキュメントのイメージ前の状態を出力し、MongoDB 変更イベントの before フィールドにデータを投入できます。MongoDB で事前のイメージを使用できるようにするには、db.createCollection()、create、または collMod を使用して、コレクションの changeStreamPreAndPostImages を設定する必要があります。Debezium MongoDB が変更イベントに事前イメージを追加できるようにするには、コネクターの capture.mode を *_with_pre_image オプションのいずれかに設定します。
MongoDB 変更イベントのサイズは 16 メガバイトに制限されます。したがって、事前イメージを使用すると、このしきい値を超過し、障害が発生する可能性があります。変更ストリームの制限を超えないようにする方法は、MongoDB のドキュメント を参照してください。
4.2.6. Debezium MongoDB 変更イベントレコードを受信する Kafka トピックのデフォルト名 リンクのコピーリンクがクリップボードにコピーされました!
MongoDB コネクターは、各コレクションのドキュメントに対するすべての挿入、更新、および削除操作のイベントを 1 つの Kafka トピックに書き込みます。Kafka トピックの名前は常に logicalName.databaseName.collectionName の形式を取ります。logicalName は、topic.prefix 設定プロパティーで指定されるコネクターの 論理名、databaseName は操作が発生したデータベースの名前、collectionName は影響を受けるドキュメントが存在する MongoDB コレクションの名前です。
たとえば、products, products_on_hand, customers, and orders の 4 つのコレクションで設定される inventory データベースを含む MongoDB レプリカセットについて考えてみましょう。コネクターが監視するこのデータベースの論理名が fulfillment である場合、コネクターは以下の 4 つの Kafka トピックでイベントを生成します。
-
fulfillment.inventory.products -
fulfillment.inventory.products_on_hand -
fulfillment.inventory.customers -
fulfillment.inventory.orders
トピック名には、レプリカセット名やシャード名が含まれないことに注意してください。その結果、シャード化コレクションへの変更 (各シャードにコレクションのドキュメントのサブセットが含まれる) はすべて同じ Kafka トピックに移動します。
Kafka を設定して、必要に応じてトピックを 自動作成 できます。そうでない場合は、Kafka 管理ツールを使用してコネクターを起動する前にトピックを作成する必要があります。
4.2.7. イベントキーが Debezium MongoDB コネクターのトピックパーティション設定を制御する方法 リンクのコピーリンクがクリップボードにコピーされました!
MongoDB コネクターは、イベントのトピックパーティションを明示的に決定しません。代わりに、Kafka はイベントキーに基づいてトピックのパーティションを作成する方法を決定できます。Kafka Connect ワーカー設定に Partitioner 実装の名前を定義することで、Kafka のパーティショニングロジックを変更できます。
Kafka は、1 つのトピックパーティションに書き込まれたイベントのみ、合計順序を維持します。キーでイベントのパーティションを行うと、同じキーを持つすべてのイベントは常に同じパーティションに移動します。これにより、特定のドキュメントのすべてのイベントが常に完全に順序付けされます。
4.2.8. トランザクション境界を表す Debezium MongoDB コネクターによって生成されたイベント リンクのコピーリンクがクリップボードにコピーされました!
Debezium は、トランザクションメタデータ境界を表すイベントを生成でき、データイベントメッセージを補完できます。
Debezium は、コネクターのデプロイ後に発生するトランザクションに対してのみメタデータを登録し、受信します。コネクターをデプロイする前に発生するトランザクションのメタデータは利用できません。
Debezium はすべてのトランザクションの BEGIN および END に対して、以下のフィールドが含まれるイベントを生成します。
status-
BEGINまたはEND id- 一意のトランザクション識別子の文字列表現。
event_count(ENDイベント用)- トランザクションによって出力されるイベントの合計数。
data_collections(ENDイベント用)-
指定のデータコレクションからの変更によって出力されたイベントの数を提供する
data_collectionとevent_countのペアの配列。
以下の例では、一般的なメッセージを示します。
topic.transaction オプションで上書きされない限り、トランザクションイベントは <topic.prefix>.transaction という名前のトピックに書き込まれます。
変更データイベントのエンリッチメント
トランザクションメタデータを有効にすると、データメッセージ Envelope は新しい transaction フィールドでエンリッチされます。このフィールドは、複合フィールドの形式ですべてのイベントに関する情報を提供します。
id- 一意のトランザクション識別子の文字列表現。
total_order- トランザクションによって生成されたすべてのイベントを対象とするイベントの絶対位置。
data_collection_order- トランザクションによって出力されたすべてのイベントを対象とするイベントのデータコレクションごとの位置。
以下は、メッセージの内容の例です。