11.5. MicroProfile Reactive Messaging リファレンス


次に、MicroProfile Config 仕様で必要なリアクティブメッセージングプロパティーキー接頭辞のリストを示します。

  • mp.messaging.incoming.[channel-name].[attribute]=[value]
  • mp.messaging.outgoing.[channel-name].[attribute]=[value]
  • mp.messaging.connector.[connector-name].[attribute]=[value]

channel-name@Incoming.value() または @Outgoing.value() のいずれかであることに注意してください。明確にするために、コネクターメソッドのペアのこの例を見てみましょう。

@Outgoing("to")
public int send() {
   int i = // Randomly generated...
   return i;
}

@Incoming("from")
public void receive(int i) {
   // Process payload
}

この例では、必要なプロパティー接頭辞は次のとおりです。

  • mp.messaging.incoming.from.これは、receive() メソッドを定義します。
  • mp.messaging.outgoing.to.これは send() メソッドを定義します。

これは一例であることを忘れないでください。異なるコネクターは異なるプロパティーを認識するため、指定する接頭辞は、設定するコネクターによって異なります。

11.5.2. リアクティブメッセージングストリームとユーザー初期化コード間のデータ交換の例

以下は、リアクティブメッセージングストリームと、ユーザーが @Channel および Emitter コンストラクトを介してトリガーしたコードとの間のデータ交換の例です。

@Path("/")
@ApplicationScoped
class MyBean {
    @Inject @Channel("my-stream")
    Emitter<String> emitter; 
1


    Publisher<String> dest;

    public MyBean() { 
2

    }

    @Inject
    public MyBean(@Channel("my-stream") Publisher<String> dest) {
        this.dest = subscribeAndAllowMultipleSubscriptions(dest);
    }

    private Publisher subscribeAndAllowMultipleSubscriptions(Publisher delegate) {
    } 
3
 
4
 
5


    @POST
    public PublisherBuilder<String> publish(@FormParam("value") String value) {
        return emitter.send(value);
    }

    @GET
    public Publisher poll() {
        return dest;
    }

    @PreDestroy
    public void close() { 
6


    }
}

インラインの詳細:

1
コンストラクターによって挿入されたパブリッシャーをラップします。
2
Java 仕様のコンテキストと依存関係の挿入 (CDI) を満たすには、この空のコンストラクターが必要です。
3
デリゲートにサブスクライブします。
4
複数のサブスクリプションを処理できるパブリッシャーでデリゲートをラップします。
5
ラッピングパブリッシャーは、デリゲートからのデータを転送します。
6
リアクティブメッセージングが提供するパブリッシャーからアンサブスクライブします。

この例では、MicroProfile Reactive Messaging が my-stream メモリーストリームをリッスンしているため、Emitter を介して送信されたメッセージは、この挿入されたパブリッシャーで受信されます。ただし、このデータエクスチェンジを成功させるには、次の条件が満たされている必要があることに注意してください。

  1. Emitter.send() を呼び出す前に、チャネルにアクティブなサブスクリプションが存在する必要があります。この例では、コンストラクターによって呼び出される subscribeAndAllowMultipleSubscriptions() メソッドにより、Bean がユーザーコード呼び出しに使用できるようになるまでにアクティブなサブスクリプションが確実に存在することに注意してください。
  2. 挿入された Publisher には、Subscription を 1 つだけ持つことができます。受信側のパブリッシャーを REST 呼び出しで公開する場合、poll() メソッドを呼び出すたびに、dest パブリッシャーへの新しいサブスクリプションが生成されます。挿入されたデータを各クライアントにブロードキャストするには、独自のパブリッシャーを実装する必要があります。

11.5.3. Apache Kafka ユーザー API

Apache Kafka ユーザー API を使用して、Kafka が受信したメッセージに関する詳細情報を取得し、Kafka がメッセージを処理する方法に影響を与えることができます。この API は、io/smallrye/reactive/messaging/kafka/api パッケージに保存されており、次のクラスで設定されています。

  • IncomingKafkaRecordMetadata.このメタデータには、次の情報が含まれています。

    • Message で表される Kafka レコード key
    • Message に使用される Kafka topicpartition、およびそれら内の offset
    • MessagetimestamptimestampType
    • Message headers。これらは、アプリケーションが生成側で添付し、消費側で受信できる情報です。
  • OutgoingKafkaRecordMetadata.このメタデータを使用して、Kafka がメッセージを処理する方法を指定またはオーバーライドできます。次の情報が含まれています。

    • key。Kafka はこれをメッセージキーとして扱います。
    • Kafka に使用させたい topic
    • partition
    • Kafka が生成する timestamp が必要ない場合は、タイムスタンプ。
    • headers.
  • KafkaMetadataUtil には、OutgoingKafkaRecordMetadataMessage に書き込み、IncomingKafkaRecordMetadataMessage から読み取るためのユーティリティーメソッドが含まれています。
重要

Kafka にマップされていないチャネルに送信された MessageOutgoingKafkaRecordMetadata を書き込む場合、リアクティブメッセージングフレームワークはそれを無視します。逆に、Kafka にマップされていないチャネルからの Message から IncomingKafkaRecordMetadata を読み取ると、そのメッセージは null として返されます。

メッセージ key の書き込みと読み取りの方法の例
@Inject
@Channel("from-user")
Emitter<Integer> emitter;

@Incoming("from-user")
@Outgoing("to-kafka")
public Message<Integer> send(Message<Integer> msg) {
    // Set the key in the metadata
    OutgoingKafkaRecordMetadata<String> md =
            OutgoingKafkaRecordMetadata.<String>builder()
                .withKey("KEY-" + i)
                .build();
    // Note that Message is immutable so the copy returned by this method
    // call is not the same as the parameter to the method
    return KafkaMetadataUtil.writeOutgoingKafkaMetadata(msg, md);
}

