26.3. Creating and Using RDDs
RDDs are created by specifying a Properties instance with the configurations described in Table 27.1, and then passing it together with the Spark context to create an InfinispanRDD, which is then used with the normal Spark operations. The following examples show this in both Java and Scala:
Example 26.1. Creating an RDD (Java)
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.infinispan.spark.rdd.InfinispanJavaRDD;

import java.util.Properties;

[...]

// Begin by defining a new Spark configuration and creating a Spark context from this.
SparkConf conf = new SparkConf().setAppName("example-RDD");
JavaSparkContext jsc = new JavaSparkContext(conf);

// Create the Properties instance, containing the JBoss Data Grid node and cache name.
Properties properties = new Properties();
properties.put("infinispan.client.hotrod.server_list", "server:11222");
properties.put("infinispan.rdd.cacheName", "exampleCache");

// Create the RDD
JavaPairRDD<Integer, Book> exampleRDD = InfinispanJavaRDD.createInfinispanRDD(jsc, properties);
JavaRDD<Book> booksRDD = exampleRDD.values();
Example 26.2. Creating an RDD (Scala)
import java.util.Properties

import org.apache.spark.{SparkConf, SparkContext}
import org.infinispan.spark.rdd.InfinispanRDD
import org.infinispan.spark._

// Begin by defining a new Spark configuration and creating a Spark context from this.
val conf = new SparkConf().setAppName("example-RDD-scala")
val sc = new SparkContext(conf)

// Create the Properties instance, containing the JBoss Data Grid node and cache name.
val properties = new Properties
properties.put("infinispan.client.hotrod.server_list", "server:11222")
properties.put("infinispan.rdd.cacheName", "exampleCache")

// Create an RDD from the Data Grid cache
val exampleRDD = new InfinispanRDD[Integer, Book](sc, properties)
val booksRDD = exampleRDD.values
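Both examples refer to a Book type that this section does not define. The following is a minimal sketch of what such a class might look like, not the class used by the original authors. The field names (title, author, publicationYear) come from the comment in Example 26.5; the String type for publicationYear, the no-argument constructor, the setters, and the use of Serializable are assumptions. Spark's bean-based createDataFrame(rdd, Book.class) call, used in the query examples, derives its columns from public getters like these. How the class is marshalled to the grid depends on the Hot Rod client configuration, which is not covered here.

```java
import java.io.Serializable;

// Hypothetical Book bean. Field names follow the comment in Example 26.5;
// everything else here is an assumption made for illustration.
public class Book implements Serializable {
    private static final long serialVersionUID = 1L;

    private String title;
    private String author;
    private String publicationYear; // String, because the examples pass "2015"

    // No-argument constructor, conventional for JavaBeans
    public Book() {
    }

    public Book(String title, String author, String publicationYear) {
        this.title = title;
        this.author = author;
        this.publicationYear = publicationYear;
    }

    // Public getters: bean introspection maps each one to a column
    public String getTitle() {
        return title;
    }

    public String getAuthor() {
        return author;
    }

    public String getPublicationYear() {
        return publicationYear;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public void setAuthor(String author) {
        this.author = author;
    }

    public void setPublicationYear(String publicationYear) {
        this.publicationYear = publicationYear;
    }

    @Override
    public String toString() {
        return "Book{title='" + title + "', author='" + author
                + "', publicationYear='" + publicationYear + "'}";
    }
}
```

With a bean shaped like this, the SQL query in the following examples can refer to the author column directly.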
Once the RDD is available, entries in the backing cache can be obtained by using either the Spark RDD operations or Spark's SQL support. The following examples extend the previous ones to count the entries per author in the resulting RDD with an SQL query:
Example 26.3. Querying with an RDD (Java)
// The following imports should be added to the list from the previous example
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

import java.util.List;

[...]

// Continuing the previous example

// Create a SQLContext, registering the data frame and table
SQLContext sqlContext = new SQLContext(jsc);
DataFrame dataFrame = sqlContext.createDataFrame(booksRDD, Book.class);
dataFrame.registerTempTable("books");

// Run the query and collect the results
List<Row> rows = sqlContext.sql("SELECT author, count(*) as a from books WHERE author != 'N/A' GROUP BY author ORDER BY a desc").collectAsList();
Example 26.4. Querying with an RDD (Scala)
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

[...]

// Create a SQLContext, register a data frame and table
val sqlContext = new SQLContext(sc)
val dataFrame = sqlContext.createDataFrame(booksRDD, classOf[Book])
dataFrame.registerTempTable("books")

// Run the query and collect the results
val rows = sqlContext.sql("SELECT author, count(*) as a from books WHERE author != 'N/A' GROUP BY author ORDER BY a desc").collect()
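To make the semantics of the query concrete, the sketch below computes the same per-author count over a small in-memory list using only the Java standard library. It is not part of the Spark or JBoss Data Grid API; the class name AuthorCountSketch and the method countByAuthor are hypothetical, and the sample data is made up. It can be handy for checking expected results on a small sample before running the query against a real cache.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper: mirrors the SQL query from the examples in plain Java.
public class AuthorCountSketch {

    /** Counts entries per author, skipping "N/A", ordered by count descending. */
    static Map<String, Long> countByAuthor(List<String> authors) {
        Map<String, Long> counts = authors.stream()
                // WHERE author != 'N/A' (in SQL this also drops NULL authors)
                .filter(a -> a != null && !"N/A".equals(a))
                // GROUP BY author with count(*)
                .collect(Collectors.groupingBy(a -> a, Collectors.counting()));

        // ORDER BY a desc: a LinkedHashMap keeps the sorted insertion order
        return counts.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        Map.Entry::getValue,
                        (x, y) -> x,
                        LinkedHashMap::new));
    }

    public static void main(String[] args) {
        List<String> authors = Arrays.asList(
                "Negus, Chris", "Urma, Raoul-Gabriel", "Negus, Chris", "N/A");
        // prints {Negus, Chris=2, Urma, Raoul-Gabriel=1}
        System.out.println(countByAuthor(authors));
    }
}
```

As with the SQL query, the relative order of authors that have the same count is not defined.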
Writing to JBoss Data Grid
Any key/value-based RDD can be written to the Data Grid cache by using the static InfinispanJavaRDD.write() method, which copies the contents of the RDD to the cache. Example 26.6 shows the Scala equivalent, which calls writeToInfinispan() on the RDD itself:
Example 26.5. Writing with an RDD (Java)
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.spark.rdd.InfinispanJavaRDD;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

[...]

// Define the location of the JBoss Data Grid node
Properties properties = new Properties();
properties.put("infinispan.client.hotrod.server_list", "localhost:11222");
properties.put("infinispan.rdd.cacheName", "exampleCache");

// Create the JavaSparkContext
SparkConf conf = new SparkConf().setAppName("write-example-RDD");
JavaSparkContext jsc = new JavaSparkContext(conf);

// Define two entries to be stored in an RDD
// Each Book will contain the title, author, and publicationYear
Book bookOne = new Book("Linux Bible", "Negus, Chris", "2015");
Book bookTwo = new Book("Java 8 in Action", "Urma, Raoul-Gabriel", "2014");
List<Tuple2<Integer, Book>> pairs = Arrays.asList(
    new Tuple2<>(1, bookOne),
    new Tuple2<>(2, bookTwo)
);

// Create the RDD using the newly created List
JavaPairRDD<Integer, Book> pairsRDD = jsc.parallelizePairs(pairs);

// Write the entries into the cache, using the Properties defined above
InfinispanJavaRDD.write(pairsRDD, properties);
Example 26.6. Writing with an RDD (Scala)
import java.util.Properties

import org.apache.spark.{SparkConf, SparkContext}
import org.infinispan.spark._
import org.infinispan.spark.rdd.InfinispanRDD

[...]

// Define the location of the JBoss Data Grid node
val properties = new Properties
properties.put("infinispan.client.hotrod.server_list", "localhost:11222")
properties.put("infinispan.rdd.cacheName", "exampleCache")

// Create the SparkContext
val conf = new SparkConf().setAppName("write-example-RDD-scala")
val sc = new SparkContext(conf)

// Create an RDD of Books
val bookOne = new Book("Linux Bible", "Negus, Chris", "2015")
val bookTwo = new Book("Java 8 in Action", "Urma, Raoul-Gabriel", "2014")
val sampleBookRDD = sc.parallelize(Seq(bookOne, bookTwo))

// Pair each Book with its index, then swap so that the index becomes the key
val pairsRDD = sampleBookRDD.zipWithIndex().map(_.swap)

// Write the key/value RDD to the Data Grid
pairsRDD.writeToInfinispan(properties)
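Every example in this section builds the same two-entry Properties instance by hand. One possible way to avoid repeating the property keys is a small factory method, sketched below with the standard library only. The class name GridProperties and the method forCache are hypothetical and not part of the connector; the two property keys are the ones used in the examples above.

```java
import java.util.Properties;

// Hypothetical helper: builds the Properties instance used by the examples.
public class GridProperties {

    // Property keys as they appear in the examples in this section
    static final String SERVER_LIST = "infinispan.client.hotrod.server_list";
    static final String CACHE_NAME = "infinispan.rdd.cacheName";

    /** Returns connector properties for the given server list and cache name. */
    static Properties forCache(String serverList, String cacheName) {
        if (serverList == null || serverList.isEmpty()) {
            throw new IllegalArgumentException("serverList must not be empty");
        }
        if (cacheName == null || cacheName.isEmpty()) {
            throw new IllegalArgumentException("cacheName must not be empty");
        }
        Properties properties = new Properties();
        properties.setProperty(SERVER_LIST, serverList);
        properties.setProperty(CACHE_NAME, cacheName);
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = forCache("localhost:11222", "exampleCache");
        System.out.println(properties.getProperty(SERVER_LIST));
        System.out.println(properties.getProperty(CACHE_NAME));
    }
}
```

The returned instance could then be passed wherever the examples pass properties, for instance to InfinispanJavaRDD.createInfinispanRDD(jsc, properties) or InfinispanJavaRDD.write(pairsRDD, properties).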