第 2 章 ActiveMQ


ActiveMQ 组件

ActiveMQ 组件允许消息发送到 JMS Queue 或 Topic;或使用 Apache ActiveMQ 从 JMS Queue 或 Topic 使用消息。

此组件基于 第 178 章 JMS 组件,并使用 Spring 的 JMS 支持声明事务,使用 Spring 的 JmsTemplate 来发送和接收 MessageListenerContainer 进行消耗。第 178 章 JMS 组件 组件中的所有选项也适用于此组件。

要使用此组件,请确保您的 classpath 上的 activemq.jaractivemq-core.jar 与 Camel 依赖关系(如 camel-core.jarcamel-spring.jarcamel-jms.jar.

转换和缓存

如果您在使用 JMS 的事务时,请参阅 JMS 页面中的事务和缓存级别 部分,因为它可能会影响性能。

URI 格式

activemq:[queue:|topic:]destinationName

其中 destinationName 是 ActiveMQ 队列或主题名称。默认情况下,targetName 解释为队列名称。例如,要连接到队列 FOO.BAR,请使用:

activemq:FOO.BAR

如果需要,可以包括可选 queue: 前缀:

activemq:queue:FOO.BAR

要连接到一个主题,您必须包括 主题: 前缀。例如,要连接到该主题的 Stocks.Prices,请使用:

activemq:topic:Stocks.Prices

选项

请参阅 第 178 章 JMS 组件 组件中的 Options,因为所有这些选项也适用于这个组件。

Camel on EAP 部署

该组件受到 EAP (Wildfly Camel)框架的 Camel 支持,该框架在红帽 JBoss 企业应用平台(JBoss EAP)容器上提供简化的部署模型。

您可以配置 ActiveMQ Camel 组件,以便使用嵌入式代理或外部代理。要在 JBoss EAP 容器中嵌入代理,请在 EAP 容器配置文件 进行 ActiveMQ 资源适配器中配置 ActiveMQ 资源适配器来详情,请参阅 ActiveMQ 资源适配器配置

配置连接工厂

以下 测试案例 说明了如何在使用 activeMQComponent () 方法 添加到 CamelContext 中,同时指定用于连接 ActiveMQ 的 brokerURL

camelContext.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false"));

使用 Spring XML 配置 ConnectionFactory

您可以在 ActiveMQComponent 上配置 ActiveMQ 代理 URL,如下所示

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

  <camelContext xmlns="http://camel.apache.org/schema/spring">
  </camelContext>

  <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="brokerURL" value="tcp://somehost:61616"/>
  </bean>

</beans>

使用连接池

使用 Camel 发送到 ActiveMQ 代理时,建议使用池连接工厂来处理 JMS 连接、会话和制作者的有效池。这记录在 ActiveMQ Spring Support 的页面。

您可以使用 Maven 获取 Jencks AMQ 池:

    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-pool</artifactId>
      <version>5.3.2</version>
    </dependency>

然后,按照如下所示设置 activemq 组件:

    <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616" />
    </bean>

    <bean id="pooledConnectionFactory"    class="org.apache.activemq.pool.PooledConnectionFactory" init-method="start" destroy-method="stop">
        <property name="maxConnections" value="8" />
        <property name="connectionFactory" ref="jmsConnectionFactory" />
    </bean>

    <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
        <property name="connectionFactory" ref="pooledConnectionFactory"/>
        <property name="concurrentConsumers" value="10"/>
    </bean>

    <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="configuration" ref="jmsConfig"/>
    </bean>
注意

请注意池连接工厂上的 initdestroy 方法。务必要确保连接池已正确启动和关闭。

然后,PooledConnectionFactory 将创建一个同时使用最多 8 个连接的连接池。每个连接可由许多会话共享。有一个名为 maxActive 的选项,可用于配置每个连接的最大会话数;默认值为 500。在 ActiveMQ 5.7 上,选项已被重命名为,以更好地反映其用途,被命名为 maxActiveSessionPerConnection。注意 并发Consumers 设置为高于 maxConnections 的值。这为好,因为每个消费者都使用会话,而作为会话可以共享同一连接,我们就安全。在这个示例中,我们可以同时有 8 * * 500 = 4000 个活跃会话。

在路由中调用 MessageListener POJO

ActiveMQ 组件还向 Processor 提供帮助 Type Converter from a JMS MessageListener。这意味着,第 43 章 Bean 组件 组件能够直接调用任何 JMS MessageListener bean。

例如,您可以在 JMS 中创建 MessageListener,如下所示:

public class MyListener implements MessageListener {
   public void onMessage(Message jmsMessage) {
       // ...
   }
}

然后,在您的路由中使用它,如下所示

from("file://foo/bar").
  bean(MyListener.class);

也就是说,您可以重复使用任何 Apache Camel 组件并将其集成到您的 JMS MessageListener POJO\!

使用 ActiveMQ 目的地选项

ActiveMQ 5.6 提供

您可以使用 "destination." 前缀在 endpoint uri 中配置 Destination Options。例如,要将消费者标记为独占,并将其预先大小设定为 50,您可以执行以下操作:

<camelContext xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="file://src/test/data?noop=true"/>
    <to uri="activemq:queue:foo"/>
  </route>
  <route>
    <!-- use consumer.exclusive ActiveMQ destination option, notice we have to prefix with destination. -->
    <from uri="activemq:foo?destination.consumer.exclusive=true&amp;destination.consumer.prefetchSize=50"/>
    <to uri="mock:results"/>
  </route>
</camelContext>

使用公告消息

ActiveMQ 可以生成公告消息,它们放入您可以使用的主题。这些消息可以帮助您发送警报,以检测缓慢的消费者或者构建统计信息(每天的信息/生成的数量等) 以下 Spring DSL 示例显示如何从主题读取消息。

<route>
	<from uri="activemq:topic:ActiveMQ.Advisory.Connection?mapJmsMessage=false" />
	<convertBodyTo type="java.lang.String"/>
	<transform>
	     <simple>${in.body}&#13;</simple>
	</transform>
	<to uri="file://data/activemq/?fileExist=Append&ileName=advisoryConnection-${date:now:yyyyMMdd}.txt" />
</route>

如果您在队列上使用消息,您应该会看到 data/activemq 文件夹下的以下文件:

advisoryConnection-20100312.txt advisoryProducer-20100312.txt

并包含字符串:

      ActiveMQMessage {commandId = 0, responseRequired = false, messageId = ID:dell-charles-3258-1268399815140
      -1:0:0:0:221, originalDestination = null, originalTransactionId = null, producerId = ID:dell-charles-
      3258-1268399815140-1:0:0:0, destination = topic://ActiveMQ.Advisory.Connection, transactionId = null,
      expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 1268403383468, brokerOutTime = 1268403383468,
      correlationId = null, replyTo = null, persistent = false, type = Advisory, priority = 0, groupID = null,
      groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null,
      marshalledProperties = org.apache.activemq.util.ByteSequence@17e2705, dataStructure = ConnectionInfo
      {commandId = 1, responseRequired = true, connectionId = ID:dell-charles-3258-1268399815140-2:50,
      clientId = ID:dell-charles-3258-1268399815140-14:0, userName = , password = *****,
      brokerPath = null, brokerMasterConnector = false, manageable = true, clientMaster = true},
      redeliveryCounter = 0, size = 0, properties = {originBrokerName=master, originBrokerId=ID:dell-charles-
      3258-1268399815140-0:0, originBrokerURL=vm://master}, readOnlyProperties = true, readOnlyBody = true,
      droppable = false}

获取组件 JAR

您需要此依赖项:

  • activemq-camel

ActiveMQ 是与 ActiveMQ 项目 发布的 第 178 章 JMS 组件 组件的扩展。

<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-camel</artifactId>
  <version>5.6.0</version>
</dependency>
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.