Chapter 31. Integration with Apache Spark


31.1. Integration with Apache Spark

JBoss Data Grid includes a Spark connector that provides tight integration with Apache Spark and allows applications written in either Java or Scala to use JBoss Data Grid as a backing data store. The connector supports the following:

  • Create an RDD from any cache
  • Write a key/value RDD to a cache
  • Create a DStream from cache-level events
  • Write a key/value DStream to a cache

Note

Support for Apache Spark is only available in Remote Client-Server Mode.

31.2. Spark Dependencies

JBoss Data Grid provides two versions of the Spark connector, one for Apache Spark 1.6 and one for Apache Spark 2.0, which support Scala 2.10 and 2.11 respectively. Both versions are shipped separately from the main distribution.

The following Maven configuration should be used depending on the desired version of Apache Spark:

pom.xml for Spark 1.6

<dependency>
    <groupId>org.infinispan</groupId>
    <artifactId>infinispan-spark</artifactId>
    <version>0.3.0.Final-redhat-2</version>
</dependency>

pom.xml for Spark 2.0

<dependency>
    <groupId>org.infinispan</groupId>
    <artifactId>infinispan-spark</artifactId>
    <version>0.4.0.Final-redhat-2</version>
</dependency>

31.3. Supported Spark Configuration Parameters

The following table shows the Spark configuration parameters that are supported by both versions:

Table 31.1. Supported Spark Configuration Parameters
| Parameter Name | Description | Default Value |
| --- | --- | --- |
| infinispan.client.hotrod.server_list | List of JBoss Data Grid nodes | localhost:11222 |
| infinispan.rdd.cacheName | The name of the cache that will back the RDD | default cache |
| infinispan.rdd.read_batch_size | Batch size (number of entries) when reading from the cache | 10000 |
| infinispan.rdd.write_batch_size | Batch size (number of entries) when writing to the cache | 500 |
| infinispan.rdd.number_server_partitions | Number of partitions created per JBoss Data Grid server | 2 |
| infinispan.rdd.query.proto.protofiles | Map with protobuf file names and contents | Can be omitted if entities are annotated with protobuf encoding information. Protobuf encoding is required to filter the RDD by Query. |
| infinispan.rdd.query.proto.marshallers | List of protostream marshaller classes for the objects in the cache | Can be omitted if entities are annotated with protobuf encoding information. Protobuf encoding is required to filter the RDD by Query. |
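The parameters in Table 31.1 are passed to the connector as entries in a java.util.Properties instance. The following is a minimal sketch of how such a configuration could be assembled and inspected using only the JDK. The class ConnectorProperties and its helpers forCache and intOrDefault are hypothetical names introduced for illustration, and supplying numeric values as strings is an assumption; only the property keys and default values are taken from the table.

```java
import java.util.Properties;

final class ConnectorProperties {
    // Property keys taken from Table 31.1.
    static final String SERVER_LIST = "infinispan.client.hotrod.server_list";
    static final String CACHE_NAME = "infinispan.rdd.cacheName";
    static final String READ_BATCH_SIZE = "infinispan.rdd.read_batch_size";
    static final String WRITE_BATCH_SIZE = "infinispan.rdd.write_batch_size";

    // Hypothetical helper: builds the minimal configuration for one cache.
    static Properties forCache(String serverList, String cacheName) {
        Properties p = new Properties();
        p.setProperty(SERVER_LIST, serverList);
        p.setProperty(CACHE_NAME, cacheName);
        return p;
    }

    // Hypothetical helper: returns the configured integer value,
    // or the documented default when the key is absent.
    static int intOrDefault(Properties p, String key, int documentedDefault) {
        String raw = p.getProperty(key);
        return raw == null ? documentedDefault : Integer.parseInt(raw.trim());
    }

    public static void main(String[] args) {
        Properties p = forCache("server:11222", "exampleCache");
        // Assumption: numeric settings are supplied as strings.
        p.setProperty(WRITE_BATCH_SIZE, "1000");

        System.out.println("read batch size:  " + intOrDefault(p, READ_BATCH_SIZE, 10000));
        System.out.println("write batch size: " + intOrDefault(p, WRITE_BATCH_SIZE, 500));
    }
}
```

Keeping the keys in constants avoids typos in the long property names; any parameter that is not set falls back to the default listed in the table.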

The following is a list of configuration parameters supported exclusively by Spark 2.0:

Table 31.2. Spark Configuration Parameters Exclusive to 2.0
| Parameter Name | Description | Default Value |
| --- | --- | --- |
| infinispan.client.hotrod.use_ssl | Enable SSL | false |
| infinispan.client.hotrod.key_store_file_name | The JKS keystore file name, required when mutual SSL authentication is enabled in the Infinispan server. Can be either the file path or a class path resource. Examples: /usr/local/keystore.jks, classpath:/keystore.jks | |
| infinispan.client.hotrod.trust_store_file_name | The JKS keystore path or classpath containing server certificates | |
| infinispan.client.hotrod.key_store_password | Password for the key store | |
| infinispan.client.hotrod.trust_store_password | Password for the trust store | |

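The SSL parameters in Table 31.2 are typically set together. The sketch below shows one way they might be layered on top of an existing configuration: a trust store for verifying the server, and a key store only when mutual authentication is enabled. The class SslConnectorProperties, its methods, the trust store path, and the passwords are hypothetical placeholders, and passing the boolean as the string "true" is an assumption; the property keys come from the table.

```java
import java.util.Properties;

final class SslConnectorProperties {
    // Hypothetical helper: returns a copy of the base configuration with SSL enabled.
    static Properties withSsl(Properties base, String trustStore, String trustStorePassword) {
        Properties p = new Properties();
        p.putAll(base); // copy so the caller's instance is left untouched
        p.setProperty("infinispan.client.hotrod.use_ssl", "true");
        p.setProperty("infinispan.client.hotrod.trust_store_file_name", trustStore);
        p.setProperty("infinispan.client.hotrod.trust_store_password", trustStorePassword);
        return p;
    }

    // Hypothetical helper: adds the client key store, which the table describes
    // as required when mutual SSL authentication is enabled on the server.
    static Properties withClientKeyStore(Properties sslProps, String keyStore, String keyStorePassword) {
        Properties p = new Properties();
        p.putAll(sslProps);
        p.setProperty("infinispan.client.hotrod.key_store_file_name", keyStore);
        p.setProperty("infinispan.client.hotrod.key_store_password", keyStorePassword);
        return p;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("infinispan.client.hotrod.server_list", "server:11222");

        // Placeholder paths and passwords, for illustration only.
        Properties ssl = withSsl(base, "classpath:/truststore.jks", "changeit");
        Properties mutual = withClientKeyStore(ssl, "/usr/local/keystore.jks", "changeit");
        mutual.stringPropertyNames().forEach(System.out::println);
    }
}
```

In a real deployment the passwords would be read from a secured source rather than hard-coded.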
31.4. Creating and Using RDDs

RDDs are created by specifying a Properties instance with the configuration parameters described in Table 31.1, and then using it together with the Spark context to create an InfinispanRDD, which can be used with the normal Spark operations. An example in both Java and Scala follows:

Creating an RDD (Java)

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.infinispan.spark.rdd.InfinispanJavaRDD;
import java.util.Properties;
[...]
// Begin by defining a new Spark configuration and creating a Spark context from this.
SparkConf conf = new SparkConf().setAppName("example-RDD");
JavaSparkContext jsc = new JavaSparkContext(conf);

// Create the Properties instance, containing the JBoss Data Grid node and cache name.
Properties properties = new Properties();
properties.put("infinispan.client.hotrod.server_list", "server:11222");
properties.put("infinispan.rdd.cacheName","exampleCache");

// Create the RDD
JavaPairRDD<Integer, Book> exampleRDD = InfinispanJavaRDD.createInfinispanRDD(jsc, properties);

JavaRDD<Book> booksRDD = exampleRDD.values();

Creating an RDD (Scala)

import java.util.Properties

import org.apache.spark.{SparkConf, SparkContext}
import org.infinispan.spark.rdd.InfinispanRDD
import org.infinispan.spark._

// Begin by defining a new Spark configuration and creating a Spark context from this.
val conf = new SparkConf().setAppName("example-RDD-scala")
val sc = new SparkContext(conf)

// Create the Properties instance, containing the JBoss Data Grid node and cache name.
val properties = new Properties
properties.put("infinispan.client.hotrod.server_list", "server:11222")
properties.put("infinispan.rdd.cacheName", "exampleCache")

// Create an RDD from the DataGrid cache
val exampleRDD = new InfinispanRDD[Integer, Book](sc, properties)

val booksRDD = exampleRDD.values

Once the RDD is available, entries in the backing cache may be obtained by using either the Spark RDD operations or Spark's SQL support. The following extends the above example to count the entries per author in the resulting RDD with an SQL query:

Querying with an RDD (Java)

// The following imports should be added to the list from the previous example
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import java.util.List;
[...]
// Continuing the previous example

// Create a SQLContext, registering the data frame and table
// Note: DataFrame is part of the Spark 1.6 Java API; in Spark 2.0 the equivalent type is Dataset<Row>
SQLContext sqlContext = new SQLContext(jsc);
DataFrame dataFrame = sqlContext.createDataFrame(booksRDD, Book.class);
dataFrame.registerTempTable("books");

// Run the Query and collect results
List<Row> rows = sqlContext.sql("SELECT author, count(*) as a from books WHERE author != 'N/A' GROUP BY author ORDER BY a desc").collectAsList();

Querying with an RDD (Scala)

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
[...]
// Create a SQLContext, register a data frame and table
val sqlContext = new SQLContext(sc)
val dataFrame = sqlContext.createDataFrame(booksRDD, classOf[Book])
dataFrame.registerTempTable("books")

// Run the Query and collect the results
val rows = sqlContext.sql("SELECT author, count(*) as a from books WHERE author != 'N/A' GROUP BY author ORDER BY a desc").collect()

Writing to JBoss Data Grid

Any key/value based RDD can be written to the Data Grid cache by using the static InfinispanJavaRDD.write() method. This will copy the contents of the RDD to the cache:

Writing with an RDD (Java)

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.spark.domain.Address;
import org.infinispan.spark.domain.Person;
import org.infinispan.spark.rdd.InfinispanJavaRDD;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
[...]
// Define the location of the JBoss Data Grid node and the target cache
Properties properties = new Properties();
properties.put("infinispan.client.hotrod.server_list", "localhost:11222");
properties.put("infinispan.rdd.cacheName","exampleCache");

// Create the JavaSparkContext
SparkConf conf = new SparkConf().setAppName("write-example-RDD");
JavaSparkContext jsc = new JavaSparkContext(conf);

// Define two entries to be stored in an RDD
// Each Book will contain the title, author, and publicationYear
Book bookOne = new Book("Linux Bible", "Negus, Chris", "2015");
Book bookTwo = new Book("Java 8 in Action", "Urma, Raoul-Gabriel", "2014");

List<Tuple2<Integer, Book>> pairs = Arrays.asList(
    new Tuple2<>(1, bookOne),
    new Tuple2<>(2, bookTwo)
);

// Create the RDD using the newly created List
JavaPairRDD<Integer, Book> pairsRDD = jsc.parallelizePairs(pairs);

// Write the entries into the cache, using the Properties defined above
InfinispanJavaRDD.write(pairsRDD, properties);

Writing with an RDD (Scala)

import java.util.Properties
import org.apache.spark.{SparkConf, SparkContext}
import org.infinispan.spark._
import org.infinispan.spark.rdd.InfinispanRDD
[...]
// Define the location of the JBoss Data Grid node and the target cache
val properties = new Properties
properties.put("infinispan.client.hotrod.server_list", "localhost:11222")
properties.put("infinispan.rdd.cacheName", "exampleCache")

// Create the SparkContext
val conf = new SparkConf().setAppName("write-example-RDD-scala")
val sc = new SparkContext(conf)

// Create an RDD of Books
val bookOne = new Book("Linux Bible", "Negus, Chris", "2015")
val bookTwo = new Book("Java 8 in Action", "Urma, Raoul-Gabriel", "2014")

val sampleBookRDD = sc.parallelize(Seq(bookOne,bookTwo))
// Pair each Book with its index so that the index becomes the cache key
val pairsRDD = sampleBookRDD.zipWithIndex().map(_.swap)

// Write the Key/Value RDD to the Data Grid
pairsRDD.writeToInfinispan(properties)

31.5. Creating and Using DStreams

DStreams represent a continuous stream of data, and are internally represented by a continuous series of RDDs, with each RDD containing data from a specific time interval.

To create a DStream, pass a StreamingContext together with a StorageLevel and the JBoss Data Grid RDD configuration, as seen in the following example:

Creating a DStream (Scala)

import org.infinispan.spark.stream._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import java.util.Properties

// Spark context
val sc = ...
// java.util.Properties with Infinispan RDD configuration
val props = ...
val ssc = new StreamingContext(sc, Seconds(1))

val stream = new InfinispanInputDStream[String, Book](ssc, StorageLevel.MEMORY_ONLY, props)

The InfinispanInputDStream can be transformed using Spark's many DStream operations, and the processing starts after calling "start" on the StreamingContext. For example, to display every 10 seconds the number of books inserted in the cache in the last 30 seconds:

Processing a DStream (Scala)

import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.DStream
import org.infinispan.client.hotrod.event.ClientEvent.Type
import org.infinispan.spark.stream._

val stream = ... // From previous sample

// Keep only the events for newly created entries
val createdBooksRDD = stream.filter { case (_, _, t) => t == Type.CLIENT_CACHE_ENTRY_CREATED }

// Reduce last 30 seconds of data, every 10 seconds
val windowedRDD: DStream[Long] = createdBooksRDD.count().reduceByWindow(_ + _, Seconds(30), Seconds(10))

// Print the results, counting the number of occurrences in each individual RDD
windowedRDD.foreachRDD { rdd => println(rdd.reduce(_ + _)) }

// Start the processing
ssc.start()
ssc.awaitTermination()
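The windowed reduction above can be hard to picture, so the following plain-Java sketch simulates its arithmetic without any Spark dependency. It assumes a batch interval of one second, so that each array element stands for the count() of one batch, and it approximates reduceByWindow(_ + _, Seconds(30), Seconds(10)) by summing the most recent 30 batch counts once every 10 batches. The class WindowedCountSketch and the method windowedSums are hypothetical names, and the code illustrates the windowing idea only, not Spark's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class WindowedCountSketch {
    // Sums the last `windowBatches` counts every `slideBatches` batches.
    // Early windows contain fewer batches because less data has arrived.
    static List<Long> windowedSums(long[] perBatchCounts, int windowBatches, int slideBatches) {
        List<Long> sums = new ArrayList<>();
        for (int end = slideBatches; end <= perBatchCounts.length; end += slideBatches) {
            int start = Math.max(0, end - windowBatches);
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += perBatchCounts[i];
            }
            sums.add(sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        // 40 one-second batches, with two books created in each batch.
        long[] counts = new long[40];
        Arrays.fill(counts, 2L);

        // 30-second window, evaluated every 10 seconds.
        System.out.println(windowedSums(counts, 30, 10)); // prints [20, 40, 60, 60]
    }
}
```

Once the window is full, each reported value covers exactly 30 seconds of events, and consecutive values overlap by 20 seconds.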

Writing to JBoss Data Grid with DStreams

Any DStream of Key/Value type can be written to JBoss Data Grid, either in Java through the InfinispanJavaDStream.writeToInfinispan() method, or in Scala by using the implicit writeToInfinispan(properties) method directly on the DStream instance. Both methods take the JBoss Data Grid RDD configuration as input and write each RDD contained within the DStream.

31.6. Using the Infinispan Query DSL with Spark

The Infinispan Query DSL can be used as a filter for the InfinispanRDD, allowing data to be pre-filtered at the source rather than at RDD level.

Important

Data in the cache must have been encoded with protobuf for the querying DSL to function correctly. Instructions on using protobuf encoding are found in Protobuf Encoding.

Consider the following example, which retrieves the books whose author name contains Doe:

Filtering by a Query (Scala)

import org.infinispan.client.hotrod.impl.query.RemoteQuery
import org.infinispan.client.hotrod.{RemoteCacheManager, Search}
import org.infinispan.spark.domain._
[...]
val query = Search.getQueryFactory(remoteCacheManager.getCache(getCacheName))
    .from(classOf[Book])
    .having("author").like("%Doe%")
    .build()

val rdd = createInfinispanRDD[Int, Book]
    .filterByQuery[Book](query, classOf[Book])
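The like operator of the Query DSL takes a wildcard pattern that follows the JPA rules: % matches any sequence of characters and _ matches exactly one character, so a pattern without wildcards matches only the exact value. The following JDK-only sketch mimics that matching locally to show the difference between an exact and a "contains" pattern. It is an illustration under stated assumptions, not how the server evaluates queries; the class LikePatternSketch and its methods are hypothetical, and escape characters are not handled.

```java
import java.util.regex.Pattern;

final class LikePatternSketch {
    // Translates a JPA-style LIKE pattern into a regular expression:
    // '%' becomes ".*", '_' becomes ".", everything else is matched literally.
    static Pattern toRegex(String likePattern) {
        StringBuilder regex = new StringBuilder();
        for (char c : likePattern.toCharArray()) {
            if (c == '%') {
                regex.append(".*");
            } else if (c == '_') {
                regex.append('.');
            } else {
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(regex.toString(), Pattern.DOTALL);
    }

    // True when the whole value matches the LIKE pattern.
    static boolean matches(String likePattern, String value) {
        return toRegex(likePattern).matcher(value).matches();
    }

    public static void main(String[] args) {
        System.out.println(matches("Doe", "Doe, John"));   // false: no wildcards, exact match only
        System.out.println(matches("%Doe%", "Doe, John")); // true: "contains" match
        System.out.println(matches("D_e", "Doe"));         // true: '_' matches one character
    }
}
```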

Projections are also fully supported; for instance, the above example may be adjusted to obtain only the title and publication year, grouping by the latter field:

Filtering with a Projection (Scala)

import org.infinispan.client.hotrod.impl.query.RemoteQuery
import org.infinispan.client.hotrod.{RemoteCacheManager, Search}
import org.infinispan.spark.domain._
[...]
val query = Search.getQueryFactory(remoteCacheManager.getCache(getCacheName))
    .select("title","publicationYear")
    .from(classOf[Book])
    .having("author").like("%Doe%")
    .groupBy("publicationYear")
    .build()

val rdd = createInfinispanRDD[Int, Book]
    .filterByQuery[Array[AnyRef]](query, classOf[Book])

In addition, if a filter has already been deployed to the JBoss Data Grid server it may be referenced by name, as seen below:

Filtering with a Deployed Filter (Scala)

val rdd: InfinispanRDD[String, Book] = ....
// "my-filter-factory" filters and converts each Book to a String, and takes two parameters
val filteredRDD = rdd.filterByCustom[String]("my-filter-factory", "param1", "param2")

31.7. Spark Performance Considerations

By default, the Data Grid Spark connector creates two partitions per Data Grid node (configurable through infinispan.rdd.number_server_partitions), with each partition covering a subset of the data on that node.

These partitions are then sent to the Spark workers, which process them in parallel. If there are fewer Spark workers than Data Grid nodes, some delay can occur because each worker can execute only a limited number of tasks in parallel. For this reason, it is recommended to have at least as many Spark workers as Data Grid nodes to take full advantage of the parallelism.

In addition, if a Spark worker is co-located on the same node as a Data Grid node, the connector distributes tasks so that each worker only processes data found in the local Data Grid node.
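As a rough, back-of-the-envelope illustration of the sizing advice above, the following sketch estimates how many partitions the connector would create and how many sequential rounds of tasks the workers would need to process them. It assumes every worker runs a fixed number of tasks in parallel and ignores data skew and locality; the class PartitionEstimate and its methods are hypothetical and are not part of the connector.

```java
final class PartitionEstimate {
    // Total partitions = Data Grid nodes x partitions per server (default 2).
    static int totalPartitions(int dataGridNodes, int partitionsPerServer) {
        if (dataGridNodes <= 0 || partitionsPerServer <= 0) {
            throw new IllegalArgumentException("node and partition counts must be positive");
        }
        return dataGridNodes * partitionsPerServer;
    }

    // Rounds of tasks needed when all workers together can run
    // sparkWorkers x parallelTasksPerWorker tasks at once (ceiling division).
    static int taskRounds(int totalPartitions, int sparkWorkers, int parallelTasksPerWorker) {
        if (totalPartitions <= 0 || sparkWorkers <= 0 || parallelTasksPerWorker <= 0) {
            throw new IllegalArgumentException("all arguments must be positive");
        }
        int slots = sparkWorkers * parallelTasksPerWorker;
        return (totalPartitions + slots - 1) / slots;
    }

    public static void main(String[] args) {
        int partitions = totalPartitions(4, 2);           // 4 nodes, default of 2 partitions each
        System.out.println(partitions);                   // prints 8
        System.out.println(taskRounds(partitions, 2, 2)); // prints 2: two workers need two rounds
        System.out.println(taskRounds(partitions, 4, 2)); // prints 1: four workers finish in one round
    }
}
```

Under these assumptions, matching the number of workers to the number of Data Grid nodes lets all partitions be processed in a single round.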
