26.3. Creating and Using RDDs
RDDs are created by specifying a Properties instance with the configurations described in Table 27.1, and then using it together with the Spark context to create an InfinispanRDD that is used with the normal Spark operations. Examples in both Java and Scala follow:
Example 26.1. Creating an RDD (Java)
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.infinispan.spark.rdd.InfinispanJavaRDD;

    import java.util.Properties;

    [...]

    // Begin by defining a new Spark configuration and creating a Spark context from this.
    SparkConf conf = new SparkConf().setAppName("example-RDD");
    JavaSparkContext jsc = new JavaSparkContext(conf);

    // Create the Properties instance, containing the JBoss Data Grid node and cache name.
    Properties properties = new Properties();
    properties.put("infinispan.client.hotrod.server_list", "server:11222");
    properties.put("infinispan.rdd.cacheName", "exampleCache");

    // Create the RDD
    JavaPairRDD<Integer, Book> exampleRDD = InfinispanJavaRDD.createInfinispanRDD(jsc, properties);
    JavaRDD<Book> booksRDD = exampleRDD.values();
Example 26.2. Creating an RDD (Scala)
    import java.util.Properties

    import org.apache.spark.{SparkConf, SparkContext}
    import org.infinispan.spark.rdd.InfinispanRDD
    import org.infinispan.spark._

    // Begin by defining a new Spark configuration and creating a Spark context from this.
    val conf = new SparkConf().setAppName("example-RDD-scala")
    val sc = new SparkContext(conf)

    // Create the Properties instance, containing the JBoss Data Grid node and cache name.
    val properties = new Properties
    properties.put("infinispan.client.hotrod.server_list", "server:11222")
    properties.put("infinispan.rdd.cacheName", "exampleCache")

    // Create an RDD from the Data Grid cache
    val exampleRDD = new InfinispanRDD[Integer, Book](sc, properties)
    val booksRDD = exampleRDD.values
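Both creation examples build the same two connector properties by hand. As a minimal sketch, that step could be centralized in a small helper. The class name ConnectorProperties and the method forCache are hypothetical and are not part of the connector; the two property keys are the ones used in the examples above.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of the Infinispan Spark connector.
final class ConnectorProperties {

    private ConnectorProperties() { }

    // Builds a Properties instance holding the server list and cache name,
    // using the same two keys as the examples above.
    static Properties forCache(String serverList, String cacheName) {
        if (serverList == null || serverList.isEmpty()) {
            throw new IllegalArgumentException("serverList must not be empty");
        }
        if (cacheName == null || cacheName.isEmpty()) {
            throw new IllegalArgumentException("cacheName must not be empty");
        }
        Properties properties = new Properties();
        properties.put("infinispan.client.hotrod.server_list", serverList);
        properties.put("infinispan.rdd.cacheName", cacheName);
        return properties;
    }
}
```

With such a helper, the Properties instance from Example 26.1 would be obtained with ConnectorProperties.forCache("server:11222", "exampleCache") and then passed to InfinispanJavaRDD.createInfinispanRDD(jsc, properties) as before.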
Example 26.3. Querying with an RDD (Java)
    // The following imports should be added to the list from the previous example
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SQLContext;

    import java.util.List;

    [...]

    // Continuing the previous example
    // Create a SQLContext, registering the data frame and table
    SQLContext sqlContext = new SQLContext(jsc);
    DataFrame dataFrame = sqlContext.createDataFrame(booksRDD, Book.class);
    dataFrame.registerTempTable("books");

    // Run the query and collect the results
    List<Row> rows = sqlContext.sql("SELECT author, count(*) as a from books WHERE author != 'N/A' GROUP BY author ORDER BY a desc").collectAsList();
Example 26.4. Querying with an RDD (Scala)
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SQLContext

    [...]

    // Continuing the previous example
    // Create a SQLContext, register a data frame and table
    val sqlContext = new SQLContext(sc)
    val dataFrame = sqlContext.createDataFrame(booksRDD, classOf[Book])
    dataFrame.registerTempTable("books")

    // Run the query and collect the results
    val rows = sqlContext.sql("SELECT author, count(*) as a from books WHERE author != 'N/A' GROUP BY author ORDER BY a desc").collect()
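To make the intent of the SQL statement in the query examples concrete, the following plain-Java sketch performs the same aggregation over an in-memory list, without Spark. It drops rows whose author is 'N/A', counts the books per author, and orders the result by that count, descending. The AuthorCounts class and its nested Book type are hypothetical stand-ins used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical illustration of the query semantics; not connector or Spark code.
final class AuthorCounts {

    // Minimal stand-in for the Book type used in the examples.
    static final class Book {
        final String title;
        final String author;

        Book(String title, String author) {
            this.title = title;
            this.author = author;
        }
    }

    // Mirrors: SELECT author, count(*) AS a FROM books
    //          WHERE author != 'N/A' GROUP BY author ORDER BY a DESC
    static List<Map.Entry<String, Long>> countByAuthor(List<Book> books) {
        Map<String, Long> counts = books.stream()
            // In SQL, a NULL author also fails the != comparison, so nulls are dropped too.
            .filter(b -> b.author != null && !"N/A".equals(b.author))
            .collect(Collectors.groupingBy(b -> b.author, Collectors.counting()));

        // Sort the (author, count) pairs by count, highest first.
        // The order of authors with equal counts is unspecified, as in the SQL query.
        List<Map.Entry<String, Long>> rows = new ArrayList<>(counts.entrySet());
        rows.sort(Map.Entry.<String, Long>comparingByValue().reversed());
        return rows;
    }
}
```

Each entry in the returned list corresponds to one Row of the Spark result: the key is the author column and the value is the count aliased as a.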
Any key/value based RDD can be written to the Data Grid cache by using the static InfinispanJavaRDD.write() method in Java, or the writeToInfinispan() method in Scala. Either call copies the contents of the RDD to the cache:
Example 26.5. Writing with an RDD (Java)
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.infinispan.spark.rdd.InfinispanJavaRDD;
    import scala.Tuple2;

    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;

    [...]

    // Define the location of the JBoss Data Grid node
    Properties properties = new Properties();
    properties.put("infinispan.client.hotrod.server_list", "localhost:11222");
    properties.put("infinispan.rdd.cacheName", "exampleCache");

    // Create the JavaSparkContext
    SparkConf conf = new SparkConf().setAppName("write-example-RDD");
    JavaSparkContext jsc = new JavaSparkContext(conf);

    // Define two entries to be stored in an RDD
    // Each Book will contain the title, author, and publicationYear
    Book bookOne = new Book("Linux Bible", "Negus, Chris", "2015");
    Book bookTwo = new Book("Java 8 in Action", "Urma, Raoul-Gabriel", "2014");

    List<Tuple2<Integer, Book>> pairs = Arrays.asList(
        new Tuple2<>(1, bookOne),
        new Tuple2<>(2, bookTwo)
    );

    // Create the RDD using the newly created List
    JavaPairRDD<Integer, Book> pairsRDD = jsc.parallelizePairs(pairs);

    // Write the entries into the cache, using the Properties instance defined above
    InfinispanJavaRDD.write(pairsRDD, properties);
Example 26.6. Writing with an RDD (Scala)
    import java.util.Properties

    import org.apache.spark.{SparkConf, SparkContext}
    import org.infinispan.spark._

    [...]

    // Define the location of the JBoss Data Grid node
    val properties = new Properties
    properties.put("infinispan.client.hotrod.server_list", "localhost:11222")
    properties.put("infinispan.rdd.cacheName", "exampleCache")

    // Create the SparkContext
    val conf = new SparkConf().setAppName("write-example-RDD-scala")
    val sc = new SparkContext(conf)

    // Create an RDD of Books
    val bookOne = new Book("Linux Bible", "Negus, Chris", "2015")
    val bookTwo = new Book("Java 8 in Action", "Urma, Raoul-Gabriel", "2014")
    val sampleBookRDD = sc.parallelize(Seq(bookOne, bookTwo))

    // Pair each Book with its index, then swap so that the index becomes the key
    val pairsRDD = sampleBookRDD.zipWithIndex().map(_.swap)

    // Write the key/value RDD to the Data Grid
    pairsRDD.writeToInfinispan(properties)
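The examples in this section rely on a Book class that is not shown. The following is a minimal sketch of what such a class might look like, assuming the three String fields named in the write examples (title, author, and publicationYear). It is written as a serializable JavaBean on two assumptions: that the bean-based createDataFrame call in the query examples reads columns through public getters, and that entries are shipped between nodes with Java serialization. The actual class, and the marshalling configured for the cache, may differ.

```java
import java.io.Serializable;
import java.util.Objects;

// Hypothetical Book class for illustration; the real class is not shown in this section.
// For use with Spark, declare it public in its own Book.java file.
class Book implements Serializable {

    private static final long serialVersionUID = 1L;

    private String title;
    private String author;
    private String publicationYear;

    // No-argument constructor, kept for bean-style tooling.
    public Book() { }

    public Book(String title, String author, String publicationYear) {
        this.title = title;
        this.author = author;
        this.publicationYear = publicationYear;
    }

    // Getters and setters expose each field as a bean property.
    public String getTitle() { return title; }
    public void setTitle(String title) { this.title = title; }

    public String getAuthor() { return author; }
    public void setAuthor(String author) { this.author = author; }

    public String getPublicationYear() { return publicationYear; }
    public void setPublicationYear(String publicationYear) { this.publicationYear = publicationYear; }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof Book)) {
            return false;
        }
        Book that = (Book) other;
        return Objects.equals(title, that.title)
            && Objects.equals(author, that.author)
            && Objects.equals(publicationYear, that.publicationYear);
    }

    @Override
    public int hashCode() {
        return Objects.hash(title, author, publicationYear);
    }

    @Override
    public String toString() {
        return "Book{title=" + title + ", author=" + author
            + ", publicationYear=" + publicationYear + "}";
    }
}
```

Under these assumptions, the property names author, title, and publicationYear would become the column names available to queries such as the one in Example 26.3.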