6.3. 死信频道


概述

图 6.3 “死信频道模式” 中显示的 死信频道 模式描述了在消息传递系统无法向预期接收方发送消息时要执行的操作。这包括重试发送的功能,如果发送最终失败,将消息发送到死信频道,后者会归档未发送的消息。

图 6.3. 死信频道模式

死信频道模式

在 Java DSL 中创建死信频道

以下示例演示了如何使用 Java DSL 创建死信频道:

errorHandler(deadLetterChannel("seda:errors"));
from("seda:a").to("seda:b");

其中 errorHandler () 方法是 Java DSL 拦截器,这表示当前路由构建器中定义的所有路由都受到此设置的影响。deadLetterChannel () 方法是一个 Java DSL 命令,它创建一个带有指定目标端点 seda:errors 的新死信通道。

errorHandler () 拦截器提供了一个 catch-all 机制 来处理所有 错误类型。如果要应用更精细的方法来异常处理,您可以使用 onException 子句(请参阅 “onException 子句”一节)。

XML DSL 示例

您可以在 XML DSL 中定义死信频道,如下所示:

 <route errorHandlerRef="myDeadLetterErrorHandler">
    ...
 </route>

 <bean id="myDeadLetterErrorHandler" class="org.apache.camel.builder.DeadLetterChannelBuilder">
     <property name="deadLetterUri" value="jms:queue:dead"/>
     <property name="redeliveryPolicy" ref="myRedeliveryPolicyConfig"/>
 </bean>

 <bean id="myRedeliveryPolicyConfig" class="org.apache.camel.processor.RedeliveryPolicy">
     <property name="maximumRedeliveries" value="3"/>
     <property name="redeliveryDelay" value="5000"/>
 </bean>

重新发送策略

通常,如果发送尝试失败,则不会向死信频道直接发送邮件。相反,您可以重新尝试发送最多一些最大限制,并在所有重新发送尝试失败后,您可以将消息发送到 dead letter 频道。若要自定义消息重新发送,您可以将死信频道配置为具有重新发送 策略。例如,要指定最多两个重新发送尝试,并将 exponential backoff 算法应用到发送尝试之间的时间延迟,您可以按照以下方式配置 dead letter 频道:

errorHandler(deadLetterChannel("seda:errors").maximumRedeliveries(2).useExponentialBackOff());
from("seda:a").to("seda:b");

如果您在链中调用相关方法(链中的每个方法会返回对当前 RedeliveryPolicy 对象的引用),在死信频道上设置重新传送选项。表 6.1 “重新发送策略设置” 总结可用于设置重新传送策略的方法。

表 6.1. 重新发送策略设置
方法签名default描述

allowRedeliveryWhileStopping()

true

控制在安全关闭期间还是在路由停止期间尝试重新发送。在停止停止时已正在进行的交付不会中断。

backOffMultiplier (double multiplier)

2

如果启用了 exponential backoff,让 m 成为 backoff 倍数,让 d 成为初始延迟。然后,重新发送尝试的顺序会按照如下所示进行时间:

d, m*d, m*m*d, m*m*m*d, ...

conflictAvoidancePercent (double conflictAvoidancePercent)

15

如果启用了冲突避免,请给 p 避免冲突。冲突避免策略随后将下一个延迟调整到随机数量,最多调整其当前值的加号/减 p%

deadLetterHandleNewException

true

Camel 2.15:指定是否在死信频道中处理消息时是否发生异常。如果为 true,则处理异常并在 WARN 级别记录(这样可保证死信频道可以完成)。如果为 false,则不会处理异常,因此死信频道会失败,并传播新的异常。

delayPattern (String delayPattern)

None

Apache Camel 2.0:请参阅 “redeliver delay 模式”一节

disableRedelivery()

true

Apache Camel 2.0:禁用重新传送功能。要启用重新发送,请将 maximumRedeliveries () 设置为正整数值。

handled (boolean handled)

true

Apache Camel 2.0:如果为 true,当消息移到 dead letter 频道时,当前异常会被清除;如果为 false,则异常会被传播到客户端。

initialRedeliveryDelay (long initialRedeliveryDelay)

1000

指定第一次重新发送前的延迟(以毫秒为单位)。

logNewException

true

指定是否在死信频道中引发异常时记录 WARN 级别。

logStackTrace(boolean logStackTrace)

false

Apache Camel 2.0:如果为 true,则 JVM 堆栈追踪包含在错误日志中。

maximumRedeliveries (整数)

0

Apache Camel 2.0:最大发送尝试数。

maximumRedeliveryDelay (long maxDelay)

60000

Apache Camel 2.0:在使用 exponential backoff 策略时(请参阅 ExponentialBackOff ()),在理论上可以重新发送延迟以无限增加。此属性对重新发送延迟施加上限(以毫秒为单位)

onRedelivery (进程处理器)

None

Apache Camel 2.0:配置在每次重新发送尝试前调用的处理器。

redeliveryDelay (long int)

0

Apache Camel 2.0:指定重新发送尝试之间的延迟(以毫秒为单位)。Apache Camel 2.16.0 :默认重新传送延迟为 1 秒。

retriesExhaustedLogLevel(LoggingLevel logLevel)

LoggingLevel.ERROR

Apache Camel 2.0:指定日志交付失败的日志级别(指定为 org.apache.camel.LoggingLevel 常)。

retryAttemptedLogLevel (LoggingLevel logLevel)

LoggingLevel.DEBUG

Apache Camel 2.0:指定重新传送尝试的日志级别(指定为 org.apache.camel.LoggingLevel 常)。

useCollisionAvoidance()

false

启用冲突避免,这会在 backoff 计时添加一些随机化以减少竞争概率。

useOriginalMessage()

false

Apache Camel 2.0:如果启用了此功能,则发送到死信频道的消息是 原始消息 交换的副本,因为它存在于路由开始时(在 from () 节点)。

useExponentialBackOff()

false

启用指数避退。

重新发送标头

如果 Apache Camel 尝试重新设计信息,它会自动设置 In 消息 表 6.2 “死信重新发送标头” 中描述的标头。

表 6.2. 死信重新发送标头
标头名称类型描述

CamelRedeliveryCounter

整数

Apache Camel 2.0:计算发送尝试失败次数。此值也在 Exchange.REDELIVERY_COUNTER 中设置。

CamelRedelivered

布尔值

Apache Camel 2.0: True,如果进行了一个或多个重新发送尝试。此值也在 Exchange.REDELIVERED 中设置。

CamelRedeliveryMaxCounter

整数

Apache Camel 2.6:保存最大重新传送设置(也在 Exchange.REDELIVERY_MAX_COUNTER 交换属性中设置)。如果使用 retryWhile 或配置了无限的最大重新发送,则缺少此标头。

重新发送交换属性

如果 Apache Camel 尝试重新设计信息,它会自动设置 表 6.3 “重新发送交换属性” 中描述的交换属性。

表 6.3. 重新发送交换属性
Exchange 属性名称类型描述

Exchange.FAILURE_ROUTE_ID

字符串

提供失败的路由 ID。此属性的字面名称为 CamelFailureRouteId

使用原始消息

从 Apache Camel 2.0 开始,由于交换对象在通过路由时进行修改,因此当引发异常时,创建的交换不一定要存储在死信频道中的副本。在很多情况下,最好在路由开始时记录到达路由的消息,然后再受到路由的任何转换。例如,请考虑以下路由:

from("jms:queue:order:input")
       .to("bean:validateOrder");
       .to("bean:transformOrder")
       .to("bean:handleOrder");

前面的路由侦听传入的 JMS 消息,然后使用 beans 序列处理消息: validateOrdertransformOrder并处理Order。但是,当发生错误时,我们不知道消息所处的状态。在 transformOrder bean 或 after 之前发生了错误?通过启用 useOriginalMessage 选项,可以确保来自 jms:queue:order:input 的原始消息记录到 dead letter 频道,如下所示:

// will use original body
errorHandler(deadLetterChannel("jms:queue:dead")
       .useOriginalMessage().maximumRedeliveries(5).redeliveryDelay(5000);

redeliver delay 模式

从 Apache Camel 2.0 开始,可以使用 delayPattern 选项指定重新发送计数的特定范围的延迟。延迟模式具有以下语法: limit1:delay1;limit2:delay2;limit3:delay3;…​,其中每个 delayN applied to redeliveries in the range limitN rhacm redeliveryCount < limitN+1

例如,考虑模式 5:1000;10:5000;20:20000,它定义了三个组,并导致以下重新发送延迟:

  • 尝试号 1..4 = 0 毫秒(因为第一个组以 5 开始)。
  • 尝试编号 5..9 = 1000 毫秒(第一个组)。
  • Try number 10.19 = 5000 毫秒(第二个组)。
  • Try number 20.. = 20000 毫秒(最后一个组)。

您可以使用限制 1 启动组,以定义启动延迟。例如,1:1000;5:5000 会产生以下重新发送延迟:

  • Try number 1.4 = 1000 millis (第一个组)
  • Try number 5.. = 5000 millis (最后一个组)

不要求下一个延迟应高于上一个延迟,您可以使用您喜欢的任何延迟值。例如,延迟模式 1:5000;3:1000、以 5 秒的延迟开始,然后将延迟减少为 1 秒。

哪个端点失败?

当 Apache Camel 路由消息时,它会更新包含 Exchange 发送到 的最后一个 端点的 Exchange 属性。因此,您可以使用以下代码获取当前交换的最新目的地的 URI:

// Java
String lastEndpointUri = exchange.getProperty(Exchange.TO_ENDPOINT, String.class);

其中 Exchange.TO_ENDPOINT 是与 CamelToEndpoint 相等的字符串常量。每当 Camel 发送消息到任何端点时,此属性都会更新。

如果在路由期间发生错误,并且交换移到死信队列中,Apache Camel 还将设置一个名为 CamelFailureEndpoint 的属性,该属性标识交换在发生错误之前发送到的最后一个目的地。因此,您可以使用以下代码从死信队列中访问失败端点:

// Java
String failedEndpointUri = exchange.getProperty(Exchange.FAILURE_ENDPOINT, String.class);

其中 Exchange.FAILURE_ENDPOINT 是一个字符串常量等于 CamelFailureEndpoint

注意

这些属性保留在当前交换中,即使给定目标端点完成 处理后 发生失败。例如,请考虑以下路由:

        from("activemq:queue:foo")
        .to("http://someserver/somepath")
        .beanRef("foo");

现在假设 foo bean 中发生了故障。在这种情况下,Exchange.TO_ENDPOINT 属性和 Exchange.FAILURE_ENDPOINT 属性仍然包含该值。

onRedelivery 处理器

当死信频道执行红色时,可以配置仅在每次重新发送尝试前执行的处理器。 这可用于在某些情况下,您需要在消息被重新设计前更改消息。

例如,以下死信频道配置为在重新设计交换前调用 MyRedeliverProcessor

// we configure our Dead Letter Channel to invoke
// MyRedeliveryProcessor before a redelivery is
// attempted. This allows us to alter the message before
errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(5)
        .onRedelivery(new MyRedeliverProcessor())
        // setting delay to zero is just to make unit teting faster
        .redeliveryDelay(0L));

其中 MyRedeliveryProcessor 进程实现,如下所示:

// This is our processor that is executed before every redelivery attempt
// here we can do what we want in the java code, such as altering the message
public class MyRedeliverProcessor implements Processor {

    public void process(Exchange exchange) throws Exception {
        // the message is being redelivered so we can alter it

        // we just append the redelivery counter to the body
        // you can of course do all kind of stuff instead
        String body = exchange.getIn().getBody(String.class);
        int count = exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);

        exchange.getIn().setBody(body + count);

        // the maximum redelivery was set to 5
        int max = exchange.getIn().getHeader(Exchange.REDELIVERY_MAX_COUNTER, Integer.class);
        assertEquals(5, max);
    }
}

控制在关闭或停止过程中重新发送

如果您停止路由或启动安全关闭,则错误处理程序的默认行为是继续尝试重新发送。由于这通常是所需的行为,您可以选择在关闭或停止期间禁用重新发送,方法是将 allowRedeliveryWhileStopping 选项设置为 false,如下例所示:

