292.3. URI 形式
現在、Spark コンポーネントはプロデューサーのみをサポートしています。これは、Spark ジョブを呼び出して結果を返すことを目的としています。RDD、データフレーム、または Hive SQL ジョブを呼び出すことができます。
Spark URI 形式
spark:{rdd|dataframe|hive}
spark:{rdd|dataframe|hive}
292.3.1. Spark オプション リンクのコピーリンクがクリップボードにコピーされました!
Apache Spark コンポーネントは、以下に示す 3 個のオプションをサポートしています。
| 名前 | 説明 | デフォルト | タイプ |
|---|---|---|---|
| rdd (producer) | 計算対象の RDD。 | JavaRDDLike | |
| rddCallback (producer) | RDD に対してアクションを実行する関数。 | RddCallback | |
| resolveProperty Placeholders (advanced) | 起動時にコンポーネントがプロパティープレースホルダーを解決するかどうか。String タイプのプロパティーのみがプロパティープレースホルダーを使用できます。 | true | boolean |
Apache Spark エンドポイントは、URI 構文を使用して設定されます。
spark:endpointType
spark:endpointType
パスおよびクエリーパラメーターを使用します。
292.3.2. パスパラメーター (1 個のパラメーター): リンクのコピーリンクがクリップボードにコピーされました!
| 名前 | 説明 | デフォルト | タイプ |
|---|---|---|---|
| endpointType | 必須 エンドポイントのタイプ (rdd、データフレーム、ハイブ)。 | EndpointType |
292.3.3. クエリーパラメーター (6 個のパラメーター): リンクのコピーリンクがクリップボードにコピーされました!
| 名前 | 説明 | デフォルト | タイプ |
|---|---|---|---|
| collect (producer) | 結果を収集またはカウントする必要があるかどうかを示します。 | true | boolean |
| dataFrame (producer) | 計算対象の DataFrame。 | DataFrame | |
| dataFrameCallback (producer) | DataFrame に対してアクションを実行する関数。 | DataFrameCallback | |
| rdd (producer) | 計算対象の RDD。 | JavaRDDLike | |
| rddCallback (producer) | RDD に対してアクションを実行する関数。 | RddCallback | |
| synchronous (advanced) | 同期処理を厳密に使用するか、Camel が非同期処理を使用できるかどうかを設定します (サポートされている場合)。 | false | boolean |
# RDD ジョブ
RDD ジョブを呼び出すには、次の URI を使用します。
Spark RDD producer
spark:rdd?rdd=#testFileRdd&rddCallback=#transformation
spark:rdd?rdd=#testFileRdd&rddCallback=#transformation
rdd オプションは Camel レジストリーの RDD インスタンス (org.apache.spark.api.java.JavaRDDLike のサブクラス) の名前を参照し、rddCallback は org.apache.camel.component.spark.RddCallback インターフェイスの実装を参照します。(レジストリーからも)。RDD コールバックは、特定の RDD に対して入力メッセージを適用するために使用される単一のメソッドを提供します。コールバック計算の結果は、ボディーとしてエクスチェンジに保存されます。
Spark RDD コールバック
public interface RddCallback<T> {
T onRdd(JavaRDDLike rdd, Object... payloads);
}
public interface RddCallback<T> {
T onRdd(JavaRDDLike rdd, Object... payloads);
}
次のスニペットは、ジョブへの入力としてメッセージを送信し、結果を返す方法を示しています。
Spark ジョブの呼び出し
String pattern = "job input";
long linesCount = producerTemplate.requestBody("spark:rdd?rdd=#myRdd&rddCallback=#countLinesContaining", pattern, long.class);
String pattern = "job input";
long linesCount = producerTemplate.requestBody("spark:rdd?rdd=#myRdd&rddCallback=#countLinesContaining", pattern, long.class);
Spring Bean として登録された上記のスニペットの RDD コールバックは、次のようになります。
Spark RDD コールバック
Spring の RDD 定義は次のようになります。
Spark RDD 定義
@Bean
JavaRDDLike myRdd(JavaSparkContext sparkContext) {
return sparkContext.textFile("testrdd.txt");
}
@Bean
JavaRDDLike myRdd(JavaSparkContext sparkContext) {
return sparkContext.textFile("testrdd.txt");
}
292.3.4. RDD コールバックを無効にする リンクのコピーリンクがクリップボードにコピーされました!
RDD コールバックが Camel パイプラインに値を返さない場合は、null 値を返すか、VoidRddCallback 基本クラスを使用できます。
Spark RDD 定義
292.3.5. RDD コールバックの変換 リンクのコピーリンクがクリップボードにコピーされました!
RDD コールバックに送信される入力データのタイプがわかっている場合は、ConvertingRddCallback を使用して、受信メッセージをコールバックに挿入する前に Camel に自動的に変換させることができます。
Spark RDD 定義
292.3.6. アノテーション付き RDD コールバック リンクのコピーリンクがクリップボードにコピーされました!
おそらく、RDD コールバックを操作する最も簡単な方法は、@RddCallback アノテーションでマークされたメソッドをクラスに提供することです。
アノテーション付き RDD コールバック定義
CamelContext をアノテーション付き RDD コールバックファクトリーメソッドに渡す場合、作成されたコールバックは、入力ペイロードを変換して、アノテーション付きメソッドのパラメーターに一致させることができます。
アノテーション付き RDD コールバックのボディー変換