第 8 章 Kafka Connect
Kafka Connect 是一个在 Apache Kafka 和外部系统间流传输数据的工具。它为迁移大量数据提供了框架,同时保持了可扩展性和可靠性。Kafka Connect 通常用于将 Kafka 与您的 Kafka 集群外部的数据库、存储和消息系统集成。
Kafka Connect 使用连接器插件来为不同类型的外部系统实现连接。连接器插件有两种: sink 和 source。sink 连接器数据从 Kafka 到外部系统。源连接器数据从外部系统流传输到 Kafka。
Kafka Connect 能够以独立模式或分布式模式运行。
- 单机模式
- 在单机模式下,Kafka Connect 在单一节点上运行,且用户定义的配置从属性文件中读取。
- 分布式模式
- 在分布式模式中,Kafka Connect 在一个或多个 worker 节点上运行,工作负载分布在这些节点上。您可以使用 HTTP REST 接口管理连接器及其配置。
8.1. 以独立模式的 Kafka Connect
在单机模式中,Kafka Connect 作为单个进程在单一节点上运行。您可以使用属性文件管理单机模式的配置。
8.1.1. 以独立模式配置 Kafka Connect
要在独立模式下配置 Kafka Connect,编辑 config/connect-standalone.properties
配置文件。以下选项最为重要:
bootstrap.servers
-
用作到 Kafka 的 bootstrap 连接的 Kafka 代理地址列表。例如:
kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092
。 key.converter
-
用于将消息键转换为 Kafka 格式的类。例如:
org.apache.kafka.connect.json.JsonConverter
。 value.converter
-
用于将消息有效负载转换为 Kafka 格式的类。例如:
org.apache.kafka.connect.json.JsonConverter
。 offset.storage.file.filename
- 指定存储偏移数据的文件。
安装目录中提供了 config/connect-standalone.properties
的示例配置文件。有关所有支持的 Kafka Connect 配置选项的完整列表,请参阅 [kafka-connect-configuration-parameters-str]。
连接器插件使用 bootstrap 地址打开与 Kafka 代理的客户端连接。要配置这些连接,请使用前缀为 producer.
或 consumer.
的标准 Kafka 制作者和消费者配置选项。
有关配置 Kafka 生产者和消费者的详情,请参考:
8.1.2. 以独立模式在 Kafka Connect 中配置连接器
您可以使用属性文件在单机模式中为 Kafka Connect 配置连接器插件。大多数配置选项都特定于每个连接器。以下选项适用于所有连接器:
name
- 连接器的名称,在当前 Kafka Connect 实例中必须是唯一的。
connector.class
-
连接器插件的类。例如:
org.apache.kafka.connect.file.FileStreamSinkConnector
。 tasks.max
- 指定连接器可以使用的任务数量上限。任务使连接器能够并行执行任务。连接器可能会创建比指定更少的任务。
key.converter
-
用于将消息键转换为 Kafka 格式的类。这会覆盖 Kafka Connect 配置设置的默认值。例如:
org.apache.kafka.connect.json.JsonConverter
。 value.converter
-
用于将消息有效负载转换为 Kafka 格式的类。这会覆盖 Kafka Connect 配置设置的默认值。例如:
org.apache.kafka.connect.json.JsonConverter
。
另外,您必须为 sink 连接器至少设置以下选项之一:
topics
- 用作输入的主题的逗号分隔列表。
topics.regex
- 用作输入的主题的 Java 正则表达式。
有关所有其他选项,请查看可用连接器的文档。
AMQ Streams 包括连接器配置文件示例 - 请参阅 AMQ Streams 安装目录中的 config/connect-file-sink.properties
和 config/connect-file-source.properties
。
8.1.3. 以独立模式运行 Kafka Connect
这个步骤描述了如何在独立模式下配置和运行 Kafka Connect。
先决条件
- 已安装并运行的 AMQ Streams} 集群。
流程
编辑
/opt/kafka/config/connect-standalone.properties
Kafka Connect 配置文件并将bootstrap.server
设置为指向 Kafka 代理。例如:bootstrap.servers=kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092
使用 配置文件启动 Kafka Connect 并指定一个或多个连接器配置。
su - kafka /opt/kafka/bin/connect-standalone.sh /opt/kafka/config/connect-standalone.properties connector1.properties [connector2.properties ...]
验证 Kafka Connect 是否正在运行。
jcmd | grep ConnectStandalone
其它资源
- 有关安装 AMQ Streams 的详情请参考 第 2.3 节 “安装 AMQ Streams”。
- 有关配置 AMQ 流的详情请参考 第 2.8 节 “配置 AMQ 流”。
- 有关支持的 Kafka Connect 配置选项的完整列表,请参阅 附录 F, Kafka Connect 配置参数。