
Chapter 4. Beyond "Hello World"


4.1. Subscriptions

In the "Hello World" example, we sent a message to a topic exchange. AMQP messaging uses exchanges to provide flexible, decoupled routing between message producers and message consumers. Message consumers can subscribe to exchanges by creating a queue and binding it to the exchange. Exchanges and bindings are covered in more depth in their own sections. Here we touch briefly on the topic exchange, and look at the difference between exchanges and queues as we learn how a message consumer subscribes to an exchange by binding a queue to it.
An exchange differs from a queue in a number of ways. One significant difference is that a queue will queue messages, and can store them, whereas an exchange will distribute them to queues, but has no local storage of its own. Message consumers are decoupled from the message producers by the message broker. Queues provide a mechanism for buffering messages between the two, to allow them to produce and consume data at different rates. A message consumer does not need to be connected at the point in time that a message is published to a queue to receive the message. The message remains in the queue until it is removed.
Exchanges, on the other hand, are a mechanism for routing messages to different queues. If a message is sent to an exchange and there are no queues bound to that exchange, then the message is lost, as no one is listening and there is nowhere to store the message. Queues are subscriptions, and indicate to the broker that "I (an application) am interested in these messages", in the case of a queue created by a consumer, or "I want these messages to be here for interested applications that are coming", in the case of a queue created by a producer. To subscribe to messages of interest, a consumer application creates a queue and binds it to an exchange, or connects to an existing queue (subscribe). To provide messages that are of interest to applications, an application creates a queue and binds it to an exchange (publish). Consuming applications can then use that queue.
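The difference between an exchange and a queue can be illustrated with a small in-memory model. This is only a sketch: it does not use the messaging API or talk to a broker, and the class names Queue and Exchange, as well as the names demo.exchange and demo.subscription, are hypothetical and exist only for this illustration.

```python
from collections import deque


class Queue(object):
    """Toy queue: stores messages until a consumer removes them."""

    def __init__(self, name):
        self.name = name
        self.messages = deque()

    def fetch(self):
        # Return the oldest stored message, or None if the queue is empty.
        return self.messages.popleft() if self.messages else None


class Exchange(object):
    """Toy exchange: routes messages to bound queues, stores nothing."""

    def __init__(self, name):
        self.name = name
        self.bound_queues = []

    def bind(self, queue):
        self.bound_queues.append(queue)

    def publish(self, message):
        # Copy the message to every bound queue and report how many
        # queues received it. With no bindings the message is dropped.
        for queue in self.bound_queues:
            queue.messages.append(message)
        return len(self.bound_queues)


exchange = Exchange("demo.exchange")

# Nothing is bound yet, so this message has nowhere to go.
lost = exchange.publish("sent before anyone subscribed")

# Subscribing means creating a queue and binding it to the exchange.
subscription = Queue("demo.subscription")
exchange.bind(subscription)
delivered = exchange.publish("sent after subscribing")

print("queues reached by first message: %d" % lost)
print("queues reached by second message: %d" % delivered)
```

In this model the first message reaches no queue and is gone, while the second waits in the subscription queue until fetch() is called, which mirrors the buffering role of queues described above.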
In our "Hello World" example program we created a receiver listening to the amq.topic exchange. In the background this creates a queue and subscribes it to the amq.topic exchange. Our Hello World program sender publishes to the amq.topic exchange. The amq.topic exchange is a good one to use for the demo. A topic exchange allows queues to be subscribed (to bind to the exchange) with a binding key that acts as a filter on the subject of messages sent to the exchange. Since we bind to the exchange with no binding key, we signal that we're interested in all messages coming through the exchange.
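To make the idea of a binding key acting as a filter more concrete, here is a minimal sketch of topic matching written in plain Python. It assumes the usual AMQP topic conventions, where subjects are dot-separated words, * matches exactly one word, and # matches zero or more words. The function name topic_matches and the sample subjects are hypothetical; this is not the broker's implementation, and treating "no binding key" as equivalent to the catch-all key # is an assumption of this model.

```python
def topic_matches(binding_key, subject):
    """Return True if a dot-separated subject matches a binding key.

    '*' matches exactly one word; '#' matches zero or more words.
    """
    def match(pattern, words):
        if not pattern:
            # The pattern is used up: match only if the subject is too.
            return not words
        head, rest = pattern[0], pattern[1:]
        if head == "#":
            # Let '#' absorb 0, 1, 2, ... leading words of the subject.
            return any(match(rest, words[i:])
                       for i in range(len(words) + 1))
        if not words:
            return False
        if head == "*" or head == words[0]:
            return match(rest, words[1:])
        return False

    return match(binding_key.split("."), subject.split("."))


# Hypothetical subjects, checked against a few binding keys.
for key in ("#", "usa.*", "usa.#", "*.weather"):
    for subject in ("usa.news", "usa.news.sports", "europe.weather"):
        print("%-10s %-16s %s" % (key, subject, topic_matches(key, subject)))
```

A queue bound with the key # in this model receives every message, which corresponds to the behaviour described above for our receiver.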
When our sender sends its message to the amq.topic exchange, the message is delivered to the subscription queue for our receiver. Our receiver then calls fetch() to retrieve the message from its subscription queue.
We will make two modifications to our Hello World program to demonstrate this.
First, we will send our message to the amq.topic exchange and register our receiver with the exchange only after the message has been sent.
To do that, we need to change the order of these operations in the original program:
Python
sender = session.sender("amq.topic")
receiver = session.receiver("amq.topic")
  
message = Message("Hello World!")
sender.send(message)
C++
Session session = connection.createSession(); 

Receiver receiver = session.createReceiver(address); 
Sender sender = session.createSender(address); 

sender.send(Message("Hello world!"));
C#/.NET
Session session = connection.CreateSession();  

Receiver receiver = session.CreateReceiver(address);   
Sender sender = session.CreateSender(address);  

sender.Send(new Message("Hello world!"));
At the moment we register a receiver with the exchange before sending the message. Let's instead send the message, then register the receiver:
Python
sender = session.sender("amq.topic")
  
message = Message("Hello World!")
sender.send(message)

receiver = session.receiver("amq.topic")
C++
Session session = connection.createSession(); 

Sender sender = session.createSender(address); 
sender.send(Message("Hello world!"));
        
Receiver receiver = session.createReceiver(address);
C#/.NET
Session session = connection.CreateSession();  
  
Sender sender = session.CreateSender(address);  
sender.Send(new Message("Hello world!"));
          
Receiver receiver = session.CreateReceiver(address);
When you run the modified Hello World program, you will not see the "Hello World!" message this time. What happened? The sender published the message to the amq.topic exchange. The exchange then delivered the message to all the subscribed queues, of which there were none. By the time our receiver subscribes to the exchange, it is too late to receive the message. In the original version of the program the receiver subscribes to the exchange before the message is sent, so it receives a copy of the message in its subscription queue.
Let's now examine the subscription queues that are created when we create the sender and receiver. We'll do that using the qpid-config command. Restart the broker to clear all the queues (all non-durable queues are destroyed when the broker restarts). Then run the command:
qpid-config queues
You see the list of queues on the broker.
Now modify the Hello World program back to its original form, where the receiver is created (subscribed to the exchange) before the message is sent. In order to see what happens, we'll pause the application between creating the exchange subscriptions and sending the message. We'll do that in Python by asking the user to press Enter, and using the raw_input function to wait for keyboard input.
Python
sender = session.sender("amq.topic")
receiver = session.receiver("amq.topic")

print "Press Enter to continue"
x = raw_input()

message = Message("Hello World!")
sender.send(message)
Run the program and, while it is paused, examine the queues on the broker by issuing the command:
qpid-config queues
You will see an exclusive queue with a unique random ID. This is the queue created and bound to the amq.topic exchange for us, to allow our receiver to receive messages from the exchange. You'll also see a number of other queues with the same ID number at the end of them. These are the queues that the qpid-config utility uses to query the message broker and receive the queue list when you run the command. If you run the command again, you'll see that our receiver queue remains the same, and the other queues have a new ID: each time you run a qpid-config command it creates its own queues to receive a response from the server. You can't directly observe that those queues are gone when qpid-config is not running, because you need to run qpid-config to see the queues, but you can take my word for it.
Since the receiver's queue is bound to the exchange (subscribed) when the sender sends its message to the exchange, the "Hello World!" message is delivered to the subscription queue by the exchange, and is available for the receiver to fetch when it is ready.
The queue created for the receiver is an exclusive queue, which means that only one session can access it at a time.
Version 2.2 and below
To see the queue-exchange bindings, run:
qpid-config queues -b
The -b switch displays bindings. You'll see that the two dynamically created queues are bound to the amq.topic exchange.
Version 2.3 and above
To see the queue-exchange bindings, run:
qpid-config queues -r
The -r switch displays bindings. You'll see that the two dynamically created queues are bound to the amq.topic exchange.
When the application wakes up and completes execution, the call to connection.close() ends the session, and the two exclusive queues on the broker are deleted. You can run qpid-config queues again to verify that.
Another experiment you can try: create one receiver before the message is sent, and another receiver after the message is sent. We would expect the receiver created before the message is sent to receive the message, and the receiver created after the message is sent to not receive it.
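The expected outcome of that experiment can be sketched with a toy model before trying it against a real broker. The code below is plain Python and does not use the messaging API; the helper names run_experiment, subscribe and publish are hypothetical, and each "subscription queue" is modelled as an ordinary list.

```python
def run_experiment():
    # Queues currently bound to the toy exchange.
    bound_queues = []

    def subscribe():
        # Creating a receiver: make a private queue and bind it.
        queue = []
        bound_queues.append(queue)
        return queue

    def publish(message):
        # The exchange copies the message to every queue bound right now.
        for queue in bound_queues:
            queue.append(message)

    early = subscribe()        # receiver created before the send
    publish("Hello World!")
    late = subscribe()         # receiver created after the send
    return early, late


early, late = run_experiment()
print("early receiver queue: %r" % (early,))
print("late receiver queue:  %r" % (late,))
```

In the model, only the queue that was bound at the time of the send holds the message. When you try this with the real program, the late receiver will probably need to fetch with a timeout rather than wait indefinitely for a message that never arrives.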
Our simple application uses a dynamically created queue to interact with the amq.topic exchange. This queue is private (randomly named and exclusive), and deleted when the consumer disconnects, so it is not suitable for publishing. In order to make messages available to consumers who may or may not be connected to the exchange when the message is sent, a message-producing application needs to create a publicly-accessible queue (publishing). Consuming applications can then subscribe to this published queue and receive messages in a decoupled fashion.
Of course, if it's not important that your messages are buffered somewhere when no one is listening, you can use the "Hello World" pattern of simply publishing to an exchange, and leave it to the consumers to create their own queues by subscribing to the exchange. AMQP messaging gives a lot of flexibility in messaging system design.