第 6 章 Camel 组件


本章详细介绍了支持的 camel 组件信息

6.1. camel-activemq

Camel ActiveMQ 集成由 activemq 组件提供。

组件可以配置为处理嵌入式或外部代理。对于 Wildfly / EAP 容器管理的连接池和 XA-Transaction 支持,可以将 ActiveMQ 资源适配器 配置为容器配置文件。

6.1.1. JBoss EAP ActiveMQ 资源适配器配置

下载 ActiveMQ 资源适配器 rar 文件。以下步骤概述了如何配置 ActiveMQ 资源适配器。

  1. 停止您的 JBoss EAP 实例。
  2. 下载资源适配器并将 复制到相关的 JBoss EAP 部署目录。对于独立模式:

    cp activemq-rar-5.11.1.rar ${JBOSS_HOME}/standalone/deployments/activemq-rar.rar
    Copy to Clipboard Toggle word wrap
  3. 为 ActiveMQ 适配器配置 JBoss EAP 资源适配器子系统。
<subsystem xmlns="urn:jboss:domain:resource-adapters:2.0">
     <resource-adapters>
         <resource-adapter id="activemq-rar.rar">
             <archive>
                 activemq-rar.rar
             </archive>
             <transaction-support>XATransaction</transaction-support>
             <config-property name="UseInboundSession">
                 false
             </config-property>
             <config-property name="Password">
                 defaultPassword
             </config-property>
             <config-property name="UserName">
                 defaultUser
             </config-property>
             <config-property name="ServerUrl">
                 tcp://localhost:61616?jms.rmIdFromConnectionId=true
             </config-property>
             <connection-definitions>
                 <connection-definition class-name="org.apache.activemq.ra.ActiveMQManagedConnectionFactory" jndi-name="java:/ActiveMQConnectionFactory" enabled="true" pool-name="ConnectionFactory">
                     <xa-pool>
                         <min-pool-size>1</min-pool-size>
                         <max-pool-size>20</max-pool-size>
                         <prefill>false</prefill>
                         <is-same-rm-override>false</is-same-rm-override>
                     </xa-pool>
                 </connection-definition>
             </connection-definitions>
             <admin-objects>
                 <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:/queue/HELLOWORLDMDBQueue" use-java-context="true" pool-name="HELLOWORLDMDBQueue">
                     <config-property name="PhysicalName">
                         HELLOWORLDMDBQueue
                     </config-property>
                 </admin-object>
                 <admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:/topic/HELLOWORLDMDBTopic" use-java-context="true" pool-name="HELLOWORLDMDBTopic">
                     <config-property name="PhysicalName">
                         HELLOWORLDMDBTopic
                     </config-property>
                 </admin-object>
             </admin-objects>
         </resource-adapter>
     </resource-adapters>
 </subsystem>
Copy to Clipboard Toggle word wrap

如果您的资源适配器存档文件名与 located-rar.rar 不同,您必须更改上述配置中 archive 元素的内容以匹配存档文件的名称。

必须选择 UserName 和 Password 配置属性的值以匹配外部代理中有效用户的凭证。

您可能需要更改 ServerUrl 配置属性的值,以匹配外部代理公开的实际主机名和端口。

4) 启动 JBoss EAP。如果一切配置正确,您应该可以看到 JBoss EAP server.log 中的消息,如下所示:

13:16:08,412 INFO  [org.jboss.as.connector.deployment] (MSC service thread 1-5) JBAS010406: Registered connection factory java:/AMQConnectionFactory`
Copy to Clipboard Toggle word wrap

6.1.2. Camel 路由配置

以下 ActiveMQ producer 和使用者示例利用 ActiveMQ 嵌入式代理和"vm' 传输(避免对外部 ActiveMQ 代理的需求)。

示例将 CDI 与 camel-cdi 组件结合使用。JMS ConnectionFactory 实例通过 JNDI 查找注入到 Camel RouteBuilder 中。

6.1.2.1. ActiveMQ Producer

@Startup
@ApplicationScoped
@ContextName("activemq-camel-context")
public class ActiveMQRouteBuilder extends RouteBuilder {

  @Override
  public void configure() throws Exception {
    from("timer://sendJMSMessage?fixedRate=true&period=10000")
    .transform(constant("<?xml version='1.0><message><greeting>hello world</greeting></message>"))
    .to("activemq:queue:WildFlyCamelQueue?brokerURL=vm://localhost")
    .log("JMS Message sent");
  }
}
Copy to Clipboard Toggle word wrap

每次将消息添加到 WildFlyCamelQueue 目的地时,都将向控制台输出日志消息。要验证消息实际上是否已放置到队列中,您可以使用由 Camel on EAP 子系统提供的 ../features/hawtio.md[Hawtio 控制台、window=_THREE]。

6.1.2.2. ActiveMQ Consumer

若要利用 ActiveMQ 消息,Camel RouteBuilder 实施与制作者示例类似。

当 ActiveMQ 端点消耗来自 WildFlyCamelQueue 目的地的消息时,内容将记录到控制台。

@Override
public void configure() throws Exception {
  from("activemq:queue:WildFlyCamelQueue?brokerURL=vm://localhost")
  .to("log:jms?showAll=true");
}
Copy to Clipboard Toggle word wrap

6.1.2.3. ActiveMQ 事务

6.1.2.3.1. ActiveMQ 资源适配器配置

需要 ActiveMQ 资源适配器以利用 XA 事务支持、连接池等。

以下 XML 片段演示了如何在 JBoss EAP 服务器 XML 配置中配置资源适配器。请注意 ,serverURL 设置为使用嵌入式代理。连接工厂与 JNDI 名称 java:/ActiveMQConnectionFactory 绑定。这将在 RouteBuilder 示例中查找。

最后,两个队列配置为名为 'queue1' 和 'queue2'。

<subsystem xmlns="urn:jboss:domain:resource-adapters:2.0">
  <resource-adapters>
    <resource-adapter id="activemq-rar.rar">
      ...
      <admin-objects>
        <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:/queue/queue1" use-java-context="true" pool-name="queue1pool">
          <config-property name="PhysicalName">queue1</config-property>
        </admin-object>
        <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:/queue/queue2" use-java-context="true" pool-name="queue2pool">
          <config-property name="PhysicalName">queue2</config-property>
        </admin-object>
      </admin-objects>
    </resource-adapter>
  </resource-adapters>
</subsystem>
Copy to Clipboard Toggle word wrap

6.1.2.4. 事务管理器

camel-activemq 组件需要类型为 org.springframework.transaction.PlatformTransactionManager 的事务管理器。因此,您可以首先创建一个满足此要求的 bean 扩展 JtaTransactionManager。请注意,an 标注了 @Named,以便 bean 在 Camel Bean 注册表中注册。另请注意,JBoss EAP 事务管理器和用户事务实例使用 CDI 注入。

@Named("transactionManager")
public class CdiTransactionManager extends JtaTransactionManager {

  @Resource(mappedName = "java:/TransactionManager")
  private TransactionManager transactionManager;

  @Resource
  private UserTransaction userTransaction;

  @PostConstruct
  public void initTransactionManager() {
    setTransactionManager(transactionManager);
    setUserTransaction(userTransaction);
  }
}
Copy to Clipboard Toggle word wrap

6.1.2.5. 事务策略

接下来,您需要声明要使用的事务策略。再次使用 @Named 注释,使 bean 可供 Camel 使用。事务管理器也注入,以便可使用所需的事务策略创建 TransactionTemplate。本例中为 PROPAGATION_REQUIRED

@Named("PROPAGATION_REQUIRED")
public class CdiRequiredPolicy extends SpringTransactionPolicy {
  @Inject
  public CdiRequiredPolicy(CdiTransactionManager cdiTransactionManager) {
    super(new TransactionTemplate(cdiTransactionManager,
      new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRED)));
  }
}
Copy to Clipboard Toggle word wrap

6.1.2.6. 路由构建器

现在,您可以配置 Camel RouteBuilder 类,并注入 Camel ActiveMQ 组件所需的依赖项。您在资源适配器配置上配置的 ActiveMQ 连接工厂会与您之前配置的事务管理器一起注入。

在本示例 RouteBuilder 中,每当来自 queue1 的消息被使用时,它们都会路由到名为 queue2 的其他 JMS 队列。来自 queue2 的消息会使用 rollback()DSL 方法回滚 JMS 事务。这将导致原始消息放置在死信队列(DLQ)上。

@Startup
@ApplicationScoped
@ContextName("activemq-camel-context")
public class ActiveMQRouteBuilder extends RouteBuilder {

  @Resource(mappedName = "java:/ActiveMQConnectionFactory")
  private ConnectionFactory connectionFactory;

  @Inject
  private CdiTransactionManager transactionManager;

  @Override
  public void configure() throws Exception {
    ActiveMQComponent activeMQComponent = ActiveMQComponent.activeMQComponent();
    activeMQComponent.setTransacted(false);
    activeMQComponent.setConnectionFactory(connectionFactory);
    activeMQComponent.setTransactionManager(transactionManager);

    getContext().addComponent("activemq", activeMQComponent);

      errorHandler(deadLetterChannel("activemq:queue:ActiveMQ.DLQ")
      .useOriginalMessage()
      .maximumRedeliveries(0)
      .redeliveryDelay(1000));

    from("activemq:queue:queue1F")
      .transacted("PROPAGATION_REQUIRED")
      .to("activemq:queue:queue2");

    from("activemq:queue:queue2")
      .to("log:end")
      .rollback();
  }
}
Copy to Clipboard Toggle word wrap

6.1.3. 安全性

请参阅 JMS 安全部分

6.1.4. GitHub 上的代码示例

GitHub 上提供了 camel-activemq 应用程序 的示例。

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat