14.5. MicroProfile Reactive Messaging 参考


以下是 MicroProfile 配置规范所需的被动消息传递属性密钥前缀的列表:

  • mp.messaging.incoming.[channel-name].[attribute]=[value]
  • mp.messaging.outgoing.[channel-name].[attribute]=[value]
  • mp.messaging.connector.[connector-name].[attribute]=[value]

请注意,channel-name@Incoming.value ()@Outgoing.value ()。要清楚起见,请查看这个一组连接器方法示例:

@Outgoing("to")
public int send() {
   int i = // Randomly generated...
   return i;
}

@Incoming("from")
public void receive(int i) {
   // Process payload
}

在本例中,所需的属性前缀如下:

  • mp.messaging.incoming.from.这定义了 receive () 方法。
  • mp.messaging.outgoing.to.这将定义 send () 方法。

请记住,这是一个示例。因为不同的连接器可识别不同的属性,所以您指示的前缀取决于您要配置的连接器。

以下是重新主动消息传递流和通过 @Channel 和 Emitter 结构触发的代码之间的数据交换示例:

@Path("/")
@ApplicationScoped
class MyBean {
    @Inject @Channel("my-stream")
    Emitter<String> emitter; 
1


    Publisher<String> dest;

    public MyBean() { 
2

    }

    @Inject
    public MyBean(@Channel("my-stream") Publisher<String> dest) {
        this.dest = subscribeAndAllowMultipleSubscriptions(dest);
    }

    private Publisher subscribeAndAllowMultipleSubscriptions(Publisher delegate) {
    } 
3
 
4
 
5


    @POST
    public PublisherBuilder<String> publish(@FormParam("value") String value) {
        return emitter.send(value);
    }

    @GET
    public Publisher poll() {
        return dest;
    }

    @PreDestroy
    public void close() { 
6


    }
}

行中详情:

1
嵌套构造器的发布程序。
2
您需要此空构造器来满足 Java 规范的上下文和依赖注入(CDI)。
3
订阅委派。
4
在可以处理多个订阅的发布程序中嵌套委托。
5
嵌套发布程序从委派中转发数据。
6
取消订阅被动消息传递提供的发布程序。

在本例中,MicroProfile Reactive Messaging 正在侦听 my-stream 内存流,因此通过 Emitter 发送的消息会收到注入的发布者。但请注意,需要满足以下条件才能使这个数据交换成功:

  1. 在调用 Emitter.send () 之前,该频道必须具有有效的订阅。在本例中,请注意构造器调用的 subscribeAndAllowMultipleSubscriptions () 方法确保 bean 可用于用户代码调用时具有活跃的订阅。
  2. 在注入的 publisher 中只能有一个 订阅 。如果要使用 REST 调用公开接收发布程序,其中每个调用 poll () 方法会导致对 dest publisher 的新订阅,您必须实施自己的发布程序,以便从注入的每个客户端广播数据。

14.5.3. Apache Kafka 用户 API

您可以使用 Apache Kafka 用户 API 来获取有关收到消息 Kafka 的更多信息,并影响 Kafka 处理消息的方式。此 API 存储在 io/smallrye/reactive/messaging/kafka/api 软件包中,它由以下类组成:

  • IncomingKafkaRecordMetadata.这个元数据包含以下信息:

    • Kafka 记录 密钥,由 消息 表示。
    • 用于 消息的 Kafka 主题和 分区 以及这些 消息中 的偏移量。
    • Message timestamptimestampType
    • Message 标头.这些是应用程序可在生成端附加的信息,并在消费端接收信息。
  • OutgoingKafkaRecordMetadata.使用这个元数据,您可以指定或覆盖 Kafka 处理信息的方式。它包含以下信息:

    • Kafka 视为消息键的密钥。
    • 您希望 Kafka 使用 的主题
    • 分区
    • 时间戳,如果您不希望 Kafka 生成 的时间戳
    • 标头.
  • KafkaMetadataUtil 包含将 OutgoingKafkaRecordMetadata 写入 Message 的工具方法,并从 Message 读取 IncomingKafkaRecordMetadata
重要

如果您将 OutgoingKafkaRecordMetadata 写入发送到未映射到 Kafka 的频道的消息,则被动消息传递框架会忽略它。相反,如果您从未映射到 Kafka 的频道 读取 IncomingKafkaRecordMetadata,则该消息会返回 null

如何写入和读取消息 的示例
@Inject
@Channel("from-user")
Emitter<Integer> emitter;

@Incoming("from-user")
@Outgoing("to-kafka")
public Message<Integer> send(Message<Integer> msg) {
    // Set the key in the metadata
    OutgoingKafkaRecordMetadata<String> md =
            OutgoingKafkaRecordMetadata.<String>builder()
                .withKey("KEY-" + i)
                .build();
    // Note that Message is immutable so the copy returned by this method
    // call is not the same as the parameter to the method
    return KafkaMetadataUtil.writeOutgoingKafkaMetadata(msg, md);
}

@Incoming("from-kafka")
public CompletionStage<Void> receive(Message<Integer> msg) {
    IncomingKafkaRecordMetadata<String, Integer> metadata =
        KafkaMetadataUtil.readIncomingKafkaMetadata(msg).get();

    // We can now read the Kafka record key
    String key = metadata.getKey();

    // When using the Message wrapper around the payload we need to explicitly ack
    // them
    return msg.ack();
}
microprofile-config.properties 文件中的 Kafka 映射示例
kafka.bootstrap.servers=kafka:9092

mp.messaging.outgoing.to-kafka.connector=smallrye-kafka
mp.messaging.outgoing.to-kafka.topic=some-topic
mp.messaging.outgoing.to-kafka.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer
mp.messaging.outgoing.to-kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer

mp.messaging.incoming.from-kafka.connector=smallrye-kafka
mp.messaging.incoming.from-kafka.topic=some-topic
mp.messaging.incoming.from-kafka.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
mp.messaging.incoming.from-kafka.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
注意

您必须为传出频道指定 key.serializer,并为传入频道指定 key.deserializer

这是 Kafka 连接器的简单 microprofile-config.properties 文件示例。其属性与示例中的属性对应 "MicroProfile 重新主动消息传递连接器,以与外部消息传递系统集成"。

kafka.bootstrap.servers=kafka:9092

mp.messaging.outgoing.to.connector=smallrye-kafka
mp.messaging.outgoing.to.topic=my-topic
mp.messaging.outgoing.to.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer

mp.messaging.incoming.from.connector=smallrye-kafka
mp.messaging.incoming.from.topic=my-topic
mp.messaging.incoming.from.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
Expand
表 14.6. 条目的讨论
entry描述

这些是"频道"。

发送,receive

这些是"方法"。

请注意, 频道位于 send () 方法,而 from 频道则位于 receive () 方法。

kafka.bootstrap.servers=kafka:9092

这将指定应用程序必须连接到的 Kafka 代理的 URL。您还可以在频道级别指定一个 URL,如下所示: mp.messaging.outgoing.to.bootstrap.servers=kafka:9092

mp.messaging.outgoing.to.connector=smallrye-kafka

这表示您希望要频道从 Kafka 接收信息。

小型被动消息传递是用于构建应用程序的框架。请注意,smallrye-kafka 值是 SmallRye 特定于重新主动的消息传递。如果您使用 Galleon 置备自己的服务器,您可以通过包含 microprofile-reactive-messaging-kafka Galleon 层来启用 Kafka 集成。

mp.messaging.outgoing.to.topic=my-topic

这表示您要将数据发送到名为 my-topic 的 Kafka 主题。

Kafka "topic" 是一个类别或源名称,信息存储在并发布到其中。所有 Kafka 信息都组织为主题。生产者应用将数据写入主题和消费者应用, 主题中读取数据。

mp.messaging.outgoing.to.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer

这告知连接器使用 IntegerSerializer 来序列化 send () 方法输出的值。Kafka 为标准 Java 类型提供序列化器。您可以通过编写实现 org.apache.kafka.common.serialization.Serializer 的类来实现自己的序列化器,然后在部署中包含该类。

mp.messaging.incoming.from.connector=smallrye-kafka

这表示您要使用 from 频道来接收来自 Kafka 的信息。同样,smallrye-kafka 的值是 SmallRye reactive messaging-specific。

mp.messaging.incoming.from.topic=my-topic

这表示您的连接器应该从名为 my-topic 的 Kafka 主题中读取数据。

mp.messaging.incoming.from.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer

这告知连接器在调用 receive () 方法前,使用 IntegerDeserializer 来反序列化主题中的值。您可以通过编写实现 org.apache.kafka.common.serialization.Deserializer 的类来实现自己的反序列化器,然后在部署中包含该类。

注意

这个属性列表并不全面。如需更多信息,请参阅 SmallRye Reactive Messaging Apache Kafka 文档。

强制 MicroProfile 主动消息传递前缀

MicroProfile Reactive 消息传递规范需要 Kafka 的以下方法属性密钥前缀:

  • mp.messaging.incoming.[channel-name].[attribute]=[value]`
  • mp.messaging.outgoing.[channel-name].[attribute]=[value]`
  • mp.messaging.connector.[connector-name].[attribute]=[value]`

请注意,channel-name@Incoming.value ()@Outgoing.value ()

现在考虑以下方法对示例:

@Outgoing("to")
public int send() {
    int i = // Randomly generated...
    return i;
}

@Incoming("from")
public void receive(int i) {
    // Process payload
}

在这个方法对示例中,请注意以下所需属性前缀:

  • mp.messaging.incoming.from.此前缀选择属性作为 receive () 方法的配置。
  • mp.messaging.outgoing.to.此前缀选择属性作为 send () 方法的配置。

要使用自签名证书配置 Apache Kafka 连接器客户端,请在 microprofile-config.properties 文件中定义 client-ssl-context。您可以在连接器和频道级别进行此操作。

以下示例演示了如何配置使用 SSL/TLS 保护的 Apache Kafka 连接器。

连接器级别 client-ssl-context 定义示例

mp.messaging.incoming.from.security.protocol=SSL
mp.messaging.outgoing.to.security.protocol=SSL
mp.messaging.connector.smallrye-kafka.wildfly.elytron.ssl.context=exampleSSLContext

只有在使用自签名证书时,才需要属性 mp.messaging.connector.smallrye-kafka.wildfly.elytron.ssl.context

重要

不要在生产环境中使用自签名证书。只使用证书颁发机构(CA)签名的证书。

您可以为频道指定 client-ssl-context,如下所示:

频道级 client-ssl-context 定义示例

mp.messaging.incoming.from.wildfly.elytron.ssl.context=exampleSSLContext

在示例中,exampleSSLContext 仅与来自 的传入频道关联。

Expand
表 14.7. 条目的讨论
entry描述

mp.messaging.incoming.from.security.protocol=SSL

这指定了在连接到代理时要使用安全的传入频道连接。

mp.messaging.outgoing.to.security.protocol=SSL

这指定在连接到代理时要使用安全的传出频道连接。

mp.messaging.connector.smallrye-kafka.wildfly.elytron.ssl.context

如果 Kafka 代理使用证书颁发机构(CA)签名证书保护,则不需要指定此属性。

如果您使用自签名证书,请在管理模型的 /subsystem=elytron/client-ssl-context 444 下指定 Elytron 子系统中定义的 SSLContext

重要

不要在生产环境中使用自签名证书。只使用证书颁发机构(CA)签名的证书。

您可以使用以下管理 CLI 命令定义 client-ssl-context

Example

/subsystem=elytron/client-ssl-context=exampleSSLContext:add(key-manager=exampleServerKeyManager,trust-manager=exampleTLSTrustManager)

注意

当 SSL/TLS 连接与 SCRAM-SHA-512 身份验证一起使用时,SSL/TLS 协议提供加密,但不用于身份验证。要使用 SCRAM-SHA-512 协议验证连接,请参阅 配置 MicroProfile Reactive Messagaging Kafka 连接器以使用 SASL_PLAINTEXTSASL_SSL 身份验证协议

Apache Kafka 使用 简单身份验证和安全层 (SASL)协议来验证连接到 Apache Kafka 侦听器 的客户端。您可以将 监听程序 配置为使用未加密的 普通 类型和加密的 tls 类型通信。Apache Kafka 的流将 SASL 与 Salted Challenged Response Authentication Mechanism (SCRAM)协议(SASL SCRAM-SHA-512)相结合来提供身份验证。您必须定义一个 Kafka 自定义资源和 KafkaUser 自定义资源 YAML 文件,以便为两种类型的监听程序配置身份验证。

如果在服务器上配置了 SASL,即 Kafka 侦听程序 配置为使用 SCRAM-SHA-512 身份验证,客户端必须指定安全协议。如果连接到使用 TLS 加密 的监听程序,此协议应为 SASL_SSL。如果没有加密 侦听器,则协议应为 SASL_PLAINTEXT。客户端配置必须通过将其设置为使用 SCRAM-SHA-512 来指定 SASL 机制。

注意

当 SCRAM-SHA-512 身份验证与 SSL/TLS 连接一起使用时,SSL/TLS 协议提供加密,但不用于身份验证。要保护与 SSL/TLS 的连接,请参阅配置安全 MicroProfile 被动消息 Apache Kafka 连接器

使用普通 类型未加密的通信和 SASL SCRAM-SHA-512 身份验证 的监听程序 设置客户端身份验证。

先决条件

  • 已安装 Apache Kafka 集群 Operator 的流。

    注意

    集群操作器默认生成 Secret 资源。客户端配置中 的用户名和密码 必须与 Secret 资源定义的用户名和密码匹配

    有关创建自定义 Secret 资源的更多信息,请参阅自定义密码配置

    如需有关操作器生成的 Secret 资源的更多信息,请参阅 Operator 生成的 Secret

流程

  1. Kafka 自定义资源定义为 YAML 文件:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
      namespace: myproject
    spec:
      kafka:
        # ...
        listeners:
          - name: plain
            port: 9092
            type: internal
            tls: true
            authentication:
              type: scram-sha-512
  2. 将您的 KafkaUser 自定义资源定义为 YAML 文件:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaUser
    metadata:
      name: my-user
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      authentication:
        type: scram-sha-512

    KafkaUser .authentication 定义应与 监听器 .authentication 定义匹配。

  3. microprofile-config.properties 文件中配置您的客户端:

    # General config to set up SASL over PLAINTEXT
    mp.messaging.connector.smallrye-kafka.bootstrap.servers=localhost:9092
    mp.messaging.connector.smallrye-kafka.sasl.mechanism=SCRAM-SHA-512
    mp.messaging.connector.smallrye-kafka.security.protocol=SASL_PLAINTEXT
    mp.messaging.connector.smallrye-kafka.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
      username="my-user" \
      password="my-password";
    注意

    用户名和密码 必须与集群操作器生成的 Secret 资源中的用户名和密码匹配。

这是高级消息队列协议(AMQP)连接器的简单 microprofile-config.properties 文件示例。其属性与 MicroProfile 被动消息传递连接器中的示例中的属性对应,用于与外部消息传递系统集成

amqp-host=localhost
amqp-port=5672
amqp-username=artemis
amqp-password=artemis

mp.messaging.outgoing.to.connector=smallrye-amqp
mp.messaging.outgoing.to.address=my-topic

mp.messaging.incoming.from.connector=smallrye-amqp
mp.messaging.incoming.from.address=my-topic
Expand
表 14.8. 条目的讨论
entry描述

这些是"频道"。

发送,receive

这些是"方法"。

请注意, 频道位于 send () 方法,而 from 频道则位于 receive () 方法。

amqp-host=localhost

这将指定应用程序必须连接到的 AMQP 代理的 URL。您还可以在频道级别指定一个 URL,如下所示: mp.messaging.outgoing.to.host=localhost。如果没有指定 URL,值默认为 localhost

amqp-port=5672

这将指定 AMQP 代理的端口。

mp.messaging.outgoing.to.connector=smallrye-amqp

这表示您希望频道将消息发送到 AMQP。

小型被动消息传递是用于构建应用程序的框架。请注意,smallrye-amqp 值是特定于 SmallRye reactive 消息传递。如果您使用 Galleon 调配自己的服务器,您可以通过包含 microprofile-reactive-messaging-amqp Galleon 层来启用 AMQP 集成。

mp.messaging.outgoing.to.address=my-topic

这表示您要将数据发送到地址 my-topic 上的 AMQP 队列。如果您没有为 mp.messaging.outgoing.to.address 指定值,则该值将默认为频道,本例中为 "to"。

mp.messaging.incoming.from.connector=smallrye-amqp

这表示您要使用 from 频道来接收来自 AMQP 代理的消息。同样,smallrye-amqp 值是特定于 SmallRye reactive 消息传递。

mp.messaging.incoming.from.address=my-topic

这表示 您要从 频道上的 AMQP 队列 my-topic 中读取数据。

有关 SmallRye Reactive Messaging 的 AMQP 连接器支持的属性的完整列表,请参阅 SmallRye Reactive Messaging AMQP Connector Configuration Reference

连接到安全 AMQP 代理

要与通过 SSL/TLS 和简单身份验证和安全层(SASL)保护的 AMQ 代理连接,请在 microprofile-config.properties 文件中定义用于连接的 client-ssl-context。您可以在连接器级别以及频道级别进行此操作。

连接器级别 client-ssl-context 定义示例

amqp-use-ssl=true
mp.messaging.connector.smallrye-amqp.wildfly.elytron.ssl.context=exampleSSLContext

只有在使用自签名证书时,才需要属性 mp.messaging.connector.smallrye-amqp.wildfly.elytron.ssl.context

重要

不要在生产环境中使用自签名证书。仅使用证书颁发机构(CA)签名的证书。

您还可以为频道指定 client-ssl-context,如下所示:

频道级 client-ssl-context 定义示例

mp.messaging.incoming.from.wildfly.elytron.ssl.context=exampleSSLContext

在示例中,exampleSSLContext 仅与来自 的传入频道关联。

Expand
表 14.9. 条目的讨论
entry描述

amqp-use-ssl

这指定了我们在连接到代理时使用安全连接。

mp.messaging.connector.smallrye-amqp.wildfly.elytron.ssl.context

如果 AMQ 代理使用证书颁发机构(CA)签名证书进行保护,则不需要指定此属性。

如果您使用自签名证书,请在管理模型的 /subsystem=elytron/client-ssl-context 444 下指定 Elytron 子系统中定义的 SSLContext

重要

不要在生产环境中使用自签名证书。仅使用证书颁发机构(CA)签名的证书。

您可以使用以下管理 CLI 命令定义 client-ssl-context

/subsystem=elytron/client-ssl-context=exampleSSLContext:add(key-manager=exampleServerKeyManager,trust-manager=exampleTLSTrustManager)

如需更多信息,请参阅为 客户端证书配置信任存储和信任管理器JBoss EAP 中配置 SSL/TLS 指南中的为双向 SSL/TLS 配置 服务器证书

Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2026 Red Hat
返回顶部