6.8.7. メッセージグループデモンストレーション


以下の Python プログラムは、メッセージグループの使用および動作を示しています。このプログラムを実行するには、コードをテキストファイルにコピーして貼り付け message-groups.py、そのまま保存してから、メッセージングブローカーが起動したマシンで Python を使用します。
プログラムは、メッセージングが有効または無効の自動削除キューを作成し、キューのグループヘッダーに一致するメッセージグループヘッダーを持つメッセージをキューに送信します。メッセージングを有効にすると、コンシューマーにブローカーによってメッセージグループの所有権が付与される方法と、キューで表示される内容や表示されない内容にどのように影響するかが実証されます。また、グループから取得したメッセージをすべて承認することで、コンシューマーがグループの所有権をリリースする方法と、フェッチされたメッセージの一部でグループの所有権がリリースされない方法を実証します。
プログラムは 2 つの異なる接続を使用して、2 つのコンシューマーをシミュレートします。コンシューマーは通常別のプロセスとして実行され、おそらく異なるマシンで実行されます。
python
import sys
from qpid.messaging import *

def sendmsg(group, num):
# send the message to the broker and add it to our in-memory representation of the broker queue
  global memoryqueue
  global tx

  msg = Message(group + num)
  msg.properties = {'ourGroupID': group}

  tx.send(msg)
  memoryqueue.append(group + num)

def pullmsg(consumer):
# fetch a message from the broker and print it to the console  
  global counter
  global memoryqueue

  msg = consumers[consumer - 1].fetch(timeout = 1)
  
  print "\nQueued message: " + memoryqueue[counter]
  print "Consumer " + str(consumer) + " got: " + msg.content

  counter +=1
  return msg
  
# Two connections are used to simulate two distinct consumers  
connection = Connection("localhost:5672")
connection2 = Connection("localhost:5672")
connection.open()
connection2.open()

try:
  session = connection.session()
  session2 = connection2.session()
  
  x = raw_input('Enable message grouping [Y/n]?')

  if x == 'N' or x == 'n':
  
    # Create the queue without message groups
    tx = session.sender("test-nogroup-queue; {create: always, node:{x-declare:{auto-delete:True}}}")
    rx1 = session.receiver("test-nogroup-queue")
    rx2 = session2.receiver("test-nogroup-queue")
  
    print "\nMessage grouping is disabled"
    msggroup = False
  
  else:
  
    # Create the queue with message groups enabled
    tx = session.sender("test-group-queue; {create: always, node:{x-declare:{auto-delete: True, arguments: {'qpid.group_header_key': 'ourGroupID', 'qpid.shared_msg_group' : 1}}}}")
    rx1 = session.receiver("test-group-queue")
    rx2 = session2.receiver("test-group-queue")
  
    print "\nMessage grouping is enabled"
    msggroup = True

# Put the receivers in an array so we can use a function to fetch messages  
  consumers = []
  consumers.append(rx1)
  consumers.append(rx2)

  print "Sending interleaved messages from two different groups to the queue..."

# We create an in-memory picture of the queue, to see what order the messages are on the broker
  memoryqueue = []

  sendmsg('A', '1')
  sendmsg('B', '1')
  sendmsg('B', '2')
  sendmsg('A', '2')
  sendmsg('B', '3')
  sendmsg('A', '3')

  counter = 0 
  pullmsg(1)
  pullmsg(2)  
  
  if msggroup:
    print "\nConsumer 1 now owns message group A. Consumer 2 now owns message group B."  

  msgc1 = pullmsg(1)
  msgc2 = pullmsg(2)

  if msggroup:
    print "\nThe consumers will now acknowledge all the messages, or only the last one."
    resp = raw_input('Should they acknowlege all messages? [Y/n]')
  
    if resp == 'N' or resp == 'n':
      print "\nAcknowledging only part of the group. The consumers retain ownership of the group. This is an anti-pattern! See the source code comments for details."
  
      session.acknowledge(msgc1)
      session2.acknowledge(msgc2)
      antipattern = True
  
      # Acknowledging only part of a group is an anti-pattern. Messages are grouped to ensure that a single consumer can deal with the whole group. If this consumer now fails before completing the rest of the group, the unacknowledged messages in the group will be released and redelivered by the broker, but the acknowledged messages in the group are now missing in action!
  
    else:
      print "\nAcknowledging all fetched messages. The consumers will release ownership of the groups."
      session.acknowledge()
      session2.acknowledge()
      antipattern = False 
  
    print "\nPulling more messages from the queue:"

  pullmsg(1)
  pullmsg(2)
  if msggroup:
    if antipattern == False:
      print "\nConsumer 1 now owns message group B. Consumer 2 now owns message group A."
    print "\nSending some more messages to the queue..."
  
  sendmsg('B', '4')
  sendmsg('B', '5')
  sendmsg('A', '4')
  sendmsg('A', '5')
 
  pullmsg(1)
  pullmsg(2)
  pullmsg(1)
  pullmsg(2)

