11.5. MicroProfile Reactive Messaging 参考


以下是 MicroProfile 配置规范所需的被动消息传递属性密钥前缀的列表:

  • mp.messaging.incoming.[channel-name].[attribute]=[value]
  • mp.messaging.outgoing.[channel-name].[attribute]=[value]
  • mp.messaging.connector.[connector-name].[attribute]=[value]

请注意,channel-name@Incoming.value ()@Outgoing.value ()。要清楚起见,请查看这个一组连接器方法示例:

@Outgoing("to")
public int send() {
   int i = // Randomly generated...
   return i;
}

@Incoming("from")
public void receive(int i) {
   // Process payload
}

在本例中,所需的属性前缀如下:

  • mp.messaging.incoming.from.这定义了 receive () 方法。
  • mp.messaging.outgoing.to.这将定义 send () 方法。

请记住,这是一个示例。因为不同的连接器可识别不同的属性,所以您指示的前缀取决于您要配置的连接器。

以下是重新主动消息传递流和通过 @Channel 和 Emitter 结构触发的代码之间的数据交换示例:

@Path("/")
@ApplicationScoped
class MyBean {
    @Inject @Channel("my-stream")
    Emitter<String> emitter; 
1


    Publisher<String> dest;

    public MyBean() { 
2

    }

    @Inject
    public MyBean(@Channel("my-stream") Publisher<String> dest) {
        this.dest = subscribeAndAllowMultipleSubscriptions(dest);
    }

    private Publisher subscribeAndAllowMultipleSubscriptions(Publisher delegate) {
    } 
3
 
4
 
5


    @POST
    public PublisherBuilder<String> publish(@FormParam("value") String value) {
        return emitter.send(value);
    }

    @GET
    public Publisher poll() {
        return dest;
    }

    @PreDestroy
    public void close() { 
6


    }
}

行中详情:

1
嵌套构造器的发布程序。
2
您需要此空构造器来满足 Java 规范的上下文和依赖注入(CDI)。
3
订阅委派。
4
在可以处理多个订阅的发布程序中嵌套委托。
5
嵌套发布程序从委派中转发数据。
6
取消订阅被动消息传递提供的发布程序。

在本例中,MicroProfile Reactive Messaging 正在侦听 my-stream 内存流,因此通过 Emitter 发送的消息会收到注入的发布者。但请注意,需要满足以下条件才能使这个数据交换成功:

  1. 在调用 Emitter.send () 之前,该频道必须具有有效的订阅。在本例中,请注意构造器调用的 subscribeAndAllowMultipleSubscriptions () 方法确保 bean 可用于用户代码调用时具有活跃的订阅。
  2. 在注入的 publisher 中只能有一个 订阅 。如果要使用 REST 调用公开接收发布程序,其中每个调用 poll () 方法会导致对 dest publisher 的新订阅,您必须实施自己的发布程序,以便从注入的每个客户端广播数据。

11.5.3. Apache Kafka 用户 API

您可以使用 Apache Kafka 用户 API 来获取有关收到消息 Kafka 的更多信息,并影响 Kafka 处理消息的方式。此 API 存储在 io/smallrye/reactive/messaging/kafka/api 软件包中,它由以下类组成:

  • IncomingKafkaRecordMetadata.这个元数据包含以下信息:

    • Kafka 记录 密钥,由 消息 表示。
    • 用于 消息的 Kafka 主题和 分区 以及这些 消息中 的偏移量。
    • Message timestamptimestampType
    • Message 标头.这些是应用程序可在生成端附加的信息,并在消费端接收信息。
  • OutgoingKafkaRecordMetadata.使用这个元数据,您可以指定或覆盖 Kafka 处理信息的方式。它包含以下信息:

    • Kafka 视为消息键的密钥。
    • 您希望 Kafka 使用 的主题
    • 分区
    • 时间戳,如果您不希望 Kafka 生成 的时间戳
    • 标头.
  • KafkaMetadataUtil 包含将 OutgoingKafkaRecordMetadata 写入 Message 的工具方法,并从 Message 读取 IncomingKafkaRecordMetadata
重要

如果您将 OutgoingKafkaRecordMetadata 写入发送到未映射到 Kafka 的频道的消息,则被动消息传递框架会忽略它。相反,如果您从未映射到 Kafka 的频道 读取 IncomingKafkaRecordMetadata,则该消息会返回 null

如何写入和读取消息 的示例
@Inject
@Channel("from-user")
Emitter<Integer> emitter;

@Incoming("from-user")
@Outgoing("to-kafka")
public Message<Integer> send(Message<Integer> msg) {
    // Set the key in the metadata
    OutgoingKafkaRecordMetadata<String> md =
            OutgoingKafkaRecordMetadata.<String>builder()
                .withKey("KEY-" + i)
                .build();
    // Note that Message is immutable so the copy returned by this method
    // call is not the same as the parameter to the method
    return KafkaMetadataUtil.writeOutgoingKafkaMetadata(msg, md);
}

