14.5. MicroProfile Reactive Messaging リファレンス


次に、MicroProfile Config 仕様で必要なリアクティブメッセージングプロパティーキー接頭辞のリストを示します。

  • mp.messaging.incoming.[channel-name].[attribute]=[value]
  • mp.messaging.outgoing.[channel-name].[attribute]=[value]
  • mp.messaging.connector.[connector-name].[attribute]=[value]

channel-name@Incoming.value() または @Outgoing.value() のいずれかであることに注意してください。明確にするために、コネクターメソッドのペアのこの例を見てみましょう。

@Outgoing("to")
public int send() {
   int i = // Randomly generated...
   return i;
}

@Incoming("from")
public void receive(int i) {
   // Process payload
}

この例では、必要なプロパティー接頭辞は次のとおりです。

  • mp.messaging.incoming.from.これは、receive() メソッドを定義します。
  • mp.messaging.outgoing.to.これは send() メソッドを定義します。

これは一例であることを忘れないでください。異なるコネクターは異なるプロパティーを認識するため、指定する接頭辞は、設定するコネクターによって異なります。

14.5.2. リアクティブメッセージングストリームとユーザー初期化コード間のデータ交換の例

以下は、リアクティブメッセージングストリームと、ユーザーが @Channel および Emitter コンストラクトを介してトリガーしたコードとの間のデータ交換の例です。

@Path("/")
@ApplicationScoped
class MyBean {
    @Inject @Channel("my-stream")
    Emitter<String> emitter; 
1


    Publisher<String> dest;

    public MyBean() { 
2

    }

    @Inject
    public MyBean(@Channel("my-stream") Publisher<String> dest) {
        this.dest = subscribeAndAllowMultipleSubscriptions(dest);
    }

    private Publisher subscribeAndAllowMultipleSubscriptions(Publisher delegate) {
    } 
3
 
4
 
5


    @POST
    public PublisherBuilder<String> publish(@FormParam("value") String value) {
        return emitter.send(value);
    }

    @GET
    public Publisher poll() {
        return dest;
    }

    @PreDestroy
    public void close() { 
6


    }
}

インラインの詳細:

1
コンストラクターによって挿入されたパブリッシャーをラップします。
2
Java 仕様のコンテキストと依存関係の挿入 (CDI) を満たすには、この空のコンストラクターが必要です。
3
デリゲートにサブスクライブします。
4
複数のサブスクリプションを処理できるパブリッシャーでデリゲートをラップします。
5
ラッピングパブリッシャーは、デリゲートからのデータを転送します。
6
リアクティブメッセージングが提供するパブリッシャーからアンサブスクライブします。

この例では、MicroProfile Reactive Messaging が my-stream メモリーストリームをリッスンしているため、Emitter を介して送信されたメッセージは、この挿入されたパブリッシャーで受信されます。ただし、このデータエクスチェンジを成功させるには、次の条件が満たされている必要があることに注意してください。

  1. Emitter.send() を呼び出す前に、チャネルにアクティブなサブスクリプションが存在する必要があります。この例では、コンストラクターによって呼び出される subscribeAndAllowMultipleSubscriptions() メソッドにより、Bean がユーザーコード呼び出しに使用できるようになるまでにアクティブなサブスクリプションが確実に存在することに注意してください。
  2. 挿入された Publisher には、Subscription を 1 つだけ持つことができます。受信側のパブリッシャーを REST 呼び出しで公開する場合、poll() メソッドを呼び出すたびに、dest パブリッシャーへの新しいサブスクリプションが生成されます。挿入されたデータを各クライアントにブロードキャストするには、独自のパブリッシャーを実装する必要があります。

14.5.3. Apache Kafka ユーザー API

Apache Kafka ユーザー API を使用して、Kafka が受信したメッセージに関する詳細情報を取得し、Kafka がメッセージを処理する方法に影響を与えることができます。この API は、io/smallrye/reactive/messaging/kafka/api パッケージに保存されており、次のクラスで構成されています。

  • IncomingKafkaRecordMetadata.このメタデータには、次の情報が含まれています。

    • Message で表される Kafka レコード key
    • Message に使用される Kafka topicpartition、およびそれら内の offset
    • MessagetimestamptimestampType
    • Message headers。これらは、アプリケーションが生成側で添付し、消費側で受信できる情報です。
  • OutgoingKafkaRecordMetadata.このメタデータを使用して、Kafka がメッセージを処理する方法を指定またはオーバーライドできます。次の情報が含まれています。

    • key。Kafka はこれをメッセージキーとして扱います。
    • Kafka に使用させたい topic
    • partition
    • Kafka が生成する timestamp が必要ない場合は、タイムスタンプ。
    • headers.
  • KafkaMetadataUtil には、OutgoingKafkaRecordMetadataMessage に書き込み、IncomingKafkaRecordMetadataMessage から読み取るためのユーティリティーメソッドが含まれています。
