9.4. 例子
例如,您要放置一个新的订购,并且系统中有两个不同的服务:一个管理订购,另一个管理信用。如果您有足够的学分来,您可以按顺序排列。通过 Saga EIP,您可以将 direct:buy 路由建模为由两个不同操作组成的 Saga,一个用于创建顺序,另一个则取用以取信。必须执行这两个操作,或作为没有 信用的订单来执行任何操作都将被视为不一致的结果(以及无订单的付款)。
from("direct:buy") .saga() .to("direct:newOrder") .to("direct:reserveCredit");
buy 操作不会更改其他示例。用于对 New Order 和 Reserve credit 操作进行建模的不同选项如下:
from("direct:newOrder") .saga() .propagation(SagaPropagation.MANDATORY) .compensation("direct:cancelOrder") .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION) .bean(orderManagerService, "newOrder") .log("Order ${body} created");
此处的传播模式被设置为 MANDATORY 表示此路由中的任何交换流都必须已经是 Saga 的一部分(在本例中是因为在 direct:buy 路由中创建的 Saga)。direct:newOrder 路由声明一个称为 direct:cancelOrder 的 compensating 操作,它负责撤销 Saga 取消的顺序。
每个交换始终都包含一个 Exchange.SAGA_LONG_RUNNING_ACTION
标头,此处用作顺序的 id。这标识了在相应的compensating 操作中删除的顺序,但它并不是一个要求(选项可用作替代解决方案)。direct:newOrder 的compensating 操作是 direct:cancelOrder,它如下所示:
from("direct:cancelOrder") .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION) .bean(orderManagerService, "cancelOrder") .log("Order ${body} cancelled");
当应取消顺序时,Saga EIP 实施会自动调用它。它不会终止并显示错误。如果在 direct:cancelOrder 路由中抛出错误,EIP 实现应定期重试来执行最多某一限制的操作。这意味着,任何编译操作都必须是幂等的,因此应该考虑它可能会多次触发,且不应在任何情况下失败。如果在所有重试后无法完成补偿,则 Saga 实施应触发手动干预过程。
可能会因为执行 direct:newOrder 路由时出现一个延迟,Saga 被 meantime 中的另一方取消(因为并行路由或 Saga 级别中的一个超时)被另一个方取消。因此,当调用 compensating action direct:cancelOrder 时,它可能无法找到被取消的 Order 记录。为保证完全的全局一致性非常重要,例如,如果 编译过程在主操作前发生相同效果,则任何主要操作及其相应的补偿操作都是 可取的。
不允许使用 commutative 行为时,在找到主操作(或最多重试次数)的数据之前,在compensating 操作中持续失败(或最多重试次数)。这种方法在很多上下文中可能工作,但 非常繁琐。
信用服务按照与订购服务相同的方式实施。
from("direct:reserveCredit") .saga() .propagation(SagaPropagation.MANDATORY) .compensation("direct:refundCredit") .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION) .bean(creditService, "reserveCredit") .log("Credit ${header.amount} reserved in action ${body}");
调用 compensation 操作:
from("direct:refundCredit") .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION) .bean(creditService, "refundCredit") .log("Credit for action ${body} refunded");
此处为信用卡预留的补偿操作是退款。
9.4.1. 处理完成事件
当 Saga 完成后,需要一些类型的处理。当发生错误并且 Saga 被取消时,会调用 compensation 端点。成功完成后,可以调用完成端点来进一步处理。例如,在上述订购服务中,我们可能需要知道何时完成订购(以及保留信用)来实际开始准备订单。如果不完成付款,我们不希望开始准备订单(除非大多数现代 CPU 可让您访问保留内存,然后再确保您具有读取它的权限)。这可以通过 direct:newOrder 端点的修改版本轻松完成:
- 调用完整端点:
from("direct:newOrder") .saga() .propagation(SagaPropagation.MANDATORY) .compensation("direct:cancelOrder") .completion("direct:completeOrder") .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION) .bean(orderManagerService, "newOrder") .log("Order ${body} created");
- direct:cancelOrder 与上例中的相同。在成功完成时调用,如下所示:
from("direct:completeOrder") .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION) .bean(orderManagerService, "findExternalId") .to("jms:prepareOrder") .log("Order ${body} sent for preparation");
Saga 完成后,顺序将发送到 JMS 队列以准备工作。与补偿操作一样,Saga coordinator 可以多次调用完成操作(特别是当出现错误时,如网络错误)。在本例中,侦听 prepareOrder JMS 队列的服务已准备好容纳可能的重复项(请参阅 Idempotent Consumer EIP 以了解如何处理重复的示例)。
9.4.2. 使用自定义标识符和选项
您可以使用 Saga 选项注册自定义标识符。例如,信用服务重构如下:
- 生成自定义 ID 并在正文中设置它,如下所示:
from("direct:reserveCredit") .bean(idService, "generateCustomId") .to("direct:creditReservation")
- 在 compensating 操作中根据需要委派操作并标记当前的正文。
from("direct:creditReservation") .saga() .propagation(SagaPropagation.SUPPORTS) .option("CreditId", body()) .compensation("direct:creditRefund") .bean(creditService, "reserveCredit") .log("Credit ${header.amount} reserved. Custom Id used is ${body}");
- 只有在 saga 被取消时才从标头检索 creditId 选项。
from("direct:creditRefund") .transform(header("CreditId")) // retrieve the CreditId option from headers .bean(creditService, "refundCredit") .log("Credit for Custom Id ${body} refunded");
通过将传播模式设置为 SUPPORTS,可以在 Saga 之外调用 direct:creditReservation 端点。这样,可以在 Saga 路由中声明多个选项。
9.4.3. 设置超时
在 Saga EIPs 上设置超时保证,Saga 在机器失败时不会一直卡住。Saga EIP 实现在所有未明确指定它的 Saga EIP 上都设置了默认超时。当超时过期时,Saga EIP 将决定 取消 Saga (并补偿所有参与者),除非之前已经采取了不同的决定。
可在 Saga 参与者上设置超时,如下所示:
from("direct:newOrder") .saga() .timeout(1, TimeUnit.MINUTES) // newOrder requires that the saga is completed within 1 minute .propagation(SagaPropagation.MANDATORY) .compensation("direct:cancelOrder") .completion("direct:completeOrder") // ... .log("Order ${body} created");
所有参与者(如信用服务、订单服务)都可以设置自己的超时。当这些超时被组成时,会取为 saga 的超时时间。可在 Saga 级别指定超时,如下所示:
from("direct:buy") .saga() .timeout(5, TimeUnit.MINUTES) // timeout at saga level .to("direct:newOrder") .to("direct:reserveCredit");
9.4.4. 选择传播
在上例中,我们使用了 MANDATORY 和 SUPPORTS 传播模式,但也使用 REQUIRED 传播模式,这是任何指定任何其他用途的默认传播。这些传播模式映射 1:1 事务上下文中使用的等效模式。
传播 | 描述 |
---|---|
| 加入现有的 Saga 或创建新 Saga (如果不存在)。 |
| 始终创建新的 Saga。挂起旧的 Saga,并在一个新 Saga 终止时恢复它。 |
| 必须已存在 Saga。现有的 Saga 已加入。 |
| 如果 Saga 已存在,则加入它。 |
| 如果 Saga 已存在,则在当前块完成后它会被挂起并恢复。 |
| 当前块必须永远不会在 Saga 中调用。 |
9.4.5. 使用手动完成(高级)
当无法以同步的方式执行 Saga 时,但要求使用异步通信频道与外部服务通信,那么完成模式无法设置为 AUTO (默认),因为 Saga 在交换完成时不会完成。对于具有长时间执行时间(小时、天)的 Saga EIP,通常会出现这种情况。在这些情况下,应使用 MANUAL 自动完成模式。
from("direct:mysaga") .saga() .completionMode(SagaCompletionMode.MANUAL) .completion("direct:finalize") .timeout(2, TimeUnit.HOURS) .to("seda:newOrder") .to("seda:reserveCredit");
为 seda:newOrder 和 seda:reserveCredit 添加异步处理。它们向 seda:operationCompleted 发送异步回调。
from("seda:operationCompleted") // an asynchronous callback .saga() .propagation(SagaPropagation.MANDATORY) .bean(controlService, "actionExecuted") .choice() .when(body().isEqualTo("ok")) .to("saga:complete") // complete the current saga manually (saga component) .end()
您可以添加 direct:finalize 端点来执行最终操作。
将完成模式设置为 MANUAL 意味着,当路由 direct:mysaga 中处理交换时,Saga 不会被完成,但它将最后更长的时间(最大持续时间设置为 2 小时)。当两个异步操作都完成后,Saga 已完成。要完成的调用是使用 Camel Saga 组件的 saga:complete 端点完成的。有一个类似的端点用于手动补偿 Saga (saga:compensate)。