314.5. RDD 作业
要调用 RDD 作业,请使用以下 URI:
Spark RDD producer
spark:rdd?rdd=#testFileRdd&rddCallback=#transformation
Where `rdd` option refers to the name of an RDD instance (subclass of
`org.apache.spark.api.java.JavaRDDLike`) from a Camel registry, while
`rddCallback` refers to the implementation
of `org.apache.camel.component.spark.RddCallback` interface (also from a
registry). RDD callback provides a single method used to apply incoming
messages against the given RDD. Results of callback computations are
saved as a body to an exchange.
Spark RDD 回调
public interface RddCallback<T> {
T onRdd(JavaRDDLike rdd, Object... payloads);
}
以下片段演示了如何将消息作为输入发送到作业并返回结果:
调用 spark 作业
String pattern = "job input";
long linesCount = producerTemplate.requestBody("spark:rdd?rdd=#myRdd&rddCallback=#countLinesContaining", pattern, long.class);
上面注册的代码片段的 RDD 回调如下:
Spark RDD 回调
@Bean
RddCallback<Long> countLinesContaining() {
return new RddCallback<Long>() {
Long onRdd(JavaRDDLike rdd, Object... payloads) {
String pattern = (String) payloads[0];
return rdd.filter({line -> line.contains(pattern)}).count();
}
}
}
Spring 中的 RDD 定义如下所示:
spark RDD 定义
@Bean
JavaRDDLike myRdd(JavaSparkContext sparkContext) {
return sparkContext.textFile("testrdd.txt");
}
314.5.1. void RDD 回调 复制链接链接已复制到粘贴板!
如果您的 RDD 回调没有返回任何值返回到 Camel 管道,您可以返回 null 值,或使用 VoidRddCallback 基础类:
spark RDD 定义
@Bean
RddCallback<Void> rddCallback() {
return new VoidRddCallback() {
@Override
public void doOnRdd(JavaRDDLike rdd, Object... payloads) {
rdd.saveAsTextFile(output.getAbsolutePath());
}
};
}
314.5.2. 转换 RDD 回调 复制链接链接已复制到粘贴板!
如果您知道将哪些输入数据发送到 RDD 回调,您可以使用 ConvertingRddCallback,并让 Camel 在将它们插入到回调前自动转换传入的消息:
spark RDD 定义
@Bean
RddCallback<Long> rddCallback(CamelContext context) {
return new ConvertingRddCallback<Long>(context, int.class, int.class) {
@Override
public Long doOnRdd(JavaRDDLike rdd, Object... payloads) {
return rdd.count() * (int) payloads[0] * (int) payloads[1];
}
};
};
}
314.5.3. 注解的 RDD 回调 复制链接链接已复制到粘贴板!
使用 RDD 回调的最简单方法是为类提供标记为 @RddCallback 注释的方法:
注解的 RDD 回调定义
import static org.apache.camel.component.spark.annotations.AnnotatedRddCallback.annotatedRddCallback;
@Bean
RddCallback<Long> rddCallback() {
return annotatedRddCallback(new MyTransformation());
}
...
import org.apache.camel.component.spark.annotation.RddCallback;
public class MyTransformation {
@RddCallback
long countLines(JavaRDD<String> textFile, int first, int second) {
return textFile.count() * first * second;
}
}
如果您将 CamelContext 传递给注解的 RDD 回调工厂方法,则创建的回调将能够转换传入的有效负载以匹配被注释的方法的参数:
注解的 RDD 回调的正文转换
import static org.apache.camel.component.spark.annotations.AnnotatedRddCallback.annotatedRddCallback;
@Bean
RddCallback<Long> rddCallback(CamelContext camelContext) {
return annotatedRddCallback(new MyTransformation(), camelContext);
}
...
import org.apache.camel.component.spark.annotation.RddCallback;
public class MyTransformation {
@RddCallback
long countLines(JavaRDD<String> textFile, int first, int second) {
return textFile.count() * first * second;
}
}
...
// Convert String "10" to integer
long result = producerTemplate.requestBody("spark:rdd?rdd=#rdd&rddCallback=#rddCallback" Arrays.asList(10, "10"), long.class);