3.4. Python AMQP 1.0 Client API


Python AMQP 1.0 Client API is based on the Apache Qpid Proton Client API. The API is available at Qpid Python API Reference
Note
This is an initial version of documentation for the Python client. Regular updates and enhancements of the documentation can be expected after the GA release of Fuse 6.2.0

3.4.1. Getting Started with Qpid Proton Python Client

This chapter consists of Python Client tutorials with examples and API reference.

3.4.1.1. Introduction to Qpid Proton Python Client

What is Qpid Proton
Qpid Proton can be described as a AMQP messaging toolkit Qpid Proton is a messaging library used in messaging applications, including brokers, client libraries, routers, bridges, proxies, and so on. Application build across different platform, language, and environment can be integrated with AMQP using Proton.
Introduction to the Qpid Proton Python Client
Qpid Proton is a toolkit for messaging using AMQP. You can install Qpid Proton Python client using the command yum install python-qpid-proton The API is event driven and centers on the Container class which provides methods for establishing connections and creating senders and receivers as well as dispatching events to application defined handlers.
The following examples use the pattern
Container(name_of_a_handler_class(arguments)).run()
  • Where the logic of the application is defined as a class handling particular events. A Container instance is created and passed an instance of that handler class.
    The call to run() gives control to the Container, which performs the necessary IO operations and informs the handler of the events.
    The run() returns when there is nothing to do.

3.4.2. Python Client Tutorials with examples

A Simple Sending and Receiving Program in Python

In this example, there are two parts, sending a fixed number of messages and receiving messages
This example demonstrates the reliable sending of messages. The sender sends a fixed number of messages to a named queue on the broker (accessible on port 5672 on localhost). In case the sender is disconnected after a message is sent and before it has been confirmed by the receiver, it is said to be in-doubt. It is unknown whether the message was received, this scenario is handled by resending any in-doubt messages. This is known as an at-least-once guarantee, since each message should eventually be received at least once, though a given message may be received more than once.
Note
The program uses the variables sent and total, where, sent keeps track of the number of messages that are send and total maintains a count of number of messages to send.

Example 3.3. Sending reliable messages

import optparse
from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container

class Send(MessagingHandler):
    def __init__(self, url, messages):
        super(Send, self).__init__()
        self.url = url
        self.sent = 0
        self.confirmed = 0
        self.total = messages

    def on_start(self, event): 1
        event.container.create_sender(self.url)

    def on_sendable(self, event): 2
        while event.sender.credit and self.sent < self.total:
            msg = Message(id=(self.sent+1), body={'sequence':(self.sent+1)})
            event.sender.send(msg)  3
            self.sent += 1

    def on_accepted(self, event): 4
        self.confirmed += 1
        if self.confirmed == self.total:
            print "all messages confirmed"
            event.connection.close()

    def on_disconnected(self, event): 5
        self.sent = self.confirmed

parser = optparse.OptionParser(usage="usage: %prog [options]",
                               description="Send messages to the supplied address.")
parser.add_option("-a", "--address", default="localhost:5672/examples",
                  help="address to which messages are sent (default %default)")
parser.add_option("-m", "--messages", type="int", default=100,
                  help="number of messages to send (default %default)")
opts, args = parser.parse_args()

try:
    Container(Send(opts.address, opts.messages)).run()
except KeyboardInterrupt: pass
1
On_start() method is called when the Container first starts to run.
In this example it is used to establish a sending link over which the messages are transferred.
2
On_sendable() method is called to known when the messages can be transferred.
The callback checks that the sender has credit before sending messages and if the sender has already sent the required number of messages.
Note
AMQP provides flow control allowing any component receiving messages to avoid being overwhelmed by the number of messages it is sent. In this example messages are sent when the broker has enabled their flow.
3
Send() is an asynchronous call. The return of the call does not indicate that the messages has been transferred yet.
4
on_accepted() notifies if the amq broker has received and accepted the message.
In this example, we use this event to track the confirmation of the messages sent. The connection closes and exits when the amq broker has received all the messages.
Note
The on_accepted() call will be made by the Container when the amq broker accepts the message and not the receiving client.
5
Resets the sent count to reflect the confirmed messages. The library automatically reconnects to the sender and hence when the sender is ready, it can restart sending the remaining messages

Example 3.4. Receiving reliable messages

In this example, the receiver application subscribes to the examples queue on a broker accessible on port 5672 on localhost. The program simply prints the body of the received messages.
import optparse
from proton.handlers import MessagingHandler
from proton.reactor import Container

