第 7 章 在 Kafka Connect 中使用 AMQ Streams


使用 Kafka Connect 在 Kafka 和外部系统间流数据。Kafka Connect 提供了一个用于移动大量数据的框架,同时保持可扩展性和可靠性。Kafka Connect 通常用于将 Kafka 与数据库、存储和消息系统集成到您的 Kafka 集群外。

Kafka Connect 使用连接器插件来实现不同类型的外部系统的连接。连接器插件有两种:sink 和 source。sink 连接器数据从 Kafka 流传输到外部系统。源连接器数据从外部系统流传输到 Kafka。

Kafka Connect 可以在独立或分布式模式下运行。

独立模式
在独立模式中,Kafka Connect 在带有用户定义的配置的单个节点上运行,从属性文件读取。
分布式模式
在分布式模式下,Kafka Connect 在一个或多个 worker 节点上运行,且工作负载在它们间分布。您可以使用 HTTP REST 接口管理连接器及其配置。

处理大量消息

您可以调优配置来处理大量消息。更多信息请参阅 第 9 章 处理大量消息

7.1. 独立模式的 Kafka 连接

在独立模式中,Kafka Connect 会作为一个进程在单一节点上运行。您可以使用属性文件管理单机模式的配置。

7.1.1. 在独立模式下配置 Kafka 连接

要在独立模式下配置 Kafka Connect,请编辑 config/connect-standalone.properties 配置文件。以下选项是最重要的。

bootstrap.servers
Kafka 代理地址列表,用作到 Kafka 的 bootstrap 连接。例如,kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092
key.converter
用于将消息密钥转换为 Kafka 格式的类。例如,org.apache.kafka.connect.json.JsonConverter.
value.converter
用于将消息有效负载转换为 Kafka 格式的类。例如,org.apache.kafka.connect.json.JsonConverter.
offset.storage.file.filename
指定存储偏移数据的文件。

连接器插件使用 bootstrap 地址打开到 Kafka 代理的客户端连接。要配置这些连接,请使用以 producer.consumer. 为前缀的标准 Kafka producer 和使用者配置选项。

7.1.2. 在 Kafka 连接中配置连接器

您可以使用属性文件在独立模式下为 Kafka 连接配置连接器插件。大多数配置选项都特定于每个连接器。以下选项适用于所有连接器:

name
连接器的名称,该名称在当前 Kafka Connect 实例中必须是唯一的。
connector.class
连接器插件类。For example, org.apache.kafka.connect.file.FileStreamSinkConnector.
tasks.max
指定连接器可以使用的最大任务数量。通过任务,可以并行执行连接器。连接器可以创建比指定少的任务。
key.converter
用于将消息密钥转换为 Kafka 格式的类。这会覆盖由 Kafka Connect 配置设置的默认值。例如,org.apache.kafka.connect.json.JsonConverter.
value.converter
用于将消息有效负载转换为 Kafka 格式的类。这会覆盖由 Kafka Connect 配置设置的默认值。例如,org.apache.kafka.connect.json.JsonConverter.

另外,您必须为接收器连接器至少设置以下选项之一:

topics
用作输入的以逗号分隔的主题列表。
topics.regex
用作输入的主题的 Java 正则表达式。

有关所有其他选项,请查看相关文档。

AMQ Streams 包括示例连接器配置文件 - 请参阅 AMQ Streams 安装目录中的 config/connect-file-sink.propertiesconfig/connect-file-source.properties

7.1.3. 在独立模式下运行 Kafka 连接

这个步骤描述了如何在独立模式下配置并运行 Kafka 连接。

先决条件

  • AMQ Streams 已安装,一个 Kafka 集群正在运行。

流程

  1. 编辑 /opt/kafka/config/connect-standalone.properties Kafka 连接配置文件,并将 bootstrap.server 设置为指向您的 Kafka 代理。例如:

    bootstrap.servers=kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092
    Copy to Clipboard Toggle word wrap
  2. 使用配置文件启动 Kafka Connect,并指定一个或多个连接器配置。

    su - kafka
    /opt/kafka/bin/connect-standalone.sh /opt/kafka/config/connect-standalone.properties connector1.properties
    [connector2.properties ...]
    Copy to Clipboard Toggle word wrap
  3. 验证 Kafka Connect 是否正在运行。

    jcmd | grep ConnectStandalone
    Copy to Clipboard Toggle word wrap
返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat