6.6.3. Last Value Queue Example
This example demonstrates how to create and use a Last Value Queue. The language bindings and programming details differ between languages, but the principles are the same.
We will create a messaging queue that provides regular stock price updates. Message consumers are interested in the current stock price, and do not wish or need to receive messages with historical information. A last value queue is perfect for this application: newly arriving messages can update and replace older ones.
We will call our queue "stock-ticker". Our stock-ticker queue will use "stock-symbol" as the last value queue key. The value of this key in the message header will identify a message as a new message to the queue, or an update to a message already in the queue.
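The replacement rule can be illustrated without a broker. The following is a minimal in-memory sketch, not the broker's implementation: the class name `LastValueQueueModel` and the sample symbols and prices are hypothetical, and delivery order is deliberately not modeled.

```python
class LastValueQueueModel(object):
    """Toy model of last value queue semantics (not the broker's implementation)."""

    def __init__(self, key):
        self.key = key      # header key used to match messages, e.g. "stock-symbol"
        self.latest = {}    # key value -> most recent message content

    def enqueue(self, properties, content):
        # A message whose key value is already present replaces the older one;
        # otherwise it is stored as a new entry.
        self.latest[properties[self.key]] = content

    def depth(self):
        # Number of messages a new consumer would find in the queue.
        return len(self.latest)


queue = LastValueQueueModel("stock-symbol")
queue.enqueue({"stock-symbol": "AAA"}, "10.00")
queue.enqueue({"stock-symbol": "BBB"}, "20.00")
queue.enqueue({"stock-symbol": "AAA"}, "10.50")  # replaces the first AAA message

print(queue.depth())        # 2
print(queue.latest["AAA"])  # 10.50
```

Three messages were enqueued, but only two remain, because the third carried the same key value as the first.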
First we import the Qpid Messaging client library:
- Python
import sys
from qpid.messaging import *
Now we create a Connection to the broker running on the standard AMQP port, 5672, on the local machine:
- Python
connection = Connection("localhost:5672")
connection.open()
And now we use this connection to create a session:
- Python
session = connection.session()
Now we create a sender and declare a last value queue at the same time. We will create a queue called "stock-ticker", and use "stock-symbol" as the last value queue key. Messages sent to this queue will identify themselves as an update to a previous message by specifying the same "stock-symbol" in their headers.
The following statement is a single line of code. It may break across lines in display, but it should be entered as a single line.
- Python
stockSender = session.sender("stock-ticker;{create:always, node:{type:queue, x-declare:{arguments:{'qpid.last_value_queue_key': 'stock-symbol'}}}}")
Sidenote: We could also create the queue using the qpid-config command line tool:
qpid-config add queue stock-ticker --argument qpid.last_value_queue_key=stock-symbol
Now let's create and send some messages to the queue. We use the "stock-symbol" key in the header to identify which stock a message describes. Our last value queue uses this header key to match our message with messages already in the queue.
- Python
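The message definitions could look like the sketch below. The helper name `make_price_updates`, the symbols, and the prices are hypothetical. The message class is passed in as a parameter so the sketch does not depend on an installed client; in the demo you would pass `Message` from `qpid.messaging`, which takes the content as its first argument and keeps application headers in its `properties` attribute.

```python
def make_price_updates(message_class):
    """Return four price updates; the fourth reuses the first one's symbol."""
    quotes = [
        ("AAA", "10.00"),  # msg1
        ("BBB", "20.00"),  # msg2
        ("CCC", "30.00"),  # msg3
        ("AAA", "10.50"),  # msg4: same key value as msg1, so it updates msg1
    ]
    messages = []
    for symbol, price in quotes:
        msg = message_class(price)
        # The last value queue matches messages on this header key.
        msg.properties = {"stock-symbol": symbol}
        messages.append(msg)
    return messages

# In the demo (assumes the Qpid client has been imported as shown earlier):
# msg1, msg2, msg3, msg4 = make_price_updates(Message)
```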
After sending these messages to our last value queue, a new consumer should see three messages in the queue, one for each stock symbol, with msg4 updating msg1. To contrast the behavior of the last value queue with a standard FIFO queue, we'll send our messages to a control queue, called control-queue, at the same time:
- Python
controlSender = session.sender("control-queue;{create:always, node:{type:queue}}")
Now we send our messages to the two queues:
- Python
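One way to send the same messages to both queues is sketched below. The helper name `send_to_all` is hypothetical; it relies only on each sender's `send()` method and assumes the messages are held in variables named `msg1` through `msg4`.

```python
def send_to_all(senders, messages):
    """Send every message to every sender, preserving message order."""
    for msg in messages:
        for sender in senders:
            sender.send(msg)

# In the demo (assumes the senders and messages created above):
# send_to_all([stockSender, controlSender], [msg1, msg2, msg3, msg4])
```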
Our messages are now in the queues. We now create two receivers to examine the content of the queues:
- Python
stockBrowser = session.receiver("stock-ticker; {mode:browse}")
controlBrowser = session.receiver("control-queue; {mode:browse}")
These are browsing receivers, so they do not acquire messages and remove them from the queue. To clear the queues, remove the browse property from the receiver declarations, like so: session.receiver("stock-ticker"), and run the demo again. With the receivers browsing, you will be able to see more distinctly the effect of a Last Value Queue over time by running the demo several times in succession without clearing the queues.
We will use the prefetch capability of the receivers to browse messages on the queue, and to allow us to count how many messages are in the queue using the available() method. We do this by setting the receivers' prefetch capacity to a value higher than the default of 0:
- Python
stockBrowser.capacity = 20
controlBrowser.capacity = 20
Once the prefetch capacity of the receiver is set to 20, up to 20 available messages are retrieved asynchronously from the queue. Because the operation is asynchronous we need to wait for it to complete. We will put our application to sleep for 10 seconds before examining the prefetched messages:
- Python
sleep(10)
We need to import sleep from the time library:
- Python
from time import sleep
Note that we do this so that the value returned by the receiver's available() method reliably represents the number of messages in the queue. When operating asynchronously, available() reports the number of messages available locally. After a ten-second delay, we can be reasonably certain that this is the total number of messages in the queue. In an actual asynchronous application you would not want to block execution. Instead you would use a pattern like this:
- Python
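A non-blocking pattern might look like the following sketch. It is one possible approach, not necessarily the one the original listing used: the helper name `drain_available` is hypothetical, and it assumes the receiver offers `available()` and `fetch(timeout=...)` as described above.

```python
def drain_available(receiver):
    """Return the messages already prefetched locally, without waiting."""
    messages = []
    while receiver.available() > 0:
        # timeout=0 asks fetch() not to wait for messages that have not arrived.
        messages.append(receiver.fetch(timeout=0))
    return messages

# In the application's main loop, between other work:
# for msg in drain_available(stockBrowser):
#     print(msg.content)
```

Calling the helper repeatedly from the main loop lets the application process whatever has arrived so far and then carry on with other work.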
When our application finishes its sleep cycle, we will examine the number of messages in the queue, and print them out:
- Python
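The reporting step could be sketched as follows. The helper name `describe_queue` and the output format are hypothetical; the sketch assumes each received message exposes `content` and a `properties` dictionary containing the "stock-symbol" header.

```python
def describe_queue(name, browser):
    """Print how many messages were prefetched from a queue, then each one."""
    count = browser.available()
    lines = ["%s: %d message(s)" % (name, count)]
    for _ in range(count):
        msg = browser.fetch(timeout=0)
        symbol = msg.properties.get("stock-symbol")
        lines.append("  %s = %s" % (symbol, msg.content))
    print("\n".join(lines))
    return lines

# In the demo (assumes the browsing receivers created above):
# describe_queue("stock-ticker", stockBrowser)
# describe_queue("control-queue", controlBrowser)
```

On a first run with four messages sent, one would expect the stock-ticker queue to report three messages and the control queue four.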
And finally we acknowledge our session and close the connection:
- Python
session.acknowledge()
connection.close()
We are now ready to run our test. Here's the complete program listing:
- Python