10.13. 例子


单词计数

单词数(如果过度使用)是映射/缩减示例。假设我们有一个键 发送的映射,存储在 Data Grid 节点上。key 是一个字符串,每个句子都是字符串,且必须在所有可用的句子范围内出现所有词语。此类分布式任务的实现可定义如下:

public class WordCountExample {

   /**
    * In this example replace c1 and c2 with
    * real Cache references
    *
    * @param args
    */
   public static void main(String[] args) {
      Cache<String, String> c1 = ...;
      Cache<String, String> c2 = ...;

      c1.put("1", "Hello world here I am");
      c2.put("2", "Infinispan rules the world");
      c1.put("3", "JUDCon is in Boston");
      c2.put("4", "JBoss World is in Boston as well");
      c1.put("12","JBoss Application Server");
      c2.put("15", "Hello world");
      c1.put("14", "Infinispan community");
      c2.put("15", "Hello world");

      c1.put("111", "Infinispan open source");
      c2.put("112", "Boston is close to Toronto");
      c1.put("113", "Toronto is a capital of Ontario");
      c2.put("114", "JUDCon is cool");
      c1.put("211", "JBoss World is awesome");
      c2.put("212", "JBoss rules");
      c1.put("213", "JBoss division of RedHat ");
      c2.put("214", "RedHat community");

      Map<String, Long> wordCountMap = c1.entrySet().parallelStream()
         .map(e -> e.getValue().split("\\s"))
         .flatMap(Arrays::stream)
         .collect(() -> Collectors.groupingBy(Function.identity(), Collectors.counting()));
   }
}

在这种情况下,执行上例中的单词计数非常简单。

但是,如果我们希望在示例中找到最频繁的单词,该怎么样?如果您需要第二个内容来考虑这种情况,您需要首先拥有所有词语计算并在本地可用。因此,我们实际上有几个选项。

我们可以在收集器上使用一个完成器,它会在收集所有结果后在用户线程上调用。上例中删除了一些冗余行。

public class WordCountExample {
   public static void main(String[] args) {
      // Lines removed

      String mostFrequentWord = c1.entrySet().parallelStream()
         .map(e -> e.getValue().split("\\s"))
         .flatMap(Arrays::stream)
         .collect(() -> Collectors.collectingAndThen(
            Collectors.groupingBy(Function.identity(), Collectors.counting()),
               wordCountMap -> {
                  String mostFrequent = null;
                  long maxCount = 0;
                     for (Map.Entry<String, Long> e : wordCountMap.entrySet()) {
                        int count = e.getValue().intValue();
                        if (count > maxCount) {
                           maxCount = count;
                           mostFrequent = e.getKey();
                        }
                     }
                     return mostFrequent;
               }));

}

不幸的是,最后一步只在单个线程中运行,如果我们有大量词语可能非常慢。可能是通过 Streams 并行化此操作的另一种方法。

我们在处理后在本地节点之前提到,因此我们可以实际对映射结果使用流。因此,我们可以在结果上使用并行流。

public class WordFrequencyExample {
   public static void main(String[] args) {
      // Lines removed

      Map<String, Long> wordCount = c1.entrySet().parallelStream()
              .map(e -> e.getValue().split("\\s"))
              .flatMap(Arrays::stream)
              .collect(() -> Collectors.groupingBy(Function.identity(), Collectors.counting()));
      Optional<Map.Entry<String, Long>> mostFrequent = wordCount.entrySet().parallelStream().reduce(
              (e1, e2) -> e1.getValue() > e2.getValue() ? e1 : e2);

这样,在计算最频繁的元素时,您仍然可以在本地利用所有内核。

删除特定条目

分布式流也可以用作修改其所在的数据的方法。例如,您可能想要删除包含特定单词的缓存中的所有条目。

public class RemoveBadWords {
   public static void main(String[] args) {
      // Lines removed
      String word = ..

      c1.entrySet().parallelStream()
         .filter(e -> e.getValue().contains(word))
         .forEach((c, e) -> c.remove(e.getKey()));

如果我们仔细注意了什么被序列化的内容,我们注意到,只有词语与操作一起被序列化到其他 nods,因为它被 lambda 捕获。但是,实际节省的是,缓存操作是在主所有者上执行,从而减少了从缓存中删除这些值所需的网络流量数量。lambda 不会捕获缓存,因为我们提供了一个特殊的 BiConsumer 方法覆盖,在每个节点上调用缓存到 BiConsumer

以这种方式记住每个命令的一个 事项 是,底层流不会获得锁定。缓存删除操作仍会在自然上获得锁定,但该值可能会从流的意义中有所变化。这意味着,在流读取后可能会更改该条目,但删除实际被删除。

我们专门添加了一个称为 LockedStream 的新变体。

其它示例

Streams API 是一个 JRE 工具,有大量使用它的示例。请记住,您的操作需要以某种方式进行序列化。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.