Fuse 6 is no longer supported
As of February 2025, Red Hat Fuse 6 is no longer supported. If you are using Fuse 6, please upgrade to Red Hat build of Apache Camel.第156章 Apache Spark
Apache Spark コンポーネント リンクのコピーリンクがクリップボードにコピーされました!
リンクのコピーリンクがクリップボードにコピーされました!
このドキュメントページでは、Apache Camel の Apache Spark コンポーネントについて説明します。Camel との Spark インテグレーションの主な目的は、Camel コネクターと Spark タスクの間にブリッジを提供することです。特に Camel コネクターは、さまざまなトランスポートからメッセージをルーティングする方法を提供し、実行するタスクを動的に選択し、受信メッセージをそのタスクの入力データとして使用し、最後に実行の結果を Camel パイプラインに戻します。
注記
Apache Spark は Apache Karaf ではサポートされていません。
サポート対象のアーキテクチャースタイル リンクのコピーリンクがクリップボードにコピーされました!
リンクのコピーリンクがクリップボードにコピーされました!
Spark コンポーネントは、アプリケーションサーバーにデプロイされるドライバーアプリケーションとして使用できます(または fat jar として実行)。
Spark コンポーネントは、Spark クラスターに直接ジョブとして送信することもできます。
Spark コンポーネントは、Spark クラスターと他のエンドポイント間のブリッジとして機能する 長時間実行されるジョブ として機能するように設計されていますが、これは 1 回 限りの短いジョブとして使用することもできます。
OSGi サーバーでの Spark の実行 リンクのコピーリンクがクリップボードにコピーされました!
リンクのコピーリンクがクリップボードにコピーされました!
現在、Spark コンポーネントは OSGi コンテナーの実行をサポートしていません。Spark は、fat jar として実行されるように設計されています。通常はジョブとしてクラスターに送信されます。OSGi サーバーで Spark を実行する理由は、少なくとも困難であり、Camel ではサポートされません。
URI 形式 リンクのコピーリンクがクリップボードにコピーされました!
リンクのコピーリンクがクリップボードにコピーされました!
現在、Spark コンポーネントはプロデューサーのみをサポートします。これは、Spark ジョブを呼び出して結果を返すことを目的としています。RDD、データフレーム、または Hive SQL ジョブを呼び出すことができます。
spark:{rdd|dataframe|hive}
spark:{rdd|dataframe|hive}
RDD ジョブ リンクのコピーリンクがクリップボードにコピーされました!
リンクのコピーリンクがクリップボードにコピーされました!
RDD ジョブを呼び出すには、以下の URI を使用します。
spark:rdd?rdd=#testFileRdd&rddCallback=#transformation
spark:rdd?rdd=#testFileRdd&rddCallback=#transformation
rdd
オプションは Camel レジストリーから RDD インスタンスの名前( org.apache.spark.api.java.JavaRDDLike
のサブクラス)を指し、rddCallback
は org.apache.camel.component.spark.RddCallback
インターフェイス(別名レジストリー)の実装を指します。RDD コールバックは、指定の RDD に対して受信メッセージを適用するために使用される単一のメソッドを提供します。コールバック計算の結果は、エクスチェンジへのボディーとして保存されます。
public interface RddCallback<T> { T onRdd(JavaRDDLike rdd, Object... payloads); }
public interface RddCallback<T> {
T onRdd(JavaRDDLike rdd, Object... payloads);
}
以下のスニペットは、メッセージを入力としてジョブに送信し、結果を返す方法を示しています。
String pattern = "job input"; long linesCount = producerTemplate.requestBody("spark:rdd?rdd=#myRdd&rddCallback=#countLinesContaining", pattern, long.class);
String pattern = "job input";
long linesCount = producerTemplate.requestBody("spark:rdd?rdd=#myRdd&rddCallback=#countLinesContaining", pattern, long.class);
上記の Spring Bean として登録されたスニペットの RDD コールバックは、以下のようになります。
Spring の RDD 定義は以下のようになります。
@Bean JavaRDDLike myRdd(JavaSparkContext sparkContext) { return sparkContext.textFile("testrdd.txt"); }
@Bean
JavaRDDLike myRdd(JavaSparkContext sparkContext) {
return sparkContext.textFile("testrdd.txt");
}
RDD ジョブオプション リンクのコピーリンクがクリップボードにコピーされました!
リンクのコピーリンクがクリップボードにコピーされました!
オプション | 説明 | デフォルト値 |
---|---|---|
rdd
|
RDD インスタンス(サブクラス: org.apache.spark.api.java.JavaRDDLike )
|
null
|
rddCallback
|
org.apache.camel.component.spark.RddCallback インターフェイスのインスタンス。
|
null
|
void RDD コールバック リンクのコピーリンクがクリップボードにコピーされました!
リンクのコピーリンクがクリップボードにコピーされました!
RDD コールバックが Camel パイプラインに値を返さない場合は、
null
値を返すか、VoidRddCallback
ベースクラスを使用できます。
RDD コールバックの変換 リンクのコピーリンクがクリップボードにコピーされました!
リンクのコピーリンクがクリップボードにコピーされました!
RDD コールバックに送信される入力データのタイプが分かっている場合は、
ConvertingRddCallback
を使用し、Camel がそれらをコールバックに挿入する前に受信メッセージを自動的に変換させることができます。
アノテーション付きの RDD コールバック リンクのコピーリンクがクリップボードにコピーされました!
リンクのコピーリンクがクリップボードにコピーされました!
RDD コールバックを使用する最も簡単な方法は、
@RddCallback
アノテーションが付けられたメソッドをクラスに提供することです。
CamelContext をアノテーション付きの RDD コールバックファクトリーメソッドに渡すと、作成されたコールバックは受信ペイロードをアノテーション付きメソッドのパラメーターに一致するように変換できます。
DataFrame ジョブ リンクのコピーリンクがクリップボードにコピーされました!
リンクのコピーリンクがクリップボードにコピーされました!
RDDs Spark コンポーネントを操作する代わりに、DataFrames でも機能します。
DataFrame ジョブを呼び出すには、以下の URI を使用します。
spark:dataframe?dataFrame=#testDataFrame&dataFrameCallback=#transformation
spark:dataframe?dataFrame=#testDataFrame&dataFrameCallback=#transformation
dataFrame
オプションは Camel レジストリーからのデータフレームインスタンスの名前( org.apache.spark.sql.DataFrame
のインスタンス)を指し、dataFrameCallback
は org.apache.camel.component.spark.DataFrameCallback
インターフェイスの実装(またはレジストリーから)を指します。DataFrame コールバックは、指定の DataFrame に対して受信メッセージを適用するために使用される単一のメソッドを提供します。コールバック計算の結果は、エクスチェンジへのボディーとして保存されます。
public interface DataFrameCallback<T> { T onDataFrame(DataFrame dataFrame, Object... payloads); }
public interface DataFrameCallback<T> {
T onDataFrame(DataFrame dataFrame, Object... payloads);
}
以下のスニペットは、メッセージを入力としてジョブに送信し、結果を返す方法を示しています。
String model = "Micra"; long linesCount = producerTemplate.requestBody("spark:dataFrame?dataFrame=#cars&dataFrameCallback=#findCarWithModel", model, long.class);
String model = "Micra";
long linesCount = producerTemplate.requestBody("spark:dataFrame?dataFrame=#cars&dataFrameCallback=#findCarWithModel", model, long.class);
上記のスニペットの DataFrame コールバック(Spring Bean として登録されたもの)は、以下のようになります。
Spring の DataFrame 定義は以下のようになります。
DataFrame ジョブオプション リンクのコピーリンクがクリップボードにコピーされました!
リンクのコピーリンクがクリップボードにコピーされました!
オプション | 説明 | デフォルト値 |
---|---|---|
dataFrame
|
DataFrame インスタンス(サブクラス: org.apache.spark.sql.DataFrame )
|
null
|
dataFrameCallback
|
org.apache.camel.component.spark.DataFrameCallback インターフェイスのインスタンス。
|
null
|
Hive ジョブ リンクのコピーリンクがクリップボードにコピーされました!
リンクのコピーリンクがクリップボードにコピーされました!
RDD または DataFrame Spark コンポーネントを使用する代わりに、Hive SQL クエリーをペイロードとして受信することもできます。Hive クエリーを Spark コンポーネントに送信するには、以下の URI を使用します。
spark:hive
spark:hive
以下のスニペットは、メッセージを入力としてジョブに送信し、結果を返す方法を示しています。
long carsCount = template.requestBody("spark:hive?collect=false", "SELECT * FROM cars", Long.class); List<Row> cars = template.requestBody("spark:hive", "SELECT * FROM cars", List.class);
long carsCount = template.requestBody("spark:hive?collect=false", "SELECT * FROM cars", Long.class);
List<Row> cars = template.requestBody("spark:hive", "SELECT * FROM cars", List.class);
クエリーを実行するテーブルは、クエリーをクエリーする前に HiveContext に登録する必要があります。たとえば、Spring ではこのような登録は以下のようになります。
Hive ジョブオプション リンクのコピーリンクがクリップボードにコピーされました!
リンクのコピーリンクがクリップボードにコピーされました!
オプション | 説明 | デフォルト値 |
---|---|---|
collect
|
結果を( org.apache.spark.sql.Row インスタンスのリストとして)収集するか、または count() をそれらに対して呼び出す必要があるかどうかを示します。
|
true
|