このコンテンツは選択した言語では利用できません。

3.5. C++ AMQP 1.0 Client API


C++ AMQP 1.0 Client API is based on the Apache Qpid C++ AMQP 1.0 Client API.
Note
This is an initial version of documentation for the C++ client. Regular updates and enhancements of the documentation can be expected after the GA release of Fuse 6.2.0

3.5.1. Getting Started with C++ AMQP

Introduction to C++ AMQP 1.0 Client API

C++ Messaging Service (CMS) API is used for interfacing with Message Brokers such as A-MQ. A-MQ-C++ is a client library that uses A-MQ as a message broker for clients to communicate. The architecture of CMS supports pluggable transports and wire formats. At present, OpenWire and Stomp protocols are supported over TCP and SSL. Failover Transport is also supported for reliable client operation. In addition to CMS, A-MQ-C++ provides a set of classes that support platform independent constructs such as threading, I/O, sockets.
CMS and JMS are similar with some minor differences, mostly CMS adheres to the JMS specifications. To know more about CMS API, see CMS API Overview

Downloading A-MQ C++ Client

You can Download A-MQ C++ client from the Red Hat Customer Portal C++ Client

3.5.2. C++ Example Clients

A Simple Messaging Program in C++

The following program shows how to create a simple Asynchronous consumer that can receive TextMessage objects from an A-MQ broker.
In this example, we create ConnectionFactory object. This object is used to create a CMS Connection using the ConnectionFactory. A Connection is the Object that manages the client's connection to the Provider. After creating a connection, the client creates a CMS Session to create message producers and consumers.

Example 3.13. Simple Asynchronous Consumer

#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h >
#include <decaf/util/concurrent/CountDownLatch.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/transport/DefaultTransportListener.h>
#include <activemq/library/ActiveMQCPP.h>
#include <decaf/lang/Integer.h>
#include <activemq/util/Config.h>
#include <decaf/util/Date.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
 
using namespace activemq;
using namespace activemq::core;
using namespace activemq::transport;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;
 
class SimpleAsyncConsumer : public ExceptionListener,
                            public MessageListener,
                            public DefaultTransportListener {
private:
 
    Connection* connection;
    Session* session;
    Destination* destination;
    MessageConsumer* consumer;
    bool useTopic;
    std::string brokerURI;
    std::string destURI;
    bool clientAck;
 
private:
 
    SimpleAsyncConsumer( const SimpleAsyncConsumer& );
    SimpleAsyncConsumer& operator= ( const SimpleAsyncConsumer& );
 
public:
 
    SimpleAsyncConsumer( const std::string& brokerURI, 
                         const std::string& destURI,
                         bool useTopic = false,
                         bool clientAck = false ) :
        connection(NULL),
        session(NULL),
        destination(NULL),
        consumer(NULL),
        useTopic(useTopic),
        brokerURI(brokerURI),
        destURI(destURI),
        clientAck(clientAck)  1
			      {
    }
 
    virtual ~SimpleAsyncConsumer() {
        this->cleanup();
    }
 
    void close() {
        this->cleanup();
    }
 
    void runConsumer() 2
		      { 
 
        try {
 
            // Create a ConnectionFactory
            ActiveMQConnectionFactory* connectionFactory =
                new ActiveMQConnectionFactory( brokerURI );
 
            // Create a Connection
            connection = connectionFactory->createConnection();
            delete connectionFactory;
 
            ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>( connection );
            if( amqConnection != NULL ) {
                amqConnection->addTransportListener( this );
            }
 
            connection->start();
 
            connection->setExceptionListener(this);
 
            // Create a Session
            if( clientAck ) {
                session = connection->createSession( Session::CLIENT_ACKNOWLEDGE );
            } else {
                session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
            }
 
            // Create the destination (Topic or Queue)
            if( useTopic ) {
                destination = session->createTopic( destURI );
            } else {
                destination = session->createQueue( destURI );
            }
 
            // Create a MessageConsumer from the Session to the Topic or Queue
            consumer = session->createConsumer( destination );
            consumer->setMessageListener( this );
 
        } catch (CMSException& e) {
            e.printStackTrace();
        }
    }
 
    // Called from the consumer since this class is a registered MessageListener.
    virtual void onMessage( const Message* message )  3
						      {
 
        static int count = 0;
 
        try
        {
            count++;
            const TextMessage* textMessage =
                dynamic_cast< const TextMessage* >( message );
            string text = "";
 
            if( textMessage != NULL ) {
                text = textMessage->getText();
            } else {
                text = "NOT A TEXTMESSAGE!";
            }
 
            if( clientAck ) {
                message->acknowledge();
            }
 
            printf( "Message #%d Received: %s\n", count, text.c_str() );
        } catch (CMSException& e) {
            e.printStackTrace();
        }
    }
 
    // If something bad happens you see it here as this class is also been
    // registered as an ExceptionListener with the connection.
    virtual void onException( const CMSException& ex AMQCPP_UNUSED ) {
        printf("CMS Exception occurred.  Shutting down client.\n");
        exit(1);
    }
 
    virtual void transportInterrupted() {
        std::cout << "The Connection's Transport has been Interrupted." << std::endl;
    }
 
    virtual void transportResumed() {
        std::cout << "The Connection's Transport has been Restored." << std::endl;
    }
 
private:
 
    void cleanup(){
 
        try {
            if( connection != NULL ) {
                connection->close();
            }
        } catch ( CMSException& e ) { 
            e.printStackTrace(); 
        }
 
        delete destination;
        delete consumer;
        delete session;
        delete connection;
    }
};
 
