10.6. distribution/Replication/Scattered


这是流进入它们的位置。执行流操作时,它将向具有特定数据的每个节点发送各种中间和终端操作。这允许在拥有数据的节点中处理中间值,并且只能将最终结果发送到原始节点,从而提高性能。

10.6.1. Rehash Aware

数据内部被分段,每个节点仅在它拥有为主所有者的数据上执行操作。这允许平均处理数据,假设片段足以提供每个节点上的相同数据。

当您使用分布式缓存时,当新节点加入或离开时,可以在节点间重新生成数据。分布式流会自动处理这些数据,因此您不必在节点离开或加入集群时担心监控。Reshuffled 条目可能会第二次处理,并且我们在密钥级别或网段级别(取决于终端操作)跟踪已处理的条目,以限制重复处理的数量。

有可能禁用对流的重新哈希感知。只有在您的请求只能看到重新哈希时,才应考虑这一点。这可以通过调用 CacheStream.disableRehashAware () 来达到大多数操作的性能,当无法完全发生重新哈希时。唯一的例外是迭代器和各自使用较少的内存,因为它们不必跟踪处理的密钥。

警告

请参考禁用重新哈希感知,除非您真正知道您的功能。

10.6.2. 序列化

由于操作发送到其他节点,它们必须可以被 Data Grid marshalling 序列化。这允许将操作发送到其他节点。

最简单的方法是使用 CacheStream 实例,并像通常一样使用 lambda。Data Grid 会覆盖所有各种流中间和终端方法,以获取参数的 Serializable 版本(如 SerializableFunction、SerializablePredicate…​)您可以在 CacheStream 中找到这些方法。这依赖于 spec 来选择 此处定义的 最具体的方法。

在上例中,我们使用 Collector 将所有结果收集到 映射中。不幸的是 Collectors 类不会生成 Serializable 实例。因此,如果您需要使用这些方法,可以通过两种方式完成此操作:

一个选项是使用 CacheCollectors 类,允许提供一个 Supplier<Collector& gt;。然后,此实例可以使用 Collector 提供没有序列化的 Collector。https://docs.oracle.com/javase/8/docs/api/java/util/stream/Collectors.html

Map<Object, String> jbossValues = cache.entrySet().stream()
              .filter(e -> e.getValue().contains("Jboss"))
              .collect(CacheCollectors.serializableCollector(() -> Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
Copy to Clipboard Toggle word wrap

或者,您可以避免使用 CacheCollectors,而是使用使用 Supplier<Collector > 的超载 收集 方法。这些超载 收集 方法只能通过 CacheStream 接口提供。

Map<Object, String> jbossValues = cache.entrySet().stream()
              .filter(e -> e.getValue().contains("Jboss"))
              .collect(() -> Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
Copy to Clipboard Toggle word wrap

但是,如果您无法使用 CacheCacheStream 接口,则无法使用 Serializable 参数,您必须通过将 lambdas 广播到多个接口来手动分配 lambda。它并不是一个用户友善的,但会完成该作业。

Map<Object, String> jbossValues = map.entrySet().stream()
              .filter((Serializable & Predicate<Map.Entry<Object, String>>) e -> e.getValue().contains("Jboss"))
              .collect(CacheCollectors.serializableCollector(() -> Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
Copy to Clipboard Toggle word wrap

建议和最常见的方法是使用 AdvancedExternalizer,因为这提供了最小的有效负载。不幸的是,在手动之前,您无法使用 lamdbas 作为高级外部化器需要定义类。

您可以使用高级外部化程序,如下所示:

   Map<Object, String> jbossValues = cache.entrySet().stream()
              .filter(new ContainsFilter("Jboss"))
              .collect(() -> Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

   class ContainsFilter implements Predicate<Map.Entry<Object, String>> {
      private final String target;

      ContainsFilter(String target) {
         this.target = target;
      }

      @Override
      public boolean test(Map.Entry<Object, String> e) {
         return e.getValue().contains(target);
      }
   }

   class JbossFilterExternalizer implements AdvancedExternalizer<ContainsFilter> {

      @Override
      public Set<Class<? extends ContainsFilter>> getTypeClasses() {
         return Util.asSet(ContainsFilter.class);
      }

      @Override
      public Integer getId() {
         return CUSTOM_ID;
      }

      @Override
      public void writeObject(ObjectOutput output, ContainsFilter object) throws IOException {
         output.writeUTF(object.target);
      }

      @Override
      public ContainsFilter readObject(ObjectInput input) throws IOException, ClassNotFoundException {
         return new ContainsFilter(input.readUTF());
      }
   }
Copy to Clipboard Toggle word wrap

您还可以将高级外部化器用于收集器供应商,以进一步减小有效负载大小。

Map<Object, String> map = (Map<Object, String>) cache.entrySet().stream()
              .filter(new ContainsFilter("Jboss"))
              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

 class ToMapCollectorSupplier<K, U> implements Supplier<Collector<Map.Entry<K, U>, ?, Map<K, U>>> {
      static final ToMapCollectorSupplier INSTANCE = new ToMapCollectorSupplier();

      private ToMapCollectorSupplier() { }

      @Override
      public Collector<Map.Entry<K, U>, ?, Map<K, U>> get() {
         return Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue);
      }
   }

   class ToMapCollectorSupplierExternalizer implements AdvancedExternalizer<ToMapCollectorSupplier> {

      @Override
      public Set<Class<? extends ToMapCollectorSupplier>> getTypeClasses() {
         return Util.asSet(ToMapCollectorSupplier.class);
      }

      @Override
      public Integer getId() {
         return CUSTOM_ID;
      }

      @Override
      public void writeObject(ObjectOutput output, ToMapCollectorSupplier object) throws IOException {
      }

      @Override
      public ToMapCollectorSupplier readObject(ObjectInput input) throws IOException, ClassNotFoundException {
         return ToMapCollectorSupplier.INSTANCE;
      }
   }
Copy to Clipboard Toggle word wrap
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2026 Red Hat
返回顶部