第156章 Apache Spark


Apache Spark コンポーネント

このドキュメントページでは、Apache Camel の Apache Spark コンポーネントについて説明します。Camel との Spark インテグレーションの主な目的は、Camel コネクターと Spark タスクの間にブリッジを提供することです。特に Camel コネクターは、さまざまなトランスポートからメッセージをルーティングする方法を提供し、実行するタスクを動的に選択し、受信メッセージをそのタスクの入力データとして使用し、最後に実行の結果を Camel パイプラインに戻します。
注記
Apache Spark は Apache Karaf ではサポートされていません。

サポート対象のアーキテクチャースタイル

Spark コンポーネントは、アプリケーションサーバーにデプロイされるドライバーアプリケーションとして使用できます(または fat jar として実行)。
Spark コンポーネントは、Spark クラスターに直接ジョブとして送信することもできます。
Spark コンポーネントは、Spark クラスターと他のエンドポイント間のブリッジとして機能する 長時間実行されるジョブ として機能するように設計されていますが、これは 1 回 限りの短いジョブとして使用することもできます。

OSGi サーバーでの Spark の実行

現在、Spark コンポーネントは OSGi コンテナーの実行をサポートしていません。Spark は、fat jar として実行されるように設計されています。通常はジョブとしてクラスターに送信されます。OSGi サーバーで Spark を実行する理由は、少なくとも困難であり、Camel ではサポートされません。

URI 形式

現在、Spark コンポーネントはプロデューサーのみをサポートします。これは、Spark ジョブを呼び出して結果を返すことを目的としています。RDD、データフレーム、または Hive SQL ジョブを呼び出すことができます。
spark:{rdd|dataframe|hive}
Copy to Clipboard Toggle word wrap

RDD ジョブ

RDD ジョブを呼び出すには、以下の URI を使用します。
spark:rdd?rdd=#testFileRdd&rddCallback=#transformation
Copy to Clipboard Toggle word wrap
rdd オプションは Camel レジストリーから RDD インスタンスの名前( org.apache.spark.api.java.JavaRDDLikeのサブクラス)を指し、rddCallbackorg.apache.camel.component.spark.RddCallback インターフェイス(別名レジストリー)の実装を指します。RDD コールバックは、指定の RDD に対して受信メッセージを適用するために使用される単一のメソッドを提供します。コールバック計算の結果は、エクスチェンジへのボディーとして保存されます。
public interface RddCallback<T> {
    T onRdd(JavaRDDLike rdd, Object... payloads);
}
Copy to Clipboard Toggle word wrap
以下のスニペットは、メッセージを入力としてジョブに送信し、結果を返す方法を示しています。
String pattern = "job input";
long linesCount = producerTemplate.requestBody("spark:rdd?rdd=#myRdd&rddCallback=#countLinesContaining", pattern, long.class);
Copy to Clipboard Toggle word wrap
上記の Spring Bean として登録されたスニペットの RDD コールバックは、以下のようになります。
@Bean
RddCallback<Long> countLinesContaining() {
    return new RddCallback<Long>() {
        Long onRdd(JavaRDDLike rdd, Object... payloads) {
            String pattern = (String) payloads[0];
            return rdd.filter({line -> line.contains(pattern)}).count();
        }
    }
}
Copy to Clipboard Toggle word wrap
Spring の RDD 定義は以下のようになります。
@Bean
JavaRDDLike myRdd(JavaSparkContext sparkContext) {
  return sparkContext.textFile("testrdd.txt");
}
Copy to Clipboard Toggle word wrap

RDD ジョブオプション

Expand
オプション説明デフォルト値
rdd RDD インスタンス(サブクラス: org.apache.spark.api.java.JavaRDDLike null
rddCallback org.apache.camel.component.spark.RddCallback インターフェイスのインスタンス。 null

void RDD コールバック

RDD コールバックが Camel パイプラインに値を返さない場合は、null 値を返すか、VoidRddCallback ベースクラスを使用できます。
@Bean
RddCallback<Void> rddCallback() {
  return new VoidRddCallback() {
        @Override
        public void doOnRdd(JavaRDDLike rdd, Object... payloads) {
            rdd.saveAsTextFile(output.getAbsolutePath());
        }
    };
}
Copy to Clipboard Toggle word wrap

RDD コールバックの変換

RDD コールバックに送信される入力データのタイプが分かっている場合は、ConvertingRddCallback を使用し、Camel がそれらをコールバックに挿入する前に受信メッセージを自動的に変換させることができます。
@Bean
RddCallback<Long> rddCallback(CamelContext context) {
  return new ConvertingRddCallback<Long>(context, int.class, int.class) {
            @Override
            public Long doOnRdd(JavaRDDLike rdd, Object... payloads) {
                return rdd.count() * (int) payloads[0] * (int) payloads[1];
            }
        };
    };
}
Copy to Clipboard Toggle word wrap

アノテーション付きの RDD コールバック

RDD コールバックを使用する最も簡単な方法は、@RddCallback アノテーションが付けられたメソッドをクラスに提供することです。
import static org.apache.camel.component.spark.annotations.AnnotatedRddCallback.annotatedRddCallback;
 
