第 2 章 ActiveMQ


ActiveMQ 组件

利用 ActiveMQ 组件,您可以发送消息到 JMS Queue 或 Topic,或者使用 Apache ActiveMQ 来使用来自 JMS Queue 或 Topic 的消息。

此组件基于 JMS 组件,使用 Spring 的 JMS 支持进行声明性事务,使用 Spring 的 JmsTemplate 发送和 MessageListenerContainer 以供使用。所有 JMS 组件选项 也适用于 ActiveMQ 组件。

要使用此组件,请确保在 classpath 上具有 activemq.jaractivemq-core.jar,以及任何 Apache Camel 依赖项,如 camel-core.jarcamel-spring.jarcamel-jms.jar

转换和缓存

如果使用 JMS 的事务,请参阅 JMS 页面上的 Transactions 和 Cache Levels 部分,因为它可能会影响性能。

URI 格式

activemq:[queue:|topic:]destinationName

其中 destinationName 是 ActiveMQ 队列或主题名称。默认情况下,destinationName 解释为队列名称。例如,要连接到队列 FOO.BAR,请使用:

activemq:FOO.BAR

如果需要,您可以包含可选的 queue: 前缀:

activemq:queue:FOO.BAR

要连接到主题,您必须包含 topic: 前缀。例如,要连接到主题 Stocks.Prices,请使用:

activemq:topic:Stocks.Prices

选项

所有 JMS 组件选项 也适用于 ActiveMQ 组件。

Camel on EAP 部署

此组件由 EAP 上的 Camel (Wildfly Camel)框架支持,该框架在 Red Hat JBoss Enterprise Application Platform (JBoss EAP)容器上提供了简化的部署模型。

您可以配置 ActiveMQ Camel 组件,以用于嵌入式代理或外部代理。要在 JBoss EAP 容器中嵌入代理,请在 EAP 容器配置文件 configuring ActiveMQ 资源适配器中配置 ActiveMQ 资源适配器,详细信息请参阅 ActiveMQ 资源适配器配置

配置连接工厂

以下 测试案例 演示了如何在指定用于连接 ActiveMQ 的 brokerURL 时,使用 activeMQComponent () 方法将 ActiveMQComponent 添加到 CamelContext 中。

camelContext.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false"));

使用 Spring XML 配置连接工厂

您可以在 ActiveMQComponent 上配置 ActiveMQ 代理 URL,如下所示

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

  <camelContext xmlns="http://camel.apache.org/schema/spring">
  </camelContext>

  <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="brokerURL" value="tcp://somehost:61616"/>
  </bean>

</beans>

使用连接池

使用 Camel 发送到 ActiveMQ 代理时,最好使用池化连接工厂来处理 JMS 连接、会话和生产者的有效池。如需更多信息,请参阅 ActiveMQ Spring 支持

  1. 使用 Maven 添加 AMQ 池:
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-pool</artifactId>
      <version>5.11.0.redhat-630516</version>
    </dependency>
  1. 设置 activemq 组件,如下所示:
    <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616" />
    </bean>

    <bean id="pooledConnectionFactory"    class="org.apache.activemq.pool.PooledConnectionFactory" init-method="start" destroy-method="stop">
        <property name="maxConnections" value="8" />
        <property name="connectionFactory" ref="jmsConnectionFactory" />
    </bean>

    <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
        <property name="connectionFactory" ref="pooledConnectionFactory"/>
        <property name="concurrentConsumers" value="10"/>
    </bean>

    <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="configuration" ref="jmsConfig"/>
    </bean>
注意

注意池连接工厂的 initdestroy 方法。这些方法是确保正确启动和关闭连接池。

然后,pooled ConnectionFactory 将创建一个同时使用最多 8 个连接的连接池。每个连接可由多个会话共享。

有一个名为 maxActive 的选项,可用于配置每个连接的最大会话数;默认值为 500

ActiveMQ 5.7 起,选项已重命名为 maxActiveSessionPerConnection,以更好地反映其目的。

请注意,concurrentConsumers 被设置为高于 maxConnections 的值。允许这样做,因为每个消费者都使用会话,因为会话可以共享相同的连接,所以这可以正常工作。在这个示例中,我们可以同时有 8 * 500 = 4000 活跃的会话。

在路由中调用 MessageListener POJO

ActiveMQ 组件还提供了从 JMS MessageListener 到处理器的帮助 类型转换器https://camel.apache.org/manual/processor.html这意味着 Bean 组件 可以直接在任何路由内调用任何 JMS MessageListener bean。

您可以在 JMS 中创建 MessageListener,如下所示:

Example

public class MyListener implements MessageListener {
   public void onMessage(Message jmsMessage) {
       // ...
   }
}

然后,在您的路由中使用它,如下所示

Example

from("file://foo/bar").
  bean(MyListener.class);

也就是说,您可以重复使用任何 Apache Camel 组件,并将它们轻松集成到 JMS MessageListener POJO\!

使用 ActiveMQ Destination Options

从 ActiveMQ 5.6 开始提供

您可以使用 "destination." 前缀在端点 uri 中配置 Destination Options。例如,要将消费者标记为 exclusive,并将其预抓取大小设置为 50,您可以执行以下操作: .Example

<camelContext xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="file://src/test/data?noop=true"/>
    <to uri="activemq:queue:foo"/>
  </route>
  <route>
    <!-- use consumer.exclusive ActiveMQ destination option, notice we have to prefix with destination. -->
    <from uri="activemq:foo?destination.consumer.exclusive=true&amp;destination.consumer.prefetchSize=50"/>
    <to uri="mock:results"/>
  </route>
</camelContext>

消耗公告消息

ActiveMQ 可以生成 公告消息,被放入您可以使用的主题中。当您检测到较慢的用户或构建统计信息时,这些消息可帮助您发送警报(每天生成的消息/数量等)以下 Spring DSL 示例演示了如何从主题读取消息。

Example

<route>
	<from uri="activemq:topic:ActiveMQ.Advisory.Connection?mapJmsMessage=false" />
	<convertBodyTo type="java.lang.String"/>
	<transform>
	     <simple>${in.body}&#13;</simple>
	</transform>
	<to uri="file://data/activemq/?fileExist=Append&ileName=advisoryConnection-${date:now:yyyyMMdd}.txt" />
</route>

如果您在队列中使用消息,则应在 data/activemq 文件夹下看到以下文件:

advisoryConnection-20100312.txt

advisoryProducer-20100312.txt

包含字符串:

Example

      ActiveMQMessage {commandId = 0, responseRequired = false, messageId = ID:dell-charles-3258-1268399815140
      -1:0:0:0:221, originalDestination = null, originalTransactionId = null, producerId = ID:dell-charles-
      3258-1268399815140-1:0:0:0, destination = topic://ActiveMQ.Advisory.Connection, transactionId = null,
      expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 1268403383468, brokerOutTime = 1268403383468,
      correlationId = null, replyTo = null, persistent = false, type = Advisory, priority = 0, groupID = null,
      groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null,
      marshalledProperties = org.apache.activemq.util.ByteSequence@17e2705, dataStructure = ConnectionInfo
      {commandId = 1, responseRequired = true, connectionId = ID:dell-charles-3258-1268399815140-2:50,
      clientId = ID:dell-charles-3258-1268399815140-14:0, userName = , password = *****,
      brokerPath = null, brokerMasterConnector = false, manageable = true, clientMaster = true},
      redeliveryCounter = 0, size = 0, properties = {originBrokerName=master, originBrokerId=ID:dell-charles-
      3258-1268399815140-0:0, originBrokerURL=vm://master}, readOnlyProperties = true, readOnlyBody = true,
      droppable = false}

获取组件 JAR

您需要这个依赖项:

  • camel-activemq

ActiveMQ 是与 ActiveMQ 项目 一起发布的 JMS 组件的 扩展。

<dependency>
    <groupId>org.fusesource</groupId>
    <artifactId>camel-activemq</artifactId>
    <version>7.11.0.fuse-sb2-7_11_0-00035-redhat-00001</version>
</dependency>
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.