第 7 章 Kafka Connect


Kafka Connect 是一个在 Apache Kafka 和外部系统间流传输数据的工具。它提供移动大量数据的框架,同时保持可扩展性和可靠性。Kafka Connect 通常用于将 Kafka 与 Kafka 集群外部的数据库、存储和消息传递系统集成。

Kafka Connect 使用连接器插件,为不同类型的外部系统实现连接。有两种连接器插件:接收器和源。sink 连接器将数据从 Kafka 流传输到外部系统。源连接器将来自外部系统的数据流传输到 Kafka。

Kafka Connect 可使用独立或分布式模式运行。

独立模式
在独立模式中,Kafka Connect 在带有从属性文件中读取的用户定义的配置的单个节点上运行。
分布式模式
在分布式模式中,Kafka Connect 在一个或多个 worker 节点上运行,工作负载分布在它们中。您可以使用 HTTP REST 接口管理连接器及其配置。

7.1. 独立模式的 Kafka 连接

在独立模式中,Kafka Connect 作为单个进程在单一节点上运行。您可以使用属性文件管理独立模式的配置。

7.1.1. 在独立模式中配置 Kafka 连接

要在独立模式下配置 Kafka Connect,请编辑 config/connect-standalone.properties 配置文件。以下选项是最重要的。

bootstrap.servers
用作 Kafka 的 bootstrap 连接的 Kafka 代理地址列表。例如: kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092
key.converter
用于将消息密钥转换为 Kafka 格式的类。例如,org.apache.kafka.connect.json.JsonConverter.
value.converter
用于将消息有效负载转换为 Kafka 格式的类。例如,org.apache.kafka.connect.json.JsonConverter.
offset.storage.file.filename
指定存储偏移数据的文件。

安装目录中提供了示例配置文件,即 config/connect-standalone.properties。有关所有支持的 Kafka Connect 配置选项的完整列表,请参阅 Kafka Connect 配置参数

连接器插件使用 bootstrap 地址打开到 Kafka 代理的客户端连接。要配置这些连接,请使用以 producer.consumer. 为前缀的标准 Kafka producer 和使用者配置选项。

有关配置 Kafka 生成者和消费者的更多信息,请参阅:

7.1.2. 在独立模式中配置 Kafka Connect 的连接器

您可以使用属性文件在独立模式中配置 Kafka Connect 的连接器插件。大多数配置选项都特定于每个连接器。以下选项适用于所有连接器:

name
连接器的名称,它在当前 Kafka Connect 实例中必须是唯一的。
connector.class
连接器插件的类。例如,org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max
指定连接器可以使用的最大任务数量。任务可让连接器并行执行工作。连接器可能会创建比指定更少的任务。
key.converter
用于将消息密钥转换为 Kafka 格式的类。这会覆盖 Kafka Connect 配置设置的默认值。例如,org.apache.kafka.connect.json.JsonConverter.
value.converter
用于将消息有效负载转换为 Kafka 格式的类。这会覆盖 Kafka Connect 配置设置的默认值。例如,org.apache.kafka.connect.json.JsonConverter.

另外,您必须为接收器连接器设置以下选项之一:

topics
以逗号分隔的主题列表,用作输入。
topics.regex
用作输入的 Java 正则表达式。

有关所有其他选项,请查看可用连接器的文档。

AMQ Streams 包括示例连接器配置文件 - 请参阅 AMQ Streams 安装目录中的 config/connect-file-sink.propertiesconfig/connect-file-source.properties

7.1.3. 在独立模式中运行 Kafka 连接

这个步骤描述了如何在独立模式中配置和运行 Kafka Connect。

先决条件

  • 已安装并运行 AMQ Streams} 集群。

流程

  1. 编辑 /opt/kafka/config/connect-standalone.properties Kafka 连接配置文件,并将 bootstrap.server 设置为指向您的 Kafka 代理。例如:

    bootstrap.servers=kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092
    Copy to Clipboard Toggle word wrap
  2. 使用配置文件启动 Kafka 连接并指定一个或多个连接器配置。

    su - kafka
    /opt/kafka/bin/connect-standalone.sh /opt/kafka/config/connect-standalone.properties connector1.properties
    [connector2.properties ...]
    Copy to Clipboard Toggle word wrap
  3. 验证 Kafka Connect 是否正在运行。

       jcmd | grep ConnectStandalone
    Copy to Clipboard Toggle word wrap

其他资源

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat