第 2 章 ActiveMQ


ActiveMQ Component

使用 ActiveMQ 组件,您可以将消息发送到 JMS Queue 或 Topic,或者使用 Apache ActiveMQ 从 JMS Queue 或 Topic 中使用消息。

此组件基于 JMS 组件,使用 Spring 的 JMS 对声明性事务的支持,使用 Spring 的 JmsTemplate 发送以及一个 MessageListenerContainer 以供消费。所有 JMS 组件选项 也适用于 ActiveMQ 组件。

要使用此组件,请确保您的类路径上具有 activemq.jaractivemq-core.jar 以及任何 Apache Camel 依赖项,如 camel-core.jarcamel-spring.jarcamel-jms.jar

转换和缓存

如果您使用带有 JMS 的事务,请参阅 JMS 页面上的下面的部分事务和缓存级别,因为它可能会影响性能。

URI 格式

activemq:[queue:|topic:]destinationName

其中 destinationName 是 ActiveMQ 队列或主题名称。默认情况下,targetName 被解释为队列名称。例如,要连接到队列 FOO.BAR,请使用:

activemq:FOO.BAR

如果需要,您可以包含可选的 queue: 前缀:

activemq:queue:FOO.BAR

要连接到一个主题,您必须包含 topic: 前缀。例如,要连接到主题 Stocks.Prices,请使用:

activemq:topic:Stocks.Prices

选项

所有 JMS 组件选项 也适用于 ActiveMQ 组件。

Camel on EAP 部署

此组件由 EAP (Wildfly Camel)框架上的 Camel 支持,该框架在 Red Hat JBoss Enterprise Application Platform (JBoss EAP)容器上提供了简化的部署模型。

您可以配置 ActiveMQ Camel 组件,以用于嵌入式代理或外部代理。要在 JBoss EAP 容器中嵌入代理,请在 EAP 容器配置文件中配置 ActiveMQ 资源适配器 以了解详细信息,请参阅 ActiveMQ 资源适配器配置

配置连接工厂

以下 测试案例 演示了如何使用 activeMQComponent () 方法将 ActiveMQComponent 添加到 CamelContext,同时指定用于连接 ActiveMQ 的 brokerURL

camelContext.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false"));

使用 Spring XML 配置连接工厂

您可以在 ActiveMQComponent 上配置 ActiveMQ 代理 URL,如下所示

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

  <camelContext xmlns="http://camel.apache.org/schema/spring">
  </camelContext>

  <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="brokerURL" value="tcp://somehost:61616"/>
  </bean>

</beans>

使用连接池

使用 Camel 发送到 ActiveMQ 代理时,最佳实践是使用池连接工厂来处理 JMS 连接、会话和生产者的有效池。如需更多信息,请参阅 ActiveMQ Spring 支持

  1. 使用 Maven 添加 AMQ 池:
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-pool</artifactId>
      <version>5.11.0.redhat-630516</version>
    </dependency>
  1. 设置 activemq 组件,如下所示:
    <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616" />
    </bean>

    <bean id="pooledConnectionFactory"    class="org.apache.activemq.pool.PooledConnectionFactory" init-method="start" destroy-method="stop">
        <property name="maxConnections" value="8" />
        <property name="connectionFactory" ref="jmsConnectionFactory" />
    </bean>

    <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
        <property name="connectionFactory" ref="pooledConnectionFactory"/>
        <property name="concurrentConsumers" value="10"/>
    </bean>

    <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="configuration" ref="jmsConfig"/>
    </bean>
注意

注意池连接工厂的 initdestroy 方法。这些方法对于确保连接池已正确启动和关闭非常重要。

然后,PooledConnectionFactory 会创建最多 8 个连接同时使用的连接池。每个连接都可以由多个会话共享。

有一个名为 maxActive 的选项,可用于配置每个连接的最大会话数;默认值为 500

ActiveMQ 5.7 以后,选项已重命名为 maxActiveSessionPerConnection,以更好地反映其目的。

