13.13. 例子
字数
单词 count 是典型的,如果过度使用,如映射/缩减范例。假设我们在 Data Grid 节点上存储了 key
public class WordCountExample { /** * In this example replace c1 and c2 with * real Cache references * * @param args */ public static void main(String[] args) { Cache<String, String> c1 = ...; Cache<String, String> c2 = ...; c1.put("1", "Hello world here I am"); c2.put("2", "Infinispan rules the world"); c1.put("3", "JUDCon is in Boston"); c2.put("4", "JBoss World is in Boston as well"); c1.put("12","JBoss Application Server"); c2.put("15", "Hello world"); c1.put("14", "Infinispan community"); c2.put("15", "Hello world"); c1.put("111", "Infinispan open source"); c2.put("112", "Boston is close to Toronto"); c1.put("113", "Toronto is a capital of Ontario"); c2.put("114", "JUDCon is cool"); c1.put("211", "JBoss World is awesome"); c2.put("212", "JBoss rules"); c1.put("213", "JBoss division of RedHat "); c2.put("214", "RedHat community"); Map<String, Long> wordCountMap = c1.entrySet().parallelStream() .map(e -> e.getValue().split("\\s")) .flatMap(Arrays::stream) .collect(() -> Collectors.groupingBy(Function.identity(), Collectors.counting())); } }
在这种情况下,可以简单地执行上例中的单词 count。
但是,如果我们想要在示例中找到最频繁的字词,该怎么办?如果您考虑第二个情况,您将意识到您需要首先计算所有词语,并首先可用。因此,我们实际上有一些选项。
我们可以在收集器上使用一个 finisher,它会在收集所有结果后在用户线程上调用。之前示例中删除了一些冗余行。
public class WordCountExample { public static void main(String[] args) { // Lines removed String mostFrequentWord = c1.entrySet().parallelStream() .map(e -> e.getValue().split("\\s")) .flatMap(Arrays::stream) .collect(() -> Collectors.collectingAndThen( Collectors.groupingBy(Function.identity(), Collectors.counting()), wordCountMap -> { String mostFrequent = null; long maxCount = 0; for (Map.Entry<String, Long> e : wordCountMap.entrySet()) { int count = e.getValue().intValue(); if (count > maxCount) { maxCount = count; mostFrequent = e.getKey(); } } return mostFrequent; })); }
不幸的是,最后一步只会在单一线程中运行,如果我们有很多词语可能非常慢。可以通过另一种方式与 Streams 并行化。
在处理后,我们提到了本地节点,因此我们实际上可以在映射结果上使用流。因此,我们可以在结果中使用并行流。
public class WordFrequencyExample { public static void main(String[] args) { // Lines removed Map<String, Long> wordCount = c1.entrySet().parallelStream() .map(e -> e.getValue().split("\\s")) .flatMap(Arrays::stream) .collect(() -> Collectors.groupingBy(Function.identity(), Collectors.counting())); Optional<Map.Entry<String, Long>> mostFrequent = wordCount.entrySet().parallelStream().reduce( (e1, e2) -> e1.getValue() > e2.getValue() ? e1 : e2);
这样,在计算最频繁的元素时,您仍然可以在本地使用所有内核。
删除特定条目
分布式流也可以用作修改它所在数据的方法。例如,您可能想要删除包含特定词语的缓存中的所有条目。
public class RemoveBadWords { public static void main(String[] args) { // Lines removed String word = .. c1.entrySet().parallelStream() .filter(e -> e.getValue().contains(word)) .forEach((c, e) -> c.remove(e.getKey()));
如果我们仔细注意了要序列化的内容,并且没有什么,我们注意到,只有词语以及操作才会按顺序化到其他 nods,因为它由 lambda 捕获。但是,实际保存部分是缓存操作在主所有者上执行,从而减少从缓存中删除这些值所需的网络流量数量。lambda 不会捕获缓存,因为我们在每个节点上调用时提供一个特殊的 BiConsumer 方法覆盖,这会将缓存传递给 BiConsumer
以这种方式考虑使用 for
each 命令的一个问题是底层流没有获得锁定。缓存删除操作仍将自然而获得锁定,但该值可能会从流看到的内容改变。这意味着,在流读取后条目可能会改变,但删除会实际删除它。
我们特别添加了名为 LockedStream
的新变体。
其他示例的 plenty
Streams
API 是一个 JRE 工具,有许多示例使用它。只需记住您的操作需要以某种方式 Serializ。