重要

Kafka にマップされていないチャネルに送信された MessageOutgoingKafkaRecordMetadata を書き込む場合、リアクティブメッセージングフレームワークはそれを無視します。逆に、Kafka にマップされていないチャネルからの Message から IncomingKafkaRecordMetadata を読み取ると、そのメッセージは null として返されます。

メッセージ key の書き込みと読み取りの方法の例
@Inject
@Channel("from-user")
Emitter<Integer> emitter;

@Incoming("from-user")
@Outgoing("to-kafka")
public Message<Integer> send(Message<Integer> msg) {
    // Set the key in the metadata
    OutgoingKafkaRecordMetadata<String> md =
            OutgoingKafkaRecordMetadata.<String>builder()
                .withKey("KEY-" + i)
                .build();
    // Note that Message is immutable so the copy returned by this method
    // call is not the same as the parameter to the method
    return KafkaMetadataUtil.writeOutgoingKafkaMetadata(msg, md);
}

@Incoming("from-kafka")
public CompletionStage<Void> receive(Message<Integer> msg) {
    IncomingKafkaRecordMetadata<String, Integer> metadata =
        KafkaMetadataUtil.readIncomingKafkaMetadata(msg).get();

    // We can now read the Kafka record key
    String key = metadata.getKey();

    // When using the Message wrapper around the payload we need to explicitly ack
    // them
    return msg.ack();
}
microprofile-config.properties ファイルの Kafka マッピングの例
kafka.bootstrap.servers=kafka:9092

mp.messaging.outgoing.to-kafka.connector=smallrye-kafka
mp.messaging.outgoing.to-kafka.topic=some-topic
mp.messaging.outgoing.to-kafka.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer
mp.messaging.outgoing.to-kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer

mp.messaging.incoming.from-kafka.connector=smallrye-kafka
mp.messaging.incoming.from-kafka.topic=some-topic
mp.messaging.incoming.from-kafka.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
mp.messaging.incoming.from-kafka.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
注記

送信チャネルには key.serializer を指定し、受信チャネルには key.deserializer を指定する必要があります。

14.5.4. Kafka コネクターの MicroProfile Config プロパティーファイルの例

これは、Kafka コネクターのシンプルな microprofile-config.properties ファイルの例です。そのプロパティーは、「外部メッセージングシステムと統合するための MicroProfile リアクティブメッセージングコネクター」に示されている例のプロパティーに対応しています。

kafka.bootstrap.servers=kafka:9092

mp.messaging.outgoing.to.connector=smallrye-kafka
mp.messaging.outgoing.to.topic=my-topic
mp.messaging.outgoing.to.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer

mp.messaging.incoming.from.connector=smallrye-kafka
mp.messaging.incoming.from.topic=my-topic
mp.messaging.incoming.from.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
Expand
表14.6 エントリーの議論
エントリー説明

tofrom

これらは "チャネル" です。

sendreceive

これらは "メソッド" です。

to チャネルは send() メソッドにあり、from チャネルは receive() メソッドにあることに注意してください。

kafka.bootstrap.servers=kafka:9092

これは、アプリケーションが接続する必要のある Kafka ブローカーの URL を指定します。次のように、チャネルレベルで URL を指定することもできます: mp.messaging.outgoing.to.bootstrap.servers=kafka:9092

mp.messaging.outgoing.to.connector=smallrye-kafka

これは to チャネルが Kafka からのメッセージを受信することを示しています。

SmallRye リアクティブメッセージングは、アプリケーションをビルドするためのフレームワークです。smallrye-kafka 値は、SmallRye リアクティブメッセージング固有であることに注意してください。Galleon を使用して独自のサーバーをプロビジョニングしている場合は、microprofile-reactive-messaging-kafka Galleon レイヤーを含めることで、Kafka 統合を有効にできます。

mp.messaging.outgoing.to.topic=my-topic

これは、my-topic という Kafka トピックにデータを送信することを示しています。

Kafka の "トピック" は、メッセージが保存および公開されるカテゴリーまたはフィード名です。すべての Kafka メッセージはトピックに編成されています。プロデューサーアプリケーションはトピックにデータ to を書き込み、コンシューマーアプリケーションは from トピックからデータを読み取ります。

