5.6. JMS


消息传递支持通过 JMS 组件提供,该组件与 JBoss EAP Messaging (ActiveMQ Artemis)子系统集成。

通过配置供应商特定资源适配器或不可用,可以使用 JBoss Generic JMS 资源适配器与其他 JMS 实现集成。

5.6.1. JBoss EAP JMS 配置

您可以通过标准 JBoss EAP XML 配置文件配置 JBoss EAP 消息传递子系统。https://docs.jboss.org/author/display/WFLY8/Messaging+configuration例如: standalone.xml 或 domain.xml。

例如,如下是您在内存实例中使用嵌入的 ActiveMQ Artemis。您首先在 messaging 子系统中配置一个新的 JMS 队列,方法是将以下 XML 配置添加到 jms-destinations 部分:

<jms-queue name="WildFlyCamelQueue">
  <entry name="java:/jms/queue/WildFlyCamelQueue"/>
</jms-queue>

或者,也可以使用 CLI 脚本添加队列。

$ jms-queue add --queue-address=WildFlyCamelQueue --entries=queue/WildFlyCamelQueue,java:/jms/queue/WildFlyCamelQueue

另外,您可以在自定义 jms.xml 部署描述符中创建 messaging-deployment 配置。如需更多信息,请参阅 JBoss EAP messaging 子系统文档中的"部署 -jms.xml 文件"部分。

5.6.2. Camel 路由配置

以下 JMS 生成者和消费者示例使用 JBoss EAP 的嵌入式 ActiveMQ Artemis 服务器发布和使用消息到目的地或从目的地使用。

这个示例还将 CDI 与 camel-cdi 组件一起使用。JMS ConnectionFactory 实例通过 JNDI 查找注入到 Camel RouteBuilder 中。

5.6.2.1. JMS Producer

DefaultJMSConnectionFactory 连接工厂从 JNDI 注入到 RouteBuilder 中。在 JBoss EAP XML 配置下,您可以在 messaging 子系统中找到连接工厂。

接下来,计时器端点每 10 秒运行一次,将 XML 有效负载发送到之前配置的 WildFlyCamelQueue 目的地。

@Startup
@ApplicationScoped
@ContextName("jms-camel-context")
public class JmsRouteBuilder extends RouteBuilder {

  @Resource(mappedName = "java:jboss/DefaultJMSConnectionFactory")
  private ConnectionFactory connectionFactory;

  @Override
  public void configure() throws Exception {
    JmsComponent component = new JmsComponent();
    component.setConnectionFactory(connectionFactory);

    getContext().addComponent("jms", component);

    from("timer://sendJMSMessage?fixedRate=true&period=10000")
    .transform(constant("<?xml version='1.0><message><greeting>hello world</greeting></message>"))
    .to("jms:queue:WildFlyCamelQueue")
    .log("JMS Message sent");
  }
}

每次将一个 JMS 消息添加到 WildFlyCamelQueue 目的地时,都会向控制台输出一条日志消息。要验证消息是否真正放置在队列中,您可以使用 JBoss EAP 管理控制台。

JMS 队列浏览

5.6.2.2. JMS Consumer

要使用 JMS 消息,Camel RouteBuilder 实施与制作者示例类似。

如前文所述,连接工厂从 JNDI 发现,注入并在 JMSComponent 实例上设置。

当 JMS 端点消耗来自 WildFlyCamelQueue 目的地的消息时,内容会记录到控制台。

@Override
public void configure() throws Exception {
  JmsComponent component = new JmsComponent();
  component.setConnectionFactory(connectionFactory);

  getContext().addComponent("jms", component);

  from("jms:queue:WildFlyCamelQueue")
  .to("log:jms?showAll=true");
}

5.6.2.3. JMS Transactions

要让 Camel JMS 路由能够参与 JMS 事务,需要进行一些额外的配置。由于 camel-jms 围绕 spring-jms 构建,您需要配置一些 Spring 类,以便它们能够处理 JBoss EAP 的事务管理器和连接工厂。以下代码示例演示了如何使用 CDI 配置事务 JMS Camel 路由。

camel-jms 组件需要一个类型为 org.springframework.transaction.PlatformTransactionManager 的事务管理器。因此,您首先创建一个 bean 扩展 JtaTransactionManager。请注意,bean 被标注为 @Named,以允许在 Camel bean registry 中注册 bean。另请注意,JBoss EAP 事务管理器和用户事务实例使用 CDI 注入。

@Named("transactionManager")
public class CdiTransactionManager extends JtaTransactionManager {

  @Resource(mappedName = "java:/TransactionManager")
  private TransactionManager transactionManager;

  @Resource
  private UserTransaction userTransaction;

  @PostConstruct
  public void initTransactionManager() {
    setTransactionManager(transactionManager);
    setUserTransaction(userTransaction);
  }
}

