14.5. MicroProfile Reactive Messaging リファレンス
14.5.1. 外部メッセージングシステムと統合するための MicroProfile リアクティブメッセージングコネクター リンクのコピーリンクがクリップボードにコピーされました!
次に、MicroProfile Config 仕様で必要なリアクティブメッセージングプロパティーキー接頭辞のリストを示します。
-
mp.messaging.incoming.[channel-name].[attribute]=[value] -
mp.messaging.outgoing.[channel-name].[attribute]=[value] -
mp.messaging.connector.[connector-name].[attribute]=[value]
channel-name は @Incoming.value() または @Outgoing.value() のいずれかであることに注意してください。明確にするために、コネクターメソッドのペアのこの例を見てみましょう。
@Outgoing("to")
public int send() {
int i = // Randomly generated...
return i;
}
@Incoming("from")
public void receive(int i) {
// Process payload
}
この例では、必要なプロパティー接頭辞は次のとおりです。
-
mp.messaging.incoming.from.これは、receive()メソッドを定義します。 -
mp.messaging.outgoing.to.これはsend()メソッドを定義します。
これは一例であることを忘れないでください。異なるコネクターは異なるプロパティーを認識するため、指定する接頭辞は、設定するコネクターによって異なります。
14.5.2. リアクティブメッセージングストリームとユーザー初期化コード間のデータ交換の例 リンクのコピーリンクがクリップボードにコピーされました!
以下は、リアクティブメッセージングストリームと、ユーザーが @Channel および Emitter コンストラクトを介してトリガーしたコードとの間のデータ交換の例です。
@Path("/")
@ApplicationScoped
class MyBean {
@Inject @Channel("my-stream")
Emitter<String> emitter;
Publisher<String> dest;
public MyBean() {
}
@Inject
public MyBean(@Channel("my-stream") Publisher<String> dest) {
this.dest = subscribeAndAllowMultipleSubscriptions(dest);
}
private Publisher subscribeAndAllowMultipleSubscriptions(Publisher delegate) {
}
@POST
public PublisherBuilder<String> publish(@FormParam("value") String value) {
return emitter.send(value);
}
@GET
public Publisher poll() {
return dest;
}
@PreDestroy
public void close() {
}
}
インラインの詳細:
この例では、MicroProfile Reactive Messaging が my-stream メモリーストリームをリッスンしているため、Emitter を介して送信されたメッセージは、この挿入されたパブリッシャーで受信されます。ただし、このデータエクスチェンジを成功させるには、次の条件が満たされている必要があることに注意してください。
-
Emitter.send()を呼び出す前に、チャネルにアクティブなサブスクリプションが存在する必要があります。この例では、コンストラクターによって呼び出されるsubscribeAndAllowMultipleSubscriptions()メソッドにより、Bean がユーザーコード呼び出しに使用できるようになるまでにアクティブなサブスクリプションが確実に存在することに注意してください。 -
挿入された
Publisherには、Subscriptionを 1 つだけ持つことができます。受信側のパブリッシャーを REST 呼び出しで公開する場合、poll()メソッドを呼び出すたびに、destパブリッシャーへの新しいサブスクリプションが生成されます。挿入されたデータを各クライアントにブロードキャストするには、独自のパブリッシャーを実装する必要があります。
14.5.3. Apache Kafka ユーザー API リンクのコピーリンクがクリップボードにコピーされました!
Apache Kafka ユーザー API を使用して、Kafka が受信したメッセージに関する詳細情報を取得し、Kafka がメッセージを処理する方法に影響を与えることができます。この API は、io/smallrye/reactive/messaging/kafka/api パッケージに保存されており、次のクラスで構成されています。
IncomingKafkaRecordMetadata.このメタデータには、次の情報が含まれています。-
Messageで表される Kafka レコードkey。 -
Messageに使用される Kafkatopicとpartition、およびそれら内のoffset。 -
MessageのtimestampとtimestampType。 -
Messageheaders。これらは、アプリケーションが生成側で添付し、消費側で受信できる情報です。
-
OutgoingKafkaRecordMetadata.このメタデータを使用して、Kafka がメッセージを処理する方法を指定またはオーバーライドできます。次の情報が含まれています。-
key。Kafka はこれをメッセージキーとして扱います。 -
Kafka に使用させたい
topic。 -
partition。 -
Kafka が生成する
timestampが必要ない場合は、タイムスタンプ。 -
headers.
-
-
KafkaMetadataUtilには、OutgoingKafkaRecordMetadataをMessageに書き込み、IncomingKafkaRecordMetadataをMessageから読み取るためのユーティリティーメソッドが含まれています。
Kafka にマップされていないチャネルに送信された Message に OutgoingKafkaRecordMetadata を書き込む場合、リアクティブメッセージングフレームワークはそれを無視します。逆に、Kafka にマップされていないチャネルからの Message から IncomingKafkaRecordMetadata を読み取ると、そのメッセージは null として返されます。
メッセージ key の書き込みと読み取りの方法の例
@Inject
@Channel("from-user")
Emitter<Integer> emitter;
@Incoming("from-user")
@Outgoing("to-kafka")
public Message<Integer> send(Message<Integer> msg) {
// Set the key in the metadata
OutgoingKafkaRecordMetadata<String> md =
OutgoingKafkaRecordMetadata.<String>builder()
.withKey("KEY-" + i)
.build();
// Note that Message is immutable so the copy returned by this method
// call is not the same as the parameter to the method
return KafkaMetadataUtil.writeOutgoingKafkaMetadata(msg, md);
}
@Incoming("from-kafka")
public CompletionStage<Void> receive(Message<Integer> msg) {
IncomingKafkaRecordMetadata<String, Integer> metadata =
KafkaMetadataUtil.readIncomingKafkaMetadata(msg).get();
// We can now read the Kafka record key
String key = metadata.getKey();
// When using the Message wrapper around the payload we need to explicitly ack
// them
return msg.ack();
}
microprofile-config.properties ファイルの Kafka マッピングの例
kafka.bootstrap.servers=kafka:9092
mp.messaging.outgoing.to-kafka.connector=smallrye-kafka
mp.messaging.outgoing.to-kafka.topic=some-topic
mp.messaging.outgoing.to-kafka.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer
mp.messaging.outgoing.to-kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.incoming.from-kafka.connector=smallrye-kafka
mp.messaging.incoming.from-kafka.topic=some-topic
mp.messaging.incoming.from-kafka.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
mp.messaging.incoming.from-kafka.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
送信チャネルには key.serializer を指定し、受信チャネルには key.deserializer を指定する必要があります。
14.5.4. Kafka コネクターの MicroProfile Config プロパティーファイルの例 リンクのコピーリンクがクリップボードにコピーされました!
これは、Kafka コネクターのシンプルな microprofile-config.properties ファイルの例です。そのプロパティーは、「外部メッセージングシステムと統合するための MicroProfile リアクティブメッセージングコネクター」に示されている例のプロパティーに対応しています。
kafka.bootstrap.servers=kafka:9092
mp.messaging.outgoing.to.connector=smallrye-kafka
mp.messaging.outgoing.to.topic=my-topic
mp.messaging.outgoing.to.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer
mp.messaging.incoming.from.connector=smallrye-kafka
mp.messaging.incoming.from.topic=my-topic
mp.messaging.incoming.from.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
| エントリー | 説明 |
|---|---|
|
| これらは "チャネル" です。 |
|
| これらは "メソッド" です。
|
|
|
これは、アプリケーションが接続する必要のある Kafka ブローカーの URL を指定します。次のように、チャネルレベルで URL を指定することもできます: |
|
|
これは
SmallRye リアクティブメッセージングは、アプリケーションをビルドするためのフレームワークです。 |
|
|
これは、 Kafka の "トピック" は、メッセージが保存および公開されるカテゴリーまたはフィード名です。すべての Kafka メッセージはトピックに編成されています。プロデューサーアプリケーションはトピックにデータ to を書き込み、コンシューマーアプリケーションは from トピックからデータを読み取ります。 |
|
|
これは、コネクターに |
|
|
これは、 |
|
|
これは、コネクターが |
|
|
これは、 |
このプロパティーのリストはすべてを網羅したものではありません。詳細は、SmallRye Reactive Messaging Apache Kafka のドキュメントを参照してください。
必須の MicroProfile Reactive Messaging 接頭辞
MicroProfile Reactive Messaging 仕様では、Kafka に次のメソッドプロパティーキー接頭辞が必要です。
-
mp.messaging.incoming.[channel-name].[attribute]=[value]` -
mp.messaging.outgoing.[channel-name].[attribute]=[value]` -
mp.messaging.connector.[connector-name].[attribute]=[value]`
channel-name は @Incoming.value() または @Outgoing.value() のいずれかであることに注意してください。
次に、次のメソッドペアの例を考えてみましょう。
@Outgoing("to")
public int send() {
int i = // Randomly generated...
return i;
}
@Incoming("from")
public void receive(int i) {
// Process payload
}
このメソッドペアの例では、次の必須のプロパティー接頭辞に注意してください。
-
mp.messaging.incoming.from.この接頭辞は、receive()メソッドの設定としてプロパティーを選択します。 -
mp.messaging.outgoing.to.この接頭辞は、send()メソッドの設定としてプロパティーを選択します。
14.5.4.1. 安全な MicroProfile Reactive Messaging Apache Kafka コネクターの設定 リンクのコピーリンクがクリップボードにコピーされました!
自己署名証明書を使用して Apache Kafka コネクタークライアントを設定するには、microprofile-config.properties ファイルで client-ssl-context を定義します。これはコネクターレベルとチャネルレベルで実行できます。
次の例は、SSL/TLS で保護された Apache Kafka コネクターを設定する方法を示しています。
コネクターレベルの client-ssl-context 定義の例
mp.messaging.incoming.from.security.protocol=SSL
mp.messaging.outgoing.to.security.protocol=SSL
mp.messaging.connector.smallrye-kafka.wildfly.elytron.ssl.context=exampleSSLContext
mp.messaging.connector.smallrye-kafka.wildfly.elytron.ssl.context 属性は、自己署名証明書を使用する場合にのみ必要です。
実稼働環境では自己署名証明書を使用しないでください。認証局 (CA) が署名した証明書のみ使用してください。
チャネルの client-ssl-context は次のように指定できます。
チャネルレベルの client-ssl-context 定義の例
mp.messaging.incoming.from.wildfly.elytron.ssl.context=exampleSSLContext
この例では、exampleSSLContext は、受信チャネル from にのみ関連付けられています。
| エントリー | 説明 |
|---|---|
|
| これは、ブローカーに接続するときにセキュアな受信チャネル接続を使用することを指定します。 |
|
| これは、ブローカーに接続するときにセキュアな送信チャネル接続を使用することを指定します。 |
|
| Kafka ブローカーが認証局 (CA) 署名付き証明書でセキュリティーが保護されている場合は、この属性を指定する必要はありません。 |
自己署名証明書を使用する場合は、管理モデルの /subsystem=elytron/client-ssl-context=* の下の Elytron サブシステムで定義されている SSLContext を指定します。
実稼働環境では自己署名証明書を使用しないでください。認証局 (CA) が署名した証明書のみ使用してください。
次の管理 CLI コマンドを使用して client-ssl-context を定義できます。
例
/subsystem=elytron/client-ssl-context=exampleSSLContext:add(key-manager=exampleServerKeyManager,trust-manager=exampleTLSTrustManager)
SCRAM-SHA-512 認証で SSL/TLS 接続を使用すると、SSL/TLS プロトコルは暗号化を提供しますが、認証には使用されません。SCRAM-SHA-512 プロトコルで接続を確立するには、Configure MicroProfile Reactive Messagaging Kafka connector to use SASL_PLAINTEXT and SASL_SSL authentication protocol を参照してください。
14.5.4.2. SASL_PLAINTEXT および SASL_SSL 認証プロトコルを使用するための MicroProfile Reactive Messagaging Kafka コネクターの設定 リンクのコピーリンクがクリップボードにコピーされました!
Apache Kafka は Simple Authentication and Security Layer (SASL)プロトコルを使用して、Apache Kafka リスナー に接続されているクライアントを認証します。暗号化されていない プレーン タイプと暗号化された tls タイプ通信を使用するように リスナー を設定できます。Streams for Apache Kafka は、Salted Challenged Response Authentication Mechanism (SCRAM)プロトコル(SASL SCRAM-SHA-512)と組み合わせて SASL を使用して認証を提供します。Kafka カスタムリソースと KafkaUser カスタムリソース YAML ファイルを定義して、両方のタイプの リスナー の認証を設定する必要があります。
SASL がサーバー上で設定されている場合、Kafka リスナー が SCRAM-SHA-512 認証を使用するように設定されている場合、クライアントはセキュリティープロトコルを指定する必要があります。TLS で暗号化された リスナー に接続する場合、このプロトコルは SASL_SSL である必要があります。リスナー が暗号化されていない場合、プロトコルは SASL_PLAINTEXT である必要があります。クライアント設定では、SCRAM-SHA-512 を使用するように SASL メカニズムを指定する必要があります。
SCRAM-SHA-512 認証が SSL/TLS 接続で使用されると、SSL/TLS プロトコルは暗号化を提供しますが、認証には使用されません。SSL/TLS を使用して接続をセキュリティー保護するには、セキュアな MicroProfile Reactive Messaging Apache Kafka コネクターの設定 を参照してください。
プレーン タイプの暗号化されていない通信および SASL SCRAM-SHA-512 認証を使用する リスナー にクライアント認証を設定します。
前提条件
Streams for Apache Kafka クラスター Operator がインストールされている。
注記クラスター Operator はデフォルトで
Secretリソースを生成します。クライアント設定のusernameおよびpasswordは、Secretリソースで定義されるユーザー名とパスワードと一致する必要があります。カスタム
シークレットリソースの作成の詳細については、カスタム パスワード設定 を参照 してください。Operator によって生成された
Secretリソースの詳細は、Operator によって 生成された Secret を参照して ください。
手順
Kafka カスタムリソースを YAML ファイルとして定義します。
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: my-cluster namespace: myproject spec: kafka: # ... listeners: - name: plain port: 9092 type: internal tls: true authentication: type: scram-sha-512KafkaUser カスタムリソースを YAML ファイルとして定義します。
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaUser metadata: name: my-user labels: strimzi.io/cluster: my-cluster spec: authentication: type: scram-sha-512KafkaUser
定義は.authenticationリスナー.authentication 定義と一致する必要があります。microprofile-config.propertiesファイルでクライアントを設定します。# General config to set up SASL over PLAINTEXT mp.messaging.connector.smallrye-kafka.bootstrap.servers=localhost:9092 mp.messaging.connector.smallrye-kafka.sasl.mechanism=SCRAM-SHA-512 mp.messaging.connector.smallrye-kafka.security.protocol=SASL_PLAINTEXT mp.messaging.connector.smallrye-kafka.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="my-user" \ password="my-password";注記ユーザー名とパスワードは、クラスターオペレーターによって生成されたSecretリソースのユーザー名とパスワードと一致する必要があります。
14.5.5. AMQP コネクターの MicroProfile Config プロパティーファイルの例 リンクのコピーリンクがクリップボードにコピーされました!
これは、Advanced Message Queuing Protocol (AMQP) コネクター用の単純な microprofile-config.properties ファイルの例です。そのプロパティーは、外部メッセージングシステムと統合するための MicroProfile リアクティブメッセージングコネクター に示された例のプロパティーに対応しています。
amqp-host=localhost
amqp-port=5672
amqp-username=artemis
amqp-password=artemis
mp.messaging.outgoing.to.connector=smallrye-amqp
mp.messaging.outgoing.to.address=my-topic
mp.messaging.incoming.from.connector=smallrye-amqp
mp.messaging.incoming.from.address=my-topic
| エントリー | 説明 |
|---|---|
|
| これらは "チャネル" です。 |
|
| これらは "メソッド" です。
|
|
|
これは、アプリケーションが接続する必要のある AMQP ブローカーの URL を指定します。 |
|
| これは、AMQP ブローカーのポートを指定します。 |
|
| これは、チャネルが AMQP にメッセージを送信することを示します。
SmallRye リアクティブメッセージングは、アプリケーションをビルドするためのフレームワークです。 |
|
|
これは、アドレス |
|
|
これは、AMQP ブローカーからメッセージを受信するために |
|
|
これは、 |
SmallRye Reactive Messaging の AMQP コネクターでサポートされているプロパティーの完全なリストは、SmallRye Reactive Messaging AMQP コネクター設定リファレンス を参照してください。
安全な AMQP ブローカーへの接続
SSL/TLS および Simple Authentication and Security Layer (SASL) で保護された AMQ ブローカーに接続するには、microprofile-config.properties ファイルで、接続に使用する client-ssl-context を定義します。これはコネクターレベルでもチャネルレベルでも実行できます。
コネクターレベルの client-ssl-context 定義の例
amqp-use-ssl=true
mp.messaging.connector.smallrye-amqp.wildfly.elytron.ssl.context=exampleSSLContext
mp.messaging.connector.smallrye-amqp.wildfly.elytron.ssl.context 属性は、自己署名証明書を使用する場合にのみ必要です。
実稼働環境では自己署名証明書を使用しないでください。認証局 (CA) によって署名された証明書のみ使用してください。
次のようにして、チャネルの client-ssl-context を指定することもできます。
チャネルレベルの client-ssl-context 定義の例
mp.messaging.incoming.from.wildfly.elytron.ssl.context=exampleSSLContext
この例では、exampleSSLContext は、受信チャネル from にのみ関連付けられています。
| エントリー | 説明 |
|---|---|
|
| これは、ブローカーに接続するときに安全な接続を使用することを指定します。 |
|
| AMQ ブローカーが証明機関 (CA) の署名付き証明書で保護されている場合は、この属性を指定する必要はありません。
自己署名証明書を使用する場合は、管理モデルの 重要 実稼働環境では自己署名証明書を使用しないでください。認証局 (CA) によって署名された証明書のみ使用してください。
次の管理 CLI コマンドを使用して
詳細は、JBoss EAP での SSL/TLS の設定 ガイドの クライアント証明書の信頼ストアと信頼マネージャーの設定 および 双方向 SSL/TLS 用のサーバー証明書の設定 を参照してください。 |