第4章 例
本章では、サンプルプログラムで AMQ Python を使用する方法について説明します。
その他の例は、AMQ Python サンプルのスイート と Qpid Proton Python の例 を参照してください。
4.1. メッセージの送信
このクライアントプログラムは <connection-url>
を使用してサーバーに接続し、ターゲット <address>
の送信者を作成し、<message-body>
を含むメッセージを送信して接続を切断して終了します。
例: メッセージの送信
from __future__ import print_function import sys from proton import Message from proton.handlers import MessagingHandler from proton.reactor import Container class SendHandler(MessagingHandler): def __init__(self, conn_url, address, message_body): super(SendHandler, self).__init__() self.conn_url = conn_url self.address = address self.message_body = message_body def on_start(self, event): conn = event.container.connect(self.conn_url) # To connect with a user and password: # conn = event.container.connect(self.conn_url, user="<user>", password="<password>") event.container.create_sender(conn, self.address) def on_link_opened(self, event): print("SEND: Opened sender for target address '{0}'".format (event.sender.target.address)) def on_sendable(self, event): message = Message(self.message_body) event.sender.send(message) print("SEND: Sent message '{0}'".format(message.body)) event.sender.close() event.connection.close() def main(): try: conn_url, address, message_body = sys.argv[1:4] except ValueError: sys.exit("Usage: send.py <connection-url> <address> <message-body>") handler = SendHandler(conn_url, address, message_body) container = Container(handler) container.run() if __name__ == "__main__": try: main() except KeyboardInterrupt: pass
from __future__ import print_function
import sys
from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container
class SendHandler(MessagingHandler):
def __init__(self, conn_url, address, message_body):
super(SendHandler, self).__init__()
self.conn_url = conn_url
self.address = address
self.message_body = message_body
def on_start(self, event):
conn = event.container.connect(self.conn_url)
# To connect with a user and password:
# conn = event.container.connect(self.conn_url, user="<user>", password="<password>")
event.container.create_sender(conn, self.address)
def on_link_opened(self, event):
print("SEND: Opened sender for target address '{0}'".format
(event.sender.target.address))
def on_sendable(self, event):
message = Message(self.message_body)
event.sender.send(message)
print("SEND: Sent message '{0}'".format(message.body))
event.sender.close()
event.connection.close()
def main():
try:
conn_url, address, message_body = sys.argv[1:4]
except ValueError:
sys.exit("Usage: send.py <connection-url> <address> <message-body>")
handler = SendHandler(conn_url, address, message_body)
container = Container(handler)
container.run()
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
pass
サンプルの実行
サンプルプログラムを実行するには、サンプルプログラムをローカルファイルにコピーし、python
コマンドを使用して呼び出します。詳細は、「3章はじめに」を参照してください。
python send.py amqp://localhost queue1 hello
$ python send.py amqp://localhost queue1 hello