311.3.3. 查询参数(6 参数):
| 名称 | 描述 | 默认 | 类型 |
|---|---|---|---|
| collect (producer) | 指明是否应该收集或计算结果。 | true | 布尔值 |
| dataFrame (producer) | 对计算数据进行计算. | DataFrame | |
| dataFrameCallback (producer) | 对 DataFrame 进行功能执行。 | DataFrameCallback | |
| rdd (producer) | 用于计算的 RDD, | JavaRDDLike | |
| rddCallback (producer) | 针对 RDD 执行操作的功能. | RddCallback | |
| 同步 (高级) | 设置同步处理是否应当严格使用,还是允许 Camel 使用异步处理(如果受支持)。 | false | 布尔值 |
# RDD job
要调用 RDD 作业,请使用以下 URI:
spark RDD producer
spark:rdd?rdd=#testFileRdd&rddCallback=#transformation
其中 rdd 选项是指 RDD 实例的名称(subclass of org.apache.spark.api.java.JavaRDDLike),而 rddCallback 指的是 org.apache.camel.component.spark.RddCallback 接口(也来自 registry)的实现。RDD 回调提供单一方法,用于针对给定的 RDD 应用传入的消息。回调计算结果保存为交换的正文。
spark RDD 回调
public interface RddCallback<T> {
T onRdd(JavaRDDLike rdd, Object... payloads);
}
以下片段演示了如何将消息作为输入发送到作业并返回结果:
调用 spark 任务
String pattern = "job input";
long linesCount = producerTemplate.requestBody("spark:rdd?rdd=#myRdd&rddCallback=#countLinesContaining", pattern, long.class);
上面注册为 Spring bean 的 RDD 回调可能如下所示:
spark RDD 回调
@Bean
RddCallback<Long> countLinesContaining() {
return new RddCallback<Long>() {
Long onRdd(JavaRDDLike rdd, Object... payloads) {
String pattern = (String) payloads[0];
return rdd.filter({line -> line.contains(pattern)}).count();
}
}
}
Spring 中的 RDD 定义可能如下所示:
spark RDD 定义
@Bean
JavaRDDLike myRdd(JavaSparkContext sparkContext) {
return sparkContext.textFile("testrdd.txt");
}