2.4.4. MirrorMaker 2.0 を使用した Kafka クラスター間でのデータの同期
MirrorMaker 2.0 を使用して、設定を介して Kafka クラスター間のデータを同期します。
設定では以下を指定する必要があります。
- 各 Kafka クラスター
- TLS 認証を含む各クラスターの接続情報
レプリケーションのフローおよび方向
- クラスター対クラスター
- トピック対トピック
KafkaMirrorMaker2
リソースのプロパティーを使用して、Kafka MirrorMaker 2.0 のデプロイメントを設定します。
従来のバージョンの MirrorMaker は継続してサポートされます。従来のバージョンに設定したリソースを使用する場合は、MirrorMaker 2.0 でサポートされる形式に更新する必要があります。
MirrorMaker 2.0 によって、レプリケーション係数などのプロパティーのデフォルト設定値が提供されます。デフォルトに変更がない最小設定の例は以下のようになります。
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaMirrorMaker2 metadata: name: my-mirror-maker2 spec: version: 2.8.0 connectCluster: "my-cluster-target" clusters: - alias: "my-cluster-source" bootstrapServers: my-cluster-source-kafka-bootstrap:9092 - alias: "my-cluster-target" bootstrapServers: my-cluster-target-kafka-bootstrap:9092 mirrors: - sourceCluster: "my-cluster-source" targetCluster: "my-cluster-target" sourceConnector: {}
TLS または SASL 認証を使用して、ソースおよびターゲットクラスターのアクセス制御を設定できます。この手順では、ソースおよびターゲットクラスターに対して TLS による暗号化および認証を使用する設定を説明します。
前提条件
以下を実行する方法については、『 OpenShift での AMQ Streams のデプロイおよびアップグレード』を参照してください。
- ソースおよびターゲットの Kafka クラスターが使用できる必要があります。
手順
KafkaMirrorMaker2
リソースのspec
プロパティーを編集します。設定可能なプロパティーは以下の例のとおりです。
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaMirrorMaker2 metadata: name: my-mirror-maker2 spec: version: 2.8.0 1 replicas: 3 2 connectCluster: "my-cluster-target" 3 clusters: 4 - alias: "my-cluster-source" 5 authentication: 6 certificateAndKey: certificate: source.crt key: source.key secretName: my-user-source type: tls bootstrapServers: my-cluster-source-kafka-bootstrap:9092 7 tls: 8 trustedCertificates: - certificate: ca.crt secretName: my-cluster-source-cluster-ca-cert - alias: "my-cluster-target" 9 authentication: 10 certificateAndKey: certificate: target.crt key: target.key secretName: my-user-target type: tls bootstrapServers: my-cluster-target-kafka-bootstrap:9092 11 config: 12 config.storage.replication.factor: 1 offset.storage.replication.factor: 1 status.storage.replication.factor: 1 ssl.cipher.suites: "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384" 13 ssl.enabled.protocols: "TLSv1.2" ssl.protocol: "TLSv1.2" ssl.endpoint.identification.algorithm: HTTPS 14 tls: 15 trustedCertificates: - certificate: ca.crt secretName: my-cluster-target-cluster-ca-cert mirrors: 16 - sourceCluster: "my-cluster-source" 17 targetCluster: "my-cluster-target" 18 sourceConnector: 19 tasksMax: 10 20 config: replication.factor: 1 21 offset-syncs.topic.replication.factor: 1 22 sync.topic.acls.enabled: "false" 23 refresh.topics.interval.seconds: 60 24 replication.policy.separator: "" 25 replication.policy.class: "io.strimzi.kafka.connect.mirror.IdentityReplicationPolicy" 26 heartbeatConnector: 27 config: heartbeats.topic.replication.factor: 1 28 checkpointConnector: 29 config: checkpoints.topic.replication.factor: 1 30 refresh.groups.interval.seconds: 600 31 sync.group.offsets.enabled: true 32 sync.group.offsets.interval.seconds: 60 33 emit.checkpoints.interval.seconds: 60 34 replication.policy.class: "io.strimzi.kafka.connect.mirror.IdentityReplicationPolicy" topicsPattern: ".*" 35 groupsPattern: "group1|group2|group3" 36 resources: 37 requests: cpu: "1" memory: 2Gi limits: cpu: "2" memory: 2Gi logging: 38 type: inline loggers: connect.root.logger.level: "INFO" readinessProbe: 39 initialDelaySeconds: 15 timeoutSeconds: 5 livenessProbe: initialDelaySeconds: 15 timeoutSeconds: 5 jvmOptions: 40 "-Xmx": "1g" "-Xms": "1g" image: my-org/my-image:latest 41 template: 42 pod: affinity: podAntiAffinity: requiredDuringSchedulingIgnoredDuringExecution: - labelSelector: matchExpressions: - key: application operator: In values: - postgresql - mongodb topologyKey: "kubernetes.io/hostname" connectContainer: 43 env: - name: JAEGER_SERVICE_NAME value: my-jaeger-service - name: JAEGER_AGENT_HOST value: jaeger-agent-name - name: JAEGER_AGENT_PORT value: "6831" tracing: type: jaeger 44 externalConfiguration: 45 env: - name: AWS_ACCESS_KEY_ID valueFrom: secretKeyRef: name: aws-creds key: awsAccessKey - name: AWS_SECRET_ACCESS_KEY valueFrom: secretKeyRef: name: aws-creds key: awsSecretAccessKey
- 1
- 常に同じになる Kafka Connect と Mirror Maker 2.0 の バージョン。
- 2
- 3
- Kafka Connect の Kafka クラスターエイリアス。ターゲット Kafka クラスターを指定する必要があります。Kafka クラスターは、その内部トピックのために Kafka Connect によって使用されます。
- 4
- 同期される Kafka クラスターの 指定。
- 5
- ソースの Kafka クラスターの クラスターエイリアス。
- 6
- 7
- ソース Kafka クラスターに接続するための ブートストラップサーバー。
- 8
- ソース Kafka クラスターの TLS 証明書が X.509 形式で保存されるキー名のある TLS による暗号化。複数の証明書が同じシークレットに保存されている場合は、複数回リストできます。
- 9
- ターゲット Kafka クラスターの クラスターエイリアス。
- 10
- ターゲット Kafka クラスターの認証は、ソース Kafka クラスターと同様に設定されます。
- 11
- ターゲット Kafka クラスターに接続するための ブートストラップサーバー。
- 12
- Kafka Connect の設定。標準の Apache Kafka 設定が提供されることがありますが、AMQ Streams によって直接管理されないプロパティーに限定されます。
- 13
- TLS バージョンの特定の 暗号スイート と実行される外部リスナーの SSL プロパティー。
- 14
HTTPS
に設定することで、ホスト名の検証が有効になります。空の文字列を指定すると検証が無効になります。- 15
- ターゲット Kafka クラスターの TLS による暗号化は、ソース Kafka クラスターと同様に設定されます。
- 16
- 17
- MirrorMaker 2.0 コネクターによって使用されるソースクラスターの クラスターエイリアス。
- 18
- MirrorMaker 2.0 コネクターによって使用されるターゲットクラスターの クラスターエイリアス。
- 19
- リモートトピックを作成する
MirrorSourceConnector
の設定。デフォルトの設定オプションはconfig
によって上書きされます。 - 20
- コネクターによる作成が可能なタスクの最大数。タスクは、データのレプリケーションを処理し、並行して実行されます。インフラストラクチャーが処理のオーバーヘッドをサポートする場合、この値を大きくするとスループットが向上されます。Kafka Connect は、クラスターのメンバー間でタスクを分散します。ワーカーよりも多くのタスクがある場合は、ワーカーには複数のタスクが割り当てられます。シンクコネクターでは、消費される各トピックパーティションに 1 つタスクがあるようになることを目指します。ソースコネクターでは、並行して実行できるタスクの数は外部システムによって異なる場合もあります。並列処理を実現できない場合、コネクターは最大数より少ないタスクを作成します。
- 21
- ターゲットクラスターで作成されるミラーリングされたトピックのレプリケーション係数。
- 22
- ソースおよびターゲットクラスターのオフセットをマップする
MirrorSourceConnector
offset-syncs
内部トピックのレプリケーション係数。 - 23
- ACL ルールの同期 が有効になっていると、同期されたトピックに ACL が適用されます。デフォルトは
true
です。 - 24
- 新規トピックのチェック頻度を変更する任意設定。デフォルトでは 10 分毎にチェックされます。
- 25
- リモートトピック名の変更に使用する区切り文字を定義します。
- 26
- リモートトピック名の自動変更をオーバーライドするポリシーを追加します。その名前の前にソースクラスターの名前を追加する代わりに、トピックが元の名前を保持します。このオプションの設定は、active/passive バックアップおよびデータ移行に役立ちます。トピックオフセットの同期を設定するには、このプロパティーも
checkpointConnector.config
に設定する必要があります。 - 27
- 接続チェックを実行する
MirrorHeartbeatConnector
の設定。デフォルトの設定オプションはconfig
によって上書きされます。 - 28
- ターゲットクラスターで作成されたハートビートトピックのレプリケーション係数。
- 29
- オフセットを追跡する
MirrorCheckpointConnector
の設定。デフォルトの設定オプションはconfig
によって上書きされます。 - 30
- ターゲットクラスターで作成されたチェックポイントトピックのレプリケーション係数。
- 31
- 新規コンシューマーグループのチェック頻度を変更する任意設定。デフォルトでは 10 分毎にチェックされます。
- 32
- コンシューマーグループのオフセットを同期する任意設定。これは、active/passive 設定でのリカバリーに便利です。同期はデフォルトでは有効になっていません。
- 33
- コンシューマーグループオフセットの同期が有効な場合は、同期の頻度を調整できます。
- 34
- オフセット追跡のチェック頻度を調整します。オフセット同期の頻度を変更する場合、これらのチェックの頻度も調整する必要がある場合があります。
- 35
- 正規表現パターンとして定義されたソースクラスターからのトピックレプリケーション。ここで、すべてのトピックを要求します。
- 36
- 正規表現パターンとして定義されたソースクラスターからのコンシューマーグループレプリケーション。ここで、3 つのコンシューマーグループを名前で要求します。コンマ区切りリストを使用できます。
- 37
- 38
- 指定された Kafka loggers and log levels が ConfigMap を介して直接的に (
inline
) または間接的に (external
) に追加されます。カスタム ConfigMap は、log4j.properties
またはlog4j2.properties
キー下に配置する必要があります。Kafka Connectlog4j.rootLogger
ロガーでは、ログレベルを INFO、ERROR、WARN、TRACE、DEBUG、FATAL または OFF に設定できます。 - 39
- コンテナーを再起動するタイミング (liveness) およびコンテナーがトラフィックを許可できるタイミング (readiness) を把握するためのヘルスチェック。
- 40
- Kafka MirrorMaker を実行している仮想マシン (VM) のパフォーマンスを最適化するための JVM 設定オプション。
- 41
- 高度な任意設定: 特別な場合のみ推奨されるコンテナーイメージの設定。
- 42
- テンプレートのカスタマイズ。ここでは、Pod は非アフィニティーでスケジュールされるため、Pod は同じホスト名のノードではスケジュールされません。
- 43
- 環境変数は、Jaeger を使用した分散トレーシングにも設定 されます。
- 44
- 45
- 環境変数として Kafka MirrorMaker にマウントされた OpenShift Secret の外部設定。また、Kubernetes 設定プロバイダーを使用して 外部ソースから設定値を読み込むこともできます。
リソースを作成または更新します。
oc apply -f MIRRORMAKER-CONFIGURATION-FILE