6.3. 死信频道
概述
图 6.3 “死信频道模式” 中显示的 死信频道 模式描述了在消息传递系统无法向预期收件人发送邮件时要执行的操作。这包括重试发送等功能,如果发送最终失败,将消息发送到 dead letter 频道,该通道归档未发送的消息。
图 6.3. 死信频道模式
在 Java DSL 中创建死信频道
以下示例演示了如何使用 Java DSL 创建死信频道:
errorHandler(deadLetterChannel("seda:errors")); from("seda:a").to("seda:b");
其中 errorHandler ()
方法是一个 Java DSL 拦截器,这意味着当前路由构建器中定义的所有路由都受到此设置的影响。deadLetterChannel ()
方法是一个 Java DSL 命令,它利用指定的目的地端点 seda:errors
创建一个新的死信频道。
errorHandler ()
拦截器提供 处理所有 错误类型的概括性机制。如果要应用更精细的方法来异常处理,您可以使用 onException
子句替代(请参阅 “onException 子句”一节)。
XML DSL 示例
您可以在 XML DSL 中定义死信频道,如下所示:
<route errorHandlerRef="myDeadLetterErrorHandler"> ... </route> <bean id="myDeadLetterErrorHandler" class="org.apache.camel.builder.DeadLetterChannelBuilder"> <property name="deadLetterUri" value="jms:queue:dead"/> <property name="redeliveryPolicy" ref="myRedeliveryPolicyConfig"/> </bean> <bean id="myRedeliveryPolicyConfig" class="org.apache.camel.processor.RedeliveryPolicy"> <property name="maximumRedeliveries" value="3"/> <property name="redeliveryDelay" value="5000"/> </bean>
重新传送策略
通常,如果发送尝试失败,您不会直接向死信频道发送邮件。相反,您会重新传送到一些最大限制,在所有重新传送尝试失败时,您要将消息发送到死信频道。要自定义消息重新发送,您可以配置 dead letter 频道来 重新传送策略。例如,要指定最多两个重新传送尝试,并在发送尝试之间把 exponential backoff 算法应用到时间延迟,您可以配置 dead letter 频道,如下所示:
errorHandler(deadLetterChannel("seda:errors").maximumRedeliveries(2).useExponentialBackOff()); from("seda:a").to("seda:b");
当您通过调用链中的相关方法来设置死信频道的重新传送选项的位置(每个链中的方法是返回对当前 RedeliveryPolicy
对象的引用)。表 6.1 “重新传送策略设置” 总结了可用于设置重新传送策略的方法。
方法签名 | 默认 | 描述 |
---|---|---|
|
| 控制在安全关闭或当路由停止期间是否尝试重新发送。在启动停止时已进行中的交付不会中断。 |
|
|
如果启用了 exponential backoff,让 d, m*d, m*m*d, m*m*m*d, ... |
|
|
如果启用冲突避免启用, |
|
|
Camel 2.15:指定是否处理在死信频道中处理消息时出现的异常。如果为 |
| None | Apache Camel 2.0:请查看 “redeliver delay pattern”一节。 |
|
|
Apache Camel 2.0:禁用重新传送功能。要启用重新传送,将 |
|
|
Apache Camel 2.0:如果为 |
|
| 在尝试第一个重新发送前指定延迟(以毫秒为单位)。 |
|
| 指定在死信频道出现异常时是否会出现 WARN 级别的日志。 |
|
|
Apache Camel 2.0:如果为 |
|
| Apache Camel 2.0:最大交付尝试数. |
|
|
Apache Camel 2.0:使用 exponential backoff 策略时(请参阅 |
| None | Apache Camel 2.0:配置在在每次重新传送尝试之前调用的处理器。 |
|
| Apache Camel 2.0:指定重新发送尝试之间的延迟(以毫秒为单位)。Apache Camel 2.16.0 :默认的重新传送延迟为一秒。 |
|
|
Apache Camel 2.0:指定要记录发送失败的日志记录级别(指定为 |
|
|
Apache Camel 2.0:指定要重新发送尝试的日志记录级别(指定为 |
|
| 启用冲突避免,这会为后退时时间增加一些随机性,以减少冲突的概率。 |
|
|
Apache Camel 2.0:如果启用了这个功能,则发送到死信频道的消息是 原始消息 交换的副本,因为它存在于路由的开头( |
|
| 启用 exponential backoff。 |
重新传送标头
如果 Apache Camel 尝试恢复消息,它会在信息中自动设置 表 6.2 “死 Letter 重新传送标头” 中描述的标头。
标头名称 | 类型 | 描述 |
---|---|---|
|
|
Apache Camel 2.0:计算失败的发送尝试次数。此值也在 |
|
|
Apache Camel 2.0:True,如果已经进行一个或多个重新发送尝试,则为 True。这个值也在 |
|
|
Apache Camel 2.6:包含最大重新传送设置(也可在 |
重新传送交换属性
如果 Apache Camel 尝试恢复消息,它会自动设置 表 6.3 “重新传送交换属性” 中描述的交换属性。
Exchange Property Name | 类型 | 描述 |
---|---|---|
|
|
提供失败的路由的路由 ID。此属性的文字名称为 |
使用原始消息
作为 Apache Camel 2.0 的形式提供,因为交换对象因为通过路由而受到修改,因此当引发异常并不一定要存储在 dead letter 频道中的副本时,会得到当前的交换。在很多情况下,在路由开始之前,最好记录消息到达路由开始时。例如,请考虑以下路由:
from("jms:queue:order:input") .to("bean:validateOrder"); .to("bean:transformOrder") .to("bean:handleOrder");
前面的路由侦听传入的 JMS 消息,然后使用 Bean 序列来处理消息: validateOrder
、transformOrder
和 handleOrder
。但发生错误时,我们不知道该消息处于哪个状态。在 transformOrder
bean 或之后发生错误吗?我们可通过启用 useOriginalMessage
选项,确保来自 jms:queue:order:input
的原始消息记录到 dead letter 频道,如下所示:
// will use original body errorHandler(deadLetterChannel("jms:queue:dead") .useOriginalMessage().maximumRedeliveries(5).redeliveryDelay(5000);
redeliver delay pattern
以 Apache Camel 2.0 开始,使用 delayPattern
选项指定特定范围重新传送计数的延迟。延迟模式具有以下语法: limit1:delay1;limit2:delay2;limit3:delay3
:… 应用到每个 delayN 应用到红色的,在范围 limitN vRAN redeliveryCount < limitN+1
例如,考虑模式 5:1000;10:5000;20:20000
,它定义了三个组,从而导致以下重新发送延迟:
- 尝试编号 1.4 = 0 毫秒(因为第一个组以 5 开始)。
- 尝试以数字 5..9 = 1000 毫秒(第一个组)。
- 尝试编号 10.19 = 5000 毫秒(第二个组)。
- 尝试数量 20.. = 20000 毫秒(最后一个组)。
您可以使用 limits 1 启动组来定义起始延迟。例如,1:1000;5:5000
会导致以下重新发送延迟:
- 尝试编号 1.4 = 1000 millis (第一个组)
- 尝试编号 5.. = 5000 millis (最后一个组)
不要求下一个延迟应高于前一个延迟,您可以使用任何延迟值。例如,延迟模式 1:5000;3:1000
,以 5 秒延迟开始,然后减少 1 秒的延迟。
哪一端点失败?
当 Apache Camel 路由消息时,它会更新包含交换发送到 的最后一个 端点的 Exchange 属性。因此,您可以使用以下代码获取当前交换的最新目的地的 URI:
// Java String lastEndpointUri = exchange.getProperty(Exchange.TO_ENDPOINT, String.class);
这里的 Exchange.TO_ENDPOINT
是一个字符串常量等于 CamelToEndpoint
。每当 Camel 发送消息 到任何 端点时,都会更新此属性。
如果在路由过程中发生错误,且交换被移到死信队列中,Apache Camel 将额外设置名为 CamelFailureEndpoint
的属性,这用于标识交换在出错前发送到的最后目的地。因此,您可以使用以下代码从死信队列中访问故障端点:
// Java String failedEndpointUri = exchange.getProperty(Exchange.FAILURE_ENDPOINT, String.class);
这里的 Exchange.FAILURE_ENDPOINT
是字符串常量等于 CamelFailureEndpoint
。
这些属性在当前交换中保持设置,即使给定目标端点完成处理后失败也是如此。例如,请考虑以下路由:
from("activemq:queue:foo") .to("http://someserver/somepath") .beanRef("foo");
现在假定 foo
bean 中发生失败。在这种情况下,Exchange.TO_ENDPOINT
属性和 Exchange.FAILURE_ENDPOINT
属性仍然包含该值。
onRedelivery 处理器
当一个死信频道正在执行红色时,可以配置仅在每次重新传送尝试 之前执行 的处理器。当您需要在信息被恢复前修改信息时,可以使用它。
例如,以下 dead letter 频道被配置为在重载交换前调用 MyRedeliverProcessor
:
// we configure our Dead Letter Channel to invoke // MyRedeliveryProcessor before a redelivery is // attempted. This allows us to alter the message before errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(5) .onRedelivery(new MyRedeliverProcessor()) // setting delay to zero is just to make unit teting faster .redeliveryDelay(0L));
在实施 MyRedeliveryProcessor
进程的位置,如下所示:
// This is our processor that is executed before every redelivery attempt
// here we can do what we want in the java code, such as altering the message
public class MyRedeliverProcessor implements Processor {
public void process(Exchange exchange) throws Exception {
// the message is being redelivered so we can alter it
// we just append the redelivery counter to the body
// you can of course do all kind of stuff instead
String body = exchange.getIn().getBody(String.class);
int count = exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
exchange.getIn().setBody(body + count);
// the maximum redelivery was set to 5
int max = exchange.getIn().getHeader(Exchange.REDELIVERY_MAX_COUNTER, Integer.class);
assertEquals(5, max);
}
}
控制在关闭或停止期间重新发送
如果您停止路由或启动安全关闭,则错误处理程序的默认行为是继续尝试重新发送。由于这通常不是所需的行为,因此您可以通过将 allowRedeliveryWhileStopping
选项设置为 false
来禁用重新传送,如下例中所示:
errorHandler(deadLetterChannel("jms:queue:dead")
.allowRedeliveryWhileStopping(false)
.maximumRedeliveries(20)
.redeliveryDelay(1000)
.retryAttemptedLogLevel(LoggingLevel.INFO));
对于向后兼容的原因,默认情况下,allowRedeliveryWhileStopping
选项为 true
。但是,在主动关闭过程中,始终会禁止重新传送,无论在安全关闭关闭后(例如,在安全关闭后)出现。
使用 onExceptionOccurred Processor
dead Letter 通道支持对ExceptionOccurred 处理器的 onExceptionOccurred 处理器,在发生异常后允许自定义处理消息。您还可以使用它进行自定义日志记录。所有来自ExceptionOccurred 处理器上的新异常都会被记录为 WARN 并忽略,而不覆盖现有的异常。
onRedelivery processor 和 onExceptionOccurred 处理器之间的区别是在重新传送尝试前完全处理前一个。但是,在异常发生后它不会立即发生。例如,如果您将错误处理程序配置为在重新传送尝试之间进行五秒延迟,那么后在异常发生后会调用重新传送处理器。
以下示例说明了如何在发生异常时执行自定义日志记录。您需要配置 onExceptionOccurred 以使用自定义处理器。
errorHandler(defaultErrorHandler().maximumRedeliveries(3).redeliveryDelay(5000).onExceptionOccurred(myProcessor));
onException 子句
除了在路由构建器中使用 errorHandler ()
拦截器,您可以定义一系列 onException ()
子句,为各种异常类型定义不同的重新传送策略和不同的死信频道。例如,为每一 NullPointerException
、IOException
和 Exception
类型定义不同的行为,您可以使用 Java DSL 在路由构建器中定义以下规则:
onException(NullPointerException.class) .maximumRedeliveries(1) .setHeader("messageInfo", "Oh dear! An NPE.") .to("mock:npe_error"); onException(IOException.class) .initialRedeliveryDelay(5000L) .maximumRedeliveries(3) .backOffMultiplier(1.0) .useExponentialBackOff() .setHeader("messageInfo", "Oh dear! Some kind of I/O exception.") .to("mock:io_error"); onException(Exception.class) .initialRedeliveryDelay(1000L) .maximumRedeliveries(2) .setHeader("messageInfo", "Oh dear! An exception.") .to("mock:error"); from("seda:a").to("seda:b");
通过串联重新传送策略方法(如 表 6.1 “重新传送策略设置”中列出的)来指定重新发送选项的位置,您可以使用 to ()
DSL 命令指定 dead letter 频道的端点。您也可以在 onException ()
子句中调用其他 Java DSL 命令。例如,前面的示例调用 setHeader ()
在名为 messageInfo
的消息标头中记录一些错误详情。
在本例中,特别配置 NullPointerException
和 IOException
异常类型。所有其他异常类型都由通用 Exception
异常拦截器处理。默认情况下,Apache Camel 会应用最匹配的异常拦截器。如果无法找到准确的匹配项,它将尝试匹配最接近的基本类型,以此类推。最后,如果没有其他拦截器匹配,则 例外
类型的拦截器与所有剩余的异常匹配。
OnPrepareFailure
在将交换传递给死信队列之前,您可以使用 onPrepare
选项允许自定义处理器准备交换。它可让您添加有关交换的信息,如交换失败的原因。例如,以下处理器添加了一个例外消息的标头。
public class MyPrepareProcessor implements Processor { @Override public void process(Exchange exchange) throws Exception { Exception cause = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class); exchange.getIn().setHeader("FailedBecause", cause.getMessage()); } }
您可以按如下方式配置错误处理程序以使用处理器。
errorHandler(deadLetterChannel("jms:dead").onPrepareFailure(new MyPrepareProcessor()));
但是,也可以使用默认错误处理程序提供 onPrepare
选项。
<bean id="myPrepare" class="org.apache.camel.processor.DeadLetterChannelOnPrepareTest.MyPrepareProcessor"/> <errorHandler id="dlc" type="DeadLetterChannel" deadLetterUri="jms:dead" onPrepareFailureRef="myPrepare"/>