314.5. RDD 作业


要调用 RDD 作业,请使用以下 URI:

spark RDD producer

spark:rdd?rdd=#testFileRdd&rddCallback=#transformation
 Where `rdd` option refers to the name of an RDD instance (subclass of
`org.apache.spark.api.java.JavaRDDLike`) from a Camel registry, while
`rddCallback` refers to the implementation
of `org.apache.camel.component.spark.RddCallback` interface (also from a
registry). RDD callback provides a single method used to apply incoming
messages against the given RDD. Results of callback computations are
saved as a body to an exchange.

spark RDD 回调

public interface RddCallback<T> {
    T onRdd(JavaRDDLike rdd, Object... payloads);
}

以下片段演示了如何将消息作为输入发送到作业并返回结果:

调用 spark 作业

String pattern = "job input";
long linesCount = producerTemplate.requestBody("spark:rdd?rdd=#myRdd&rddCallback=#countLinesContaining", pattern, long.class);

上面注册的代码片段的 RDD 回调,如下所示:

spark RDD 回调

@Bean
RddCallback<Long> countLinesContaining() {
    return new RddCallback<Long>() {
        Long onRdd(JavaRDDLike rdd, Object... payloads) {
            String pattern = (String) payloads[0];
            return rdd.filter({line -> line.contains(pattern)}).count();
        }
    }
}

Spring 中的 RDD 定义可能如下所示:

spark RDD 定义

@Bean
JavaRDDLike myRdd(JavaSparkContext sparkContext) {
  return sparkContext.textFile("testrdd.txt");
}

314.5.1. void RDD 回调

如果您的 RDD 回调没有返回任何值回 Camel 管道,您可以返回 null 值或使用 VoidRddCallback 基础类:

spark RDD 定义

@Bean
RddCallback<Void> rddCallback() {
  return new VoidRddCallback() {
        @Override
        public void doOnRdd(JavaRDDLike rdd, Object... payloads) {
            rdd.saveAsTextFile(output.getAbsolutePath());
        }
    };
}

314.5.2. 转换 RDD 回调

如果您知道将哪些类型的输入数据发送到 RDD 回调,您可以使用 ConvertingRddCallback,并让 Camel 在将它们插入到回调前自动转换传入的信息:

spark RDD 定义

@Bean
RddCallback<Long> rddCallback(CamelContext context) {
  return new ConvertingRddCallback<Long>(context, int.class, int.class) {
            @Override
            public Long doOnRdd(JavaRDDLike rdd, Object... payloads) {
                return rdd.count() * (int) payloads[0] * (int) payloads[1];
            }
        };
    };
}

314.5.3. 注解的 RDD 回调

使用 RDD 回调的最简单方法是为类提供标记为 @RddCallback 注释的方法:

注解的 RDD 回调定义

import static org.apache.camel.component.spark.annotations.AnnotatedRddCallback.annotatedRddCallback;

@Bean
RddCallback<Long> rddCallback() {
    return annotatedRddCallback(new MyTransformation());
}

...

import org.apache.camel.component.spark.annotation.RddCallback;

public class MyTransformation {

    @RddCallback
    long countLines(JavaRDD<String> textFile, int first, int second) {
        return textFile.count() * first * second;
    }

}

如果您将 CamelContext 传递给注解的 RDD 回调工厂方法,创建的回调将能够转换传入的有效负载以匹配注解的方法的参数:

注解的 RDD 回调的正文转换

import static org.apache.camel.component.spark.annotations.AnnotatedRddCallback.annotatedRddCallback;

@Bean
RddCallback<Long> rddCallback(CamelContext camelContext) {
    return annotatedRddCallback(new MyTransformation(), camelContext);
}

...


import org.apache.camel.component.spark.annotation.RddCallback;

public class MyTransformation {

    @RddCallback
    long countLines(JavaRDD<String> textFile, int first, int second) {
        return textFile.count() * first * second;
    }

}

...

// Convert String "10" to integer
long result = producerTemplate.requestBody("spark:rdd?rdd=#rdd&rddCallback=#rddCallback" Arrays.asList(10, "10"), long.class);
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.