errorHandler(deadLetterChannel("jms:queue:dead")
    .allowRedeliveryWhileStopping(false)
    .maximumRedeliveries(20)
    .redeliveryDelay(1000)
    .retryAttemptedLogLevel(LoggingLevel.INFO));
注意

对于向后兼容的原因,allowRedeliveryWhileStopping 选项默认为 true。但是,在积极关闭过程中,无论此选项设置如何(例如,在安全关闭超时后)总是被禁止重新传送。

使用 onExceptionOccurred Processor

死信频道支持 onExceptionOccurred 处理器,以便在出现异常后自定义处理消息。您还可以使用它来自定义日志记录。从 onExceptionOccurred 处理器抛出的新异常都会记录为 WARN 并忽略,而不是覆盖现有的异常。

onRedelivery processor 和 onExceptionOccurred 处理器之间的差别是,您可以在重新传送尝试前完全处理前者的处理。但是,它不会在异常发生后立即发生。例如,如果您将错误处理程序配置为在重新发送尝试之间执行五秒延迟,则稍后重新发送处理器会在出现异常后调用 5 秒。

以下示例解释了如何在出现异常时执行自定义日志记录。您需要配置 onExceptionOccurred 以使用自定义处理器。

errorHandler(defaultErrorHandler().maximumRedeliveries(3).redeliveryDelay(5000).onExceptionOccurred(myProcessor));

onException 子句

您可以在路由构建器中使用 errorHandler () 拦截器,而是定义一系列 onException () 子句,以定义不同的重新传送策略以及各种异常类型的不同死信频道。例如,要为每个 NullPointerExceptionIOExceptionException 类型定义不同的行为,您可以使用 Java DSL 在路由构建器中定义以下规则:

onException(NullPointerException.class)
    .maximumRedeliveries(1)
    .setHeader("messageInfo", "Oh dear! An NPE.")
    .to("mock:npe_error");

onException(IOException.class)
    .initialRedeliveryDelay(5000L)
    .maximumRedeliveries(3)
    .backOffMultiplier(1.0)
    .useExponentialBackOff()
    .setHeader("messageInfo", "Oh dear! Some kind of I/O exception.")
    .to("mock:io_error");

onException(Exception.class)
    .initialRedeliveryDelay(1000L)
    .maximumRedeliveries(2)
    .setHeader("messageInfo", "Oh dear! An exception.")
    .to("mock:error");

from("seda:a").to("seda:b");

其中,通过串联重新传送策略方法(在 表 6.1 “重新发送策略设置”中列出的)来指定重新发送选项,并使用 to () DSL 命令指定 dead letter 频道的端点。您还可以在 onException () 子句中调用其他 Java DSL 命令。例如,前面的示例调用 setHeader () 在名为 messageInfo 的消息标头中记录一些错误详情。

在本例中,NullPointerExceptionIOException 异常类型特别配置。所有其他例外类型都由通用 例外 异常拦截器处理。默认情况下,Apache Camel 应用最接近抛出异常的异常拦截器。如果找不到完全匹配,它将尝试匹配最接近的基础类型,以此类推。最后,如果没有其他拦截器匹配,Exception 类型的拦截器与所有剩余的例外匹配。

OnPrepareFailure

在将交换传递给死信队列之前,您可以使用 onPrepare 选项来允许自定义处理器准备交换。它可让您添加关于交换的信息,如交换失败的原因。例如,以下处理器添加了一个带有例外消息的标头。

public class MyPrepareProcessor implements Processor {
    @Override
    public void process(Exchange exchange) throws Exception {
        Exception cause = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);
        exchange.getIn().setHeader("FailedBecause", cause.getMessage());
    }
}

您可以按照如下所示,配置错误处理程序以使用处理器:

errorHandler(deadLetterChannel("jms:dead").onPrepareFailure(new MyPrepareProcessor()));

但是,onPrepare 选项也可以使用默认错误处理程序。

<bean id="myPrepare"
class="org.apache.camel.processor.DeadLetterChannelOnPrepareTest.MyPrepareProcessor"/>

<errorHandler id="dlc" type="DeadLetterChannel" deadLetterUri="jms:dead" onPrepareFailureRef="myPrepare"/>
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.