292.3.3. クエリーパラメーター(6 パラメーター):
| Name | 説明 | デフォルト | Type |
|---|---|---|---|
| collect (プロデューサー) | 結果を収集またはカウントするかどうかを示します。 | true | boolean |
| dataFrame (producer) | 計算先となる DataFrame。 | DataFrame | |
| dataFrameCallback (producer) | DataFrame に対するアクション実行関数。 | DataFrameCallback | |
| rdd (producer) | 計算対象である RDD。 | JavaRDDLike | |
| rddCallback (producer) | RDD に対するアクションの実行関数。 | RddCallback | |
| 同期 (詳細) | 同期処理を厳密に使用するか、Camel が非同期処理を使用できるようにするかを設定します(サポートされている場合)。 | false | boolean |
# RDD jobs
RDD ジョブを呼び出すには、以下の URI を使用します。
Spark RDD プロデューサー
spark:rdd?rdd=#testFileRdd&rddCallback=#transformation
spark:rdd?rdd=#testFileRdd&rddCallback=#transformation
rdd オプションは、Camel レジストリーから RDD インスタンスの名前( org.apache.spark.api.java.JavaRDDLikeのサブクラス)を参照しますが、rddCallback は org.apache.camel.component.spark.RddCallback インターフェース(レジストリーとも呼ばれます)の実装を参照します。RDD コールバックは、指定の RDD に対して受信メッセージを適用する単一の方法を提供します。コールバック計算の結果は、エクスチェンジのボディーとして保存されます。
Spark RDD コールバック
public interface RddCallback<T> {
T onRdd(JavaRDDLike rdd, Object... payloads);
}
public interface RddCallback<T> {
T onRdd(JavaRDDLike rdd, Object... payloads);
}
以下のスニペットは、メッセージをジョブへの入力として送信し、結果を返す方法を示しています。
スパークジョブの呼び出し
String pattern = "job input";
long linesCount = producerTemplate.requestBody("spark:rdd?rdd=#myRdd&rddCallback=#countLinesContaining", pattern, long.class);
String pattern = "job input";
long linesCount = producerTemplate.requestBody("spark:rdd?rdd=#myRdd&rddCallback=#countLinesContaining", pattern, long.class);
Spring Bean として登録された上記のスニペットの RDD コールバックは、以下のようになります。
Spark RDD コールバック
Spring の RDD 定義は以下のようになります。
Spark RDD の定義
@Bean
JavaRDDLike myRdd(JavaSparkContext sparkContext) {
return sparkContext.textFile("testrdd.txt");
}
@Bean
JavaRDDLike myRdd(JavaSparkContext sparkContext) {
return sparkContext.textFile("testrdd.txt");
}