此内容没有您所选择的语言版本。

4.9.3. Detect Overwritten Messages in Ring Queues


Ring queues overwrite older messages with incoming messages when the queue capacity is reached. Some applications need to be aware when messages have been overwritten. This can be achieved by declaring the queue with the qpid.queue_msg_sequence argument.
The qpid.queue_msg_sequence argument accepts a single string value as its parameter. This string value is added by the broker as a message property on each message that comes through the ring queue, and the property is set to a sequentially incrementing integer value.
Applications can examine the value of the qpid.queue_msg_sequence specified property on each message to determine if interim messages have been overwritten in the ring queue, and response appropriately.
Note that the message sequence must be examined by a stateful application to detect a break in the sequence. An exclusive queue with a single consumer is able to do this. If multiple consumers take messages from the queue the message sequence is split between consumers and they have no way to tell if a message has been overwritten.
Note also that the message sequence is not persisted, even with persistent messages sent to a durable queue, so a broker restart causes sequence discontinuity.
The following code demonstrates the use of qpid.queue_msg_sequence:
Python
import sys
from qpid.messaging import *
from qpid.datatypes import Serial

conn = Connection.establish("localhost:5672")
ssn = conn.session()

name="ring-sequence-queue"
key="my_sequence_key"
addr = "%s; {create:sender, delete:always, node: {x-declare: {arguments: {'qpid.queue_msg_sequence':'%s', 'qpid.policy_type':'ring', 'qpid.max_count':4}}}}"  % (name, key)
sender = ssn.sender(addr)

msg = Message()
sender.send(msg)

receiver = ssn.receiver(name)
msg = receiver.fetch(1)

try:
    seqNo = Serial(long(msg.properties[key]))
    if seqNo != 1:
        print "Unexpected sequence number. Should be 1. Received (%s)" % seqNo
    else:
       print "Received message with sequence number 1"
except:
    print "Unable to get key (%s) from message properties" % key

"""
Test that sequence number for ring queues shows gaps when queue messages are overwritten
"""

msg = Message()
sender.send(msg)
msg = receiver.fetch(1)
seqNo = Serial(long(msg.properties[key]))

print "Received second message with sequence number %s" % seqNo
# send 5 more messages to overflow the queue
for i in range(5):
    sender.send(msg)

msg = receiver.fetch(1)
seqNo = msg.properties[key]
if seqNo != 3:
    print "Unexpected sequence number. Should be 3. Received (%s) - Message overwritten in ring queue." % seqNo
receiver.close()
ssn.close()
The message sequence number is transferred as an unsigned 32 bit integer, so it wraps around at 2^32. In Python, use the Serial class from qpid.datatype to handle the wrapping.
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2026 Red Hat
返回顶部