5.13. リアクティブアプリケーションでの AMQP の使用
Spring Boot HTTP コントローラーで AMQP Client Starter を使用して、簡易メッセージングリアクティブアプリケーションを開発します。このサンプルアプリケーションは、2 つのメッセージングキューとブローカーを使用する Publisher-Subscriber メッセージング統合パターンで 2 つのサービスを統合します。
この例は、AMQP メッセージングを使用して統合された 2 つのサービスで設定される Reactor Netty 上の Spring Boot および Eclipse Vert.x で基本的なアプリケーションを作成する方法を示しています。アプリケーションは以下のコンポーネントで設定されます。
- テキスト文字列をアプリケーションに送信するために使用できるフロントエンドサービス
- 文字列を大文字に変換するバックエンドサービス
- サービス間でマッサージをルーティングし、要求キューと応答キューを管理する Artemis AMQP ブローカー。
- Spring Boot HTTP Starter によって提供される HTTP コントローラー
前提条件
- Spring Boot を使用するよう設定された Maven ベースの Java アプリケーションプロジェクト
- JDK 8 または JDK 11 がインストールされている。
- Maven がインストールされている。
手順
以下の依存関係をアプリケーションプロジェクトの
pom.xmlファイルに追加します。pom.xml
... <dependencies> ... <dependency> <groupId>dev.snowdrop</groupId> <artifactId>vertx-spring-boot-starter-http</artifactId> </dependency> <dependency> <groupId>dev.snowdrop</groupId> <artifactId>vertx-spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-artemis</artifactId> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>artemis-jms-server</artifactId> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>artemis-amqp-protocol</artifactId> <exclusions> <exclusion> <groupId>org.apache.qpid</groupId> <artifactId>proton-j</artifactId> </exclusion> </exclusions> </dependency> ... </dependencies> ...サンプルアプリケーションのメインクラスファイルを作成します。このクラスには、要求と結果に対応する処理キューを定義するメソッドが含まれています。
/src/main/java/AmqpExampleApplication.java
package dev.snowdrop.AmqpExampleApplication.java; import java.util.HashMap; import java.util.Map; import dev.snowdrop.vertx.amqp.AmqpProperties; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.jms.artemis.ArtemisConfigurationCustomizer; import org.springframework.context.annotation.Bean; @SpringBootApplication public class AmqpExampleApplication { final static String PROCESSING_REQUESTS_QUEUE = "processing-requests"; final static String PROCESSING_RESULTS_QUEUE = "processing-results"; public static void main(String[] args) { SpringApplication.run(AmqpExampleApplication.class, args); } /** * Add Netty acceptor to the embedded Artemis server. */ @Bean public ArtemisConfigurationCustomizer artemisConfigurationCustomizer(AmqpProperties properties) { Map<String, Object> params = new HashMap<>(); params.put("host", properties.getHost()); params.put("port", properties.getPort()); return configuration -> configuration .addAcceptorConfiguration(new TransportConfiguration(NettyAcceptorFactory.class.getName(), params)); } }GET および POST リクエストを処理する REST エンドポイントを公開して、要求キューと応答キューを管理する HTTP REST コントローラーのコードを含むクラスファイルを作成します。
/src/main/java/Controller.java
package dev.snowdrop.vertx.sample.amqp; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import static org.springframework.http.MediaType.TEXT_EVENT_STREAM_VALUE; /** * Rest controller exposing GET and POST resources to receive processed messages and submit messages for processing. */ @RestController public class Controller { private final MessagesManager messagesManager; public Controller(MessagesManager messagesManager) { this.messagesManager = messagesManager; } /** * Get a flux of messages processed up to this point. */ @GetMapping(produces = TEXT_EVENT_STREAM_VALUE) public Flux<String> getProcessedMessages() { return Flux.fromIterable(messagesManager.getProcessedMessages()); } /** * Submit a message for processing by publishing it to a processing requests queue. */ @PostMapping public Mono<Void> submitMessageForProcessing(@RequestBody String body) { return messagesManager.processMessage(body.trim()); } }メッセージングマネージャーを含むクラスファイルを作成します。Manager は、アプリケーションコンポーネントがリクエストキューにリクエストを公開する方法を制御し、その後に応答キューにサブスクライブして処理された結果を取得します。
/src/main/java/MessagesManager.java:
package dev.snowdrop.vertx.sample.amqp; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import dev.snowdrop.vertx.amqp.AmqpClient; import dev.snowdrop.vertx.amqp.AmqpMessage; import dev.snowdrop.vertx.amqp.AmqpSender; import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component; import reactor.core.Disposable; import reactor.core.publisher.Mono; import static dev.snowdrop.vertx.sample.amqp.AmqpSampleApplication.PROCESSING_REQUESTS_QUEUE; import static dev.snowdrop.vertx.sample.amqp.AmqpSampleApplication.PROCESSING_RESULTS_QUEUE; /** * Processor client submits messages to the requests queue and subscribes to the results queue for processed messages. */ @Component public class MessagesManager implements InitializingBean, DisposableBean { private final Logger logger = LoggerFactory.getLogger(MessagesManager.class); private final List<String> processedMessages = new CopyOnWriteArrayList<>(); private final AmqpClient client; private Disposable receiverDisposer; // Injecting EmbeddedActiveMQ to make sure it has started before creating this component. public MessagesManager(AmqpClient client, EmbeddedActiveMQ server) { this.client = client; } /** * Create a processed messages receiver and subscribe to its messages publisher. */ @Override public void afterPropertiesSet() { receiverDisposer = client.createReceiver(PROCESSING_RESULTS_QUEUE) .flatMapMany(receiver -> receiver.flux() .doOnCancel(() -> receiver.close().block())) // Close the receiver once subscription is disposed .subscribe(this::handleMessage); } /** * Cancel processed messages publisher subscription. */ @Override public void destroy() { if (receiverDisposer != null) { receiverDisposer.dispose(); } } /** * Get messages which were processed up to this moment. * * @return List of processed messages. */ public List<String> getProcessedMessages() { return processedMessages; } /** * Submit a message for processing by publishing it to a processing requests queue. * * @param body Message body to be processed. * @return Mono which is completed once the message is sent. */ public Mono<Void> processMessage(String body) { logger.info("Sending message '{}' for processing", body); AmqpMessage message = AmqpMessage.create() .withBody(body) .build(); return client.createSender(PROCESSING_REQUESTS_QUEUE) .map(sender -> sender.send(message)) .flatMap(AmqpSender::close); } private void handleMessage(AmqpMessage message) { String body = message.bodyAsString(); logger.info("Received processed message '{}'", body); processedMessages.add(body); } }要求キューからテキスト文字列を受け取る大文字プロセッサーを含むクラスファイルを作成し、それを大文字に変換します。その後、プロセッサーは結果を応答キューに公開します。
/src/main/java/UppercaseProcessor.java
package dev.snowdrop.vertx.sample.amqp; import dev.snowdrop.vertx.amqp.AmqpClient; import dev.snowdrop.vertx.amqp.AmqpMessage; import dev.snowdrop.vertx.amqp.AmqpSender; import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component; import reactor.core.Disposable; import reactor.core.publisher.Mono; import static dev.snowdrop.vertx.sample.amqp.AmqpSampleApplication.PROCESSING_REQUESTS_QUEUE; import static dev.snowdrop.vertx.sample.amqp.AmqpSampleApplication.PROCESSING_RESULTS_QUEUE; /** * Uppercase processor subscribes to the requests queue, converts each received message to uppercase and send it to the * results queue. */ @Component public class UppercaseProcessor implements InitializingBean, DisposableBean { private final Logger logger = LoggerFactory.getLogger(UppercaseProcessor.class); private final AmqpClient client; private Disposable receiverDisposer; // Injecting EmbeddedActiveMQ to make sure it has started before creating this component. public UppercaseProcessor(AmqpClient client, EmbeddedActiveMQ server) { this.client = client; } /** * Create a processing requests receiver and subscribe to its messages publisher. */ @Override public void afterPropertiesSet() { receiverDisposer = client.createReceiver(PROCESSING_REQUESTS_QUEUE) .flatMapMany(receiver -> receiver.flux() .doOnCancel(() -> receiver.close().block())) // Close the receiver once subscription is disposed .flatMap(this::handleMessage) .subscribe(); } /** * Cancel processing requests publisher subscription. */ @Override public void destroy() { if (receiverDisposer != null) { receiverDisposer.dispose(); } } /** * Convert the message body to uppercase and send it to the results queue. */ private Mono<Void> handleMessage(AmqpMessage originalMessage) { logger.info("Processing '{}'", originalMessage.bodyAsString()); AmqpMessage processedMessage = AmqpMessage.create() .withBody(originalMessage.bodyAsString().toUpperCase()) .build(); return client.createSender(PROCESSING_RESULTS_QUEUE) .map(sender -> sender.send(processedMessage)) .flatMap(AmqpSender::close); } }オプション: アプリケーションをローカルで実行し、テストします。
Maven プロジェクトのルートディレクトリーへ移動します。
$ cd myAppアプリケーションをパッケージ化します。
$ mvn clean packageコマンドラインからアプリケーションを起動します。
$ java -jar target/vertx-spring-boot-sample-amqp.jar新しいターミナルウィンドウで、
localhostへ処理されるテキスト文字列が含まれる HTTPPOSTリクエストを多数送信します。$ curl -H "Content-Type: text/plain" -d 'Hello, World' -X POST http://localhost:8080 $ curl -H "Content-Type: text/plain" -d 'Hello again' -X POST http://localhost:8080HTTP
GET要求をlocalhostに送信します。文字列を大文字で HTTP 応答を受け取ります。$ curl http://localhost:8080 HTTP/1.1 200 OK Content-Type: text/event-stream;charset=UTF-8 transfer-encoding: chunked data:HELLO, WORLD data:HELLO AGAIN
その他のリソース
- Fabric8 Maven プラグインを使用して、アプリケーションを OpenShift クラスターにデプロイ できます。
- また、スタンドアロンの Red Hat Enterprise Linux でのデプロイメント 用にアプリケーションを設定することもできます。