Red Hat Camel K is deprecated
Red Hat Camel K is deprecated and the End of Life date for this product is June 30, 2025. For help migrating to the current go-to solution, Red Hat build of Apache Camel, see the Migration Guide.5.2. 运行 Kafka 集成
运行制作者集成
创建制作者集成示例。这会使用一条消息填充主题,每 10 秒填充一次。
Sample SaslSSLKafkaProducer.java
// kamel run --secret kafka-props SaslSSLKafkaProducer.java --dev // camel-k: language=java dependency=mvn:org.apache.camel.quarkus:camel-quarkus-kafka dependency=mvn:io.strimzi:kafka-oauth-client:0.7.1.redhat-00003 import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.kafka.KafkaConstants; public class SaslSSLKafkaProducer extends RouteBuilder { @Override public void configure() throws Exception { log.info("About to start route: Timer -> Kafka "); from("timer:foo") .routeId("FromTimer2Kafka") .setBody() .simple("Message #${exchangeProperty.CamelTimerCounter}") .to("kafka:{{producer.topic}}") .log("Message correctly sent to the topic!"); } }
然后运行流程集成。
kamel run --secret kafka-props SaslSSLKafkaProducer.java --dev
生产者将创建新消息并推送到主题并记录一些信息。
[2] 2021-05-06 08:48:11,854 INFO [FromTimer2Kafka] (Camel (camel-1) thread #1 - KafkaProducer[test]) Message correctly sent to the topic! [2] 2021-05-06 08:48:11,854 INFO [FromTimer2Kafka] (Camel (camel-1) thread #3 - KafkaProducer[test]) Message correctly sent to the topic! [2] 2021-05-06 08:48:11,973 INFO [FromTimer2Kafka] (Camel (camel-1) thread #5 - KafkaProducer[test]) Message correctly sent to the topic! [2] 2021-05-06 08:48:12,970 INFO [FromTimer2Kafka] (Camel (camel-1) thread #7 - KafkaProducer[test]) Message correctly sent to the topic! [2] 2021-05-06 08:48:13,970 INFO [FromTimer2Kafka] (Camel (camel-1) thread #9 - KafkaProducer[test]) Message correctly sent to the topic!
运行消费者集成
创建消费者集成。
Sample SaslSSLKafkaProducer.java
// kamel run --secret kafka-props SaslSSLKafkaConsumer.java --dev // camel-k: language=java dependency=mvn:org.apache.camel.quarkus:camel-quarkus-kafka dependency=mvn:io.strimzi:kafka-oauth-client:0.7.1.redhat-00003 import org.apache.camel.builder.RouteBuilder; public class SaslSSLKafkaConsumer extends RouteBuilder { @Override public void configure() throws Exception { log.info("About to start route: Kafka -> Log "); from("kafka:{{consumer.topic}}") .routeId("FromKafka2Log") .log("${body}"); } }
打开另一个 shell 并运行以下命令运行使用者集成:
kamel run --secret kafka-props SaslSSLKafkaConsumer.java --dev
消费者将开始记录主题中找到的事件:
[1] 2021-05-06 08:51:08,991 INFO [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #8 [1] 2021-05-06 08:51:10,065 INFO [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #9 [1] 2021-05-06 08:51:10,991 INFO [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #10 [1] 2021-05-06 08:51:11,991 INFO [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #11