第 7 章 在 Kafka Connect 中使用 AMQ Streams
使用 Kafka Connect 在 Kafka 和外部系统间流数据。Kafka Connect 提供了一个框架来移动大量数据,同时保持可扩展性和可靠性。Kafka Connect 通常用于将 Kafka 与 Kafka 集群外部的数据库、存储和消息传递系统集成。
Kafka Connect 使用连接器插件,用于为不同类型的外部系统实现连接。连接器插件有两种类型:sink 和 source。接收器连接器将数据从 Kafka 流传输到外部系统。外部系统中的源连接器数据流到 Kafka 中。
Kafka Connect 可以在独立或者分布式模式下运行。
- 独立模式
- 在独立模式中,Kafka Connect 在带有用户定义的配置的单个节点上运行,从属性文件中读取。
- 分布式模式
- 在分布式模式中,Kafka Connect 在一个或多个 worker 节点上运行,并将工作负载分布到它们中。您可以使用 HTTP REST 接口管理连接器及其配置。
处理大量消息
您可以调整配置来处理大量消息。更多信息请参阅 第 9 章 处理大量消息。
7.1. 独立模式的 Kafka 连接 复制链接链接已复制到粘贴板!
在独立模式中,Kafka Connect 作为一个进程在单一节点中运行。您可以使用属性文件来管理独立模式的配置。
7.1.1. 在独立模式中配置 Kafka 连接 复制链接链接已复制到粘贴板!
要在独立模式下配置 Kafka Connect,请编辑 config/connect-standalone.properties 配置文件。以下选项是最重要的选项。
bootstrap.servers-
用作到 Kafka 的 bootstrap 连接的 Kafka 代理地址列表。例如,
kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092。 key.converter-
用于将消息密钥转换到 Kafka 格式的类,以及从 Kafka 格式转换。例如,
org.apache.kafka.connect.json.JsonConverter. value.converter-
用于将消息有效负载转换为 Kafka 格式的和从 Kafka 格式转换的类。例如,
org.apache.kafka.connect.json.JsonConverter. offset.storage.file.filename- 指定存储偏移数据的文件。
在 config/connect-standalone.properties 的安装目录中提供了示例配置文件。
连接器插件打开了使用 bootstrap 地址到 Kafka 代理的客户端连接。要配置这些连接,请使用以 producer. 或 consumer. 为前缀的标准 Kafka producer 和使用者配置选项。
7.1.2. 在 Kafka Connect 中以独立模式配置连接器 复制链接链接已复制到粘贴板!
您可以使用属性文件在独立模式中为 Kafka Connect 配置连接器插件。大多数配置选项都特定于每个连接器。以下选项适用于所有连接器:
name- 连接器名称,必须在当前 Kafka Connect 实例中唯一。
connector.class-
连接器插件的类。例如,
org.apache.kafka.connect.file.FileStreamSinkConnector。 tasks.max- 指定连接器可以使用的最大任务数量。任务(Task)使连接器能够并行执行工作。连接器可能会创建少于指定的任务数量。
key.converter-
用于将消息密钥转换到 Kafka 格式的类,以及从 Kafka 格式转换。这会覆盖 Kafka Connect 配置设定的默认值。例如,
org.apache.kafka.connect.json.JsonConverter. value.converter-
用于将消息有效负载转换为 Kafka 格式的和从 Kafka 格式转换的类。这会覆盖 Kafka Connect 配置设定的默认值。例如,
org.apache.kafka.connect.json.JsonConverter.
另外,您必须为接收器连接器至少设置以下选项之一:
主题- 用作输入的以逗号分隔的主题列表。
topics.regex- 用作输入的 Java 正则表达式。
有关所有其他选项,请参阅可用连接器的文档。
AMQ Streams 包括示例连接器配置文件 - 请参阅 AMQ Streams 安装目录中的 config/connect-file-sink.properties 和 config/connect-file-source.properties。
7.1.3. 在独立模式下运行 Kafka 连接 复制链接链接已复制到粘贴板!
这个步骤描述了如何在独立模式中配置和运行 Kafka 连接。
先决条件
- 已安装并运行 AMQ Streams} 集群。
流程
编辑
/opt/kafka/config/connect-standalone.propertiesKafka 连接配置文件,并将bootstrap.server设置为指向您的 Kafka 代理。例如:bootstrap.servers=kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092
bootstrap.servers=kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092Copy to Clipboard Copied! Toggle word wrap Toggle overflow 启动 Kafka 与配置文件连接,并指定一个或多个连接器配置。
su - kafka /opt/kafka/bin/connect-standalone.sh /opt/kafka/config/connect-standalone.properties connector1.properties [connector2.properties ...]
su - kafka /opt/kafka/bin/connect-standalone.sh /opt/kafka/config/connect-standalone.properties connector1.properties [connector2.properties ...]Copy to Clipboard Copied! Toggle word wrap Toggle overflow 验证 Kafka Connect 是否正在运行。
jcmd | grep ConnectStandalone
jcmd | grep ConnectStandaloneCopy to Clipboard Copied! Toggle word wrap Toggle overflow