开发 Kafka 客户端应用程序


Red Hat AMQ Streams 2.4

开发客户端应用程序以使用 AMQ Streams 与 Kafka 交互

摘要

开发可通过 Kafka 代理发送和接收消息的客户端应用程序。在客户端和代理之间建立安全访问。

使开源包含更多

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。我们从这四个术语开始:master、slave、黑名单和白名单。由于此项工作十分艰巨,这些更改将在即将推出的几个发行版本中逐步实施。有关更多详情,请参阅我们的首席技术官 Chris Wright 提供的消息

第 1 章 开发客户端概述

为 AMQ Streams 安装开发 Kafka 客户端应用程序,可生成信息、消耗消息或两者。您可以开发客户端应用程序,以用于 OpenShift 上的 AMQ Streams 或 RHEL 上的 AMQ Streams。

消息包含一个可选的键和值,其中包含消息数据,以及标头和相关元数据。键标识消息的主题或消息的属性。如果您需要按照发送的顺序处理一组消息,则必须使用相同的密钥。

消息以批处理的形式发送。消息包含标头和元数据,它们提供客户端过滤和路由的详情,如消息的时间戳和偏移位置。

Kafka 为开发客户端应用程序提供客户端 API。Kafka 生成者和消费者 API 是与客户端应用程序中的 Kafka 集群交互的主要方法。API 控制消息流。producer API 发送消息到 Kafka 主题,而使用者 API 从主题读取信息。

AMQ Streams 支持使用 Java 编写的客户端。您如何开发您的客户端取决于您的具体用例。数据持久性可能是优先级或高吞吐量。这些需求可以通过配置客户端和代理来满足。但是,所有客户端都必须能够连接到给定 Kafka 集群中的所有代理。

1.1. 支持 HTTP 客户端

作为在客户端上使用 Kafka producer 和消费者 API 的替代选择,您可以设置和使用 AMQ Streams Kafka Bridge。Kafka Bridge 提供了一个 RESTful 接口,它允许基于 HTTP 的客户端与 Kafka 集群交互。它提供与 Strimzi 的 Web API 连接的优点,而无需解释 Kafka 协议的客户端应用程序。Kafka 通过 TCP 使用二进制协议。

如需更多信息,请参阅使用 AMQ Streams Kafka Bridge

1.2. 调整您的生产者和消费者

您可以添加更多配置属性来优化 Kafka 客户端的性能。当您有一定时间分析客户端和代理配置如何执行时,您可能要执行此操作。

如需更多信息,请参阅 Kafka 配置调整

1.3. 监控客户端交互

分布式追踪有助于端到端跟踪消息。您可以在 Kafka 使用者和制作者客户端应用程序中启用追踪。

如需更多信息,请参阅以下指南中的分布式追踪文档:

注意

当使用术语客户端应用程序时,我们专门引用使用 Kafka 生成者和消费者向 Kafka 集群发送和接收信息的应用程序。我们没有引用其他 Kafka 组件,如 Kafka Connect 或 Kafka Streams,它们都有自己的不同用例和功能。

第 2 章 客户端开发先决条件

开发与 AMQ Streams 搭配使用的客户端需要以下先决条件。

  • 您有红帽帐户。
  • 您有一个 Kafka 集群在 AMQ Streams 中运行。
  • Kafka 代理配置为安全客户端连接的监听程序。
  • 已为集群创建主题。
  • 您有一个 IDE 来开发和测试您的客户端。
  • 已安装了 JDK 11 或更高版本。

第 3 章 在 Maven 项目中添加客户端依赖项

如果要开发基于 Java 的 Kafka 客户端,您可以将 Kafka 客户端的红帽依赖项(包括 Kafka Streams)添加到 Maven 项目的 pom.xml 文件中。只有由红帽构建的客户端库才支持 AMQ Streams。

您可以将以下工件添加为依赖项:

kafka-clients

包含 Kafka ProducerConsumerAdminClient API。

  • Producer API 可让应用程序将数据发送到 Kafka 代理。
  • Consumer API 可让应用程序使用来自 Kafka 代理的数据。
  • AdminClient API 提供了管理 Kafka 集群的功能,包括主题、代理和其他组件。
kafka-streams

包含 KafkaStreams API。

Kafka Streams 可让应用程序从一个或多个输入流接收数据。您可以使用此 API 对数据流(如映射、过滤和加入)运行一系列实时操作。您可以使用 Kafka Streams 将结果写入一个或多个输出流。它是 Red Hat Maven 存储库中的 kafka-streams JAR 软件包的一部分。

3.1. 在 Maven 项目中添加 Kafka 客户端依赖项

在您的 Maven 项目中为 Kafka 客户端添加红帽依赖项。

先决条件

  • 具有现有 pom.xml 的 Maven 项目。

流程

  1. 将 Red Hat Maven 存储库添加到 Maven 项目的 pom.xml 文件的 < repositories > 部分。

    <repositories>
        <repository>
            <id>redhat-maven</id>
            <url>https://maven.repository.redhat.com/ga/</url>
        </repository>
    </repositories>
  2. kafka-clients 作为 & lt;dependency > 添加到 Maven 项目的 pom.xml 文件中。

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.4.0.redhat-00004</version>
        </dependency>
    </dependencies>
  3. 构建 Maven 项目,将 Kafka 客户端依赖项添加到项目中。

3.2. 在 Maven 项目中添加 Kafka Streams 依赖项

在您的 Maven 项目中为 Kafka Streams 添加红帽依赖项。

先决条件

  • 具有现有 pom.xml 的 Maven 项目。

流程

  1. 将 Red Hat Maven 存储库添加到 Maven 项目的 pom.xml 文件的 < repositories > 部分。

    <repositories>
        <repository>
            <id>redhat-maven</id>
            <url>https://maven.repository.redhat.com/ga/</url>
        </repository>
    </repositories>
  2. kafka-streams 作为 & lt;dependency > 添加到 Maven 项目的 pom.xml 文件中。

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>3.4.0.redhat-00004</version>
        </dependency>
    </dependencies>
  3. 构建 Maven 项目,将 Kafka Streams 依赖项添加到项目中。

3.3. 在您的 Maven 项目中添加 OAuth 2.0 依赖项

将 OAuth 2.0 的红帽依赖项添加到您的 Maven 项目中。

先决条件

  • 具有现有 pom.xml 的 Maven 项目。

流程

  1. 将 Red Hat Maven 存储库添加到 Maven 项目的 pom.xml 文件的 < repositories > 部分。

    <repositories>
        <repository>
            <id>redhat-maven</id>
            <url>https://maven.repository.redhat.com/ga/</url>
        </repository>
    </repositories>
  2. kafka-oauth-client 作为 & lt;dependency > 添加到 Maven 项目的 pom.xml 文件中。

    <dependency>
     <groupId>io.strimzi</groupId>
     <artifactId>kafka-oauth-client</artifactId>
     <version>0.12.0.redhat-00006</version>
    </dependency>
  3. 构建 Maven 项目,将 OAuth 2.0 依赖项添加到项目。

要连接到 Kafka 集群,必须将客户端应用程序配置为一组用来识别代理并启用连接的最小属性。另外,您需要添加一个序列化器/反序列化机制,将信息转换为 Kafka 使用的字节数组格式或移出。在开发消费者客户端时,您首先将初始连接添加到 Kafka 集群,用于发现所有可用的代理。建立连接后,您可以开始使用 Kafka 主题的信息或向它们生成信息。

虽然不需要,但建议使用唯一的客户端 ID,以便您可以在日志和指标集合中标识您的客户端。

您可以在属性文件中配置属性。使用属性文件意味着您可以修改配置,而无需重新编译代码。

例如,您可以使用以下代码在 Java 客户端中载入属性:

将配置属性加载到客户端中

Properties properties = new Properties();
InsetPropertyStream insetPropertyStream = new FileInsetPropertyStream("config.properties");
properties.load(insetPropertyStream);
KafkaProducer<String, String> consumer = new KafkaProducer<>(properties);

您还可以直接使用将属性直接添加到配置对象中的代码中。例如,您可以将 setProperty () 方法用于 Java 客户端应用程序。当您只有少量配置属性时,直接添加属性是一个有用的选项。

4.1. 基本制作者客户端配置

在开发制作者客户端时,请配置以下内容:

  • 与 Kafka 集群的连接
  • 将 Kafka 代理的消息密钥转换为字节的序列化器
  • 将 Kafka 代理的消息值转换为字节的序列化器

如果您想要发送和存储压缩消息,您可能还会添加压缩类型。

基本制作者客户端配置属性

client.id = my-producer-id 
1

bootstrap.servers = my-cluster-kafka-bootstrap:9092 
2

key.serializer = org.apache.kafka.common.serialization.StringSerializer 
3

value.serializer = org.apache.kafka.common.serialization.StringSerializer 
4

1
客户端的逻辑名称。
2
客户端的 bootstrap 地址,以便客户端能够进行到 Kafka 集群的初始连接。
3
在发送到 Kafka 代理前,序列化器将消息密钥转换为字节。
4
在发送到 Kafka 代理前,序列化器将消息值转换为字节。

将制作者客户端配置直接添加到代码中

Properties props = new Properties();
props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "my-producer-id");
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-cluster-kafka-bootstrap:9092");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

KafkaProducer 指定它发送的消息的字符串键和值类型。在将键和值从指定类型转换为字节前,必须使用的序列化程序将它们转换为 Kafka。

4.2. 基本消费者客户端配置

在开发消费者客户端时,请配置以下内容:

  • 与 Kafka 集群的连接
  • 一个反序列化器,用于将从 Kafka 代理获取的字节转换为客户端应用程序可理解的消息键
  • 将从 Kafka 代理获取的字节转换为消息值的反序列化器,可供客户端应用程序理解

通常,您还添加一个消费者组 ID,以将消费者与消费者组关联。用户组是一个逻辑实体,用于将大型数据流处理从一个或多个主题分发到并行用户。消费者使用 group.id 分组,允许消息分散到成员中。在给定的消费者组中,每个主题分区都由一个消费者读取。单个消费者可以处理许多分区。对于最大并行性,为每个分区创建一个消费者。如果有超过分区的用户,一些消费者会保持闲置状态,在出现故障时可以接管。

基本消费者客户端配置属性

client.id = my-consumer-id 
1

group.id = my-group-id 
2

bootstrap.servers = my-cluster-kafka-bootstrap:9092 
3

key.deserializer = org.apache.kafka.common.serialization.StringDeserializer 
4

value.deserializer = org.apache.kafka.common.serialization.StringDeserializer 
5

1
客户端的逻辑名称。
2
消费者的组 ID,以便能够加入特定的消费者组。
3
客户端的 bootstrap 地址,以便客户端能够进行到 Kafka 集群的初始连接。
4
反序列化程序,将从 Kafka 代理获取的字节转换为消息键。
5
反序列化程序,将从 Kafka 代理获取的字节转换为消息值。

将消费者客户端配置直接添加到代码中

Properties props = new Properties();
props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "my-consumer-id");
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-group-id");
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-cluster-kafka-bootstrap:9092");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

KafkaConsumer 为它接收的信息指定字符串键和值类型。使用的序列化程序必须能够将从 Kafka 接收的字节转换为指定类型。

注意

每个消费者组都必须具有唯一的 group.id。如果您使用相同的 group.id 重新启动消费者,它会在停止前从其停止前恢复消耗的消息。

第 5 章 配置安全连接

保护 Kafka 集群和客户端应用程序之间的连接有助于确保集群和客户端之间的通信的机密性、完整性和真实性。

要实现安全连接,您可以引入与身份验证、加密和授权相关的配置:

身份验证
使用身份验证机制来验证客户端应用程序的身份。
加密
启用使用 SSL/TLS 加密在客户端和服务器之间传输的数据加密。
授权
根据客户端应用程序的身份验证身份控制 Kafka 代理上允许的客户端访问和操作。

在没有身份验证的情况下无法使用授权。如果没有启用身份验证,则无法确定客户端的身份,因此无法强制执行授权规则。这意味着,即使定义了授权规则,也不会在没有身份验证的情况下强制执行它们。

在 AMQ Streams 中,监听器用于在 Kafka 代理和客户端之间配置网络连接。侦听器配置选项决定了代理如何侦听进入客户端连接以及如何管理安全访问。所需的配置取决于您选择的身份验证、加密和授权机制。

您可以配置 Kafka 代理和客户端应用程序以启用安全功能。保护到 Kafka 集群的客户端连接的一般概述如下:

  1. 安装 AMQ Streams 组件,包括 Kafka 集群。
  2. 对于 TLS,请为每个代理和客户端应用程序生成 TLS 证书。
  3. 在代理配置中配置监听程序以安全连接。
  4. 为安全连接配置客户端应用程序。

根据您用来建立与 Kafka 代理的安全和验证连接的机制配置客户端应用程序。Kafka 代理使用的身份验证、加密和授权必须与连接的客户端应用程序使用的身份验证、加密和授权匹配。客户端应用程序和代理需要同意安全协议和配置,以进行安全通信。例如,Kafka 客户端和 Kafka 代理必须使用相同的 TLS 版本和密码套件。

注意

客户端和服务器之间不匹配的安全配置可能会导致连接失败或潜在的安全漏洞。仔细配置和测试代理和客户端应用程序非常重要,以确保它们被正确保护,并能够安全地通信。

5.1. 为安全访问设置代理

在为安全访问配置客户端应用程序前,您必须首先在 Kafka 集群中设置代理来支持您要使用的安全机制。要启用安全连接,您可以使用适当的安全机制配置创建监听程序。

在 RHEL 上使用 AMQ Streams 时,保护与 Kafka 集群的客户端连接的一般概述如下:

  1. 在 RHEL 服务器上安装 AMQ Streams 组件,包括 Kafka 集群。
  2. 对于 TLS,为 Kafka 集群中的所有代理生成 TLS 证书。
  3. 在代理配置属性文件中配置监听程序。

    • 为 Kafka 集群监听程序配置身份验证,如 TLS 或 SASL SCRAM-SHA-512。
    • 为 Kafka 集群上所有启用的监听程序(如 简单 授权)配置授权。
  4. 对于 TLS,请为每个客户端应用程序生成 TLS 证书。
  5. 创建 config.properties 文件,以指定客户端应用程序使用的连接详情和身份验证凭证。
  6. 启动 Kafka 客户端应用程序并连接到 Kafka 集群。

    • 使用 config.properties 文件中定义的属性连接到 Kafka 代理。
  7. 验证客户端是否可以成功连接到 Kafka 集群,并安全地使用并生成信息。

有关设置代理的更多信息,请参阅在 RHEL 上使用 AMQ Streams

5.1.2. 为 RHEL 上的 Kafka 集群配置安全监听程序

使用配置属性文件在 Kafka 中配置监听程序。要为 Kafka 代理配置安全连接,您可以在此文件中为 TLS、SASL 和其他与安全相关的配置设置相关属性。

以下是在 Kafka 代理的 server.properties 配置文件中指定的 TLS 侦听器配置示例,其密钥存储和 truststore 格式为:

server.properties中的监听程序配置示例

listeners = listener_1://0.0.0.0:9093, listener_2://0.0.0.0:9094
listener.security.protocol.map = listener_1:SSL, listener_2:PLAINTEXT
ssl.keystore.type = PKCS12
ssl.keystore.location = /path/to/keystore.p12
ssl.keystore.password = <password>
ssl.truststore.type = PKCS12
ssl.truststore.location = /path/to/truststore.p12
ssl.truststore.password = <password>
ssl.client.auth = required
authorizer.class.name = kafka.security.auth.SimpleAclAuthorizer.
super.users = User:superuser

listener 属性指定每个监听程序名称,以及代理侦听的 IP 地址和端口。protocol 映射告知 listener_1 侦听器将 SSL 协议用于使用 TLS 加密的客户端。listener_2 为不使用 TLS 加密的客户端提供 PLAINTEXT 连接。密钥存储包含代理的私钥和证书。truststore 包含用于验证客户端应用程序身份的可信证书。ssl.client.auth 属性强制执行客户端身份验证。

Kafka 集群使用简单的授权。授权器设置为 SimpleAclAuthorizer。定义了单个超级用户,在所有监听器上没有限制访问。AMQ Streams 支持 Kafka SimpleAclAuthorizer 和自定义授权器插件。

如果我们使用 listener.name.<name_of_listener> 为配置属性添加前缀,则配置特定于该监听程序。

这只是一个示例配置。某些配置选项特定于监听器类型。如果您使用 OAuth 2.0 或 Open Policy Agent (OPA),还必须在特定监听器中配置对授权服务器或 OPA 服务器的访问。您可以根据特定要求和环境创建监听程序。

有关监听程序配置的更多信息,请参阅 Apache Kafka 文档

使用 ACL 进行微调访问

您可以使用访问控制列表(ACL)来微调对 Kafka 集群的访问。要创建和管理访问控制列表(ACL),请使用 kafka-acls.sh 命令行工具。ACL 将访问规则应用到客户端应用程序。

在以下示例中,第一个 ACL 授予名为 my-topic 的特定主题的读和描述权限。resource.patternType 设置为 literal,这意味着资源名称必须完全匹配。

第二个 ACL 为名为 my-group 的特定消费者组授予读取权限。resource.patternType 设置为 prefix,这意味着资源名称必须与前缀匹配。

ACL 配置示例

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add \
--allow-principal User:my-user --operation Read --operation Describe --topic my-topic --resource-pattern-type literal \
--allow-principal User:my-user --operation Read --group my-group --resource-pattern-type prefixed

在 OpenShift 上使用 AMQ Streams 时,保护与 Kafka 集群的客户端连接的一般概述如下:

  1. 使用 Cluster Operator 在 OpenShift 环境中部署 Kafka 集群。使用 Kafka 自定义资源配置和安装集群并创建监听程序。

    • 为监听程序配置身份验证,如 TLS 或 SASL SCRAM-SHA-512。Cluster Operator 创建一个包含集群 CA 证书的 secret,以验证 Kafka 代理的身份。
    • 为所有启用的监听程序配置授权,如 简单的 授权。
  2. 使用 User Operator 创建代表您的客户端的 Kafka 用户。使用 KafkaUser 自定义资源配置和创建用户。

    • 为与监听器验证机制匹配的 Kafka 用户(客户端)配置身份验证。User Operator 创建一个包含客户端用于与 Kafka 集群进行身份验证的客户端的 secret。
    • 为您的 Kafka 用户(客户端)配置与监听器的授权机制匹配的授权。授权规则允许对 Kafka 集群上的特定操作。
  3. 创建 config.properties 文件,以指定客户端应用程序连接到集群所需的连接详情和身份验证凭证。
  4. 启动 Kafka 客户端应用程序并连接到 Kafka 集群。

    • 使用 config.properties 文件中定义的属性连接到 Kafka 代理。
  5. 验证客户端是否可以成功连接到 Kafka 集群,并安全地使用并生成信息。

有关设置代理的更多信息,请参阅在 OpenShift 中配置 AMQ Streams

当您使用 AMQ Streams 部署 Kafka 自定义资源时,您可以在 Kafka spec 中添加监听程序配置。使用监听程序配置来保护 Kafka 中的连接。要为 Kafka 代理配置安全连接,请在监听器级别上为 TLS、SASL 和其他与安全相关的配置设置相关属性。

外部监听程序提供对 OpenShift 集群外部的 Kafka 集群的客户端访问。AMQ Streams 创建监听器服务和 bootstrap 地址,以根据配置启用对 Kafka 集群的访问。例如,您可以创建使用以下连接机制的外部监听程序:

  • 节点端口
  • loadbalancers
  • OpenShift 路由

以下是 Kafka 资源的 nodeport 侦听器配置示例:

Kafka 资源中的监听程序配置示例

apiVersion: {KafkaApiVersion}
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
    listeners:
      - name: plaintext
        port: 9092
        type: internal
        tls: false
        configuration:
          useServiceDnsDomain: true
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: tls
      - name: external
        port: 9094
        type: route
        tls: true
        authentication:
          type: tls
    authorization:
      type: simple
      superUsers:
        - CN=superuser
  # ...

监听器 属性配置了三个监听程序: 纯文本tlsexternal外部监听程序nodeport 类型,它使用 TLS 进行加密和验证。当使用 Cluster Operator 创建 Kafka 集群时,会自动生成 CA 证书。您可以将集群 CA 添加到客户端应用程序的信任存储中,以验证 Kafka 代理的身份。另外,您可以将 AMQ Streams 配置为使用代理或监听程序级别的自己的证书。当客户端应用程序需要不同的安全配置时,可能需要在监听器级别使用证书。在监听器级别使用证书也会添加额外的控制层和安全性。

提示

使用配置供应商插件将配置数据加载到生成者和消费者客户端。配置提供程序插件从 secret 或 ConfigMap 加载配置数据。例如,您可以告知提供程序自动从 Strimzi secret 获取证书。如需更多信息,请参阅在 OpenShift 上运行的 AMQ Streams 文档

Kafka 集群使用简单的授权。authorization 属性类型设置为 simple。定义了单个超级用户,在所有监听器上没有限制访问。AMQ Streams 支持 Kafka SimpleAclAuthorizer 和自定义授权器插件。

这只是一个示例配置。某些配置选项特定于监听器类型。如果您使用 OAuth 2.0 或 Open Policy Agent (OPA),还必须在特定监听器中配置对授权服务器或 OPA 服务器的访问。您可以根据特定要求和环境创建监听程序。

如需有关监听器配置的更多信息,请参阅 GenericKafkaListener 模式参考

注意

当使用 路由类型 监听程序来访问 OpenShift 上的 Kafka 集群时,会启用 TLS 透传功能。OpenShift 路由旨在使用 HTTP 协议,但它也可用于为其他协议代理网络流量,包括 Apache Kafka 使用的 Kafka 协议。客户端建立与路由的连接,路由将使用 TLS Server Name Indication (SNI)扩展将流量转发到 OpenShift 集群中运行的代理,以获取目标主机名。SNI 扩展允许路由正确识别每个连接的目标代理。

使用 ACL 进行微调访问

您可以使用访问控制列表(ACL)来微调对 Kafka 集群的访问。要添加访问控制列表(ACL),您可以配置 KafkaUser 自定义资源。当您创建 KafkaUser 时,AMQ Streams 会自动管理创建和更新 ACL。ACL 将访问规则应用到客户端应用程序。

在以下示例中,第一个 ACL 授予名为 my-topic 的特定主题的读和描述权限。resource.patternType 设置为 literal,这意味着资源名称必须完全匹配。

第二个 ACL 为名为 my-group 的特定消费者组授予读取权限。resource.patternType 设置为 prefix,这意味着资源名称必须与前缀匹配。

KafkaUser 资源中的 ACL 配置示例

apiVersion: {KafkaUserApiVersion}
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster
spec:
  # ...
  authorization:
    type: simple
    acls:
      - resource:
          type: topic
          name: my-topic
          patternType: literal
        operations:
          - Read
          - Describe
      - resource:
          type: group
          name: my-group
          patternType: prefix
        operations:
          - Read

注意

如果在配置 Kafka 用户时将 tls-external 指定为身份验证选项,您可以使用自己的客户端证书,而不是由 User Operator 生成的客户端证书。

5.2. 为安全访问设置客户端

在 Kafka 代理上设置监听程序来支持安全连接后,下一步是将客户端应用程序配置为使用这些监听程序与 Kafka 集群通信。这包括为每个客户端提供适当的安全设置,以便根据监听器上配置的安全机制与集群进行身份验证。

5.2.1. 配置安全协议

配置客户端应用程序使用的安全协议,以匹配 Kafka 代理监听程序中配置的协议。例如,对 TLS 身份验证使用 SSL (安全套接字层),或使用 TLS 加密对 SASL (Simple Authentication and Security Layer)验证 SASL_SSL。在您的客户端配置中添加 truststore 和 keystore,它支持访问 Kafka 集群所需的身份验证机制。

truststore
truststore 包含用于验证 Kafka 代理真实性的可信证书颁发机构(CA)的公共证书。当客户端连接到安全 Kafka 代理时,可能需要验证代理的身份。
keystore
密钥存储包含客户端的私钥及其公钥。当客户端希望向代理验证其自身时,它会显示自己的证书。

如果使用 TLS 身份验证,您的 Kafka 客户端配置需要一个信任存储和密钥存储来连接到 Kafka 集群。如果您使用 SASL SCRAM-SHA-512,则通过交换用户名和密码凭证而不是数字证书来执行身份验证,因此不需要密钥存储。SCRAM-SHA-512 是一种更加轻量级的机制,但它不像使用基于证书的身份验证一样安全。

注意

如果您有自己的证书基础架构并使用来自第三方 CA 的证书,那么客户端的默认信任存储可能已包含公共 CA 证书,您不需要将它们添加到客户端的信任存储中。如果服务器证书由已包含在默认信任存储中的公共 CA 证书之一签名,客户端会自动信任它。

您可以创建一个 config.properties 文件来指定客户端应用程序使用的身份验证凭证。

在以下示例中,security.protocol 设置为 SSL,以启用客户端和代理之间的 TLS 身份验证和加密。

ssl.truststore.locationssl.truststore.password 属性指定信任存储的位置和密码。ssl.keystore.locationssl.keystore.password 属性指定密钥存储的位置和密码。

使用 PKCS thePublic-Key Cryptography Standards 文件格式。您还可以使用 base64 编码的 PEM (Privacy Enhanced Mail)格式。

TLS 身份验证的客户端配置属性示例

bootstrap.servers = my-cluster-kafka-bootstrap:9093
security.protocol = SSL
ssl.truststore.location = /path/to/ca.p12
ssl.truststore.password = truststore-password
ssl.keystore.location = /path/to/user.p12
ssl.keystore.password = keystore-password
client.id = my-client

在以下示例中,security.protocol 设置为 SASL_SSL,以便在客户端和服务器之间使用 TLS 加密启用 SASL 身份验证。如果您只需要身份验证而不是加密,您可以使用 SASL 协议。进行身份验证的 SASL 机制是 SCRAM-SHA-512。可以使用不同的身份验证机制。sasl.jaas.config 属性指定身份验证凭据。

SCRAM-SHA-512 身份验证的客户端配置示例

bootstrap.servers = my-cluster-kafka-bootstrap:9093
security.protocol = SASL_SSL
sasl.mechanism = SCRAM-SHA-512
sasl.jaas.config = org.apache.kafka.common.security.scram.ScramLoginModule required \
  username = "user" \
  password = "secret";
ssl.truststore.location = path/to/truststore.p12
ssl.truststore.password = truststore_password
ssl.truststore.type = PKCS12
client.id = my-client

注意

对于不支持 PEM 格式的应用程序,您可以使用 OpenSSL 等工具将 PEM 文件转换为 PKCS the 格式。

5.2.2. 配置允许的 TLS 版本和密码套件