finally:
  connection.close()
  connection2.close()
Copy to Clipboard Toggle word wrap

プログラムの出力例

プログラムは、2 つの異なるグループ AB - からキューにメッセージを送信します。メッセージグループが無効になっている場合の出力の例を以下に示します。

$ python message-groups.py 
Enable message grouping [Y/n]?n

Message grouping is disabled
Sending interleaved messages from two different groups to the queue...

Queued message: A1
Consumer 1 got: A1

Queued message: B1
Consumer 2 got: B1

Queued message: B2
Consumer 1 got: B2

Queued message: A2
Consumer 2 got: A2

Queued message: B3
Consumer 1 got: B3

Queued message: A3
Consumer 2 got: A3

Queued message: B4
Consumer 1 got: B4

Queued message: B5
Consumer 2 got: B5

Queued message: A4
Consumer 1 got: A4

Queued message: A5
Consumer 2 got: A5
Copy to Clipboard Toggle word wrap
コンシューマーはキューからメッセージをラウンドロビン方式でプルし、キューにメッセージが送信される順序でメッセージが表示されます。
メッセージグループが有効になっているプログラムを実行すると、メッセージグループがキューでメッセージの表示方法に影響を与える方法が示されます。
$ python message-groups.py 
Enable message grouping [Y/n]?y

Message grouping is enabled
Sending interleaved messages from two different groups to the queue...

Queued message: A1
Consumer 1 got: A1

Queued message: B1
Consumer 2 got: B1

Consumer 1 now owns message group A. Consumer 2 now owns message group B.

Queued message: B2
Consumer 1 got: A2

Queued message: A2
Consumer 2 got: B2
Copy to Clipboard Toggle word wrap
この段階では、取得したメッセージをすべて確認するか、一部のメッセージしか確認しないかを選択できます。これまでに取得したすべてのメッセージのグループの所有権が認識され、コンシューマーの次のメッセージがキューの次のメッセージになります。
The consumers will now acknowledge all the messages, or only the last one.
Should they acknowlege all messages? [Y/n]y

Acknowledging all fetched messages. The consumers will release ownership of the groups.

Pulling more messages from the queue:

Queued message: B3
Consumer 1 got: B3

Queued message: A3
Consumer 2 got: A3
Copy to Clipboard Toggle word wrap
その後、これらのメッセージのグループの所有権を取得します。
Consumer 1 now owns message group B. Consumer 2 now owns message group A.

Sending some more messages to the queue...

Queued message: B4
Consumer 1 got: B4

Queued message: B5
Consumer 2 got: A4

Queued message: A4
Consumer 1 got: B5

Queued message: A5
Consumer 2 got: A5
Copy to Clipboard Toggle word wrap
グループで取得したメッセージではなく、最後のメッセージのみの確認を選択すると、プログラムはアンチパターンであることを警告し、コンシューマーがグループの所有権を保持することを示します。
The consumers will now acknowledge all the messages, or only the last one.
Should they acknowlege all messages? [Y/n]n

Acknowledging only part of the group. The consumers retain ownership of the group. This is an anti-pattern! See the source code comments for details.

Pulling more messages from the queue:

Queued message: B3
Consumer 1 got: A3

Queued message: A3
Consumer 2 got: B3

Sending some more messages to the queue...

Queued message: B4
Consumer 1 got: A4

Queued message: B5
Consumer 2 got: B4

Queued message: A4
Consumer 1 got: A5

Queued message: A5
Consumer 2 got: B5
Copy to Clipboard Toggle word wrap
トップに戻る
Red Hat logoGithubredditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。 最新の更新を見る.

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

Theme

© 2025 Red Hat