Search

Chapter 3. Configuring messaging protocols in network connections

download PDF

AMQ Broker has a pluggable protocol architecture, so that you can easily enable one or more protocols for a network connection.

The broker supports the following protocols:

Note

In addition to the protocols above, the broker also supports its own native protocol known as "Core". Past versions of this protocol were known as "HornetQ" and used by Red Hat JBoss Enterprise Application Platform.

3.1. Configuring a network connection to use a messaging protocol

You must associate a protocol with a network connection before you can use it. (See Chapter 2, Configuring acceptors and connectors in network connections for more information about how to create and configure network connections.) The default configuration, located in the file <broker_instance_dir>/etc/broker.xml, includes several connections already defined. For convenience, AMQ Broker includes an acceptor for each supported protocol, plus a default acceptor that supports all protocols.

Overview of default acceptors

Shown below are the acceptors included by default in the broker.xml configuration file.

<configuration>
  <core>
    ...
    <acceptors>

      <!-- All-protocols acceptor -->
      <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>

      <!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic -->
      <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>

      <!-- STOMP Acceptor -->
      <acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>

      <!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
      <acceptor name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>

      <!-- MQTT Acceptor -->
      <acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>

    </acceptors>
    ...
  </core>
</configuration>

The only requirement to enable a protocol on a given network connnection is to add the protocols parameter to the URI for the acceptor. The value of the parameter must be a comma separated list of protocol names. If the protocol parameter is omitted from the URI, all protocols are enabled.

For example, to create an acceptor for receiving messages on port 3232 using the AMQP protocol, follow these steps:

  1. Open the <broker_instance_dir>/etc/broker.xml configuration file.
  2. Add the following line to the <acceptors> stanza:
<acceptor name="ampq">tcp://0.0.0.0:3232?protocols=AMQP</acceptor>

Additional parameters in default acceptors

In a minimal acceptor configuration, you specify a protocol as part of the connection URI. However, the default acceptors in the broker.xml configuration file have some additional parameters configured. The following table details the additional parameters configured for the default acceptors.

Acceptor(s)ParameterDescription

All-protocols acceptor

AMQP

STOMP

tcpSendBufferSize

Size of the TCP send buffer in bytes. The default value is 32768.

tcpReceiveBufferSize

Size of the TCP receive buffer in bytes. The default value is 32768.

TCP buffer sizes should be tuned according to the bandwidth and latency of your network.

In summary TCP send/receive buffer sizes should be calculated as:

buffer_size = bandwidth * RTT.

Where bandwidth is in bytes per second and network round trip time (RTT) is in seconds. RTT can be easily measured using the ping utility.

For fast networks you may want to increase the buffer sizes from the defaults.

All-protocols acceptor

AMQP

STOMP

HornetQ

MQTT

useEpoll

Use Netty epoll if using a system (Linux) that supports it. The Netty native transport offers better performance than the NIO transport. The default value of this option is true. If you set the option to false, NIO is used.

All-protocols acceptor

AMQP

amqpCredits

Maximum number of messages that an AMQP producer can send, regardless of the total message size. The default value is 1000.

To learn more about how credits are used to block AMQP messages, see Section 7.3.2, “Blocking AMQP producers”.

All-protocols acceptor

AMQP

amqpLowCredits

Lower threshold at which the broker replenishes producer credits. The default value is 300. When the producer reaches this threshold, the broker sends the producer sufficient credits to restore the amqpCredits value.

To learn more about how credits are used to block AMQP messages, see Section 7.3.2, “Blocking AMQP producers”.

HornetQ compatibility acceptor

anycastPrefix

Prefix that clients use to specify the anycast routing type when connecting to an address that uses both anycast and multicast. The default value is jms.queue.

For more information about configuring a prefix to enable clients to specify a routing type when connecting to an address, see Section 4.6, “Adding a routing type to an acceptor configuration”.

multicastPrefix

Prefix that clients use to specify the multicast routing type when connecting to an address that uses both anycast and multicast. The default value is jms.topic.

For more information about configuring a prefix to enable clients to specify a routing type when connecting to an address, see Section 4.6, “Adding a routing type to an acceptor configuration”.

Additional resources

3.2. Using AMQP with a network connection

The broker supports the AMQP 1.0 specification. An AMQP link is a uni-directional protocol for messages between a source and a target, that is, a client and the broker.

Procedure

  1. Open the <broker_instance_dir>/etc/broker.xml configuration file.
  2. Add or configure an acceptor to receive AMQP clients by including the protocols parameter with a value of AMQP as part of the URI, as shown in the following example:
<acceptors>
  <acceptor name="amqp-acceptor">tcp://localhost:5672?protocols=AMQP</acceptor>
  ...
</acceptors>

In the preceding example, the broker accepts AMQP 1.0 clients on port 5672, which is the default AMQP port.

An AMQP link has two endpoints, a sender and a receiver. When senders transmit a message, the broker converts it into an internal format, so it can be forwarded to its destination on the broker. Receivers connect to the destination at the broker and convert the messages back into AMQP before they are delivered.

If an AMQP link is dynamic, a temporary queue is created and either the remote source or the remote target address is set to the name of the temporary queue. If the link is not dynamic, the address of the remote target or source is used for the queue. If the remote target or source does not exist, an exception is sent.

A link target can also be a Coordinator, which is used to handle the underlying session as a transaction, either rolling it back or committing it.

Note

AMQP allows the use of multiple transactions per session, amqp:multi-txns-per-ssn, however the current version of AMQ Broker will support only single transactions per session.

Note

The details of distributed transactions (XA) within AMQP are not provided in the 1.0 version of the specification. If your environment requires support for distributed transactions, it is recommended that you use the AMQ Core Protocol JMS.

See the AMQP 1.0 specification for more information about the protocol and its features.

3.2.2. Configuring AMQP security

The broker supports AMQP SASL Authentication. See Security for more information about how to configure SASL-based authentication on the broker.

3.3. Using MQTT with a network connection

The broker supports MQTT v3.1.1 and v5.0 (and also the older v3.1 code message format). MQTT is a lightweight, client to server, publish/subscribe messaging protocol. MQTT reduces messaging overhead and network traffic, as well as a client’s code footprint. For these reasons, MQTT is ideally suited to constrained devices such as sensors and actuators and is quickly becoming the de facto standard communication protocol for Internet of Things(IoT).

Procedure

  1. Open the <broker_instance_dir>/etc/broker.xml configuration file.
  2. Add an acceptor with the MQTT protocol enabled. For example:
<acceptors>
  <acceptor name="mqtt">tcp://localhost:1883?protocols=MQTT</acceptor>
  ...
</acceptors>

MQTT comes with a number of useful features including:

Quality of Service
Each message can define a quality of service that is associated with it. The broker will attempt to deliver messages to subscribers at the highest quality of service level defined.
Retained Messages

Messages can be retained for a particular address. New subscribers to that address receive the last-sent retained message before any other messages, even if the retained message was sent before the client connected.

Retained messages are stored in a queue named sys.mqtt.<topic name> and remain in the queue until a client deletes the retained message or, if an expiry is configured, until the message expires. When a queue is empty, the queue is not removed until you explicitly delete it. For example, the following configuration deletes a queue:

<address-setting match="$sys.mqtt.retain.#">
   <auto-delete-queues>true</auto-delete-queues>
   <auto-delete-addresses>true</auto-delete-addresses>
</address-setting>
Wild card subscriptions
MQTT addresses are hierarchical, similar to the hierarchy of a file system. Clients are able to subscribe to specific topics or to whole branches of a hierarchy.
Will Messages
Clients are able to set a "will message" as part of their connect packet. If the client abnormally disconnects, the broker will publish the will message to the specified address. Other subscribers receive the will message and can react accordingly.

For more information about the MQTT protocol, see the specification.

3.3.1. Configuring MQTT properties

You can append key-value pairs to the MQTT acceptor to configure connection properties. For example:

<acceptors>
  <acceptor name="mqtt">tcp://localhost:1883?protocols=MQTT;receiveMaximum=50000;topicAliasMaximum=50000;maximumPacketSize;134217728;
serverKeepAlive=30;closeMqttConnectionOnPublishAuthorizationFailure=false</acceptor>
  ...