接下来,您需要声明要使用的事务策略。同样,使用 @Named 注释使 bean 可用于 Camel。事务管理器也会注入,以便使用所需的事务策略创建 TransactionTemplate。此实例中 PROPAGATION_REQUIRED。

@Named("PROPAGATION_REQUIRED")
public class CdiRequiredPolicy extends SpringTransactionPolicy {
  @Inject
  public CdiRequiredPolicy(CdiTransactionManager cdiTransactionManager) {
    super(new TransactionTemplate(cdiTransactionManager,
      new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRED)));
  }
}

现在,您可以配置 Camel RouteBuilder 类,并注入 Camel JMS 组件所需的依赖项。JBoss EAP XA 连接工厂与之前配置的事务管理器一起注入。

在本例中,当任何消息都从 queue1 使用时,它们都会路由到另一个名为 queue2 的 JMS 队列。从 queue2 使用的消息会导致 JMS 事务通过 rollback ()DSL 方法回滚。这将使原始消息放置在死信队列(DLQ)上。

@Startup
@ApplicationScoped
@ContextName("jms-camel-context")
public class JMSRouteBuilder extends RouteBuilder {

  @Resource(mappedName = "java:/JmsXA")
  private ConnectionFactory connectionFactory;

  @Inject
  CdiTransactionManager transactionManager;

  @Override
  public void configure() throws Exception {
    // Creates a JMS component which supports transactions
    JmsComponent jmsComponent = JmsComponent.jmsComponentTransacted(connectionFactory, transactionManager);
    getContext().addComponent("jms", jmsComponent);

    from("jms:queue:queue1")
      .transacted("PROPAGATION_REQUIRED")
      .to("jms:queue:queue2");

    // Force the transaction to roll back. The message will end up on the Wildfly 'DLQ' message queue
    from("jms:queue:queue2")
      .to("log:end")
      .rollback();
  }

5.6.2.4. 远程 JMS 目的地

一个 JBoss EAP 实例可以通过 远程 JNDI 将消息发送到另一个 JBoss EAP 实例上配置的 ActiveMQ Artemis 目的地。

需要一些额外的 JBoss EAP 配置才能实现此目的。首先配置一个导出的 JMS 队列。

只有 java:jboss/exported 命名空间中绑定的 JNDI 名称才被视为远程客户端的候选者,因此队列被正确命名。

注意

您必须在 JBoss EAP 客户端 应用服务器和JBoss EAP 远程服务器上配置队列。

<jms-queue name="RemoteQueue">
  <entry name="java:jboss/exported/jms/queues/RemoteQueue"/>
</jms-queue>

在客户端可以连接到远程服务器之前,需要配置用户访问凭据。在远程服务器上,运行 add user 实用程序,以在 'guest' 组中创建新应用程序用户。本例具有名为 'admin' 的用户,密码是 'secret'。

RouteBuilder 实施与前面的示例不同。您需要配置 InitialContext 并从 JNDI weselves 检索连接工厂,而不是注入连接工厂。

configureInitialContext 方法会创建这个 InitialContext。请注意,您需要设置一个提供程序 URL,它应引用您的远程 JBoss EAP 实例主机名和端口号。本例使用 JBoss EAP JMS http-connector,但 此处 提供了备选方案。

最后,路由配置为每 10 秒发送 XML 有效负载到之前配置的远程目的地 - 'RemoteQueue'。

@Override
public void configure() throws Exception {
  Context initialContext = configureInitialContext();
  ConnectionFactory connectionFactory = (ConnectionFactory) initialContext.lookup("java:jms/RemoteConnectionFactory");

  JmsComponent component = new JmsComponent();
  component.setConnectionFactory(connectionFactory);

  getContext().addComponent("jms", component);

  from("timer://foo?fixedRate=true&period=10000")
  .transform(constant("<?xml version='1.0><message><greeting>hello world</greeting></message>"))
  .to("jms:queue:RemoteQueue?username=admin&password=secret")
  .to("log:jms?showAll=true");
}

private Context configureInitialContext() throws NamingException {
  final Properties env = new Properties();
  env.put(Context.INITIAL_CONTEXT_FACTORY, "org.jboss.naming.remote.client.InitialContextFactory");
  env.put(Context.PROVIDER_URL, System.getProperty(Context.PROVIDER_URL, "http-remoting://my-remote-host:8080"));
  env.put(Context.SECURITY_PRINCIPAL, System.getProperty("username", "admin"));
  env.put(Context.SECURITY_CREDENTIALS, System.getProperty("password", "secret"));
  return new InitialContext(env);
}

5.6.3. 安全性

请参阅 JMS 安全性部分

5.6.4. EAP 上的 Fuse 中的快速入门示例

快速入门示例包括在 EAP 安装的 quickstarts/camel/camel-jms 目录的 Fuse 中。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.