7.7. Kafka Connect 配置
使用 AMQ Streams 的 KafkaConnect
资源快速轻松地创建新的 Kafka Connect 集群。
当使用 KafkaConnect 资源部署 Kafka Connect
时,您可以指定 bootstrap 服务器地址(在 spec.bootstrapServers
中)以连接到 Kafka 集群。当服务器停机时,您可以指定多个地址。您还指定身份验证凭据和 TLS 客户端证书,以建立安全连接。
Kafka 集群不需要由 AMQ Streams 管理,或部署到 OpenShift 集群。
您还可以使用 KafkaConnect
资源来指定以下内容:
- 构建包含插件的容器镜像的插件配置,以建立连接
- 属于 Kafka Connect 集群的 worker pod 配置
-
启用使用
KafkaConnector
资源管理插件的注解
Cluster Operator 管理使用 KafkaConnector 资源部署的 Kafka Connect
集群,以及利用 KafkaConnector
资源创建的连接器。
插件配置
插件提供创建连接器实例的实施。当插件实例化时,会为连接特定类型的外部数据系统提供配置。插件提供一组或者多个 JAR 文件,该文件定义了连接器和任务实施,以连接到指定类型数据源。许多外部系统的插件可用于 Kafka 连接。您还可以创建自己的插件。
配置描述了要发送到 Kafka Connect 的源输入数据和目标输出数据。对于源连接器,外部源数据必须引用要存储消息的特定主题。插件也可以包含转换数据所需的库和文件。
Kafka Connect 部署可以有一个或多个插件,但每个插件只能有一个版本。
您可以创建自定义 Kafka Connect 镜像,其中包括您选择的插件。您可以通过两种方式创建镜像:
要自动创建容器镜像,您可以使用 KafkaConnect 资源的 build
属性指定要添加到 Kafka Connect
集群中的插件。AMQ Streams 会自动下载插件工件并将其添加到新容器镜像中。
插件配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster annotations: strimzi.io/use-connector-resources: "true" spec: # ... build: 1 output: 2 type: docker image: my-registry.io/my-org/my-connect-cluster:latest pushSecret: my-registry-credentials plugins: 3 - name: my-connector artifacts: - type: tgz url: https://<plugin_download_location>.tgz sha512sum: <checksum_to_verify_the_plugin> # ... # ...
如果使用 Dockerfile 构建镜像,您可以使用 AMQ Streams 的最新容器镜像作为基础镜像来添加插件配置文件。
显示手动添加插件配置示例
FROM registry.redhat.io/amq-streams/kafka-35-rhel8:2.5.1
USER root:root
COPY ./my-plugins/ /opt/kafka/plugins/
USER 1001
worker 的 Kafka Connect 集群配置
您可以在 KafkaConnect
资源的 config
属性中指定 worker 的配置。
分布式 Kafka Connect 集群有一个组 ID 和一组内部配置主题。
-
group.id
-
offset.storage.topic
-
config.storage.topic
-
status.storage.topic
Kafka Connect 集群默认使用这些属性的值配置。Kafka Connect 集群无法共享组 ID 或主题名称,因为它将创建错误。如果使用多个不同的 Kafka Connect 集群,则每个创建的 Kafka Connect 集群的 worker 必须是唯一的。
每个 Kafka Connect 集群使用的连接器名称也必须是唯一的。
在以下示例中,指定了 JSON 转换器。Kafka Connect 使用的内部 Kafka 主题设置了复制因素。对于生产环境,至少应有 3 个。在创建主题后更改复制因素将无效。
worker 配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect # ... spec: config: # ... group.id: my-connect-cluster 1 offset.storage.topic: my-connect-cluster-offsets 2 config.storage.topic: my-connect-cluster-configs 3 status.storage.topic: my-connect-cluster-status 4 key.converter: org.apache.kafka.connect.json.JsonConverter 5 value.converter: org.apache.kafka.connect.json.JsonConverter 6 key.converter.schemas.enable: true 7 value.converter.schemas.enable: true 8 config.storage.replication.factor: 3 9 offset.storage.replication.factor: 3 10 status.storage.replication.factor: 3 11 # ...
- 1
- Kafka 中的 Kafka Connect 集群 ID。每个 Kafka Connect 集群都必须是唯一的。
- 2
- 存储连接器偏移的 Kafka 主题。每个 Kafka Connect 集群都必须是唯一的。
- 3
- 存储连接器和任务状态配置的 Kafka 主题。每个 Kafka Connect 集群都必须是唯一的。
- 4
- 存储连接器和任务状态更新的 Kafka 主题。每个 Kafka Connect 集群都必须是唯一的。
- 5
- 转换程序,将消息密钥转换为 Kafka 中存储的 JSON 格式。
- 6
- 转换程序,将消息值转换为 Kafka 中存储的 JSON 格式。
- 7
- 为将消息键转换为结构化 JSON 格式的 schema。
- 8
- 为将消息值转换为结构化 JSON 格式的 schema。
- 9
- 存储连接器偏移的 Kafka 主题的复制因素。
- 10
- 存储连接器和任务状态配置的 Kafka 主题的复制因素。
- 11
- 存储连接器和任务状态更新的 Kafka 主题的复制因素。
连接器的 KafkaConnector
管理
在将插件添加到用于部署中的 worker pod 的容器镜像后,您可以使用 AMQ Streams 的 KafkaConnector
自定义资源或 Kafka Connect API 来管理连接器实例。您还可以使用这些选项创建新的连接器实例。
KafkaConnector
资源提供了一种 OpenShift 原生的方法来管理 Cluster Operator 连接器。要使用 KafkaConnector
资源管理连接器,您必须在 KafkaConnect
自定义资源中指定注解。
启用 KafkaConnectors 的注解
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster annotations: strimzi.io/use-connector-resources: "true" # ...
将 use-connector-resources
设置为 true
可启用 KafkaConnectors 创建、删除和重新配置连接器。
如果在 KafkaConnect
配置中启用了 use-connector-resources
,则必须使用 KafkaConnector
资源来定义和管理连接器。KafkaConnector
资源被配置为连接到外部系统。它们部署到与 Kafka Connect 集群和与外部数据系统交互的 Kafka 集群相同的 OpenShift 集群。
Kafka 组件在同一个 OpenShift 集群中包含
配置指定连接器实例如何连接到外部数据系统,包括任何身份验证。您还需要陈述要监视的数据。对于源连接器,您可能会在配置中提供数据库名称。您还可以通过指定目标主题名称指定数据在 Kafka 中的位置。
使用 tasksMax
指定最大任务数。例如,带有 tasksMax: 2
的源连接器可能会将源数据导入两个任务。
KafkaConnector 源连接器配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: my-source-connector 1 labels: strimzi.io/cluster: my-connect-cluster 2 spec: class: org.apache.kafka.connect.file.FileStreamSourceConnector 3 tasksMax: 2 4 config: 5 file: "/opt/kafka/LICENSE" 6 topic: my-topic 7 # ...
- 1
KafkaConnector
资源的名称,用作连接器的名称。使用对 OpenShift 资源有效的任何名称。- 2
- 在其中创建连接器实例的 Kafka Connect 集群的名称。连接器必须部署到它们所链接的 Kafka Connect 集群相同的命名空间中。
- 3
- 连接器类的全名。这应该存在于 Kafka Connect 集群使用的镜像中。
- 4
- 连接器可创建的最大 Kafka Connect 任务数量。
- 5
- 连接器配置 作为键值对。
- 6
- 外部数据文件的位置。在本例中,我们将
FileStreamSourceConnector
配置为从/opt/kafka/LICENSE
文件中读取。 - 7
- 将源数据发布到的 Kafka 主题。
您可以 从外部来源(如 OpenShift Secret 或 ConfigMap)为连接器加载 机密配置值。
Kafka Connect API
使用 Kafka Connect REST API 作为使用 KafkaConnector
资源管理连接器的替代选择。Kafka Connect REST API 作为一个运行在 <connect_cluster_name>-connect-api:8083
的服务其中 <connect_cluster_name> 是 Kafka Connect 集群的名称。
您可以将连接器配置添加为 JSON 对象。
添加连接器配置的 curl 请求示例
curl -X POST \ http://my-connect-cluster-connect-api:8083/connectors \ -H 'Content-Type: application/json' \ -d '{ "name": "my-source-connector", "config": { "connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector", "file": "/opt/kafka/LICENSE", "topic":"my-topic", "tasksMax": "4", "type": "source" } }'
如果启用了 KafkaConnectors,Cluster Operator 将恢复使用 Kafka Connect REST API 进行的手动更改。
REST API 支持的操作在 Apache Kafka Connect API 文档中 描述。
您可以在 OpenShift 外部公开 Kafka Connect API 服务。为此,您可以创建一个使用连接机制来提供访问的服务,如入口或路由。我们建议使用,因为连接是不安全的。