9.6. Kafka Connect の設定
KafkaConnect
カスタムリソースの spec
プロパティーを更新して、Kafka Connect デプロイメントを設定します。
Kafka Connect を使用して、Kafka クラスターへの外部データ接続を設定します。KafkaConnect
リソースのプロパティーを使用して、Kafka Connect デプロイメントを設定します。
Kafka Connect クラスターの設定オプションの詳細は、Streams for Apache Kafka カスタムリソース API リファレンス を参照してください。
KafkaConnector の設定
KafkaConnect
リソースを使用すると、Kafka Connect のコネクターインスタンスを OpenShift ネイティブに作成および管理できます。
Kafka Connect 設定では、strimzi.io/use-connector-resources
アノテーションを追加して、Kafka Connect クラスターの KafkaConnectors を有効にします。また、build
設定を追加して、データ接続に必要なコネクタープラグインを備えたコンテナーイメージを Streams for Apache Kafka が自動的にビルドするようにすることもできます。Kafka Connect コネクターの外部設定は、externalConfiguration
プロパティーで指定します。
コネクターを管理するには、KafkaConnector
カスタムリソースまたは Kafka Connect REST API を使用できます。KafkaConnector
リソースは、リンク先の Kafka Connect クラスターと同じ namespace にデプロイする必要があります。これらの方法を使用してコネクターを作成、再設定、または削除する方法の詳細については、コネクターの追加 を参照してください。
コネクター設定は、HTTP リクエストの一部として Kafka Connect に渡され、Kafka 自体に保存されます。ConfigMap およびシークレットは、設定やデータの保存に使用される標準的な OpenShift リソースです。ConfigMap およびシークレットを使用してコネクターの特定の要素を設定できます。その後、HTTP REST コマンドで設定値を参照できます。これにより、必要な場合は設定が分離され、よりセキュアになります。この方法は、ユーザー名、パスワード、証明書などの機密性の高いデータに適用されます。
大量のメッセージ処理
設定を調整して、大量のメッセージを処理できます。詳細は、大量のメッセージの処理 を参照してください。
KafkaConnect
カスタムリソース設定の例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect 1 metadata: name: my-connect-cluster annotations: strimzi.io/use-connector-resources: "true" 2 spec: replicas: 3 3 authentication: 4 type: tls certificateAndKey: certificate: source.crt key: source.key secretName: my-user-source bootstrapServers: my-cluster-kafka-bootstrap:9092 5 tls: 6 trustedCertificates: - secretName: my-cluster-cluster-cert certificate: ca.crt - secretName: my-cluster-cluster-cert certificate: ca2.crt config: 7 group.id: my-connect-cluster offset.storage.topic: my-connect-cluster-offsets config.storage.topic: my-connect-cluster-configs status.storage.topic: my-connect-cluster-status key.converter: org.apache.kafka.connect.json.JsonConverter value.converter: org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable: true value.converter.schemas.enable: true config.storage.replication.factor: 3 offset.storage.replication.factor: 3 status.storage.replication.factor: 3 build: 8 output: 9 type: docker image: my-registry.io/my-org/my-connect-cluster:latest pushSecret: my-registry-credentials plugins: 10 - name: connector-1 artifacts: - type: tgz url: <url_to_download_connector_1_artifact> sha512sum: <SHA-512_checksum_of_connector_1_artifact> - name: connector-2 artifacts: - type: jar url: <url_to_download_connector_2_artifact> sha512sum: <SHA-512_checksum_of_connector_2_artifact> externalConfiguration: 11 env: - name: AWS_ACCESS_KEY_ID valueFrom: secretKeyRef: name: aws-creds key: awsAccessKey - name: AWS_SECRET_ACCESS_KEY valueFrom: secretKeyRef: name: aws-creds key: awsSecretAccessKey resources: 12 requests: cpu: "1" memory: 2Gi limits: cpu: "2" memory: 2Gi logging: 13 type: inline loggers: log4j.rootLogger: INFO readinessProbe: 14 initialDelaySeconds: 15 timeoutSeconds: 5 livenessProbe: initialDelaySeconds: 15 timeoutSeconds: 5 metricsConfig: 15 type: jmxPrometheusExporter valueFrom: configMapKeyRef: name: my-config-map key: my-key jvmOptions: 16 "-Xmx": "1g" "-Xms": "1g" image: my-org/my-image:latest 17 rack: topologyKey: topology.kubernetes.io/zone 18 template: 19 pod: affinity: podAntiAffinity: requiredDuringSchedulingIgnoredDuringExecution: - labelSelector: matchExpressions: - key: application operator: In values: - postgresql - mongodb topologyKey: "kubernetes.io/hostname" connectContainer: 20 env: - name: OTEL_SERVICE_NAME value: my-otel-service - name: OTEL_EXPORTER_OTLP_ENDPOINT value: "http://otlp-host:4317" tracing: type: opentelemetry 21
- 1
KafkaConnect
を使用します。- 2
- Kafka Connect クラスターの KafkaConnectors を有効にします。
- 3
- タスクを実行するワーカーのレプリカノード数。
- 4
- mTLS、トークンベースの OAuth、SASL ベース SCRAM-SHA-256/SCRAM-SHA-512、または PLAIN として指定された Kafka Connect クラスターの認証。デフォルトでは、Kafka Connect はプレーンテキスト接続を使用して Kafka ブローカーに接続します。
- 5
- Kafka クラスターに接続するためのブートストラップサーバー。
- 6
- クラスターの TLS 証明書が X.509 形式で保存されるキー名のある TLS による暗号化。複数の証明書が同じシークレットに保存されている場合は、複数回リストできます。
- 7
- ワーカーの Kafka Connect 設定 (コネクターではない)。Streams for Apache Kafka によって直接管理されないプロパティーに限り、標準の Apache Kafka 設定を指定できます。
- 8
- コネクタープラグインで自動的にコンテナーイメージをビルドするためのビルド設定プロパティー。
- 9
- (必須) 新しいイメージがプッシュされるコンテナーレジストリーの設定。
- 10
- (必須) 新しいコンテナーイメージに追加するコネクタープラグインとそれらのアーティファクトのリスト。各プラグインは、1 つ以上の
artifact
を使用して設定する必要があります。 - 11
- ここで示す環境変数や、ボリュームを使用したコネクターの外部設定。設定プロバイダープラグインを使用して、外部ソースから設定値を読み込むこともできます。
- 12
- 現在
cpu
およびmemory
である、サポートされるリソースの予約を要求し、消費可能な最大リソースを指定を制限します。 - 13
- 指定された Kafka ロガーおよびログレベルが ConfigMap を介して直接的に (
inline
) または間接的に (external
) に追加されます。カスタム Log4j 設定は、ConfigMap のlog4j.properties
キーまたはlog4j2.properties
キーの下に配置する必要があります。Kafka Connectlog4j.rootLogger
ロガーでは、ログレベルを INFO、ERROR、WARN、TRACE、DEBUG、FATAL または OFF に設定できます。 - 14
- コンテナーを再起動するタイミング (liveness) およびコンテナーがトラフィックを許可できるタイミング (readiness) を把握するためのヘルスチェック。
- 15
- Prometheus メトリクス。この例では、Prometheus JMX エクスポーターの設定が含まれる ConfigMap を参照して有効になります。
metricsConfig.valueFrom.configMapKeyRef.key
配下に空のファイルが含まれる ConfigMap の参照を使用して、追加設定なしでメトリックを有効にできます。 - 16
- Kafka Connect を実行している仮想マシン (VM) のパフォーマンスを最適化するための JVM 設定オプション。
- 17
- 高度なオプション: コンテナーイメージの設定。特別な状況でのみ推奨されます。
- 18
- 特別なオプション: 展開のための Rack awareness 設定。これは、リージョン間ではなく、同じロケーション内でのデプロイメントを目的とした特殊なオプションです。このオプションは、コネクターがリーダーレプリカではなく、最も近いレプリカから消費する場合に使用できます。場合によっては、最も近いレプリカから消費することで、ネットワークの使用率を改善したり、コストを削減したりできます。
topologyKey
は、ラック ID を含むノードラベルと一致する必要があります。この設定で使用される例では、標準のtopology.kubernetes.io/zone
ラベルを使用するゾーンを指定します。最も近いレプリカから消費するには、Kafka ブローカー設定でRackAwareReplicaSelector
を有効にします。 - 19
- テンプレートのカスタマイズ。ここでは、Pod は非アフィニティーでスケジュールされるため、Pod は同じホスト名のノードではスケジュールされません。
- 20
- 分散トレース用に環境変数が設定されます。
- 21
- 分散トレーシングは、OpenTelemetry を使用して有効になります。
9.6.1. 複数のインスタンス用の Kafka Connect の設定
デフォルトでは、Streams for Apache Kafka は、Kafka Connect によって使用される内部トピックのグループ ID と名前を設定します。Kafka Connect の複数のインスタンスを実行する場合は、次の config
プロパティーを使用してこれらのデフォルト設定を変更する必要があります。
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect spec: config: group.id: my-connect-cluster 1 offset.storage.topic: my-connect-cluster-offsets 2 config.storage.topic: my-connect-cluster-configs 3 status.storage.topic: my-connect-cluster-status 4 # ... # ...
group.id
が同じすべてのインスタンスで、3 つのトピックの値を同じにする必要があります。
これらのデフォルト設定を変更しない限り、同じ Kafka クラスターに接続する各インスタンスは同じ値でデプロイされます。実際には、これはすべてのインスタンスがクラスターを形成し、同じ内部トピックを使用することを意味します。
複数のインスタンスが同じ内部トピックを使用しようとすると、予期しないエラーが発生するため、インスタンスごとにこれらのプロパティーの値を変更する必要があります。
9.6.2. Kafka Connect のユーザー認可の設定
Kafka で認可を使用する場合、Kafka Connect ユーザーはクラスターグループおよび Kafka Connect の内部トピックへの読み取り/書き込みアクセス権が必要です。この手順では、simple
認可および ACL を使用してアクセス権限を付与する方法を説明します。
Kafka Connect クラスターグループ ID と内部トピックのプロパティーは、デフォルトで Streams for Apache Kafka によって設定されます。あるいは、KafkaConnect
リソースの spec
で明示的に定義することもできます。複数の Kafka Connect インスタンスを実行する 場合はグループ ID とトピックの値が異なる必要があるため、これは複数のインスタンスに対して Kafka Connect を設定する場合に便利です。
簡易認可は、Kafka AclAuthorizer
および StandardAuthorizer
プラグインで管理される ACL ルールを使用し、適切なアクセスレベルを確保します。KafkaUser
リソースに簡易認証を使用するように設定する方法については、AclRule
スキーマリファレンスを参照してください。
前提条件
- OpenShift クラスター
- 稼働中の Cluster Operator
手順
KafkaUser
リソースのauthorization
プロパティーを編集し、アクセス権限をユーザーに付与します。アクセス権は、
literal
名の値を使用して、Kafka Connect トピックとクラスターグループに対して設定されます。次の表に、トピックとクラスターグループ ID に設定されたデフォルトの名前を示します。表9.2 アクセス権設定の名前 プロパティー 名前 offset.storage.topic
connect-cluster-offsets
status.storage.topic
connect-cluster-status
config.storage.topic
connect-cluster-configs
group
connect-cluster
この設定例では、デフォルト名を使用してアクセス権を指定します。Kafka Connect インスタンスに別の名前を使用している場合は、ACL 設定でそれらの名前を使用します。
簡易認可の設定例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaUser metadata: name: my-user labels: strimzi.io/cluster: my-cluster spec: # ... authorization: type: simple acls: # access to offset.storage.topic - resource: type: topic name: connect-cluster-offsets patternType: literal operations: - Create - Describe - Read - Write host: "*" # access to status.storage.topic - resource: type: topic name: connect-cluster-status patternType: literal operations: - Create - Describe - Read - Write host: "*" # access to config.storage.topic - resource: type: topic name: connect-cluster-configs patternType: literal operations: - Create - Describe - Read - Write host: "*" # cluster group - resource: type: group name: connect-cluster patternType: literal operations: - Read host: "*"
リソースを作成または更新します。
oc apply -f KAFKA-USER-CONFIG-FILE
9.6.3. Kafka Connect コネクターの手動停止または一時停止
KafkaConnector
リソースを使用してコネクターを設定している場合は、state
設定を使用してコネクターを停止または一時停止します。コネクターとタスクがインスタンス化されたままになる一時停止状態とは対照的に、コネクターを停止すると設定のみが保持され、アクティブなプロセスは保持されません。コネクター実行の停止は、単に一時停止するよりも長時間停止する場合に適しています。一時停止されたコネクターはすぐに再開されますが、停止されたコネクターにはメモリーとリソースが解放されるという利点があります。
state
設定は、KafkaConnectorSpec
スキーマの (非推奨) pause
設定を置き換えるもので、コネクターでの一時停止を許可します。これまでに pause
設定を使用してコネクターを一時停止した場合には、競合の回避目的にのみ state
設定の使用に移行することを推奨します。
前提条件
- Cluster Operator が稼働中である。
手順
一時停止または停止するコネクターを制御する
KafkaConnector
カスタムリソースの名前を見つけます。oc get KafkaConnector
KafkaConnector
リソースを編集して、コネクターを停止または一時停止します。Kafka Connect コネクターを停止する設定例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: my-source-connector labels: strimzi.io/cluster: my-connect-cluster spec: class: org.apache.kafka.connect.file.FileStreamSourceConnector tasksMax: 2 config: file: "/opt/kafka/LICENSE" topic: my-topic state: stopped # ...
state
設定をstopped
またはpaused
に変更します。このプロパティーが設定されていない場合のコネクターのデフォルトの状態はrunning
です。変更を
KafkaConnector
設定に適用します。state
をrunning
に変更するか、設定を削除して、コネクターを再開できます。
あるいは、Kafka Connect API を公開し、stop
エンドポイントと 一時pause
エンドポイントを使用してコネクターの実行を停止することもできます。たとえば、PUT /connectors/<connector_name>/stop
などです。その後、resume
エンドポイントを使用して再起動できます。
9.6.4. Kafka Connect コネクターの手動での再起動
KafkaConnector
リソースを使用してコネクターを管理している場合は、strimzi.io/restart
アノテーションを使用してコネクターの再起動を手動でトリガーします。
前提条件
- Cluster Operator が稼働中である。
手順
再起動する Kafka コネクターを制御する
KafkaConnector
カスタムリソースの名前を見つけます。oc get KafkaConnector
OpenShift で
KafkaConnector
リソースにアノテーションを付けて、コネクターを再起動します。oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart="true"
restart
アノテーションはtrue
に設定されています。次の調整が発生するまで待ちます (デフォルトでは 2 分ごとです)。
アノテーションが調整プロセスで検出されれば、Kafka コネクターは再起動されます。Kafka Connect が再起動リクエストを受け入れると、アノテーションは
KafkaConnector
カスタムリソースから削除されます。
9.6.5. Kafka Connect コネクタータスクの手動での再起動
KafkaConnector
リソースを使用してコネクターを管理している場合は、strimzi.io/restart-task
アノテーションを使用して、コネクタータスクの再起動を手動でトリガーします。
前提条件
- Cluster Operator が稼働中である。
手順
再起動する Kafka コネクタータスクを制御する
KafkaConnector
カスタムリソースの名前を見つけます。oc get KafkaConnector
KafkaConnector
カスタムリソースから再起動するタスクの ID を検索します。oc describe KafkaConnector <kafka_connector_name>
タスク ID は 0 から始まる負の値ではない整数です。
OpenShift で
KafkaConnector
リソースにアノテーションを付けて、ID を使用してコネクタータスクを再開します。oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart-task="0"
この例では、タスク
0
が再起動されます。次の調整が発生するまで待ちます (デフォルトでは 2 分ごとです)。
アノテーションが調整プロセスで検出されれば、Kafka コネクタータスクは再起動されます。Kafka Connect が再起動リクエストを受け入れると、アノテーションは
KafkaConnector
カスタムリソースから削除されます。