3.2. JMS AMQP 1.0 Client API
JMS AMQP 1.0 Client API is based on the Apache Qpid JMS AMQP 1.0 Client API.
Note
This is an initial version of documentation for the JMS client. Regular updates and enhancements of the documentation can be expected after the GA release of Fuse 6.2.0.
3.2.1. Getting Started with AMQP
Getting started with AMQP
To run a simple demonstration of AMQP in JBoss A-MQ, you need to set up the following parts of the application:
- Configure the broker to use AMQP—to enable AMQP in the broker, add an AMQP endpoint to the broker's configuration. This implicitly activates the broker's AMQP integration, ensuring that incoming messages are converted from AMQP message format to JMS message format, as required.
- Implement the AMQP clients—the AMQP clients are based on the Apache Qpid JMS client libraries.
3.2.2. Configuring the Broker for AMQP
Overview
Configuring the broker to use AMQP is relatively straightforward in JBoss A-MQ, because the required AMQP packages are pre-installed in the container. There are essentially two main points you need to pay attention to:
- Make sure that you have appropriate user entries in the etc/users.properties file, so that the AMQP clients will be able to log on to the broker.
- Add an AMQP endpoint to the broker (by inserting a transportConnector element into the broker's XML configuration).
Steps to configure the broker
Perform the following steps to configure the broker with an AMQP endpoint:
- This example assumes that you are working with a fresh install of a standalone JBoss A-MQ broker, InstallDir.
- Define a JAAS user for the AMQP clients, so that the AMQP clients can authenticate themselves to the broker using JAAS security (security is enabled by default in the broker). Edit the InstallDir/etc/users.properties file and add a new user entry, as follows:

      #
      # This file contains the valid users who can log into JBoss A-MQ.
      # Each line has to be of the format:
      #
      # USER=PASSWORD,ROLE1,ROLE2,...
      #
      # All users and roles entered in this file are available after JBoss A-MQ startup
      # and modifiable via the JAAS command group. These users reside in a JAAS domain
      # with the name "karaf"..
      #
      # You must have at least one users to be able to access JBoss A-MQ resources

      #admin=admin,admin
      guest=guest

  At this point, you can add entries for any other secure users you want. In particular, it is advisable to have at least one user with the admin role, so that you can log into the secure container remotely (remembering to choose a secure password for the admin user).

  Note
  To avoid authentication issues, include the user guest in the list of authorizationEntries for jaasAuthenticationPlugin in activemq.xml.
- Add an AMQP endpoint to the broker configuration. Edit the broker configuration file, InstallDir/etc/activemq.xml. As shown in the following XML fragment, add the transportConnector element named amqp as a child of the transportConnectors element in the broker configuration:

      <beans ...>
        ...
        <broker ...>
          ...
          <transportConnectors>
            <transportConnector name="amqp" uri="amqp://127.0.0.1:5672"/>
            <transportConnector name="openwire" uri="tcp://${bindAddress}:${bindPort}"/>
          </transportConnectors>
        </broker>
      </beans>
- To start the broker, open a new command prompt, change directory to InstallDir/bin, and enter the following command:

      ./amq
Message conversion
The AMQP endpoint in the broker implicitly converts incoming AMQP format messages into JMS format messages (which is the format in which messages are stored in the broker). The endpoint configuration shown here uses the default options for this conversion.
Reference
For full details of how to configure an AMQP endpoint in the broker, see the "Advanced Message Queueing Protocol (AMQP)" chapter from the Connection Reference. This also includes details of how to customize the message conversion from AMQP format to JMS format.
3.2.3. AMQP Example Clients
Overview
This section explains how to implement two basic AMQP clients: an AMQP sender client, which sends messages to a queue on the broker; and an AMQP receiver client, which pulls messages off the queue on the broker. The clients themselves use generic JMS code to access the messaging system. The key details of the AMQP configuration are retrieved using JNDI properties.
Prerequisites
Before building the example clients, you must install and configure the Apache Maven build tool, as described in Section 1.2, “Preparing to use Maven”.
Ensure that the A-MQ broker is running.
The Qpid client and the example packages are downloaded from the Qpid JMS repository and built as part of the steps in this section.
Steps to implement and run the AMQP clients
Perform the following steps to implement and run an AMQP producer client and an AMQP consumer client:
- At any convenient location, create a directory (for example, examples) to hold the example code, and download and extract the qpid-jms code into it:

      mkdir examples
- The extracted files should have the following directory structure for the examples project:

      ├── apache-qpid-jms
      │   ├── pom.xml
      │   └── src
      ├── LICENSE
      ├── NOTICE
      ├── pom.xml
      ├── qpid-jms-client
      │   ├── pom.xml
      │   └── src
      ├── qpid-jms-discovery
      │   ├── pom.xml
      │   └── src
      ├── qpid-jms-docs
      │   ├── Configuration.md
      │   ├── pom.xml
      │   └── README.txt
      ├── qpid-jms-examples
      │   ├── pom.xml
      │   ├── README.txt
      │   └── src
      ├── qpid-jms-interop-tests
      │   ├── pom.xml
      │   ├── qpid-jms-activemq-tests
      │   └── README.md
      ├── README.md
      └── target
          └── maven-shared-archive-resources
- On the console, run the following command:

      mvn clean package dependency:copy-dependencies -DincludeScope=runtime -DskipTests

  After building the code (and downloading any packages required by Maven), if the build is successful, you should see output like the following in the console window:

      [INFO] ------------------------------------------------------------------------
      [INFO] Reactor Summary:
      [INFO]
      [INFO] QpidJMS ............................................ SUCCESS [05:36 min]
      [INFO] QpidJMS Client ..................................... SUCCESS [01:04 min]
      [INFO] QpidJMS Discovery Library .......................... SUCCESS [ 33.068 s]
      [INFO] QpidJMS Broker Interop Tests ....................... SUCCESS [  0.024 s]
      [INFO] QpidJMS ActiveMQ Interop Tests ..................... SUCCESS [ 18.120 s]
      [INFO] QpidJMS Examples ................................... SUCCESS [  0.144 s]
      [INFO] QpidJMS Docs ....................................... SUCCESS [  0.017 s]
      [INFO] Apache Qpid JMS .................................... SUCCESS [ 22.253 s]
      [INFO] ------------------------------------------------------------------------
      [INFO] BUILD SUCCESS
      [INFO] ------------------------------------------------------------------------
      [INFO] Total time: 08:33 min
      [INFO] Finished at: 2015-06-15T21:24:01+05:30
      [INFO] Final Memory: 35M/200M
      [INFO] ------------------------------------------------------------------------
- Run the following java command:

      java -cp "target/classes/:target/dependency/*" org.apache.qpid.jms.example.HelloWorld

  This runs the HelloWorld example, which sends a message to the queue named queue and then reads it back. You should see output like the following in the console window:

      Hello world!
A Simple Messaging Program in Java JMS
The following program shows how to send and receive a message using the Qpid JMS client. JMS programs typically use JNDI to obtain connection factory and destination objects which the application needs. In this way the configuration is kept separate from the application code itself.
In this example, we create a JNDI context using a properties file, use the context to lookup a connection factory, create and start a connection, create a session, and lookup a destination from the JNDI context. Then we create a producer and a consumer, send a message with the producer and receive it with the consumer. This code should be straightforward for anyone familiar with Java JMS.
Note
The example uses a Queue named "queue". You need to create this before running the example, depending on the broker/peer you are using.
Example 3.1. "Hello world!" in Java
    package org.apache.qpid.jms.example;

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.ExceptionListener;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.naming.Context;
    import javax.naming.InitialContext;

    public class HelloWorld {
        private static final String USER = "guest";
        private static final String PASSWORD = "guest";

        public static void main(String[] args) throws Exception {
            try {
                // The configuration for the Qpid InitialContextFactory has been supplied in
                // a jndi.properties file in the classpath, which results in it being picked
                // up automatically by the InitialContext constructor.
                Context context = new InitialContext(); // (1)

                ConnectionFactory factory = (ConnectionFactory) context.lookup("myFactoryLookup"); // (2)
                Destination queue = (Destination) context.lookup("myQueueLookup");

                Connection connection = factory.createConnection(USER, PASSWORD); // (3)
                connection.setExceptionListener(new MyExceptionListener());
                connection.start();

                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // (4)

                MessageProducer messageProducer = session.createProducer(queue); // (5)
                MessageConsumer messageConsumer = session.createConsumer(queue); // (6)

                TextMessage message = session.createTextMessage("Hello world!");
                messageProducer.send(message, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
                TextMessage receivedMessage = (TextMessage) messageConsumer.receive(2000L); // (7)

                if (receivedMessage != null) {
                    System.out.println(receivedMessage.getText());
                } else {
                    System.out.println("No message received within the given timeout!");
                }

                connection.close(); // (8)
            } catch (Exception exp) {
                System.out.println("Caught exception, exiting.");
                exp.printStackTrace(System.out);
                System.exit(1);
            }
        }

        private static class MyExceptionListener implements ExceptionListener {
            @Override
            public void onException(JMSException exception) {
                System.out.println("Connection ExceptionListener fired, exiting.");
                exception.printStackTrace(System.out);
                System.exit(1);
            }
        }
    }
- (1) Creates the JNDI initial context.
- (2) Looks up the JMS connection factory for Qpid.
- (3) Creates a JMS connection.
- (4) Creates a session. This session is not transactional (the first argument is false), and messages are automatically acknowledged.
- (5) Creates a producer that sends messages to the queue.
- (6) Creates a consumer that reads messages from the queue.
- (7) Reads the next available message, waiting up to 2000 milliseconds.
- (8) Closes the connection, all sessions managed by the connection, and all senders and receivers managed by each session.
The contents of the jndi.properties file are shown below.
Example 3.2. JNDI Properties File for "Hello world!" example
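The listing for this example is missing from this page. A minimal reconstruction, using the lookup names and queue name from Example 3.1 and the property syntax described later in this section, might look like the following. The broker address amqp://localhost:5672 is an assumption based on the endpoint configured earlier, and the (1) and (2) markers are added here to match the callouts that follow.

```properties
# Selects the Qpid JMS InitialContextFactory implementation
java.naming.factory.initial = org.apache.qpid.jms.jndi.JmsInitialContextFactory

# (1) connectionfactory.<lookupName> = <URI>  (host and port are assumptions)
connectionfactory.myFactoryLookup = amqp://localhost:5672

# (2) queue.<lookupName> = <queueName>
queue.myQueueLookup = queue
```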
- 1
- Defines a connection factory from which connections can be created. The syntax of a ConnectionURL is given in the section called “Apache Qpid JMS Client Configuration”.
- 2
- Defines a destination for which MessageProducers and/or MessageConsumers can be created to send and receive messages. The value for the destination in the properties file is an address string. In the JMS implementation MessageProducers are analogous to senders in the Qpid Message API, and MessageConsumers are analogous to receivers.
Apache Qpid JMS Client Configuration
Apache Qpid JMS 0.5.0 provides various configuration options for the client. This section covers how to configure and create a JNDI InitialContext, the property syntax, and the URI options that can be set when defining a ConnectionFactory.
Configuring a JNDI InitialContext
A JNDI InitialContext is used to look up JMS objects such as ConnectionFactory, and is obtained from an InitialContextFactory. The Qpid JMS client provides an implementation of the InitialContextFactory in the class org.apache.qpid.jms.jndi.JmsInitialContextFactory. You can configure the JNDI InitialContext in three ways:
- Using a jndi.properties file on the Java classpath.
  Include the file jndi.properties on the classpath and set the java.naming.factory.initial property to the value org.apache.qpid.jms.jndi.JmsInitialContextFactory. The Qpid InitialContextFactory implementation is discovered when the InitialContext object is instantiated:

      javax.naming.Context ctx = new javax.naming.InitialContext();

  The ConnectionFactory, Queue and Topic objects contained in the context are configured using properties, either directly within the jndi.properties file, or in a separate file which is referenced in jndi.properties using the java.naming.provider.url property.
- Using system properties.
  Set the java.naming.factory.initial system property to the value org.apache.qpid.jms.jndi.JmsInitialContextFactory. The Qpid InitialContextFactory implementation is discovered when the InitialContext object is instantiated:

      javax.naming.Context ctx = new javax.naming.InitialContext();

  The ConnectionFactory, Queue and Topic objects contained in the context are configured using properties in a file, which is passed using the java.naming.provider.url system property.
- Programmatically using an environment Hashtable.
  The InitialContext can be configured by passing an environment during creation:

      Hashtable<Object, Object> env = new Hashtable<Object, Object>();
      env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
      javax.naming.Context context = new javax.naming.InitialContext(env);

  The ConnectionFactory, Queue and Topic objects contained in the context are configured using properties, either directly within the environment Hashtable or in a separate file which is referenced using the java.naming.provider.url property within the environment Hashtable.
Syntax of the Properties file
The property syntax used in the properties file or environment Hashtable is as follows:
- For a ConnectionFactory, use connectionfactory.lookupName = URI, for example, connectionfactory.myFactoryLookup = amqp://localhost:5672
- For a Queue, use queue.lookupName = queueName, for example, queue.myQueueLookup = queueA
- For a Topic, use topic.lookupName = topicName, for example, topic.myTopicLookup = topicA
These objects could then be looked up from a Context as follows:
    ConnectionFactory factory = (ConnectionFactory) context.lookup("myFactoryLookup");
    Queue queue = (Queue) context.lookup("myQueueLookup");
    Topic topic = (Topic) context.lookup("myTopicLookup");
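To make the property syntax concrete, the following sketch parses entries of this form with java.util.Properties and groups them by prefix. It only illustrates how the keys decompose into a prefix and a lookup name; it is not how the Qpid JMS client reads its configuration, and the class and method names (JndiPropertiesSketch, parse, entriesFor) are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

/** Illustrative only: groups jndi.properties-style entries by their key prefix. */
final class JndiPropertiesSketch {

    /** Parses properties-file text, using the same rules as a jndi.properties file. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Returns lookupName -> value for every key that starts with the given prefix. */
    static Map<String, String> entriesFor(Properties props, String prefix) {
        Map<String, String> result = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                result.put(key.substring(prefix.length()), props.getProperty(key));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String text = String.join("\n",
            "java.naming.factory.initial = org.apache.qpid.jms.jndi.JmsInitialContextFactory",
            "connectionfactory.myFactoryLookup = amqp://localhost:5672",
            "queue.myQueueLookup = queueA",
            "topic.myTopicLookup = topicA");
        Properties props = parse(text);
        // Each map is keyed by the name that would be passed to context.lookup(...)
        System.out.println(entriesFor(props, "connectionfactory."));
        System.out.println(entriesFor(props, "queue."));
        System.out.println(entriesFor(props, "topic."));
    }
}
```

The part of each key after the prefix is the name passed to context.lookup, and the value is the URI, queue name, or topic name bound to it.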
Connection URI
The basic format of the client's Connection URI is as follows:
amqp://hostname:port[?option=value[&option2=value...]]
Options set on the URI when defining the ConnectionFactory configure the client. The available settings are detailed in the following sections.
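As a rough illustration of the URI format, the sketch below assembles a connection URI string from a scheme, host, port, and an ordered map of options. The helper class ConnectionUriSketch is hypothetical and not part of the Qpid JMS API, and it does not URL-encode option values.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Illustrative helper (not part of the Qpid JMS API) that assembles a connection URI. */
final class ConnectionUriSketch {

    /** Builds scheme://host:port[?option=value[&option2=value...]] in map iteration order. */
    static String build(String scheme, String host, int port, Map<String, String> options) {
        StringBuilder uri = new StringBuilder(scheme)
            .append("://").append(host).append(':').append(port);
        if (!options.isEmpty()) {
            StringJoiner query = new StringJoiner("&", "?", "");
            for (Map.Entry<String, String> option : options.entrySet()) {
                // Values are appended as-is; no URL encoding is attempted in this sketch.
                query.add(option.getKey() + "=" + option.getValue());
            }
            uri.append(query.toString());
        }
        return uri.toString();
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("jms.clientID", "foo");
        options.put("transport.connectTimeout", "30000");
        System.out.println(build("amqp", "localhost", 5672, options));
    }
}
```

The resulting string would be used as the value of a connectionfactory.lookupName property.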
JMS Configuration options
The options are applicable to the JMS objects such as Connection, Session, MessageConsumer, and MessageProducer.
Option Name | Description |
---|---|
jms.username | User name value used to authenticate the connection. |
jms.password | Password value used to authenticate the connection. |
jms.clientID | The ClientID value that is applied to the connection. |
jms.forceAsyncSend | Configures whether all Messages sent from a MessageProducer are sent asynchronously, or only those Messages that qualify, such as Messages inside a transaction or non-persistent messages. |
jms.alwaysSyncSend | Overrides all asynchronous send conditions and always sends every Message from a MessageProducer synchronously. |
jms.sendAcksAsync | Causes all Message acknowledgments to be sent asynchronously. |
jms.localMessagePriority | If enabled, prefetched messages are reordered locally based on their given Message priority value. Default value is false. |
jms.localMessageExpiry | Controls whether MessageConsumer instances locally filter expired Messages or deliver them. By default this value is set to true and expired messages are filtered. |
jms.validatePropertyNames | Whether message property names should be validated as valid Java identifiers. Default is true. |
jms.queuePrefix | Optional prefix value added to the name of any Queue created from a JMS Session. |
jms.topicPrefix | Optional prefix value added to the name of any Topic created from a JMS Session. |
jms.closeTimeout | Timeout value that controls how long the client waits on Connection close before returning. (By default the client waits 15 seconds for a normal close completion event). |
jms.connectTimeout | Timeout value that controls how long the client waits on Connection establishment before returning with an error. (By default the client waits 15 seconds for a connection to be established before failing). |
jms.clientIDPrefix | Optional prefix value that is used for generated Client ID values when a new Connection is created for the JMS ConnectionFactory. The default prefix is 'ID:'. |
jms.connectionIDPrefix | Optional prefix value that is used for generated Connection ID values when a new Connection is created for the JMS ConnectionFactory. This connection ID is used when logging some information from the JMS Connection object, so a configurable prefix can make breadcrumbing the logs easier. The default prefix is 'ID:'. |
The prefetch policy options control how many messages the remote peer can send to the client to be held in a prefetch buffer for each consumer instance.
- jms.prefetchPolicy.queuePrefetch: defaults to 1000
- jms.prefetchPolicy.topicPrefetch: defaults to 1000
- jms.prefetchPolicy.queueBrowserPrefetch: defaults to 1000
- jms.prefetchPolicy.durableTopicPrefetch: defaults to 1000
- jms.prefetchPolicy.all: used to set all prefetch values at once.
The RedeliveryPolicy controls how redelivered messages are handled on the client.
- jms.redeliveryPolicy.maxRedeliveries: controls when an incoming message is rejected based on the number of times it has been redelivered. The default value is -1 (disabled). A value of zero indicates that no message redeliveries are accepted, a value of five allows a message to be redelivered five times, and so on.
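The maxRedeliveries rule can be read as a simple predicate over a message's redelivery count. The sketch below is an illustrative model of that description only, not the client's implementation. The class RedeliverySketch and its accept method are hypothetical, and the exact boundary behavior is an assumption drawn from the wording above.

```java
/** Illustrative model of the maxRedeliveries rule; not the Qpid JMS client's code. */
final class RedeliverySketch {

    /**
     * @param maxRedeliveries configured limit; a negative value (the default, -1) disables the check
     * @param redeliveryCount number of times the message has already been redelivered
     * @return true if the message would be accepted under this model
     */
    static boolean accept(int maxRedeliveries, int redeliveryCount) {
        if (maxRedeliveries < 0) {
            return true; // check disabled: never reject based on redelivery count
        }
        return redeliveryCount <= maxRedeliveries;
    }

    public static void main(String[] args) {
        System.out.println(accept(-1, 100)); // disabled
        System.out.println(accept(0, 1));    // zero: a redelivered message is rejected
        System.out.println(accept(5, 5));    // five: a fifth redelivery is still accepted
        System.out.println(accept(5, 6));    // five: a sixth redelivery is rejected
    }
}
```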
TCP Transport Configuration options
When connecting to a remote peer using plain TCP, these options configure the behavior of the underlying socket. They are appended to the connection URI along with the other configuration options, for example:
amqp://localhost:5672?jms.clientID=foo&transport.connectTimeout=30000
The TCP Transport options are listed below:
Option Name | Default Value |
---|---|
transport.sendBufferSize | 64k |
transport.receiveBufferSize | 64k |
transport.trafficClass | 10 |
transport.connectTimeout | 60 secs |
transport.soTimeout | -1 |
transport.soLinger | -1 |
transport.tcpKeepAlive | false |
transport.tcpNoDelay | true |
SSL Transport Configuration options
The SSL Transport extends the TCP Transport and is enabled using the amqps URI scheme. All the TCP Transport options are therefore valid on an SSL Transport URI.
A simple SSL based client URI is shown below:
amqps://localhost:5673
The SSL Transport options are listed below:
Option Name | Description |
---|---|
transport.keyStoreLocation | Default is to read from the system property javax.net.ssl.keyStore. |
transport.keyStorePassword | Default is to read from the system property javax.net.ssl.keyStorePassword. |
transport.trustStoreLocation | Default is to read from the system property javax.net.ssl.trustStore. |
transport.trustStorePassword | Default is to read from the system property javax.net.ssl.trustStorePassword. |
transport.storeType | The type of trust store being used. Default is "JKS". |
transport.contextProtocol | The protocol argument used when getting an SSLContext. Default is "TLS". |
transport.enabledCipherSuites | The cipher suites to enable, comma separated. If not set, the context default ciphers are used. Any disabled ciphers are removed. |
transport.disabledCipherSuites | The cipher suites to disable, comma separated. Ciphers listed here are removed from the enabled ciphers. No default. |
transport.enabledProtocols | The protocols to enable, comma separated. If not set, the context default protocols are used. Any disabled protocols are removed. |
transport.disabledProtocols | The protocols to disable, comma separated. Protocols listed here are removed from the enabled protocols. Default is "SSLv2Hello,SSLv3". |
transport.trustAll | Whether to trust the provided server certificate implicitly, regardless of any configured trust store. Defaults to false. |
transport.verifyHost | Whether to verify that the hostname being connected to matches the provided server certificate. Defaults to true. |
transport.keyAlias | The alias to use when selecting a keypair from the keystore to send a client certificate to the server. No default. |
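The enabled and disabled options for cipher suites and protocols combine in the same way: start from the enabled list (or the context defaults when none is given), then remove anything listed as disabled. The following sketch models that rule under those assumptions. The class EnabledDisabledSketch is hypothetical and does not reproduce the client's own code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative model of how enabled/disabled lists combine; not the client's code. */
final class EnabledDisabledSketch {

    /** Splits a comma-separated option value; null or blank input yields an empty list. */
    static List<String> split(String value) {
        List<String> parts = new ArrayList<>();
        if (value != null) {
            for (String part : value.split(",")) {
                String trimmed = part.trim();
                if (!trimmed.isEmpty()) {
                    parts.add(trimmed);
                }
            }
        }
        return parts;
    }

    /** Enabled entries (or the context defaults if none are configured), minus the disabled ones. */
    static List<String> effective(List<String> contextDefaults, String enabled, String disabled) {
        List<String> enabledList = split(enabled);
        List<String> result = new ArrayList<>(enabledList.isEmpty() ? contextDefaults : enabledList);
        result.removeAll(split(disabled));
        return result;
    }

    public static void main(String[] args) {
        // The context defaults below are made-up sample values for illustration.
        List<String> defaults = Arrays.asList("SSLv3", "TLSv1", "TLSv1.1", "TLSv1.2");
        System.out.println(effective(defaults, null, "SSLv2Hello,SSLv3"));
        System.out.println(effective(defaults, "TLSv1.2", "SSLv2Hello,SSLv3"));
    }
}
```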
Failover Configuration options
If failover is enabled, the client can reconnect to a different broker automatically when the connection to the current broker is lost. The failover URI always starts with the failover prefix, followed by a list of broker URIs in parentheses. The jms.* options are applied to the overall failover URI, outside the parentheses, and affect the JMS Connection object for its lifetime.
The URI for failover is shown as follows:
failover:(amqp://host1:5672,amqp://host2:5672)?jms.clientID=foo&failover.maxReconnectAttempts=20
The individual broker details within the parentheses can use the transport. or amqp. options defined earlier, with these being applied as each host is connected to:
failover:(amqp://host1:5672?amqp.option=value,amqp://host2:5672?transport.option=value)?jms.clientID=foo
Failover configuration options are listed below:
Option Name | Description |
---|---|
failover.initialReconnectDelay | The amount of time the client will wait before the first attempt to reconnect to a remote peer. The default value is zero. |
failover.reconnectDelay | Controls the delay between successive reconnection attempts. Defaults to 10 milliseconds. If the backoff option is not enabled, this value remains constant. |
failover.maxReconnectDelay | The maximum time that the client will wait before attempting a reconnect. This value is used when the backoff feature is enabled to ensure that the delay does not grow too long. Defaults to 30 seconds. |
failover.useReconnectBackOff | Controls whether the time between reconnection attempts should grow based on a configured multiplier. Default value is true. |
failover.reconnectBackOffMultiplier | The multiplier used to grow the reconnection delay value. Defaults to 2.0d. |
failover.maxReconnectAttempts | The number of reconnection attempts allowed before reporting the connection as failed to the client. The default is no limit (-1). |
failover.startupMaxReconnectAttempts | For a client that has never connected to a remote peer, this option controls the number of attempts made to connect before reporting the connection as failed. The default is the value of maxReconnectAttempts. |
failover.warnAfterReconnectAttempts | Controls how often the client logs a message indicating that failover reconnection is being attempted. The default is to log every 10 connection attempts. |
transport.enabledProtocols | The protocols to enable, comma separated. The context default protocols are used. Any disabled protocols are removed. |
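To see how the delay, multiplier, and maximum interact, the sketch below computes a sequence of reconnect delays in milliseconds under a simple model: each delay is the previous one multiplied by reconnectBackOffMultiplier and capped at maxReconnectDelay. This is an assumed model for illustration only. It ignores initialReconnectDelay, and the client's actual timing may differ. The class ReconnectBackOffSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative model of reconnect back-off arithmetic; not the client's implementation. */
final class ReconnectBackOffSketch {

    /** Returns the delay (in milliseconds) assumed before each of the given number of attempts. */
    static List<Long> delays(int attempts, long reconnectDelay, long maxReconnectDelay,
                             boolean useReconnectBackOff, double multiplier) {
        List<Long> result = new ArrayList<>();
        long delay = reconnectDelay;
        for (int i = 0; i < attempts; i++) {
            result.add(delay);
            if (useReconnectBackOff) {
                // Grow the delay, but never beyond the configured maximum.
                delay = Math.min((long) (delay * multiplier), maxReconnectDelay);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Documented defaults: 10 ms delay, 30 s maximum, back-off enabled, multiplier 2.0
        System.out.println(delays(5, 10L, 30_000L, true, 2.0));
        // With back-off disabled the delay stays constant
        System.out.println(delays(3, 10L, 30_000L, false, 2.0));
    }
}
```

Under this model and the documented defaults, the delay doubles from 10 milliseconds on each attempt until it reaches the 30 second cap.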
The failover URI also supports defining nested options as a means of specifying AMQP and transport option values applicable to all the individual nested broker URIs, which can be useful to avoid repetition. This is accomplished using the same transport. and amqp. URI options outlined earlier for a non-failover broker URI, but prefixed with failover.nested.
For example, to apply the same value for the amqp.vhost option to every broker connected to, you might have a URI like:
failover:(amqp://host1:5672,amqp://host2:5672)?jms.clientID=foo&failover.nested.amqp.vhost=myhost
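The structure of a failover URI (a parenthesized broker list followed by options that apply to the whole connection) can be sketched with a small string builder. FailoverUriSketch is a hypothetical helper for illustration, not part of the Qpid JMS API, and it performs no validation or URL encoding.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative helper (not part of the Qpid JMS API) that assembles a failover URI. */
final class FailoverUriSketch {

    /** Builds failover:(uri1,uri2,...)[?option=value[&option2=value...]] in map iteration order. */
    static String build(List<String> brokerUris, Map<String, String> options) {
        StringBuilder uri = new StringBuilder("failover:(")
            .append(String.join(",", brokerUris))
            .append(')');
        char separator = '?';
        for (Map.Entry<String, String> option : options.entrySet()) {
            uri.append(separator).append(option.getKey()).append('=').append(option.getValue());
            separator = '&';
        }
        return uri.toString();
    }

    public static void main(String[] args) {
        List<String> brokers = Arrays.asList("amqp://host1:5672", "amqp://host2:5672");
        Map<String, String> options = new LinkedHashMap<>();
        options.put("jms.clientID", "foo");                   // applies to the JMS Connection
        options.put("failover.maxReconnectAttempts", "20");   // applies to the failover layer
        options.put("failover.nested.amqp.vhost", "myhost");  // applied to every nested broker URI
        System.out.println(build(brokers, options));
    }
}
```

Options that should differ per broker would instead be appended to the individual broker URIs inside the parentheses.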
AMQP Configuration options
These options apply to the behavior of certain AMQP functionality.
- amqp.idleTimeout: The idle timeout in milliseconds, after which the connection fails if the peer sends no AMQP frames. Default value is 60000.
- amqp.vhost: The vhost to connect to. Used to populate the SASL and Open hostname fields. Default is the main hostname from the Connection URI.
- amqp.saslLayer: Controls whether connections should use a SASL layer or not. Default is true.
- amqp.saslMechanisms: Which SASL mechanism(s) the client should allow selection of, if offered by the server and usable with the configured credentials. Comma separated if specifying more than one mechanism. Default is to allow selection from all the client's supported mechanisms, which are currently EXTERNAL, CRAM-MD5, PLAIN, and ANONYMOUS.
- amqp.maxFrameSize: The max-frame-size value in bytes that is advertised to the peer. Default is 1048576.
Discovery Configuration options
The client has an optional Discovery module, which provides a customized failover layer where the broker URIs to connect to are not given in the initial URI, but discovered as the client operates via associated discovery agents. There are currently two discovery agent implementations, a file watcher that loads URIs from a file, and a multicast listener that works with ActiveMQ 5 brokers which have been configured to broadcast their broker addresses for listening clients.
The general set of failover related options when using discovery are the same as those detailed earlier, with the main prefix updated from failover. to discovery., and with the 'nested' options prefix used to supply URI options common to all the discovered broker URIs being updated from failover.nested. to discovery.discovered. For example, without the agent URI details, a general discovery URI might look like:
discovery:(<agent-uri>)?discovery.maxReconnectAttempts=20&discovery.discovered.jms.clientID=foo
To use the file watcher discovery agent, utilise an agent URI of the form:
discovery:(file:///path/to/monitored-file?updateInterval=60000)
The URI options for the file watcher discovery agent are listed below:
- updateInterval: Controls the frequency in milliseconds at which the file is inspected for change. The default value is 30000.
To use the multicast discovery agent with an ActiveMQ 5 broker, utilise an agent URI of the form:
discovery:(multicast://default?group=default)
Note
The use of default as the host in the multicast agent URI above is a special value (that is substituted by the agent with the default "239.255.2.3:6155"). You may change this to specify the actual IP and port in use with your multicast configuration.
The URI options for the multicast discovery agent are listed below:
- group: Controls which multicast group messages are listened for on. The default value is "default".
JMS Client Logging
JMS client logging is handled using the Simple Logging Facade for Java (SLF4J). As the name implies, SLF4J is a facade that delegates to other logging systems such as log4j or JDK 1.4 logging. For more information on how to configure SLF4J for specific logging systems, consult the SLF4J documentation.
The client uses logger names within the org.apache.qpid.jms hierarchy, which you can use to configure a logging implementation.
For debugging you can enable additional protocol trace logging from the Qpid Proton AMQP 1.0 library. There are two options to enable the logging:
- Set the environment variable PN_TRACE_FRM to true, which enables Proton to emit frame logging to stdout.
- Add the option amqp.traceFrames=true to the connection URI, which makes the client add a protocol tracer to Proton, and configure the org.apache.qpid.jms.provider.amqp.FRAMES Logger to TRACE level to include the output in the logs.