此内容没有您所选择的语言版本。

Chapter 5. Messaging Systems


Abstract

This chapter introduces the fundamental building blocks of a messaging system, such as endpoints, messaging channels, and message routers.

5.1. Message

Overview

A message is the smallest unit for transmitting data in a messaging system (represented by the grey dot in the figure below). The message itself might have some internal structure — for example, a message containing multiple parts — which is represented by geometrical figures attached to the grey dot in Figure 5.1, “Message Pattern”.

Figure 5.1. Message Pattern

Message pattern

Types of message

Apache Camel defines the following distinct message types:

  • In message  —  A message that travels through a route from a consumer endpoint to a producer endpoint (typically, initiating a message exchange).
  • Out message  —  A message that travels through a route from a producer endpoint back to a consumer endpoint (usually, in response to an In message).

All of these message types are represented internally by the org.apache.camel.Message interface.

Message structure

By default, Apache Camel applies the following structure to all message types:

  • Headers  —  Contains metadata or header data extracted from the message.
  • Body  —  Usually contains the entire message in its original form.
  • Attachments  —  Message attachments (required for integrating with certain messaging systems, such as JBI).

It is important to remember that this division into headers, body, and attachments is an abstract model of the message. Apache Camel supports many different components, that generate a wide variety of message formats. Ultimately, it is the underlying component implementation that decides what gets placed into the headers and body of a message.

Correlating messages

Internally, Apache Camel remembers the message IDs, which are used to correlate individual messages. In practice, however, the most important way that Apache Camel correlates messages is through exchange objects.

Exchange objects

An exchange object is an entity that encapsulates related messages, where the collection of related messages is referred to as a message exchange and the rules governing the sequence of messages are referred to as an exchange pattern. For example, two common exchange patterns are: one-way event messages (consisting of an In message), and request-reply exchanges (consisting of an In message, followed by an Out message).

Accessing messages

When defining a routing rule in the Java DSL, you can access the headers and body of a message using the following DSL builder methods:

  • header(String name), body()  —  Returns the named header and the body of the current In message.
  • outBody()  —  Returns the body of the current Out message.

For example, to populate the In message’s username header, you can use the following Java DSL route:

from(SourceURL).setHeader("username", "John.Doe").to(TargetURL);

5.2. Message Channel

Overview

A message channel is a logical channel in a messaging system. That is, sending messages to different message channels provides an elementary way of sorting messages into different message types. Message queues and message topics are examples of message channels. You should remember that a logical channel is not the same as a physical channel. There can be several different ways of physically realizing a logical channel.

In Apache Camel, a message channel is represented by an endpoint URI of a message-oriented component as shown in Figure 5.2, “Message Channel Pattern”.

Figure 5.2. Message Channel Pattern

Message channel pattern

Message-oriented components

The following message-oriented components in Apache Camel support the notion of a message channel:

ActiveMQ

In ActiveMQ, message channels are represented by queues or topics. The endpoint URI for a specific queue, QueueName, has the following format:

activemq:QueueName

The endpoint URI for a specific topic, TopicName, has the following format:

activemq:topic:TopicName

For example, to send messages to the queue, Foo.Bar, use the following endpoint URI:

activemq:Foo.Bar

See ActiveMQ in the Apache Camel Component Reference Guide for more details and instructions on setting up the ActiveMQ component.

JMS

The Java Messaging Service (JMS) is a generic wrapper layer that is used to access many different kinds of message systems (for example, you can use it to wrap ActiveMQ, MQSeries, Tibco, BEA, Sonic, and others). In JMS, message channels are represented by queues, or topics. The endpoint URI for a specific queue, QueueName, has the following format:

jms:QueueName

The endpoint URI for a specific topic, TopicName, has the following format:

jms:topic:TopicName

See Jms in the Apache Camel Component Reference Guide for more details and instructions on setting up the JMS component.

AMQP

In AMQP, message channels are represented by queues, or topics. The endpoint URI for a specific queue, QueueName, has the following format:

amqp:QueueName

The endpoint URI for a specific topic, TopicName, has the following format:

amqp:topic:TopicName

See Amqp in the Apache Camel Component Reference Guide. for more details and instructions on setting up the AMQP component.

5.3. Message Endpoint

Overview

