5.13. リアクティブアプリケーションでの AMQP の使用


Spring Boot HTTP コントローラーで AMQP Client Starter を使用して、簡易メッセージングリアクティブアプリケーションを開発します。このサンプルアプリケーションは、2 つのメッセージングキューとブローカーを使用する Publisher-Subscriber メッセージング統合パターンで 2 つのサービスを統合します。

この例は、AMQP メッセージングを使用して統合された 2 つのサービスで設定される Reactor Netty 上の Spring Boot および Eclipse Vert.x で基本的なアプリケーションを作成する方法を示しています。アプリケーションは以下のコンポーネントで設定されます。

  • テキスト文字列をアプリケーションに送信するために使用できるフロントエンドサービス
  • 文字列を大文字に変換するバックエンドサービス
  • サービス間でマッサージをルーティングし、要求キューと応答キューを管理する Artemis AMQP ブローカー。
  • Spring Boot HTTP Starter によって提供される HTTP コントローラー

前提条件

  • Spring Boot を使用するよう設定された Maven ベースの Java アプリケーションプロジェクト
  • JDK 8 または JDK 11 がインストールされている。
  • Maven がインストールされている。

手順

  1. 以下の依存関係をアプリケーションプロジェクトの pom.xml ファイルに追加します。

    pom.xml

    ...
    <dependencies>
      ...
       <dependency>
          <groupId>dev.snowdrop</groupId>
          <artifactId>vertx-spring-boot-starter-http</artifactId>
       </dependency>
       <dependency>
           <groupId>dev.snowdrop</groupId>
           <artifactId>vertx-spring-boot-starter-amqp</artifactId>
       </dependency>
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-artemis</artifactId>
       </dependency>
       <dependency>
           <groupId>org.apache.activemq</groupId>
           <artifactId>artemis-jms-server</artifactId>
       </dependency>
       <dependency>
           <groupId>org.apache.activemq</groupId>
           <artifactId>artemis-amqp-protocol</artifactId>
         <exclusions>
           <exclusion>
             <groupId>org.apache.qpid</groupId>
             <artifactId>proton-j</artifactId>
           </exclusion>
         </exclusions>
       </dependency>
      ...
    </dependencies>
    ...
    Copy to Clipboard Toggle word wrap

  2. サンプルアプリケーションのメインクラスファイルを作成します。このクラスには、要求と結果に対応する処理キューを定義するメソッドが含まれています。

    /src/main/java/AmqpExampleApplication.java

    package dev.snowdrop.AmqpExampleApplication.java;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import dev.snowdrop.vertx.amqp.AmqpProperties;
    import org.apache.activemq.artemis.api.core.TransportConfiguration;
    import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.boot.autoconfigure.jms.artemis.ArtemisConfigurationCustomizer;
    import org.springframework.context.annotation.Bean;
    
    @SpringBootApplication
    public class AmqpExampleApplication {
    
        final static String PROCESSING_REQUESTS_QUEUE = "processing-requests";
    
        final static String PROCESSING_RESULTS_QUEUE = "processing-results";
    
        public static void main(String[] args) {
            SpringApplication.run(AmqpExampleApplication.class, args);
        }
    
        /**
         * Add Netty acceptor to the embedded Artemis server.
         */
        @Bean
        public ArtemisConfigurationCustomizer artemisConfigurationCustomizer(AmqpProperties properties) {
            Map<String, Object> params = new HashMap<>();
            params.put("host", properties.getHost());
            params.put("port", properties.getPort());
    
            return configuration -> configuration
                .addAcceptorConfiguration(new TransportConfiguration(NettyAcceptorFactory.class.getName(), params));
        }
    }
    Copy to Clipboard Toggle word wrap

  3. GET および POST リクエストを処理する REST エンドポイントを公開して、要求キューと応答キューを管理する HTTP REST コントローラーのコードを含むクラスファイルを作成します。

    /src/main/java/Controller.java

    package dev.snowdrop.vertx.sample.amqp;
    
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RestController;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    
    import static org.springframework.http.MediaType.TEXT_EVENT_STREAM_VALUE;
    
    /**
     * Rest controller exposing GET and POST resources to receive processed messages and submit messages for processing.
     */
    @RestController
    public class Controller {
    
        private final MessagesManager messagesManager;
    
        public Controller(MessagesManager messagesManager) {
            this.messagesManager = messagesManager;
        }
    
        /**
         * Get a flux of messages processed up to this point.
         */
        @GetMapping(produces = TEXT_EVENT_STREAM_VALUE)
        public Flux<String> getProcessedMessages() {
            return Flux.fromIterable(messagesManager.getProcessedMessages());
        }
    
        /**
         * Submit a message for processing by publishing it to a processing requests queue.
         */
        @PostMapping
        public Mono<Void> submitMessageForProcessing(@RequestBody String body) {
            return messagesManager.processMessage(body.trim());
        }
    }
    Copy to Clipboard Toggle word wrap

  4. メッセージングマネージャーを含むクラスファイルを作成します。Manager は、アプリケーションコンポーネントがリクエストキューにリクエストを公開する方法を制御し、その後に応答キューにサブスクライブして処理された結果を取得します。

    /src/main/java/MessagesManager.java:

    package dev.snowdrop.vertx.sample.amqp;
    
    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;
    
    import dev.snowdrop.vertx.amqp.AmqpClient;
    import dev.snowdrop.vertx.amqp.AmqpMessage;
    import dev.snowdrop.vertx.amqp.AmqpSender;
    import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.DisposableBean;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.stereotype.Component;
    import reactor.core.Disposable;
    import reactor.core.publisher.Mono;
    
    import static dev.snowdrop.vertx.sample.amqp.AmqpSampleApplication.PROCESSING_REQUESTS_QUEUE;
    import static dev.snowdrop.vertx.sample.amqp.AmqpSampleApplication.PROCESSING_RESULTS_QUEUE;
    
    /**
     * Processor client submits messages to the requests queue and subscribes to the results queue for processed messages.
     */
    @Component
    public class MessagesManager implements InitializingBean, DisposableBean {
    
        private final Logger logger = LoggerFactory.getLogger(MessagesManager.class);
    
        private final List<String> processedMessages = new CopyOnWriteArrayList<>();
    
        private final AmqpClient client;
    
        private Disposable receiverDisposer;
    
        // Injecting EmbeddedActiveMQ to make sure it has started before creating this component.
        public MessagesManager(AmqpClient client, EmbeddedActiveMQ server) {
            this.client = client;
        }
    
        /**
         * Create a processed messages receiver and subscribe to its messages publisher.
         */
        @Override
        public void afterPropertiesSet() {
            receiverDisposer = client.createReceiver(PROCESSING_RESULTS_QUEUE)
                .flatMapMany(receiver -> receiver.flux()
                    .doOnCancel(() -> receiver.close().block())) // Close the receiver once subscription is disposed
                .subscribe(this::handleMessage);
        }
    
        /**
         * Cancel processed messages publisher subscription.
         */
        @Override
        public void destroy() {
            if (receiverDisposer != null) {
                receiverDisposer.dispose();
            }
        }
    
        /**
         * Get messages which were processed up to this moment.
         *
         * @return List of processed messages.
         */
        public List<String> getProcessedMessages() {
            return processedMessages;
        }
    
        /**
         * Submit a message for processing by publishing it to a processing requests queue.
         *
         * @param body Message body to be processed.
         * @return Mono which is completed once the message is sent.
         */
        public Mono<Void> processMessage(String body) {
            logger.info("Sending message '{}' for processing", body);
    
            AmqpMessage message = AmqpMessage.create()
                .withBody(body)
                .build();
    
            return client.createSender(PROCESSING_REQUESTS_QUEUE)
                .map(sender -> sender.send(message))
                .flatMap(AmqpSender::close);
        }
    
        private void handleMessage(AmqpMessage message) {
            String body = message.bodyAsString();
    
            logger.info("Received processed message '{}'", body);
            processedMessages.add(body);
        }
    }
    Copy to Clipboard Toggle word wrap

  5. 要求キューからテキスト文字列を受け取る大文字プロセッサーを含むクラスファイルを作成し、それを大文字に変換します。その後、プロセッサーは結果を応答キューに公開します。

    /src/main/java/UppercaseProcessor.java

    package dev.snowdrop.vertx.sample.amqp;
    
    import dev.snowdrop.vertx.amqp.AmqpClient;
    import dev.snowdrop.vertx.amqp.AmqpMessage;
    import dev.snowdrop.vertx.amqp.AmqpSender;
    import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.DisposableBean;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.stereotype.Component;
    import reactor.core.Disposable;
    import reactor.core.publisher.Mono;
    
    import static dev.snowdrop.vertx.sample.amqp.AmqpSampleApplication.PROCESSING_REQUESTS_QUEUE;
    import static dev.snowdrop.vertx.sample.amqp.AmqpSampleApplication.PROCESSING_RESULTS_QUEUE;
    
    /**
     * Uppercase processor subscribes to the requests queue, converts each received message to uppercase and send it to the
     * results queue.
     */
    @Component
    public class UppercaseProcessor implements InitializingBean, DisposableBean {
    
        private final Logger logger = LoggerFactory.getLogger(UppercaseProcessor.class);
    
        private final AmqpClient client;
    
        private Disposable receiverDisposer;
    
        // Injecting EmbeddedActiveMQ to make sure it has started before creating this component.
        public UppercaseProcessor(AmqpClient client, EmbeddedActiveMQ server) {
            this.client = client;
        }
    
        /**
         * Create a processing requests receiver and subscribe to its messages publisher.
         */
        @Override
        public void afterPropertiesSet() {
            receiverDisposer = client.createReceiver(PROCESSING_REQUESTS_QUEUE)
                .flatMapMany(receiver -> receiver.flux()
                    .doOnCancel(() -> receiver.close().block())) // Close the receiver once subscription is disposed
                .flatMap(this::handleMessage)
                .subscribe();
        }
    
        /**
         * Cancel processing requests publisher subscription.
         */
        @Override
        public void destroy() {
            if (receiverDisposer != null) {
                receiverDisposer.dispose();
            }
        }
    
        /**
         * Convert the message body to uppercase and send it to the results queue.
         */
        private Mono<Void> handleMessage(AmqpMessage originalMessage) {
            logger.info("Processing '{}'", originalMessage.bodyAsString());
    
            AmqpMessage processedMessage = AmqpMessage.create()
                .withBody(originalMessage.bodyAsString().toUpperCase())
                .build();
    
            return client.createSender(PROCESSING_RESULTS_QUEUE)
                .map(sender -> sender.send(processedMessage))
                .flatMap(AmqpSender::close);
        }
    }
    Copy to Clipboard Toggle word wrap

  6. オプション: アプリケーションをローカルで実行し、テストします。

    1. Maven プロジェクトのルートディレクトリーへ移動します。

      $ cd myApp
      Copy to Clipboard Toggle word wrap
    2. アプリケーションをパッケージ化します。

      $ mvn clean package
      Copy to Clipboard Toggle word wrap
    3. コマンドラインからアプリケーションを起動します。

      $ java -jar target/vertx-spring-boot-sample-amqp.jar
      Copy to Clipboard Toggle word wrap
    4. 新しいターミナルウィンドウで、localhost へ処理されるテキスト文字列が含まれる HTTP POST リクエストを多数送信します。

      $ curl -H "Content-Type: text/plain" -d 'Hello, World' -X POST http://localhost:8080
      $ curl -H "Content-Type: text/plain" -d 'Hello again' -X POST http://localhost:8080
      Copy to Clipboard Toggle word wrap
    5. HTTP GET 要求を localhost に送信します。文字列を大文字で HTTP 応答を受け取ります。

      $ curl http://localhost:8080
      HTTP/1.1 200 OK
      Content-Type: text/event-stream;charset=UTF-8
      transfer-encoding: chunked
      
      data:HELLO, WORLD
      
      data:HELLO AGAIN
      Copy to Clipboard Toggle word wrap

その他のリソース

トップに戻る
Red Hat logoGithubredditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。 最新の更新を見る.

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

Theme

© 2025 Red Hat