请注意,concurrentConsumers 被设置为 maxConnections 的值。这是允许的,因为每个使用者都使用会话,并且会话可以共享相同的连接,因此可以正常工作。在本例中,我们可以同时有 8 个 * 500 = 4000 活跃会话。

在路由中调用 MessageListener POJO

ActiveMQ 组件还提供从 JMS MessageListener 到处理器的帮助程序 类型 Converterhttps://camel.apache.org/manual/processor.html这意味着 Bean 组件 可以直接在任何路由内调用任何 JMS MessageListener bean。

您可以在 JMS 中创建 MessageListener,如下所示:

示例

public class MyListener implements MessageListener {
   public void onMessage(Message jmsMessage) {
       // ...
   }
}

然后,在路由中使用它,如下所示

示例

from("file://foo/bar").
  bean(MyListener.class);

也就是说,您可以重复使用任何 Apache Camel 组件,并将其轻松集成到 JMS MessageListener POJO\!

使用 ActiveMQ Destination 选项

从 ActiveMQ 5.6 开始提供

您可以使用 "destination." 前缀在 endpoint uri 中配置 Destination Options。例如,要将消费者标记为 exclusive,并将其预抓取大小设置为 50,您可以执行以下操作:.Example

<camelContext xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="file://src/test/data?noop=true"/>
    <to uri="activemq:queue:foo"/>
  </route>
  <route>
    <!-- use consumer.exclusive ActiveMQ destination option, notice we have to prefix with destination. -->
    <from uri="activemq:foo?destination.consumer.exclusive=true&amp;destination.consumer.prefetchSize=50"/>
    <to uri="mock:results"/>
  </route>
</camelContext>

消耗公告消息

ActiveMQ 可以生成 公告消息,这些消息 位于您可以使用的主题中。如果您检测到缓慢消费者或构建统计信息(每天生成的消息/数量等),这些消息可以帮助您发送警报。) 以下 Spring DSL 示例演示了如何从主题读取消息。

示例

<route>
	<from uri="activemq:topic:ActiveMQ.Advisory.Connection?mapJmsMessage=false" />
	<convertBodyTo type="java.lang.String"/>
	<transform>
	     <simple>${in.body}&#13;</simple>
	</transform>
	<to uri="file://data/activemq/?fileExist=Append&ileName=advisoryConnection-${date:now:yyyyMMdd}.txt" />
</route>

如果您在队列中使用消息,您应该在 data/activemq 文件夹下看到以下文件:

advisoryConnection-20100312.txt

advisoryProducer-20100312.txt

包含字符串:

示例

      ActiveMQMessage {commandId = 0, responseRequired = false, messageId = ID:dell-charles-3258-1268399815140
      -1:0:0:0:221, originalDestination = null, originalTransactionId = null, producerId = ID:dell-charles-
      3258-1268399815140-1:0:0:0, destination = topic://ActiveMQ.Advisory.Connection, transactionId = null,
      expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 1268403383468, brokerOutTime = 1268403383468,
      correlationId = null, replyTo = null, persistent = false, type = Advisory, priority = 0, groupID = null,
      groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null,
      marshalledProperties = org.apache.activemq.util.ByteSequence@17e2705, dataStructure = ConnectionInfo
      {commandId = 1, responseRequired = true, connectionId = ID:dell-charles-3258-1268399815140-2:50,
      clientId = ID:dell-charles-3258-1268399815140-14:0, userName = , password = *****,
      brokerPath = null, brokerMasterConnector = false, manageable = true, clientMaster = true},
      redeliveryCounter = 0, size = 0, properties = {originBrokerName=master, originBrokerId=ID:dell-charles-
      3258-1268399815140-0:0, originBrokerURL=vm://master}, readOnlyProperties = true, readOnlyBody = true,
      droppable = false}

获取组件 JAR

您需要这个依赖项:

  • camel-activemq

ActiveMQ 是随 ActiveMQ 项目 一起发布的 JMS 组件的 扩展。

<dependency>
    <groupId>org.fusesource</groupId>
    <artifactId>camel-activemq</artifactId>
    <version>7.11.0.fuse-sb2-7_11_0-00035-redhat-00001</version>
</dependency>
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.