5.6. Kafka インテグレーションの実行


プロデューサーインテグレーションの実行

  1. サンプルプロデューサーインテグレーションを作成します。これにより、トピックには 10 秒ごとにメッセージが表示されます。

    Sample SaslSSLKafkaProducer.java

    // kamel run --secret kafka-props SaslSSLKafkaProducer.java --dev
    // camel-k: language=java dependency=mvn:org.apache.camel.quarkus:camel-quarkus-kafka dependency=mvn:io.strimzi:kafka-oauth-client:0.7.1.redhat-00003
    
    import org.apache.camel.builder.RouteBuilder;
    import org.apache.camel.component.kafka.KafkaConstants;
    
    public class SaslSSLKafkaProducer extends RouteBuilder {
      @Override
      public void configure() throws Exception {
      log.info("About to start route: Timer -> Kafka ");
      from("timer:foo")
        .routeId("FromTimer2Kafka")
        .setBody()
          .simple("Message #${exchangeProperty.CamelTimerCounter}")
        .to("kafka:{{producer.topic}}")
        .log("Message correctly sent to the topic!");
      }
    }

  2. その後、プロデューサーインテグレーションを実行します。

    kamel run --secret kafka-props SaslSSLKafkaProducer.java --dev

    プロデューサーは新しいメッセージを作成し、トピックにプッシュし、一部の情報をログに記録します。

    [2] 2021-05-06 08:48:11,854 INFO  [FromTimer2Kafka] (Camel (camel-1) thread #1 - KafkaProducer[test]) Message correctly sent to the topic!
    [2] 2021-05-06 08:48:11,854 INFO  [FromTimer2Kafka] (Camel (camel-1) thread #3 - KafkaProducer[test]) Message correctly sent to the topic!
    [2] 2021-05-06 08:48:11,973 INFO  [FromTimer2Kafka] (Camel (camel-1) thread #5 - KafkaProducer[test]) Message correctly sent to the topic!
    [2] 2021-05-06 08:48:12,970 INFO  [FromTimer2Kafka] (Camel (camel-1) thread #7 - KafkaProducer[test]) Message correctly sent to the topic!
    [2] 2021-05-06 08:48:13,970 INFO  [FromTimer2Kafka] (Camel (camel-1) thread #9 - KafkaProducer[test]) Message correctly sent to the topic!

コンシューマーインテグレーションの実行

  1. コンシューマーインテグレーションを作成します。

    Sample SaslSSLKafkaProducer.java

    // kamel run --secret kafka-props SaslSSLKafkaConsumer.java --dev
    // camel-k: language=java dependency=mvn:org.apache.camel.quarkus:camel-quarkus-kafka dependency=mvn:io.strimzi:kafka-oauth-client:0.7.1.redhat-00003
    
    import org.apache.camel.builder.RouteBuilder;
    
    public class SaslSSLKafkaConsumer extends RouteBuilder {
      @Override
      public void configure() throws Exception {
    	log.info("About to start route: Kafka -> Log ");
    	from("kafka:{{consumer.topic}}")
        .routeId("FromKafka2Log")
        .log("${body}");
      }
    }

  2. 別のシェルを開き、コマンドを使用してコンシューマーインテグレーションを実行します。

    kamel run --secret kafka-props SaslSSLKafkaConsumer.java --dev

    コンシューマーは、トピックにあるイベントのロギングを開始します。

    [1] 2021-05-06 08:51:08,991 INFO  [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #8
    [1] 2021-05-06 08:51:10,065 INFO  [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #9
    [1] 2021-05-06 08:51:10,991 INFO  [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #10
    [1] 2021-05-06 08:51:11,991 INFO  [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #11
Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.