3.4. Python AMQP 1.0 Client API
3.4.1. Getting Started with Qpid Proton Python Client
3.4.1.1. Introduction to Qpid Proton Python Client
What is Qpid Proton
Introduction to the Qpid Proton Python Client
Container(name_of_a_handler_class(arguments)).run()
- Where the logic of the application is defined as a class handling particular events. A Container instance is created and passed an instance of that handler class.The call to
run()
gives control to the Container, which performs the necessary IO operations and informs the handler of the events.Therun()
returns when there is nothing to do.
3.4.2. Python Client Tutorials with examples
A Simple Sending and Receiving Program in Python
at-least-once guarantee
, since each message should eventually be received at least once, though a given message may be received more than once.
sent
and total
, where, sent
keeps track of the number of messages that are send and total
maintains a count of number of messages to send.
Example 3.3. Sending reliable messages
import optparse from proton import Message from proton.handlers import MessagingHandler from proton.reactor import Container class Send(MessagingHandler): def __init__(self, url, messages): super(Send, self).__init__() self.url = url self.sent = 0 self.confirmed = 0 self.total = messages def on_start(self, event): 1 event.container.create_sender(self.url) def on_sendable(self, event): 2 while event.sender.credit and self.sent < self.total: msg = Message(id=(self.sent+1), body={'sequence':(self.sent+1)}) event.sender.send(msg) 3 self.sent += 1 def on_accepted(self, event): 4 self.confirmed += 1 if self.confirmed == self.total: print "all messages confirmed" event.connection.close() def on_disconnected(self, event): 5 self.sent = self.confirmed parser = optparse.OptionParser(usage="usage: %prog [options]", description="Send messages to the supplied address.") parser.add_option("-a", "--address", default="localhost:5672/examples", help="address to which messages are sent (default %default)") parser.add_option("-m", "--messages", type="int", default=100, help="number of messages to send (default %default)") opts, args = parser.parse_args() try: Container(Send(opts.address, opts.messages)).run() except KeyboardInterrupt: pass
- 1
On_start()
method is called when the Container first starts to run.In this example it is used to establish a sending link over which the messages are transferred.- 2
On_sendable()
method is called to known when the messages can be transferred.The callback checks that the sender has credit before sending messages and if the sender has already sent the required number of messages.NoteAMQP provides flow control allowing any component receiving messages to avoid being overwhelmed by the number of messages it is sent. In this example messages are sent when the broker has enabled their flow.- 3
Send()
is an asynchronous call. The return of the call does not indicate that the messages has been transferred yet.- 4
on_accepted()
notifies if the amq broker has received and accepted the message.In this example, we use this event to track the confirmation of the messages sent. The connection closes and exits when the amq broker has received all the messages.NoteTheon_accepted()
call will be made by the Container when the amq broker accepts the message and not the receiving client.- 5
- Resets the sent count to reflect the confirmed messages. The library automatically reconnects to the sender and hence when the sender is ready, it can restart sending the remaining messages
Example 3.4. Receiving reliable messages
examples
queue on a broker accessible on port 5672 on localhost. The program simply prints the body of the received messages.
import optparse from proton.handlers import MessagingHandler from proton.reactor import Container class Recv(MessagingHandler): def __init__(self, url, count): super(Recv, self).__init__() self.url = url self.expected = count self.received = 0 def on_start(self, event): 1 event.container.create_receiver(self.url) def on_message(self, event): 2 if event.message.id and event.message.id < self.received: # ignore duplicate message return if self.expected == 0 or self.received < self.expected: print event.message.body self.received += 1 if self.received == self.expected: event.receiver.close() event.connection.close() parser = optparse.OptionParser(usage="usage: %prog [options]") parser.add_option("-a", "--address", default="localhost:5672/examples", help="address from which messages are received (default %default)") parser.add_option("-m", "--messages", type="int", default=100, help="number of messages to receive; 0 receives indefinitely (default %default)") opts, args = parser.parse_args() try: Container(Recv(opts.address, opts.messages)).run() except KeyboardInterrupt: pass
- 1
On_start()
method is called when the Container first starts to run.In this example it is used to establish a receiving link over which the messages are transferred.- 2
On_message()
method is called when a message is received. It simply prints the messagesIn this example, the amq broker waits for a certain number of messages before closing and exiting the connection. The method checks for duplicate messages and ignores them. The logic to ignore duplicates is implement using the sequential id scheme
Sending and Receiving Program using SSL in Python
SSL Configuration
SSL settings for A-MQ to run Qpid Python client
- Generate pem trust certificate for Qpid Python client
keytool -importkeystore -srckeystore broker.ks -srcalias broker \ -srcstoretype JKS -deststoretype PKCS12 -destkeystore broker.pkcs12 \ -srcstorepass ${general_passwd} -deststorepass ${general_passwd} openssl pkcs12 -in broker.pkcs12 -out broker_cert.pem \ -passin pass:${general_passwd} -passout pass:${general_passwd}
- Adjust A-MQ broker to use the certificate by modifying the A-MQ environment
sed -i '/KARAF_OPTS/d' ${A_MQ_HOME}/bin/setenv
where,A_MQ_HOME
is the installation path of the amq broker executable file.echo "export KARAF_OPTS=\"-Djavax.net.ssl.keyStore=${certificates_dir}/broker.ks \ -Djavax.net.ssl.keyStorePassword=${general_passwd}\"" >> ${A_MQ_HOME}/bin/setenv
- Generate the client certificate
keytool -genkey -alias client -keyalg RSA -keystore client.ks \ -storepass ${general_passwd} -keypass ${general_passwd} \ -dname "O=Client,CN=client" -validity 99999
keytool -export -alias client -keystore client.ks -file client_cert \ -storepass ${general_passwd}
- Add client certificate as trusted to the broker database
keytool -import -alias client -keystore broker.ts -file client_cert \ -storepass ${general_passwd} -v -trustcacerts -noprompt
SSL certificate and keys settings for Qpid Python client
- Set SLL to prevent the private key and the certificate to be send to output.
openssl pkcs12 -nocerts -in client.pkcs12 -out client_private_key.pem \ -passin pass:${general_passwd} -passout pass:${general_passwd}
openssl pkcs12 -nokeys -in client.pkcs12 -out client_cert.pem \ -passin pass:${general_passwd} -passout pass:${general_passwd}
- Adjust A-MQ broker to use the certificate
sed -i '/KARAF_OPTS/d' ${A_MQ_HOME}/bin/setenv
echo "export KARAF_OPTS=\"-Djavax.net.ssl.keyStore=${certificates_dir}/broker.ks \ -Djavax.net.ssl.keyStorePassword=${general_passwd} \ -Djavax.net.ssl.trustStore=${certificates_dir}/broker.ts \ -Djavax.net.ssl.trustStorePassword=${general_passwd}\"" >> ${A_MQ_HOME}/bin/setenv
Example
Example 3.5. Sending reliable messages over a secured connection
import optparse from proton import Message from proton.handlers import MessagingHandler from proton.reactor import Container class Send(MessagingHandler): def __init__(self, url, messages): super(Send, self).__init__() self.url = url self.sent = 0 self.confirmed = 0 self.total = messages def on_start(self, event): event.container.ssl.client.set_trusted_ca_db("/path/to/ca-certificate.pem") event.container.ssl.client.set_peer_authentication(SSLDomain.VERIFY_PEER) 1 event.container.ssl.client.set_credentials("/path/to/client-certificate.pem", "/path/to/client-private-key.pem", "client-password") 2 event.container.create_sender(self.url) def on_sendable(self, event): while event.sender.credit and self.sent < self.total: msg = Message(id=(self.sent+1), body={'sequence':(self.sent+1)}) event.sender.send(msg) self.sent += 1 def on_accepted(self, event): self.confirmed += 1 if self.confirmed == self.total: print "all messages confirmed" event.connection.close() def on_disconnected(self, event): self.sent = self.confirmed parser = optparse.OptionParser(usage="usage: %prog [options]", description="Send messages to the supplied address.") parser.add_option("-a", "--address", default="localhost:5672/examples", help="address to which messages are sent (default %default)") parser.add_option("-m", "--messages", type="int", default=100, help="number of messages to send (default %default)") opts, args = parser.parse_args() try: Container(Send(opts.address, opts.messages)).run() except KeyboardInterrupt: pass
- 1
set_trusted_ca_db("/path/to/ca-certificate.pem")
call specifies the location of the CA's certificate in pem file formatset_peer_authentication(SSLDomain.VERIFY_PEER)
call requests the servers certificate to be verified as valid using the specified CA's public key.To verify if the hostname used matches which the name specified in the servers certificate, replace theVERIFY_PEER
macro toVERIFY_PEER_NAME
.NoteEnsure to update the program with the path of the certificates as per your environment before running the example.- 2
set_credentials("/path/to/client-certificate.pem", "/path/to/client-private-key.pem", "client-password")
call is used if the client needs to authenticate itself. In such a case, you need to mention the clients public certificate, private key file both in pem format, and also specify the password required for the private keyNoteEnsure to update the program with the path of the client certificate, client private key as per your environment and the correct client-password before running the example.
A Request/Response Server and Client Program
Example 3.6. A simple server program to send responses
from proton import Message from proton.handlers import MessagingHandler from proton.reactor import Container class Server(MessagingHandler): def __init__(self, url, address): super(Server, self).__init__() self.url = url self.address = address self.senders = {} def on_start(self, event): print "Listening on", self.url self.container = event.container self.conn = event.container.connect(self.url) self.receiver = event.container.create_receiver(self.conn, self.address) self.relay = None def on_connection_opened(self, event): if event.connection.remote_offered_capabilities and 'ANONYMOUS-RELAY' in event.connection.remote_offered_capabilities: self.relay = self.container.create_sender(self.conn, None) def on_message(self, event): print "Received", event.message sender = self.relay or self.senders.get(event.message.reply_to) 1 if not sender: sender = self.container.create_sender(self.conn, event.message.reply_to) self.senders[event.message.reply_to] = sender sender.send(Message(address=event.message.reply_to, body=event.message.body.upper(), correlation_id=event.message.correlation_id)) try: Container(Server("0.0.0.0:5672", "examples")).run() except KeyboardInterrupt: pass
- 1
On_message()
method performs a lookup at thereply_to
address on themessage
and creates a sender over which the response can be send.In case there are more requests with the samereply_to
address, the method will store the senders.
Run python client over SSL
- No server and client authentication
./sender.py -b "amqps://$(hostname):5672/examples"
- Server authentication enabled
./sender.py -b "amqps://$(hostname):5672/examples" --conn-ssl-trust-store<certificates_dir>/broker_cert.pem --conn-ssl-verify-peer --conn-ssl-verify-peer-name
- Server and client authentication enabled
./sender.py -b "amqps://$(hostname):5672/examples" --conn-ssl-certificate <certificates_dir>/client-certificate.pem --conn-ssl-private-key <certificates_dir>/client-private-key.pem --conn-ssl-trust-store <certificates_dir>/broker_cert.pem --conn-ssl-verify-peer --conn-ssl-verify-peer-name
A simple client program to receive the messages from the Server
examples
broker.
Example 3.7. A simple client program to receive the messages from the Server
import optparse from proton import Message from proton.handlers import MessagingHandler from proton.reactor import Container, DynamicNodeProperties class Client(MessagingHandler): def __init__(self, url, requests): super(Client, self).__init__() self.url = url self.requests = requests def on_start(self, event): self.sender = event.container.create_sender(self.url) self.receiver = event.container.create_receiver(self.sender.connection, None, dynamic=True) 1 def next_request(self): if self.receiver.remote_source.address: req = Message(reply_to=self.receiver.remote_source.address, body=self.requests[0]) self.sender.send(req) def on_link_opened(self, event): 2 if event.receiver == self.receiver: self.next_request() def on_message(self, event): print "%s => %s" % (self.requests.pop(0), event.message.body) if self.requests: self.next_request() else: event.connection.close() REQUESTS= ["Twas brillig, and the slithy toves", "Did gire and gymble in the wabe.", "All mimsy were the borogroves,", "And the mome raths outgrabe."] parser = optparse.OptionParser(usage="usage: %prog [options]", description="Send requests to the supplied address and print responses.") parser.add_option("-a", "--address", default="localhost:5672/examples", help="address to which messages are sent (default %default)") opts, args = parser.parse_args() Container(Client(opts.address, args or REQUESTS)).run()
- 1
On_start()
method creates a receiver to receive the responses from the serverIn this example, instead of using the localhost, we set the dynamic option which informs the amq broker that the client is connected to create a temporary address over which it can receive the responses.- 2
On_link_opened()
method sends the first requests if the receiving link is setup and confirmed by the brokerHere, we use the address allocated by the broker as the reply_to address of the requests hence the broker needs to confirm that the receiving link is established.
Sending and Receiving using Transactions
TransactionHandler
in addition to MessagingHandler
as a base class for the handler definition.
Example 3.8. Sending messages using local transactions
import optparse from proton import Message, Url from proton.reactor import Container from proton.handlers import MessagingHandler, TransactionHandler class TxSend(MessagingHandler, TransactionHandler): def __init__(self, url, messages, batch_size): super(TxSend, self).__init__() self.url = Url(url) self.current_batch = 0 self.committed = 0 self.confirmed = 0 self.total = messages self.batch_size = batch_size def on_start(self, event): self.container = event.container self.conn = self.container.connect(self.url) self.sender = self.container.create_sender(self.conn, self.url.path) self.container.declare_transaction(self.conn, handler=self) 1 self.transaction = None def on_transaction_declared(self, event): 2 self.transaction = event.transaction self.send() def on_sendable(self, event): self.send() def send(self): 3 while self.transaction and self.sender.credit and (self.committed + self.current_batch) < self.total: seq = self.committed + self.current_batch + 1 msg = Message(id=seq, body={'sequence':seq}) self.transaction.send(self.sender, msg) self.current_batch += 1 if self.current_batch == self.batch_size: self.transaction.commit() 4 self.transaction = None def on_accepted(self, event): if event.sender == self.sender: self.confirmed += 1 def on_transaction_committed(self, event): 5 self.committed += self.current_batch if self.committed == self.total: print "all messages committed" event.connection.close() else: self.current_batch = 0 self.container.declare_transaction(self.conn, handler=self) def on_disconnected(self, event): self.current_batch = 0 parser = optparse.OptionParser(usage="usage: %prog [options]", description="Send messages transactionally to the supplied address.") parser.add_option("-a", "--address", default="localhost:5672/examples", help="address to which messages are sent (default %default)") parser.add_option("-m", "--messages", type="int", default=100, help="number of messages to send (default %default)") parser.add_option("-b", "--batch-size", type="int", default=10, help="number of messages in each transaction (default %default)") opts, args = parser.parse_args() try: Container(TxSend(opts.address, opts.messages, opts.batch_size)).run() except KeyboardInterrupt: pass
- 1
on_transaction_declared()
method requests a newtransactional
context, passing themselves as the handler for the transaction- 2
on_transaction_declared()
method is notified when that context is in place- 5
- When the
on_transaction_committed()
method is called the committed count is incremented by the size of the current batch. If the committed count after that is equal to the number of message it was asked to send, it has completed all its work so closes the connection and the program will exit. If not, it starts a new transaction and sets the current batch to 0.The sender tracks the number of messages sent in the current_batch, as well as the number of messages committed . - 3
- Messages are sent when the transaction context has been declared and there is credit. The
send()
method of the transaction is invoked, rather than on the sender itself. This ensures that send operation is tied to that transaction context. - 4
- The
current_batch
counter is incremented for each message. When that counter reaches the preconfigured batch size, thecommit()
method is called on the transaction.
Example
current_batch
and the overall number committed
Example 3.9. Receiving using local transactions
import optparse from proton import Url from proton.reactor import Container from proton.handlers import MessagingHandler, TransactionHandler class TxRecv(MessagingHandler, TransactionHandler): def __init__(self, url, messages, batch_size): super(TxRecv, self).__init__(prefetch=0, auto_accept=False) self.url = Url(url) self.expected = messages self.batch_size = batch_size self.current_batch = 0 self.committed = 0 def on_start(self, event): self.container = event.container self.conn = self.container.connect(self.url) self.receiver = self.container.create_receiver(self.conn, self.url.path) self.container.declare_transaction(self.conn, handler=self) self.transaction = None def on_message(self, event): 1 self.receiver.flow(self.batch_size) print event.message.body self.transaction.accept(event.delivery) self.current_batch += 1 if self.current_batch == self.batch_size: self.transaction.commit() self.transaction = None def on_transaction_declared(self, event): 2 self.receiver.flow(self.batch_size) self.transaction = event.transaction def on_transaction_committed(self, event): 3 self.committed += self.current_batch self.current_batch = 0 if self.expected == 0 or self.committed < self.expected: self.container.declare_transaction(self.conn, handler=self) else: event.connection.close() def on_disconnected(self, event): self.current_batch = 0 parser = optparse.OptionParser(usage="usage: %prog [options]") parser.add_option("-a", "--address", default="localhost:5672/examples", help="address from which messages are received (default %default)") parser.add_option("-m", "--messages", type="int", default=100, help="number of messages to receive; 0 receives indefinitely (default %default)") parser.add_option("-b", "--batch-size", type="int", default=10, help="number of messages in each transaction (default %default)") opts, args = parser.parse_args() try: Container(TxRecv(opts.address, opts.messages, opts.batch_size)).run() except KeyboardInterrupt: pass
- 1
on_message()
method the receiver calls theaccept()
method on the transaction object to tie the acceptance to the context. It then increments thecurrent_batch
. If thecurrent_batch
is now equal to thebatch_size
, the receiver calls thecommit()
method on the transaction.- 2
on_transaction_declared()
method controls the message flow. The receiver uses theflow()
method on the receiver to request an equal number of messages that match thebatch_size
- 3
- When the
on_transaction_committed()
method is called the committed count is incremented, the application then tests whether it has received all expected messages. If all the messages are received, the application exists. If all messages are not received a new transactional context is declared and the batch is reset.
Using a Selector Filter
Example 3.10. Filtering messages using a selector
from proton.reactor import Container, Selector from proton.handlers import MessagingHandler class Recv(MessagingHandler): def __init__(self): super(Recv, self).__init__() def on_start(self, event): conn = event.container.connect("localhost:5672") event.container.create_receiver(conn, "examples", options=Selector(u"colour = 'green'")) 1 def on_message(self, event): print event.message.body try: Container(Recv()).run() except KeyboardInterrupt: pass
- 1
on_start()
method implements a selector that filters messages based on the message headerWhile creating the receiver, specify the Selector object as an option. The options argument can be a single object or a list.
Sending and Receiving Best-Effort Messages
AtMostOnce
to the options
keyword arguments to Container.create_sender
and Container.create_receiver
. For AtMostOnce, the sender settles the message as soon as it sends it. If the connection is lost before the message is received by the receiver, the message will not be delivered. The AtMostOnce link option type is defined in proton.reactors
.
Example 3.11. Receiving best-effort messages
import optparse from proton.handlers import MessagingHandler from proton.reactor import AtMostOnce, Container class Recv(MessagingHandler): def __init__(self, url, count): super(Recv, self).__init__() self.url = url self.expected = count self.received = 0 def on_start(self, event): event.container.create_receiver(self.url, options=AtMostOnce()) 1 def on_message(self, event): if event.message.id and event.message.id < self.received: # ignore duplicate message return if self.expected == 0 or self.received < self.expected: print event.message.body self.received += 1 if self.received == self.expected: event.receiver.close() event.connection.close() parser = optparse.OptionParser(usage="usage: %prog [options]") parser.add_option("-a", "--address", default="localhost:5672/examples", help="address from which messages are received (default %default)") parser.add_option("-m", "--messages", type="int", default=100, help="number of messages to receive; 0 receives indefinitely (default %default)") opts, args = parser.parse_args() try: Container(Recv(opts.address, opts.messages)).run() except KeyboardInterrupt: pass
- 1
on_start()
method uses the AtMostOnce option to receive the unacknowledged messages.If the connection is lost before the message is received by the receiver, the message will not be delivered.
on_accepted
method is redundant. There is no distinction between confirmed and sent status and the on_disconnected
method is redundant. Any shutdown would be triggered directly after sending.
Example 3.12. Sending best-effort messages
import optparse from proton import Message from proton.handlers import MessagingHandler from proton.reactor import AtMostOnce, Container class Send(MessagingHandler): def __init__(self, url, messages): super(Send, self).__init__() self.url = url self.sent = 0 self.confirmed = 0 self.total = messages def on_start(self, event): event.container.create_sender(self.url, options=AtMostOnce()) 1 def on_sendable(self, event): while event.sender.credit and self.sent < self.total: msg = Message(id=(self.sent+1), body={'sequence':(self.sent+1)}) event.sender.send(msg) self.sent += 1 if self.sent == self.total: print "all messages sent" event.connection.close() parser = optparse.OptionParser(usage="usage: %prog [options]", description="Send messages to the supplied address.") parser.add_option("-a", "--address", default="localhost:5672/examples", help="address to which messages are sent (default %default)") parser.add_option("-m", "--messages", type="int", default=100, help="number of messages to send (default %default)") opts, args = parser.parse_args() try: Container(Send(opts.address, opts.messages)).run() except KeyboardInterrupt: pass
- 1
on_start()
method uses the AtMostOnce option to send the unacknowledged messages.