2.2. Kafka Connect/S2I クラスターの設定
ここでは、AMQ Streams クラスターにて Kafka Connect や S2I (Source-to-Image) のある Kafka Connect デプロイメントを設定する方法を説明します。
Kafka Connect は、Connector
プラグインを使用して Kafka ブローカーと他のシステムの間でデータをストリーミングする統合ツールです。Kafka Connect は、Kafka と、データベースなどの外部データソースまたはターゲットと統合するためのフレームワークを提供し、コネクターを使用してデータをインポートまたはエクスポートします。コネクターは、必要な接続設定を提供するプラグインです。
Kafka Connect を使用している場合は、KafkaConnect
または KafkaConnectS2I
リソースを設定します。Source-to-Image(S 2I)フレームワークを使用して Kafka Connect をデプロイする場合は、KafkaConnectS
2I リソースを使用します。
-
KafkaConnect
リソースの完全なスキーマは 「KafkaConnect
スキーマ参照」 に記載されています。 -
KafkaConnectS2I
リソースの完全なスキーマは 「KafkaConnectS2I
スキーマ参照」 に記載されています。
2.2.1. Kafka Connect の設定
Kafka Connect を使用して、Kafka クラスターへの外部データ接続を設定します。
KafkaConnect
または KafkaConnectS2I
リソースのプロパティーを使用して、Kafka Connect デプロイメントを設定します。この手順の例は、KafkaConnect リソースの場合
ですが、プロパティーは KafkaConnectS2I
リソースと同じです。
Kafka Connector の設定
KafkaConnector
リソースを使用すると、Kafka Connect のコネクターインスタンスを OpenShift ネイティブに作成および管理できます。
設定で、strimzi.io/use-connector-resources
アノテーションを追加して、Kafka Connect クラスターの KafkaConnectors
を有効にします。externalConfiguration
プロパティーを使用して Kafka Connect コネクターの外部設定を指定することもできます。
コネクターは、Kafka Connect の HTTP REST インターフェースまたは KafkaConnectors
を使用して作成、再設定、および削除されます。これらの方法の詳細は、『OpenShift での AMQ Streams のデプロイおよびアップグレード』の「コネクターの作成および管理」を参照してください。
コネクター設定は、HTTP リクエストの一部として Kafka Connect に渡され、Kafka 自体に保存されます。ConfigMap およびシークレットは、設定やデータの保存に使用される標準的な OpenShift リソースです。ConfigMap およびシークレットを使用してコネクターの特定の要素を設定できます。その後、HTTP REST コマンドで設定値を参照できます (これにより、必要な場合は設定が分離され、よりセキュアになります)。この方法は、ユーザー名、パスワード、証明書などの機密性の高いデータに適用されます。
前提条件
- OpenShift クラスター。
- 稼働中の Cluster Operator。
以下を実行する方法については、『 OpenShift での AMQ Streams のデプロイおよびアップグレード』を参照してください。
手順
KafkaConnect
またはKafkaConnectS2I
リソースのspec
プロパティーを編集します。設定可能なプロパティーは以下の例のとおりです。
apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnect 1 metadata: name: my-connect-cluster annotations: strimzi.io/use-connector-resources: "true" 2 spec: replicas: 3 3 authentication: 4 type: tls certificateAndKey: certificate: source.crt key: source.key secretName: my-user-source bootstrapServers: my-cluster-kafka-bootstrap:9092 5 tls: 6 trustedCertificates: - secretName: my-cluster-cluster-cert certificate: ca.crt - secretName: my-cluster-cluster-cert certificate: ca2.crt config: 7 group.id: my-connect-cluster offset.storage.topic: my-connect-cluster-offsets config.storage.topic: my-connect-cluster-configs status.storage.topic: my-connect-cluster-status key.converter: org.apache.kafka.connect.json.JsonConverter value.converter: org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable: true value.converter.schemas.enable: true config.storage.replication.factor: 3 offset.storage.replication.factor: 3 status.storage.replication.factor: 3 externalConfiguration: 8 env: - name: AWS_ACCESS_KEY_ID valueFrom: secretKeyRef: name: aws-creds key: awsAccessKey - name: AWS_SECRET_ACCESS_KEY valueFrom: secretKeyRef: name: aws-creds key: awsSecretAccessKey resources: 9 requests: cpu: "1" memory: 2Gi limits: cpu: "2" memory: 2Gi logging: 10 type: inline loggers: log4j.rootLogger: "INFO" readinessProbe: 11 initialDelaySeconds: 15 timeoutSeconds: 5 livenessProbe: initialDelaySeconds: 15 timeoutSeconds: 5 metrics: 12 lowercaseOutputName: true lowercaseOutputLabelNames: true rules: - pattern: kafka.connect<type=connect-worker-metrics><>([a-z-]+) name: kafka_connect_worker_$1 help: "Kafka Connect JMX metric worker" type: GAUGE - pattern: kafka.connect<type=connect-worker-rebalance-metrics><>([a-z-]+) name: kafka_connect_worker_rebalance_$1 help: "Kafka Connect JMX metric rebalance information" type: GAUGE jvmOptions: 13 "-Xmx": "1g" "-Xms": "1g" image: my-org/my-image:latest 14 template: 15 pod: affinity: podAntiAffinity: requiredDuringSchedulingIgnoredDuringExecution: - labelSelector: matchExpressions: - key: application operator: In values: - postgresql - mongodb topologyKey: "kubernetes.io/hostname" connectContainer: 16 env: - name: JAEGER_SERVICE_NAME value: my-jaeger-service - name: JAEGER_AGENT_HOST value: jaeger-agent-name - name: JAEGER_AGENT_PORT value: "6831"
- 1
- 必要に応じて
KafkaConnect
またはKafkaConnectS2I
を使用します。 - 2
- Kafka Connect
クラスターの KafkaConnectors
を有効にします。 - 3
- 4
- OAuth ベアラートークン、SASL ベースの SCRAM-SHA-512 または PLAIN メカニズムを使用し、ここで示された TLS メカニズム を使用する、Kafka Connect クラスターの認証。デフォルトでは、Kafka Connect はプレーンテキスト接続を使用して Kafka ブローカーに接続します。
- 5
- Kafka Connect クラスターに接続するためのブートストラップサーバー。
- 6
- クラスターの TLS 証明書が X.509 形式で保存されるキー名のある TLS による暗号化。複数の証明書が同じシークレットに保存されている場合は、複数回リストできます。
- 7
- ワーカー の Kafka Connect 設定 (コネクターではない)。標準の Apache Kafka 設定が提供されることがありますが、AMQ Streams によって直接管理されないプロパティーに制限されます。
- 8
- ここで示す環境変数や、ボリュームを使用した Kafka コネクターの外部設定
- 9
- 10
- 指定された Kafka loggers and log levels が ConfigMap を介して直接的に (
inline
) または間接的に (external
) に追加されます。カスタム ConfigMap は、log4j.properties
またはlog4j2.properties
キー下に配置する必要があります。Kafka Connectlog4j.rootLogger
ロガーでは、ログレベルを INFO、ERROR、WARN、TRACE、DEBUG、FATAL または OFF に設定できます。 - 11
- コンテナーを再起動するタイミング (liveness) およびコンテナーがトラフィックを許可できるタイミング (readiness) を把握するためのヘルスチェック。
- 12
- Prometheus メトリクス。この例では、Prometheus JMX エクスポーターの設定で有効になっています。
metrics: {}
を使用すると追加設定なしでメトリクスを有効にすることができます。 - 13
- Kafka Connect を実行している仮想マシン (VM) のパフォーマンスを最適化するための JVM 設定オプション。
- 14
- 高度なオプション:特別な場合のみ 推奨される コンテナーイメージの設定。
- 15
- テンプレートのカスタマイズ。ここでは、Pod は非アフィニティーでスケジュールされるため、Pod は同じホスト名のノードではスケジュールされません。
- 16
- 環境変数は、Jaeger を使用した分散トレーシングにも設定 されます。
リソースを作成または更新します。
oc apply -f KAFKA-CONNECT-CONFIG-FILE
- Kafka Connect の承認が有効である場合、Kafka Connect ユーザーを設定し、Kafka Connect のコンシューマーグループおよびトピックへのアクセスを有効にします。