第 8 章 在 Kafka Connect 中使用 AMQ Streams
使用 Kafka 连接到 Kafka 和外部系统之间的数据流。Kafka Connect 提供了一个用于移动大量数据的框架,同时保持可扩展性和可靠性。Kafka Connect 通常用于将 Kafka 与 Kafka 集群外部的数据库、存储和消息系统集成。
Kafka Connect 以独立或分布式模式运行。
- 独立模式
- 在独立模式中,Kafka Connect 在单一节点上运行。独立模式用于开发和测试。
- 分布式模式
- 在分布式模式中,Kafka Connect 在一个或多个 worker 节点上运行,工作负载分布在它们中。分布式模式主要用于生产环境。
Kafka Connect 使用为不同类型的外部系统实现连接的连接器插件。连接器插件有两种类型:sink 和 source。sink 连接器将数据从 Kafka 流传输到外部系统。源连接器将数据从外部系统流传输到 Kafka。
您还可以使用 Kafka Connect REST API 创建、管理和监控连接器实例。
连接器配置指定详情,如源或接收器连接器以及要从写入的 Kafka 主题。如何管理配置取决于您是否在独立或分布式模式下运行 Kafka 连接。
- 在独立模式中,您可以通过 Kafka Connect REST API 将连接器配置作为 JSON 提供,也可以使用属性文件来定义配置。
- 在分布式模式中,您只能通过 Kafka Connect REST API 将连接器配置作为 JSON 提供。
处理大量信息
您可以调整配置来处理大量信息。更多信息请参阅 第 11 章 处理大量信息。
8.1. 在独立模式中使用 Kafka 连接 复制链接链接已复制到粘贴板!
在 Kafka Connect 独立模式中,连接器与 Kafka Connect worker 进程在同一节点上运行,该进程在一个 JVM 中作为单个进程运行。这意味着 worker 进程和连接器共享相同的资源,如 CPU、内存和磁盘。
8.1.1. 在独立模式下配置 Kafka 连接 复制链接链接已复制到粘贴板!
要在独立模式下配置 Kafka Connect,请编辑 config/connect-standalone.properties 配置文件。以下选项是最重要的。
bootstrap.servers-
用作 Kafka 的 bootstrap 连接的 Kafka 代理地址列表。例如:
kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092。 key.converter-
用于将消息密钥转换为 Kafka 格式或从 Kafka 格式转换的类。例如,
org.apache.kafka.connect.json.JsonConverter. value.converter-
用于将消息有效负载转换为 Kafka 格式的类。例如,
org.apache.kafka.connect.json.JsonConverter. offset.storage.file.filename- 指定存储偏移数据的文件。
连接器插件使用 bootstrap 地址打开与 Kafka 代理的客户端连接。要配置这些连接,请使用以 producer. 或 consumer. 为前缀的标准 Kafka producer 和使用者配置选项。
8.1.2. 在独立模式下运行 Kafka 连接 复制链接链接已复制到粘贴板!
在独立模式中配置并运行 Kafka Connect。
先决条件
- AMQ Streams 已安装,一个 Kafka 集群正在运行。
您已在属性文件中指定了连接器配置。
您还可以使用 Kafka Connect REST API 来管理连接器。
流程
编辑
/opt/kafka/config/connect-standalone.propertiesKafka 连接配置文件,并将bootstrap.server设置为指向您的 Kafka 代理。例如:bootstrap.servers=kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092
bootstrap.servers=kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092Copy to Clipboard Copied! Toggle word wrap Toggle overflow 使用配置文件启动 Kafka 连接,并指定一个或多个连接器配置。
su - kafka /opt/kafka/bin/connect-standalone.sh /opt/kafka/config/connect-standalone.properties connector1.properties [connector2.properties ...]
su - kafka /opt/kafka/bin/connect-standalone.sh /opt/kafka/config/connect-standalone.properties connector1.properties [connector2.properties ...]Copy to Clipboard Copied! Toggle word wrap Toggle overflow 验证 Kafka Connect 是否正在运行。
jcmd | grep ConnectStandalone
jcmd | grep ConnectStandaloneCopy to Clipboard Copied! Toggle word wrap Toggle overflow