311.3.3. 查询参数(6 参数):


Expand
名称描述默认类型

collect (producer)

指明是否应该收集或计算结果。

true

布尔值

dataFrame (producer)

对计算数据进行计算.

 

DataFrame

dataFrameCallback (producer)

对 DataFrame 进行功能执行。

 

DataFrameCallback

rdd (producer)

用于计算的 RDD,

 

JavaRDDLike

rddCallback (producer)

针对 RDD 执行操作的功能.

 

RddCallback

同步 (高级)

设置同步处理是否应当严格使用,还是允许 Camel 使用异步处理(如果受支持)。

false

布尔值

  # RDD job 

要调用 RDD 作业,请使用以下 URI:

spark RDD producer

spark:rdd?rdd=#testFileRdd&rddCallback=#transformation

 其中 rdd 选项是指 RDD 实例的名称(subclass of org.apache.spark.api.java.JavaRDDLike),而 rddCallback 指的是 org.apache.camel.component.spark.RddCallback 接口(也来自 registry)的实现。RDD 回调提供单一方法,用于针对给定的 RDD 应用传入的消息。回调计算结果保存为交换的正文。

spark RDD 回调

public interface RddCallback<T> {
    T onRdd(JavaRDDLike rdd, Object... payloads);
}

以下片段演示了如何将消息作为输入发送到作业并返回结果:

调用 spark 任务

String pattern = "job input";
long linesCount = producerTemplate.requestBody("spark:rdd?rdd=#myRdd&rddCallback=#countLinesContaining", pattern, long.class);

上面注册为 Spring bean 的 RDD 回调可能如下所示:

spark RDD 回调

@Bean
RddCallback<Long> countLinesContaining() {
    return new RddCallback<Long>() {
        Long onRdd(JavaRDDLike rdd, Object... payloads) {
            String pattern = (String) payloads[0];
            return rdd.filter({line -> line.contains(pattern)}).count();
        }
    }
}

Spring 中的 RDD 定义可能如下所示:

spark RDD 定义

@Bean
JavaRDDLike myRdd(JavaSparkContext sparkContext) {
  return sparkContext.textFile("testrdd.txt");
}
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2026 Red Hat
返回顶部