2.4. Kafka MirrorMaker 2 集群配置
使用 KafkaMirrorMaker2 资源配置 Kafka MirrorMaker 2 部署。MirrorMaker 2 在两个或多个 Kafka 集群之间复制数据,并在数据中心之间复制数据。
第 6.2.128 节 “KafkaMirrorMaker2 模式参考” 描述 KafkaMirrorMaker2 资源的完整模式。
MirrorMaker 2 资源配置与之前的 MirrorMaker 版本不同。如果您选择使用 MirrorMaker 2,目前没有传统支持,因此任何资源都必须手动转换为新格式。
2.4.1. MirrorMaker 2 数据复制 复制链接链接已复制到粘贴板!
跨集群的数据复制支持需要以下情况:
- 在系统失败时恢复数据
- 用于分析的数据聚合
- 对特定集群的数据访问限制
- 在特定位置置备数据以改进延迟
2.4.1.1. MirrorMaker 2 配置 复制链接链接已复制到粘贴板!
MirrorMaker 2 使用来自源 Kafka 集群的信息,并将其写入目标 Kafka 集群。
MirrorMaker 2 使用:
- 源集群配置使用来自源集群的数据
- 将数据输出到目标集群的目标集群配置
MirrorMaker 2 基于 Kafka Connect 框架,连接器 管理集群之间的数据传输。
您可以配置 MirrorMaker 2 以定义 Kafka Connect 部署,包括源和目标集群的连接详情,然后运行一组 MirrorMaker 2 连接器来进行连接。
MirrorMaker 2 由以下连接器组成:
MirrorSourceConnector-
源连接器将主题从源集群复制到目标集群。它还复制 ACL,且是
MirrorCheckpointConnector才能运行所必需的。 MirrorCheckpointConnector- checkpoint 连接器定期跟踪偏移。如果启用,它还会在源和目标集群之间同步消费者组偏移。
MirrorHeartbeatConnector- heartbeat 连接器会定期检查源和目标集群之间的连接。
如果您使用 User Operator 管理 ACL,则无法通过连接器进行 ACL 复制。
将数据从源集群镜像 (mirror)到目标集群的过程是异步的。每个 MirrorMaker 2 实例将数据从一个源集群镜像到一个目标集群。您可以使用多个 MirrorMaker 2 实例在任意数量的集群间镜像数据。
图 2.1. 在两个集群间复制
默认情况下,检查源集群中的新主题每 10 分钟进行。您可以通过在源连接器配置中添加 refresh.topics.interval.seconds 来更改频率。
2.4.1.1.1. 集群配置 复制链接链接已复制到粘贴板!
您可以在 主动/被动或主动/主动 集群配置中 使用 MirrorMaker 2。
- 主动/主动集群配置
- 主动/主动配置有两个活跃集群来复制数据。应用程序可以使用任一集群。每个集群都可以提供相同的数据。这样,您可以在不同的地理位置提供相同的数据。因为消费者组在两个集群中都活跃,复制主题的使用者偏移不会重新同步到源集群。
- 主动/被动集群配置
- 主动/被动配置具有主动集群,将数据复制到被动集群。被动集群保持待机状态。在系统失败时,您可以使用被动集群进行数据恢复。
预期的结构是,生成者和消费者仅连接到活跃集群。每个目标目的地都需要一个 MirrorMaker 2 集群。
2.4.1.1.2. 双向复制(active/active) 复制链接链接已复制到粘贴板!
MirrorMaker 2 架构支持 主动/主动 集群配置中的双向复制。
每个集群使用 source 和 remote 主题的概念复制其他集群的数据。由于同一主题存储在每个集群中,所以远程主题由 MirrorMaker 2 重命名,以表示源集群。原始集群的名称前面是主题名称的前面。
图 2.2. 主题重命名
通过标记原始集群,主题不会复制到该集群。
在配置需要数据聚合的架构时,通过 远程主题 复制的概念非常有用。消费者可以订阅同一集群中的源和远程主题,而无需单独的聚合集群。
2.4.1.1.3. 单向复制(active/passive) 复制链接链接已复制到粘贴板!
MirrorMaker 2 架构支持 主动/被动 集群配置中的单向复制。
您可以使用 主动/被动集群 配置进行备份或将数据迁移到另一个集群。在这种情况下,您可能不希望自动重命名远程主题。
您可以通过将 IdentityReplicationPolicy 添加到源连接器配置来覆盖自动重命名。应用此配置后,主题会保留其原始名称。
2.4.1.2. 主题配置同步 复制链接链接已复制到粘贴板!
MirrorMaker 2 支持源和目标集群之间的主题配置同步。您可以在 MirrorMaker 2 配置中指定源主题。MirrorMaker 2 监控源主题。MirrorMaker 2 会检测并将更改传播到源主题到远程主题。更改可能包括自动创建缺少的主题和分区。
在大多数情况下,您写入本地主题并从远程主题读取。虽然对于远程主题不会阻止写操作,但应该避免它们。
2.4.1.3. 偏移跟踪 复制链接链接已复制到粘贴板!
MirrorMaker 2 使用内部主题跟踪消费者组的偏移。
offset-syncs主题-
offset-syncs主题映射复制主题元数据的源和目标偏移。 checkpointstopic-
checkpoints主题映射源和目标集群中每个消费者组中复制的主题分区的最后提交偏移量。
与 MirrorMaker 2 内部使用的一样,您不直接与这些主题交互。
MirrorCheckpointConnector 发出偏移跟踪的 检查点。checkpoints 主题的偏移通过配置预先确定的间隔进行跟踪。两个主题都允许从故障转移上的正确偏移位置完全恢复复制。
offset-syncs 主题的默认位置是 源集群。您可以使用 offset-syncs.topic.location 连接器配置将其更改为 目标集群。您需要对包含该主题的集群进行读/写访问。使用目标集群作为 offset-syncs 主题的位置,即使您只对源集群具有读取访问权限,您也可以使用 MirrorMaker 2。
2.4.1.4. 同步消费者组偏移 复制链接链接已复制到粘贴板!
__consumer_offsets 主题存储各个消费者组的提交偏移信息。偏移同步会定期将源集群的消费者组的使用者偏移转移到目标集群的使用者偏移量中。
偏移同步在主动/被动配置中特别有用。如果活跃集群停机,消费者应用程序可以切换到被动(standby)集群,并从最后一个传输的偏移位置获取。
要使用主题偏移同步,请通过将 sync.group.offsets.enabled 添加到检查点连接器配置来启用同步,并将属性设置为 true。默认情况下禁用同步。
在源连接器中使用 IdentityReplicationPolicy 时,还必须在检查点连接器配置中配置它。这样可确保为正确的主题应用镜像的使用者偏移。
消费者偏移只针对在目标集群中没有活跃的消费者组同步。如果消费者组位于目标集群,则无法执行同步,并返回 UNKNOWN_MEMBER_ID 错误。
如果启用,则定期从源集群进行偏移同步。您可以通过在检查点连接器配置中添加 sync.group.offsets.interval.seconds 和 emit.checkpoints.interval.seconds 来更改频率。属性指定同步消费者组偏移的频率,以及为偏移跟踪发送检查点的频率。这两个属性的默认值为 60 秒。您还可以使用 refresh.groups.interval.seconds 属性更改检查新消费者组的频率,该属性默认每 10 分钟执行。
由于同步是基于时间的,因此消费者向被动集群切换的任何用户都会造成一些消息重复。
如果您使用 Java 编写的应用,您可以使用 RemoteClusterUtils.java 实用程序通过应用同步偏移。实用程序从 checkpoints 主题获取消费者组的远程偏移。
2.4.1.5. 连接检查 复制链接链接已复制到粘贴板!
MirrorHeartbeatConnector 发出 心跳 来检查集群之间的连接。
内部 心跳 主题从源集群复制。目标集群使用 heartbeat 主题检查以下内容:
- 连接器管理集群间的连接正在运行
- 源集群可用
2.4.2. 连接器配置 复制链接链接已复制到粘贴板!
为内部连接器使用 Mirrormaker 2 连接器配置,用于编配 Kafka 集群之间的数据同步。
下表描述了连接器属性以及您配置的连接器属性。
| 属性 | sourceConnector | checkpointConnector | heartbeatConnector |
|---|---|---|---|
| ✓ | ✓ | ✓ |
| ✓ | ✓ | ✓ |
| ✓ | ✓ | ✓ |
| ✓ | ✓ | |
| ✓ | ✓ | |
| ✓ | ✓ | |
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ | ||
| ✓ |
2.4.3. 连接器制作者和消费者配置 复制链接链接已复制到粘贴板!
MirrorMaker 2 连接器使用内部制作者和消费者。如果需要,您可以配置这些制作者和消费者来覆盖默认设置。
例如,您可以增加 source producer 的 batch.size,将主题发送到目标 Kafka 集群,以更好地容纳大量消息。
生产者和消费者配置选项取决于 MirrorMaker 2 的实施,可能随时更改。
下表描述了每个连接器的生产者和消费者,以及您可以添加配置。
| 类型 | Description | 配置 |
|---|---|---|
| 制作者 | 将主题信息发送到目标 Kafka 集群。考虑在处理大量数据时调整此制作者的配置。 |
|
| 制作者 |
写入 |
|
| 消费者 | 从源 Kafka 集群检索主题信息。 |
|
| 类型 | Description | 配置 |
|---|---|---|
| 制作者 | 发送消费者偏移检查点。 |
|
| 消费者 |
加载 |
|
您可以将 offset-syncs.topic.location 设置为 target 来使用目标 Kafka 集群作为 offset-syncs 主题的位置。
| 类型 | Description | 配置 |
|---|---|---|
| 制作者 | 发送心跳。 |
|
下例演示了如何配置制作者和消费者。
连接器制作者和消费者配置示例
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
name: my-mirror-maker2
spec:
version: 3.4.0
# ...
mirrors:
- sourceCluster: "my-cluster-source"
targetCluster: "my-cluster-target"
sourceConnector:
tasksMax: 5
config:
producer.override.batch.size: 327680
producer.override.linger.ms: 100
producer.request.timeout.ms: 30000
consumer.fetch.max.bytes: 52428800
# ...
checkpointConnector:
config:
producer.override.request.timeout.ms: 30000
consumer.max.poll.interval.ms: 300000
# ...
heartbeatConnector:
config:
producer.override.request.timeout.ms: 30000
# ...
2.4.4. 指定最多的任务数 复制链接链接已复制到粘贴板!
连接器创建负责在 Kafka 中移动数据的任务。每个连接器由一个或多个在运行任务的 worker pod 进行分组的任务组成。在复制大量分区或同步大量消费者组的偏移时,增加任务数量可以帮助解决性能问题。
任务并行运行。为 worker 分配一个或多个任务。单个任务由一个 worker pod 处理,因此您不需要多个 worker pod 而不是任务。如果有多个任务,worker 会处理多个任务。
您可以使用 tasksMax 属性在 MirrorMaker 配置中指定最大连接器任务数量。在不指定最多任务数量的情况下,默认设置是一个任务。
heartbeat 连接器始终使用单个任务。
为源和检查点连接器启动的任务数量是 tasksMax 的最大任务数和 tasksMax 的值之间的较低值。对于源连接器,可能的最大任务数量是从源集群复制的每个分区。对于检查点连接器,可能的最大任务数都是从源集群复制的每个消费者组。在设置最多任务数量时,请考虑分区数量和支持进程的硬件资源。
如果基础架构支持处理开销,增加任务数量可以提高吞吐量和延迟。例如,当有大量分区或消费者组时,添加更多任务可减少轮询源集群的时间。
当您有大量分区时,增加检查点连接器的任务数量非常有用。
为源连接器增加任务数量
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
name: my-mirror-maker2
spec:
# ...
mirrors:
- sourceCluster: "my-cluster-source"
targetCluster: "my-cluster-target"
sourceConnector:
tasksMax: 10
# ...
当您有大量消费者组时,增加检查点连接器的任务数量会很有用。
为检查点连接器增加任务数量
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
name: my-mirror-maker2
spec:
# ...
mirrors:
- sourceCluster: "my-cluster-source"
targetCluster: "my-cluster-target"
checkpointConnector:
tasksMax: 10
# ...
默认情况下,M MirrorMaker 2 每 10 分钟检查新的消费者组。您可以调整 refresh.groups.interval.seconds 配置以更改频率。调整降低时要小心。更频繁地检查会对性能造成负面影响。
2.4.4.1. 检查连接器任务操作 复制链接链接已复制到粘贴板!
如果使用 Prometheus 和 Grafana 监控部署,您可以检查 MirrorMaker 2 性能。AMQ Streams 提供的 MirrorMaker 2 Grafana 仪表板示例显示了与任务和延迟相关的以下指标。
- 任务数量
- 复制延迟
- 偏移同步延迟
2.4.5. ACL 规则同步 复制链接链接已复制到粘贴板!
如果您不使用 User Operator,则可以对远程主题进行 ACL 访问。
如果使用 AclAuthorizer,但没有 User Operator,则管理对代理的访问的 ACL 规则也适用于远程主题。读取源主题的用户可以读取其远程的等效内容。
OAuth 2.0 授权不支持以这种方式访问远程主题。
2.4.6. 配置 Kafka MirrorMaker 2 复制链接链接已复制到粘贴板!
使用 KafkaMirrorMaker2 资源的属性来配置 Kafka MirrorMaker 2 部署。使用 MirrorMaker 2 在 Kafka 集群间同步数据。
配置必须指定:
- 每个 Kafka 集群
- 每个集群的连接信息,包括身份验证
复制流和方向
- 集群到集群
- topic to topic
以前版本的 MirrorMaker 仍被支持。如果要使用为之前的版本配置的资源,则必须将其更新为 MirrorMaker 2 支持的格式。
MirrorMaker 2 为复制因素等属性提供默认配置值。最小配置(默认值)保持不变,如下例所示:
MirrorMaker 2 的最小配置
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
name: my-mirror-maker2
spec:
version: 3.4.0
connectCluster: "my-cluster-target"
clusters:
- alias: "my-cluster-source"
bootstrapServers: my-cluster-source-kafka-bootstrap:9092
- alias: "my-cluster-target"
bootstrapServers: my-cluster-target-kafka-bootstrap:9092
mirrors:
- sourceCluster: "my-cluster-source"
targetCluster: "my-cluster-target"
sourceConnector: {}
您可以使用 mTLS 或 SASL 身份验证为源和目标集群配置访问控制。此流程显示对源和目标集群使用 TLS 加密和 mTLS 身份验证的配置。
您可以在 KafkaMirrorMaker2 资源中指定您要从源集群复制的主题和消费者组。您可以使用 topicsPattern 和 groupsPattern 属性来执行此操作。您可以提供名称或使用正则表达式列表。默认情况下,如果您没有设置 topicsPattern 和 groupsPattern 属性,则会复制所有主题和消费者组。您还可以使用 ".*" 作为正则表达式来复制所有主题和消费者组。但是,尝试只指定您需要指定主题和消费者组,以避免在集群中造成不必要的额外负载。
处理大量信息
您可以调整配置来处理大量信息。如需更多信息,请参阅处理大量信息。
先决条件
- AMQ Streams 正在运行
- 源和目标 Kafka 集群可用
流程
编辑
KafkaMirrorMaker2资源的spec属性。您可以配置的属性显示在以下示例配置中:
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaMirrorMaker2 metadata: name: my-mirror-maker2 spec: version: 3.4.01 replicas: 32 connectCluster: "my-cluster-target"3 clusters:4 - alias: "my-cluster-source"5 authentication:6 certificateAndKey: certificate: source.crt key: source.key secretName: my-user-source type: tls bootstrapServers: my-cluster-source-kafka-bootstrap:90927 tls:8 trustedCertificates: - certificate: ca.crt secretName: my-cluster-source-cluster-ca-cert - alias: "my-cluster-target"9 authentication:10 certificateAndKey: certificate: target.crt key: target.key secretName: my-user-target type: tls bootstrapServers: my-cluster-target-kafka-bootstrap:909211 config:12 config.storage.replication.factor: 1 offset.storage.replication.factor: 1 status.storage.replication.factor: 1 ssl.cipher.suites: TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA38413 ssl.enabled.protocols: TLSv1.2 ssl.protocol: TLSv1.2 ssl.endpoint.identification.algorithm: HTTPS14 tls:15 trustedCertificates: - certificate: ca.crt secretName: my-cluster-target-cluster-ca-cert mirrors:16 - sourceCluster: "my-cluster-source"17 targetCluster: "my-cluster-target"18 sourceConnector:19 tasksMax: 1020 autoRestart:21 enabled: true config: replication.factor: 122 offset-syncs.topic.replication.factor: 123 sync.topic.acls.enabled: "false"24 refresh.topics.interval.seconds: 6025 replication.policy.separator: "."26 replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy"27 heartbeatConnector:28 autoRestart: enabled: true config: heartbeats.topic.replication.factor: 129 checkpointConnector:30 autoRestart: enabled: true config: checkpoints.topic.replication.factor: 131 refresh.groups.interval.seconds: 60032 sync.group.offsets.enabled: true33 sync.group.offsets.interval.seconds: 6034 emit.checkpoints.interval.seconds: 6035 replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy" topicsPattern: "topic1|topic2|topic3"36 groupsPattern: "group1|group2|group3"37 resources:38 requests: cpu: "1" memory: 2Gi limits: cpu: "2" memory: 2Gi logging:39 type: inline loggers: connect.root.logger.level: "INFO" readinessProbe:40 initialDelaySeconds: 15 timeoutSeconds: 5 livenessProbe: initialDelaySeconds: 15 timeoutSeconds: 5 jvmOptions:41 "-Xmx": "1g" "-Xms": "1g" image: my-org/my-image:latest42 rack: topologyKey: topology.kubernetes.io/zone43 template:44 pod: affinity: podAntiAffinity: requiredDuringSchedulingIgnoredDuringExecution: - labelSelector: matchExpressions: - key: application operator: In values: - postgresql - mongodb topologyKey: "kubernetes.io/hostname" connectContainer:45 env: - name: JAEGER_SERVICE_NAME value: my-jaeger-service - name: JAEGER_AGENT_HOST value: jaeger-agent-name - name: JAEGER_AGENT_PORT value: "6831" tracing: type: jaeger46 externalConfiguration:47 env: - name: AWS_ACCESS_KEY_ID valueFrom: secretKeyRef: name: aws-creds key: awsAccessKey - name: AWS_SECRET_ACCESS_KEY valueFrom: secretKeyRef: name: aws-creds key: awsSecretAccessKey- 1
- Kafka Connect 和 Mirror Maker 2.0 版本,它们始终相同。
- 2
- 用于运行服务的 worker 的副本节点数量。
- 3
- Kafka Connect 的 Kafka 集群别名,它必须 指定目标 Kafka 集群。Kafka Connect 用于其内部主题。
- 4
- 正在同步的 Kafka 集群的规格。
- 5
- 源 Kafka 集群的集群别名。
- 6
- 7
- 用于连接到源 Kafka 集群的 Bootstrap 服务器。
- 8
- TLS 加密,使用密钥名称,其中 TLS 证书存储为源 Kafka 集群的 X.509 格式。如果证书存储在同一 secret 中,它可以多次列出。
- 9
- 目标 Kafka 集群的集群别名。
- 10
- 目标 Kafka 集群的验证配置方式与源 Kafka 集群相同。
- 11
- 用于连接目标 Kafka 集群的 Bootstrap 服务器。
- 12
- Kafka 连接配置。可以提供标准 Apache Kafka 配置,仅限于不受 AMQ Streams 直接管理的属性。
- 13
- 外部监听程序的 SSL 属性 使用 TLS 版本的特定 密码套件 运行。
- 14
- 通过设置为
HTTPS来启用主机名验证。空字符串禁用验证。 - 15
- 目标 Kafka 集群的 TLS 加密与源 Kafka 集群相同。
- 16
- 17
- MirrorMaker 2 连接器使用的源集群的集群 别名。
- 18
- MirrorMaker 2 连接器使用的目标 集群的集群别名。
- 19
- 创建远程主题的
MirrorSourceConnector的配置。配置会覆盖默认配置选项。 - 20
- 连接器可以创建的最大任务数量。任务处理数据复制并并行运行。如果基础架构支持处理开销,增加这个值可以提高吞吐量。Kafka Connect 在集群成员之间分发任务。如果任务数量超过 worker,则 worker 会被分配多个任务。对于接收器连接器,旨在为每个消耗的主题分区有一个任务。对于源连接器,并行运行的任务数量也可能依赖于外部系统。如果无法达到并行性,则连接器创建少于最大任务数量。
- 21
- 启用自动重启失败的连接器和任务。最多进行七次重启尝试,之后必须手动重新启动。
- 22
- 在目标集群中创建的镜像主题的复制因素。
- 23
MirrorSourceConnectoroffset-syncs内部主题的复制因素,用于映射源和目标集群的偏移。- 24
- 25
- 可选设置,以更改检查新主题的频率。默认值为每 10 分钟进行一次检查。
- 26
- 定义用于重命名远程主题的分隔符。
- 27
- 添加可覆盖远程主题自动重命名的策略。该主题不会用源集群的名称来附加名称,而是保留其原始名称。此可选设置可用于主动/被动备份和数据迁移。要配置主题偏移同步,还必须为
checkpointConnector.config设置此属性。 - 28
- 执行连接 检查的
MirrorHeartbeatConnector的配置。配置会覆盖默认配置选项。 - 29
- 在目标集群中创建的 heartbeat 主题的复制因素。
- 30
- 跟踪偏移的
MirrorCheckpointConnector的配置。配置会覆盖默认配置选项。 - 31
- 在目标集群中创建的检查点主题的复制因素。
- 32
- 可选设置,用于更改新消费者组的检查频率。默认值为每 10 分钟进行一次检查。
- 33
- 可选设置来同步消费者组偏移,这对于在主动/被动配置中恢复非常有用。默认情况下不启用同步。
- 34
- 如果启用了使用者组偏移的同步,您可以调整同步的频率。
- 35
- 调整检查偏移跟踪的频率。如果您更改了偏移同步的频率,您可能需要调整这些检查的频率。
- 36
- 使用 以逗号分隔列表或正则表达式模式 定义的 源集群的主题复制。源连接器复制指定的主题。checkpoint 连接器跟踪指定主题的偏移。在这里,我们按名称请求三个主题。
- 37
- 从 定义为逗号分隔列表或正则表达式模式 的 源集群的消费者组复制。checkpoint 连接器复制指定的消费者组。在这里,我们按名称请求三个消费者组。
- 38
- 39
- 指定 Kafka 连接日志记录器和日志级别 直接(
内联)或通过 ConfigMap 间接(外部)添加。自定义 ConfigMap 必须放在log4j.properties或log4j2.properties键下。对于 Kafka Connectlog4j.rootLogger日志记录器,您可以将日志级别设置为 INFO, ERROR, WARN, TRACE, DEBUG, FATAL 或 OFF。 - 40
- 使用 HealthCheck 可以知道何时重启一个容器(存活度)以及何时一个容器可以开始接受流量(就绪度)。
- 41
- JVM 配置选项 优化运行 Kafka MirrorMaker 的虚拟机(VM)的性能。
- 42
- ADVANCED OPTION: 容器镜像配置,这只在特殊情况下建议使用。
- 43
- SPECIALIZED OPTION: Rack awareness 配置用于部署。这是用于在同一位置(而不是跨地区)部署的专用选项。如果您希望连接器从最接近的副本而不是领导副本使用,则使用这个选项。在某些情况下,从最接近的副本消耗可以提高网络利用率或降低成本。
topologyKey必须与包含机架 ID 的节点标签匹配。此配置中使用的示例使用标准topology.kubernetes.io/zone标签指定区。要从最接近的副本使用,请在 Kafka 代理配置中启用RackAwareReplicaSelector。 - 44
- 模板自定义.这里的 pod 使用反关联性调度,因此 pod 不会调度到具有相同主机名的节点。
- 45
- 为分布式追踪设置环境变量。
- 46
- 为 Jaeger 启用分布式追踪。
- 47
创建或更新资源:
oc apply -f MIRRORMAKER-CONFIGURATION-FILE
2.4.7. 保护 Kafka MirrorMaker 2 部署 复制链接链接已复制到粘贴板!
此流程概述了保护 MirrorMaker 2 部署所需的配置。
您需要为源 Kafka 集群和目标 Kafka 集群进行单独的配置。您还需要单独的用户配置来提供 MirrorMaker 连接到源和目标 Kafka 集群所需的凭证。
对于 Kafka 集群,您可以为 OpenShift 集群内的安全连接指定内部监听程序,为 OpenShift 集群外部的连接指定外部监听程序。
您可以配置身份验证和授权机制。为源和目标 Kafka 集群实施的安全选项必须与 MirrorMaker 2 实施的安全选项兼容。
创建集群和用户身份验证凭据后,您可以在 MirrorMaker 配置中指定它们进行安全连接。
在此过程中,使用 Cluster Operator 生成的证书,但 您可以通过安装自己的证书 来替换它们。您还可以将监听程序 配置为使用由外部 CA (证书颁发机构)管理的 Kafka 侦听器证书。
开始前
在开始此步骤前,请查看 AMQ Streams 提供 的示例配置文件。它们包括使用 mTLS 或 SCRAM-SHA-512 身份验证来保护 MirrorMaker 2 部署的示例。示例指定用于在 OpenShift 集群内连接的内部监听程序。
示例提供了完整的授权配置,包括 MirrorMaker 2 所需的所有 ACL,以允许源和目标 Kafka 集群上的操作。
先决条件
- AMQ Streams 正在运行
- 源和目标集群的独立命名空间
此流程假设源和目标 Kafka 集群被安装到单独的命名空间(如果需要使用 Topic Operator),则需要这样做。Topic Operator 只监视指定命名空间中的单个集群。
通过将集群分隔到命名空间中,您需要复制集群 secret,以便可以在命名空间外访问它们。您需要在 MirrorMaker 配置中引用 secret。
流程
配置两个
Kafka资源,一个用于保护源 Kafka 集群,另一个用于保护目标 Kafka 集群。您可以为身份验证和授权添加监听程序配置。
在本例中,为带有 TLS 加密和 mTLS 验证的 Kafka 集群配置内部监听程序。Kafka
简单授权已启用。使用 TLS 加密和 mTLS 验证的源 Kafka 集群配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: my-source-cluster spec: kafka: version: 3.4.0 replicas: 1 listeners: - name: tls port: 9093 type: internal tls: true authentication: type: tls authorization: type: simple config: offsets.topic.replication.factor: 1 transaction.state.log.replication.factor: 1 transaction.state.log.min.isr: 1 default.replication.factor: 1 min.insync.replicas: 1 inter.broker.protocol.version: "3.4" storage: type: jbod volumes: - id: 0 type: persistent-claim size: 100Gi deleteClaim: false zookeeper: replicas: 1 storage: type: persistent-claim size: 100Gi deleteClaim: false entityOperator: topicOperator: {} userOperator: {}使用 TLS 加密和 mTLS 身份验证的目标 Kafka 集群配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: my-target-cluster spec: kafka: version: 3.4.0 replicas: 1 listeners: - name: tls port: 9093 type: internal tls: true authentication: type: tls authorization: type: simple config: offsets.topic.replication.factor: 1 transaction.state.log.replication.factor: 1 transaction.state.log.min.isr: 1 default.replication.factor: 1 min.insync.replicas: 1 inter.broker.protocol.version: "3.4" storage: type: jbod volumes: - id: 0 type: persistent-claim size: 100Gi deleteClaim: false zookeeper: replicas: 1 storage: type: persistent-claim size: 100Gi deleteClaim: false entityOperator: topicOperator: {} userOperator: {}在单独的命名空间中创建或更新
Kafka资源。oc apply -f <kafka_configuration_file> -n <namespace>Cluster Operator 创建监听程序并设置集群和客户端证书颁发机构(CA)证书,以便在 Kafka 集群中启用身份验证。
证书在 secret <
cluster_name> -cluster-ca-cert中创建。配置两个
KafkaUser资源,一个用于源 Kafka 集群的用户,另一个用于目标 Kafka 集群的用户。-
将相同的身份验证和授权类型配置为对应的源和目标 Kafka 集群。例如,如果您在源 Kafka 集群的
Kafka配置中使用了tls验证和simple授权类型,请在KafkaUser配置中使用相同的。 配置 MirrorMaker 2 所需的 ACL,以允许源和目标 Kafka 集群上的操作。
ACL 由内部 MirrorMaker 连接器和底层 Kafka 连接框架使用。
mTLS 验证的源用户配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaUser metadata: name: my-source-user labels: strimzi.io/cluster: my-source-cluster spec: authentication: type: tls authorization: type: simple acls: # MirrorSourceConnector - resource: # Not needed if offset-syncs.topic.location=target type: topic name: mm2-offset-syncs.my-target-cluster.internal operations: - Create - DescribeConfigs - Read - Write - resource: # Needed for every topic which is mirrored type: topic name: "*" operations: - DescribeConfigs - Read # MirrorCheckpointConnector - resource: type: cluster operations: - Describe - resource: # Needed for every group for which offsets are synced type: group name: "*" operations: - Describe - resource: # Not needed if offset-syncs.topic.location=target type: topic name: mm2-offset-syncs.my-target-cluster.internal operations: - ReadmTLS 身份验证的目标用户配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaUser metadata: name: my-target-user labels: strimzi.io/cluster: my-target-cluster spec: authentication: type: tls authorization: type: simple acls: # Underlying Kafka Connect internal topics to store configuration, offsets, or status - resource: type: group name: mirrormaker2-cluster operations: - Read - resource: type: topic name: mirrormaker2-cluster-configs operations: - Create - Describe - DescribeConfigs - Read - Write - resource: type: topic name: mirrormaker2-cluster-status operations: - Create - Describe - DescribeConfigs - Read - Write - resource: type: topic name: mirrormaker2-cluster-offsets operations: - Create - Describe - DescribeConfigs - Read - Write # MirrorSourceConnector - resource: # Needed for every topic which is mirrored type: topic name: "*" operations: - Create - Alter - AlterConfigs - Write # MirrorCheckpointConnector - resource: type: cluster operations: - Describe - resource: type: topic name: my-source-cluster.checkpoints.internal operations: - Create - Describe - Read - Write - resource: # Needed for every group for which the offset is synced type: group name: "*" operations: - Read - Describe # MirrorHeartbeatConnector - resource: type: topic name: heartbeats operations: - Create - Describe - Write注意您可以通过将
type设置为tls-external来使用在 User Operator 外部发布的证书。如需更多信息,请参阅 第 6.2.93 节 “KafkaUserSpec模式参考”。-
将相同的身份验证和授权类型配置为对应的源和目标 Kafka 集群。例如,如果您在源 Kafka 集群的
在您为源和目标集群创建的每个命名空间中创建或更新
KafkaUser资源。oc apply -f <kafka_user_configuration_file> -n <namespace>根据所选的验证类型,User Operator 会创建代表客户端(MirrorMaker)的用户,以及用于客户端身份验证的安全凭证。
User Operator 会创建一个与
KafkaUser资源相同的新 secret。secret 包含用于 mTLS 验证的私钥和公钥。公钥包含在用户证书中,证书由客户端 CA 签名。使用身份验证详情配置
KafkaMirrorMaker2资源,以连接到源和目标集群。带有 TLS 加密和 mTLS 身份验证的 MirrorMaker 2 配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaMirrorMaker2 metadata: name: my-mirror-maker-2 spec: version: 3.4.0 replicas: 1 connectCluster: "my-target-cluster" clusters: - alias: "my-source-cluster" bootstrapServers: my-source-cluster-kafka-bootstrap:9093 tls:1 trustedCertificates: - secretName: my-source-cluster-cluster-ca-cert certificate: ca.crt authentication:2 type: tls certificateAndKey: secretName: my-source-user certificate: user.crt key: user.key - alias: "my-target-cluster" bootstrapServers: my-target-cluster-kafka-bootstrap:9093 tls:3 trustedCertificates: - secretName: my-target-cluster-cluster-ca-cert certificate: ca.crt authentication:4 type: tls certificateAndKey: secretName: my-target-user certificate: user.crt key: user.key config: # -1 means it will use the default replication factor configured in the broker config.storage.replication.factor: -1 offset.storage.replication.factor: -1 status.storage.replication.factor: -1 mirrors: - sourceCluster: "my-source-cluster" targetCluster: "my-target-cluster" sourceConnector: config: replication.factor: 1 offset-syncs.topic.replication.factor: 1 sync.topic.acls.enabled: "false" heartbeatConnector: config: heartbeats.topic.replication.factor: 1 checkpointConnector: config: checkpoints.topic.replication.factor: 1 sync.group.offsets.enabled: "true" topicsPattern: "topic1|topic2|topic3" groupsPattern: "group1|group2|group3"在与目标 Kafka 集群相同的命名空间中创建或更新
KafkaMirrorMaker2资源。oc apply -f <mirrormaker2_configuration_file> -n <namespace_of_target_cluster>
2.4.8. 执行 Kafka MirrorMaker 2 连接器重启 复制链接链接已复制到粘贴板!
此流程描述了如何使用 OpenShift 注解手动触发 Kafka MirrorMaker 2 连接器的重启。
先决条件
- Cluster Operator 正在运行。
流程
查找控制您要重启的 Kafka MirrorMaker 2 连接器的
KafkaMirrorMaker2自定义资源的名称:oc get KafkaMirrorMaker2查找要从
KafkaMirrorMaker2自定义资源重启的 Kafka MirrorMaker 2 连接器的名称。oc describe KafkaMirrorMaker2 KAFKAMIRRORMAKER-2-NAME要重启连接器,请在 OpenShift 中注解
KafkaMirrorMaker2资源。在本例中,oc annotate重启一个名为my-source->my-target.MirrorSourceConnector的连接器:oc annotate KafkaMirrorMaker2 KAFKAMIRRORMAKER-2-NAME "strimzi.io/restart-connector=my-source->my-target.MirrorSourceConnector"等待下一个协调发生(默认为两分钟)。
只要协调过程检测到注解,Kafka MirrorMaker 2 连接器会被重启。当重启请求被接受时,该注解会从
KafkaMirrorMaker2自定义资源中删除。
2.4.9. 执行 Kafka MirrorMaker 2 连接器任务的重启 复制链接链接已复制到粘贴板!
此流程描述了如何使用 OpenShift 注解手动触发 Kafka MirrorMaker 2 连接器任务的重启。
先决条件
- Cluster Operator 正在运行。
流程
查找控制您要重启的 Kafka MirrorMaker 2 连接器的
KafkaMirrorMaker2自定义资源的名称:oc get KafkaMirrorMaker2查找 Kafka MirrorMaker 2 连接器的名称以及从
KafkaMirrorMaker2自定义资源重启的任务 ID。任务 ID 是非负的整数,从 0 开始。oc describe KafkaMirrorMaker2 KAFKAMIRRORMAKER-2-NAME要重启连接器任务,请在 OpenShift 中注解
KafkaMirrorMaker2资源。在本例中,oc annotaterestart task 0 of a connector namedmy-source->my-target.MirrorSourceConnector:oc annotate KafkaMirrorMaker2 KAFKAMIRRORMAKER-2-NAME "strimzi.io/restart-connector-task=my-source->my-target.MirrorSourceConnector:0"等待下一个协调发生(默认为两分钟)。
只要协调过程检测到注解,Kafka MirrorMaker 2 连接器任务会被重启。当 restart 任务请求时,注解会从
KafkaMirrorMaker2自定义资源中删除。