3.5. C++ AMQP 1.0 Client API
The C++ AMQP 1.0 Client API is based on the Apache Qpid C++ AMQP 1.0 Client API.
Note
This is an initial version of the documentation for the C++ client. Regular updates and enhancements to the documentation can be expected after the GA release of Fuse 6.2.0.
3.5.1. Getting Started with C++ AMQP
Introduction to C++ AMQP 1.0 Client API
The C++ Messaging Service (CMS) API is used to interface with message brokers such as A-MQ. A-MQ-C++ is a client library that lets C++ clients communicate through an A-MQ broker. The CMS architecture supports pluggable transports and wire formats. At present, the OpenWire and Stomp protocols are supported over TCP and SSL. A Failover Transport is also supported for reliable client operation. In addition to CMS, A-MQ-C++ provides a set of classes that support platform-independent constructs such as threading, I/O, and sockets.
CMS is similar to JMS and, apart from some minor differences, adheres to the JMS specification. For more information about the CMS API, see CMS API Overview.
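The Failover Transport is selected through the broker URI. The examples in Section 3.5.2 use a URI of the form failover:(tcp://127.0.0.1:61616), which wraps a single TCP address. The following is a minimal sketch, using only the C++ standard library, of how such a URI could be assembled from several broker addresses. The helper name makeFailoverUri is hypothetical and is not part of the client library, and the comma-separated list and trailing ?options syntax are assumed here from the usual ActiveMQ URI convention; check the CMS configuration documentation for the options your version supports.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical helper (not part of the client library): joins broker
// addresses into a failover URI of the form
//   failover:(uri1,uri2,...)?options
// The "?options" suffix is appended only when options is non-empty.
std::string makeFailoverUri(const std::vector<std::string>& brokers,
                            const std::string& options = "") {
    std::string uri = "failover:(";
    for (std::size_t i = 0; i < brokers.size(); ++i) {
        if (i != 0) {
            uri += ',';  // separate consecutive broker addresses
        }
        uri += brokers[i];
    }
    uri += ')';
    if (!options.empty()) {
        uri += '?';
        uri += options;
    }
    return uri;
}
```

Calling makeFailoverUri with the single address tcp://127.0.0.1:61616 yields the same string that the consumer example passes to ActiveMQConnectionFactory.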
Downloading A-MQ C++ Client
You can download the A-MQ C++ client from the Red Hat Customer Portal (C++ Client).
3.5.2. C++ Example Clients
A Simple Messaging Program in C++
The following program shows how to create a simple asynchronous consumer that receives TextMessage objects from an A-MQ broker.
In this example, we create a ConnectionFactory object and use it to create a CMS Connection. A Connection is the object that manages the client's connection to the provider. After creating a connection, the client creates a CMS Session, which it uses to create message producers and consumers.
Example 3.13. Simple Asynchronous Consumer
    #include <decaf/lang/Thread.h>
    #include <decaf/lang/Runnable.h>
    #include <decaf/util/concurrent/CountDownLatch.h>
    #include <activemq/core/ActiveMQConnectionFactory.h>
    #include <activemq/core/ActiveMQConnection.h>
    #include <activemq/transport/DefaultTransportListener.h>
    #include <activemq/library/ActiveMQCPP.h>
    #include <decaf/lang/Integer.h>
    #include <activemq/util/Config.h>
    #include <decaf/util/Date.h>
    #include <cms/Connection.h>
    #include <cms/Session.h>
    #include <cms/TextMessage.h>
    #include <cms/BytesMessage.h>
    #include <cms/MapMessage.h>
    #include <cms/ExceptionListener.h>
    #include <cms/MessageListener.h>
    #include <stdlib.h>
    #include <stdio.h>
    #include <iostream>

    using namespace activemq;
    using namespace activemq::core;
    using namespace activemq::transport;
    using namespace decaf::lang;
    using namespace decaf::util;
    using namespace decaf::util::concurrent;
    using namespace cms;
    using namespace std;

    class SimpleAsyncConsumer : public ExceptionListener,
                                public MessageListener,
                                public DefaultTransportListener {
    private:

        Connection* connection;
        Session* session;
        Destination* destination;
        MessageConsumer* consumer;
        bool useTopic;
        std::string brokerURI;
        std::string destURI;
        bool clientAck;

    private:

        // Copying is disabled.
        SimpleAsyncConsumer( const SimpleAsyncConsumer& );
        SimpleAsyncConsumer& operator= ( const SimpleAsyncConsumer& );

    public:

        // (1)
        SimpleAsyncConsumer( const std::string& brokerURI,
                             const std::string& destURI,
                             bool useTopic = false,
                             bool clientAck = false ) :
            connection(NULL),
            session(NULL),
            destination(NULL),
            consumer(NULL),
            useTopic(useTopic),
            brokerURI(brokerURI),
            destURI(destURI),
            clientAck(clientAck) {
        }

        virtual ~SimpleAsyncConsumer() {
            this->cleanup();
        }

        void close() {
            this->cleanup();
        }

        // (2)
        void runConsumer() {

            try {

                // Create a ConnectionFactory
                ActiveMQConnectionFactory* connectionFactory =
                    new ActiveMQConnectionFactory( brokerURI );

                // Create a Connection
                connection = connectionFactory->createConnection();
                delete connectionFactory;

                ActiveMQConnection* amqConnection =
                    dynamic_cast<ActiveMQConnection*>( connection );
                if( amqConnection != NULL ) {
                    amqConnection->addTransportListener( this );
                }

                connection->start();

                connection->setExceptionListener(this);

                // Create a Session
                if( clientAck ) {
                    session = connection->createSession( Session::CLIENT_ACKNOWLEDGE );
                } else {
                    session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
                }

                // Create the destination (Topic or Queue)
                if( useTopic ) {
                    destination = session->createTopic( destURI );
                } else {
                    destination = session->createQueue( destURI );
                }

                // Create a MessageConsumer from the Session to the Topic or Queue
                consumer = session->createConsumer( destination );
                consumer->setMessageListener( this );

            } catch (CMSException& e) {
                e.printStackTrace();
            }
        }

        // Called from the consumer since this class is a registered MessageListener.
        // (3)
        virtual void onMessage( const Message* message ) {

            static int count = 0;

            try {
                count++;
                const TextMessage* textMessage =
                    dynamic_cast< const TextMessage* >( message );
                string text = "";

                if( textMessage != NULL ) {
                    text = textMessage->getText();
                } else {
                    text = "NOT A TEXTMESSAGE!";
                }

                if( clientAck ) {
                    message->acknowledge();
                }

                printf( "Message #%d Received: %s\n", count, text.c_str() );
            } catch (CMSException& e) {
                e.printStackTrace();
            }
        }

        // If something bad happens you see it here, as this class has also been
        // registered as an ExceptionListener with the connection.
        virtual void onException( const CMSException& ex AMQCPP_UNUSED ) {
            printf("CMS Exception occurred. Shutting down client.\n");
            exit(1);
        }

        virtual void transportInterrupted() {
            std::cout << "The Connection's Transport has been Interrupted." << std::endl;
        }

        virtual void transportResumed() {
            std::cout << "The Connection's Transport has been Restored." << std::endl;
        }

    private:

        void cleanup(){

            try {
                if( connection != NULL ) {
                    connection->close();
                }
            } catch ( CMSException& e ) {
                e.printStackTrace();
            }

            delete destination;
            delete consumer;
            delete session;
            delete connection;
        }
    };

    ////////////////////////////////////////////////////////////////////////////////
    int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {

        activemq::library::ActiveMQCPP::initializeLibrary();

        std::cout << "=====================================================\n";
        std::cout << "Starting the example:" << std::endl;
        std::cout << "-----------------------------------------------------\n";

        // Set the URI to point to the IPAddress of your broker.
        // add any optional params to the url to enable things like
        // tightMarshalling or tcp logging etc. See the CMS web site for
        // a full list of configuration options.
        //
        // http://activemq.apache.org/cms/
        //
        std::string brokerURI =
            "failover:(tcp://127.0.0.1:61616)";

        //============================================================
        // This is the Destination Name and URI options. Use this to
        // customize where the consumer listens, to have the consumer
        // use a topic or queue set the 'useTopics' flag.
        //============================================================
        std::string destURI = "TEST.FOO"; //?consumer.prefetchSize=1";

        //============================================================
        // set to true to use topics instead of queues
        // Note in the code above that this causes createTopic or
        // createQueue to be used in the consumer.
        //============================================================
        bool useTopics = false;

        //============================================================
        // set to true if you want the consumer to use client ack mode
        // instead of the default auto ack mode.
        //============================================================
        bool clientAck = false;

        // Create the consumer
        SimpleAsyncConsumer consumer( brokerURI, destURI, useTopics, clientAck );

        // Start it up and it will listen forever.
        consumer.runConsumer();

        // Wait to exit.
        std::cout << "Press 'q' to quit" << std::endl;
        while( std::cin.get() != 'q') {}

        // All CMS resources should be closed before the library is shutdown.
        consumer.close();

        std::cout << "-----------------------------------------------------\n";
        std::cout << "Finished with the example." << std::endl;
        std::cout << "=====================================================\n";

        activemq::library::ActiveMQCPP::shutdownLibrary();
    }
- (1) The constructor of the SimpleAsyncConsumer class. It lets the user create an instance that connects to a particular broker and destination, and it identifies the destination as a Queue or a Topic.
- (2) The runConsumer method creates a Connection to the broker and starts a new Session with the configured acknowledgment mode. Once the Session is created, a new Consumer can be created and attached to the configured Destination. To listen asynchronously for new messages, SimpleAsyncConsumer inherits from cms::MessageListener so that it can register itself as a message listener with the MessageConsumer created in the runConsumer method.
- (3) All messages received by the application are dispatched to the onMessage method. If a message is a TextMessage, its contents are printed to the screen.
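The listener registration and the TextMessage check described in the callouts above can be tried out without a broker. The following is a minimal sketch that uses only the C++ standard library. Message, TextMessage, BytesMessage, MessageListener, ToyConsumer, and RecordingListener are hypothetical stand-ins written for this illustration; they are not the CMS classes. Delivery here is a direct, synchronous call, whereas the consumer in Example 3.13 receives its messages asynchronously from the client library.

```cpp
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins for cms::Message and two of its subtypes.
struct Message {
    virtual ~Message() {}
};

struct TextMessage : Message {
    explicit TextMessage(std::string body) : text(std::move(body)) {}
    std::string text;
};

struct BytesMessage : Message {};

// Hypothetical stand-in for cms::MessageListener.
struct MessageListener {
    virtual ~MessageListener() {}
    virtual void onMessage(const Message* message) = 0;
};

// Toy consumer: forwards each delivered message to the registered listener.
class ToyConsumer {
public:
    void setMessageListener(MessageListener* listener) { listener_ = listener; }

    // Simulates a message arriving from the broker.
    void deliver(const Message& message) {
        if (listener_ != nullptr) {
            listener_->onMessage(&message);
        }
    }

private:
    MessageListener* listener_ = nullptr;
};

// Mirrors the onMessage logic of Example 3.13, but records the text
// instead of printing it.
class RecordingListener : public MessageListener {
public:
    void onMessage(const Message* message) override {
        // dynamic_cast yields a null pointer when the message is not a TextMessage.
        const TextMessage* textMessage = dynamic_cast<const TextMessage*>(message);
        received.push_back(textMessage != nullptr
                               ? textMessage->text
                               : std::string("NOT A TEXTMESSAGE!"));
    }

    std::vector<std::string> received;
};
```

Registering a RecordingListener with setMessageListener and then delivering a TextMessage followed by a BytesMessage leaves received holding the text of the first message and the fallback string for the second, which is the same branch structure as onMessage in the example.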
Example 3.14. Simple Asynchronous Producer
    #include <decaf/lang/Thread.h>
    #include <decaf/lang/Runnable.h>
    #include <decaf/util/concurrent/CountDownLatch.h>
    #include <decaf/lang/Long.h>
    #include <decaf/util/Date.h>
    #include <activemq/core/ActiveMQConnectionFactory.h>
    #include <activemq/util/Config.h>
    #include <activemq/library/ActiveMQCPP.h>
    #include <cms/Connection.h>
    #include <cms/Session.h>
    #include <cms/TextMessage.h>
    #include <cms/BytesMessage.h>
    #include <cms/MapMessage.h>
    #include <cms/ExceptionListener.h>
    #include <cms/MessageListener.h>
    #include <stdlib.h>
    #include <stdio.h>
    #include <iostream>
    #include <memory>

    using namespace activemq;
    using namespace activemq::core;
    using namespace decaf;
    using namespace decaf::lang;
    using namespace decaf::util;
    using namespace decaf::util::concurrent;
    using namespace cms;
    using namespace std;

    ////////////////////////////////////////////////////////////////////////////////
    class SimpleProducer : public Runnable {
    private:

        Connection* connection;
        Session* session;
        Destination* destination;
        MessageProducer* producer;
        bool useTopic;
        bool clientAck;
        unsigned int numMessages;
        std::string brokerURI;
        std::string destURI;

    private:

        // Copying is disabled.
        SimpleProducer( const SimpleProducer& );
        SimpleProducer& operator= ( const SimpleProducer& );

    public:

        // (1)
        SimpleProducer( const std::string& brokerURI,
                        unsigned int numMessages,
                        const std::string& destURI,
                        bool useTopic = false,
                        bool clientAck = false ) :
            connection(NULL),
            session(NULL),
            destination(NULL),
            producer(NULL),
            useTopic(useTopic),
            clientAck(clientAck),
            numMessages(numMessages),
            brokerURI(brokerURI),
            destURI(destURI) {
        }

        virtual ~SimpleProducer(){
            cleanup();
        }

        void close() {
            this->cleanup();
        }

        // (2)
        virtual void run() {

            try {

                // Create a ConnectionFactory
                auto_ptr<ActiveMQConnectionFactory> connectionFactory(
                    new ActiveMQConnectionFactory( brokerURI ) );

                // Create a Connection
                try{
                    connection = connectionFactory->createConnection();
                    connection->start();
                } catch( CMSException& e ) {
                    e.printStackTrace();
                    // Rethrow the original exception object to the outer handler.
                    throw;
                }

                // Create a Session
                if( clientAck ) {
                    session = connection->createSession( Session::CLIENT_ACKNOWLEDGE );
                } else {
                    session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
                }

                // Create the destination (Topic or Queue)
                if( useTopic ) {
                    destination = session->createTopic( destURI );
                } else {
                    destination = session->createQueue( destURI );
                }

                // Create a MessageProducer from the Session to the Topic or Queue
                producer = session->createProducer( destination );
                producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );

                // Create the Thread Id String
                string threadIdStr = Long::toString( Thread::currentThread()->getId() );

                // Create the message text
                string text = (string)"Hello world! from thread " + threadIdStr;

                for( unsigned int ix = 0; ix < numMessages; ++ix ){
                    TextMessage* message = session->createTextMessage( text );

                    message->setIntProperty( "Integer", ix );

                    // Tell the producer to send the message
                    printf( "Sent message #%u from thread %s\n", ix+1, threadIdStr.c_str() );
                    producer->send( message );

                    delete message;
                }

            } catch ( CMSException& e ) {
                e.printStackTrace();
            }
        }

    private:

        void cleanup(){

            try {
                if( connection != NULL ) {
                    connection->close();
                }
            } catch ( CMSException& e ) {
                e.printStackTrace();
            }

            delete destination;
            delete producer;
            delete session;
            delete connection;
        }
    };

    ////////////////////////////////////////////////////////////////////////////////
    int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {

        activemq::library::ActiveMQCPP::initializeLibrary();

        std::cout << "=====================================================\n";
        std::cout << "Starting the example:" << std::endl;
        std::cout << "-----------------------------------------------------\n";

        // Set the URI to point to the IPAddress of your broker.
        // add any optional params to the url to enable things like
        // tightMarshalling or tcp logging etc. See the CMS web site for
        // a full list of configuration options.
        //
        // http://activemq.apache.org/cms/
        //
        std::string brokerURI =
            "failover://(tcp://127.0.0.1:61616)";

        //============================================================
        // Total number of messages for this producer to send.
        //============================================================
        unsigned int numMessages = 2000;

        //============================================================
        // This is the Destination Name and URI options. Use this to
        // customize where the Producer produces, to have the producer
        // use a topic or queue set the 'useTopics' flag.
        //============================================================
        std::string destURI = "TEST.FOO";

        //============================================================
        // set to true to use topics instead of queues
        // Note in the code above that this causes createTopic or
        // createQueue to be used in the producer.
        //============================================================
        bool useTopics = false;

        // Create the producer and run it.
        SimpleProducer producer( brokerURI, numMessages, destURI, useTopics );

        // Publish the given number of Messages
        producer.run();

        // Before exiting we ensure that all CMS resources are closed.
        producer.close();

        std::cout << "-----------------------------------------------------\n";
        std::cout << "Finished with the example." << std::endl;
        std::cout << "=====================================================\n";

        activemq::library::ActiveMQCPP::shutdownLibrary();
    }
- (1) The SimpleProducer class exposes an interface similar to that of the consumer in Example 3.13, “Simple Asynchronous Consumer”. The constructor creates an instance with the configuration options for the broker and destination, and the number of messages to be sent to the configured destination.
- (2) The run method publishes the specified number of messages. Once the run method completes, the client can shut down the SimpleProducer by calling the close() method, which cleans up the allocated CMS resources before the application exits.
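The cleanup() methods in both examples release each CMS object with an explicit delete, and the producer uses std::auto_ptr, which was deprecated in C++11 and removed in C++17. As an alternative for C++11 or later, ownership can be expressed with std::unique_ptr members, so that the objects are released automatically when the owning class is destroyed. The following is a minimal sketch of that idea using only the standard library. Resource and ClientResources are hypothetical names introduced for this illustration, with Resource standing in for the CMS types; the sketch assumes, as the manual delete calls in the examples imply, that the caller owns the objects it creates.

```cpp
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a CMS resource. It appends its name to a
// shared log when destroyed so that the destruction order can be observed.
class Resource {
public:
    Resource(std::string name, std::vector<std::string>& log)
        : name_(std::move(name)), log_(log) {}
    ~Resource() { log_.push_back(name_); }

private:
    std::string name_;
    std::vector<std::string>& log_;
};

// Members are declared from parent to child. C++ destroys non-static
// members in reverse declaration order, so the producer is released
// first and the connection last.
struct ClientResources {
    explicit ClientResources(std::vector<std::string>& log)
        : connection(new Resource("connection", log)),
          session(new Resource("session", log)),
          destination(new Resource("destination", log)),
          producer(new Resource("producer", log)) {}

    std::unique_ptr<Resource> connection;
    std::unique_ptr<Resource> session;
    std::unique_ptr<Resource> destination;
    std::unique_ptr<Resource> producer;
};
```

When a ClientResources object goes out of scope, the log reads producer, destination, session, connection. As in the hand-written cleanup(), the objects created from the session are released before the session and the connection. A real client would still call connection->close() before the members are destroyed, as both examples do.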