您可以组合 SSL 配置和密码套件,以进一步保护客户端应用程序和 Kafka 集群之间的基于 TLS 的通信。在 Kafka 代理配置中指定支持的 TLS 版本和密码套件。如果要限制它们使用的 TLS 版本和密码套件,您还可以将配置添加到客户端。客户端上的配置应该只使用代理上启用的协议和密码套件。

在以下示例中,SSL 使用 security.protocol 进行 Kafka 代理和客户端应用程序之间的通信。您可以将密码套件指定为用逗号分开的列表。ssl.cipher.suites 属性是客户端允许使用的密码套件的逗号分隔列表。

Kafka 代理的 SSL 配置属性示例

security.protocol: "SSL"
ssl.enabled.protocols: "TLSv1.3", "TLSv1.2"
ssl.protocol: "TLSv1.3"
ssl.cipher.suites: "TLS_AES_256_GCM_SHA384"

ssl.enabled.protocols 属性指定可用于集群及其客户端之间安全通信的可用 TLS 版本。在本例中,TLSv1.3TLSv1.2 都已启用。ssl.protocol 属性为所有连接设置默认 TLS 版本,且必须从启用的协议中选择它。默认情况下,客户端使用 TLSv1.3 进行通信。如果客户端只支持 TLSv1.2,它仍然可以连接到代理并使用该支持的版本进行通信。同样,如果配置位于客户端,且代理只支持 TLSv1.2,客户端将使用支持的版本。

Apache Kafka 支持的密码套件取决于您使用的 Kafka 版本和底层环境。检查提供最高安全性级别的最新支持的密码套件。

5.2.3. 使用访问控制列表(ACL)

您不必为客户端应用程序中的 ACLS 明确配置任何内容。ACL 由 Kafka 代理在服务器端强制执行。当客户端向服务器发送请求来生成或消耗数据时,服务器会检查 ACL,以确定客户端(用户)是否被授权执行所请求的操作。如果客户端被授权,则会处理请求;否则请求将被拒绝,并返回错误。但是,客户端仍必须经过身份验证,并使用适当的安全协议启用与 Kafka 集群的安全连接。

如果您在 Kafka 代理上使用访问控制列表(ACL),请确保正确设置了 ACL,以限制对您要控制的主题和操作的访问。如果您使用 Open Policy Agent (OPA)策略来管理访问,在策略中配置授权规则,因此您不需要为 Kafka 代理指定 ACL。OAuth 2.0 提供了一些灵活性:您可以使用 OAuth 2.0 供应商来管理 ACL;或使用 OAuth 2.0 和 Kafka 的简单 授权来管理 ACL。

注意

ACL 适用于大多数类型的请求,不仅限于生成和消耗操作。例如,可将 ACLS 应用到读取操作,如描述主题或写入操作,如创建新主题。

5.2.4. 使用 OAuth 2.0 进行基于令牌的访问

使用 OAuth 2.0 开放标准进行 AMQ Streams 授权,通过 OAuth 2.0 供应商强制控制授权。OAuth 2.0 为应用提供了一种安全的方法来访问存储在其他系统中的用户数据。授权服务器可能会向客户端应用程序发出访问令牌,以授予 Kafka 集群的访问权限。

以下步骤描述了设置和使用 OAuth 2.0 进行令牌验证的一般方法:

  1. 使用代理和客户端凭证(如客户端 ID 和 secret)配置授权服务器。
  2. 从授权服务器获取 OAuth 2.0 凭据。
  3. 在 Kafka 代理中使用 OAuth 2.0 凭证配置监听程序,并与授权服务器交互。
  4. 将 Oauth 2.0 依赖项添加到客户端库。
  5. 使用 OAuth 2.0 凭证配置 Kafka 客户端,并与授权服务器交互。
  6. 在运行时获取访问令牌,该令牌使用 OAuth 2.0 供应商验证客户端。

如果您在 Kafka 代理上配置了 OAuth 2.0 的监听程序,您可以将客户端应用程序设置为使用 OAuth 2.0。除了用于访问 Kafka 集群的标准 Kafka 客户端配置外,还必须为 OAuth 2.0 身份验证包含特定的配置。您还必须确保 Kafka 集群和客户端应用程序可以访问您要使用的授权服务器。

指定 SASL (简单身份验证和安全层)安全协议和机制。在生产环境中,推荐进行以下设置:

  • TLS 加密连接的 SASL_SSL 协议。
  • 用于使用 bearer 令牌的凭据交换的 OAUTHBEARER 机制

JAAS (Java 身份验证和授权服务)模块实施 SASL 机制。机制的配置取决于您使用的身份验证方法。例如,使用凭证交换您添加 OAuth 2.0 访问令牌端点、访问令牌、客户端 ID 和客户端 secret。客户端连接到授权服务器的令牌端点(URL),以检查令牌是否仍然有效。您还需要一个 truststore,其中包含授权服务器的公钥证书,以便进行身份验证。

OAauth 2.0 的客户端配置属性示例

bootstrap.servers = my-cluster-kafka-bootstrap:9093
security.protocol = SASL_SSL
sasl.mechanism = OAUTHBEARER
# ...
sasl.jaas.config = org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
    oauth.token.endpoint.uri = "https://localhost:9443/oauth2/token" \
    oauth.access.token = <access_token> \
    oauth.client.id = "<client_id>" \
    oauth.client.secret = "<client_secret>" \
    oauth.ssl.truststore.location = "/<truststore_location>/oauth-truststore.p12" \
    oauth.ssl.truststore.password = "<truststore_password>" \
    oauth.ssl.truststore.type = "PKCS12" \

有关将代理设置为使用 OAuth 2.0 的更多信息,请参阅以下指南:

5.2.5. 使用 Open Policy Agent (OPA)访问策略

使用带有 AMQ Streams 的 Open Policy Agent (OPA)策略控制器来评估您的 Kafka 集群的请求,具体取决于访问策略。开放策略代理(OPA)是一种策略引擎,用于管理授权策略。策略集中访问控制,可以动态更新,无需更改客户端应用程序。例如,您可以创建一个策略,仅允许某些用户(客户端)为特定主题生成和使用消息。

AMQ Streams 为 Kafka 授权使用 Open Policy Agent 插件作为授权器。

以下步骤描述了设置和使用 OPA 的一般方法:

  1. 设置 OPA 服务器的实例。
  2. 定义提供管理对 Kafka 集群访问权限的授权规则的策略。
  3. 为 Kafka 代理创建配置,以接受 OPA 授权并与 OPA 服务器交互。
  4. 配置 Kafka 客户端,以提供授权访问 Kafka 集群的凭证。

如果您在 Kafka 代理上配置了 OPA 的监听程序,您可以将客户端应用程序设置为使用 OPA。在监听器配置中,您可以指定一个 URL 来连接到 OPA 服务器并授权您的客户端应用程序。除了访问 Kafka 集群的标准 Kafka 客户端配置外,还必须添加凭证以使用 Kafka 代理进行身份验证。代理通过向 OPA 服务器发送请求来评估授权策略,以检查客户端是否有必要的授权来执行请求操作。您不需要信任存储或密钥存储来保护通信,因为策略引擎强制执行授权策略。

OPA 授权的客户端配置属性示例

bootstrap.servers = my-cluster-kafka-bootstrap:9093
security.protocol = SASL_SSL
sasl.mechanism = SCRAM-SHA-512
sasl.jaas.config = org.apache.kafka.common.security.scram.ScramLoginModule required \
  username = "user" \
  password = "secret";
# ...

注意

红帽不支持 OPA 服务器。

有关将代理设置为使用 OPA 的更多信息,请参阅以下指南:

5.2.6. 在流消息时使用事务

通过在代理和制作者客户端应用程序中配置事务属性,您可以确保在单个事务中处理消息。事务向消息流增加可靠性和一致性。

代理始终启用事务。您可以使用以下属性更改默认配置:

事务的 Kafka 代理配置属性示例

transaction.state.log.replication.factor = 3
transaction.state.log.min.isr = 2
transaction.abort.timed.out.transaction.cleanup.interval.ms = 3600000

这是生产环境的典型配置,它为内部 __transaction_state 主题创建 3 个副本。\__transaction_state 主题存储有关事务在进行中的信息。事务日志至少需要 2 个同步副本。清理间隔是检查超时事务和清理对应的事务日志之间的时间。

要为客户端配置添加事务属性,您需要为生产者和消费者设置以下属性:

事务的制作者客户端配置属性示例

transactional.id = unique-transactional-id
enable.idempotence = true
max.in.flight.requests.per.connection = 5
acks = all
transaction.timeout.ms = 30000
delivery.timeout = 25000

事务 ID 允许 Kafka 代理跟踪事务。它是制作者的唯一标识符,应当与特定的分区一起使用。如果您需要为多组分区执行事务,则需要为每个集合使用不同的事务 ID。启用 idempotence 以避免生成者实例创建重复消息。使用 idempotence 时,消息使用制作者 ID 和序列号进行跟踪。代理收到消息时,它会检查制作者 ID 和序列号。如果收到具有相同制作者 ID 和序列号的消息,代理会丢弃重复的消息。

最大动态请求数设置为 5,以便以发送的顺序处理事务。分区最多可有 5 个动态请求,而不影响消息排序。

通过将 acks 设为 all,生产者会等待来自要写入的主题分区的所有同步副本的确认,然后再将事务视为完成。这样可确保信息被大量写入(提交)到 Kafka 集群,即使代理失败,也不会丢失它们。

事务超时指定客户端在超时前必须完成事务的最长时间。交付超时指定制作者在超时前等待代理确认消息发送的最长时间。要确保在事务周期内发送消息,请将交付超时设置为小于事务超时。在指定重试次数时,请考虑网络延迟和消息吞吐量,并允许临时故障。

事务的消费者客户端配置属性示例

group.id = my-group-id
isolation.level = read_committed
enable.auto.commit = false

read_committed 隔离级别指定消费者只读取成功完成的事务的消息。消费者不会处理属于持续或失败的事务的任何消息。这样可确保消费者只读取属于完整事务的消息。

当使用事务流消息时,务必要将 enable.auto.commit 设置为 false。如果设置为 true,则消费者会定期提交偏移量,而无需考虑事务。这意味着消费者可以在事务完成前提交消息。通过将 enable.auto.commit 设置为 false,使用者仅读取并提交已完全写入并提交至该主题的消息,作为事务的一部分。

第 6 章 开发 Kafka 客户端

使用您首选的编程语言创建 Kafka 客户端,并将其连接到 AMQ Streams。

要与 Kafka 集群交互,客户端应用程序需要能够生成和使用信息。要至少开发和测试基本 Kafka 客户端应用程序,您需要执行以下操作:

  • 设置配置以连接到 Kafka 集群
  • 使用制作者和消费者来发送和接收消息

设置连接到 Kafka 集群的基本配置,并使用制作者和消费者是开发 Kafka 客户端的第一个步骤。之后,您可以扩展到提高客户端应用程序的输入、安全、性能、错误处理和功能。