mp.messaging.outgoing.to.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer

これは、コネクターに IntegerSerializer を使用して、send() メソッドがトピックに書き込むときに出力する値をシリアル化するように指示します。Kafka は、標準の Java タイプ用のシリアライザーを提供します。org.apache.kafka.common.serialization.Serializer を実装するクラスを作成して独自のシリアライザーを実装し、そのクラスをデプロイメントに含めることができます。

mp.messaging.incoming.from.connector=smallrye-kafka

これは、from チャネルを使用して Kafka からのメッセージを受信することを示しています。繰り返しますが、smallrye-kafka 値は、SmallRye のリアクティブメッセージング固有です。

mp.messaging.incoming.from.topic=my-topic

これは、コネクターが my-topic と呼ばれる Kafka トピックからデータを読み取る必要があることを示しています。

mp.messaging.incoming.from.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer

これは、receive() メソッドを呼び出す前に、IntegerDeserializer を使用してトピックから値をデシアライズするようにコネクターに指示します。org.apache.kafka.common.serialization.Deserializer を実装するクラスを作成して独自のデシリアライザーを実装し、そのクラスをデプロイメントに含めることができます。

注記

このプロパティーのリストはすべてを網羅したものではありません。詳細は、SmallRye Reactive Messaging Apache Kafka のドキュメントを参照してください。

必須の MicroProfile Reactive Messaging 接頭辞

