11.5. MicroProfile Reactive Messaging リファレンス
11.5.1. 外部メッセージングシステムと統合するための MicroProfile リアクティブメッセージングコネクター リンクのコピーリンクがクリップボードにコピーされました!
次に、MicroProfile Config 仕様で必要なリアクティブメッセージングプロパティーキー接頭辞のリストを示します。
-
mp.messaging.incoming.[channel-name].[attribute]=[value] -
mp.messaging.outgoing.[channel-name].[attribute]=[value] -
mp.messaging.connector.[connector-name].[attribute]=[value]
channel-name は @Incoming.value() または @Outgoing.value() のいずれかであることに注意してください。明確にするために、コネクターメソッドのペアのこの例を見てみましょう。
@Outgoing("to")
public int send() {
int i = // Randomly generated...
return i;
}
@Incoming("from")
public void receive(int i) {
// Process payload
}
この例では、必要なプロパティー接頭辞は次のとおりです。
-
mp.messaging.incoming.from.これは、receive()メソッドを定義します。 -
mp.messaging.outgoing.to.これはsend()メソッドを定義します。
これは一例であることを忘れないでください。異なるコネクターは異なるプロパティーを認識するため、指定する接頭辞は、設定するコネクターによって異なります。
11.5.2. リアクティブメッセージングストリームとユーザー初期化コード間のデータ交換の例 リンクのコピーリンクがクリップボードにコピーされました!
以下は、リアクティブメッセージングストリームと、ユーザーが @Channel および Emitter コンストラクトを介してトリガーしたコードとの間のデータ交換の例です。
@Path("/")
@ApplicationScoped
class MyBean {
@Inject @Channel("my-stream")
Emitter<String> emitter;
Publisher<String> dest;
public MyBean() {
}
@Inject
public MyBean(@Channel("my-stream") Publisher<String> dest) {
this.dest = subscribeAndAllowMultipleSubscriptions(dest);
}
private Publisher subscribeAndAllowMultipleSubscriptions(Publisher delegate) {
}
@POST
public PublisherBuilder<String> publish(@FormParam("value") String value) {
return emitter.send(value);
}
@GET
public Publisher poll() {
return dest;
}
@PreDestroy
public void close() {
}
}
インラインの詳細:
この例では、MicroProfile Reactive Messaging が my-stream メモリーストリームをリッスンしているため、Emitter を介して送信されたメッセージは、この挿入されたパブリッシャーで受信されます。ただし、このデータエクスチェンジを成功させるには、次の条件が満たされている必要があることに注意してください。
-
Emitter.send()を呼び出す前に、チャネルにアクティブなサブスクリプションが存在する必要があります。この例では、コンストラクターによって呼び出されるsubscribeAndAllowMultipleSubscriptions()メソッドにより、Bean がユーザーコード呼び出しに使用できるようになるまでにアクティブなサブスクリプションが確実に存在することに注意してください。 -
挿入された
Publisherには、Subscriptionを 1 つだけ持つことができます。受信側のパブリッシャーを REST 呼び出しで公開する場合、poll()メソッドを呼び出すたびに、destパブリッシャーへの新しいサブスクリプションが生成されます。挿入されたデータを各クライアントにブロードキャストするには、独自のパブリッシャーを実装する必要があります。
11.5.3. Apache Kafka ユーザー API リンクのコピーリンクがクリップボードにコピーされました!
Apache Kafka ユーザー API を使用して、Kafka が受信したメッセージに関する詳細情報を取得し、Kafka がメッセージを処理する方法に影響を与えることができます。この API は、io/smallrye/reactive/messaging/kafka/api パッケージに保存されており、次のクラスで設定されています。
IncomingKafkaRecordMetadata.このメタデータには、次の情報が含まれています。-
Messageで表される Kafka レコードkey。 -
Messageに使用される Kafkatopicとpartition、およびそれら内のoffset。 -
MessageのtimestampとtimestampType。 -
Messageheaders。これらは、アプリケーションが生成側で添付し、消費側で受信できる情報です。
-
OutgoingKafkaRecordMetadata.このメタデータを使用して、Kafka がメッセージを処理する方法を指定またはオーバーライドできます。次の情報が含まれています。-
key。Kafka はこれをメッセージキーとして扱います。 -
Kafka に使用させたい
topic。 -
partition。 -
Kafka が生成する
timestampが必要ない場合は、タイムスタンプ。 -
headers.
-
-
KafkaMetadataUtilには、OutgoingKafkaRecordMetadataをMessageに書き込み、IncomingKafkaRecordMetadataをMessageから読み取るためのユーティリティーメソッドが含まれています。
Kafka にマップされていないチャネルに送信された Message に OutgoingKafkaRecordMetadata を書き込む場合、リアクティブメッセージングフレームワークはそれを無視します。逆に、Kafka にマップされていないチャネルからの Message から IncomingKafkaRecordMetadata を読み取ると、そのメッセージは null として返されます。
メッセージ key の書き込みと読み取りの方法の例
@Inject
@Channel("from-user")
Emitter<Integer> emitter;
@Incoming("from-user")
@Outgoing("to-kafka")
public Message<Integer> send(Message<Integer> msg) {
// Set the key in the metadata
OutgoingKafkaRecordMetadata<String> md =
OutgoingKafkaRecordMetadata.<String>builder()
.withKey("KEY-" + i)
.build();
// Note that Message is immutable so the copy returned by this method
// call is not the same as the parameter to the method
return KafkaMetadataUtil.writeOutgoingKafkaMetadata(msg, md);
}
@Incoming("from-kafka")
public CompletionStage<Void> receive(Message<Integer> msg) {
IncomingKafkaRecordMetadata<String, Integer> metadata =
KafkaMetadataUtil.readIncomingKafkaMetadata(msg).get();
// We can now read the Kafka record key
String key = metadata.getKey();
// When using the Message wrapper around the payload we need to explicitly ack
// them
return msg.ack();
}
microprofile-config.properties ファイルの Kafka マッピングの例
kafka.bootstrap.servers=kafka:9092
mp.messaging.outgoing.to-kafka.connector=smallrye-kafka
mp.messaging.outgoing.to-kafka.topic=some-topic
mp.messaging.outgoing.to-kafka.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer
mp.messaging.outgoing.to-kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.incoming.from-kafka.connector=smallrye-kafka
mp.messaging.incoming.from-kafka.topic=some-topic
mp.messaging.incoming.from-kafka.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
mp.messaging.incoming.from-kafka.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
送信チャネルには key.serializer を指定し、受信チャネルには key.deserializer を指定する必要があります。
11.5.4. Kafka コネクターの MicroProfile Config プロパティーファイルの例 リンクのコピーリンクがクリップボードにコピーされました!
これは、Kafka コネクターのシンプルな microprofile-config.properties ファイルの例です。そのプロパティーは、外部メッセージングシステムと統合するための MicroProfile リアクティブメッセージングコネクターの例のプロパティーに対応しています。
kafka.bootstrap.servers=kafka:9092
mp.messaging.outgoing.to.connector=smallrye-kafka
mp.messaging.outgoing.to.topic=my-topic
mp.messaging.outgoing.to.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer
mp.messaging.incoming.from.connector=smallrye-kafka
mp.messaging.incoming.from.topic=my-topic
mp.messaging.incoming.from.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
| エントリー | 説明 |
|---|---|
|
| これらはチャネルです。 |
|
| これらはメソッドです。
|
|
|
これは、アプリケーションが接続する必要のある Kafka ブローカーの URL を指定します。次のように、チャネルレベルで URL を指定することもできます: |
|
|
これは
SmallRye リアクティブメッセージングは、アプリケーションをビルドするためのフレームワークです。 |
|
|
これは、 Kafka のトピックは、メッセージが保存および公開されるカテゴリーまたはフィード名です。すべての Kafka メッセージはトピックに編成されています。プロデューサーアプリケーションはトピックにデータ to を書き込み、コンシューマーアプリケーションは from トピックからデータを読み取ります。 |
|
|
これは、コネクターに |
|
|
これは、 |
|
|
これは、コネクターが |
|
|
これは、 |
このプロパティーのリストはすべてを網羅したものではありません。詳細は、SmallRye Reactive Messaging Apache Kafka のドキュメントを参照してください。
必須の MicroProfile Reactive Messaging 接頭辞
MicroProfile Reactive Messaging 仕様では、Kafka に次のメソッドプロパティーキー接頭辞が必要です。
-
mp.messaging.incoming.[channel-name].[attribute]=[value]` -
mp.messaging.outgoing.[channel-name].[attribute]=[value]` -
mp.messaging.connector.[connector-name].[attribute]=[value]`
channel-name は @Incoming.value() または @Outgoing.value() のいずれかであることに注意してください。
次に、次のメソッドペアの例を考えてみましょう。
@Outgoing("to")
public int send() {
int i = // Randomly generated...
return i;
}
@Incoming("from")
public void receive(int i) {
// Process payload
}
このメソッドペアの例では、次の必須のプロパティー接頭辞に注意してください。
-
mp.messaging.incoming.from.この接頭辞は、receive()メソッドの設定としてプロパティーを選択します。 -
mp.messaging.outgoing.to.この接頭辞は、send()メソッドの設定としてプロパティーを選択します。