7.7. Kafka Connect 配置
使用 AMQ Streams 的 KafkaConnect
资源快速轻松地创建新的 Kafka Connect 集群。
当使用 KafkaConnect 资源部署 Kafka Connect
时,您可以指定 bootstrap 服务器地址(在 spec.bootstrapServers
中)以连接到 Kafka 集群。当服务器停机时,您可以指定多个地址。您还指定身份验证凭据和 TLS 客户端证书,以建立安全连接。
Kafka 集群不需要由 AMQ Streams 管理,或部署到 OpenShift 集群。
您还可以使用 KafkaConnect
资源来指定以下内容:
- 构建包含插件的容器镜像的插件配置,以建立连接
- 属于 Kafka Connect 集群的 worker pod 配置
-
启用使用
KafkaConnector
资源管理插件的注解
Cluster Operator 管理使用 KafkaConnector 资源部署的 Kafka Connect
集群,以及利用 KafkaConnector
资源创建的连接器。
插件配置
插件提供创建连接器实例的实施。当插件实例化时,会为连接特定类型的外部数据系统提供配置。插件提供一组或者多个 JAR 文件,该文件定义了连接器和任务实施,以连接到指定类型数据源。许多外部系统的插件可用于 Kafka 连接。您还可以创建自己的插件。
配置描述了要发送到 Kafka Connect 的源输入数据和目标输出数据。对于源连接器,外部源数据必须引用要存储消息的特定主题。插件也可以包含转换数据所需的库和文件。
Kafka Connect 部署可以有一个或多个插件,但每个插件只能有一个版本。
您可以创建自定义 Kafka Connect 镜像,其中包括您选择的插件。您可以通过两种方式创建镜像:
要自动创建容器镜像,您可以使用 KafkaConnect 资源的 build
属性指定要添加到 Kafka Connect
集群中的插件。AMQ Streams 会自动下载插件工件并将其添加到新容器镜像中。
插件配置示例
如果使用 Dockerfile 构建镜像,您可以使用 AMQ Streams 的最新容器镜像作为基础镜像来添加插件配置文件。
显示手动添加插件配置示例
FROM registry.redhat.io/amq-streams/kafka-36-rhel8:2.6.0 USER root:root COPY ./my-plugins/ /opt/kafka/plugins/ USER 1001
FROM registry.redhat.io/amq-streams/kafka-36-rhel8:2.6.0
USER root:root
COPY ./my-plugins/ /opt/kafka/plugins/
USER 1001
worker 的 Kafka Connect 集群配置
您可以在 KafkaConnect
资源的 config
属性中指定 worker 的配置。
分布式 Kafka Connect 集群有一个组 ID 和一组内部配置主题。
-
group.id
-
offset.storage.topic
-
config.storage.topic
-
status.storage.topic
Kafka Connect 集群默认使用这些属性的值配置。Kafka Connect 集群无法共享组 ID 或主题名称,因为它将创建错误。如果使用多个不同的 Kafka Connect 集群,则每个创建的 Kafka Connect 集群的 worker 必须是唯一的。
每个 Kafka Connect 集群使用的连接器名称也必须是唯一的。
在以下示例中,指定了 JSON 转换器。Kafka Connect 使用的内部 Kafka 主题设置了复制因素。对于生产环境,至少应有 3 个。在创建主题后更改复制因素将无效。
worker 配置示例
- 1
- Kafka 中的 Kafka Connect 集群 ID。每个 Kafka Connect 集群都必须是唯一的。
- 2
- 存储连接器偏移的 Kafka 主题。每个 Kafka Connect 集群都必须是唯一的。
- 3
- 存储连接器和任务状态配置的 Kafka 主题。每个 Kafka Connect 集群都必须是唯一的。
- 4
- 存储连接器和任务状态更新的 Kafka 主题。每个 Kafka Connect 集群都必须是唯一的。
- 5
- 转换程序,将消息密钥转换为 Kafka 中存储的 JSON 格式。
- 6
- 转换程序,将消息值转换为 Kafka 中存储的 JSON 格式。
- 7
- 为将消息键转换为结构化 JSON 格式的 schema。
- 8
- 为将消息值转换为结构化 JSON 格式的 schema。
- 9
- 存储连接器偏移的 Kafka 主题的复制因素。
- 10
- 存储连接器和任务状态配置的 Kafka 主题的复制因素。
- 11
- 存储连接器和任务状态更新的 Kafka 主题的复制因素。
连接器的 KafkaConnector
管理
在将插件添加到用于部署中的 worker pod 的容器镜像后,您可以使用 AMQ Streams 的 KafkaConnector
自定义资源或 Kafka Connect API 来管理连接器实例。您还可以使用这些选项创建新的连接器实例。
KafkaConnector
资源提供了一种 OpenShift 原生的方法来管理 Cluster Operator 连接器。要使用 KafkaConnector
资源管理连接器,您必须在 KafkaConnect
自定义资源中指定注解。
启用 KafkaConnectors 的注解
将 use-connector-resources
设置为 true
可启用 KafkaConnectors 创建、删除和重新配置连接器。
如果在 KafkaConnect
配置中启用了 use-connector-resources
,则必须使用 KafkaConnector
资源来定义和管理连接器。KafkaConnector
资源被配置为连接到外部系统。它们部署到与 Kafka Connect 集群和与外部数据系统交互的 Kafka 集群相同的 OpenShift 集群。
Kafka 组件在同一个 OpenShift 集群中包含
配置指定连接器实例如何连接到外部数据系统,包括任何身份验证。您还需要陈述要监视的数据。对于源连接器,您可能会在配置中提供数据库名称。您还可以通过指定目标主题名称指定数据在 Kafka 中的位置。
使用 tasksMax
指定最大任务数。例如,带有 tasksMax: 2
的源连接器可能会将源数据导入两个任务。
KafkaConnector 源连接器配置示例
- 1
KafkaConnector
资源的名称,用作连接器的名称。使用对 OpenShift 资源有效的任何名称。- 2
- 在其中创建连接器实例的 Kafka Connect 集群的名称。连接器必须部署到它们所链接的 Kafka Connect 集群相同的命名空间中。
- 3
- 连接器类的全名。这应该存在于 Kafka Connect 集群使用的镜像中。
- 4
- 连接器可创建的最大 Kafka Connect 任务数量。
- 5
- 启用自动重启失败的连接器和任务。默认情况下,重启数量是无限的,但您可以使用
maxRestarts
属性设置自动重启次数的最大值。 - 6
- 连接器配置 作为键值对。
- 7
- 外部数据文件的位置。在本例中,我们将
FileStreamSourceConnector
配置为从/opt/kafka/LICENSE
文件中读取。 - 8
- 将源数据发布到的 Kafka 主题。
您可以 从外部来源(如 OpenShift Secret 或 ConfigMap)为连接器加载 机密配置值。
Kafka Connect API
使用 Kafka Connect REST API 作为使用 KafkaConnector
资源管理连接器的替代选择。Kafka Connect REST API 作为一个运行在 <connect_cluster_name>-connect-api:8083
的服务其中 <connect_cluster_name> 是 Kafka Connect 集群的名称。
您可以将连接器配置添加为 JSON 对象。
添加连接器配置的 curl 请求示例
如果启用了 KafkaConnectors,Cluster Operator 将恢复使用 Kafka Connect REST API 进行的手动更改。
REST API 支持的操作在 Apache Kafka Connect API 文档中 进行了描述。
您可以在 OpenShift 外部公开 Kafka Connect API 服务。为此,您可以创建一个使用连接机制来提供访问的服务,如入口或路由。我们建议使用,因为连接是不安全的。