이 콘텐츠는 선택한 언어로 제공되지 않습니다.

6.6.3. Last Value Queue Example


This example demonstrates how to create and use a Last Value Queue. The language bindings and programming details differ between languages, but the principles are the same.
We will create a messaging queue that provides regular stock price updates. Message consumers are interested in the current stock price, and do not wish or need to receive messages with historical information. A last value queue is perfect for this application: newly arriving messages can update and replace older ones.
We will call our queue "stock-ticker". Our stock-ticker queue will use "stock-symbol" as the last value queue key. The value of this key in the message header will identify a message as a new message to the queue, or an update to a message already in the queue.
First we import the Qpid Messaging client library:
Python
import sys
from qpid.messaging import *
Copy to Clipboard Toggle word wrap
Now we create a Connection to the broker running on the standard AMQP port, 5672, on the local machine:
Python
connection = Connection("localhost:5672")
connection.open()
Copy to Clipboard Toggle word wrap
And now we use this connection to create a session:
Python
session = connection.session()
Copy to Clipboard Toggle word wrap
Now we create a sender and declare a last value queue at the same time. We will create a queue called "stock-ticker", and use "stock-symbol" as the last value queue key. Messages sent to this queue will identify themselves as an update to a previous message by specifying the same "stock-symbol" in their headers.
The following statement is a single line of code. It may break across lines in display, but it should be entered as a single line.
Python
stockSender = session.sender("stock-ticker;{create:always, node:{type:queue, x-declare:{arguments:{'qpid.last_value_queue_key': 'stock-symbol'}}}}")
Copy to Clipboard Toggle word wrap
Sidenote: We could also create the queue using the qpid-config command line tool:
qpid-config add queue stock-ticker --argument qpid.last_value_queue_key=stock-symbol
Copy to Clipboard Toggle word wrap
Now let's create and send some messages to the queue. We use the "stock-symbol" key in the header to identify which stock a message describes. Our last value queue uses this header key to match our message with messages already in the queue.
Python
msg1 = Message("10")
msg1.properties = {'stock-symbol':'RHT'}

msg2 = Message("10")  
msg2.properties = {'stock-symbol':'JAVA'}

msg3 = Message("10")
msg3.properties = {'stock-symbol':'MSFT'}

msg4 = Message("12")
msg4.properties = {'stock-symbol':'RHT'}
Copy to Clipboard Toggle word wrap
After sending these messages to our last value queue a new consumer should see three messages in the queue, one for each stock symbol, with msg4 updating msg1. To contrast the behavior of the last value queue with a standard FIFO queue, we'll send our messages to a control queue, called control-queue at the same time:
Python
controlSender = session.sender("control-queue;{create:always, node:{type:queue}}")
Copy to Clipboard Toggle word wrap
Now we send our messages to the two queues:
Python
stockSender.send(msg1)
controlSender.send(msg1)

stockSender.send(msg2)
controlSender.send(msg2)

stockSender.send(msg3)
controlSender.send(msg3)

stockSender.send(msg4)
controlSender.send(msg4)
Copy to Clipboard Toggle word wrap
Our messages are now in the queues. We create two receivers to now examine the content of the queues:
Python
stockBrowser = session.receiver("stock-ticker; {mode:browse}")
controlBrowser = session.receiver("control-queue; {mode:browse}")
Copy to Clipboard Toggle word wrap
These are browsing receivers, so they do not acquire messages and remove them from the queue. To clear the queues, remove the browse property from the receiver declarations, like so: session.receiver("stock-ticker"), and run the demo again. With the receivers browsing, you will be able to see more distinctly the effect of a Last Value Queue over time by running the demo several times in succession without clearing the queues.
We will use the prefetch capability of the receivers to browse messages on the queue, and to allow us to count how many messages are in the queue using the available() method. We do this by setting the receivers' prefetch capacity to a value higher than the default of 0:
Python
stockBrowser.capacity = 20
controlBrowser.capacity = 20
Copy to Clipboard Toggle word wrap
Once the prefetch capacity of the receiver is set to 20, up to 20 available messages are retrieved asynchronously from the queue. Because the operation is asynchronous we need to wait for it to complete. We will put our application to sleep for 10 seconds before examining the prefetched messages:
Python
sleep 10
Copy to Clipboard Toggle word wrap
We need to import sleep from the time library:
Python
from time import sleep
Copy to Clipboard Toggle word wrap
Note that we do this in order to examine the available() property of the receiver with certainty that this represents the number of messages in the queue. When operating asynchronously available() reports the number of messages available locally. After a ten second delay, we can be reasonably certain that this is the total number of messages in the queue. In an actual asynchronous operation you would not want to block execution of your application. Instead you would use a pattern like this:
Python
while True:
  try:
    msg = stockBrowser.fetch(timeout = 10)
    print msg.properties["stock-symbol"] + ":" + msg.content
  except Empty:
    break
