7.2. 分布式模式的 Kafka 连接
在分布式模式中,Kafka Connect 在一个或多个 worker 节点上运行,并将工作负载分布到它们中。您可以使用 HTTP REST 接口管理连接器插件及其配置。
7.2.1. 在分布式模式下配置 Kafka 连接 复制链接链接已复制到粘贴板!
要在分布式模式下配置 Kafka Connect,请编辑 config/connect-distributed.properties 配置文件。以下选项是最重要的选项。
bootstrap.servers-
用作到 Kafka 的 bootstrap 连接的 Kafka 代理地址列表。例如,
kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092。 key.converter-
用于将消息密钥转换到 Kafka 格式的类,以及从 Kafka 格式转换。例如,
org.apache.kafka.connect.json.JsonConverter. value.converter-
用于将消息有效负载转换为 Kafka 格式的和从 Kafka 格式转换的类。例如,
org.apache.kafka.connect.json.JsonConverter. group.id-
分布式 Kafka Connect 集群的名称。这必须是唯一的,且不得与其他消费者组 ID 冲突。默认值为
connect-cluster。 config.storage.topic-
用于存储连接器配置的 Kafka 主题。默认值为
connect-configs。 offset.storage.topic-
用于存储偏移的 Kafka 主题。默认值为
connect-offset。 status.storage.topic-
用于 worker 节点状态的 Kafka 主题。默认值为
connect-status。
AMQ Streams 在分布式模式下包括 Kafka Connect 的示例配置文件 - 请参阅 AMQ Streams 安装目录中的 config/connect-distributed.properties。
连接器插件打开了使用 bootstrap 地址到 Kafka 代理的客户端连接。要配置这些连接,请使用以 producer. 或 consumer. 为前缀的标准 Kafka producer 和使用者配置选项。
7.2.2. 在分布式 Kafka Connect 中配置连接器 复制链接链接已复制到粘贴板!
HTTP REST 接口
分布式 Kafka Connect 的连接器是使用 HTTP REST 接口配置的。REST 接口默认侦听端口 8083。它支持以下端点:
GET/connectors- 返回现有连接器列表。
POST/连接器- 创建连接器。请求正文必须是带有连接器配置的 JSON 对象。
GET/connectors/ <name>- 获取有关特定连接器的信息。
GET /connectors/ <name> /config- 获取特定连接器的配置。
PUT /connectors/ <name> /config- 更新特定连接器的配置。
GET /connectors/ <name> /status- 获取特定连接器的状态。
PUT /connectors/ <name> /pause- 暂停连接器及其所有任务。连接器将停止处理任何消息。
PUT /connectors/ <name> /resume- 恢复暂停的连接器。
POST /connectors/ <name> /restart- 如果失败,请重启连接器。
DELETE /connectors/ <name>- 删除连接器。
GET /connector-plugins- 获取所有支持的连接器插件列表。
连接器配置
大多数配置选项都是特定于连接器的连接器,包括在连接器的文档中。以下字段对于所有连接器都是通用的。
name- 连接器的名称。在给定的 Kafka Connect 实例中必须是唯一的。
connector.class-
连接器插件的类。例如,
org.apache.kafka.connect.file.FileStreamSinkConnector。 tasks.max- 此连接器使用的最大任务数量。连接器使用任务来并行处理其工作。Connetors 可能会创建比指定更少的任务。
key.converter-
用于转换消息密钥的类,以及从 Kafka 格式转换。这会覆盖 Kafka Connect 配置设定的默认值。例如,
org.apache.kafka.connect.json.JsonConverter. value.converter-
用于转换消息有效负载到 Kafka 格式的类。这会覆盖 Kafka Connect 配置设定的默认值。例如,
org.apache.kafka.connect.json.JsonConverter.
另外,必须为接收器连接器设置以下选项之一:
主题- 用作输入的以逗号分隔的主题列表。
topics.regex- 用作输入的 Java 正则表达式。
有关所有其他选项,请查看特定连接器的文档。
AMQ Streams 包括连接器配置文件示例。您可以在 AMQ Streams 安装目录中的 config/connect-file-sink.properties 和 config/connect-file-source.properties 中找到。
7.2.3. 运行分布式 Kafka 连接 复制链接链接已复制到粘贴板!
这个步骤描述了如何在分布式模式下配置和运行 Kafka 连接。
先决条件
- 已安装并运行 AMQ Streams 集群。
运行容器
编辑所有 Kafka Connect worker 节点上的
/opt/kafka/config/connect-distributed.propertiesKafka Connect 配置文件。-
设置
bootstrap.server选项以指向您的 Kafka 代理。 -
设置
group.id选项。 -
设置
config.storage.topic选项。 -
设置
offset.storage.topic选项。 设置
status.storage.topic选项。例如:
bootstrap.servers=kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092 group.id=my-group-id config.storage.topic=my-group-id-configs offset.storage.topic=my-group-id-offsets status.storage.topic=my-group-id-status
bootstrap.servers=kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092 group.id=my-group-id config.storage.topic=my-group-id-configs offset.storage.topic=my-group-id-offsets status.storage.topic=my-group-id-statusCopy to Clipboard Copied! Toggle word wrap Toggle overflow
-
设置
使用所有 Kafka Connect 节点上的
/opt/kafka/config/connect-distributed.properties配置文件启动 Kafka Connect worker。su - kafka /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
su - kafka /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.propertiesCopy to Clipboard Copied! Toggle word wrap Toggle overflow 验证 Kafka Connect 是否正在运行。
jcmd | grep ConnectDistributed
jcmd | grep ConnectDistributedCopy to Clipboard Copied! Toggle word wrap Toggle overflow
7.2.4. 创建连接器 复制链接链接已复制到粘贴板!
此流程描述了如何使用 Kafka Connect 创建连接器插件,用于在分布式模式的 Kafka Connect 中使用。
先决条件
- 以分布式模式运行的 Kafka 连接安装。
流程
使用连接器配置准备 JSON 有效负载。例如:
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 发送 POST 请求到
<KafkaConnectAddress>:8083/connectors,以创建连接器。以下示例使用curl:curl -X POST -H "Content-Type: application/json" --data @sink-connector.json http://connect0.my-domain.com:8083/connectors
curl -X POST -H "Content-Type: application/json" --data @sink-connector.json http://connect0.my-domain.com:8083/connectorsCopy to Clipboard Copied! Toggle word wrap Toggle overflow 通过向 <
KafkaConnectAddress > :8083/connectors发送 GET 请求来验证连接器是否已部署。以下示例使用curl:curl http://connect0.my-domain.com:8083/connectors
curl http://connect0.my-domain.com:8083/connectorsCopy to Clipboard Copied! Toggle word wrap Toggle overflow
7.2.5. 删除连接器 复制链接链接已复制到粘贴板!
这个步骤描述了如何使用 Kafka Connect 在分布式模式下从 Kafka Connect 中删除连接器插件。
先决条件
- 以分布式模式运行的 Kafka 连接安装。
删除连接器
通过向
<KafkaConnectAddress>:8083/connectors/<ConnectorName>发送GET请求来验证连接器是否存在。以下示例使用curl:curl http://connect0.my-domain.com:8083/connectors
curl http://connect0.my-domain.com:8083/connectorsCopy to Clipboard Copied! Toggle word wrap Toggle overflow 要删除连接器,请发送
DELETE请求到 <KafkaConnectAddress > :8083/connectors。以下示例使用curl:curl -X DELETE http://connect0.my-domain.com:8083/connectors/my-connector
curl -X DELETE http://connect0.my-domain.com:8083/connectors/my-connectorCopy to Clipboard Copied! Toggle word wrap Toggle overflow 通过将 GET 请求发送到 <
KafkaConnectAddress>:8083/connectors来验证连接器是否已删除。以下示例使用curl:curl http://connect0.my-domain.com:8083/connectors
curl http://connect0.my-domain.com:8083/connectorsCopy to Clipboard Copied! Toggle word wrap Toggle overflow