9.6. Kafka Connect の設定
KafkaConnect カスタムリソースの spec プロパティーを更新して、Kafka Connect デプロイメントを設定します。
Kafka Connect を使用して、Kafka クラスターへの外部データ接続を設定します。KafkaConnect リソースのプロパティーを使用して、Kafka Connect デプロイメントを設定します。
Kafka Connect クラスターの設定オプションの詳細は、Streams for Apache Kafka カスタムリソース API リファレンス を参照してください。
KafkaConnector の設定
KafkaConnect リソースを使用すると、Kafka Connect のコネクターインスタンスを OpenShift ネイティブに作成および管理できます。
Kafka Connect 設定では、strimzi.io/use-connector-resources アノテーションを追加して、Kafka Connect クラスターの KafkaConnectors を有効にします。また、build 設定を追加して、データ接続に必要なコネクタープラグインを備えたコンテナーイメージを Streams for Apache Kafka が自動的にビルドするようにすることもできます。Kafka Connect コネクターの外部設定は、externalConfiguration プロパティーで指定します。
コネクターを管理するには、KafkaConnector カスタムリソースまたは Kafka Connect REST API を使用できます。KafkaConnector リソースは、リンク先の Kafka Connect クラスターと同じ namespace にデプロイする必要があります。これらの方法を使用してコネクターを作成、再設定、または削除する方法の詳細については、コネクターの追加 を参照してください。
コネクター設定は、HTTP リクエストの一部として Kafka Connect に渡され、Kafka 自体に保存されます。ConfigMap およびシークレットは、設定やデータの保存に使用される標準的な OpenShift リソースです。ConfigMap およびシークレットを使用してコネクターの特定の要素を設定できます。その後、HTTP REST コマンドで設定値を参照できます。これにより、必要な場合は設定が分離され、よりセキュアになります。この方法は、ユーザー名、パスワード、証明書などの機密性の高いデータに適用されます。
大量のメッセージ処理
設定を調整して、大量のメッセージを処理できます。詳細は、大量のメッセージの処理 を参照してください。
KafkaConnect カスタムリソース設定の例
- 1
KafkaConnectを使用します。- 2
- Kafka Connect クラスターの KafkaConnectors を有効にします。
- 3
- タスクを実行するワーカーのレプリカノード数。
- 4
- mTLS、トークンベースの OAuth、SASL ベース SCRAM-SHA-256/SCRAM-SHA-512、または PLAIN として指定された Kafka Connect クラスターの認証。デフォルトでは、Kafka Connect はプレーンテキスト接続を使用して Kafka ブローカーに接続します。
- 5
- Kafka クラスターに接続するためのブートストラップサーバー。
- 6
- クラスターの TLS 証明書が X.509 形式で保存されるキー名のある TLS による暗号化。複数の証明書が同じシークレットに保存されている場合は、複数回リストできます。
- 7
- ワーカーの Kafka Connect 設定 (コネクターではない)。Streams for Apache Kafka によって直接管理されないプロパティーに限り、標準の Apache Kafka 設定を指定できます。
- 8
- コネクタープラグインで自動的にコンテナーイメージをビルドするためのビルド設定プロパティー。
- 9
- (必須) 新しいイメージがプッシュされるコンテナーレジストリーの設定。
- 10
- (必須) 新しいコンテナーイメージに追加するコネクタープラグインとそれらのアーティファクトのリスト。各プラグインは、1 つ以上の
artifactを使用して設定する必要があります。 - 11
- ここで示す環境変数や、ボリュームを使用したコネクターの外部設定。設定プロバイダープラグインを使用して、外部ソースから設定値を読み込むこともできます。
- 12
- 現在
cpuおよびmemoryである、サポートされるリソースの予約を要求し、消費可能な最大リソースを指定を制限します。 - 13
- 指定された Kafka ロガーおよびログレベルが ConfigMap を介して直接的に (
inline) または間接的に (external) に追加されます。カスタム Log4j 設定は、ConfigMap のlog4j.propertiesキーまたはlog4j2.propertiesキーの下に配置する必要があります。Kafka Connectlog4j.rootLoggerロガーでは、ログレベルを INFO、ERROR、WARN、TRACE、DEBUG、FATAL または OFF に設定できます。 - 14
- コンテナーを再起動するタイミング (liveness) およびコンテナーがトラフィックを許可できるタイミング (readiness) を把握するためのヘルスチェック。
- 15
- Prometheus メトリクス。この例では、Prometheus JMX エクスポーターの設定が含まれる ConfigMap を参照して有効になります。
metricsConfig.valueFrom.configMapKeyRef.key配下に空のファイルが含まれる ConfigMap の参照を使用して、追加設定なしでメトリックを有効にできます。 - 16
- Kafka Connect を実行している仮想マシン (VM) のパフォーマンスを最適化するための JVM 設定オプション。
- 17
- 高度なオプション: コンテナーイメージの設定。特別な状況でのみ推奨されます。
- 18
- 特別なオプション: 展開のための Rack awareness 設定。これは、リージョン間ではなく、同じロケーション内でのデプロイメントを目的とした特殊なオプションです。このオプションは、コネクターがリーダーレプリカではなく、最も近いレプリカから消費する場合に使用できます。場合によっては、最も近いレプリカから消費することで、ネットワークの使用率を改善したり、コストを削減したりできます。
topologyKeyは、ラック ID を含むノードラベルと一致する必要があります。この設定で使用される例では、標準のtopology.kubernetes.io/zoneラベルを使用するゾーンを指定します。最も近いレプリカから消費するには、Kafka ブローカー設定でRackAwareReplicaSelectorを有効にします。 - 19
- テンプレートのカスタマイズ。ここでは、Pod は非アフィニティーでスケジュールされるため、Pod は同じホスト名のノードではスケジュールされません。
- 20
- 分散トレース用に環境変数が設定されます。
- 21
- 分散トレーシングは、OpenTelemetry を使用して有効になります。
9.6.1. 複数のインスタンス用の Kafka Connect の設定 リンクのコピーリンクがクリップボードにコピーされました!
デフォルトでは、Streams for Apache Kafka は、Kafka Connect によって使用される内部トピックのグループ ID と名前を設定します。Kafka Connect の複数のインスタンスを実行する場合は、次の config プロパティーを使用してこれらのデフォルト設定を変更する必要があります。
group.id が同じすべてのインスタンスで、3 つのトピックの値を同じにする必要があります。
これらのデフォルト設定を変更しない限り、同じ Kafka クラスターに接続する各インスタンスは同じ値でデプロイされます。実際には、これはすべてのインスタンスがクラスターを形成し、同じ内部トピックを使用することを意味します。
複数のインスタンスが同じ内部トピックを使用しようとすると、予期しないエラーが発生するため、インスタンスごとにこれらのプロパティーの値を変更する必要があります。
9.6.2. Kafka Connect のユーザー認可の設定 リンクのコピーリンクがクリップボードにコピーされました!
Kafka で認可を使用する場合、Kafka Connect ユーザーはクラスターグループおよび Kafka Connect の内部トピックへの読み取り/書き込みアクセス権が必要です。この手順では、simple 認可および ACL を使用してアクセス権限を付与する方法を説明します。
Kafka Connect クラスターグループ ID と内部トピックのプロパティーは、デフォルトで Streams for Apache Kafka によって設定されます。あるいは、KafkaConnect リソースの spec で明示的に定義することもできます。複数の Kafka Connect インスタンスを実行する 場合はグループ ID とトピックの値が異なる必要があるため、これは複数のインスタンスに対して Kafka Connect を設定する場合に便利です。
簡易認可は、Kafka AclAuthorizer および StandardAuthorizer プラグインで管理される ACL ルールを使用し、適切なアクセスレベルを確保します。KafkaUser リソースに簡易認証を使用するように設定する方法については、AclRule スキーマリファレンスを参照してください。
前提条件
- OpenShift クラスター
- 稼働中の Cluster Operator
手順
KafkaUserリソースのauthorizationプロパティーを編集し、アクセス権限をユーザーに付与します。アクセス権は、
literal名の値を使用して、Kafka Connect トピックとクラスターグループに対して設定されます。次の表に、トピックとクラスターグループ ID に設定されたデフォルトの名前を示します。Expand 表9.2 アクセス権設定の名前 プロパティー 名前 offset.storage.topicconnect-cluster-offsetsstatus.storage.topicconnect-cluster-statusconfig.storage.topicconnect-cluster-configsgroupconnect-clusterこの設定例では、デフォルト名を使用してアクセス権を指定します。Kafka Connect インスタンスに別の名前を使用している場合は、ACL 設定でそれらの名前を使用します。
簡易認可の設定例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow リソースを作成または更新します。
oc apply -f KAFKA-USER-CONFIG-FILE
oc apply -f KAFKA-USER-CONFIG-FILECopy to Clipboard Copied! Toggle word wrap Toggle overflow
9.6.3. Kafka Connect コネクターの手動停止または一時停止 リンクのコピーリンクがクリップボードにコピーされました!
KafkaConnector リソースを使用してコネクターを設定している場合は、state 設定を使用してコネクターを停止または一時停止します。コネクターとタスクがインスタンス化されたままになる一時停止状態とは対照的に、コネクターを停止すると設定のみが保持され、アクティブなプロセスは保持されません。コネクター実行の停止は、単に一時停止するよりも長時間停止する場合に適しています。一時停止されたコネクターはすぐに再開されますが、停止されたコネクターにはメモリーとリソースが解放されるという利点があります。
state 設定は、KafkaConnectorSpec スキーマの (非推奨) pause 設定を置き換えるもので、コネクターでの一時停止を許可します。これまでに pause 設定を使用してコネクターを一時停止した場合には、競合の回避目的にのみ state 設定の使用に移行することを推奨します。
前提条件
- Cluster Operator が稼働中である。
手順
一時停止または停止するコネクターを制御する
KafkaConnectorカスタムリソースの名前を見つけます。oc get KafkaConnector
oc get KafkaConnectorCopy to Clipboard Copied! Toggle word wrap Toggle overflow KafkaConnectorリソースを編集して、コネクターを停止または一時停止します。Kafka Connect コネクターを停止する設定例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow state設定をstoppedまたはpausedに変更します。このプロパティーが設定されていない場合のコネクターのデフォルトの状態はrunningです。変更を
KafkaConnector設定に適用します。stateをrunningに変更するか、設定を削除して、コネクターを再開できます。
あるいは、Kafka Connect API を公開し、stop エンドポイントと 一時pause エンドポイントを使用してコネクターの実行を停止することもできます。たとえば、PUT /connectors/<connector_name>/stop などです。その後、resume エンドポイントを使用して再起動できます。
9.6.4. Kafka Connect コネクターの手動での再起動 リンクのコピーリンクがクリップボードにコピーされました!
KafkaConnector リソースを使用してコネクターを管理している場合は、strimzi.io/restart アノテーションを使用してコネクターの再起動を手動でトリガーします。
前提条件
- Cluster Operator が稼働中である。
手順
再起動する Kafka コネクターを制御する
KafkaConnectorカスタムリソースの名前を見つけます。oc get KafkaConnector
oc get KafkaConnectorCopy to Clipboard Copied! Toggle word wrap Toggle overflow OpenShift で
KafkaConnectorリソースにアノテーションを付けて、コネクターを再起動します。oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart="true"
oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart="true"Copy to Clipboard Copied! Toggle word wrap Toggle overflow restartアノテーションはtrueに設定されています。次の調整が発生するまで待ちます (デフォルトでは 2 分ごとです)。
アノテーションが調整プロセスで検出されれば、Kafka コネクターは再起動されます。Kafka Connect が再起動リクエストを受け入れると、アノテーションは
KafkaConnectorカスタムリソースから削除されます。
9.6.5. Kafka Connect コネクタータスクの手動での再起動 リンクのコピーリンクがクリップボードにコピーされました!
KafkaConnector リソースを使用してコネクターを管理している場合は、strimzi.io/restart-task アノテーションを使用して、コネクタータスクの再起動を手動でトリガーします。
前提条件
- Cluster Operator が稼働中である。
手順
再起動する Kafka コネクタータスクを制御する
KafkaConnectorカスタムリソースの名前を見つけます。oc get KafkaConnector
oc get KafkaConnectorCopy to Clipboard Copied! Toggle word wrap Toggle overflow KafkaConnectorカスタムリソースから再起動するタスクの ID を検索します。oc describe KafkaConnector <kafka_connector_name>
oc describe KafkaConnector <kafka_connector_name>Copy to Clipboard Copied! Toggle word wrap Toggle overflow タスク ID は 0 から始まる負の値ではない整数です。
OpenShift で
KafkaConnectorリソースにアノテーションを付けて、ID を使用してコネクタータスクを再開します。oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart-task="0"
oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart-task="0"Copy to Clipboard Copied! Toggle word wrap Toggle overflow この例では、タスク
0が再起動されます。次の調整が発生するまで待ちます (デフォルトでは 2 分ごとです)。
アノテーションが調整プロセスで検出されれば、Kafka コネクタータスクは再起動されます。Kafka Connect が再起動リクエストを受け入れると、アノテーションは
KafkaConnectorカスタムリソースから削除されます。