6.4. Kafka Connect のデプロイ
Kafka Connect は、コネクタープラグインを使用して Kafka ブローカーと他のシステムの間でデータをストリーミングする統合ツールです。Kafka Connect は、Kafka と、データベースやメッセージングシステムなどの外部データソースまたはターゲットを統合するためのフレームワークを提供し、コネクターを使用してデータをインポートまたはエクスポートします。コネクターは、必要な接続設定を提供するプラグインです。
AMQ Streams では、Kafka Connect は分散 (distributed) モードでデプロイされます。Kafka Connect はスタンドアロンモードでも動作しますが、AMQ Streams ではサポートされません。
Kafka Connect は、コネクター の概念を使用し、スケーラビリティーと信頼性を維持しながら Kafka クラスターで大量のデータを移動するフレームワークを提供します。
Cluster Operator は、KafkaConnect
リソースを使用してデプロイされた Kafka Connect クラスターと、KafkaConnector
リソースを使用して作成されたコネクターを管理します。
Kafka Connect を使用するには、次のことを行う必要があります。
コネクター という用語は、Kafka Connect クラスター内で実行されているコネクターインスタンスや、コネクタークラスと同じ意味で使用されます。本ガイドでは、本文の内容で意味が明確である場合に コネクター という用語を使用します。
6.4.1. Kafka Connect の OpenShift クラスターへのデプロイ リンクのコピーリンクがクリップボードにコピーされました!
この手順では、Cluster Operator を使用して Kafka Connect クラスターを OpenShift クラスターにデプロイする方法を説明します。
Kafka Connect クラスターのデプロイメントは、コネクターのワークロードを タスク として分散する設定可能な数のノード (ワーカー とも呼ばれます) を使用して実装されるため、メッセージフローのスケーラビリティと信頼性が高くなります。
デプロイメントでは、YAML ファイルの仕様を使用して KafkaConnect
リソースが作成されます。
AMQ Streams には、設定ファイルのサンプル が用意されています。この手順では、以下のサンプルファイルを使用します。
-
examples/connect/kafka-connect.yaml
手順
Kafka Connect を OpenShift クラスターにデプロイします。
examples/connect/kafka-connect.yaml
ファイルを使用して Kafka Connect をデプロイします。oc apply -f examples/connect/kafka-connect.yaml
oc apply -f examples/connect/kafka-connect.yaml
Copy to Clipboard Copied! Toggle word wrap Toggle overflow デプロイメントのステータスを確認します。
oc get pods -n <my_cluster_operator_namespace>
oc get pods -n <my_cluster_operator_namespace>
Copy to Clipboard Copied! Toggle word wrap Toggle overflow デプロイメント名と準備状態が表示されている出力
NAME READY STATUS RESTARTS my-connect-cluster-connect-<pod_id> 1/1 Running 0
NAME READY STATUS RESTARTS my-connect-cluster-connect-<pod_id> 1/1 Running 0
Copy to Clipboard Copied! Toggle word wrap Toggle overflow my-connect-cluster
は、Kafka Connect クラスターの名前です。Pod ID は、作成された各 Pod を識別します。
デフォルトのデプロイでは、単一の Kafka Connect Pod を作成します。
READY
は、Ready/expected 状態のレプリカ数を表示します。STATUS
がRunning
と表示されれば、デプロイメントは成功です。
6.4.2. 複数のインスタンス用の Kafka Connect の設定 リンクのコピーリンクがクリップボードにコピーされました!
Kafka Connect のインスタンスを複数実行している場合は、以下の config
プロパティーのデフォルト設定を変更する必要があります。
group.id
が同じすべての Kafka Connect インスタンスで、これら 3 つのトピックの値を揃える必要があります。
デフォルト設定を変更しない限り、同じ Kafka クラスターに接続する Kafka Connect インスタンスはそれぞれ同じ値でデプロイされます。事実上、すべてのインスタンスが結合されてクラスターで実行されて同じトピックを使用するようになります。
複数の Kafka Connect クラスターが同じトピックの使用を試みると、Kafka Connect は想定どおりに動作せず、エラーが生成されます。
複数の Kafka Connect インスタンスを実行する場合は、インスタンスごとにこれらのプロパティーの値を変更してください。
6.4.3. コネクターの追加 リンクのコピーリンクがクリップボードにコピーされました!
Kafka Connect はコネクターを使用して他のシステムと統合し、データをストリーミングします。コネクターは Kafka Connector
クラスのインスタンスであり、次のいずれかのタイプになります。
- ソースコネクター
- ソースコネクターは、外部システムからデータを取得し、それをメッセージとして Kafka に提供するランタイムエンティティーです。
- シンクコネクター
- シンクコネクターは、Kafka トピックからメッセージを取得し、外部システムに提供するランタイムエンティティーです。
Kafka Connect はプラグインアーキテクチャーを使用して、コネクターの実装アーティファクトを提供します。プラグインは他のシステムへの接続を可能にし、データを操作するための追加の設定を提供します。プラグインには、コネクターや、データコンバーターや変換などの他のコンポーネントが含まれます。コネクターは、特定のタイプの外部システムで動作します。各コネクターは、その設定のスキーマを定義します。設定を Kafka Connect に指定して、Kafka Connect 内にコネクターインスタンス を作成します。次に、コネクターインスタンスは、システム間でデータを移動するための一連のタスクを定義します。
次のいずれかの方法で、コネクタープラグインを Kafka Connect に追加します。
コンテナーイメージにプラグインを追加したら、次の方法でコネクターインスタンスを開始、停止、および管理できます。
これらのオプションを使用して、新しいコネクターインスタンスを作成することもできます。
6.4.3.1. コネクタープラグインを使用して新しいコンテナーイメージを自動的に構築する リンクのコピーリンクがクリップボードにコピーされました!
AMQ Streams が追加のコネクターを使用して新しいコンテナーイメージを自動的に構築するように、Kafka Connect を設定します。コネクタープラグインは、KafkaConnect
カスタムリソースの .spec.build.plugins
プロパティーを使用して定義します。AMQ Streams はコネクタープラグインを自動的にダウンロードし、新しいコンテナーイメージに追加します。コンテナーは、.spec.build.output
に指定されたコンテナーリポジトリーにプッシュされ、Kafka Connect デプロイメントで自動的に使用されます。
前提条件
- Cluster Operator がデプロイされている。
- コンテナーレジストリー。
イメージをプッシュ、保存、およびプルできる独自のコンテナーレジストリーを提供する必要があります。AMQ Streams は、プライベートコンテナーレジストリーだけでなく、Quay や Docker Hub などのパブリックレジストリーもサポートします。
手順
.spec.build.output
でコンテナーレジストリーを、.spec.build.plugins
で追加のコネクターを指定して、KafkaConnect
カスタムリソースを設定します。Copy to Clipboard Copied! Toggle word wrap Toggle overflow リソースを作成または更新します。
oc apply -f <kafka_connect_configuration_file>
$ oc apply -f <kafka_connect_configuration_file>
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - 新しいコンテナーイメージがビルドされ、Kafka Connect クラスターがデプロイされるまで待ちます。
-
Kafka Connect REST API または
KafkaConnector
カスタムリソースを使用して、追加したコネクタープラグインを使用します。
6.4.3.2. Kafka Connect ベースイメージからコネクタープラグインを使用して新しいコンテナーイメージを構築する リンクのコピーリンクがクリップボードにコピーされました!
Kafka Connect ベースイメージからコネクタープラグインを使用してカスタム Docker イメージを作成するカスタムイメージを /opt/kafka/plugins
ディレクトリーに追加します。
Red Hat Ecosystem Catalog の Kafka コンテナーイメージは、追加のコネクタープラグインで独自のカスタムイメージを作成するためのベースイメージとして使用できます。
AMQ Stream バージョンの Kafka Connect は起動時に、/opt/kafka/plugins
ディレクトリーに含まれるサードパーティーのコネクタープラグインをロードします。
手順
ベースイメージとして
registry.redhat.io/amq-streams/kafka-35-rhel8:2.5.1
を使用して、新しいDockerfile
を作成します。FROM registry.redhat.io/amq-streams/kafka-35-rhel8:2.5.1 USER root:root COPY ./my-plugins/ /opt/kafka/plugins/ USER 1001
FROM registry.redhat.io/amq-streams/kafka-35-rhel8:2.5.1 USER root:root COPY ./my-plugins/ /opt/kafka/plugins/ USER 1001
Copy to Clipboard Copied! Toggle word wrap Toggle overflow プラグインファイルの例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow COPY コマンドは、コンテナーイメージにコピーするプラグインファイルを指します。
この例では、Debezium コネクター (MongoDB、MySQL、および PostgreSQL) のプラグインを追加しますが、簡潔にするためにすべてのファイルがリストされているわけではありません。Kafka Connect で実行されている Debezium は、他の Kafka Connect タスクと同じように表示されます。
- コンテナーイメージをビルドします。
- カスタムイメージをコンテナーレジストリーにプッシュします。
新しいコンテナーイメージを示します。
次のいずれかの方法でイメージを指定できます。
KafkaConnect
カスタムリソースのKafkaConnect.spec.image
プロパティーを編集します。設定されている場合、このプロパティーは Cluster Operator の
STRIMZI_KAFKA_CONNECT_IMAGES
環境変数をオーバーライドします。Copy to Clipboard Copied! Toggle word wrap Toggle overflow -
install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml
ファイルのSTRIMZI_KAFKA_CONNECT_IMAGES
環境変数を編集して、新しいコンテナーイメージを指すようにし、Cluster Operator を再インストールします。
6.4.3.3. KafkaConnector リソースのデプロイ リンクのコピーリンクがクリップボードにコピーされました!
コネクターを管理するために KafkaConnector
リソースをデプロイします。KafkaConnector
カスタムリソースは、Cluster Operator によるコネクターの管理に OpenShift ネイティブのアプローチを提供します。Kafka Connect REST API のように、コネクターを管理するために HTTP 要求を送信する必要はありません。該当する KafkaConnector
リソースを更新して稼働中のコネクターインスタンスを管理した後、更新を適用します。Cluster Operator は、実行中のコネクターインスタンスの設定を更新します。該当する KafkaConnector
を削除して、コネクターを削除します。
KafkaConnector
リソースは、リンク先の Kafka Connect クラスターと同じ namespace にデプロイする必要があります。
この手順で示す設定では、autoRestart
プロパティーが true
に設定されています。失敗したコネクターとタスクの自動再起動を有効にします。再起動は最大 7 回試行され、その後は手動で再起動する必要があります。KafkaConnector
リソースにアノテーションを付けて、コネクターを 再起動する か コネクタータスク を手動で再起動します。
コネクターの例
独自のコネクターを使用するか、AMQ Streams が提供する例を試すことができます。Apache Kafka 3.1.0 までは、サンプルファイルコネクタープラグインが Apache Kafka に含まれていました。Apache Kafka の 3.1.1 および 3.2.0 リリースから、例を他のコネクターと同様にプラグインパスに追加する必要があります。
AMQ Streams は、サンプルファイルコネクタープラグイン用の サンプル KafkaConnector
設定ファイル (examples/connect/source-connector.yaml
) を提供します。これにより、次のコネクターインスタンスが KafkaConnector
リソースとして作成されます。
-
Kafka ライセンスファイル (ソース) から各行を読み取り、データをメッセージとして単一の Kafka トピックに書き込む
FileStreamSourceConnector
インスタンス。 -
Kafka トピックからメッセージを読み取り、メッセージを一時ファイル (シンク) に書き込む
FileStreamSinkConnector
インスタンス。
この手順では、サンプルファイルを使用してコネクターを作成します。
サンプルコネクターは、運用環境での使用を意図したものではありません。
前提条件
- Kafka Connect デプロイメント。
- Cluster Operator が稼働している。
手順
次のいずれかの方法で、
FileStreamSourceConnector
およびFileStreamSinkConnector
プラグインを Kafka Connect に追加します。Kafka Connect 設定で
strimzi.io/use-connector-resources annotation
をtrue
に設定します。Copy to Clipboard Copied! Toggle word wrap Toggle overflow KafkaConnector
リソースを有効にすると、Cluster Operator はそれらを監視します。examples/connect/source-connector.yaml
ファイルを編集します。Copy to Clipboard Copied! Toggle word wrap Toggle overflow - 1
- コネクターの名前として使用される
KafkaConnector
リソースの名前。OpenShift リソースで有効な名前を使用します。 - 2
- コネクターインスタンスを作成する Kafka Connect クラスターの名前。コネクターは、リンク先の Kafka Connect クラスターと同じ namespace にデプロイする必要があります。
- 3
- コネクタークラスのフルネームまたはエイリアス。これは、Kafka Connect クラスターによって使用されているイメージに存在するはずです。
- 4
- コネクターが作成できる Kafka Connect タスクの最大数。
- 5
- 失敗したコネクターとタスクの自動再起動を有効にします。
- 6
- キーと値のペアとしての コネクター設定。
- 7
- このサンプルソースコネクター設定では、
/opt/kafka/LICENSE
ファイルからデータが読み取られます。 - 8
- ソースデータのパブリッシュ先となる Kafka トピック。
OpenShift クラスターでソース
KafkaConnector
を作成します。oc apply -f examples/connect/source-connector.yaml
oc apply -f examples/connect/source-connector.yaml
Copy to Clipboard Copied! Toggle word wrap Toggle overflow examples/connect/sink-connector.yaml
ファイルを作成します。touch examples/connect/sink-connector.yaml
touch examples/connect/sink-connector.yaml
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 以下の YAML を
sink-connector.yaml
ファイルに貼り付けます。Copy to Clipboard Copied! Toggle word wrap Toggle overflow OpenShift クラスターにシンク
KafkaConnector
を作成します。oc apply -f examples/connect/sink-connector.yaml
oc apply -f examples/connect/sink-connector.yaml
Copy to Clipboard Copied! Toggle word wrap Toggle overflow コネクターリソースが作成されたことを確認します。
oc get kctr --selector strimzi.io/cluster=<my_connect_cluster> -o name my-source-connector my-sink-connector
oc get kctr --selector strimzi.io/cluster=<my_connect_cluster> -o name my-source-connector my-sink-connector
Copy to Clipboard Copied! Toggle word wrap Toggle overflow <my_connect_cluster> を Kafka Connect クラスターの名前に置き換えます。
コンテナーで、
kafka -console-consumer.sh
を実行して、ソースコネクターによってトピックに書き込まれたメッセージを読み取ります。oc exec <my_kafka_cluster>-kafka-0 -i -t -- bin/kafka-console-consumer.sh --bootstrap-server <my_kafka_cluster>-kafka-bootstrap.NAMESPACE.svc:9092 --topic my-topic --from-beginning
oc exec <my_kafka_cluster>-kafka-0 -i -t -- bin/kafka-console-consumer.sh --bootstrap-server <my_kafka_cluster>-kafka-bootstrap.NAMESPACE.svc:9092 --topic my-topic --from-beginning
Copy to Clipboard Copied! Toggle word wrap Toggle overflow <my_kafka_cluster> を Kafka クラスターの名前に置き換えます。
ソースおよびシンクコネクターの設定オプション
コネクター設定は、KafkaConnector
リソースの spec.config
プロパティーで定義されます。
FileStreamSourceConnector
クラスおよび FileStreamSinkConnector
クラスは、Kafka Connect REST API と同じ設定オプションをサポートします。他のコネクターは異なる設定オプションをサポートします。
名前 | タイプ | デフォルト値 | 説明 |
---|---|---|---|
| 文字列 | Null | メッセージを書き込むソースファイル。指定のない場合は、標準入力が使用されます。 |
| List | Null | データのパブリッシュ先となる Kafka トピック。 |
名前 | タイプ | デフォルト値 | 説明 |
---|---|---|---|
| 文字列 | Null | メッセージを書き込む宛先ファイル。指定のない場合は標準出力が使用されます。 |
| List | Null | データの読み取り元となる 1 つ以上の Kafka トピック。 |
| 文字列 | Null | データの読み取り元となる 1 つ以上の Kafka トピックと一致する正規表現。 |
6.4.3.4. コネクターの手動再起動 リンクのコピーリンクがクリップボードにコピーされました!
KafkaConnector
リソースを使用してコネクターを管理している場合は、restart
アノテーションを使用してコネクターの再起動を手動でトリガーします。
前提条件
- Cluster Operator が稼働中である。
手順
再起動する Kafka コネクターを制御する
KafkaConnector
カスタムリソースの名前を見つけます。oc get KafkaConnector
oc get KafkaConnector
Copy to Clipboard Copied! Toggle word wrap Toggle overflow OpenShift で
KafkaConnector
リソースにアノテーションを付けて、コネクターを再起動します。oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart=true
oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart=true
Copy to Clipboard Copied! Toggle word wrap Toggle overflow restart
アノテーションはtrue
に設定されています。次の調整が発生するまで待ちます (デフォルトでは 2 分ごとです)。
アノテーションが調整プロセスで検出されれば、Kafka コネクターは再起動されます。Kafka Connect が再起動リクエストを受け入れると、アノテーションは
KafkaConnector
カスタムリソースから削除されます。
6.4.3.5. Kafka コネクタータスクを手動で再起動する リンクのコピーリンクがクリップボードにコピーされました!
KafkaConnector
リソースを使用してコネクターを管理している場合は、restart-task
アノテーションを使用して、コネクタータスクの再起動を手動でトリガーします。
前提条件
- Cluster Operator が稼働中である。
手順
再起動する Kafka コネクタータスクを制御する
KafkaConnector
カスタムリソースの名前を見つけます。oc get KafkaConnector
oc get KafkaConnector
Copy to Clipboard Copied! Toggle word wrap Toggle overflow KafkaConnector
カスタムリソースから再起動するタスクの ID を検索します。タスク ID は、0 から始まる非負の整数です。oc describe KafkaConnector <kafka_connector_name>
oc describe KafkaConnector <kafka_connector_name>
Copy to Clipboard Copied! Toggle word wrap Toggle overflow OpenShift で
KafkaConnector
リソースにアノテーションを付けて、ID を使用してコネクタータスクを再開します。oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart-task=0
oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart-task=0
Copy to Clipboard Copied! Toggle word wrap Toggle overflow この例では、タスク
0
が再起動されます。次の調整が発生するまで待ちます (デフォルトでは 2 分ごとです)。
アノテーションが調整プロセスで検出されれば、Kafka コネクタータスクは再起動されます。Kafka Connect が再起動リクエストを受け入れると、アノテーションは
KafkaConnector
カスタムリソースから削除されます。
6.4.3.6. Kafka Connect API の公開 リンクのコピーリンクがクリップボードにコピーされました!
KafkaConnector
リソースを使用してコネクターを管理する代わりに、Kafka Connect REST API を使用します。Kafka Connect REST API は、<connect_cluster_name>-connect-api:8083
で実行しているサービスとして利用できます。ここで、<connect_cluster_name> は、お使いの Kafka Connect クラスターの名前になります。サービスは、Kafka Connect インスタンスの作成時に作成されます。
Kafka Connect REST API でサポートされる操作は、Apache Kafka Connect API のドキュメント で説明されています。
strimzi.io/use-connector-resources
アノテーションは KafkaConnectors を有効にします。アノテーションを KafkaConnect
リソース設定に適用した場合、そのアノテーションを削除して Kafka Connect API を使用する必要があります。それ以外の場合、Kafka Connect REST API を使用して直接行われた手動による変更は、 Cluster Operator によって元に戻されます。
コネクター設定を JSON オブジェクトとして追加できます。
コネクター設定を追加するための curl 要求の例
API には OpenShift クラスター内でのみアクセスできます。OpenShift クラスター外部で実行しているアプリケーションに Kafka Connect API がアクセスできるようにする場合は、以下の機能のいずれかを使用して Kafka Connect API を手動で公開できます。
-
LoadBalancer
またはNodePort
タイプのサービス -
Ingress
リソース (Kubernetes のみ) - OpenShift ルート (OpenShift のみ)
接続はセキュアではないため、外部からのアクセスはよく考えてから許可してください。
サービスを作成する場合には、<connect_cluster_name>-connect-api
サービスの selector
からラベルを使用して、サービスがトラフィックをルーティングする Pod を設定します。
サービスのセレクター設定
また、外部クライアントからの HTTP 要求を許可する NetworkPolicy
を作成する必要もあります。
Kafka Connect API への要求を許可する NetworkPolicy の例
- 1
- API への接続が許可される Pod のラベル。
クラスター外でコネクター設定を追加するには、curl コマンドで API を公開するリソースの URL を使用します。
6.4.3.7. Kafka Connect API へのアクセスの制限 リンクのコピーリンクがクリップボードにコピーされました!
Kafka Connect API へのアクセスを信頼できるユーザーのみに制限して、不正なアクションや潜在的なセキュリティーの問題を防ぐことが重要です。Kafka Connect API は、コネクター設定を変更するための広範な機能を提供するため、セキュリティー対策を講じることがさらに重要になります。管理者によりセキュアであると想定されている機密情報が、Kafka Connect API にアクセスできるユーザーに取得されてしまう可能性があります。
Kafka Connect REST API には、OpenShift クラスターへのアクセスが認証されており、ホスト名/IP アドレス、ポート番号など、エンドポイント URL を知っている場合には、アクセスできます。
たとえば、組織が Kafka Connect クラスターとコネクターを使用して、機密データを顧客データベースから中央データベースにストリーミングするとします。管理者は設定プロバイダープラグインを使用して、顧客データベースと中央データベースへの接続に関連する機密情報 (データベース接続の詳細や認証情報など) を保存します。設定プロバイダーは、この機密情報が許可されていないユーザーに公開されるのを防ぎます。ただし、Kafka Connect API にアクセスできるユーザーは、管理者の同意なしに顧客データベースにアクセスできます。これを行うには、偽のデータベースをセットアップし、それに接続するコネクターを設定します。次に、顧客データベースを参照するようにコネクター設定を変更しますが、データを中央データベースに送信する代わりに、偽のデータベースに送信します。偽のデータベースに接続するようにコネクターを設定すると、設定プロバイダーにセキュアに保存されているにもかかわらず、顧客データベースに接続するためのログインの詳細と認証情報が傍受されます。
KafkaConnector
カスタムリソースを使用している場合、デフォルトでは、OpenShift RBAC ルールにより、OpenShift クラスター管理者のみがコネクターに変更を加えることが許可されます。AMQ Streams リソースを管理するクラスター管理者以外のユーザーを指定 することもできます。Kafka Connect 設定で KafkaConnector
リソースを有効にすると、Kafka Connect REST API を使用して直接行われた変更は Cluster Operator によって元に戻されます。KafkaConnector
リソースを使用していない場合、デフォルトの RBAC ルールは Kafka Connect API へのアクセスを制限しません。OpenShift RBAC を使用して Kafka Connect REST API への直接アクセスを制限する場合は、KafkaConnector
リソースを有効にして使用する必要があります。
セキュリティーを強化するために、Kafka Connect API の次のプロパティーを設定することを推奨します。
org.apache.kafka.disallowed.login.modules
(Kafka 3.4 以降)
org.apache.kafka.disallowed.login.modules
Java システムプロパティーを設定して、セキュアではないログインモジュールの使用を防止します。たとえば、com.sun.security.auth.module.JndiLoginModule
を指定すると、KafkaJndiLoginModule
が使用できなくなります。ログインモジュールを禁止する設定例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 信頼できるログインモジュールのみを許可し、使用しているバージョンに対する Kafka からの最新のアドバイスに従ってください。ベストプラクティスとして、
org.apache.kafka.disallowed.login.modules
システムプロパティーを使用して、Kafka Connect 設定でセキュアではないログインモジュールを明示的に禁止する必要があります。connector.client.config.override.policy
connector.client.config.override.policy
プロパティーをNone
に設定して、コネクター設定が Kafka Connect 設定とそれが使用するコンシューマーおよびプロデューサーをオーバーライドしないようにします。コネクターオーバーライドポリシーを指定する設定例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
6.4.3.8. Kafka Connect API の使用から KafkaConnector カスタムリソースの使用への切り替え リンクのコピーリンクがクリップボードにコピーされました!
Kafka Connect API の使用から KafkaConnector
カスタムリソースの使用に切り替えて、コネクターを管理できます。スイッチの作成は、次の作業を以下の順序で行います。
-
設定で
KafkaConnector
リソースをデプロイし、コネクターインスタンスを作成します。 -
strimzi.io/use-connector-resources
アノテーションをtrue
に設定して、Kafka Connect 設定でKafkaConnector
リソースを有効にします。
作成する前に KafkaConnector
リソースを有効にすると、すべてのコネクターが削除されます。
KafkaConnector
リソースの使用から Kafka Connect API の使用に切り替えるには、最初に KafkaConnector
リソースを有効にするアノテーションを Kafka Connect 設定から削除します。それ以外の場合、Kafka Connect REST API を使用して直接行われた手動による変更は、 Cluster Operator によって元に戻されます。
切り替えを行うときは、KafkaConnect
リソースのステータスを確認 してください。metadata.generation
(デプロイの現在のバージョン) の値は、status.observedGeneration
(リソースの最新の調整) と一致する必要があります。Kafka Connect クラスターが Ready
になったら、KafkaConnector
リソースを削除できます。
6.4.4. Kafka Connect クラスターリソースのリスト リンクのコピーリンクがクリップボードにコピーされました!
以下のリソースは、OpenShift クラスターの Cluster Operator によって作成されます。
- connect-cluster-name-connect
次の Kafka Connect リソースに付けられた名前:
-
Kafka Connect ワーカーノード Pod を作成するデプロイメント (
StableConnectIdentities
フィーチャーゲートが無効な場合)。 -
Kafka Connect ワーカーノード Pod を作成する StrimziPodSet (
StableConnectIdentities
フィーチャーゲートが有効な場合)。 -
安定した DNS 名を Connect Pod に提供するヘッドレスサービス (
StableConnectIdentities
フィーチャーゲートが有効な場合)。 - Kafka Connect ワーカーノードに設定された Pod の Disruption Budget。
-
Kafka Connect ワーカーノード Pod を作成するデプロイメント (
- connect-cluster-name-connect-idx
-
Kafka Connect StrimziPodSet によって作成された Pod (
StableConnectIdentities
フィーチャーゲートが有効な場合)。 - connect-cluster-name-connect-api
- Kafka Connect クラスターを管理するために REST インターフェイスを公開するサービス。
- connect-cluster-name-config
- Kafka Connect 補助設定が含まれ、Kafka ブローカー Pod によってボリュームとしてマウントされる ConfigMap。