MicroProfile Reactive Messaging 仕様では、Kafka に次のメソッドプロパティーキー接頭辞が必要です。

  • mp.messaging.incoming.[channel-name].[attribute]=[value]`
  • mp.messaging.outgoing.[channel-name].[attribute]=[value]`
  • mp.messaging.connector.[connector-name].[attribute]=[value]`

channel-name@Incoming.value() または @Outgoing.value() のいずれかであることに注意してください。

次に、次のメソッドペアの例を考えてみましょう。

@Outgoing("to")
public int send() {
    int i = // Randomly generated...
    return i;
}

@Incoming("from")
public void receive(int i) {
    // Process payload
}

このメソッドペアの例では、次の必須のプロパティー接頭辞に注意してください。

  • mp.messaging.incoming.from.この接頭辞は、receive() メソッドの設定としてプロパティーを選択します。
  • mp.messaging.outgoing.to.この接頭辞は、send() メソッドの設定としてプロパティーを選択します。

14.5.4.1. 安全な MicroProfile Reactive Messaging Apache Kafka コネクターの設定

自己署名証明書を使用して Apache Kafka コネクタークライアントを設定するには、microprofile-config.properties ファイルで client-ssl-context を定義します。これはコネクターレベルとチャネルレベルで実行できます。

次の例は、SSL/TLS で保護された Apache Kafka コネクターを設定する方法を示しています。

コネクターレベルの client-ssl-context 定義の例

mp.messaging.incoming.from.security.protocol=SSL
mp.messaging.outgoing.to.security.protocol=SSL
mp.messaging.connector.smallrye-kafka.wildfly.elytron.ssl.context=exampleSSLContext

mp.messaging.connector.smallrye-kafka.wildfly.elytron.ssl.context 属性は、自己署名証明書を使用する場合にのみ必要です。

重要

実稼働環境では自己署名証明書を使用しないでください。認証局 (CA) が署名した証明書のみ使用してください。

チャネルの client-ssl-context は次のように指定できます。

チャネルレベルの client-ssl-context 定義の例

mp.messaging.incoming.from.wildfly.elytron.ssl.context=exampleSSLContext

この例では、exampleSSLContext は、受信チャネル from にのみ関連付けられています。

Expand
表14.7 エントリーの議論
エントリー説明

mp.messaging.incoming.from.security.protocol=SSL

これは、ブローカーに接続するときにセキュアな受信チャネル接続を使用することを指定します。

mp.messaging.outgoing.to.security.protocol=SSL

これは、ブローカーに接続するときにセキュアな送信チャネル接続を使用することを指定します。

mp.messaging.connector.smallrye-kafka.wildfly.elytron.ssl.context

Kafka ブローカーが認証局 (CA) 署名付き証明書でセキュリティーが保護されている場合は、この属性を指定する必要はありません。

自己署名証明書を使用する場合は、管理モデルの /subsystem=elytron/client-ssl-context=* の下の Elytron サブシステムで定義されている SSLContext を指定します。

重要

実稼働環境では自己署名証明書を使用しないでください。認証局 (CA) が署名した証明書のみ使用してください。

次の管理 CLI コマンドを使用して client-ssl-context を定義できます。

/subsystem=elytron/client-ssl-context=exampleSSLContext:add(key-manager=exampleServerKeyManager,trust-manager=exampleTLSTrustManager)

注記

SCRAM-SHA-512 認証で SSL/TLS 接続を使用すると、SSL/TLS プロトコルは暗号化を提供しますが、認証には使用されません。SCRAM-SHA-512 プロトコルで接続を確立するには、Configure MicroProfile Reactive Messagaging Kafka connector to use SASL_PLAINTEXT and SASL_SSL authentication protocol を参照してください。

Apache Kafka は Simple Authentication and Security Layer (SASL)プロトコルを使用して、Apache Kafka リスナー に接続されているクライアントを認証します。暗号化されていない プレーン タイプと暗号化された tls タイプ通信を使用するように リスナー を設定できます。Streams for Apache Kafka は、Salted Challenged Response Authentication Mechanism (SCRAM)プロトコル(SASL SCRAM-SHA-512)と組み合わせて SASL を使用して認証を提供します。Kafka カスタムリソースと KafkaUser カスタムリソース YAML ファイルを定義して、両方のタイプの リスナー の認証を設定する必要があります。

SASL がサーバー上で設定されている場合、Kafka リスナー が SCRAM-SHA-512 認証を使用するように設定されている場合、クライアントはセキュリティープロトコルを指定する必要があります。TLS で暗号化された リスナー に接続する場合、このプロトコルは SASL_SSL である必要があります。リスナー が暗号化されていない場合、プロトコルは SASL_PLAINTEXT である必要があります。クライアント設定では、SCRAM-SHA-512 を使用するように SASL メカニズムを指定する必要があります。

注記

SCRAM-SHA-512 認証が SSL/TLS 接続で使用されると、SSL/TLS プロトコルは暗号化を提供しますが、認証には使用されません。SSL/TLS を使用して接続をセキュリティー保護するには、セキュアな MicroProfile Reactive Messaging Apache Kafka コネクターの設定 を参照してください。

プレーン タイプの暗号化されていない通信および SASL SCRAM-SHA-512 認証を使用する リスナー にクライアント認証を設定します。

前提条件

  • Streams for Apache Kafka クラスター Operator がインストールされている。

    注記

    クラスター Operator はデフォルトで Secret リソースを生成します。クライアント設定の username および password は、Secret リソースで定義されるユーザー名とパスワードと一致する必要があります。

    カスタム シークレット リソースの作成の詳細については、カスタム パスワード設定 を参照 してください。

    Operator によって生成された Secret リソースの詳細は、Operator によって 生成された Secret を参照して ください。

手順

  1. Kafka カスタムリソースを YAML ファイルとして定義します。

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
      namespace: myproject
    spec:
      kafka:
        # ...
        listeners:
          - name: plain
            port: 9092
            type: internal
            tls: true
            authentication:
              type: scram-sha-512
  2. KafkaUser カスタムリソースを YAML ファイルとして定義します。

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaUser
    metadata:
      name: my-user
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      authentication:
        type: scram-sha-512

    KafkaUser .authentication 定義は リスナー.authentication 定義と一致する必要があります。

  3. microprofile-config.properties ファイルでクライアントを設定します。

    # General config to set up SASL over PLAINTEXT
    mp.messaging.connector.smallrye-kafka.bootstrap.servers=localhost:9092
    mp.messaging.connector.smallrye-kafka.sasl.mechanism=SCRAM-SHA-512
    mp.messaging.connector.smallrye-kafka.security.protocol=SASL_PLAINTEXT
    mp.messaging.connector.smallrye-kafka.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
      username="my-user" \
      password="my-password";
    注記

    ユーザー名パスワード は、クラスターオペレーターによって生成された Secret リソースのユーザー名とパスワードと一致する必要があります。

14.5.5. AMQP コネクターの MicroProfile Config プロパティーファイルの例

これは、Advanced Message Queuing Protocol (AMQP) コネクター用の単純な microprofile-config.properties ファイルの例です。そのプロパティーは、外部メッセージングシステムと統合するための MicroProfile リアクティブメッセージングコネクター に示された例のプロパティーに対応しています。

amqp-host=localhost
amqp-port=5672
amqp-username=artemis
amqp-password=artemis

mp.messaging.outgoing.to.connector=smallrye-amqp
mp.messaging.outgoing.to.address=my-topic

mp.messaging.incoming.from.connector=smallrye-amqp
mp.messaging.incoming.from.address=my-topic
Expand
表14.8 エントリーの議論
エントリー説明

tofrom

これらは "チャネル" です。

sendreceive

これらは "メソッド" です。

to チャネルは send() メソッドにあり、from チャネルは receive() メソッドにあることに注意してください。

amqp-host=localhost

これは、アプリケーションが接続する必要のある AMQP ブローカーの URL を指定します。mp.messaging.outgoing.to.host=localhost のように、チャネルレベルで URL を指定することもできます。URL が指定されていない場合、値はデフォルトで localhost になります。

amqp-port=5672

これは、AMQP ブローカーのポートを指定します。

mp.messaging.outgoing.to.connector=smallrye-amqp

これは、チャネルが AMQP にメッセージを送信することを示します。

SmallRye リアクティブメッセージングは、アプリケーションをビルドするためのフレームワークです。smallrye-amqp 値は、SmallRye リアクティブメッセージング固有であることに注意してください。Galleon を使用して独自のサーバーをプロビジョニングしている場合は、microprofile-reactive-messaging-amqp Galleon レイヤーを含めることで、AMQP 統合を有効にできます。

mp.messaging.outgoing.to.address=my-topic

これは、アドレス my-topic の AMQP キューにデータを送信することを示します。mp.messaging.outgoing.to.address の値を指定しない場合は、デフォルトでチャネル (この例では "to") に設定されます。

mp.messaging.incoming.from.connector=smallrye-amqp

これは、AMQP ブローカーからメッセージを受信するために from チャネルを使用することを示します。繰り返しますが、smallrye-amqp 値は、SmallRye のリアクティブメッセージング固有です。

mp.messaging.incoming.from.address=my-topic

これは、from チャネルの AMQP キュー my-topic からデータを読み取ることを示します。

SmallRye Reactive Messaging の AMQP コネクターでサポートされているプロパティーの完全なリストは、SmallRye Reactive Messaging AMQP コネクター設定リファレンス を参照してください。

安全な AMQP ブローカーへの接続

SSL/TLS および Simple Authentication and Security Layer (SASL) で保護された AMQ ブローカーに接続するには、microprofile-config.properties ファイルで、接続に使用する client-ssl-context を定義します。これはコネクターレベルでもチャネルレベルでも実行できます。

コネクターレベルの client-ssl-context 定義の例

amqp-use-ssl=true
mp.messaging.connector.smallrye-amqp.wildfly.elytron.ssl.context=exampleSSLContext

mp.messaging.connector.smallrye-amqp.wildfly.elytron.ssl.context 属性は、自己署名証明書を使用する場合にのみ必要です。

重要

実稼働環境では自己署名証明書を使用しないでください。認証局 (CA) によって署名された証明書のみ使用してください。

次のようにして、チャネルの client-ssl-context を指定することもできます。

チャネルレベルの client-ssl-context 定義の例

mp.messaging.incoming.from.wildfly.elytron.ssl.context=exampleSSLContext

この例では、exampleSSLContext は、受信チャネル from にのみ関連付けられています。

Expand
表14.9 エントリーの議論
エントリー説明

amqp-use-ssl

これは、ブローカーに接続するときに安全な接続を使用することを指定します。

mp.messaging.connector.smallrye-amqp.wildfly.elytron.ssl.context

AMQ ブローカーが証明機関 (CA) の署名付き証明書で保護されている場合は、この属性を指定する必要はありません。

自己署名証明書を使用する場合は、管理モデルの /subsystem=elytron/client-ssl-context=* の下の Elytron サブシステムで定義されている SSLContext を指定します。

重要

実稼働環境では自己署名証明書を使用しないでください。認証局 (CA) によって署名された証明書のみ使用してください。

次の管理 CLI コマンドを使用して client-ssl-context を定義できます。

/subsystem=elytron/client-ssl-context=exampleSSLContext:add(key-manager=exampleServerKeyManager,trust-manager=exampleTLSTrustManager)

詳細は、JBoss EAP での SSL/TLS の設定 ガイドの クライアント証明書の信頼ストアと信頼マネージャーの設定 および 双方向 SSL/TLS 用のサーバー証明書の設定 を参照してください。

Red Hat logoGithubredditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

Red Hat ドキュメントについて

Legal Notice

Theme

© 2026 Red Hat
トップに戻る