4.9.3. Detect Overwritten Messages in Ring Queues
Ring queues overwrite older messages with incoming messages when the queue capacity is reached. Some applications need to know when messages have been overwritten. This can be achieved by declaring the queue with the qpid.queue_msg_sequence argument.
The qpid.queue_msg_sequence argument accepts a single string value as its parameter. The broker uses this string as the name of a message property that it adds to each message that comes through the ring queue, and it sets that property to a sequentially incrementing integer value.
Applications can examine the value of the property named by qpid.queue_msg_sequence on each message to determine whether interim messages have been overwritten in the ring queue, and respond appropriately.
Note that the message sequence must be examined by a stateful application to detect a break in the sequence. An exclusive queue with a single consumer is able to do this. If multiple consumers take messages from the queue, the message sequence is split between the consumers and they have no way to tell whether a message has been overwritten.
Note also that the message sequence is not persisted, even with persistent messages sent to a durable queue, so a broker restart causes sequence discontinuity.
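The stateful check described above can be sketched without a broker. The following is a minimal illustration, not part of the Qpid API: the class name SequenceGapDetector and its observe method are hypothetical, and the sketch assumes the caller has already read the sequence number from the message property and converted it to an integer. It ignores wraparound, and it cannot tell a broker restart apart from any other discontinuity.

```python
class SequenceGapDetector(object):
    """Hypothetical helper: remembers the last sequence number seen
    by a single consumer and reports breaks in the sequence."""

    def __init__(self):
        self.last_seen = None  # no message observed yet

    def observe(self, seq_no):
        """Record seq_no and return the number of messages skipped.

        Returns 0 for the first message and for a message that directly
        follows the previous one. Returns None when the number did not
        move forward, for example after a broker restart.
        """
        previous = self.last_seen
        self.last_seen = seq_no
        if previous is None:
            return 0
        if seq_no <= previous:
            return None
        return seq_no - previous - 1


detector = SequenceGapDetector()
results = [detector.observe(n) for n in (1, 2, 4, 1)]
```

In this run the third message reports one skipped message (sequence number 3 was overwritten), and the fourth reports a discontinuity because the number went backwards.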
The following code demonstrates the use of qpid.queue_msg_sequence:
- Python
from qpid.messaging import Connection, Message
from qpid.datatypes import Serial

conn = Connection.establish("localhost:5672")
ssn = conn.session()

name = "ring-sequence-queue"
key = "my_sequence_key"

# Declare a ring queue that holds at most 4 messages. The broker stamps each
# message with a sequence number in the property named by key.
addr = "%s; {create:sender, delete:always, node: {x-declare: {arguments: {'qpid.queue_msg_sequence':'%s', 'qpid.policy_type':'ring', 'qpid.max_count':4}}}}" % (name, key)
sender = ssn.sender(addr)

# The first message through the queue should carry sequence number 1.
sender.send(Message())
receiver = ssn.receiver(name)
msg = receiver.fetch(1)
try:
    seqNo = Serial(long(msg.properties[key]))
    if seqNo != 1:
        print "Unexpected sequence number. Should be 1. Received (%s)" % seqNo
    else:
        print "Received message with sequence number 1"
except KeyError:
    print "Unable to get key (%s) from message properties" % key

# Test that the sequence number for ring queues shows gaps when queued
# messages are overwritten.
sender.send(Message())
msg = receiver.fetch(1)
seqNo = Serial(long(msg.properties[key]))
print "Received second message with sequence number %s" % seqNo

# Send 5 more messages to overflow the queue.
for i in range(5):
    sender.send(Message())

# The next message would be number 3 if nothing had been overwritten.
msg = receiver.fetch(1)
seqNo = Serial(long(msg.properties[key]))
if seqNo != 3:
    print "Unexpected sequence number. Should be 3. Received (%s) - Message overwritten in ring queue." % seqNo

receiver.close()
ssn.close()
conn.close()
The message sequence number is transferred as an unsigned 32-bit integer, so it wraps around at 2^32. In Python, use the Serial class from qpid.datatypes to handle the wrapping.
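To illustrate why the wrapping matters, the arithmetic can be written out in plain Python using modulo 2^32 subtraction. This is a sketch only and does not use the Serial class: the function name missed_between is hypothetical, and it assumes the value that follows 2^32 - 1 is 0, which is not confirmed here.

```python
MODULUS = 2 ** 32  # sequence numbers are unsigned 32-bit values


def missed_between(previous, current):
    """Return how many sequence numbers were skipped between two
    consecutively received messages, treating the numbers as a
    counter that wraps around at 2^32 (assumed to wrap to 0)."""
    return (current - previous - 1) % MODULUS


no_gap = missed_between(2, 3)
one_lost = missed_between(2, 4)
across_wrap = missed_between(2 ** 32 - 1, 0)
```

A plain comparison such as current == previous + 1 would report a false gap at the wrap point, whereas the modular difference treats 0 as the direct successor of 2^32 - 1.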