先决条件

您已创建了包含以下属性值的客户端属性文件:

流程

  1. 为您的编程语言选择一个 Kafka 客户端库,如 Java、Python、.NET 等。
  2. 通过软件包管理器或从其源下载库来手动安装库。
  3. 在代码中导入 Kafka 客户端所需的类和依赖项。
  4. 根据您要创建的客户端类型,创建 Kafka 使用者或制作者对象。

    您可以同时有一个客户端。

  5. 提供配置属性以连接到 Kafka 集群,包括代理地址、端口和凭证(如果需要)。
  6. 使用 Kafka consumer 或 producer 对象订阅主题、生成信息或从 Kafka 集群检索信息。
  7. 处理连接或与 AMQ Streams 通信过程中可能出现的任何错误。

以下是使用 Apache Kafka 客户端库创建 Kafka 使用者的 Java 示例,并将其连接到 Kafka 集群以开始从指定主题读取。

消费者客户端示例

import java.util.Arrays;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class AMQStreamsConsumerExample {
    public static void main(String[] args) {
        Properties properties = new Properties();
        try (InputStream input = new FileInputStream("consumer.properties")) { 
1

            properties.load(input);
        } catch (IOException ex) {
            ex.printStackTrace();
        }

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList("my-topic")); 
2


        while (true) { 
3

            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received message: " + record.value());
            }
        }
    }
}

1
消费者客户端配置是从 consumer.properties 文件中读取和加载的。
2
subscribe () 方法订阅名为 my-topic 的主题。您可以在列表中添加其他主题。
3
消费者轮询 Kafka 的新信息,并检索消息批处理进行处理。
注意

如果消费者具有 group.id,则必须订阅一个或多个主题。代理需要知道消费者感兴趣的主题,以便它能够为消费者分配分区。

以下是使用 Apache Kafka 客户端库创建 Kafka 生成者的 Java 示例,并将其连接到 Kafka 集群,以启动生成信息到指定的主题。

生成者客户端示例

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class AMQStreamsProducerExample {
    public static void main(String[] args) {
        Properties properties = new Properties();
        try (InputStream input = new FileInputStream("producer.properties")) { 
1

            properties.load(input);
        } catch (IOException e) {
            e.printStackTrace();
        }

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties); 
2


        ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "message-key", "message-value"); 
3

        producer.send(record);

        producer.close();
    }
}

1
producer 客户端配置是从 producer.properties 文件中读取和加载的。
2
客户端使用属性创建制作者。
3
消息发送到名为 my-topic 的主题。

附录 A. 使用您的订阅

AMQ Streams 通过软件订阅提供。要管理您的订阅,请访问红帽客户门户中的帐户。

访问您的帐户

  1. 转至 access.redhat.com
  2. 如果您还没有帐户,请创建一个帐户。
  3. 登录到您的帐户。

激活订阅

  1. 转至 access.redhat.com
  2. 导航到 My Subscriptions
  3. 导航到 激活订阅 并输入您的 16 位激活号。

下载 Zip 和 Tar 文件

要访问 zip 或 tar 文件,请使用客户门户网站查找下载的相关文件。如果您使用 RPM 软件包,则不需要这一步。

  1. 打开浏览器并登录红帽客户门户网站 产品下载页面,网址为 access.redhat.com/downloads
  2. INTEGRATION AND AUTOMATION 目录中找到 AMQ Streams for Apache Kafka 项。
  3. 选择所需的 AMQ Streams 产品。此时会打开 Software Downloads 页面。
  4. 单击组件的 Download 链接。

使用 DNF 安装软件包

要安装软件包以及所有软件包的依赖软件包,请使用:

dnf install <package_name>

要从本地目录中安装之前下载的软件包,请使用:

dnf install <path_to_download_package>

更新于 2023-05-19

法律通告

Copyright © 2023 Red Hat, Inc.
The text of and illustrations in this document are licensed by Red Hat under a Creative Commons Attribution–Share Alike 3.0 Unported license ("CC-BY-SA"). An explanation of CC-BY-SA is available at http://creativecommons.org/licenses/by-sa/3.0/. In accordance with CC-BY-SA, if you distribute this document or an adaptation of it, you must provide the URL for the original version.
Red Hat, as the licensor of this document, waives the right to enforce, and agrees not to assert, Section 4d of CC-BY-SA to the fullest extent permitted by applicable law.
Red Hat, Red Hat Enterprise Linux, the Shadowman logo, the Red Hat logo, JBoss, OpenShift, Fedora, the Infinity logo, and RHCE are trademarks of Red Hat, Inc., registered in the United States and other countries.
Linux® is the registered trademark of Linus Torvalds in the United States and other countries.
Java® is a registered trademark of Oracle and/or its affiliates.
XFS® is a trademark of Silicon Graphics International Corp. or its subsidiaries in the United States and/or other countries.
MySQL® is a registered trademark of MySQL AB in the United States, the European Union and other countries.
Node.js® is an official trademark of Joyent. Red Hat is not formally related to or endorsed by the official Joyent Node.js open source or commercial project.
The OpenStack® Word Mark and OpenStack logo are either registered trademarks/service marks or trademarks/service marks of the OpenStack Foundation, in the United States and other countries and are used with the OpenStack Foundation's permission. We are not affiliated with, endorsed or sponsored by the OpenStack Foundation, or the OpenStack community.
All other trademarks are the property of their respective owners.
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2026 Red Hat
返回顶部