314.5. RDD jobs
To invoke an RDD job, use the following URI:
Spark RDD producer
spark:rdd?rdd=#testFileRdd&rddCallback=#transformation
Here the `rdd` option refers to the name of an RDD instance (a subclass of `org.apache.spark.api.java.JavaRDDLike`) looked up in the Camel registry, while `rddCallback` refers to an implementation of the `org.apache.camel.component.spark.RddCallback` interface (also looked up in the registry). The RDD callback provides a single method that applies incoming messages to the given RDD. The result of the callback computation is saved as the body of the exchange.
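To make the `#name` convention concrete, the following is a minimal sketch of how the options of such a URI could be resolved against a registry. It uses a plain `Map` as a stand-in for the Camel registry. The class `RegistryLookupSketch` and its methods are hypothetical and are not part of Camel, which performs this resolution internally.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; Camel resolves "#name" references itself.
class RegistryLookupSketch {

    // Splits the query part of an endpoint URI into option name/value pairs.
    static Map<String, String> parseOptions(String uri) {
        Map<String, String> options = new LinkedHashMap<>();
        int queryStart = uri.indexOf('?');
        if (queryStart < 0) {
            return options;
        }
        for (String pair : uri.substring(queryStart + 1).split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                options.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return options;
    }

    // Resolves a "#beanName" option value to the bean stored in the registry.
    static Object resolve(Map<String, Object> registry, String optionValue) {
        if (!optionValue.startsWith("#")) {
            throw new IllegalArgumentException("Expected a #reference: " + optionValue);
        }
        String beanName = optionValue.substring(1);
        Object bean = registry.get(beanName);
        if (bean == null) {
            throw new IllegalArgumentException("No bean named " + beanName);
        }
        return bean;
    }
}
```

With the URI above, `parseOptions` yields `rdd=#testFileRdd` and `rddCallback=#transformation`, and `resolve` returns whatever objects were registered under `testFileRdd` and `transformation`.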
Spark RDD callback
public interface RddCallback<T> {
    T onRdd(JavaRDDLike rdd, Object... payloads);
}
The following snippet demonstrates how to send a message as input to the job and return the result:
Calling a Spark job
String pattern = "job input";
long linesCount = producerTemplate.requestBody("spark:rdd?rdd=#myRdd&rddCallback=#countLinesContaining", pattern, long.class);
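As a rough model of what happens to the message body, the sketch below mimics the dispatch with plain Java types. A `List<String>` stands in for the RDD, and `SketchCallback`, `DispatchSketch`, and `dispatch` are hypothetical names. It assumes that a `List` body is spread into several payloads while any other body is passed as a single payload. That assumption matches how the examples in this section use the endpoint, but it is not taken from the component's source.

```java
import java.util.List;

// Hypothetical stand-in: a List<String> plays the role of the RDD.
interface SketchCallback<T> {
    T onData(List<String> data, Object... payloads);
}

class DispatchSketch {

    // Assumed behaviour: a List body becomes several payloads,
    // any other body becomes exactly one payload.
    static <T> T dispatch(List<String> data, SketchCallback<T> callback, Object body) {
        if (body instanceof List) {
            return callback.onData(data, ((List<?>) body).toArray());
        }
        return callback.onData(data, body);
    }

    // Counts the lines that contain the pattern given as the first payload.
    static final SketchCallback<Long> COUNT_LINES_CONTAINING = (data, payloads) -> {
        String pattern = (String) payloads[0];
        return data.stream().filter(line -> line.contains(pattern)).count();
    };
}
```

In this model the value returned by `dispatch` plays the role of the exchange body that `requestBody` hands back to the caller.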
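The RDD callback for the snippet above, registered as a Spring bean, could look as follows:
Spark RDD callback
@Bean
RddCallback<Long> countLinesContaining() {
    return new RddCallback<Long>() {
        @Override
        public Long onRdd(JavaRDDLike rdd, Object... payloads) {
            String pattern = (String) payloads[0];
            // filter() is declared on JavaRDD rather than JavaRDDLike, so cast first
            JavaRDD<String> lines = (JavaRDD<String>) rdd;
            return lines.filter(line -> line.contains(pattern)).count();
        }
    };
}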
The RDD definition in Spring could look as follows:
Spark RDD definition
@Bean
JavaRDDLike myRdd(JavaSparkContext sparkContext) {
    return sparkContext.textFile("testrdd.txt");
}
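314.5.1. Void RDD callbacks
If your RDD callback does not return any value to the Camel pipeline, you can either return a null value or use the VoidRddCallback base class:
Spark RDD definition
@Bean
RddCallback<Void> rddCallback() {
    return new VoidRddCallback() {
        @Override
        public void doOnRdd(JavaRDDLike rdd, Object... payloads) {
            // output is assumed to be a java.io.File defined elsewhere
            rdd.saveAsTextFile(output.getAbsolutePath());
        }
    };
}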
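The idea behind such a base class can be sketched as a small adapter: subclasses implement a method with no return value, and the adapter supplies the `null` result on their behalf. The types below (`ResultCallback`, `VoidCallbackSketch`, `VoidCallbackDemo`) are hypothetical stand-ins written for illustration, with a `List<String>` in place of the RDD. They are not the Camel classes.

```java
import java.util.List;

// Hypothetical stand-in; a List<String> plays the role of the RDD.
interface ResultCallback<T> {
    T onData(List<String> data, Object... payloads);
}

// Adapter in the spirit of a void callback base class: subclasses
// implement doOnData and the adapter supplies the null result.
abstract class VoidCallbackSketch implements ResultCallback<Void> {

    abstract void doOnData(List<String> data, Object... payloads);

    @Override
    public final Void onData(List<String> data, Object... payloads) {
        doOnData(data, payloads);
        return null; // nothing is handed back to the caller
    }
}

class VoidCallbackDemo {

    // Copies the data into the sink list passed as the first payload.
    static ResultCallback<Void> copyInto() {
        return new VoidCallbackSketch() {
            @Override
            @SuppressWarnings("unchecked")
            void doOnData(List<String> data, Object... payloads) {
                ((List<String>) payloads[0]).addAll(data);
            }
        };
    }
}
```

The callback in this sketch works purely through its side effect on the sink, in the same way the example above only writes the RDD to a file.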
314.5.2. Converting RDD callbacks
If you know which types of input data will be sent to the RDD callback, you can use ConvertingRddCallback and let Camel automatically convert the incoming messages before passing them to the callback:
Spark RDD definition
@Bean
RddCallback<Long> rddCallback(CamelContext context) {
    return new ConvertingRddCallback<Long>(context, int.class, int.class) {
        @Override
        public Long doOnRdd(JavaRDDLike rdd, Object... payloads) {
            return rdd.count() * (int) payloads[0] * (int) payloads[1];
        }
    };
}
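The conversion step can be pictured as follows. This is a minimal sketch under stated assumptions: `ConvertingCallbackSketch` and `ConvertingCallbackDemo` are hypothetical classes, a `List<String>` stands in for the RDD, and a tiny hand-written converter that only understands `int` and `String` targets replaces Camel's type conversion.

```java
import java.util.List;

// Hypothetical stand-in for a converting callback; a List<String> plays
// the role of the RDD and a tiny converter replaces Camel's type conversion.
abstract class ConvertingCallbackSketch<T> {

    private final Class<?>[] payloadTypes;

    ConvertingCallbackSketch(Class<?>... payloadTypes) {
        this.payloadTypes = payloadTypes;
    }

    abstract T doOnData(List<String> data, Object... payloads);

    // Converts each payload to its declared type, then delegates.
    final T onData(List<String> data, Object... payloads) {
        if (payloads.length != payloadTypes.length) {
            throw new IllegalArgumentException("Expected " + payloadTypes.length + " payloads");
        }
        Object[] converted = new Object[payloads.length];
        for (int i = 0; i < payloads.length; i++) {
            converted[i] = convert(payloads[i], payloadTypes[i]);
        }
        return doOnData(data, converted);
    }

    // Minimal converter: only handles int/Integer and String targets.
    private static Object convert(Object value, Class<?> type) {
        if (type == int.class || type == Integer.class) {
            return value instanceof Number
                    ? ((Number) value).intValue()
                    : Integer.parseInt(value.toString().trim());
        }
        if (type == String.class) {
            return value.toString();
        }
        throw new IllegalArgumentException("Unsupported type: " + type);
    }
}

class ConvertingCallbackDemo {

    // Multiplies the number of lines by the two integer payloads.
    static ConvertingCallbackSketch<Long> countTimesFactors() {
        return new ConvertingCallbackSketch<Long>(int.class, int.class) {
            @Override
            Long doOnData(List<String> data, Object... payloads) {
                return (long) data.size() * (int) payloads[0] * (int) payloads[1];
            }
        };
    }
}
```

Because the payloads are converted before `doOnData` runs, a caller of this sketch may pass `10` and `"10"` interchangeably, and the casts to `int` inside the callback remain safe.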
314.5.3. Annotated RDD callbacks
Probably the easiest way to work with RDD callbacks is to provide a class with a method marked with the @RddCallback annotation:
Annotated RDD callback definition
import static org.apache.camel.component.spark.annotations.AnnotatedRddCallback.annotatedRddCallback;

@Bean
RddCallback<Long> rddCallback() {
    return annotatedRddCallback(new MyTransformation());
}

...

import org.apache.camel.component.spark.annotations.RddCallback;

public class MyTransformation {

    @RddCallback
    long countLines(JavaRDD<String> textFile, int first, int second) {
        return textFile.count() * first * second;
    }

}
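One way to picture what an annotation-driven callback factory has to do is the reflection sketch below. It is an illustration under assumptions, not Camel's implementation: `@DataCallback`, `AnnotatedCallbackSketch`, and `LineCounter` are hypothetical names, a `List<String>` stands in for the RDD, and no type conversion is attempted, so the payloads must already match the method parameters.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.List;

// Hypothetical marker annotation, standing in for @RddCallback.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DataCallback {
}

class AnnotatedCallbackSketch {

    // Finds the first method marked with @DataCallback and invokes it with
    // the data as the first argument, followed by the payloads.
    static Object invoke(Object target, List<String> data, Object... payloads) {
        for (Method method : target.getClass().getDeclaredMethods()) {
            if (method.isAnnotationPresent(DataCallback.class)) {
                Object[] args = new Object[payloads.length + 1];
                args[0] = data;
                System.arraycopy(payloads, 0, args, 1, payloads.length);
                try {
                    method.setAccessible(true); // the method need not be public
                    return method.invoke(target, args);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("Callback invocation failed", e);
                }
            }
        }
        throw new IllegalArgumentException("No @DataCallback method on " + target.getClass());
    }
}

// Plain class whose annotated method receives the data and two payloads.
class LineCounter {

    @DataCallback
    long countLines(List<String> textFile, int first, int second) {
        return (long) textFile.size() * first * second;
    }
}
```

The annotated class stays free of any callback interface. The trade-off is that a mismatch between payloads and method parameters only surfaces at invocation time.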
If you pass a CamelContext to the annotated RDD callback factory method, the created callback will be able to convert incoming payloads to match the parameters of the annotated method:
Body conversions for annotated RDD callbacks
import static org.apache.camel.component.spark.annotations.AnnotatedRddCallback.annotatedRddCallback;

@Bean
RddCallback<Long> rddCallback(CamelContext camelContext) {
    return annotatedRddCallback(new MyTransformation(), camelContext);
}

...

import org.apache.camel.component.spark.annotations.RddCallback;

public class MyTransformation {

    @RddCallback
    long countLines(JavaRDD<String> textFile, int first, int second) {
        return textFile.count() * first * second;
    }

}

...

// Convert String "10" to integer
long result = producerTemplate.requestBody("spark:rdd?rdd=#rdd&rddCallback=#rddCallback", Arrays.asList(10, "10"), long.class);