6.8.7. メッセージグループデモンストレーション
以下の Python プログラムは、メッセージグループの使用および動作を示しています。このプログラムを実行するには、コードをテキストファイルにコピーして貼り付け
message-groups.py
、そのまま保存してから、メッセージングブローカーが起動したマシンで Python を使用します。
プログラムは、メッセージングが有効または無効の自動削除キューを作成し、キューのグループヘッダーに一致するメッセージグループヘッダーを持つメッセージをキューに送信します。メッセージングを有効にすると、コンシューマーにブローカーによってメッセージグループの所有権が付与される方法と、キューで表示される内容や表示されない内容にどのように影響するかが実証されます。また、グループから取得したメッセージをすべて承認することで、コンシューマーがグループの所有権をリリースする方法と、フェッチされたメッセージの一部でグループの所有権がリリースされない方法を実証します。
プログラムは 2 つの異なる接続を使用して、2 つのコンシューマーをシミュレートします。コンシューマーは通常別のプロセスとして実行され、おそらく異なるマシンで実行されます。
- python
import sys from qpid.messaging import * def sendmsg(group, num): # send the message to the broker and add it to our in-memory representation of the broker queue global memoryqueue global tx msg = Message(group + num) msg.properties = {'ourGroupID': group} tx.send(msg) memoryqueue.append(group + num) def pullmsg(consumer): # fetch a message from the broker and print it to the console global counter global memoryqueue msg = consumers[consumer - 1].fetch(timeout = 1) print "\nQueued message: " + memoryqueue[counter] print "Consumer " + str(consumer) + " got: " + msg.content counter +=1 return msg # Two connections are used to simulate two distinct consumers connection = Connection("localhost:5672") connection2 = Connection("localhost:5672") connection.open() connection2.open() try: session = connection.session() session2 = connection2.session() x = raw_input('Enable message grouping [Y/n]?') if x == 'N' or x == 'n': # Create the queue without message groups tx = session.sender("test-nogroup-queue; {create: always, node:{x-declare:{auto-delete:True}}}") rx1 = session.receiver("test-nogroup-queue") rx2 = session2.receiver("test-nogroup-queue") print "\nMessage grouping is disabled" msggroup = False else: # Create the queue with message groups enabled tx = session.sender("test-group-queue; {create: always, node:{x-declare:{auto-delete: True, arguments: {'qpid.group_header_key': 'ourGroupID', 'qpid.shared_msg_group' : 1}}}}") rx1 = session.receiver("test-group-queue") rx2 = session2.receiver("test-group-queue") print "\nMessage grouping is enabled" msggroup = True # Put the receivers in an array so we can use a function to fetch messages consumers = [] consumers.append(rx1) consumers.append(rx2) print "Sending interleaved messages from two different groups to the queue..." # We create an in-memory picture of the queue, to see what order the messages are on the broker memoryqueue = [] sendmsg('A', '1') sendmsg('B', '1') sendmsg('B', '2') sendmsg('A', '2') sendmsg('B', '3') sendmsg('A', '3') counter = 0 pullmsg(1) pullmsg(2) if msggroup: print "\nConsumer 1 now owns message group A. Consumer 2 now owns message group B." msgc1 = pullmsg(1) msgc2 = pullmsg(2) if msggroup: print "\nThe consumers will now acknowledge all the messages, or only the last one." resp = raw_input('Should they acknowlege all messages? [Y/n]') if resp == 'N' or resp == 'n': print "\nAcknowledging only part of the group. The consumers retain ownership of the group. This is an anti-pattern! See the source code comments for details." session.acknowledge(msgc1) session2.acknowledge(msgc2) antipattern = True # Acknowledging only part of a group is an anti-pattern. Messages are grouped to ensure that a single consumer can deal with the whole group. If this consumer now fails before completing the rest of the group, the unacknowledged messages in the group will be released and redelivered by the broker, but the acknowledged messages in the group are now missing in action! else: print "\nAcknowledging all fetched messages. The consumers will release ownership of the groups." session.acknowledge() session2.acknowledge() antipattern = False print "\nPulling more messages from the queue:" pullmsg(1) pullmsg(2) if msggroup: if antipattern == False: print "\nConsumer 1 now owns message group B. Consumer 2 now owns message group A." print "\nSending some more messages to the queue..." sendmsg('B', '4') sendmsg('B', '5') sendmsg('A', '4') sendmsg('A', '5') pullmsg(1) pullmsg(2) pullmsg(1) pullmsg(2) finally: connection.close() connection2.close()
import sys from qpid.messaging import * def sendmsg(group, num): # send the message to the broker and add it to our in-memory representation of the broker queue global memoryqueue global tx msg = Message(group + num) msg.properties = {'ourGroupID': group} tx.send(msg) memoryqueue.append(group + num) def pullmsg(consumer): # fetch a message from the broker and print it to the console global counter global memoryqueue msg = consumers[consumer - 1].fetch(timeout = 1) print "\nQueued message: " + memoryqueue[counter] print "Consumer " + str(consumer) + " got: " + msg.content counter +=1 return msg # Two connections are used to simulate two distinct consumers connection = Connection("localhost:5672") connection2 = Connection("localhost:5672") connection.open() connection2.open() try: session = connection.session() session2 = connection2.session() x = raw_input('Enable message grouping [Y/n]?') if x == 'N' or x == 'n': # Create the queue without message groups tx = session.sender("test-nogroup-queue; {create: always, node:{x-declare:{auto-delete:True}}}") rx1 = session.receiver("test-nogroup-queue") rx2 = session2.receiver("test-nogroup-queue") print "\nMessage grouping is disabled" msggroup = False else: # Create the queue with message groups enabled tx = session.sender("test-group-queue; {create: always, node:{x-declare:{auto-delete: True, arguments: {'qpid.group_header_key': 'ourGroupID', 'qpid.shared_msg_group' : 1}}}}") rx1 = session.receiver("test-group-queue") rx2 = session2.receiver("test-group-queue") print "\nMessage grouping is enabled" msggroup = True # Put the receivers in an array so we can use a function to fetch messages consumers = [] consumers.append(rx1) consumers.append(rx2) print "Sending interleaved messages from two different groups to the queue..." # We create an in-memory picture of the queue, to see what order the messages are on the broker memoryqueue = [] sendmsg('A', '1') sendmsg('B', '1') sendmsg('B', '2') sendmsg('A', '2') sendmsg('B', '3') sendmsg('A', '3') counter = 0 pullmsg(1) pullmsg(2) if msggroup: print "\nConsumer 1 now owns message group A. Consumer 2 now owns message group B." msgc1 = pullmsg(1) msgc2 = pullmsg(2) if msggroup: print "\nThe consumers will now acknowledge all the messages, or only the last one." resp = raw_input('Should they acknowlege all messages? [Y/n]') if resp == 'N' or resp == 'n': print "\nAcknowledging only part of the group. The consumers retain ownership of the group. This is an anti-pattern! See the source code comments for details." session.acknowledge(msgc1) session2.acknowledge(msgc2) antipattern = True # Acknowledging only part of a group is an anti-pattern. Messages are grouped to ensure that a single consumer can deal with the whole group. If this consumer now fails before completing the rest of the group, the unacknowledged messages in the group will be released and redelivered by the broker, but the acknowledged messages in the group are now missing in action! else: print "\nAcknowledging all fetched messages. The consumers will release ownership of the groups." session.acknowledge() session2.acknowledge() antipattern = False print "\nPulling more messages from the queue:" pullmsg(1) pullmsg(2) if msggroup: if antipattern == False: print "\nConsumer 1 now owns message group B. Consumer 2 now owns message group A." print "\nSending some more messages to the queue..." sendmsg('B', '4') sendmsg('B', '5') sendmsg('A', '4') sendmsg('A', '5') pullmsg(1) pullmsg(2) pullmsg(1) pullmsg(2) finally: connection.close() connection2.close()
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
プログラムの出力例
プログラムは、2 つの異なるグループ A
と B
- からキューにメッセージを送信します。メッセージグループが無効になっている場合の出力の例を以下に示します。
python message-groups.py
$ python message-groups.py
Enable message grouping [Y/n]?n
Message grouping is disabled
Sending interleaved messages from two different groups to the queue...
Queued message: A1
Consumer 1 got: A1
Queued message: B1
Consumer 2 got: B1
Queued message: B2
Consumer 1 got: B2
Queued message: A2
Consumer 2 got: A2
Queued message: B3
Consumer 1 got: B3
Queued message: A3
Consumer 2 got: A3
Queued message: B4
Consumer 1 got: B4
Queued message: B5
Consumer 2 got: B5
Queued message: A4
Consumer 1 got: A4
Queued message: A5
Consumer 2 got: A5
コンシューマーはキューからメッセージをラウンドロビン方式でプルし、キューにメッセージが送信される順序でメッセージが表示されます。
メッセージグループが有効になっているプログラムを実行すると、メッセージグループがキューでメッセージの表示方法に影響を与える方法が示されます。
python message-groups.py
$ python message-groups.py
Enable message grouping [Y/n]?y
Message grouping is enabled
Sending interleaved messages from two different groups to the queue...
Queued message: A1
Consumer 1 got: A1
Queued message: B1
Consumer 2 got: B1
Consumer 1 now owns message group A. Consumer 2 now owns message group B.
Queued message: B2
Consumer 1 got: A2
Queued message: A2
Consumer 2 got: B2
この段階では、取得したメッセージをすべて確認するか、一部のメッセージしか確認しないかを選択できます。これまでに取得したすべてのメッセージのグループの所有権が認識され、コンシューマーの次のメッセージがキューの次のメッセージになります。
The consumers will now acknowledge all the messages, or only the last one. Should they acknowlege all messages? [Y/n]y Acknowledging all fetched messages. The consumers will release ownership of the groups. Pulling more messages from the queue: Queued message: B3 Consumer 1 got: B3 Queued message: A3 Consumer 2 got: A3
The consumers will now acknowledge all the messages, or only the last one.
Should they acknowlege all messages? [Y/n]y
Acknowledging all fetched messages. The consumers will release ownership of the groups.
Pulling more messages from the queue:
Queued message: B3
Consumer 1 got: B3
Queued message: A3
Consumer 2 got: A3
その後、これらのメッセージのグループの所有権を取得します。
Consumer 1 now owns message group B. Consumer 2 now owns message group A. Sending some more messages to the queue... Queued message: B4 Consumer 1 got: B4 Queued message: B5 Consumer 2 got: A4 Queued message: A4 Consumer 1 got: B5 Queued message: A5 Consumer 2 got: A5
Consumer 1 now owns message group B. Consumer 2 now owns message group A.
Sending some more messages to the queue...
Queued message: B4
Consumer 1 got: B4
Queued message: B5
Consumer 2 got: A4
Queued message: A4
Consumer 1 got: B5
Queued message: A5
Consumer 2 got: A5
グループで取得したメッセージではなく、最後のメッセージのみの確認を選択すると、プログラムはアンチパターンであることを警告し、コンシューマーがグループの所有権を保持することを示します。
The consumers will now acknowledge all the messages, or only the last one. Should they acknowlege all messages? [Y/n]n Acknowledging only part of the group. The consumers retain ownership of the group. This is an anti-pattern! See the source code comments for details. Pulling more messages from the queue: Queued message: B3 Consumer 1 got: A3 Queued message: A3 Consumer 2 got: B3 Sending some more messages to the queue... Queued message: B4 Consumer 1 got: A4 Queued message: B5 Consumer 2 got: B4 Queued message: A4 Consumer 1 got: A5 Queued message: A5 Consumer 2 got: B5
The consumers will now acknowledge all the messages, or only the last one.
Should they acknowlege all messages? [Y/n]n
Acknowledging only part of the group. The consumers retain ownership of the group. This is an anti-pattern! See the source code comments for details.
Pulling more messages from the queue:
Queued message: B3
Consumer 1 got: A3
Queued message: A3
Consumer 2 got: B3
Sending some more messages to the queue...
Queued message: B4
Consumer 1 got: A4
Queued message: B5
Consumer 2 got: B4
Queued message: A4
Consumer 1 got: A5
Queued message: A5
Consumer 2 got: B5