class Recv(MessagingHandler):
    def __init__(self, url, count):
        super(Recv, self).__init__()
        self.url = url
        self.expected = count
        self.received = 0

    def on_start(self, event): 1
        event.container.create_receiver(self.url)

    def on_message(self, event): 2
        if event.message.id and event.message.id < self.received:
            # ignore duplicate message
            return
        if self.expected == 0 or self.received < self.expected:
            print event.message.body
            self.received += 1
            if self.received == self.expected:
                event.receiver.close()
                event.connection.close()

parser = optparse.OptionParser(usage="usage: %prog [options]")
parser.add_option("-a", "--address", default="localhost:5672/examples",
                  help="address from which messages are received (default %default)")
parser.add_option("-m", "--messages", type="int", default=100,
                  help="number of messages to receive; 0 receives indefinitely (default %default)")
opts, args = parser.parse_args()

try:
    Container(Recv(opts.address, opts.messages)).run()
except KeyboardInterrupt: pass
1
On_start() method is called when the Container first starts to run.
In this example it is used to establish a receiving link over which the messages are transferred.
2
On_message() method is called when a message is received. It simply prints the messages
In this example, the amq broker waits for a certain number of messages before closing and exiting the connection. The method checks for duplicate messages and ignores them. The logic to ignore duplicates is implement using the sequential id scheme

Sending and Receiving Program using SSL in Python

To run the examples using SSL, the Python client requires certain system configurations to enable SSL. This section describes the SSL settings and prerequisites for the Python client

SSL Configuration

Note
Before following the steps in this section, configure the broker in detailed in Section 1.3, “Preparing to use AMQ with SSL”.

SSL settings for A-MQ to run Qpid Python client

  • Generate pem trust certificate for Qpid Python client
    keytool -importkeystore -srckeystore broker.ks -srcalias broker \
    -srcstoretype JKS -deststoretype PKCS12 -destkeystore broker.pkcs12 \
    -srcstorepass ${general_passwd} -deststorepass ${general_passwd}
    openssl pkcs12 -in broker.pkcs12 -out broker_cert.pem \
    -passin pass:${general_passwd} -passout pass:${general_passwd}
  • Adjust A-MQ broker to use the certificate by modifying the A-MQ environment
    sed -i '/KARAF_OPTS/d' ${A_MQ_HOME}/bin/setenv
    where, A_MQ_HOME is the installation path of the amq broker executable file.
    echo "export KARAF_OPTS=\"-Djavax.net.ssl.keyStore=${certificates_dir}/broker.ks \ -Djavax.net.ssl.keyStorePassword=${general_passwd}\"" >> ${A_MQ_HOME}/bin/setenv
  • Generate the client certificate
     keytool -genkey -alias client -keyalg RSA -keystore client.ks \
    -storepass ${general_passwd} -keypass ${general_passwd} \
    -dname "O=Client,CN=client" -validity 99999
     keytool -export -alias client -keystore client.ks -file client_cert \
    -storepass ${general_passwd}
  • Add client certificate as trusted to the broker database
     keytool -import -alias client -keystore broker.ts -file client_cert \
    -storepass ${general_passwd} -v -trustcacerts -noprompt

SSL certificate and keys settings for Qpid Python client

  • Set SLL to prevent the private key and the certificate to be send to output.
    openssl pkcs12 -nocerts -in client.pkcs12 -out client_private_key.pem \
     -passin pass:${general_passwd} -passout pass:${general_passwd}
    
    openssl pkcs12 -nokeys -in client.pkcs12 -out client_cert.pem \
    -passin pass:${general_passwd} -passout pass:${general_passwd}
    
  • Adjust A-MQ broker to use the certificate
     sed -i '/KARAF_OPTS/d' ${A_MQ_HOME}/bin/setenv
    
     echo "export KARAF_OPTS=\"-Djavax.net.ssl.keyStore=${certificates_dir}/broker.ks \
    -Djavax.net.ssl.keyStorePassword=${general_passwd} \
    -Djavax.net.ssl.trustStore=${certificates_dir}/broker.ts \
    -Djavax.net.ssl.trustStorePassword=${general_passwd}\"" >> ${A_MQ_HOME}/bin/setenv
    

Example

Example 3.5. Sending reliable messages over a secured connection

In this example, we modify the sending program, as shown in Example 3.3, “Sending reliable messages” to send messages over a secured connection. The connection is setup to use SSL. SSL can be configured for outgoing connections using the client property of the Conatiner's ssl property as shown in the program below.
import optparse
from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container

class Send(MessagingHandler):
    def __init__(self, url, messages):
        super(Send, self).__init__()
        self.url = url
        self.sent = 0
        self.confirmed = 0
        self.total = messages

    def on_start(self, event): 
        event.container.ssl.client.set_trusted_ca_db("/path/to/ca-certificate.pem")
        event.container.ssl.client.set_peer_authentication(SSLDomain.VERIFY_PEER) 1
        event.container.ssl.client.set_credentials("/path/to/client-certificate.pem",  "/path/to/client-private-key.pem", "client-password") 2
        event.container.create_sender(self.url)

    def on_sendable(self, event): 
        while event.sender.credit and self.sent < self.total:
            msg = Message(id=(self.sent+1), body={'sequence':(self.sent+1)})
            event.sender.send(msg) 
            self.sent += 1

    def on_accepted(self, event):
        self.confirmed += 1
        if self.confirmed == self.total:
            print "all messages confirmed"
            event.connection.close()

    def on_disconnected(self, event): 
        self.sent = self.confirmed

parser = optparse.OptionParser(usage="usage: %prog [options]",
                               description="Send messages to the supplied address.")
parser.add_option("-a", "--address", default="localhost:5672/examples",
                  help="address to which messages are sent (default %default)")
parser.add_option("-m", "--messages", type="int", default=100,
                  help="number of messages to send (default %default)")
opts, args = parser.parse_args()

try:
    Container(Send(opts.address, opts.messages)).run()
except KeyboardInterrupt: pass
1
set_trusted_ca_db("/path/to/ca-certificate.pem") call specifies the location of the CA's certificate in pem file format
set_peer_authentication(SSLDomain.VERIFY_PEER) call requests the servers certificate to be verified as valid using the specified CA's public key.
To verify if the hostname used matches which the name specified in the servers certificate, replace the VERIFY_PEERmacro to VERIFY_PEER_NAME.
Note
Ensure to update the program with the path of the certificates as per your environment before running the example.
2
set_credentials("/path/to/client-certificate.pem", "/path/to/client-private-key.pem", "client-password") call is used if the client needs to authenticate itself. In such a case, you need to mention the clients public certificate, private key file both in pem format, and also specify the password required for the private key
Note
Ensure to update the program with the path of the client certificate, client private key as per your environment and the correct client-password before running the example.
Note
Similarly, you can modify the receiver program to receive messages over a secured connection.

A Request/Response Server and Client Program

This example implements a Server handler that processes the incoming requests from the amq broker and sends the response back to the amq broker. The program is implemented to receive the messages, converts the body of the received message to uppercase and sends the converted messages as a response.
Note
Ensure that the amq broker is running.

Example 3.6. A simple server program to send responses

from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container

class Server(MessagingHandler):
    def __init__(self, url, address):
        super(Server, self).__init__()
        self.url = url
        self.address = address
        self.senders = {}

    def on_start(self, event):
        print "Listening on", self.url
        self.container = event.container
        self.conn = event.container.connect(self.url)
        self.receiver = event.container.create_receiver(self.conn, self.address)
        self.relay = None

    def on_connection_opened(self, event):
        if event.connection.remote_offered_capabilities and 'ANONYMOUS-RELAY' in event.connection.remote_offered_capabilities:
            self.relay = self.container.create_sender(self.conn, None)

    def on_message(self, event):
        print "Received", event.message
        sender = self.relay or self.senders.get(event.message.reply_to) 1
    
        if not sender:
            sender = self.container.create_sender(self.conn, event.message.reply_to)
            self.senders[event.message.reply_to] = sender
        sender.send(Message(address=event.message.reply_to, body=event.message.body.upper(),
                            correlation_id=event.message.correlation_id))

try:
    Container(Server("0.0.0.0:5672", "examples")).run()
except KeyboardInterrupt: pass
1
On_message() method performs a lookup at the reply_to address on the message and creates a sender over which the response can be send.
In case there are more requests with the same reply_to address, the method will store the senders.

Run python client over SSL

