6.5. Debezium Oracle コネクターのデプロイメント
以下の方法のいずれかを使用して Debezium Oracle コネクターをデプロイできます。
Debezium Oracle コネクターのアーカイブには、ライセンスの関係で、Oracle データベースに接続するために必要な Oracle JDBC ドライバーが含まれていません。コネクターがデータベースにアクセスできるようにするには、コネクター環境にドライバーを追加する必要があります。詳細は、Obtaining the Oracle JDBC driverを参照してください。
6.5.1. Oracle JDBC ドライバーの取得 リンクのコピーリンクがクリップボードにコピーされました!
Debezium が Oracle データベースに接続するために必要な Oracle JDBC ドライバーファイルは、ライセンスの関係で Debezium Oracle コネクターアーカイブに含まれていません。ドライバーは、Maven Central からダウンロード可能です。使用するデプロイメント方法に応じて、Kafka Connect カスタムリソースまたはコネクターイメージの構築に使用する Dockerfile にコマンドを追加して、ドライバーを取得することができます。
-
AMQ Streams を使用して Kafka Connect イメージにコネクターを追加する場合は、「AMQ Streams を使用した Debezium Oracle コネクターのデプロイ」 に示すように、
KafkaConnectカスタムリソースのbuilds.plugins.artifact.urlにドライバーの Maven Central の場所を追加してください。 -
Dockerfile を使用してコネクター用のコンテナーイメージを構築する場合、Dockerfile に
curlコマンドを挿入して、Maven Central から必要なドライバーファイルをダウンロードするための URL を指定します。詳細には、Dockerfile からカスタム Kafka Connect コンテナーイメージを構築して Debezium Oracle コネクターをデプロイ するを参照してください。
6.5.2. AMQ Streams を使用した Debezium Oracle コネクターのデプロイメント リンクのコピーリンクがクリップボードにコピーされました!
Debezium 1.7 以降、Debezium コネクターのデプロイに推奨される方法は、AMQ Streams を使用してコネクタープラグインが含まれる Kafka Connect コンテナーイメージをビルドすることです。
デプロイメントプロセス中に、以下のカスタムリソース (CR) を作成し、使用します。
-
Kafka Connect インスタンスを定義し、コネクターアーティファクトに関する情報をイメージに含める必要がある
KafkaConnectCR。 -
コネクターがソースデータベースにアクセスするために使用する情報を提供する
KafkaConnectorCR。AMQStreams が Kafka Connect Pod を開始した後、KafkaConnectorCR を適用してコネクターを開始します。
Kafka Connect イメージのビルド仕様では、デプロイ可能なコネクターを指定できます。各コネクタープラグインに対して、デプロイメントに利用可能にする他のコンポーネントを指定することもできます。たとえば、Service Registry アーティファクトまたは Debezium スクリプトコンポーネントを追加できます。AMQ Streams が Kafka Connect イメージをビルドすると、指定のアーティファクトをダウンロードし、イメージに組み込みます。
Kafka Connect CR の spec.build.output パラメーターは、生成される KafkaConnectコンテナーイメージを格納する場所を指定します。コンテナーイメージは Docker レジストリーまたは OpenShift ImageStream に保存できます。イメージを ImageStream に保存するには、Kafka Connect をデプロイする前に ImageStream を作成する必要があります。イメージストリームは自動的に作成されません。
KafkaConnect リソースを使用してクラスターを作成する場合は、Kafka Connect REST API を使用してコネクターを作成または更新できません。ただし、REST API を使用して情報を取得できます。
関連情報
- OpenShift での AMQ Streams の設定ガイドの Kafka Connect の設定 を参照してください。
- AMQ Streams を使用した新しいコンテナーイメージの自動作成と OpenShift での AMQ Streams のアップグレード
6.5.3. AMQ Streams を使用した Debezium Oracle コネクターのデプロイ リンクのコピーリンクがクリップボードにコピーされました!
以前のバージョンの AMQ Streams では、OpenShift に Debezium コネクターをデプロイするには、最初にコネクター用の Kafka Connect イメージをビルドする必要がありました。コネクターを OpenShift にデプロイする場合に現在推奨される方法は、AMQ Streams でビルド設定を使用して、使用する Debezium コネクタープラグインが含まれる Kafka Connect コンテナーイメージを自動的にビルドすることです。
ビルドプロセス中、AMQ Streams Operator は Debezium コネクター定義を含む KafkaConnect カスタムリソースの入力パラメーターを Kafka Connect コンテナーイメージに変換します。このビルドは、Red Hat Maven リポジトリーまたは別の設定済みの HTTP サーバーから必要なアーティファクトをダウンロードします。
新規に作成されたコンテナーは .spec.build.output に指定されるコンテナーレジストリーにプッシュされ、Kafka Connect クラスターのデプロイに使用されます。AMQ Streams が Kafka Connect イメージをビルドしたら、KafkaConnector カスタムリソースを作成し、ビルドに含まれるコネクターを起動します。
前提条件
- クラスター Operator がインストールされている OpenShift クラスターにアクセスできる。
- AMQ Streams Operator が稼働している。
- Kafka クラスターが OpenShift での AMQ Streams のデプロイおよびアップグレード に記載されているようにデプロイされている。
- Kafka Connect が AMQ Streams にデプロイされている。
- Red Hat Integration ライセンスがある。
-
OpenShift
ocCLI クライアントがインストールされている、または OpenShift Container Platform Web コンソールにアクセスできる。 Kafka Connect ビルドイメージの保存方法に応じて、レジストリーのパーミッションを用意するか、ImageStream リソースを作成しておく。
- ビルドイメージを Red Hat Quay.io または Docker Hub などのイメージレジストリーに保存する場合は、以下が必要です。
- レジストリーでイメージを作成し、管理するためのアカウントおよびパーミッション
- ビルドイメージをネイティブ OpenShift ImageStream として保存する場合
- ImageStream リソースを新規コンテナーイメージを保存するためにクラスターにデプロイします。クラスターの ImageStream を明示的に作成する必要があります。ImageStreams はデフォルトでは利用できません。ImageStreams の詳細は、OpenShift Container Platform でのイメージストリームの管理 を参照してください。
手順
- OpenShift クラスターにログインします。
コネクターの Debezium
KafkaConnectカスタムリソース (CR) を作成するか、既存のリソースを変更します。たとえば、metadata.annotationsおよびspec.buildプロパティーを指定するdbz-connect.yamlという名前のKafkaConnectCR を作成します。以下の例は、KafkaConnectカスタムリソースを記述するdbz-connect.yamlファイルからの抜粋を示しています。
例6.1 Debezium コネクターを含む
KafkaConnectカスタムリソースを定義するdbz-connect.yamlファイル次の例では、カスタムリソースは、次のアーティファクトをダウンロードするように設定されています。
- Debezium Oracle コネクターアーカイブ。
- サービスレジストリーアーカイブ。Service Registry はオプションのコンポーネントです。コネクターで Avro シリアル化を使用する場合にのみ、Service Registry コンポーネントを追加します。
- Debezium スクリプト SMT アーカイブと Debezium コネクターで使用する関連言語の依存関係。SMT アーカイブおよび言語の依存関係は任意のコンポーネントです。Debezium コンテンツベースのルーティング SMT または フィルター SMT を使用する場合にのみ、これらのコンポーネントを追加します。
- Oracle JDBC ドライバー。Oracle データベースに接続するために必要ですが、コネクターアーカイブには含まれていません。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 表6.12 Kafka Connect 設定の説明 項目 説明 1
strimzi.io/use-connector-resourcesアノテーションをtrueに設定して、クラスター Operator がKafkaConnectorリソースを使用してこの Kafka Connect クラスター内のコネクターを設定できるようにします。2
spec.build設定は、ビルドイメージの保存場所を指定し、プラグインアーティファクトの場所と共にイメージに追加するプラグインを一覧表示します。3
build.outputは、新たにビルドされたイメージが保存されるレジストリーを指定します。4
イメージ出力の名前およびイメージ名を指定します。
output.typeの有効な値は、Docker Hub や Quay などのコンテナーレジストリーにプッシュする場合はdocker、内部の OpenShift ImageStream にイメージをプッシュする場合はimagestreamです。ImageStream を使用するには、ImageStream リソースをクラスターにデプロイする必要があります。KafkaConnect 設定でbuild.outputを指定する方法の詳細は、OpenShift での AMQ Streams の設定の AMQ Streams ビルドスキーマ参照 を参照してください。5
plugins設定は、Kafka Connect イメージに追加するすべてのコネクターを一覧表示します。一覧の各エントリーについて、プラグインnameと、コネクターのビルドに必要なアーティファクトに関する情報を指定します。任意で、各コネクタープラグインに対して、コネクターと使用できる他のコンポーネントを含めることができます。たとえば、Service Registry アーティファクトまたは Debezium スクリプトコンポーネントを追加できます。6
artifacts.typeの値は、artifacts.urlで指定したアーティファクトのファイルタイプを指定します。有効なタイプはzip、tgz、またはjarです。Debezium コネクターアーカイブは、.zipファイル形式で提供されます。JDBC ドライバーファイルは.jar形式です。typeの値は、urlフィールドで参照されるファイルのタイプと一致する必要があります。7
artifacts.urlの値は、コネクターアーティファクトのファイルを格納する Maven リポジトリーなどの HTTP サーバーのアドレスを指定します。Debezium コネクターアーティファクトは Red Hat リポジトリーで入手できます。OpenShift クラスターは指定されたサーバーにアクセスできる必要があります。8
(オプション) サービスレジストリーコンポーネントをダウンロードするための アーティファクト
typeとurlを 指定します。デフォルトの JSON コンバーターを使用する代わりに、コネクターが Apache Avro を使用して、Service Registry を使用してイベントのキーと値をシリアル化する場合にのみ、ServiceRegistry アーティファクトを含めます。9
(オプション) Debezium コネクターで使用する Debezium スクリプト SMT アーカイブのアーティファクト
typeとurlを指定します。Debezium コンテンツベースルーティング SMT または フィルター SMT を使用する場合にのみ、スクリプト SMT を含めます。スクリプト SMT を使用するには、groovy などの JSR 223 準拠のスクリプト実装もデプロイする必要があります。10
(オプション) JSR 223 準拠のスクリプト実装の JAR ファイルのアーティファクト
typeとurlを指定します。これは、Debezium スクリプト SMT で必要です。重要AMQ Streams を使用して Kafka Connect イメージにコネクタープラグインを組み込む場合、必要なスクリプト言語コンポーネントごとに、
artifacts.urlに JAR ファイルの場所を指定し、artifacts.typeの値もjarに設定する必要があります。値が無効な場合、実行時にコネクターが失敗します。スクリプト SMT で Apache Groovy 言語を使用できるようにするために、この例のカスタムリソースは、次のライブラリーの JAR ファイルを取得します。
-
groovy -
groovy-jsr223(スクリプトエージェント) -
groovy-json(JSON 文字列を解析するためのモジュール)
Debezium スクリプト SMT は、GraalVM JavaScript の JSR 223 実装の使用もサポートします。
11
Maven Central における Oracle JDBC ドライバーの場所を指定します。必要なドライバーは Debezium Oracle コネクターアーカイブには含まれていません。
以下のコマンドを入力して、
KafkaConnectビルド仕様を OpenShift クラスターに適用します。oc create -f dbz-connect.yaml
oc create -f dbz-connect.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow Streams Operator はカスタムリソースで指定された設定に基づいて、デプロイする Kafka Connect イメージを準備します。
ビルドが完了すると、Operator はイメージを指定されたレジストリーまたは ImageStream にプッシュし、Kafka Connect クラスターを起動します。設定に一覧表示されているコネクターアーティファクトはクラスターで利用できます。KafkaConnectorリソースを作成し、デプロイする各コネクターのインスタンスを定義します。
たとえば、以下のKafkaConnectorCR を作成し、oracle-inventory-connector.yamlとして保存します。例6.2 Debezium コネクターの
KafkaConnectorカスタムリソースを定義するoracle-inventory-connector.yamlファイルCopy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 表6.13 コネクター設定の説明 項目 説明 1
Kafka Connect クラスターに登録するコネクターの名前。
2
コネクタークラスの名前。
3
同時に動作できるタスクの数。
4
コネクターの設定。
5
ホストデータベースインスタンスのアドレス。
6
データベースインスタンスのポート番号。
7
Debezium がデータベースへの接続に使用するアカウントの名前。
8
Debezium がデータベースユーザーアカウントに接続するために使用するパスワード。
9
変更をキャプチャーするデータベースの名前。
10
データベースインスタンスまたはクラスターのトピック接頭辞。
指定する名前は、英数字またはアンダースコアのみで設定する必要があります。
トピック接頭辞は、このコネクターから変更イベントを受信する Kafka トピックの接頭辞として使用されるため、名前はクラスターのコネクター間で一意である必要があります。
コネクターを Avro コネクター と統合する場合、この名前空間は関連する Kafka Connect スキーマの名前や、対応する Avro スキーマの名前空間でも使用されます。11
コネクターが変更イベントをキャプチャーするテーブルの一覧。
以下のコマンドを実行してコネクターリソースを作成します。
oc create -n <namespace> -f <kafkaConnector>.yaml
oc create -n <namespace> -f <kafkaConnector>.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow 以下に例を示します。
oc create -n debezium -f {context}-inventory-connector.yamloc create -n debezium -f {context}-inventory-connector.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow コネクターは Kafka Connect クラスターに登録され、
KafkaConnectorCR のspec.config.database.dbnameで指定されたデータベースに対して実行を開始します。コネクター Pod の準備ができると、Debezium が実行されます。
これで、Debezium Oracle デプロイメントを検証する 準備が整いました。
6.5.4. Dockerfile からカスタム Kafka Connect コンテナーイメージをビルドして Debezium Oracle コネクターのデプロイ リンクのコピーリンクがクリップボードにコピーされました!
Debezium Oracle コネクターをデプロイするには、Debezium コネクターアーカイブが含まれるカスタム Kafka Connect コンテナーイメージをビルドし、このコンテナーイメージをコンテナーレジストリーにプッシュする必要があります。次に、以下のカスタムリソース (CR) を作成する必要があります。
-
Kafka Connect インスタンスを定義する
KafkaConnectCR。imageは Debezium コネクターを実行するために作成したイメージの名前を指定します。この CR を、Red Hat AMQ Streams がデプロイされている OpenShift インスタンスに適用します。AMQ Streams は、Apache Kafka を OpenShift に取り入れる operator およびイメージを提供します。 -
Debezium Oracle コネクターを定義する
KafkaConnectorCR。この CR をKafkaConnectCR を適用するのと同じ OpenShift インスタンスに適用します。
前提条件
- Oracle Database が稼働し、Oracle を設定して Debezium コネクターと連携する 手順が完了済みである必要があります。
- AMQ Streams は OpenShift にデプロイされ、Apache Kafka および Kafka Connect が稼働している必要があります。詳細は、Deploying and Upgrading AMQ Streams on OpenShift を参照してください。
- Podman または Docker がインストールされている。
-
Debezium コネクターを実行するコンテナーを追加する予定のコンテナーレジストリー (
quay.ioやdocker.ioなど) でコンテナーを作成および管理するアカウントとパーミッションを持っている。 Kafka Connect サーバーは、Oracle 用の必要な JDBC ドライバーをダウンロードするために、Maven Central にアクセスすることができます。また、ドライバーのローカルコピー、またはローカルの Maven リポジトリーや他の HTTP サーバーから利用可能なものを使用することもできます。
詳細は、Obtaining the Oracle JDBC driverを参照してください。
手順
Kafka Connect の Debezium Oracle コンテナーを作成します。
registry.redhat.io/amq7/amq-streams-kafka-32-rhel8:2.2.0-12をベースイメージとして使用して、新規の Dockerfile を作成します。例えば、ターミナルウィンドウから、以下のコマンドを入力します。Copy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 項目 説明 1
任意のファイル名を指定できます。
2
Kafka Connect プラグインディレクトリーへのパスを指定します。Kafka Connect のプラグインディレクトリーが別の場所にある場合は、このパスを実際のディレクトリーのパスに置き換えてください。
このコマンドは、現在のディレクトリーに
debezium-container-for-oracle.yamlという名前の Docker ファイルを作成します。前のステップで作成した
debezium-container-for-oracle.yamlDocker ファイルからコンテナーイメージをビルドします。ファイルが含まれるディレクトリーから、ターミナルウィンドウを開き、以下のコマンドのいずれかを入力します。podman build -t debezium-container-for-oracle:latest .
podman build -t debezium-container-for-oracle:latest .Copy to Clipboard Copied! Toggle word wrap Toggle overflow docker build -t debezium-container-for-oracle:latest .
docker build -t debezium-container-for-oracle:latest .Copy to Clipboard Copied! Toggle word wrap Toggle overflow 上記のコマンドは、
debezium-container-for-oracleという名前のコンテナーイメージを構築します。カスタムイメージを quay.io などのコンテナーレジストリーまたは内部のコンテナーレジストリーにプッシュします。コンテナーレジストリーは、イメージをデプロイする OpenShift インスタンスで利用できる必要があります。以下のいずれかのコマンドを実行します。
podman push <myregistry.io>/debezium-container-for-oracle:latest
podman push <myregistry.io>/debezium-container-for-oracle:latestCopy to Clipboard Copied! Toggle word wrap Toggle overflow docker push <myregistry.io>/debezium-container-for-oracle:latest
docker push <myregistry.io>/debezium-container-for-oracle:latestCopy to Clipboard Copied! Toggle word wrap Toggle overflow 新しい Debezium Oracle KafkaConnect カスタムリソース (CR) を作成します。たとえば、
annotationsおよびimageプロパティーを指定するdbz-connect.yamlという名前のKafkaConnectCR を作成します。以下の例は、KafkaConnectカスタムリソースを記述するdbz-connect.yamlファイルからの抜粋を示しています。
Copy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 項目 説明 1
KafkaConnectorリソースはこの Kafka Connect クラスターでコネクターを設定するために使用されることを、metadata.annotationsは Cluster Operator に示します。2
spec.imageは Debezium コネクターを実行するために作成したイメージの名前を指定します。設定された場合、このプロパティーによって Cluster Operator のSTRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE変数がオーバーライドされます。以下のコマンドを入力して、
KafkaConnectCR を OpenShift Kafka Connect 環境に適用します。oc create -f dbz-connect.yaml
oc create -f dbz-connect.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow このコマンドは、Debezium コネクターを実行するために作成したイメージの名前を指定する Kafka Connect インスタンスを追加します。
Debezium Oracle コネクターインスタンスを設定する
KafkaConnectorカスタムリソースを作成します。通常、コネクターに使用できる設定プロパティーを使用して、
.yamlファイルに Debezium Db2 コネクターを設定します。コネクター設定は、Debezium に対して、スキーマおよびテーブルのサブセットにイベントを生成するよう指示する可能性があり、または機密性の高い、大きすぎる、または不必要な指定のコラムで Debezium が値を無視、マスク、または切り捨てするようにプロパティーを設定する可能性もあります。以下の例では、ポート
1521で Oracle ホスト IP アドレスに接続する Debezium コネクターを設定します。このホストにはORCLCDBという名前のデータベースがあり、server1はサーバーの論理名です。Oracle
inventory-connector.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 表6.14 コネクター設定の説明 項目 説明 1
Kafka Connect サービスに登録する場合のコネクターの名前。
2
この Oracle コネクタークラスの名前。
3
Oracle インスタンスのアドレス。
4
Oracle インスタンスのポート番号。
5
コネクターのユーザーの作成 で指定した Oracle ユーザーの名前。
6
コネクターのユーザーの作成 で指定した Oracle ユーザーのパスワード。
7
変更をキャプチャーするデータベースの名前。
8
コネクターが変更をキャプチャーする Oracle のプラグ可能なデータベースの名前。コンテナーデータベース (CDB) のインストールのみで使用されます。
9
topic prefix は、コネクターが変更をキャプチャーする Oracle データベースサーバーの名前空間を識別して提供します。
10
DDL ステートメントをデータベーススキーマの履歴トピックに書き込み、復元するためにコネクターによって使用される Kafka ブローカーのリスト。
11
コネクターが DDL ステートメントを書き、復元するデータベーススキーマの履歴トピックの名前。このトピックは内部使用のみを目的としており、コンシューマーが使用しないようにしてください。
Kafka Connect でコネクターインスタンスを作成します。たとえば、
KafkaConnectorリソースをinventory-connector.yamlファイルに保存した場合は、以下のコマンドを実行します。oc apply -f inventory-connector.yaml
oc apply -f inventory-connector.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow 上記のコマンドは
inventory-connectorを登録し、コネクターはKafkaConnectorCR に定義されているserver1データベースに対して実行を開始します。
Debezium Oracle コネクターに設定できる設定プロパティーの完全リストは Oracle コネクタープロパティー を参照してください。
結果
コネクターが起動すると、コネクターが設定された Oracle データベースの整合性スナップショットが実行されます。その後、コネクターは行レベルの操作のデータ変更イベントの生成を開始し、変更イベントレコードを Kafka トピックにストリーミングします。
6.5.5. コンテナーデータベースとノンコンテナーデータベースの設定 リンクのコピーリンクがクリップボードにコピーされました!
Oracle Database は以下の展開タイプをサポートしています。
- コンテナーデータベース (CDB)
- 複数の PDB (Multi Pluggable Database) を格納できるデータベース。データベースクライアントは、標準的な非 CDB データベースであるかのように、各 PDB に接続します。
- ノンコンテナーデータベース (非 CDB)
- プラガブルデータベースの作成には対応していない、標準的な Oracle のデータベース。
6.5.6. Debezium Oracle コネクターが実行していることの確認 リンクのコピーリンクがクリップボードにコピーされました!
コネクターがエラーなしで正常に起動すると、コネクターがキャプチャーするように設定された各テーブルのトピックが作成されます。ダウンストリームアプリケーションは、これらのトピックをサブスクライブして、ソースデータベースで発生する情報イベントを取得できます。
コネクターが実行されていることを確認するには、OpenShift Container Platform Web コンソールまたは OpenShift CLI ツール (oc) から以下の操作を実行します。
- コネクターのステータスを確認します。
- コネクターがトピックを生成していることを確認します。
- 各テーブルの最初のスナップショットの実行中にコネクターが生成する読み取り操作 ("op":"r") のイベントがトピックに反映されていることを確認します。
前提条件
- Debezium コネクターが AMQ Streams on OpenShift にデプロイされている。
-
OpenShift
ocCLI クライアントがインストールされている。 - OpenShift Container Platform Web コンソールへのアクセスがある。
手順
以下の方法のいずれかを使用して
KafkaConnectorリソースのステータスを確認します。OpenShift Container Platform Web コンソールから以下を実行します。
-
Home
Search に移動します。 -
Search ページで Resources をクリックし、Select Resource ボックスを開き、
KafkaConnectorを入力します。 - KafkaConnectors リストから、チェックするコネクターの名前をクリックします (例: inventory-connector-oracle)。
- Conditions セクションで、Type および Status 列の値が Ready および True に設定されていることを確認します。
-
Home
ターミナルウィンドウから以下を実行します。
以下のコマンドを入力します。
oc describe KafkaConnector <connector-name> -n <project>
oc describe KafkaConnector <connector-name> -n <project>Copy to Clipboard Copied! Toggle word wrap Toggle overflow 以下に例を示します。
oc describe KafkaConnector inventory-connector-oracle -n debezium
oc describe KafkaConnector inventory-connector-oracle -n debeziumCopy to Clipboard Copied! Toggle word wrap Toggle overflow このコマンドは、以下の出力のようなステータス情報を返します。
例6.3
KafkaConnectorリソースのステータスCopy to Clipboard Copied! Toggle word wrap Toggle overflow
コネクターによって Kafka トピックが作成されたことを確認します。
OpenShift Container Platform Web コンソールから以下を実行します。
-
Home
Search に移動します。 -
Search ページで Resources をクリックし、Select Resource ボックスを開き、
KafkaTopicを入力します。 -
KafkaTopics リストから確認するトピックの名前をクリックします (例:
inventory-connector-oracle.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d)。 - Conditions セクションで、Type および Status 列の値が Ready および True に設定されていることを確認します。
-
Home
ターミナルウィンドウから以下を実行します。
以下のコマンドを入力します。
oc get kafkatopics
oc get kafkatopicsCopy to Clipboard Copied! Toggle word wrap Toggle overflow このコマンドは、以下の出力のようなステータス情報を返します。
例6.4
KafkaTopicリソースのステータスCopy to Clipboard Copied! Toggle word wrap Toggle overflow
トピックの内容を確認します。
- ターミナルウィンドウから、以下のコマンドを入力します。
oc exec -n <project> -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=<topic-name>
oc exec -n <project> -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=<topic-name>Copy to Clipboard Copied! Toggle word wrap Toggle overflow 以下に例を示します。
oc exec -n debezium -it debezium-kafka-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=inventory-connector-oracle.inventory.products_on_hand
oc exec -n debezium -it debezium-kafka-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=inventory-connector-oracle.inventory.products_on_handCopy to Clipboard Copied! Toggle word wrap Toggle overflow トピック名を指定する形式は、手順 1 で返された
oc describeコマンドと同じです (例:inventory-connector-oracle.inventory.addresses)。トピックの各イベントについて、このコマンドは、以下の出力のような情報を返します。
例6.5 Debezium 変更イベントの内容
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory-connector-oracle.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-oracle.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-oracle.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.oracle.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory-connector-oracle.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"2.1.4.Final-redhat-00001","connector":"oracle","name":"inventory-connector-oracle","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"oracle-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory-connector-oracle.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-oracle.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-oracle.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.oracle.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory-connector-oracle.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"2.1.4.Final-redhat-00001","connector":"oracle","name":"inventory-connector-oracle","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"oracle-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}Copy to Clipboard Copied! Toggle word wrap Toggle overflow 上記の例では、
payload値は、コネクタースナップショットがテーブルinventory.products_on_handから 読み込み (op" ="r") イベントを生成したことを示しています。product_idレコードのbefore状態はnullであり、レコードに以前の値が存在しないことを示します。"after"状態がproduct_id101で項目のquantityを3で示しています。