Copy to Clipboard Toggle word wrap
When our application finishes its sleep cycle, we will examine the number of messages in the queue, and print them out:
Python
print "Last Value Queue has " + str(stockBrowser.available()) + " messages"

print "\nLast Value Queue messages:"
      
for x in range(stockBrowser.available()):
  try:
    msg = stockBrowser.fetch(timeout = 1)
    print msg.properties["stock-symbol"] + ":" + msg.content
  except MessagingError, m:
    pass
      
print "Control Queue has " + str(controlBrowser.available()) + " messages"
      
print "\nControl Queue messages:"
for x in range(controlBrowser.available()):
  try:
    msg = controlBrowser.fetch(timeout = 1)
    print msg.properties["stock-symbol"] + ":" + msg.content
  except MessagingError, m:
    pass
Copy to Clipboard Toggle word wrap
And finally we acknowledge our session and close the connection:
Python
session.acknowledge()
connection.close()
Copy to Clipboard Toggle word wrap
We are now ready to run our test. Here's the complete program listing:
Python
import sys
from qpid.messaging import *
from time import sleep

connection = Connection("localhost:5672")
try:
  connection.open()
  session = connection.session()

  stockSender = session.sender("stock-ticker;{create:always, node:{type:queue, x-declare:{arguments:{'qpid.last_value_queue_key': 'stock-symbol'}}}}")
  controlSender = session.sender("control-queue;{create:always, node:{type:queue}}")

  stockBrowser = session.receiver("stock-ticker;{mode:browse}")
  controlBrowser = session.receiver("control-queue;{mode:browse}")
  controlBrowser = session.receiver("control-queue")

  msg1 = Message("10")
  msg1.properties = {'stock-symbol':'RHT'}

  msg2 = Message("10")
  msg2.properties = {'stock-symbol':'JAVA'}

  msg3 = Message("10")
  msg3.properties = {'stock-symbol':'MSFT'}

  msg4 = Message("12")
  msg4.properties = {'stock-symbol':'RHT'}

  stockSender.send(msg1)
  controlSender.send(msg1)

  stockSender.send(msg2)
  controlSender.send(msg2)

  stockSender.send(msg3)
  controlSender.send(msg3)

  stockSender.send(msg4)
  controlSender.send(msg4)

  stockBrowser.capacity = 20
  controlBrowser.capacity = 20

  sleep(10)

  print "\nLast Value Queue has " + str(stockBrowser.available()) + " messages"
      
  print "Last Value Queue messages:"
      
  for x in range(stockBrowser.available()):
    try:
      msg = stockBrowser.fetch(timeout = 1)
      print msg.properties["stock-symbol"] + ":" + msg.content
    except MessagingError, m:
      pass
      
  print "\nControl Queue has " + str(controlBrowser.available()) + " messages"
      
  print "Control Queue messages:"
      
    for x in range(controlBrowser.available()):
    try:
      msg = controlBrowser.fetch(timeout = 1)
      print msg.properties["stock-symbol"] + ":" + msg.content
    except MessagingError, m:
      pass

  session.acknowledge()
      
except MessagingError,m:
  print m
finally:
  connection.close()
Copy to Clipboard Toggle word wrap
맨 위로 이동
Red Hat logoGithubredditYoutubeTwitter

자세한 정보

평가판, 구매 및 판매

커뮤니티

Red Hat 문서 정보

Red Hat을 사용하는 고객은 신뢰할 수 있는 콘텐츠가 포함된 제품과 서비스를 통해 혁신하고 목표를 달성할 수 있습니다. 최신 업데이트를 확인하세요.

보다 포괄적 수용을 위한 오픈 소스 용어 교체

Red Hat은 코드, 문서, 웹 속성에서 문제가 있는 언어를 교체하기 위해 최선을 다하고 있습니다. 자세한 내용은 다음을 참조하세요.Red Hat 블로그.

Red Hat 소개

Red Hat은 기업이 핵심 데이터 센터에서 네트워크 에지에 이르기까지 플랫폼과 환경 전반에서 더 쉽게 작업할 수 있도록 강화된 솔루션을 제공합니다.

Theme

© 2025 Red Hat