313.4. Spring Boot Auto-Configuration
コンポーネントは、以下に記載される 4 つのオプションをサポートします。
名前 | 説明 | デフォルト | タイプ |
---|---|---|---|
camel.component.spark.enabled | Spark コンポーネントを有効にします。 | true | Boolean |
camel.component.spark.rdd | 計算対象の RDD。オプションは org.apache.spark.api.java.JavaRDDLike タイプです。 | String | |
camel.component.spark.rdd-callback | RDD に対してアクションを実行する関数。オプションは org.apache.camel.component.spark.RddCallback タイプです。 | String | |
camel.component.spark.resolve-property-placeholders | 起動時にコンポーネントがプロパティープレースホルダーを解決するかどうか。String タイプのプロパティーのみがプロパティープレースホルダーを使用できます。 | true | Boolean |
# RDD ジョブ
RDD ジョブを呼び出すには、次の URI を使用します。
Spark RDD producer
spark:rdd?rdd=#testFileRdd&rddCallback=#transformation
spark:rdd?rdd=#testFileRdd&rddCallback=#transformation
rdd
オプションは Camel レジストリーの RDD インスタンス (org.apache.spark.api.java.JavaRDDLike
のサブクラス) の名前を参照し、rddCallback
は org.apache.camel.component.spark.RddCallback
インターフェイスの実装を参照します。(レジストリーからも)。RDD コールバックは、特定の RDD に対して入力メッセージを適用するために使用される単一のメソッドを提供します。コールバック計算の結果は、ボディーとしてエクスチェンジに保存されます。
Spark RDD コールバック
public interface RddCallback<T> { T onRdd(JavaRDDLike rdd, Object... payloads); }
public interface RddCallback<T> {
T onRdd(JavaRDDLike rdd, Object... payloads);
}
次のスニペットは、ジョブへの入力としてメッセージを送信し、結果を返す方法を示しています。
Spark ジョブの呼び出し
String pattern = "job input"; long linesCount = producerTemplate.requestBody("spark:rdd?rdd=#myRdd&rddCallback=#countLinesContaining", pattern, long.class);
String pattern = "job input";
long linesCount = producerTemplate.requestBody("spark:rdd?rdd=#myRdd&rddCallback=#countLinesContaining", pattern, long.class);
Spring Bean として登録された上記のスニペットの RDD コールバックは、次のようになります。
Spark RDD コールバック
@Bean RddCallback<Long> countLinesContaining() { return new RddCallback<Long>() { Long onRdd(JavaRDDLike rdd, Object... payloads) { String pattern = (String) payloads[0]; return rdd.filter({line -> line.contains(pattern)}).count(); } } }
@Bean
RddCallback<Long> countLinesContaining() {
return new RddCallback<Long>() {
Long onRdd(JavaRDDLike rdd, Object... payloads) {
String pattern = (String) payloads[0];
return rdd.filter({line -> line.contains(pattern)}).count();
}
}
}
Spring の RDD 定義は次のようになります。
Spark RDD 定義
@Bean JavaRDDLike myRdd(JavaSparkContext sparkContext) { return sparkContext.textFile("testrdd.txt"); }
@Bean
JavaRDDLike myRdd(JavaSparkContext sparkContext) {
return sparkContext.textFile("testrdd.txt");
}
313.4.1. RDD コールバックを無効にする
RDD コールバックが Camel パイプラインに値を返さない場合は、null
値を返すか、VoidRddCallback
基本クラスを使用できます。
Spark RDD 定義
@Bean RddCallback<Void> rddCallback() { return new VoidRddCallback() { @Override public void doOnRdd(JavaRDDLike rdd, Object... payloads) { rdd.saveAsTextFile(output.getAbsolutePath()); } }; }
@Bean
RddCallback<Void> rddCallback() {
return new VoidRddCallback() {
@Override
public void doOnRdd(JavaRDDLike rdd, Object... payloads) {
rdd.saveAsTextFile(output.getAbsolutePath());
}
};
}
313.4.2. RDD コールバックの変換
RDD コールバックに送信される入力データのタイプがわかっている場合は、ConvertingRddCallback
を使用して、受信メッセージをコールバックに挿入する前に Camel に自動的に変換させることができます。
Spark RDD 定義
@Bean RddCallback<Long> rddCallback(CamelContext context) { return new ConvertingRddCallback<Long>(context, int.class, int.class) { @Override public Long doOnRdd(JavaRDDLike rdd, Object... payloads) { return rdd.count() * (int) payloads[0] * (int) payloads[1]; } }; }; }
@Bean
RddCallback<Long> rddCallback(CamelContext context) {
return new ConvertingRddCallback<Long>(context, int.class, int.class) {
@Override
public Long doOnRdd(JavaRDDLike rdd, Object... payloads) {
return rdd.count() * (int) payloads[0] * (int) payloads[1];
}
};
};
}
313.4.3. アノテーション付き RDD コールバック
おそらく、RDD コールバックを操作する最も簡単な方法は、@RddCallback
アノテーションでマークされたメソッドをクラスに提供することです。
アノテーション付き RDD コールバック定義
import static org.apache.camel.component.spark.annotations.AnnotatedRddCallback.annotatedRddCallback; @Bean RddCallback<Long> rddCallback() { return annotatedRddCallback(new MyTransformation()); } ... import org.apache.camel.component.spark.annotation.RddCallback; public class MyTransformation { @RddCallback long countLines(JavaRDD<String> textFile, int first, int second) { return textFile.count() * first * second; } }
import static org.apache.camel.component.spark.annotations.AnnotatedRddCallback.annotatedRddCallback;
@Bean
RddCallback<Long> rddCallback() {
return annotatedRddCallback(new MyTransformation());
}
...
import org.apache.camel.component.spark.annotation.RddCallback;
public class MyTransformation {
@RddCallback
long countLines(JavaRDD<String> textFile, int first, int second) {
return textFile.count() * first * second;
}
}
CamelContext をアノテーション付き RDD コールバックファクトリーメソッドに渡す場合、作成されたコールバックは、入力ペイロードを変換して、アノテーション付きメソッドのパラメーターに一致させることができます。
アノテーション付き RDD コールバックのボディー変換
import static org.apache.camel.component.spark.annotations.AnnotatedRddCallback.annotatedRddCallback; @Bean RddCallback<Long> rddCallback(CamelContext camelContext) { return annotatedRddCallback(new MyTransformation(), camelContext); } ... import org.apache.camel.component.spark.annotation.RddCallback; public class MyTransformation { @RddCallback long countLines(JavaRDD<String> textFile, int first, int second) { return textFile.count() * first * second; } } ... // Convert String "10" to integer long result = producerTemplate.requestBody("spark:rdd?rdd=#rdd&rddCallback=#rddCallback" Arrays.asList(10, "10"), long.class);
import static org.apache.camel.component.spark.annotations.AnnotatedRddCallback.annotatedRddCallback;
@Bean
RddCallback<Long> rddCallback(CamelContext camelContext) {
return annotatedRddCallback(new MyTransformation(), camelContext);
}
...
import org.apache.camel.component.spark.annotation.RddCallback;
public class MyTransformation {
@RddCallback
long countLines(JavaRDD<String> textFile, int first, int second) {
return textFile.count() * first * second;
}
}
...
// Convert String "10" to integer
long result = producerTemplate.requestBody("spark:rdd?rdd=#rdd&rddCallback=#rddCallback" Arrays.asList(10, "10"), long.class);