5.2. Browsing and Consuming Messages


5.2.1. Message Acquisition and Acceptance

A message consumer can browse the messages in a queue, or consume them.
Browsing means that the consuming application reads the messages, but the messages remain on the queue for other consumers. Consuming means that the consuming application removes the message from the queue. This is also known as acquiring a message.
We will first look at the broad distinction between browsing and acquiring messages, then in Acquired and Acknowledged we'll look in more detail at the acquisition process, which has two phases that we need to understand.
Browsing

The included drain program can be used in either browse or acquisition mode.

The drain source code is part of the C++ and the Python client library packages. You can compile the C++ source code, or run the Python source uncompiled using a Python interpreter.
When the client library packages are installed, drain can be found in:
/usr/share/doc/python-qpid-0.14/examples/api/drain
/usr/share/qpidc/examples/messaging/drain.cpp
To demonstrate the difference between browsing and acquisition, you can try the following:
With the broker installed and running, create a queue with the qpid-config command:
qpid-config add queue browse-acquire-demo
You should now see your browse-acquire-demo queue when you run qpid-config queues.
Now let's send a message to the browse-acquire-demo using spout. Spout is included in the same packages as drain, and can be found in the same directories. Run spout to send a message to the queue:
./spout browse-acquire-demo "Hello World"
Our "Hello World" message has now been sent to the browse-acquire-demo queue. Let's use drain to browse it first of all:
./drain -c 0 "browse-acquire-demo; {mode:browse}"
You will now see the "Hello World" message. Run the above drain a second time, and you'll see the message again. Running the drain program twice simulates two different browsing consumers accessing the queue. The message is read and remains available for other consuming applications when it is browsed.
Try deleting the browse-acquire-demo queue using qpid-config:
qpid-config del queue browse-acquire-demo
qpid-config responds with an error because a message remains in the queue.
Now run this drain command:
./drain -c 0 "browse-acquire-demo"
The default mode is acquisition. When drain is run like this with no mode specified, it acquires the message. You will see the "Hello World" message just as you did on the previous browsing accesses. However, this time the message has been removed. Try browsing it again using drain. The queue is empty.
You can delete the now-empty queue using qpid-config:
qpid-config del queue browse-acquire-demo
One thing you will not see with the drain demo is the fact that browsers see a message only once. Because each time drain is run it creates a different browser, it sees the message in the queue each time. The same browser, however, sees the message only once, no matter how many times it looks.
The following Python code demonstrates browsing and acquiring, and demonstrates how a browser sees each message once:
Python
import sys
from qpid.messaging import *

def msgfetch(rx):
  try:
    msg = rx.fetch(timeout=1)
  except MessagingError, m:
    msg = m
  return msg
  
connection = Connection("localhost:5672")
connection.open()
try:
  session = connection.session()
  tx = session.sender("browse-acquire-demo;{create:always}")
  rxbrowse1 = session.receiver("browse-acquire-demo;{mode:browse}")
  rxbrowse2 = session.receiver("browse-acquire-demo;{mode:browse}")
  rxbrowse3 = session.receiver("browse-acquire-demo;{mode:browse}")
  rxacquire = session.receiver("browse-acquire-demo")
  
  tx.send("Hello World")
  
  print "\nBrowser 1 saw message:"
  print msgfetch(rxbrowse1)
      
  print "Browser 1 then saw message:"
  print msgfetch(rxbrowse1)

  print "\nBrowser 2 saw message:"
  print msgfetch(rxbrowse2)

  print "Browser 2 then saw message:"
  print msgfetch(rxbrowse2)

  print "\nAcquired message:"
  print msgfetch(rxacquire)
  
  print "\nBrowser 3 saw message:"
  print msgfetch(rxbrowse3)
  
except MessagingError, m:
  print m
finally:
  connection.close()
Browser 1 and Browser 2 both see the message, and only see it once each. Because the message is acquired before Browser 3 looks at the queue, Browser 3 sees no message on the queue.
However, now run drain to examine the queue:
./drain -c 0 browse-acquire-demo
You may be surprised to see the message still on the queue (you just removed it, by the way). What happened?
Acquired and Acknowledged

