第 7 章 在网格中执行代码
缓存的主要优点是能够快速查找其键,甚至跨机器查找值。实际上,这单独使用可能是许多用户使用 Data Grid 的原因。但是,Data Grid 可以提供许多无法立即明显的好处。由于 Data Grid 通常在机器集群中使用,因此我们还提供相应的功能,可帮助利用整个集群来执行用户所需的工作负载。
7.1. Cluster Executor
由于您有一组计算机,因此最好利用其组合计算能力在所有这些计算机上执行代码。Cache Manager 附带一个 nice 工具,可让您在集群中执行任意代码。请注意,这个功能不需要使用缓存。此 Cluster Executor 可通过调用 EmbeddedCacheManager
上的 executor ()来检索。这个 executor 在集群和非集群配置中可以被检索。
ClusterExecutor 专为执行代码不依赖于缓存中数据的代码而设计,而是作为帮助用户在集群中轻松执行代码的方法。
此管理器专门围绕 Java 流 API 构建,因此所有方法都使用功能接口作为参数。因为这些参数将发送到其他节点,因此它们需要按顺序排序。我们甚至使用 nice trick 来确保我们的 lambda 可立即串行。这是通过参数实现 Serializable 和 real 参数类型(如 Runnable 或 Function)。在确定要调用的方法时,JRE 将选择最具体的类,因此在这样,您的 lambdas 始终可以被序列化。也可以使用外部工具来进一步减小消息大小。
默认情况下,管理器将向集群中的所有节点提交给定命令,包括从中提交它的节点。您可以使用 filterTargets
方法控制在其上执行任务的节点,如 部分中所述。
7.1.1. 过滤执行节点
可以限制命令要运行的节点。例如,您可能只想在同一机架的计算机上运行计算。或者,您可能想要在本地站点中一次执行操作,并在不同的站点上再次执行操作。集群 executor 可以限制在相同或不同机器、机架或站点级别向发送请求的节点。
SameRack.java
EmbeddedCacheManager manager = ...; manager.executor().filterTargets(ClusterExecutionPolicy.SAME_RACK).submit(...)
要使用此拓扑基本过滤,您必须通过服务器提示启用拓扑感知一致的哈希。
您还可以根据节点的地址使用 predicate 进行过滤。这也可以选择与之前代码片段中的基于拓扑过滤结合使用。
我们还允许使用任何方法选择目标节点,该方法使用 Predicate
来过滤哪些节点可以被视为执行。请注意,这也可与 Topology 过滤结合使用,以便更精细地控制您在集群中执行代码的位置。
Predicate.java
EmbeddedCacheManager manager = ...; // Just filter manager.executor().filterTargets(a -> a.equals(..)).submit(...) // Filter only those in the desired topology manager.executor().filterTargets(ClusterExecutionPolicy.SAME_SITE, a -> a.equals(..)).submit(...)
7.1.2. Timeout(超时)
集群可执行文件允许为每个调用设置超时。默认为在传输配置中配置的分布式同步超时。这个超时可在集群和非集群缓存管理器中正常工作。当超时过期时,executor 可能会或可能无法中断执行任务的线程。但是,当超时发生任何 Consumer
或 future
时,将完成传递 TimeoutException
。这个值可以通过激活 超时 方法并提供所需持续时间来覆盖。
7.1.3. 单一节点提交
Cluster Executor 也可以以单一节点提交模式运行,而不是向所有节点提交命令,而不必选择通常会收到该命令的其中一个节点,并将其提交到只有一个节点。每个提交都可能会使用不同的节点来执行任务。这对将 ClusterExecutor 用作 java.util.concurrent.Executor
非常有用,您可能会注意到 ClusterExecutor 已实现该 ClusterExecutor。
SingleNode.java
EmbeddedCacheManager manager = ...; manager.executor().singleNodeSubmission().submit(...)
7.1.3.1. 故障切换
在单一节点提交中运行时,可能需要允许 Cluster Executor 处理给定命令期间发生异常的情况,再次重试命令。当发生这种情况时,Cluster Executor 将再次选择一个一个节点,以重新将命令重新提交到所需的故障转移尝试次数。请注意,所选节点可以是通过拓扑或 predicate 检查的任何节点。通过调用覆盖的 singleNodeSubmission 方法来启用故障转移。给定的命令将再次提交到一个节点,直到命令完成且无例外,或者总提交数等于提供的故障切换计数。
7.1.4. 示例:PI Approximation
本例演示了如何使用 ClusterExecutor 估算 PI 的值。
Pi approximation 大大受益于通过 Cluster Executor 的并行分布式执行。回想一下,块的区域是 Sa = 4r2,圆圈的区域是 Ca=pi*r2。将 r2 从第二个 equation 替换成第一个,它会关闭 pi = 4 * Ca/Sa。现在,我们可以将大量 darts 成方块的图像;如果我们将这个大体中的 darts sto 分值放在圆圈中,那么我们将大约是 Ca/Sa 值。由于我们知道,pi = 4 * Ca/Sa 我们可以轻松获得 pi 的大约价值。更糟糕的是,我们实现了更好的投放。在以下示例中,我们找到了 1 亿 darts,而不是"缩小"它们串行地并行化整个数据网格集群中的 dart shooting 的工作。请注意,这在 1 的集群中可以正常工作,但会较慢。
public class PiAppx { public static void main (String [] arg){ EmbeddedCacheManager cacheManager = .. boolean isCluster = .. int numPoints = 1_000_000_000; int numServers = isCluster ? cacheManager.getMembers().size() : 1; int numberPerWorker = numPoints / numServers; ClusterExecutor clusterExecutor = cacheManager.executor(); long start = System.currentTimeMillis(); // We receive results concurrently - need to handle that AtomicLong countCircle = new AtomicLong(); CompletableFuture<Void> fut = clusterExecutor.submitConsumer(m -> { int insideCircleCount = 0; for (int i = 0; i < numberPerWorker; i++) { double x = Math.random(); double y = Math.random(); if (insideCircle(x, y)) insideCircleCount++; } return insideCircleCount; }, (address, count, throwable) -> { if (throwable != null) { throwable.printStackTrace(); System.out.println("Address: " + address + " encountered an error: " + throwable); } else { countCircle.getAndAdd(count); } }); fut.whenComplete((v, t) -> { // This is invoked after all nodes have responded with a value or exception if (t != null) { t.printStackTrace(); System.out.println("Exception encountered while waiting:" + t); } else { double appxPi = 4.0 * countCircle.get() / numPoints; System.out.println("Distributed PI appx is " + appxPi + " using " + numServers + " node(s), completed in " + (System.currentTimeMillis() - start) + " ms"); } }); // May have to sleep here to keep alive if no user threads left } private static boolean insideCircle(double x, double y) { return (Math.pow(x - 0.5, 2) + Math.pow(y - 0.5, 2)) <= Math.pow(0.5, 2); } }