2.3. Kafka MirrorMaker クラスターの設定
本章では、Kafka クラスター間でデータを複製するために AMQ Streams クラスターで Kafka MirrorMaker デプロイメントを設定する方法を説明します。
AMQ Streams では、MirrorMaker または MirrorMaker 2.0 を使用できます。MirrorMaker 2.0 は最新バージョンで、Kafka クラスター間でより効率的にデータをミラーリングする方法を提供します。
MirrorMaker を使用している場合は、KafkaMirrorMaker
リソースを設定します。
以下の手順は、リソースの設定方法を示しています。
KafkaMirrorMaker
リソースの完全なスキーマは、「KafkaMirrorMaker schema のスキーマ参照」に記載されています。
2.3.1. Kafka MirrorMaker の設定
KafkaMirrorMaker
リソースのプロパティーを使用して、Kafka MirrorMaker デプロイメントを設定します。
TLS または SASL 認証を使用して、プロデューサーおよびコンシューマーのアクセス制御を設定できます。この手順では、コンシューマーおよびプロデューサー側で TLS による暗号化および認証を使用する設定を説明します。
前提条件
以下を実行する方法については、『 OpenShift での AMQ Streams のデプロイおよびアップグレード』を参照してください。
- ソースおよびターゲットの Kafka クラスターが使用できる必要があります。
手順
KafkaMirrorMaker
リソースのspec
プロパティーを編集します。設定可能なプロパティーは以下の例のとおりです。
apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaMirrorMaker metadata: name: my-mirror-maker spec: replicas: 3 1 consumer: bootstrapServers: my-source-cluster-kafka-bootstrap:9092 2 groupId: "my-group" 3 numStreams: 2 4 offsetCommitInterval: 120000 5 tls: 6 trustedCertificates: - secretName: my-source-cluster-ca-cert certificate: ca.crt authentication: 7 type: tls certificateAndKey: secretName: my-source-secret certificate: public.crt key: private.key config: 8 max.poll.records: 100 receive.buffer.bytes: 32768 ssl.cipher.suites: "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384" 9 ssl.enabled.protocols: "TLSv1.2" ssl.protocol: "TLSv1.2" ssl.endpoint.identification.algorithm: HTTPS 10 producer: bootstrapServers: my-target-cluster-kafka-bootstrap:9092 abortOnSendFailure: false 11 tls: trustedCertificates: - secretName: my-target-cluster-ca-cert certificate: ca.crt authentication: type: tls certificateAndKey: secretName: my-target-secret certificate: public.crt key: private.key config: compression.type: gzip batch.size: 8192 ssl.cipher.suites: "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384" 12 ssl.enabled.protocols: "TLSv1.2" ssl.protocol: "TLSv1.2" ssl.endpoint.identification.algorithm: HTTPS 13 whitelist: "my-topic|other-topic" 14 resources: 15 requests: cpu: "1" memory: 2Gi limits: cpu: "2" memory: 2Gi logging: 16 type: inline loggers: mirrormaker.root.logger: "INFO" readinessProbe: 17 initialDelaySeconds: 15 timeoutSeconds: 5 livenessProbe: initialDelaySeconds: 15 timeoutSeconds: 5 metrics: 18 lowercaseOutputName: true rules: - pattern: "kafka.server<type=(.+), name=(.+)PerSec\\w*><>Count" name: "kafka_server_$1_$2_total" - pattern: "kafka.server<type=(.+), name=(.+)PerSec\\w*, topic=(.+)><>Count" name: "kafka_server_$1_$2_total" labels: topic: "$3" jvmOptions: 19 "-Xmx": "1g" "-Xms": "1g" image: my-org/my-image:latest 20 template: 21 pod: affinity: podAntiAffinity: requiredDuringSchedulingIgnoredDuringExecution: - labelSelector: matchExpressions: - key: application operator: In values: - postgresql - mongodb topologyKey: "kubernetes.io/hostname" connectContainer: 22 env: - name: JAEGER_SERVICE_NAME value: my-jaeger-service - name: JAEGER_AGENT_HOST value: jaeger-agent-name - name: JAEGER_AGENT_PORT value: "6831" tracing: 23 type: jaeger
- 1
- 2
- コンシューマーおよびプロデューサーのブートストラップサーバー。
- 3
- 4
- 5
- 6
- コンシューマーまたはプロデューサーの TLS 証明書が X.509 形式で保存される、キー名のある TLS による暗号化。複数の証明書が同じシークレットに保存されている場合は、複数回リストできます。
- 7
- OAuth ベアラートークン、SASL ベースの SCRAM-SHA-512 または PLAIN メカニズムを使用し、ここで示された TLS メカニズム を使用する、コンシューマーおよびプロデューサーの認証。
- 8
- 9
- TLS バージョンの特定の 暗号スイート と実行される外部リスナーの SSL プロパティー。
- 10
HTTPS
に設定することで、ホスト名の検証が有効になります。空の文字列を指定すると検証が無効になります。- 11
abortOnSendFailure
プロパティがtrue
に設定されている場合、メッセージの送信に失敗した後、Kafka MirrorMaker は終了し、コンテナは再起動します。- 12
- TLS バージョンの特定の 暗号スイート と実行される外部リスナーの SSL プロパティー。
- 13
HTTPS
に設定することで、ホスト名の検証が有効になります。空の文字列を指定すると検証が無効になります。- 14
- ソースからターゲット Kafka クラスターにミラーリングされたトピックの 許可リスト。
- 15
- 16
- 指定された loggers and log levels が ConfigMap を介して直接的に (
inline
) または間接的に (external
) に追加されます。カスタム ConfigMap は、log4j.properties
またはlog4j2.properties
キー下に配置する必要があります。MirrorMaker にはmirrormaker.root.logger
と呼ばれる単一のロガーがあります。ログレベルは INFO、ERROR、WARN、TRACE、DEBUG、FATAL、または OFF に設定できます。 - 17
- コンテナーを再起動するタイミング (liveness) およびコンテナーがトラフィックを許可できるタイミング (readiness) を把握するためのヘルスチェック。
- 18
- Prometheus メトリクス。この例では、Prometheus JMX エクスポーターの設定で有効になっています。
metrics: {}
を使用すると追加設定なしでメトリクスを有効にすることができます。 - 19
- Kafka MirrorMaker を実行している仮想マシン (VM) のパフォーマンスを最適化するための JVM 設定オプション。
- 20
- 高度なオプション:特別な場合のみ 推奨される コンテナーイメージの設定。
- 21
- テンプレートのカスタマイズ。ここでは、Pod は非アフィニティーでスケジュールされるため、Pod は同じホスト名のノードではスケジュールされません。
- 22
- 環境変数は、Jaeger を使用した分散トレーシングにも設定 されます。
- 23
警告abortOnSendFailure
プロパティーがfalse
に設定されると、プロデューサーはトピックの次のメッセージを送信しようとします。失敗したメッセージは再送されないため、元のメッセージが失われる可能性があります。リソースを作成または更新します。
oc apply -f <your-file>