5.2.5. 部署示例 KafkaConnector 资源
AMQ Streams 在 example 。这会创建一个基本的 /connect/source-connector.yaml 中包含一个 KafkaConnect or 示例FileStreamSourceConnector 实例,它将 Kafka 许可证文件的每一行(一个文件源示例)发送到单个 Kafka 主题。
这个步骤描述了如何创建:
-
从 Kafka 许可证文件(源)读取数据的
FileStreamSourceConnector,并将数据作为消息写入 Kafka 主题。 -
从 Kafka 主题读取信息并将消息写入临时文件( sink)的
FileStreamSinkConnector。
在生产环境中,您要准备包含所需 Kafka Connect 连接器的容器镜像,如 第 5.2.3 节 “使用连接器插件扩展 Kafka Connect” 所述。
提供 FileStreamSourceConnector 和 FileStreamSinkConnector 作为示例。如此处所述,在容器中运行这些连接器不太可能适合生产用例。
先决条件
- Kafka Connect 部署
- KafkaConnectors 在 Kafka Connect 部署中启用
- Cluster Operator 正在运行
流程
编辑 example
/connect/source-connector.yaml文件:Copy to Clipboard Copied! Toggle word wrap Toggle overflow 在 OpenShift 集群中创建源
KafkaConnector:oc apply -f examples/connect/source-connector.yaml
oc apply -f examples/connect/source-connector.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow 创建一个
示例/connect/sink-connector.yaml文件:touch examples/connect/sink-connector.yaml
touch examples/connect/sink-connector.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow 将以下 YAML 粘贴到
sink-connector.yaml文件中:Copy to Clipboard Copied! Toggle word wrap Toggle overflow 在 OpenShift 集群中创建 sink
KafkaConnector:oc apply -f examples/connect/sink-connector.yaml
oc apply -f examples/connect/sink-connector.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow 检查是否创建了连接器资源:
oc get kctr --selector strimzi.io/cluster=MY-CONNECT-CLUSTER -o name my-source-connector my-sink-connector
oc get kctr --selector strimzi.io/cluster=MY-CONNECT-CLUSTER -o name my-source-connector my-sink-connectorCopy to Clipboard Copied! Toggle word wrap Toggle overflow 将 MY-CONNECT-CLUSTER 替换为您的 Kafka 连接集群。
在容器中,执行
kafka-console-consumer.sh读取由源连接器写入主题的消息:oc exec MY-CLUSTER-kafka-0 -i -t -- bin/kafka-console-consumer.sh --bootstrap-server MY-CLUSTER-kafka-bootstrap.NAMESPACE.svc:9092 --topic my-topic --from-beginning
oc exec MY-CLUSTER-kafka-0 -i -t -- bin/kafka-console-consumer.sh --bootstrap-server MY-CLUSTER-kafka-bootstrap.NAMESPACE.svc:9092 --topic my-topic --from-beginningCopy to Clipboard Copied! Toggle word wrap Toggle overflow
源和接收器连接器配置选项
连接器配置在 KafkaConnector 资源的 spec.config 属性中定义。
FileStreamSourceConnector 和 FileStreamSinkConnector 类支持与 Kafka Connect REST API 相同的配置选项。其他连接器支持不同的配置选项。
| 名称 | 类型 | 默认值 | 描述 |
|---|---|---|---|
|
| 字符串 | null | 可将消息写入到的源文件。如果没有指定,则使用标准输入。 |
|
| list | null | 发布数据的 Kafka 主题。 |
| 名称 | 类型 | 默认值 | 描述 |
|---|---|---|---|
|
| 字符串 | null | 要写入消息的目标文件。如果没有指定,则使用标准输出。 |
|
| list | null | 一个或多个 Kafka 主题,从中读取数据。 |
|
| 字符串 | null | 与一个或多个 Kafka 主题匹配以从中读取数据的正则表达式。 |
其它资源