////////////////////////////////////////////////////////////////////////////////
int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {
 
    activemq::library::ActiveMQCPP::initializeLibrary();
 
    std::cout << "=====================================================\n";
    std::cout << "Starting the example:" << std::endl;
    std::cout << "-----------------------------------------------------\n";
 
    // Set the URI to point to the IPAddress of your broker.
    // add any optional params to the url to enable things like
    // tightMarshalling or tcp logging etc.  See the CMS web site for
    // a full list of configuration options.
    //
    //  http://activemq.apache.org/cms/
    //
    std::string brokerURI =
        "failover:(tcp://127.0.0.1:61616)";
 
    //============================================================
    // This is the Destination Name and URI options.  Use this to
    // customize where the consumer listens, to have the consumer
    // use a topic or queue set the 'useTopics' flag.
    //============================================================
    std::string destURI = "TEST.FOO"; //?consumer.prefetchSize=1";
 
    //============================================================
    // set to true to use topics instead of queues
    // Note in the code above that this causes createTopic or
    // createQueue to be used in the consumer.
    //============================================================
    bool useTopics = false;
 
    //============================================================
    // set to true if you want the consumer to use client ack mode
    // instead of the default auto ack mode.
    //============================================================
    bool clientAck = false;
 
    // Create the consumer
    SimpleAsyncConsumer consumer( brokerURI, destURI, useTopics, clientAck );
 
    // Start it up and it will listen forever.
    consumer.runConsumer();
 
    // Wait to exit.
    std::cout << "Press 'q' to quit" << std::endl;
    while( std::cin.get() != 'q') {}
 
    // All CMS resources should be closed before the library is shutdown.
    consumer.close();
 
    std::cout << "-----------------------------------------------------\n";
    std::cout << "Finished with the example." << std::endl;
    std::cout << "=====================================================\n";
 
    activemq::library::ActiveMQCPP::shutdownLibrary();
}      


1
The constructor of the SimpleAsyncConsumer class. This constructor allows the user to create an instance of the class that connects to a particular broker and destination. It also identifies the destination as a Queue or a Topic
2
The runConsumer method creates a Connection to the broker and start a new Session configured with the configured Acknowledgment mode. Once a Session is created a new Consumer can then be created and attached to the configured Destination. To listen asynchronously for new messages the SimpleAsyncConsumer inherits from cms::MessageListener so that it can register itself as a Message Listener with the MessageConsumer created in runConsumer method.
3
All the messages received by the application are dispatched to the onMessage method and if the message is a TextMessage its contents are printed on the screen.

Example 3.14.  A simple Asynchronous producer

#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <decaf/lang/Long.h>
#include <decaf/util/Date.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/util/Config.h>
#include <activemq/library/ActiveMQCPP.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
#include <memory>
 
using namespace activemq;
using namespace activemq::core;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;
 
////////////////////////////////////////////////////////////////////////////////
class SimpleProducer : public Runnable {
private:
 
    Connection* connection;
    Session* session;
    Destination* destination;
    MessageProducer* producer;
    bool useTopic;
    bool clientAck;
    unsigned int numMessages;
    std::string brokerURI;
    std::string destURI;
 
private:
 
    SimpleProducer( const SimpleProducer& );
    SimpleProducer& operator= ( const SimpleProducer& );
 
public:
 
