3.5. C++ AMQP 1.0 Client API

The C++ AMQP 1.0 Client API is based on the Apache Qpid C++ AMQP 1.0 Client API.
Note
This is an initial version of the documentation for the C++ client. Regular updates and enhancements can be expected after the GA release of Fuse 6.2.0.

3.5.1. Getting Started with C++ AMQP

Introduction to C++ AMQP 1.0 Client API

The C++ Messaging Service (CMS) API is used for interfacing with message brokers such as A-MQ. A-MQ-C++ is a client library that lets clients communicate through an A-MQ broker. The CMS architecture supports pluggable transports and wire formats. At present, the OpenWire and STOMP protocols are supported over TCP and SSL. The failover transport is also supported for reliable client operation. In addition to CMS, A-MQ-C++ provides a set of classes that support platform-independent constructs such as threading, I/O, and sockets.
CMS closely follows the JMS specification, with some minor differences. For more information about the CMS API, see CMS API Overview.
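
The failover transport mentioned above is selected through the broker URI. The examples in this section use the form failover:(tcp://127.0.0.1:61616), and more than one broker can be listed by separating the endpoints with commas inside the parentheses. The following is a minimal sketch of assembling such a URI; makeFailoverUri is a hypothetical helper written for this illustration and is not part of the client library.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical helper (not a library function): joins "host:port" endpoints
// into a failover URI of the form failover:(tcp://a:1,tcp://b:2).
std::string makeFailoverUri(const std::vector<std::string>& endpoints) {
    // A failover URI with no endpoints would be useless, so require one.
    assert(!endpoints.empty());

    std::string uri = "failover:(";
    for (std::size_t i = 0; i < endpoints.size(); ++i) {
        if (i > 0) {
            uri += ",";
        }
        uri += "tcp://" + endpoints[i];
    }
    uri += ")";
    return uri;
}
```

Called with the single endpoint 127.0.0.1:61616, the helper returns the same broker URI that the consumer example passes to ActiveMQConnectionFactory.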

Downloading A-MQ C++ Client

You can download the A-MQ C++ client from the Red Hat Customer Portal: C++ Client.

3.5.2. C++ Example Clients

A Simple Messaging Program in C++

The following program shows how to create a simple asynchronous consumer that receives TextMessage objects from an A-MQ broker.
The example first creates a ConnectionFactory object and uses it to create a CMS Connection. A Connection is the object that manages the client's connection to the provider. After creating the connection, the client creates a CMS Session, which is used to create message producers and consumers.

Example 3.13. Simple Asynchronous Consumer

#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/transport/DefaultTransportListener.h>
#include <activemq/library/ActiveMQCPP.h>
#include <decaf/lang/Integer.h>
#include <activemq/util/Config.h>
#include <decaf/util/Date.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
 
using namespace activemq;
using namespace activemq::core;
using namespace activemq::transport;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;
 
class SimpleAsyncConsumer : public ExceptionListener,
                            public MessageListener,
                            public DefaultTransportListener {
private:
 
    Connection* connection;
    Session* session;
    Destination* destination;
    MessageConsumer* consumer;
    bool useTopic;
    std::string brokerURI;
    std::string destURI;
    bool clientAck;
 
private:
 
    SimpleAsyncConsumer( const SimpleAsyncConsumer& );
    SimpleAsyncConsumer& operator= ( const SimpleAsyncConsumer& );
 
public:
 
    SimpleAsyncConsumer( const std::string& brokerURI, 
                         const std::string& destURI,
                         bool useTopic = false,
                         bool clientAck = false ) :
        connection(NULL),
        session(NULL),
        destination(NULL),
        consumer(NULL),
        useTopic(useTopic),
        brokerURI(brokerURI),
        destURI(destURI),
        clientAck(clientAck) { // (1)
    }
 
    virtual ~SimpleAsyncConsumer() {
        this->cleanup();
    }
 
    void close() {
        this->cleanup();
    }
 
    void runConsumer() { // (2)
 
        try {
 
            // Create a ConnectionFactory
            ActiveMQConnectionFactory* connectionFactory =
                new ActiveMQConnectionFactory( brokerURI );
 
            // Create a Connection
            connection = connectionFactory->createConnection();
            delete connectionFactory;
 
            ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>( connection );
            if( amqConnection != NULL ) {
                amqConnection->addTransportListener( this );
            }
 
            connection->start();
 
            connection->setExceptionListener(this);
 
            // Create a Session
            if( clientAck ) {
                session = connection->createSession( Session::CLIENT_ACKNOWLEDGE );
            } else {
                session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
            }
 
            // Create the destination (Topic or Queue)
            if( useTopic ) {
                destination = session->createTopic( destURI );
            } else {
                destination = session->createQueue( destURI );
            }
 
            // Create a MessageConsumer from the Session to the Topic or Queue
            consumer = session->createConsumer( destination );
            consumer->setMessageListener( this );
 
        } catch (CMSException& e) {
            e.printStackTrace();
        }
    }
 
    // Called from the consumer since this class is a registered MessageListener.
    virtual void onMessage( const Message* message ) { // (3)
 
        static int count = 0;
 
        try
        {
            count++;
            const TextMessage* textMessage =
                dynamic_cast< const TextMessage* >( message );
            string text = "";
 
            if( textMessage != NULL ) {
                text = textMessage->getText();
            } else {
                text = "NOT A TEXTMESSAGE!";
            }
 
            if( clientAck ) {
                message->acknowledge();
            }
 
            printf( "Message #%d Received: %s\n", count, text.c_str() );
        } catch (CMSException& e) {
            e.printStackTrace();
        }
    }
 
    // If something bad happens, you see it here because this class is also
    // registered as an ExceptionListener with the connection.
    virtual void onException( const CMSException& ex AMQCPP_UNUSED ) {
        printf("CMS Exception occurred.  Shutting down client.\n");
        exit(1);
    }
 
    virtual void transportInterrupted() {
        std::cout << "The Connection's Transport has been Interrupted." << std::endl;
    }
 
    virtual void transportResumed() {
        std::cout << "The Connection's Transport has been Restored." << std::endl;
    }
 
private:
 
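    void cleanup(){
 
        try {
            if( connection != NULL ) {
                connection->close();
            }
        } catch ( CMSException& e ) { 
            e.printStackTrace(); 
        }
 
        // Reset each pointer after deleting it so that a second call
        // (close() followed by the destructor) does not delete twice.
        delete destination;
        destination = NULL;
        delete consumer;
        consumer = NULL;
        delete session;
        session = NULL;
        delete connection;
        connection = NULL;
    }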
};
 
////////////////////////////////////////////////////////////////////////////////
int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {
 
    activemq::library::ActiveMQCPP::initializeLibrary();
 
    std::cout << "=====================================================\n";
    std::cout << "Starting the example:" << std::endl;
    std::cout << "-----------------------------------------------------\n";
 
    // Set the URI to point to the IPAddress of your broker.
    // add any optional params to the url to enable things like
    // tightMarshalling or tcp logging etc.  See the CMS web site for
    // a full list of configuration options.
    //
    //  http://activemq.apache.org/cms/
    //
    std::string brokerURI =
        "failover:(tcp://127.0.0.1:61616)";
 
    //============================================================
    // This is the Destination Name and URI options.  Use this to
    // customize where the consumer listens, to have the consumer
    // use a topic or queue set the 'useTopics' flag.
    //============================================================
    std::string destURI = "TEST.FOO"; //?consumer.prefetchSize=1";
 
    //============================================================
    // set to true to use topics instead of queues
    // Note in the code above that this causes createTopic or
    // createQueue to be used in the consumer.
    //============================================================
    bool useTopics = false;
 
    //============================================================
    // set to true if you want the consumer to use client ack mode
    // instead of the default auto ack mode.
    //============================================================
    bool clientAck = false;
 
    // Create the consumer
    SimpleAsyncConsumer consumer( brokerURI, destURI, useTopics, clientAck );
 
    // Start it up and it will listen forever.
    consumer.runConsumer();
 
    // Wait to exit.
    std::cout << "Press 'q' to quit" << std::endl;
    while( std::cin.get() != 'q') {}
 
    // All CMS resources should be closed before the library is shutdown.
    consumer.close();
 
    std::cout << "-----------------------------------------------------\n";
    std::cout << "Finished with the example." << std::endl;
    std::cout << "=====================================================\n";
 
    activemq::library::ActiveMQCPP::shutdownLibrary();
}      


1
The SimpleAsyncConsumer constructor creates an instance configured for a particular broker and destination. It also records whether the destination is a Queue or a Topic and whether client acknowledgment is used.
2
The runConsumer method creates a Connection to the broker and starts a new Session that uses the configured acknowledgment mode. Once the Session exists, a MessageConsumer is created and attached to the configured Destination. To listen asynchronously for new messages, SimpleAsyncConsumer inherits from cms::MessageListener so that it can register itself as the message listener on the MessageConsumer created in runConsumer.
3
All messages received by the application are dispatched to the onMessage method. If the message is a TextMessage, its contents are printed to the screen.
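
The dispatch to onMessage can be illustrated without a broker. The following is a minimal, self-contained sketch, not the CMS implementation: Message, TextMessage, BytesMessage, MessageListener, RecordingListener, and dispatch are hypothetical stand-ins defined here for illustration only. It shows how a listener uses dynamic_cast to tell a text message from any other message type, as onMessage does in Example 3.13.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-ins for the CMS message types (not the real classes).
struct Message {
    virtual ~Message() {}
};

struct TextMessage : Message {
    explicit TextMessage(const std::string& t) : text(t) {}
    std::string text;
};

struct BytesMessage : Message {};

// Hypothetical stand-in for the listener interface.
struct MessageListener {
    virtual ~MessageListener() {}
    virtual void onMessage(const Message* message) = 0;
};

// Records the text of each message, or a marker if it is not a text message.
class RecordingListener : public MessageListener {
public:
    virtual void onMessage(const Message* message) {
        const TextMessage* textMessage =
            dynamic_cast<const TextMessage*>(message);
        if (textMessage != NULL) {
            received.push_back(textMessage->text);
        } else {
            received.push_back("NOT A TEXTMESSAGE!");
        }
    }

    std::vector<std::string> received;
};

// Stand-in for the consumer's delivery loop: hands every message to the
// registered listener, one at a time.
void dispatch(const std::vector<const Message*>& messages,
              MessageListener* listener) {
    assert(listener != NULL);
    for (std::size_t i = 0; i < messages.size(); ++i) {
        listener->onMessage(messages[i]);
    }
}
```

Passing a TextMessage with the text "hello" followed by a BytesMessage to dispatch leaves the listener's received vector holding "hello" and "NOT A TEXTMESSAGE!", in that order.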

Example 3.14. Simple Asynchronous Producer

#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <decaf/lang/Long.h>
#include <decaf/util/Date.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/util/Config.h>
#include <activemq/library/ActiveMQCPP.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
#include <memory>
 
using namespace activemq;
using namespace activemq::core;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;
 
////////////////////////////////////////////////////////////////////////////////
class SimpleProducer : public Runnable {
private:
 
    Connection* connection;
    Session* session;
    Destination* destination;
    MessageProducer* producer;
    bool useTopic;
    bool clientAck;
    unsigned int numMessages;
    std::string brokerURI;
    std::string destURI;
 
private:
 
    SimpleProducer( const SimpleProducer& );
    SimpleProducer& operator= ( const SimpleProducer& );
 
public:
 
    SimpleProducer( const std::string& brokerURI, unsigned int numMessages,
                    const std::string& destURI, bool useTopic = false, bool clientAck = false ) :
        connection(NULL),
        session(NULL),
        destination(NULL),
        producer(NULL),
        useTopic(useTopic),
        clientAck(clientAck),
        numMessages(numMessages),
        brokerURI(brokerURI),
        destURI(destURI) { // (1)
    }
 
    virtual ~SimpleProducer(){
        cleanup();
    }
 
    void close() {
        this->cleanup();
    }
 
    virtual void run() { // (2)
        try {
 
            // Create a ConnectionFactory
            auto_ptr<ActiveMQConnectionFactory> connectionFactory(
                new ActiveMQConnectionFactory( brokerURI ) );
 
            // Create a Connection
            try{
                connection = connectionFactory->createConnection();
                connection->start();
            } catch( CMSException& e ) {
                e.printStackTrace();
                throw;
            }
 
            // Create a Session
            if( clientAck ) {
                session = connection->createSession( Session::CLIENT_ACKNOWLEDGE );
            } else {
                session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
            }
 
            // Create the destination (Topic or Queue)
            if( useTopic ) {
                destination = session->createTopic( destURI );
            } else {
                destination = session->createQueue( destURI );
            }
 
            // Create a MessageProducer from the Session to the Topic or Queue
            producer = session->createProducer( destination );
            producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
 
            // Create the Thread Id String
            string threadIdStr = Long::toString( Thread::currentThread()->getId() );
 
            // Create the message text
            string text = (string)"Hello world! from thread " + threadIdStr;
 
            for( unsigned int ix = 0; ix < numMessages; ++ix ){
                TextMessage* message = session->createTextMessage( text );
 
                message->setIntProperty( "Integer", ix );
 
                // Tell the producer to send the message
                printf( "Sent message #%u from thread %s\n", ix+1, threadIdStr.c_str() );
                producer->send( message );
 
                delete message;
            }
 
        }catch ( CMSException& e ) {
            e.printStackTrace();
        }
    }
 
private:
 
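    void cleanup(){
 
        try {
            if( connection != NULL ) {
                connection->close();
            }
        } catch ( CMSException& e ) { 
            e.printStackTrace(); 
        }
 
        // Reset each pointer after deleting it so that a second call
        // (close() followed by the destructor) does not delete twice.
        delete destination;
        destination = NULL;
        delete producer;
        producer = NULL;
        delete session;
        session = NULL;
        delete connection;
        connection = NULL;
    }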
};
 
////////////////////////////////////////////////////////////////////////////////
int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {
 
    activemq::library::ActiveMQCPP::initializeLibrary();
 
    std::cout << "=====================================================\n";
    std::cout << "Starting the example:" << std::endl;
    std::cout << "-----------------------------------------------------\n";
 
    // Set the URI to point to the IPAddress of your broker.
    // add any optional params to the url to enable things like
    // tightMarshalling or tcp logging etc.  See the CMS web site for
    // a full list of configuration options.
    //
    //  http://activemq.apache.org/cms/
    //
    std::string brokerURI =
        "failover://(tcp://127.0.0.1:61616)";
 
    //============================================================
    // Total number of messages for this producer to send.
    //============================================================
    unsigned int numMessages = 2000;
 
    //============================================================
    // This is the Destination Name and URI options.  Use this to
    // customize where the Producer produces, to have the producer
    // use a topic or queue set the 'useTopics' flag.
    //============================================================
    std::string destURI = "TEST.FOO";
 
    //============================================================
    // set to true to use topics instead of queues
    // Note in the code above that this causes createTopic or
    // createQueue to be used in the producer.
    //============================================================
    bool useTopics = false;
 
    // Create the producer and run it.
    SimpleProducer producer( brokerURI, numMessages, destURI, useTopics );
 
    // Publish the given number of Messages
    producer.run();
 
    // Before exiting we ensure that all CMS resources are closed.
    producer.close();
 
    std::cout << "-----------------------------------------------------\n";
    std::cout << "Finished with the example." << std::endl;
    std::cout << "=====================================================\n";
 
    activemq::library::ActiveMQCPP::shutdownLibrary();
}

1
The SimpleProducer class exposes an interface similar to that of the consumer in Example 3.13, “Simple Asynchronous Consumer”. The constructor creates an instance with the configuration options for the broker and destination, and the number of messages to be sent to the configured destination.
2
The run method publishes the specified number of messages. Once the run method completes, the client calls the close() method, which releases the allocated CMS resources. The main function then shuts down the library and exits.
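
In both examples, main calls close() and the destructor later calls cleanup() again, so the cleanup routine runs twice on the same object. The following is a minimal sketch of a cleanup routine that tolerates this, assuming a hypothetical Resource type in place of the CMS objects (Resource and Owner are illustrative names, not part of any library). Each pointer is reset to NULL after it is deleted, which makes the second call a no-op.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical resource standing in for a CMS object; counts live instances
// so that leaks and double deletes can be observed.
struct Resource {
    static int live;
    Resource() { ++live; }
    ~Resource() { --live; }
};
int Resource::live = 0;

class Owner {
public:
    Owner() : session(NULL), connection(NULL) {
        connection = new Resource;
        session = new Resource;
    }

    ~Owner() { cleanup(); }

    // Public entry point, mirroring close() in the examples.
    void close() { cleanup(); }

private:
    // Non-copyable, as in the examples.
    Owner(const Owner&);
    Owner& operator=(const Owner&);

    // Safe to call more than once: deleting NULL is a no-op.
    void cleanup() {
        delete session;
        session = NULL;
        delete connection;
        connection = NULL;
    }

    Resource* session;
    Resource* connection;
};
```

With this pattern, calling close() explicitly and then letting the object go out of scope releases each resource exactly once.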

3.5.3. C++ Client on RHEL for SSL-Based Communication with A-MQ Broker

Overview

This section describes how to enable SSL/TLS security for the AMQP protocol, where the connection is made between:
  • A-MQ broker, deployed on a RHEL host, and
  • Qpid C++ client, deployed on a RHEL host.

Configuring SSL/TLS for the broker on RHEL

Follow these steps to enable SSL/TLS security for the AMQP endpoint of a broker running on RHEL:
  1. Create a certificate, test.jks, for testing purposes:
    keytool -genkey -alias jboss -keyalg RSA -keystore test.jks -storepass password -dname "CN=test,O=test"
    Store the new certificate file, test.jks, in the broker's ${A-MQ_HOME}/etc/ directory.
  2. Configure the broker to use the test.jks certificate and enable SSL/TLS on the broker's AMQP connector by editing the ${A-MQ_HOME}/etc/activemq.xml file as follows:
    <sslContext>
        <sslContext
           keyStore="${karaf.base}/etc/test.jks"
           keyStorePassword="password"
        />
    </sslContext>
    
    <transportConnectors>
        <transportConnector name="amqpssl" uri="amqp+ssl://0.0.0.0:61617?transport.enabledProtocols=TLSv1,TLSv1.1,TLSv1.2" />
    </transportConnectors>
    Note
    For more details about the broker configuration, see Securing a Broker using SSL/TLS.
  3. Export the certificate in a format that can be used by the Qpid C++ client running on RHEL (in the next step):
    keytool -exportcert -rfc -keystore test.jks -storepass password -alias jboss -file ./sample_cert.cer
    You need to copy the resulting sample_cert.cer file to the client RHEL machine. One way of doing this is to use the secure copy command, scp, to copy the sample_cert.cer file securely across the network:
    scp sample_cert.cer ${USER_NAME}@0.0.0.0:${CLIENT_TARGET_PATH}
    Here, ${USER_NAME} is the relevant user name on the client RHEL machine and ${CLIENT_TARGET_PATH} is the location on the remote file system where you want to copy the certificate. Replace 0.0.0.0 with the host name or IP address of the client machine.

Configuring SSL/TLS on the client side

Follow these steps to enable SSL/TLS security on a Qpid C++ client deployed on a remote RHEL machine:
  1. Install the Qpid C++ client packages, along with the qpid-send and qpid-receive packages for testing:
    yum install qpid-cpp-client
    yum install log4cpp
    yum install qpid-cpp-client-devel
    yum install log4cpp-devel
  2. The keytool command is needed for generating self-signed certificates. If it is not already available, install OpenJDK as follows:
    yum install java-1.8.0-openjdk-headless-1.8.0.51-0.b16.el6_6.x86_64
  3. Set up the client environment, using the NSS (Network Security Services) database to install the sample_cert.cer certificate:
    mkdir -p  ~/nssdb
    certutil -A -n selfsigned -d ~/nssdb -t "CT,," -i ./sample_cert.cer
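  4. Set the following environment variable, where ${YOUR_WORK_PATH} is the directory that contains the nssdb directory (your home directory, if you created ~/nssdb in the previous step):
    export QPID_SSL_CERT_DB=${YOUR_WORK_PATH}/nssdb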
  5. Test the new configuration using the qpid-send command and the qpid-receive command:
    qpid-send -b amqp:ssl:0.0.0.0:61617 -a TestQueue --connection-options "{protocol:amqp1.0, ssl_ignore_hostname_verification_failure:true, username:admin, password:admin}" --content-string "hello world"
    
    qpid-receive -b amqp:ssl:0.0.0.0:61617 -a TestQueue --connection-options "{protocol:amqp1.0, ssl_ignore_hostname_verification_failure:true, username:admin, password:admin}"
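
The --connection-options argument in the test commands is a brace-delimited list of key:value pairs. When the same options have to be assembled in a C++ program, a small formatter along the following lines could be used. This is a sketch only: Option and formatConnectionOptions are hypothetical names introduced here, and the helper does not quote or escape values that contain spaces or special characters.

```cpp
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical alias: one connection option as a (key, value) pair.
typedef std::pair<std::string, std::string> Option;

// Hypothetical helper: renders the options in the "{key:value, key:value}"
// form used by the commands above, preserving the order given.
std::string formatConnectionOptions(const std::vector<Option>& options) {
    std::string out = "{";
    for (std::size_t i = 0; i < options.size(); ++i) {
        if (i > 0) {
            out += ", ";
        }
        out += options[i].first + ":" + options[i].second;
    }
    out += "}";
    return out;
}
```

Given the four options used in the test commands (protocol, ssl_ignore_hostname_verification_failure, username, and password), the helper reproduces the option string shown in step 5.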

3.5.4. C++ Client on Windows for SSL-Based Communication with A-MQ Broker

Overview

This section describes how to enable SSL/TLS security for the AMQP protocol, where the connection is made between:
  • A-MQ broker, deployed on a Linux OS, and
  • Qpid C++ client, deployed on a Windows OS.

Configuring SSL/TLS for the broker on Linux

Follow these steps to enable SSL/TLS security for the AMQP endpoint of a broker running on a Linux platform:
  1. Create a new certificate, test.jks, for testing purposes:
    keytool -genkey -alias jboss -keyalg RSA -keystore test.jks -storepass password -dname "CN=test,O=test"
    Store the new certificate file, test.jks, in the broker's ${A-MQ_HOME}/etc/ directory.
  2. Configure the broker to use the test.jks certificate and enable SSL/TLS on the broker's AMQP connector by editing the ${A-MQ_HOME}/etc/activemq.xml file as follows:
    <sslContext>
        <sslContext
           keyStore="${karaf.base}/etc/test.jks"
           keyStorePassword="password"
        />
    </sslContext>
    
    <transportConnectors>
        <transportConnector name="amqpssl" uri="amqp+ssl://0.0.0.0:61617?transport.enabledProtocols=TLSv1,TLSv1.1,TLSv1.2" />
    </transportConnectors>
    Note
    For more details about the broker configuration, see Securing a Broker using SSL/TLS.
  3. Export the certificate in a format that can be used by the Qpid C++ client running on Windows (in the next step):
    keytool -exportcert -rfc -keystore test.jks -storepass password -alias jboss -file ./sample_cert.cer
    You need to copy the resulting sample_cert.cer file to the Windows machine. One way of doing this is to use the secure copy command, scp, to copy the sample_cert.cer file securely across the network:
    scp sample_cert.cer ${USER_NAME}@0.0.0.0:${CLIENT_TARGET_PATH}
    Here, ${USER_NAME} is the relevant user name on the Windows machine and ${CLIENT_TARGET_PATH} is the location on the Windows file system where you want to copy the certificate. Replace 0.0.0.0 with the host name or IP address of the Windows machine.

Configuring SSL/TLS on the client side

Follow these steps to enable SSL/TLS security on a Qpid C++ client deployed on a Windows machine:
  1. Set up the client environment, using the MMC.exe utility to install the sample_cert.cer certificate into "Trusted Root Certification Authorities", as follows:
    "Console Root" -> "Trusted Root Certification Authorities" -> "Certificates"
    Right Click -> "All Tasks" -> "Import"
  2. Test the new configuration using the qpid-send command and the qpid-receive command:
    qpid-send -b amqp:ssl:0.0.0.0:61617 -a TestQueue --connection-options "{protocol:amqp1.0, ssl_ignore_hostname_verification_failure:true, username:admin, password:admin}" --content-string "hello world"
    
    qpid-receive -b amqp:ssl:0.0.0.0:61617 -a TestQueue --connection-options "{protocol:amqp1.0, ssl_ignore_hostname_verification_failure:true, username:admin, password:admin}"

© 2024 Red Hat, Inc.