Chapter 3. Qpid JMS Client API
Warning
The Qpid JMS client API is a technical preview only, and is not supported in JBoss A-MQ 6.1.
3.1. Getting Started with AMQP
3.1.1. Introduction to AMQP
What is AMQP?
The Advanced Message Queuing Protocol (AMQP) is an open standard messaging protocol, designed to facilitate interoperability between messaging systems. The key features of AMQP are:
- Open standard (defined by the OASIS AMQP Technical Committee)
- Defines a wire protocol
- Language-neutral, with client libraries available for multiple languages (for example, C++ and Java)
- Interoperability between different AMQP implementations
JMS is an API
It is instructive to contrast the Java Message Service (JMS) with AMQP. JMS is first and foremost an API, designed to make Java code portable between different messaging products. JMS does not describe how to implement a messaging service (although it imposes significant constraints on the messaging behaviour), nor does it specify any details of the wire protocol for transmitting messages. Consequently, different JMS implementations are generally not interoperable.
AMQP is a wire protocol
AMQP, on the other hand, does specify complete details of a wire protocol for messaging (in an open standard). An implementation of AMQP is therefore much more constrained at the wire level than a comparable JMS implementation, and one benefit of this is that different AMQP implementations ought to be interoperable with each other. Because the protocol is language-neutral, client libraries are available in several programming languages (for example, Java and C++).
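To make the idea of a wire protocol concrete, the following minimal sketch builds the 8-byte protocol header that a peer sends when opening a connection. It assumes AMQP 1.0 (the version suggested by the Qpid client artifact used later in this chapter); the class and method names are hypothetical and are not part of any Qpid or JBoss A-MQ API.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper, for illustration only (assumes AMQP 1.0).
public class AmqpProtocolHeader {

    /** Builds the 8-byte header: "AMQP", protocol id, major, minor, revision. */
    public static byte[] build() {
        byte[] header = new byte[8];
        byte[] magic = "AMQP".getBytes(StandardCharsets.US_ASCII);
        System.arraycopy(magic, 0, header, 0, 4);
        header[4] = 0; // protocol id 0 = plain AMQP (no TLS or SASL layer)
        header[5] = 1; // major version
        header[6] = 0; // minor version
        header[7] = 0; // revision
        return header;
    }

    /** Returns true if the first 8 bytes received match the AMQP 1.0 header. */
    public static boolean isAmqp10(byte[] firstBytes) {
        return firstBytes != null
                && firstBytes.length >= 8
                && Arrays.equals(Arrays.copyOf(firstBytes, 8), build());
    }

    public static void main(String[] args) {
        // prints [65, 77, 81, 80, 0, 1, 0, 0]
        System.out.println(Arrays.toString(build()));
    }
}
```

Because every implementation must exchange exactly these bytes, two independently written AMQP peers can recognise each other, which is not something the JMS API alone guarantees.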
AMQP-to-JMS requires message conversion
If you want to bridge from an AMQP messaging system to a JMS messaging system, the messages must be converted from AMQP format to JMS format. Usually, this involves a fairly lightweight conversion, because the message body can usually be left intact while message headers are mapped to equivalent headers.
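The header mapping described above can be sketched roughly as follows. This is an illustration only, not the broker's actual conversion code: the class name is hypothetical, messages are modelled as plain maps, and the table covers just a handful of AMQP 1.0 field names paired with the JMS headers they are assumed to correspond to.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of a lightweight AMQP-to-JMS header mapping.
public class AmqpToJmsHeaderSketch {

    // Assumed, partial mapping from AMQP 1.0 field names to JMS header names.
    private static final Map<String, String> HEADER_MAP = new LinkedHashMap<String, String>();
    static {
        HEADER_MAP.put("message-id", "JMSMessageID");
        HEADER_MAP.put("correlation-id", "JMSCorrelationID");
        HEADER_MAP.put("reply-to", "JMSReplyTo");
        HEADER_MAP.put("priority", "JMSPriority");
        HEADER_MAP.put("creation-time", "JMSTimestamp");
    }

    /** Copies each known AMQP header to its JMS name; unknown headers are skipped. */
    public static Map<String, Object> convertHeaders(Map<String, Object> amqpHeaders) {
        Map<String, Object> jmsHeaders = new LinkedHashMap<String, Object>();
        for (Map.Entry<String, Object> entry : amqpHeaders.entrySet()) {
            String jmsName = HEADER_MAP.get(entry.getKey());
            if (jmsName != null) {
                jmsHeaders.put(jmsName, entry.getValue());
            }
        }
        return jmsHeaders;
    }

    public static void main(String[] args) {
        Map<String, Object> amqp = new LinkedHashMap<String, Object>();
        amqp.put("message-id", "ID:example-1");
        amqp.put("priority", 4);
        // The message body would be passed through unchanged; only headers are renamed.
        System.out.println(convertHeaders(amqp));
    }
}
```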
AMQP support in JBoss A-MQ
AMQP support in JBoss A-MQ is based on the following main components:
- AMQP endpoint in the broker: an endpoint on the broker that supports the AMQP wire protocol and implicitly converts messages between AMQP format and JMS format (which is used inside the JBoss A-MQ broker).
- AMQP JMS client: a client based on the Apache Qpid JMS client, which is compatible with the broker's AMQP endpoint.
Getting started with AMQP
To run a simple demonstration of AMQP in JBoss A-MQ, you need to set up the following parts of the application:
- Configure the broker to use AMQP—to enable AMQP in the broker, add an AMQP endpoint to the broker's configuration. This implicitly activates the broker's AMQP integration, ensuring that incoming messages are converted from AMQP message format to JMS message format, as required.
- Implement the AMQP clients—the AMQP clients are based on the Apache Qpid JMS client libraries.
3.1.2. Configuring the Broker for AMQP
Overview
Configuring the broker to use AMQP is relatively straightforward in JBoss A-MQ, because the required AMQP packages are pre-installed in the container. There are essentially two main points you need to pay attention to:
- Make sure that you have appropriate user entries in the etc/users.properties file, so that the AMQP clients will be able to log on to the broker.
- Add an AMQP endpoint to the broker (by inserting a transportConnector element into the broker's XML configuration).
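The user entries mentioned above follow the USER=PASSWORD,ROLE1,ROLE2,... format described in the header comments of the users.properties file. As a rough illustration of how such a line breaks down, here is a small parser sketch; the class UserEntrySketch is hypothetical and is not how the broker itself reads the file.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: splits one users.properties line into its parts.
public class UserEntrySketch {
    public final String user;
    public final String password;
    public final List<String> roles;

    private UserEntrySketch(String user, String password, List<String> roles) {
        this.user = user;
        this.password = password;
        this.roles = roles;
    }

    /** Returns the parsed entry, or null for blank lines and # comments. */
    public static UserEntrySketch parse(String line) {
        String trimmed = line.trim();
        if (trimmed.isEmpty() || trimmed.startsWith("#")) {
            return null;
        }
        int eq = trimmed.indexOf('=');
        if (eq <= 0) {
            throw new IllegalArgumentException("Expected USER=PASSWORD,ROLE1,...: " + line);
        }
        String user = trimmed.substring(0, eq).trim();
        String[] parts = trimmed.substring(eq + 1).split(",");
        String password = parts[0].trim();
        List<String> roles = new ArrayList<String>();
        for (int i = 1; i < parts.length; i++) {
            String role = parts[i].trim();
            if (!role.isEmpty()) {
                roles.add(role);
            }
        }
        return new UserEntrySketch(user, password, roles);
    }

    public static void main(String[] args) {
        UserEntrySketch entry = parse("amqpuser=secret");
        System.out.println(entry.user + " has roles " + entry.roles);
    }
}
```

An entry with no roles is still a valid login for the AMQP clients in this example, whereas an administrative entry would also list a role after the password.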
Steps to configure the broker
Perform the following steps to configure the broker with an AMQP endpoint:
- This example assumes that you are working with a fresh install of a standalone JBoss A-MQ broker, InstallDir.
- Define a JAAS user for the AMQP clients, so that the AMQP clients can authenticate themselves to the broker using JAAS security (security is enabled by default in the broker). Edit the InstallDir/etc/users.properties file and add a new user entry, as follows:
    #
    # This file contains the valid users who can log into JBoss A-MQ.
    # Each line has to be of the format:
    #
    # USER=PASSWORD,ROLE1,ROLE2,...
    #
    # All users and roles entered in this file are available after JBoss A-MQ startup
    # and modifiable via the JAAS command group. These users reside in a JAAS domain
    # with the name "karaf"..
    #
    # You must have at least one users to be able to access JBoss A-MQ resources
    #admin=admin,admin
    amqpuser=secret
  At this point, you can add entries for any other secure users you want. In particular, it is advisable to have at least one user with the admin role, so that you can log into the secure container remotely (remembering to choose a secure password for the admin user).
- Add an AMQP endpoint to the broker configuration. Edit the broker configuration file, InstallDir/etc/activemq.xml. As shown in the following XML fragment, add the transportConnector element named amqp as a child of the transportConnectors element in the broker configuration:
    <beans ...>
      ...
      <broker ...>
        ...
        <transportConnectors>
          <transportConnector name="openwire" uri="tcp://0.0.0.0:0?maximumConnections=1000"/>
          <transportConnector name="amqp" uri="amqp://0.0.0.0:5672"/>
        </transportConnectors>
      </broker>
    </beans>
- To start the broker, open a new command prompt, change directory to InstallDir/bin, and enter the following command:
    ./amq
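Once the broker has started, it can be handy to confirm that something is listening on the AMQP port before running any clients. The following sketch simply attempts a TCP connection; it does not speak AMQP, and the class name, the localhost host name, and the two-second timeout are assumptions made for this illustration (the port 5672 comes from the transportConnector shown above).

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical sketch: checks whether a TCP port accepts connections.
public class AmqpPortProbe {

    /** Returns true if a TCP connection to host:port succeeds within the timeout. */
    public static boolean isListening(String host, int port, int timeoutMillis) {
        Socket socket = new Socket();
        try {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable.
            return false;
        } finally {
            try {
                socket.close();
            } catch (IOException ignored) {
                // Nothing useful to do if close fails.
            }
        }
    }

    public static void main(String[] args) {
        // Assumes the broker runs on the local machine with the AMQP endpoint on 5672.
        System.out.println("AMQP port open: " + isListening("localhost", 5672, 2000));
    }
}
```

A successful connection only shows that the port is open; authentication and protocol negotiation are exercised by the example clients later in this chapter.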
Message conversion
The AMQP endpoint in the broker implicitly converts incoming AMQP format messages into JMS format messages (which is the format in which messages are stored in the broker). The endpoint configuration shown here uses the default options for this conversion.
Reference
For full details of how to configure an AMQP endpoint in the broker, see the "Advanced Message Queueing Protocol (AMQP)" chapter from the Connection Reference. This also includes details of how to customize the message conversion from AMQP format to JMS format.
3.1.3. AMQP Example Clients
Overview
This section explains how to implement two basic AMQP clients: an AMQP producer client, which sends messages to a queue on the broker; and an AMQP consumer client, which pulls messages off the queue on the broker. The clients themselves use generic JMS code to access the messaging system. The key details of the AMQP configuration are retrieved using JNDI properties.
Prerequisites
Before building the example clients, you must install and configure the Apache Maven build tool, as described in Section 1.2, “Preparing to use Maven”.
AMQP connection URI
The critical piece of configuration for establishing a connection with the broker is the AMQP URI (defined as a JNDI property in the jndi.properties file, in this demonstration). This example uses the following AMQP URI for the clients:
    amqp://amqpuser:secret@localhost/test/?brokerlist='tcp://localhost:5672'
The first part of the URI, amqpuser:secret@localhost, has the format Username:Password@ClientID. In order to authenticate the clients successfully with the broker, it is essential that there is a corresponding JAAS user entry on the broker side.
The brokerlist option defines the location of the AMQP port on the broker, which is tcp://localhost:5672 for this example.
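To show how the pieces of this URI fit together, the following sketch splits it into the parts named above using a regular expression. It is only an illustration under stated assumptions: the class AmqpUriSketch is hypothetical, it is not the parser used by the Qpid client, and it handles only URIs with exactly the shape shown in this section (a single brokerlist option).

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: amqp://Username:Password@ClientID/Path?brokerlist='BrokerUrl'
public class AmqpUriSketch {

    private static final Pattern URI_PATTERN = Pattern.compile(
            "^amqp://([^:@/]+):([^@/]*)@([^/?]+)(/[^?]*)?\\?brokerlist='([^']*)'$");

    public final String username;
    public final String password;
    public final String clientId;
    public final String path;       // the segment between the client ID and the options
    public final String brokerList; // location of the broker's AMQP port

    private AmqpUriSketch(String username, String password, String clientId,
                          String path, String brokerList) {
        this.username = username;
        this.password = password;
        this.clientId = clientId;
        this.path = path;
        this.brokerList = brokerList;
    }

    /** Parses a URI of the shape shown above, or throws if it does not match. */
    public static AmqpUriSketch parse(String uri) {
        Matcher m = URI_PATTERN.matcher(uri);
        if (!m.matches()) {
            throw new IllegalArgumentException("Unrecognized AMQP URI: " + uri);
        }
        return new AmqpUriSketch(m.group(1), m.group(2), m.group(3), m.group(4), m.group(5));
    }

    public static void main(String[] args) {
        AmqpUriSketch parts = parse(
                "amqp://amqpuser:secret@localhost/test/?brokerlist='tcp://localhost:5672'");
        // The password is deliberately not printed.
        System.out.println("user=" + parts.username
                + " clientId=" + parts.clientId
                + " brokerlist=" + parts.brokerList);
    }
}
```

Note that the localhost after the @ sign is the client ID, not the broker host; the broker host and port come from the brokerlist option.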
Steps to implement and run the AMQP clients
Perform the following steps to implement and run an AMQP producer client and an AMQP consumer client:
- At any convenient location, create the directory, activemq-amqp-example, to hold the example code:
    mkdir activemq-amqp-example
- Create the directory hierarchy for the example code. Change directory to activemq-amqp-example and run the following script at a command prompt:
    mkdir src
    mkdir src/main
    mkdir src/main/java
    mkdir src/main/java/org
    mkdir src/main/java/org/fusebyexample
    mkdir src/main/java/org/fusebyexample/activemq
    mkdir src/main/resources
  After executing the preceding commands, you should have the following directory structure for the activemq-amqp-example project:
    activemq-amqp-example/
      src/
        main/
          java/
            org/fusebyexample/activemq
          resources/
- Create a POM file for the Maven project. Using a text editor, create a new file, activemq-amqp-example/pom.xml, with the following contents:
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>org.fusebyexample.activemq</groupId>
      <artifactId>activemq-amqp-example</artifactId>
      <version>5.8.0</version>
      <name>ActiveMQ AMQP Example</name>
      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <activemq.version>5.9.0.redhat-610379</activemq.version>
        <qpid.version>0.22</qpid.version>
        <slf4j-version>1.6.6</slf4j-version>
        <log4j-version>1.2.17</log4j-version>
      </properties>
      <repositories>
        <repository>
          <id>fusesource.releases</id>
          <name>FuseSource Release Repository</name>
          <url>http://repo.fusesource.com/nexus/content/repositories/releases</url>
          <releases>
            <enabled>true</enabled>
          </releases>
          <snapshots>
            <enabled>false</enabled>
          </snapshots>
        </repository>
      </repositories>
      <pluginRepositories>
        <pluginRepository>
          <id>fusesource.releases</id>
          <name>FuseSource Release Repository</name>
          <url>http://repo.fusesource.com/nexus/content/repositories/releases</url>
          <releases>
            <enabled>true</enabled>
          </releases>
          <snapshots>
            <enabled>false</enabled>
          </snapshots>
        </pluginRepository>
      </pluginRepositories>
      <dependencies>
        <dependency>
          <groupId>org.apache.qpid</groupId>
          <artifactId>qpid-amqp-1-0-client-jms</artifactId>
          <version>${qpid.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.geronimo.specs</groupId>
          <artifactId>geronimo-jms_1.1_spec</artifactId>
          <version>1.1.1</version>
        </dependency>
        <!-- Logging -->
        <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
          <version>${slf4j-version}</version>
        </dependency>
        <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
          <version>${slf4j-version}</version>
          <scope>runtime</scope>
        </dependency>
        <dependency>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
          <version>${log4j-version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
      <build>
        <plugins>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
              <source>1.6</source>
              <target>1.6</target>
            </configuration>
          </plugin>
        </plugins>
      </build>
      <profiles>
        <profile>
          <id>consumer</id>
          <build>
            <defaultGoal>package</defaultGoal>
            <plugins>
              <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.2.1</version>
                <executions>
                  <execution>
                    <phase>package</phase>
                    <goals>
                      <goal>java</goal>
                    </goals>
                    <configuration>
                      <mainClass>org.fusebyexample.activemq.SimpleConsumer</mainClass>
                    </configuration>
                  </execution>
                </executions>
              </plugin>
            </plugins>
          </build>
        </profile>
        <profile>
          <id>producer</id>
          <build>
            <defaultGoal>package</defaultGoal>
            <plugins>
              <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.2.1</version>
                <executions>
                  <execution>
                    <phase>package</phase>
                    <goals>
                      <goal>java</goal>
                    </goals>
                    <configuration>
                      <mainClass>org.fusebyexample.activemq.SimpleProducer</mainClass>
                    </configuration>
                  </execution>
                </executions>
              </plugin>
            </plugins>
          </build>
        </profile>
      </profiles>
    </project>
- Define the Java implementation of an AMQP consumer class, SimpleConsumer. Using a text editor, create the SimpleConsumer.java file under the activemq-amqp-example/src/main/java/org/fusebyexample/activemq/ directory, with the following contents:
    package org.fusebyexample.activemq;

    import javax.jms.*;
    import javax.naming.Context;
    import javax.naming.InitialContext;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class SimpleConsumer {
        private static final Logger LOG = LoggerFactory.getLogger(SimpleConsumer.class);

        private static final Boolean NON_TRANSACTED = false;
        private static final String CONNECTION_FACTORY_NAME = "myJmsFactory";
        private static final String DESTINATION_NAME = "queue/simple";
        private static final int MESSAGE_TIMEOUT_MILLISECONDS = 120000;

        public static void main(String args[]) {
            Connection connection = null;

            try {
                // JNDI lookup of JMS Connection Factory and JMS Destination
                Context context = new InitialContext();
                ConnectionFactory factory = (ConnectionFactory) context.lookup(CONNECTION_FACTORY_NAME);
                Destination destination = (Destination) context.lookup(DESTINATION_NAME);

                connection = factory.createConnection();
                connection.start();

                Session session = connection.createSession(NON_TRANSACTED, Session.AUTO_ACKNOWLEDGE);
                MessageConsumer consumer = session.createConsumer(destination);

                LOG.info("Start consuming messages from " + destination.toString()
                        + " with " + MESSAGE_TIMEOUT_MILLISECONDS + "ms timeout");

                // Synchronous message consumer
                int i = 1;
                while (true) {
                    Message message = consumer.receive(MESSAGE_TIMEOUT_MILLISECONDS);
                    if (message != null) {
                        if (message instanceof TextMessage) {
                            String text = ((TextMessage) message).getText();
                            LOG.info("Got " + (i++) + ". message: " + text);
                        }
                    } else {
                        break;
                    }
                }

                consumer.close();
                session.close();
            } catch (Throwable t) {
                LOG.error("Error receiving message", t);
            } finally {
                // Cleanup code
                // In general, you should always close producers, consumers,
                // sessions, and connections in reverse order of creation.
                // For this simple example, a JMS connection.close will
                // clean up all other resources.
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        LOG.error("Error closing connection", e);
                    }
                }
            }
        }
    }
- Define the Java implementation of an AMQP producer class, SimpleProducer. Using a text editor, create the SimpleProducer.java file under the activemq-amqp-example/src/main/java/org/fusebyexample/activemq/ directory, with the following contents:
    package org.fusebyexample.activemq;

    import javax.jms.*;
    import javax.naming.Context;
    import javax.naming.InitialContext;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class SimpleProducer {
        private static final Logger LOG = LoggerFactory.getLogger(SimpleProducer.class);

        private static final Boolean NON_TRANSACTED = false;
        private static final long MESSAGE_TIME_TO_LIVE_MILLISECONDS = 0;
        private static final int MESSAGE_DELAY_MILLISECONDS = 100;
        private static final int NUM_MESSAGES_TO_BE_SENT = 100;
        private static final String CONNECTION_FACTORY_NAME = "myJmsFactory";
        private static final String DESTINATION_NAME = "queue/simple";

        public static void main(String args[]) {
            Connection connection = null;

            try {
                // JNDI lookup of JMS Connection Factory and JMS Destination
                Context context = new InitialContext();
                ConnectionFactory factory = (ConnectionFactory) context.lookup(CONNECTION_FACTORY_NAME);
                Destination destination = (Destination) context.lookup(DESTINATION_NAME);

                connection = factory.createConnection();
                connection.start();

                Session session = connection.createSession(NON_TRANSACTED, Session.AUTO_ACKNOWLEDGE);
                MessageProducer producer = session.createProducer(destination);

                producer.setTimeToLive(MESSAGE_TIME_TO_LIVE_MILLISECONDS);

                for (int i = 1; i <= NUM_MESSAGES_TO_BE_SENT; i++) {
                    TextMessage message = session.createTextMessage(i + ". message sent");
                    LOG.info("Sending to destination: " + destination.toString()
                            + " this text: '" + message.getText());
                    producer.send(message);
                    Thread.sleep(MESSAGE_DELAY_MILLISECONDS);
                }

                // Cleanup
                producer.close();
                session.close();
            } catch (Throwable t) {
                LOG.error("Error sending message", t);
            } finally {
                // Cleanup code
                // In general, you should always close producers, consumers,
                // sessions, and connections in reverse order of creation.
                // For this simple example, a JMS connection.close will
                // clean up all other resources.
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        LOG.error("Error closing connection", e);
                    }
                }
            }
        }
    }
- Configure the JNDI properties for the AMQP clients. Using a text editor, create the jndi.properties file under the activemq-amqp-example/src/main/resources/ directory, with the following contents:
    #
    # Copyright (C) Red Hat, Inc.
    # http://www.redhat.com
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    # http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    #
    # JNDI properties file to setup the JNDI server within ActiveMQ
    #
    # Default JNDI properties settings
    #
    java.naming.factory.initial = org.apache.qpid.amqp_1_0.jms.jndi.PropertiesFileInitialContextFactory
    java.naming.provider.url = src/main/resources/jndi.properties
    #
    # Set the connection factory name(s) as well as the destination names. The connection factory name(s)
    # as well as the second part (after the dot) of the left hand side of the destination definition
    # must be used in the JNDI lookups.
    #
    connectionfactory.myJmsFactory = amqp://amqpuser:secret@localhost/test/?brokerlist='tcp://localhost:5672'
    queue.queue/simple = test.queue.simple
- Configure the client logging with log4j. Using a text editor, create the log4j.properties file under the activemq-amqp-example/src/main/resources/ directory, with the following contents:
    #
    # Copyright (C) Red Hat, Inc.
    # http://www.redhat.com
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    # http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    #
    #
    # The logging properties used by the standalone ActiveMQ broker
    #
    log4j.rootLogger=INFO, stdout
    # CONSOLE appender
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss} %-5p %m%n
    # Log File appender
    log4j.appender.logfile=org.apache.log4j.FileAppender
    log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
    log4j.appender.logfile.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
    log4j.appender.logfile.file=./target/log/exercises.log
    log4j.appender.logfile.append=true
    #
    # You can change logger levels here.
    #
    log4j.logger.org.apache.activemq=INFO
    log4j.logger.org.apache.activemq.spring=WARN
- Make sure that the broker is already configured and running with an AMQP endpoint, as described in Section 3.1.2, “Configuring the Broker for AMQP”.
- Run the AMQP producer client as follows. Open a new command prompt, change directory to the project directory, activemq-amqp-example/, and enter the following Maven command:
    mvn -P producer
  After building the code (and downloading any packages required by Maven), this target proceeds to run the producer client, which sends 100 messages to the test.queue.simple queue in the broker. If the producer runs successfully, you should see output like the following in the console window:
    13:31:43 INFO Sending to destination: org.apache.qpid.amqp_1_0.jms.impl.QueueImpl@fabdfd0b this text: '1. message sent
    13:31:43 INFO Sending to destination: org.apache.qpid.amqp_1_0.jms.impl.QueueImpl@fabdfd0b this text: '2. message sent
    13:31:43 INFO Sending to destination: org.apache.qpid.amqp_1_0.jms.impl.QueueImpl@fabdfd0b this text: '3. message sent
    ...
    13:31:53 INFO Sending to destination: org.apache.qpid.amqp_1_0.jms.impl.QueueImpl@fabdfd0b this text: '99. message sent
    13:31:53 INFO Sending to destination: org.apache.qpid.amqp_1_0.jms.impl.QueueImpl@fabdfd0b this text: '100. message sent
- Run the AMQP consumer client as follows. Open a new command prompt, change directory to the project directory, activemq-amqp-example/, and enter the following Maven command:
    mvn -P consumer
  After building the code, this target proceeds to run the consumer client, which reads messages from the test.queue.simple queue. You should see output like the following in the console window:
    13:32:12 INFO Start consuming messages from org.apache.qpid.amqp_1_0.jms.impl.QueueImpl@fabdfd0b with 120000ms timeout
    13:32:12 INFO Got 1. message: 1. message sent
    13:32:12 INFO Got 2. message: 2. message sent
    13:32:12 INFO Got 3. message: 3. message sent
    ...
    13:32:12 INFO Got 99. message: 99. message sent
    13:32:12 INFO Got 100. message: 100. message sent
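The jndi.properties file created in the steps above ties the lookup names used in the Java clients (myJmsFactory and queue/simple) to the connection URI and to the physical queue name, test.queue.simple. The following sketch makes that naming convention explicit by stripping the connectionfactory. and queue. prefixes from the property keys. It uses only java.util.Properties, the class JndiNamesSketch is hypothetical, and it is not the logic of the Qpid initial context factory.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical sketch: shows which JNDI names a jndi.properties file defines.
public class JndiNamesSketch {

    /** Loads properties from text in the standard .properties syntax. */
    public static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException("Could not read properties text", e);
        }
        return props;
    }

    /** Returns lookup name -> value for every key that starts with the prefix. */
    public static Map<String, String> entriesWithPrefix(Properties props, String prefix) {
        Map<String, String> result = new TreeMap<String, String>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                result.put(key.substring(prefix.length()), props.getProperty(key));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String text =
                "connectionfactory.myJmsFactory = "
                + "amqp://amqpuser:secret@localhost/test/?brokerlist='tcp://localhost:5672'\n"
                + "queue.queue/simple = test.queue.simple\n";
        Properties props = load(text);
        // Lookup name for the factory, as used by context.lookup("myJmsFactory")
        System.out.println(entriesWithPrefix(props, "connectionfactory.").keySet());
        // Lookup name "queue/simple" resolves to the physical queue name
        System.out.println(entriesWithPrefix(props, "queue."));
    }
}
```

This is why the consumer output refers to the test.queue.simple queue even though both clients look up the destination as queue/simple.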