5.2. 运行 Kafka 集成


运行制作者集成

  1. 创建制作者集成示例。这会使用一条消息填充主题,每 10 秒填充一次。

    Sample SaslSSLKafkaProducer.java

    // kamel run --secret kafka-props SaslSSLKafkaProducer.java --dev
    // camel-k: language=java dependency=mvn:org.apache.camel.quarkus:camel-quarkus-kafka dependency=mvn:io.strimzi:kafka-oauth-client:0.7.1.redhat-00003
    
    import org.apache.camel.builder.RouteBuilder;
    import org.apache.camel.component.kafka.KafkaConstants;
    
    public class SaslSSLKafkaProducer extends RouteBuilder {
      @Override
      public void configure() throws Exception {
      log.info("About to start route: Timer -> Kafka ");
      from("timer:foo")
        .routeId("FromTimer2Kafka")
        .setBody()
          .simple("Message #${exchangeProperty.CamelTimerCounter}")
        .to("kafka:{{producer.topic}}")
        .log("Message correctly sent to the topic!");
      }
    }

  2. 然后运行流程集成。

    kamel run --secret kafka-props SaslSSLKafkaProducer.java --dev

    生产者将创建新消息并推送到主题并记录一些信息。

    [2] 2021-05-06 08:48:11,854 INFO  [FromTimer2Kafka] (Camel (camel-1) thread #1 - KafkaProducer[test]) Message correctly sent to the topic!
    [2] 2021-05-06 08:48:11,854 INFO  [FromTimer2Kafka] (Camel (camel-1) thread #3 - KafkaProducer[test]) Message correctly sent to the topic!
    [2] 2021-05-06 08:48:11,973 INFO  [FromTimer2Kafka] (Camel (camel-1) thread #5 - KafkaProducer[test]) Message correctly sent to the topic!
    [2] 2021-05-06 08:48:12,970 INFO  [FromTimer2Kafka] (Camel (camel-1) thread #7 - KafkaProducer[test]) Message correctly sent to the topic!
    [2] 2021-05-06 08:48:13,970 INFO  [FromTimer2Kafka] (Camel (camel-1) thread #9 - KafkaProducer[test]) Message correctly sent to the topic!

运行消费者集成

  1. 创建消费者集成。

    Sample SaslSSLKafkaProducer.java

    // kamel run --secret kafka-props SaslSSLKafkaConsumer.java --dev
    // camel-k: language=java dependency=mvn:org.apache.camel.quarkus:camel-quarkus-kafka dependency=mvn:io.strimzi:kafka-oauth-client:0.7.1.redhat-00003
    
    import org.apache.camel.builder.RouteBuilder;
    
    public class SaslSSLKafkaConsumer extends RouteBuilder {
      @Override
      public void configure() throws Exception {
    	log.info("About to start route: Kafka -> Log ");
    	from("kafka:{{consumer.topic}}")
        .routeId("FromKafka2Log")
        .log("${body}");
      }
    }

  2. 打开另一个 shell 并运行以下命令运行使用者集成:

    kamel run --secret kafka-props SaslSSLKafkaConsumer.java --dev

    消费者将开始记录主题中找到的事件:

    [1] 2021-05-06 08:51:08,991 INFO  [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #8
    [1] 2021-05-06 08:51:10,065 INFO  [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #9
    [1] 2021-05-06 08:51:10,991 INFO  [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #10
    [1] 2021-05-06 08:51:11,991 INFO  [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #11
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.