Chapter 8. Senders and receivers

The client uses sender and receiver links to represent channels for delivering messages. Senders and receivers are unidirectional, with a source end for the message origin, and a target end for the message destination.

Sources and targets often point to queues or topics on a message broker. Sources are also used to represent subscriptions.

8.1. Creating queues and topics on demand

Some message servers support on-demand creation of queues and topics. When a sender or receiver is attached, the server uses the sender target address or the receiver source address to create a queue or topic with a name matching the address.

The message server typically defaults to creating either a queue (for one-to-one message delivery) or a topic (for one-to-many message delivery). The client can indicate which it prefers by setting the queue or topic capability on the source or target.
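The difference between the two delivery styles can be sketched with a toy in-memory model. This is not the client API or a real server: the `ToyQueue` and `ToyTopic` classes are hypothetical names invented for illustration, and the round-robin choice of consumer is an assumption, since a real server applies its own distribution policy.

```python
from itertools import cycle


class ToyQueue:
    """Hypothetical model of queue semantics: one consumer per message."""

    def __init__(self, consumers):
        self.inboxes = {name: [] for name in consumers}
        # Assumption: consumers take turns in round-robin order.
        self._turns = cycle(consumers)

    def send(self, body):
        # Exactly one consumer receives each message.
        self.inboxes[next(self._turns)].append(body)


class ToyTopic:
    """Hypothetical model of topic semantics: every subscriber gets a copy."""

    def __init__(self, subscribers):
        self.inboxes = {name: [] for name in subscribers}

    def send(self, body):
        # Every subscriber receives each message.
        for inbox in self.inboxes.values():
            inbox.append(body)


queue = ToyQueue(["worker-a", "worker-b"])
topic = ToyTopic(["listener-a", "listener-b"])
for body in ["m1", "m2"]:
    queue.send(body)
    topic.send(body)

print(queue.inboxes)  # each message sits in exactly one inbox
print(topic.inboxes)  # each message sits in every inbox
```

In this model the two workers split the messages between them, while both listeners see the full stream.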

To select queue or topic semantics, follow these steps:

  1. Configure your message server for automatic creation of queues and topics. This is often the default configuration.
  2. Set either the queue or topic capability on your sender target or receiver source, as in the examples below.

Example: Sending to a queue created on demand

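from proton import symbol
from proton.handlers import MessagingHandler
from proton.reactor import SenderOption

class CapabilityOptions(SenderOption):
    def apply(self, sender):
        # Request queue semantics on the sender target
        sender.target.capabilities.put_object(symbol("queue"))

class ExampleHandler(MessagingHandler):
    def on_start(self, event):
        conn = event.container.connect("amqp://example.com")
        # The target address "jobs" names the queue to create
        event.container.create_sender(conn, "jobs", options=CapabilityOptions())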

Example: Receiving from a topic created on demand

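from proton import symbol
from proton.handlers import MessagingHandler
from proton.reactor import ReceiverOption

class CapabilityOptions(ReceiverOption):
    def apply(self, receiver):
        # Request topic semantics on the receiver source
        receiver.source.capabilities.put_object(symbol("topic"))

class ExampleHandler(MessagingHandler):
    def on_start(self, event):
        conn = event.container.connect("amqp://example.com")
        # The source address "notifications" names the topic to create
        event.container.create_receiver(conn, "notifications", options=CapabilityOptions())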

For more information, see the following examples:

8.2. Creating durable subscriptions

A durable subscription is a piece of state on the remote server representing a message receiver. Ordinarily, message receivers are discarded when a client closes. However, because durable subscriptions are persistent, clients can detach from them and then re-attach later. Any messages that arrive at the subscription while the client is detached are held and become available when the client re-attaches.

Durable subscriptions are uniquely identified by combining the client container ID and receiver name to form a subscription ID. These must have stable values so that the subscription can be recovered.
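Why the container ID and receiver name must be stable can be seen in a small in-memory sketch. The `ToyServer` class below is a hypothetical stand-in for server-side subscription state, not part of the client library, and modeling the subscription ID as a plain `(container ID, receiver name)` tuple is an assumption made for illustration.

```python
class ToyServer:
    """Hypothetical stand-in for server-side durable subscription state."""

    def __init__(self):
        # subscription ID -> messages held for that subscription
        self.backlog = {}
        # subscription IDs that currently have a receiver attached
        self.attached = set()

    @staticmethod
    def subscription_id(container_id, receiver_name):
        # Assumption: the ID is modeled as a simple pair of the two values.
        return (container_id, receiver_name)

    def attach(self, container_id, receiver_name):
        sub_id = self.subscription_id(container_id, receiver_name)
        self.attached.add(sub_id)
        # Hand over whatever was held while no receiver was attached.
        return self.backlog.pop(sub_id, [])

    def detach(self, container_id, receiver_name):
        sub_id = self.subscription_id(container_id, receiver_name)
        self.attached.discard(sub_id)
        # The subscription itself stays on the server, ready to hold messages.
        self.backlog.setdefault(sub_id, [])

    def publish(self, body):
        # Only detached subscriptions need to hold the message in this model.
        for sub_id, held in self.backlog.items():
            if sub_id not in self.attached:
                held.append(body)


server = ToyServer()
server.attach("client-1", "sub-1")
server.detach("client-1", "sub-1")
server.publish("m1")
server.publish("m2")

recovered = server.attach("client-1", "sub-1")  # same ID: backlog is recovered
other = server.attach("client-2", "sub-1")      # different ID: a new subscription
print(recovered, other)
```

Re-attaching with the same pair of values finds the held messages, while a changed container ID is treated as an unrelated subscription with nothing waiting.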

To create a durable subscription, follow these steps:

  1. Set the connection container ID to a stable value, such as client-1:

    from proton.reactor import Container

    container = Container(handler)
    container.container_id = "client-1"

  2. Configure the receiver source for durability by setting the durability and expiry_policy properties:

    from proton import Terminus
    from proton.reactor import ReceiverOption

    class SubscriptionOptions(ReceiverOption):
        def apply(self, receiver):
            receiver.source.durability = Terminus.DELIVERIES
            receiver.source.expiry_policy = Terminus.EXPIRE_NEVER

  3. Create a receiver with a stable name, such as sub-1, and apply the source properties:

    event.container.create_receiver(conn, "notifications",
                                    name="sub-1",
                                    options=SubscriptionOptions())

To detach from a subscription, use the Receiver.detach() method. To terminate the subscription, use the Receiver.close() method.

For more information, see the durable-subscribe.py example.

8.3. Creating shared subscriptions

A shared subscription is a piece of state on the remote server representing one or more message receivers. Because it is shared, multiple clients can consume from the same stream of messages.

The client configures a shared subscription by setting the shared capability on the receiver source.

Shared subscriptions are uniquely identified by combining the client container ID and receiver name to form a subscription ID. These must have stable values so that multiple client processes can locate the same subscription. If the global capability is set in addition to shared, the receiver name alone is used to identify the subscription.
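The identification rule above can be written out as a short function. This is only a sketch of the rule as described in this section: `subscription_key` is a hypothetical helper, not a library call, and representing the key as a tuple is an assumption.

```python
def subscription_key(container_id, receiver_name, capabilities):
    """Return a hypothetical key for the subscription a receiver attaches to."""
    if "shared" in capabilities and "global" in capabilities:
        # With global set as well, the receiver name alone identifies it.
        return (receiver_name,)
    # Otherwise the container ID and receiver name are combined.
    return (container_id, receiver_name)


# Two processes using the same container ID and name reach one subscription.
same_a = subscription_key("client-1", "sub-1", {"shared"})
same_b = subscription_key("client-1", "sub-1", {"shared"})

# A different container ID yields a different subscription...
different = subscription_key("client-2", "sub-1", {"shared"})

# ...unless global is set, in which case only the name matters.
global_a = subscription_key("client-1", "sub-1", {"shared", "global"})
global_b = subscription_key("client-2", "sub-1", {"shared", "global"})

print(same_a == same_b, same_a == different, global_a == global_b)
```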

To create a shared subscription, follow these steps:

  1. Set the connection container ID to a stable value, such as client-1:

    from proton.reactor import Container

    container = Container(handler)
    container.container_id = "client-1"

  2. Configure the receiver source for sharing by setting the shared capability:

    from proton import symbol
    from proton.reactor import ReceiverOption

    class SubscriptionOptions(ReceiverOption):
        def apply(self, receiver):
            receiver.source.capabilities.put_object(symbol("shared"))

  3. Create a receiver with a stable name, such as sub-1, and apply the source properties:

    event.container.create_receiver(conn, "notifications",
                                    name="sub-1",
                                    options=SubscriptionOptions())

To detach from a subscription, use the Receiver.detach() method. To terminate the subscription, use the Receiver.close() method.

For more information, see the shared-subscribe.py example.

© 2024 Red Hat, Inc.