第 9 章 在网格中执行代码


缓存的主要优点是能够快速为其键查找值,甚至跨机器也是如此。实际上,仅使用这个用途可能是许多用户使用 Data Grid 的原因。但是,数据中心可以提供许多无法立即出现的好处。由于 Data Grid 通常在一台机器集群中使用,因此我们也有可用功能,可帮助利用整个集群来执行用户所需的工作负载。

注意

本节仅涵盖使用嵌入式缓存在网格中执行代码,如果您使用远程缓存,您应该查看在远程网格中执行代码的详细信息。

9.1. Cluster Executor

由于您拥有一组计算机,因此利用它们组合的计算能力在所有这些机器上执行代码。缓存管理器附带一个 一 个实用程序,可让您在集群中执行任意代码。请注意,这个功能不需要使用缓存。此 Cluster Executor 可以通过调用 EmbeddedCacheManager 上的 executor ()来检索。这个 executor 可在集群和非集群配置中检索。

注意

ClusterExecutor 是专门执行代码,代码在缓存中的数据不会依赖,而是用作帮助用户在集群中轻松执行代码的方法。

此管理器专门使用 Java 8 构建,因此所有方法都使用 Java 8,因此所有方法都使用一个功能接口作为参数。此外,由于这些参数将发送到需要序列化的其他节点。我们甚至使用一种相似的欺骗来确保我们的 lambdas 会立即被序列化。通过具有参数实现 Serializable 和 real 参数类型(例如,可运行或功能)。在确定要调用的方法时,JRE 将选择最具体的类,因此您的 lambdas 将始终可以被序列化。也可以使用 Externalizer 来进一步减小消息大小。

管理器默认会将给定命令提交到集群中的所有节点,包括从中提交的节点。您可以使用 filterTargets 方法来控制在哪些节点上执行任务,如 部分所述。

9.1.1. 过滤执行节点

可以在哪个节点上运行命令。例如,您可能只想在同一机架的机器上运行计算。或者,您可能希望在本地站点和不同站点上执行一次操作。集群执行器可以限制其在相同或不同机器、机架或站点级别发送请求的节点。

SameRack.java

EmbeddedCacheManager manager = ...;
manager.executor().filterTargets(ClusterExecutionPolicy.SAME_RACK).submit(...)
Copy to Clipboard Toggle word wrap

要使用这个拓扑基础过滤,您必须通过 Server Hinting 来启用拓扑了解一致的哈希。

您还可以根据节点的地址使用 predicate 来过滤。这也可以选择与之前的代码片段中基于拓扑的过滤合并。

我们还允许任何方法选择目标节点,使用 Predicate 来过滤哪些节点可以被考虑执行。请注意,这也可以与 Topology 过滤结合使用,以便更精细地控制集群中执行代码的位置。

Predicate.java

EmbeddedCacheManager manager = ...;
// Just filter
manager.executor().filterTargets(a -> a.equals(..)).submit(...)
// Filter only those in the desired topology
manager.executor().filterTargets(ClusterExecutionPolicy.SAME_SITE, a -> a.equals(..)).submit(...)
Copy to Clipboard Toggle word wrap

9.1.2. Timeout(超时)

Cluster Executor 允许在每次调用设置超时。默认为传输配置中配置的分布式同步超时。这个超时可在集群和非集群缓存管理器中工作。当超时过期时,executor 可能会或可能无法中断执行任务的线程。但是,当超时发生任何 消费者将来 时,将完成后一个 TimeoutException。此值可通过 ivoking timeout 方法并提供所需的持续时间来覆盖。

9.1.3. 单一节点子传输

Cluster Executor 也可以在单一节点提交模式下运行,而不是向所有节点提交命令,而是选择通常收到该命令的节点,而是将其提交给一个节点。每个提交都可能会使用不同的节点来执行该任务。这对使用 ClusterExecutor 作为 java.util.concurrent.Executor 非常有用,您可能会注意到 ClusterExecutor 实现。

SingleNode.java

EmbeddedCacheManager manager = ...;
manager.executor().singleNodeSubmission().submit(...)
Copy to Clipboard Toggle word wrap

9.1.3.1. 故障切换

在单节点提交中运行时,可能还希望允许 Cluster Executor 处理在处理给定命令的过程中发生异常的情况,方法是再次重试命令。当发生这种情况时,Cluster Executor 将再次选择一个节点,以重新提交命令,以最多重新提交所需的故障切换尝试。请注意,所选节点可以是通过拓扑或 predicate 检查的任何节点。通过调用覆盖的 singleNodeSubmission 方法来启用故障转移。指定的命令将重新提交到单一节点,直到命令完成且无例外,或者总提交数等于提供的故障转移数。

9.1.4. 示例: PI Approximation

本例演示了如何使用 ClusterExecutor 来估算 PI 的值。

从通过 Cluster Executor 的并行分布式执行中受益。重新调用方括号的区域是 Sa = 4r2,circle 的区域是 Ca=pi598r2。将 r2 从第二个 equation 取代成第一个,它关闭了前者 = 4114 Ca/Sa。现在,我们可以将大量 darts 放入一个方括号中的镜像;如果我们花费了在总圆圈数上,我们会有一个大约 Ca/Sa 值的 darts 比率。由于我们知道,像像 = 4114 Ca/Sa,我们可以轻松派生出实际价值。我们还可以获得更好的应用方案。在以下示例中,我们执行 1 亿的 dart,而不是"kooting"它们,我们可在整个 Data Grid 集群中并行化工作。请注意,这会在 1 的集群中正常工作,但会较慢。

public class PiAppx {

   public static void main (String [] arg){
      EmbeddedCacheManager cacheManager = ..
      boolean isCluster = ..

      int numPoints = 1_000_000_000;
      int numServers = isCluster ? cacheManager.getMembers().size() : 1;
      int numberPerWorker = numPoints / numServers;

      ClusterExecutor clusterExecutor = cacheManager.executor();
      long start = System.currentTimeMillis();
      // We receive results concurrently - need to handle that
      AtomicLong countCircle = new AtomicLong();
      CompletableFuture<Void> fut = clusterExecutor.submitConsumer(m -> {
         int insideCircleCount = 0;
         for (int i = 0; i < numberPerWorker; i++) {
            double x = Math.random();
            double y = Math.random();
            if (insideCircle(x, y))
               insideCircleCount++;
         }
         return insideCircleCount;
      }, (address, count, throwable) -> {
         if (throwable != null) {
            throwable.printStackTrace();
            System.out.println("Address: " + address + " encountered an error: " + throwable);
         } else {
            countCircle.getAndAdd(count);
         }
      });
      fut.whenComplete((v, t) -> {
         // This is invoked after all nodes have responded with a value or exception
         if (t != null) {
            t.printStackTrace();
            System.out.println("Exception encountered while waiting:" + t);
         } else {
            double appxPi = 4.0 * countCircle.get() / numPoints;

            System.out.println("Distributed PI appx is " + appxPi +
                  " using " + numServers + " node(s), completed in " + (System.currentTimeMillis() - start) + " ms");
         }
      });

      // May have to sleep here to keep alive if no user threads left
   }

   private static boolean insideCircle(double x, double y) {
      return (Math.pow(x - 0.5, 2) + Math.pow(y - 0.5, 2))
            <= Math.pow(0.5, 2);
   }
}
Copy to Clipboard Toggle word wrap
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2026 Red Hat
返回顶部