313.5. DataFrame 作业


无法使用 RDDs Spark 组件也可以使用 DataFrames。 

要调用 DataFrame 作业,请使用以下 URI:

spark RDD producer

spark:dataframe?dataFrame=#testDataFrame&dataFrameCallback=#transformation

 其中 dataFrame 选项是指 DataFrame实例的名称( org.apache.spark.sql.Dataset 和 org.apache.spark.sql.Row)的名称,而 dataFrameCallback 则引用 org.apache.camel.component.spark.sql.sparDataFeCallback 接口(也来自 registry)。DataFrame 回调提供了一种单一方法,用于针对给定的 DataFrame 应用传入的消息。回调计算结果保存为交换的正文。

spark RDD 回调

public interface DataFrameCallback<T> {
    T onDataFrame(Dataset<Row> dataFrame, Object... payloads);
}

以下片段演示了如何将消息作为输入发送到作业并返回结果:

调用 spark 任务

String model = "Micra";
long linesCount = producerTemplate.requestBody("spark:dataFrame?dataFrame=#cars&dataFrameCallback=#findCarWithModel", model, long.class);

以 Spring bean 形式注册的代码片段的数据Frame 回调如下所示:

spark RDD 回调

@Bean
RddCallback<Long> findCarWithModel() {
    return new DataFrameCallback<Long>() {
        @Override
        public Long onDataFrame(Dataset<Row> dataFrame, Object... payloads) {
            String model = (String) payloads[0];
            return dataFrame.where(dataFrame.col("model").eqNullSafe(model)).count();
        }
    };
}

Spring 中的 DataFrame 定义如下:

spark RDD 定义

@Bean
Dataset<Row> cars(HiveContext hiveContext) {
    Dataset<Row> jsonCars = hiveContext.read().json("/var/data/cars.json");
    jsonCars.registerTempTable("cars");
    return jsonCars;
}
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.