搜索

第 6 章 Camel 组件

download PDF

本章有关支持的 camel 组件的详情

6.1. camel-activemq

Camel ActiveMQ 集成由 activemq 组件提供。

组件可以配置为使用嵌入式或外部代理。对于 Wildfly / EAP 容器管理的连接池和 XA-Transaction 支持,可以将 ActiveMQ 资源适配器 配置到容器配置文件中。

6.1.1. JBoss EAP ActiveMQ 资源适配器配置

下载 ActiveMQ 资源适配器 rar 文件。下列步骤概述了配置 ActiveMQ 资源适配器。

  1. 停止您的 JBoss EAP 实例。
  2. 下载资源适配器并复制到相关的 JBoss EAP 部署目录。对于独立模式:

    cp activemq-rar-5.11.1.rar ${JBOSS_HOME}/standalone/deployments/activemq-rar.rar
  3. 为 ActiveMQ 适配器配置 JBoss EAP 资源适配器子系统。
<subsystem xmlns="urn:jboss:domain:resource-adapters:2.0">
     <resource-adapters>
         <resource-adapter id="activemq-rar.rar">
             <archive>
                 activemq-rar.rar
             </archive>
             <transaction-support>XATransaction</transaction-support>
             <config-property name="UseInboundSession">
                 false
             </config-property>
             <config-property name="Password">
                 defaultPassword
             </config-property>
             <config-property name="UserName">
                 defaultUser
             </config-property>
             <config-property name="ServerUrl">
                 tcp://localhost:61616?jms.rmIdFromConnectionId=true
             </config-property>
             <connection-definitions>
                 <connection-definition class-name="org.apache.activemq.ra.ActiveMQManagedConnectionFactory" jndi-name="java:/ActiveMQConnectionFactory" enabled="true" pool-name="ConnectionFactory">
                     <xa-pool>
                         <min-pool-size>1</min-pool-size>
                         <max-pool-size>20</max-pool-size>
                         <prefill>false</prefill>
                         <is-same-rm-override>false</is-same-rm-override>
                     </xa-pool>
                 </connection-definition>
             </connection-definitions>
             <admin-objects>
                 <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:/queue/HELLOWORLDMDBQueue" use-java-context="true" pool-name="HELLOWORLDMDBQueue">
                     <config-property name="PhysicalName">
                         HELLOWORLDMDBQueue
                     </config-property>
                 </admin-object>
                 <admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:/topic/HELLOWORLDMDBTopic" use-java-context="true" pool-name="HELLOWORLDMDBTopic">
                     <config-property name="PhysicalName">
                         HELLOWORLDMDBTopic
                     </config-property>
                 </admin-object>
             </admin-objects>
         </resource-adapter>
     </resource-adapters>
 </subsystem>

如果您的资源适配器存档文件名与 activemq-rar.rar.rar 不同,您必须更改上述配置中的 archive 元素的内容,以匹配您的存档文件名称。

必须选择 UserName 和 Password 配置属性的值,以匹配外部代理中有效用户的凭证。

您可能需要更改 ServerUrl 配置 属性的值,以匹配外部代理公开的实际主机名和端口。

4) 启动 JBoss EAP。如果一切配置正确,您应该在 JBoss EAP server.log 中查看消息,如下所示:

