Red Hat AMQ 6
As of February 2025, Red Hat is no longer supporting Red Hat AMQ 6. If you are using AMQ 6, please upgrade: Migrating to AMQ 7.Client Connectivity Guide
Creating and tuning clients connections to message brokers
Copyright © 2011-2015 Red Hat, Inc. and/or its affiliates.
Abstract
Chapter 1. Introduction Copy linkLink copied to clipboard!
Abstract
1.1. JBoss A-MQ Client APIs Copy linkLink copied to clipboard!
Transports and protocols Copy linkLink copied to clipboard!
- Simple Text Orientated Messaging Protocol(STOMP)—allows developers to use a wide variety of client APIs to connect to a broker.
- Discovery—allows clients to connect to one or more brokers without knowing the connection details for a specific broker. See Using Networks of Brokers.
- VM—allows clients to directly communicate with other clients in the same virtual machine. See Section 2.8, “Intra-JVM Connections”.
- Peer—allows clients to communicate with each other without using an external message broker. See Section 2.9, “Peer Protocol”.
Supported Client APIs Copy linkLink copied to clipboard!
- C clients
- C++ clients
- C# and .NET clients
- Delphi clients
- Flash clients
- Perl clients
- PHP clients
- Pike clients
- Python clients
- C++ Clients
- Python clients
- .NET clients
Configuration Copy linkLink copied to clipboard!
- transport options—configured on the connection. These options are configured using the connection URI and may be set by the broker. They apply to all clients using the connection.
- destination options—configured on a per destination basis. These options are configured when the destination is created and impact all of the clients that send or receive messages using the destination. They are always set by clients.
1.2. Preparing to use Maven Copy linkLink copied to clipboard!
Overview Copy linkLink copied to clipboard!
Prerequisites Copy linkLink copied to clipboard!
- Maven installation—Maven is a free, open source build tool from Apache. You can download the latest version from the Maven download page.
- Network connection—whilst performing a build, Maven dynamically searches external repositories and downloads the required artifacts on the fly. By default, Maven looks for repositories that are accessed over the Internet. You can change this behavior so that Maven will prefer searching repositories that are on a local network.NoteMaven can run in an offline mode. In offline mode Maven will only look for artifacts in its local repository.
Adding the Red Hat JBoss A-MQ repository Copy linkLink copied to clipboard!
settings.xml file. Maven looks for your settings.xml file in the .m2 directory of the user's home directory. If there is not a user specified settings.xml file, Maven will use the system-level settings.xml file at M2_HOME/conf/settings.xml.
.m2/settings.xml file or modify the system-level settings. In the settings.xml file, add the repository element for the JBoss A-MQ repository as shown in bold text in Example 1.1, “Adding the Red Hat JBoss A-MQ Repositories to Maven”.
Example 1.1. Adding the Red Hat JBoss A-MQ Repositories to Maven
fusesource-snapshotrepository—if you want to experiment with building your application using an Red Hat JBoss A-MQ snapshot kit, you can include this repository.apache-publicrepository—you might not always need this repository, but it is often useful to include it, because JBoss A-MQ depends on many of the artifacts from Apache.
Artifacts Copy linkLink copied to clipboard!
Maven coordinates Copy linkLink copied to clipboard!
{groupId, artifactId, version}. Sometimes Maven augments the basic set of coordinates with the additional coordinates, packaging and classifier. A tuple can be written with the basic coordinates, or with the additional packaging coordinate, or with the addition of both the packaging and classifier coordinates, as follows:
groupdId:artifactId:version groupdId:artifactId:packaging:version groupdId:artifactId:packaging:classifier:version
groupdId:artifactId:version
groupdId:artifactId:packaging:version
groupdId:artifactId:packaging:classifier:version
- groupdId
- Defines a scope for the name of the artifact. You would typically use all or part of a package name as a group ID—for example,
org.fusesource.example. - artifactId
- Defines the artifact name (relative to the group ID).
- version
- Specifies the artifact's version. A version number can have up to four parts:
n.n.n.n, where the last part of the version number can contain non-numeric characters (for example, the last part of1.0-SNAPSHOTis the alphanumeric substring,0-SNAPSHOT). - packaging
- Defines the packaged entity that is produced when you build the project. For OSGi projects, the packaging is
bundle. The default value isjar. - classifier
- Enables you to distinguish between artifacts that were built from the same POM, but have different content.
dependency element to a POM:
bundle package type in the preceding dependency, because a bundle is just a particular kind of JAR file and jar is the default Maven package type. If you do need to specify the packaging type explicitly in a dependency, however, you can use the type element.
1.3. Preparing to use AMQ with SSL Copy linkLink copied to clipboard!
Overview Copy linkLink copied to clipboard!
- To generate a certificate for the amq broker, create a directory on your system to hold the generated files. For example, mkdir certificates_dir
- To generate the certificates, navigate to the certificates directory and run the following command.
keytool -genkey -alias broker -keyalg RSA -keystore broker.ks \ -storepass ${general_passwd} -dname "O=RedHat Inc.,CN=$(hostname)" \ -keypass ${general_passwd} -validity 99999keytool -genkey -alias broker -keyalg RSA -keystore broker.ks \ -storepass ${general_passwd} -dname "O=RedHat Inc.,CN=$(hostname)" \ -keypass ${general_passwd} -validity 99999Copy to Clipboard Copied! Toggle word wrap Toggle overflow where,general_passwdis the value of the password that you need to specify andhostnamespecify the hostname as per the settings on your system
Setting up A-MQ for listening to amqp+ssl connection Copy linkLink copied to clipboard!
activemq.xml file to include the authentication settings
- For Server authentication, add the amqp+ssl connector to the list if
transportConnectorsinactivemq.xml.<transportConnector name="amqp+ssl" uri="amqp+ssl://<hostname>:5671"/>
<transportConnector name="amqp+ssl" uri="amqp+ssl://<hostname>:5671"/>Copy to Clipboard Copied! Toggle word wrap Toggle overflow - For Client authentication, add the amqp+ssl connector to the list if
transportConnectorsinactivemq.xml<transportConnector name="amqp+ssl" uri="amqp+ssl://<hostname>:5671?needClientAuth=true"/>
<transportConnector name="amqp+ssl" uri="amqp+ssl://<hostname>:5671?needClientAuth=true"/>Copy to Clipboard Copied! Toggle word wrap Toggle overflow - For skip SASL authentication, enable the anonymous access property for the
simpleAuthenticationPlugininactivemq.xml<simpleAuthenticationPlugin anonymousAccessAllowed="true"/>
<simpleAuthenticationPlugin anonymousAccessAllowed="true"/>Copy to Clipboard Copied! Toggle word wrap Toggle overflow
Chapter 2. OpenWire ActiveMQ Client APIs Copy linkLink copied to clipboard!
Abstract
2.1. General Approach to Establishing a Connection Copy linkLink copied to clipboard!
Steps to establish a connection Copy linkLink copied to clipboard!
- Get an instance of the Red Hat JBoss A-MQ connection factory.Depending on the environment, the application can create a new instance of the connection factory or use JNDI, or another mechanism, to look up the connection factory.
- Use the connection factory to create a connection.
- Get an instance of the destination used for sending or receiving messages.Destinations are administered objects that are typically created by the broker. The JBoss A-MQ allows clients to create destinations on-demand. You can also look up destinations using JNDI or another mechanism.
- Use the connection to create a session.The session is the factory for creating producers and consumers. The session also is a factory for creating messages.
- Use the session to create the message consumer or message producer.
- Start the connection.
2.2. OpenWire JMS Client API Copy linkLink copied to clipboard!
Overview Copy linkLink copied to clipboard!
The connection factory Copy linkLink copied to clipboard!
ActiveMQConnectionFactory, is used to create connections to brokers and does not need to be looked up using JNDI. Instances are created using a broker URI that specifies one of the transport connectors configured on a broker and the connection factory will do the heavy lifting.
ActiveMQConnectionFactory constructors.
Example 2.1. Connection Factory Constructors
ActiveMQConnectionFactory(String brokerURI);ActiveMQConnectionFactory(URI brokerURI);ActiveMQConnectionFactory(String username,
String password,
String brokerURI);ActiveMQConnectionFactory(String username,
String password,
URI brokerURI);The connection Copy linkLink copied to clipboard!
Connection object will suffice. However, JBoss A-MQ does provide an implementation, ActiveMQConnection, that provides a number of additional methods for working with the broker. Using ActiveMQConnection will make your client code less portable between JMS providers.
The session Copy linkLink copied to clipboard!
Example Copy linkLink copied to clipboard!
EXAMPLE.FOO.
Example 2.2. JMS Producer Connection
2.3. OpenWire C++ Client API Copy linkLink copied to clipboard!
Overview Copy linkLink copied to clipboard!
The connection factory Copy linkLink copied to clipboard!
ConnectionFactory. A ConnectionFactory allows you to create connections which maintain a connection to a message broker.
ConnectionFactory is to use the static createCMSConnectionFactory() method that all CMS provider libraries are required to implement. Example 2.3, “Creating a Connection Factory” demonstrates how to obtain a new ConnectionFactory.
Example 2.3. Creating a Connection Factory
std::auto_ptr<cms::ConnectionFactory> connectionFactory(
cms::ConnectionFactory::createCMSConnectionFactory( "tcp://127.0.0.1:61616" ) );
std::auto_ptr<cms::ConnectionFactory> connectionFactory(
cms::ConnectionFactory::createCMSConnectionFactory( "tcp://127.0.0.1:61616" ) );
createCMSConnectionFactory() takes a single string parameter which a URI that defines the connection that will be created by the factory. Additionally configuration information can be encoded in the URI. For details on how to construct a broker URI see the Connection Reference.
The connection Copy linkLink copied to clipboard!
Connection is a object that manages the client's connection to the broker. Example 2.4, “Creating a Connection” shows the code to create a connection.
Example 2.4. Creating a Connection
std::auto_ptr<cms::Connection> connection( connectionFactory->createConnection() );
std::auto_ptr<cms::Connection> connection( connectionFactory->createConnection() );
CMSException is thrown with a description of the error that occurred stored in its message property.
- It encapsulates an open connection with a JMS provider. It typically represents an open TCP/IP socket between a client and a provider service daemon.
- Its creation is where client authentication takes place.
- It can specify a unique client identifier.
- It provides a
ConnectionMetaDataobject. - It supports an optional
ExceptionListenerobject.
The session Copy linkLink copied to clipboard!
Example 2.5. Creating a Session
std::auto_ptr<cms::Session> session( connection->createSession(cms::Session::CLIENT_ACKNOWLEDGE) );
std::auto_ptr<cms::Session> session( connection->createSession(cms::Session::CLIENT_ACKNOWLEDGE) );
| Acknowledge Mode | Description |
|---|---|
AUTO_ACKNOWLEDGE | The session automatically acknowledges a client's receipt of a message when the session returns successfully from a recieve call or when the message listener of the session returns successfully. |
CLIENT_ACKNOWLEDGE | The client acknowledges a consumed message by calling the message's acknowledge method. Acknowledging a consumed message acknowledges all messages that the session has consumed. |
DUPS_OK_ACKNOWLEDGE | The session to lazily acknowledges the delivery of messages. This is likely to result in the delivery of some duplicate messages if the broker fails, so it should only be used by consumers that can tolerate duplicate messages. Use of this mode can reduce session overhead by minimizing the work the session does to prevent duplicates. |
SESSION_TRANSACTED | The session is transacted and the acknowledge of messages is handled internally. |
INDIVIDUAL_ACKNOWLEDGE | Acknowledges are applied to a single message only. |
AUTO_ACKNOWLEDGE.
- It is a factory for producers and consumers.
- It supplies provider-optimized message factories.
- It is a factory for temporary topics and temporary queues.
- It provides a way to create a queue or a topic for those clients that need to dynamically manipulate provider-specific destination names.
- It supports a single series of transactions that combine work spanning its producers and consumers into atomic units.
- It defines a serial order for the messages it consumes and the messages it produces.
- It retains messages it consumes until they have been acknowledged.
- It serializes execution of message listeners registered with its message consumers.
Resources Copy linkLink copied to clipboard!
Example Copy linkLink copied to clipboard!
EXAMPLE.FOO.
Example 2.6. CMS Producer Connection
2.4. OpenWire .Net Client API Copy linkLink copied to clipboard!
Overview Copy linkLink copied to clipboard!
Resources Copy linkLink copied to clipboard!
Example Copy linkLink copied to clipboard!
EXAMPLE.FOO.
Example 2.7. NMS Producer Connection
2.5. Configuring NMS.ActiveMQ Copy linkLink copied to clipboard!
Abstract
Connection configuration using the generic NMSConnectionFactory class Copy linkLink copied to clipboard!
NMSConnectionFactory class, you can configure an ActiveMQ endpoint as follows:
var cf = new NMSConnectionFactory(
"activemq:tcp://localhost:61616?wireFormat.tightEncodingEnabled=true");
var cf = new NMSConnectionFactory(
"activemq:tcp://localhost:61616?wireFormat.tightEncodingEnabled=true");
Connection configuration using the ActiveMQ ConnectionFactory class Copy linkLink copied to clipboard!
ConnectionFactory class, you can configure an ActiveMQ endpoint as follows:
var cf = new Apache.NMS.ActiveMQ.ConnectionFactory(
"tcp://localhost:61616?wireFormat.tightEncodingEnabled=true");
var cf = new Apache.NMS.ActiveMQ.ConnectionFactory(
"tcp://localhost:61616?wireFormat.tightEncodingEnabled=true");
Protocol variants Copy linkLink copied to clipboard!
|
Option Name
|
Description
|
|---|---|
tcp
|
Uses TCP/IP Sockets to connect to the Broker.
|
ssl
|
Uses TCP/IP Sockets to connect to the Broker with an added SSL layer.
|
discovery
|
Uses The Discovery Transport to find a Broker.
|
failover
|
Uses the Failover Transport to connect and reconnect to one or more Brokers.
|
TCP transport options Copy linkLink copied to clipboard!
tcp transport supports the following options:
|
Option Name
|
Default
|
Description
|
|---|---|---|
transport.useLogging
|
false
|
Log data that is sent across the Transport.
|
transport.receiveBufferSize
|
8192
|
Amount of Data to buffer from the Socket.
|
transport.sendBufferSize
|
8192
|
Amount of Data to buffer before writing to the Socket.
|
transport.receiveTimeout
|
0
|
Time to wait for more data, zero means wait infinitely.
|
transport.sendTimeout
|
0
|
Timeout on sends, 0 means wait forever for completion.
|
transport.requestTimeout
|
0
|
Time to wait before a Request Command is considered to have failed.
|
Failover transport options Copy linkLink copied to clipboard!
failover transport supports the following options:
|
Option Name
|
Default
|
Description
|
|---|---|---|
transport.timeout
|
-1
|
Time that a send operation blocks before failing.
|
transport.initialReconnectDelay
|
10
|
Time in Milliseconds that the transport waits before attempting to reconnect the first time.
|
transport.maxReconnectDelay
|
30000
|
The max time in Milliseconds that the transport will wait before attempting to reconnect.
|
transport.backOffMultiplier
|
2
|
The amount by which the reconnect delay will be multiplied by if useExponentialBackOff is enabled.
|
transport.useExponentialBackOff
|
true
|
Should the delay between connection attempt grow on each try up to the max reconnect delay.
|
transport.randomize
|
true
|
Should the Uri to connect to be chosen at random from the list of available Uris.
|
transport.maxReconnectAttempts
|
0
|
Maximum number of time the transport will attempt to reconnect before failing (0 means infinite retries)
|
transport.startupMaxReconnectAttempts
|
0
|
Maximum number of time the transport will attempt to reconnect before failing when there has never been a connection made. (0 means infinite retries) (included in NMS.ActiveMQ v1.5.0+)
|
transport.reconnectDelay
|
10
|
The delay in milliseconds that the transport waits before attempting a reconnection.
|
transport.backup
|
false
|
Should the Failover transport maintain hot backups.
|
transport.backupPoolSize
|
1
|
If enabled, how many hot backup connections are made.
|
transport.trackMessages
|
false
|
keep a cache of in-flight messages that will flushed to a broker on reconnect
|
transport.maxCacheSize
|
256
|
Number of messages that are cached if trackMessages is enabled.
|
transport.updateURIsSupported
|
true
|
Update the list of known brokers based on BrokerInfo messages sent to the client.
|
Connection Options Copy linkLink copied to clipboard!
connection. prefix or the nms. prefix (in a similar way to the Java client's jms. prefixed settings).
|
Option Name
|
Default
|
Description
|
|---|---|---|
connection.AsyncSend
|
false
|
Are message sent Asynchronously.
|
connection.AsyncClose
|
true
|
Should the close command be sent Asynchronously
|
connection.AlwaysSyncSend
|
false
|
Causes all messages a Producer sends to be sent Asynchronously.
|
connection.CopyMessageOnSend
|
true
|
Copies the Message objects a Producer sends so that the client can reuse Message objects without affecting an in-flight message.
|
connection.ProducerWindowSize
|
0
|
The ProducerWindowSize is the maximum number of bytes in memory that a producer will transmit to a broker before waiting for acknowledgement messages from the broker that it has accepted the previously sent messages. In other words, this how you configure the producer flow control window that is used for async sends where the client is responsible for managing memory usage. The default value of 0 means no flow control at the client. See also Producer Flow Control
|
connection.useCompression
|
false
|
Should message bodies be compressed before being sent.
|
connection.sendAcksAsync
|
false
|
Should message acks be sent asynchronously
|
connection.messagePrioritySupported
|
true
|
Should messages be delivered to the client based on the value of the Message Priority header.
|
connection.dispatchAsync
|
false
|
Should the broker dispatch messages asynchronously to the connection's consumers.
|
connection.watchTopicAdvisories
|
true
|
Should the client watch for advisory messages from the broker to track the creation and deletion of temporary destinations.
|
OpenWire options Copy linkLink copied to clipboard!
|
Option Name
|
Default
|
Description
|
|---|---|---|
wireFormat.stackTraceEnabled
|
false
|
Should the stack trace of exception that occur on the broker be sent to the client? Only used by openwire protocol.
|
wireFormat.cacheEnabled
|
false
|
Should commonly repeated values be cached so that less marshalling occurs? Only used by openwire protocol.
|
wireFormat.tcpNoDelayEnabled
|
false
|
Does not affect the wire format, but provides a hint to the peer that TCP nodelay should be enabled on the communications Socket. Only used by openwire protocol.
|
wireFormat.sizePrefixDisabled
|
false
|
Should serialized messages include a payload length prefix? Only used by openwire protocol.
|
wireFormat.tightEncodingEnabled
|
false
|
Should wire size be optimized over CPU usage? Only used by the openwire protocol.
|
wireFormat.maxInactivityDuration
|
30000
|
The maximum inactivity duration (before which the socket is considered dead) in milliseconds. On some platforms it can take a long time for a socket to appear to die, so we allow the broker to kill connections if they are inactive for a period of time. Use by some transports to enable a keep alive heart beat feature. Set to a value <= 0 to disable inactivity monitoring.
|
maxInactivityDurationInitalDelay
|
10000
|
The initial delay in starting the maximum inactivity checks (and, yes, the word 'Inital' is supposed to be misspelled like that)
|
Destination configuration Copy linkLink copied to clipboard!
d = session.CreateTopic("com.foo?consumer.prefetchSize=2000&consumer.noLocal=true");
d = session.CreateTopic("com.foo?consumer.prefetchSize=2000&consumer.noLocal=true");
General options Copy linkLink copied to clipboard!
|
Option Name
|
Default
|
Description
|
|---|---|---|
consumer.prefetchSize
|
1000
|
The number of message the consumer will prefetch.
|
consumer.maximumPendingMessageLimit
|
0
|
Use to control if messages are dropped if a slow consumer situation exists.
|
consumer.noLocal
|
false
|
Same as the noLocal flag on a Topic consumer. Exposed here so that it can be used with a queue.
|
consumer.dispatchAsync
|
false
|
Should the broker dispatch messages asynchronously to the consumer.
|
consumer.retroactive
|
false
|
Is this a Retroactive Consumer.
|
consumer.selector
|
null
|
JMS Selector used with the consumer.
|
consumer.exclusive
|
false
|
Is this an Exclusive Consumer.
|
consumer.priority
|
0
|
Allows you to configure a Consumer Priority.
|
OpenWire specific options Copy linkLink copied to clipboard!
|
Option Name
|
Default
|
Description
|
|---|---|---|
consumer.browser
|
false
|
|
consumer.networkSubscription
|
false
|
|
consumer.optimizedAcknowledge
|
false
|
Enables an optimised acknowledgement mode where messages are acknowledged in batches rather than individually. Alternatively, you could use
Session.DUPS_OK_ACKNOWLEDGE acknowledgement mode for the consumers which can often be faster. WARNING: enabling this issue could cause some issues with auto-acknowledgement on reconnection
|
consumer.noRangeAcks
|
false
|
|
consumer.retroactive
|
false
|
Sets whether or not retroactive consumers are enabled. Retroactive consumers allow non-durable topic subscribers to receive old messages that were published before the non-durable subscriber started.
|
2.6. Stomp Heartbeats Copy linkLink copied to clipboard!
Abstract
Stomp 1.1 heartbeats Copy linkLink copied to clipboard!
CONNECT heart-beat:CltSend,CltRecv CONNECTED: heart-beat:SrvSend,SrvRecv
CONNECT
heart-beat:CltSend,CltRecv
CONNECTED:
heart-beat:SrvSend,SrvRecv
- CltSend
- Indicates the minimum frequency of messages sent from the client, expressed as the maximum time between messages in units of milliseconds. If the client does not send a regular Stomp message within this time limit, it must send a special heartbeat message, in order to keep the connection alive.A value of zero indicates that the client does not send heartbeats.
- CltRecv
- Indicates how often the client expects to receive message from the server, expressed as the maximum time between messages in units of milliseconds. If the client does not receive any messages from the server within this time limit, it would time out the connection.A value of zero indicates that the client does not expect heartbeats and will not time out the connection.
- SrvSend
- Indicates the minimum frequency of messages sent from the server, expressed as the maximum time between messages in units of milliseconds. If the server does not send a regular Stomp message within this time limit, it must send a special heartbeat message, in order to keep the connection alive.A value of zero indicates that the server does not send heartbeats.
- SrvRecv
- Indicates how often the server expects to receive message from the client, expressed as the maximum time between messages in units of milliseconds. If the server does not receive any messages from the client within this time limit, it would time out the connection.A value of zero indicates that the server does not expect heartbeats and will not time out the connection.
Stomp 1.0 heartbeat compatibility Copy linkLink copied to clipboard!
transport.defaultHeartBeat option in the broker's transportConnector element, as follows:
<transportConnector name="stomp" uri="stomp://0.0.0.0:0?transport.defaultHeartBeat=5000,0" />
<transportConnector name="stomp" uri="stomp://0.0.0.0:0?transport.defaultHeartBeat=5000,0" />
CONNECT heart-beat:5000,0
CONNECT
heart-beat:5000,0
0, indicates that the client does not expect to receive any heartbeats from the server (which makes sense, because Stomp 1.0 clients do not understand heartbeats).
transport.defaultHeartBeat such that the connection will stay alive, as long as the Stomp 1.0 clients are sending messages at their normal rate.
2.7. Stomp Composite Destinations Copy linkLink copied to clipboard!
Abstract
Specify Composite Destinatons for Stomp in A-MQ Copy linkLink copied to clipboard!
(dest-type/dest-name01,dest-type/dest-name02,dest-type/dest-name79)
(dest-type/dest-name01,dest-type/dest-name02,dest-type/dest-name79)
(topic/test01,topic/test02,topic/test15A)
(topic/test01,topic/test02,topic/test15A)
(queue/queuename01,queue/queuename02,queue/queuename31C)
(queue/queuename01,queue/queuename02,queue/queuename31C)
2.8. Intra-JVM Connections Copy linkLink copied to clipboard!
Abstract
Overview Copy linkLink copied to clipboard!
Figure 2.1. Clients Connected through the VM Transport
Embedded brokers Copy linkLink copied to clipboard!
- explicitly defining the broker in the application's configuration
- explicitly creating the broker using the Java APIs
- automatically when the first client attempts to connect to it using the VM transport
waitForStart option or the create=false option to manage how the VM transport determines when to create a new embedded broker.
Using the VM transport Copy linkLink copied to clipboard!
- simpleThe simple VM URI is used in most situations. It allows you to specify the name of the embedded broker to which the client will connect. It also allows for some basic broker configuration.Example 2.8, “Simple VM URI Syntax” shows the syntax for a simple VM URI.
Example 2.8. Simple VM URI Syntax
vm://BrokerName?TransportOptions
vm://BrokerName?TransportOptionsCopy to Clipboard Copied! Toggle word wrap Toggle overflow - BrokerName specifies the name of the embedded broker to which the client connects.
- TransportOptions specifies the configuration for the transport. They are specified in the form of a query list. For details about the available options see the Connection Reference.ImportantThe broker configuration options specified on the VM URI are only meaningful if the client is responsible for instantiating the embedded broker. If the embedded broker is already started, the transport will ignore the broker configuration properties.
- advancedThe advanced VM URI provides you full control over how the embedded broker is configured. It uses a broker configuration URI similar to the one used by the administration tool to configure the embedded broker.Example 2.9, “Advanced VM URI Syntax” shows the syntax for an advanced VM URI.
Example 2.9. Advanced VM URI Syntax
vm://(BrokerConfigURI)?TransportOptions
vm://(BrokerConfigURI)?TransportOptionsCopy to Clipboard Copied! Toggle word wrap Toggle overflow - BrokerConfigURI is a broker configuration URI.
- TransportOptions specifies the configuration for the transport. They are specified in the form of a query list. For details about the available options see the Connection Reference.
Examples Copy linkLink copied to clipboard!
broker1.
Example 2.10. Basic VM URI
vm://broker1
vm://broker1
Example 2.11. Simple URI with broker options
vm://broker1?broker.persistent=false
vm://broker1?broker.persistent=false
Example 2.12. Advanced VM URI
vm:(broker:(tcp://localhost:6000)?persistent=false)?marshal=false
vm:(broker:(tcp://localhost:6000)?persistent=false)?marshal=false
2.9. Peer Protocol Copy linkLink copied to clipboard!
Abstract
Overview Copy linkLink copied to clipboard!
Figure 2.2. Peer Protocol Endpoints with Embedded Brokers
broker1, by connecting to the local VM endpoint, vm://broker1. The embedded brokers, broker1 and broker2, are linked together using a network connector which allows messages to flow in either direction between the brokers. When the producer sends a message to the queue, broker1 pushes the message across the network connector to broker2. The consumer receives the message from broker2.
Peer endpoint discovery Copy linkLink copied to clipboard!
URI syntax Copy linkLink copied to clipboard!
peer URI must conform to the following syntax:
peer://PeerGroup/BrokerName?BrokerOptions
peer://PeerGroup/BrokerName?BrokerOptions
Sample URI Copy linkLink copied to clipboard!
groupA, and creates an embedded broker with broker name, broker1:
peer://groupA/broker1?persistent=false
peer://groupA/broker1?persistent=false
2.10. Message Prefetch Behavior Copy linkLink copied to clipboard!
Overview Copy linkLink copied to clipboard!
Figure 2.3. Consumer Prefetch Limit
Consumer specific prefetch limits Copy linkLink copied to clipboard!
| Consumer Type | Property | Default |
|---|---|---|
| Queue consumer | queuePrefetch | 1000 |
| Queue browser | queueBrowserPrefetch | 500 |
| Topic consumer | topicPrefetch | 32766 |
| Durable topic subscriber | durableTopicPrefetch | 100 |
Setting prefetch limits per broker Copy linkLink copied to clipboard!
destinationPolicy element as a child of the broker element in the broker's configuration, as shown in Example 2.13, “Configuring a Destination Policy”.
Example 2.13. Configuring a Destination Policy
queue. is set to 1 (the > character is a wildcard symbol that matches one or more name segments); and the topic prefetch limit for all topics whose names start with topic. is set to 1000.
Setting prefetch limits per connection Copy linkLink copied to clipboard!
ActiveMQConnectionFactory instance. Example 2.14, “Setting Prefetch Limit Properties Per Connection” shows how to specify the prefetch limits for all consumer types on a connection factory.
Example 2.14. Setting Prefetch Limit Properties Per Connection
Setting prefetch limits per destination Copy linkLink copied to clipboard!
TEST.QUEUE with a prefetch limit of 10. The option is set as a destination option as part of the URI used to create the queue.
Example 2.15. Setting the Prefetch Limit on a Destination
Queue queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");
MessageConsumer consumer = session.createConsumer(queue);
Queue queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");
MessageConsumer consumer = session.createConsumer(queue);
Disabling the prefetch extension logic Copy linkLink copied to clipboard!
Example 2.16. Disabling the Prefetch Extension
2.11. Message Redelivery Copy linkLink copied to clipboard!
Overview Copy linkLink copied to clipboard!
- A transacted session is used and
rollback()is called. - A transacted session is closed before commit is called.
- A session is using
CLIENT_ACKNOWLEDGEandSession.recover()is called.
- On the broker, using the broker's redelivery plug-in,
- On the connection factory, using the connection URI,
- On the connection, using the
RedeliveryPolicy, - On destinations, using the connection's
RedeliveryPolicyMap.
Redelivery properties Copy linkLink copied to clipboard!
| Option | Default | Description |
|---|---|---|
collisionAvoidanceFactor | 0.15 | Specifies the percentage of range of collision avoidance. |
maximumRedeliveries | 6 | Specifies the maximum number of times a message will be redelivered before it is considered a poisoned pill and returned to the broker so it can go to a dead letter queue. -1 specifies an infinite number of redeliveries. |
maximumRedeliveryDelay | -1 | Specifies the maximum delivery delay that will be applied if the useExponentialBackOff option is set. -1 specifies that no maximum be applied. |
initialRedeliveryDelay | 1000 | Specifies the initial redelivery delay in milliseconds. |
redeliveryDelay | 1000 | Specifies the delivery delay, in milliseconds, if initialRedeliveryDelay is 0. |
useCollisionAvoidance | false | Specifies if the redelivery policy uses collision avoidance. |
useExponentialBackOff | false | Specifies if the redelivery time out should be increased exponentially. |
backOffMultiplier | 5 | Specifies the back-off multiplier. |
Configuring the broker's redelivery plug-in Copy linkLink copied to clipboard!
maximumRedeliveries to 0 on the destination).
redeliveryPlugin element. As shown in the following example this element is a child of the broker's plugins element and contains a policy map defining the desired behavior.
Example 2.17. Configuring the Redelivery Plug-In
- 1
- The
redeliveryPolicyEntrieselement contains a list ofredeliveryPolicyelements that configures redelivery policies on a per-destination basis. - 2
- The
defaultEntryelement contains a singleredeliveryPolicyelement that configures the redelivery policy used by all destinations that do not match the one with a specific policy.
Configuring the redelivery using the broker URI Copy linkLink copied to clipboard!
Example 2.18. Setting the Redelivery Policy using a Connection URI
ActiveMQConnectionFactory connectionFactory =
new ActiveMQConnectionFactory("tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=4");
ActiveMQConnectionFactory connectionFactory =
new ActiveMQConnectionFactory("tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=4");
Setting the redelivery policy on a connection Copy linkLink copied to clipboard!
ActiveMQConnection class' getRedeliveryPolicy() method allows you to configure the redelivery policy for all consumer's using that connection.
getRedeliveryPolicy() returns a RedeliveryPolicy object that controls the redelivery policy for the connection. The RedeliveryPolicy object has setters for each of the properties listed in Table 2.3, “Redelivery Policy Options”.
Example 2.19. Setting the Redelivery Policy for a Connection
Setting the redelivery policy on a destination Copy linkLink copied to clipboard!
ActiveMQConnection class' getRedeliveryPolicyMap() method returns a RedeliveryPolicyMap object that is a map of RedeliveryPolicy objects with destination names as the key.
RedeliveryPolicy object controls the redelivery policy for all destinations whose name match the destination name specified in the map's key.
FRED.JOE can only be redelivered 4 times.
Example 2.20. Setting the Redelivery Policy for a Destination
Chapter 3. AMQP 1.0 Client APIs Copy linkLink copied to clipboard!
Abstract
3.1. Introduction to AMQP Copy linkLink copied to clipboard!
What is AMQP? Copy linkLink copied to clipboard!
- Open standard (defined by the OASIS AMQP Technical Committee)
- Defines a wire protocol
- Defines APIs for multiple languages (C++, Java)
- Interoperability between different AMQP implementations
JMS is an API Copy linkLink copied to clipboard!
AMQP is a wire protocol Copy linkLink copied to clipboard!
AMQP-to-JMS requires message conversion Copy linkLink copied to clipboard!
3.2. JMS AMQP 1.0 Client API Copy linkLink copied to clipboard!
3.2.1. Getting Started with AMQP Copy linkLink copied to clipboard!
Getting started with AMQP Copy linkLink copied to clipboard!
- Configure the broker to use AMQP—to enable AMQP in the broker, add an AMQP endpoint to the broker's configuration. This implicitly activates the broker's AMQP integration, ensuring that incoming messages are converted from AMQP message format to JMS message format, as required.
- Implement the AMQP clients—the AMQP clients are based on the Apache Qpid JMS client libraries.
3.2.2. Configuring the Broker for AMQP Copy linkLink copied to clipboard!
Overview Copy linkLink copied to clipboard!
- Make sure that you have appropriate user entries in the
etc/users.propertiesfile, so that the AMQP clients will be able to log on to the broker. - Add an AMQP endpoint to the broker (by inserting a
transportConnectorelement into the broker's XML configuration).
Steps to configure the broker Copy linkLink copied to clipboard!
- This example assumes that you are working with a fresh install of a standalone JBoss A-MQ broker, InstallDir.
- Define a JAAS user for the AMQP clients, so that the AMQP clients can authenticate themselves to the broker using JAAS security (security is enabled by default in the broker). Edit the
InstallDir/etc/users.propertiesfile and add a new user entry, as follows:Copy to Clipboard Copied! Toggle word wrap Toggle overflow At this point, you can add entries for any other secure users you want. In particular, it is advisable to have at least one user with theadminrole, so that you can log into the secure container remotely (remembering to choose a secure password for the admin user).NoteTo avoid authentication issue, include the user guest in the list ofauthorizationEntriesforjaasAuthenticationPlugininactivemq.xml - Add an AMQP endpoint to the broker configuration. Edit the broker configuration file,
InstallDir/etc/activemq.xml. As shown in the following XML fragment, add the highlightedtransportConnectorelement as a child of thetransportConnectorselement in the broker configuration:Copy to Clipboard Copied! Toggle word wrap Toggle overflow - To start the broker, open a new command prompt, change directory to
InstallDir/bin, and enter the following command:./amq
./amqCopy to Clipboard Copied! Toggle word wrap Toggle overflow
Message conversion Copy linkLink copied to clipboard!
Reference Copy linkLink copied to clipboard!
3.2.3. AMQP Example Clients Copy linkLink copied to clipboard!
Overview Copy linkLink copied to clipboard!
Prerequisites Copy linkLink copied to clipboard!
Steps to implement and run the AMQP clients Copy linkLink copied to clipboard!
- At any convenient location, download and extract the qpid-jms code for example
examples, to hold the example code:mkdir example
mkdir exampleCopy to Clipboard Copied! Toggle word wrap Toggle overflow - The extracted files should have the following directory structure for the
examplesproject:Copy to Clipboard Copied! Toggle word wrap Toggle overflow - On the console, run the command mvn clean package dependency:copy-dependencies -DincludeScope=runtime -DskipTestsAfter building the code (and downloading any packages required by Maven), if the build is successful, you should see output like the following in the console window:
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - Run the following java command:
java -cp "target/classes/:target/dependency/*" org.apache.qpid.jms.example.HelloWorld
java -cp "target/classes/:target/dependency/*" org.apache.qpid.jms.example.HelloWorldCopy to Clipboard Copied! Toggle word wrap Toggle overflow After building the code, this target proceeds to run the consumer client, which reads messages from thequeuequeue. You should see output like the following in the console window:Hello world!
Hello world!Copy to Clipboard Copied! Toggle word wrap Toggle overflow
A Simple Messaging Program in Java JMS Copy linkLink copied to clipboard!
Example 3.1. "Hello world!" in Java
- 1
- Creates the JNDI initial context.
- 2
- Creates a JMS connection factory for Qpid.
- 3
- Creates a JMS connection.
- 4
- Creates a session. This session is not transactional (transactions='false'), and messages are automatically acknowledged.
- 5
- Creates a producer that sends messages to the topic exchange.
- 6
- Creates a consumer that reads messages from the topic exchange.
- 7
- Reads the next available message.
- 8
- Closes the connection, all sessions managed by the connection, and all senders and receivers managed by each session.
Example 3.2. JNDI Properties File for "Hello world!" example
- 1
- Defines a connection factory from which connections can be created. The syntax of a ConnectionURL is given in the section called “Apache Qpid JMS Client Configuration”.
- 2
- Defines a destination for which MessageProducers and/or MessageConsumers can be created to send and receive messages. The value for the destination in the properties file is an address string. In the JMS implementation MessageProducers are analogous to senders in the Qpid Message API, and MessageConsumers are analogous to receivers.
Apache Qpid JMS Client Configuration Copy linkLink copied to clipboard!
Configuring a JNDI InitialContext Copy linkLink copied to clipboard!
org.apache.qpid.jms.jndi.JmsInitialContextFactory. You can configure JNDI InitialContext in three ways.
- Using
jndi.propertiesfile on the Java Classpath.To configure JNDI InitialContext using the properties file, Include the filejndi.propertieson the Classpath and set thejava.naming.factory.initialproperty to valueorg.apache.qpid.jms.jndi.JmsInitialContextFactory. The Qpid InitialContextFactory implementation is discovered while instantiating InitialContext object.javax.naming.Context ctx = new javax.naming.InitialContext();The ConnectionFactory, Queue and Topic objects contained in the context are configured using properties, either directly within thejndi.propertiesfile, or in a separate file which is referenced injndi.propertiesusing thejava.naming.provider.urlproperty. - Using system properties.To configure JNDI InitialContext using the system properties, set the
java.naming.factory.initialto valueorg.apache.qpid.jms.jndi.JmsInitialContextFactory. The Qpid InitialContextFactory implementation is discovered while instantiating InitialContext object.javax.naming.Context ctx = new javax.naming.InitialContext();The ConnectionFactory, Queue and Topic objects contained in the context are configured using properties, which is passed using thejava.naming.provider.urlsystem property. - Programmatically using an environment Hashtable.The InitialContext can be configured by passing an environment during creation:
Hashtable<Object, Object> env = new Hashtable<Object, Object>(); env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory"); javax.naming.Context context = new javax.naming.InitialContext(env);
Hashtable<Object, Object> env = new Hashtable<Object, Object>(); env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory"); javax.naming.Context context = new javax.naming.InitialContext(env);Copy to Clipboard Copied! Toggle word wrap Toggle overflow The ConnectionFactory, Queue and Topic objects contained in the context are configured using properties, either directly within the environment Hashtable or a seperate file which is referenced using thejava.naming.provider.urlproperty within the environment Hashtable.
Syntax of the Properties file Copy linkLink copied to clipboard!
- For ConnectionFactory, use
connectionfactory.lookupName = URI, for example,connectionfactory.myFactoryLookup = amqp://localhost:5672 - For a Queue, use
queue.lookupName = queueName, for example,queue.myQueueLookup = queueA - For a Topic, use
topic.lookupName = topicName,for example,topic.myTopicLookup = topicA
ConnectionFactory factory = (ConnectionFactory) context.lookup("myFactoryLookup");
Queue queue = (Queue) context.lookup("myQueueLookup");
Topic topic = (Topic) context.lookup("myTopicLookup");
ConnectionFactory factory = (ConnectionFactory) context.lookup("myFactoryLookup");
Queue queue = (Queue) context.lookup("myQueueLookup");
Topic topic = (Topic) context.lookup("myTopicLookup");
Connection URI Copy linkLink copied to clipboard!
amqp://hostname:port[?option=value[&option2=value...]]
amqp://hostname:port[?option=value[&option2=value...]]
JMS Configuration options Copy linkLink copied to clipboard!
|
Option Name
|
Description
|
|---|---|
jms.username
|
User name value used to authenticate the connection
|
jms.password
|
Password value used to authenticate the connection.
|
jms.clientID
|
The ClientID value that is applied to the connection.
|
jms.forceAsyncSend
|
Configures whether all Messages sent from a MessageProducer are sent asynchronously or only those Message that qualify such as Messages inside a transaction or non-persistent messages.
|
jms.alwaysSyncSend
|
Override all asynchronous send conditions and always sends every Message from a MessageProducer synchronously.
|
jms.sendAcksAsync
|
Causes all Message acknowledgments to be sent asynchronously.
|
jms.localMessagePriority
|
If enabled prefetched messages are reordered locally based on their given Message priority value. Default value is false
|
jms.localMessageExpiry
|
Controls whether MessageConsumer instances locally filter expired Messages or deliver them. By default this value is set to true and expired messages are filtered
|
jms.validatePropertyNames
|
If message property names should be validated as valid Java identifiers. Default is true.
|
jms.queuePrefix
|
Optional prefix value added to the name of any Queue created from a JMS Session.
|
jms.topicPrefix
|
Optional prefix value added to the name of any Topic created from a JMS Session.
|
jms.closeTimeout
|
Timeout value that controls how long the client waits on Connection close before returning. (By default the client waits 15 seconds for a normal close completion event).
|
jms.connectTimeout
|
Timeout value that controls how long the client waits on Connection establishment before returning with an error. (By default the client waits 15 seconds for a connection to be established before failing).
|
jms.clientIDPrefix
|
Optional prefix value that is used for generated Client ID values when a new Connection is created for the JMS ConnectionFactory. The default prefix is 'ID:'.
|
jms.connectionIDPrefix
|
Optional prefix value that is used for generated Connection ID values when a new Connection is created for the JMS ConnectionFactory. This connection ID is used when logging some information from the JMS Connection object so a configurable prefix can make breadcrumbing the logs easier. The default prefix is 'ID:'.
|
jms.prefetchPolicy.queuePrefetchdefaults to 1000jms.prefetchPolicy.topicPrefetchdefaults to 1000jms.prefetchPolicy.queueBrowserPrefetchdefaults to 1000jms.prefetchPolicy.durableTopicPrefetchdefaults to 1000jms.prefetchPolicy.allused to set all prefetch values at once.
jms.redeliveryPolicy.maxRedeliveries controls when an incoming message is rejected based on the number of times it has been redelivered, the default value is (-1) disabled. A value of zero would indicate no message redeliveries are accepted, a value of five would allow a message to be redelivered five times, and so on.
TCP Transport Configuration options Copy linkLink copied to clipboard!
amqp://localhost:5672?jms.clientID=foo&transport.connectTimeout=30000
amqp://localhost:5672?jms.clientID=foo&transport.connectTimeout=30000
|
Option Name
|
Default Value
|
|---|---|
transport.sendBufferSize
|
64k
|
transport.receiveBufferSize
|
64k
|
transport.trafficClass
|
10
|
transport.connectTimeout
|
60 secs
|
transport.soTimeout
|
-1
|
transport.soLinger
|
-1
|
transport.tcpKeepAlive
|
false
|
transport.tcpNoDelay
|
true
|
SSL Transport Configuration options Copy linkLink copied to clipboard!
amqps://localhost:5673
amqps://localhost:5673
transport.keyStoreLocation
|
default is to read from the system property
javax.net.ssl.keyStore.
|
transport.keyStorePassword
|
default is to read from the system property
javax.net.ssl.keyStorePassword.
|
transport.trustStoreLocation
|
default is to read from the system property
javax.net.ssl.trustStore.
|
transport.trustStorePassword
|
default is to read from the system property
javax.net.ssl.trustStorePassword.
|
transport.storeType
|
The type of trust store being used. Default is "JKS".
|
transport.contextProtocol
|
The protocol argument used when getting an SSLContext. Default is "TLS".
|
transport.enabledCipherSuites
|
The cipher suites to enable, comma separated. The context default ciphers are used. Any disabled ciphers are removed.
|
transport.disabledCipherSuites
|
The cipher suites to disable, comma separated. Ciphers listed here are removed from the enabled ciphers. No default.
|
transport.enabledProtocols
|
The protocols to enable, comma separated, the context default protocols are used. Any disabled protocols are removed.
|
transport.disabledProtocols
|
The protocols to disable, comma separated. Protocols listed here are removed from the enabled protocols. Default is "SSLv2Hello,SSLv3".
|
transport.trustAll
|
Whether to trust the provided server certificate implicitly, regardless of any configured trust store. Defaults to false.
|
transport.verifyHost
|
Whether to verify that the hostname being connected to matches with the provided server certificate. Defaults to true.
|
transport.keyAlias
|
The alias to use when selecting a keypair from the keystore to send a client certificate to the server. No default.
|
Failover Configuration options Copy linkLink copied to clipboard!
jms.* options are applied to the overall failover URI, outside the parentheses, and affect the JMS Connection object for its lifetime.
failover:(amqp://host1:5672,amqp://host2:5672)?jms.clientID=foo&failover.maxReconnectAttempts=20
failover:(amqp://host1:5672,amqp://host2:5672)?jms.clientID=foo&failover.maxReconnectAttempts=20
transport. or amqp. options defined earlier, with these being applied as each host is connected to:
failover:(amqp://host1:5672?amqp.option=value,amqp://host2:5672?transport.option=value)?jms.clientID=foo
failover:(amqp://host1:5672?amqp.option=value,amqp://host2:5672?transport.option=value)?jms.clientID=foo
failover.initialReconnectDelay
|
The amount of time the client will wait before the first attempt to reconnect to a remote peer. The default value is zero.
|
failover.reconnectDelay
|
Controls the delay between successive reconnection attempts, defaults to 10 milliseconds. If the backoff option is not enabled this value remains constant.
|
failover.maxReconnectDelay
|
The maximum time that the client will wait before attempting a reconnect. This value is used when the backoff feature is enabled to ensure that the delay is not too long. Defaults to 30 seconds.
|
failover.useReconnectBackOff
|
Controls whether the time between reconnection attempts should grow based on a configured multiplier. Defaults value is true.
|
failover.reconnectBackOffMultiplier
|
The multiplier used to grow the reconnection delay value, defaults to 2.0d.
|
failover.maxReconnectAttempts
|
The number of reconnection attempts allowed before reporting the connection as failed to the client. The default is no limit or (-1).
|
failover.startupMaxReconnectAttempts
|
For a client that has never connected to a remote peer. This option controls the number of attempts made to connect before reporting the connection as failed. The default value is maxReconnectAttempts.
|
failover.warnAfterReconnectAttempts
|
Controls how often the client logs a message indicating that failover reconnection is being attempted. The default is to log every 10 connection attempts.
|
transport.enabledProtocols
|
The protocols to enable, the values are comma separated and the context default protocols are used. Any disabled protocols are removed.
|
transport. and amqp. URI options outlined earlier for a non-failover broker URI but prefixed with failover.nested.
amqp.vhost option to every broker connected to you might have a URI like:
failover:(amqp://host1:5672,amqp://host2:5672)?jms.clientID=foo&failover.nested.amqp.vhost=myhost
failover:(amqp://host1:5672,amqp://host2:5672)?jms.clientID=foo&failover.nested.amqp.vhost=myhost
AMQP Configuration options Copy linkLink copied to clipboard!
amqp.idleTimeout: The idle timeout in milliseconds, the connection fails if the peer sends no AMQP frames. Default value 60000.amqp.vhost: The vhost to connect to. Used to populate the Sasl and Open hostname fields. Default is the main hostname from the Connection URI.amqp.saslLayer: Controls whether connections should use a SASL layer or not. Default is true.amqp.saslMechanisms: Which SASL mechanism(s) the client should allow selection of, if offered by the server and usable with the configured credentials. Comma separated if specifying more than 1 mechanism. Default is to allow selection from all the clients supported mechanisms, which are currently EXTERNAL, CRAM-MD5, PLAIN, and ANONYMOUS.amqp.maxFrameSize: The max-frame-size value in bytes that is advertised to the peer. Default is 1048576.
Discovery Configuration options Copy linkLink copied to clipboard!
discovery:(<agent-uri>)?discovery.maxReconnectAttempts=20&discovery.discovered.jms.clientID=foo
discovery:(<agent-uri>)?discovery.maxReconnectAttempts=20&discovery.discovered.jms.clientID=foo
discovery:(file:///path/to/monitored-file?updateInterval=60000)
updateInterval: Controls the frequency in milliseconds which the file is inspected for change. The default value is 30000.
discovery:(multicast://default?group=default)
discovery:(multicast://default?group=default)
group: Controls which multicast group messages are listened for on. The default value is "default".
JMS Client Logging Copy linkLink copied to clipboard!
org.apache.qpid.jms.
- By setting the environment variable
PN_TRACE_FRMto true, which enables Proton to emit frame logging to stdout. - Add the option
amqp.traceFrames=trueto the connection URI. This enables the client to add a protocol tracer to Proton, and configure theorg.apache.qpid.jms.provider.amqp.FRAMESLogger to TRACE level to include the output in the logs.
3.3. .NET AMQP 1.0 Client API Copy linkLink copied to clipboard!
3.3.1. Getting Started with .NET AMQP 1.0 Client API Copy linkLink copied to clipboard!
3.3.1.1. Introduction to .NET AMQP 1.0 Client API Copy linkLink copied to clipboard!
What is AMQPNet.Lite? Copy linkLink copied to clipboard!
Hardware and Software Requirements to setup AMQPNet.Lite SDK Copy linkLink copied to clipboard!
- Visual Studio version 2012 or 2013
- .NET Framework support for Common Language Runtime (CLR) versions 2.0 and 4.
- Windows Desktop machine
Installing the SDK Copy linkLink copied to clipboard!
amqpnetlite-sdk-1.1.0.2.zip file in a directory on a windows machine.
Contents of the AMQPNet.Lite SDK Copy linkLink copied to clipboard!
- The pre-compiled binary (.dll) files and the associated debug program database (.pdb) files.
- Source files of examples which demonstrate using this SDK and AMQP.
- Amqpnetlite API documentation,see
InstallDir\doc\html\index.html
AMQPNet.Lite Examples Copy linkLink copied to clipboard!
- HelloWorld-simple Minimal send-receive through brokered topic.
- HelloWorld-robust Send-receive with more features.
- Interop.Drain, Interop.Spout-Interoperate with Apache Qpid using simple send and receive.
- Interop.Client, Interop.Server - Interoperate with Apache Qpid C++ Messaging using request and response.
- PeertoPeer - Client and Server programs illustrate using the Amqpnetlite library to create peer-to-peer connections without using an intermediate broker system.
- Receive Selector - Receive messages matching filter criteria
- Anonymous Relay - Like Interop.Client but detects and uses peer ANONYMOUS-RELAY capability for sending all messages over a single link, regardless of destination address.
InstallDir/amqpnetlite/Examples.
3.3.1.2. A Simple Messaging Program in AMQPNet.Lite Copy linkLink copied to clipboard!
Building the Example Copy linkLink copied to clipboard!
InstallDiramqpnetlite/amqp.sln and InstallDiramqpnetlite/amqp-vs2012.sln. The amqp.sln is the project file for Visual Studio 2013 solution and the amqp-vs2012.sln is the project file for Visual Studio 2012 solution.
- Go to the directory where you extracted the SDK, open
amqp.slnsolution file with Visual Studio 2013 - In the Solution Explorer window, you can view all the examples.
- To build the examples, click BUILD icon.
- The Output window shows the build status.
Running the Example Copy linkLink copied to clipboard!
- On the terminal, navigate to the directory where SDK is installed, i.e
InstallDir/amqpnetlite/bin/Debug - On the command prompt enter the Helloworld-simple.exeBy default, this program connects to a broker running on localhost:5672. You can specify a host and port, and the AMQP endpoint address explicitly on the command line:, for example HelloWorld_simple amqp://localhost:5672 amq.topicBy default, this program addresses its messages to
amq.topic. In Amqp brokers amq.topic is a predefined endpoint address and is immediately available with no broker configuration. - On Success, you can see the output on the DOS prompt as: HelloWorld!
- In Visual Studio 2013 Solution Explorer window, right-click on Helloworld-simple example.
- Select Set as Startup Project option from the panel.
- In Solution Explorer window, click on
HelloWorld-simple.csfile and open the source codeCopy to Clipboard Copied! Toggle word wrap Toggle overflow - Insert a breakpoint at the last line in the source file at
connection.Close();,and click - On success, you can see the output on the console window as
Hello World!
3.4. Python AMQP 1.0 Client API Copy linkLink copied to clipboard!
3.4.1. Getting Started with Qpid Proton Python Client Copy linkLink copied to clipboard!
3.4.1.1. Introduction to Qpid Proton Python Client Copy linkLink copied to clipboard!
What is Qpid Proton Copy linkLink copied to clipboard!
Introduction to the Qpid Proton Python Client Copy linkLink copied to clipboard!
Container(name_of_a_handler_class(arguments)).run()
- Where the logic of the application is defined as a class handling particular events. A Container instance is created and passed an instance of that handler class.The call to
run()gives control to the Container, which performs the necessary IO operations and informs the handler of the events.Therun()returns when there is nothing to do.
3.4.2. Python Client Tutorials with examples Copy linkLink copied to clipboard!
A Simple Sending and Receiving Program in Python Copy linkLink copied to clipboard!
at-least-once guarantee, since each message should eventually be received at least once, though a given message may be received more than once.
sent and total, where, sent keeps track of the number of messages that are send and total maintains a count of number of messages to send.
Example 3.3. Sending reliable messages
- 1
On_start()method is called when the Container first starts to run.In this example it is used to establish a sending link over which the messages are transferred.- 2
On_sendable()method is called to known when the messages can be transferred.The callback checks that the sender has credit before sending messages and if the sender has already sent the required number of messages.NoteAMQP provides flow control allowing any component receiving messages to avoid being overwhelmed by the number of messages it is sent. In this example messages are sent when the broker has enabled their flow.- 3
Send()is an asynchronous call. The return of the call does not indicate that the messages has been transferred yet.- 4
on_accepted()notifies if the amq broker has received and accepted the message.In this example, we use this event to track the confirmation of the messages sent. The connection closes and exits when the amq broker has received all the messages.NoteTheon_accepted()call will be made by the Container when the amq broker accepts the message and not the receiving client.- 5
- Resets the sent count to reflect the confirmed messages. The library automatically reconnects to the sender and hence when the sender is ready, it can restart sending the remaining messages
Example 3.4. Receiving reliable messages
examples queue on a broker accessible on port 5672 on localhost. The program simply prints the body of the received messages.
- 1
On_start()method is called when the Container first starts to run.In this example it is used to establish a receiving link over which the messages are transferred.- 2
On_message()method is called when a message is received. It simply prints the messagesIn this example, the amq broker waits for a certain number of messages before closing and exiting the connection. The method checks for duplicate messages and ignores them. The logic to ignore duplicates is implement using the sequential id scheme
Sending and Receiving Program using SSL in Python Copy linkLink copied to clipboard!
SSL Configuration Copy linkLink copied to clipboard!
SSL settings for A-MQ to run Qpid Python client
- Generate pem trust certificate for Qpid Python client
keytool -importkeystore -srckeystore broker.ks -srcalias broker \ -srcstoretype JKS -deststoretype PKCS12 -destkeystore broker.pkcs12 \ -srcstorepass ${general_passwd} -deststorepass ${general_passwd} openssl pkcs12 -in broker.pkcs12 -out broker_cert.pem \ -passin pass:${general_passwd} -passout pass:${general_passwd}keytool -importkeystore -srckeystore broker.ks -srcalias broker \ -srcstoretype JKS -deststoretype PKCS12 -destkeystore broker.pkcs12 \ -srcstorepass ${general_passwd} -deststorepass ${general_passwd} openssl pkcs12 -in broker.pkcs12 -out broker_cert.pem \ -passin pass:${general_passwd} -passout pass:${general_passwd}Copy to Clipboard Copied! Toggle word wrap Toggle overflow - Adjust A-MQ broker to use the certificate by modifying the A-MQ environment
sed -i '/KARAF_OPTS/d' ${A_MQ_HOME}/bin/setenvsed -i '/KARAF_OPTS/d' ${A_MQ_HOME}/bin/setenvCopy to Clipboard Copied! Toggle word wrap Toggle overflow where,A_MQ_HOMEis the installation path of the amq broker executable file.echo "export KARAF_OPTS=\"-Djavax.net.ssl.keyStore=${certificates_dir}/broker.ks \ -Djavax.net.ssl.keyStorePassword=${general_passwd}\"" >> ${A_MQ_HOME}/bin/setenvecho "export KARAF_OPTS=\"-Djavax.net.ssl.keyStore=${certificates_dir}/broker.ks \ -Djavax.net.ssl.keyStorePassword=${general_passwd}\"" >> ${A_MQ_HOME}/bin/setenvCopy to Clipboard Copied! Toggle word wrap Toggle overflow - Generate the client certificate
keytool -genkey -alias client -keyalg RSA -keystore client.ks \ -storepass ${general_passwd} -keypass ${general_passwd} \ -dname "O=Client,CN=client" -validity 99999keytool -genkey -alias client -keyalg RSA -keystore client.ks \ -storepass ${general_passwd} -keypass ${general_passwd} \ -dname "O=Client,CN=client" -validity 99999Copy to Clipboard Copied! Toggle word wrap Toggle overflow keytool -export -alias client -keystore client.ks -file client_cert \ -storepass ${general_passwd}keytool -export -alias client -keystore client.ks -file client_cert \ -storepass ${general_passwd}Copy to Clipboard Copied! Toggle word wrap Toggle overflow - Add client certificate as trusted to the broker database
keytool -import -alias client -keystore broker.ts -file client_cert \ -storepass ${general_passwd} -v -trustcacerts -nopromptkeytool -import -alias client -keystore broker.ts -file client_cert \ -storepass ${general_passwd} -v -trustcacerts -nopromptCopy to Clipboard Copied! Toggle word wrap Toggle overflow
SSL certificate and keys settings for Qpid Python client Copy linkLink copied to clipboard!
- Set SLL to prevent the private key and the certificate to be send to output.
openssl pkcs12 -nocerts -in client.pkcs12 -out client_private_key.pem \ -passin pass:${general_passwd} -passout pass:${general_passwd}openssl pkcs12 -nocerts -in client.pkcs12 -out client_private_key.pem \ -passin pass:${general_passwd} -passout pass:${general_passwd}Copy to Clipboard Copied! Toggle word wrap Toggle overflow openssl pkcs12 -nokeys -in client.pkcs12 -out client_cert.pem \ -passin pass:${general_passwd} -passout pass:${general_passwd}openssl pkcs12 -nokeys -in client.pkcs12 -out client_cert.pem \ -passin pass:${general_passwd} -passout pass:${general_passwd}Copy to Clipboard Copied! Toggle word wrap Toggle overflow - Adjust A-MQ broker to use the certificate
sed -i '/KARAF_OPTS/d' ${A_MQ_HOME}/bin/setenvsed -i '/KARAF_OPTS/d' ${A_MQ_HOME}/bin/setenvCopy to Clipboard Copied! Toggle word wrap Toggle overflow echo "export KARAF_OPTS=\"-Djavax.net.ssl.keyStore=${certificates_dir}/broker.ks \ -Djavax.net.ssl.keyStorePassword=${general_passwd} \ -Djavax.net.ssl.trustStore=${certificates_dir}/broker.ts \ -Djavax.net.ssl.trustStorePassword=${general_passwd}\"" >> ${A_MQ_HOME}/bin/setenvecho "export KARAF_OPTS=\"-Djavax.net.ssl.keyStore=${certificates_dir}/broker.ks \ -Djavax.net.ssl.keyStorePassword=${general_passwd} \ -Djavax.net.ssl.trustStore=${certificates_dir}/broker.ts \ -Djavax.net.ssl.trustStorePassword=${general_passwd}\"" >> ${A_MQ_HOME}/bin/setenvCopy to Clipboard Copied! Toggle word wrap Toggle overflow
Example Copy linkLink copied to clipboard!
Example 3.5. Sending reliable messages over a secured connection
- 1
set_trusted_ca_db("/path/to/ca-certificate.pem")call specifies the location of the CA's certificate in pem file formatset_peer_authentication(SSLDomain.VERIFY_PEER)call requests the servers certificate to be verified as valid using the specified CA's public key.To verify if the hostname used matches which the name specified in the servers certificate, replace theVERIFY_PEERmacro toVERIFY_PEER_NAME.NoteEnsure to update the program with the path of the certificates as per your environment before running the example.- 2
set_credentials("/path/to/client-certificate.pem", "/path/to/client-private-key.pem", "client-password")call is used if the client needs to authenticate itself. In such a case, you need to mention the clients public certificate, private key file both in pem format, and also specify the password required for the private keyNoteEnsure to update the program with the path of the client certificate, client private key as per your environment and the correct client-password before running the example.
A Request/Response Server and Client Program Copy linkLink copied to clipboard!
Example 3.6. A simple server program to send responses
- 1
On_message()method performs a lookup at thereply_toaddress on themessageand creates a sender over which the response can be send.In case there are more requests with the samereply_toaddress, the method will store the senders.
Run python client over SSL Copy linkLink copied to clipboard!
- No server and client authentication
./sender.py -b "amqps://$(hostname):5672/examples"
./sender.py -b "amqps://$(hostname):5672/examples"Copy to Clipboard Copied! Toggle word wrap Toggle overflow - Server authentication enabled
./sender.py -b "amqps://$(hostname):5672/examples" --conn-ssl-trust-store<certificates_dir>/broker_cert.pem --conn-ssl-verify-peer --conn-ssl-verify-peer-name
./sender.py -b "amqps://$(hostname):5672/examples" --conn-ssl-trust-store<certificates_dir>/broker_cert.pem --conn-ssl-verify-peer --conn-ssl-verify-peer-nameCopy to Clipboard Copied! Toggle word wrap Toggle overflow - Server and client authentication enabled
./sender.py -b "amqps://$(hostname):5672/examples" --conn-ssl-certificate <certificates_dir>/client-certificate.pem --conn-ssl-private-key <certificates_dir>/client-private-key.pem --conn-ssl-trust-store <certificates_dir>/broker_cert.pem --conn-ssl-verify-peer --conn-ssl-verify-peer-name
./sender.py -b "amqps://$(hostname):5672/examples" --conn-ssl-certificate <certificates_dir>/client-certificate.pem --conn-ssl-private-key <certificates_dir>/client-private-key.pem --conn-ssl-trust-store <certificates_dir>/broker_cert.pem --conn-ssl-verify-peer --conn-ssl-verify-peer-nameCopy to Clipboard Copied! Toggle word wrap Toggle overflow
A simple client program to receive the messages from the Server Copy linkLink copied to clipboard!
examples broker.
Example 3.7. A simple client program to receive the messages from the Server
- 1
On_start()method creates a receiver to receive the responses from the serverIn this example, instead of using the localhost, we set the dynamic option which informs the amq broker that the client is connected to create a temporary address over which it can receive the responses.- 2
On_link_opened()method sends the first requests if the receiving link is setup and confirmed by the brokerHere, we use the address allocated by the broker as the reply_to address of the requests hence the broker needs to confirm that the receiving link is established.
Sending and Receiving using Transactions Copy linkLink copied to clipboard!
TransactionHandler in addition to MessagingHandler as a base class for the handler definition.
Example 3.8. Sending messages using local transactions
- 1
on_transaction_declared()method requests a newtransactionalcontext, passing themselves as the handler for the transaction- 2
on_transaction_declared()method is notified when that context is in place- 5
- When the
on_transaction_committed()method is called the committed count is incremented by the size of the current batch. If the committed count after that is equal to the number of message it was asked to send, it has completed all its work so closes the connection and the program will exit. If not, it starts a new transaction and sets the current batch to 0.The sender tracks the number of messages sent in the current_batch, as well as the number of messages committed . - 3
- Messages are sent when the transaction context has been declared and there is credit. The
send()method of the transaction is invoked, rather than on the sender itself. This ensures that send operation is tied to that transaction context. - 4
- The
current_batchcounter is incremented for each message. When that counter reaches the preconfigured batch size, thecommit()method is called on the transaction.
Example Copy linkLink copied to clipboard!
current_batch and the overall number committed
Example 3.9. Receiving using local transactions
- 1
on_message()method the receiver calls theaccept()method on the transaction object to tie the acceptance to the context. It then increments thecurrent_batch. If thecurrent_batchis now equal to thebatch_size, the receiver calls thecommit()method on the transaction.- 2
on_transaction_declared()method controls the message flow. The receiver uses theflow()method on the receiver to request an equal number of messages that match thebatch_size- 3
- When the
on_transaction_committed()method is called the committed count is incremented, the application then tests whether it has received all expected messages. If all the messages are received, the application exists. If all messages are not received a new transactional context is declared and the batch is reset.
Using a Selector Filter Copy linkLink copied to clipboard!
Example 3.10. Filtering messages using a selector
- 1
on_start()method implements a selector that filters messages based on the message headerWhile creating the receiver, specify the Selector object as an option. The options argument can be a single object or a list.
Sending and Receiving Best-Effort Messages Copy linkLink copied to clipboard!
AtMostOnce to the options keyword arguments to Container.create_sender and Container.create_receiver. For AtMostOnce, the sender settles the message as soon as it sends it. If the connection is lost before the message is received by the receiver, the message will not be delivered. The AtMostOnce link option type is defined in proton.reactors.
Example 3.11. Receiving best-effort messages
- 1
on_start()method uses the AtMostOnce option to receive the unacknowledged messages.If the connection is lost before the message is received by the receiver, the message will not be delivered.
on_accepted method is redundant. There is no distinction between confirmed and sent status and the on_disconnected method is redundant. Any shutdown would be triggered directly after sending.
Example 3.12. Sending best-effort messages
- 1
on_start()method uses the AtMostOnce option to send the unacknowledged messages.
3.5. C++ AMQP 1.0 Client API Copy linkLink copied to clipboard!
3.5.1. Getting Started with C++ AMQP Copy linkLink copied to clipboard!
Introduction to C++ AMQP 1.0 Client API Copy linkLink copied to clipboard!
Downloading A-MQ C++ Client Copy linkLink copied to clipboard!
3.5.2. C++ Example Clients Copy linkLink copied to clipboard!
A Simple Messaging Program in C++ Copy linkLink copied to clipboard!
Example 3.13. Simple Asynchronous Consumer
- 1
- The constructor of the
SimpleAsyncConsumerclass. This constructor allows the user to create an instance of the class that connects to a particular broker and destination. It also identifies the destination as a Queue or a Topic - 2
- The
runConsumermethod creates a Connection to the broker and start a new Session configured with the configured Acknowledgment mode. Once a Session is created a new Consumer can then be created and attached to the configured Destination. To listen asynchronously for new messages theSimpleAsyncConsumerinherits fromcms::MessageListenerso that it can register itself as a Message Listener with theMessageConsumercreated inrunConsumermethod. - 3
- All the messages received by the application are dispatched to the
onMessagemethod and if the message is a TextMessage its contents are printed on the screen.
Example 3.14. A simple Asynchronous producer
- 1
- The
SimpleProducerclass exposes a similar interface to the consumer example Example 3.13, “Simple Asynchronous Consumer”. The constructor creates an instance with the configuration options for the broker and destination and the number of messages to be send to the configured destination. - 2
- The
runmethod publishes the specified number of messages. Once the run method completes, the client can close theSimpleProducerapplication by calling theclose()method, which cleans up the allocated CMS resource and exits the application.
3.6. Interoperability between AMQP 1.0 Client APIs Copy linkLink copied to clipboard!
Index Copy linkLink copied to clipboard!
A
- ActiveMQConnection, The connection, Setting the redelivery policy on a connection, Setting the redelivery policy on a destination
- ActiveMQConnectionFactory, The connection factory
B
- backOffMultiplier, Redelivery properties
C
- collisionAvoidanceFactor, Redelivery properties
- Connection, The connection
- ConnectionFactory, The connection factory
D
- durableTopicPrefetch, Consumer specific prefetch limits
E
- embedded broker, Embedded brokers
G
- getRedeliveryPolicy(), Setting the redelivery policy on a connection
- getRedeliveryPolicyMap(), Setting the redelivery policy on a destination
I
- initialRedeliveryDelay, Redelivery properties
M
- maximumRedeliveries, Redelivery properties
- maximumRedeliveryDelay, Redelivery properties
P
- prefetch
- per broker, Setting prefetch limits per broker
- per connection, Setting prefetch limits per connection
- per destination, Setting prefetch limits per destination
Q
- queueBrowserPrefetch, Consumer specific prefetch limits
- queuePrefetch, Consumer specific prefetch limits
R
- redeliveryDelay, Redelivery properties
- redeliveryPlugin, Configuring the broker's redelivery plug-in
- RedeliveryPolicy, Setting the redelivery policy on a connection, Setting the redelivery policy on a destination
- RedeliveryPolicyMap, Setting the redelivery policy on a destination
T
- topicPrefetch, Consumer specific prefetch limits
U
- useCollisionAvoidance, Redelivery properties
- useExponentialBackOff, Redelivery properties
- usePrefetchExtension, Disabling the prefetch extension logic
V
- VM
- advanced URI, Using the VM transport
- broker name, Using the VM transport
- create, Embedded brokers
- embedded broker, Embedded brokers
- simple URI, Using the VM transport
- waitForStart, Embedded brokers
- VM URI
- advanced, Using the VM transport
- simple, Using the VM transport
Legal Notice Copy linkLink copied to clipboard!
Trademark Disclaimer