5.6. JMS


消息传递支持通过 JMS 组件提供,它们与 JBoss EAP Messaging(ActiveMQintroduction)子系统集成。

通过配置特定于供应商的资源适配器或不可用(如果使用 JBoss 通用 JMS 资源适配器),可以与其他 JMS 实现集成。

5.6.1. JBoss EAP JMS 配置

您可以通过标准的 JBoss EAP XML 配置文件配置 JBoss EAP 消息传递子系统。https://docs.jboss.org/author/display/WFLY8/Messaging+configuration例如: standalone.xml 或 domain.xml。

例如,之后您在内存实例中使用嵌入的 ActiveMQ Artemis。您首先,通过将以下 XML 配置添加到jms-destinations 部分,在 messaging 子系统上配置新的 JMS 队列。

<jms-queue name="WildFlyCamelQueue">
  <entry name="java:/jms/queue/WildFlyCamelQueue"/>
</jms-queue>

或者您可以使用 CLI 脚本添加队列。

$ jms-queue add --queue-address=WildFlyCamelQueue --entries=queue/WildFlyCamelQueue,java:/jms/queue/WildFlyCamelQueue

另外,您还可以在自定义 jms.xml 部署描述符中创建消息传递 部署配置。如需更多信息,请参阅 JBoss EAP messaging 子系统文档中的部署 -jms.xml 文件部分。

5.6.2. Camel 路由配置

以下 JMS producer 和使用者示例利用 JBoss EAP 嵌入式 ActiveMQ Artemis 服务器发布和使用消息到目的地。

示例还将 CDI 与 camel-cdi 组件结合使用。JMS ConnectionFactory 实例通过 JNDI 查找注入到 Camel RouteBuilder 中。

5.6.2.1. JMS Producer

DefaultJMSConnectionFactory 连接工厂从 JNDI 注入到 RouteBuilder 中。在 JBoss EAP XML 配置下,您可以在 messaging 子系统内找到连接工厂。

接下来,计时器端点每 10 秒运行一次,以将 XML 有效负载发送到之前配置的 WildFlyCamelQueue 目标。

@Startup
@ApplicationScoped
@ContextName("jms-camel-context")
public class JmsRouteBuilder extends RouteBuilder {

  @Resource(mappedName = "java:jboss/DefaultJMSConnectionFactory")
  private ConnectionFactory connectionFactory;

  @Override
  public void configure() throws Exception {
    JmsComponent component = new JmsComponent();
    component.setConnectionFactory(connectionFactory);

    getContext().addComponent("jms", component);

    from("timer://sendJMSMessage?fixedRate=true&period=10000")
    .transform(constant("<?xml version='1.0><message><greeting>hello world</greeting></message>"))
    .to("jms:queue:WildFlyCamelQueue")
    .log("JMS Message sent");
  }
}

每次将 JMS 消息添加到 WildFlyCamelQueue 目的地时,都将向控制台输出日志消息。要验证消息是否放置在队列上,您可以使用 JBoss EAP 管理控制台。

JMS 队列浏览

5.6.2.2. JMS Consumer

要使用 JMS 消息,Camel RouteBuilder 实施与制作者示例类似。

如前文所述,连接工厂从 JNDI 发现,并注入了 JMSComponent 实例。

当 JMS 端点消耗来自 WildFlyCamelQueue 目的地的消息时,内容将记录到控制台。

@Override
public void configure() throws Exception {
  JmsComponent component = new JmsComponent();
  component.setConnectionFactory(connectionFactory);

  getContext().addComponent("jms", component);

  from("jms:queue:WildFlyCamelQueue")
  .to("log:jms?showAll=true");
}

5.6.2.3. JMS 事务

为了使 Camel JMS 路由能够参与 JMS 事务,需要使用一些额外的配置。由于 camel-jms 围绕 spring-jms 构建,因此您需要配置一些 Spring 类,使其能与 JBoss EAP 的事务管理器和连接工厂一起工作。以下代码示例演示了如何使用 CDI 配置事务性 JMS Camel 路由。

camel-jms 组件需要类型为 org.springframework.transaction.PlatformTransactionManager 的事务管理器。因此,您首先创建一个扩展 JtaTransactionManager。请注意,an 标注了 @Named,以便 bean 在 Camel Bean 注册表中注册。另请注意,JBoss EAP 事务管理器和用户事务实例使用 CDI 注入。

@Named("transactionManager")
public class CdiTransactionManager extends JtaTransactionManager {

  @Resource(mappedName = "java:/TransactionManager")
  private TransactionManager transactionManager;

  @Resource
  private UserTransaction userTransaction;

  @PostConstruct
  public void initTransactionManager() {
    setTransactionManager(transactionManager);
    setUserTransaction(userTransaction);
  }
}

接下来,您需要声明要使用的事务策略。再次使用 @Named 注释,使 bean 可供 Camel 使用。事务管理器也注入,以便可使用所需的事务策略创建 TransactionTemplate。本例中为 PROPAGATION_REQUIRED。

@Named("PROPAGATION_REQUIRED")
public class CdiRequiredPolicy extends SpringTransactionPolicy {
  @Inject
  public CdiRequiredPolicy(CdiTransactionManager cdiTransactionManager) {
    super(new TransactionTemplate(cdiTransactionManager,
      new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRED)));
  }
}

现在,您可以配置 Camel RouteBuilder 类,并注入 Camel JMS 组件所需的依赖项。JBoss EAP XA 连接工厂与之前配置的事务管理器一起注入。

在本示例 RouteBuilder 中,每当来自 queue1 的消息被使用时,它们都会路由到名为 queue2 的其他 JMS 队列。来自 queue2 的消息会使用 rollback()DSL 方法回滚 JMS 事务。这将导致原始消息放置在死信队列(DLQ)上。

@Startup
@ApplicationScoped
@ContextName("jms-camel-context")
public class JMSRouteBuilder extends RouteBuilder {

  @Resource(mappedName = "java:/JmsXA")
  private ConnectionFactory connectionFactory;

  @Inject
  CdiTransactionManager transactionManager;

  @Override
  public void configure() throws Exception {
    // Creates a JMS component which supports transactions
    JmsComponent jmsComponent = JmsComponent.jmsComponentTransacted(connectionFactory, transactionManager);
    getContext().addComponent("jms", jmsComponent);

    from("jms:queue:queue1")
      .transacted("PROPAGATION_REQUIRED")
      .to("jms:queue:queue2");

    // Force the transaction to roll back. The message will end up on the Wildfly 'DLQ' message queue
    from("jms:queue:queue2")
      .to("log:end")
      .rollback();
  }

5.6.2.4. 远程 JMS 目的地

个 JBoss EAP 实例可以通过 远程 JNDI 将消息发送到另一 JBoss EAP 实例上配置的消息。

需要额外的 JBoss EAP 配置才能实现这一点。首先配置导出的 JMS 队列。

只有 java:jboss/exported 命名空间中的 JNDI 名称才会被视为远程客户端的候选者,因此队列会被适当地命名。

注意

您必须在 JBoss EAP 客户端 应用服务器和JBoss EAP 远程服务器上配置队列。

<jms-queue name="RemoteQueue">
  <entry name="java:jboss/exported/jms/queues/RemoteQueue"/>
</jms-queue>

在客户端可以连接到远程服务器前,需要配置用户访问凭证。在远程服务器中运行 add user 实用程序在 "guest"组中创建新应用用户。这个示例的用户名为 'admin' 且密码为 'secret'。

RouteBuilder 实施与上例不同。您需要配置 InitialContext 并从 JNDI 中检索它,而不是注入连接工厂。

configureInitialContext 方法创建了这个 InitialContext。请注意,您需要设置提供程序 URL,该 URL 应引用您的远程 JBoss EAP 实例主机名和端口号。本示例使用 JBoss EAP JMS http-connector,但 此处列出了 alternatives。

最后,路由配置为每 10 秒发送一次 XML 有效负载到之前配置的远程目的地 - 'RemoteQueue'。

@Override
public void configure() throws Exception {
  Context initialContext = configureInitialContext();
  ConnectionFactory connectionFactory = (ConnectionFactory) initialContext.lookup("java:jms/RemoteConnectionFactory");

  JmsComponent component = new JmsComponent();
  component.setConnectionFactory(connectionFactory);

  getContext().addComponent("jms", component);

  from("timer://foo?fixedRate=true&period=10000")
  .transform(constant("<?xml version='1.0><message><greeting>hello world</greeting></message>"))
  .to("jms:queue:RemoteQueue?username=admin&password=secret")
  .to("log:jms?showAll=true");
}

private Context configureInitialContext() throws NamingException {
  final Properties env = new Properties();
  env.put(Context.INITIAL_CONTEXT_FACTORY, "org.jboss.naming.remote.client.InitialContextFactory");
  env.put(Context.PROVIDER_URL, System.getProperty(Context.PROVIDER_URL, "http-remoting://my-remote-host:8080"));
  env.put(Context.SECURITY_PRINCIPAL, System.getProperty("username", "admin"));
  env.put(Context.SECURITY_CREDENTIALS, System.getProperty("password", "secret"));
  return new InitialContext(env);
}

5.6.3. 安全性

请参阅 JMS 安全部分

5.6.4. EAP 上的 Fuse 中的 Quickstart 示例

快速入门/camel/camel-jms 目录中,您的 Fuse 中提供了 EAP 安装的快速入门示例。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.