@Incoming("from-kafka")
public CompletionStage<Void> receive(Message<Integer> msg) {
    IncomingKafkaRecordMetadata<String, Integer> metadata =
        KafkaMetadataUtil.readIncomingKafkaMetadata(msg).get();

    // We can now read the Kafka record key
    String key = metadata.getKey();

    // When using the Message wrapper around the payload we need to explicitly ack
    // them
    return msg.ack();
}
microprofile-config.properties ファイルの Kafka マッピングの例
kafka.bootstrap.servers=kafka:9092

mp.messaging.outgoing.to-kafka.connector=smallrye-kafka
mp.messaging.outgoing.to-kafka.topic=some-topic
mp.messaging.outgoing.to-kafka.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer
mp.messaging.outgoing.to-kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer

mp.messaging.incoming.from-kafka.connector=smallrye-kafka
mp.messaging.incoming.from-kafka.topic=some-topic
mp.messaging.incoming.from-kafka.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
mp.messaging.incoming.from-kafka.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
注記

送信チャネルには key.serializer を指定し、受信チャネルには key.deserializer を指定する必要があります。

11.5.4. Kafka コネクターの MicroProfile Config プロパティーファイルの例

これは、Kafka コネクターのシンプルな microprofile-config.properties ファイルの例です。そのプロパティーは、外部メッセージングシステムと統合するための MicroProfile リアクティブメッセージングコネクターの例のプロパティーに対応しています。

kafka.bootstrap.servers=kafka:9092

mp.messaging.outgoing.to.connector=smallrye-kafka
mp.messaging.outgoing.to.topic=my-topic
mp.messaging.outgoing.to.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer

mp.messaging.incoming.from.connector=smallrye-kafka
mp.messaging.incoming.from.topic=my-topic
mp.messaging.incoming.from.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
Expand
表11.6 エントリーの議論
エントリー説明

tofrom

これらはチャネルです。

sendreceive

これらはメソッドです。

to チャネルは send() メソッドにあり、from チャネルは receive() メソッドにあることに注意してください。

kafka.bootstrap.servers=kafka:9092

これは、アプリケーションが接続する必要のある Kafka ブローカーの URL を指定します。次のように、チャネルレベルで URL を指定することもできます: mp.messaging.outgoing.to.bootstrap.servers=kafka:9092

mp.messaging.outgoing.to.connector=smallrye-kafka

これは to チャネルが Kafka からのメッセージを受信することを示しています。

SmallRye リアクティブメッセージングは、アプリケーションをビルドするためのフレームワークです。smallrye-kafka 値は、SmallRye リアクティブメッセージング固有であることに注意してください。Galleon を使用して独自のサーバーをプロビジョニングしている場合は、microprofile-reactive-messaging-kafka Galleon レイヤーを含めることで、Kafka 統合を有効にできます。

mp.messaging.outgoing.to.topic=my-topic

これは、my-topic という Kafka トピックにデータを送信することを示しています。

Kafka のトピックは、メッセージが保存および公開されるカテゴリーまたはフィード名です。すべての Kafka メッセージはトピックに編成されています。プロデューサーアプリケーションはトピックにデータ to を書き込み、コンシューマーアプリケーションは from トピックからデータを読み取ります。

mp.messaging.outgoing.to.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer

これは、コネクターに IntegerSerializer を使用して、send() メソッドがトピックに書き込むときに出力する値をシリアル化するように指示します。Kafka は、標準の Java タイプ用のシリアライザーを提供します。org.apache.kafka.common.serialization.Serializer を実装するクラスを作成して独自のシリアライザーを実装し、そのクラスをデプロイメントに含めることができます。

mp.messaging.incoming.from.connector=smallrye-kafka

これは、from チャネルを使用して Kafka からのメッセージを受信することを示しています。繰り返しますが、smallrye-kafka 値は、SmallRye のリアクティブメッセージング固有です。

mp.messaging.incoming.from.topic=my-topic

これは、コネクターが my-topic と呼ばれる Kafka トピックからデータを読み取る必要があることを示しています。

mp.messaging.incoming.from.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer

これは、receive() メソッドを呼び出す前に、IntegerDeserializer を使用してトピックから値をデシアライズするようにコネクターに指示します。org.apache.kafka.common.serialization.Deserializer を実装するクラスを作成して独自のデシリアライザーを実装し、そのクラスをデプロイメントに含めることができます。

注記

このプロパティーのリストはすべてを網羅したものではありません。詳細は、SmallRye Reactive Messaging Apache Kafka のドキュメントを参照してください。

必須の MicroProfile Reactive Messaging 接頭辞

MicroProfile Reactive Messaging 仕様では、Kafka に次のメソッドプロパティーキー接頭辞が必要です。

  • mp.messaging.incoming.[channel-name].[attribute]=[value]`
  • mp.messaging.outgoing.[channel-name].[attribute]=[value]`
  • mp.messaging.connector.[connector-name].[attribute]=[value]`

channel-name@Incoming.value() または @Outgoing.value() のいずれかであることに注意してください。

次に、次のメソッドペアの例を考えてみましょう。

@Outgoing("to")
public int send() {
    int i = // Randomly generated...
    return i;
}

@Incoming("from")
public void receive(int i) {
    // Process payload
}

このメソッドペアの例では、次の必須のプロパティー接頭辞に注意してください。

  • mp.messaging.incoming.from.この接頭辞は、receive() メソッドの設定としてプロパティーを選択します。
  • mp.messaging.outgoing.to.この接頭辞は、send() メソッドの設定としてプロパティーを選択します。
Red Hat logoGithubredditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

Red Hat ドキュメントについて

Legal Notice

Theme

© 2026 Red Hat
トップに戻る