第31章 Apache Spark との統合
31.1. Apache Spark との統合
JBoss Data Grid には Spark コネクターが含まれ、Apache Spark との密な統合を提供します。また、Java または Scala で書かれたアプリケーションが JBoss Data Grid をバッキングデータストアとして使用できるようにします。このコネクターには以下のサポートが含まれます。
- キャッシュからの RDD の作成
- キーバリュー RDD のキャッシュへの書き込み
- キャッシュレベルイベントからの DStream の作成
- キーバリュー DStream のキャッシュへの書き込み
Apache Spark のサポートは、リモートクライアントサーバーモードでのみ有効です。
31.2. Spark の依存関係
JBoss Data Grid は、Scala 2.10 をサポートする Apache Spark 1.6 と、Scala 2.11 をサポートする Apache Spark 2.0 の 2 つのバージョンの Apache Spark を提供します。これらのバージョンは、メインのディストリビューションとは別に同梱されます。
Apache Spark のバージョンに応じて、以下の Maven 設定を使用する必要があります。
Spark 1.6 の pom.xml
<dependency> <groupId>org.infinispan</groupId> <artifactId>infinispan-spark</artifactId> <version>0.3.0.Final-redhat-2</version> </dependency>
Spark 2.0 の pom.xml
<dependency> <groupId>org.infinispan</groupId> <artifactId>infinispan-spark</artifactId> <version>0.4.0.Final-redhat-2</version> </dependency>
31.3. サポートされる Spark 設定パラメーター
以下の表は、両方のバージョンでサポートされる Spark 設定パラメーターを表しています。
パラメーター名 | 説明 | デフォルト値 |
---|---|---|
|
JBoss Data Grid ノードのリスト |
localhost:11222 |
|
RDD をバックするキャッシュの名前 |
デフォルトキャッシュ |
|
キャッシュから読み取りするときのバッチサイズ (エントリー数) |
10000 |
|
キャッシュへ書き込みするときのバッチサイズ (エントリー数) |
500 |
|
JBoss Data Grid サーバーごとに作成されるパーティションの数 |
2 |
|
protobuf ファイル名およびコンテンツを持つマップ |
エントリーに protobuf エンコーディング情報のアノテーションが付けられている場合は省略できます。protobuf エンコーディングは、クエリーによる RDD のフィルターに必要です。 |
|
キャッシュにあるオブジェクトの protostream マーシャラークラスのリスト |
エントリーに protobuf エンコーディング情報のアノテーションが付けられている場合は省略できます。protobuf エンコーディングは、クエリーによる RDD のフィルターに必要です。 |
以下は、Spark 2.0 のみによってサポートされる設定パラメーターのリストになります。
パラメーター名 | 説明 | デフォルト値 |
---|---|---|
|
SSL を有効にします |
false |
|
Infinispan サーバーで相互の SSL 認証が有効なときに必要な JKS キーストアファイル名。ファイルパスまたはクラスパスリソースを指定できます。例: | |
|
サーバー証明書が含まれる JKS キーストアパスまたはクラスパス | |
|
キーストアのパスワード | |
|
トラストストアのパスワード |
31.4. RDD の作成および使用
RDD は、表 27.1 の設定で Properties
インスタンスを指定して作成されます。また、RDD を Spark コンテキストとともに使用して、通常の Spark 操作で使用される InfinispanRDD
が作成されます。Java と Scala 両方の例は以下のとおりです。
RDD の作成 (Java)
import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.infinispan.spark.rdd.InfinispanJavaRDD; import java.util.Properties; [...] // Begin by defining a new Spark configuration and creating a Spark context from this. SparkConf conf = new SparkConf().setAppName("example-RDD"); JavaSparkContext jsc = new JavaSparkContext(conf); // Create the Properties instance, containing the JBoss Data Grid node and cache name. Properties properties = new Properties(); properties.put("infinispan.client.hotrod.server_list", "server:11222"); properties.put("infinispan.rdd.cacheName","exampleCache"); // Create the RDD JavaPairRDD<Integer, Book> exampleRDD = InfinispanJavaRDD.createInfinispanRDD(jsc, properties); JavaRDD<Book> booksRDD = exampleRDD.values();
RDD の作成 (Scala)
import java.util.Properties import org.apache.spark.{SparkConf, SparkContext} import org.infinispan.spark.rdd.InfinispanRDD import org.infinispan.spark._ // Begin by defining a new Spark configuration and creating a Spark context from this. val conf = new SparkConf().setAppName("example-RDD-scala") val sc = new SparkContext(conf) // Create the Properties instance, containing the JBoss Data Grid node and cache name. val properties = new Properties properties.put("infinispan.client.hotrod.server_list", "server:11222") properties.put("infinispan.rdd.cacheName", "exampleCache") // Create an RDD from the DataGrid cache val exampleRDD = new InfinispanRDD[Integer, Book](sc, properties) val booksRDD = exampleRDD.values
RDD が利用可能になると、Spark RDD 操作または Spark の SQL サポートのいずれかを使用してバッキングキャッシュを取得できます。上記の例を拡張して author (著者) ごとにエントリーをカウントすると、結果となる RDD と SQL クエリーは次のようになります。
RDD でのクエリー (Java)
// The following imports should be added to the list from the previous example import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; [...] // Continuing the previous example // Create a SQLContext, registering the data frame and table SQLContext sqlContext = new SQLContext(jsc); DataFrame dataFrame = sqlContext.createDataFrame(booksRDD, Book.class); dataFrame.registerTempTable("books"); // Run the Query and collect results List<Row> rows = sqlContext.sql("SELECT author, count(*) as a from books WHERE author != 'N/A' GROUP BY author ORDER BY a desc").collectAsList();
RDD でのクエリー (Scala)
import org.apache.spark.SparkContext import org.apache.spark.sql.SQLContext [...] // Create a SQLContext, register a data frame and table val sqlContext = new SQLContext(sc) val dataFrame = sqlContext.createDataFrame(booksRDD, classOf[Book]) dataFrame.registerTempTable("books") // Run the Query and collect the results val rows = sqlContext.sql("SELECT author, count(*) as a from books WHERE author != 'N/A' GROUP BY author ORDER BY a desc").collect()
JBoss Data Grid への書き込み
静的 InfinispanJavaRDD.write()
メソッドを使用すると、あらゆるキーバリューベースの RDD を Data Grid キャッシュに書き込みできます。これにより、RDD の内容がキャッシュにコピーされます。
RDD での書き込み (Java)
import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.infinispan.client.hotrod.RemoteCache; import org.infinispan.spark.domain.Address; import org.infinispan.spark.domain.Person; import org.infinispan.spark.rdd.InfinispanJavaRDD; import scala.Tuple2; import java.util.List; import java.util.Properties; [...] // Define the location of the JBoss Data Grid node Properties properties = new Properties(); properties.put("infinispan.client.hotrod.server_list", "localhost:11222"); properties.put("infinispan.rdd.cacheName","exampleCache"); // Create the JavaSparkContext SparkConf conf = new SparkConf().setAppName("write-example-RDD"); JavaSparkContext jsc = new JavaSparkContext(conf); // Defining two entries to be stored in a RDD // Each Book will contain the title, author, and publicationYear Book bookOne = new Book("Linux Bible", "Negus, Chris", "2015"); Book bookTwo = new Book("Java 8 in Action", "Urma, Raoul-Gabriel", "2014"); List<Tuple2<Integer, Book>> pairs = Arrays.asList( new Tuple2<>(1, bookOne), new Tuple2<>(2, bookTwo) ); // Create the RDD using the newly created List JavaPairRDD<Integer, Book> pairsRDD = jsc.parallelizePairs(pairs); // Write the entries into the cache InfinispanJavaRDD.write(pairsRDD, config);
RDD での書き込み (Scala)
import java.util.Properties import org.infinispan.spark._ import org.infinispan.spark.rdd.InfinispanRDD [...] // Define the location of the JBoss Data Grid node val properties = new Properties properties.put("infinispan.client.hotrod.server_list", "localhost:11222") properties.put("infinispan.rdd.cacheName", "exampleCache") // Create the SparkContext val conf = new SparkConf().setAppName("write-example-RDD-scala") val sc = new SparkContext(conf) // Create an RDD of Books val bookOne = new Book("Linux Bible", "Negus, Chris", "2015") val bookTwo = new Book("Java 8 in Action", "Urma, Raoul-Gabriel", "2014") val sampleBookRDD = sc.parallelize(Seq(bookOne,bookTwo)) val pairsRDD = sampleBookRDD.zipWithIndex().map(_.swap) // Write the Key/Value RDD to the Data Grid pairsRDD.writeToInfinispan(properties)
31.5. DStreams の作成および使用
DStream は継続するデータのストリームを表し、継続的な一連の RDD によって内部で表されます。各 RDD には特定の時間間隔からのデータが含まれます。
DStream の作成には、以下の例のように StreamingContext
が StorageLevel
および JBoss Data Grid RDD 設定とともに渡されます。
DStream の作成 (Scala)
import org.infinispan.spark.stream._ import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.storage.StorageLevel import java.util.Properties // Spark context val sc = ... // java.util.Properties with Infinispan RDD configuration val props = ... val ssc = new StreamingContext(sc, Seconds(1)) val stream = new InfinispanInputDStream[String, Book](ssc, StorageLevel.MEMORY_ONLY, props)
InfinispanInputDStream
は多くの Spark の DStream 操作を使用して変換でき、StreamingContext
で「start」を呼び出した後に処理が発生します。たとえば、最後の 30 秒でキャッシュに挿入された本の数を 10 秒ごとに表示する場合は以下のようになります。
DStream の処理 (Scala)
import org.infinispan.spark.stream._ val stream = ... // From previous sample // Filter only created entries val createdBooksRDD = stream.filter { case (_, _, t) => t == Type.CLIENT_CACHE_ENTRY_CREATED } // Reduce last 30 seconds of data, every 10 seconds val windowedRDD: DStream[Long] = createdBooksRDD.count().reduceByWindow(_ + _, Seconds(30), Seconds(10)) // Prints the results, couting the number of occurences in each individual RDD windowedRDD.foreachRDD { rdd => println(rdd.reduce(_ + _)) } // Start the processing ssc.start() ssc.awaitTermination()
DStreams での JBoss Data Grid への書き込み
キーバリュー型のすべての DStream は、Java の場合は InfinispanJavaDStream.writeToInfinispan()
Java メソッド経由で JBoss Data Grid へ書き込みでき、Scala の場合は暗黙的な writeToInfinispan(properties)
メソッドを直接 DStream インスタンスで使用して書き込みできます。両方のメソッドは JBoss Data Grid RDD 設定を入力として取り、DStream 内に含まれる各 RDD を書き込みます。
31.6. Spark での Infinispan Query DSL の使用
Infinispan Query DSL は InfinispanRDD のフィルターとして使用でき、RDD レベルではなくソースでデータを事前にフィルターできます。
クエリーする DSL が適切に動作するには、キャッシュのデータを protobuf でエンコードする必要があります。protobuf エンコーティングの使用手順は Protobuf エンコーティングを参照してください。
著者の名前に Doe
含まれる本のリストを取得する以下の例を見てください。
クエリーによるフィルリング (Scala)
import org.infinispan.client.hotrod.impl.query.RemoteQuery import org.infinispan.client.hotrod.{RemoteCacheManager, Search} import org.infinispan.spark.domain._ [...] val query = Search.getQueryFactory(remoteCacheManager.getCache(getCacheName)) .from(classOf[Book]) .having("author").like("Doe") .build() val rdd = createInfinispanRDD[Int, Book] .filterByQuery[Book]](query, classOf[Book])
射影も完全サポートされます。たとえば、上記の例を調整して著書名と出版年のみを取得し、出版年のフィールドでソートすることができます。
射影を使用したフィルタリング (Scala)
import org.infinispan.client.hotrod.impl.query.RemoteQuery import org.infinispan.client.hotrod.{RemoteCacheManager, Search} import org.infinispan.spark.domain._ [...] val query = Search.getQueryFactory(remoteCacheManager.getCache(getCacheName)) .select("title","publicationYear") .from(classOf[Book]) .having("author").like("Doe") .groupBy("publicationYear") .build() val rdd = createInfinispanRDD[Int, Book] .filterByQuery[Array[AnyRef]](query, classOf[Book])
さらに、フィルターがすでに JBoss Data Grid サーバーにデプロイされている場合は、以下のように名前で参照することもできます。
デプロイされたフィルターを使用したフィルタリング (Scala)
val rdd = InfinispanRDD[String,Book] = .... // "my-filter-factory" filter and converts Book to a String, and has two parameters val filteredRDD = rdd.filterByCustom[String]("my-filter-factory", "param1", "param2")
31.7. Spark のパフォーマンスに関する注意点
Data Grid の Spark コネクターは、デフォルトでは Data Grid ノードごとに 2 つのパーティションを作成し、各パーティションはそのノードのデータのサブセットを指定します。
これらのパーティションは Spark ワーカーへ送信され、並列処理されます。Spark ワーカーの数が Data Grid ノードよりも少ない場合、各ワーカーにはタスクを並列実行できる最大限度があるため、遅延が発生することがあります。そのため、Spark ワーカーの数を最低でも Data Grid ノードの数と同じにしてこの並行処理を活用することが推奨されます。
また、Spark ワーカーが Data Grid ノードと同じノードに位置する場合、各ワーカーがローカルの Data Grid ノードにあるデータのみを処理するようコネクターによってタスクが分散されます。