5.5. Kafka Connect 配置
基本的 Kafka Connect 配置需要一个 bootstrap 地址来连接到 Kafka 集群,以及加密和身份验证详情。
默认情况下会使用相同的配置 Kafka Connect 实例:
- Kafka Connect 集群的组 ID
- 保存连接器偏移的 Kafka 主题
- 存储连接器和任务状态配置的 Kafka 主题
- 存储连接器和任务状态更新的 Kafka 主题
如果使用多个不同的 Kafka Connect 实例,这些设置必须反映每个实例。
显示 Kafka Connect 配置的 YAML 示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect spec: # ... config: group.id: my-connect-cluster offset.storage.topic: my-connect-cluster-offsets config.storage.topic: my-connect-cluster-configs status.storage.topic: my-connect-cluster-status # ...
连接器
连接器与 Kafka Connect 独立配置。该配置描述了要馈入和离开 Kafka Connect 的源输入数据和目标输出数据。外部源数据必须引用存储消息的特定主题。
Kafka 提供两个内置连接器:
-
FileStreamSourceConnector
将外部系统的数据流传输到 Kafka,从输入源读取行并将每行发送到 Kafka 主题。 -
FileStreamSinkConnector
将 Kafka 数据流传输到外部系统,读取 Kafka 主题中的信息,并在输出文件中为每个文件创建一个行。
您可以使用连接器插件添加其他连接器,它们是一组 JAR 文件或 TGZ 存档,定义了连接到特定类型外部系统所需的实施。
您可以创建一个使用新连接器插件的自定义 Kafka Connect 镜像。
要创建镜像,您可以使用:
- Kafka Connect 配置,以便 AMQ Streams 自动创建新镜像。
- 红帽生态系统目录中的 Kafka 容器镜像作为基础镜像。
- OpenShift 构建 和 Source-to-Image(S2I) 框架,以创建新的容器镜像。
对于 AMQ Streams 自动创建新镜像,build
配置需要 output
属性引用存储容器镜像的容器 registry,以及 plugins
属性来列出要添加到镜像中的连接器插件及其工件。
output
属性描述了镜像的类型和名称,以及包含访问容器 registry 所需凭证的 Secret 名称(可选)。plugins
属性描述了工件的类型以及从中下载工件的 URL。另外,您可以在解压缩工件前指定一个 SHA-512 校验和来验证工件。
用于自动创建新镜像的 Kafka Connect 配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster spec: # ... build: output: type: docker image: my-registry.io/my-org/my-connect-cluster:latest pushSecret: my-registry-credentials plugins: - name: debezium-postgres-connector artifacts: - type: tgz url: https://ARTIFACT-ADDRESS.tgz sha512sum: HASH-NUMBER-TO-VERIFY-ARTIFACT # ... #...
管理连接器
您可以使用 KafkaConnector 资源或 Kafka Connect REST API 在 Kafka Connect 集群中创建和管理连接器实例。KafkaConnector 资源提供 OpenShift 原生方法,并由 Cluster Operator 管理。
KafkaConnector 资源的 spec
指定连接器类和配置设置,以及处理数据的连接器 任务 的最大数量。
显示 KafkaConnector 配置的 YAML 示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: my-source-connector labels: strimzi.io/cluster: my-connect-cluster spec: class: org.apache.kafka.connect.file.FileStreamSourceConnector tasksMax: 2 config: file: "/opt/kafka/LICENSE" topic: my-topic # ...
您可以通过在 KafkaConnect
资源中添加注解来启用 KafkaConnectors。KafkaConnector 资源必须部署到与其链接的 Kafka Connect 集群相同的命名空间中。
启用 KafkaConnector 的 YAML 显示注解示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect annotations: strimzi.io/use-connector-resources: "true" # ...