To run the python client over SSL you can use different options as follows
  • No server and client authentication
     ./sender.py -b "amqps://$(hostname):5672/examples"
  • Server authentication enabled
     ./sender.py -b "amqps://$(hostname):5672/examples" --conn-ssl-trust-store<certificates_dir>/broker_cert.pem --conn-ssl-verify-peer --conn-ssl-verify-peer-name
  • Server and client authentication enabled
     ./sender.py -b "amqps://$(hostname):5672/examples" --conn-ssl-certificate <certificates_dir>/client-certificate.pem --conn-ssl-private-key <certificates_dir>/client-private-key.pem --conn-ssl-trust-store <certificates_dir>/broker_cert.pem --conn-ssl-verify-peer --conn-ssl-verify-peer-name

A simple client program to receive the messages from the Server

This example implements a Client handler that sends requests to the server and prints the responses. The program uses the amq broker that supports AMQP dynamic nodes. The responses are received on the amq broker examples broker.
Note
Ensure that the amq broker is running as this program uses the amq broker.

Example 3.7. A simple client program to receive the messages from the Server

import optparse
from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container, DynamicNodeProperties

class Client(MessagingHandler):
    def __init__(self, url, requests):
        super(Client, self).__init__()
        self.url = url
        self.requests = requests

    def on_start(self, event):
        self.sender = event.container.create_sender(self.url)
        self.receiver = event.container.create_receiver(self.sender.connection, None, dynamic=True) 1
    
    
    def next_request(self):
        if self.receiver.remote_source.address:
            req = Message(reply_to=self.receiver.remote_source.address, body=self.requests[0])
            self.sender.send(req)

    def on_link_opened(self, event): 												2
        if event.receiver == self.receiver:
            self.next_request()

    def on_message(self, event):
        print "%s => %s" % (self.requests.pop(0), event.message.body)
        if self.requests:
            self.next_request()
        else:
            event.connection.close()

REQUESTS= ["Twas brillig, and the slithy toves",
           "Did gire and gymble in the wabe.",
           "All mimsy were the borogroves,",
           "And the mome raths outgrabe."]

parser = optparse.OptionParser(usage="usage: %prog [options]",
                               description="Send requests to the supplied address and print responses.")
parser.add_option("-a", "--address", default="localhost:5672/examples",
                  help="address to which messages are sent (default %default)")
opts, args = parser.parse_args()

Container(Client(opts.address, args or REQUESTS)).run()
1
On_start() method creates a receiver to receive the responses from the server
In this example, instead of using the localhost, we set the dynamic option which informs the amq broker that the client is connected to create a temporary address over which it can receive the responses.
On_link_opened() method sends the first requests if the receiving link is setup and confirmed by the broker
Here, we use the address allocated by the broker as the reply_to address of the requests hence the broker needs to confirm that the receiving link is established.

Sending and Receiving using Transactions

The purpose of using transactions is to provide atomicity.So, for the sending application, either all the messages in a transaction are sent or none of them are sent. The receiving application can accept a set of messages transactionally and either the transaction will succeed and all messages will be consumed, or it will fail and all messages will remain on the queue.
The example also uses the TransactionHandler in addition to MessagingHandler as a base class for the handler definition.

Example 3.8. Sending messages using local transactions

This example implements a sender that sends messages in atomic batches using local transactions.
import optparse
from proton import Message, Url
from proton.reactor import Container
from proton.handlers import MessagingHandler, TransactionHandler

class TxSend(MessagingHandler, TransactionHandler):
    def __init__(self, url, messages, batch_size):
        super(TxSend, self).__init__()
        self.url = Url(url)
        self.current_batch = 0
        self.committed = 0
        self.confirmed = 0
        self.total = messages
        self.batch_size = batch_size

    def on_start(self, event):
        self.container = event.container
        self.conn = self.container.connect(self.url)
        self.sender = self.container.create_sender(self.conn, self.url.path)
        self.container.declare_transaction(self.conn, handler=self) 1
        self.transaction = None

    def on_transaction_declared(self, event): 2
        self.transaction = event.transaction
        self.send()

    def on_sendable(self, event):
        self.send()

    def send(self): 3
        while self.transaction and self.sender.credit and (self.committed + self.current_batch) < self.total:
            seq = self.committed + self.current_batch + 1
            msg = Message(id=seq, body={'sequence':seq})
            self.transaction.send(self.sender, msg)
            self.current_batch += 1
            if self.current_batch == self.batch_size:
                self.transaction.commit() 4
                self.transaction = None

    def on_accepted(self, event):
        if event.sender == self.sender:
            self.confirmed += 1

    def on_transaction_committed(self, event): 5
        self.committed += self.current_batch
        if self.committed == self.total:
            print "all messages committed"
            event.connection.close()
        else:
            self.current_batch = 0
            self.container.declare_transaction(self.conn, handler=self)

    def on_disconnected(self, event):
        self.current_batch = 0

