Red Hat Camel K is no longer supported.
As of June 30, 2025, Red Hat build of Camel K has reached End of Life. The suggested replacements is Red Hat build of Apache Camel. For details about moving, see the Camel K to Camel Quarkus migration guide.5.2. Kafka 통합 실행
생산자 통합 실행
샘플 생산자 통합을 생성합니다. 이렇게 하면 주제가 10초마다 메시지로 채워집니다.
Sample SaslSSLKafkaProducer.java
// kamel run --secret kafka-props SaslSSLKafkaProducer.java --dev // camel-k: language=java dependency=mvn:org.apache.camel.quarkus:camel-quarkus-kafka dependency=mvn:io.strimzi:kafka-oauth-client:0.7.1.redhat-00003 import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.kafka.KafkaConstants; public class SaslSSLKafkaProducer extends RouteBuilder { @Override public void configure() throws Exception { log.info("About to start route: Timer -> Kafka "); from("timer:foo") .routeId("FromTimer2Kafka") .setBody() .simple("Message #${exchangeProperty.CamelTimerCounter}") .to("kafka:{{producer.topic}}") .log("Message correctly sent to the topic!"); } }
// kamel run --secret kafka-props SaslSSLKafkaProducer.java --dev // camel-k: language=java dependency=mvn:org.apache.camel.quarkus:camel-quarkus-kafka dependency=mvn:io.strimzi:kafka-oauth-client:0.7.1.redhat-00003 import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.kafka.KafkaConstants; public class SaslSSLKafkaProducer extends RouteBuilder { @Override public void configure() throws Exception { log.info("About to start route: Timer -> Kafka "); from("timer:foo") .routeId("FromTimer2Kafka") .setBody() .simple("Message #${exchangeProperty.CamelTimerCounter}") .to("kafka:{{producer.topic}}") .log("Message correctly sent to the topic!"); } }
Copy to Clipboard Copied! 그런 다음 절차 통합을 실행합니다.
kamel run --secret kafka-props SaslSSLKafkaProducer.java --dev
kamel run --secret kafka-props SaslSSLKafkaProducer.java --dev
Copy to Clipboard Copied! 생산자는 새 메시지를 생성하고 항목에 푸시하고 일부 정보를 기록합니다.
[2] 2021-05-06 08:48:11,854 INFO [FromTimer2Kafka] (Camel (camel-1) thread #1 - KafkaProducer[test]) Message correctly sent to the topic! [2] 2021-05-06 08:48:11,854 INFO [FromTimer2Kafka] (Camel (camel-1) thread #3 - KafkaProducer[test]) Message correctly sent to the topic! [2] 2021-05-06 08:48:11,973 INFO [FromTimer2Kafka] (Camel (camel-1) thread #5 - KafkaProducer[test]) Message correctly sent to the topic! [2] 2021-05-06 08:48:12,970 INFO [FromTimer2Kafka] (Camel (camel-1) thread #7 - KafkaProducer[test]) Message correctly sent to the topic! [2] 2021-05-06 08:48:13,970 INFO [FromTimer2Kafka] (Camel (camel-1) thread #9 - KafkaProducer[test]) Message correctly sent to the topic!
[2] 2021-05-06 08:48:11,854 INFO [FromTimer2Kafka] (Camel (camel-1) thread #1 - KafkaProducer[test]) Message correctly sent to the topic! [2] 2021-05-06 08:48:11,854 INFO [FromTimer2Kafka] (Camel (camel-1) thread #3 - KafkaProducer[test]) Message correctly sent to the topic! [2] 2021-05-06 08:48:11,973 INFO [FromTimer2Kafka] (Camel (camel-1) thread #5 - KafkaProducer[test]) Message correctly sent to the topic! [2] 2021-05-06 08:48:12,970 INFO [FromTimer2Kafka] (Camel (camel-1) thread #7 - KafkaProducer[test]) Message correctly sent to the topic! [2] 2021-05-06 08:48:13,970 INFO [FromTimer2Kafka] (Camel (camel-1) thread #9 - KafkaProducer[test]) Message correctly sent to the topic!
Copy to Clipboard Copied!
소비자 통합 실행
소비자 통합을 생성합니다.
Sample SaslSSLKafkaProducer.java
// kamel run --secret kafka-props SaslSSLKafkaConsumer.java --dev // camel-k: language=java dependency=mvn:org.apache.camel.quarkus:camel-quarkus-kafka dependency=mvn:io.strimzi:kafka-oauth-client:0.7.1.redhat-00003 import org.apache.camel.builder.RouteBuilder; public class SaslSSLKafkaConsumer extends RouteBuilder { @Override public void configure() throws Exception { log.info("About to start route: Kafka -> Log "); from("kafka:{{consumer.topic}}") .routeId("FromKafka2Log") .log("${body}"); } }
// kamel run --secret kafka-props SaslSSLKafkaConsumer.java --dev // camel-k: language=java dependency=mvn:org.apache.camel.quarkus:camel-quarkus-kafka dependency=mvn:io.strimzi:kafka-oauth-client:0.7.1.redhat-00003 import org.apache.camel.builder.RouteBuilder; public class SaslSSLKafkaConsumer extends RouteBuilder { @Override public void configure() throws Exception { log.info("About to start route: Kafka -> Log "); from("kafka:{{consumer.topic}}") .routeId("FromKafka2Log") .log("${body}"); } }
Copy to Clipboard Copied! 다른 쉘을 열고 명령을 사용하여 소비자 통합을 실행합니다.
kamel run --secret kafka-props SaslSSLKafkaConsumer.java --dev
kamel run --secret kafka-props SaslSSLKafkaConsumer.java --dev
Copy to Clipboard Copied! 소비자가 Topic에 있는 이벤트 로깅을 시작합니다.
[1] 2021-05-06 08:51:08,991 INFO [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #8 [1] 2021-05-06 08:51:10,065 INFO [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #9 [1] 2021-05-06 08:51:10,991 INFO [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #10 [1] 2021-05-06 08:51:11,991 INFO [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #11
[1] 2021-05-06 08:51:08,991 INFO [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #8 [1] 2021-05-06 08:51:10,065 INFO [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #9 [1] 2021-05-06 08:51:10,991 INFO [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #10 [1] 2021-05-06 08:51:11,991 INFO [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #11
Copy to Clipboard Copied!