</acceptors>
receiveMaximum
Enables flow-control by specifying the maximum number of QoS 1 and 2 messages that the broker can receive from a client before an acknowledgment is required. The default value is 65535. A value of -1 disables flow-control from clients to the broker. This has the same effect as setting the value to 0 but reduces the size of the CONNACK packet.
topicAliasMaximum
Specifies for clients the maximum number of aliases that the broker supports. The default value is 65535. A value of -1 prevents the broker from informing the client of a topic alias limit. This has the same effect as setting the value to 0, but reduces the size of the CONNACK packet.
maximumPacketSize
Specifies the maximum packet size that the broker can accept from clients. The default value is 268435455. A value of -1 prevents the broker from informing the client of a maximum packet size, which means that no limit is enforced on the size of incoming packets.
serverKeepAlive
Specifies the duration the broker keeps an inactive client connection open. The configured value is applied to the connection only if it is less than the keep-alive value configured for the client or if the value configured for the client is 0. The default value is 60 seconds. A value of -1 means that the broker always accepts the client’s keep alive value (even if that value is 0).
closeMqttConnectionOnPublishAuthorizationFailure
By default, if a PUBLISH packet fails due to a lack of authorization, the broker closes the network connection. If you want the broker to sent a positive acknowledgment instead of closing the network connection, set closeMqttConnectionOnPublishAuthorizationFailure to false.

3.4. Using OpenWire with a network connection

The broker supports the OpenWire protocol, which allows a JMS client to talk directly to a broker. Use this protocol to communicate with older versions of AMQ Broker.

Currently AMQ Broker supports OpenWire clients that use standard JMS APIs only.

Procedure

  1. Open the <broker_instance_dir>/etc/broker.xml configuration file.
  2. Add or modify an acceptor so that it includes OPENWIRE as part of the protocol parameter, as shown in the following example:

    <acceptors>
      <acceptor name="openwire-acceptor">tcp://localhost:61616?protocols=OPENWIRE</acceptor>
      ...
    </acceptors>

In the preceding example, the broker will listen on port 61616 for incoming OpenWire commands.

For more details, see the Openwire examples.

3.5. Using STOMP with a network connection

STOMP is a text-orientated wire protocol that allows STOMP clients to communicate with STOMP Brokers. The broker supports STOMP 1.0, 1.1 and 1.2. STOMP clients are available for several languages and platforms making it a good choice for interoperability.

Procedure

  1. Open the <broker_instance_dir>/etc/broker.xml configuration file.
  2. Configure an existing acceptor or create a new one and include a protocols parameter with a value of STOMP, as below.
<acceptors>
  <acceptor name="stomp-acceptor">tcp://localhost:61613?protocols=STOMP</acceptor>
  ...
</acceptors>

In the preceding example, the broker accepts STOMP connections on the port 61613, which is the default.

For an example of how to configure a broker with STOMP, see the STOMP example.

3.5.1. STOMP limitations

When using STOMP, the following limitations apply:

  1. The broker currently does not support virtual hosting, which means the host header in CONNECT frames are ignored.
  2. Message acknowledgments are not transactional. The ACK frame cannot be part of a transaction, and it is ignored if its transaction header is set).

3.5.2. Providing IDs for STOMP Messages

When receiving STOMP messages through a JMS consumer or a QueueBrowser, the messages do not contain any JMS properties, for example JMSMessageID, by default. However, you can set a message ID on each incoming STOMP message by using a broker paramater.

Procedure

  1. Open the <broker_instance_dir>/etc/broker.xml configuration file.
  2. Set the stompEnableMessageId parameter to true for the acceptor used for STOMP connections, as shown in the following example:
<acceptors>
  <acceptor name="stomp-acceptor">tcp://localhost:61613?protocols=STOMP;stompEnableMessageId=true</acceptor>
  ...
</acceptors>

By using the stompEnableMessageId parameter, each stomp message sent using this acceptor has an extra property added. The property key is amq-message-id and the value is a String representation of an internal message id prefixed with “STOMP”, as shown in the following example:

amq-message-id : STOMP12345

If stompEnableMessageId is not specified in the configuration, the default value is false.

3.5.3. Setting a connection time to live

STOMP clients must send a DISCONNECT frame before closing their connections. This allows the broker to close any server-side resources, such as sessions and consumers. However, if STOMP clients exit without sending a DISCONNECT frame, or if they fail, the broker will have no way of knowing immediately whether the client is still alive. STOMP connections therefore are configured to have a "Time to Live" (TTL) of 1 minute. The means that the broker stops the connection to the STOMP client if it has been idle for more than one minute.

