14.5. MicroProfile Reactive Messaging 参考
14.5.1. MicroProfile 被动消息传递连接器,用于与外部消息传递系统集成 复制链接链接已复制到粘贴板!
以下是 MicroProfile 配置规范所需的被动消息传递属性密钥前缀的列表:
-
mp.messaging.incoming.[channel-name].[attribute]=[value] -
mp.messaging.outgoing.[channel-name].[attribute]=[value] -
mp.messaging.connector.[connector-name].[attribute]=[value]
请注意,channel-name 是 @Incoming.value () 或 @Outgoing.value ()。要清楚起见,请查看这个一组连接器方法示例:
@Outgoing("to")
public int send() {
int i = // Randomly generated...
return i;
}
@Incoming("from")
public void receive(int i) {
// Process payload
}
在本例中,所需的属性前缀如下:
-
mp.messaging.incoming.from.这定义了receive ()方法。 -
mp.messaging.outgoing.to.这将定义send ()方法。
请记住,这是一个示例。因为不同的连接器可识别不同的属性,所以您指示的前缀取决于您要配置的连接器。
14.5.2. 重新主动消息传递流和用户重新初始化代码之间的数据交换示例 复制链接链接已复制到粘贴板!
以下是重新主动消息传递流和通过 @Channel 和 Emitter 结构触发的代码之间的数据交换示例:
@Path("/")
@ApplicationScoped
class MyBean {
@Inject @Channel("my-stream")
Emitter<String> emitter;
Publisher<String> dest;
public MyBean() {
}
@Inject
public MyBean(@Channel("my-stream") Publisher<String> dest) {
this.dest = subscribeAndAllowMultipleSubscriptions(dest);
}
private Publisher subscribeAndAllowMultipleSubscriptions(Publisher delegate) {
}
@POST
public PublisherBuilder<String> publish(@FormParam("value") String value) {
return emitter.send(value);
}
@GET
public Publisher poll() {
return dest;
}
@PreDestroy
public void close() {
}
}
行中详情:
在本例中,MicroProfile Reactive Messaging 正在侦听 my-stream 内存流,因此通过 Emitter 发送的消息会收到注入的发布者。但请注意,需要满足以下条件才能使这个数据交换成功:
-
在调用
Emitter.send ()之前,该频道必须具有有效的订阅。在本例中,请注意构造器调用的subscribeAndAllowMultipleSubscriptions ()方法确保 bean 可用于用户代码调用时具有活跃的订阅。 -
在注入的 publisher 中只能有一个
。如果要使用 REST 调用公开接收发布程序,其中每个调用订阅poll ()方法会导致对destpublisher 的新订阅,您必须实施自己的发布程序,以便从注入的每个客户端广播数据。
14.5.3. Apache Kafka 用户 API 复制链接链接已复制到粘贴板!
您可以使用 Apache Kafka 用户 API 来获取有关收到消息 Kafka 的更多信息,并影响 Kafka 处理消息的方式。此 API 存储在 io/smallrye/reactive/messaging/kafka/api 软件包中,它由以下类组成:
IncomingKafkaRecordMetadata.这个元数据包含以下信息:-
Kafka 记录
密钥,由消息表示。 -
用于
消息的 Kafka 主题和分区,以及这些消息中的偏移量。 -
Messagetimestamp和timestampType。 -
Message标头.这些是应用程序可在生成端附加的信息,并在消费端接收信息。
-
Kafka 记录
OutgoingKafkaRecordMetadata.使用这个元数据,您可以指定或覆盖 Kafka 处理信息的方式。它包含以下信息:-
Kafka 视为消息键的密钥。
-
您希望 Kafka 使用
的主题。 -
分区。 -
时间戳,如果您不希望 Kafka 生成
的时间戳。 -
标头.
-
Kafka 视为消息键的密钥。
-
KafkaMetadataUtil包含将OutgoingKafkaRecordMetadata写入Message的工具方法,并从Message读取IncomingKafkaRecordMetadata。
如果您将 OutgoingKafkaRecordMetadata 写入发送到未映射到 Kafka 的频道的消息,则被动消息传递框架会忽略它。相反,如果您从未映射到 Kafka 的频道 读取 IncomingKafkaRecordMetadata,则该消息会返回 null。
如何写入和读取消息 键的示例
@Inject
@Channel("from-user")
Emitter<Integer> emitter;
@Incoming("from-user")
@Outgoing("to-kafka")
public Message<Integer> send(Message<Integer> msg) {
// Set the key in the metadata
OutgoingKafkaRecordMetadata<String> md =
OutgoingKafkaRecordMetadata.<String>builder()
.withKey("KEY-" + i)
.build();
// Note that Message is immutable so the copy returned by this method
// call is not the same as the parameter to the method
return KafkaMetadataUtil.writeOutgoingKafkaMetadata(msg, md);
}
@Incoming("from-kafka")
public CompletionStage<Void> receive(Message<Integer> msg) {
IncomingKafkaRecordMetadata<String, Integer> metadata =
KafkaMetadataUtil.readIncomingKafkaMetadata(msg).get();
// We can now read the Kafka record key
String key = metadata.getKey();
// When using the Message wrapper around the payload we need to explicitly ack
// them
return msg.ack();
}
microprofile-config.properties 文件中的 Kafka 映射示例
kafka.bootstrap.servers=kafka:9092
mp.messaging.outgoing.to-kafka.connector=smallrye-kafka
mp.messaging.outgoing.to-kafka.topic=some-topic
mp.messaging.outgoing.to-kafka.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer
mp.messaging.outgoing.to-kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.incoming.from-kafka.connector=smallrye-kafka
mp.messaging.incoming.from-kafka.topic=some-topic
mp.messaging.incoming.from-kafka.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
mp.messaging.incoming.from-kafka.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
您必须为传出频道指定 key.serializer,并为传入频道指定 key.deserializer。
14.5.4. Kafka 连接器的 MicroProfile Config 属性文件示例 复制链接链接已复制到粘贴板!
这是 Kafka 连接器的简单 microprofile-config.properties 文件示例。其属性与示例中的属性对应 "MicroProfile 重新主动消息传递连接器,以与外部消息传递系统集成"。
kafka.bootstrap.servers=kafka:9092
mp.messaging.outgoing.to.connector=smallrye-kafka
mp.messaging.outgoing.to.topic=my-topic
mp.messaging.outgoing.to.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer
mp.messaging.incoming.from.connector=smallrye-kafka
mp.messaging.incoming.from.topic=my-topic
mp.messaging.incoming.from.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
| entry | 描述 |
|---|---|
|
| 这些是"频道"。 |
|
| 这些是"方法"。
请注意, |
|
|
这将指定应用程序必须连接到的 Kafka 代理的 URL。您还可以在频道级别指定一个 URL,如下所示: |
|
|
这表示您希望要频道从 Kafka 接收信息。
小型被动消息传递是用于构建应用程序的框架。请注意, |
|
|
这表示您要将数据发送到名为 Kafka "topic" 是一个类别或源名称,信息存储在并发布到其中。所有 Kafka 信息都组织为主题。生产者应用将数据写入主题和消费者应用,从 主题中读取数据。 |
|
|
这告知连接器使用 |
|
|
这表示您要使用 |
|
|
这表示您的连接器应该从名为 |
|
|
这告知连接器在调用 |
这个属性列表并不全面。如需更多信息,请参阅 SmallRye Reactive Messaging Apache Kafka 文档。
强制 MicroProfile 主动消息传递前缀
MicroProfile Reactive 消息传递规范需要 Kafka 的以下方法属性密钥前缀:
-
mp.messaging.incoming.[channel-name].[attribute]=[value]` -
mp.messaging.outgoing.[channel-name].[attribute]=[value]` -
mp.messaging.connector.[connector-name].[attribute]=[value]`
请注意,channel-name 是 @Incoming.value () 或 @Outgoing.value ()。
现在考虑以下方法对示例:
@Outgoing("to")
public int send() {
int i = // Randomly generated...
return i;
}
@Incoming("from")
public void receive(int i) {
// Process payload
}
在这个方法对示例中,请注意以下所需属性前缀:
-
mp.messaging.incoming.from.此前缀选择属性作为receive ()方法的配置。 -
mp.messaging.outgoing.to.此前缀选择属性作为send ()方法的配置。
14.5.4.1. 配置安全 MicroProfile 被动消息 Apache Kafka 连接器 复制链接链接已复制到粘贴板!
要使用自签名证书配置 Apache Kafka 连接器客户端,请在 microprofile-config.properties 文件中定义 client-ssl-context。您可以在连接器和频道级别进行此操作。
以下示例演示了如何配置使用 SSL/TLS 保护的 Apache Kafka 连接器。
连接器级别 client-ssl-context 定义示例
mp.messaging.incoming.from.security.protocol=SSL
mp.messaging.outgoing.to.security.protocol=SSL
mp.messaging.connector.smallrye-kafka.wildfly.elytron.ssl.context=exampleSSLContext
只有在使用自签名证书时,才需要属性 mp.messaging.connector.smallrye-kafka.wildfly.elytron.ssl.context。
不要在生产环境中使用自签名证书。只使用证书颁发机构(CA)签名的证书。
您可以为频道指定 client-ssl-context,如下所示:
频道级 client-ssl-context 定义示例
mp.messaging.incoming.from.wildfly.elytron.ssl.context=exampleSSLContext
在示例中,exampleSSLContext 仅与来自 的传入频道关联。
| entry | 描述 |
|---|---|
|
| 这指定了在连接到代理时要使用安全的传入频道连接。 |
|
| 这指定在连接到代理时要使用安全的传出频道连接。 |
|
| 如果 Kafka 代理使用证书颁发机构(CA)签名证书保护,则不需要指定此属性。 |
如果您使用自签名证书,请在管理模型的 /subsystem=elytron/client-ssl-context 444 下指定 Elytron 子系统中定义的 SSLContext。
不要在生产环境中使用自签名证书。只使用证书颁发机构(CA)签名的证书。
您可以使用以下管理 CLI 命令定义 client-ssl-context :
Example
/subsystem=elytron/client-ssl-context=exampleSSLContext:add(key-manager=exampleServerKeyManager,trust-manager=exampleTLSTrustManager)
当 SSL/TLS 连接与 SCRAM-SHA-512 身份验证一起使用时,SSL/TLS 协议提供加密,但不用于身份验证。要使用 SCRAM-SHA-512 协议验证连接,请参阅 配置 MicroProfile Reactive Messagaging Kafka 连接器以使用 SASL_PLAINTEXT 和 SASL_SSL 身份验证协议。
Apache Kafka 使用 简单身份验证和安全层 (SASL)协议来验证连接到 Apache Kafka 侦听器 的客户端。您可以将 监听程序 配置为使用未加密的 普通 类型和加密的 tls 类型通信。Apache Kafka 的流将 SASL 与 Salted Challenged Response Authentication Mechanism (SCRAM)协议(SASL SCRAM-SHA-512)相结合来提供身份验证。您必须定义一个 Kafka 自定义资源和 KafkaUser 自定义资源 YAML 文件,以便为两种类型的监听程序配置身份验证。
如果在服务器上配置了 SASL,即 Kafka 侦听程序 配置为使用 SCRAM-SHA-512 身份验证,客户端必须指定安全协议。如果连接到使用 TLS 加密 的监听程序,此协议应为 SASL_SSL。如果没有加密 侦听器,则协议应为 SASL_PLAINTEXT。客户端配置必须通过将其设置为使用 SCRAM-SHA-512 来指定 SASL 机制。
当 SCRAM-SHA-512 身份验证与 SSL/TLS 连接一起使用时,SSL/TLS 协议提供加密,但不用于身份验证。要保护与 SSL/TLS 的连接,请参阅配置安全 MicroProfile 被动消息 Apache Kafka 连接器。
为 使用普通 类型未加密的通信和 SASL SCRAM-SHA-512 身份验证 的监听程序 设置客户端身份验证。
先决条件
已安装 Apache Kafka 集群 Operator 的流。
注意集群操作器默认生成
Secret资源。客户端配置中必须与的用户名和密码Secret资源定义的用户名和密码匹配有关创建自定义
Secret资源的更多信息,请参阅自定义密码配置。如需有关操作器生成的
Secret资源的更多信息,请参阅 Operator 生成的 Secret。
流程
将 Kafka 自定义资源定义为 YAML 文件:
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: my-cluster namespace: myproject spec: kafka: # ... listeners: - name: plain port: 9092 type: internal tls: true authentication: type: scram-sha-512将您的 KafkaUser 自定义资源定义为 YAML 文件:
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaUser metadata: name: my-user labels: strimzi.io/cluster: my-cluster spec: authentication: type: scram-sha-512KafkaUser
.authentication定义应与监听器.authentication定义匹配。在
microprofile-config.properties文件中配置您的客户端:# General config to set up SASL over PLAINTEXT mp.messaging.connector.smallrye-kafka.bootstrap.servers=localhost:9092 mp.messaging.connector.smallrye-kafka.sasl.mechanism=SCRAM-SHA-512 mp.messaging.connector.smallrye-kafka.security.protocol=SASL_PLAINTEXT mp.messaging.connector.smallrye-kafka.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="my-user" \ password="my-password";注意必须与集群操作器生成的用户名和密码Secret资源中的用户名和密码匹配。
14.5.5. AMQP 连接器的 MicroProfile Config 属性文件示例 复制链接链接已复制到粘贴板!
这是高级消息队列协议(AMQP)连接器的简单 microprofile-config.properties 文件示例。其属性与 MicroProfile 被动消息传递连接器中的示例中的属性对应,用于与外部消息传递系统集成。
amqp-host=localhost
amqp-port=5672
amqp-username=artemis
amqp-password=artemis
mp.messaging.outgoing.to.connector=smallrye-amqp
mp.messaging.outgoing.to.address=my-topic
mp.messaging.incoming.from.connector=smallrye-amqp
mp.messaging.incoming.from.address=my-topic
| entry | 描述 |
|---|---|
|
| 这些是"频道"。 |
|
| 这些是"方法"。
请注意, |
|
|
这将指定应用程序必须连接到的 AMQP 代理的 URL。您还可以在频道级别指定一个 URL,如下所示: |
|
| 这将指定 AMQP 代理的端口。 |
|
| 这表示您希望频道将消息发送到 AMQP。
小型被动消息传递是用于构建应用程序的框架。请注意, |
|
|
这表示您要将数据发送到地址 |
|
|
这表示您要使用 |
|
|
这表示 |
有关 SmallRye Reactive Messaging 的 AMQP 连接器支持的属性的完整列表,请参阅 SmallRye Reactive Messaging AMQP Connector Configuration Reference。
连接到安全 AMQP 代理
要与通过 SSL/TLS 和简单身份验证和安全层(SASL)保护的 AMQ 代理连接,请在 microprofile-config.properties 文件中定义用于连接的 client-ssl-context。您可以在连接器级别以及频道级别进行此操作。
连接器级别 client-ssl-context 定义示例
amqp-use-ssl=true
mp.messaging.connector.smallrye-amqp.wildfly.elytron.ssl.context=exampleSSLContext
只有在使用自签名证书时,才需要属性 mp.messaging.connector.smallrye-amqp.wildfly.elytron.ssl.context。
不要在生产环境中使用自签名证书。仅使用证书颁发机构(CA)签名的证书。
您还可以为频道指定 client-ssl-context,如下所示:
频道级 client-ssl-context 定义示例
mp.messaging.incoming.from.wildfly.elytron.ssl.context=exampleSSLContext
在示例中,exampleSSLContext 仅与来自 的传入频道关联。
| entry | 描述 |
|---|---|
|
| 这指定了我们在连接到代理时使用安全连接。 |
|
| 如果 AMQ 代理使用证书颁发机构(CA)签名证书进行保护,则不需要指定此属性。
如果您使用自签名证书,请在管理模型的 重要 不要在生产环境中使用自签名证书。仅使用证书颁发机构(CA)签名的证书。
您可以使用以下管理 CLI 命令定义
如需更多信息,请参阅为 客户端证书配置信任存储和信任管理器,在 JBoss EAP 中配置 SSL/TLS 指南中的为双向 SSL/TLS 配置 服务器证书。 |