第 15 章 连接到 Kafka
Apache Kafka 是一个分布式流平台,可用于获取和发布数据。在集成中,您可以从 Kafka 主题订阅您指定的数据,或将数据发布到您指定的 Kafka 主题。要做到这一点,创建一个到 Kafka 的连接,并将该连接添加到集成流中。详情在以下主题中:
15.1. 创建到 Kafka 代理的连接 复制链接链接已复制到粘贴板!
在集成中,要订阅 Kafka 主题的数据或将数据发布到 Kafka 主题,请创建一个到 Kafka 的连接,然后将该连接添加到集成中。
先决条件
- 您已创建了 Kafka 实例,创建了服务帐户,并设置 Kafka 主题,如 开始使用 Apache Kafka 的 Red Hat OpenShift Streams 所述。您知道服务帐户的客户端 ID 和客户端 Secret。
您知道 Kafka 实例的 Bootstrap 服务器 URI。要获得 Bootstrap 服务器 URI:
- 登录到 Red Hat Managed Services web 控制台。
- 在 Web 控制台的 Kafka Instances 页面中,对于您要连接的相关 Kafka 实例,选择选项图标(三个垂直点),然后单击 Connection 以查看 Bootstrap 服务器 URI。
- 对于 PLAIN SSL 机制,您知道用户名和密码。
- 如果要使用传输层安全(TLS)加密数据,则 Kafka 代理的 PEM 证书文本。通常,您可以从 Kafka 服务器管理员获取代理证书文本。
流程
- 在 Fuse Online 中,在左侧面板中,单击 Connections 以显示任何可用的连接。
- 点击 Create Connection 以显示连接器。
- 点 Kafka Message Broker 连接器。
在 Kafka 代理 URIs 字段中,键入或选择您希望此连接访问的代理,或者输入以逗号分隔的 Kafka 代理 URI 列表。每个 URI 应采用
host:port的形式。对于 Red Hat Managed Kafka,输入 Managed Kafka 实例 Bootstrap 服务器 URI。请注意,如果您在 OpenShift 集群上安装了 Strimzi Operator 或 AMQ Streams Operator,则 URI 会被自动发现,您可以选择它。
对于 Security Protocol 字段,选择以下选项之一:
- 如果要加密数据以在传输中保护,请选择 TLS (端口层安全性)。跳至第 7 步。
- 如果要使用 SASL 验证并使用 SSL 加密您的数据(例如,在 Red Hat Managed Kafka 中),请选择 SASL_SSL。
- 如果您不想加密数据,请选择 Plain,然后跳至第 8 步。
如果您选择了 SASL_SSL 作为安全协议,则需要选择两个选项来设置凭证:
- 如果要使用 PLAIN 作为 SASL 机制,则必须设置 Username 和 Password 字段。
如果要使用 OAUTHBEARER 作为 SASL 机制,则必须设置以下字段:
- 带有 OAuth 客户端的用户名。
- 使用 OAUth 客户端 Secret 密码。
SASL 登录回调处理程序 类,您可以使用来自 kafka-clients 2.5 版本或 Strimzi 项目中的 kafka-oauth-client 的所有回调处理程序类。要连接到 Red Hat Managed Kafka,您可以使用:
io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler-
带有由您的供应商提供的
/oauth/tokenendpoint URI 的 OAuth 令牌端点 URI。
- 如果您在第 5 步中选择了 TLS,然后在 Broker 证书 字段中粘贴 Kafka 代理的 PEM 证书文本。
可选。点 Add a custom 属性指定
key:value对,以配置 Kafka producer 和使用者选项。例如,如果您希望新集成消耗来自主题的旧消息,请将
auto.offset.reset值从默认值(最新的)更改为最早的,方法是输入 auto.offset.reset for the Key 字段,并在 Value 字段中键入earlie st。有关 Kafka producer 配置选项的详情,请访问 https://kafka.apache.org/documentation/#producerconfigs
有关 Kafka consumer 配置选项的详情,请访问 https://kafka.apache.org/documentation/#consumerconfigs
注意: 如果您添加配置属性,Fuse Online 不会将它们包含在下一步中,作为其验证过程的一部分。
- 单击 Validate。Fuse Online 立即尝试验证连接并显示一条消息,指示验证是否成功。如果验证失败,请调整输入参数并重试。
- 如果验证成功,点 下一步。
- 在 Name 字段中输入您的选择的名称,帮助您将这个连接与任何其他连接区分开来。例如,您可以键入 Kafka Test。
- 在 Description 字段中输入任何有助于了解这个连接的信息。
- 点 Save 以查看您创建的连接现在可用。如果您输入了示例名称,您会看到 Kafka Test 显示为您可以选择添加到集成的连接。