8.2. 在分布式模式中使用 Kafka Connect
在分布式模式中,Kafka Connect 作为 worker 进程集群运行,每个 worker 在单独的节点上运行。连接器可以在集群中的任何 worker 上运行,从而提高可扩展性和容错能力。连接器由 worker 管理,它们相互协调,以分发工作并确保每个连接器在任何给定时间都在单一节点上运行。
8.2.1. 在分布式模式下配置 Kafka 连接 复制链接链接已复制到粘贴板!
要在分布式模式下配置 Kafka Connect,请编辑 config/connect-distributed.properties 配置文件。以下选项是最重要的。
bootstrap.servers-
用作 Kafka 的 bootstrap 连接的 Kafka 代理地址列表。例如:
kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092。 key.converter-
用于将消息密钥转换为 Kafka 格式的类。例如,
org.apache.kafka.connect.json.JsonConverter. value.converter-
用于将消息有效负载转换为 Kafka 格式的类。例如,
org.apache.kafka.connect.json.JsonConverter. group.id-
分布式 Kafka Connect 集群的名称。这必须是唯一的,且不得与另一个消费者组 ID 冲突。默认值为
connect-cluster。 config.storage.topic-
用于存储连接器配置的 Kafka 主题。默认值为
connect-configs。 offset.storage.topic-
用于存储偏移的 Kafka 主题。默认值为
connect-offset。 status.storage.topic-
用于 worker 节点状态的 Kafka 主题。默认值为
connect-status。
AMQ Streams 在分布式模式下包括 Kafka Connect 的示例配置文件 - 请参阅 AMQ Streams 安装目录中的 config/connect-distributed.properties。
连接器插件使用 bootstrap 地址打开到 Kafka 代理的客户端连接。要配置这些连接,请使用以 producer. 或 consumer. 为前缀的标准 Kafka producer 和使用者配置选项。
8.2.2. 在分布式模式下运行 Kafka Connect 复制链接链接已复制到粘贴板!
在分布式模式下运行 Kafka Connect。
先决条件
- AMQ Streams 已安装,一个 Kafka 集群正在运行。
运行集群
编辑所有 Kafka Connect worker 节点上的
/opt/kafka/config/connect-distributed.propertiesKafka Connect 配置文件。-
设置
bootstrap.server选项以指向 Kafka 代理。 -
设置
group.id选项。 -
设置
config.storage.topic选项。 -
设置
offset.storage.topic选项。 设置
status.storage.topic选项。例如:
bootstrap.servers=kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092 group.id=my-group-id config.storage.topic=my-group-id-configs offset.storage.topic=my-group-id-offsets status.storage.topic=my-group-id-status
bootstrap.servers=kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092 group.id=my-group-id config.storage.topic=my-group-id-configs offset.storage.topic=my-group-id-offsets status.storage.topic=my-group-id-statusCopy to Clipboard Copied! Toggle word wrap Toggle overflow
-
设置
使用所有 Kafka Connect 节点上的
/opt/kafka/config/connect-distributed.properties配置文件启动 Kafka Connect worker。su - kafka /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
su - kafka /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.propertiesCopy to Clipboard Copied! Toggle word wrap Toggle overflow 验证 Kafka Connect 是否正在运行。
jcmd | grep ConnectDistributed
jcmd | grep ConnectDistributedCopy to Clipboard Copied! Toggle word wrap Toggle overflow - 使用 Kafka Connect REST API 管理连接器。