2.2. Kafka Connect/S2I 集群配置
本节论述了如何在 AMQ Streams 集群中配置 Kafka Connect 或 Kafka Connect with Source-to-Image(S2I)部署。
Kafka Connect 是一个集成工具包,用于使用 Connector
插件在 Kafka 代理和其他系统间流传输数据。Kafka Connect 提供了一个框架,用于将 Kafka 与外部数据源或目标(如数据库)集成,以使用连接器导入或导出数据。连接器是提供所需连接配置的插件。
如果使用 Kafka Connect,您可以配置 KafkaConnect 或
资源。如果您使用 Source-to-Image( KafkaConnect
S2IS2I
)框架来部署 Kafka Connect,请使用 KafkaConnectS2I 资源。
-
KafkaConnect
资源的完整 schema 信息包括在 第 B.79 节 “KafkaConnect
模式参考” 中。 -
KafkaConnectS2I
资源的完整 schema 包括在 第 B.95 节 “KafkaConnectS2I
模式参考” 中。
2.2.1. 配置 Kafka 连接
使用 Kafka Connect 设置到 Kafka 集群的外部数据连接。
使用 KafkaConnect 或
资源的属性来配置 Kafka Connect 部署。此流程中演示的示例适用于 KafkaConnect
S2IKafkaConnect
资源,但 KafkaConnectS2I
资源的属性相同。
Kafka 连接器配置
KafkaConnector
资源允许您以 OpenShift 原生方式为 Kafka Connect 创建和管理连接器实例。
在配置中,您可以通过添加 strimzi.io/use-connector-resources
注解来为 Kafka Connect 集群启用 Kafka Connectors
。您还可以通过 externalConfiguration
属性为 Kafka Connect 连接器指定外部配置。
使用 Kafka Connect HTTP REST 接口或使用 KafkaConnectors
创建、重新配置和删除连接器。如需有关这些方法的更多信息,请参阅 OpenShift 上部署和升级 AMQ Streams 中的 创建和管理连接器。
连接器配置作为 HTTP 请求的一部分传递给 Kafka Connect,并存储在 Kafka 本身中。ConfigMap 和机密是用于存储配置和机密数据的标准 OpenShift 资源。您可以使用 ConfigMap 和 Secret 配置连接器的某些元素。然后,您可以在 HTTP REST 命令中引用配置值(如果需要,这会使配置保持独立且更安全)。这个方法尤其适用于机密数据,如用户名、密码或证书。
先决条件
- OpenShift 集群
- 一个正在运行的 Cluster Operator
有关运行 以下的说明,请参阅 OpenShift 指南中的部署和升级 AMQ Streams :
步骤
编辑
KafkaConnect 或
资源的KafkaConnect
S2Ispec
属性。您可以配置的属性显示在此示例配置中:
apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnect 1 metadata: name: my-connect-cluster annotations: strimzi.io/use-connector-resources: "true" 2 spec: replicas: 3 3 authentication: 4 type: tls certificateAndKey: certificate: source.crt key: source.key secretName: my-user-source bootstrapServers: my-cluster-kafka-bootstrap:9092 5 tls: 6 trustedCertificates: - secretName: my-cluster-cluster-cert certificate: ca.crt - secretName: my-cluster-cluster-cert certificate: ca2.crt config: 7 group.id: my-connect-cluster offset.storage.topic: my-connect-cluster-offsets config.storage.topic: my-connect-cluster-configs status.storage.topic: my-connect-cluster-status key.converter: org.apache.kafka.connect.json.JsonConverter value.converter: org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable: true value.converter.schemas.enable: true config.storage.replication.factor: 3 offset.storage.replication.factor: 3 status.storage.replication.factor: 3 externalConfiguration: 8 env: - name: AWS_ACCESS_KEY_ID valueFrom: secretKeyRef: name: aws-creds key: awsAccessKey - name: AWS_SECRET_ACCESS_KEY valueFrom: secretKeyRef: name: aws-creds key: awsSecretAccessKey resources: 9 requests: cpu: "1" memory: 2Gi limits: cpu: "2" memory: 2Gi logging: 10 type: inline loggers: log4j.rootLogger: "INFO" readinessProbe: 11 initialDelaySeconds: 15 timeoutSeconds: 5 livenessProbe: initialDelaySeconds: 15 timeoutSeconds: 5 metrics: 12 lowercaseOutputName: true lowercaseOutputLabelNames: true rules: - pattern: kafka.connect<type=connect-worker-metrics><>([a-z-]+) name: kafka_connect_worker_$1 help: "Kafka Connect JMX metric worker" type: GAUGE - pattern: kafka.connect<type=connect-worker-rebalance-metrics><>([a-z-]+) name: kafka_connect_worker_rebalance_$1 help: "Kafka Connect JMX metric rebalance information" type: GAUGE jvmOptions: 13 "-Xmx": "1g" "-Xms": "1g" image: my-org/my-image:latest 14 template: 15 pod: affinity: podAntiAffinity: requiredDuringSchedulingIgnoredDuringExecution: - labelSelector: matchExpressions: - key: application operator: In values: - postgresql - mongodb topologyKey: "kubernetes.io/hostname" connectContainer: 16 env: - name: JAEGER_SERVICE_NAME value: my-jaeger-service - name: JAEGER_AGENT_HOST value: jaeger-agent-name - name: JAEGER_AGENT_PORT value: "6831"
- 1
- 根据需要,使用
KafkaConnect
或 KafkaConnectS2I - 2
- 为
Kafka Connect 集群启用 KafkaConnectors
。 - 3
- 4
- Kafka Connect 集群的身份验证,使用 TLS 机制 (如下所示)、使用 OAuth bearer 令牌 或基于 SASL 的 SCRAM-SHA-512 或 PLAIN 机制。默认情况下,Kafka Connect 使用纯文本连接连接到 Kafka 代理。
- 5
- 用于连接到 Kafka Connect 集群的 bootstrap 服务器。
- 6
- 使用密钥名称进行 TLS 加密,在其下,TLS 证书以 X.509 格式存储到集群的 X.509 格式。如果证书存储在同一个 secret 中,则可以多次列出证书。
- 7
- Kafka Connect worker ( 而不是连接器)配置。标准 Apache Kafka 配置可能会提供,仅限于不是由 AMQ Streams 直接管理的属性。
- 8
- 使用环境变量(如下所示)或卷 为 Kafka 连接器的外部配置。
- 9
- 10
- 指定 Kafka 连接日志记录器和日志级别 直接(
内联)或通过
ConfigMap 间接(外部
)添加。自定义 ConfigMap 必须放在log4j.properties 或
log4j2.properties
键下。对于 Kafka Connectlog4j.rootLogger
日志记录器,您可以将日志级别设置为 INFO、ERROR、WARN、TRACE、DEBUG、FATAL 或 OFF。 - 11
- 健康检查以了解 何时重新启动容器(存活度)以及容器何时可以接受流量(就绪度)。
- 12
- Prometheus metrics,本例中通过 Prometheus JMX 导出器配置启用的 Prometheus 指标。您可以使用
metrics: {}
来在没有进一步配置的情况下启用指标。 - 13
- 运行 Kafka Connect 的虚拟机(VM)性能优化 JVM 配置选项。
- 14
- 高级选项:容器镜像配置,建议仅在特殊情况下使用。
- 15
- 模板自定义.在这里,pod 被调度为反关联性,因此 pod 不会调度到具有相同主机名的节点。
- 16
- 还 使用 Jaeger 为分布式追踪设置 环境变量。
创建或更新资源:
oc apply -f KAFKA-CONNECT-CONFIG-FILE
- 如果为 Kafka Connect 启用了授权,请配置 Kafka Connect 用户以启用对 Kafka Connect consumer 组和主题的访问。