Chapter 3. Qpid JMS Client API


Warning
The Qpid JMS client API is a technical preview only, and is not supported in JBoss A-MQ 6.1.

3.1. Getting Started with AMQP

3.1.1. Introduction to AMQP

What is AMQP?

The Advanced Message Queuing Protocol (AMQP) is an open standard messaging system, which has been designed to facilitate interoperability between messaging systems. The key features of AMQP are:
  • Open standard (defined by the OASIS AMQP Technical Committee)
  • Defines a wire protocol
  • Defines APIs for multiple languages (C++, Java)
  • Interoperability between different AMQP implementations
Warning
The Qpid JMS client API is a technical preview only, and is not supported in JBoss A-MQ 6.1.

JMS is an API

It is interesting to contrast the Java Message Service (JMS) with AMQP. The JMS is first and foremost an API and is designed to enable Java code to be portable between different messaging products. JMS does not describe how to implement a messaging service (although it imposes significant constraints on the messaging behaviour), nor does JMS specify any details of the wire protocol for transmitting messages. Consequently, different JMS implementations are generally not interoperable.

AMQP is a wire protocol

AMQP, on the other hand, does specify complete details of a wire protocol for messaging (in an open standard). Moreover, AMQP also specifies APIs in several different programming languages (for example, Java and C++). An implementation of AMQP is therefore much more constrained than a comparable JMS implementation. One of the benefits of this is that different AMQP implementations ought to be interoperable with each other.

AMQP-to-JMS requires message conversion

If you want to bridge from an AMQP messaging system to a JMS messaging system, the messages must be converted from AMQP format to JMS format. Usually, this involves a fairly lightweight conversion, because the message body can usually be left intact while message headers are mapped to equivalent headers.

AMQP support in JBoss A-MQ

AMQP support in JBoss A-MQ is based on the following main components:
  • AMQP endpoint in the broker—an endpoint on the broker that supports the AMQP wire protocol and implicitly converts messages between AMQP format and JMS format (which is used inside the JBoss A-MQ broker).
  • AMQP JMS client—is based on the Apache Qpid JMS client, which is compatible with the broker's AMQP endpoint.

Getting started with AMQP

To run a simple demonstration of AMQP in JBoss A-MQ, you need to set up the following parts of the application:
  • Configure the broker to use AMQP—to enable AMQP in the broker, add an AMQP endpoint to the broker's configuration. This implicitly activates the broker's AMQP integration, ensuring that incoming messages are converted from AMQP message format to JMS message format, as required.
  • Implement the AMQP clients—the AMQP clients are based on the Apache Qpid JMS client libraries.

3.1.2. Configuring the Broker for AMQP

Overview

