5.16. リアクティブアプリケーションでの Kafka の使用


この例は、Reactor Netty 上の Spring Boot および Eclipse Vert.x で Apache Kafka を使用するメッセージングアプリケーションのサンプルを作成する方法を示しています。アプリケーションはメッセージを Kafka トピックに公開してから、リクエストの送信時にメッセージを取得して表示します。

Kafka クラスターによって使用されるメッセージトピック、URL、およびメタデータの Kafka 設定プロパティーは src/main/resources/application.yml に保存されます。

前提条件

  • Spring Boot を使用するよう設定された Maven ベースの Java アプリケーションプロジェクト
  • JDK 8 または JDK 11 がインストールされている。
  • Maven がインストールされている。

手順

  1. WebFlux HTTP Starter および Apache Kafka Starter をアプリケーションプロジェクトの pom.xml ファイルに依存関係として追加します。

    pom.xml

    ...
    <dependencies>
      ...
      <!-- Vert.x WebFlux starter used to handle HTTP requests -->
      <dependency>
        <groupId>dev.snowdrop</groupId>
        <artifactId>vertx-spring-boot-starter-http</artifactId>
      </dependency>
      <!-- Vert.x Kafka starter used to send and receive messages to/from Kafka cluster -->
      <dependency>
        <groupId>dev.snowdrop</groupId>
        <artifactId>vertx-spring-boot-starter-kafka</artifactId>
      </dependency>
      ...
    </dependencies>
    ...
    Copy to Clipboard Toggle word wrap

  1. KafkaLogger クラスを作成します。このクラスはプロデューサーと sendas メッセージを機能させます。KafkaLogger クラスは、Producer がメッセージ (別名レコード) をトピックに公開する方法を定義します。

    /src/main/java/KafkaLogger.java

    ...
    final class KafkaLogger {
    
        private final KafkaProducer<String, String> producer;
    
        KafkaLogger(KafkaProducer<String, String> producer) {
            this.producer = producer;
        }
    
        public Mono<Void> logMessage(String body) {
            // Generic key and value types can be inferred if both key and value are used to create a builder
            ProducerRecord<String, String> record = ProducerRecord.<String, String>builder(LOG_TOPIC, body).build();
    
            return producer.send(record)
                .log("Kafka logger producer")
                .then();
        }
    }
    ...
    Copy to Clipboard Toggle word wrap

  2. KafkaLog クラスを作成します。このクラスは、kafka メッセージのコンシューマーとして機能します。KafkaLog は、トピックからメッセージを取得してターミナルに表示されるメッセージを取得します。

    /src/main/java/KafkaLog.java

    ...
    final class KafkaLog implements InitializingBean, DisposableBean {
    
        private final List<String> messages = new CopyOnWriteArrayList<>();
    
        private final KafkaConsumer<String, String> consumer;
    
        private Disposable consumerDisposer;
    
        KafkaLog(KafkaConsumer<String, String> consumer) {
            this.consumer = consumer;
        }
    
        @Override
        public void afterPropertiesSet() {
            consumerDisposer = consumer.subscribe(LOG_TOPIC)
                .thenMany(consumer.flux())
                .log("Kafka log consumer")
                .map(ConsumerRecord::value)
                .subscribe(messages::add);
        }
    
        @Override
        public void destroy() {
            if (consumerDisposer != null) {
                consumerDisposer.dispose();
            }
            consumer.unsubscribe()
                .block(Duration.ofSeconds(2));
        }
    
        public List<String> getMessages() {
            return messages;
        }
    }
    ...
    Copy to Clipboard Toggle word wrap

  3. HTTP REST コントローラーが含まれるクラスファイルを作成します。アプリケーションがメッセージのロギングや読み取りを処理するために使用する REST リソースを公開するコントローラー。

    /src/main/java/Controller.java

    package dev.snowdrop.vertx.sample.kafka;
    
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RestController;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    
    import static org.springframework.http.MediaType.TEXT_EVENT_STREAM_VALUE;
    
    /**
     * HTTP controller exposes GET and POST resources to log messages and to receive the previously logged ones.
     */
    @RestController
    public class Controller {
    
        private final KafkaLogger logger;
    
        private final KafkaLog log;
    
        public Controller(KafkaLogger logger, KafkaLog log) {
            this.logger = logger;
            this.log = log;
        }
    
        /**
         * Get a Flux of previously logged messages.
         */
        @GetMapping(produces = TEXT_EVENT_STREAM_VALUE)
        public Flux<String> getMessages() {
            return Flux.fromIterable(log.getMessages());
        }
    
        /**
         * Log a message.
         */
        @PostMapping
        public Mono<Void> logMessage(@RequestBody String body) {
            return logger.logMessage(body.trim());
        }
    }
    Copy to Clipboard Toggle word wrap

  4. Apache Kafka Cluster のプロデューサーおよびコンシューマーが使用する URL が含まれる YAML テンプレートを作成して、メッセージのログおよび読み取りを行います。この例では、Apache Kafka Cluster のコンシューマーおよびプロデューサーは、デフォルトで localhost でポート 9092 を使用して通信します。以下の例に示すように、プロデューサーとコンシューマーを個別に設定する必要があります。

    /src/main/resources/application.yml

    vertx:
      kafka:
        producer:
          bootstrap:
             # The producer in your cluster uses this URL to publish messages to the log.
            servers: localhost:9092
          key:
              # This class assigns the mandatory key attribute that is assigned to each message.
            serializer: org.apache.kafka.common.serialization.StringSerializer
          value:
              # This class assigns the mandatory value attribute that is assigned to each message.
            serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer:
          bootstrap:
            servers: localhost:9092 # The consumer in your cluster uses this URL to read messages from the log.
          group:
            id: log # The consumer group IDs used to define a group of consumers that subscribe to the same topic. In this example, all consumers belong in the same consumer group.
          key:
            deserializer: org.apache.kafka.common.serialization.StringDeserializer # This class generates the mandatory key attribute that is assigned to each message.
          value:
            deserializer: org.apache.kafka.common.serialization.StringDeserializer # This class generates the mandatory value attribute that is assigned to each message.
    Copy to Clipboard Toggle word wrap

  5. オプション: アプリケーションをローカルで実行し、テストします。

    1. Maven プロジェクトのルートディレクトリーへ移動します。

      $ cd vertx-spring-boot-sample-kafka
      Copy to Clipboard Toggle word wrap
    2. アプリケーションをパッケージ化します。

      $ mvn clean package
      Copy to Clipboard Toggle word wrap
    3. コマンドラインからアプリケーションを起動します。

      $ java -jar target/vertx-spring-boot-sample-kafka.jar
      Copy to Clipboard Toggle word wrap
    4. 新しいターミナルウィンドウで、テキスト文字列としてフォーマットされたメッセージが含まれる HTTP POST リクエストを localhost に送信します。メッセージはすべて log トピックに公開されます。

      $ curl -H "Content-Type: text/plain" -d 'Hello, World' -X POST http://localhost:8080
      $ curl -H "Content-Type: text/plain" -d 'Hello again' -X POST http://localhost:8080
      ...
      Copy to Clipboard Toggle word wrap
    5. HTTP GET 要求を localhost に送信します。コンシューマーがサブスクライブするトピック内の全メッセージが含まれる HTTP 応答を受け取ります。

      $ curl http://localhost:8080
      HTTP/1.1 200 OK
      Content-Type: text/event-stream;charset=UTF-8
      transfer-encoding: chunked
      
      data:Hello, World
      
      data:Hello, again
      ...
      Copy to Clipboard Toggle word wrap

その他のリソース

サンプルを使用 する他に、Eclipse Vert.x と Spring Boot を使用して、ゼロから新しい Spring Boot アプリケーションを作成し、それらを OpenShift にデプロイすることもできます。

トップに戻る
Red Hat logoGithubredditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。 最新の更新を見る.

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

Theme

© 2025 Red Hat