第 7 章 在网格中执行代码


缓存的主要优点是能够通过其键(甚至跨计算机)快速查找值。事实上,这本身是许多用户使用 Data Grid 的原因。但是,数据网格可提供并非立即明显的更多好处。由于在机器集群中通常使用 Data Grid,所以我们也有可用的功能,可帮助利用整个集群来执行用户所需的工作负载。

7.1. 集群可执行文件

既然您拥有一组计算机,因此最好利用其综合计算能力在所有这些计算机上执行代码。Cache Manager 附带一个 nice 工具,可让您在集群中执行任意代码。请注意,这个功能不需要使用缓存。通过在 EmbeddedCacheManager 上调用 executor ()来检索此集群可执行文件。https://access.redhat.com/webassets/avalon/d/red-hat-data-grid/8.4/api/org/infinispan/manager/ClusterExecutor.html这个 executor 可在集群和非集群配置中实现。

注意

ClusterExecutor 专门设计用来执行代码,这些代码不会依赖于缓存中的数据,而是用来帮助用户在集群中轻松地执行代码。

此管理器专门使用 Java 8 构建,因此在这种情况下,所有方法都采用功能接口作为参数。另外,这些参数也会发送到其他需要被序列化的节点。我们甚至使用一种不错的技巧,以确保我们都立即达到 Serializable。这种情况就是让 参数同时实现 Serializable 和 real 参数类型(例如,可运行或功能)。当确定要调用哪一种方法时,JRE 将选择最具体的类,在这种情况下,您的布局始终是序列化的。也可以使用 Externalizer 来进一步减少消息大小。

默认情况下,经理会将给定命令提交到集群中的所有节点,包括从中提交的节点。您可以使用 中的 filterTargets 方法控制任务在哪些节点上,具体如 节中所述。

7.1.1. 过滤执行节点

可以限制将要运行的命令的节点。例如,您可能希望在同一机架的机器上仅运行计算。或者,您可能要在本地站点内执行一次操作,也可以再次在不同的站点上执行操作。集群 executor 可以限制其将请求发送到同一或不同机器、机架或站点级别的节点。

SameRack.java

EmbeddedCacheManager manager = ...;
manager.executor().filterTargets(ClusterExecutionPolicy.SAME_RACK).submit(...)

要使用此拓扑基本过滤,您必须通过服务器提示启用拓扑感知一致的哈希。

您还可以使用基于节点地址的 predicate 进行过滤。这也可以选择在上一个代码片段中根据拓扑过滤。

我们还允许任何方式选择目标节点,方法使用指示过滤过滤哪些节点可用于执行。请注意,这也可以与 Topology 过滤结合使用,以便更加精细地控制您在集群中执行代码的位置。

Predicate.java

EmbeddedCacheManager manager = ...;
// Just filter
manager.executor().filterTargets(a -> a.equals(..)).submit(...)
// Filter only those in the desired topology
manager.executor().filterTargets(ClusterExecutionPolicy.SAME_SITE, a -> a.equals(..)).submit(...)

7.1.2. Timeout(超时)

集群可执行文件允许为每个调用设置超时。默认为在传输配置中配置的分布式同步超时。此超时在集群和非集群缓存管理器中可以正常工作。执行执行器可能会在超时过期时中断执行任务的线程。但是,当超时发生任何 ConsumerFuture 时,将完成返回 TimeoutException。这个值可通过激活 超时方法并提供 所需的持续时间来覆盖。

7.1.3. 单一节点提交

集群可执行文件也可以以单一节点提交模式运行,而不是将命令提交到所有节点,而是选择通常会收到该命令的节点之一,而是仅将其提交到一个节点。每个提交都会使用不同的节点来在其上执行任务。使用 ClusterExecutor 作为 java.util.concurrent.Executor,您可能会注意到 ClusterExecutor 实施,这非常有用。

SingleNode.java

EmbeddedCacheManager manager = ...;
manager.executor().singleNodeSubmission().submit(...)

7.1.3.1. 故障切换

当在单个节点中运行时,可能还需要允许 Cluster Executor 处理给定命令处理给定命令时出现异常情况,方法是重新重试 命令。当发生这种情况时,Cluster Executor 将再次选择一个节点,以将命令重新提交到所需的故障转移尝试数量。请注意,所选节点可以是通过拓扑或 predicate 检查的任何节点。通过调用覆盖的 singleNodeSubmission 方法来启用故障切换。指定的命令将再次重新提交到单个节点,直到命令完成前没有例外,或者提交总数等于提供的故障转移计数。

7.1.4. 示例:PI Approximation

本例演示了如何使用 ClusterExecutor 来估算 PI 的值。

通过集群可执行文件,传送传送可能会大大受益于并行分布式执行。回想一下,方括号中的区域为 Sa = 4r2,圆圈为 Ca=pi*r2。将 r2 替换为第二个 equation 到第一个 pi = 4 * Ca/Sa。现在,我们可以将大量 darts 拍摄到一个平方;如果我们占有 darts shot 的 dart 排在圆圈的比率,则我们会将大量 droximate Ca/Sa 值的比例。既然我们知道,pi = 4 * Ca/Sa,我们可以轻松获得传送价值。我们带来了更优越的应用程序。在以下示例中,我们拍摄了 1 亿多语,而是对整个数据网格集群进行并行化工作,而不是对整个数据网格群集进行并行化。请注意,这会在 1 集群中正常工作,但会较慢。

public class PiAppx {

   public static void main (String [] arg){
      EmbeddedCacheManager cacheManager = ..
      boolean isCluster = ..

      int numPoints = 1_000_000_000;
      int numServers = isCluster ? cacheManager.getMembers().size() : 1;
      int numberPerWorker = numPoints / numServers;

      ClusterExecutor clusterExecutor = cacheManager.executor();
      long start = System.currentTimeMillis();
      // We receive results concurrently - need to handle that
      AtomicLong countCircle = new AtomicLong();
      CompletableFuture<Void> fut = clusterExecutor.submitConsumer(m -> {
         int insideCircleCount = 0;
         for (int i = 0; i < numberPerWorker; i++) {
            double x = Math.random();
            double y = Math.random();
            if (insideCircle(x, y))
               insideCircleCount++;
         }
         return insideCircleCount;
      }, (address, count, throwable) -> {
         if (throwable != null) {
            throwable.printStackTrace();
            System.out.println("Address: " + address + " encountered an error: " + throwable);
         } else {
            countCircle.getAndAdd(count);
         }
      });
      fut.whenComplete((v, t) -> {
         // This is invoked after all nodes have responded with a value or exception
         if (t != null) {
            t.printStackTrace();
            System.out.println("Exception encountered while waiting:" + t);
         } else {
            double appxPi = 4.0 * countCircle.get() / numPoints;

            System.out.println("Distributed PI appx is " + appxPi +
                  " using " + numServers + " node(s), completed in " + (System.currentTimeMillis() - start) + " ms");
         }
      });

      // May have to sleep here to keep alive if no user threads left
   }

   private static boolean insideCircle(double x, double y) {
      return (Math.pow(x - 0.5, 2) + Math.pow(y - 0.5, 2))
            <= Math.pow(0.5, 2);
   }
}
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2026 Red Hat
返回顶部