Configuring the broker to use AMQP is relatively straightforward in JBoss A-MQ, because the required AMQP packages are pre-installed in the container. There are essentially two main points you need to pay attention to:
  • Make sure that you have appropriate user entries in the etc/users.properties file, so that the AMQP clients will be able to log on to the broker.
  • Add an AMQP endpoint to the broker (by inserting a transportConnector element into the broker's XML configuration).

Steps to configure the broker

Perform the following steps to configure the broker with an AMQP endpoint:
  1. This example assumes that you are working with a fresh install of a standalone JBoss A-MQ broker, InstallDir.
  2. Define a JAAS user for the AMQP clients, so that the AMQP clients can authenticate themselves to the broker using JAAS security (security is enabled by default in the broker). Edit the InstallDir/etc/users.properties file and add a new user entry, as follows:
    #
    # This file contains the valid users who can log into JBoss A-MQ.
    # Each line has to be of the format:
    #
    # USER=PASSWORD,ROLE1,ROLE2,...
    #
    # All users and roles entered in this file are available after JBoss A-MQ startup
    # and modifiable via the JAAS command group. These users reside in a JAAS domain
    # with the name "karaf"..
    #
    # You must have at least one users to be able to access JBoss A-MQ resources
    
    #admin=admin,admin
    amqpuser=secret
    At this point, you can add entries for any other secure users you want. In particular, it is advisable to have at least one user with the admin role, so that you can log into the secure container remotely (remembering to choose a secure password for the admin user).
  3. Add an AMQP endpoint to the broker configuration. Edit the broker configuration file, InstallDir/etc/activemq.xml. As shown in the following XML fragment, add the highlighted transportConnector element as a child of the transportConnectors element in the broker configuration:
    <beans ...>
        ...
        <broker ...>
            ...
            <transportConnectors>
                <transportConnector name="openwire" uri="tcp://0.0.0.0:0?maximumConnections=1000"/>
     <transportConnector name="amqp" uri="amqp://0.0.0.0:5672"/>
            </transportConnectors>
        </broker>
    
    </beans>
  4. To start the broker, open a new command prompt, change directory to InstallDir/bin, and enter the following command:
    ./amq

Message conversion

The AMQP endpoint in the broker implicitly converts incoming AMQP format messages into JMS format messages (which is the format in which messages are stored in the broker). The endpoint configuration shown here uses the default options for this conversion.

Reference

For full details of how to configure an AMQP endpoint in the broker, see the "Advanced Message Queueing Protocol (AMQP)" chapter from the Connection Reference. This also includes details of how to customize the message conversion from AMQP format to JMS format.

3.1.3. AMQP Example Clients

Overview

This section explains how to implement two basic AMQP clients: an AMQP producer client, which sends messages to a queue on the broker; and an AMQP consumer client, which pulls messages off the queue on the broker. The clients themselves use generic JMS code to access the messaging system. The key details of the AMQP configuration are retrieved using JNDI properties.

Prerequisites

Before building the example clients, you must install and configure the Apache Maven build tool, as described in Section 1.2, “Preparing to use Maven”.

AMQP connection URI

The critical piece of configuration for establishing a connection with the broker is the AMQP URI (defined as a JNDI property in the jndi.properties file, in this demonstration). This example uses the following AMQP URI for the clients:
amqp://amqpuser:secret@localhost/test/?brokerlist='tcp://localhost:5672'
The first part of the URI, amqpuser:secret@localhost, has the format Username:Password@ClientID. In order to authenticate the clients successfully with the broker, it is essential that there is a corresponding JAAS user entry on the broker side.
The brokerlist option defines the location of the AMQP port on the broker, which is tcp://localhost:5672 for this example.

Steps to implement and run the AMQP clients

Perform the following steps to implement and run an AMQP producer client and an AMQP consumer client:
  1. At any convenient location, create the directory, activemq-amqp-example, to hold the example code:
    mkdir activemq-amqp-example
  2. Create the directory hierarchy for the example code. Change directory to activemq-amqp-example and run the following script at a command prompt:
    mkdir src
    mkdir src/main
    mkdir src/main/java
    mkdir src/main/java/org
    mkdir src/main/java/org/fusebyexample
    mkdir src/main/java/org/fusebyexample/activemq
    mkdir src/main/resources
    After executing the preceding commands, you should have the following directory structure for the activemq-amqp-example project:
    activemq-amqp-example/
        src/
            main/
                java/
                    org/fusebyexample/activemq
                resources/
  3. Create a POM file for the Maven project. Using a text editor, create a new file, activemq-amqp-example/pom.xml, with the following contents:
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>org.fusebyexample.activemq</groupId>
        <artifactId>activemq-amqp-example</artifactId>
        <version>5.8.0</version>
    
        <name>ActiveMQ AMQP Example</name>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    
            <activemq.version>5.9.0.redhat-610379</activemq.version>
    
            <qpid.version>0.22</qpid.version>
    
            <slf4j-version>1.6.6</slf4j-version>
            <log4j-version>1.2.17</log4j-version>
        </properties>
    
        <repositories>
            <repository>
                <id>fusesource.releases</id>
                <name>FuseSource Release Repository</name>
                <url>http://repo.fusesource.com/nexus/content/repositories/releases</url>
                <releases>
                    <enabled>true</enabled>
                </releases>
                <snapshots>
                    <enabled>false</enabled>
                </snapshots>
            </repository>
        </repositories>
        <pluginRepositories>
            <pluginRepository>
                <id>fusesource.releases</id>
                <name>FuseSource Release Repository</name>
                <url>http://repo.fusesource.com/nexus/content/repositories/releases</url>
                <releases>
                    <enabled>true</enabled>
                </releases>
                <snapshots>
                    <enabled>false</enabled>
                </snapshots>
            </pluginRepository>
        </pluginRepositories>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.qpid</groupId>
                <artifactId>qpid-amqp-1-0-client-jms</artifactId>
                <version>${qpid.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.geronimo.specs</groupId>
                <artifactId>geronimo-jms_1.1_spec</artifactId>
                <version>1.1.1</version>
            </dependency>
    
            <!-- Logging -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>${slf4j-version}</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>${slf4j-version}</version>
                <scope>runtime</scope>
            </dependency>
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>${log4j-version}</version>
                <scope>runtime</scope>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.1</version>
                    <configuration>
                        <source>1.6</source>
                        <target>1.6</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    
        <profiles>
            <profile>
                <id>consumer</id>
                <build>
                    <defaultGoal>package</defaultGoal>
    
                    <plugins>
                        <plugin>
                            <groupId>org.codehaus.mojo</groupId>
                            <artifactId>exec-maven-plugin</artifactId>
                            <version>1.2.1</version>
                            <executions>
                                <execution>
                                    <phase>package</phase>
                                    <goals>
                                        <goal>java</goal>
                                    </goals>
                                    <configuration>
                                        <mainClass>org.fusebyexample.activemq.SimpleConsumer</mainClass>
                                    </configuration>
                                </execution>
                            </executions>
                        </plugin>
                    </plugins>
                </build>
            </profile>
    
            <profile>
                <id>producer</id>
                <build>
                    <defaultGoal>package</defaultGoal>
    
                    <plugins>
                        <plugin>
                            <groupId>org.codehaus.mojo</groupId>
                            <artifactId>exec-maven-plugin</artifactId>
                            <version>1.2.1</version>
                            <executions>
                                <execution>
                                    <phase>package</phase>
                                    <goals>
                                        <goal>java</goal>
                                    </goals>
                                    <configuration>
                                        <mainClass>org.fusebyexample.activemq.SimpleProducer</mainClass>
                                    </configuration>
                                </execution>
                            </executions>
                        </plugin>
                    </plugins>
                </build>
            </profile>
        </profiles>
    
    </project>
  4. Define the Java implementation of an AMQP consumer class, SimpleConsumer. Using a text editor, create the SimpleConsumer.java file under the activemq-amqp-example/src/main/java/org/fusebyexample/activemq/ directory, with the following contents:
    package org.fusebyexample.activemq;
    
    import javax.jms.*;
    import javax.naming.Context;
    import javax.naming.InitialContext;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class SimpleConsumer {
        private static final Logger LOG = LoggerFactory.getLogger(SimpleConsumer.class);
    
        private static final Boolean NON_TRANSACTED = false;
        private static final String CONNECTION_FACTORY_NAME = "myJmsFactory";
        private static final String DESTINATION_NAME = "queue/simple";
        private static final int MESSAGE_TIMEOUT_MILLISECONDS = 120000;
    
        public static void main(String args[]) {
            Connection connection = null;
    
            try {
                // JNDI lookup of JMS Connection Factory and JMS Destination
                Context context = new InitialContext();
                ConnectionFactory factory = (ConnectionFactory) context.lookup(CONNECTION_FACTORY_NAME);
                Destination destination = (Destination) context.lookup(DESTINATION_NAME);
    
                connection = factory.createConnection();
                connection.start();
    
                Session session = connection.createSession(NON_TRANSACTED, Session.AUTO_ACKNOWLEDGE);
                MessageConsumer consumer = session.createConsumer(destination);
    
                LOG.info("Start consuming messages from " + destination.toString() + " with " + MESSAGE_TIMEOUT_MILLISECONDS + "ms timeout");
    
                // Synchronous message consumer
                int i = 1;
                while (true) {
                    Message message = consumer.receive(MESSAGE_TIMEOUT_MILLISECONDS);
                    if (message != null) {
                        if (message instanceof TextMessage) {
                            String text = ((TextMessage) message).getText();
                            LOG.info("Got " + (i++) + ". message: " + text);
                        }
                    } else {
                        break;
                    }
                }
    
                consumer.close();
                session.close();
            } catch (Throwable t) {
                LOG.error("Error receiving message", t);
            } finally {
                // Cleanup code
                // In general, you should always close producers, consumers,
                // sessions, and connections in reverse order of creation.
                // For this simple example, a JMS connection.close will
                // clean up all other resources.
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        LOG.error("Error closing connection", e);
                    }
                }
            }
        }
    }
  5. Define the Java implementation of an AMQP producer class, SimpleProducer. Using a text editor, create the SimpleProducer.java file under the activemq-amqp-example/src/main/java/org/fusebyexample/activemq/ directory, with the following contents:
    package org.fusebyexample.activemq;
    
    import javax.jms.*;
    import javax.naming.Context;
    import javax.naming.InitialContext;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class SimpleProducer {
        private static final Logger LOG = LoggerFactory.getLogger(SimpleProducer.class);
    
        private static final Boolean NON_TRANSACTED = false;
        private static final long MESSAGE_TIME_TO_LIVE_MILLISECONDS = 0;
        private static final int MESSAGE_DELAY_MILLISECONDS = 100;
        private static final int NUM_MESSAGES_TO_BE_SENT = 100;
        private static final String CONNECTION_FACTORY_NAME = "myJmsFactory";
        private static final String DESTINATION_NAME = "queue/simple";
    
        public static void main(String args[]) {
            Connection connection = null;
    
            try {
                // JNDI lookup of JMS Connection Factory and JMS Destination
                Context context = new InitialContext();
                ConnectionFactory factory = (ConnectionFactory) context.lookup(CONNECTION_FACTORY_NAME);
                Destination destination = (Destination) context.lookup(DESTINATION_NAME);
    
                connection = factory.createConnection();
                connection.start();
    
                Session session = connection.createSession(NON_TRANSACTED, Session.AUTO_ACKNOWLEDGE);
                MessageProducer producer = session.createProducer(destination);
    
                producer.setTimeToLive(MESSAGE_TIME_TO_LIVE_MILLISECONDS);
    
                for (int i = 1; i <= NUM_MESSAGES_TO_BE_SENT; i++) {
                    TextMessage message = session.createTextMessage(i + ". message sent");
                    LOG.info("Sending to destination: " + destination.toString() + " this text: '" + message.getText());
                    producer.send(message);
                    Thread.sleep(MESSAGE_DELAY_MILLISECONDS);
                }
    
                // Cleanup
                producer.close();
                session.close();
            } catch (Throwable t) {
                LOG.error("Error sending message", t);
            } finally {
                // Cleanup code
                // In general, you should always close producers, consumers,
                // sessions, and connections in reverse order of creation.
                // For this simple example, a JMS connection.close will
                // clean up all other resources.
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        LOG.error("Error closing connection", e);
                    }
                }
            }
        }
    }
  6. Configure the JNDI properties for the AMQP clients. Using a text editor, create the jndi.properties file under the activemq-amqp-example/src/main/resources/ directory, with the following contents:
    #
    # Copyright (C) Red Hat, Inc.
    # http://www.redhat.com
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    #
    
    # JNDI properties file to setup the JNDI server within ActiveMQ
    
    #
    # Default JNDI properties settings
    #
    java.naming.factory.initial = org.apache.qpid.amqp_1_0.jms.jndi.PropertiesFileInitialContextFactory
    java.naming.provider.url = src/main/resources/jndi.properties
    
    #
    # Set the connection factory name(s) as well as the destination names. The connection factory name(s)
    # as well as the second part (after the dot) of the left hand side of the destination definition
    # must be used in the JNDI lookups.
    #
    connectionfactory.myJmsFactory = amqp://amqpuser:secret@localhost/test/?brokerlist='tcp://localhost:5672'
    
    queue.queue/simple = test.queue.simple
  7. Configure the client logging with log4j. Using a text editor, create the log4j.properties file under the activemq-amqp-example/src/main/resources/ directory, with the following contents:
    #
    # Copyright (C) Red Hat, Inc.
    # http://www.redhat.com
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    #
    
    #
    # The logging properties used by the standalone ActiveMQ broker
    #
    log4j.rootLogger=INFO, stdout
    
    # CONSOLE appender
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss} %-5p %m%n
    
    # Log File appender
    log4j.appender.logfile=org.apache.log4j.FileAppender
    log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
    log4j.appender.logfile.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
    log4j.appender.logfile.file=./target/log/exercises.log
    log4j.appender.logfile.append=true
    
    #
    #  You can change logger levels here.
    #
    log4j.logger.org.apache.activemq=INFO
    log4j.logger.org.apache.activemq.spring=WARN
  8. Make sure that the broker is already configured and running with an AMQP endpoint, as described in Section 3.1.2, “Configuring the Broker for AMQP”.
  9. Run the AMQP producer client as follows. Open a new command prompt, change directory to the project directory, activemq-amqp-example/, and enter the following Maven command:
    mvn -P producer
    After building the code (and downloading any packages required by Maven), this target proceeds to run the producer client, which sends 100 messages to the test.queue.simple queue in the broker. If the producer runs successfully, you should see output like the following in the console window:
    13:31:43 INFO  Sending to destination: org.apache.qpid.amqp_1_0.jms.impl.QueueImpl@fabdfd0b this text: '1. message sent
    13:31:43 INFO  Sending to destination: org.apache.qpid.amqp_1_0.jms.impl.QueueImpl@fabdfd0b this text: '2. message sent
    13:31:43 INFO  Sending to destination: org.apache.qpid.amqp_1_0.jms.impl.QueueImpl@fabdfd0b this text: '3. message sent
    ...
    13:31:53 INFO  Sending to destination: org.apache.qpid.amqp_1_0.jms.impl.QueueImpl@fabdfd0b this text: '99. message sent
    13:31:53 INFO  Sending to destination: org.apache.qpid.amqp_1_0.jms.impl.QueueImpl@fabdfd0b this text: '100. message sent
  10. Run the AMQP consumer client as follows. Open a new command prompt, change directory to the project directory, activemq-amqp-example/, and enter the following Maven command:
    mvn -P consumer
    After building the code, this target proceeds to run the consumer client, which reads messages from the test.queue.simple queue. You should see output like the following in the console window:
    13:32:12 INFO  Start consuming messages from org.apache.qpid.amqp_1_0.jms.impl.QueueImpl@fabdfd0b with 120000ms timeout
    13:32:12 INFO  Got 1. message: 1. message sent
    13:32:12 INFO  Got 2. message: 2. message sent
    13:32:12 INFO  Got 3. message: 3. message sent
    ...
    13:32:12 INFO  Got 99. message: 99. message sent
    13:32:12 INFO  Got 100. message: 100. message sent
Red Hat logoGithubRedditYoutubeTwitter

Learn

Try, buy, & sell

Communities

About Red Hat Documentation

We help Red Hat users innovate and achieve their goals with our products and services with content they can trust.

Making open source more inclusive

Red Hat is committed to replacing problematic language in our code, documentation, and web properties. For more details, see the Red Hat Blog.

About Red Hat

We deliver hardened solutions that make it easier for enterprises to work across platforms and environments, from the core datacenter to the network edge.

© 2024 Red Hat, Inc.