2.3. OpenWire C++ Client API
Overview
The CMS API is a C++ corollary to the JMS API. CMS maintains parity with the JMS API wherever possible and diverges only where a JMS feature depends on features of the Java programming language. Most of the differences are minor, and for the most part CMS adheres to the JMS specification, so a firm grasp of how JMS works should make the C++ API easier to use.
Note
In order to use the CMS API, you will need to download the source and build it for your environment.
The connection factory
The first interface you will use in the CMS API is the ConnectionFactory. A ConnectionFactory allows you to create connections to a message broker.
The simplest way to obtain an instance of a ConnectionFactory is to use the static createCMSConnectionFactory() method that all CMS provider libraries are required to implement. Example 2.3, “Creating a Connection Factory” demonstrates how to obtain a new ConnectionFactory.
Example 2.3. Creating a Connection Factory
std::auto_ptr<cms::ConnectionFactory> connectionFactory( cms::ConnectionFactory::createCMSConnectionFactory( "tcp://127.0.0.1:61616" ) );
The createCMSConnectionFactory() method takes a single string parameter, which is a URI that defines the connection that will be created by the factory. Additional configuration information can be encoded in the URI. For details on how to construct a broker URI see the Connection Reference.
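To make the shape of such a URI concrete, the following sketch splits a simple broker URI into its scheme, host, port, and query options. It is an illustration only and is not how the CMS library parses URIs: the names BrokerUri and splitBrokerUri are hypothetical, and the code assumes the plain scheme://host:port?key=value form used in this section, not any composite URI syntax.

```cpp
#include <map>
#include <sstream>
#include <stdexcept>
#include <string>

// Hypothetical holder for the parts of a simple broker URI.
struct BrokerUri {
    std::string scheme;                          // e.g. "tcp"
    std::string host;                            // e.g. "127.0.0.1"
    std::string port;                            // e.g. "61616" (empty if absent)
    std::map<std::string, std::string> options;  // key=value pairs after '?'
};

// Hypothetical helper: splits "scheme://host:port?key=value&key=value".
inline BrokerUri splitBrokerUri(const std::string& uri) {
    const std::string::size_type npos = std::string::npos;

    const std::string::size_type schemeEnd = uri.find("://");
    if (schemeEnd == npos) {
        throw std::invalid_argument("broker URI has no \"://\" separator");
    }

    BrokerUri out;
    out.scheme = uri.substr(0, schemeEnd);

    // The authority runs from just after "://" up to the optional '?'.
    const std::string::size_type authorityStart = schemeEnd + 3;
    const std::string::size_type queryStart = uri.find('?', authorityStart);
    const std::string authority = uri.substr(
        authorityStart,
        queryStart == npos ? npos : queryStart - authorityStart);

    // Everything after the last ':' in the authority is taken as the port.
    const std::string::size_type colon = authority.rfind(':');
    out.host = authority.substr(0, colon);
    if (colon != npos) {
        out.port = authority.substr(colon + 1);
    }

    // Options are '&'-separated key=value pairs; a bare key maps to "".
    if (queryStart != npos) {
        std::istringstream query(uri.substr(queryStart + 1));
        std::string pair;
        while (std::getline(query, pair, '&')) {
            if (pair.empty()) continue;
            const std::string::size_type eq = pair.find('=');
            if (eq == npos) {
                out.options[pair] = "";
            } else {
                out.options[pair.substr(0, eq)] = pair.substr(eq + 1);
            }
        }
    }
    return out;
}
```

For the URI tcp://127.0.0.1:61616?wireFormat=openwire, this yields the scheme tcp, the host 127.0.0.1, the port 61616, and a single option wireFormat with the value openwire.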
The connection
Once you've created a connection factory, you use it to create a connection. A Connection is an object that manages the client's connection to the broker. Example 2.4, “Creating a Connection” shows the code to create a connection.
Example 2.4. Creating a Connection
std::auto_ptr<cms::Connection> connection( connectionFactory->createConnection() );
Upon creation, the connection object attempts to connect to the broker. If the connection fails, a CMSException is thrown with a description of the error that occurred stored in its message property.
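Because connection creation can throw, client code usually wraps it in a try/catch block. The sketch below shows one way to do that. It is written as a template so that it compiles without the CMS headers; ConnectResult and openConnection are hypothetical names, not part of CMS. It also assumes that the provider's exception type derives from std::exception and reports its description through what(), which should be checked against the CMSException declaration in your version of the library.

```cpp
#include <exception>
#include <memory>
#include <string>

// Hypothetical result type: holds either a live connection or an error text.
template <typename ConnectionT>
struct ConnectResult {
    std::unique_ptr<ConnectionT> connection;  // null if the attempt failed
    std::string error;                        // failure description, if any
};

// Hypothetical helper, not part of CMS. FactoryT only needs a
// createConnection() member that returns an owning raw pointer.
template <typename ConnectionT, typename FactoryT>
ConnectResult<ConnectionT> openConnection(FactoryT& factory) {
    ConnectResult<ConnectionT> result;
    try {
        // Take ownership immediately so the connection is released
        // even if later setup code throws.
        result.connection.reset(factory.createConnection());
    } catch (const std::exception& e) {
        // Assumption: the provider's exception derives from std::exception,
        // so what() carries the description of the failure.
        result.error = e.what();
    }
    return result;
}
```

With the real API this might be called as openConnection<cms::Connection>(*connectionFactory). A null connection in the result then indicates that the broker could not be reached, and the error field holds the reason.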
The connection interface defines an object that is the client's active connection to the CMS provider. In most cases the client will only create one connection object since it is considered a heavyweight object.
A connection serves several purposes:
- It encapsulates an open connection with a JMS provider. It typically represents an open TCP/IP socket between a client and a provider service daemon.
- Its creation is where client authentication takes place.
- It can specify a unique client identifier.
- It provides a ConnectionMetaData object.
- It supports an optional ExceptionListener object.
The session
After creating the connection the client must create a Session in order to create message producers and consumers. Example 2.5, “Creating a Session” shows how to create a session object from the connection.
Example 2.5. Creating a Session
std::auto_ptr<cms::Session> session( connection->createSession(cms::Session::CLIENT_ACKNOWLEDGE) );
When a client creates a session it must specify the mode in which the session will acknowledge the messages that it receives and dispatches. The modes supported are summarized in Table 2.1, “Support Acknowledgement Modes”.
Acknowledge Mode | Description |
---|---|
AUTO_ACKNOWLEDGE | The session automatically acknowledges a client's receipt of a message when the session returns successfully from a receive call or when the message listener of the session returns successfully. |
CLIENT_ACKNOWLEDGE | The client acknowledges a consumed message by calling the message's acknowledge method. Acknowledging a consumed message acknowledges all messages that the session has consumed. |
DUPS_OK_ACKNOWLEDGE | The session lazily acknowledges the delivery of messages. This is likely to result in the delivery of some duplicate messages if the broker fails, so it should only be used by consumers that can tolerate duplicate messages. Use of this mode can reduce session overhead by minimizing the work the session does to prevent duplicates. |
SESSION_TRANSACTED | The session is transacted and the acknowledgement of messages is handled internally. |
INDIVIDUAL_ACKNOWLEDGE | Acknowledges are applied to a single message only. |
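
The difference between CLIENT_ACKNOWLEDGE and INDIVIDUAL_ACKNOWLEDGE is easy to miss in the table, so the following toy model illustrates it. This is not CMS code: ToySession and AckMode are hypothetical stand-ins that only track which delivered messages have been acknowledged, following the descriptions in the table above.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-ins for two of the acknowledgement modes.
enum class AckMode { Client, Individual };

// Toy model of a session's acknowledgement bookkeeping.
class ToySession {
public:
    explicit ToySession(AckMode mode) : mode_(mode) {}

    // Records that one more message was consumed; returns its index.
    std::size_t deliver() {
        acked_.push_back(false);
        return acked_.size() - 1;
    }

    // Models calling acknowledge on the message at the given index.
    void acknowledge(std::size_t index) {
        assert(index < acked_.size());
        if (mode_ == AckMode::Client) {
            // Client mode: one call covers every message consumed so far.
            acked_.assign(acked_.size(), true);
        } else {
            // Individual mode: only the named message is acknowledged.
            acked_[index] = true;
        }
    }

    // Number of consumed messages that are still unacknowledged.
    std::size_t unacknowledged() const {
        std::size_t count = 0;
        for (bool acked : acked_) {
            if (!acked) ++count;
        }
        return count;
    }

private:
    AckMode mode_;
    std::vector<bool> acked_;
};
```

If three messages are delivered and only the last one is acknowledged, the model reports no unacknowledged messages in client mode and two in individual mode. In practice this means a client using CLIENT_ACKNOWLEDGE should finish processing all earlier messages before acknowledging a later one.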
Note
If you do not specify an acknowledgement mode, the default is AUTO_ACKNOWLEDGE.
A session serves several purposes:
- It is a factory for producers and consumers.
- It supplies provider-optimized message factories.
- It is a factory for temporary topics and temporary queues.
- It provides a way to create a queue or a topic for those clients that need to dynamically manipulate provider-specific destination names.
- It supports a single series of transactions that combine work spanning its producers and consumers into atomic units.
- It defines a serial order for the messages it consumes and the messages it produces.
- It retains messages it consumes until they have been acknowledged.
- It serializes execution of message listeners registered with its message consumers.
Note
A session can create and service multiple producers and consumers.
Resources
The API reference documentation for the A-MQ C++ API can be found at http://activemq.apache.org/cms/api.html.
Example
Example 2.6, “CMS Producer Connection” shows code for creating a message producer that sends messages to the queue EXAMPLE.FOO.
Example 2.6. CMS Producer Connection
#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <decaf/lang/Integer.h>
#include <decaf/util/Date.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/util/Config.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
...
using namespace activemq::core;
using namespace decaf::util::concurrent;
using namespace decaf::util;
using namespace decaf::lang;
using namespace cms;
using namespace std;
...
// Create a ConnectionFactory
auto_ptr<ConnectionFactory> connectionFactory(
    ConnectionFactory::createCMSConnectionFactory( "tcp://127.0.0.1:61616?wireFormat=openwire" ) );

// Create a Connection and start it
connection = connectionFactory->createConnection();
connection->start();

// Create a Session and the destination Queue
session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
destination = session->createQueue( "EXAMPLE.FOO" );

// Create a MessageProducer from the Session to the Queue
producer = session->createProducer( destination );
...