Messaging Programming Reference
A Guide to Programming with Red Hat Enterprise Messaging
Abstract
Chapter 1. Introduction Copy linkLink copied to clipboard!
1.1. Red Hat Enterprise MRG Messaging Copy linkLink copied to clipboard!
1.2. Apache Qpid Copy linkLink copied to clipboard!
1.3. AMQP - Advanced Message Queuing Protocol Copy linkLink copied to clipboard!
1.4. Differences between AMQP 0-10 and AMQP 1.0 Copy linkLink copied to clipboard!
Broker Architecture
Broker Management
Symmetry
1.5. AMQP 1.0 support in MRG-M 3 Copy linkLink copied to clipboard!
1.5.1. Support for the C++ qpid::messaging API Copy linkLink copied to clipboard!
qpid::messaging
API to speak AMQP 1.0 in a clear and natural way that avoids tying its use to any particular broker.
1.5.2. Reply-To Addresses and Temporary Queues Copy linkLink copied to clipboard!
#
' character by inserting a UUID. This works well for 0-10 where the name is chosen by clients and must be unique. This transformation of the name is done when constructing an Address from a single address string (rather than from its constituent parts). The modified name can then be accessed via Address::getName()
.
getAddress()
- has been added to both Sender
and Receiver
.
reply-to
on any request messages they send. (This new approach will work for both 0-10 and 1.0).
1.5.3. Connections, Session and Links Copy linkLink copied to clipboard!
protocol
' connection property. The recognized values are 'amqp1.0
' and 'amqp0-10
'. AMQP 0-10 is still the default and the 1.0 support is only available if the required module (the Apache Proton library) is loaded.
sasl_mechanisms
connection option can be set to NONE
.
1.5.4. Addresses Copy linkLink copied to clipboard!
*
' or a '#
') it is sent as a legacy-amqp-topic-binding
, if not it is sent as a legacy-amqp-direct-binding
.
#
', the dynamic flag is set on the corresponding source or target and the dynamic-node-properties
are populated based on the node properties. Note that when the dynamic flag is set the address should not be specified.
1.5.5. On-demand Create Workaround for Legacy Applications Copy linkLink copied to clipboard!
create
' behavior similar to that supported over 0-10. That is, it will create a node with the name specified by the client if it does not already exist. This is provided to help transition applications that rely on create policy. However, this is non-standard behavior, and new applications should not rely on this.
#
' as the name, or through the create
policy - the node properties are sent as dynamic-node-properties
on the source or target. These can be specified in a nested map within the node. Additionally, any durable
and type
properties in the node map are sent. There is also a translation from the 0-10 style x-declare
in the node. All fields specified in the node are included as if listed in properties.
1.5.6. Link-scoped x-declare and x-subscribe Copy linkLink copied to clipboard!
x-declare
and x-subscribe
are not supported.
1.5.7. Node- and Link-scoped x-bindings Copy linkLink copied to clipboard!
x-bindings
property is not supported for AMQP 1.0 in nodes or links.
1.5.8. Delete Policy Copy linkLink copied to clipboard!
1.5.9. Node Lifetime Policies Copy linkLink copied to clipboard!
amqp:delete-on-close:list
, amqp:delete-on-no-links:list
, amqp:delete-on-no-messages:list
, amqp:delete-on-no-links-or-messages:list
.
delete-on-close
, delete-if-unused
, delete-if-empty
or delete-if-unused-and-empty
.
"my-queue;{create:always, node: {properties: {lifetime-policy: delete-if-empty}}}"
"my-queue;{create:always, node: {properties: {lifetime-policy: delete-if-empty}}}"
1.5.10. Message Timestamp Copy linkLink copied to clipboard!
1.5.11. Accessing AMQP Message Properties and Headers Copy linkLink copied to clipboard!
message-id
, correlation-id
, user-id
, subject
, reply-to
and content-type
fields in the properties
section of a 1.0 message can all be set or retrieved via accessors of the same name on the Message
instance. The same is true of the durable
, priority
and ttl
fields in the header
section.
delivery-count
field within the header
section. There is no direct accessor for this field. However if the value is greater than 1, then the Message::getRedelivered()
method returns true. If Message::setRedelivered()
is called with a value of true
, then the delivery count is set to 1, else it is set to 0.
application-properties
section of a received 1.0 message is available via the properties
map of the Message
class. The properties
map is used to populate the application-properties
section when sending a message.
Message
class.
x-amqp-<field-name>
. The keys in use are: x-amqp-first-acquirer
and x-amqp-delivery-count
for the header
section, and x-amqp-to
, x-amqp-absolute-expiry-time
, x-amqp-creation-time
, x-amqp-group-id
, x-amqp-qroup-sequence
and x-amqp-reply-to-group-id
for the properties
section.
x-amqp-delivery-annotations
and x-amqp-message-annotations
respectively.
1.5.12. AMQP Support in qpidd Copy linkLink copied to clipboard!
qpidd
, the amqp
module must be loaded. This allows the broker to recognize the 1.0 protocol header alongside the 0-10 one.
1.5.13. Simple Authentication and Security Layer (SASL) Support Copy linkLink copied to clipboard!
1.5.14. Queues and Exchanges Copy linkLink copied to clipboard!
qpid-config
tool:
qpid-config list incoming
# qpid-config list incoming
qpid-config list outgoing
# qpid-config list outgoing
dynamic-node-properties
are used to determine the characteristics of the node created. The properties are the same as the QMF create
method properties: the 0-10 defined options durable
, auto-delete
, alternate-exchange
, exchange-type
and any qpidd specific options, such as qpid.max-count
.
supported-dist-modes
property determines whether a queue or exchange is desired (the create
method uses the 'type
' property). If 'move
' is specified a queue is created, if 'copy
' is specified an exchange is created. If this property is not set, then a queue is assumed.
1.5.15. Filters Copy linkLink copied to clipboard!
legacy-amqp-direct-binding
legacy-amqp-topic-binding
legacy-amqp-headers-binding
selector-filter
xquery-filter
filter
property in the link
properties specified in the address. The value of this filter
property should be a list of maps, with each map specifying a filter through key-value pairs for name, descriptor (can be specified as numeric or symbolic) and a value. For example:
my-xml-exchange; {link:{filter:{value:"declare variable $colour external; colour='red'",name:x,descriptor:"apache.org:xquery-filter:string"}}}
my-xml-exchange; {link:{filter:{value:"declare variable $colour external; colour='red'",name:x,descriptor:"apache.org:xquery-filter:string"}}}
direct | topic | fanout | headers | xml | queue | |
---|---|---|---|---|---|---|
legacy-amqp-direct-binding
|
Yes
|
Yes
|
No
|
No
|
Yes
|
Yes
|
legacy-amqp-topic-binding
|
No
|
Yes
|
No
|
No
|
No
|
Yes
|
legacy-amqp-headers-binding
|
No
|
No
|
No
|
Yes
|
No
|
No
|
xquery-filter
|
No
|
No
|
No
|
No
|
Yes
|
No
|
selector-filter
|
Yes
|
Yes
|
Yes
|
Yes
|
Yes
|
Yes
|
1.5.16. Message Conversion Between AMQP 0-10 and AMQP 1.0 Copy linkLink copied to clipboard!
message-id
, correlation-id
, userid
, content-type
and content-encoding
map between the properties
section in 1.0 and the message-properties
in an 0-10 header. Note, however, that a 0-10 message-id
must be a UUID. This field is skipped when translating a 1.0 message to 0-10 if it does not contain a valid UUID.
priority
field in the header section of a 1.0 message maps to the priority
field in the delivery-properties
of an 0-10 message. The durable
header in a 1.0 message is equivalent to the delivery-mode
in the delivery-properties
of an 0-10 message, with a value of true
in the former being equivalent to a value of 2 in the latter and a value of false
in the former equivalent to 1 in the latter.
reply-to
is the routing-key
. If the exchange is set then the reply-to
address for 1.0 is composed from the exchange and any routing key (separated by a forward slash).
reply-to
address is a queue if no type is specified. To ensure that a 0-10 routing-key
for an exchange is correctly converted to a 1.0 reply-to
, specify the node type in the 0-10 address, for instance 'amq.direct/rk; {node:{type:topic}}
', or set the type on the Address instance.
subject
field in the properties
of the 1.0 message is set to the value of the routing-key
from the message-properties
of the 0-10 message. In the reverse direction, the subject
field of the properties
section of the 1.0 message populates the routing-key
in the message-properties
of the 0-10 message. Note that the routing-key
truncates at 255 characters.
to
' field of the properties
section when converting to 1.0, but the reverse translation is not done (as the destination for messages sent out by the broker is controlled by the subscription in 0-10).
application-properties
section of a 1.0 message is converted to the application-headers
field in the message-properties
of an 0-10 message and vice versa.
reply-to
from 1.0 to 0-10, if the address contains a forward slash it is assumed to be of the form exchange/routing key. If it does not contain a forward slash, it is assumed to be a simple node name. If that name matches an existing queue, then the resulting 0-10 reply-to
will have the exchange empty and the routing key populated with the queue name. If the name does not match an existing queue, but the name matches an exchange, then the reply-to
has the exchange populated with the node name and the routing key left empty. If the node refers to neither a known queue nor exchange then the resulting reply-to
will be empty.
1.5.17. Capabilities Copy linkLink copied to clipboard!
shared
' capability allows subscriptions from an exchange to be shared by multiple receivers. Where this is specified the subscription queue created takes the name of the link (and does not include the container id).
durable
' capability is added if the queue or exchange referred to by the source or target is durable. The 'queue
' capability is added if the source or target references a queue. The 'topic
' capability is added if the source or target references an exchange. If the source or target references a queue or a direct exchange the 'legacy-amqp-direct-binding
' is added. If it references a queue or a topic exchange, 'legacy-amqp-topic-binding
' is added.
create-on-demand
' capability is an extension that allows legacy applications to use a 'create
' policy in the messaging client. If set in the client and the named node does not exist, the node is created using the dynamic-node-properties
, in the same way as when the dynamic flag is set.
1.5.18. Capability Matching and Assert Copy linkLink copied to clipboard!
assert
option is not exactly equivalent to the 0-10 based mechanism. Over AMQP 1.0, the client sets the capabilities it desires, the broker sets the capabilities it can offer and when the assert
option is on, the client ensures that all the capabilities it requested are supported.
durable
is set in the node properties, then a capability of 'durable
' is requested (meaning the node will not lose messages if its volatile memory is lost).
type
is set, then that will also be passed as a requested capability. For example: 'queue
' means the node supports queue-like characteristics (stores messages until consumers claim them and allocates messages between competing consumers), 'topic
' means the node supports classic pub-sub characteristics.
1.5.19. Configuring Subscription Queues using Topics Copy linkLink copied to clipboard!
qpid-config add topic my-topic --argument exchange=amq.topic\ --argument qpid.max_count=500 --argument qpid.policy_type=self-destruct
# qpid-config add topic my-topic --argument exchange=amq.topic\
--argument qpid.max_count=500 --argument qpid.policy_type=self-destruct
my-topic/my-key
' over 1.0 now, it will result in a subscription queue being created with a limit of 500 messages, that deletes itself (thus ending the subscription) if that limit is exceeded and is bound to 'amq.topic
' with the key 'my-key
'.
1.6. qpid::messaging Message::get/setContentObject() Copy linkLink copied to clipboard!
Message::getContentObject()
and Message::setContentObject()
to access the semantic content of structured AMQP 1.0 messages. These methods allow the body of the message to be accessed or manipulated as a Variant. Using these methods produces the most widely applicable code as they work for both protocol versions and work with map-, list-, text- or binary- messages.
Message::getContent()
and Message::setContent()
continue to refer to the raw bytes of the content. The encode()
and decode()
methods in the API continue to decode map- and list- messages in the AMQP 0-10 format.
Chapter 2. AMQP Model Overview Copy linkLink copied to clipboard!
2.1. The Producer - Consumer Model Copy linkLink copied to clipboard!
2.2. Consumer-driven messaging Copy linkLink copied to clipboard!
2.3. Message Producer (Sender) Copy linkLink copied to clipboard!
2.4. Message Copy linkLink copied to clipboard!
2.5. Message Broker Copy linkLink copied to clipboard!
2.6. Routing Key Copy linkLink copied to clipboard!
x-ampq-0.10-routing-key
property. However, this is managed by the Qpid Messaging API, and you do not need to manually access or set this property. The exception to this is if you are exchanging messages with another AMQP system. In that case you should understand how the Qpid Messaging API manages this property based on message and sender subject.
2.7. Message Subject Copy linkLink copied to clipboard!
2.8. Message Properties Copy linkLink copied to clipboard!
key:value
pairs that can be set for a message. Some predefined properties are used by the message broker to determine how to treat messages while they are in transit; these message properties can be set to ensure quality of service and guaranteed delivery. Other user-defined message properties can be set for application-specific functionality.
2.9. Connection Copy linkLink copied to clipboard!
2.10. Session Copy linkLink copied to clipboard!
2.11. Exchange Copy linkLink copied to clipboard!
2.12. Binding Copy linkLink copied to clipboard!
2.13. Topic Copy linkLink copied to clipboard!
qpid-config add topic my-topic --argument exchange=amq.topic\ --argument qpid.max_count=500 --argument qpid.policy_type=self-destruct
qpid-config add topic my-topic --argument exchange=amq.topic\
--argument qpid.max_count=500 --argument qpid.policy_type=self-destruct
my-topic/my-key
' over 1.0 now, it will result in a subscription queue being created with a limit of 500 messages, that deletes itself (thus ending the subscription) if that limit is exceeded and is bound to 'amq.topic
' with the key 'my-key
'.
2.14. Domain Copy linkLink copied to clipboard!
sasl_mechanisms
, username
, password
.
qpid-config
, for example:
qpid-config add domain my-domain --argument url=some.hostname.com:5672
qpid-config add domain my-domain --argument url=some.hostname.com:5672
qpid-config add incoming incoming-name --argument domain=my-domain --argument source=queue1 --argument target=queue2
qpid-config add incoming incoming-name --argument domain=my-domain --argument source=queue1 --argument target=queue2
queue1
in the process identified by my-domain
and directed into queue2
on the qpidd instance against which the command is run.
2.15. Message Queue Copy linkLink copied to clipboard!
--queue-purge-interval
. While this is not a qpid-config option, it is worth understanding that message TTL can be configured, and when the purge attempt is successful the messages are subsequently removed.
2.16. Transaction Copy linkLink copied to clipboard!
- some text.
2.17. Message Consumer (Receiver) Copy linkLink copied to clipboard!
Chapter 3. Getting Started Copy linkLink copied to clipboard!
3.1. Getting Started with Python Copy linkLink copied to clipboard!
3.1.1. Python Messaging Development Copy linkLink copied to clipboard!
python
interpreter to execute the file.
3.1.2. Python Client Libraries Copy linkLink copied to clipboard!
python-qpid
- Apache Qpid Python client library.
python-qpid-qmf
- Queue Management Framework (QMF) Python client library.
python-saslwrapper
- Python bindings for the saslwrapper library.
3.1.3. Install Python Client Libraries (Red Hat Enterprise Linux 6) Copy linkLink copied to clipboard!
yum
command.
- Red Hat Enterprise Linux Server 6
- Red Hat Enterprise Linux Workstation 6
- Red Hat Enterprise Linux Client 6
yum install python-qpid python-qpid-qmf python-saslwrapper
yum install python-qpid python-qpid-qmf python-saslwrapper
3.2. Getting Started with .NET Copy linkLink copied to clipboard!
3.2.1. .NET Messaging Development Copy linkLink copied to clipboard!
3.2.2. Windows SDK Copy linkLink copied to clipboard!
3.2.3. Windows SDK Contents Copy linkLink copied to clipboard!
\bin
- Compiled binary (.dll and .exe) files, and the associated debug program database (.pdb) files.
- Boost library files.
- Microsoft Visual Studio runtime library files.
\docs
- Apache Qpid C++ API Reference
\dotnet_examples
- A Visual Studio solution file and associated project files to demonstrate using the WinSDK in C#
\examples
- A Visual Studio solution file and associated project files to demonstrate using the WinSDK in unmanaged C++
\include
- A directory tree of .h files
\lib
- The linker .lib files that correspond to files in /bin
3.2.4. How To Download and Install the Windows SDK Copy linkLink copied to clipboard!
3.2.4.1. Obtain the Windows SDK Copy linkLink copied to clipboard!
Procedure 3.1. How To Obtain the Windows SDK For Your Environment
- Log in to the Red Hat Customer Portal.
- Click the
A-Z
tab to sort the product list alphabetically, and then selectRed Hat Enterprise MRG Messaging
to display the downloads screen. - Select the desired product version from themenu.
- Select the desired architecture from themenu.
- Locate the correct Windows SDK binary for your environment, and then clickto start the download.
3.2.4.2. Install the Windows SDK Copy linkLink copied to clipboard!
Previous Step in How To Download and Install the Windows SDK
- Unzip the downloaded Windows SDK to your filesystem.
- Copy all
qpid*
andboost*
files from the/bin/Release/
directory into your enviroment's/bin/Release/
directory.
3.3. Getting Started with C++ Copy linkLink copied to clipboard!
3.3.1. C++ Messaging Development Copy linkLink copied to clipboard!
3.3.2. C++ on Linux Copy linkLink copied to clipboard!
3.3.2.1. C++ Client Libraries Copy linkLink copied to clipboard!
qpid-cpp-client
- Apache Qpid C++ client library.
qpid-cpp-client-ssl
- SSL support for clients.
qpid-cpp-client-rdma
- RDMA Protocol support (including Infiniband) for Qpid clients.
qpid-cpp-client-devel
- Header files and tools for developing Qpid C++ clients.
qpidd-cpp-client-devel-docs
- AMQP client development documentation.
3.3.2.2. Install C++ Client Libraries (Red Hat Enterprise Linux 6) Copy linkLink copied to clipboard!
yum
command.
Red Hat MRG Messaging v.2 (for RHEL-6 Server)
channel.
yum install qpid-cpp-client qpid-cpp-client-rdma qpid-cpp-client-ssl qpid-cpp-client-devel
yum install qpid-cpp-client qpid-cpp-client-rdma qpid-cpp-client-ssl qpid-cpp-client-devel
3.3.2.3. Install C++ Client Libraries for MRG 3 Copy linkLink copied to clipboard!
yum
command.
Red Hat MRG Messaging v.3 (for RHEL-6 Server)
channel.
yum install qpid-cpp-client qpid-cpp-client-rdma qpid-cpp-client-ssl qpid-cpp-client-devel
yum install qpid-cpp-client qpid-cpp-client-rdma qpid-cpp-client-ssl qpid-cpp-client-devel
3.3.3. C++ on Windows Copy linkLink copied to clipboard!
3.3.3.1. Windows SDK Copy linkLink copied to clipboard!
3.3.3.2. Windows SDK Contents Copy linkLink copied to clipboard!
\bin
- Compiled binary (.dll and .exe) files, and the associated debug program database (.pdb) files.
- Boost library files.
- Microsoft Visual Studio runtime library files.
\docs
- Apache Qpid C++ API Reference
\dotnet_examples
- A Visual Studio solution file and associated project files to demonstrate using the WinSDK in C#
\examples
- A Visual Studio solution file and associated project files to demonstrate using the WinSDK in unmanaged C++
\include
- A directory tree of .h files
\lib
- The linker .lib files that correspond to files in /bin
3.3.3.3. How To Download and Install the Windows SDK Copy linkLink copied to clipboard!
3.3.3.3.1. Obtain the Windows SDK Copy linkLink copied to clipboard!
Procedure 3.2. How To Obtain the Windows SDK For Your Environment
- Log in to the Red Hat Customer Portal.
- Click the
A-Z
tab to sort the product list alphabetically, and then selectRed Hat Enterprise MRG Messaging
to display the downloads screen. - Select the desired product version from themenu.
- Select the desired architecture from themenu.
- Locate the correct Windows SDK binary for your environment, and then clickto start the download.
3.3.3.3.2. Install the Windows SDK Copy linkLink copied to clipboard!
Previous Step in How To Download and Install the Windows SDK
- Unzip the downloaded Windows SDK to your filesystem.
- Copy all
qpid*
andboost*
files from the/bin/Release/
directory into your enviroment's/bin/Release/
directory.
3.4. Getting Started with Java Copy linkLink copied to clipboard!
3.4.1. Java Client Libraries Copy linkLink copied to clipboard!
qpid-java-client
- The Java implementation of the Qpid client
qpid-java-common
- Common files for the Qpid Java client
qpid-java-example
- Programming examples
3.4.2. Install Java Client Libraries (Red Hat Enterprise Linux 6) Copy linkLink copied to clipboard!
- Subscribe your system to the
Additional Services Channels for Red Hat Enterprise Linux 6 / MRG Messaging v.2 (for RHEL-6 Server)
channel. - Run the following yum command with root privileges:
yum install qpid-java-client qpid-java-common qpid-java-example
yum install qpid-java-client qpid-java-common qpid-java-example
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
3.5. Getting Started with Ruby Copy linkLink copied to clipboard!
3.5.1. Ruby Messaging Development Copy linkLink copied to clipboard!
3.5.2. Ruby Client Libraries Copy linkLink copied to clipboard!
ruby-qpid-qmf
- Ruby QMF bindings
ruby-saslwrapper
- Ruby bindings for the saslwrapper library
3.5.3. Install Ruby Client Libraries (Red Hat Enterprise Linux 6) Copy linkLink copied to clipboard!
ruby-qpid-qmf
package is in the main channel; the ruby-saslwrapper
package is in the Optional child channel.
- Subscribe your system to one of the following channels:
Red Hat Enterprise Linux Server 6
Red Hat Enterprise Linux Client 6
Red Hat Enterprise Linux Workstation 6
- With root privileges run the command:
yum install ruby-qpid-qmf
yum install ruby-qpid-qmf
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - Subscribe your system one of the following channels:
Red Hat Enterprise Linux Optional Server v 6
Red Hat Enterprise Linux Optional Client 6
Red Hat Enterprise Linux Optional Workstation 6
- With root privileges run the command:
yum install ruby-saslwrapper
yum install ruby-saslwrapper
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
3.6. Hello World Copy linkLink copied to clipboard!
3.6.1. Red Hat Enterprise Messaging "Hello World" Copy linkLink copied to clipboard!
- Python
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - C#/.NET
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - C++
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
3.6.2. Java JMS "Hello World" Program Listing Copy linkLink copied to clipboard!
qpid-java-examples
package.
- Java
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
hello.properties
:
3.6.3. "Hello World" Walk-through Copy linkLink copied to clipboard!
- Python
from qpid.messaging import *
from qpid.messaging import *
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - C++
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - C#/.NET
using Org.Apache.Qpid.Messaging; namespace Org.Apache.Qpid.Messaging {
using Org.Apache.Qpid.Messaging; namespace Org.Apache.Qpid.Messaging {
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
Connection
object. The Connection object constructor takes the url of the broker as its parameter:
- Python
connection = Connection("localhost:5672")
connection = Connection("localhost:5672")
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - C++
Connection connection(broker);
Connection connection(broker);
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - C#/.NET
Connection connection = null; connection = new Connection(broker);
Connection connection = null; connection = new Connection(broker);
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
username/password@serverurl:port
. If you try this with a remote server, remember to open the firewall on the message broker to allow incoming connections for the broker port.
- C++
Connection connection(broker, "{protocol:amqp1.0}");
Connection connection(broker, "{protocol:amqp1.0}");
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - C#/.NET
connection = new Connection(broker, "{protocol:amqp1.0}");
connection = new Connection(broker, "{protocol:amqp1.0}");
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
open
method, which opens a configured connection.
- Python
try: connection.open()
try: connection.open()
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - C++
try { connection.open();
try { connection.open();
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - C#/.NET
connection.Open();
connection.Open();
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
Connection
object has a createSession
method (session
in Python) that returns a Session
object, so we get a session from the connection that we created previously:
- Python
session = connection.session()
session = connection.session()
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - C++
Session session = connection.createSession();
Session session = connection.createSession();
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - C#/.NET
Session session = connection.CreateSession();
Session session = connection.CreateSession();
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
Session
object has sender
and receiver
methods, which take a target or source address as a parameter, and return a Sender
and a Receiver
object, respectively. These are the objects that we need to send and receive messages, so we will create them by calling the respective methods of our session. We will use the amq.topic
exchange for this demo. This is a pre-configured exchange on the broker, so we don't need to create it, and we can rely on its presence:
- Python
sender = session.sender("amq.topic") receiver = session.receiver("amq.topic")
sender = session.sender("amq.topic") receiver = session.receiver("amq.topic")
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - C++
Receiver receiver = session.createReceiver(address); Sender sender = session.createSender(address);
Receiver receiver = session.createReceiver(address); Sender sender = session.createSender(address);
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - C#/.NET
Receiver receiver = session.CreateReceiver(address); Sender sender = session.CreateSender(address);
Receiver receiver = session.CreateReceiver(address); Sender sender = session.CreateSender(address);
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
amq.topic
exchange on the broker. Because our routing target is an exchange, it will be routed further from there by the broker.
amq.topic
exchange, and our receiver will receive it in a queue.
Message
object takes as a parameter to its constructor a string that becomes the message.content
:
- Python
message = Message("Hello World!")
message = Message("Hello World!")
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
Message
object constructor sets the correct content-type
when you set the message.content
through the constructor. However, if you set it after creating the Message
object by assigning a value to the message.content
property, then you also need to set the message.content_type
property appropriately.
send
method of our sender to send the message to the broker:
- Python
sender.send(message)
sender.send(message)
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - C++
sender.send(Message("Hello world!"));
sender.send(Message("Hello world!"));
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - C#/.NET
sender.Send(new Message("Hello world!"));
sender.Send(new Message("Hello world!"));
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
amq.topic
exchange on the message broker.
amq.topic
exchange for us. The message is now waiting in that queue.
fetch
method of our receiver:
- Python
fetchedmessage = receiver.fetch(timeout=1)
fetchedmessage = receiver.fetch(timeout=1)
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - C++
Message message = receiver.fetch(Duration::SECOND * 1);
Message message = receiver.fetch(Duration::SECOND * 1);
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - C#/.NET
Message message = new Message(); message = receiver.Fetch(DurationConstants.SECOND * 1);
Message message = new Message(); message = receiver.Fetch(DurationConstants.SECOND * 1);
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
timeout
parameter tells fetch
how long to wait for a message. If we do not set a timeout the receiver will wait indefinitely for a message to appear on the queue. If we set the timeout to 0, the receiver will check the queue and return immediately if nothing is there. We set it to timeout in 1 second to ensure ample time for our message to be routed and appear in the queue.
Fetch
returns a Message
object, so we will print its content
property:
- Python
print fetchedmessage.content
print fetchedmessage.content
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - C++
std::cout << message.getContent() << std::endl;
std::cout << message.getContent() << std::endl;
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - C#/.NET
Console.WriteLine("{0}", message.GetContent());
Console.WriteLine("{0}", message.GetContent());
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
- Python
session.acknowledge()
session.acknowledge()
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - C++
session.acknowledge();
session.acknowledge();
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - C#/.NET
session.Acknowledge();
session.Acknowledge();
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
- Python
except MessagingError,m: print m connection.close()
except MessagingError,m: print m connection.close()
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - C++
} catch(const std::exception& error) { std::cerr << error.what() << std::endl; connection.close(); return 1; }
} catch(const std::exception& error) { std::cerr << error.what() << std::endl; connection.close(); return 1; }
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - C#/.NET
} catch (Exception e) { Console.WriteLine("Exception {0}.", e); if (connection != null) connection.Close(); }
} catch (Exception e) { Console.WriteLine("Exception {0}.", e); if (connection != null) connection.Close(); }
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
helloworld.py
, and then run it using the command python helloworld.py
. If the message broker is running on your local machine, you should see the words: "Hello World!" printed on your programlisting.
Chapter 4. Beyond "Hello World" Copy linkLink copied to clipboard!
4.1. Subscriptions Copy linkLink copied to clipboard!
amq.topic
exchange. In the background this creates a queue and subscribes
it to the amq.topic
exchange. Our Hello World program sender publishes to the amq.topic
exchange. The amq.topic
exchange is a good one to use for the demo. A topic exchange allows queues to be subscribed (to bind to the exchange) with a binding key that acts as a filter on the subject of messages sent to the exchange. Since we bind to the exchange with no binding key, we signal that we're interested in all messages coming through the exchange.
amq.topic
exchange, the message is delivered to the subscription queue for our receiver. Our receiver then calls fetch()
to retrieve the message from its subscription queue.
amq.topic
exchange and after we send the message, register our receiver with the exchange.
- Python
sender = session.sender("amq.topic") receiver = session.receiver("amq.topic") message = Message("Hello World!") sender.send(message)
sender = session.sender("amq.topic") receiver = session.receiver("amq.topic") message = Message("Hello World!") sender.send(message)
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - C++
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - C#/.NET
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
- Python
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - C++
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - C#/.NET
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
amq.topic
exchange. The exchange then delivered the message to all the subscribed queues... which was none. When our receiver subscribes to the exchange it's too late to receive the message. In the original version of the program the receiver subscribes to the exchange before the message is sent, so it receives a copy of the message in its subscription queue.
qpid-config
command. Restart the broker to clear all the queues (all non-durable
queues are destroyed when the broker restarts). Then run the command:
qpid-config queues
qpid-config queues
raw_input
method to grab some keyboard input.
- Python
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
qpid-config queues
to examine the queues on the broker.
qpid-config queues
qpid-config queues
amq.topic
exchange for us,to allow our receiver to receive messages from the exchange. You'll also see a number of other queues with the same ID number at the end of them. These are the queues that the qpid-config
utility uses to query the message broker and receive the queue list you run the command. If you run the command again, you'll see that our receiver queue remains the same, and the other queues have a new ID - each time you run a qpid-config
command it creates it own queues to receive a response from the server. You won't be able to see that those queues aren't there when you're not running qpid-config
, because you need to run qpid-config
to see the queues, but you can take my word for it.
- Version 2.2 and below
- To see the queue-exchange bindings, run:
qpid-config queues -b
qpid-config queues -b
Copy to Clipboard Copied! Toggle word wrap Toggle overflow The-b
switch displays bindings. You'll see that the two dynamically created queues are bound to theamq.topic
exchange. - Version 2.3 and above
- To see the queue-exchange bindings, run:
qpid-config queues -r
qpid-config queues -r
Copy to Clipboard Copied! Toggle word wrap Toggle overflow The-r
switch displays bindings. You'll see that the two dynamically created queues are bound to theamq.topic
exchange.
connection.close()
ends the session, and the two exclusive queues on the broker are deleted. You can run qpid-config queues
again to verify that.
amq.topic
exchange. This queue is private (randomly named and exclusive
), and deleted when the consumer disconnects, so it is not suitable for publishing. In order to make messages available to consumers who may or may not be connected to the exchange when the message is sent, a message-producing application needs to create a publicly-accessible queue (publishing). Consuming applications can then subscribe to this published queue and receive messages in a decoupled fashion.
4.2. Publishing Copy linkLink copied to clipboard!
4.3. AMQP Exchange Types Copy linkLink copied to clipboard!
- Direct
- A Direct Exchange allows a consumer to bind a queue to it with a key. When a message is received by a direct-type exchange, the message is routed to any queues whose binding key matches the subject of the message. The Direct Exchange also supports exclusive bindings, which allow a queue to monopolize messages sent to an exchange, and implement a simple direct-to-queue model.
- Topic
- A Topic Exchange allows a consumer to bind a queue to it with a key that specifies wildcard matching. The wildcard is then matched against the subject of messages sent to the exchange. This allows you to implement message filtering patterns using a topic exchange and various queues with different binding keys.
4.4. Pre-configured Exchanges Copy linkLink copied to clipboard!
- Default exchange
- A nameless direct exchange. All queues are bound to this exchange by default, allowing them to be accessed by queue name.
amq.direct
- The pre-configured named direct exchange.
amq.fanout
- The pre-configured fanout exchange.
amq.match
- The pre-configured headers exchange.
amq.topic
- The pre-configured topic exchange.
4.5. Exchange Subscription Patterns Copy linkLink copied to clipboard!
- copy of messages
- move of messages
- exclusive binding
A copy of messages is where each consumer gets their own copy of every message.
Note
A move of messages is where multiple consumers connect to the same queue and take messages from the queue in a round-robin fashion.
Note
The third pattern, exclusive binding, is where a consumer mandates that only the consumer may have access to messages routed to an endpoint.
Note
4.6. The Default Exchange Copy linkLink copied to clipboard!
4.6.1. Default Exchange Copy linkLink copied to clipboard!
4.6.2. Publish to a Queue using the Default Exchange Copy linkLink copied to clipboard!
qpid-config
:
qpid-config add queue quick-publish
qpid-config add queue quick-publish
{create: always}
then the queue will be created if it does not already exist. In addition to always
, the create
command can also take the arguments sender
and receiver
, to indicate that the queue should be created only when a sender connects to the address, or only when a receiver connects to the address.
- Python
sender = session.sender("quick-publish; {create: always}")
sender = session.sender("quick-publish; {create: always}")
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - C++
Sender sender = session.createSender("quick-publish; {create: always}")
Sender sender = session.createSender("quick-publish; {create: always}")
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
4.6.3. Subscribe to the Default Exchange Copy linkLink copied to clipboard!
quick-publish
", using the Python API:
- C++
Receiver receiver = session.createReceiver('quick-publish');
Receiver receiver = session.createReceiver('quick-publish');
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - Python
receiver = session.receiver('quick-publish')
receiver = session.receiver('quick-publish')
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
quick-publish
queue.
- C++
Receiver receiver = session.createReceiver('quick-publish; {mode: browse}');
Receiver receiver = session.createReceiver('quick-publish; {mode: browse}');
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - Python
receiver = session.receiver('quick-publish; {mode: browse}')
receiver = session.receiver('quick-publish; {mode: browse}')
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
create
parameter:
- C++
Receiver receiver = session.createReceiver("my-own-copies-please; {create: always, node: {type: 'queue'}}");
Receiver receiver = session.createReceiver("my-own-copies-please; {create: always, node: {type: 'queue'}}");
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - Python
receiver = session.receiver("my-own-copies-please; {create: always, node: {type: 'queue'}}")
receiver = session.receiver("my-own-copies-please; {create: always, node: {type: 'queue'}}")
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
my-own-copies-please
" already exists, then your receiver will connect to that queue. If the queue does not exist, then it will be created (all of this assumes sufficient privileges, of course).
my-own-copies-please
" exists, your receiver will silently connect to that in preference to creating a queue. This is not what you intended, and will have unpredictable results. To avoid this, you can use the assert parameter, like this:
- C++
try { Receiver receiver = session.createReceiver("my-own-copies-please; {create: always, assert: always, node: {type: 'queue'}}"); } catch(const std::exception& error) { std::cerr << error.what() << std::endl; }
try { Receiver receiver = session.createReceiver("my-own-copies-please; {create: always, assert: always, node: {type: 'queue'}}"); } catch(const std::exception& error) { std::cerr << error.what() << std::endl; }
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - Python
try: receiver = session.receiver("my-own-copies-please; {create: always, assert: always, node: {type: 'queue'}}") except MessagingError m: print m
try: receiver = session.receiver("my-own-copies-please; {create: always, assert: always, node: {type: 'queue'}}") except MessagingError m: print m
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
my-own-copies-please
" already exists and is an exchange, the receiver constructor will raise an exception: "expected queue, got topic
".
4.7. Direct Exchange Copy linkLink copied to clipboard!
4.7.1. Direct Exchange Copy linkLink copied to clipboard!
Figure 4.1. Direct Exchange
4.7.2. Create a Direct Exchange using qpid-config Copy linkLink copied to clipboard!
qpid-config add exchange direct exchange name
creates a new direct exchange.
qpid-config
command creates a new direct exchange called engineering
:
qpid-config add exchange direct engineering
qpid-config add exchange direct engineering
4.7.3. Create a Direct Exchange from an application Copy linkLink copied to clipboard!
engineering
:
- Python
sender = session.sender('engineering;{create: always, node:{type:topic, x-declare:{type:direct}}}')
sender = session.sender('engineering;{create: always, node:{type:topic, x-declare:{type:direct}}}')
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
engineering
already exists, the sender will not try to create a new one, but will connect to the existing one. You need to be careful, however, because if a queue with the name engineering
already exists, then your sender will silently connect to that queue.
engineering
, you can use assert
, as in this example:
- Python
try: sender = session.sender('engineering;{create: always, node:{type:topic, x-declare:{type:direct}}, assert: always}') except MessagingError, m: print m
try: sender = session.sender('engineering;{create: always, node:{type:topic, x-declare:{type:direct}}, assert: always}') except MessagingError, m: print m
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
assert: always, node: {type: topic}
; if engineering
exists and is a queue, rather than an exchange, the sender constructor will raise an exception: "expected topic, got queue
".
assert
to verify that it is an exchange and not a queue, you cannot verify what type of exchange it is.
4.7.4. Publish to a Direct Exchange Copy linkLink copied to clipboard!
The first is to create a sender that routes messages directly to the endpoint that you wish to publish to. Remember that a Direct Exchange requires an exact match, so you are sending to a specific destination. At the same time, bear in mind that multiple queues can bind to the exchange to receive messages routed to the same destination. So it is a specific endpoint that may have multiple consumers.
qpid-config add exchange direct finance
qpid-config add exchange direct finance
- Python
sender = session.sender('finance;{create:always, node: {type: topic, x-declare: {type: direct}}}')
sender = session.sender('finance;{create:always, node: {type: topic, x-declare: {type: direct}}}')
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
reports
endpoint on the finance
exchange.
- Python
sender = session.sender('finance/reports') sender.send('Message to all consumers bound to finance with key reports')
sender = session.sender('finance/reports') sender.send('Message to all consumers bound to finance with key reports')
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
sender
will go to queues that have bound to the finance
direct exchange using the key reports
; with one caveat.
The second option is to create a sender that routes messages to the exchange, and use the message subject to control the routing to the specific endpoint. This way you can dynamically decide where messages will go, for example based on the names of keys that are provided at run-time, perhaps in the body of other messages.
- Python
sender = session.sender('finance; {assert: always, node: {type: topic}}') msg = Message('Message to all consumers bound to finance with key reports') msg.subject = 'reports' sender.send(msg)
sender = session.sender('finance; {assert: always, node: {type: topic}}') msg = Message('Message to all consumers bound to finance with key reports') msg.subject = 'reports' sender.send(msg)
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
subject
. You can target different endpoints on that exchange by changing the subject before sending the message. For example, to send copies of the same message to finance/reports
and finance/records
:
- Python
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
{assert: always, node: {type: topic}}
is used to ensure that we don't inadvertently connect to a queue with the name finance
bound to the default exchange. Queues and exchanges have separate namespaces, but remember that the default exchange is nameless.
As you can observe in the second case, setting the subject influences where the message is routed. If you use the first method — the sender with the subject in its address — you must be careful not to set the message subject inadvertently. The sender will write the correct subject into the message when you send it if the message subject is blank, but it will not overwrite any message subject that you provide. The first method — the sender with a subject in its address — provides a "default destination" for all messages it sends that do not have a message subject set. You can target other endpoints on the exchange by explicitly setting a subject before sending the message - in which case they go to the exchange for further routing based on your custom subject. Just be aware that setting the message subject determines its routing.
4.7.5. Subscribe to a Direct Exchange Copy linkLink copied to clipboard!
This is the most straight-forward method to implement. Create a receiver using an address comprised of the exchange name and the routing key. For example, create a receiver on direct exchange "finance
" using the "reports
" key of interest:
- C++
Receiver receiver = session.createReceiver("finance/reports")
Receiver receiver = session.createReceiver("finance/reports")
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - Python
receiver = session.receiver('finance/reports')
receiver = session.receiver('finance/reports')
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
Subscription using a shared queue may be created by naming the subscription queue and defining it non-exclusive. For example:
- C++
Receiver receiver = session.createReceiver("finance/quick-publish;{link:{name:my-subscription, x-declare:{exclusive:False}}}");
Receiver receiver = session.createReceiver("finance/quick-publish;{link:{name:my-subscription, x-declare:{exclusive:False}}}");
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - Python
receiver = session.receiver('finance/quick-publish;{link:{name:my-subscription, x-declare:{exclusive:False}}}')
receiver = session.receiver('finance/quick-publish;{link:{name:my-subscription, x-declare:{exclusive:False}}}')
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
x-bindings
. For example:
- C++
Receiver receiver = session.createReceiver("my-subscription;{create: always, node:{x-bindings: [{exchange: 'finance', key: 'quick-publish'}]}}");
Receiver receiver = session.createReceiver("my-subscription;{create: always, node:{x-bindings: [{exchange: 'finance', key: 'quick-publish'}]}}");
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - Python
receiver = session.receiver('my-subscription;{create: always, node:{x-bindings: [{exchange: 'finance', key: 'quick-publish'}]}}')
receiver = session.receiver('my-subscription;{create: always, node:{x-bindings: [{exchange: 'finance', key: 'quick-publish'}]}}')
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
my-subscription
" and bound it to the direct exchange "finance
" with the key "quick-publish
".
Both Link-scoped x-declare
and Node-scoped x-bindings
clauses are not supported in AMQP 1.0, hence we request the capability of a shared subscription:
- C++
Receiver receiver = session.createReceiver("finance/quick-publish;{node: {capabilities:[shared]}, link: {name: 'my-subscription'}}");
Receiver receiver = session.createReceiver("finance/quick-publish;{node: {capabilities:[shared]}, link: {name: 'my-subscription'}}");
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
4.7.6. Exclusive Bindings for Direct Exchanges Copy linkLink copied to clipboard!
qpid.exclusive-binding
is used to declare an exclusive binding.
drain -f "amq.direct; {create:always, link: {name:one, x-bindings:[{key:unique, arguments: {qpid.exclusive-binding:True}}]}}"
drain -f "amq.direct; {create:always, link: {name:one, x-bindings:[{key:unique, arguments: {qpid.exclusive-binding:True}}]}}"
4.8. Fanout Exchange Copy linkLink copied to clipboard!
4.8.1. The pre-configured Fanout Exchange Copy linkLink copied to clipboard!
amq.fanout
.
4.8.2. Fanout Exchange Copy linkLink copied to clipboard!
Figure 4.2. Fanout Exchange
#
as their binding key.
4.8.3. Create a Fanout Exchange using qpid-config Copy linkLink copied to clipboard!
qpid-config
:
qpid-config add exchange fanout my-fanout-exchange
qpid-config add exchange fanout my-fanout-exchange
durable
(persistent between restarts of the broker), use the --durable
option:
qpid-config add exchange fanout my-fanout-exchange --durable
qpid-config add exchange fanout my-fanout-exchange --durable
qpid-config exchanges
command lists the exchanges on the broker.
4.8.4. Create a Fanout Exchange from an application Copy linkLink copied to clipboard!
create: always
node: {type: topic, x-declare: {exchange: exchange-name, type: fanout}}
myfanout
.
- Python
tx = ssn.sender("myfanout; {create: always, node: {type: topic, x-declare: {exchange: myfanout, type: fanout}}}")
tx = ssn.sender("myfanout; {create: always, node: {type: topic, x-declare: {exchange: myfanout, type: fanout}}}")
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
4.8.5. Publish to Multiple Queues using the Fanout Exchange Copy linkLink copied to clipboard!
- Python
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
4.8.6. Subscribe to a Fanout Exchange Copy linkLink copied to clipboard!
- Subscribe to the exchange using an ephemeral subscription. This creates and binds a temporary private queue that is destroyed when your application disconnects. This approach makes sense when you do not need to share responsibility for the messages between multiple consumers, and you do not care about messages that are sent when your application is not running or is disconnected.
- Subscribe to a queue that is bound to the exchange. This allows messages to be buffered in the queue when your application is disconnected, and allows several consumers to share responsibility for the messages in the queue.
To implement the private, ephemeral subscription, create a receiver using the name of the fanout exchange as the receiver's address. For example:
- Python
rx = receiver("amq.fanout")
rx = receiver("amq.fanout")
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
To implement a shareable subscription that persists across consumer application restarts, create a queue, and subscribe to that queue.
qpid-config
:
qpid-config add queue shared-q qpid-config bind amq.fanout shared-q
qpid-config add queue shared-q
qpid-config bind amq.fanout shared-q
--durable
option.
qpid-config
command to view the exchange bindings after issuing these commands. On MRG Messaging 2.2 and below use the command qpid-config exchanges -b
. On MRG Messaging 2.3 and above use the command qpid-config exchanges -r
.
- Python
rx = receiver("shared-q")
rx = receiver("shared-q")
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
qpid-config
:
- Python
rx = receiver("shared-q;{create: always, link: {x-bindings: [{exchange: 'amq.fanout', queue: 'shared-q'}]}}")
rx = receiver("shared-q;{create: always, link: {x-bindings: [{exchange: 'amq.fanout', queue: 'shared-q'}]}}")
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
- C++
Receiver receiver = session.createReceiver("amq.fanout;{node: {capabilities:[shared]}, link: {name: 'shared-q'}}");
Receiver receiver = session.createReceiver("amq.fanout;{node: {capabilities:[shared]}, link: {name: 'shared-q'}}");
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
4.9. Topic Exchange Copy linkLink copied to clipboard!
4.9.1. The pre-configured Topic Exchange Copy linkLink copied to clipboard!
amq.topic
.
4.9.2. Topic Exchange Copy linkLink copied to clipboard!
Figure 4.3. Topic Exchange
In the binding key, #
matches any number of period-separated terms, and *
matches a single term.
#.news
will match messages with subjects such as usa.news
and germany.europe.news
, while a binding key of *.news
will match messages with the subject usa.news
, but not germany.europe.news
.
4.9.3. Create a Topic Exchange using qpid-config Copy linkLink copied to clipboard!
qpid-config
command creates a topic exchange called news
:
qpid-config add exchange topic news
qpid-config add exchange topic news
4.9.4. Create a Topic Exchange from an application Copy linkLink copied to clipboard!
news
:
- Python
txtopic = ssn.sender("news; {create: always, node: {type: topic}}")
txtopic = ssn.sender("news; {create: always, node: {type: topic}}")
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
4.9.5. Publish to a Topic Exchange Copy linkLink copied to clipboard!
news
topic exchange with routing keys that allow geography-based subscriptions by consumers:
- Python
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
4.9.6. Subscribe to a Topic Exchange Copy linkLink copied to clipboard!
qpid-config
to create a queue named news
and bind it to the amq.topic
exchange with a wildcard that matches everything.news
, where everything is any number of period-separated terms:
qpid-config add queue news qpid-config bind amq.topic news "#.news"
qpid-config add queue news
qpid-config bind amq.topic news "#.news"
news
queue for all messages whose routing key ends with .news
:
- Python
rxnews = ssn.receiver("news")
rxnews = ssn.receiver("news")
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
- Python
rxnews = ssn.receiver("news;{create: always, node: {type:queue}, link:{x-bindings:[{exchange: 'amq.topic', queue: 'news', key: '#.news'}]}}")
rxnews = ssn.receiver("news;{create: always, node: {type:queue}, link:{x-bindings:[{exchange: 'amq.topic', queue: 'news', key: '#.news'}]}}")
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
- C++
Receiver rxnews = ssn.createReceiver("amq.topic/#.news;{node:{capabilities:[shared]}, link:{name: 'news'}}");
Receiver rxnews = ssn.createReceiver("amq.topic/#.news;{node:{capabilities:[shared]}, link:{name: 'news'}}");
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
- Python
rxnews = ssn.receiver("amq.topic/#.news");
rxnews = ssn.receiver("amq.topic/#.news");
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
#
symbol will match any number of period-separated terms. The #
will match exactly one term.
4.10. Headers Exchange Copy linkLink copied to clipboard!
4.10.1. The pre-configured Headers Exchange Copy linkLink copied to clipboard!
amq.match
.
4.10.2. Headers Exchange Copy linkLink copied to clipboard!
4.10.3. Create a Headers Exchange using qpid-config Copy linkLink copied to clipboard!
qpid-config
command creates a headers exchange called property-match
:
qpid-config add exchange headers property-match
qpid-config add exchange headers property-match
4.10.4. Create a Headers Exchange from an application Copy linkLink copied to clipboard!
headers-match
:
- Python
txheaders = ssn.sender("headers-match;{create: always, node: {type: topic, x-declare: {exchange: headers-match, type: headers}}}")
txheaders = ssn.sender("headers-match;{create: always, node: {type: topic, x-declare: {exchange: headers-match, type: headers}}}")
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
4.10.5. Publish to a Headers Exchange Copy linkLink copied to clipboard!
properties
. For example:
- Python
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
4.10.6. Subscribe to a Headers Exchange Copy linkLink copied to clipboard!
- Changes
- Updated April 2013.
- Updated July 2013.
match-q
, and subscribes it to the amq.match
exchange using a binding key that matches messages that have a header key header1
with a value of value1
:
- Python
rxheaders = ssn.receiver("match-q;{create: always, node: {type: queue}, link:{x-bindings:[{key: 'binding-name', exchange: 'amq.match', queue: 'match-q', arguments:{'x-match': 'any', 'header1': 'value1'}}]}}")
rxheaders = ssn.receiver("match-q;{create: always, node: {type: queue}, link:{x-bindings:[{key: 'binding-name', exchange: 'amq.match', queue: 'match-q', arguments:{'x-match': 'any', 'header1': 'value1'}}]}}")
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
- C++
Receiver rxheaders = ssn.createReceiver("amq.match; {link: {name:match-q, filter:{value:{'x-match': 'any', 'header1': 'value1'}, name: headers, descriptor:'apache.org:legacy-amqp-headers-binding:map'}}}");
Receiver rxheaders = ssn.createReceiver("amq.match; {link: {name:match-q, filter:{value:{'x-match': 'any', 'header1': 'value1'}, name: headers, descriptor:'apache.org:legacy-amqp-headers-binding:map'}}}");
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
x-match
argument can take the values any
, which matches messages with any of the key value pairs in the binding, or all
, which matches messages that have all the key value pairs from the binding key in their header.
x-binding
, and so a filter is used.
x-binding
. Note the x-bindings
argument key
. This argument creates a named handle for the binding, which is visible when running qpid-config exchanges -r
. Without a handle, a binding cannot be deleted by name. A null
key is valid, but in addition to not being able to be deleted by name, when a binding is created with a null
handle, any further attempt to create a binding with a null
handle on that exchange will be update the existing binding rather than create a new one.
4.11. XML Exchange Copy linkLink copied to clipboard!
4.11.1. Custom Exchange Types Copy linkLink copied to clipboard!
4.11.2. The pre-configured XML Exchange Type Copy linkLink copied to clipboard!
4.11.3. Create an XML Exchange Copy linkLink copied to clipboard!
qpid-config
command creates an XML exchange called myxml
:
qpid-config add exchange xml myxml
qpid-config add exchange xml myxml
- Python
tx = ssn.sender("myxml; {create: always, node: {type: topic, x-declare: {exchange: myxml, type: xml}}}")
tx = ssn.sender("myxml; {create: always, node: {type: topic, x-declare: {exchange: myxml, type: xml}}}")
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
4.11.4. Subscribe to the XML Exchange Copy linkLink copied to clipboard!
myxml
by creating a queue xmlq
and binding it to the exchange with an XQuery.
- Python
rxXML = ssn.receiver("myxmlq; {create:always, link: { x-bindings: [{exchange:myxml, key:weather, arguments:{xquery:'./weather'} }]}}")
rxXML = ssn.receiver("myxmlq; {create:always, link: { x-bindings: [{exchange:myxml, key:weather, arguments:{xquery:'./weather'} }]}}")
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
- C++
Receiver rxXML = ssn.createReceiver("myxml/weather; {link: {name:myxmlq, filter:{name:myfilter, descriptor:'apache.org:query-filter:string', value:'./weather'}}}");
Receiver rxXML = ssn.createReceiver("myxml/weather; {link: {name:myxmlq, filter:{name:myfilter, descriptor:'apache.org:query-filter:string', value:'./weather'}}}");
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
./weather
will match any messages whose body content has the root XML element <weather>
.
key
argument for x-bindings
. This ensures that the binding has a unique name, allowing it to be deleted and updated by name, and ensuring that it is not accidentally updated, as might be the case if it were anonymous in the namespace of the exchange.
- Python
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
Chapter 5. Message Delivery and Acceptance Copy linkLink copied to clipboard!
5.1. The Lifecycle of a Message Copy linkLink copied to clipboard!
5.1.1. Message Delivery Overview Copy linkLink copied to clipboard!
Figure 5.1. Fanout Exchange
message.subject
, which acts as the routing key (2), and then send the message to the broker (3).
5.1.2. Message Generation Copy linkLink copied to clipboard!
Message
object is used to generate a message.
- Python
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
5.1.3. Message Send over Reliable Link Copy linkLink copied to clipboard!
- The sender passes the message to the broker.
- The broker responds with an acknowledgement that it takes responsibility for delivery of the message.
- The sender deletes its local copy of the message.
5.1.4. Message Send over Unreliable Link Copy linkLink copied to clipboard!
- The sender passes the message to the broker.
- The sender deletes the local copy of the message.
5.1.5. Message Distribution on the Broker Copy linkLink copied to clipboard!
5.1.6. Message Receive over Reliable Link Copy linkLink copied to clipboard!
- The broker passes the message to the receiver.
- The receiver acknowledges responsibility for the message. In this case the broker deletes the server-side copy of the message.
- The receiver rejects the message. In this case the broker routes the message to an
alternate-exchange
if one is defined for the queue, or else discards the message. - The receiver releases the message. In this case the broker returns the message to the queue with a message header
redelivered:true
. - The receiver disconnects without acknowledging or rejecting the message. In this case the broker returns the message to the queue with a message header
redelivered:true
.
5.1.7. Message Receive over Unreliable Link Copy linkLink copied to clipboard!
- The broker passes the message to the receiver.
- The broker deletes the server-side copy of the message.
5.2. Browsing and Consuming Messages Copy linkLink copied to clipboard!
5.2.1. Message Acquisition and Acceptance Copy linkLink copied to clipboard!
The included drain
program can be used in either browse or acquisition mode.
drain
can be found in:
/usr/share/doc/python-qpid-0.14/examples/api/drain /usr/share/qpidc/examples/messaging/drain.cpp
/usr/share/doc/python-qpid-0.14/examples/api/drain
/usr/share/qpidc/examples/messaging/drain.cpp
qpid-config
command:
qpid-config add queue browse-acquire-demo
qpid-config add queue browse-acquire-demo
browse-acquire-demo
queue when you run qpid-config queues
.
browse-acquire-demo
using spout
. Spout
is included in the same packages as drain
, and can be found in the same directories. Run spout to send a message to the queue:
./spout browse-acquire-demo "Hello World"
./spout browse-acquire-demo "Hello World"
browse-acquire-demo
queue. Let's use drain to browse it first of all:
./drain -c 0 "browse-acquire-demo; {mode:browse}"
./drain -c 0 "browse-acquire-demo; {mode:browse}"
drain
a second time, and you'll see the message again. Running the drain program twice simulates two different browsing consumers accessing the queue. The message is read and remains available for other consuming applications when it is browsed.
browse-acquire-demo
queue using qpid-config
:
qpid-config del queue browse-acquire-demo
qpid-config del queue browse-acquire-demo
qpid-config
responds with an error because a message remains in the queue.
./drain -c 0 "browse-acquire-demo"
./drain -c 0 "browse-acquire-demo"
qpid-config
:
qpid-config del queue browse-acquire-demo
qpid-config del queue browse-acquire-demo
drain
demo is the fact that browsers see a message only once. Because each time drain
is run it creates a different browser, it sees the message in the queue each time. The same browser, however, sees the message only once, no matter how many times it looks.
- Python
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
drain
to examine the queue:
./drain -c 0 browse-acquire-demo
./drain -c 0 browse-acquire-demo
When our receiver acquired the message from the queue, the broker set the message to acquired
. When a message is acquired
, the broker treats the message as if it has been delivered, but it does not delete it from the queue. One of a number of things happen from here: the consumer who acquired the message acknowledges the message, releases the message, or rejects the message, or the consumer might disconnect through a network failure.
acquired
, and message consumers browsing or fetching from the queue will not see the message. When our application disconnects without acknowledging receipt, the broker switches the message out of acquired
state and sets a header redelivered=True
. The message is then made available to other consumers, such as the drain
browser that we ran after our application closed.
redelivered=true
'. This alerts the other nodes that this message may have already been acted on, and they can perform checks to see if that is so. This narrows the window for exceptions even further, when the applications are designed to take advantage of these features.
connection.close()
line:
- Python
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
redelivered
to inform us that another consumer acquired this message previously. We have now acquired this message, and it will again disappear for other consumers browsing or fetching from this queue. This time, however, we call session.acknowledge()
before closing the connection. This method acknowledges receipt of the message (it acknowledges all messages as-yet unacknowledged for the session). Since we have acknowledged receipt of the message, the message is acquired
, and it is removed from the queue.
drain
now, you will see that there are no messages in the queue.
A consumer can explicitly release a message. When this happens, the message is returned to the queue for redelivery. The effect is the same as if the consumer lost its connection to the broker.
acknowledge()
method with the message and Disposition(RELEASED)
as parameters:
session.acknowledge(msg, Disposition(RELEASED))
session.acknowledge(msg, Disposition(RELEASED))
release()
method.
Note that this two-phase acquisition and acceptance behavior is the behavior over a reliable link (technically an at-least-once link), which is the default link for receiver connections to the broker. If you explicitly connect your receiver to a queue using an unreliable link, or directly connect to an exchange, then received messages are immediately acquired with no need to acknowledge them.
To delete the queue we used for this demo, you can either restart the broker (all non-durable
queues are deleted when the broker is restarted), or you can use qpid-config
:
qpid-config del queue browse-acquire-demo
qpid-config del queue browse-acquire-demo
--force
switch to override this check and delete a queue with messages in it, or you can use drain
to empty the queue, and then reissue the command on the now-empty queue.
5.2.2. Message Acquisition and Acceptance on an Unreliable Link Copy linkLink copied to clipboard!
link: {reliability: unreliable}
in the address. For example, to create a receiver with an unreliable link to a queue named "browse-acquire-demo":
- Python
rxacquire = session.receiver("browse-acquire-demo; {link:{reliability: unreliable}")
rxacquire = session.receiver("browse-acquire-demo; {link:{reliability: unreliable}")
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
- Python
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
Acquirer 2 saw message: Message(redelivered=True, properties={'x-amqp-0-10.routing-key': u'browse-acquire-demo'}, content='Hello World')
Acquirer 2 saw message:
Message(redelivered=True, properties={'x-amqp-0-10.routing-key': u'browse-acquire-demo'}, content='Hello World')
Acquirer 2 saw message: None
Acquirer 2 saw message:
None
unreliable
.
It is not possible to release or reject messages acquired over an unreliable link. Over an unreliable link messages are implicitly acknowledged when they are fetched.
5.2.3. Message Rejection Copy linkLink copied to clipboard!
alternate exchange
, then the rejected message is routed there; otherwise it is discarded.
acknowledge()
method of the session, passing in the message that you wish to reject, and specify REJECTED
as the Disposition
parameter:
- Python
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
unreliable
link, mesages are implicitly acknowledged when they are fetched.
5.2.4. Receiving Messages from Multiple Sources Copy linkLink copied to clipboard!
Prerequisites:
Receiver
object receives messages from a single subscription. An application can create many receivers, and may wish to deal with messages from these various receivers in the order that the messages are received. The session
object provides a method nextReceiver
that allows an application to read messages from multiple receivers in a federated order.
prefetch
must be enabled for the receivers, and the receivers must be using the same session.
- Python
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - C++
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - .NET/C#
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
5.2.5. Rejected and Orphaned Messages Copy linkLink copied to clipboard!
5.2.6. Alternate Exchange Copy linkLink copied to clipboard!
- Messages that are acquired and then rejected by a message consumer (rejected messages).
- Unacknowledged messages in a queue that is deleted (orphaned messages).
- Messages sent to the exchange with a routing key for which there is no matching binding on the exchange.
Chapter 6. Advanced Queue Features Copy linkLink copied to clipboard!
6.1. Browse-only Queues Copy linkLink copied to clipboard!
spout
and drain
programs are part of the client libraries package and when installed can be found at:
/usr/share/doc/python-qpid-${version}/examples/api/
/usr/share/doc/python-qpid-${version}/examples/api/
6.2. Ignore Locally Published Messages Copy linkLink copied to clipboard!
no-local
key in the queue declaration as a key:value pair. The value of the key is ignored; the presence of the key is sufficient.
qpid-config
:
qpid-config add queue noloopbackqueue1 --argument no-local=true
qpid-config add queue noloopbackqueue1 --argument no-local=true
6.3. Exclusive Queues Copy linkLink copied to clipboard!
6.4. Server-side Selectors Copy linkLink copied to clipboard!
6.4.1. Select messages using a filter Copy linkLink copied to clipboard!
selector
in the link portion of the connection URL.
green
, red
, or blue
as the value of the color
property:
queue_name;{link:{selector:"color in ('green', 'red', 'blue')"}}
queue_name;{link:{selector:"color in ('green', 'red', 'blue')"}}
queue_name;{link:{selector:"amqp.priority = 1"}} queue_name;{link:{selector:"amqp.priority IS BETWEEN 3 AND 6"}} queue_name;{link:{selector:"myflag AND amqp.redelivered"}} queue_name;{link:{selector:"msg_title LIKE '%news%'"}
queue_name;{link:{selector:"amqp.priority = 1"}}
queue_name;{link:{selector:"amqp.priority IS BETWEEN 3 AND 6"}}
queue_name;{link:{selector:"myflag AND amqp.redelivered"}}
queue_name;{link:{selector:"msg_title LIKE '%news%'"}
With Python, selectors can be used by temporary syntax. For example, the C++ address with selector:
queue_name;{link:{selector:"myproperty = 1"}}
queue_name;{link:{selector:"myproperty = 1"}}
queue_name;{link:{'x-subscribe': {'arguments': {'x-apache-selector': "myproperty = 1"}}}}
queue_name;{link:{'x-subscribe': {'arguments': {'x-apache-selector': "myproperty = 1"}}}}
The Qpid Java client does not currently support server-side selectors, only JMS selectors. JMS selectors function differently than server-side selectors. Consult the JMS specification for more detail on JMS slectors.
6.4.2. Server-side selector syntax Copy linkLink copied to clipboard!
E
" for exponent and the boolean values true
and false
, are case-insensitive.
''
becomes '
.
( "ESCAPE" LiteralString )
clause, LiteralString
is limited to a one character string. The characters %
and _
are not allowed.
6.5. Automatically Deleted Queues Copy linkLink copied to clipboard!
6.5.1. Automatically Deleted Queues Copy linkLink copied to clipboard!
qpid-config
utility to receive information from the message broker are an example of this pattern.
auto-delete
is deleted by the broker after the last consumer has released its subscription to the queue. After the auto-delete
queue is created, it becomes eligible for deletion as soon as a consumer subscribes to the queue. When the number of consumers subscribed to the queue reaches zero, the queue is deleted.
- Python
responsequeue = session.receiver('my-response-queue; {create:always, node:{x-declare:{auto-delete:True}}}')
responsequeue = session.receiver('my-response-queue; {create:always, node:{x-declare:{auto-delete:True}}}')
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
Note
default
exchange: a pre-configured nameless direct exchange.
A custom timeout can be configured to provide a grace period before the deletion occurs.
Note
qpid.auto_delete_timeout:0
is specified, the parameter has no effect: setting the parameter to 0 turns off the delayed auto-delete function.
- Python
responsequeue = session.receiver("my-response-queue; {create:always, node:{x-declare:{auto-delete:True, arguments:{'qpid.auto_delete_timeout':120}}}}")
responsequeue = session.receiver("my-response-queue; {create:always, node:{x-declare:{auto-delete:True, arguments:{'qpid.auto_delete_timeout':120}}}}")
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
- Python
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
exclusive
and auto-delete
; these queues are deleted by the broker when the session that declared the queue ends, since the session that declared the queue is only possible subscriber.
6.5.2. Automatically Deleted Queue Example Copy linkLink copied to clipboard!
auto-delete-producer.py
. It can be run using a Python interpreter.
- Python
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
durable
queues are deleted. This allows you to start this test with a clean slate.
qpid-config queues
qpid-config queues
exclusive
and auto-del
. This is the queue that qpid-config
is using to retrieve the list of queues, and will change each time you run the command.
auto-delete-producer.py
program using a Python interpreter:
python auto-delete-producer.py
python auto-delete-producer.py
qpid-config queues
again to list the queues on the broker. This time you will see the test-queue
that our program created. Our program has exited, but the queue has not been deleted because so far no-one has subscribed to it.
amqp1.0
behaviour. Using amqp0-10
the queue is deleted when not in use only if there have been consumers, using amqp1.0
the queue is deleted when not in use even if there have never been any consumers.
drain
utility to examine the messages on the queue. The drain
utility is part of the C++ and Python client library packages.
drain
runs, it subscribes to the queue, retrieves messages, and then unsubscribes. Run:
drain -c 0 test-queue
drain -c 0 test-queue
qpid-config queues
now, you will see that the test-queue has been deleted. A consumer subscribed to the queue, and then unsubscribed.
drain
to browse the queue, rather than acquire the messages:
drain -c 0 "test-queue;{mode:browse}"
drain -c 0 "test-queue;{mode:browse}"
auto-delete-subscribe.py
:
- Python
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
auto-delete-producer.py
. When it pauses, run auto-delete-subscriber.py
, then check qpid-config queues
. You'll see that the queue has been deleted.
drain
to browse the test-queue. It doesn't exist.
auto-delete-producer.py
was deleted when our consumer program subscribed to the queue by creating and attaching a receiver, and then unsubscribed by closing the connection. The second message sent by our message producer was never delivered and no exception was raised.
- Python
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
auto-delete-producer.py
program. Run auto-delete-subscriber.py
in the first pause. Previously, this would delete the queue, and the second message would go nowhere. This time our producer's own subscription is keeping the queue alive. Press Enter to have auto-delete-producer.py
send the second message. Now check the queue using either drain
or auto-delete-subscriber.py
. This time you'll see that the queue exists and the message has been delivered as expected.
6.5.3. Queue Deletion Checks Copy linkLink copied to clipboard!
- If ACL is enabled, the broker will check that the user who initiated the deletion has permission to do so.
- If the
ifEmpty
flag is passed the broker will raise an exception if the queue is not empty - If the
ifUnused
flag is passed the broker will raise an exception if the queue has subscribers - If the queue is exclusive the broker will check that the user who initiated the deletion owns the queue
6.6. Last Value (LV) Queues Copy linkLink copied to clipboard!
6.6.1. Last Value Queues Copy linkLink copied to clipboard!
6.6.2. Declaring a Last Value Queue Copy linkLink copied to clipboard!
qpid.last_value_queue_key
when creating the queue.
stock-ticker
that uses stock-symbol
as the key, using qpid-config
:
qpid-config add queue stock-ticker --argument qpid.last_value_queue_key=stock-symbol
qpid-config add queue stock-ticker --argument qpid.last_value_queue_key=stock-symbol
- Python
myLastValueQueue = mySession.sender("stock-ticker;{create:always, node:{type:queue, x-declare:{arguments:{'qpid.last_value_queue_key': 'stock-symbol'}}}}")
myLastValueQueue = mySession.sender("stock-ticker;{create:always, node:{type:queue, x-declare:{arguments:{'qpid.last_value_queue_key': 'stock-symbol'}}}}")
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
RHT
", "JAVA
", and other string values; and also 3
, 15
, and other integer values.
6.6.3. Last Value Queue Example Copy linkLink copied to clipboard!
- Python
import sys from qpid.messaging import *
import sys from qpid.messaging import *
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
- Python
connection = Connection("localhost:5672") connection.open()
connection = Connection("localhost:5672") connection.open()
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
- Python
session = connection.session()
session = connection.session()
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
- Python
stockSender = session.sender("stock-ticker;{create:always, node:{type:queue, x-declare:{arguments:{'qpid.last_value_queue_key': 'stock-symbol'}}}}")
stockSender = session.sender("stock-ticker;{create:always, node:{type:queue, x-declare:{arguments:{'qpid.last_value_queue_key': 'stock-symbol'}}}}")
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
qpid-config
command line tool:
qpid-config add queue stock-ticker --argument qpid.last_value_queue_key=stock-symbol
qpid-config add queue stock-ticker --argument qpid.last_value_queue_key=stock-symbol
- Python
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
msg4
updating msg1
. To contrast the behavior of the last value queue with a standard FIFO queue, we'll send our messages to a control queue, called control-queue at the same time:
- Python
controlSender = session.sender("control-queue;{create:always, node:{type:queue}}")
controlSender = session.sender("control-queue;{create:always, node:{type:queue}}")
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
- Python
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
- Python
stockBrowser = session.receiver("stock-ticker; {mode:browse}") controlBrowser = session.receiver("control-queue; {mode:browse}")
stockBrowser = session.receiver("stock-ticker; {mode:browse}") controlBrowser = session.receiver("control-queue; {mode:browse}")
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
session.receiver("stock-ticker")
, and run the demo again. With the receivers browsing, you will be able to see more distinctly the effect of a Last Value Queue over time by running the demo several times in succession without clearing the queues.
available()
method. We do this by setting the receivers' prefetch capacity
to a value higher than the default of 0:
- Python
stockBrowser.capacity = 20 controlBrowser.capacity = 20
stockBrowser.capacity = 20 controlBrowser.capacity = 20
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
- Python
sleep 10
sleep 10
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
sleep
from the time library:
- Python
from time import sleep
from time import sleep
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
available()
property of the receiver with certainty that this represents the number of messages in the queue. When operating asynchronously available()
reports the number of messages available locally. After a ten second delay, we can be reasonably certain that this is the total number of messages in the queue. In an actual asynchronous operation you would not want to block execution of your application. Instead you would use a pattern like this:
- Python
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
- Python
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
- Python
session.acknowledge() connection.close()
session.acknowledge() connection.close()
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
- Python
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
6.6.4. Last Value Queue Command-line Example Copy linkLink copied to clipboard!
drain
and spout
can be used for sending and receiving messages for testing purposes. The source code for the two utilities is included in the Python and C++ client library packages. The Python version can be run uncompiled using a Python interpreter.
qpid-config
command to create a Last Value Queue:
qpid-config add queue my-queue --argument qpid.last_value_queue_key=type
qpid-config add queue my-queue --argument qpid.last_value_queue_key=type
type
' is used to match messages in the queue.
drain
command:
./drain -f -c 0 'my-queue; {mode: browse}'
./drain -f -c 0 'my-queue; {mode: browse}'
spout
to send messages to the queue, setting a header value for the key 'type
':
./drain -c 0 'my-queue; {mode: browse}'
./drain -c 0 'my-queue; {mode: browse}'
type
' values.
6.7. Priority Queuing Copy linkLink copied to clipboard!
6.7.1. Priority Queuing Copy linkLink copied to clipboard!
qpid.priority
attribute. This attribute is an integer value between 1 and 10, and defines the number of distinct priority levels for the queue.
qpid.priority
attribute of a queue is set to 10, there are ten distinct priority levels for the queue. In this case a message with a priority level of 10 is delivered before a message with a priority of 9, which is delivered before a message with a priority level of 5, which is delivered before a message with a priority level of 1.
qpid.priority
attribute of a queue is set to 2, there are two distinct priority levels for the queue. In this case message priorities 6-10 is the queue priority level 1, and message priorities 1-5 is the queue priority level 2. Messages in the same priority band are delivered based on their priority and the order in which they are received.
6.7.2. Declaring a Priority Queue Copy linkLink copied to clipboard!
qpid.priorities
in the x-declare
arguments of the node declaration. For example:
- Python
sender = session.sender('my-queue; {create: always, node:{x-declare:{arguments:{qpid.priorities:10}}}}')
sender = session.sender('my-queue; {create: always, node:{x-declare:{arguments:{qpid.priorities:10}}}}')
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
qpid-config
:
qpid-config add queue 'my-queue; {create: always, node:{x-declare:{arguments:{qpid.priorities:10}}}}'
qpid-config add queue 'my-queue; {create: always, node:{x-declare:{arguments:{qpid.priorities:10}}}}'
6.7.3. Considerations when using Priority Queues Copy linkLink copied to clipboard!
Priority Queues deliver messages to acquiring consumers in order of priority, rather than the usual First-In-First-Out (FIFO) order of a queue. The delivery order for browsing consumers is "undefined". At the time of writing, browsing consumers receive messages from a priority queue in FIFO order; however, you should not rely on this behavior in your applications, as it may change in the future.
If the message enqueue rate sufficient outpaces the dequeue rate in a priority queue, it is possible that lower priority messages may never be removed from the queue. To avoid this situation the Fairshare feature allows a consumer to take a specified block of message from each priority level in turn.
6.7.4. Priority Queue Demonstration Copy linkLink copied to clipboard!
- Python
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
Create queue with 2 or 10 priority levels? 10 Creating a priority queue with 10 priority levels: priority-demo-queue; {create: always, node:{x-declare: {auto-delete:True, arguments: {'qpid.priorities': 10}}}}
Create queue with 2 or 10 priority levels?
10
Creating a priority queue with 10 priority levels:
priority-demo-queue; {create: always, node:{x-declare: {auto-delete:True, arguments: {'qpid.priorities': 10}}}}
auto-delete: True
to allow the program to be run multiple times with different values for qpid.priorities
. If the queue already exists when the sender is created, the value given for qpid.priorities
has no effect. This value only has an effect when the queue is created.
First message sent: Message(priority=1, content='priority 1') Second message sent: Message(priority=4, content='priority 4')
First message sent:
Message(priority=1, content='priority 1')
Second message sent:
Message(priority=4, content='priority 4')
BROWSE PRIORITY QUEUE First browse in priority queue: Message(priority=1, properties={'x-amqp-0-10.routing-key': u'priority-demo-queue'}, content='priority 1') Second browse in priority queue: Message(priority=4, properties={'x-amqp-0-10.routing-key': u'priority-demo-queue'}, content='priority 4')
BROWSE PRIORITY QUEUE
First browse in priority queue:
Message(priority=1, properties={'x-amqp-0-10.routing-key': u'priority-demo-queue'}, content='priority 1')
Second browse in priority queue:
Message(priority=4, properties={'x-amqp-0-10.routing-key': u'priority-demo-queue'}, content='priority 4')
ACQUIRE PRIORITY QUEUE First message in priority queue: Message(priority=4, properties={'x-amqp-0-10.routing-key': u'priority-demo-queue'}, content='priority 4') Second message in priority queue: Message(priority=1, properties={'x-amqp-0-10.routing-key': u'priority-demo-queue'}, content='priority 1')
ACQUIRE PRIORITY QUEUE
First message in priority queue:
Message(priority=4, properties={'x-amqp-0-10.routing-key': u'priority-demo-queue'}, content='priority 4')
Second message in priority queue:
Message(priority=1, properties={'x-amqp-0-10.routing-key': u'priority-demo-queue'}, content='priority 1')
ACQUIRE NON-PRIORITY QUEUE First message in non-priority queue: Message(priority=1, properties={'x-amqp-0-10.routing-key': u'nonpriority-demo-queue'}, content='priority 1') Second message in non-priority queue: Message(priority=4, properties={'x-amqp-0-10.routing-key': u'nonpriority-demo-queue'}, content='priority 4')
ACQUIRE NON-PRIORITY QUEUE
First message in non-priority queue:
Message(priority=1, properties={'x-amqp-0-10.routing-key': u'nonpriority-demo-queue'}, content='priority 1')
Second message in non-priority queue:
Message(priority=4, properties={'x-amqp-0-10.routing-key': u'nonpriority-demo-queue'}, content='priority 4')
6.8. Message Groups Copy linkLink copied to clipboard!
6.8.1. Message Groups Copy linkLink copied to clipboard!
6.8.2. Create a Queue with Message Groups enabled Copy linkLink copied to clipboard!
qpid.group_header_key
and qpid.shared_msg_group
in the queue creation arguments.
qpid.group_header_key
is the header key that will be used to match messages on. Messages with the same value for this key in their header belong to the same group.
qpid.shared_msg_group
should be set to 1
.
- Python
groupedSender = session.sender("my-grouped-msg-queue; {create: always, node: {x-declare: {auto-delete: True, arguments: {'qpid.group_header_key': 'msgGroupID', 'qpid.shared_msg_group': 1}}}}")
groupedSender = session.sender("my-grouped-msg-queue; {create: always, node: {x-declare: {auto-delete: True, arguments: {'qpid.group_header_key': 'msgGroupID', 'qpid.shared_msg_group': 1}}}}")
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - C++
Sender groupedSender = session.createSender("my-grouped-msg-queue; {create:always, node: {x-declare: {auto-delete: True, arguments: {'qpid.group_header_key':'msgGroupID', 'qpid.shared_msg_group':1}}}}")
Sender groupedSender = session.createSender("my-grouped-msg-queue; {create:always, node: {x-declare: {auto-delete: True, arguments: {'qpid.group_header_key':'msgGroupID', 'qpid.shared_msg_group':1}}}}")
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
6.8.3. Message Group Consumer Requirements Copy linkLink copied to clipboard!
redelivered=True
, and the rest of the group is missing.
6.8.4. Configure a Queue for Message Groups using qpid-config Copy linkLink copied to clipboard!
qpid-config
command creates a queue called "MyMsgQueue", with message grouping enabled and using the header key "GROUP_KEY" to identify message groups.
qpid-config add queue MyMsgQueue --group-header="GROUP_KEY" --shared-groups
qpid-config add queue MyMsgQueue --group-header="GROUP_KEY" --shared-groups
6.8.5. Default Group Copy linkLink copied to clipboard!
qpid.no-group
. If a message cannot be assigned to any other group, it is assigned to this group.
6.8.6. Override the Default Group Name Copy linkLink copied to clipboard!
qpid.no-group
. You can change this default group name by supplying a value for the default-message-group
configuration parameter to the broker at start-up. For example, using the command line:
qpidd --default-message-group "EMPTY-GROUP"
qpidd --default-message-group "EMPTY-GROUP"
6.8.7. Message Groups Demonstration Copy linkLink copied to clipboard!
message-groups.py
, then run it using Python on a machine with the messaging broker started.
- Python
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
The program sends messages from two different Groups - A
and B
- to a queue. Here is an example of the output when message groups are disabled:
Chapter 7. Asynchronous Messaging Copy linkLink copied to clipboard!
7.1. Asynchronous Operations Copy linkLink copied to clipboard!
7.2. Asynchronous Sending Copy linkLink copied to clipboard!
7.2.1. Synchronous and Asynchronous Send Copy linkLink copied to clipboard!
send()
method of a send
object is asynchronous - it returns immediately, without waiting for a receipt from the broker:
- Python
sender.send(message, sync = False)
sender.send(message, sync = False)
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - C++
sender.send(message, false)
sender.send(message, false)
Copy to Clipboard Copied! Toggle word wrap Toggle overflow Note that this is the default behavior for the C++ API.
7.2.2. Sender Capacity Copy linkLink copied to clipboard!
capacity
is the property of a sender object that controls the number of asynchronous sends pending acknowledgement from the server that the sender will permit. These unacknowledged messages are buffered in memory for retransmission in the event of a link failure, so the sender capacity is also known as the sender replay buffer size.
UNLIMITED
, meaning that the sender will allow an unlimited number of asynchronous calls to be made, and buffer a number of messages that is limited only by the memory limits of the system.
capacity
is set to a number other than UNLIMITED, the sender will allow only that many asynchronous send operations to be outstanding at the same time.
capacity
is set to 10, then a maximum of 10 asynchronous send operations can be awaiting acknowledgement at the same time for the sender. If 10 asynchronous send operations are invoked, and an 11th operation is attempted before any of those 10 are acknowledged by the broker, then the sender will block until one of the asynchronous send operations is acknowledged by the broker.
7.2.3. Set Sender Capacity Copy linkLink copied to clipboard!
capacity
property of a sender. In C++, the sender capacity is set using the setCapacity
method.
- Python
sender.capacity = 20
sender.capacity = 20
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - C++
sender.setCapacity(20)
sender.setCapacity(20)
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
7.2.4. Query Sender Capacity Copy linkLink copied to clipboard!
- Sender Capacity
- The maximum number of asynchronously sent messages that can be pending acknowledgement at any given time. By default this is
UNLIMITED
, but it can be changed to constrain the number of unsettled asynchronous calls. An attempt to make a further asynchronous call when the sender is at capacity will block until another sent message is acknowledged by the broker.- C++
sender.getCapacity()
sender.getCapacity()
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - Python
sender.capacity
sender.capacity
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
- Sender Unsettled
- The number of asynchronous sends pending acknowledgement from the broker.
- C++
sender.getUnsettled()
sender.getUnsettled()
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - Python
sender.unsettled()
sender.unsettled()
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
- Sender Available
- The number of additional asynchronous calls that the sender can accept at the moment. This value is available as a property, but can also be computed from
sender.capacity
-sender.unsettled
.- C++
sender.getAvailable()
sender.getAvailable()
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - Python
sender.available()
sender.available()
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
7.2.5. Avoiding a Blocked Asynchronous Send Copy linkLink copied to clipboard!
- C++
if (sender.getAvailable() > 0) sender.send(message, false) // else drop the message
if (sender.getAvailable() > 0) sender.send(message, false) // else drop the message
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - Python
if sender.available() > 0: sender.send(message, sync=False) else: # drop the message
if sender.available() > 0: sender.send(message, sync=False) else: # drop the message
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
- C++
sender.setCapacity(SOME_LARGE_NUMBER)
sender.setCapacity(SOME_LARGE_NUMBER)
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - Python
sender.capacity = SOME_LARGE_NUMBER
sender.capacity = SOME_LARGE_NUMBER
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
7.2.6. Asynchronous Message Sending Example Copy linkLink copied to clipboard!
- C++
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - Python
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
7.2.7. Asynchronous Send and Link Reliability Copy linkLink copied to clipboard!
sender.capacity
is the number of unacknowledged sends that a sender will allow when sending asynchronously. The two-phase send/acknowledge behavior is a characteristic of a reliable link (technically known as a link with at-least-once reliability). The sender sends a message, and buffers that message locally until the server responds to acknowledge receipt of the message. This buffering of unacknowledged sent messages enables the sender to resend messages (sender replay) if the link is dropped and then re-established. When a reliable link is dropped and then transparently re-established, messages that were sent asynchronously but not acknowledged by the server are resent from the sender replay buffer.
unreliable
link when creating a sender. For example:
- Python
sender = session.sender("amq.topic;{link: {'reliability': 'unreliable'}}")
sender = session.sender("amq.topic;{link: {'reliability': 'unreliable'}}")
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
unreliable
link, sender capacity has no meaning. On an unreliable link the server does not acknowledge receipt of messages. All messages are considered as good as acknowledged once they are sent. This is the meaning of unreliable
for a sender. If the link is dropped there is no way for the sender to know which messages made it to the broker and which were lost. This also means that over an unreliable link asynchronous senders will not block, as their capacity is never utilized.
Sender.capacity
is used to limit the exposure of an application to data loss, and the amount of memory that senders can consume with their replay buffer. It can also be used to throttle producers. You can use an unreliable link along with asynchronous send to maximize throughput without the implications of local memory required for the sender replay buffer, and no throttling of producers. However, you must be aware of the reduced reliability and employ this pattern in situations where the potential for data loss is not important.
- Python
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
message number : unacknowledged messages : further async send capacity
message number : unacknowledged messages : further async send capacity
sender.capacity
(set to 5 in the program code) to see the impact it has on sender blocking.
sender.capacity
has no impact on the performance of the sender. Remember, however, that it is now unreliable:
7.3. Asynchronous Receiving Copy linkLink copied to clipboard!
7.3.1. Asynchronous Message Retrieval (Prefetch) Copy linkLink copied to clipboard!
fetch()
call. The receiver's capacity to prefetch messages is 0 by default.
capacity
property of a receiver.
- Prefetched messages are available locally when requested by the application, without the overhead of a synchronous call to retrieve a message from the broker.
- A receiver with prefetching enabled has an
available()
method that can be invoked to determine how many prefetched messages are available.
available()
method:
available()
as an absolute indicator of the state of the queue. For example, calling available()
immediately after setting the capacity of a receiver to something other than 0 is likely to return a value of 0 messages available. This does not necessarily mean that the queue has no messages, but rather that no prefetched messages are locally available yet.
available
method of a receiver with prefetching enabled will be the capacity
of the receiver. The available()
method reports the number of prefetched messages available, not the number of messages in the queue. If the number of available messages is less than the capacity of the receiver, however, you can infer that this is the number of messages in the queue, with the above caveat about the asynchronous nature of prefetching.
7.3.2. Enable Receiver Prefetch Copy linkLink copied to clipboard!
- Python
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
7.3.3. Asynchronously Acknowledging Received Messages Copy linkLink copied to clipboard!
acquired
on the broker until they are acknowledged by the consumer. When a message is in acquired
mode it is not visible in the queue. If the consumer disconnects without acknowledging receipt, the message will be moved out of acquired
and again become available to consumers, with the header redelivered=true
.
acknowledge()
method of the session
object:
- Python
session.acknowledge()
session.acknowledge()
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
acknowledge()
method with no arguments acknowledges receipt of all as-yet-unacknowledged messages fetched using that session. To acknowledge a specific message, pass the message as an argument. For example:
- Python
msg = rx.fetch(timeout = 1) session.acknowledge(msg)
msg = rx.fetch(timeout = 1) session.acknowledge(msg)
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
sync = False
parameter:
- Python
session.acknowledge(msg, sync = False)
session.acknowledge(msg, sync = False)
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
When an unreliable
link is requested for a receiver, acknowledgement is implicit when a message is fetched. This means that the broker marks the message as acquired as soon as the receiver fetches it. No acknowledgement is necessary, and no release or rejection of messages is possible.
7.3.4. Asynchronous Receive and Link Reliability Copy linkLink copied to clipboard!
unreliable
link is a potentially lossy situation. Over an unreliable
link, when an application is consuming (as opposed to browsing the queue) the broker deletes the message from the queue as soon as it is prefetched. It does not wait for acknowledgement from the consumer. If the consumer fails before it dispatches prefetched messages, the broker will not redeliver them.
unreliable
link - be aware of the implications.
Chapter 8. Reliability and Quality of Service Copy linkLink copied to clipboard!
8.1. Link Reliability Copy linkLink copied to clipboard!
8.1.1. Reliable Link Copy linkLink copied to clipboard!
An acquiring message consumer (also known as a competing message consumer) is a message consumer who removes messages from a queue, and makes them unavailable to other consumers. When an acquiring message consumer fetches a message from the broker over a reliable link, the message is set to acquired
. In the acquired state the message is not visible to other consumers. It is to all intents and purposes acquired by the consumer, but the broker maintains its copy in acquired state until the consumer acknowledges acquisition. At that point the broker considers the message reliably delivered, and will delete its copy.
redelivered: true
.
alternate exchange
, if one has been configured for this queue or exchange. If no alternate exchange
is configured, the message will be discarded.
When a message is sent to the broker over a reliable link, the sender maintains its local copy until the broker acknowledges receipt. At that time the sender deletes the local copy. When sending synchronously this causes the application to block until this exchange has taken place. When sending asynchronously these unacknowledged sent messages are stored in the sender replay buffer.
All links to queues are reliable by default. It is not necessary to explicitly request a reliable link when connecting to a queue.
link: {'reliability': 'at-least-once'}
in the address. For example:
sender = session.sender("amq.topic;{link: {'reliability': 'at-least-once'}}")
sender = session.sender("amq.topic;{link: {'reliability': 'at-least-once'}}")
8.1.2. Unreliable Link Copy linkLink copied to clipboard!
unreliable
link when establishing a connection to a queue.
unreliable
link, the broker deletes it immediately, without waiting for the consumer to acknowledge that it received and successfully actioned a message.
unreliable
link, although this is be no means certain. The most obvious use for an unreliable link is when a large volume of data is being transmitted at high speed and data loss is not an issue.
unreliable
link
To request an unreliable
link, specify link: {'reliability': 'unreliable'}
in the address for the receiver or sender. For example:
- Python
sender = session.sender("amq.topic;{link: {'reliability': 'unreliable'}}")
sender = session.sender("amq.topic;{link: {'reliability': 'unreliable'}}")
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
8.2. Queue Sizing Copy linkLink copied to clipboard!
8.2.1. Controlling Queue Size Copy linkLink copied to clipboard!
qpid.max_size
) and maximum message count (qpid.max_count
) for the queue.
qpid.max_size
is specified in bytes. qpid.max_count
is specified as the number of messages.
qpid-config
creates a queue with a maximum size in memory of 200MB, and a maximum number of 5000 messages:
qpid-config add queue my-queue --max-queue-size=204800000 --max-queue-count 5000
qpid-config add queue my-queue --max-queue-size=204800000 --max-queue-count 5000
qpid.max_count
and qpid.max_size
directives go inside the arguments
of the x-declare
of the node
. For example, the following address will create the queue as the qpid-config
command above:
- Python
tx = ssn.sender("my-queue; {create: always, node: {x-declare: {'auto-delete': True, arguments:{'qpid.max_count': 5000, 'qpid.max_size': 204800000}}}}")
tx = ssn.sender("my-queue; {create: always, node: {x-declare: {'auto-delete': True, arguments:{'qpid.max_count': 5000, 'qpid.max_size': 204800000}}}}")
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
qpid.max_count
attribute will only be applied if the queue does not exist when this code is executed.
qpid.policy_type
The behavior when a queue reaches these limits is configurable. By default, on non-durable
queues the behavior is reject
: further attempts to send to the queue result in a TargetCapacityExceeded
exception being thrown at the sender.
qpid.policy_type
option. The possible values are:
- reject
- Message publishers throw an exception
TargetCapacityExceeded
. This is the default behavior for non-durable
queues. - ring
- The oldest messages are removed to make room for newer messages.
qpid-config
command sets the limit policy to ring
:
qpid-config add queue my-queue --max-queue-size=204800 --max-queue-count 5000 --limit-policy ring
qpid-config add queue my-queue --max-queue-size=204800 --max-queue-count 5000 --limit-policy ring
- Python
tx = ssn.sender("my-queue; {create: always, node: {x-declare: {'auto-delete': True, arguments:{'qpid.max_count': 5000, 'qpid.max_size': 204800, 'qpid.policy_type': 'ring'}}}}")
tx = ssn.sender("my-queue; {create: always, node: {x-declare: {'auto-delete': True, arguments:{'qpid.max_count': 5000, 'qpid.max_size': 204800, 'qpid.policy_type': 'ring'}}}}")
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
See Also:
8.2.2. Queue Threshold Alerts Copy linkLink copied to clipboard!
qpid.max_size
or qpid.max_count
) approaches 80% of its limit. The figure of 80% is configurable across the server using the broker option --default-event-threshold-ratio
. If you set this to zero, alerts are disabled for all queues by default. Additionally, you can override the default alert threshold per-queue using qpid.alert_count
and qpid.alert_size
when creating the queue.
qmf.default.topic/agent.ind.event.org_apache_qpid_broker.queueThresholdExceeded.#
. Alerts are sent as map messages.
- Python
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
To avoid alert message flooding, there is a 60 second gap between alert messages. This can be overridden on a per-queue basis using the qpid.alert_repeat_gap
to specify a different value in seconds.
The following aliases are maintained for compatibility with earlier clients:
x-qpid-maximum-message-count
is equivalent toqpid.alert_count
x-qpid-maximum-message-size
is equivalent toqpid.alert_size
x-qpid-minimum-alert-repeat-gap
is equivalent toqpid.alert_repeat_gap
8.3. Producer Flow Control Copy linkLink copied to clipboard!
8.3.1. Flow Control Copy linkLink copied to clipboard!
ring
do not have queue flow thresholds enabled. These queues deal with reaching capacity through the ring
mechanism. All other queues with limits have two threshold values that are set by the broker when the queue is created:
- flow_stop_threshold
- the queue resource utilization level that enables flow control when exceeded. Once crossed, the queue is considered in danger of overflow, and the broker will cease acknowledging sent messages to induce producer flow control. Note that either queue size or message count capacity utilization can trigger this.
- flow_resume_threshold
- the queue resource utilization level that disables flow control when dropped below. Once crossed, the queue is no longer considered in danger of overflow, and the broker again acknowledges sent messages. Note that once trigger by either, both queue size and message count must fall below this threshold before producer flow control is deactivated.
qpid.max_size
of 204800 (200MB), and a flow_stop_threshold
of 80
, then the broker will initiate producer flow control if the queue reaches 80% of 204800, or 163840 bytes of enqueued messages.
flow_resume_threshold
, producer flow control is stopped. Setting the flow_resume_threshold
above the flow_stop_threshold
has the obvious consequence of locking producer flow control on, so don't do it.
8.3.2. Queue Flow State Copy linkLink copied to clipboard!
flowState
boolean in the queue's QMF management object. When this is true
flow control is active.
flowStoppedCount
that increments each time flow control becomes active for the queue.
8.3.3. Broker Default Flow Thresholds Copy linkLink copied to clipboard!
--default-flow-stop-threshold
= flow control activated at this percentage of capacity (size or count)--default-flow-resume-threshold
= flow control de-activated at this percentage of capacity (size or count)
qpidd --default-flow-stop-threshold=90 --default-flow-resume-threshold=75
qpidd --default-flow-stop-threshold=90 --default-flow-resume-threshold=75
8.3.4. Disable Broker-wide Default Flow Thresholds Copy linkLink copied to clipboard!
qpidd --default-flow-stop-threshold=100 --default-flow-resume-threshold=100
qpidd --default-flow-stop-threshold=100 --default-flow-resume-threshold=100
8.3.5. Per-Queue Flow Thresholds Copy linkLink copied to clipboard!
qpid.flow_stop_size
integer
flow stop threshold value in bytes.qpid.flow_resume_size
integer
flow resume threshold value in bytes.qpid.flow_stop_count
integer
flow stop threshold value as a message count.qpid.flow_resume_count
integer
flow resume threshold value as a message count.
8.4. Credit-based Flow Control Copy linkLink copied to clipboard!
8.4.1. Flow Control Using Credit Copy linkLink copied to clipboard!
8.4.2. Credit Allocation Modes Copy linkLink copied to clipboard!
- In credit mode, credit must be explicitly re-issued by the subscriber before the broker can recommence sending messages
- In window mode, the credit is automatically reissued for received messages. In this mode, the client indicates that a message has been received by informing the broker of the completion of the transfer. Though completion is essentially a form of acknowledgment, it should not be confused with acceptance which is an confirmation of ownership transfer.
8.5. Durable Queues Copy linkLink copied to clipboard!
8.5.1. Durable Queues Copy linkLink copied to clipboard!
8.5.2. Persistent Messages Copy linkLink copied to clipboard!
Message.setDurable(true)
to mark a message as persistent.
8.5.3. Create a durable queue in an application Copy linkLink copied to clipboard!
- C++
Sender sender = session.createSender("important-messages; {create:always, node:{durable: True}")
Sender sender = session.createSender("important-messages; {create:always, node:{durable: True}")
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - Python
newqueue = session.sender("important-messages; {create:always, node:{durable: True}")
newqueue = session.sender("important-messages; {create:always, node:{durable: True}")
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
durable
and auto-delete
, it is only durable until it gets auto-deleted! Carefully consider if this is the behavior that you want.
8.5.4. Mark a message as persistent Copy linkLink copied to clipboard!
PERSISTENT
. For instance, in C++, the following code makes a message persistent:
message.getDeliveryProperties().setDeliveryMode(PERSISTENT);
message.getDeliveryProperties().setDeliveryMode(PERSISTENT);
A persistent message AND durable queue | Written to disk |
A persistent message AND non-durable queue | Not written to disk |
A non-persistent message AND non-durable queue | Not written to disk |
A non-persistent message AND durable queue | Not written to disk |
8.5.5. Durable Message State After Restart Copy linkLink copied to clipboard!
redelivered
flag on all recovered persistent messages.
redelivered
flag as a suggestion.
8.5.6. Journal Description Copy linkLink copied to clipboard!
8.5.7. Configure the Message Journal in an application Copy linkLink copied to clipboard!
qpid.file_size
and qpid.file_count
in the x-declare
arguments of the address used to create a queue:
- Python
tx = ssn.sender("my-queue;{create: always, node: {durable: True, x-declare: {arguments: {'qpid.file_size': 20, 'qpid.file_count': 12}}}}")
tx = ssn.sender("my-queue;{create: always, node: {durable: True, x-declare: {arguments: {'qpid.file_size': 20, 'qpid.file_count': 12}}}}")
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
8.6. Transactions Copy linkLink copied to clipboard!
8.6.1. Transactions Copy linkLink copied to clipboard!
8.6.2. Transactions Example Copy linkLink copied to clipboard!
- .NET/C#
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - C++
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
Chapter 9. Qpid Management Framework (QMF) Copy linkLink copied to clipboard!
9.1. QMF - Qpid Management Framework Copy linkLink copied to clipboard!
qmf.default.direct/broker
where qmf.default.direct
is the exchange, with a routing key or subject of broker
. The message should contain a reply-to
address from which the sender can receive responses.
9.2. QMF Versions Copy linkLink copied to clipboard!
9.3. Creating Exchanges from an Application Copy linkLink copied to clipboard!
test-fanout
Message(subject='broker', reply_to='qmf.default.topic/direct.6da5bfc3-44fb-4441-b834-6c5897b9606a;{node:{type:topic}, link:{x-declare:{auto-delete:True,exclusive:True}}}', correlation_id='1', properties={'qmf.opcode': '_method_request', 'x-amqp-0-10.app-id': 'qmf2', 'method': 'request'}, content={'_object_id': {'_object_name': 'org.apache.qpid.broker:broker:amqp-broker'}, '_method_name': 'create', '_arguments': {'strict': True, 'type': 'exchange', 'name': u'test-fanout', 'properties': {'exchange-type': u'fanout'}}})
Message(subject='broker', reply_to='qmf.default.topic/direct.6da5bfc3-44fb-4441-b834-6c5897b9606a;{node:{type:topic}, link:{x-declare:{auto-delete:True,exclusive:True}}}', correlation_id='1', properties={'qmf.opcode': '_method_request', 'x-amqp-0-10.app-id': 'qmf2', 'method': 'request'}, content={'_object_id': {'_object_name': 'org.apache.qpid.broker:broker:amqp-broker'}, '_method_name': 'create', '_arguments': {'strict': True, 'type': 'exchange', 'name': u'test-fanout', 'properties': {'exchange-type': u'fanout'}}})
9.4. Broker Exchange and Queue Configuration via QMF Copy linkLink copied to clipboard!
qpid-config
command-line utility uses QMF messages to perform many of its administration tasks.
9.5. Command Messages Copy linkLink copied to clipboard!
qmf.default.direct/broker
.
See Also:
9.6. QMF Command Message Structure Copy linkLink copied to clipboard!
QMF Command Messages are map messages. A QMF command message contains the keys _object_id
, _method_name
and _arguments
.
_object_id
is mandatory. Its value is a nested map identifying the target of the command. For QMF commands that administer the broker and its resources, the _object_id
map contains a single value with the key _object_name
containing the value org.apache.qpid.broker:broker:amqp-broker
. The _object_name
value has the following syntax 'package:class:id
'. The desired value may be obtained from the schema, using qpid-tool
.
_method_name
has the name of the command as its value and the key _arguments
contains a nested map of command arguments.
Two message properties, x-amqp-0-10.app-id
and qmf.opcode
must be set. The property x-amqp-0-10.app-id
should always have the value qmf2
and qmf.opcode
contains the value _method_request
.
To receive a response from the server, set the reply-to
address of the QMF command message to an address where you can receive messages. After the command message is sent to the broker's QMF address, the response arrives from the reply-to
address specified. The response message has the x-amqp-0-10.app-id
property set to qmf2
when using amqp0-10.
qmf.opcode
property is set to _method_response
. If an error was encountered, qmf.opcode
property will contain the value _exception
.
_arguments
. In the case of an exception, details of the exception are within a nested map against the key _values
.
9.7. Create Command Copy linkLink copied to clipboard!
create
command takes five arguments:
- type
- The type of object to be created, this can be a queue, exchange or binding.
- name
- The name of the object to be created. The
name
argument of a queue or exchange is a single value, for example a queue namedmy-queue
sets the name argument to a string of that value. The name of a binding uses the pattern exchange/queue/key, for example:amq.topic/my-queue/my-key
identifies a binding betweenmy-queue
and the exchangeamq.topic
with the binding keymy-key
. - properties
- The specific properties for the object to be created, value is a nested map.
- strict
- The strict argument takes a boolean value that is presently ignored. This value is intended to indicate whether the command will fail if any unrecognized properties have been specified.
- auto_delete_timeout
- Optional. If specified upon first declaring an auto-delete queue, specifies a delay, in seconds, after which the deletion will take place. Note: If the queue is re-declared after becoming eligible for deletion, but before the delay expires, then the queue will be not be deleted.
my-queue
. In this example my-queue
is configured to be auto-deleted after 10 seconds.
- Python
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
9.8. Delete Command Copy linkLink copied to clipboard!
delete
command takes three arguments:
- type
- The type of object to be deleted, this can be a queue, exchange or binding.
- name
- The name of the object to be deleted. The
name
argument of a queue or exchange is a single value, for examplemy-queue
. The name of a binding uses the pattern exchange/queue/key, for example:amq.topic/my-queue/my-key
identifies a binding betweenmy-queue
and the exchangeamq.topic
with the binding keymy-key
. - options
- A nested map with the key
options
. This is presently unused.
9.9. List Command Copy linkLink copied to clipboard!
- Python
Message(subject='broker', reply_to='qmf.default.topic/direct.8b59a7ae-93f1-4450-9e43-1b0665bf622b;{node:{type:topic}, link:{x-declare:{auto-delete:True,exclusive:True}}}', correlation_id='1', properties={'qmf.opcode': '_query_request', 'x-amqp-0-10.app-id': 'qmf2', 'method': 'request'}, content={'_what': 'OBJECT', '_schema_id': {'_class_name': 'exchange'}})
Message(subject='broker', reply_to='qmf.default.topic/direct.8b59a7ae-93f1-4450-9e43-1b0665bf622b;{node:{type:topic}, link:{x-declare:{auto-delete:True,exclusive:True}}}', correlation_id='1', properties={'qmf.opcode': '_query_request', 'x-amqp-0-10.app-id': 'qmf2', 'method': 'request'}, content={'_what': 'OBJECT', '_schema_id': {'_class_name': 'exchange'}})
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
- Python
Message(subject='broker', reply_to='qmf.default.topic/direct.7f703720-c815-4c79-986c-354b3963bc76;{node:{type:topic}, link:{x-declare:{auto-delete:True,exclusive:True}}}', correlation_id='1', properties={'qmf.opcode': '_query_request', 'x-amqp-0-10.app-id': 'qmf2', 'method': 'request'}, content={'_what': 'OBJECT', '_schema_id': {'_class_name': 'queue'}})
Message(subject='broker', reply_to='qmf.default.topic/direct.7f703720-c815-4c79-986c-354b3963bc76;{node:{type:topic}, link:{x-declare:{auto-delete:True,exclusive:True}}}', correlation_id='1', properties={'qmf.opcode': '_query_request', 'x-amqp-0-10.app-id': 'qmf2', 'method': 'request'}, content={'_what': 'OBJECT', '_schema_id': {'_class_name': 'queue'}})
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
9.10. Queue and Exchange Creation using QMF Copy linkLink copied to clipboard!
test
:
- Python
Message(subject='broker', reply_to='qmf.default.topic/direct.8702f596-b112-427d-b93e-7e0ae28f2ae8;{node:{type:topic}, link:{x-declare:{auto-delete:True,exclusive:True}}}', correlation_id='1', properties={'qmf.opcode': '_method_request', 'x-amqp-0-10.app-id': 'qmf2', 'method': 'request'}, content={'_object_id': {'_object_name': 'org.apache.qpid.broker:broker:amqp-broker'}, '_method_name': 'create', '_arguments': {'strict': True, 'type': 'queue', 'name': u'test', 'properties': {}}})
Message(subject='broker', reply_to='qmf.default.topic/direct.8702f596-b112-427d-b93e-7e0ae28f2ae8;{node:{type:topic}, link:{x-declare:{auto-delete:True,exclusive:True}}}', correlation_id='1', properties={'qmf.opcode': '_method_request', 'x-amqp-0-10.app-id': 'qmf2', 'method': 'request'}, content={'_object_id': {'_object_name': 'org.apache.qpid.broker:broker:amqp-broker'}, '_method_name': 'create', '_arguments': {'strict': True, 'type': 'queue', 'name': u'test', 'properties': {}}})
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
test-fanout
:
- Python
Message(subject='broker', reply_to='qmf.default.topic/direct.81915d0a-d2e1-4cf9-9369-921bac725aab;{node:{type:topic}, link:{x-declare:{auto-delete:True,exclusive:True}}}', correlation_id='1', properties={'qmf.opcode': '_method_request', 'x-amqp-0-10.app-id': 'qmf2', 'method': 'request'}, content={'_object_id': {'_object_name': 'org.apache.qpid.broker:broker:amqp-broker'}, '_method_name': 'create', '_arguments': {'strict': True, 'type': 'exchange', 'name': u'test-fanout', 'properties': {'exchange-type': u'fanout'}}})
Message(subject='broker', reply_to='qmf.default.topic/direct.81915d0a-d2e1-4cf9-9369-921bac725aab;{node:{type:topic}, link:{x-declare:{auto-delete:True,exclusive:True}}}', correlation_id='1', properties={'qmf.opcode': '_method_request', 'x-amqp-0-10.app-id': 'qmf2', 'method': 'request'}, content={'_object_id': {'_object_name': 'org.apache.qpid.broker:broker:amqp-broker'}, '_method_name': 'create', '_arguments': {'strict': True, 'type': 'exchange', 'name': u'test-fanout', 'properties': {'exchange-type': u'fanout'}}})
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
9.11. QMF Events Copy linkLink copied to clipboard!
qmf.default.topic/agent.ind.event.org_apache_qpid_broker.$QMF_Event.#
, where $QMF_Event is one of the provided QMF Events from the following table:
QMF Event | Severity | Arguments |
---|---|---|
clientConnect
|
inform
|
rhost, user, properties
|
clientConnectFail
|
warn
|
rhost, user, reason, properties
|
clientDisconnect
|
inform
|
rhost, user, properties
|
brokerLinkUp
|
inform
|
rhost
|
brokerLinkDown
|
warn
|
rhost
|
queueDeclare
|
inform
|
rhost, user, qName, durable, excl, autoDel, altEx, args, disp
|
queueDelete
|
inform
|
rhost, user, qName
|
exchangeDeclare
|
inform
|
rhost, user, exName, exType, altEx, durable, autoDel, args, disp
|
exchangeDelete
|
inform
|
rhost, user, exName
|
bind
|
inform
|
rhost, user, exName, qName, key, args
|
unbind
|
inform
|
rhost, user, exName, qName, key
|
subscribe
|
inform
|
rhost, user, qName, dest, excl, args
|
unsubscribe
|
inform
|
rhost, user, dest
|
queueThresholdExceeded
|
warn
|
qName, msgDepth, byteDepth
|
See Also:
9.12. QMF Client Connection Events Copy linkLink copied to clipboard!
QMF queue | Purpose |
---|---|
qmf.default.topic/agent.ind.event.org_apache_qpid_broker.clientConnect.#
|
Client connections
|
qmf.default.topic/agent.ind.event.org_apache_qpid_broker.clientConnectFail.#
|
Failed connection attempts
|
qmf.default.topic/agent.ind.event.org_apache_qpid_broker.clientDisconnect.#
|
Client disconnections
|
client_ppid
[1]client_pid
client_process
9.13. ACL Lookup Query Methods Copy linkLink copied to clipboard!
Lookup
and LookupPublish
.
Lookup
method is a general query for any action, object, and set of properties. The LookupPublish
method is the optimized, per-message fastpath query.
allow
, deny
, allow-log
, or deny-log
.
Method: Lookup
Argument | Type | Direction |
---|---|---|
userId
|
long-string
|
I
|
action
|
long-string
|
I
|
object
|
long-string
|
I
|
objectName
|
long-string
|
I
|
propertyMap
|
field-table
|
I
|
result
|
long-string
|
O
|
Method: LookupPublish
Argument | Type | Direction |
---|---|---|
userId
|
long-string
|
I
|
exchangeName
|
long-string
|
I
|
routingKey
|
long-string
|
I
|
result
|
long-string
|
O
|
Management Properties and Statistics
Element | Type | Access | Description |
---|---|---|---|
maxConnections
|
uint16
|
ReadOnly
|
Maximum allowed connections
|
Element | Type | Access | Description |
---|---|---|---|
maxConnectionsPerIp
|
uint16
|
ReadOnly
|
Maximum allowed connections
|
maxConnectionsPerUser
|
uint16
|
ReadOnly
|
Maximum allowed connections
|
maxQueuesPerUser
|
uint16
|
ReadOnly
|
Maximum allowed queues
|
connectionDenyCount
|
uint64
| |
Number of connections denied
|
queueQuotaDenyCount
|
uint64
| |
Number of queue creations denied
|
Example
Procedure 9.1. ACL Lookup Example
- Start the broker using the example ACL file
acl-test-01-rules.acl
reproduced below, and withQPID_LOG_ENABLE=debug+:acl
. - Run the Python script
acl-test-01.py
. - Examine the Python program output and the broker log.
ACL File acl-test-01-rules.acl
Python Script acl-test-01.py
9.14. Using QMF in a Cluster Copy linkLink copied to clipboard!
Chapter 10. The Qpid Messaging API Copy linkLink copied to clipboard!
10.1. Handling Exceptions Copy linkLink copied to clipboard!
10.1.1. Messaging Exceptions Reference Copy linkLink copied to clipboard!
10.1.2. C++ Messaging Exceptions Class Hierarchy Copy linkLink copied to clipboard!
- MessagingException
- The base class for Messaging exceptions.
- InvalidOptionString : public MessagingException
- Thrown when the syntax of the option string used to configure a connection is not valid.
- KeyError : public MessagingException
- Thrown to indicate a failed lookup of some local object. For example when attempting to retrieve a session, sender or receiver by name.
- LinkError : public MessagingException
- Base class for exceptions thrown to indicate a failed lookup of some local object.
- AddressError : public LinkError
- Thrown to indicate a failed lookup of some local object. For example when attempting to retrieve a session, sender or receiver by name.
- ResolutionError : public AddressError
- Thrown when a syntactically correct address cannot be resolved or used.
- AssertionFailed : public ResolutionError
- Thrown when creating a sender or receiver for an address for which some asserted property of the node is not matched.
- NotFound : public ResolutionError
- Thrown on attempts to create a sender or receiver to a non-existent node.
- MalformedAddress : public AddressError
- Thrown when an address string with invalid syntax is used.
- ReceiverError : public LinkError
- FetchError : public ReceiverError
- NoMessageAvailable : public FetchError
- Thrown by Receiver::fetch(), Receiver::get() and Session::nextReceiver() to indicate that there no message was available before the timeout specified.
- SenderError : public LinkError
- SendError : public SenderError
- TargetCapacityExceeded : public SendError
- Thrown to indicate that the sender attempted to send a message that would result in the target node on the peer exceeding a preconfigured capacity.
- SessionError : public MessagingException
- TransactionError : public SessionError
- TransactionAborted : public TransactionError
- Thrown on Session::commit() if reconnection results in the transaction being automatically aborted.
- TransactionUnknown : public TransactionError
- The outcome of the transaction on the broker (commit or roll-back) is not known. This occurs when the connection fails after the commit was sent, but before a response is received.
- UnauthorizedAccess : public SessionError
- Thrown to indicate that the application attempted to do something for which it was not authorized by its peer.
- UnauthorizedAccess : public SessionError
- ConnectionError : public MessagingException
- TransportFailure : public MessagingException
- Thrown to indicate loss of underlying connection. When auto-reconnect is used this will be caught by the library and used to trigger reconnection attempts. If reconnection fails (according to whatever settings have been configured), then an instance of this class will be thrown to signal that.
10.1.3. Connection Exceptions Copy linkLink copied to clipboard!
qpid::messaging
namespace.
- Connection::Connection(const std::string&, const qpid::types::Variant::Map&)
MessagingException
if any of the options in the supplied map are not recognised.qpid::types::InvalidConversion
if any of the option values are of the wrong type.- Connection::Connection(const std::string& url, const std::string& options)
MessagingException
if any of the options in the supplied map are not recognised.qpid::types::InvalidConversion
if any of the option values are of the wrong type.InvalidOptionString
if the format of the option string is invalid.- Connection::setOption(const std::string& name, const qpid::types::Variant& value)
MessagingException
if the named option is not recognised.qpid::types::InvalidConversion
if the option value is of the wrong type.- Connection::open()
qpid::Url::Invalid
if the url is not valid (this may be the url supplied on construction or any of the reconnect_urls supplied via options).TransportFailure
if a connection could not be established.ConnectionError
for any other failure, including where the broker sends a connection.close control before the AMQP 0-10 defined connection handshake completes.qpid::types::InvalidConversion
if the broker sends an improperly encoded value for the 'known-host
' field of theconnection.open-ok control
as defined by AMQP 0-10 specification.- Connection::isOpen()
- Does not throw exceptions.
- Connection::close()
TargetCapacityExceeded
if any of the sessions established for the connection have attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccess
if any of the sessions established for the connection have attempted to perform an operation for which it has not been granted permission.SessionError
if anexecution.exception
command, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionError
if the broker to which the client is connected sends a connection.close control (i.e. if broker initiates closing of an active connection just before the client does).MessagingException
if the broker to which the client is connected sends a session.detached control (i.e. if broker initiates closing of an active session while the close is in progress).TransportFailure
if a connection was lost while trying to perform the close 'handshake' with the broker.- Connection::createTransactionalSession(const std::string& name)
SessionError
if anexecution.exception
command, as defined in AMQP 0-10, is received from the broker to which the client is connected which could happen on enabling transactions for the session (e.g. if the broker in question did not support transactions).ConnectionError
if the broker to which the client is connected sends aconnection.close
control (i.e. if broker initiates closing of an active connection).MessagingException
if the broker to which the client is connected sends a session.detached control (i.e. if broker initiates closing of the session before it becomes active).TransportFailure
if the connection was lost (and if automatic reconnect is enabled could not be re-established).qpid::Url::Invalid
if reconnect is enabled and a url in thereconnect_urls
option list is invalid.qpid::types::InvalidConversion
if the broker were to send an improperly encoded value for the 'known-host
' field of theconnection.open-ok
control as defined by AMQP 0-10 specification.- Connection::createSession(const std::string&)
ConnectionError
if the broker to which the client is connected sends a connection.close control (i.e. if broker initiates closing of an active connection).MessagingException
if the broker to which the client is connected sends asession.detached
control (i.e. if broker initiates closing of the session before it becomes active).TransportFailure
if the connection was lost (and if automatic reconnect is enabled could not be re-established).qpid::Url::Invalid
if reconnect is enabled and a url in thereconnect_urls
option list is invalid.qpid::types::InvalidConversion
if the broker were to send an improperly encoded value for the 'known-host
' field of the connection.open-ok control as defined by AMQP 0-10 specification.- Connection::getSession(const std::string&)
KeyError
if no session for the specified name exists.- Connection::getAuthenticatedUsername()
- Does not throw any exception.
10.1.4. Session Exceptions Copy linkLink copied to clipboard!
qpid::messaging
namespace.
- Session::close()
TargetCapacityExceeded
if the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccess
if the session has attempted to perform an operation for which it has not been granted permission.SessionError
if anexecution.exception
command, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionError
if the broker to which the client is connected sends aconnection.close
control (i.e. if broker initiates closing of an active connection).MessagingException
if the broker to which the client is connected sends asession.detached
control (i.e. if broker initiates closing of an active session).TransportFailure
if a connection was lost (and if automatic reconnect is enabled could not be re-established).- Session::commit()
TransactionAborted
if the original AMQP 0-10 session is lost, e.g. due to failover, forcing an automatic rollback.TargetCapacityExceeded
if the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccess
if the session has attempted to perform an operation for which it has not been granted permission.SessionError
if anexecution.exception
command, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionError
if the broker to which the client is connected sends aconnection.close
control (i.e. if broker initiates closing of an active connection).MessagingException
if the broker to which the client is connected sends a session.detached control (i.e. if broker initiates closing of an active session).TransportFailure
if a connection was lost (and if automatic reconnect is enabled could not be re-established).- Session::rollback()
TargetCapacityExceeded
if the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccess
if the session has attempted to perform an operation for which it has not been granted permission.SessionError
if an execution.exception command, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionError
if the broker to which the client is connected sends aconnection.close
control (i.e. if broker initiates closing of an active connection).MessagingException
if the broker to which the client is connected sends asession.detached
control (i.e. if broker initiates closing of an active session).TransportFailure
if a connection was lost (and if automatic reconnect is enabled could not be re-established).- Session::acknowledge(bool)
TargetCapacityExceeded
if the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccess
if the session has attempted to perform an operation for which it has not been granted permission.SessionError
if an execution.exception command, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionError
if the broker to which the client is connected sends aconnection.close
control (i.e. if broker initiates closing of an active connection).MessagingException
if the broker to which the client is connected sends asession.detached
control (i.e. if broker initiates closing of an active session).TransportFailure
if a connection was lost (and if automatic reconnect is enabled could not be re-established).- Session::acknowledge(Message&, bool)
TargetCapacityExceeded
if the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccess
if the session has attempted to perform an operation for which it has not been granted permission.SessionError
if an execution.exception command, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionError
if the broker to which the client is connected sends aconnection.close
control (i.e. if broker initiates closing of an active connection).MessagingException
if the broker to which the client is connected sends asession.detached
control (i.e. if broker initiates closing of an active session).TransportFailure
if a connection was lost (and if automatic reconnect is enabled could not be re-established).- Session::acknowledgeUpTo(Message&, bool)
TargetCapacityExceeded
if the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccess
if the session has attempted to perform an operation for which it has not been granted permission.SessionError
if anexecution.exception
command, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionError
if the broker to which the client is connected sends aconnection.close
control (i.e. if broker initiates closing of an active connection).MessagingException
if the broker to which the client is connected sends asession.detached
control (i.e. if broker initiates closing of an active session).TransportFailure
if a connection was lost (and if automatic reconnect is enabled could not be re-established).- Session::reject(Message&)
TargetCapacityExceeded
if the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccess
if the session has attempted to perform an operation for which it has not been granted permission.ThrowsSessionError
if an execution.exception command, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionError
if the broker to which the client is connected sends aconnection.close
control (i.e. if broker initiates closing of an active connection).MessagingException
if the broker to which the client is connected sends asession.detached
control (i.e. if broker initiates closing of an active session).TransportFailure
if a connection was lost (and if automatic reconnect is enabled could not be re-established).- Session::release(Message&)
TargetCapacityExceeded
if the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccess
if the session has attempted to perform an operation for which it has not been granted permission.SessionError
if anexecution.exception
command, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionError
if the broker to which the client is connected sends aconnection.close
control (i.e. if broker initiates closing of an active connection).MessagingException
if the broker to which the client is connected sends asession.detached
control (i.e. if broker initiates closing of an active session).TransportFailure if a connection was lost (and if automatic reconnect is enabled could not be re-established).- Session::sync(bool)
TargetCapacityExceeded
if the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccess
if the session has attempted to perform an operation for which it has not been granted permission.SessionError
if anexecution.exception
command, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionError
if the broker to which the client is connected sends aconnection.close
control (i.e. if broker initiates closing of an active connection).MessagingException
if the broker to which the client is connected sends asession.detached
control (i.e. if broker initiates closing of an active session).TransportFailure
if a connection was lost (and if automatic reconnect is enabled could not be re-established).- Session::getReceivable()
TargetCapacityExceeded
if the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccess
if the session has attempted to perform an operation for which it has not been granted permission.SessionError
if anexecution.exception
command, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionError
if the broker to which the client is connected sends aconnection.close
control (i.e. if broker initiates closing of an active connection).MessagingException
if the broker to which the client is connected sends asession.detached
control (i.e. if broker initiates closing of an active session).TransportFailure
if a connection was lost (and if automatic reconnect is enabled could not be re-established).- Session::getUnsettledAcks()
TargetCapacityExceeded
if the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccess
if the session has attempted to perform an operation for which it has not been granted permission.SessionError
if anexecution.exception
command, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionError
if the broker to which the client is connected sends aconnection.close
control (i.e. if broker initiates closing of an active connection).MessagingException
if the broker to which the client is connected sends asession.detached
control (i.e. if broker initiates closing of an active session).TransportFailure if a connection was lost (and if automatic reconnect is enabled could not be re-established).- Session::nextReceiver(Receiver&, Duration)
TargetCapacityExceeded
if the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccess
if the session has attempted to perform an operation for which it has not been granted permission.SessionError
if anexecution.exception
command, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionError
if the broker to which the client is connected sends aconnection.close
control (i.e. if broker initiates closing of an active connection).MessagingException
if the broker to which the client is connected sends a session.detached control (i.e. if broker initiates closing of an active session).TransportFailure
if a connection was lost (and if automatic reconnect is enabled could not be re-established).- Session::nextReceiver(Duration)
Receiver::NoMessageAvailable
if no message became available in time.TargetCapacityExceeded
if the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccess
if the session has attempted to perform an operation for which it has not been granted permission.ThrowsSessionError
if an execution.exception command, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionError
if the broker to which the client is connected sends aconnection.close
control (i.e. if broker initiates closing of an active connection).MessagingException
if the broker to which the client is connected sends asession.detached
control (i.e. if broker initiates closing of an active session).TransportFailure
if a connection was lost (and if automatic reconnect is enabled could not be re-established).- Session::createSender(const Address&)
ResolutionError
if there is an error in resolving the address.TargetCapacityExceeded
if the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccess
if the session has attempted to perform an operation for which it has not been granted permission.SessionError
if anexecution.exception
command, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionError
if the broker to which the client is connected sends aconnection.close
control (i.e. if broker initiates closing of an active connection).MessagingException
if the broker to which the client is connected sends a session.detached control (i.e. if broker initiates closing of an active session).TransportFailure
if a connection was lost (and if automatic reconnect is enabled could not be re-established).- Session::createSender(const std::string&)
ResolutionError
if there is an error in resolving the address.MalformedAddress
if the syntax of the address string is not valid.TargetCapacityExceeded
if the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccess
if the session has attempted to perform an operation for which it has not been granted permission.SessionError
if an execution.exception command, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionError
if the broker to which the client is connected sends aconnection.close
control (i.e. if broker initiates closing of an active connection).MessagingException
if the broker to which the client is connected sends asession.detached
control (i.e. if broker initiates closing of an active session).TransportFailure
if a connection was lost (and if automatic reconnect is enabled could not be re-established).- Session::createReceiver(const Address&)
ResolutionError
if there is an error in resolving the address.TargetCapacityExceeded
if the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccess
if the session has attempted to perform an operation for which it has not been granted permission.SessionError
if an execution.exception command, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionError
if the broker to which the client is connected sends a connection.close control (i.e. if broker initiates closing of an active connection).MessagingException
if the broker to which the client is connected sends a session.detached control (i.e. if broker initiates closing of an active session).TransportFailure
if a connection was lost (and if automatic reconnect is enabled could not be re-established).- Session::createReceiver(const std::string&)
ResolutionError
if there is an error in resolving the address.MalformedAddress
if the syntax of the address string is not valid.TargetCapacityExceeded
if the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccess
if the session has attempted to perform an operation for which it has not been granted permission.SessionError
if an execution.exception command, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionError
if the broker to which the client is connected sends aconnection.close
control (i.e. if broker initiates closing of an active connection).MessagingException
if the broker to which the client is connected sends asession.detached
control (i.e. if broker initiates closing of an active session).TransportFailure
if a connection was lost (and if automatic reconnect is enabled could not be re-established).- Session::getSender(const std::string&)
KeyError
if there is no sender for the specified name.- Session::getReceiver(const std::string&)
- KeyError if there is no receiver for the specified name.
- Session::checkError()
qpid::messaging::SessionError
if an execution.exception command, as defined in AMQP 0-10, is received from the broker to which the client is connected.qpid::messaging::ConnectionError
if the broker to which the client is connected sends aconnection.close
control (i.e. if broker initiates closing of an active connection).qpid::messaging::MessagingException
if the broker to which the client is connected sends a session.detached control (i.e. if broker initiates closing of an active session).- Session::getConnection()
- Does not throw exceptions.
- Session::hasError()
- Does not throw exceptions.
10.1.5. Sender Exceptions Copy linkLink copied to clipboard!
qpid::messaging
namespace.
- Sender::send(const Message& message, bool)
TargetCapacityExceeded
if the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccess
if the session has attempted to perform an operation for which it has not been granted permission.SessionError
if anexecution.exception
command, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionError
if the broker to which the client is connected sends aconnection.close
control (i.e. if broker initiates closing of an active connection).MessagingException
if the broker to which the client is connected sends a session.detached control (i.e. if broker initiates closing of an active session).TransportFailure
if a connection was lost (and if automatic reconnect is enabled could not be re-established).- Sender::close()
TargetCapacityExceeded
if the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccess
if the session has attempted to perform an operation for which it has not been granted permission.SessionError
if anexecution.exception
command, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionError
if the broker to which the client is connected sends aconnection.close
control (i.e. if broker initiates closing of an active connection).MessagingException
if the broker to which the client is connected sends asession.detached
control (i.e. if broker initiates closing of an active session).TransportFailure
if a connection was lost (and if automatic reconnect is enabled could not be re-established).- Sender::setCapacity(uint32_t)
TargetCapacityExceeded
if the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccess
if the session has attempted to perform an operation for which it has not been granted permission.SessionError
if anexecution.exception
command, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionError
if the broker to which the client is connected sends aconnection.close
control (i.e. if broker initiates closing of an active connection).MessagingException
if the broker to which the client is connected sends asession.detached
control (i.e. if broker initiates closing of an active session).TransportFailure
if a connection was lost (and if automatic reconnect is enabled could not be re-established).- Sender::getUnsettled()
TargetCapacityExceeded
if the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccess
if the session has attempted to perform an operation for which it has not been granted permission.SessionError
if anexecution.exception
command, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionError
if the broker to which the client is connected sends aconnection.close
control (i.e. if broker initiates closing of an active connection).MessagingException
if the broker to which the client is connected sends asession.detached
control (i.e. if broker initiates closing of an active session).TransportFailure
if a connection was lost (and if automatic reconnect is enabled could not be re-established).- Sender::getAvailable()
TargetCapacityExceeded
if the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccess
if the session has attempted to perform an operation for which it has not been granted permission.SessionError
if anexecution.exception
command, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionError
if the broker to which the client is connected sends aconnection.close
control (i.e. if broker initiates closing of an active connection).MessagingException
if the broker to which the client is connected sends asession.detached
control (i.e. if broker initiates closing of an active session).TransportFailure
if a connection was lost (and if automatic reconnect is enabled could not be re-established).- Sender::getCapacity()
- Does not throw exceptions.
- Sender::getName()
- Does not throw exceptions.
- Sender::getSession()
- Does not throw exceptions.
10.1.6. Receiver Exceptions Copy linkLink copied to clipboard!
qpid::messaging
namespace.
- Receiver::get(Message& message, Duration timeout=Duration::FOREVER)
TargetCapacityExceeded
if the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccess
if the session has attempted to perform an operation for which it has not been granted permission.SessionError
if anexecution.exception
command, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionError
if the broker to which the client is connected sends aconnection.close
control (i.e. if broker initiates closing of an active connection).MessagingException
if the broker to which the client is connected sends asession.detached
control (i.e. if broker initiates closing of an active session).TransportFailure
if a connection was lost (and if automatic reconnect is enabled could not be re-established).- Receiver::Message get(Duration timeout=Duration::FOREVER)
NoMessageAvailable
if there is no message to give after waiting for the specified timeout, or if the Receiver is closed, in which caseisClose()
will be true.TargetCapacityExceeded
if the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccess
if the session has attempted to perform an operation for which it has not been granted permission.SessionError
if an execution.exception command, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionError
if the broker to which the client is connected sends aconnection.close
control (i.e. if broker initiates closing of an active connection).MessagingException
if the broker to which the client is connected sends asession.detached
control (i.e. if broker initiates closing of an active session).TransportFailure
if a connection was lost (and if automatic reconnect is enabled could not be re-established).- Receiver::fetch(Message& message, Duration timeout=Duration::FOREVER)
TargetCapacityExceeded
if the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccess
if the session has attempted to perform an operation for which it has not been granted permission.SessionError
if an execution.exception command, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionError
if the broker to which the client is connected sends aconnection.close
control (i.e. if broker initiates closing of an active connection).MessagingException
if the broker to which the client is connected sends asession.detached
control (i.e. if broker initiates closing of an active session).TransportFailure
if a connection was lost (and if automatic reconnect is enabled could not be re-established).- Receiver::fetch(Duration timeout=Duration::FOREVER)
NoMessageAvailable
if there is no message to give after waiting for the specified timeout, or if the Receiver is closed, in which caseisClose()
will be true.TargetCapacityExceeded
if the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccess
if the session has attempted to perform an operation for which it has not been granted permission.SessionError
if anexecution.exception
command, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionError
if the broker to which the client is connected sends aconnection.close
control (i.e. if broker initiates closing of an active connection).MessagingException
if the broker to which the client is connected sends asession.detached
control (i.e. if broker initiates closing of an active session).TransportFailure
if a connection was lost (and if automatic reconnect is enabled could not be re-established).- Receiver::setCapacity(uint32_t)
TargetCapacityExceeded
if the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccess
if the session has attempted to perform an operation for which it has not been granted permission.SessionError
if anexecution.exception
command, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionError
if the broker to which the client is connected sends aconnection.close
control (i.e. if broker initiates closing of an active connection).MessagingException
if the broker to which the client is connected sends asession.detached
control (i.e. if broker initiates closing of an active session).TransportFailure
if a connection was lost (and if automatic reconnect is enabled could not be re-established).- Receiver::getAvailable()
TargetCapacityExceeded
if the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccess
if the session has attempted to perform an operation for which it has not been granted permission.SessionError
if anexecution.exception
command, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionError
if the broker to which the client is connected sends aconnection.close
control (i.e. if broker initiates closing of an active connection).MessagingException
if the broker to which the client is connected sends asession.detached
control (i.e. if broker initiates closing of an active session).TransportFailure
if a connection was lost (and if automatic reconnect is enabled could not be re-established).- Receiver::getUnsettled()
TargetCapacityExceeded
if the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccess
if the session has attempted to perform an operation for which it has not been granted permission.SessionError
if anexecution.exception
command, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionError
if the broker to which the client is connected sends aconnection.close
control (i.e. if broker initiates closing of an active connection).MessagingException
if the broker to which the client is connected sends asession.detached
control (i.e. if broker initiates closing of an active session).TransportFailure
if a connection was lost (and if automatic reconnect is enabled could not be re-established).- Receiver::close()
TargetCapacityExceeded
if the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccess
if the session has attempted to perform an operation for which it has not been granted permission.SessionError
if anexecution.exception
command, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionError
if the broker to which the client is connected sends aconnection.close
control (i.e. if broker initiates closing of an active connection).MessagingException
if the broker to which the client is connected sends asession.detached
control (i.e. if broker initiates closing of an active session).TransportFailure
if a connection was lost (and if automatic reconnect is enabled could not be re-established).- Receiver::isClosed()
- Does not throw exceptions.
- Receiver::getCapacity()
- Does not throw exceptions.
- Receiver::getName()
- Does not throw exceptions.
- Receiver::getSession()
- Does not throw exceptions.
Chapter 11. Addresses Copy linkLink copied to clipboard!
11.1. x-declare Parameters Copy linkLink copied to clipboard!
x-declare
part of an address string:
Parameter | Usage |
---|---|
auto-delete
| boolean specifying if the queue/exchange should be auto-deleted
|
exclusive
| boolean specifying exclusiveness of the queue/exchange
|
alternate-exchange
|
alternate exchange where messages shall be routed to when this queue is deleted / the exchange fails to find a matching bind for a message
|
arguments
|
a nested map with arguments available specifically for the queue / exchange. Refer to https://cwiki.apache.org/confluence/display/qpid/Qpid+extensions+to+AMQP for further details.
|
11.2. Address String Options Reference Copy linkLink copied to clipboard!
Option | Value | Semantics |
---|---|---|
assert
|
one of:
always , never , sender or receiver
|
Asserts that the properties specified in the node option match whatever the address resolves to. If they do not, resolution fails and an exception is raised.
|
create
|
one of:
always , never , sender or receiver
|
Creates the node to which an address refers if it does not exist. No error is raised if the node does exist. The details of the node may be specified in the node option.
|
delete
|
one of:
always , never , sender or receiver
|
Delete the node when the sender or receiver is closed.
|
node
|
A nested map containing
node properties.
|
Specifies properties of the node to which the address refers. These are used in conjunction with the
assert or create options.
|
link
|
A nested map containing
link properties.
|
Used to control the establishment of a conceptual link from the client application to or from the target/source address.
|
mode
|
one of:
browse , consume
|
This option is only of relevance for source addresses that resolve to a queue. If browse is specified the messages delivered to the receiver are left on the queue rather than being removed. If consume is specified the normal behavior applies; messages are removed from the queue once the client acknowledges their receipt.
|
11.3. Node Properties Copy linkLink copied to clipboard!
Property | Value | Semantics |
---|---|---|
type
|
one of:
topic , queue
|
Indicates the type of the node.
|
durable
|
one of:
True , False
|
Indicates whether the node survives a loss of volatile storage e.g. if the broker is restarted.
|
x-declare
|
A nested map whose values correspond to the valid fields on an AMQP 0-10
queue-declare or exchange-declare command.
|
These values are used to fine tune the creation or assertion process. Note however that they are protocol specific.
|
x-bindings
|
A nested list in which each binding is represented by a map. The entries of the map for a binding contain the fields that describe an AMQP 0-10 binding. Here is the format for x-bindings:
|
In conjunction with the create option, each of these bindings is established as the address is resolved. In conjunction with the assert option, the existence of each of these bindings is verified during resolution. Again, these are protocol specific.
|
properties |
A nested map of AMQP 1.0 properties.
| A nested map of properties specified through properties is recommended over use of x-declare , which generates the nested map of properties when it is used. |
capabilities | A single string or list of strings representing AMQP 1.0 capabilities. | A list containing the AMQP 1.0 capabilities requested from the source or target. |
11.4. Link Properties Copy linkLink copied to clipboard!
Option | Value | Semantics |
---|---|---|
reliability
|
Currently only
unreliable and at-least-once are supported. See the footnotes for further details.
Reliability indicates the level of link reliability requested by the sender or receiver.
unreliable and at-most-once are currently treated as synonyms, and allow messages to be lost if a broker crashes or the connection to a broker is lost. at-least-once guarantees that a message is not lost, but duplicates may be received. exactly-once guarantees that a message is not lost, and is delivered precisely once.
| |
durable
|
One of:
True , False .
|
Indicates whether the link survives a loss of volatile storage e.g. if the broker is restarted.
|
x-declare
|
A nested map whose values correspond to the valid fields of an AMQP 0-10
queue-declare command.
|
These values can be used to customize the subscription queue in the case of receiving from an exchange. Note however that they are protocol specific.
|
x-subscribe
|
A nested map whose values correspond to the valid fields of an AMQP 0-10
message-subscribe command.
|
These values can be used to customize the subscription.
|
x-bindings
|
A nested list each of whose entries is a map that may contain fields (
queue , exchange , key and arguments ) describing an AMQP 0-10 binding.
|
These bindings are established during resolution independent of the create option. They are considered logically part of the linking process rather than of node creation.
|
filter
|
A map containing
name , descriptor , and value , describing an AMQP 1.0 filter.
| name
descriptor is a string descriptor identifying the filter type; value is value for the filter, whose type is dictated by the type of filter (for example: string for legacy-amqp-direct-binding , and map for legacy-amqp-headers-binding ). |
11.5. Address String Grammar Copy linkLink copied to clipboard!
The following regular expressions define the tokens used to parse address strings:
The formal grammar for addresses is given below:
The address string options map supports the following parameters:
The create
, delete
(AMQP 0-10 only), and assert
policies specify who should perform the associated action:
- always
- the action is performed by any messaging client
- sender
- the action is only performed by a sender
- receiver
- the action is only performed by a receiver
- never
- the action is never performed (this is the default)
The node-type
is one of:
- topic
- in the AMQP 0-10 mapping, a topic node defaults to the topic exchange, x-declare may be used to specify other exchange types
- queue
- this is the default
node-type
The following AMQP 1.0 filters are implemented in MRG 3:
legacy-amqp-direct-binding
legacy-amqp-topic-binding
legacy-amqp-headers-binding
selector-filter
xquery-filter
11.6. Connection Options Copy linkLink copied to clipboard!
11.7. Setting Connection Options Copy linkLink copied to clipboard!
- Python
connection = Connection("localhost:5672", reconnect = True, reconnect_urls = "amqp:tcp:127.0.0.1:5674", heartbeat = 1) try: connection.open()
connection = Connection("localhost:5672", reconnect = True, reconnect_urls = "amqp:tcp:127.0.0.1:5674", heartbeat = 1) try: connection.open()
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - C++
Connection connection("localhost:5672", "{reconnect: true, reconnect_urls:'amqp:tcp:127.0.0.1:5674', reconnect:true, heartbeat: 1}"); try { connection.open();
Connection connection("localhost:5672", "{reconnect: true, reconnect_urls:'amqp:tcp:127.0.0.1:5674', reconnect:true, heartbeat: 1}"); try { connection.open();
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - .NET/C#
Connection connection= new Connection("localhost:5672", "{reconnect: true, reconnect_urls:'amqp:tcp:127.0.0.1:5674', reconnect:true, heartbeat: 1}"); try { connection.Open();
Connection connection= new Connection("localhost:5672", "{reconnect: true, reconnect_urls:'amqp:tcp:127.0.0.1:5674', reconnect:true, heartbeat: 1}"); try { connection.Open();
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
- Python
connection = Connection("localhost:5672") connection.reconnect = True try: connection.Open()
connection = Connection("localhost:5672") connection.reconnect = True try: connection.Open()
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - C++
Connection connection("localhost:5672"); connection.setOption("reconnect", true); try { connection.open();
Connection connection("localhost:5672"); connection.setOption("reconnect", true); try { connection.open();
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - .NET/C#
Connection connection = new Connection("localhost:5672"); connection.SetOption("reconnect", true); try { connection.Open();
Connection connection = new Connection("localhost:5672"); connection.SetOption("reconnect", true); try { connection.Open();
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
11.8. Connection Options Reference Copy linkLink copied to clipboard!
Option name | Value type | Semantics |
---|---|---|
username | string | The username to use when authenticating to the broker. |
password | string | The password to use when authenticating to the broker. |
heartbeat | integer | Requests that heartbeats be sent every N seconds. If two successive heartbeats are missed, the connection is considered lost and will fail or start the reconnect process if configured to do so. |
max-channels | integer | Restricts the maximum number of supported channels, to assist with tuning the Messaging API. Not supported in AMPQ 1.0. |
max-frame-size | integer |
Restricts the maximum frame size, to assist with tuning the Messaging API. Not supported in AMPQ 1.0.
The minimum value should be at least 4096B; anything lower will cause authentication failures. The product does not enforce this restriction.
|
protocol | string | The AMQP protocol to use. The recognized values are 'amqp1.0 ' and 'amqp0-10 '. AMQP 0-10 is the default. Note: Not supported in Python client. |
reconnect | boolean | Transparently reconnect if the connection is lost. |
reconnect_urls | Broker address list | A list of one or more brokers to attempt communication with when a connection fails. |
reconnect_urls_replace | boolean |
Controls how setting the reconnect_urls option is treated. If true, setting reconnect_urls causes the old list to be replaced with the new one. If false, the new list is appended to the old list. The default value is false. |
reconnect_timeout | float | Total number of seconds to continue reconnection attempts before giving up and raising an exception. |
reconnect_limit | integer | Maximum number of reconnection attempts before giving up and raising an exception. |
reconnect_interval_min | float | Minimum number of seconds between reconnection attempts. The first reconnection attempt is made immediately; if that fails, the first reconnection delay is set to the value of reconnect_interval_min ; if that attempt fails, the reconnect interval increases exponentially until a reconnection attempt succeeds or reconnect_interval_max is reached. This value can be fractional. For example, 0.001 sets the maximum reconnect interval to one millisecond. |
reconnect_interval_max | float | Maximum reconnect interval in seconds. This value can be fractional. For example, 0.001 sets the maximum reconnect interval to one millisecond. |
reconnect_interval | float | Sets both reconnection_interval_min and reconnection_interval_max to the same number of seconds. |
sasl_mechanisms | string | The specific SASL mechanisms to use when authenticating to the broker as a space separated list. |
sasl_service | string | The service name if needed by the SASL mechanism in use. |
sasl_min_ssf | integer | The minimum acceptable security strength factor. |
sasl_max_ssf | integer | The maximum acceptable security strength factor. |
ssl_cert_name | string | Name of the certificate to use for a given client. |
ssl_ignore_hostname_verification_failure | boolean | Disables authentication of the server to the client (and should be used only as a last resort). If set to true, the client can connect to the server even if the hostname used (or IP address) does not match what is in the servers certificate. |
tcp_nodelay | boolean | Set tcp_no_delay , i.e. disable Nagle algorithm. Note: Not Supported in Python client. |
transport | string | Sets the underlying transport protocol used. The default option is tcp . To enable ssl, set to ssl . The C++ client additionally supports rdma . |
Option name | Value type | Semantics |
---|---|---|
address_ttl | float | Time until cached address resolution expires. |
host | string | The name or ip address of the remote host (overridden by url ). |
port | integer | The port number of the remote host (overridden by url ). |
ssl_certfile | string | File with client's public key (PEM format). |
ssl_keyfile | string | File with client's private key (PEM format). |
ssl_trustfile | string | File with trusted certificates to validate the server. |
url | string | [ <username> [ / <password> ] @ ] <host> [ : <port> ]. |
Option name | Value type | Semantics |
---|---|---|
container_id | string | The container ID to use for the connection. |
nest_annotations | boolean | If true, annotations in received messages are presented as properties with keys x-amqp-delivery-annotations or x-amqp-delivery-annotations. The values consist of nested maps containing the annotations. If false, the annotations are merged in with the properties. |
set_to_on_send | boolean | If true, all sent messages will have the to field set to the node name of the sender. |
properties or client_properties | integer | The properties to include in the open frame sent. |
properties
nested map is recommended. The x-declare
map is supported as a convenience and is automatically converted to a properties
map before sending to the broker.
Chapter 12. Message Timestamping Copy linkLink copied to clipboard!
12.1. Message Timestamping Copy linkLink copied to clipboard!
12.2. Enable Message Timestamping at Broker Start-up Copy linkLink copied to clipboard!
--enable-timestamp yes
argument:
./qpidd --enable-timestamp yes
./qpidd --enable-timestamp yes
12.3. Enable Message Timestamping from an Application Copy linkLink copied to clipboard!
getTimestampConfig
and setTimestampConfig
get and set the timestamping configuration.
- getTimestampConfig
- Returns
True
if received messages are timestamped. - setTimestampConfig
- Set
True
to enable timestamping received messages,False
to disable timestamping.
12.4. Access a Message Timestamp in Python Copy linkLink copied to clipboard!
12.5. Access a Message Timestamp in C++ Copy linkLink copied to clipboard!
12.6. Using AMQ 0-10 Message Property Keys for Timestamping Copy linkLink copied to clipboard!
delivery-properties.timestamp
), the timestamp value can be accessed using the x-amqp-0-10.timestamp
message property.
See Also:
Chapter 13. Maps and Lists Copy linkLink copied to clipboard!
13.1. Maps and Lists in Message Content Copy linkLink copied to clipboard!
13.2. Map and List Representation in Native Data Types Copy linkLink copied to clipboard!
Language | map | list |
---|---|---|
Python | dict | list |
C++ | Variant::Map | Variant::List |
Java | MapMessage | ListMessage |
.NET | Dictionary<string, object> | Collection<object> |
13.3. Qpid Maps and Lists in Python Copy linkLink copied to clipboard!
- Python
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
13.4. Python Data Types in Maps Copy linkLink copied to clipboard!
Python Data Type | → C++ | → Java |
---|---|---|
bool | bool | boolean |
int | int64 | long |
long | int64 | long |
float | double | double |
unicode | string | java.lang.String |
uuid | qpid::types::Uuid | java.util.UUID |
dict | Variant::Map | java.util.Map |
list | Variant::List | java.util.List |
13.5. Qpid Maps and Lists in C++ Copy linkLink copied to clipboard!
13.6. C++ Data Types in Maps Copy linkLink copied to clipboard!
C++ Data Type | → Python | → Java |
---|---|---|
bool | bool | boolean |
uint16 | int | long | short |
uint32 | int | long | int |
uint64 | int | long | long |
int16 | int | long | short |
int32 | int | long | int |
int64 | int | long | long |
float | float | float |
double | float | double |
string | unicode | java.lang.String |
qpid::types::Uuid | uuid | java.util.UUID |
Variant::Map | dict | java.util.Map |
Variant::List | list | java.util.List |
13.7. Qpid Maps and Lists in .NET C# Copy linkLink copied to clipboard!
- .NET/C#
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
13.8. C# Data Types and .NET bindings Copy linkLink copied to clipboard!
C++ Data Type | .NET binding |
---|---|
void | nullptr |
bool | bool |
uint8 | byte |
uint16 | UInt16 |
uint32 | UInt32 |
uint64 | UInt64 |
int16 | char |
int16 | Int16 |
int32 | Int32 |
int64 | Int64 |
float | Single |
double | Double |
string | string |
qpid::types::Uuid | Guid |
Variant::Map | Dictionary< string, object > |
Variant::List | Collection< object > |
Note
string
objects are translated to and from C++ strings using UTF-8 encoding only.
Chapter 14. The Request/Response Pattern Copy linkLink copied to clipboard!
14.1. The Request/Response Pattern Copy linkLink copied to clipboard!
reply-to
message property to allow a server to respond to the client that sent a message. A server sets up a service queue, with a name known to clients. A client creates a private queue for the server's response, creates a message for a request, sets the request's reply-to property to the address of the client's response queue, and sends the request to the service queue. The server sends the response to the address specified in the request's reply-to
property.
14.2. Request/Response C++ Example Copy linkLink copied to clipboard!
#
, it is given a unique name.
Chapter 15. Performance Tips Copy linkLink copied to clipboard!
15.1. Apache Qpid Programming for Performance Copy linkLink copied to clipboard!
- Consider prefetching messages for receivers. This helps eliminate roundtrips and increases throughput. Prefetch is disabled by default, and enabling it is the most effective means of improving throughput of received messages.
- Send messages asynchronously. Again, this helps eliminate roundtrips and increases throughput. The C++ and .NET clients send asynchronously by default, however the python client defaults to synchronous sends.
- Acknowledge messages in batches. Rather than acknowledging each message individually, consider issuing acknowledgments after n messages and/or after a particular duration has elapsed.
- Tune the sender capacity. If the capacity is too low the sender may block waiting for the broker to confirm receipt of messages, before it can free up more capacity.
- If you are setting a reply-to address on messages being sent by the c++ client, make sure the address type is set to either queue or topic as appropriate. This avoids the client having to determine which type of node is being referred to, which is required when handling reply-to in AMQP 0-10.
- For latency-sensitive applications, setting
tcp-nodelay
onqpidd
and on client connections can help reduce the latency.
Chapter 16. Cluster Failover Copy linkLink copied to clipboard!
16.1. Changes to Clustering in MRG 3 Copy linkLink copied to clipboard!
cluster
module with the new ha
module. This module provides active-passive clustering functionality for high availability.
cluster
module in MRG 2 was active-active: clients could connect to any broker in the cluster. The new ha
module is active-passive. Exactly one broker acts as primary the other brokers act as backup. Only the primary accepts client connections. If a client attempts to connect to a backup broker, the connection is aborted and the client fails-over until it connects to the primary.
ha
module also supports a virtual IP address. Clients can be configured with a single IP address that is automatically routed to the primary broker. This is the recommended configuration.
In MRG 2, a clustered broker would only utilize a single CPU thread. Some users worked around this by running multiple clustered brokers on a single machine, to utilize the multiple cores.
16.2. Active-Passive Messaging Clusters Copy linkLink copied to clipboard!
rgmanager
, to detect failures, choose the new primary and handle network partitions.
16.3. Cluster Failover in C++ Copy linkLink copied to clipboard!
reconnect
to be true. For example:
qpid::messaging::Connection c("node1,node2,node3","{reconnect:true}");
qpid::messaging::Connection c("node1,node2,node3","{reconnect:true}");
heartbeat
option. For example:
qpid::messaging::Connection c("node1,node2,node3","{reconnect:true,heartbeat:10}");
qpid::messaging::Connection c("node1,node2,node3","{reconnect:true,heartbeat:10}");
16.4. Cluster Failover in Python Copy linkLink copied to clipboard!
reconnect=True
and a list of host:port
addresses as reconnect_urls
when calling Connection.establish
or Connection.open
:
connection = qpid.messaging.Connection.establish("node1", reconnect=True, reconnect_urls=["node1", "node2", "node3"])
connection = qpid.messaging.Connection.establish("node1", reconnect=True, reconnect_urls=["node1", "node2", "node3"])
heartbeat
option. For example:
connection = qpid.messaging.Connection.establish("node1", reconnect=True, reconnect_urls=["node1", "node2", "node3"], heartbeat=10)
connection = qpid.messaging.Connection.establish("node1", reconnect=True, reconnect_urls=["node1", "node2", "node3"], heartbeat=10)
16.5. Failover Behavior in Java JMS Clients Copy linkLink copied to clipboard!
failover
property:
connectionfactory.qpidConnectionfactory = amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'&failover='failover_exchange'
connectionfactory.qpidConnectionfactory = amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'&failover='failover_exchange'
Fail-over Modes
failover_exchange
- If the connection fails, fail over to any other broker in the cluster. This is provided for backward compatibility. Use of a Virtual IP (and transparent server-side failover) is recommended.
roundrobin
- If the connection fails, fail over to one of the brokers specified in the brokerlist.
singlebroker
- Fail-over is not supported; the connection is to a single broker only.
nofailover
- Disables all retry and failover logic.
<class>
- Any other value is interpreted as a classname which must implement the
org.apache.qpid.jms.failover.FailoverMethod
interface.
idle_timeout
property, which is an integer corresponding to the heartbeat period in seconds. For instance, the following line from a JNDI properties file sets the heartbeat time out to 3 seconds:
connectionfactory.qpidConnectionfactory = amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'&idle_timeout=3
connectionfactory.qpidConnectionfactory = amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'&idle_timeout=3
Chapter 17. Logging Copy linkLink copied to clipboard!
17.1. Logging in C++ Copy linkLink copied to clipboard!
- Use
QPID_LOG_ENABLE
to set the level of logging you are interested in (trace
,debug
,info
,notice
,warning
,error
, orcritical
):export QPID_LOG_ENABLE="warning+"
export QPID_LOG_ENABLE="warning+"
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - The Qpidd broker and C++ clients use
QPID_LOG_OUTPUT
to determine where logging output should be sent. This is either a file name or the special valuesstderr
,stdout
, orsyslog
:export QPID_LOG_TO_FILE="/tmp/myclient.out"
export QPID_LOG_TO_FILE="/tmp/myclient.out"
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - From a Windows command prompt, use the following command format to set the environment variables:
set QPID_LOG_ENABLE=warning+ set QPID_LOG_TO_FILE=D:\tmp\myclient.out
set QPID_LOG_ENABLE=warning+ set QPID_LOG_TO_FILE=D:\tmp\myclient.out
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
17.2. Logging in Python Copy linkLink copied to clipboard!
basicConfig()
logging method reports all warnings and errors:
from logging import basicConfig basicConfig()
from logging import basicConfig
basicConfig()
qpidd
daemon allows you to specify the level of logging desired. For instance, the following code enables logging at the DEBUG
level:
from qpid.log import enable, DEBUG enable("qpid.messaging.io", DEBUG)
from qpid.log import enable, DEBUG
enable("qpid.messaging.io", DEBUG)
$ pydoc qpid.log
.
17.3. Change the logging level at runtime Copy linkLink copied to clipboard!
setLogLevel
method to control the logging level. The following C++ code demonstrates calling this method to set the logging level.
- Save the example code to a file
set_log_level.cpp
. - Modify the Connection URL in the code to resolve to your broker. At the moment it is set to connect to a broker running on port 5672 on the local machine.
- Compile the example code:
g++ -Wall -lqpidclient -lqpidcommon -lqpidmessaging -lqpidtypes -o set_log_level set_log_level.cpp
g++ -Wall -lqpidclient -lqpidcommon -lqpidmessaging -lqpidtypes -o set_log_level set_log_level.cpp
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - Use the complied program to change the log level of the broker:
./set_log_level "trace+"
./set_log_level "trace+"
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - To observe the change in the logging level, tail the server log as you run the program.
Chapter 18. Security Copy linkLink copied to clipboard!
18.1. Security features provided by Qpid Copy linkLink copied to clipboard!
18.2. Authentication Copy linkLink copied to clipboard!
18.3. SASL Support in Windows Clients Copy linkLink copied to clipboard!
ANONYMOUS
and PLAIN
and EXTERNAL
authentication mechanisms.
18.4. Enable Kerberos authentication Copy linkLink copied to clipboard!
kinit
, there is no need to supply a user name or password. If you are using another form of authentication, or are not already authenticated with Kerberos, you can supply these as connection options:
connection.setOption("username", "mick"); connection.setOption("password", "pa$$word");
connection.setOption("username", "mick");
connection.setOption("password", "pa$$word");
18.5. Enable SSL Copy linkLink copied to clipboard!
transport
connection option to ssl
:
connection.setOption("transport", "ssl");
connection.setOption("transport", "ssl");
18.6. SSL Client Environment Variables for C++ Clients Copy linkLink copied to clipboard!
SSL Client Options for C++ clients | |
---|---|
QPID_SSL_USE_EXPORT_POLICY | Use NSS export policy |
QPID_SSL_CERT_PASSWORD_FILE PATH | File containing password to use for accessing certificate database |
QPID_SSL_CERT_DB PATH | Path to directory containing certificate database |
QPID_SSL_CERT_NAME NAME | Name of the certificate to use. When SSL client authentication is enabled, a certificate name should normally be provided. |
Chapter 19. The AMQP 0-10 mapping Copy linkLink copied to clipboard!
19.1. The AMQP 0-10 mapping Copy linkLink copied to clipboard!
qpid.subject
entry in the application-headers
field of the message-properties
.
message-subscribe
request for the queue in question. The accept-mode
is determined by the reliability option in the link properties; for unreliable links the accept-mode
is none, for reliable links it is explicit. The default for a queue is reliable. The acquire-mode
is determined by the value of the mode option. If the mode is set to browse the acquire mode is not-acquired
, otherwise it is set to pre-acquired
. The exclusive and arguments fields in the message-subscribe
command can be controlled using the x-subscribe
map.
x-declare
map within the link properties. The reliability option determines most of the other parameters. If the reliability is set to unreliable
then an auto-deleted, exclusive queue is used meaning that if the client or connection fails messages may be lost. For exactly-once
the queue is not set to be auto-deleted. The durability of the subscription queue is determined by the durable option in the link properties. The binding process depends on the type of the exchange the source address resolves to.
- For a topic exchange, if no subject is specified and no
x-bindings
are defined for the link, the subscription queue is bound using a wildcard matching any routing key (thus satisfying the expectation that any message sent to that address will be received from it). If a subject is specified in the source address however, it is used for the binding key (this means that the subject in the source address may be a binding pattern including wildcards). - For a fanout exchange the binding key is irrelevant to matching. A receiver created from a source address that resolves to a fanout exchange receives all messages sent to that exchange regardless of any subject the source address may contain. An
x-bindings
element in the link properties should be used if there is any need to set the arguments to the bind. - For a direct exchange, the subject is used as the binding key. If no subject is specified an empty string is used as the binding key.
- For a headers exchange, if no subject is specified the binding arguments simply contain an
x-match
entry and no other entries, causing all messages to match. If a subject is specified then the binding arguments contain anx-match
entry set to all and an entry forqpid.subject
whose value is the subject in the source address (this means the subject in the source address must match the message subject exactly). For more control thex-bindings
element in the link properties must be used. - For the XML exchange, if a subject is specified it is used as the binding key and an XQuery is defined that matches any message with that value for
qpid.subject
. Again this means that only messages whose subject exactly match that specified in the source address are received. If no subject is specified then the empty string is used as the binding key with an xquery that will match any message (this means that only messages with an empty string as the routing key will be received). For more control the x-bindings element in the link properties must be used. A source address that resolves to the XML exchange must contain either a subject or an x-bindings element in the link properties as there is no way at present to receive any message regardless of routing key.
queue
, exchange
, key
, or arguments
. If the queue value is absent the queue name the address resolves to is implied. If the exchange value is absent the exchange name the address resolves to is implied.
msg
refers to the Message class defined in the Qpid Messaging API, mp
refers to an AMQP 0-10 message-properties
struct, and dp
refers to an AMQP 0-10 delivery-properties
struct.
Python API | C++ API [a] | AMQP 0-10 Property [b] |
---|---|---|
msg.id | msg.{get,set}MessageId() | mp.message_id |
msg.subject | msg.{get,set}Subject() | mp.application_headers ["qpid.subject"] |
msg.user_id | msg.{get,set}UserId() | mp.user_id |
msg.reply_to | msg.{get,set}ReplyTo() | mp.reply_to [c] |
msg.correlation_id | msg.{get,set}CorrelationId() | mp.correlation_id |
msg.durable | msg.{get,set}Durable() | dp.delivery_mode == delivery_mode.persistent [d] |
msg.priority | msg.{get,set}Priority() | dp.priority |
msg.ttl | msg.{get,set}Ttl() | dp.ttl |
msg.redelivered | msg.{get,set}Redelivered() | dp.redelivered |
msg.properties | msg.{get,set}Properties() | mp.application_headers |
msg.content_type | msg.{get,set}ContentType() | mp.content_type |
[a]
The .NET Binding for C++ Messaging provides all the message and delivery properties described in the C++ API.
[b]
In these entries, mp refers to an AMQP message property, and dp refers to an AMQP delivery property.
[c]
The reply_to is converted from the protocol representation into an address.
[d]
Note that msg.durable is a boolean, not an enum.
|
19.2. AMQ 0-10 Message Property Keys Copy linkLink copied to clipboard!
x-amqp-0-10.app-id
, its value will be used to set the message-properties.app-id
property in the outgoing message. Likewise, if an incoming message has message-properties.app-id
set, its value can be accessed via the x-amqp-0-10.app-id
message property key.
x-amqp-0-10.content-encoding
, its value will be used to set the message-properties.content-encoding
property in the outgoing message. Likewise, if an incoming message has message-properties.content-encoding
set, its value can be accessed via the x-amqp-0-10.content-encoding
message property key.
delivery-properties.routing-key
) in an incoming messages can be accessed via the x-amqp-0-10.routing-key
message property.
19.3. AMQP Routing Key and Message Subject Copy linkLink copied to clipboard!
x-amqp-0-10.routing-key
property is set to the value of the message subject, with one exception.
sender = session.sender('amq.topic/SubjectX')
sender = session.sender('amq.topic/SubjectX')
msg1 = Message('A message with no subject') msg2 = Message('A message with a subject') msg2.subject = 'SubjectY'
msg1 = Message('A message with no subject')
msg2 = Message('A message with a subject')
msg2.subject = 'SubjectY'
msg1
has its subject and AMQP routing key set to 'SubjectX
'. msg2
retains its subject 'SubjectY
', and has its AMQP routing key set to 'SubjectY
'.
sender = session('amq.topic') msg = Message('No subject, and none assigned by the sender') sender.send(msg)
sender = session('amq.topic')
msg = Message('No subject, and none assigned by the sender')
sender.send(msg)
sender = session('amq.topic') msg = Message('No subject, but a manually assigned AMQP routing key') msg.properties['x-amqp-0-10.routing-key'] = 'amqp-SubjectX' sender.send(msg)
sender = session('amq.topic')
msg = Message('No subject, but a manually assigned AMQP routing key')
msg.properties['x-amqp-0-10.routing-key'] = 'amqp-SubjectX'
sender.send(msg)
amqp-0-10.routing-key
may be useful in an interoperability scenario, but in Red Hat Enterprise Messaging the message subject
is used for routing.
19.4. Using AMQ 0-10 Message Property Keys for Timestamping Copy linkLink copied to clipboard!
delivery-properties.timestamp
), the timestamp value can be accessed using the x-amqp-0-10.timestamp
message property.
See Also:
Chapter 20. Using the qpid-java AMQP 0-10 client Copy linkLink copied to clipboard!
20.1. A Simple Messaging Program in Java JMS Copy linkLink copied to clipboard!
Here is an explanation of the program code:
properties.load(this.getClass().getResourceAsStream("hello.properties"));
properties.load(this.getClass().getResourceAsStream("hello.properties"));
Context context = new InitialContext(properties);
Context context = new InitialContext(properties);
ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("qpidConnectionfactory");
ConnectionFactory connectionFactory
= (ConnectionFactory) context.lookup("qpidConnectionfactory");
Connection connection = connectionFactory.createConnection();
Connection connection = connectionFactory.createConnection();
connection.start();
connection.start();
Session session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Session session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Destination destination = (Destination) context.lookup("topicExchange");
Destination destination = (Destination) context.lookup("topicExchange");
MessageProducer messageProducer = session.createProducer(destination);
MessageProducer messageProducer = session.createProducer(destination);
MessageConsumer messageConsumer = session.createConsumer(destination);
MessageConsumer messageConsumer = session.createConsumer(destination);
message = (TextMessage)messageConsumer.receive();
message = (TextMessage)messageConsumer.receive();
connection.close();
connection.close();
context.close();
context.close();
The contents of the hello.properties file are shown below.
20.2. Apache Qpid JNDI Properties for AMQP Messaging Copy linkLink copied to clipboard!
- connectionfactory.<jndiname>
- The Connection URL that the connection factory uses to perform connections.
- queue.<jndiname>
- A JMS queue. Implemented as an
amq.direct
exchange in Apache Qpid. - topic.<jndiname>
- A JMS topic. Implemented as an
amq.topic
exchange in Apache Qpid. - destination.<jndiname>
- Can be used for defining all amq destinations, queues, topics and header matching, using an address string (or a binding URL, for backward-compatibility with earlier implementations).
20.3. JNDI Properties for Apache Qpid Copy linkLink copied to clipboard!
20.4. Durable Subscription Queues in MRG 3 Copy linkLink copied to clipboard!
java -cp ${CLASSPATH} org.apache.qpid.example.Drain "amq.topic/some_subject;{ link: { durable: true } }"
# java -cp ${CLASSPATH} org.apache.qpid.example.Drain "amq.topic/some_subject;{ link: { durable: true } }"
javax.jms.JMSException: Error registering consumer: org.apache.qpid.AMQException: You cannot mark a subscription queue as durable without providing a name for the link.
java -cp ${CLASSPATH} org.apache.qpid.example.Drain "amq.topic/some_subject;{ link: { name: some_name, durable: true } }"
# java -cp ${CLASSPATH} org.apache.qpid.example.Drain "amq.topic/some_subject;{ link: { name: some_name, durable: true } }"
20.5. Connection URLs Copy linkLink copied to clipboard!
amqp://[<user>:<pass>@][<clientid>]<virtualhost>[?<option>='<value>'[&<option>='<value>']]
amqp://[<user>:<pass>@][<clientid>]<virtualhost>[?<option>='<value>'[&<option>='<value>']]
amqp://username:password@clientid/test?brokerlist='tcp://localhost:5672'
amqp://username:password@clientid/test?brokerlist='tcp://localhost:5672'
Option | Type | Description |
---|---|---|
brokerlist
|
The broker to use for this connection. In the current release, precisely one broker must be specified.
| |
max_prefetch
|
Integer
|
The maximum number of pre-fetched messages per destination.
|
sync_publish
|
{'persistent' | 'transient' | 'all' | ''}
|
A sync command is sent after every persistent or transient message to guarantee that it has been received.
persistent sets this behavior for persistent messages.
transient sets this behavior for transient messages only.
all syncs both type of messages, however the default behavior '' also has the same effect.
|
sync_ack
|
Boolean
|
A sync command is sent after every acknowledgment to guarantee that it has been received.
|
use_legacy_map_msg_format
|
Boolean
|
If you are using JMS Map messages and deploying a new client with any JMS client older than 0.7 release, you must set this to
true to ensure the older clients can understand the map message encoding.
|
failover
| {'roundrobin' | 'failover_exchange' | 'singlebroker' | 'nofailover' | '<class>'}
|
|
ssl |
Boolean
|
If
ssl='true' , use SSL for all broker connections. Overrides any per-broker settings in the brokerlist entries. If not specified, the brokerlist entry for each given broker is used to determine whether SSL is used.
|
Broker list URL
brokerlist=<transport>://<host>[:<port>](?<param>=<value>)?(&<param>=<value>)*
brokerlist=<transport>://<host>[:<port>](?<param>=<value>)?(&<param>=<value>)*
brokerlist='tcp://localhost:5672'
brokerlist='tcp://localhost:5672'
Example 20.1. Broker Lists
amqp://guest:guest@test/test?sync_ack='true' &brokerlist='tcp://ip1:5672?sasl_mechs='GSSAPI''
amqp://guest:guest@test/test?sync_ack='true'
&brokerlist='tcp://ip1:5672?sasl_mechs='GSSAPI''
amqp://guest:guest@test/test?sync_ack='true' &brokerlist='tcp://ip1:5672?ssl='true'&ssl_cert_alias='cert1''
amqp://guest:guest@test/test?sync_ack='true'
&brokerlist='tcp://ip1:5672?ssl='true'&ssl_cert_alias='cert1''
amqp://guest:guest@/test?failover='roundrobin?cyclecount='2'' &brokerlist='tcp://ip1:5672?retries='5'&connectdelay='2000';tcp://ip2:5672?retries='5'&connectdelay='2000''
amqp://guest:guest@/test?failover='roundrobin?cyclecount='2''
&brokerlist='tcp://ip1:5672?retries='5'&connectdelay='2000';tcp://ip2:5672?retries='5'&connectdelay='2000''
Option | Type | Description |
---|---|---|
idle_timeout
|
Integer
|
Frequency of idle_timeout messages (in seconds)
|
sasl_mechs
|
--
|
For secure applications, we suggest
CRAM-MD5 , DIGEST-MD5 , or GSSAPI . The ANONYMOUS method is not secure. The PLAIN method is secure only when used together with SSL. For Kerberos, sasl_mechs must be set to GSSAPI , sasl_protocol must be set to the principal for the qpidd broker, e.g. qpidd/ , and sasl_server must be set to the host for the SASL server, e.g. sasl.com . SASL External is supported using SSL certification, e.g. ssl='true'&sasl_mechs='EXTERNAL'
|
sasl_encryption
|
Boolean
|
If
sasl_encryption ='true' , the JMS client attempts to negotiate a security layer with the broker using GSSAPI to encrypt the connection. Note that for this to happen, GSSAPI must be selected as the sasl_mech .
|
ssl
|
Boolean
|
If
ssl ='true ', the JMS client will encrypt the connection using SSL.
|
tcp_nodelay
|
Boolean
|
If
tcp_nodelay ='true ', TCP packet batching is disabled.
|
sasl_protocol
|
--
|
Used only for Kerberos.
sasl_protocol must be set to the principal for the qpidd broker, e.g. qpidd/
|
sasl_server
|
--
|
For Kerberos,
sasl_mechs must be set to GSSAPI , sasl_server must be set to the host for the SASL server, e.g. sasl.com .
|
trust_store
|
String
|
Path to Kerberos trust store
|
trust_store_password
|
String
|
Kerberos trust store password
|
key_store
|
String
|
Path to Kerberos key store
|
key_store_password
|
String
|
Kerberos key store password
|
ssl_verify_hostname
|
Boolean
|
When using SSL you can enable hostname verification by using "
ssl_verify_hostname =true " in the broker URL.
|
ssl_cert_alias
|
String
|
If multiple certificates are present in the keystore, the alias will be used to extract the correct certificate.
|
retries | integer |
The number of times to retry connection to each broker in the broker list. Defaults to 1.
|
connectdelay | integer |
Length of time (in milliseconds) to wait before attempting to reconnect. Defaults to 0.
|
connecttimeout | integer |
Length of time (in milliseconds) to wait for the socket connection to succeed. A value of 0 represents an infinite timeout, i.e. the connection attempt will block until established or an error occurs. Defaults to 30000.
|
tcp_nodelay | Boolean | If tcp_nodelay='true' , TCP packet batching is disabled. Defaults to true since Qpid 0.14. |
20.6. Java JMS Message Properties Copy linkLink copied to clipboard!
JMS Header Name | AMQP Identifier | AMQP Field | AMQP Section | Notes |
---|---|---|---|---|
JMSCorrelationID | correlation_id
| correlation-id
| properties | |
JMSDeliveryMode | delivery_mode
| durable
| header | Computed value: [durable ? 'PERSISTENT' : 'NON_PERSISTENT'] |
JMSDestination | to
| to
| properties | |
JMSExpiration | absolute_expiry_time
| absolute-expiry-time
| properties | |
JMSMessageID | message_id
| message-id
| properties | |
JMSPriority | priority
| priority
| header | |
JMSRedelivered | redelivered
| delivery-count
| header | computed value: delivery-count > 0 |
JMSReplyTo | reply_to
| reply-to
| properties | |
JMSTimestamp | creation_time
| creation-time
| properties | |
JMSType | subject
| subject
| properties |
Note
JMSDeliveryMode
JMSPriority
JMSMessageID
JMSTimestamp
JMSCorrelationID
JMSType
20.7. JMS MapMessage Types Copy linkLink copied to clipboard!
MapMessage
interface, which provides support for maps in messages. The following code shows how to send a MapMessage
in Java JMS.
Example 20.2. Sending a Java JMS MapMessage
MapMessage
, and the corresponding data types that will be received by clients in Python or C++.
Java Data Type | ? Python | ? C++ |
---|---|---|
boolean | bool | bool |
short | int | long | int16 |
int | int | long | int32 |
long | int | long | int64 |
float | float | float |
double | float | double |
java.lang.String | unicode | std::string |
java.util.UUID | uuid | qpid::types::Uuid |
java.util.Map [a] | dict | Variant::Map |
java.util.List | list | Variant::List |
[a]
In Qpid, maps can nest. This goes beyond the functionality required by the JMS specification.
|
20.8. JMS ListMessage Copy linkLink copied to clipboard!
ListMessage
type is available for sending lists.
javax.jms.StreamMessage
javax.jms.MapMessage
org.apache.qpid.jms.ListMessage
org.apache.qpid.jms.ListMessage
- by creating it viacreateListMessage()
inorg.apache.qpid.jms.Session
.Example:ListMessage msg = ((org.apache.qpid.jms.Session)ssn).createListMessage();
ListMessage msg = ((org.apache.qpid.jms.Session)ssn).createListMessage();
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - If you set
-Dqpid.use_legacy_stream_message=false
any stream message you create will be encoded as a list message.Example:StreamMessage msg = jmsSession.createStreamMessage();
StreamMessage msg = jmsSession.createStreamMessage();
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
20.9. JMS Client Logging Copy linkLink copied to clipboard!
org.apache.qpid
. Otherwise log4j will default to DEBUG
which will degrade performance considerably due to excessive logging. The recommended logging level for production is WARN
.
log4j.properties
file and placed in the CLASSPATH
, or they can be set explicitly using the -Dlog4j.configuration
property.
Example 20.3. log4j Logging Properties
20.10. AMQP 0-10 JMS Client Configuration Copy linkLink copied to clipboard!
20.10.1. Configuration Methods and Granularity Copy linkLink copied to clipboard!
- JVM level using JVM arguments - Affects all connections, sessions, consumers and producers created within the JVM.Example: The
-dmax_prefetch=1000
property specifies the message credits to use. - Connection level using connection or broker properties - Affects the respective connection and sessions, consumers and producers created by that connection.Example: The
amqp://guest:guest@test/test?max_prefetch='1000' &brokerlist='tcp://localhost:5672'
property specifies the message credits to use. This overrides any value specified via the JVM argumentmax_prefetch
. - Destination level using addressing options - Affects the producer(s) and consumer(s) created using the respective destination.Example:
my-queue; {create: always, link:{capacity: 10}}
where capacity option specifies the message credits to use. This overrides any connection level configuration.
20.10.2. qpid-java JVM Arguments Copy linkLink copied to clipboard!
Property Name | Type | Default Value | Description |
---|---|---|---|
qpid.amqp.version | string | 0-10 | Sets the AMQP version to be used - currently supports 0-8, 0-9, 0-91, and 0-10. The client will begin negotiation at the specified version and only negotiate downwards if the broker does not support the specified version. |
qpid.heartbeat | int | 120 (seconds) | The heartbeat interval in seconds. Two consecutive missed heartbeats result in the connection timing out. This can also be set per connection. |
ignore_setclientID | boolean | false | If a client ID is specified in the connection URL then it is used, otherwise an ID is generated. If an ID is specified after it has been generated Qpid will throw an exception. Setting this property to 'true' disables that check and allows you to set a client ID at any time. |
Property Name | Type | Default Value | Description |
---|---|---|---|
qpid.session.command_limit | int | 65536 | Limits the number of unacknowledged commands. |
qpid.session.byte_limit | int | 1048576 | Limits the number of unacknowledged commands in bytes. |
qpid.use_legacy_map_message | boolean | false | Uses the old map message encoding. By default the map messages are encoded using the 0-10 map encoding. This can also be set per connection as well. |
qpid.jms.daemon.dispatcher | boolean | false | Controls whether the Session dispatcher thread is a daemon thread or not. If this system property is set to true then the Session dispatcher threads will be created as daemon threads. This setting is introduced in version 0.16. |
Property Name | Type | Default Value | Description |
---|---|---|---|
max_prefetch | int | 500 | Maximum number of messages to credits. Can also be set per connection or per destination. |
qpid.session.max_ack_delay | long | 1000 (ms) | Timer interval to flush message acks in buffer when using AUTO_ACK and DUPS_OK . |
sync_ack | boolean | false | If set, each message will be acknowledged synchronously. When using AUTO_ACK mode, set this to "true". Can also be set per connection. |
Property Name | Type | Default Value | Description |
---|---|---|---|
sync_publish | string | - | Sends messages synchronously. Valid values are persistent , transient , all . Can also be set per connection. |
Property Name | Type | Default Value | Description |
---|---|---|---|
qpid.thread_factory | string | org.apache.qpid.thread.DefaultThreadFactory | Specifies the thread factory to use. If using a real time JVM, set to org.apache.qpid.thread.RealtimeThreadFactory . |
qpid.rt_thread_priority | int | 20 | Specifies the priority (1-99) for realtime threads created by the realtime thread factory. |
Property Name | Type | Default Value | Description |
---|---|---|---|
qpid.transport | string | org.apache.qpid.transport.network.io.IoNetworkTransport | The transport implementation to be used. You can also specify the org.apache.qpid.transport.network.NetworkTransport transport mechanism. |
qpid.sync_op_timeout | long | 60000 (milliseconds) | The length of time to wait for a synchronous operation to complete. For compatibility with older clients, use amqj.default_syncwrite_timeout . |
qpid.tcp_nodelay | boolean | true |
Sets the TCP_NODELAY property of the underlying socket.
This can also be set per connection using the Connection URL options.
For compatibility with older clients, the synonym
amqj.tcp_nodelay is supported.
|
qpid.send_buffer_size | integer | 65535 |
Sets the SO_SNDBUF property of the underlying socket.
For compatibility with older clients, the synonym
amqj.sendBufferSize is supported.
|
qpid.receive_buffer_size | integer | 65535 |
Sets the SO_RCVBUF property of the underlying socket.
For compatibility with older clients, the synonym
amqj.receiveBufferSize is supported.
|
qpid.failover_method_timeout | long | 60000 |
During failover, this is the timeout for each attempt to try to re-establish the connection. If a reconnection attempt exceeds the timeout, the entire failover process is aborted.
It is only applicable for AMQP 0-8/0-9/0-9-1 clients.
|
Property Name | Type | Default Value | Description |
---|---|---|---|
qpid.sasl_mechs | string | PLAIN | The SASL mechanism used. More than one can be specified using a comma separated list. Supported values are PLAIN, GSSAPI, and EXTERNAL. |
qpid.sasl_protocol | string | AMQP | When using GSSAPI as the SASL mechanism, sasl_protocol must be set to the principal for the qpidd broker. |
qpid.sasl_server_name | string | localhost | When using GSSAPI as the SASL mechanism, sasl_server must be set to the host for the SASL server. |
Property Name | Type | Default Value | Description |
---|---|---|---|
javax.security.auth.useSubjectCredsOnly | boolean | true | If set to 'false', forces the SASL GASSPI client to obtain kerberos credentials explicitly. |
java.security.auth.login.config | string | - | Specifies the JASS configuration file. |
Property Name | Type | Default Value | Description |
---|---|---|---|
qpid.ssl_timeout | long | 60000 | Timeout value used by the Java SSL engine when waiting on operations. |
qpid.ssl.KeyManagerFactory.algorithm | string | - |
The key manager factory algorithm name. If not set, defaults to the value returned from the Java runtime call KeyManagerFactory.getDefaultAlgorithm().
For compatibility with older clients, the synonym qpid.ssl.keyStoreCertType is supported.
|
qpid.ssl.TrustManagerFactory.algorithm | string | - |
The trust manager factory algorithm name. If not set, defaults to the value returned from the Java runtime call TrustManagerFactory.getDefaultAlgorithm()
For compatibility with older clients, the synonym qpid.ssl.trustStoreCertType is supported.
|
Property Name | Type | Default Value | Description |
---|---|---|---|
javax.net.ssl.keyStore | string | jvm default | Specifies the key store path. |
javax.net.ssl.keyStorePassword | string | jvm default | Specifies the key store password. |
javax.net.ssl.trustStore | string | jvm default | Specifies the trust store path. |
javax.net.ssl.trustStorePassword | string | jvm default | Specifies the trust store password. |
20.11. Java Message Service with Filters Copy linkLink copied to clipboard!
20.11.1. No Local filter Copy linkLink copied to clipboard!
<type name="no-local-filter" class="composite" source="list" provides="filter"> <descriptor name="apache.org:no-local-filter:list" code="0x0000468C:0x00000003"/> </type>
<type name="no-local-filter" class="composite" source="list" provides="filter">
<descriptor name="apache.org:no-local-filter:list" code="0x0000468C:0x00000003"/>
</type>
20.11.2. Selector filter Copy linkLink copied to clipboard!
<type name="selector-filter" class="restricted" source="string" provides="filter"> <descriptor name="apache.org:selector-filter:string" code="0x0000468C:0x00000004"/> </type>
<type name="selector-filter" class="restricted" source="string" provides="filter">
<descriptor name="apache.org:selector-filter:string" code="0x0000468C:0x00000004"/>
</type>
amqp.field_name
where field_name
is the appropriate AMQP 1.0 field named in the table above, with the hyphen replaced by an underscore. For example, the selector: JMSCorrelationID = 'abc' AND color = 'blue' AND weight > 2500
would be transferred over the wire as: amqp.correlation_id = 'abc' AND color = 'blue' AND weight > 2500
AMQP Type | JMS Selector Type |
---|---|
null
|
null
|
boolean
|
boolean
|
ubyte
|
short
|
ushort
|
int
|
uint
|
long
|
ulong
|
long
|
byte
|
byte
|
short
|
short
|
int
|
int
|
long
|
long
|
float
|
float
|
double
|
double
|
decimal32
|
double
|
decimal64
|
double
|
decimal128
|
double
|
char
|
char
|
timestamp
|
long
|
uuid
|
byte[16]
|
binary
|
byte[]
|
string
|
String
|
symbol
|
String
|
Chapter 21. Using the qpid-jms AMQP 1.0 client Copy linkLink copied to clipboard!
21.1. QPID AMQP 1.0 JMS Client Configuration Copy linkLink copied to clipboard!
InitialContext
, the syntax for its related configuration, and various URI options that can be set when defining a ConnectionFactory
.
InitialContext
, itself obtained from an InitialContextFactory
, to look up JMS objects such as ConnectionFactory
. The Qpid JMS client provides an implementation of the InitialContextFactory
in class org.apache.qpid.jms.jndi.JmsInitialContextFactory. This may be configured and used in three main ways:
- Via
jndi.properties
file on the Java Classpath. - By including a file named
jndi.properties
on the Classpath and setting thejava.naming.factory.initial
property to valueorg.apache.qpid.jms.jndi.JmsInitialContextFactory
, the QpidInitialContextFactory
implementation will be discovered when instantiating InitialContext object.javax.naming.Context ctx = new javax.naming.InitialContext();
javax.naming.Context ctx = new javax.naming.InitialContext();
Copy to Clipboard Copied! Toggle word wrap Toggle overflow The particularConnectionFactory
, Queue and Topic objects you wish the context to contain are configured using properties (the syntax for which is detailed below) either directly within thejndi.properties
file, or in a separate file which is referenced injndi.properties
using thejava.naming.provider.url
property. - Via system properties.
- By setting the
java.naming.factory.initial
system property to valueorg.apache.qpid.jms.jndi.JmsInitialContextFactory
, the QpidInitialContextFactory
implementation will be discovered when instantiating InitialContext object.javax.naming.Context ctx = new javax.naming.InitialContext();
javax.naming.Context ctx = new javax.naming.InitialContext();
Copy to Clipboard Copied! Toggle word wrap Toggle overflow The particularConnectionFactory
, Queue and Topic objects you wish the context to contain are configured as properties in a file, which is passed using thejava.naming.provider.url
system property. The syntax for these properties is detailed below. - Programmatically using an environment Hashtable.
- The InitialContext may also be configured directly by passing an environment during creation:
Hashtable<Object, Object> env = new Hashtable<Object, Object>(); env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory"); javax.naming.Context context = new javax.naming.InitialContext(env);
Hashtable<Object, Object> env = new Hashtable<Object, Object>(); env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory"); javax.naming.Context context = new javax.naming.InitialContext(env);
Copy to Clipboard Copied! Toggle word wrap Toggle overflow The particularConnectionFactory
, Queue and Topic objects you wish the context to contain are configured as properties (the syntax for which is detailed below), either directly within the environment Hashtable, or in a separate file which is referenced using thejava.naming.provider.url
property within the environment Hashtable.
Property | Syntax |
---|---|
ConnectionFactory | connectionfactory.lookupName = URI |
Queue | queue.lookupName = queueName |
Topic | topic.lookupName = topicName |
ConnectionFactory
, Queue, and Topic:
connectionfactory.myFactoryLookup = amqp://localhost:5672 queue.myQueueLookup = queueA topic.myTopicLookup = topicA
connectionfactory.myFactoryLookup = amqp://localhost:5672
queue.myQueueLookup = queueA
topic.myTopicLookup = topicA
ConnectionFactory factory = (ConnectionFactory) context.lookup("myFactoryLookup"); Queue queue = (Queue) context.lookup("myQueueLookup"); Topic topic = (Topic) context.lookup("myTopicLookup");
ConnectionFactory factory = (ConnectionFactory) context.lookup("myFactoryLookup");
Queue queue = (Queue) context.lookup("myQueueLookup");
Topic topic = (Topic) context.lookup("myTopicLookup");
21.2. QPID AMQP 1.0 JMS Client Connection URLs Copy linkLink copied to clipboard!
amqp://hostname:port[?option=value[&option2=value...]]
amqp://hostname:port[?option=value[&option2=value...]]
ConnectionFactory
, these are detailed in the following tables.
Connection
, Session
, MessageConsumer
and MessageProducer
.
Option | Description |
---|---|
jms.username
|
User name value used to authenticate the connection
|
jms.password
|
The password value used to authenticate the connection
|
jms.clientID
|
The ClientID value that is applied to the connection.
|
jms.forceAsyncSend
|
Configures whether all Messages sent from a
MessageProducer are sent asynchronously or only those Message that qualify such as Messages inside a transaction or non-persistent messages.
|
jms.alwaysSyncSend
|
Override all asynchronous send conditions and always sends every Message from a
MessageProducer synchronously.
|
jms.sendAcksAsync
|
Causes all Message acknowledgments to be sent asynchronously.
|
jms.localMessageExpiry
|
Controls whether
MessageConsumer instances will locally filter expired Messages or deliver them. By default this value is set to true and expired messages will be filtered.
|
jms.localMessagePriority
|
If enabled pre-fetched messages are reordered locally based on their given Message priority value. Default is false.
|
jms.validatePropertyNames
|
If message property names should be validated as valid Java identifiers. Default is
true .
|
jms.queuePrefix
|
Optional prefix value added to the name of any Queue created from a JMS Session.
|
jms.topicPrefix
|
Optional prefix value added to the name of any Topic created from a JMS Session.
|
jms.closeTimeout
|
Timeout value that controls how long the client waits on Connection close before returning. (By default the client waits 15 seconds for a normal close completion event).
|
jms.connectTimeout
|
Timeout value that controls how long the client waits on Connection establishment before returning with an error. (By default the client waits 15 seconds for a connection to be established before failing).
|
jms.clientIDPrefix
|
Optional prefix value that is used for generated Client ID values when a new Connection is created for the JMS
ConnectionFactory . The default prefix is ID: .
|
jms.connectionIDPrefix
|
Optional prefix value that is used for generated Connection ID values when a new Connection is created for the JMS
ConnectionFactory . This connection ID is used when logging some information from the JMS Connection object so a configurable prefix can make breadcrumbing the logs easier. The default prefix is ID: .
|
Option | Description |
---|---|
jms.prefetchPolicy.queuePrefetch
|
defaults to
1000
|
jms.prefetchPolicy.topicPrefetch
|
defaults to
1000
|
jms.prefetchPolicy.queueBrowserPrefetch
|
defaults to
1000
|
jms.prefetchPolicy.durableTopicPrefetch
|
defaults to
1000
|
jms.prefetchPolicy.all
|
used to set all prefetch values at once.
|
RedeliveryPolicy
parameter controls how redelivered messages are handled on the client.
Option | Description |
---|---|
jms.redeliveryPolicy.maxRedeliveries | Controls when an incoming message is rejected based on the number of times it has been redelivered, the default value is disabled (-1 ). A value of zero (0 ) would indicate no message redeliveries are accepted, a value of five (5 ) would allow a message to be redelivered five times. |
amqp://localhost:5672?jms.clientID=foo&transport.connectTimeout=30000
amqp://localhost:5672?jms.clientID=foo&transport.connectTimeout=30000
Option | Description |
---|---|
transport.sendBufferSize
|
default is
64k
|
transport.receiveBufferSize
|
default is
64k
|
transport.trafficClass
|
default is
0
|
transport.connectTimeout
|
default is
60 seconds
|
transport.soTimeout
|
default is
-1
|
transport.soLinger
|
default is
-1
|
transport.tcpKeepAlive
|
default is
false
|
transport.tcpNoDelay
|
default is
true
|
amqps://localhost:5673
amqps://localhost:5673
Option | Description |
---|---|
transport.keyStoreLocation
|
The default is to read from the system property
javax.net.ssl.keyStore
|
transport.keyStorePassword
|
The default is to read from the system property
javax.net.ssl.keyStorePassword
|
transport.trustStoreLocation
|
The default is to read from the system property
javax.net.ssl.trustStore
|
transport.trustStorePassword
|
The default is to read from the system property
javax.net.ssl.keyStorePassword
|
transport.storeType
|
The type of trust store being used. Default is
JKS .
|
transport.contextProtocol
|
The protocol argument used when getting an SSLContext. Default is
TLS .
|
transport.enabledCipherSuites
|
The cipher suites to enable, comma separated. No default, meaning the context default ciphers are used. Any disabled ciphers are removed from this.
|
transport.disabledCipherSuites
|
The cipher suites to disable, comma separated. Ciphers listed here are removed from the enabled ciphers. No default.
|
transport.enabledProtocols
|
The protocols to enable, comma separated. No default, meaning the context default protocols are used. Any disabled protocols are removed from this.
|
transport.disabledProtocols
|
The protocols to disable, comma separated. Protocols listed here are removed from the enabled protocols. Default is
SSLv2Hello,SSLv3 .
|
transport.trustAll
|
Whether to trust the provided server certificate implicitly, regardless of any configured trust store. Defaults to
false .
|
transport.verifyHost
|
Whether to verify that the hostname being connected to matches with the provided server certificate. Defaults to
true .
|
transport.keyAlias
|
The alias to use when selecting a keypair from the keystore if required to send a client certificate to the server. No default.
|
Option | Description |
---|---|
amqp.idleTimeout
|
The idle timeout in milliseconds after which the connection will be failed if the peer sends no AMQP frames. Default is
60000 .
|
amqp.vhost
|
The
vhost to connect to. Used to populate the SASL and Open hostname fields. Default is the main hostname from the Connection URI.
|
amqp.saslLayer
|
Controls whether connections use a SASL layer or not. Default is
true .
|
amqp.saslMechanisms
|
Which SASL mechanism(s) the client allows selection of, if offered by the server and usable with the configured credentials. Comma separated if specifying more than 1 mechanism. Default is to allow selection from all the clients supported mechanisms, which are currently
EXTERNAL , CRAM-MD5 , PLAIN , and ANONYMOUS .
|
amqp.maxFrameSize
|
The max-frame-size value in bytes that is advertised to the peer. Default is
1048576 .
|
failover
prefix and a list of URIs for the brokers is contained inside a set of parentheses.
jms.
options are applied to the overall failover URI, outside the parentheses, and affect the JMS Connection object for its lifetime.
failover:(amqp://host1:5672,amqp://host2:5672)?jms.clientID=foo&failover.maxReconnectAttempts=20
failover:(amqp://host1:5672,amqp://host2:5672)?jms.clientID=foo&failover.maxReconnectAttempts=20
transport.
or amqp.
options defined earlier, with these being applied as each host is connected to:
failover:(amqp://host1:5672?amqp.option=value,amqp://host2:5672?transport.option=value)?jms.clientID=foo
failover:(amqp://host1:5672?amqp.option=value,amqp://host2:5672?transport.option=value)?jms.clientID=foo
Option | Description |
---|---|
failover.initialReconnectDelay
|
The amount of time the client will wait before the first attempt to reconnect to a remote peer. The default value is zero (
0 ), meaning the first attempt happens immediately.
|
failover.reconnectDelay
|
Controls the delay between successive reconnection attempts, defaults to
10 milliseconds. If the backoff option is not enabled this value remains constant.
|
failover.maxReconnectDelay
|
The maximum time that the client will wait before attempting a reconnect. This value is only used when the backoff feature is enabled to ensure that the delay does not grow too large. Defaults to
30 seconds as the max time between connect attempts.
|
failover.useReconnectBackOff
|
Controls whether the time between reconnection attempts grows based on a configured multiplier. This option defaults to
true .
|
failover.reconnectBackOffMultiplier
|
The multiplier used to grow the reconnection delay value, defaults to
2.0d .
|
failover.maxReconnectAttempts
|
The number of reconnection attempts allowed before reporting the connection as failed to the client. The default is no limit or (
-1 ).
|
failover.startupMaxReconnectAttempts
|
For a client that has never connected to a remote peer before this option control how many attempts are made to connect before reporting the connection as failed. The default is to use the value of
maxReconnectAttempts .
|
failover.warnAfterReconnectAttempts
|
Controls how often the client will log a message indicating that failover reconnection is being attempted. The default is to log every
10 connection attempts.
|
transport.
and amqp.
URI options outlined earlier for a non-failover broker URI but prefixed with failover.nested.
. For example, to apply the same value for the amqp.vhost
option to every broker connected to you might have a URI like:
failover:(amqp://host1:5672,amqp://host2:5672)?jms.clientID=foo&failover.nested.amqp.vhost=myhost
failover:(amqp://host1:5672,amqp://host2:5672)?jms.clientID=foo&failover.nested.amqp.vhost=myhost
failover.nested.
to discovery.discovered
. For example, without the agent URI details, a general discovery URI might look like:
discovery:(<agent-uri>)?discovery.maxReconnectAttempts=20&discovery.discovered.jms.clientID=foo
discovery:(<agent-uri>)?discovery.maxReconnectAttempts=20&discovery.discovered.jms.clientID=foo
discovery:(file:///path/to/monitored-file?updateInterval=60000)
discovery:(file:///path/to/monitored-file?updateInterval=60000)
updateInterval
. It controls the frequency in milliseconds which the file is inspected for change. The default value is 30000
.
discovery:(multicast://default?group=default)
discovery:(multicast://default?group=default)
239.255.2.3:6155
). You may change this to specify the actual IP and port in use with your multicast configuration.
group
. It controls which multicast group messages are listened for on. The default value is default
.
21.3. QPID AMQP 1.0 JMS Client Logging Copy linkLink copied to clipboard!
org.apache.qpid.jms
hierarchy, which you can use to configure a logging implementation based on your needs.
- Set the environment variable (not Java system property)
PN_TRACE_FRM
totrue
, which will cause Proton to emit frame logging to stdout. - Add the option
amqp.traceFrames=true
to your connection URI to have the client add a protocol tracer to Proton, and configure theorg.apache.qpid.jms.provider.amqp.FRAMES
Logger toTRACE
level to include the output in your logs.
Chapter 22. .NET Binding for Qpid C++ Messaging Copy linkLink copied to clipboard!
22.1. .NET Binding for the C++ Messaging Client Examples Copy linkLink copied to clipboard!
Example Name | Example Description |
---|---|
csharp.example.server | Creates a receiver and listens for messages. Upon receipt, the content of the message is converted to upper case and forwarded to the received message's ReplyTo address. |
csharp.example.client | Sends a series of messages to the server and prints the original message content and the received message content. |
See Also:
22.2. .NET Binding Class Mapping to Underlying C++ Messaging API Copy linkLink copied to clipboard!
Example Name | Example Description |
---|---|
csharp.map.receiver | Creates a receiver and listens for a map message. Upon receipt, the message is decoded and displayed on the console. |
csharp.map.sender | Creates a map message and sends it to map.receiver . The map message contains values for every supported .NET messaging binding data type. |
See Also:
22.3. .NET Binding for the C++ Messaging API Class: Address Copy linkLink copied to clipboard!
.NET Binding Class: Address | |
---|---|
Language | Syntax |
C++ | class Address |
.NET | public ref class Address |
Constructor | |
C++ | Address(); |
.NET | public Address(); |
Constructor | |
C++ | Address(const std::string& address); |
.NET | public Address(string address); |
Constructor | |
C++ | Address(const std::string& name, const std::string& subject, const qpid::types::Variant::Map& options, const std::string& type = ""); |
.NET | public Address(string name, string subject, Dictionary<string, object> options); |
.NET | public Address(string name, string subject, Dictionary<string, object> options, string type); |
Copy constructor | |
C++ | Address(const Address& address); |
.NET | public Address(Address address); |
Destructor | |
C++ | ~Address(); |
.NET | ~Address(); |
Finalizer | |
C++ | not applicable |
.NET | !Address(); |
Copy assignment operator | |
C++ | Address& operator=(const Address&); |
.NET | public Address op_Assign(Address rhs); |
Property: Name | |
C++ | const std::string& getName() const; |
C++ | void setName(const std::string&); |
.NET | public string Name { get; set; } |
Property: Subject | |
C++ | const std::string& getSubject() const; |
C++ | void setSubject(const std::string&); |
.NET | public string Subject { get; set; } |
Property: Options | |
C++ | const qpid::types::Variant::Map& getOptions() const; |
C++ | qpid::types::Variant::Map& getOptions(); |
C++ | void setOptions(const qpid::types::Variant::Map&); |
.NET | public Dictionary<string, object> Options { get; set; } |
Property: Type | |
C++ | std::string getType() const; |
C++ | void setType(const std::string&); |
.NET | public string Type { get; set; } |
Miscellaneous | |
C++ | std::string str() const; |
.NET | public string ToStr(); |
Miscellaneous | |
C++ | operator bool() const; |
.NET | not applicable |
Miscellaneous | |
C++ | bool operator !() const; |
.NET | not applicable |
See Also:
22.4. .NET Binding for the C++ Messaging API Class: Connection Copy linkLink copied to clipboard!
.NET Binding Class: Connection | |
---|---|
Language | Syntax |
C++ | class Connection : public qpid::messaging::Handle<ConnectionImpl> |
.NET | public ref class Connection |
Constructor | |
C++ | Connection(ConnectionImpl* impl); |
.NET | not applicable |
Constructor | |
C++ | Connection(); |
.NET | not applicable |
Constructor | |
C++ | Connection(const std::string& url, const qpid::types::Variant::Map& options = qpid::types::Variant::Map()); |
.NET | public Connection(string url); |
.NET | public Connection(string url, Dictionary<string, object> options); |
Constructor | |
C++ | Connection(const std::string& url, const std::string& options); |
.NET | public Connection(string url, string options); |
Copy Constructor | |
C++ | Connection(const Connection&); |
.NET | public Connection(Connection connection); |
Destructor | |
C++ | ~Connection(); |
.NET | ~Connection(); |
Finalizer | |
C++ | not applicable |
.NET | !Connection(); |
Copy assignment operator | |
C++ | Connection& operator=(const Connection&); |
.NET | public Connection op_Assign(Connection rhs); |
Method: SetOption | |
C++ | void setOption(const std::string& name, const qpid::types::Variant& value); |
.NET | public void SetOption(string name, object value); |
Method: open | |
C++ | void open(); |
.NET | public void Open(); |
Property: isOpen | |
C++ | bool isOpen(); |
.NET | public bool IsOpen { get; } |
Method: close | |
C++ | void close(); |
.NET | public void Close(); |
Method: createTransactionalSession | |
C++ | Session createTransactionalSession(const std::string& name = std::string()); |
.NET | public Session CreateTransactionalSession(); |
.NET | public Session CreateTransactionalSession(string name); |
Method: createSession | |
C++ | Session createSession(const std::string& name = std::string()); |
.NET | public Session CreateSession(); |
.NET | public Session CreateSession(string name); |
Method: getSession | |
C++ | Session getSession(const std::string& name) const; |
.NET | public Session GetSession(string name); |
Property: AuthenticatedUsername | |
C++ | std::string getAuthenticatedUsername(); |
.NET | public string GetAuthenticatedUsername(); |
See Also:
22.5. .NET Binding for the C++ Messaging API Class: Duration Copy linkLink copied to clipboard!
.NET Binding Class: Duration | |
---|---|
Language | Syntax |
C++ | class Duration |
.NET | public ref class Duration |
Constructor | |
C++ | explicit Duration(uint64_t milliseconds); |
.NET | public Duration(ulong mS); |
Copy constructor | |
C++ | not applicable |
.NET | public Duration(Duration rhs); |
Destructor | |
C++ | default |
.NET | default |
Finalizer | |
C++ | not applicable |
.NET | default |
Property: Milliseconds | |
C++ | uint64_t getMilliseconds() const; |
.NET | public ulong Milliseconds { get; } |
Operator: * | |
C++ | Duration operator*(const Duration& duration, uint64_t multiplier); |
.NET | public static Duration operator *(Duration dur, ulong multiplier); |
.NET | public static Duration Multiply(Duration dur, ulong multiplier); |
C++ | Duration operator*(uint64_t multiplier, const Duration& duration); |
.NET | public static Duration operator *(ulong multiplier, Duration dur); |
.NET | public static Duration Multiply(ulong multiplier, Duration dur); |
Constants | |
C++ | static const Duration FOREVER; |
C++ | static const Duration IMMEDIATE; |
C++ | static const Duration SECOND; |
C++ | static const Duration MINUTE; |
.NET | public sealed class DurationConstants |
.NET | public static Duration FORVER; |
.NET | public static Duration IMMEDIATE; |
.NET | public static Duration MINUTE; |
.NET | public static Duration SECOND; |
See Also:
22.6. .NET Binding for the C++ Messaging API Class: FailoverUpdates Copy linkLink copied to clipboard!
.NET Binding Class: FailoverUpdates | |
---|---|
Language | Syntax |
C++ | class FailoverUpdates |
.NET | public ref class FailoverUpdates |
Constructor | |
C++ | FailoverUpdates(Connection& connection); |
.NET | public FailoverUpdates(Connection connection); |
Destructor | |
C++ | ~FailoverUpdates(); |
.NET | ~FailoverUpdates(); |
Finalizer | |
C++ | not applicable |
.NET | !FailoverUpdates(); |
See Also:
22.7. .NET Binding for the C++ Messaging API Class: Message Copy linkLink copied to clipboard!
.NET Binding Class: Message | |
---|---|
Language | Syntax |
C++ | class Message |
.NET | public ref class Message |
Constructor | |
C++ | Message(const std::string& bytes = std::string()); |
.NET | Message(); |
.NET | Message(System::String ^ theStr); |
.NET | Message(System::Object ^ theValue); |
.NET | Message(array<System::Byte> ^ bytes); |
Constructor | |
C++ | Message(const char*, size_t); |
.NET | public Message(byte[] bytes, int offset, int size); |
Copy Constructor | |
C++ | Message(const Message&); |
.NET | public Message(Message message); |
Copy assignment operator | |
C++ | Message& operator=(const Message&); |
.NET | public Message op_Assign(Message rhs); |
Destructor | |
C++ | ~Message(); |
.NET | ~Message(); |
Finalizer | |
C++ | not applicable |
.NET | !Message() |
Property: ReplyTo | |
C++ | void setReplyTo(const Address&); |
C++ | const Address& getReplyTo() const; |
.NET | public Address ReplyTo { get; set; } |
Property: Subject | |
C++ | void setSubject(const std::string&); |
C++ | const std::string& getSubject() const; |
.NET | public string Subject { get; set; } |
Property: ContentType | |
C++ | void setContentType(const std::string&); |
C++ | const std::string& getContentType() const; |
.NET | public string ContentType { get; set; } |
Property: MessageId | |
C++ | void setMessageId(const std::string&); |
C++ | const std::string& getMessageId() const; |
.NET | public string MessageId { get; set; } |
Property: UserId | |
C++ | void setUserId(const std::string&); |
C++ | const std::string& getUserId() const; |
.NET | public string UserId { get; set; } |
Property: CorrelationId | |
C++ | void setCorrelationId(const std::string&); |
C++ | const std::string& getCorrelationId() const; |
.NET | public string CorrelationId { get; set; } |
Property: Priority | |
C++ | void setPriority(uint8_t); |
C++ | uint8_t getPriority() const; |
.NET | public byte Priority { get; set; } |
Property: Ttl | |
C++ | void setTtl(Duration ttl); |
C++ | Duration getTtl() const; |
.NET | public Duration Ttl { get; set; } |
Property: Durable | |
C++ | void setDurable(bool durable); |
C++ | bool getDurable() const; |
.NET | public bool Durable { get; set; } |
Property: Redelivered | |
C++ | bool getRedelivered() const; |
C++ | void setRedelivered(bool); |
.NET | public bool Redelivered { get; set; } |
Method: SetProperty | |
C++ | void setProperty(const std::string&, const qpid::types::Variant&); |
.NET | public void SetProperty(string name, object value); |
Property: Properties | |
C++ | const qpid::types::Variant::Map& getProperties() const; |
C++ | qpid::types::Variant::Map& getProperties(); |
.NET | public Dictionary<string, object> Properties { get; set; } |
Method: SetContent | |
C++ | void setContent(const std::string&); |
C++ | void setContent(const char* chars, size_t count); |
.NET | public void SetContent(byte[] bytes); |
.NET | public void SetContent(string content); |
.NET | public void SetContent(byte[] bytes, int offset, int size); |
Method: GetContent | |
C++ | std::string getContent() const; |
.NET | public string GetContent(); |
.NET | public void GetContent(byte[] arr); |
.NET | public void GetContent(Collection<object> __p1); |
.NET | public void GetContent(Dictionary<string, object> dict); |
Method: GetContentPtr | |
C++ | const char* getContentPtr() const; |
.NET | not applicable |
Property: ContentSize | |
C++ | size_t getContentSize() const; |
.NET | public ulong ContentSize { get; } |
Struct: EncodingException | |
C++ | struct EncodingException : qpid::types::Exception |
.NET | not applicable |
Method: decode | |
C++ | void decode(const Message& message, qpid::types::Variant::Map& map, const std::string& encoding = std::string()); |
C++ | void decode(const Message& message, qpid::types::Variant::List& list, const std::string& encoding = std::string()); |
.NET | not applicable |
Method: encode | |
C++ | void encode(const qpid::types::Variant::Map& map, Message& message, const std::string& encoding = std::string()); |
C++ | void encode(const qpid::types::Variant::List& list, Message& message, const std::string& encoding = std::string()); |
.NET | not applicable |
Method: AsString | |
C++ | not applicable |
.NET | public string AsString(object obj); |
.NET | public string ListAsString(Collection<object> list); |
.NET | public string MapAsString(Dictionary<string, object> dict); |
See Also:
22.8. .NET Binding for the C++ Messaging API Class: Receiver Copy linkLink copied to clipboard!
.NET Binding Class: Receiver | |
---|---|
Language | Syntax |
C++ | class Receiver |
.NET | public ref class Receiver |
Constructor | |
.NET | Constructed object is returned by Session.CreateReceiver |
Copy constructor | |
C++ | Receiver(const Receiver&); |
.NET | public Receiver(Receiver receiver); |
Destructor | |
C++ | ~Receiver(); |
.NET | ~Receiver(); |
Finalizer | |
C++ | not applicable |
.NET | !Receiver() |
Copy assignment operator | |
C++ | Receiver& operator=(const Receiver&); |
.NET | public Receiver op_Assign(Receiver rhs); |
Method: Get | |
C++ | bool get(Message& message, Duration timeout=Duration::FOREVER); |
.NET | public bool Get(Message mmsgp); |
.NET | public bool Get(Message mmsgp, Duration durationp); |
Method: Get | |
C++ | Message get(Duration timeout=Duration::FOREVER); |
.NET | public Message Get(); |
.NET | public Message Get(Duration durationp); |
Method: Fetch | |
C++ | bool fetch(Message& message, Duration timeout=Duration::FOREVER); |
.NET | public bool Fetch(Message mmsgp); |
.NET | public bool Fetch(Message mmsgp, Duration duration); |
Method: Fetch | |
C++ | Message fetch(Duration timeout=Duration::FOREVER); |
.NET | public Message Fetch(); |
.NET | public Message Fetch(Duration durationp); |
Property: Capacity | |
C++ | void setCapacity(uint32_t); |
C++ | uint32_t getCapacity(); |
.NET | public uint Capacity { get; set; } |
Property: Available | |
C++ | uint32_t getAvailable(); |
.NET | public uint Available { get; } |
Property: Unsettled | |
C++ | uint32_t getUnsettled(); |
.NET | public uint Unsettled { get; } |
Method: Close | |
C++ | void close(); |
.NET | public void Close(); |
Property: IsClosed | |
C++ | bool isClosed() const; |
.NET | public bool IsClosed { get; } |
Property: Name | |
C++ | const std::string& getName() const; |
.NET | public string Name { get; } |
Property: Session | |
C++ | Session getSession() const; |
.NET | public Session Session { get; } |
See Also:
22.9. .NET Binding for the C++ Messaging API Class: Sender Copy linkLink copied to clipboard!
.NET Binding Class: Sender | |
---|---|
Language | Syntax |
C++ | class Sender |
.NET | public ref class Sender |
Constructor | |
.NET | Constructed object is returned by session.createSender |
Copy constructor | |
C++ | Sender(const Sender&); |
.NET | public Sender(Sender sender); |
Destructor | |
C++ | ~Sender(); |
.NET | ~Sender(); |
Finalizer | |
C++ | not applicable |
.NET | !Sender() |
Copy assignment operator | |
C++ | Sender& operator=(const Sender&); |
.NET | public Sender op_Assign(Sender rhs); |
Method: Send | |
C++ | void send(const Message& message, bool sync=false); |
.NET | public void Send(Message mmsgp); |
.NET | public void Send(Message mmsgp, bool sync); |
Method: Close | |
C++ | void close(); |
.NET | public void Close(); |
Property: Capacity | |
C++ | void setCapacity(uint32_t); |
C++ | uint32_t getCapacity(); |
.NET | public uint Capacity { get; set; } |
Property: Available | |
C++ | uint32_t getAvailable(); |
.NET | public uint Available { get; } |
Property: Unsettled | |
C++ | uint32_t getUnsettled(); |
.NET | public uint Unsettled { get; } |
Property: Name | |
C++ | const std::string& getName() const; |
.NET | public string Name { get; } |
Property: Session | |
C++ | Session getSession() const; |
.NET | public Session Session { get; } |
See Also:
22.10. .NET Binding for the C++ Messaging API Class: Session Copy linkLink copied to clipboard!
Language | Syntax |
---|---|
C++ | class Session |
.NET | public ref class Session |
Constructor | |
.NET | Constructed object is returned by Connection.CreateSession |
Copy constructor | |
C++ | Session(const Session&); |
.NET | public Session(Session session); |
Destructor | |
C++ | ~Session(); |
.NET | ~Session(); |
Finalizer | |
C++ | not applicable |
.NET | !Session() |
Copy assignment operator | |
C++ | Session& operator=(const Session&); |
.NET | public Session op_Assign(Session rhs); |
Method: Close | |
C++ | void close(); |
.NET | public void Close(); |
Method: Commit | |
C++ | void commit(); |
.NET | public void Commit(); |
Method: Rollback | |
C++ | void rollback(); |
.NET | public void Rollback(); |
Method: Acknowledge | |
C++ | void acknowledge(bool sync=false); |
C++ | void acknowledge(Message&, bool sync=false); |
.NET | public void Acknowledge(); |
.NET | public void Acknowledge(bool sync); |
.NET | public void Acknowledge(Message __p1); |
.NET | public void Acknowledge(Message __p1, bool __p2); |
Method: Reject | |
C++ | void reject(Message&); |
.NET | public void Reject(Message __p1); |
Method: Release | |
C++ | void release(Message&); |
.NET | public void Release(Message __p1); |
Method: Sync | |
C++ | void sync(bool block=true); |
.NET | public void Sync(); |
.NET | public void Sync(bool block); |
Property: Receivable | |
C++ | uint32_t getReceivable(); |
.NET | public uint Receivable { get; } |
Property: UnsettledAcks | |
C++ | uint32_t getUnsettledAcks(); |
.NET | public uint UnsettledAcks { get; } |
Method: NextReceiver | |
C++ | bool nextReceiver(Receiver&, Duration timeout=Duration::FOREVER); |
.NET | public bool NextReceiver(Receiver rcvr); |
.NET | public bool NextReceiver(Receiver rcvr, Duration timeout); |
Method: NextReceiver | |
C++ | Receiver nextReceiver(Duration timeout=Duration::FOREVER); |
.NET | public Receiver NextReceiver(); |
.NET | public Receiver NextReceiver(Duration timeout); |
Method: CreateSender | |
C++ | Sender createSender(const Address& address); |
.NET | public Sender CreateSender(Address address); |
Method: CreateSender | |
C++ | Sender createSender(const std::string& address); |
.NET | public Sender CreateSender(string address); |
Method: CreateReceiver | |
C++ | Receiver createReceiver(const Address& address); |
.NET | public Receiver CreateReceiver(Address address); |
Method: CreateReceiver | |
C++ | Receiver createReceiver(const std::string& address); |
.NET | public Receiver CreateReceiver(string address); |
Method: GetSender | |
C++ | Sender getSender(const std::string& name) const; |
.NET | public Sender GetSender(string name); |
Method: GetReceiver | |
C++ | Receiver getReceiver(const std::string& name) const; |
.NET | public Receiver GetReceiver(string name); |
Property: Connection | |
C++ | Connection getConnection() const; |
.NET | public Connection Connection { get; } |
Property: HasError | |
C++ | bool hasError(); |
.NET | public bool HasError { get; } |
Method: CheckError | |
C++ | void checkError(); |
.NET | public void CheckError(); |
See Also:
22.11. .NET Class: SessionReceiver Copy linkLink copied to clipboard!
SessionReceiver
class provides a convenient callback mechanism for messages received by all receivers on a given session.
Org.Apache.Qpid.Messaging
and Org.Apache.Qpid.Messaging.SessionReceiver
. The calling program creates a function that implements the ISessionReceiver
interface. This function will be called whenever a message is received by the session. The callback process is started by creating a CallbackServer
and will continue to run until the client program calls the CallbackServer.Close function
.
SessionReceiver
callback is contained in cpp/bindings/qpid/dotnet/examples/csharp.map.callback.receiver
.
See Also:
Appendix A. Exchange and Queue Declaration Arguments Copy linkLink copied to clipboard!
A.1. Exchange and Queue Argument Reference Copy linkLink copied to clipboard!
- Changes
qpid.last_value_queue
andqpid.last_value_queue_no_browse
deprecated and removed.qpid.msg_sequence
queue argument replaced byqpid.queue_msg_sequence
.ring_strict
andflow_to_disk
are no longer validqpid.policy_type
values.qpid.persist_last_node
deprecated and removed.
Exchange options
qpid.exclusive-binding
(bool)- Ensures that a given binding key is associated with only one queue.
qpid.ive
(bool)- If set to “true”, the exchange is an initial value exchange, which differs from other exchanges in only one way: the last message sent to the exchange is cached, and if a new queue is bound to the exchange, it attempts to route this message to the queue, if the message matches the binding criteria. This allows a new queue to use the last received message as an initial value.
qpid.msg_sequence
(bool)- If set to “true”, the exchange inserts a sequence number named “qpid.msg_sequence” into the message headers of each message. The type of this sequence number is int64. The sequence number for the first message routed from the exchange is 1, it is incremented sequentially for each subsequent message. The sequence number is reset to 1 when the qpid broker is restarted.
qpid.sequence_counter
(int64)- Start
qpid.msg_sequence
counting at the given number.
Queue options
no-local
(bool)- Specifies that the queue should discard any messages enqueued by sessions on the same connection as that which declares the queue.
qpid.alert_count
(uint32_t)- If the queue message count goes above this size an alert should be sent.
qpid.alert_repeat_gap
(int64_t)- Controls the minimum interval between events in seconds. The default value is 60 seconds.
qpid.alert_size
(int64_t)- If the queue size in bytes goes above this size an alert should be sent.
qpid.auto_delete_timeout
(bool)- If a queue is configured to be automatically deleted, it will be deleted after the amount of seconds specified here.
qpid.browse-only
(bool)- All users of queue are forced to browse. Limit queue size with ring, LVQ, or TTL. Note that this argument name uses a hyphen rather than an underscore.
qpid.file_count
(int)- Set the number of files in the persistence journal for the queue. Default value is 8.
qpid.file_size
(int64)- Set the number of pages in the file (each page is 64KB). Default value is 24.
qpid.flow_resume_count
(uint32_t)- Flow resume threshold value as a message count.
qpid.flow_resume_size
(uint64_t)- Flow resume threshold value in bytes.
qpid.flow_stop_count
(uint32_t)- Flow stop threshold value as a message count.
qpid.flow_stop_size
(uint64_t)- Flow stop threshold value in bytes.
qpid.last_value_queue_key
(string)- Defines the key to use for a last value queue.
qpid.max_count
(uint32_t)- The maximum byte size of message data that a queue can contain before the action dictated by the
policy_type
is taken. qpid.max_size
(uint64_t)- The maximum number of messages that a queue can contain before the action dictated by the
policy_type
is taken. qpid.policy_type
(string)- Sets default behavior for controlling queue size. Valid values are
reject
andring
. qpid.priorities
(size_t)- The number of distinct priority levels recognized by the queue (up to a maximum of 10). The default value is 1 level.
qpid.queue_msg_sequence
(string)- Causes a custom header with the specified name to be added to enqueued messages. This header is automatically populated with a sequence number.
qpid.trace.exclude
(string)- Does not send on messages which include one of the given (comma separated) trace ids.
qpid.trace.id
(string)- Adds the given trace id as to the application header "
x-qpid.trace
" in messages sent from the queue. x-qpid-maximum-message-count
- This is an alias for
qpid.alert_count
. x-qpid-maximum-message-size
- This is an alias for
qpid.alert_size
. x-qpid-minimum-alert-repeat-gap
- This is an alias for
qpid.alert_repeat_gap
. x-qpid-priorities
- This is an alias for
qpid.priorities
.
Appendix B. Revision History Copy linkLink copied to clipboard!
Revision History | |||
---|---|---|---|
Revision 3.2.0-6 | Fri Oct 16 2015 | ||
| |||
Revision 3.2.0-5 | Thu Oct 8 2015 | ||
| |||
Revision 3.2.0-3 | Tue Sep 29 2015 | ||
| |||
Revision 3.2.0-1 | Tue Jul 14 2015 | ||
| |||
Revision 3.1.0-5 | Wed Apr 01 2015 | ||
| |||
Revision 3.0.0-4 | Tue Sep 23 2014 | ||
|