Chapter 31. Integration with Apache Spark
31.1. Integration with Apache Spark
JBoss Data Grid includes a Spark connector that provides tight integration with Apache Spark and allows applications written in either Java or Scala to use JBoss Data Grid as a backing data store. This connector includes support for the following:
- Create an RDD from any cache
- Write a key/value RDD to a cache
- Create a DStream from cache-level events
- Write a key/value DStream to a cache
Support for Apache Spark is only available in Remote Client-Server Mode.
31.2. Spark Dependencies
JBoss Data Grid provides two versions of the Spark connector, one for Apache Spark 1.6 and one for Apache Spark 2.0, which support Scala 2.10 and 2.11 respectively. Both versions are shipped separately from the main distribution.
The following Maven configuration should be used depending on the desired version of Apache Spark:
pom.xml for Spark 1.6
    <dependency>
        <groupId>org.infinispan</groupId>
        <artifactId>infinispan-spark</artifactId>
        <version>0.3.0.Final-redhat-2</version>
    </dependency>
pom.xml for Spark 2.0
    <dependency>
        <groupId>org.infinispan</groupId>
        <artifactId>infinispan-spark</artifactId>
        <version>0.4.0.Final-redhat-2</version>
    </dependency>
31.3. Supported Spark Configuration Parameters
The following table shows the Spark configuration parameters that are supported by both versions:
| Parameter Name | Description | Default Value |
|---|---|---|
| infinispan.client.hotrod.server_list | List of JBoss Data Grid nodes | localhost:11222 |
| infinispan.rdd.cacheName | The name of the cache that will back the RDD | default cache |
|  | Batch size (number of entries) when reading from the cache | 10000 |
|  | Batch size (number of entries) when writing to the cache | 500 |
|  | Number of partitions created per JBoss Data Grid server | 2 |
|  | Map with protobuf file names and contents | Can be omitted if entities are annotated with protobuf encoding information. Protobuf encoding is required to filter the RDD by Query. |
|  | List of protostream marshaller classes for the objects in the cache | Can be omitted if entities are annotated with protobuf encoding information. Protobuf encoding is required to filter the RDD by Query. |
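As a minimal sketch of how these parameters are supplied, the helper below collects the two settings whose keys appear in this chapter's examples (infinispan.client.hotrod.server_list and infinispan.rdd.cacheName) into a java.util.Properties instance. The class and method names are hypothetical and are not part of the connector. The sketch also assumes that leaving the cache name unset selects the default cache, as the table suggests.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of the connector API.
final class ConnectorProperties {
    // Property keys as used in the examples in this chapter.
    static final String SERVER_LIST = "infinispan.client.hotrod.server_list";
    static final String CACHE_NAME = "infinispan.rdd.cacheName";

    private ConnectorProperties() {
    }

    // Builds the Properties instance that is handed to the connector.
    // A null or blank serverList falls back to the documented default, localhost:11222.
    // A null or blank cacheName leaves the key unset (assumed to select the default cache).
    static Properties build(String serverList, String cacheName) {
        Properties properties = new Properties();
        boolean hasServers = serverList != null && !serverList.trim().isEmpty();
        properties.put(SERVER_LIST, hasServers ? serverList.trim() : "localhost:11222");
        if (cacheName != null && !cacheName.trim().isEmpty()) {
            properties.put(CACHE_NAME, cacheName.trim());
        }
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = build("server:11222", "exampleCache");
        System.out.println(properties.getProperty(SERVER_LIST));
        System.out.println(properties.getProperty(CACHE_NAME));
    }
}
```

Centralizing the keys as constants is only a convenience here; passing the literal strings to Properties.put() works equally well.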
The following is a list of configuration parameters supported exclusively by Spark 2.0:
| Parameter Name | Description | Default Value |
|---|---|---|
|  | Enable SSL | false |
|  | The JKS keystore file name, required when mutual SSL authentication is enabled in the Infinispan server. Can be either the file path or a class path resource. |  |
|  | The JKS keystore path or classpath containing server certificates |  |
|  | Password for the key store |  |
|  | Password for the trust store |  |
31.4. Creating and Using RDDs
RDDs are created by specifying a Properties instance with the configuration parameters described in Section 31.3, Supported Spark Configuration Parameters, and then using it together with the Spark context to create an InfinispanRDD that is used with the normal Spark operations. Examples in both Java and Scala follow:
Creating an RDD (Java)

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.infinispan.spark.rdd.InfinispanJavaRDD;
    import java.util.Properties;

    [...]

    // Begin by defining a new Spark configuration and creating a Spark context from this.
    SparkConf conf = new SparkConf().setAppName("example-RDD");
    JavaSparkContext jsc = new JavaSparkContext(conf);

    // Create the Properties instance, containing the JBoss Data Grid node and cache name.
    Properties properties = new Properties();
    properties.put("infinispan.client.hotrod.server_list", "server:11222");
    properties.put("infinispan.rdd.cacheName", "exampleCache");

    // Create the RDD
    JavaPairRDD<Integer, Book> exampleRDD = InfinispanJavaRDD.createInfinispanRDD(jsc, properties);
    JavaRDD<Book> booksRDD = exampleRDD.values();
Creating an RDD (Scala)

    import java.util.Properties
    import org.apache.spark.{SparkConf, SparkContext}
    import org.infinispan.spark.rdd.InfinispanRDD
    import org.infinispan.spark._

    // Begin by defining a new Spark configuration and creating a Spark context from this.
    val conf = new SparkConf().setAppName("example-RDD-scala")
    val sc = new SparkContext(conf)

    // Create the Properties instance, containing the JBoss Data Grid node and cache name.
    val properties = new Properties
    properties.put("infinispan.client.hotrod.server_list", "server:11222")
    properties.put("infinispan.rdd.cacheName", "exampleCache")

    // Create an RDD from the Data Grid cache
    val exampleRDD = new InfinispanRDD[Integer, Book](sc, properties)
    val booksRDD = exampleRDD.values
Once the RDD is available, entries in the backing cache may be obtained by using either the Spark RDD operations or Spark's SQL support. The example above is expanded below to count the entries per author in the resulting RDD with an SQL query:
Querying with an RDD (Java)

    // The following imports should be added to the list from the previous example
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SQLContext;
    import java.util.List;

    [...]

    // Continuing the previous example
    // Create a SQLContext, registering the data frame and table
    // Note: DataFrame is the Spark 1.6 Java type; with Spark 2.0, use Dataset<Row> instead
    SQLContext sqlContext = new SQLContext(jsc);
    DataFrame dataFrame = sqlContext.createDataFrame(booksRDD, Book.class);
    dataFrame.registerTempTable("books");

    // Run the Query and collect results
    List<Row> rows = sqlContext.sql("SELECT author, count(*) as a from books WHERE author != 'N/A' GROUP BY author ORDER BY a desc").collectAsList();
Querying with an RDD (Scala)

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SQLContext

    [...]

    // Create a SQLContext, register a data frame and table
    val sqlContext = new SQLContext(sc)
    val dataFrame = sqlContext.createDataFrame(booksRDD, classOf[Book])
    dataFrame.registerTempTable("books")

    // Run the Query and collect the results
    val rows = sqlContext.sql("SELECT author, count(*) as a from books WHERE author != 'N/A' GROUP BY author ORDER BY a desc").collect()
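To make the expected result of the query concrete, the following plain-Java sketch computes the same per-author counts over an in-memory list, without Spark or JBoss Data Grid. The Book class shown here is a hypothetical stand-in with the three fields mentioned in this chapter (title, author, publicationYear), and the author names in the sample data are placeholders.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java illustration of what the SQL query computes; it does not use Spark.
final class AuthorCountSketch {

    // Hypothetical stand-in for the Book entity used in this chapter.
    static final class Book {
        final String title;
        final String author;
        final String publicationYear;

        Book(String title, String author, String publicationYear) {
            this.title = title;
            this.author = author;
            this.publicationYear = publicationYear;
        }
    }

    // Mirrors: SELECT author, count(*) AS a FROM books
    //          WHERE author != 'N/A' GROUP BY author ORDER BY a DESC
    static List<Map.Entry<String, Long>> countByAuthor(List<Book> books) {
        Map<String, Long> counts = books.stream()
                // As in SQL, rows with a null author do not satisfy author != 'N/A'.
                .filter(book -> book.author != null && !"N/A".equals(book.author))
                .collect(Collectors.groupingBy(book -> book.author, Collectors.counting()));
        List<Map.Entry<String, Long>> rows = new ArrayList<>(counts.entrySet());
        // Highest count first; the order of authors with equal counts is unspecified.
        rows.sort(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder()));
        return rows;
    }

    public static void main(String[] args) {
        List<Book> books = Arrays.asList(
                new Book("Title 1", "Author A", "2015"),
                new Book("Title 2", "Author A", "2014"),
                new Book("Title 3", "Author B", "2014"),
                new Book("Title 4", "N/A", "2013"));
        for (Map.Entry<String, Long> row : countByAuthor(books)) {
            System.out.println(row.getKey() + " -> " + row.getValue());
        }
    }
}
```

Comparing the rows returned by Spark against a small in-memory calculation like this can be a quick sanity check while developing a query.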
Writing to JBoss Data Grid
Any key/value based RDD can be written to the Data Grid cache by using the static InfinispanJavaRDD.write() method. This will copy the contents of the RDD to the cache:
Writing with an RDD (Java)

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.infinispan.spark.rdd.InfinispanJavaRDD;
    import scala.Tuple2;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;

    [...]

    // Define the location of the JBoss Data Grid node
    Properties properties = new Properties();
    properties.put("infinispan.client.hotrod.server_list", "localhost:11222");
    properties.put("infinispan.rdd.cacheName", "exampleCache");

    // Create the JavaSparkContext
    SparkConf conf = new SparkConf().setAppName("write-example-RDD");
    JavaSparkContext jsc = new JavaSparkContext(conf);

    // Define two entries to be stored in an RDD
    // Each Book will contain the title, author, and publicationYear
    Book bookOne = new Book("Linux Bible", "Negus, Chris", "2015");
    Book bookTwo = new Book("Java 8 in Action", "Urma, Raoul-Gabriel", "2014");

    List<Tuple2<Integer, Book>> pairs = Arrays.asList(
        new Tuple2<>(1, bookOne),
        new Tuple2<>(2, bookTwo)
    );

    // Create the RDD using the newly created List
    JavaPairRDD<Integer, Book> pairsRDD = jsc.parallelizePairs(pairs);

    // Write the entries into the cache, using the Properties defined above
    InfinispanJavaRDD.write(pairsRDD, properties);
Writing with an RDD (Scala)

    import java.util.Properties
    import org.apache.spark.{SparkConf, SparkContext}
    import org.infinispan.spark._
    import org.infinispan.spark.rdd.InfinispanRDD

    [...]

    // Define the location of the JBoss Data Grid node
    val properties = new Properties
    properties.put("infinispan.client.hotrod.server_list", "localhost:11222")
    properties.put("infinispan.rdd.cacheName", "exampleCache")

    // Create the SparkContext
    val conf = new SparkConf().setAppName("write-example-RDD-scala")
    val sc = new SparkContext(conf)

    // Create an RDD of Books
    val bookOne = new Book("Linux Bible", "Negus, Chris", "2015")
    val bookTwo = new Book("Java 8 in Action", "Urma, Raoul-Gabriel", "2014")
    val sampleBookRDD = sc.parallelize(Seq(bookOne, bookTwo))

    // Pair each Book with its index, then swap so that the index becomes the key
    val pairsRDD = sampleBookRDD.zipWithIndex().map(_.swap)

    // Write the Key/Value RDD to the Data Grid
    pairsRDD.writeToInfinispan(properties)
31.5. Creating and Using DStreams
DStreams represent a continuous stream of data, and are internally represented by a continuous series of RDDs, with each RDD containing data from a specific time interval.
To create a DStream, pass a StreamingContext together with a StorageLevel and the JBoss Data Grid RDD configuration, as seen in the following example:
Creating a DStream (Scala)
    import org.infinispan.spark.stream._
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.storage.StorageLevel
    import java.util.Properties

    // Spark context
    val sc = ...

    // java.util.Properties with Infinispan RDD configuration
    val props = ...

    val ssc = new StreamingContext(sc, Seconds(1))
    val stream = new InfinispanInputDStream[String, Book](ssc, StorageLevel.MEMORY_ONLY, props)
The InfinispanInputDStream can be transformed using Spark's many DStream operations; processing begins once start() is called on the StreamingContext. For example, to display every 10 seconds the number of books inserted in the cache in the last 30 seconds:
Processing a DStream (Scala)
    import org.apache.spark.streaming.Seconds
    import org.apache.spark.streaming.dstream.DStream
    import org.infinispan.client.hotrod.event.ClientEvent.Type
    import org.infinispan.spark.stream._

    val stream = ... // From previous sample

    // Filter only created entries
    val createdBooksRDD = stream.filter { case (_, _, t) => t == Type.CLIENT_CACHE_ENTRY_CREATED }

    // Reduce last 30 seconds of data, every 10 seconds
    val windowedRDD: DStream[Long] = createdBooksRDD.count().reduceByWindow(_ + _, Seconds(30), Seconds(10))

    // Print the results, counting the number of occurrences in each individual RDD
    windowedRDD.foreachRDD { rdd => println(rdd.reduce(_ + _)) }

    // Start the processing
    ssc.start()
    ssc.awaitTermination()
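The windowing arithmetic in this example can be illustrated without Spark. The sketch below is a plain-Java simulation, not connector or Spark code: it takes a list of per-batch counts (one value per batch interval) and returns the sum that a 30-second window sliding every 10 seconds would report. The class and method names are hypothetical. The sketch assumes that window and slide durations are multiples of the batch interval, and that early windows simply cover whatever batches have arrived so far.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation of a sliding-window sum; it does not use Spark.
final class WindowedCountSketch {

    private WindowedCountSketch() {
    }

    // countsPerBatch.get(i) is the number of created entries observed in batch i,
    // where every batch covers batchSeconds of time.
    // Returns one sum per slide interval, each covering the most recent windowSeconds.
    static List<Long> windowedSums(List<Long> countsPerBatch, int batchSeconds,
                                   int windowSeconds, int slideSeconds) {
        if (batchSeconds <= 0 || windowSeconds <= 0 || slideSeconds <= 0
                || windowSeconds % batchSeconds != 0 || slideSeconds % batchSeconds != 0) {
            throw new IllegalArgumentException(
                    "window and slide must be positive multiples of the batch interval");
        }
        int batchesPerWindow = windowSeconds / batchSeconds;
        int batchesPerSlide = slideSeconds / batchSeconds;
        List<Long> sums = new ArrayList<>();
        // A result is emitted each time another full slide interval of batches has arrived.
        for (int end = batchesPerSlide; end <= countsPerBatch.size(); end += batchesPerSlide) {
            int start = Math.max(0, end - batchesPerWindow);
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += countsPerBatch.get(i);
            }
            sums.add(sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        // 40 one-second batches, one created entry per batch.
        List<Long> counts = new ArrayList<>();
        for (int i = 0; i < 40; i++) {
            counts.add(1L);
        }
        System.out.println(windowedSums(counts, 1, 30, 10)); // prints [10, 20, 30, 30]
    }
}
```

With one created entry per second, the reported value grows until the window is full and then stays at 30, which is the behavior to expect from the 30-second window in the Scala example.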
Writing to JBoss Data Grid with DStreams
Any DStream of Key/Value type can be written to JBoss Data Grid through the InfinispanJavaDStream.writeToInfinispan() Java method, or in Scala by calling the implicit writeToInfinispan(properties) method directly on the DStream instance. Both methods take the JBoss Data Grid RDD configuration as input and write each RDD contained within the DStream.
31.6. Using the Infinispan Query DSL with Spark
The Infinispan Query DSL can be used as a filter for the InfinispanRDD, allowing data to be pre-filtered at the source rather than at RDD level.
Data in the cache must have been encoded with protobuf for the querying DSL to function correctly. Instructions on using protobuf encoding are found in Protobuf Encoding.
Consider the following example, which retrieves the books whose author name contains Doe:
Filtering by a Query (Scala)
    import org.infinispan.client.hotrod.impl.query.RemoteQuery
    import org.infinispan.client.hotrod.{RemoteCacheManager, Search}
    import org.infinispan.spark.domain._

    [...]

    // like() takes a wildcard pattern; % matches any sequence of characters
    val query = Search.getQueryFactory(remoteCacheManager.getCache(getCacheName))
        .from(classOf[Book])
        .having("author").like("%Doe%")
        .build()

    val rdd = createInfinispanRDD[Int, Book]
        .filterByQuery[Book](query, classOf[Book])
Projections are also fully supported; for instance, the above example may be adjusted to obtain only the title and publication year, grouping the results by the latter field:
Filtering with a Projection (Scala)
    import org.infinispan.client.hotrod.impl.query.RemoteQuery
    import org.infinispan.client.hotrod.{RemoteCacheManager, Search}
    import org.infinispan.spark.domain._

    [...]

    // like() takes a wildcard pattern; % matches any sequence of characters
    val query = Search.getQueryFactory(remoteCacheManager.getCache(getCacheName))
        .select("title", "publicationYear")
        .from(classOf[Book])
        .having("author").like("%Doe%")
        .groupBy("publicationYear")
        .build()

    // Each projected row is returned as an Array[AnyRef]
    val rdd = createInfinispanRDD[Int, Book]
        .filterByQuery[Array[AnyRef]](query, classOf[Book])
In addition, if a filter has already been deployed to the JBoss Data Grid server, it may be referenced by name, as seen below:
Filtering with a Deployed Filter (Scala)
    val rdd: InfinispanRDD[String, Book] = ....

    // The "my-filter-factory" filter converts each Book to a String and takes two parameters
    val filteredRDD = rdd.filterByCustom[String]("my-filter-factory", "param1", "param2")
31.7. Spark Performance Considerations
By default, the Data Grid Spark connector creates two partitions per Data Grid node; each partition specifies a subset of the data in that particular node.
Those partitions are then sent to the Spark workers, which process them in parallel. If there are fewer Spark workers than Data Grid nodes, some delay can occur, since each worker can execute only a limited number of tasks in parallel. For this reason, it is recommended to have at least as many Spark workers as Data Grid nodes to take advantage of the parallelism.
In addition, if a Spark worker is co-located in the same node as the Data Grid node, the connector will distribute tasks so that each worker only processes data found in the local Data Grid node.
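The sizing guidance above can be turned into a rough estimate. The sketch below is back-of-the-envelope arithmetic only: it multiplies the number of Data Grid nodes by the partitions per server (2 by default) and divides by the number of tasks the workers can run at once. The class name, method names, and the tasksPerWorker figure are assumptions introduced for illustration; the real per-worker capacity depends on how the Spark cluster is configured.

```java
// Back-of-the-envelope arithmetic only; it does not query Spark or JBoss Data Grid.
final class PartitionEstimate {

    private PartitionEstimate() {
    }

    // Partitions created by the connector: partitionsPerServer for every Data Grid node.
    static int totalPartitions(int dataGridNodes, int partitionsPerServer) {
        if (dataGridNodes <= 0 || partitionsPerServer <= 0) {
            throw new IllegalArgumentException("node and partition counts must be positive");
        }
        return dataGridNodes * partitionsPerServer;
    }

    // Sequential rounds needed to process every partition when each worker runs
    // tasksPerWorker tasks at once (an assumed, deployment-specific figure).
    static int rounds(int partitions, int workers, int tasksPerWorker) {
        if (partitions <= 0 || workers <= 0 || tasksPerWorker <= 0) {
            throw new IllegalArgumentException("all arguments must be positive");
        }
        int slots = workers * tasksPerWorker;
        return (partitions + slots - 1) / slots; // ceiling division
    }

    public static void main(String[] args) {
        int partitions = totalPartitions(4, 2);       // 4 nodes with the default of 2 partitions
        System.out.println(partitions);               // prints 8
        System.out.println(rounds(partitions, 2, 2)); // 2 workers: prints 2
        System.out.println(rounds(partitions, 4, 2)); // 4 workers: prints 1
    }
}
```

Under these assumptions, four Data Grid nodes served by only two workers need two rounds of tasks, while matching the worker count to the node count lets all eight partitions be processed in a single round.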