2.5. Kafka Bridge 集群配置
本节论述了如何在 AMQ Streams 集群中配置 Kafka Bridge 部署。
Kafka Bridge 为将基于 HTTP 的客户端与 Kafka 集群集成提供了一个 API。
如果使用 Kafka Bridge,您可以配置 KafkaBridge
资源。
KafkaBridge
资源的完整 schema 信息包括在 第 13.2.110 节 “KafkaBridge
模式参考” 中。
2.5.1. 配置 Kafka 网桥
使用 Kafka Bridge 向 Kafka 集群发出基于 HTTP 的请求。
使用 KafkaBridge
资源的属性来配置 Kafka Bridge 部署。
为了防止不同 Kafka 网桥实例处理客户端消费者请求时出现问题,必须使用基于地址的路由来确保请求路由到正确的 Kafka Bridge 实例。另外,每个独立的 Kafka Bridge 实例都必须有一个副本。Kafka Bridge 实例具有自己的状态,不与其他实例共享。
先决条件
- OpenShift 集群
- 一个正在运行的 Cluster Operator
有关运行 以下的说明,请参阅 OpenShift 指南中的部署和升级 AMQ Streams :
流程
编辑
KafkaBridge
资源的spec
属性。您可以配置的属性显示在此示例配置中:
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaBridge metadata: name: my-bridge spec: replicas: 3 1 bootstrapServers: my-cluster-kafka-bootstrap:9092 2 tls: 3 trustedCertificates: - secretName: my-cluster-cluster-cert certificate: ca.crt - secretName: my-cluster-cluster-cert certificate: ca2.crt authentication: 4 type: tls certificateAndKey: secretName: my-secret certificate: public.crt key: private.key http: 5 port: 8080 cors: 6 allowedOrigins: "https://strimzi.io" allowedMethods: "GET,POST,PUT,DELETE,OPTIONS,PATCH" consumer: 7 config: auto.offset.reset: earliest producer: 8 config: delivery.timeout.ms: 300000 resources: 9 requests: cpu: "1" memory: 2Gi limits: cpu: "2" memory: 2Gi logging: 10 type: inline loggers: logger.bridge.level: "INFO" # enabling DEBUG just for send operation logger.send.name: "http.openapi.operation.send" logger.send.level: "DEBUG" jvmOptions: 11 "-Xmx": "1g" "-Xms": "1g" readinessProbe: 12 initialDelaySeconds: 15 timeoutSeconds: 5 livenessProbe: initialDelaySeconds: 15 timeoutSeconds: 5 image: my-org/my-image:latest 13 template: 14 pod: affinity: podAntiAffinity: requiredDuringSchedulingIgnoredDuringExecution: - labelSelector: matchExpressions: - key: application operator: In values: - postgresql - mongodb topologyKey: "kubernetes.io/hostname" bridgeContainer: 15 env: - name: JAEGER_SERVICE_NAME value: my-jaeger-service - name: JAEGER_AGENT_HOST value: jaeger-agent-name - name: JAEGER_AGENT_PORT value: "6831"
- 1
- 2
- 3
- 使用源 Kafka 集群的 TLS 证书以 X.509 格式存储的密钥名称进行 TLS 加密。如果证书存储在同一个 secret 中,则可以多次列出证书。
- 4
- Kafka Bridge 集群的身份验证,使用 TLS 机制 (此处所示)、使用 OAuth bearer 令牌 或基于 SASL 的 SCRAM-SHA-512 或 PLAIN 机制。默认情况下,Kafka Bridge 在不进行身份验证的情况下连接到 Kafka 代理。
- 5
- 对 Kafka 代理的 HTTP 访问.
- 6
- CORS 访问 指定选定的资源和访问方法.请求中的附加 HTTP 标头描述了允许访问 Kafka 集群的原始数据。
- 7
- 8
- 9
- 10
- 指定 Kafka Bridge 日志记录器和日志级别 直接(
内联)或通过
ConfigMap 间接(外部
)添加。自定义 ConfigMap 必须放在log4j.properties 或
log4j2.properties
键下。对于 Kafka Bridge loggers,您可以将日志级别设置为 INFO、ERROR、WARN、TRACE、DEBUG、FATAL 或 OFF。 - 11
- 运行 Kafka 网桥的虚拟机(VM)的 JVM 配置选项 优化性能。
- 12
- 健康检查以了解 何时重新启动容器(存活度)以及容器何时可以接受流量(就绪度)。
- 13
- ADVANCED OPTION: 容器镜像配置,只在特殊情况下推荐这样做。
- 14
- 模板自定义.在这里,pod 被调度为反关联性,因此 pod 不会调度到具有相同主机名的节点。
- 15
- 还 使用 Jaeger 为分布式追踪设置 环境变量。
创建或更新资源:
oc apply -f KAFKA-BRIDGE-CONFIG-FILE