3.6. Debezium Db2 コネクターのデプロイ
以下の方法のいずれかを使用して Debezium Db2 コネクターをデプロイできます。
Debezium Db2 コネクターでは、Db2 データベースに接続するために Db2JDBC ドライバーが必要です。ドライバーの入手方法については、Db2JDBC ドライバーの入手を参照してください。
3.6.1. Db2 JDBC ドライバーの取得 リンクのコピーリンクがクリップボードにコピーされました!
ライセンス要件により、Db2 JDBC ドライバーファイルは Debezium Db2 コネクターアーカイブに含まれていません。使用するデプロイメント方法に関係なく、デプロイメントを完了するためにドライバーファイルをダウンロードする必要があります。
以下の手順では、ドライバーを取得してお使いの環境で使用する方法を説明します。
手順
ブラウザーから、IBM サポートサイトに ナビゲートし、ご使用のバージョンの Db2 に一致する JDBC ドライバーをダウンロードします。
-
Dockerfile を使用してコネクターを構築する場合は、ダウンロードしたファイルを Debezium Db2 コネクターのファイルがあるディレクトリー、たとえば
<kafka_home>/libsディレクトリーにコピーします。 AMQ ストリームを使用してコネクターを Kafka Connect イメージに追加する場合:
- このドライバーを Maven リポジトリーまたは OpenShift クラスターで利用できる別の HTTP サーバーにデプロイします。
-
アーティファクト URL を
KafkaConnectカスタムリソースに追加します。
-
Dockerfile を使用してコネクターを構築する場合は、ダウンロードしたファイルを Debezium Db2 コネクターのファイルがあるディレクトリー、たとえば
KafkaConnector リソースを適用してコネクターをデプロイすると、コネクターは指定されたドライバーを使用するよう設定されます。
3.6.2. AMQ Streams を使用した Db2 コネクターデプロイメント リンクのコピーリンクがクリップボードにコピーされました!
Debezium 1.7 以降、Debezium コネクターのデプロイに推奨される方法は、AMQ Streams を使用してコネクタープラグインが含まれる Kafka Connect コンテナーイメージをビルドすることです。
デプロイメントプロセス中に、以下のカスタムリソース (CR) を作成し、使用します。
-
Kafka Connect インスタンスを定義し、コネクターアーティファクトに関する情報をイメージに含める必要がある
KafkaConnectCR。 -
コネクターがソースデータベースにアクセスするために使用する情報を提供する
KafkaConnectorCR。AMQStreams が Kafka Connect Pod を開始し、KafkaConnectorCR を適用してコネクターを開始します。
Kafka Connect イメージのビルド仕様では、デプロイ可能なコネクターを指定できます。各コネクタープラグインに対して、デプロイメントに利用可能にする他のコンポーネントを指定することもできます。たとえば、Service Registry アーティファクトまたは Debezium スクリプトコンポーネントを追加できます。AMQ Streams が Kafka Connect イメージをビルドすると、指定のアーティファクトをダウンロードし、イメージに組み込みます。
Kafka Connect CR の spec.build.output パラメーターは、生成される KafkaConnectコンテナーイメージを格納する場所を指定します。コンテナーイメージは Docker レジストリーまたは OpenShift ImageStream に保存できます。イメージを ImageStream に保存するには、Kafka Connect をデプロイする前に ImageStream を作成する必要があります。イメージストリームは自動的に作成されません。
KafkaConnect リソースを使用してクラスターを作成する場合は、Kafka Connect REST API を使用してコネクターを作成または更新できません。ただし、REST API を使用して情報を取得できます。
関連情報
- AMQ Streams on OpenShift の使用のKafka Connect の設定を参照してください。
- AMQ Streams を使用した新しいコンテナーイメージの自動作成と OpenShift での AMQ Streams のアップグレード
3.6.3. AMQ Streams を使用した Debezium Db2 コネクターのデプロイ リンクのコピーリンクがクリップボードにコピーされました!
以前のバージョンの AMQ Streams では、OpenShift に Debezium コネクターをデプロイするには、最初にコネクター用の Kafka Connect イメージをビルドする必要がありました。コネクターを OpenShift にデプロイする場合に現在推奨される方法は、AMQ Streams でビルド設定を使用して、使用する Debezium コネクタープラグインが含まれる Kafka Connect コンテナーイメージを自動的にビルドすることです。
ビルドプロセス中、AMQ Streams Operator は Debezium コネクター定義を含む KafkaConnect カスタムリソースの入力パラメーターを Kafka Connect コンテナーイメージに変換します。このビルドは、Red Hat Maven リポジトリーまたは別の設定済みの HTTP サーバーから必要なアーティファクトをダウンロードします。新規に作成されたコンテナーは .spec.build.output で指定されたコンテナーレジストリーにプッシュされ、Kafka Connect Pod のデプロイに使用されます。AMQ Streams が Kafka Connect イメージをビルドしたら、KafkaConnector カスタムリソースを作成し、ビルドに含まれるコネクターを起動します。
前提条件
- クラスター Operator がインストールされている OpenShift クラスターにアクセスできる。
- AMQ Streams Operator が稼働している。
- Kafka クラスターは、Apache Open Shift での AMQ ストリームのデプロイとアップグレードに記載されているようにデプロイされます。
- Red Hat Integration ライセンスがある。
- Kafka Connect is deployed on AMQ Streams。
-
OpenShift
ocCLI クライアントがインストールされている、または OpenShift Container Platform Web コンソールにアクセスできる。 Kafka Connect ビルドイメージの保存方法に応じて、レジストリーのパーミッションを用意するか、ImageStream リソースを作成している。
- ビルドイメージを Red Hat Quay.io または Docker Hub などのイメージレジストリーに保存する場合は、以下が必要です。
- レジストリーでイメージを作成し、管理するためのアカウントおよびパーミッション
- ビルドイメージをネイティブ OpenShift ImageStream として保存する場合は、以下が必要です。
- ImageStream リソースがクラスターにデプロイされている。クラスターの ImageStream を明示的に作成している。ImageStreams はデフォルトでは利用できません。
手順
- OpenShift クラスターにログインします。
コネクターの Debezium
KafkaConnectカスタムリソース (CR) を作成するか、既存のリソースを変更します。たとえば、以下の例のようにmetadata.annotationsおよびspec.buildプロパティーを指定するKafkaConnectCR を作成します。dbz-connect.yamlなどの名前でファイルを保存します。例3.1 Debezium コネクターを含む
KafkaConnectカスタムリソースを定義するdbz-connect.yamlファイルCopy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 表3.13 Kafka Connect 設定の説明 項目 説明 1
strimzi.io/use-connector-resourcesアノテーションを"true"に設定して、クラスター Operator がKafkaConnectorリソースを使用してこの Kafka Connect クラスター内のコネクターを設定できるようにします。2
spec.build設定は、ビルドイメージの保存場所を指定し、プラグインアーティファクトの場所とともにイメージに追加するプラグインをリストします。3
build.outputは、新しくビルドされたイメージを保存するレジストリーを指定します。4
イメージ出力の名前およびイメージ名を指定します。
output.typeの有効な値は、Docker Hub や Quay などのコンテナーレジストリーにプッシュする場合はdocker、内部の OpenShift ImageStream にイメージをプッシュする場合はimagestreamです。ImageStream を使用するには、ImageStream リソースをクラスターにデプロイする必要があります。KafkaConnect 設定でbuild.outputの指定に関する詳細は、AMQ Streams Build スキーマ参照 のドキュメントを参照 してください。5
plugins設定は、Kafka Connect イメージに追加するすべてのコネクターをリストします。リストの各エントリーについて、プラグインnameと、コネクターのビルドに必要なアーティファクトに関する情報を指定します。必要に応じて、各コネクタープラグインに対して、コネクターと使用できる他のコンポーネントを含めることができます。たとえば、Service Registry アーティファクトまたは Debezium スクリプトコンポーネントを追加できます。6
artifacts.typeの値は、artifacts.urlで指定するアーティファクトのファイルタイプを指定します。有効なタイプはzip、tgz、またはjarです。Debezium コネクターアーカイブは、.zipファイル形式で提供されます。JDBC ドライバーファイルは.jar形式です。typeの値は、urlフィールドで参照されるファイルのタイプと一致する必要があります。7
artifacts.urlの値は、コネクターアーティファクトのファイルを格納する Maven リポジトリーなどの HTTP サーバーのアドレスを指定します。OpenShift クラスターが指定されたサーバーにアクセスできる必要があります。以下のコマンドを入力して、
KafkaConnectビルド仕様を OpenShift クラスターに適用します。oc create -f dbz-connect.yaml
oc create -f dbz-connect.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow Streams Operator はカスタムリソースで指定された設定に基づいて、デプロイする Kafka Connect イメージを準備します。
ビルドが完了すると、Operator はイメージを指定されたレジストリーまたは ImageStream にプッシュし、Kafka Connect クラスターを起動します。設定にリスト表示されているコネクターアーティファクトはクラスターで利用できます。KafkaConnectorリソースを作成し、デプロイする各コネクターのインスタンスを定義します。
たとえば、以下のKafkaConnectorCR を作成し、db2-inventory-connector.yamlとして保存します。例3.2 Debezium コネクターの
KafkaConnectorカスタムリソースを定義するdb2-inventory-connector.yamlファイルCopy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 表3.14 コネクター設定の説明 項目 説明 1
Kafka Connect クラスターに登録するコネクターの名前。
2
コネクタークラスの名前。
3
同時に動作できるタスクの数。
4
コネクターの設定。
5
ホストデータベースインスタンスのアドレス。
6
データベースインスタンスのポート番号。
7
Debezium がデータベースに接続するユーザーアカウントの名前。
8
データベースユーザーアカウントのパスワード
9
変更をキャプチャーするデータベースの名前。
10
データベースインスタンスまたはクラスターの論理名。
指定の名前は英数字またはアンダースコアからのみ形成する必要があります。
論理名は、このコネクターから変更イベントを受信する Kafka トピックの接頭辞として使用されるため、名前はクラスターのコネクター間で一意である必要があります。
コネクターを Avro コネクターと統合する場合、名前空間は関連する Kafka Connect スキーマの名前や、対応する Avro スキーマの名前空間でも使用されます。11
コネクターが変更イベントをキャプチャーするテーブルのリスト。
以下のコマンドを実行してコネクターリソースを作成します。
oc create -n <namespace> -f <kafkaConnector>.yaml
oc create -n <namespace> -f <kafkaConnector>.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow 以下に例を示します。
oc create -n debezium -f {context}-inventory-connector.yamloc create -n debezium -f {context}-inventory-connector.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow コネクターは Kafka Connect クラスターに登録され、
KafkaConnectorCR のspec.config.database.dbnameで指定されたデータベースに対して実行を開始します。コネクター Pod の準備ができると、Debezium が実行されます。
これで、Debezium Db2 のデプロイメントを確認 する準備が整いました。
3.6.4. Dockerfile からカスタム Kafka Connect コンテナーイメージをビルドして Debezium Db2 コネクターのデプロイ リンクのコピーリンクがクリップボードにコピーされました!
Debezium Db2 コネクターをデプロイするには、Debezium コネクターアーカイブが含まれるカスタム Kafka Connect コンテナーイメージをビルドし、このコンテナーイメージをコンテナーレジストリーにプッシュする必要があります。次に、以下のカスタムリソース (CR) を作成する必要があります。
-
Kafka Connect インスタンスを定義する
KafkaConnectCR。imageは Debezium コネクターを実行するために作成したイメージの名前を指定します。この CR を、Red Hat AMQ Streams がデプロイされている OpenShift インスタンスに適用します。AMQ Streams は、Apache Kafka を OpenShift に取り入れる operator およびイメージを提供します。 -
Debezium Db2 コネクターを定義する
KafkaConnectorCR。この CR をKafkaConnectCR を適用したのと同じ OpenShift インスタンスに適用します。
前提条件
- Db2 が実行中で、Db2 を設定して Debezium コネクターと連携する 手順が完了済みである必要があります。
- AMQ Streams は OpenShift にデプロイされ、Apache Kafka および Kafka Connect が稼働している必要があります。詳細は、Deploying and Upgrading AMQ Streams on OpenShift を参照してください。
- Podman または Docker がインストールされている。
- Db2 に必要な JDBC ドライバーを 取得している。
-
Debezium コネクターを実行するコンテナーを追加する予定のコンテナーレジストリー (
quay.ioやdocker.ioなど) でコンテナーを作成および管理するアカウントとパーミッションを持っている。
手順
Kafka Connect の Debezium Db2 コンテナーを作成します。
- Debezium Db2 コネクターアーカイブをダウンロードします。
Debezium Db2 コネクターアーカイブをデプロイメントして、コネクタープラグインのディレクトリー構造を作成します。以下に例を示します。
./my-plugins/ ├── debezium-connector-db2 │ ├── ...
./my-plugins/ ├── debezium-connector-db2 │ ├── ...Copy to Clipboard Copied! Toggle word wrap Toggle overflow registry.redhat.io/amq7/amq-streams-kafka-30-rhel8:2.0.0をベースイメージとして使用して、新規の Dockerfile を作成します。たとえば、ターミナルウィンドウから以下のコマンドを入力します。my-pluginsはプラグインディレクトリーの名前に置き換えます。Copy to Clipboard Copied! Toggle word wrap Toggle overflow このコマンドは、現在のディレクトリーに
debezium-container-for-db2.yamlという名前の Docker ファイルを作成します。前のステップで作成した
debezium-container-for-db2.yamlDocker ファイルからコンテナーイメージをビルドします。ファイルが含まれるディレクトリーから、ターミナルウィンドウを開き、以下のコマンドのいずれかを入力します。podman build -t debezium-container-for-db2:latest .
podman build -t debezium-container-for-db2:latest .Copy to Clipboard Copied! Toggle word wrap Toggle overflow docker build -t debezium-container-for-db2:latest .
docker build -t debezium-container-for-db2:latest .Copy to Clipboard Copied! Toggle word wrap Toggle overflow 上記のコマンドは、
debezium-container-for-db2という名前のコンテナーイメージを構築します。カスタムイメージを quay.io などのコンテナーレジストリーまたは内部のコンテナーレジストリーにプッシュします。コンテナーレジストリーは、イメージをデプロイする OpenShift インスタンスで利用できる必要があります。以下のいずれかのコマンドを実行します。
podman push <myregistry.io>/debezium-container-for-db2:latest
podman push <myregistry.io>/debezium-container-for-db2:latestCopy to Clipboard Copied! Toggle word wrap Toggle overflow docker push <myregistry.io>/debezium-container-for-db2:latest
docker push <myregistry.io>/debezium-container-for-db2:latestCopy to Clipboard Copied! Toggle word wrap Toggle overflow 新しい Debezium Db2
KafkaConnectカスタムリソース (CR) を作成します。たとえば、以下の例のようにannotationsおよびimageプロパティーを指定するdbz-connect.yamlという名前のKafkaConnectCR を作成します。Copy to Clipboard Copied! Toggle word wrap Toggle overflow 以下のコマンドを入力して、
KafkaConnectCR を OpenShift Kafka Connect 環境に適用します。oc create -f dbz-connect.yaml
oc create -f dbz-connect.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow このコマンドは、Debezium コネクターを実行するために作成したイメージの名前を指定する Kafka Connect インスタンスを追加します。
Debezium Db2 コネクターインスタンスを設定する
KafkaConnectorカスタムリソースを作成します。通常、コネクターに使用できる設定プロパティーを使用して、
.yamlファイルに Debezium Db2 コネクターを設定します。コネクター設定は、Debezium に対して、スキーマおよびテーブルのサブセットにイベントを生成するよう指示する可能性があり、または機密性の高い、大きすぎる、または不必要な指定のコラムで Debezium が値を無視、マスク、または切り捨てするようにプロパティーを設定する可能性もあります。以下の例では、ポート
50000で Db2 サーバーホスト192.168.99.100に接続する Debezium コネクターを設定します。このホストには、mydatabaseという名前のデータベース、名前がinventoryというテーブルがあり、fulfillmentがサーバーの論理名です。Db2
inventory-connector.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 表3.15 コネクター設定の説明 項目 説明 1
Kafka Connect クラスターに登録する場合のコネクターの名前。
2
この Db2 コネクタークラスの名前。
3
1 度に 1 つのタスクのみが動作する必要があります。
4
コネクターの設定。
5
Db2 インスタンスのアドレスであるデータベースホスト。
6
Db2 インスタンスのポート番号。
7
Db2 ユーザーの名前。
8
Db2 ユーザーのパスワード。
9
変更をキャプチャーするデータベースの名前。
10
namespace を形成する Db2 インスタンス/クラスターの論理名で、コネクターが書き込む Kafka トピックの名前、Kafka Connect スキーマ名、および Arvo コネクター が使用される場合に対応する Avro スキーマの namespace のすべてに使用されます。
11
Debezium が変更をキャプチャーする必要があるすべてのテーブルのリスト。
Kafka Connect でコネクターインスタンスを作成します。たとえば、
KafkaConnectorリソースをinventory-connector.yamlファイルに保存した場合は、以下のコマンドを実行します。oc apply -f inventory-connector.yaml
oc apply -f inventory-connector.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow 上記のコマンドは
inventory-connectorを登録し、コネクターはKafkaConnectorCR に定義されているmydatabaseデータベースに対して実行を開始します。
Debezium Db2 コネクターに設定できる設定プロパティーの完全リストは、Db2 コネクタープロパティー を参照してください。
結果
コネクターが起動すると、コネクターが変更をキャプチャーするように設定された Db2 データベーステーブルの 整合性スナップショット が実行されます。その後、コネクターは行レベルの操作のデータ変更イベントの生成を開始し、変更イベントレコードを Kafka トピックにストリーミングします。
3.6.5. Debezium Db2 コネクターが実行していることの確認 リンクのコピーリンクがクリップボードにコピーされました!
コネクターがエラーなしで正常に起動すると、コネクターがキャプチャーするように設定された各テーブルのトピックが作成されます。ダウンストリームアプリケーションは、これらのトピックをサブスクライブして、ソースデータベースで発生する情報イベントを取得できます。
コネクターが実行されていることを確認するには、OpenShift Container Platform Web コンソールまたは OpenShift CLI ツール (oc) から以下の操作を実行します。
- コネクターのステータスを確認します。
- コネクターがトピックを生成していることを確認します。
- 各テーブルの最初のスナップショットの実行中にコネクターが生成する読み取り操作 ("op":"r") のイベントがトピックに反映されていることを確認します。
前提条件
- Debezium コネクターが AMQ Streams on OpenShift にデプロイされている。
-
OpenShift
ocCLI クライアントがインストールされている。 - OpenShift Container Platform Web コンソールにアクセスできる。
手順
以下の方法のいずれかを使用して
KafkaConnectorリソースのステータスを確認します。OpenShift Container Platform Web コンソールから以下を実行します。
-
Home
Search に移動します。 -
Search ページで Resources をクリックし、Select Resource ボックスを開き、
KafkaConnectorを入力します。 - KafkaConnectors リストから、チェックするコネクターの名前をクリックします (例: inventory-connector-db2)。
- Conditions セクションで、Type および Status 列の値が Ready および True に設定されていることを確認します。
-
Home
ターミナルウィンドウから以下を実行します。
以下のコマンドを入力します。
oc describe KafkaConnector <connector-name> -n <project>
oc describe KafkaConnector <connector-name> -n <project>Copy to Clipboard Copied! Toggle word wrap Toggle overflow 以下に例を示します。
oc describe KafkaConnector inventory-connector-db2 -n debezium
oc describe KafkaConnector inventory-connector-db2 -n debeziumCopy to Clipboard Copied! Toggle word wrap Toggle overflow このコマンドは、以下の出力のようなステータス情報を返します。
例3.3
KafkaConnectorリソースのステータスCopy to Clipboard Copied! Toggle word wrap Toggle overflow
コネクターによって Kafka トピックが作成されたことを確認します。
OpenShift Container Platform Web コンソールから以下を実行します。
-
Home
Search に移動します。 -
Search ページで Resources をクリックし、Select Resource ボックスを開き、
KafkaTopicを入力します。 - KafkaTopics リストから確認するトピックの名前をクリックします (例: inventory-connector-db2.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d)。
- Conditions セクションで、Type および Status 列の値が Ready および True に設定されていることを確認します。
-
Home
ターミナルウィンドウから以下を実行します。
以下のコマンドを入力します。
oc get kafkatopics
oc get kafkatopicsCopy to Clipboard Copied! Toggle word wrap Toggle overflow このコマンドは、以下の出力のようなステータス情報を返します。
例3.4
KafkaTopicリソースのステータスCopy to Clipboard Copied! Toggle word wrap Toggle overflow
トピックの内容を確認します。
- ターミナルウィンドウから、以下のコマンドを入力します。
oc exec -n <project> -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=<topic-name>
oc exec -n <project> -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=<topic-name>Copy to Clipboard Copied! Toggle word wrap Toggle overflow 以下に例を示します。
oc exec -n debezium -it debezium-kafka-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=inventory_connector_db2.inventory.products_on_hand
oc exec -n debezium -it debezium-kafka-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=inventory_connector_db2.inventory.products_on_handCopy to Clipboard Copied! Toggle word wrap Toggle overflow トピック名を指定する形式は、手順 1 で返された
oc describeコマンドと同じです (例:inventory_connector_db2.inventory.addresses)。トピックの各イベントについて、このコマンドは、以下の出力のような情報を返します。
例3.5 Debezium 変更イベントの内容
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory_connector_db2.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory_connector_db2.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory_connector_db2.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.db2.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory_connector_db2.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"1.7.2.Final-redhat-00001","connector":"db2","name":"inventory_connector_db2","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"db2-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory_connector_db2.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory_connector_db2.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory_connector_db2.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.db2.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory_connector_db2.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"1.7.2.Final-redhat-00001","connector":"db2","name":"inventory_connector_db2","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"db2-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}Copy to Clipboard Copied! Toggle word wrap Toggle overflow 上記の例では、
payload値は、コネクタースナップショットがテーブルinventory.products_on_handから 読み込み (op" ="r") イベントを生成したことを示しています。product_idレコードのbefore状態はnullであり、レコードに以前の値が存在しないことを示します。"after"状態がproduct_id101で項目のquantityを3で示しています。
3.6.6. Debezium Db2 コネクター設定プロパティーの説明 リンクのコピーリンクがクリップボードにコピーされました!
Debezium Db2 コネクターには、アプリケーションに適したコネクター動作を実現するために使用できる設定プロパティーが多数あります。多くのプロパティーにはデフォルト値があります。プロパティーに関する情報は、以下のように設定されています。
- 必要な設定プロパティー
- 高度な設定プロパティー
Debezium がデータベース履歴トピックから読み取るイベントを処理する方法を制御する データベース履歴コネクター設定プロパティー。
- データベースドライバーの動作を制御する パススルーデータベースドライバープロパティー。
必要な Debezium Db2 コネクター設定プロパティー
以下の設定プロパティーは、デフォルト値がない場合は必須です。
| プロパティー | デフォルト | 説明 |
|---|---|---|
| デフォルトなし | コネクターの一意名。同じ名前で再登録を試みると失敗します。このプロパティーはすべての Kafka Connect コネクターに必要です。 | |
| デフォルトなし |
コネクターの Java クラスの名前。Db2 コネクターには、常に | |
|
| このコネクターのために作成する必要のあるタスクの最大数。Db2 コネクターは常に単一のタスクを使用するため、この値を使用しません。そのため、デフォルト値は常に許容されます。 | |
| デフォルトなし | Db2 データベースサーバーの IP アドレスまたはホスト名。 | |
|
| Db2 データベースサーバーの整数のポート番号。 | |
| デフォルトなし | Db2 データベースサーバーに接続するための Db2 データベースユーザーの名前。 | |
| デフォルトなし | Db2 データベースサーバーへの接続時に使用するパスワード。 | |
| デフォルトなし | 変更をストリーミングする Db2 データベースの名前 | |
| デフォルトなし | Debezium が変更をキャプチャーするデータベースをホストする特定の Db2 データベースサーバーの namespace を特定および提供する論理名。データベースサーバーの論理名には英数字とハイフン、ドット、アンダースコアのみを使用する必要があります。論理名は、他のコネクター全体で一意となる必要があります。これは、このコネクターからレコードを受信するすべての Kafka トピックのトピック名接頭辞として使用されるためです。 | |
| デフォルトなし |
コネクターで変更をキャプチャーするテーブルの完全修飾テーブル識別子と一致する正規表現のコンマ区切りリスト (任意)。include リストに含まれていないテーブルの変更はキャプチャーされません。各識別子の形式は schemaName.tableName です。デフォルトでは、コネクターはシステム以外のテーブルすべての変更をキャプチャーします。また、 | |
| デフォルトなし |
コネクターで変更をキャプチャーしないテーブルの完全修飾テーブル識別子と一致する正規表現のコンマ区切りリスト (任意)。コネクターは exclude リストに含まれていないシステム以外のテーブルごとに変更をキャプチャーします。各識別子の形式は schemaName.tableName です。また、 | |
| 空の文字列 | 変更イベント値から除外する列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。列の完全修飾名の形式は schemaName.tableName.columnName です。プライマリーキー列は、値から除外された場合でも、イベントのキーに常に含まれます。 | |
| 該当なし |
文字ベースの列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。列の完全修飾名の形式は schemaName.tableName.columnName です。作成された変更イベントレコードでは、指定された列の値は仮名に置き換えられます。
仮名は、指定された hashAlgorithm と salt を適用すると得られるハッシュ化された値で設定されます。使用されるハッシュ関数に基づいて、参照整合性は維持され、列値は仮名に置き換えられます。サポートされるハッシュ関数は、Java Cryptography Architecture Standard Algorithm Name Documentation の MessageDigest section に説明されています。 column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = inventory.orders.customerName, inventory.shipment.customerName
必要な場合は、仮名は自動的に列の長さに短縮されます。コネクター設定には、異なるハッシュアルゴリズムと salt を指定する複数のプロパティーを含めることができます。 | |
|
|
時間、日付、およびタイムスタンプは、異なる精度の種類で表すことができます。 | |
|
|
削除 イベントの後に廃棄 (tombstone) イベントが続くかどうかを制御します。 | |
|
| コネクターがデータベーススキーマの変更を、データベースサーバー ID と同じ名前の Kafka トピックに公開するかどうかを指定するブール値。各スキーマの変更は、データベース名が含まれるキーと、スキーマ更新を記述する JSON 構造である値で記録されます。これは、コネクターがデータベース履歴を内部で記録する方法には依存しません。 | |
| 該当なし |
文字ベースの列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。列の完全修飾名の形式は schemaName.tableName.columnName です。変更イベントレコードでは、これらの列の値がプロパティー名の 長さ によって指定される文字数よりも長い場合は切り捨てられます。単一の設定で、異なる長さを持つ複数のプロパティーを指定できます。長さは正の整数である必要があります (例: | |
| 該当なし |
文字ベースの列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。列の完全修飾名の形式は schemaName.tableName.columnName です。変更イベント値では、指定のテーブルコラムの値はアスタリスク ( | |
| 該当なし |
列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。列の完全修飾名の形式は、databaseName.tableName.columnName または databaseName.schemaName.tableName.columnName です。 | |
| 該当なし |
一部の列のデータベース固有のデータ型名と一致する正規表現のコンマ区切りリスト (任意)。完全修飾データ型名の形式は、databaseName.tableName.typeName または databaseName.schemaName.tableName.typeName です。 | |
| 空の文字列 | 指定のテーブルの Kafka トピックに公開する変更イベントレコードのカスタムメッセージキーを形成するためにコネクターが使用する列を指定する式のリスト。
デフォルトでは、Debezium はテーブルのプライマリーキー列を、出力するレコードのメッセージキーとして使用します。デフォルトの代わりに、またはプライマリーキーのないテーブルのキーを指定するには、1 つ以上の列をもとにカスタムメッセージキーを設定できます。
プロパティーは複数のテーブルのエントリーをリストできます。リスト内の異なるテーブルのエントリーは、セミコロンを使用して、区切ります。 |
高度なコネクター設定プロパティー
以下の 高度な 設定プロパティーには、ほとんどの状況で機能するデフォルト設定があるため、コネクターの設定で指定する必要はほとんどありません。
| プロパティー | デフォルト | 説明 |
|---|---|---|
|
|
コネクターの起動時にスナップショットを実行する基準を指定します。 | |
|
|
スナップショットの実行中に、トランザクション分離レベルとキャプチャーモードのテーブルをロックする期間を制御します。使用できる値は次のとおりです。 | |
|
|
イベントの処理中にコネクターが例外を処理する方法を指定します。使用できる値は次のとおりです。 | |
|
| コネクターがイベントのバッチの処理を開始する前に、新しい変更イベントの発生を待つ期間をミリ秒単位で指定する正の整数値。デフォルトは 1000 ミリ秒 (1 秒) です。 | |
|
|
ブロッキングキューの最大サイズの正の整数値。コネクターは、データベースログから読み取る変更イベントをブロッキングキューに配置してから Kafka に書き込みます。このキューは、たとえば Kafka へのレコードの書き込みが遅い場合や Kafka が利用できない場合などに、変更データテーブルを読み取るためのバックプレシャーを提供できます。キューに表示されるイベントは、コネクターによって定期的に記録されるオフセットには含まれません。 | |
|
| コネクターが処理するイベントの各バッチの最大サイズを指定する正の整数値。 | |
|
| ブロッキングキューの最大サイズ (バイト単位) の long 値。この機能はデフォルトで無効になっています。正の long 値が設定されると有効になります。 | |
|
|
コネクターがハートビートメッセージを Kafka トピックに送信する頻度を制御します。デフォルトの動作では、コネクターはハートビートメッセージを送信しません。 | |
|
|
コネクターがハートビートメッセージを送信するトピック名の接頭辞を指定します。このトピック名の形式は | |
| デフォルトなし | コネクターの起動時にスナップショットを実行するまでコネクターが待つ必要がある間隔 (ミリ秒単位)。クラスターで複数のコネクターを起動する場合、このプロパティーは、コネクターのリバランスが行われる原因となるスナップショットの中断を防ぐのに役立ちます。 | |
|
| スナップショットの実行中、コネクターは行のバッチでテーブルの内容を読み取ります。このプロパティーは、バッチの行の最大数を指定します。 | |
|
|
スナップショットの実行時に、テーブルロックを取得するまで待つ最大時間 (ミリ秒単位) を指定する正の整数値。コネクターがこの間隔でテーブルロックを取得できないと、スナップショットは失敗します。詳細は、コネクターによるスナップショットの実行方法 を参照してください。その他の可能な設定は次のとおりです。 | |
| デフォルトなし | スナップショットに追加するテーブル行を指定します。スナップショットにテーブルの行のサブセットのみを含める場合は、プロパティーを使用します。このプロパティーはスナップショットにのみ影響します。コネクターがログから読み取るイベントには影響しません。
プロパティーには、
スナップショットにソフト削除以外のレコードのみを含める場合は、soft-delete 列 ( "snapshot.select.statement.overrides": "customer.orders", "snapshot.select.statement.overrides.customer.orders": "SELECT * FROM [customers].[orders] WHERE delete_flag = 0 ORDER BY id DESC"
作成されるスナップショットでは、コネクターには | |
|
コネクターが
そうでない場合は | Avro の命名要件 に準拠するためにフィールド名がサニタイズされるかどうかを示します。 | |
|
|
コネクターがトランザクション境界でイベントを生成し、トランザクションメタデータで変更イベントエンベロープを強化するかどうかを決定します。コネクターにこれを実行させる場合は | |
| デフォルトなし |
ストリーミング中にスキップされる操作タイプのコンマ区切りリスト。操作には、 | |
| デフォルトなし |
シグナルをコネクターへの送信に使用されるデータコレクションの完全修飾名 。 シグナル機能はテクノロジープレビュー機能です。 | |
|
| 増分スナップショットのチャンクの実行中にコネクターがメモリーを取得して読み取る行の最大数。スナップショットは、サイズが大きいスナップショットの場合にはクエリーが少なくなるため、チャンクサイズを増やすと効率が上がります。ただし、チャンクサイズが大きい場合には、スナップショットデータのバッファーにより多くのメモリーが必要になります。チャンクサイズは、環境で最適なパフォーマンスを発揮できる値に、調整します。 増分スナップショットはテクノロジープレビュー機能です。 |
Debezium コネクターデータベース履歴設定プロパティー
Debezium には、コネクターがスキーマ履歴トピックと対話する方法を制御する database.history.* プロパティーのセットが含まれています。
以下の表は、Debezium コネクターを設定するための database.history プロパティーについて説明しています。
| プロパティー | デフォルト | 説明 |
|---|---|---|
| コネクターがデータベーススキーマの履歴を保存する Kafka トピックの完全名。 | ||
| Kafka クラスターへの最初の接続を確立するために コネクターが使用するホストとポートのペアのリスト。このコネクションは、コネクターによって以前に保存されたデータベーススキーマ履歴の取得や、ソースデータベースから読み取られる各 DDL ステートメントの書き込みに使用されます。各ペアは、Kafka Connect プロセスによって使用される同じ Kafka クラスターを示す必要があります。 | ||
|
| 永続化されたデータのポーリングが行われている間にコネクターが起動/回復を待つ最大時間 (ミリ秒単位) を指定する整数値。デフォルトは 100 ミリ秒です。 | |
|
|
エラーでコネクターのリカバリーが失敗する前に、コネクターが永続化された履歴データの読み取りを試行する最大回数。データが受信されなかった場合に最大待機する時間は、 | |
|
|
コネクターが不正または不明なデータベースのステートメントを無視するかどうか、または人が問題を修正するために処理を停止するかどうかを指定するブール値。安全なデフォルトは | |
|
今後のリリースで非推奨になり、削除される予定です。代わりに |
|
コネクターがすべての DDL ステートメントを記録するかどうかを指定するブール値
安全なデフォルトは |
|
|
コネクターがすべての DDL ステートメントを記録するかどうかを指定するブール値
安全なデフォルトは |
プロデューサーおよびコンシューマークライアントを設定するためのパススルーデータベース履歴プロパティー
Debezium は、Kafka プロデューサーを使用して、データベース履歴トピックにスキーマの変更を書き込みます。同様に、コネクターが起動すると、データベース履歴トピックから読み取る Kafka コンシューマーに依存します。database.history.producer.* および database.history.consumer.* 接頭辞で始まるパススルー設定プロパティーのセットに値を割り当てて、Kafka プロデューサーおよびコンシューマークライアントの設定を定義します。パススループロデューサーおよびコンシューマーデータベース履歴プロパティーは、以下の例のように Kafka ブローカーとのこれらのクライアントの接続をセキュアにする方法など、さまざまな動作を制御します。
Debezium は、プロパティーを Kafka クライアントに渡す前に、プロパティー名から接頭辞を削除します。
Kafka プロデューサー設定プロパティー および Kafka コンシューマー設定プロパティーの詳細は、Kafka のドキュメントを参照してください。
Debezium コネクターのパススルーデータベースドライバー設定プロパティー
Debezium コネクターでは、データベースドライバーのパススルー設定が可能です。パススルーデータベースプロパティーは、接頭辞 database.* で始まります。たとえば、コネクターは database.foobar=false などのプロパティーを JDBC URL に渡します。
データベース履歴クライアントのパススループロパティー の場合のように、Debezium はプロパティーから接頭辞を削除してからデータベースドライバーに渡します。