parser = optparse.OptionParser(usage="usage: %prog [options]",
                               description="Send messages transactionally to the supplied address.")
parser.add_option("-a", "--address", default="localhost:5672/examples",
                  help="address to which messages are sent (default %default)")
parser.add_option("-m", "--messages", type="int", default=100,
                  help="number of messages to send (default %default)")
parser.add_option("-b", "--batch-size", type="int", default=10,
                  help="number of messages in each transaction (default %default)")
opts, args = parser.parse_args()

try:
    Container(TxSend(opts.address, opts.messages, opts.batch_size)).run()
except KeyboardInterrupt: pass
1
on_transaction_declared() method requests a new transactional context, passing themselves as the handler for the transaction
2
on_transaction_declared() method is notified when that context is in place
5
When the on_transaction_committed() method is called the committed count is incremented by the size of the current batch. If the committed count after that is equal to the number of message it was asked to send, it has completed all its work so closes the connection and the program will exit. If not, it starts a new transaction and sets the current batch to 0.
The sender tracks the number of messages sent in the current_batch, as well as the number of messages committed .
3
Messages are sent when the transaction context has been declared and there is credit. The send() method of the transaction is invoked, rather than on the sender itself. This ensures that send operation is tied to that transaction context.
4
The current_batch counter is incremented for each message. When that counter reaches the preconfigured batch size, the commit() method is called on the transaction.

Example

This example implements a receiver that receives messages in atomic batches using local transactions. The receiver tracks the number of messages received in the current_batch and the overall number committed
Note
In this example the receiver turns off the prefetching of messages in order to control the flow in batches and turns off auto_accept mode in order to explicitly accept the messages under a given transactional context.

Example 3.9. Receiving using local transactions

import optparse
from proton import Url
from proton.reactor import Container
from proton.handlers import MessagingHandler, TransactionHandler

class TxRecv(MessagingHandler, TransactionHandler):
    def __init__(self, url, messages, batch_size):
        super(TxRecv, self).__init__(prefetch=0, auto_accept=False)
        self.url = Url(url)
        self.expected = messages
        self.batch_size = batch_size
        self.current_batch = 0
        self.committed = 0

    def on_start(self, event):
        self.container = event.container
        self.conn = self.container.connect(self.url)
        self.receiver = self.container.create_receiver(self.conn, self.url.path)
        self.container.declare_transaction(self.conn, handler=self)
        self.transaction = None

    def on_message(self, event): 1
        self.receiver.flow(self.batch_size)
        print event.message.body
        self.transaction.accept(event.delivery)
        self.current_batch += 1
        if self.current_batch == self.batch_size:
            self.transaction.commit()
            self.transaction = None

    def on_transaction_declared(self, event): 2
        self.receiver.flow(self.batch_size)
        self.transaction = event.transaction

    def on_transaction_committed(self, event): 3
        self.committed += self.current_batch
        self.current_batch = 0
        if self.expected == 0 or self.committed < self.expected:
            self.container.declare_transaction(self.conn, handler=self)
        else:
            event.connection.close()

    def on_disconnected(self, event):
        self.current_batch = 0

parser = optparse.OptionParser(usage="usage: %prog [options]")
parser.add_option("-a", "--address", default="localhost:5672/examples",
                  help="address from which messages are received (default %default)")
parser.add_option("-m", "--messages", type="int", default=100,
                  help="number of messages to receive; 0 receives indefinitely (default %default)")
parser.add_option("-b", "--batch-size", type="int", default=10,
                  help="number of messages in each transaction (default %default)")
opts, args = parser.parse_args()

try:
    Container(TxRecv(opts.address, opts.messages, opts.batch_size)).run()
except KeyboardInterrupt: pass
1
on_message() method the receiver calls theaccept() method on the transaction object to tie the acceptance to the context. It then increments the current_batch. If the current_batch is now equal to the batch_size, the receiver calls the commit() method on the transaction.
2
on_transaction_declared() method controls the message flow. The receiver uses the flow() method on the receiver to request an equal number of messages that match the batch_size
3
When the on_transaction_committed() method is called the committed count is incremented, the application then tests whether it has received all expected messages. If all the messages are received, the application exists. If all messages are not received a new transactional context is declared and the batch is reset.

Using a Selector Filter

Example 3.10. Filtering messages using a selector