@Incoming("from-kafka")
public CompletionStage<Void> receive(Message<Integer> msg) {
    IncomingKafkaRecordMetadata<String, Integer> metadata =
        KafkaMetadataUtil.readIncomingKafkaMetadata(msg).get();

    // We can now read the Kafka record key
    String key = metadata.getKey();

    // When using the Message wrapper around the payload we need to explicitly ack
    // them
    return msg.ack();
}
microprofile-config.properties 文件中的 Kafka 映射示例
kafka.bootstrap.servers=kafka:9092

mp.messaging.outgoing.to-kafka.connector=smallrye-kafka
mp.messaging.outgoing.to-kafka.topic=some-topic
mp.messaging.outgoing.to-kafka.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer
mp.messaging.outgoing.to-kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer

mp.messaging.incoming.from-kafka.connector=smallrye-kafka
mp.messaging.incoming.from-kafka.topic=some-topic
mp.messaging.incoming.from-kafka.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
mp.messaging.incoming.from-kafka.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
注意

您必须为传出频道指定 key.serializer,并为传入频道指定 key.deserializer

这是 Kafka 连接器的简单 microprofile-config.properties 文件示例。其属性与示例中的属性对应 "MicroProfile 重新主动消息传递连接器,以与外部消息传递系统集成"。

kafka.bootstrap.servers=kafka:9092

mp.messaging.outgoing.to.connector=smallrye-kafka
mp.messaging.outgoing.to.topic=my-topic
mp.messaging.outgoing.to.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer

mp.messaging.incoming.from.connector=smallrye-kafka
mp.messaging.incoming.from.topic=my-topic
mp.messaging.incoming.from.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
Expand
表 11.6. 条目的讨论
entry描述

这些是"频道"。

发送,receive

这些是"方法"。

请注意, 频道位于 send () 方法,而 from 频道则位于 receive () 方法。

kafka.bootstrap.servers=kafka:9092

这将指定应用程序必须连接到的 Kafka 代理的 URL。您还可以在频道级别指定一个 URL,如下所示: mp.messaging.outgoing.to.bootstrap.servers=kafka:9092

mp.messaging.outgoing.to.connector=smallrye-kafka

这表示您希望要频道从 Kafka 接收信息。

小型被动消息传递是用于构建应用程序的框架。请注意,smallrye-kafka 值是 SmallRye 特定于重新主动的消息传递。如果您使用 Galleon 置备自己的服务器,您可以通过包含 microprofile-reactive-messaging-kafka Galleon 层来启用 Kafka 集成。

mp.messaging.outgoing.to.topic=my-topic

这表示您要将数据发送到名为 my-topic 的 Kafka 主题。

Kafka "topic" 是一个类别或源名称,信息存储在并发布到其中。所有 Kafka 信息都组织为主题。生产者应用将数据写入主题和消费者应用, 主题中读取数据。

mp.messaging.outgoing.to.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer

这告知连接器使用 IntegerSerializer 来序列化 send () 方法输出的值。Kafka 为标准 Java 类型提供序列化器。您可以通过编写实现 org.apache.kafka.common.serialization.Serializer 的类来实现自己的序列化器,然后在部署中包含该类。

mp.messaging.incoming.from.connector=smallrye-kafka

这表示您要使用 from 频道来接收来自 Kafka 的信息。同样,smallrye-kafka 的值是 SmallRye reactive messaging-specific。

mp.messaging.incoming.from.topic=my-topic

这表示您的连接器应该从名为 my-topic 的 Kafka 主题中读取数据。

mp.messaging.incoming.from.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer

这告知连接器在调用 receive () 方法前,使用 IntegerDeserializer 来反序列化主题中的值。您可以通过编写实现 org.apache.kafka.common.serialization.Deserializer 的类来实现自己的反序列化器,然后在部署中包含该类。

注意

这个属性列表并不全面。如需更多信息,请参阅 SmallRye Reactive Messaging Apache Kafka 文档。

强制 MicroProfile 主动消息传递前缀

MicroProfile Reactive 消息传递规范需要 Kafka 的以下方法属性密钥前缀:

  • mp.messaging.incoming.[channel-name].[attribute]=[value]`
  • mp.messaging.outgoing.[channel-name].[attribute]=[value]`
  • mp.messaging.connector.[connector-name].[attribute]=[value]`

请注意,channel-name@Incoming.value ()@Outgoing.value ()

现在考虑以下方法对示例:

@Outgoing("to")
public int send() {
    int i = // Randomly generated...
    return i;
}

@Incoming("from")
public void receive(int i) {
    // Process payload
}

在这个方法对示例中,请注意以下所需属性前缀:

  • mp.messaging.incoming.from.此前缀选择属性作为 receive () 方法的配置。
  • mp.messaging.outgoing.to.此前缀选择属性作为 send () 方法的配置。
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2026 Red Hat
返回顶部