Este contenido no está disponible en el idioma seleccionado.

4.9.3. Detect Overwritten Messages in Ring Queues


Ring queues overwrite older messages with incoming messages when the queue capacity is reached. Some applications need to be aware when messages have been overwritten. This can be achieved by declaring the queue with the qpid.queue_msg_sequence argument.
The qpid.queue_msg_sequence argument accepts a single string value as its parameter. This string value is added by the broker as a message property on each message that comes through the ring queue, and the property is set to a sequentially incrementing integer value.
Applications can examine the value of the qpid.queue_msg_sequence specified property on each message to determine if interim messages have been overwritten in the ring queue, and response appropriately.
Note that the message sequence must be examined by a stateful application to detect a break in the sequence. An exclusive queue with a single consumer is able to do this. If multiple consumers take messages from the queue the message sequence is split between consumers and they have no way to tell if a message has been overwritten.
Note also that the message sequence is not persisted, even with persistent messages sent to a durable queue, so a broker restart causes sequence discontinuity.
The following code demonstrates the use of qpid.queue_msg_sequence:
Python
import sys
from qpid.messaging import *
from qpid.datatypes import Serial

conn = Connection.establish("localhost:5672")
ssn = conn.session()

name="ring-sequence-queue"
key="my_sequence_key"
addr = "%s; {create:sender, delete:always, node: {x-declare: {arguments: {'qpid.queue_msg_sequence':'%s', 'qpid.policy_type':'ring', 'qpid.max_count':4}}}}"  % (name, key)
sender = ssn.sender(addr)

msg = Message()
sender.send(msg)

receiver = ssn.receiver(name)
msg = receiver.fetch(1)

try:
    seqNo = Serial(long(msg.properties[key]))
    if seqNo != 1:
        print "Unexpected sequence number. Should be 1. Received (%s)" % seqNo
    else:
       print "Received message with sequence number 1"
except:
    print "Unable to get key (%s) from message properties" % key

"""
Test that sequence number for ring queues shows gaps when queue messages are overwritten
"""

msg = Message()
sender.send(msg)
msg = receiver.fetch(1)
seqNo = Serial(long(msg.properties[key]))

print "Received second message with sequence number %s" % seqNo
# send 5 more messages to overflow the queue
for i in range(5):
    sender.send(msg)

msg = receiver.fetch(1)
seqNo = msg.properties[key]
if seqNo != 3:
    print "Unexpected sequence number. Should be 3. Received (%s) - Message overwritten in ring queue." % seqNo
receiver.close()
ssn.close()
The message sequence number is transferred as an unsigned 32 bit integer, so it wraps around at 2^32. In Python, use the Serial class from qpid.datatype to handle the wrapping.
Red Hat logoGithubredditYoutubeTwitter

Aprender

Pruebe, compre y venda

Comunidades

Acerca de la documentación de Red Hat

Ayudamos a los usuarios de Red Hat a innovar y alcanzar sus objetivos con nuestros productos y servicios con contenido en el que pueden confiar. Explore nuestras recientes actualizaciones.

Hacer que el código abierto sea más inclusivo

Red Hat se compromete a reemplazar el lenguaje problemático en nuestro código, documentación y propiedades web. Para más detalles, consulte el Blog de Red Hat.

Acerca de Red Hat

Ofrecemos soluciones reforzadas que facilitan a las empresas trabajar en plataformas y entornos, desde el centro de datos central hasta el perímetro de la red.

Theme

© 2026 Red Hat
Volver arriba