This example implements a selector that filters messages based on particular values of the headers.
from proton.reactor import Container, Selector
from proton.handlers import MessagingHandler

class Recv(MessagingHandler):
    def __init__(self):
        super(Recv, self).__init__()

    def on_start(self, event):
        conn = event.container.connect("localhost:5672")
        event.container.create_receiver(conn, "examples", options=Selector(u"colour = 'green'")) 1

    def on_message(self, event):
        print event.message.body

try:
    Container(Recv()).run()
except KeyboardInterrupt: pass
1
on_start() method implements a selector that filters messages based on the message header
While creating the receiver, specify the Selector object as an option. The options argument can be a single object or a list.

Sending and Receiving Best-Effort Messages

.Sending and receiving best-effort requires to add an instance of AtMostOnce to the options keyword arguments to Container.create_sender and Container.create_receiver. For AtMostOnce, the sender settles the message as soon as it sends it. If the connection is lost before the message is received by the receiver, the message will not be delivered. The AtMostOnce link option type is defined in proton.reactors.

Example 3.11. Receiving best-effort messages

The simple receiving example,Example 3.4, “Receiving reliable messages” is changed to include the AtMostOnce link option.
import optparse
from proton.handlers import MessagingHandler
from proton.reactor import AtMostOnce, Container

class Recv(MessagingHandler):
    def __init__(self, url, count):
        super(Recv, self).__init__()
        self.url = url
        self.expected = count
        self.received = 0

    def on_start(self, event):
        event.container.create_receiver(self.url, options=AtMostOnce()) 1
    
    def on_message(self, event):
        if event.message.id and event.message.id < self.received:
            # ignore duplicate message
            return
        if self.expected == 0 or self.received < self.expected:
            print event.message.body
            self.received += 1
            if self.received == self.expected:
                event.receiver.close()
                event.connection.close()

parser = optparse.OptionParser(usage="usage: %prog [options]")
parser.add_option("-a", "--address", default="localhost:5672/examples",
                  help="address from which messages are received (default %default)")
parser.add_option("-m", "--messages", type="int", default=100,
                  help="number of messages to receive; 0 receives indefinitely (default %default)")
opts, args = parser.parse_args()

try:
    Container(Recv(opts.address, opts.messages)).run()
except KeyboardInterrupt: pass
1
on_start() method uses the AtMostOnce option to receive the unacknowledged messages.
If the connection is lost before the message is received by the receiver, the message will not be delivered.
The simple sending example,Example 3.3, “Sending reliable messages” is changed to include the AtMostOnce link option. In this example, there will be no acknowledgments for the send messages, hence the on_accepted method is redundant. There is no distinction between confirmed and sent status and the on_disconnected method is redundant. Any shutdown would be triggered directly after sending.

Example 3.12. Sending best-effort messages

import optparse
from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import AtMostOnce, Container

class Send(MessagingHandler):
    def __init__(self, url, messages):
        super(Send, self).__init__()
        self.url = url
        self.sent = 0
        self.confirmed = 0
        self.total = messages

    def on_start(self, event):
        event.container.create_sender(self.url, options=AtMostOnce()) 1
    

    def on_sendable(self, event):
        while event.sender.credit and self.sent < self.total:
            msg = Message(id=(self.sent+1), body={'sequence':(self.sent+1)})
            event.sender.send(msg) 
            self.sent += 1
            if self.sent == self.total:
                print "all messages sent"
                event.connection.close()

parser = optparse.OptionParser(usage="usage: %prog [options]",
                               description="Send messages to the supplied address.")
parser.add_option("-a", "--address", default="localhost:5672/examples",
                  help="address to which messages are sent (default %default)")
parser.add_option("-m", "--messages", type="int", default=100,
                  help="number of messages to send (default %default)")
opts, args = parser.parse_args()

try:
    Container(Send(opts.address, opts.messages)).run()
except KeyboardInterrupt: pass
1
on_start() method uses the AtMostOnce option to send the unacknowledged messages.
Red Hat logoGithubRedditYoutubeTwitter

Learn

Try, buy, & sell

Communities

About Red Hat Documentation

We help Red Hat users innovate and achieve their goals with our products and services with content they can trust.

Making open source more inclusive

Red Hat is committed to replacing problematic language in our code, documentation, and web properties. For more details, see the Red Hat Blog.

About Red Hat

We deliver hardened solutions that make it easier for enterprises to work across platforms and environments, from the core datacenter to the network edge.

© 2024 Red Hat, Inc.