A message endpoint is the interface between an application and a messaging system. As shown in Figure 5.3, “Message Endpoint Pattern”, you can have a sender endpoint, sometimes called a proxy or a service consumer, which is responsible for sending In messages, and a receiver endpoint, sometimes called an endpoint or a service, which is responsible for receiving In messages.

Figure 5.3. Message Endpoint Pattern

Message endpoint pattern

Types of endpoint

Apache Camel defines two basic types of endpoint:

  • Consumer endpoint  —  Appears at the start of a Apache Camel route and reads In messages from an incoming channel (equivalent to a receiver endpoint).
  • Producer endpoint  —  Appears at the end of a Apache Camel route and writes In messages to an outgoing channel (equivalent to a sender endpoint). It is possible to define a route with multiple producer endpoints.

Endpoint URIs

In Apache Camel, an endpoint is represented by an endpoint URI, which typically encapsulates the following kinds of data:

  • Endpoint URI for a consumer endpoint  —  Advertises a specific location (for example, to expose a service to which senders can connect). Alternatively, the URI can specify a message source, such as a message queue. The endpoint URI can include settings to configure the endpoint.
  • Endpoint URI for a producer endpoint  —  Contains details of where to send messages and includes the settings to configure the endpoint. In some cases, the URI specifies the location of a remote receiver endpoint; in other cases, the destination can have an abstract form, such as a queue name.

An endpoint URI in Apache Camel has the following general form:

ComponentPrefix:ComponentSpecificURI

Where ComponentPrefix is a URI prefix that identifies a particular Apache Camel component (see Apache Camel Component Reference for details of all the supported components). The remaining part of the URI, ComponentSpecificURI, has a syntax defined by the particular component. For example, to connect to the JMS queue, Foo.Bar, you can define an endpoint URI like the following:

jms:Foo.Bar

To define a route that connects the consumer endpoint, file://local/router/messages/foo, directly to the producer endpoint, jms:Foo.Bar, you can use the following Java DSL fragment:

from("file://local/router/messages/foo").to("jms:Foo.Bar");

Alternatively, you can define the same route in XML, as follows:

<camelContext id="CamelContextID" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="file://local/router/messages/foo"/>
    <to uri="jms:Foo.Bar"/>
  </route>
</camelContext>

Dynamic To

The <toD> parameter allows you to send a message to a dynamically computed endpoint using one or more expressions that are concatenated together.

By default, the Simple language is used to compute the endpoint. The following example sends a message to an endpoint defined by a header:

<route>
  <from uri="direct:start"/>
  <toD uri="${header.foo}"/>
</route>

In Java DSL the format for the same command is:

from("direct:start")
  .toD("${header.foo}");

The URI can also be prefixed with a literal, as shown in the following example:

<route>
  <from uri="direct:start"/>
  <toD uri="mock:${header.foo}"/>
</route>

In Java DSL the format for the same command is:

from("direct:start")
  .toD("mock:${header.foo}");

In the example above, if the value of header.foo is orange, the URI will resolve as mock:orange.

To use a language other than Simple, you need to define the language: parameter. See Part II, “Routing Expression and Predicate Languages”.

The format for using a different language is to use language:languagename: in the URI. For example, to use Xpath use the following format:

<route>
  <from uri="direct:start"/>
  <toD uri="language:xpath:/order/@uri/">
</route>

Here is the same example in Java DSL:

from("direct:start")
  .toD("language:xpath:/order/@uri");

If you do not specify language: then the endpoint is a component name. In some cases a component and a language have the same name, such as xquery.

You can concatenate multiple languages using a + sign. In the example below, the URI is a combination of Simple and Xpath languages. Simple is the default so the language does not have to be defined. After the + sign is the Xpath instruction, indicated by language:xpath.

<route>
  <from uri="direct:start"/>
  <toD uri="jms:${header.base}+language:xpath:/order/@id"/>
</route>

In Java DSL the format is as follows:

from("direct:start")
  .toD("jms:${header.base}+language:xpath:/order/@id");

Many languages can be concatenated at one time, just separate each with a + and specify each language with language:languagename.

The following options are available with toD:

Name

Default Value

Description

uri

 

Mandatory: The URI to use.

pattern

 

Set a specific Exchange Pattern to use when sending to the endpoint. The original MEP is restored afterwards.

cacheSize

 

Configure the cache size of the ProducerCache, which caches producers for reuse. The default cache size is 1000, which will be used if no other value is specified. Setting the value to -1 turns off the cache completely.

ignoreInvalidEndpoint

false

Specifies whether to ignore an endpoint URI that could not be resolved. If disabled, Camel will throw an exception identifying the invalid endpoint URI.

5.4. Pipes and Filters

Overview

The pipes and filters pattern, shown in Figure 5.4, “Pipes and Filters Pattern”, describes a way of constructing a route by creating a chain of filters, where the output of one filter is fed into the input of the next filter in the pipeline (analogous to the UNIX pipe command). The advantage of the pipeline approach is that it enables you to compose services (some of which can be external to the Apache Camel application) to create more complex forms of message processing.

Figure 5.4. Pipes and Filters Pattern

Pipes and filters pattern

Pipeline for the InOut exchange pattern

Normally, all of the endpoints in a pipeline have an input (In message) and an output (Out message), which implies that they are compatible with the InOut message exchange pattern. A typical message flow through an InOut pipeline is shown in Figure 5.5, “Pipeline for InOut Exchanges”.

Figure 5.5. Pipeline for InOut Exchanges

Pipeline for InOut exchanges

The pipeline connects the output of each endpoint to the input of the next endpoint. The Out message from the final endpoint is sent back to the original caller. You can define a route for this pipeline, as follows:

from("jms:RawOrders").pipeline("cxf:bean:decrypt", "cxf:bean:authenticate", "cxf:bean:dedup", "jms:CleanOrders");

The same route can be configured in XML, as follows:

<camelContext id="buildPipeline" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="jms:RawOrders"/>
    <to uri="cxf:bean:decrypt"/>
    <to uri="cxf:bean:authenticate"/>
    <to uri="cxf:bean:dedup"/>
    <to uri="jms:CleanOrders"/>
  </route>
</camelContext>

There is no dedicated pipeline element in XML. The preceding combination of from and to elements is semantically equivalent to a pipeline. See the section called “Comparison of pipeline() and to() DSL commands”.

Pipeline for the InOnly and RobustInOnly exchange patterns

When there are no Out messages available from the endpoints in the pipeline (as is the case for the InOnly and RobustInOnly exchange patterns), a pipeline cannot be connected in the normal way. In this special case, the pipeline is constructed by passing a copy of the original In message to each of the endpoints in the pipeline, as shown in Figure 5.6, “Pipeline for InOnly Exchanges”. This type of pipeline is equivalent to a recipient list with fixed destinations (see Section 8.3, “Recipient List”).

Figure 5.6. Pipeline for InOnly Exchanges

Pipeline for InOnly exchanges

The route for this pipeline is defined using the same syntax as an InOut pipeline (either in Java DSL or in XML).

Comparison of pipeline() and to() DSL commands

In the Java DSL, you can define a pipeline route using either of the following syntaxes:

  • Using the pipeline() processor command  —  Use the pipeline processor to construct a pipeline route as follows:

    from(SourceURI).pipeline(FilterA, FilterB, TargetURI);
  • Using the to() command  — Use the to() command to construct a pipeline route as follows:

    from(SourceURI).to(FilterA, FilterB, TargetURI);

    Alternatively, you can use the equivalent syntax:

    from(SourceURI).to(FilterA).to(FilterB).to(TargetURI);

Exercise caution when using the to() command syntax, because it is not always equivalent to a pipeline processor. In Java DSL, the meaning of to() can be modified by the preceding command in the route. For example, when the multicast() command precedes the to() command, it binds the listed endpoints into a multicast pattern, instead of a pipeline pattern (see Section 8.13, “Multicast”).

5.5. Message Router

Overview

A message router, shown in Figure 5.7, “Message Router Pattern”, is a type of filter that consumes messages from a single consumer endpoint and redirects them to the appropriate target endpoint, based on a particular decision criterion. A message router is concerned only with redirecting messages; it does not modify the message content.

However, by default, whenever Camel routes a message exchange to a recipient endpoint, it sends is a shallow copy of the original exchange object. In a shallow copy, elements of the original exchange, such as the message body, headers, and attachments, are copied by reference only. By sending shallow copies that reuse resources, Camel optimizes for performance. But because these shallow copies are all linked, in cases where Camel routes a message to multiple endpoints, the tradeoff is that you lose the ability to apply custom logic to copies that are routed to different recipients. For information about how to enable Camel to route unique versions of a message to different endpoints, see "Apply custom processing to the outgoing messages".

Figure 5.7. Message Router Pattern

Message router pattern

A message router can easily be implemented in Apache Camel using the choice() processor, where each of the alternative target endpoints can be selected using a when() subclause (for details of the choice processor, see Section 1.5, “Processors”).

Java DSL example

The following Java DSL example shows how to route messages to three alternative destinations (either seda:a, seda:b, or seda:c) depending on the contents of the foo header:

from("seda:a").choice()
    .when(header("foo").isEqualTo("bar")).to("seda:b")
    .when(header("foo").isEqualTo("cheese")).to("seda:c")
    .otherwise().to("seda:d");

XML configuration example

The following example shows how to configure the same route in XML:

<camelContext id="buildSimpleRouteWithChoice" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="seda:a"/>
    <choice>
      <when>
        <xpath>$foo = 'bar'</xpath>
        <to uri="seda:b"/>
      </when>
      <when>
        <xpath>$foo = 'cheese'</xpath>
        <to uri="seda:c"/>
      </when>
      <otherwise>
        <to uri="seda:d"/>
      </otherwise>
    </choice>
  </route>
</camelContext>

Choice without otherwise

If you use choice() without an otherwise() clause, any unmatched exchanges are dropped by default.

5.6. Message Translator

Overview

The message translator pattern, shown in Figure 5.8, “Message Translator Pattern” describes a component that modifies the contents of a message, translating it to a different format. You can use Apache Camel’s bean integration feature to perform the message translation.

Figure 5.8. Message Translator Pattern

Message translator pattern

Bean integration

You can transform a message using bean integration, which enables you to call a method on any registered bean. For example, to call the method, myMethodName(), on the bean with ID, myTransformerBean:

from("activemq:SomeQueue")
  .beanRef("myTransformerBean", "myMethodName")
  .to("mqseries:AnotherQueue");

Where the myTransformerBean bean is defined in either a Spring XML file or in JNDI. If, you omit the method name parameter from beanRef(), the bean integration will try to deduce the method name to invoke by examining the message exchange.

You can also add your own explicit Processor instance to perform the transformation, as follows:

from("direct:start").process(new Processor() {
    public void process(Exchange exchange) {
        Message in = exchange.getIn();
        in.setBody(in.getBody(String.class) + " World!");
    }
}).to("mock:result");

Or, you can use the DSL to explicitly configure the transformation, as follows:

from("direct:start").setBody(body().append(" World!")).to("mock:result");

You can also use templating to consume a message from one destination, transform it with something like Velocity or XQuery and then send it on to another destination. For example, using the InOnly exchange pattern (one-way messaging) :

from("activemq:My.Queue").
  to("velocity:com/acme/MyResponse.vm").
  to("activemq:Another.Queue");

If you want to use InOut (request-reply) semantics to process requests on the My.Queue queue on ActiveMQ with a template generated response, then you could use a route like the following to send responses back to the JMSReplyTo destination:

from("activemq:My.Queue").
  to("velocity:com/acme/MyResponse.vm");

5.7. Message History

Overview

The Message History pattern enables you to analyze and debug the flow of messages in a loosely coupled system. If you attach a message history to the message, it displays a list of all applications that the message passed through since its origination.

In Apache Camel, using the getTracedRouteNodes method, you can trace a message flow using the Tracer or access information using the Java API from UnitOfWork.

Limiting Character Length in Logs

When you run Apache Camel with logging mechanism, it enables you to log the messages and its content from time to time.

Some messages may contain very big payloads. By default, Apache Camel will clip the log message and show only the first 1000 characters. For example, it displays the following log as:

[DEBUG ProducerCache  - >>>> Endpoint[direct:start] Exchange[Message: 01234567890123456789... [Body clipped after 20 characters, total length is 1000]

You can customize the limit when Apache Camel clips the body in the log. You can also set zero or a negative value, such as -1, means the message body is not logged.

Customizing the Limit using Java DSL

You can set the limit in Camel properties using Java DSL. For example,

 context.getProperties().put(Exchange.LOG_DEBUG_BODY_MAX_CHARS, "500");
Customizing the Limit using Spring DSL

You can set the limit in Camel properties using Spring DSL. For example,

<camelContext>
    <properties>
        <property key="CamelLogDebugBodyMaxChars" value="500"/>
   </properties>
</camelContext>
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.