5.2. Browsing and Consuming Messages
5.2.1. Message Acquisition and Acceptance
The included drain program can be used in either browse or acquisition mode. drain can be found in:
/usr/share/doc/python-qpid-0.14/examples/api/drain
/usr/share/qpidc/examples/messaging/drain.cpp
Create a queue using the qpid-config command:
qpid-config add queue browse-acquire-demo
You will see the browse-acquire-demo queue when you run qpid-config queues.
Now send a message to browse-acquire-demo using spout. spout is included in the same packages as drain, and can be found in the same directories. Run spout to send a message to the queue:
./spout browse-acquire-demo "Hello World"
There is now a message in the browse-acquire-demo queue. Let's use drain to browse it first:
./drain -c 0 "browse-acquire-demo; {mode:browse}"
Run drain a second time, and you'll see the message again. Running the drain program twice simulates two different browsing consumers accessing the queue. When a message is browsed, it is read but remains on the queue, available to other consuming applications.
Now try to delete the browse-acquire-demo queue using qpid-config:
qpid-config del queue browse-acquire-demo
qpid-config responds with an error because a message remains in the queue.
Now run drain in acquisition mode (the default) to consume the message:
./drain -c 0 "browse-acquire-demo"
With the queue now empty, delete it using qpid-config:
qpid-config del queue browse-acquire-demo
One thing that is not obvious from the drain demo is that a browser sees a message only once. Each run of drain creates a different browser, so it sees the message in the queue each time. The same browser, however, sees the message only once, no matter how many times it looks.
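The browse-once behavior can be pictured with a small in-memory sketch. This is a toy model, not the Qpid API: ToyQueue and ToyBrowser are hypothetical names, and the assumption is simply that each browser keeps its own read position while the queue itself is left untouched.

```python
class ToyQueue:
    """In-memory stand-in for a broker queue (hypothetical, not the Qpid API)."""

    def __init__(self):
        self.messages = []

    def send(self, body):
        self.messages.append(body)


class ToyBrowser:
    """A browser keeps its own read position and never removes messages."""

    def __init__(self, queue):
        self.queue = queue
        self.position = 0  # index of the next message this browser has not seen

    def fetch(self):
        if self.position >= len(self.queue.messages):
            return None  # nothing new for this particular browser
        body = self.queue.messages[self.position]
        self.position += 1
        return body


queue = ToyQueue()
queue.send("Hello World")

first = ToyBrowser(queue)
first_look = first.fetch()    # the browser sees the message
second_look = first.fetch()   # the same browser finds nothing new

second = ToyBrowser(queue)    # comparable to running drain a second time
other_look = second.fetch()   # a new browser sees the message again

print(first_look, second_look, other_look)
```

The message stays in queue.messages throughout, which mirrors why qpid-config refused to delete the queue after it had only been browsed.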
The following program demonstrates this against a broker on localhost:5672:
- Python
import sys
from qpid.messaging import *

def msgfetch(rx):
    # Return the fetched message, or the exception if nothing arrives in time.
    try:
        msg = rx.fetch(timeout=1)
    except MessagingError, m:
        msg = m
    return msg

connection = Connection("localhost:5672")
connection.open()
try:
    session = connection.session()
    tx = session.sender("browse-acquire-demo;{create:always}")
    # Three independent browsers and one acquiring receiver on the same queue.
    rxbrowse1 = session.receiver("browse-acquire-demo;{mode:browse}")
    rxbrowse2 = session.receiver("browse-acquire-demo;{mode:browse}")
    rxbrowse3 = session.receiver("browse-acquire-demo;{mode:browse}")
    rxacquire = session.receiver("browse-acquire-demo")
    tx.send("Hello World")
    print "\nBrowser 1 saw message:"
    print msgfetch(rxbrowse1)
    print "Browser 1 then saw message:"
    print msgfetch(rxbrowse1)
    print "\nBrowser 2 saw message:"
    print msgfetch(rxbrowse2)
    print "Browser 2 then saw message:"
    print msgfetch(rxbrowse2)
    print "\nAcquired message:"
    print msgfetch(rxacquire)
    # Browser 3 looks only after the message has been acquired.
    print "\nBrowser 3 saw message:"
    print msgfetch(rxbrowse3)
except MessagingError, m:
    print m
finally:
    # The connection is closed without acknowledging the acquired message.
    connection.close()
After the program exits, use drain to examine the queue:
./drain -c 0 browse-acquire-demo
When our receiver acquired the message from the queue, the broker set the message to acquired. When a message is acquired, the broker treats the message as if it has been delivered, but it does not delete it from the queue. One of several things happens from here: the consumer that acquired the message acknowledges it, releases it, or rejects it, or the consumer disconnects, for example through a network failure.
While the message is acquired, message consumers browsing or fetching from the queue will not see the message. When our application disconnects without acknowledging receipt, the broker switches the message out of the acquired state and sets a header redelivered=True. The message is then made available to other consumers, such as the drain program that we ran after our application closed.
A message that is redelivered in this way carries the header 'redelivered=true'. This alerts other consumers that the message may already have been acted on, and they can check whether that is so. When applications are designed to take advantage of these features, this narrows the window for exceptions even further.
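To make the acquired state concrete, here is a minimal in-memory sketch of the lifecycle described above. It is a toy model under stated assumptions, not broker code: ToyBroker, ToyMessage, and the consumer names are hypothetical, and release and reject are left out to keep it short.

```python
class ToyMessage:
    def __init__(self, body):
        self.body = body
        self.acquired_by = None   # name of the consumer holding the message, if any
        self.redelivered = False


class ToyBroker:
    """Toy model of one queue with acquire, acknowledge, and disconnect."""

    def __init__(self):
        self.queue = []

    def send(self, body):
        self.queue.append(ToyMessage(body))

    def visible(self):
        """Messages that browsing or fetching consumers can currently see."""
        return [m for m in self.queue if m.acquired_by is None]

    def fetch(self, consumer):
        """Acquire the first visible message; it stays queued but is hidden."""
        for message in self.queue:
            if message.acquired_by is None:
                message.acquired_by = consumer
                return message
        return None

    def acknowledge(self, consumer):
        """Remove every message this consumer holds from the queue."""
        self.queue = [m for m in self.queue if m.acquired_by != consumer]

    def disconnect(self, consumer):
        """Consumer vanished without acknowledging: requeue and flag its messages."""
        for message in self.queue:
            if message.acquired_by == consumer:
                message.acquired_by = None
                message.redelivered = True


broker = ToyBroker()
broker.send("Hello World")

held = broker.fetch("app-1")
first_flag = held.redelivered           # not yet redelivered
hidden_count = len(broker.visible())    # acquired, so nobody else sees it

broker.disconnect("app-1")              # closes without acknowledging
again = broker.fetch("app-2")
second_flag = again.redelivered         # flagged for the next consumer

broker.acknowledge("app-2")
remaining = len(broker.queue)           # acknowledged, so removed

print(first_flag, hidden_count, second_flag, remaining)
```

The point of the model is the ordering: a message leaves the queue only on acknowledgement, and a consumer that disappears first hands the message back with the redelivered flag set.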
Now extend the program by adding the following code after the connection.close() line:
- Python
connection.open()
try:
    session = connection.session()
    rxacquire2 = session.receiver("browse-acquire-demo")
    print "\nAcquirer 2 saw message:"
    print msgfetch(rxacquire2)
except MessagingError, m:
    print m
finally:
    # Acknowledge before closing so the broker can remove the message.
    session.acknowledge()
    connection.close()
The message is delivered with the header redelivered set, to inform us that another consumer acquired this message previously. We have now acquired this message, and it again disappears for other consumers browsing or fetching from this queue. This time, however, we call session.acknowledge() before closing the connection. This method acknowledges receipt of the message (called without arguments, it acknowledges all as-yet unacknowledged messages for the session). Because we have acknowledged receipt, the broker treats the message as accepted and removes it from the queue.
If you run drain now, you will see that there are no messages in the queue.
A consumer can explicitly release a message. When this happens, the message is returned to the queue for redelivery. The effect is the same as if the consumer lost its connection to the broker.
To release a message in Python, call the session's acknowledge() method with the message and Disposition(RELEASED) as parameters:
session.acknowledge(msg, Disposition(RELEASED))
In C++, use the session's release() method.
Note that this two-phase acquisition and acceptance behavior is the behavior over a reliable link (technically an at-least-once link), which is the default link for receiver connections to the broker. If you explicitly connect your receiver to a queue using an unreliable link, or directly connect to an exchange, then received messages are immediately acquired with no need to acknowledge them.
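The choice between browsing, the default reliable link, and an unreliable link is made in the address string passed to session.receiver(). The sketch below only assembles such strings; receiver_address is a hypothetical helper, and the link reliability option is written as we understand the Qpid address syntax, so check it against the documentation for your version before relying on it.

```python
def receiver_address(queue, mode=None, reliability=None):
    """Build an address string for a receiver (hypothetical helper).

    mode        -- e.g. "browse"; omitted means the default consuming mode
    reliability -- e.g. "unreliable"; omitted means the default link
    """
    options = []
    if mode is not None:
        options.append("mode: %s" % mode)
    if reliability is not None:
        # Assumed syntax for the link reliability option.
        options.append("link: {reliability: %s}" % reliability)
    if not options:
        return queue
    return "%s; {%s}" % (queue, ", ".join(options))


default_addr = receiver_address("browse-acquire-demo")
browse_addr = receiver_address("browse-acquire-demo", mode="browse")
unreliable_addr = receiver_address("browse-acquire-demo", reliability="unreliable")

print(default_addr)
print(browse_addr)
print(unreliable_addr)
```

Each resulting string would be passed unchanged to session.receiver(); only the first form gives the two-phase acquire-then-acknowledge behavior walked through in this section.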
To delete the queue we used for this demo, you can either restart the broker (all non-durable queues are deleted when the broker is restarted), or you can use qpid-config:
qpid-config del queue browse-acquire-demo
If the queue still contains messages, qpid-config refuses to delete it. Use the --force switch to override this check and delete a queue with messages in it, or use drain to empty the queue, and then reissue the command on the now-empty queue.