5.6. Kafka インテグレーションの実行
プロデューサーインテグレーションの実行
サンプルプロデューサーインテグレーションを作成します。これにより、トピックには 10 秒ごとにメッセージが表示されます。
Sample SaslSSLKafkaProducer.java
// kamel run --secret kafka-props SaslSSLKafkaProducer.java --dev // camel-k: language=java dependency=mvn:org.apache.camel.quarkus:camel-quarkus-kafka dependency=mvn:io.strimzi:kafka-oauth-client:0.7.1.redhat-00003 import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.kafka.KafkaConstants; public class SaslSSLKafkaProducer extends RouteBuilder { @Override public void configure() throws Exception { log.info("About to start route: Timer -> Kafka "); from("timer:foo") .routeId("FromTimer2Kafka") .setBody() .simple("Message #${exchangeProperty.CamelTimerCounter}") .to("kafka:{{producer.topic}}") .log("Message correctly sent to the topic!"); } }
その後、プロデューサーインテグレーションを実行します。
kamel run --secret kafka-props SaslSSLKafkaProducer.java --dev
プロデューサーは新しいメッセージを作成し、トピックにプッシュし、一部の情報をログに記録します。
[2] 2021-05-06 08:48:11,854 INFO [FromTimer2Kafka] (Camel (camel-1) thread #1 - KafkaProducer[test]) Message correctly sent to the topic! [2] 2021-05-06 08:48:11,854 INFO [FromTimer2Kafka] (Camel (camel-1) thread #3 - KafkaProducer[test]) Message correctly sent to the topic! [2] 2021-05-06 08:48:11,973 INFO [FromTimer2Kafka] (Camel (camel-1) thread #5 - KafkaProducer[test]) Message correctly sent to the topic! [2] 2021-05-06 08:48:12,970 INFO [FromTimer2Kafka] (Camel (camel-1) thread #7 - KafkaProducer[test]) Message correctly sent to the topic! [2] 2021-05-06 08:48:13,970 INFO [FromTimer2Kafka] (Camel (camel-1) thread #9 - KafkaProducer[test]) Message correctly sent to the topic!
コンシューマーインテグレーションの実行
コンシューマーインテグレーションを作成します。
Sample SaslSSLKafkaProducer.java
// kamel run --secret kafka-props SaslSSLKafkaConsumer.java --dev // camel-k: language=java dependency=mvn:org.apache.camel.quarkus:camel-quarkus-kafka dependency=mvn:io.strimzi:kafka-oauth-client:0.7.1.redhat-00003 import org.apache.camel.builder.RouteBuilder; public class SaslSSLKafkaConsumer extends RouteBuilder { @Override public void configure() throws Exception { log.info("About to start route: Kafka -> Log "); from("kafka:{{consumer.topic}}") .routeId("FromKafka2Log") .log("${body}"); } }
別のシェルを開き、コマンドを使用してコンシューマーインテグレーションを実行します。
kamel run --secret kafka-props SaslSSLKafkaConsumer.java --dev
コンシューマーは、トピックにあるイベントのロギングを開始します。
[1] 2021-05-06 08:51:08,991 INFO [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #8 [1] 2021-05-06 08:51:10,065 INFO [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #9 [1] 2021-05-06 08:51:10,991 INFO [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #10 [1] 2021-05-06 08:51:11,991 INFO [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #11