使用 AMQ JMS 客户端
使开源包含更多
红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。我们从这四个术语开始:master、slave、黑名单和白名单。由于此项工作十分艰巨,这些更改将在即将推出的几个发行版本中逐步实施。有关更多详情,请参阅我们的首席技术官 Chris Wright 提供的消息。
第 1 章 概述
AMQ JMS 是一个 Java 消息服务(JMS) 2.0 客户端,用于发送和接收 AMQP 消息的消息。
AMQ JMS 是 AMQ 客户端的一部分,这是支持多种语言和平台的一系列消息传递库。有关客户端的概述,请参阅 AMQ 客户端概述。有关此发行版本的详情,请参考 AMQ Clients 2.11 发行注记。
AMQ JMS 基于 Apache Qpid 的 JMS 实施。有关 JMS API 的更多信息,请参阅 JMS API 引用 和 JMS 教程。
1.1. 主要特性
- JMS 1.1 和 2.0 兼容
- 用于安全通信的 SSL/TLS
- 灵活的 SASL 身份验证
- 自动重新连接和故障转移
- 准备好与 OSGi 容器一起使用
- 纯 Java 实现
基于 OpenTracing 标准的分布式追踪
重要AMQ 客户端中的分布式追踪只是一个技术预览功能。技术预览功能不受红帽产品服务等级协议(SLA)支持,且功能可能并不完整。红帽不推荐在生产环境中使用它们。这些技术预览功能可以使用户提早试用新的功能,并有机会在开发阶段提供反馈意见。有关红帽技术预览功能支持范围的详情,请参考 https://access.redhat.com/support/offerings/techpreview/。
AMQ JMS 目前不支持分布式事务(XA)。如果您的应用程序需要分布式事务,建议您使用 AMQ Core Protocol JMS 客户端。
1.2. 支持的标准和协议
AMQ JMS 支持以下行业认可的标准和网络协议:
- Java Message Service API 的版本 2.0
- 高级消息队列协议 (AMQP)的版本 1.0
- AMQP JMS 映射的版本 1.0
- 传输层安全 (TLS)协议的 1.0、1.1、1.2 和 1.3,是 SSL 的后续版本
- 简单的身份验证和安全层 (SASL)机制,包括 ANONYMOUS, PLAIN, SCRAM, EXTERNAL, 和 GSSAPI (Kerberos)
- 使用 IPv6的现代 TCP
1.3. 支持的配置
有关 AMQ JMS 支持的配置 的当前信息,请参阅红帽客户门户网站上的 Red Hat AMQ 支持的配置。
1.4. 术语和概念
本节介绍核心 API 实体,并描述它们如何一起工作。
实体 | 描述 |
---|---|
| 创建连接的入口点。 |
| 网络上两个对等点间的通信的频道。它包含会话。 |
| 生成和使用消息的上下文。它包含消息制作者和消费者。 |
| 发送消息到目的地的频道。它有一个目标目的地。 |
| 从目的地接收消息的频道。它有一个源目的地。 |
| 消息的命名位置,可以是队列或主题。 |
| 存储的消息序列。 |
| 存储的用于多播分发的消息序列。 |
| 特定于应用程序的信息片段。 |
AMQ JMS 发送并 接收消息。使用消息 生产者和消费者 在连接的对等点之间传输消息。生产者和消费者通过会话建立 。会话通过 连接建立。连接由 连接工厂 创建。
发送对等点会创建一个制作者来发送消息。制作者有一个 目的地,用于标识远程对等点上的目标队列或主题。接收对等点会创建一个消费者来接收消息。与生产者一样,使用者也有一个目的地,用于标识远程对等点上的源队列或主题。
目标是 队列或主题 。在 JMS 中,队列和主题是包含消息的命名代理实体的客户端表示。
队列实施点对点语义。每条消息仅由一个消费者查看,并且消息在读取后从队列中删除。主题实施发布订阅语义。每个消息都会看到多个消费者,在读取后,消息仍可供其他消费者使用。
如需更多信息,请参阅 JMS 指南。
1.5. 文档惯例
sudo 命令
在本文档中,sudo
用于任何需要 root 特权的命令。使用 sudo
时请谨慎操作,因为任何更改都可能会影响整个系统。有关 sudo
的更多信息,请参阅使用 sudo 命令。
文件路径
在本文档中,所有文件路径都对 Linux、UNIX 和类似操作系统(例如 /home/andrea
)有效。在 Microsoft Windows 上,您必须使用对应的 Windows 路径(例如 C:\Users\andrea
)。
变量文本
本文档包含代码块,其中的变量必须替换为特定于您的环境的值。变量文本以箭头括号括起,样式为方便的 monospace。例如,使用以下命令将 < project-dir>
; 替换为环境的值:
$ cd <project-dir>
第 2 章 安装
本章介绍了在您的环境中安装 AMQ JMS 的步骤。
2.1. 先决条件
- 您必须具有访问 AMQ 发行文件和软件仓库 的订阅。
- 要使用 AMQ JMS 构建程序,您必须安装 Apache Maven。
- 要使用 AMQ JMS,您必须安装 Java。
2.2. 使用 Red Hat Maven 存储库
配置 Maven 环境,以从 Red Hat Maven 存储库下载客户端库。
流程
将红帽存储库添加到您的 Maven 设置或 POM 文件中。有关配置文件的示例,请参阅 第 B.1 节 “使用在线软件仓库”。
<repository> <id>red-hat-ga</id> <url>https://maven.repository.redhat.com/ga</url> </repository>
将库依赖项添加到 POM 文件中。
<dependency> <groupId>org.apache.qpid</groupId> <artifactId>qpid-jms-client</artifactId> <version>1.5.0.redhat-00001</version> </dependency>
客户端现在在 Maven 项目中可用。
2.3. 安装本地 Maven 存储库
作为在线存储库的替代选择,AMQ JMS 可以作为基于文件的 Maven 存储库安装到本地文件系统。
流程
- 使用您的订阅 下载 AMQ Clients 2.11.0 JMS Maven 存储库 .zip 文件。
将文件内容提取到您选择的目录中。
在 Linux 或 UNIX 上,使用
unzip
命令提取文件内容。$ unzip amq-clients-2.11.0-jms-maven-repository.zip
在 Windows 上,右键单击 .zip 文件并选择 Extract All。
-
配置 Maven,以使用提取的安装目录中的
maven-repository
目录中的存储库。如需更多信息,请参阅 第 B.2 节 “使用本地存储库”。
2.4. 安装示例
流程
使用
git clone
命令将源存储库克隆到名为qpid-jms
的本地目录:$ git clone https://github.com/apache/qpid-jms.git qpid-jms
进入
qpid-jms
目录,并使用git checkout
命令切换到1.5.0
分支:$ cd qpid-jms $ git checkout 1.5.0
在此文档中,生成的本地目录被称为 < ;source-dir
>。
第 3 章 开始使用
本章介绍了设置环境并运行简单的消息传递程序的步骤。
3.1. 先决条件
- 要构建示例,必须将 Maven 配置为使用 红帽存储库或 本地存储库 。
- 您必须安装示例。
-
您必须在
localhost
上侦听连接的消息代理。它必须启用匿名访问权限。如需更多信息,请参阅 启动代理。 -
您必须具有名为
queue
的队列。如需更多信息,请参阅创建队列。
3.2. 运行 Hello World
Hello World 示例创建了一个与代理的连接,发送一条消息,其中包含到队列 队列的
问候消息,然后接收它。成功时,它会将收到的消息输出到控制台。
流程
在 <
source-dir> /qpid-jms-examples
目录中运行以下命令来构建示例:$ mvn clean package dependency:copy-dependencies -DincludeScope=runtime -DskipTests
添加 dependencies
:copy-dependencies
会导致依赖项被复制到target/dependency
目录中。使用
java
命令运行示例。对于 Linux 或 UNIX:
$ java -cp "target/classes:target/dependency/*" org.apache.qpid.jms.example.HelloWorld
在 Windows 上:
> java -cp "target\classes;target\dependency\*" org.apache.qpid.jms.example.HelloWorld
例如,在 Linux 上运行它会产生以下输出:
$ java -cp "target/classes/:target/dependency/*" org.apache.qpid.jms.example.HelloWorld Hello world!
示例的源代码位于 < source-dir> /qpid-jms-examples/src/main/java
目录中。JNDI 和日志记录配置位于 < source-dir>/qpid-jms-examples/src/main/resources
目录中。
第 4 章 Configuration
本章论述了将 AMQ JMS 实现绑定到 JMS 应用并设置配置选项的过程。
JMS 使用 Java 命名目录接口(JNDI)注册和查找 API 实施和其他资源。这可让您将代码写入 JMS API,而无需将其写入特定的实现。
配置选项作为连接 URI 上的查询参数公开。
4.1. 配置 JNDI 初始上下文
JMS 应用使用
获取的 JNDI InitialContext 对象来查找 JMS 对象,如连接工厂。AMQ JMS 在 InitialContext
Factoryorg.apache.qpid.jms.jndi.JmsInitialContextFactory
类中提供了 InitialContextFactory
的实施。
当 InitialContext
对象实例化时,会发现 InitialContextFactory
实现:
javax.naming.Context context = new javax.naming.InitialContext();
若要查找实施,环境中必须配置 JNDI。实现这一点的方法有三种:使用 jndi.properties
文件、使用系统属性或使用初始上下文 API。
使用 jndi.properties 文件
创建名为 jndi.properties
的文件,并将其放在 Java classpath 中。使用键 java.naming.factory.initial
添加属性。
示例:使用 jndi.properties 文件设置 JNDI 初始上下文工厂
java.naming.factory.initial = org.apache.qpid.jms.jndi.JmsInitialContextFactory
在基于 Maven 的项目中,jndi.properties
文件放置在 < project-dir> /src/main/resources
目录中。
使用系统属性
设置 java.naming.factory.initial
系统属性。
示例:使用系统属性设置 JNDI 初始上下文工厂
$ java -Djava.naming.factory.initial=org.apache.qpid.jms.jndi.JmsInitialContextFactory ...
使用初始上下文 API
使用 JNDI 初始上下文 API 以编程方式设置属性。
示例:以编程方式设置 JNDI 属性
Hashtable<Object, Object> env = new Hashtable<>(); env.put("java.naming.factory.initial", "org.apache.qpid.jms.jndi.JmsInitialContextFactory"); InitialContext context = new InitialContext(env);
请注意,您可以使用相同的 API 为连接工厂、队列和主题设置 JNDI 属性。
4.2. 配置连接工厂
JMS 连接工厂是创建连接的入口点。它使用一个连接 URI,对特定于应用程序的配置设置进行编码。
要设置工厂名称和连接 URI,请使用以下格式创建一个属性。您可以将此配置存储在 jndi.properties
文件中,或者设置对应的系统属性。
连接工厂的 JNDI 属性格式
connectionFactory.<lookup-name> = <connection-uri>
例如,这是如何配置名为 app1
的工厂:
示例:在 jndi.properties 文件中设置连接工厂
connectionFactory.app1 = amqp://example.net:5672?jms.clientID=backend
然后,您可以使用 JNDI 上下文使用名称 app1
来查找您配置的连接工厂:
ConnectionFactory factory = (ConnectionFactory) context.lookup("app1");
4.3. 连接 URI
连接使用连接 URI 进行配置。连接 URI 指定远程主机、端口和一组配置选项,它们被设置为查询参数。有关可用选项的详情请参考 第 5 章 配置选项。
连接 URI 格式
<scheme>://<host>:<port>[?<option>=<value>[&<option>=<value>...]]
对于未加密的连接,方案是
,用于 SSL/TLS 连接。
amqp
例如,以下是一个连接 URI,它连接到位于端口 5672
的主机 example.net
,并将客户端 ID 设置为 后端
:
示例:连接 URI
amqp://example.net:5672?jms.clientID=backend
故障转移 URI
配置故障转移时,如果与当前服务器的连接丢失,客户端可以自动重新连接到其他服务器。故障转移 URI 具有前缀 故障切换:
在括号内包含以逗号分隔的连接 URI 列表。其他选项在末尾指定。
故障转移 URI 格式
failover:(<connection-uri>[,<connection-uri>...])[?<option>=<value>[&<option>=<value>...]]
例如,以下是故障转移 URI,它可以连接到两个主机 host1
或 host2
之一:
示例:故障转移 URI
failover:(amqp://host1:5672,amqp://host2:5672)?jms.clientID=backend
与连接 URI 示例一样,客户端可以使用故障转移配置中的 URI 配置多个不同的设置。这些设置在 第 5 章 配置选项 中详细介绍,其中 第 5.5 节 “故障转移选项” 部分是特别关注的部分。
SSL/TLS 服务器名称代表
当使用 amqps
方案指定 SSL/TLS 连接时,JVM 的 TLS 服务器名称调用(SNI)扩展中的主机片段可用于 TLS 握手期间通信所需的服务器主机名。如果指定了完全限定域名(如 "myhost.mydomain"),但没有使用非限定名称(如 "myhost")或裸机 IP 地址,则会自动包含 SNI 扩展。
4.4. 配置队列和主题名称
JMS 提供了使用 JNDI 查找特定于部署的队列和主题资源的 选项。
若要在 JNDI 中设置队列和主题名称,请以以下格式创建属性:将此配置放在 jndi.properties
文件中,或者设置对应的系统属性。
队列和主题的 JNDI 属性格式
queue.<lookup-name> = <queue-name> topic.<lookup-name> = <topic-name>
例如,以下属性定义了两个部署特定资源的名称 作业和
通知
:
示例:在 jndi.properties 文件中设置队列和主题名称
queue.jobs = app1/work-items topic.notifications = app1/updates
然后,您可以通过其 JNDI 名称查找资源:
Queue queue = (Queue) context.lookup("jobs"); Topic topic = (Topic) context.lookup("notifications");
4.5. JNDI 属性中的变量扩展
JNDI 属性值可以包含格式为 ${ <variable-name> }
的变量。该程序库通过搜索以下位置的顺序来解析变量值:
- Java 系统属性
- OS 环境变量
- JNDI 属性文件或环境散列
例如,在 Linux ${HOME}
上,解析为 HOME
环境变量,当前用户的主目录。
可以使用语法 ${ <variable-name> :- < default-value> } 提供默认值
。如果没有找到 < ;variable-name
> 的值,则使用默认值。
第 5 章 配置选项
本章列出了 AMQ JMS 的可用配置选项。
JMS 配置选项被设置为连接 URI 上的查询参数。如需更多信息,请参阅 第 4.3 节 “连接 URI”。
5.1. JMS 选项
这些选项控制 JMS 对象的行为,如 Connection
,Session
,MessageConsumer
, 和 MessageProducer
。
- jms.username
- 客户端用来验证连接的用户名。
- jms.password
- 客户端用来验证连接的密码。
- jms.clientID
- 客户端应用到连接的客户端 ID。
- jms.forceAsyncSend
-
如果启用,则来自
MessageProducer
的所有消息都会异步发送。否则,只有某些类型(如非持久性消息或事务内的消息)异步发送。它默认是禁用的。 - jms.forceSyncSend
-
如果启用,则来自
MessageProducer
的所有消息都会同步发送。它默认是禁用的。 - jms.forceAsyncAcks
- 如果启用,则所有消息确认都会异步发送。它默认是禁用的。
- jms.localMessageExpiry
-
如果启用,则由
MessageConsumer
接收的任何过期信息都会被过滤掉且不发送。它会被默认启用。 - jms.localMessagePriority
- 如果启用,则预先获取的消息会根据消息优先级值在本地重新排序。它默认是禁用的。
- jms.validatePropertyNames
- 如果启用,则消息属性名称必须是有效的 Java 标识符。它会被默认启用。
- jms.receiveLocalOnly
-
如果启用,则使用 timeout 参数
接收的
调用只检查消费者的本地消息缓冲区。否则,如果超时过期,则会检查远程 peer 以确保没有信息。它默认是禁用的。 - jms.receiveNoWaitLocalOnly
-
如果启用,则调用
receiveNoWait
仅检查消费者的本地消息缓冲区。否则,将检查远程 peer 以确保实际没有可用的消息。它默认是禁用的。 - jms.queuePrefix
-
一个可选的前缀值添加到从
Session
创建的任何Queue
的名称中。 - jms.topicPrefix
-
一个可选的前缀值添加到从
Session
创建的任何
主题的名称中。 - jms.closeTimeout
- 客户端在返回前等待正常资源冲突的时间(以毫秒为单位)。默认值为 60000 (60 秒)。
- jms.connectTimeout
- 客户端在返回错误前等待连接建立的时间(以毫秒为单位)。默认值为 15000 (15 秒)。
- jms.sendTimeout
- 客户端在返回错误前等待完成 同步消息发送的时间 (以毫秒为单位)。默认情况下,客户端会无限期等待发送完成。
- jms.requestTimeout
- 客户端在返回错误前等待完成 各种同步交互 的时间(以毫秒为单位),如打开生成者或消费者(不包括发送)。默认情况下,客户端会无限期等待请求完成。
- jms.clientIDPrefix
-
可选前缀值,用于在由
ConnectionFactory
创建新连接时
生成客户端 ID 值。默认值为ID:
- jms.connectionIDPrefix
-
可选前缀值,用于在由
ConnectionFactory
创建新连接时生成连接
ID 值。在从Connection
对象中记录一些信息时使用此连接 ID,因此可配置的前缀可以更轻松地对日志进行导航。默认值为ID:
- jms.populateJMSXUserID
-
如果启用,请使用来自连接的经过身份验证的用户名称,为每个发送的消息填充
JMSXUserID
属性。它默认是禁用的。 - jms.awaitClientID
- 如果启用,则 URI 中没有配置的客户端 ID 的连接会以编程方式设置客户端 ID,或者在发送 AMQP 连接 "open" 前确认是否设置 none。它会被默认启用。
- jms.useDaemonThread
- 如果启用,连接将守护进程线程用于其 executor,而不是非守护进程线程。它默认是禁用的。
- jms.tracing
-
追踪提供程序的名称。支持的值有
opentracing
和noop
。默认值为noop
。
预抓取策略选项
prefetch 策略决定每个 MessageConsumer
从远程对等点获取的消息数量,并保存在本地"prefetch"缓冲区中。
- jms.prefetchPolicy.queuePrefetch
- 默认值为 1000。
- jms.prefetchPolicy.topicPrefetch
- 默认值为 1000。
- jms.prefetchPolicy.queueBrowserPrefetch
- 默认值为 1000。
- jms.prefetchPolicy.durableTopicPrefetch
- 默认值为 1000。
- jms.prefetchPolicy.all
- 这可用于一次性设置所有 prefetch 值。
prefetch 的值可能会影响到队列或共享订阅上的多个消费者的信息分布。较高的值可导致一次向每个消费者发送更大的批处理。要实现更多循环分发,请使用较低的值。
重新发送策略选项
重新发送策略控制如何在客户端上处理重新设计的消息。
- jms.redeliveryPolicy.maxRedeliveries
- 根据已重新设计的次数控制传入的消息何时被拒绝。值 0 表示不接受任何消息 redeliveries。值 5 允许将消息恢复为五次,以此类推。默认值为 -1,代表没有限制。
- jms.redeliveryPolicy.outcome
-
控制在超过配置的 maxRedeliveries 值后,应用到消息的结果。支持的值有:
ACCEPTED
,REJECTED
,RELEASED
,MODIFIED_FAILED
和MODIFIED_FAILED_UNDELIVERABLE
.默认值为MODIFIED_FAILED_UNDELIVERABLE
。
消息 ID 策略选项
消息 ID 策略控制分配给从客户端发送的消息 ID 的数据类型。
- jms.messageIDPolicy.messageIDType
-
默认情况下,生成的
String
值用于传出消息上的消息 ID。其他可用的类型包括UUID
、UUID_STRING
和PREFIXED_UUID_STRING
。
Presettle 策略选项
Presettle 策略控制生产者或消费者实例何时配置为使用 AMQP 预先设置的消息传递语义。
- jms.presettlePolicy.presettleAll
- 如果启用,则创建的生成者和非转换的用户都以预先设置的模式操作。它默认是禁用的。
- jms.presettlePolicy.presettleProducers
- 如果启用,则所有制作者都以预先设置的模式运行。它默认是禁用的。
- jms.presettlePolicy.presettleTopicProducers
-
如果启用,则发送到主题或
Temporary
目的地的任何生成者在 presettled 模式中运行。它默认是禁用的。Topic
- jms.presettlePolicy.presettleQueueProducers
-
如果启用,则发送到
Queue
或TemporaryQueue
目的地的任何制作者都在预先设置的模式下运行。它默认是禁用的。 - jms.presettlePolicy.presettleTransactedProducers
-
如果启用,在 transacted
Session
中创建的任何制作者都以预先设置的模式运行。它默认是禁用的。 - jms.presettlePolicy.presettleConsumers
- 如果启用,则所有消费者都以预先设置的模式运行。它默认是禁用的。
- jms.presettlePolicy.presettleTopicConsumers
-
如果启用,则从主题或
Temporary
目的地接收的消费者以 presettled 模式运行。它默认是禁用的。Topic
- jms.presettlePolicy.presettleQueueConsumers
-
如果启用,则从
Queue
或TemporaryQueue
目的地接收的任何消费者都在预先设置的模式下运行。它默认是禁用的。
解序列化策略选项
解序列化策略提供了一种方式,控制哪些 Java 类型可信任从对象流中反序列化,同时从由序列化 Java 对象内容组成的传入
检索正文。默认情况下,所有类型都会在尝试反序列化正文期间被信任。默认 deserialization 策略提供 URI 选项,允许指定白名单和 Java 类或软件包名称黑名单。
Object
Message
- jms.deserializationPolicy.whiteList
-
以逗号分隔的类和软件包名称列表,在停用
ObjectMessage
的内容时应允许这些名称,除非被blackList
覆盖。此列表中的名称不是模式值。必须配置准确的类或软件包名称,如java.util.Map
或java.util
中所示。软件包匹配包括子软件包。默认为允许所有。 - jms.deserializationPolicy.blackList
-
以逗号分隔的类和软件包名称列表,在停用
ObjectMessage
的内容时应被拒绝。此列表中的名称不是模式值。必须配置准确的类或软件包名称,如java.util.Map
或java.util
中所示。软件包匹配包括子软件包。默认为防止 none。
5.2. TCP 选项
当使用普通 TCP 连接到远程服务器时,以下选项指定了底层套接字的行为。这些选项会与其他配置选项一起附加到连接 URI 中。
示例:使用传输选项的连接 URI
amqp://localhost:5672?jms.clientID=foo&transport.connectTimeout=30000
下面列出了完整的 TCP 传输选项。
- transport.sendBufferSize
- 发送缓冲区大小(以字节为单位)。默认值为 65536 (64 KiB)。
- transport.receiveBufferSize
- 接收缓冲区大小(以字节为单位)。默认值为 65536 (64 KiB)。
- transport.trafficClass
- 默认值为 0。
- transport.connectTimeout
- 默认为 60 秒。
- transport.soTimeout
- 默认值为 -1。
- transport.soLinger
- 默认值为 -1。
- transport.tcpKeepAlive
- 默认值为 false。
- transport.tcpNoDelay
- 如果启用,请不要延迟和缓冲 TCP 发送。它会被默认启用。
- transport.useEpoll
- 当可用时,使用原生 epoll IO 层而不是 NIO 层。这可以提高性能。它会被默认启用。
5.3. SSL/TLS 选项
SSL/TLS 传输通过使用 amqps
URI 方案启用。因为 SSL/TLS 传输扩展了基于 TCP 的传输的功能,所以所有 TCP 传输选项都在 SSL/TLS 传输 URI 中有效。
示例:一个简单的 SSL/TLS 连接 URI
amqps://myhost.mydomain:5671
下面列出了完整的 SSL/TLS 传输选项。
- transport.keyStoreLocation
-
SSL/TLS 密钥存储的路径。如果未设置,则使用
javax.net.ssl.keyStore
系统属性的值。 - transport.keyStorePassword
-
SSL/TLS 密钥存储的密码。如果未设置,则使用
javax.net.ssl.keyStorePassword
系统属性的值。 - transport.trustStoreLocation
-
SSL/TLS 信任存储的路径。如果未设置,则使用
javax.net.ssl.trustStore
系统属性的值。 - transport.trustStorePassword
-
SSL/TLS 信任存储的密码。如果未设置,则使用
javax.net.ssl.trustStorePassword
系统属性的值。 - transport.keyStoreType
-
如果未设置,则使用
javax.net.ssl.keyStoreType
系统属性的值。如果未设置系统属性,则默认为JKS
。 - transport.trustStoreType
-
如果未设置,则使用
javax.net.ssl.trustStoreType
系统属性的值。如果未设置系统属性,则默认为JKS
。 - transport.storeType
-
将
keyStoreType
和trustStoreType
设置为相同的值。如果未设置,keyStoreType
和trustStoreType
默认为以上指定的值。 - transport.contextProtocol
-
获取 SSLContext 时使用的 protocol 参数。如果使用 OpenSSL,则默认为
TLS
或TLSv1.2
。 - transport.enabledCipherSuites
- 要启用的、以逗号分隔的密码套件列表。如果未设置,则使用 context-default 密码。任何禁用的密码都会从这个列表中删除。
- transport.disabledCipherSuites
- 要禁用的、以逗号分隔的密码套件列表。此处列出的密码已从启用的密码中删除。
- transport.enabledProtocols
- 要启用的以逗号分隔的协议列表。如果未设置,则使用上下文默认协议。所有禁用的协议都会从这个列表中删除。
- transport.disabledProtocols
-
要禁用的以逗号分隔的协议列表。此处列出的协议已从启用的协议列表中删除。默认为
SSLv2Hello,SSLv3
。 - transport.trustAll
- 如果启用,请隐式信任提供的服务器证书,无论任何配置的信任存储是什么。它默认是禁用的。
- transport.verifyHost
- 如果启用,请验证连接主机名是否与提供的服务器证书匹配。它会被默认启用。
- transport.keyAlias
- 需要向服务器发送客户端证书时从密钥存储中选择密钥对时使用的别名。
- transport.useOpenSSL
如果启用,在 SSL/TLS 连接中使用原生 OpenSSL 库(如果可用)。它默认是禁用的。
如需更多信息,请参阅 第 7.1 节 “启用 OpenSSL 支持”。
5.4. AMQP 选项
以下选项适用于与 AMQP 线协议相关的行为的各个方面。
- amqp.idleTimeout
- 如果 peer 没有 AMQP 帧,则连接失败的时间(以毫秒为单位)。默认值为 60000 (1 分钟)。
- amqp.vhost
- 要连接的虚拟主机。这用于填充 SASL 和 AMQP 主机名字段。默认为来自连接 URI 的主要主机名。
- amqp.saslLayer
- 如果启用,则在建立连接时使用 SASL。它会被默认启用。
- amqp.saslMechanisms
- 如果服务器提供,则以逗号分隔的 SASL 机制列表,允许选择服务器并使用配置的凭证。支持的机制有 EXTERNAL, SCRAM-SHA-256, SCRAM-SHA-1, CRAM-MD5, PLAIN, ANONYMOUS, 和 GSSAPI 用于 Kerberos。默认为允许来自 GSSAPI 以外的所有机制的选择,该机制必须明确包含才能启用。
- amqp.maxFrameSize
- 客户端允许的最大 AMQP 帧大小(以字节为单位)。这个值会公告给远程对等点。默认值为 1048576 (1 MiB)。
- amqp.drainTimeout
- 客户端在提交消费者排空请求时等待来自远程对等点的响应的时间(以毫秒为单位)。如果在分配的超时时间中没有看到响应,则链接被视为失败,相关的消费者将被关闭。默认值为 60000 (1 分钟)。
- amqp.allowNonSecureRedirects
- 如果启用,允许 AMQP 在现有连接安全且替代连接时重定向到备用主机。例如,如果启用,这将允许将 SSL/TLS 连接重定向到原始 TCP 连接。它默认是禁用的。
5.5. 故障转移选项
故障转移 URI 从前缀 failover:
开始,并在括号内包含以逗号分隔的连接 URI 列表。其他选项在末尾指定。前缀为 jms.
的选项应用于括号之外的总体故障转移 URI,并影响 其生命周期的连接
对象。
示例:带有故障切换选项的故障转移 URI
failover:(amqp://host1:5672,amqp://host2:5672)?jms.clientID=foo&failover.maxReconnectAttempts=20
括号中的单个代理详情可以使用之前定义的 transport.
或 amqp.
选项。它们在每次主机都连接到时应用。
示例:带有每个连接传输和 AMQP 选项的故障转移 URI
failover:(amqp://host1:5672?amqp.option=value,amqp://host2:5672?transport.option=value)?jms.clientID=foo
下面列出了用于故障转移的所有配置选项。
- failover.initialReconnectDelay
- 客户端在第一次尝试重新连接到远程对等点前等待的时间(毫秒)。默认值为 0,表示第一个尝试会立即发生。
- failover.reconnectDelay
- 重新连接尝试之间的时间(毫秒)。如果没有启用 backoff 选项,这个值会保持恒定状态。默认值为 10。
- failover.maxReconnectDelay
- 客户端在尝试重新连接前等待的时间。只有在启用了 backoff 功能来确保延迟不会增长太大时才使用这个值。默认值为 30 秒。
- failover.useReconnectBackOff
- 如果启用,则重新连接尝试之间的时间会根据配置的倍数增加。它会被默认启用。
- failover.reconnectBackOffMultiplier
- 用于增大 reconnection delay 值的倍数。默认值为 2.0。
- failover.maxReconnectAttempts
- 在向客户端报告连接失败前允许的重新连接尝试次数。默认值为 -1,代表没有限制。
- failover.startupMaxReconnectAttempts
-
对于之前从未连接到远程对等点的客户端,这个选项控制在报告连接失败前要连接的尝试次数。如果未设置,则使用
maxReconnectAttempts
的值。 - failover.warnAfterReconnectAttempts
- 重新连接失败次数,直到记录警告为止。默认值为 10。
- failover.randomize
- 如果启用,在尝试连接到其中一个故障转移 URI 之前,会随机执行一组故障转移 URI。这有助于在多个远程对等点之间更均匀地分发客户端连接。它默认是禁用的。
- failover.amqpOpenServerListAction
-
控制当服务器中连接"open"帧提供故障转移主机到客户端的列表时,故障转移传输的行为方式。有效值为
REPLACE
、ADD
或IGNORE
。如果配置了REPLACE
,则当前服务器以外的所有故障转移 URI 都替换为服务器提供的。如果配置了ADD
,则服务器提供的 URI 将添加到现有故障转移 URI 集合中,并去除重复数据。如果配置了IGNORE
,则忽略来自服务器的任何更新,且不会对正在使用的故障转移 URI 进行任何更改。默认值为REPLACE
。
故障转移 URI 还支持定义嵌套选项,作为指定适用于所有嵌套代理 URI 的 AMQP 和传输选项值的方法。这使用相同的 传输实现。
和 amqp。
之前为非故障切换代理 URI 概述的 URI 选项,但带有 failover.nested
的前缀。例如,要将 amqp.vhost
选项的值应用到连接到的每个代理,您可能有一个类似如下的 URI:
示例:使用共享传输和 AMQP 选项的故障转移 URI
failover:(amqp://host1:5672,amqp://host2:5672)?jms.clientID=foo&failover.nested.amqp.vhost=myhost
5.6. 发现选项
客户端具有一个可选的发现模块,它提供自定义故障转移层,代理 URI 没有在初始 URI 中提供,而是通过与发现代理交互来发现。目前有两个发现代理实现:一个从文件中加载 URI 的文件监视器,以及一个多播监听程序,用于为侦听客户端广播其代理地址的 ActiveMQ 5.x 代理。
使用发现时,常规的与故障转移相关的选项集与之前详述的相同,主前缀从 故障转移
改为 发现。
以及用于提供对所有发现的代理 URI 通用的 嵌套
前缀。例如,如果没有代理 URI 详情,一般的发现 URI 可能类似如下:
示例:发现 URI
discovery:(<agent-uri>)?discovery.maxReconnectAttempts=20&discovery.discovered.jms.clientID=foo
要使用文件观察器发现代理,请创建类似如下的代理 URI:
示例:使用文件监视器代理的发现 URI
discovery:(file:///path/to/monitored-file?updateInterval=60000)
下面列出了文件观察器发现代理的 URI 选项。
- updateInterval
- 检查文件更改之间的时间(毫秒)。默认值为 30000 (30 秒)。
要将多播发现代理与 ActiveMQ 5.x 代理搭配使用,请创建类似如下的代理 URI:
示例:使用多播监听程序代理的发现 URI
discovery:(multicast://default?group=default)
请注意,在上述多播代理 URI 中将 default
用作主机是一个特殊值,代理将替换为默认的 239.255.2.3:6155
。您可以更改它,以指定用于多播配置的实际 IP 地址和端口。
下方列出了多播发现代理的 URI 选项。
- group
-
用于侦听更新的多播组。默认为
default
。
第 6 章 例子
本章演示了通过示例程序使用 AMQ JMS。
如需了解更多示例,请参阅 AMQ JMS 示例套件和 Qpid JMS 示例。
6.1. 配置 JNDI 上下文
使用 JMS 的应用通常使用 JNDI 获取应用所用的 ConnectionFactory
和 Destination
对象。这会使配置与程序分开,并使其与特定的客户端实施隔离。
为了使用这些示例,应将名为 jndi.properties
的文件放在类路径上,以配置 JNDI 上下文,如前面所述。
jndi.properties
文件的内容应与下面显示的内容匹配,它确定应使用客户端的 InitialContextFactory
实施,配置 ConnectionFactory
以连接到本地服务器,并定义名为 queue
的目标队列。
# Configure the InitialContextFactory class to use java.naming.factory.initial = org.apache.qpid.jms.jndi.JmsInitialContextFactory # Configure the ConnectionFactory connectionfactory.myFactoryLookup = amqp://localhost:5672 # Configure the destination queue.myDestinationLookup = queue
6.2. 发送消息
本例首先会创建一个 JNDI 上下文
,使用它来查找 ConnectionFactory
和 Destination
,使用工厂创建并启动 连接
,然后创建一个 Session
。然后,将创建一个 MessageProducer
到 Destination
,并使用它发送一条消息。然后,连接
关闭,程序会退出。
此 Sender
示例的一个可运行变体位于 < ;source-dir>/qpid-jms-examples
目录中,以及之前在 第 3 章 开始使用 中涵盖的 Hello World 示例。
示例:发送消息
package org.jboss.amq.example; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.naming.Context; import javax.naming.InitialContext; public class Sender { public static void main(String[] args) throws Exception { try { Context context = new InitialContext(); 1 ConnectionFactory factory = (ConnectionFactory) context.lookup("myFactoryLookup"); Destination destination = (Destination) context.lookup("myDestinationLookup"); 2 Connection connection = factory.createConnection("<username>", "<password>"); connection.setExceptionListener(new MyExceptionListener()); connection.start(); 3 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 4 MessageProducer messageProducer = session.createProducer(destination); 5 TextMessage message = session.createTextMessage("Message Text!"); 6 messageProducer.send(message, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); 7 connection.close(); 8 } catch (Exception exp) { System.out.println("Caught exception, exiting."); exp.printStackTrace(System.out); System.exit(1); } } private static class MyExceptionListener implements ExceptionListener { @Override public void onException(JMSException exception) { System.out.println("Connection ExceptionListener fired, exiting."); exception.printStackTrace(System.out); System.exit(1); } } }
- 1
- 2
- ConnectionFactory 和 Destination 对象利用其查找名称从 JNDI 上下文检索。
- 3
- 工厂用于创建
Connection
,然后注册了ExceptionListener
,然后启动。创建连接时给出的凭证通常来自适当的外部配置源,确保它们与应用程序本身保持独立,并可独立更新。 - 4
- 在连接上创建了非转换的
、
自动确认会话
。 - 5
- 创建
MessageProducer
以将消息发送到Destination
。 - 6
- 使用给定内容创建一个
TextMessage
。 - 7
TextMessage
被发送。它发送了非持久性,具有默认优先级且无过期。- 8
连接
关闭。Session
和MessageProducer
被隐式关闭。
请注意,这只是一个示例。现实应用通常使用长期消息Producer,并随着时间的推移发送许多消息。打开然后关闭每个 消息
和消息生产通常效率低下。
的连接
、会话
6.3. 接收信息
本例首先创建一个 JNDI 上下文,使用它来查找 ConnectionFactory
和 Destination
,使用工厂创建和启动 连接
,然后创建一个 Session
。然后,为 Destination
创建一个 MessageConsumer
,使用它收到一条消息,其内容会输出到控制台。然后,连接关闭,程序会退出。相同的 JNDI 配置与 发送示例 中相同。
此 Receiver
示例的一个可执行变体包含在客户端分发的示例目录中,以及之前在 第 3 章 开始使用 中涵盖的 Hello World 示例。
示例:接收信息
package org.jboss.amq.example; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import javax.naming.Context; import javax.naming.InitialContext; public class Receiver { public static void main(String[] args) throws Exception { try { Context context = new InitialContext(); 1 ConnectionFactory factory = (ConnectionFactory) context.lookup("myFactoryLookup"); Destination destination = (Destination) context.lookup("myDestinationLookup"); 2 Connection connection = factory.createConnection("<username>", "<password>"); connection.setExceptionListener(new MyExceptionListener()); connection.start(); 3 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 4 MessageConsumer messageConsumer = session.createConsumer(destination); 5 Message message = messageConsumer.receive(5000); 6 if (message == null) { 7 System.out.println("A message was not received within given time."); } else { System.out.println("Received message: " + ((TextMessage) message).getText()); } connection.close(); 8 } catch (Exception exp) { System.out.println("Caught exception, exiting."); exp.printStackTrace(System.out); System.exit(1); } } private static class MyExceptionListener implements ExceptionListener { @Override public void onException(JMSException exception) { System.out.println("Connection ExceptionListener fired, exiting."); exception.printStackTrace(System.out); System.exit(1); } } }
- 1
- 2
ConnectionFactory
和Destination
对象利用其查找名称从 JNDI上下文
检索。- 3
- 工厂用于创建
Connection
,然后注册了ExceptionListener
,然后启动。创建连接时给出的凭证通常来自适当的外部配置源,确保它们与应用程序本身保持独立,并可独立更新。 - 4
- 在连接上创建了非转换的
、
自动确认会话
。 - 5
- 创建
MessageConsumer
以接收来自Destination
的信息。 - 6
- 发出对接收消息的调用,显示 5 秒超时。
- 7
- 检查结果,如果收到了消息,则其内容会被打印或注意没有收到消息。结果明确发送到
TextMessage
,因为这是我们知道发送者
发送的内容。 - 8
连接
关闭。Session
和MessageConsumer
被隐式关闭。
请注意,这只是一个示例。现实应用通常使用长期的 MessageConsumer
,并随着时间的推移接收许多消息。打开然后关闭每个消息
和 的连接
、会话MessageConsumer
通常效率较低。
第 7 章 安全性
AMQ JMS 具有一系列与安全相关的配置选项,可根据您的应用需求加以利用。
在应用中创建连接时,用户名和密码等基本用户凭据应直接传递给 ConnectionFactory
。但是,如果您使用 no-argument factory 方法,也可以在连接 URI 中提供用户凭证。如需更多信息,请参阅 第 5.1 节 “JMS 选项” 部分。
另一个常见的安全考虑因素是使用 SSL/TLS。当连接 URI 方案在连接 URI 中指定时,客户端通过
SSL/TLS 传输 连接到服务器,并提供各种选项来配置行为。如需更多信息,请参阅 第 5.3 节 “SSL/TLS 选项” 部分。
与之前的项目一致,可能需要将客户端限制为只允许使用服务器提供的特定 SASL 机制,而不是从所有它支持的选择。如需更多信息,请参阅 第 5.4 节 “AMQP 选项” 部分。
在接收的 ObjectMessage
上调用 getObject ()
的应用程序可能需要限制在序列化过程中创建的类型。请注意,使用 AMQP 类型系统组成的消息正文不使用 ObjectInputStream
机制,因此不需要此预防。如需更多信息,请参阅 “解序列化策略选项”一节 部分。
7.1. 启用 OpenSSL 支持
SSL/TLS 连接可以配置为使用原生 OpenSSL 实现来提高性能。要使用 OpenSSL,必须启用 transport.useOpenSSL
选项,并且必须在 classpath 上提供 OpenSSL 支持库。
要在 Red Hat Enterprise Linux 上使用系统安装的 OpenSSL 库,请安装 openssl
和 apr
RPM 软件包,并在 POM 文件中添加以下依赖项:
示例:添加原生 OpenSSL 支持
<dependency> <groupId>io.netty</groupId> <artifactId>netty-tcnative</artifactId> <version>2.0.46.Final-redhat-00001</version> <classifier>linux-x86_64-fedora</classifier> </dependency>
Netty 项目中提供了 OpenSSL 库实现列表。
7.2. 使用 Kerberos 进行身份验证
当与适当配置的服务器一起使用时,客户端可以配置为使用 Kerberos 进行身份验证。要启用 Kerberos,请使用以下步骤。
使用
amqp.saslMechanisms
URI 选项将客户端配置为使用 SASL 身份验证的GSSAPI
机制。amqp://myhost:5672?amqp.saslMechanisms=GSSAPI failover:(amqp://myhost:5672?amqp.saslMechanisms=GSSAPI)
将
java.security.auth.login.config
系统属性设置为包含 KerberosLoginModule
的适当配置的 JAAS 登录配置文件的路径。-Djava.security.auth.login.config=<login-config-file>
登录配置文件可能类似以下示例:
amqp-jms-client { com.sun.security.auth.module.Krb5LoginModule required useTicketCache=true; };
使用的确切配置将取决于您如何为连接建立凭证,以及使用的特定 LoginModule
。有关 Oracle Krb5LoginModule
的详情,请查看 Oracle Krb5LoginModule
类参考。有关 IBM Java 8 Krb5LoginModule
的详情,请查看 IBM Krb5LoginModule
类参考。
可以将 LoginModule
配置为建立用于 Kerberos 进程的凭证,如指定主体以及是否使用现有的票据缓存或 keytab。但是,如果 LoginModule
配置不提供建立所有必要的凭据的方法,则请求并从客户端连接对象传递用户名和密码值(如果使用 ConnectionFactory
选项创建连接或之前通过 URI 选项配置)。
请注意,Kerberos 仅支持验证目的。使用 SSL/TLS 连接进行加密。
以下连接 URI 选项可用于影响 Kerberos 身份验证过程。
- sasl.options.configScope
-
用于验证的登录配置条目的名称。默认值为
amqp-jms-client
。 - sasl.options.protocol
-
GSSAPI SASL 进程中使用的协议值。默认值为
amqp
。 - sasl.options.serverName
-
GSSAPI SASL 进程中使用的
serverName
值。默认为来自连接 URI 的服务器主机名。
与前面详述的 amqp.
和 transport.
选项类似,这些选项必须基于每个主机指定,或者作为故障切换 URI 中的 all-host 嵌套选项。
第 8 章 消息交付
8.1. 处理未确认的发送
消息传递系统使用消息确认来跟踪消息的目标是否真正完成。
发送消息后,在邮件被发送并被确认前有一个时间段(消息为"in flight")。如果此时丢失网络连接,消息交付的状态未知,发送可能需要在应用程序代码中特殊处理,以确保其完成。
以下小节描述了连接失败时消息交付的条件。
非转换生成者带有未确认的发送
如果消息在 flight 中,它会在重新连接后再次发送,只要未设置发送超时,且尚未设置。
不需要用户操作。
带有未提交事务的 transacted producer
如果消息在 flight 中,它会在重新连接后再次发送。如果发送是新事务中的第一个,则在重新连接后发送将继续正常。如果以前的事务中发送,则事务被视为失败,后续提交操作都会引发 TransactionRolledBackException
。
为确保发送,用户必须重新发送属于失败事务的任何消息。
带有待处理提交的 transacted producer
如果提交处于 flight,则事务被视为失败,后续提交操作都会引发 TransactionRolledBackException
。
为确保发送,用户必须重新发送属于失败事务的任何消息。
非转换的消费者,带有未确认的发送
如果收到消息但尚未确认,则确认消息不会产生任何错误,但不会给客户端产生任何操作。
由于收到的消息没有被确认,因此生成者可能会重新发送它。为避免重复,用户必须根据消息 ID 过滤出重复的消息。
带有未提交事务的 transacted consumer
如果还没有提交活动事务,它将被视为失败,并丢弃任何待处理的确认。任何后续提交操作都会抛出 TransactionRolledBackException
。
生产者可能会重新发送属于事务的消息。为避免重复,用户必须根据消息 ID 过滤出重复的消息。
使用待处理的提交转换消费者
如果提交处于 flight,则事务将被视为失败。任何后续提交操作都会抛出 TransactionRolledBackException
。
生产者可能会重新发送属于事务的消息。为避免重复,用户必须根据消息 ID 过滤出重复的消息。
8.2. 扩展会话确认模式
除了 JMS 规范中定义的外,客户端支持另外两种会话确认模式。
个人确认
在这个模式中,应用程序必须使用会话处于 CLIENT_ACKNOWLEDGE
模式时使用的 Message.acknowledge ()
方法单独确认消息。与 CLIENT_ACKNOWLEDGE
模式不同,只有目标消息被确认。所有其他发送的消息仍未确认。用于激活此模式的整数值是 101。
connection.createSession(false, 101);
没有确认
在此模式中,在发送到客户端之前,服务器接受消息,并且客户端不会执行确认。客户端支持两个整数值来激活此模式,100 和 257。
connection.createSession(false, 100);
第 9 章 日志记录
9.1. 配置日志记录
客户端使用 SLF4J API,允许用户根据自己的需要选择特定的日志记录实施。例如,用户可以提供 slf4j-log4j 绑定来选择 Log4J 实现。有关 SLF4J 的详情,请访问 其网站。
客户端使用位于 org.apache.qpid.jms
层次结构中的 Logger
名称,您可以使用它来根据您的需要配置日志实施。
9.2. 启用协议日志记录
调试时,从 Qpid Proton AMQP 1.0 库启用其他协议追踪日志有时非常有用。实现此目标的方法有两种。
-
将环境变量(不是 Java 系统属性)
PN_TRACE_FRM
设置为1
。当变量设置为1
时,Proton 会将帧日志记录发送到控制台。 -
将
amqp.traceFrames=true
选项添加到 连接 URI,并将org.apache.qpid.jms.provider.amqp.FRAMES
日志记录器配置为日志级别TRACE
。这会向 Proton 添加了一个协议跟踪器,并在您的日志中包括输出。
您还可以将客户端配置为发出输入和输出字节的低级别追踪。若要启用此功能,请将选项 transport.traceBytes=true
添加到 连接 URI,并将 org.apache.qpid.jms.transports.netty.NettyTcpTransport
日志记录器配置为日志级别 DEBUG
。
第 10 章 分布式追踪
客户端根据 OpenTracing 标准的 Jaeger 实现提供分布式追踪。
10.1. 启用分布式追踪
使用以下步骤在应用程序中启用追踪:
流程
将 Jaeger 客户端依赖项添加到 POM 文件中。
<dependency> <groupId>io.jaegertracing</groupId> <artifactId>jaeger-client</artifactId> <version>${jaeger-version}</version> </dependency>
${jaeger-version}
必须是 1.0.0 或更高版本。将
jms.tracing
选项添加到您的连接 URI。将值设为opentracing
。示例:启用了追踪的连接 URI
amqps://example.net?jms.tracing=opentracing
注册全局 tracer。
示例: Global tracer 注册
import io.jaegertracing.Configuration; import io.opentracing.Tracer; import io.opentracing.util.GlobalTracer; public class Example { public static void main(String[] args) { Tracer tracer = Configuration.fromEnv("<service-name>").getTracer(); GlobalTracer.registerIfAbsent(tracer); // ... } }
配置您的环境以进行追踪。
示例:跟踪配置
$ export JAEGER_SAMPLER_TYPE=const $ export JAEGER_SAMPLER_PARAM=1 $ java -jar example.jar net.example.Example
此处显示的配置用于演示目的。如需有关 Jaeger 配置的更多信息,请参阅通过 环境和 Jaeger Sampling 配置。
要查看应用程序捕获的 trace,请使用 Jaeger Getting Started 来运行 Jaeger 基础架构和控制台。
第 11 章 互操作性
本章讨论了如何将 AMQ JMS 与其他 AMQ 组件结合使用。有关 AMQ 组件的兼容性概述,请参阅 产品简介。
11.1. 与其他 AMQP 客户端交互
AMQP 消息 使用 AMQP 类型系统 组成。具有这种通用格式是以不同语言的 AMQP 客户端能够相互互操作的原因之一。本节旨在记录客户端与所使用的各种 JMS 消息类型相关的 AMQP 有效负载和接收的 AMQP 有效负载,以帮助将客户端与其他 AMQP 客户端一起使用。
11.1.1. 发送消息
本节用于记录使用各种 JMS 消息类型时客户端发送的不同有效负载,以帮助其他客户端使用其他客户端接收它们。
11.1.1.1. 消息类型
JMS 消息类型 | 传输的 AMQP 消息的描述 |
---|---|
将使用 amqp-value body 部分发送 TextMessage,其中包含正文文本的 utf8 编码字符串,如果没有设置正文文本,则为 null。带有 符号 键为 "x-opt-jms-msg-type" 的消息注释将设置为 字节 值 5。 | |
将使用包含 BytesMessage 正文中的原始字节 的数据 正文部分发送 BytesMessage,并将 properties 部分 content-type 字段设置为符号值 "application/octet-stream"。http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-symbol带有 符号 键为 "x-opt-jms-msg-type" 的消息注释将设置为 字节 值 3。 | |
MapMessage 正文将使用包含单个 映射 值的 amqp-value body 部分发送。MapMessage 正文中的任何 byte[] 值将编码为映射中 的二进制 条目。带有 符号 键为 "x-opt-jms-msg-type" 的消息注释将设置为 字节 值 2。 | |
StreamMessage 将使用包含 StreamMessage 正文中条目的 amqp-sequence body 部分发送。StreamMessage 正文中的任何 byte[] 条目都会被编码为序列中 的二进制 条目。带有 符号 键为 "x-opt-jms-msg-type" 的消息注释将设置为 字节 值 4。 | |
ObjectMessage 将使用 数据 正文部分发送,其中包含使用 ObjectOutputStream 序列化 ObjectMessage 正文的字节,并将 properties 部分 content-type 字段设置为 符号值 "application/x-java-serialized-object "。http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-symbol带有 符号 键为 "x-opt-jms-msg-type" 的消息注释将设置为 字节 值 1。 | |
普通的 JMS 消息没有正文,并将作为包含 null 的 amqp-value body 部分发送。带有 符号 键为 "x-opt-jms-msg-type" 的消息注解将设置为 字节 值 0。 |
11.1.1.2. 消息属性
JMS 消息支持设置各种 Java 类型的应用程序属性。本节用于显示这些属性类型的映射到发送消息的 application-properties 部分中的 AMQP 类型值。JMS 和 AMQP 将字符串键用于属性名称。
JMS 属性类型 | AMQP 应用属性类型 |
---|---|
布尔值 | |
byte | |
short | |
int | |
long | |
浮点值 | |
double | |
字符串 |
11.1.2. 接收信息
本节旨在记录客户端接收的不同有效负载将映射到不同的 JMS 消息类型,因此要协助使用其他客户端向 JMS 客户端接收消息。
11.1.2.1. 消息类型
如果收到的 AMQP 消息中存在 "x-opt-jms-msg-type" message-annotation,其值用于根据下表中详述的映射来确定用于代表它的 JMS 消息类型。这反映了为 JMS 客户端 发送的消息 讨论的映射的反向流程。
AMQP "x-opt-jms-msg-type" message-annotation value (type) | JMS 消息类型 |
---|---|
0 (字节) | |
1 (字节) | |
2 (字节) | |
3 (byte) | |
4 (字节) | |
5 (字节) |
如果没有 "x-opt-jms-msg-type" message-annotation,下表详细介绍了消息如何映射到 JMS Message 类型。请注意,StreamMessage 和 MapMessage 类型仅被分配给注解的消息。
没有 "x-opt-jms-msg-type" 注解的 Received AMQP 消息描述 | JMS 消息类型 |
---|---|
| |
| |
|
11.1.2.2. 消息属性
本节用于显示收到的 AMQP 消息到 JMS 消息中使用的 Java 类型的 application-properties 部分中的值映射。
AMQP 应用程序属性类型 | JMS 属性类型 |
---|---|
布尔值 | |
byte | |
short | |
int | |
long | |
浮点值 | |
double | |
字符串 | |
字符串 |
11.2. 连接到 AMQ Broker
AMQ Broker 旨在与 AMQP 1.0 客户端互操作。检查以下内容以确保为 AMQP 消息传递配置了代理:
- 网络防火墙中的端口 5672 将打开。
- 启用 AMQ Broker AMQP acceptor。请参阅 默认接受者设置。
- 在代理上配置必要的地址。请参阅 地址、队列和主题。
- 代理配置为允许来自您的客户端的访问,客户端被配置为发送所需的凭证。请参阅 Broker 安全。
附录 A. 使用您的订阅
AMQ 通过软件订阅提供。要管理您的订阅,请访问红帽客户门户中的帐户。
A.1. 访问您的帐户
流程
- 转至 access.redhat.com。
- 如果您还没有帐户,请创建一个帐户。
- 登录到您的帐户。
A.2. 激活订阅
流程
- 转至 access.redhat.com。
- 导航到 My Subscriptions。
- 导航到 激活订阅 并输入您的 16 位激活号。
A.3. 下载发行文件
要访问 .zip、.tar.gz 和其他发布文件,请使用客户门户查找要下载的相关文件。如果您使用 RPM 软件包或 Red Hat Maven 存储库,则不需要这一步。
流程
- 打开浏览器并登录红帽客户门户网站 产品下载页面,网址为 access.redhat.com/downloads。
- 查找 INTEGRATION 目录中的红帽 AMQ 条目。
- 选择所需的 AMQ 产品。此时会打开 Software Downloads 页面。
- 单击组件的 Download 链接。
A.4. 为系统注册软件包
要在 Red Hat Enterprise Linux 上安装此产品的 RPM 软件包,必须注册您的系统。如果您使用下载的发行文件,则不需要这一步。
流程
- 转至 access.redhat.com。
- 进入 Registration Assistant。
- 选择您的操作系统版本,再继续到下一页。
- 使用您的系统终端中列出的命令完成注册。
有关注册您的系统的更多信息,请参阅以下资源之一:
附录 B. 使用 Red Hat Maven 软件仓库
这部分论述了如何在软件中使用红帽提供的 Maven 存储库。
B.1. 使用在线软件仓库
红帽维护一个中央 Maven 存储库,用于基于 Maven 的项目。如需更多信息,请参阅 存储库欢迎页面。
将 Maven 配置为使用红帽存储库的方法有两种:
将存储库添加到您的 Maven 设置中
此配置方法适用于用户拥有的所有 Maven 项目,只要您的 POM 文件不会覆盖存储库配置,并且启用了包含的配置集。
流程
找到 Maven
settings.xml
文件。它通常位于用户主目录中的.m2
目录中。如果文件不存在,则使用文本编辑器创建该文件。对于 Linux 或 UNIX:
/home/<username>/.m2/settings.xml
在 Windows 上:
C:\Users\<username>\.m2\settings.xml
在
settings.xml
文件的profile
元素中添加包含红帽存储库的新配置集,如下例所示:示例:包含红帽存储库的 Maven
settings.xml
文件<settings> <profiles> <profile> <id>red-hat</id> <repositories> <repository> <id>red-hat-ga</id> <url>https://maven.repository.redhat.com/ga</url> </repository> </repositories> <pluginRepositories> <pluginRepository> <id>red-hat-ga</id> <url>https://maven.repository.redhat.com/ga</url> <releases> <enabled>true</enabled> </releases> <snapshots> <enabled>false</enabled> </snapshots> </pluginRepository> </pluginRepositories> </profile> </profiles> <activeProfiles> <activeProfile>red-hat</activeProfile> </activeProfiles> </settings>
有关 Maven 配置的更多信息,请参阅 Maven 设置参考。
在 POM 文件中添加存储库
要直接在项目中配置存储库,请在 POM 文件的 repositories
元素中添加新条目,如下例所示:
示例:包含红帽存储库的 Maven pom.xml
文件
<project> <modelVersion>4.0.0</modelVersion> <groupId>com.example</groupId> <artifactId>example-app</artifactId> <version>1.0.0</version> <repositories> <repository> <id>red-hat-ga</id> <url>https://maven.repository.redhat.com/ga</url> </repository> </repositories> </project>
有关 POM 文件配置的更多信息,请参阅 Maven POM 参考。
B.2. 使用本地存储库
红帽为其某些组件提供了基于文件的 Maven 存储库。这些是作为可下载存档提供的,您可以提取到本地文件系统。
要将 Maven 配置为使用本地提取的存储库,请在 Maven 设置或 POM 文件中应用以下 XML:
<repository>
<id>red-hat-local</id>
<url>${repository-url}</url>
</repository>
${repository-url}
必须是包含提取存储库的本地文件系统路径的文件 URL。
操作系统 | 文件系统路径 | URL |
---|---|---|
Linux 或 UNIX |
|
|
Windows |
|
|
附录 C. 使用带有示例的 AMQ Broker
AMQ JMS 示例需要一个正在运行的消息代理,其队列名为 queue
。使用以下步骤安装和启动代理并定义队列。
C.1. 安装代理
按照 AMQ Broker 入门 中的说明 来安装代理 并创建代理实例。启用匿名访问。
以下流程将代理实例的位置称为 < broker-instance-dir>
。
C.2. 启动代理
流程
使用
artemis run
命令启动代理。$ <broker-instance-dir>/bin/artemis run
检查控制台输出,以查看启动期间记录的所有关键错误。代理日志服务器
现在在服务器就绪时处于实时
状态。$ example-broker/bin/artemis run __ __ ____ ____ _ /\ | \/ |/ __ \ | _ \ | | / \ | \ / | | | | | |_) |_ __ ___ | | _____ _ __ / /\ \ | |\/| | | | | | _ <| '__/ _ \| |/ / _ \ '__| / ____ \| | | | |__| | | |_) | | | (_) | < __/ | /_/ \_\_| |_|\___\_\ |____/|_| \___/|_|\_\___|_| Red Hat AMQ <version> 2020-06-03 12:12:11,807 INFO [org.apache.activemq.artemis.integration.bootstrap] AMQ101000: Starting ActiveMQ Artemis Server ... 2020-06-03 12:12:12,336 INFO [org.apache.activemq.artemis.core.server] AMQ221007: Server is now live ...
C.3. 创建队列
在新终端中,使用 artemis queue
命令创建名为 queue
的队列。
$ <broker-instance-dir>/bin/artemis queue create --name queue --address queue --auto-create-address --anycast
系统将提示您回答一系列 yes 或没有问题。全部答案为 N
。
创建队列后,代理就可以与示例程序一起使用。
C.4. 停止代理
运行完示例后,请使用 artemis stop
命令来停止代理。
$ <broker-instance-dir>/bin/artemis stop
更新于 2023-09-26