10.6. distribution/Replication/Scattered
这是流进入它们的位置。执行流操作时,它将向具有特定数据的每个节点发送各种中间和终端操作。这允许在拥有数据的节点中处理中间值,并且只能将最终结果发送到原始节点,从而提高性能。
10.6.1. Rehash Aware 复制链接链接已复制到粘贴板!
数据内部被分段,每个节点仅在它拥有为主所有者的数据上执行操作。这允许平均处理数据,假设片段足以提供每个节点上的相同数据。
当您使用分布式缓存时,当新节点加入或离开时,可以在节点间重新生成数据。分布式流会自动处理这些数据,因此您不必在节点离开或加入集群时担心监控。Reshuffled 条目可能会第二次处理,并且我们在密钥级别或网段级别(取决于终端操作)跟踪已处理的条目,以限制重复处理的数量。
有可能禁用对流的重新哈希感知。只有在您的请求只能看到重新哈希时,才应考虑这一点。这可以通过调用 CacheStream.disableRehashAware () 来达到大多数操作的性能,当无法完全发生重新哈希时。唯一的例外是迭代器和各自使用较少的内存,因为它们不必跟踪处理的密钥。
请参考禁用重新哈希感知,除非您真正知道您的功能。
10.6.2. 序列化 复制链接链接已复制到粘贴板!
由于操作发送到其他节点,它们必须可以被 Data Grid marshalling 序列化。这允许将操作发送到其他节点。
最简单的方法是使用 CacheStream 实例,并像通常一样使用 lambda。Data Grid 会覆盖所有各种流中间和终端方法,以获取参数的 Serializable 版本(如 SerializableFunction、SerializablePredicate…)您可以在 CacheStream 中找到这些方法。这依赖于 spec 来选择 此处定义的 最具体的方法。
在上例中,我们使用 Collector 将所有结果收集到 映射中。不幸的是 Collectors 类不会生成 Serializable 实例。因此,如果您需要使用这些方法,可以通过两种方式完成此操作:
一个选项是使用 CacheCollectors 类,允许提供一个 Supplier<Collector& gt;。然后,此实例可以使用 Collector 提供没有序列化的 Collector。https://docs.oracle.com/javase/8/docs/api/java/util/stream/Collectors.html
Map<Object, String> jbossValues = cache.entrySet().stream()
.filter(e -> e.getValue().contains("Jboss"))
.collect(CacheCollectors.serializableCollector(() -> Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
或者,您可以避免使用 CacheCollectors,而是使用使用 Supplier<Collector > 的超载 收集 方法。这些超载 收集 方法只能通过 CacheStream 接口提供。
Map<Object, String> jbossValues = cache.entrySet().stream()
.filter(e -> e.getValue().contains("Jboss"))
.collect(() -> Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
但是,如果您无法使用 Cache 和 CacheStream 接口,则无法使用 参数,您必须通过将 lambdas 广播到多个接口来手动分配 lambda。它并不是一个用户友善的,但会完成该作业。
Serializable
Map<Object, String> jbossValues = map.entrySet().stream()
.filter((Serializable & Predicate<Map.Entry<Object, String>>) e -> e.getValue().contains("Jboss"))
.collect(CacheCollectors.serializableCollector(() -> Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
建议和最常见的方法是使用 AdvancedExternalizer,因为这提供了最小的有效负载。不幸的是,在手动之前,您无法使用 lamdbas 作为高级外部化器需要定义类。
您可以使用高级外部化程序,如下所示:
Map<Object, String> jbossValues = cache.entrySet().stream()
.filter(new ContainsFilter("Jboss"))
.collect(() -> Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
class ContainsFilter implements Predicate<Map.Entry<Object, String>> {
private final String target;
ContainsFilter(String target) {
this.target = target;
}
@Override
public boolean test(Map.Entry<Object, String> e) {
return e.getValue().contains(target);
}
}
class JbossFilterExternalizer implements AdvancedExternalizer<ContainsFilter> {
@Override
public Set<Class<? extends ContainsFilter>> getTypeClasses() {
return Util.asSet(ContainsFilter.class);
}
@Override
public Integer getId() {
return CUSTOM_ID;
}
@Override
public void writeObject(ObjectOutput output, ContainsFilter object) throws IOException {
output.writeUTF(object.target);
}
@Override
public ContainsFilter readObject(ObjectInput input) throws IOException, ClassNotFoundException {
return new ContainsFilter(input.readUTF());
}
}
您还可以将高级外部化器用于收集器供应商,以进一步减小有效负载大小。
Map<Object, String> map = (Map<Object, String>) cache.entrySet().stream()
.filter(new ContainsFilter("Jboss"))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
class ToMapCollectorSupplier<K, U> implements Supplier<Collector<Map.Entry<K, U>, ?, Map<K, U>>> {
static final ToMapCollectorSupplier INSTANCE = new ToMapCollectorSupplier();
private ToMapCollectorSupplier() { }
@Override
public Collector<Map.Entry<K, U>, ?, Map<K, U>> get() {
return Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue);
}
}
class ToMapCollectorSupplierExternalizer implements AdvancedExternalizer<ToMapCollectorSupplier> {
@Override
public Set<Class<? extends ToMapCollectorSupplier>> getTypeClasses() {
return Util.asSet(ToMapCollectorSupplier.class);
}
@Override
public Integer getId() {
return CUSTOM_ID;
}
@Override
public void writeObject(ObjectOutput output, ToMapCollectorSupplier object) throws IOException {
}
@Override
public ToMapCollectorSupplier readObject(ObjectInput input) throws IOException, ClassNotFoundException {
return ToMapCollectorSupplier.INSTANCE;
}
}