9.13. 例子
字数
字数是经典的,如 map/reduce 模式。假定我们有密钥
public class WordCountExample {
/**
* In this example replace c1 and c2 with
* real Cache references
*
* @param args
*/
public static void main(String[] args) {
Cache<String, String> c1 = ...;
Cache<String, String> c2 = ...;
c1.put("1", "Hello world here I am");
c2.put("2", "Infinispan rules the world");
c1.put("3", "JUDCon is in Boston");
c2.put("4", "JBoss World is in Boston as well");
c1.put("12","JBoss Application Server");
c2.put("15", "Hello world");
c1.put("14", "Infinispan community");
c2.put("15", "Hello world");
c1.put("111", "Infinispan open source");
c2.put("112", "Boston is close to Toronto");
c1.put("113", "Toronto is a capital of Ontario");
c2.put("114", "JUDCon is cool");
c1.put("211", "JBoss World is awesome");
c2.put("212", "JBoss rules");
c1.put("213", "JBoss division of RedHat ");
c2.put("214", "RedHat community");
Map<String, Long> wordCountMap = c1.entrySet().parallelStream()
.map(e -> e.getValue().split("\\s"))
.flatMap(Arrays::stream)
.collect(() -> Collectors.groupingBy(Function.identity(), Collectors.counting()));
}
}
在这种情况下,执行上一示例中的字数非常简单。
但是,如果我们希望找到示例中最频繁的词语,该怎么做?如果再考虑这种情况,您需要首先计算所有词语并在本地可用。因此,我们有很多选项。
我们可以在收集器上使用 finisher,它会在收集所有结果后在用户线程上调用。已从先前示例中删除了一些冗余行。
public class WordCountExample {
public static void main(String[] args) {
// Lines removed
String mostFrequentWord = c1.entrySet().parallelStream()
.map(e -> e.getValue().split("\\s"))
.flatMap(Arrays::stream)
.collect(() -> Collectors.collectingAndThen(
Collectors.groupingBy(Function.identity(), Collectors.counting()),
wordCountMap -> {
String mostFrequent = null;
long maxCount = 0;
for (Map.Entry<String, Long> e : wordCountMap.entrySet()) {
int count = e.getValue().intValue();
if (count > maxCount) {
maxCount = count;
mostFrequent = e.getKey();
}
}
return mostFrequent;
}));
}
不幸的是,最后一个步骤只会在单个线程中运行,如果我们有很多词语可能会非常慢。可能还有另一种方法使用 Streams 并行化它。
在处理完本地节点之前,我们之前提到过,因此实际上可以使用有关映射结果的流。因此,我们可以对结果使用并行流。
public class WordFrequencyExample {
public static void main(String[] args) {
// Lines removed
Map<String, Long> wordCount = c1.entrySet().parallelStream()
.map(e -> e.getValue().split("\\s"))
.flatMap(Arrays::stream)
.collect(() -> Collectors.groupingBy(Function.identity(), Collectors.counting()));
Optional<Map.Entry<String, Long>> mostFrequent = wordCount.entrySet().parallelStream().reduce(
(e1, e2) -> e1.getValue() > e2.getValue() ? e1 : e2);
这样,在计算最频繁的元素时,您仍然可以在本地使用所有内核。
删除特定条目
分布式流也可以用作修改数据的方式。例如,您可以删除包含特定词语的缓存中的所有条目。
public class RemoveBadWords {
public static void main(String[] args) {
// Lines removed
String word = ..
c1.entrySet().parallelStream()
.filter(e -> e.getValue().contains(word))
.forEach((c, e) -> c.remove(e.getKey()));
如果我们仔细注意的是序列化是什么,我们注意到,随着 lambda 捕获的那样,只有相关操作被序列化为其他 nods。但是,真正的保存部分是缓存操作是在主所有者上执行的,从而减少了从缓存中删除这些值所需的网络流量量。因为我们提供一个特殊的 BiConsumer 方法覆盖缓存,因此当每个节点中的调用将缓存传递给 BiConsumer 时,缓存不会被捕获。
以这种方式考虑将每个命令使用 的 一件事情是,底层流没有锁定。缓存移除操作仍然会自然而获得锁定,但该值可能会改变从流发现的结果。这意味着,在流读取但实际删除条目后,可能会更改该条目。
我们专门添加了一个新的变体,名为 LockedStream。
其他示例的方便
Streams API 是一个 JRE 工具,它使用一些示例。只需记住,您的操作需要以某种方式进行 Serializa。