9.4. 例子
例如,您需要订购一个新顺序,且您的系统中有两个不同的服务:一个用于管理该订单和一个管理学分。如果您有足够的分数,则可以在逻辑上放置一个订单。借助 Saga EIP,您可以采用 Sag y 路由作为由两种不同操作的 Saga 组成,一个用于创建顺序,另一个用于获取学分。必须执行这两个操作,否则 没有学分的订单都不能被视为不一致的结果(以及没有订单付款)。
from("direct:buy") .saga() .to("direct:newOrder") .to("direct:reserveCredit");
购买操作不会更改其他示例。用于对新订单和 Reserve credit 操作进行建模的不同选项如下:
from("direct:newOrder") .saga() .propagation(SagaPropagation.MANDATORY) .compensation("direct:cancelOrder") .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION) .bean(orderManagerService, "newOrder") .log("Order ${body} created");
这里的传播模式被设置为 MANDATORY 意味着此路由中的任何交换流都必须是 Saga 的一部分(本例中为 Saga),因为 Saga 是在 direct:buy 路由中创建的 SANDATORY。direct:newOrder 路由声明了一个称为 direct:cancelOrder 的补偿操作,负责撤销 Saga 取消的顺序。
每个交换始终包含一个 Exchange.SAGA_LONG_RUNNING_ACTION
标头,此处用作订单的 ID。这标识了在相应的compensating 操作中删除的顺序,但这不是一个要求(选项可以作为替代解决方案使用)。direct:newOrder 的补偿操作为 direct:cancelOrder,它如下所示:
from("direct:cancelOrder") .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION) .bean(orderManagerService, "cancelOrder") .log("Order ${body} cancelled");
当应该取消订单时,Saga EIP 实施将自动调用。它不会遇到错误。如果在 direct:cancelOrder 路由中引发错误,则 EIP 实施应定期重试,以对特定限制执行相关的操作。这意味着,任何补偿操作都必须是幂等的,因此应该考虑多次触发它,且不应在任何情况下失败。如果在所有重试重试后无法进行补偿,则 Saga 实施中应该会触发手动干预流程。
这可能是因为直接执行 直接:newOrder 路由造成的延迟导致,该 Saga 被代表另一个方取消(因为并行路由或 Saga 级别上的超时)存在错误。因此,当 compensating action direct:cancelOrder 被调用时,它无法找到被取消的 Order 记录。为了保证完全全局一致性,为了保证全局一致性,任何主要操作及其相应的补偿措施都是相互作用,例如,如果补偿在主要操作之前发生,则它应该具有相同的效果。
另一种可能的方法是,如果无法使用组合行为,则始终会在强制操作中失败,直到找到由主要操作生成的数据(或者重试次数上限)。这种方法可能在许多环境中工作,但是它很 深远。
该信贷服务几乎按照与订购服务相同。
from("direct:reserveCredit") .saga() .propagation(SagaPropagation.MANDATORY) .compensation("direct:refundCredit") .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION) .bean(creditService, "reserveCredit") .log("Credit ${header.amount} reserved in action ${body}");
调用compensation 操作:
from("direct:refundCredit") .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION) .bean(creditService, "refundCredit") .log("Credit for action ${body} refunded");
此处为信用预订提供补偿操作,即退款。
9.4.1. 处理完成事件
Saga 完成后,需要一些类型的处理。当发生错误并且 Saga 被取消时,会调用补救端点。当 Saga 成功完成时,可以调用完成端点 以执行进一步处理。例如,在以上订购服务中,我们需要知道订单已完成(保留的信用卡)以实际准备顺序。如果没有付款,我们并不想开始准备订单(与大多数现代 CPU 相同),在确保您是否有权阅读它前,我们不会让您访问保留内存的现代 CPU。这可以通过修改的 direct:newOrder 端点轻松完成:
- 调用完整端点:
from("direct:newOrder") .saga() .propagation(SagaPropagation.MANDATORY) .compensation("direct:cancelOrder") .completion("direct:completeOrder") .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION) .bean(orderManagerService, "newOrder") .log("Order ${body} created");
- direct:cancelOrder 与上例中的相同。调用成功完成,如下所示:
from("direct:completeOrder") .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION) .bean(orderManagerService, "findExternalId") .to("jms:prepareOrder") .log("Order ${body} sent for preparation");
完成 Saga 后,订单将发送到 JMS 队列,以进行准备。与补偿操作类似,Saga 协调器可能会多次调用完成操作(特别是出现错误,如网络错误)。在本例中,侦听 prepareOrder JMS 队列的服务已准备好保存可能的重复(请参阅 Idempent Consumer EIP)以了解如何处理重复的示例。
9.4.2. 使用自定义标识符和选项
您可以使用 Saga 选项来注册自定义标识符。例如,重构了 credit 服务,如下所示:
- 生成自定义 ID 并在正文中设置,如下所示:
from("direct:reserveCredit") .bean(idService, "generateCustomId") .to("direct:creditReservation")
- 委派操作并根据需要标记当前正文(compensating 操作)。
from("direct:creditReservation") .saga() .propagation(SagaPropagation.SUPPORTS) .option("CreditId", body()) .compensation("direct:creditRefund") .bean(creditService, "reserveCredit") .log("Credit ${header.amount} reserved. Custom Id used is ${body}");
- 只有在取消 saga 时才会从标头检索 creditId 选项。
from("direct:creditRefund") .transform(header("CreditId")) // retrieve the CreditId option from headers .bean(creditService, "refundCredit") .log("Credit for Custom Id ${body} refunded");
通过将传播模式设置为 SUPPORTS,可以在 Saga 之外调用 direct:creditReservation 端点。这样,可在 Saga 路由中声明多个选项。
9.4.3. 设置超时
在 Saga EIPs 设置超时可确保在机器失败时,Saga 不会永久处于停滞状态。Saga EIP 实施在未明确指定的所有 Saga EIP 上设置了默认超时。当超时到期时,Saga EIP 将决定 取消 Saga (并补偿所有参与者),除非之前已采取不同的决定。
可以在 Saga 参与者上设置超时,如下所示:
from("direct:newOrder") .saga() .timeout(1, TimeUnit.MINUTES) // newOrder requires that the saga is completed within 1 minute .propagation(SagaPropagation.MANDATORY) .compensation("direct:cancelOrder") .completion("direct:completeOrder") // ... .log("Order ${body} created");
所有参与者(如信贷服务、订单服务)都可以设置自己的超时。这些超时的最小值会在 saga 一起组成时作为 saga 的超时时间。也可以在 Saga 级别指定超时,如下所示:
from("direct:buy") .saga() .timeout(5, TimeUnit.MINUTES) // timeout at saga level .to("direct:newOrder") .to("direct:reserveCredit");
9.4.4. 选择传播
在上面的示例中,我们使用了 MANDATORY 和 SUPPORTS propagation 模式,同时也是 REQUIRED propagation 模式,这是未指定任何其他时使用的默认传播模式。这些传播模式映射 1:1 等同于事务上下文中使用的模式。
传播 | 描述 |
---|---|
| 加入现有 Saga 或创建新 Saga (如果不存在)。 |
| 始终创建一个新的 Saga。挂起旧的 Saga,并在新 Saga 终止时恢复它。 |
| Saga 必须已存在。现有 Saga 被加入。 |
| 如果 Saga 已存在,则将其加入。 |
| 如果 Saga 已存在,当当前块完成后它会暂停并恢复。 |
| 当前块不得在 Saga 中调用。 |
9.4.5. 使用手动完成(高级)
当 Saga 无法以同步的方式执行所有 Saga 时,它要求使用异步通信频道与外部服务通信,然后完成模式无法设置为 AUTO (默认),因为当创建该文件时 Saga 不会在创建该文件时完成 Saga。这通常是执行时间较长的 Saga EIP (小时、天)。在这些情况下,应使用 MANUAL completion 模式。
from("direct:mysaga") .saga() .completionMode(SagaCompletionMode.MANUAL) .completion("direct:finalize") .timeout(2, TimeUnit.HOURS) .to("seda:newOrder") .to("seda:reserveCredit");
为 seda:newOrder 和 seda:reserveCredit 添加异步处理。这些会将异步回调发送到 seda:operationCompleted。
from("seda:operationCompleted") // an asynchronous callback .saga() .propagation(SagaPropagation.MANDATORY) .bean(controlService, "actionExecuted") .choice() .when(body().isEqualTo("ok")) .to("saga:complete") // complete the current saga manually (saga component) .end()
您可以添加 direct:finalize 端点来执行最终操作。
将完成模式设置为 MANUAL 意味着当交换在路由 direct:mysaga 中处理时 Saga 不会被完成,但它会持续超过 2 小时(最长持续时间设置为 2 小时)。完成两个异步操作后,Saga 已完成。完成的调用是使用 Camel Saga 组件的 saga:complete 端点完成的。有一个类似的端点用于手动分离 Saga (saga:compensate)。