8.2. 在分布式模式中使用 Kafka Connect
在分布式模式中,Kafka Connect 作为 worker 进程集群运行,每个 worker 在单独的节点上运行。连接器可以在集群中的任何 worker 上运行,从而提高可扩展性和容错能力。连接器由 worker 管理,它们相互协调,以分发工作并确保每个连接器在任何给定时间都在单一节点上运行。
8.2.1. 在分布式模式下配置 Kafka 连接 复制链接链接已复制到粘贴板!
要在分布式模式下配置 Kafka Connect,请编辑 config/connect-distributed.properties
配置文件。以下选项是最重要的。
bootstrap.servers
-
用作 Kafka 的 bootstrap 连接的 Kafka 代理地址列表。例如:
kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092
。 key.converter
-
用于将消息密钥转换为 Kafka 格式的类。例如,
org.apache.kafka.connect.json.JsonConverter
. value.converter
-
用于将消息有效负载转换为 Kafka 格式的类。例如,
org.apache.kafka.connect.json.JsonConverter
. group.id
-
分布式 Kafka Connect 集群的名称。这必须是唯一的,且不得与另一个消费者组 ID 冲突。默认值为
connect-cluster
。 config.storage.topic
-
用于存储连接器配置的 Kafka 主题。默认值为
connect-configs
。 offset.storage.topic
-
用于存储偏移的 Kafka 主题。默认值为
connect-offset
。 status.storage.topic
-
用于 worker 节点状态的 Kafka 主题。默认值为
connect-status
。
AMQ Streams 在分布式模式下包括 Kafka Connect 的示例配置文件 - 请参阅 AMQ Streams 安装目录中的 config/connect-distributed.properties
。
连接器插件使用 bootstrap 地址打开到 Kafka 代理的客户端连接。要配置这些连接,请使用以 producer.
或 consumer.
为前缀的标准 Kafka producer 和使用者配置选项。
8.2.2. 在分布式模式下运行 Kafka Connect 复制链接链接已复制到粘贴板!
在分布式模式下运行 Kafka Connect。
先决条件
- AMQ Streams 已安装,一个 Kafka 集群正在运行。
运行集群
编辑所有 Kafka Connect worker 节点上的
/opt/kafka/config/connect-distributed.properties
Kafka Connect 配置文件。-
设置
bootstrap.server
选项以指向 Kafka 代理。 -
设置
group.id
选项。 -
设置
config.storage.topic
选项。 -
设置
offset.storage.topic
选项。 设置
status.storage.topic
选项。例如:
bootstrap.servers=kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092 group.id=my-group-id config.storage.topic=my-group-id-configs offset.storage.topic=my-group-id-offsets status.storage.topic=my-group-id-status
bootstrap.servers=kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092 group.id=my-group-id config.storage.topic=my-group-id-configs offset.storage.topic=my-group-id-offsets status.storage.topic=my-group-id-status
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
-
设置
使用所有 Kafka Connect 节点上的
/opt/kafka/config/connect-distributed.properties
配置文件启动 Kafka Connect worker。su - kafka /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
su - kafka /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 验证 Kafka Connect 是否正在运行。
jcmd | grep ConnectDistributed
jcmd | grep ConnectDistributed
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - 使用 Kafka Connect REST API 管理连接器。