3.5. C++ AMQP 1.0 Client API
The C++ AMQP 1.0 Client API is based on the Apache Qpid C++ AMQP 1.0 Client API.
Note
This is an initial version of the documentation for the C++ client. Regular updates and enhancements to the documentation can be expected after the GA release of Fuse 6.2.0.
3.5.1. Getting Started with C++ AMQP
Introduction to C++ AMQP 1.0 Client API
The C++ Messaging Service (CMS) API is used to interface with message brokers such as A-MQ. A-MQ-C++ is a client library that lets C++ clients communicate through an A-MQ broker. The CMS architecture supports pluggable transports and wire formats. At present, the OpenWire and Stomp protocols are supported over TCP and SSL. The Failover transport is also supported for reliable client operation. In addition to CMS, A-MQ-C++ provides a set of classes that support platform-independent constructs such as threading, I/O, and sockets.
CMS is similar to JMS and mostly adheres to the JMS specification, with some minor differences. To learn more about the CMS API, see CMS API Overview.
Downloading A-MQ C++ Client
You can download the A-MQ C++ client from the Red Hat Customer Portal: C++ Client.
3.5.2. C++ Example Clients
A Simple Messaging Program in C++
The following program shows how to create a simple asynchronous consumer that can receive TextMessage objects from an A-MQ broker.
In this example, we create a ConnectionFactory object and use it to create a CMS Connection. A Connection is the object that manages the client's connection to the provider. After creating a connection, the client creates a CMS Session, which it uses to create message producers and consumers.
Example 3.13. Simple Asynchronous Consumer
#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/transport/DefaultTransportListener.h>
#include <activemq/library/ActiveMQCPP.h>
#include <decaf/lang/Integer.h>
#include <activemq/util/Config.h>
#include <decaf/util/Date.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>

using namespace activemq;
using namespace activemq::core;
using namespace activemq::transport;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;

class SimpleAsyncConsumer : public ExceptionListener,
                            public MessageListener,
                            public DefaultTransportListener {
private:

    Connection* connection;
    Session* session;
    Destination* destination;
    MessageConsumer* consumer;
    bool useTopic;
    std::string brokerURI;
    std::string destURI;
    bool clientAck;

private:

    // Copying is disabled: the class owns raw CMS pointers.
    SimpleAsyncConsumer( const SimpleAsyncConsumer& );
    SimpleAsyncConsumer& operator= ( const SimpleAsyncConsumer& );

public:

    SimpleAsyncConsumer( const std::string& brokerURI,
                         const std::string& destURI,
                         bool useTopic = false,
                         bool clientAck = false ) :
        connection(NULL),
        session(NULL),
        destination(NULL),
        consumer(NULL),
        useTopic(useTopic),
        brokerURI(brokerURI),
        destURI(destURI),
        clientAck(clientAck) {   // (1)
    }

    virtual ~SimpleAsyncConsumer() {
        this->cleanup();
    }

    void close() {
        this->cleanup();
    }

    void runConsumer() {   // (2)

        try {

            // Create a ConnectionFactory
            ActiveMQConnectionFactory* connectionFactory =
                new ActiveMQConnectionFactory( brokerURI );

            // Create a Connection
            connection = connectionFactory->createConnection();
            delete connectionFactory;

            ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>( connection );
            if( amqConnection != NULL ) {
                amqConnection->addTransportListener( this );
            }

            connection->start();

            connection->setExceptionListener(this);

            // Create a Session
            if( clientAck ) {
                session = connection->createSession( Session::CLIENT_ACKNOWLEDGE );
            } else {
                session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
            }

            // Create the destination (Topic or Queue)
            if( useTopic ) {
                destination = session->createTopic( destURI );
            } else {
                destination = session->createQueue( destURI );
            }

            // Create a MessageConsumer from the Session to the Topic or Queue
            consumer = session->createConsumer( destination );
            consumer->setMessageListener( this );

        } catch (CMSException& e) {
            e.printStackTrace();
        }
    }

    // Called from the consumer since this class is a registered MessageListener.
    virtual void onMessage( const Message* message ) {   // (3)

        static int count = 0;

        try {
            count++;
            const TextMessage* textMessage =
                dynamic_cast< const TextMessage* >( message );
            string text = "";

            if( textMessage != NULL ) {
                text = textMessage->getText();
            } else {
                text = "NOT A TEXTMESSAGE!";
            }

            if( clientAck ) {
                message->acknowledge();
            }

            printf( "Message #%d Received: %s\n", count, text.c_str() );
        } catch (CMSException& e) {
            e.printStackTrace();
        }
    }

    // If something bad happens you see it here, as this class has also been
    // registered as an ExceptionListener with the connection.
    virtual void onException( const CMSException& ex AMQCPP_UNUSED ) {
        printf("CMS Exception occurred. Shutting down client.\n");
        exit(1);
    }

    virtual void transportInterrupted() {
        std::cout << "The Connection's Transport has been Interrupted." << std::endl;
    }

    virtual void transportResumed() {
        std::cout << "The Connection's Transport has been Restored." << std::endl;
    }

private:

    void cleanup() {

        try {
            if( connection != NULL ) {
                connection->close();
            }
        } catch ( CMSException& e ) {
            e.printStackTrace();
        }

        // Reset each pointer after deleting it: close() and the destructor
        // both call cleanup(), so it must be safe to run twice.
        delete destination; destination = NULL;
        delete consumer;    consumer = NULL;
        delete session;     session = NULL;
        delete connection;  connection = NULL;
    }
};

////////////////////////////////////////////////////////////////////////////////
int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {

    activemq::library::ActiveMQCPP::initializeLibrary();

    std::cout << "=====================================================\n";
    std::cout << "Starting the example:" << std::endl;
    std::cout << "-----------------------------------------------------\n";

    // Set the URI to point to the IPAddress of your broker.
    // add any optional params to the url to enable things like
    // tightMarshalling or tcp logging etc. See the CMS web site for
    // a full list of configuration options.
    //
    // http://activemq.apache.org/cms/
    //
    std::string brokerURI = "failover:(tcp://127.0.0.1:61616)";

    //============================================================
    // This is the Destination Name and URI options. Use this to
    // customize where the consumer listens, to have the consumer
    // use a topic or queue set the 'useTopics' flag.
    //============================================================
    std::string destURI = "TEST.FOO"; //?consumer.prefetchSize=1";

    //============================================================
    // set to true to use topics instead of queues
    // Note in the code above that this causes createTopic or
    // createQueue to be used in the consumer.
    //============================================================
    bool useTopics = false;

    //============================================================
    // set to true if you want the consumer to use client ack mode
    // instead of the default auto ack mode.
    //============================================================
    bool clientAck = false;

    // Create the consumer
    SimpleAsyncConsumer consumer( brokerURI, destURI, useTopics, clientAck );

    // Start it up and it will listen forever.
    consumer.runConsumer();

    // Wait to exit.
    std::cout << "Press 'q' to quit" << std::endl;
    while( std::cin.get() != 'q') {}

    // All CMS resources should be closed before the library is shutdown.
    consumer.close();

    std::cout << "-----------------------------------------------------\n";
    std::cout << "Finished with the example." << std::endl;
    std::cout << "=====================================================\n";

    activemq::library::ActiveMQCPP::shutdownLibrary();
}
- 1
- The constructor of the SimpleAsyncConsumer class. This constructor allows the user to create an instance of the class that connects to a particular broker and destination. It also identifies the destination as a Queue or a Topic.
- 2
- The runConsumer method creates a Connection to the broker and starts a new Session with the configured acknowledgment mode. Once the Session is created, a new Consumer can be created and attached to the configured Destination. To listen asynchronously for new messages, SimpleAsyncConsumer inherits from cms::MessageListener so that it can register itself as a message listener with the MessageConsumer created in the runConsumer method.
- 3
- All messages received by the application are dispatched to the onMessage method. If the message is a TextMessage, its contents are printed on the screen.
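The type check performed in onMessage can be looked at in isolation. The following is a minimal sketch, not part of the CMS library: the Message, TextMessage, and BytesMessage classes and the describe function are hypothetical stand-ins placed in a sketch namespace so that the fragment compiles without the A-MQ C++ client installed. It only illustrates the down-cast-and-fall-back logic used by the example.

```cpp
#include <cstddef>
#include <string>

namespace sketch {

// Hypothetical stand-ins for cms::Message and cms::TextMessage. They are not
// the real CMS classes and exist only to keep this sketch self-contained.
class Message {
public:
    virtual ~Message() {}
};

class TextMessage : public Message {
public:
    explicit TextMessage(const std::string& text) : text_(text) {}
    std::string getText() const { return text_; }
private:
    std::string text_;
};

// A second message type, used to exercise the fallback branch.
class BytesMessage : public Message {};

// Mirrors the body of onMessage: attempt the down-cast and fall back to a
// marker string when the message is of another type.
inline std::string describe(const Message* message) {
    const TextMessage* textMessage = dynamic_cast<const TextMessage*>(message);
    if (textMessage != NULL) {
        return textMessage->getText();
    }
    return "NOT A TEXTMESSAGE!";
}

} // namespace sketch
```

Calling sketch::describe with a pointer to a sketch::TextMessage returns its text, while any other message type yields the marker string, which is the same behavior the listener in Example 3.13 prints.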
Example 3.14. Simple Asynchronous Producer
#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <decaf/lang/Long.h>
#include <decaf/util/Date.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/util/Config.h>
#include <activemq/library/ActiveMQCPP.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
#include <memory>

using namespace activemq;
using namespace activemq::core;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;

////////////////////////////////////////////////////////////////////////////////
class SimpleProducer : public Runnable {
private:

    Connection* connection;
    Session* session;
    Destination* destination;
    MessageProducer* producer;
    bool useTopic;
    bool clientAck;
    unsigned int numMessages;
    std::string brokerURI;
    std::string destURI;

private:

    // Copying is disabled: the class owns raw CMS pointers.
    SimpleProducer( const SimpleProducer& );
    SimpleProducer& operator= ( const SimpleProducer& );

public:

    SimpleProducer( const std::string& brokerURI,
                    unsigned int numMessages,
                    const std::string& destURI,
                    bool useTopic = false,
                    bool clientAck = false ) :
        connection(NULL),
        session(NULL),
        destination(NULL),
        producer(NULL),
        useTopic(useTopic),
        clientAck(clientAck),
        numMessages(numMessages),
        brokerURI(brokerURI),
        destURI(destURI) {   // (1)
    }

    virtual ~SimpleProducer() {
        cleanup();
    }

    void close() {
        this->cleanup();
    }

    virtual void run() {   // (2)

        try {

            // Create a ConnectionFactory
            auto_ptr<ActiveMQConnectionFactory> connectionFactory(
                new ActiveMQConnectionFactory( brokerURI ) );

            // Create a Connection
            try {
                connection = connectionFactory->createConnection();
                connection->start();
            } catch( CMSException& e ) {
                e.printStackTrace();
                // Rethrow the original exception object rather than a copy.
                throw;
            }

            // Create a Session
            if( clientAck ) {
                session = connection->createSession( Session::CLIENT_ACKNOWLEDGE );
            } else {
                session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
            }

            // Create the destination (Topic or Queue)
            if( useTopic ) {
                destination = session->createTopic( destURI );
            } else {
                destination = session->createQueue( destURI );
            }

            // Create a MessageProducer from the Session to the Topic or Queue
            producer = session->createProducer( destination );
            producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );

            // Create the Thread Id String
            string threadIdStr = Long::toString( Thread::currentThread()->getId() );

            // Create the message text
            string text = (string)"Hello world! from thread " + threadIdStr;

            for( unsigned int ix = 0; ix < numMessages; ++ix ) {
                TextMessage* message = session->createTextMessage( text );

                message->setIntProperty( "Integer", ix );

                // Tell the producer to send the message
                printf( "Sent message #%u from thread %s\n", ix+1, threadIdStr.c_str() );
                producer->send( message );

                delete message;
            }

        } catch ( CMSException& e ) {
            e.printStackTrace();
        }
    }

private:

    void cleanup() {

        try {
            if( connection != NULL ) {
                connection->close();
            }
        } catch ( CMSException& e ) {
            e.printStackTrace();
        }

        // Reset each pointer after deleting it: close() and the destructor
        // both call cleanup(), so it must be safe to run twice.
        delete destination; destination = NULL;
        delete producer;    producer = NULL;
        delete session;     session = NULL;
        delete connection;  connection = NULL;
    }
};

////////////////////////////////////////////////////////////////////////////////
int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {

    activemq::library::ActiveMQCPP::initializeLibrary();

    std::cout << "=====================================================\n";
    std::cout << "Starting the example:" << std::endl;
    std::cout << "-----------------------------------------------------\n";

    // Set the URI to point to the IPAddress of your broker.
    // add any optional params to the url to enable things like
    // tightMarshalling or tcp logging etc. See the CMS web site for
    // a full list of configuration options.
    //
    // http://activemq.apache.org/cms/
    //
    std::string brokerURI = "failover://(tcp://127.0.0.1:61616)";

    //============================================================
    // Total number of messages for this producer to send.
    //============================================================
    unsigned int numMessages = 2000;

    //============================================================
    // This is the Destination Name and URI options. Use this to
    // customize where the Producer produces, to have the producer
    // use a topic or queue set the 'useTopics' flag.
    //============================================================
    std::string destURI = "TEST.FOO";

    //============================================================
    // set to true to use topics instead of queues
    // Note in the code above that this causes createTopic or
    // createQueue to be used in the producer.
    //============================================================
    bool useTopics = false;

    // Create the producer and run it.
    SimpleProducer producer( brokerURI, numMessages, destURI, useTopics );

    // Publish the given number of Messages
    producer.run();

    // Before exiting we ensure that all CMS resources are closed.
    producer.close();

    std::cout << "-----------------------------------------------------\n";
    std::cout << "Finished with the example." << std::endl;
    std::cout << "=====================================================\n";

    activemq::library::ActiveMQCPP::shutdownLibrary();
}
- 1
- The SimpleProducer class exposes an interface similar to that of the consumer in Example 3.13, “Simple Asynchronous Consumer”. The constructor creates an instance with the configuration options for the broker and destination, and the number of messages to be sent to the configured destination.
- 2
- The run method publishes the specified number of messages. Once the run method completes, the client can shut down the SimpleProducer by calling the close() method, which cleans up the allocated CMS resources before the application exits.
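In both examples, close() and the destructor each call cleanup(), so cleanup() can run twice on the same object. The following is a minimal sketch of one way to make that safe, namely resetting each owning pointer after it is deleted. The Resource and Owner types are hypothetical stand-ins for the CMS objects and for SimpleProducer; they are not part of the CMS API and are used only so that the fragment compiles on its own.

```cpp
#include <cstddef>

namespace sketch {

// Hypothetical resource that counts destructor calls, standing in for the
// CMS objects (Destination, MessageProducer, Session, Connection).
struct Resource {
    static int destroyed;
    ~Resource() { ++destroyed; }
};
int Resource::destroyed = 0;

// Hypothetical owner with the same close()/destructor shape as the examples.
class Owner {
public:
    Owner() : resource(new Resource()) {}
    ~Owner() { cleanup(); }
    void close() { cleanup(); }

private:
    // Copying is disabled because the class owns a raw pointer.
    Owner(const Owner&);
    Owner& operator=(const Owner&);

    void cleanup() {
        delete resource;   // deleting a null pointer is a no-op
        resource = NULL;   // makes a second cleanup() call harmless
    }

    Resource* resource;
};

} // namespace sketch
```

With this shape, calling close() explicitly and then letting the object go out of scope destroys the resource exactly once. Without the reset, the second call would delete an already-freed pointer.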
3.5.3. C++ Client on RHEL for SSL-Based Communication with A-MQ Broker
Overview
This section describes how to enable SSL/TLS security for the AMQP protocol, where the connection is made between:
- A-MQ broker, deployed on a RHEL host, and
- Qpid C++ client, deployed on a RHEL host.
Configuring SSL/TLS for the broker on RHEL
Follow these steps to enable SSL/TLS security for the AMQP endpoint of a broker running on RHEL:
- Create a self-signed certificate in a new keystore, test.jks, for testing purposes:
  keytool -genkey -alias jboss -keyalg RSA -keystore test.jks -storepass password -dname "CN=test,O=test"
  Store the new keystore file, test.jks, in the broker's ${A-MQ_HOME}/etc/ directory.
- Configure the broker to use the test.jks keystore and enable SSL/TLS on the broker's AMQP connector by editing the ${A-MQ_HOME}/etc/activemq.xml file as follows:
  <sslContext>
    <sslContext keyStore="${karaf.base}/etc/test.jks" keyStorePassword="password" />
  </sslContext>
  <transportConnectors>
    <transportConnector name="amqpssl" uri="amqp+ssl://0.0.0.0:61617?transport.enabledProtocols=TLSv1,TLSv1.1,TLSv1.2" />
  </transportConnectors>
  Note
  For more details about the broker configuration, see Securing a Broker using SSL/TLS.
- Export the certificate in a format that can be used by the Qpid C++ client running on RHEL (in the next step):
  keytool -exportcert -rfc -keystore test.jks -storepass password -alias jboss -file ./sample_cert.cer
  You need to copy the resulting sample_cert.cer file to the client RHEL machine. One way of doing this is to use the secure copy command, scp, to copy the sample_cert.cer file securely across the network:
  scp sample_cert.cer ${USER_NAME}@0.0.0.0:${CLIENT_TARGET_PATH}
  Where ${USER_NAME} is the relevant user name on the client RHEL machine, ${CLIENT_TARGET_PATH} is the location on the remote file system where you want to copy the certificate, and 0.0.0.0 stands in for the address of the client machine.
Configuring SSL/TLS on the client side
Follow these steps to enable SSL/TLS security on a Qpid C++ client deployed on a remote RHEL machine:
- Install the Qpid C++ client packages, along with the qpid-send and qpid-receive packages for testing:
  yum install qpid-cpp-client
  yum install log4cpp
  yum install qpid-cpp-client-devel
  yum install log4cpp-devel
- The keytool command is needed for generating self-signed certificates. If it is not already available, install OpenJDK as follows:
  yum install java-1.8.0-openjdk-headless-1.8.0.51-0.b16.el6_6.x86_64
- Set up the client environment, using the NSS (Network Security Services) database to install the sample_cert.cer certificate:
  mkdir -p ~/nssdb
  certutil -A -n selfsigned -d ~/nssdb -t "CT,," -i ./sample_cert.cer
- Set the following environment variable:
  export QPID_SSL_CERT_DB=${YOUR_WORK_PATH}/nssdb
- Test the new configuration using the qpid-send command and the qpid-receive command:
  qpid-send -b amqp:ssl:0.0.0.0:61617 -a TestQueue --connection-options "{protocol:amqp1.0, ssl_ignore_hostname_verification_failure:true, username:admin, password:admin}" --content-string "hello world"
  qpid-receive -b amqp:ssl:0.0.0.0:61617 -a TestQueue --connection-options "{protocol:amqp1.0, ssl_ignore_hostname_verification_failure:true, username:admin, password:admin}"
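The --connection-options argument in the commands above is a brace-delimited list of key:value pairs. When the same settings are needed inside a C++ program, the string can be assembled from a map instead of being written by hand. The following is a minimal sketch under stated assumptions: buildConnectionOptions is a hypothetical helper, not part of the Qpid API, and it emits values verbatim without quoting or escaping, which is sufficient for the simple values used in this section.

```cpp
#include <map>
#include <sstream>
#include <string>

namespace sketch {

// Hypothetical helper (not part of the Qpid API): builds a
// "{key:value, key:value}" string in the same shape as the
// --connection-options argument. Values are emitted verbatim.
inline std::string buildConnectionOptions(
        const std::map<std::string, std::string>& options) {
    std::ostringstream out;
    out << "{";
    for (std::map<std::string, std::string>::const_iterator it = options.begin();
         it != options.end(); ++it) {
        if (it != options.begin()) {
            out << ", ";
        }
        out << it->first << ":" << it->second;
    }
    out << "}";
    return out.str();
}

} // namespace sketch
```

Because std::map iterates in key order, the pairs come out sorted alphabetically rather than in the order they were added. This sketch assumes the order of the options does not matter to the client.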
3.5.4. C++ Client on Windows for SSL-Based Communication with A-MQ Broker
Overview
This section describes how to enable SSL/TLS security for the AMQP protocol, where the connection is made between:
- A-MQ broker, deployed on a Linux OS, and
- Qpid C++ client, deployed on a Windows OS.
Configuring SSL/TLS for the broker on Linux
Follow these steps to enable SSL/TLS security for the AMQP endpoint of a broker running on a Linux platform:
- Create a new self-signed certificate in a new keystore, test.jks, for testing purposes:
  keytool -genkey -alias jboss -keyalg RSA -keystore test.jks -storepass password -dname "CN=test,O=test"
  Store the new keystore file, test.jks, in the broker's ${A-MQ_HOME}/etc/ directory.
- Configure the broker to use the test.jks keystore and enable SSL/TLS on the broker's AMQP connector by editing the ${A-MQ_HOME}/etc/activemq.xml file as follows:
  <sslContext>
    <sslContext keyStore="${karaf.base}/etc/test.jks" keyStorePassword="password" />
  </sslContext>
  <transportConnectors>
    <transportConnector name="amqpssl" uri="amqp+ssl://0.0.0.0:61617?transport.enabledProtocols=TLSv1,TLSv1.1,TLSv1.2" />
  </transportConnectors>
  Note
  For more details about the broker configuration, see Securing a Broker using SSL/TLS.
- Export the certificate in a format that can be used by the Qpid C++ client running on Windows (in the next step):
  keytool -exportcert -rfc -keystore test.jks -storepass password -alias jboss -file ./sample_cert.cer
  You need to copy the resulting sample_cert.cer file to the Windows machine. One way of doing this is to use the secure copy command, scp, to copy the sample_cert.cer file securely across the network:
  scp sample_cert.cer ${USER_NAME}@0.0.0.0:${CLIENT_TARGET_PATH}
  Where ${USER_NAME} is the relevant user name on the Windows machine, ${CLIENT_TARGET_PATH} is the location on the Windows file system where you want to copy the certificate, and 0.0.0.0 stands in for the address of the Windows machine.
Configuring SSL/TLS on the client side
Follow these steps to enable SSL/TLS security on a Qpid C++ client deployed on a Windows machine:
- Set up the client environment, using the MMC.exe utility to install the sample_cert.cer certificate into "Trusted Root Certification Authorities", as follows:
  "Console Root" -> "Trusted Root Certification Authorities" -> "Certificates"
  Right Click -> "All Tasks" -> "Import"
- Test the new configuration using the qpid-send command and the qpid-receive command:
  qpid-send -b amqp:ssl:0.0.0.0:61617 -a TestQueue --connection-options "{protocol:amqp1.0, ssl_ignore_hostname_verification_failure:true, username:admin, password:admin}" --content-string "hello world"
  qpid-receive -b amqp:ssl:0.0.0.0:61617 -a TestQueue --connection-options "{protocol:amqp1.0, ssl_ignore_hostname_verification_failure:true, username:admin, password:admin}"