13:16:08,412 INFO  [org.jboss.as.connector.deployment] (MSC service thread 1-5) JBAS010406: Registered connection factory java:/AMQConnectionFactory`

6.1.2. Camel 路由配置

以下 ActiveMQ producer 和消费者示例使用 ActiveMQ 嵌入式代理和"vm"传输(避免对外部 ActiveMQ 代理的需求)。

示例将 CDI 与 camel-cdi 组件一起使用。JMS ConnectionFactory 实例通过 JNDI 查找注入到 Camel RouteBuilder 中。

6.1.2.1. ActiveMQ Producer

@Startup
@ApplicationScoped
@ContextName("activemq-camel-context")
public class ActiveMQRouteBuilder extends RouteBuilder {

  @Override
  public void configure() throws Exception {
    from("timer://sendJMSMessage?fixedRate=true&period=10000")
    .transform(constant("<?xml version='1.0><message><greeting>hello world</greeting></message>"))
    .to("activemq:queue:WildFlyCamelQueue?brokerURL=vm://localhost")
    .log("JMS Message sent");
  }
}

每次将消息添加到 WildFlyCamelQueue 目的地时,都会向控制台输出一条日志消息。要验证消息实际上被放入队列中,您可以使用 Camel on EAP 子系统提供的 ../features/hawtio.md[Hawtio 控制台,window=_blank]。

ActiveMQ 队列浏览

6.1.2.2. ActiveMQ Consumer

要使用 ActiveMQ 消息,Camel RouteBuilder 实施与制作者示例类似。

当 ActiveMQ 端点消耗来自 WildFlyCamelQueue 目的地的消息时,内容会记录到控制台。

@Override
public void configure() throws Exception {
  from("activemq:queue:WildFlyCamelQueue?brokerURL=vm://localhost")
  .to("log:jms?showAll=true");
}

6.1.2.3. ActiveMQ Transactions

6.1.2.3.1. ActiveMQ 资源适配器配置

需要 ActiveMQ 资源适配器来利用 XA 事务支持、连接池等。

以下 XML 片段显示了如何在 JBoss EAP 服务器 XML 配置中配置资源适配器。请注意,ServerURL 设置为使用嵌入式代理。连接工厂绑定到 JNDI 名称 java:/ActiveMQConnectionFactory。这将在下面的 RouteBuilder 示例中查找。

最后,两个队列配置为"queue1"和"queue2"。

<subsystem xmlns="urn:jboss:domain:resource-adapters:2.0">
  <resource-adapters>
    <resource-adapter id="activemq-rar.rar">
      ...
      <admin-objects>
        <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:/queue/queue1" use-java-context="true" pool-name="queue1pool">
          <config-property name="PhysicalName">queue1</config-property>
        </admin-object>
        <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:/queue/queue2" use-java-context="true" pool-name="queue2pool">
          <config-property name="PhysicalName">queue2</config-property>
        </admin-object>
      </admin-objects>
    </resource-adapter>
  </resource-adapters>
</subsystem>

6.1.2.4. 事务管理器

camel-activemq 组件需要类型为 org.springframework.transaction.PlatformTransactionManager 的事务管理器。因此,您可以先创建一个可满足此要求的 bean 扩展 JtaTransactionManager。请注意,bean 被标注为 @Named,以允许在 Camel bean registry 中注册 bean。另请注意,JBoss EAP 事务管理器和用户事务实例使用 CDI 注入。

@Named("transactionManager")
public class CdiTransactionManager extends JtaTransactionManager {

  @Resource(mappedName = "java:/TransactionManager")
  private TransactionManager transactionManager;

  @Resource
  private UserTransaction userTransaction;

  @PostConstruct
  public void initTransactionManager() {
    setTransactionManager(transactionManager);
    setUserTransaction(userTransaction);
  }
}

6.1.2.5. 事务策略

接下来,您需要声明要使用的事务策略。同样,使用 @Named 注释使 bean 可用于 Camel。事务管理器也会注入,以便使用所需的事务策略创建 TransactionTemplate。此实例中 PROPAGATION_REQUIRED

@Named("PROPAGATION_REQUIRED")
public class CdiRequiredPolicy extends SpringTransactionPolicy {
  @Inject
  public CdiRequiredPolicy(CdiTransactionManager cdiTransactionManager) {
    super(new TransactionTemplate(cdiTransactionManager,
      new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRED)));
  }
}

6.1.2.6. 路由构建器

现在,您可以配置 Camel RouteBuilder 类,并注入 Camel ActiveMQ 组件所需的依赖项。在资源适配器配置上配置的 ActiveMQ 连接工厂会与您之前配置的事务管理器一起注入。

在本例中,当任何消息都从 queue1 使用时,它们都会路由到另一个名为 queue2 的 JMS 队列。从 queue2 使用的消息会导致 JMS 事务通过 rollback ()DSL 方法回滚。这将使原始消息放置在死信队列(DLQ)上。

@Startup
@ApplicationScoped
@ContextName("activemq-camel-context")
public class ActiveMQRouteBuilder extends RouteBuilder {

  @Resource(mappedName = "java:/ActiveMQConnectionFactory")
  private ConnectionFactory connectionFactory;

  @Inject
  private CdiTransactionManager transactionManager;

  @Override
  public void configure() throws Exception {
    ActiveMQComponent activeMQComponent = ActiveMQComponent.activeMQComponent();
    activeMQComponent.setTransacted(false);
    activeMQComponent.setConnectionFactory(connectionFactory);
    activeMQComponent.setTransactionManager(transactionManager);

    getContext().addComponent("activemq", activeMQComponent);

      errorHandler(deadLetterChannel("activemq:queue:ActiveMQ.DLQ")
      .useOriginalMessage()
      .maximumRedeliveries(0)
      .redeliveryDelay(1000));

    from("activemq:queue:queue1F")
      .transacted("PROPAGATION_REQUIRED")
      .to("activemq:queue:queue2");

    from("activemq:queue:queue2")
      .to("log:end")
      .rollback();
  }
}

6.1.3. 安全性

请参阅 JMS 安全性部分

6.1.4. GitHub 上的代码示例

GitHub 上提供了一个 camel-activemq 应用 示例。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.