314.6. DataFrame 作业


除了使用 RDDs Spark 组件外,也可以使用 DataFrames。

要调用 DataFrame 作业,请使用以下 URI:

spark RDD producer

spark:dataframe?dataFrame=#testDataFrame&dataFrameCallback=#transformation
 Where `dataFrame` option refers to the name of an DataFrame instance
(`instances of org.apache.spark.sql.Dataset and org.apache.spark.sql.Row`) from a Camel registry,
while `dataFrameCallback` refers to the implementation
of `org.apache.camel.component.spark.DataFrameCallback` interface (also
from a registry). DataFrame callback provides a single method used to
apply incoming messages against the given DataFrame. Results of callback
computations are saved as a body to an exchange.

spark RDD 回调

public interface DataFrameCallback<T> {
    T onDataFrame(Dataset<Row> dataFrame, Object... payloads);
}

以下片段演示了如何将消息作为输入发送到作业并返回结果:

调用 spark 作业

String model = "Micra";
long linesCount = producerTemplate.requestBody("spark:dataFrame?dataFrame=#cars&dataFrameCallback=#findCarWithModel", model, long.class);

上面注册的代码片段的 DataFrame 回调,如下所示:

spark RDD 回调

@Bean
RddCallback<Long> findCarWithModel() {
    return new DataFrameCallback<Long>() {
        @Override
        public Long onDataFrame(Dataset<Row> dataFrame, Object... payloads) {
            String model = (String) payloads[0];
            return dataFrame.where(dataFrame.col("model").eqNullSafe(model)).count();
        }
    };
}

Spring 中的 DataFrame 定义可能如下所示:

spark RDD 定义

@Bean
Dataset<Row> cars(HiveContext hiveContext) {
    Dataset<Row> jsonCars = hiveContext.read().json("/var/data/cars.json");
    jsonCars.registerTempTable("cars");
    return jsonCars;
}
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.