    SimpleProducer( const std::string& brokerURI, unsigned int numMessages,
                    const std::string& destURI, bool useTopic = false, bool clientAck = false ) :
        connection(NULL),
        session(NULL),
        destination(NULL),
        producer(NULL),
        useTopic(useTopic),
        clientAck(clientAck),
        numMessages(numMessages),
        brokerURI(brokerURI),
        destURI(destURI) 1
			  {
    }
 
    virtual ~SimpleProducer(){
        cleanup();
    }
 
    void close() {
        this->cleanup();
    }
 
    virtual void run() 2
			{
        try {
 
            // Create a ConnectionFactory
            auto_ptr<ActiveMQConnectionFactory> connectionFactory(
                new ActiveMQConnectionFactory( brokerURI ) );
 
            // Create a Connection
            try{
                connection = connectionFactory->createConnection();
                connection->start();
            } catch( CMSException& e ) {
                e.printStackTrace();
                throw e;
            }
 
            // Create a Session
            if( clientAck ) {
                session = connection->createSession( Session::CLIENT_ACKNOWLEDGE );
            } else {
                session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
            }
 
            // Create the destination (Topic or Queue)
            if( useTopic ) {
                destination = session->createTopic( destURI );
            } else {
                destination = session->createQueue( destURI );
            }
 
            // Create a MessageProducer from the Session to the Topic or Queue
            producer = session->createProducer( destination );
            producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
 
            // Create the Thread Id String
            string threadIdStr = Long::toString( Thread::currentThread()->getId() );
 
            // Create a messages
            string text = (string)"Hello world! from thread " + threadIdStr;
 
            for( unsigned int ix=0; ix <numMessages; ++ix ){
                TextMessage* message = session->createTextMessage( text );
 
                message->setIntProperty( "Integer", ix );
 
                // Tell the producer to send the message
                printf( "Sent message #%d from thread %s\n", ix+1, threadIdStr.c_str() );
                producer->send( message );
 
                delete message;
            }
 
        }catch ( CMSException& e ) {
            e.printStackTrace();
        }
    }
 
private:
 
    void cleanup(){
 
        try {
            if( connection != NULL ) {
                connection->close();
            }
        } catch ( CMSException& e ) { 
            e.printStackTrace(); 
        }
 
        delete destination;
        delete producer;
        delete session;
        delete connection;
    }
};
 
////////////////////////////////////////////////////////////////////////////////
int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {
 
    activemq::library::ActiveMQCPP::initializeLibrary();
 
    std::cout << "=====================================================\n";
    std::cout << "Starting the example:" << std::endl;
    std::cout << "-----------------------------------------------------\n";
 
    // Set the URI to point to the IPAddress of your broker.
    // add any optional params to the url to enable things like
    // tightMarshalling or tcp logging etc.  See the CMS web site for
    // a full list of configuration options.
    //
    //  http://activemq.apache.org/cms/
    //
    std::string brokerURI =
        "failover://(tcp://127.0.0.1:61616)";
 
    //============================================================
    // Total number of messages for this producer to send.
    //============================================================
    unsigned int numMessages = 2000;
 
    //============================================================
    // This is the Destination Name and URI options.  Use this to
    // customize where the Producer produces, to have the producer
    // use a topic or queue set the 'useTopics' flag.
    //============================================================
    std::string destURI = "TEST.FOO";
 
    //============================================================
    // set to true to use topics instead of queues
    // Note in the code above that this causes createTopic or
    // createQueue to be used in the producer.
    //============================================================
    bool useTopics = false;
 
    // Create the producer and run it.
    SimpleProducer producer( brokerURI, numMessages, destURI, useTopics );
 
    // Publish the given number of Messages
    producer.run();
 
    // Before exiting we ensure that all CMS resources are closed.
    producer.close();
 
    std::cout << "-----------------------------------------------------\n";
    std::cout << "Finished with the example." << std::endl;
    std::cout << "=====================================================\n";
 
    activemq::library::ActiveMQCPP::shutdownLibrary();
}

1
The SimpleProducer class exposes a similar interface to the consumer example Example 3.13, “Simple Asynchronous Consumer”. The constructor creates an instance with the configuration options for the broker and destination and the number of messages to be send to the configured destination.
2
The run method publishes the specified number of messages. Once the run method completes, the client can close the SimpleProducer application by calling the close() method, which cleans up the allocated CMS resource and exits the application.
Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.