@Bean
RddCallback<Long> rddCallback() {
    return annotatedRddCallback(new MyTransformation());
}
 
...
 
import org.apache.camel.component.spark.annotation.RddCallback;
 
public class MyTransformation {
 
    @RddCallback
    long countLines(JavaRDD<String> textFile, int first, int second) {
        return textFile.count() * first * second;
    }
 
}
Copy to Clipboard Toggle word wrap
CamelContext をアノテーション付きの RDD コールバックファクトリーメソッドに渡すと、作成されたコールバックは受信ペイロードをアノテーション付きメソッドのパラメーターに一致するように変換できます。
import static org.apache.camel.component.spark.annotations.AnnotatedRddCallback.annotatedRddCallback;
 
@Bean
RddCallback<Long> rddCallback(CamelContext camelContext) {
    return annotatedRddCallback(new MyTransformation(), camelContext);
}
 
...

 
import org.apache.camel.component.spark.annotation.RddCallback;
 
public class MyTransformation {
 
    @RddCallback
    long countLines(JavaRDD<String> textFile, int first, int second) {
        return textFile.count() * first * second;
    }
 
}
 
...
 
// Convert String "10" to integer
long result = producerTemplate.requestBody("spark:rdd?rdd=#rdd&rddCallback=#rddCallback" Arrays.asList(10, "10"), long.class);
Copy to Clipboard Toggle word wrap

DataFrame ジョブ

RDDs Spark コンポーネントを操作する代わりに、DataFrames でも機能します。
DataFrame ジョブを呼び出すには、以下の URI を使用します。
spark:dataframe?dataFrame=#testDataFrame&dataFrameCallback=#transformation
Copy to Clipboard Toggle word wrap
dataFrame オプションは Camel レジストリーからのデータフレームインスタンスの名前( org.apache.spark.sql.DataFrameのインスタンス)を指し、dataFrameCallbackorg.apache.camel.component.spark.DataFrameCallback インターフェイスの実装(またはレジストリーから)を指します。DataFrame コールバックは、指定の DataFrame に対して受信メッセージを適用するために使用される単一のメソッドを提供します。コールバック計算の結果は、エクスチェンジへのボディーとして保存されます。
public interface DataFrameCallback<T> {
    T onDataFrame(DataFrame dataFrame, Object... payloads);
}
Copy to Clipboard Toggle word wrap
以下のスニペットは、メッセージを入力としてジョブに送信し、結果を返す方法を示しています。
String model = "Micra";
long linesCount = producerTemplate.requestBody("spark:dataFrame?dataFrame=#cars&dataFrameCallback=#findCarWithModel", model, long.class);
Copy to Clipboard Toggle word wrap
上記のスニペットの DataFrame コールバック(Spring Bean として登録されたもの)は、以下のようになります。
@Bean
RddCallback<Long> findCarWithModel() {
    return new DataFrameCallback<Long>() {
    	@Override
    	public Long onDataFrame(DataFrame dataFrame, Object... payloads) {
        	String model = (String) payloads[0];
        	return dataFrame.where(dataFrame.col("model").eqNullSafe(model)).count();
    	}
	};
}
Copy to Clipboard Toggle word wrap
Spring の DataFrame 定義は以下のようになります。
@Bean
DataFrame cars(HiveContext hiveContext) {
  	DataFrame jsonCars = hiveContext.read().json("/var/data/cars.json");
 	jsonCars.registerTempTable("cars");
	return jsonCars;
}
Copy to Clipboard Toggle word wrap

DataFrame ジョブオプション

Expand
オプション説明デフォルト値
dataFrame DataFrame インスタンス(サブクラス: org.apache.spark.sql.DataFrame null
dataFrameCallback org.apache.camel.component.spark.DataFrameCallback インターフェイスのインスタンス。 null

Hive ジョブ

RDD または DataFrame Spark コンポーネントを使用する代わりに、Hive SQL クエリーをペイロードとして受信することもできます。Hive クエリーを Spark コンポーネントに送信するには、以下の URI を使用します。
spark:hive
Copy to Clipboard Toggle word wrap
以下のスニペットは、メッセージを入力としてジョブに送信し、結果を返す方法を示しています。
long carsCount = template.requestBody("spark:hive?collect=false", "SELECT * FROM cars", Long.class);
List<Row> cars = template.requestBody("spark:hive", "SELECT * FROM cars", List.class);
Copy to Clipboard Toggle word wrap
クエリーを実行するテーブルは、クエリーをクエリーする前に HiveContext に登録する必要があります。たとえば、Spring ではこのような登録は以下のようになります。
@Bean
DataFrame cars(HiveContext hiveContext) {
  	DataFrame jsonCars = hiveContext.read().json("/var/data/cars.json");
 	jsonCars.registerTempTable("cars");
	return jsonCars;
}
Copy to Clipboard Toggle word wrap

Hive ジョブオプション

Expand
オプション説明デフォルト値
collect 結果を( org.apache.spark.sql.Row インスタンスのリストとして)収集するか、または count() をそれらに対して呼び出す必要があるかどうかを示します。 true
トップに戻る
Red Hat logoGithubredditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。 最新の更新を見る.

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

Theme

© 2025 Red Hat