8.2. 在分布式模式下的 Kafka Connect
在分布式模式中,Kafka Connect 在一个或多个 worker 节点上运行,工作负载分布在它们中。您可以使用 HTTP REST 接口管理连接器插件及其配置。
8.2.1. 在分布式模式下配置 Kafka 连接 复制链接链接已复制到粘贴板!
要在分布式模式下配置 Kafka Connect,请编辑 config/connect-distributed.properties
配置文件。以下选项是最重要的。
bootstrap.servers
-
用作 Kafka 的 bootstrap 连接的 Kafka 代理地址列表。例如:
kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092
。 key.converter
-
用于将消息密钥转换为 Kafka 格式的类。例如,
org.apache.kafka.connect.json.JsonConverter
. value.converter
-
用于将消息有效负载转换为 Kafka 格式的类。例如,
org.apache.kafka.connect.json.JsonConverter
. group.id
-
分布式 Kafka Connect 集群的名称。这必须是唯一的,且不得与另一个消费者组 ID 冲突。默认值为
connect-cluster
。 config.storage.topic
-
用于存储连接器配置的 Kafka 主题。默认值为
connect-configs
。 offset.storage.topic
-
用于存储偏移的 Kafka 主题。默认值为
connect-offset
。 status.storage.topic
-
用于 worker 节点状态的 Kafka 主题。默认值为
connect-status
。
AMQ Streams 在分布式模式下包括 Kafka Connect 的示例配置文件 - 请参阅 AMQ Streams 安装目录中的 config/connect-distributed.properties
。
有关所有支持的 Kafka Connect 配置选项的完整列表,请参阅 附录 F, Kafka Connect 配置参数。
连接器插件使用 bootstrap 地址打开到 Kafka 代理的客户端连接。要配置这些连接,请使用以 producer.
或 consumer.
为前缀的标准 Kafka producer 和使用者配置选项。
有关配置 Kafka 生成者和消费者的更多信息,请参阅:
8.2.2. 在分布式 Kafka Connect 中配置连接器 复制链接链接已复制到粘贴板!
HTTP REST 接口
分布式 Kafka Connect 的连接器使用 HTTP REST 接口进行配置。默认情况下,REST 接口侦听端口 8083。它支持以下端点:
GET /connectors
- 返回现有连接器列表。
POST /connectors
- 创建连接器。请求正文必须是带有连接器配置的 JSON 对象。
GET /connectors/ <name>
- 获取有关特定连接器的信息。
GET /connectors/ <name> /config
- 获取特定连接器的配置。
PUT /connectors/ <name> /config
- 更新特定连接器的配置。
GET /connectors/ <name> /status
- 获取特定连接器的状态。
PUT /connectors/ <name> /pause
- 暂停连接器及其所有任务。连接器将停止处理任何信息。
PUT /connectors/ <name> /resume
- 恢复暂停的连接器。
POST /connectors/ <name> /restart
- 如果连接器失败,请重启它。
DELETE /connectors/ <name>
- 删除连接器。
GET /connector-plugins
- 获取所有支持的连接器插件的列表。
连接器配置
大多数配置选项都是特定于连接器的,包括在连接器的文档中。以下字段适用于所有连接器。
name
- 连接器的名称。在给定的 Kafka Connect 实例中必须是唯一的。
connector.class
-
连接器插件的类。例如
org.apache.kafka.connect.file.FileStreamSinkConnector
。 tasks.max
- 此连接器使用的最大任务数量。连接器使用任务来并行处理其工作。Connetors 可以创建比指定更少的任务。
key.converter
-
用于将消息密钥转换为 Kafka 格式的类。这会覆盖 Kafka Connect 配置设置的默认值。例如,
org.apache.kafka.connect.json.JsonConverter
. value.converter
-
用于将消息有效负载转换为 Kafka 格式的类。这会覆盖 Kafka Connect 配置设置的默认值。例如,
org.apache.kafka.connect.json.JsonConverter
.
另外,必须为接收器连接器设置以下选项之一:
topics
- 以逗号分隔的主题列表,用作输入。
topics.regex
- 用作输入的 Java 正则表达式。
有关所有其他选项,请查看特定连接器的文档。
AMQ Streams 包括示例连接器配置文件。您可以在 AMQ Streams 安装目录中的 config/connect-file-sink.properties
和 config/connect-file-source.properties
中找到。
8.2.3. 运行分布式 Kafka Connect 复制链接链接已复制到粘贴板!
这个步骤描述了如何在分布式模式下配置和运行 Kafka Connect。
先决条件
- 已安装并运行 AMQ Streams 集群。
运行集群
编辑所有 Kafka Connect worker 节点上的
/opt/kafka/config/connect-distributed.properties
Kafka Connect 配置文件。-
设置
bootstrap.server
选项以指向 Kafka 代理。 -
设置
group.id
选项。 -
设置
config.storage.topic
选项。 -
设置
offset.storage.topic
选项。 设置
status.storage.topic
选项。例如:
bootstrap.servers=kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092 group.id=my-group-id config.storage.topic=my-group-id-configs offset.storage.topic=my-group-id-offsets status.storage.topic=my-group-id-status
bootstrap.servers=kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092 group.id=my-group-id config.storage.topic=my-group-id-configs offset.storage.topic=my-group-id-offsets status.storage.topic=my-group-id-status
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
-
设置
使用所有 Kafka Connect 节点上的
/opt/kafka/config/connect-distributed.properties
配置文件启动 Kafka Connect worker。su - kafka /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
su - kafka /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 验证 Kafka Connect 是否正在运行。
jcmd | grep ConnectDistributed
jcmd | grep ConnectDistributed
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
其他资源
- 有关安装 AMQ Streams 的详情,请参考 第 2.3 节 “安装 AMQ Streams”。
- 有关配置 AMQ Streams 的详情,请参考 第 2.8 节 “配置 AMQ Streams”。
- 有关支持的 Kafka Connect 配置选项的完整列表,请参阅 附录 F, Kafka Connect 配置参数。
8.2.4. 创建连接器 复制链接链接已复制到粘贴板!
此流程描述了如何使用 Kafka Connect REST API 创建连接器插件,以便在分布式模式中使用 Kafka Connect。
先决条件
- 以分布式模式运行的 Kafka 连接安装。
流程
使用连接器配置准备 JSON 有效负载。例如:
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 发送 POST 请求到
<KafkaConnectAddress>:8083/connectors
,以创建连接器。以下示例使用curl
:curl -X POST -H "Content-Type: application/json" --data @sink-connector.json http://connect0.my-domain.com:8083/connectors
curl -X POST -H "Content-Type: application/json" --data @sink-connector.json http://connect0.my-domain.com:8083/connectors
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 通过向 <
KafkaConnectAddress > :8083/connectors
发送 GET 请求来验证连接器是否已部署。以下示例使用curl
:curl http://connect0.my-domain.com:8083/connectors
curl http://connect0.my-domain.com:8083/connectors
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
8.2.5. 删除连接器 复制链接链接已复制到粘贴板!
此流程描述了如何使用 Kafka Connect REST API 从分布式模式的 Kafka Connect 中删除连接器插件。
先决条件
- 以分布式模式运行的 Kafka 连接安装。
删除连接器
通过向
<KafkaConnectAddress>:8083/connectors/<ConnectorName>
发送GET
请求来验证连接器是否存在。以下示例使用curl
:curl http://connect0.my-domain.com:8083/connectors
curl http://connect0.my-domain.com:8083/connectors
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 要删除连接器,请发送
DELETE
请求到 <KafkaConnectAddress& gt; :8083/connectors
。以下示例使用curl
:curl -X DELETE http://connect0.my-domain.com:8083/connectors/my-connector
curl -X DELETE http://connect0.my-domain.com:8083/connectors/my-connector
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 通过向 <
KafkaConnectAddress > :8083/connectors
发送 GET 请求来验证连接器是否已删除。以下示例使用curl
:curl http://connect0.my-domain.com:8083/connectors
curl http://connect0.my-domain.com:8083/connectors
Copy to Clipboard Copied! Toggle word wrap Toggle overflow