When our receiver acquired the message from the queue, the broker set the message to acquired. When a message is acquired, the broker treats the message as if it has been delivered, but it does not delete it from the queue. One of a number of things happen from here: the consumer who acquired the message acknowledges the message, releases the message, or rejects the message, or the consumer might disconnect through a network failure.

In our case, our application is disconnecting from the broker without acknowledging receipt of the message. While our application is connected the message is acquired, and message consumers browsing or fetching from the queue will not see the message. When our application disconnects without acknowledging receipt, the broker switches the message out of acquired state and sets a header redelivered=True. The message is then made available to other consumers, such as the drain browser that we ran after our application closed.
This goal of the "acquire, acknowledge" pattern is to provide reliable delivery of messages. Imagine a situation where a group of nodes are performing a service that is driven by messages. Each node in the workgroup grabs a bunch of messages from the queue when it has the capacity to perform some work. A node might grab a handful of messages from the queue, and then suffer a power outage. In this case those messages would be missing, if the broker did not have the concept of acquire and acknowledge. With this pattern, the worker node can acquire the messages, perform some work, and then acknowledge ownership at a point in time where it is safe to say that the message has been delivered and acted on. This narrows the window for exceptions. Even in the case where the node fails right at the critical moment after it has acted on the messages but before it can acknowledge receipt, the other nodes will retrieve the messages from the queue with the header 'redelivered=true'. This alerts the other nodes that this message may have already been acted on, and they can perform checks to see if that is so. This narrows the window for exceptions even further, when the applications are designed to take advantage of these features.
To see a message returning to the queue when a consumer disconnects without acquiring the message demonstrated inside the application, add the following code to the end of the application, after the final connection.close() line:
Python
connection.open()
try:
  session=connection.session()

  rxacquire2 = session.receiver("browse-acquire-demo")
  print "\nAcquirer 2 saw message:"
  print msgfetch(rxacquire2)
except MessagingError, m:
  print m
finally:
  session.acknowledge()
  connection.close()
Our application closes its connection, disconnecting the consumer from the broker without acknowledging receipt of the message. We then open a new connection to broker, effectively appearing as a new consumer. Our receiver now sees the message, which has been marked by the broker as redelivered to inform us that another consumer acquired this message previously. We have now acquired this message, and it will again disappear for other consumers browsing or fetching from this queue. This time, however, we call session.acknowledge() before closing the connection. This method acknowledges receipt of the message (it acknowledges all messages as-yet unacknowledged for the session). Since we have acknowledged receipt of the message, the message is acquired, and it is removed from the queue.
If you run drain now, you will see that there are no messages in the queue.
Releasing a message

A consumer can explicitly release a message. When this happens, the message is returned to the queue for redelivery. The effect is the same as if the consumer lost its connection to the broker.

To release the message explicitly with the Python API, call the acknowledge() method with the message and Disposition(RELEASED) as parameters:
session.acknowledge(msg, Disposition(RELEASED))
To release the message explicitly with the C++ API, call the session's release() method.
Link Reliability

Note that this two-phase acquisition and acceptance behavior is the behavior over a reliable link (technically an at-least-once link), which is the default link for receiver connections to the broker. If you explicitly connect your receiver to a queue using an unreliable link, or directly connect to an exchange, then received messages are immediately acquired with no need to acknowledge them.

Cleaning up the demo queue

To delete the queue we used for this demo, you can either restart the broker (all non-durable queues are deleted when the broker is restarted), or you can use qpid-config:

qpid-config del queue browse-acquire-demo
If there are messages remaining in the queue this command will fail with an message informing you that the queue is not empty. You can use the --force switch to override this check and delete a queue with messages in it, or you can use drain to empty the queue, and then reissue the command on the now-empty queue.
Red Hat logoGithubRedditYoutubeTwitter

Learn

Try, buy, & sell

Communities

About Red Hat Documentation

We help Red Hat users innovate and achieve their goals with our products and services with content they can trust.

Making open source more inclusive

Red Hat is committed to replacing problematic language in our code, documentation, and web properties. For more details, see the Red Hat Blog.

About Red Hat

We deliver hardened solutions that make it easier for enterprises to work across platforms and environments, from the core datacenter to the network edge.

© 2024 Red Hat, Inc.