Procedure

  1. Open the <broker_instance_dir>/etc/broker.xml configuration file.
  2. Add the connectionTTL parameter to URI of the acceptor used for STOMP connections, as shown in the following example:
<acceptors>
  <acceptor name="stomp-acceptor">tcp://localhost:61613?protocols=STOMP;connectionTTL=20000</acceptor>
  ...
</acceptors>

In the preceding example, any stomp connection that using the stomp-acceptor will have its TTL set to 20 seconds.

Note

Version 1.0 of the STOMP protocol does not contain any heartbeat frame. It is therefore the user’s responsibility to make sure data is sent within connection-ttl or the broker will assume the client is dead and clean up server-side resources. With version 1.1, you can use heart-beats to maintain the life cycle of stomp connections.

Overriding the broker default time to live

As noted, the default TTL for a STOMP connection is one minute. You can override this value by adding the connection-ttl-override attribute to the broker configuration.

Procedure

  1. Open the <broker_instance_dir>/etc/broker.xml configuration file.
  2. Add the connection-ttl-override attribute and provide a value in milliseconds for the new default. It belongs inside the <core> stanza, as below.
<configuration ...>
  ...
  <core ...>
    ...
    <connection-ttl-override>30000</connection-ttl-override>
    ...
  </core>
<configuration>

In the preceding example, the default Time to Live (TTL) for a STOMP connection is set to 30 seconds, 30000 milliseconds.

3.5.4. Sending and consuming STOMP messages from JMS

STOMP is mainly a text-orientated protocol. To make it simpler to interoperate with JMS, the STOMP implementation checks for presence of the content-length header to decide how to map a STOMP message to JMS.

If you want a STOMP message to map to a …​The message should…​.

JMS TextMessage

Not include a content-length header.

JMS BytesMessage

Include a content-length header.

The same logic applies when mapping a JMS message to STOMP. A STOMP client can confirm the presence of the content-length header to determine the type of the message body (string or bytes).

See the STOMP specification for more information about message headers.

3.5.5. Mapping STOMP destinations to AMQ Broker addresses and queues

When sending messages and subscribing, STOMP clients typically include a destination header. Destination names are string values, which are mapped to a destination on the broker. In AMQ Broker, these destinations are mapped to addresses and queues. See the STOMP specification for more information about the destination frame.

Take for example a STOMP client that sends the following message (headers and body included):

SEND
destination:/my/stomp/queue

hello queue a
^@

In this case, the broker will forward the message to any queues associated with the address /my/stomp/queue.

For example, when a STOMP client sends a message (by using a SEND frame), the specified destination is mapped to an address.

It works the same way when the client sends a SUBSCRIBE or UNSUBSCRIBE frame, but in this case AMQ Broker maps the destination to a queue.

SUBSCRIBE
destination: /other/stomp/queue
ack: client

^@

In the preceding example, the broker will map the destination to the queue /other/stomp/queue.

Mapping STOMP destinations to JMS destinations

JMS destinations are also mapped to broker addresses and queues. If you want to use STOMP to send messages to JMS destinations, the STOMP destinations must follow the same convention:

  • Send or subscribe to a JMS Queue by prepending the queue name by jms.queue.. For example, to send a message to the orders JMS Queue, the STOMP client must send the frame:

    SEND
    destination:jms.queue.orders
    hello queue orders
    ^@
  • Send or subscribe to a JMS Topic by prepending the topic name by jms.topic.. For example, to subscribe to the stocks JMS Topic, the STOMP client must send a frame similar to the following:

    SUBSCRIBE
    destination:jms.topic.stocks
    ^@
Red Hat logoGithubRedditYoutubeTwitter

Learn

Try, buy, & sell

Communities

About Red Hat Documentation

We help Red Hat users innovate and achieve their goals with our products and services with content they can trust.

Making open source more inclusive

Red Hat is committed to replacing problematic language in our code, documentation, and web properties. For more details, see the Red Hat Blog.

About Red Hat

We deliver hardened solutions that make it easier for enterprises to work across platforms and environments, from the core datacenter to the network edge.

© 2024 Red Hat, Inc.