11.5. MicroProfile Reactive Messaging 参考
11.5.1. MicroProfile 被动消息传递连接器,用于与外部消息传递系统集成 复制链接链接已复制到粘贴板!
以下是 MicroProfile 配置规范所需的被动消息传递属性密钥前缀的列表:
-
mp.messaging.incoming.[channel-name].[attribute]=[value] -
mp.messaging.outgoing.[channel-name].[attribute]=[value] -
mp.messaging.connector.[connector-name].[attribute]=[value]
请注意,channel-name 是 @Incoming.value () 或 @Outgoing.value ()。要清楚起见,请查看这个一组连接器方法示例:
@Outgoing("to")
public int send() {
int i = // Randomly generated...
return i;
}
@Incoming("from")
public void receive(int i) {
// Process payload
}
在本例中,所需的属性前缀如下:
-
mp.messaging.incoming.from.这定义了receive ()方法。 -
mp.messaging.outgoing.to.这将定义send ()方法。
请记住,这是一个示例。因为不同的连接器可识别不同的属性,所以您指示的前缀取决于您要配置的连接器。
11.5.2. 重新主动消息传递流和用户重新初始化代码之间的数据交换示例 复制链接链接已复制到粘贴板!
以下是重新主动消息传递流和通过 @Channel 和 Emitter 结构触发的代码之间的数据交换示例:
@Path("/")
@ApplicationScoped
class MyBean {
@Inject @Channel("my-stream")
Emitter<String> emitter;
Publisher<String> dest;
public MyBean() {
}
@Inject
public MyBean(@Channel("my-stream") Publisher<String> dest) {
this.dest = subscribeAndAllowMultipleSubscriptions(dest);
}
private Publisher subscribeAndAllowMultipleSubscriptions(Publisher delegate) {
}
@POST
public PublisherBuilder<String> publish(@FormParam("value") String value) {
return emitter.send(value);
}
@GET
public Publisher poll() {
return dest;
}
@PreDestroy
public void close() {
}
}
行中详情:
在本例中,MicroProfile Reactive Messaging 正在侦听 my-stream 内存流,因此通过 Emitter 发送的消息会收到注入的发布者。但请注意,需要满足以下条件才能使这个数据交换成功:
-
在调用
Emitter.send ()之前,该频道必须具有有效的订阅。在本例中,请注意构造器调用的subscribeAndAllowMultipleSubscriptions ()方法确保 bean 可用于用户代码调用时具有活跃的订阅。 -
在注入的 publisher 中只能有一个
。如果要使用 REST 调用公开接收发布程序,其中每个调用订阅poll ()方法会导致对destpublisher 的新订阅,您必须实施自己的发布程序,以便从注入的每个客户端广播数据。
11.5.3. Apache Kafka 用户 API 复制链接链接已复制到粘贴板!
您可以使用 Apache Kafka 用户 API 来获取有关收到消息 Kafka 的更多信息,并影响 Kafka 处理消息的方式。此 API 存储在 io/smallrye/reactive/messaging/kafka/api 软件包中,它由以下类组成:
IncomingKafkaRecordMetadata.这个元数据包含以下信息:-
Kafka 记录
密钥,由消息表示。 -
用于
消息的 Kafka 主题和分区,以及这些消息中的偏移量。 -
Messagetimestamp和timestampType。 -
Message标头.这些是应用程序可在生成端附加的信息,并在消费端接收信息。
-
Kafka 记录
OutgoingKafkaRecordMetadata.使用这个元数据,您可以指定或覆盖 Kafka 处理信息的方式。它包含以下信息:-
Kafka 视为消息键的密钥。
-
您希望 Kafka 使用
的主题。 -
分区。 -
时间戳,如果您不希望 Kafka 生成
的时间戳。 -
标头.
-
Kafka 视为消息键的密钥。
-
KafkaMetadataUtil包含将OutgoingKafkaRecordMetadata写入Message的工具方法,并从Message读取IncomingKafkaRecordMetadata。
如果您将 OutgoingKafkaRecordMetadata 写入发送到未映射到 Kafka 的频道的消息,则被动消息传递框架会忽略它。相反,如果您从未映射到 Kafka 的频道 读取 IncomingKafkaRecordMetadata,则该消息会返回 null。
如何写入和读取消息 键的示例
@Inject
@Channel("from-user")
Emitter<Integer> emitter;
@Incoming("from-user")
@Outgoing("to-kafka")
public Message<Integer> send(Message<Integer> msg) {
// Set the key in the metadata
OutgoingKafkaRecordMetadata<String> md =
OutgoingKafkaRecordMetadata.<String>builder()
.withKey("KEY-" + i)
.build();
// Note that Message is immutable so the copy returned by this method
// call is not the same as the parameter to the method
return KafkaMetadataUtil.writeOutgoingKafkaMetadata(msg, md);
}
@Incoming("from-kafka")
public CompletionStage<Void> receive(Message<Integer> msg) {
IncomingKafkaRecordMetadata<String, Integer> metadata =
KafkaMetadataUtil.readIncomingKafkaMetadata(msg).get();
// We can now read the Kafka record key
String key = metadata.getKey();
// When using the Message wrapper around the payload we need to explicitly ack
// them
return msg.ack();
}
microprofile-config.properties 文件中的 Kafka 映射示例
kafka.bootstrap.servers=kafka:9092
mp.messaging.outgoing.to-kafka.connector=smallrye-kafka
mp.messaging.outgoing.to-kafka.topic=some-topic
mp.messaging.outgoing.to-kafka.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer
mp.messaging.outgoing.to-kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.incoming.from-kafka.connector=smallrye-kafka
mp.messaging.incoming.from-kafka.topic=some-topic
mp.messaging.incoming.from-kafka.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
mp.messaging.incoming.from-kafka.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
您必须为传出频道指定 key.serializer,并为传入频道指定 key.deserializer。
11.5.4. Kafka 连接器的 MicroProfile Config 属性文件示例 复制链接链接已复制到粘贴板!
这是 Kafka 连接器的简单 microprofile-config.properties 文件示例。其属性与示例中的属性对应 "MicroProfile 重新主动消息传递连接器,以与外部消息传递系统集成"。
kafka.bootstrap.servers=kafka:9092
mp.messaging.outgoing.to.connector=smallrye-kafka
mp.messaging.outgoing.to.topic=my-topic
mp.messaging.outgoing.to.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer
mp.messaging.incoming.from.connector=smallrye-kafka
mp.messaging.incoming.from.topic=my-topic
mp.messaging.incoming.from.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
| entry | 描述 |
|---|---|
|
| 这些是"频道"。 |
|
| 这些是"方法"。
请注意, |
|
|
这将指定应用程序必须连接到的 Kafka 代理的 URL。您还可以在频道级别指定一个 URL,如下所示: |
|
|
这表示您希望要频道从 Kafka 接收信息。
小型被动消息传递是用于构建应用程序的框架。请注意, |
|
|
这表示您要将数据发送到名为 Kafka "topic" 是一个类别或源名称,信息存储在并发布到其中。所有 Kafka 信息都组织为主题。生产者应用将数据写入主题和消费者应用,从 主题中读取数据。 |
|
|
这告知连接器使用 |
|
|
这表示您要使用 |
|
|
这表示您的连接器应该从名为 |
|
|
这告知连接器在调用 |
这个属性列表并不全面。如需更多信息,请参阅 SmallRye Reactive Messaging Apache Kafka 文档。
强制 MicroProfile 主动消息传递前缀
MicroProfile Reactive 消息传递规范需要 Kafka 的以下方法属性密钥前缀:
-
mp.messaging.incoming.[channel-name].[attribute]=[value]` -
mp.messaging.outgoing.[channel-name].[attribute]=[value]` -
mp.messaging.connector.[connector-name].[attribute]=[value]`
请注意,channel-name 是 @Incoming.value () 或 @Outgoing.value ()。
现在考虑以下方法对示例:
@Outgoing("to")
public int send() {
int i = // Randomly generated...
return i;
}
@Incoming("from")
public void receive(int i) {
// Process payload
}
在这个方法对示例中,请注意以下所需属性前缀:
-
mp.messaging.incoming.from.此前缀选择属性作为receive ()方法的配置。 -
mp.messaging.outgoing.to.此前缀选择属性作为send ()方法的配置。