Chapter 34. Integration with Apache Spark
34.1. The JBoss Data Grid Apache Spark Connector
JBoss Data Grid includes a Spark connector, providing tight integration with Apache Spark, and allowing applications written either in Java or Scala to utilize JBoss Data Grid as a backing data store.
There are two connectors: one supports Apache Spark 1.6.x (with Scala 2.10.x), and the other supports Apache Spark 2.x (with Scala 2.11.x). Both connectors are shipped separately from the main distribution.
The Apache Spark 1.6 connector includes support for the following:
- Create an RDD from any cache
- Write a key/value RDD to a cache
- Create a DStream from cache-level events
- Write a key/value DStream to a cache
In addition to the above features, the Apache Spark 2 connector supports these features:
- Use JDG server-side filters to create a cache-based RDD
- Spark serializer based on JBoss Marshalling
- Dataset API with push down predicates support
Support for Apache Spark is only available in Remote Client-Server Mode.
34.2. Spark Dependencies
The following Maven configuration should be used depending on the desired version of Apache Spark:
pom.xml for Spark 1.6.x
<dependency> <groupId>org.infinispan</groupId> <artifactId>infinispan-spark</artifactId> <version>0.3.0.Final-redhat-2</version> </dependency>
pom.xml for Spark 2.x
<dependency> <groupId>org.infinispan</groupId> <artifactId>infinispan-spark</artifactId> <version>0.6.0.Final-redhat-9</version> </dependency>
34.3. Configuring the Spark Connector
The Apache Spark version 1.6 and version 2 connectors do not use the same interfaces for configuration. The version 1.6 connector uses properties, and the version 2 connector uses methods.
34.3.1. Properties to Configure the Version 1.6 Connector
Property Name | Description | Default Value |
---|---|---|
infinispan.client.hotrod.server_list | List of JBoss Data Grid nodes | localhost:11222 |
infinispan.rdd.cacheName | The name of the cache that will back the RDD | default cache |
| Batch size (number of entries) when reading from the cache | 10000 |
| Batch size (number of entries) when writing to the cache | 500 |
| Numbers of partitions created per JBoss Data Grid server | 2 |
| Map with protobuf file names and contents | Can be omitted if entities are annotated with protobuf encoding information. Protobuf encoding is required to filter the RDD by Query. |
| List of protostream marshallers classes for the objects in the cache | Can be omitted if entities are annotated with protobuf encoding information. Protobuf encoding is required to filter the RDD by Query. |
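The following is a minimal sketch of how a Properties instance for the version 1.6 connector might be assembled. It uses only the two property keys that appear in this chapter's code examples; `ConnectorProperties` and `forCache` are hypothetical helper names, not part of the connector. Any property that is not set is assumed to fall back to the default listed in the table.

```java
import java.util.Properties;

// Hypothetical helper, not part of the connector: builds the Properties
// instance that the version 1.6 connector reads its configuration from.
final class ConnectorProperties {
    // Property keys as they appear in this chapter's examples.
    static final String SERVER_LIST = "infinispan.client.hotrod.server_list";
    static final String CACHE_NAME = "infinispan.rdd.cacheName";

    static Properties forCache(String serverList, String cacheName) {
        Properties properties = new Properties();
        properties.put(SERVER_LIST, serverList);
        properties.put(CACHE_NAME, cacheName);
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = forCache("localhost:11222", "exampleCache");
        properties.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The resulting Properties object is what the Spark 1.6 examples later in this chapter pass to the connector.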
34.3.2. Methods to Configure the Version 2 Connector
The following methods can be used to configure the version 2 connector. They are provided by the org.infinispan.spark.config.ConnectorConfiguration class.
Method Name | Description | Default Value |
---|---|---|
setServerList | List of JBoss Data Grid nodes | localhost:11222 |
setCacheName | The name of the cache that will back the RDD | default cache |
| Batch size (number of entries) when reading from the cache | 10000 |
| Batch size (number of entries) when writing to the cache | 500 |
| Numbers of partitions created per JBoss Data Grid server | 2 |
| Map with protobuf file names and contents | Can be omitted if entities are annotated with protobuf encoding information. Protobuf encoding is required to filter the RDD by Query. |
| List of protostream marshallers classes for the objects in the cache | Can be omitted if entities are annotated with protobuf encoding information. Protobuf encoding is required to filter the RDD by Query. |
addProtoAnnotatedClass | Registers a Class containing protobuf annotations | Alternative to using |
setAutoRegisterProto | Causes automatic registration of protobuf schemas in the server | None |
| Configures additional Hot Rod client properties when contacting the JBoss Data Grid Server | None |
| Used in conjunction with the Dataset API to specify the Query target | If omitted, and in case there is only one class annotated with protobuf configured, it will choose that class. |
34.3.3. Connecting to a Secured JDG Cluster
If the JDG cluster is secured, Hot Rod must be configured with security for the Spark connector to work. Hot Rod security is configured through several Hot Rod properties, which are described in the Hot Rod Properties table in the Appendix of the Administration and Configuration Guide, starting with infinispan.client.hotrod.use_ssl.
These properties can be used only with the version 2 Apache Spark connector.
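As an illustration, the Hot Rod security settings could be collected as in the sketch below. Only `infinispan.client.hotrod.use_ssl` is named in this section; the two trust store keys are assumptions that should be checked against the Hot Rod Properties table, and `SecuredHotRodProperties` is a hypothetical helper. How the properties are handed to the connector is not shown here.

```java
import java.util.Properties;

// Hypothetical helper: gathers Hot Rod client security properties.
final class SecuredHotRodProperties {
    static Properties sslProperties(String trustStorePath, String trustStorePassword) {
        Properties properties = new Properties();
        // Named in this section.
        properties.put("infinispan.client.hotrod.use_ssl", "true");
        // Assumed key names; verify them in the Hot Rod Properties table.
        properties.put("infinispan.client.hotrod.trust_store_file_name", trustStorePath);
        properties.put("infinispan.client.hotrod.trust_store_password", trustStorePassword);
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = sslProperties("/path/to/truststore.jks", "changeit");
        // Print only the keys so that the password is not echoed.
        properties.stringPropertyNames().forEach(System.out::println);
    }
}
```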
34.4. Code Examples for Spark 1.6
34.4.1. Code Examples for Spark 1.6
Because the connector for Apache Spark 1.6 uses a different configuration mechanism than the version 2 connector, and because the version 2 connector supports some features that the 1.6 connector does not, the code examples for each version are presented in separate sections. The following code examples work with version 1.6 of the Spark connector. For Spark 2 examples, see Code Examples for Spark 2.
34.4.2. Creating and Using RDDs
With the Apache Spark 1.6 connector, RDDs are created by specifying a Properties instance with the configurations described in Properties to Configure the Version 1.6 Connector, and then using it together with the Spark context to create an InfinispanRDD that is used with the normal Spark operations.
An example of this is below in both Java and Scala:
34.4.3. Creating an RDD
Creating an RDD (Java)
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.infinispan.spark.rdd.InfinispanJavaRDD;
    import java.util.Properties;

    [...]

    // Begin by defining a new Spark configuration and creating a Spark context from this.
    SparkConf conf = new SparkConf().setAppName("example-RDD");
    JavaSparkContext jsc = new JavaSparkContext(conf);

    // Create the Properties instance, containing the JBoss Data Grid node and cache name.
    Properties properties = new Properties();
    properties.put("infinispan.client.hotrod.server_list", "server:11222");
    properties.put("infinispan.rdd.cacheName", "exampleCache");

    // Create the RDD
    JavaPairRDD<Integer, Book> exampleRDD = InfinispanJavaRDD.createInfinispanRDD(jsc, properties);
    JavaRDD<Book> booksRDD = exampleRDD.values();
Creating an RDD (Scala)
    import java.util.Properties
    import org.apache.spark.{SparkConf, SparkContext}
    import org.infinispan.spark.rdd.InfinispanRDD
    import org.infinispan.spark._

    // Begin by defining a new Spark configuration and creating a Spark context from this.
    val conf = new SparkConf().setAppName("example-RDD-scala")
    val sc = new SparkContext(conf)

    // Create the Properties instance, containing the JBoss Data Grid node and cache name.
    val properties = new Properties
    properties.put("infinispan.client.hotrod.server_list", "server:11222")
    properties.put("infinispan.rdd.cacheName", "exampleCache")

    // Create an RDD from the Data Grid cache
    val exampleRDD = new InfinispanRDD[Integer, Book](sc, properties)
    val booksRDD = exampleRDD.values
34.4.4. Querying an RDD
Once the RDD is available, entries in the backing cache can be obtained by using either the Spark RDD operations or Spark's SQL support. The example above is expanded below to count the entries per author in the resulting RDD with an SQL query:
Querying an RDD (Java)
    // The following imports should be added to the list from the previous example
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SQLContext;
    import java.util.List;

    [...]

    // Continuing the previous example
    // Create a SQLContext, registering the data frame and table
    SQLContext sqlContext = new SQLContext(jsc);
    DataFrame dataFrame = sqlContext.createDataFrame(booksRDD, Book.class);
    dataFrame.registerTempTable("books");

    // Run the Query and collect results
    List<Row> rows = sqlContext.sql("SELECT author, count(*) as a from books WHERE author != 'N/A' GROUP BY author ORDER BY a desc").collectAsList();
Querying an RDD (Scala)
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SQLContext

    [...]

    // Create a SQLContext, register a data frame and table
    val sqlContext = new SQLContext(sc)
    val dataFrame = sqlContext.createDataFrame(booksRDD, classOf[Book])
    dataFrame.registerTempTable("books")

    // Run the Query and collect the results
    val rows = sqlContext.sql("SELECT author, count(*) as a from books WHERE author != 'N/A' GROUP BY author ORDER BY a desc").collect()
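To make clear what the SQL query in these examples computes, here is a plain Java sketch of the same aggregation over an in-memory list of author names. It does not use Spark or the connector; `AuthorCounts` and `countByAuthor` are hypothetical names chosen for illustration.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Plain Java equivalent of:
//   SELECT author, count(*) AS a FROM books
//   WHERE author != 'N/A' GROUP BY author ORDER BY a DESC
final class AuthorCounts {
    static List<Map.Entry<String, Long>> countByAuthor(List<String> authors) {
        return authors.stream()
                // WHERE author != 'N/A' (a SQL comparison also drops NULL values)
                .filter(author -> author != null && !"N/A".equals(author))
                // GROUP BY author with count(*)
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()))
                .entrySet().stream()
                // ORDER BY a DESC
                .sorted(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> authors = Arrays.asList(
                "Negus, Chris", "N/A", "Urma, Raoul-Gabriel", "Negus, Chris");
        countByAuthor(authors)
                .forEach(entry -> System.out.println(entry.getKey() + ": " + entry.getValue()));
    }
}
```

In the Spark examples the same work is distributed across the workers; the sketch only shows the shape of the result, one row per author ordered by the number of books.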
34.4.5. Writing an RDD to the Cache
Any key/value based RDD can be written to the Data Grid cache by using the static InfinispanJavaRDD.write() method in Java, or the implicit writeToInfinispan() method in Scala. This copies the contents of the RDD to the cache:
Writing an RDD (Java)
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.infinispan.client.hotrod.RemoteCache;
    import org.infinispan.spark.domain.Address;
    import org.infinispan.spark.domain.Person;
    import org.infinispan.spark.rdd.InfinispanJavaRDD;
    import scala.Tuple2;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;

    [...]

    // Define the location of the JBoss Data Grid node
    Properties properties = new Properties();
    properties.put("infinispan.client.hotrod.server_list", "localhost:11222");
    properties.put("infinispan.rdd.cacheName", "exampleCache");

    // Create the JavaSparkContext
    SparkConf conf = new SparkConf().setAppName("write-example-RDD");
    JavaSparkContext jsc = new JavaSparkContext(conf);

    // Define two entries to be stored in an RDD
    // Each Book will contain the title, author, and publicationYear
    Book bookOne = new Book("Linux Bible", "Negus, Chris", "2015");
    Book bookTwo = new Book("Java 8 in Action", "Urma, Raoul-Gabriel", "2014");
    List<Tuple2<Integer, Book>> pairs = Arrays.asList(
        new Tuple2<>(1, bookOne),
        new Tuple2<>(2, bookTwo)
    );

    // Create the RDD using the newly created List
    JavaPairRDD<Integer, Book> pairsRDD = jsc.parallelizePairs(pairs);

    // Write the entries into the cache, using the Properties defined above
    InfinispanJavaRDD.write(pairsRDD, properties);
Writing an RDD (Scala)
    import java.util.Properties
    import org.apache.spark.{SparkConf, SparkContext}
    import org.infinispan.spark._
    import org.infinispan.spark.rdd.InfinispanRDD

    [...]

    // Define the location of the JBoss Data Grid node
    val properties = new Properties
    properties.put("infinispan.client.hotrod.server_list", "localhost:11222")
    properties.put("infinispan.rdd.cacheName", "exampleCache")

    // Create the SparkContext
    val conf = new SparkConf().setAppName("write-example-RDD-scala")
    val sc = new SparkContext(conf)

    // Create an RDD of Books
    val bookOne = new Book("Linux Bible", "Negus, Chris", "2015")
    val bookTwo = new Book("Java 8 in Action", "Urma, Raoul-Gabriel", "2014")
    val sampleBookRDD = sc.parallelize(Seq(bookOne, bookTwo))

    // Pair each Book with its index, then swap so that the index becomes the key
    val pairsRDD = sampleBookRDD.zipWithIndex().map(_.swap)

    // Write the Key/Value RDD to the Data Grid
    pairsRDD.writeToInfinispan(properties)
34.4.5.1. Creating and Using DStreams
DStreams represent a continuous stream of data, and are internally represented by a continuous series of RDDs, with each RDD containing data from a specific time interval.
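The idea of a stream represented as a series of per-interval batches can be sketched in plain Java, without Spark. In the sketch below, `MicroBatches` and `batch` are hypothetical names, timestamps are assumed to be non-negative milliseconds, and empty intervals are simply omitted, whereas a real DStream still produces an (empty) RDD for them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Groups event timestamps into consecutive fixed-length intervals,
// mimicking how a DStream holds one RDD per batch interval.
final class MicroBatches {
    static SortedMap<Long, List<Long>> batch(List<Long> timestampsMillis, long batchMillis) {
        SortedMap<Long, List<Long>> batches = new TreeMap<>();
        for (long timestamp : timestampsMillis) {
            // Start of the interval this event falls into.
            long batchStart = (timestamp / batchMillis) * batchMillis;
            batches.computeIfAbsent(batchStart, key -> new ArrayList<>()).add(timestamp);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Long> events = Arrays.asList(100L, 900L, 1000L, 2500L);
        // One-second batches, matching Seconds(1) in the examples of this chapter.
        batch(events, 1000L).forEach((start, batchEvents) ->
                System.out.println("batch starting at " + start + " ms: " + batchEvents));
    }
}
```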
To create a DStream, a StreamingContext is passed in along with a StorageLevel and the JBoss Data Grid RDD configuration, as seen in the example below:
Creating a DStream (Scala)
    import org.infinispan.spark.stream._
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.storage.StorageLevel
    import java.util.Properties

    // Spark context
    val sc = ...

    // java.util.Properties with Infinispan RDD configuration
    val props = ...

    val ssc = new StreamingContext(sc, Seconds(1))
    val stream = new InfinispanInputDStream[String, Book](ssc, StorageLevel.MEMORY_ONLY, props)
The InfinispanInputDStream can be transformed using Spark's many DStream operations; processing begins after calling start() on the StreamingContext. For example, to display every 10 seconds the number of books inserted in the cache in the last 30 seconds:
Processing a DStream (Scala)
    import org.apache.spark.streaming.Seconds
    import org.apache.spark.streaming.dstream.DStream
    import org.infinispan.client.hotrod.event.ClientEvent.Type
    import org.infinispan.spark.stream._

    val stream = ... // From previous sample

    // Filter only created entries
    val createdBooksRDD = stream.filter { case (_, _, t) => t == Type.CLIENT_CACHE_ENTRY_CREATED }

    // Reduce last 30 seconds of data, every 10 seconds
    val windowedRDD: DStream[Long] = createdBooksRDD.count().reduceByWindow(_ + _, Seconds(30), Seconds(10))

    // Print the results, counting the number of occurrences in each individual RDD
    windowedRDD.foreachRDD { rdd => println(rdd.reduce(_ + _)) }

    // Start the processing
    ssc.start()
    ssc.awaitTermination()
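The windowing arithmetic used above (a 30-second window evaluated every 10 seconds over 1-second batches) can be illustrated with a plain Java sketch. It is a simplified model rather than Spark's implementation: `WindowedCounts` and `slidingTotals` are hypothetical names, and the window and slide are expressed as a number of batches.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified model of a sliding window over per-batch counts.
final class WindowedCounts {
    // countsPerBatch.get(i) is the number of entries created during batch i.
    // Every `slide` batches, emits the total over the most recent `window` batches.
    static List<Long> slidingTotals(List<Long> countsPerBatch, int window, int slide) {
        List<Long> totals = new ArrayList<>();
        for (int end = slide; end <= countsPerBatch.size(); end += slide) {
            // Early windows cover fewer batches because less data exists yet.
            int start = Math.max(0, end - window);
            long total = 0;
            for (int i = start; i < end; i++) {
                total += countsPerBatch.get(i);
            }
            totals.add(total);
        }
        return totals;
    }

    public static void main(String[] args) {
        // 40 one-second batches with one created entry each.
        List<Long> counts = Collections.nCopies(40, 1L);
        System.out.println(slidingTotals(counts, 30, 10));
    }
}
```

With one entry created per second, the totals grow until the window is full and then stay at the window size, which is the behavior to expect from the streaming example once it has been running for at least 30 seconds.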
Writing to JBoss Data Grid with DStreams
Any DStream of Key/Value type can be written to JBoss Data Grid through the InfinispanJavaDStream.writeToInfinispan() Java method, or in Scala by using the implicit writeToInfinispan(properties) method directly on the DStream instance. Both methods take the JBoss Data Grid RDD configuration as input and write each RDD contained within the DStream.
34.4.6. Using the Infinispan Query DSL with Spark
The Infinispan Query DSL can be used as a filter for the InfinispanRDD, allowing data to be pre-filtered at the source rather than at RDD level.
Data in the cache must have been encoded with protobuf for the querying DSL to function correctly. Instructions on using protobuf encoding are found in Protobuf Encoding.
Consider the following example, which retrieves a list of books by any author whose name contains Doe:
34.4.7. Filtering by a Query
Filtering by a Query (Scala)
    import org.infinispan.client.hotrod.impl.query.RemoteQuery
    import org.infinispan.client.hotrod.{RemoteCacheManager, Search}
    import org.infinispan.spark.domain._

    [...]

    // like() takes a wildcard pattern; % matches any sequence of characters,
    // so this selects authors whose name contains Doe
    val query = Search.getQueryFactory(remoteCacheManager.getCache(getCacheName))
        .from(classOf[Book])
        .having("author").like("%Doe%")
        .toBuilder[RemoteQuery].build()

    val rdd = createInfinispanRDD[Int, Book]
        .filterByQuery[Book](query, classOf[Book])
Projections are also fully supported. For instance, the above example can be adjusted to obtain only the title and publication year, sorted on the latter field:
34.4.8. Filtering with a Projection
Filtering with a Projection (Scala)
    import org.infinispan.client.hotrod.impl.query.RemoteQuery
    import org.infinispan.client.hotrod.{RemoteCacheManager, Search}
    import org.infinispan.spark.domain._

    [...]

    // Project only title and publicationYear, ordered by publicationYear
    val query = Search.getQueryFactory(remoteCacheManager.getCache(getCacheName))
        .select("title", "publicationYear")
        .from(classOf[Book])
        .having("author").like("%Doe%")
        .orderBy("publicationYear")
        .toBuilder[RemoteQuery].build()

    // With a projection, each value is an array holding the selected fields
    val rdd = createInfinispanRDD[Int, Book]
        .filterByQuery[Array[AnyRef]](query, classOf[Book])
In addition, if a filter has already been deployed to the JBoss Data Grid server it may be referenced by name, as seen below:
34.4.9. Filtering with a Deployed Filter
Filtering with a Deployed Filter (Scala)
    val rdd: InfinispanRDD[String, Book] = ....

    // The "my-filter-factory" filter converts Book to a String, and has two parameters
    val filteredRDD = rdd.filterByCustom[String]("my-filter-factory", "param1", "param2")
34.5. Code Examples for Spark 2
34.5.1. Code Examples for Spark 2
Because the connector for Apache Spark 2 uses a different configuration mechanism than the 1.6 connector, and because the version 2 connector supports some features that the 1.6 connector does not, the code examples for each version are presented in separate sections. The following code examples work with version 2 of the Spark connector. For Spark 1.6 examples, see Code Examples for Spark 1.6.
34.5.2. Creating and Using RDDs
In Apache Spark 2, Resilient Distributed Datasets (RDDs) are created by specifying a ConnectorConfiguration instance with the configurations described in the table from Methods to Configure the Version 2 Connector.
Examples of this in both Java and Scala are below:
34.5.3. Creating an RDD
Creating an RDD (Java)
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.infinispan.spark.config.ConnectorConfiguration;
    import org.infinispan.spark.rdd.InfinispanJavaRDD;

    JavaSparkContext jsc = new JavaSparkContext();

    ConnectorConfiguration config = new ConnectorConfiguration()
        .setCacheName("exampleCache").setServerList("server:11222");

    JavaPairRDD<String, MyEntity> infinispanRDD = InfinispanJavaRDD.createInfinispanRDD(jsc, config);

    JavaRDD<MyEntity> entitiesRDD = infinispanRDD.values();
Creating an RDD (Scala)
    import org.apache.spark.SparkContext
    import org.infinispan.spark.config.ConnectorConfiguration
    import org.infinispan.spark.rdd.InfinispanRDD

    val sc: SparkContext = new SparkContext()

    val config = new ConnectorConfiguration().setCacheName("my-cache").setServerList("10.9.0.8:11222")

    val infinispanRDD = new InfinispanRDD[String, MyEntity](sc, config)

    val entitiesRDD = infinispanRDD.values
34.5.4. Querying an RDD
34.5.4.1. SparkSQL Queries
Using SparkSQL Queries (Java)
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.infinispan.spark.config.ConnectorConfiguration;
    import org.infinispan.spark.rdd.InfinispanJavaRDD;

    JavaSparkContext jsc = new JavaSparkContext();

    ConnectorConfiguration conf = new ConnectorConfiguration();

    // Obtain the values from an InfinispanRDD
    JavaPairRDD<Long, MyEntity> infinispanRDD = InfinispanJavaRDD.createInfinispanRDD(jsc, conf);

    JavaRDD<MyEntity> valuesRDD = infinispanRDD.values();

    // Create a DataFrame from a SparkSession
    SparkSession sparkSession = SparkSession.builder().config(new SparkConf().setMaster("masterHost")).getOrCreate();
    Dataset<Row> dataFrame = sparkSession.createDataFrame(valuesRDD, MyEntity.class);

    // Create a view
    dataFrame.createOrReplaceTempView("myEntities");

    // Create and run the Query
    Dataset<Row> rows = sparkSession.sql("SELECT field1, count(*) as c from myEntities WHERE field1 != 'N/A' GROUP BY field1 ORDER BY c desc");
Using SparkSQL Queries (Scala)
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.{SparkConf, SparkContext}
    import org.infinispan.spark.config.ConnectorConfiguration
    import org.infinispan.spark.rdd._

    val sc: SparkContext = // Initialize your SparkContext here

    val config = new ConnectorConfiguration().setServerList("myserver1:port,myserver2:port")

    // Obtain the values from an InfinispanRDD
    val infinispanRDD = new InfinispanRDD[Long, MyEntity](sc, config)
    val valuesRDD = infinispanRDD.values

    // Create a DataFrame from a SparkSession
    val sparkSession = SparkSession.builder().config(new SparkConf().setMaster("masterHost")).getOrCreate()
    val dataFrame = sparkSession.createDataFrame(valuesRDD, classOf[MyEntity])

    // Create a view
    dataFrame.createOrReplaceTempView("myEntities")

    // Create and run the Query, collect and print results
    sparkSession.sql("SELECT field1, count(*) as c from myEntities WHERE field1 != 'N/A' GROUP BY field1 ORDER BY c desc")
        .collect().take(20).foreach(println)
34.5.5. Writing an RDD to the Cache
Any key/value based RDD can be written to the Data Grid cache by using the static InfinispanJavaRDD.write() method in Java, or the implicit writeToInfinispan() method in Scala. This copies the contents of the RDD to the cache:
Writing an RDD (Java)
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.infinispan.spark.config.ConnectorConfiguration;
    import org.infinispan.spark.rdd.InfinispanJavaRDD;
    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;

    JavaSparkContext jsc = new JavaSparkContext();

    ConnectorConfiguration connectorConfiguration = new ConnectorConfiguration();

    // Build the numbers 1 to 1000 and pair each one with its index
    List<Integer> numbers = IntStream.rangeClosed(1, 1000).boxed().collect(Collectors.toList());
    JavaPairRDD<Integer, Long> numbersRDD = jsc.parallelize(numbers).zipWithIndex();

    InfinispanJavaRDD.write(numbersRDD, connectorConfiguration);
Writing an RDD (Scala)
    import org.apache.spark.SparkContext
    import org.infinispan.spark._
    import org.infinispan.spark.config.ConnectorConfiguration

    val config: ConnectorConfiguration = // Initialize your ConnectorConfiguration here
    val sc: SparkContext = // Initialize your SparkContext here

    val simpleRdd = sc.parallelize(1 to 1000).zipWithIndex()
    simpleRdd.writeToInfinispan(config)
34.5.6. Creating DStreams
DStreams represent a continuous stream of data, and are internally represented by a continuous series of RDDs, with each RDD containing data from a specific time interval.
To create a DStream, a StreamingContext is passed in along with a StorageLevel and the JBoss Data Grid RDD configuration, as seen in the example below:
Creating a DStream (Java)
    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Seconds;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.infinispan.spark.config.ConnectorConfiguration;
    import org.infinispan.spark.stream.InfinispanJavaDStream;

    import static org.apache.spark.storage.StorageLevel.MEMORY_ONLY;

    SparkConf conf = new SparkConf().setAppName("my-stream-app");

    ConnectorConfiguration configuration = new ConnectorConfiguration();

    JavaStreamingContext jsc = new JavaStreamingContext(conf, Seconds.apply(1));

    InfinispanJavaDStream.createInfinispanInputDStream(jsc, MEMORY_ONLY(), configuration);
Creating a DStream (Scala)
    import org.apache.spark.SparkContext
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.infinispan.spark.config.ConnectorConfiguration
    import org.infinispan.spark.stream._

    val sc = new SparkContext()

    val config = new ConnectorConfiguration()

    val ssc = new StreamingContext(sc, Seconds(1))

    val stream = new InfinispanInputDStream[String, MyEntity](ssc, StorageLevel.MEMORY_ONLY, config)
Writing to JBoss Data Grid with DStreams
Any DStream of Key/Value type can be written to JBoss Data Grid through the InfinispanJavaDStream.writeToInfinispan() Java method, or in Scala by using the implicit writeToInfinispan(config) method directly on the DStream instance. Both methods take the JBoss Data Grid RDD configuration as input and write each RDD contained within the DStream.
34.5.7. Using The Apache Spark Dataset API
In addition to the Resilient Distributed Dataset (RDD) programming interface, the JBoss Data Grid connector supports the Apache Spark Dataset API, including pushing down predicates, similar to rdd.filterByQuery.
Dataset API Example (Java)
    import org.apache.spark.SparkConf;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.infinispan.spark.config.ConnectorConfiguration;
    import java.util.List;

    // Configure the connector using the ConnectorConfiguration: register entities annotated with Protobuf,
    // and turn on automatic registration of schemas
    ConnectorConfiguration connectorConfig = new ConnectorConfiguration()
        .setServerList("server1:11222,server2:11222")
        .addProtoAnnotatedClass(User.class)
        .setAutoRegisterProto();

    // Create the SparkSession
    SparkSession sparkSession = SparkSession.builder().config(new SparkConf().setMaster("masterHost")).getOrCreate();

    // Load the "infinispan" datasource into a DataFrame, using the infinispan config
    Dataset<Row> df = sparkSession.read().format("infinispan").options(connectorConfig.toStringsMap()).load();

    // From here it's possible to query using the Dataset API...
    List<Row> rows = df.filter(df.col("age").gt(30)).filter(df.col("age").lt(40)).collectAsList();

    // ... or execute SQL queries (the table alias u must match the column qualifiers)
    df.createOrReplaceTempView("user");
    String query = "SELECT first(u.name) as name, first(u.age) as age FROM user u GROUP BY u.age";
    List<Row> results = sparkSession.sql(query).collectAsList();
Dataset API Example (Scala)
    import org.apache.spark._
    import org.apache.spark.sql._
    import org.infinispan.protostream.annotations.{ProtoField, ProtoMessage}
    import org.infinispan.spark.config.ConnectorConfiguration
    import scala.annotation.meta.beanGetter
    import scala.beans.BeanProperty

    // Entities can be annotated in order to automatically generate protobuf schemas.
    // Also, they should be valid java beans. From Scala this can be achieved as:
    @ProtoMessage(name = "user")
    class User(@(ProtoField@beanGetter)(number = 1, required = true) @BeanProperty var name: String,
               @(ProtoField@beanGetter)(number = 2, required = true) @BeanProperty var age: Int) {
      def this() = {
        this(name = "", age = -1)
      }
    }

    // Configure the connector using the ConnectorConfiguration: register entities annotated with Protobuf,
    // and turn on automatic registration of schemas
    val infinispanConfig: ConnectorConfiguration = new ConnectorConfiguration()
        .setServerList("server1:11222,server2:11222")
        .addProtoAnnotatedClass(classOf[User])
        .setAutoRegisterProto()

    // Create the SparkSession
    val sparkSession = SparkSession.builder().config(new SparkConf().setMaster("masterHost")).getOrCreate()

    // Load the "infinispan" datasource into a DataFrame, using the infinispan config
    val df = sparkSession.read.format("infinispan").options(infinispanConfig.toStringsMap).load()

    // From here it's possible to query using the Dataset API...
    val rows: Array[Row] = df.filter(df("age").gt(30)).filter(df("age").lt(40)).collect()

    // ... or execute SQL queries (the table alias u must match the column qualifiers)
    df.createOrReplaceTempView("user")
    val query = "SELECT first(u.name) as name, first(u.age) as age FROM user u GROUP BY u.age"
    val rowsFromSQL: Array[Row] = sparkSession.sql(query).collect()
34.5.8. Using the Infinispan Query DSL with Spark
The Infinispan Query DSL can be used as a filter for the InfinispanRDD, allowing data to be pre-filtered at the source rather than at RDD level.
Data in the cache must have been encoded with protobuf for the querying DSL to function correctly. Instructions on using protobuf encoding are found in Protobuf Encoding.
34.5.9. Filtering with a pre-built Query Object
Filtering with a pre-built Query Object (Java)
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.infinispan.client.hotrod.RemoteCache;
    import org.infinispan.client.hotrod.RemoteCacheManager;
    import org.infinispan.client.hotrod.Search;
    import org.infinispan.query.dsl.Query;
    import org.infinispan.spark.config.ConnectorConfiguration;
    import org.infinispan.spark.rdd.InfinispanJavaRDD;

    JavaSparkContext jsc = new JavaSparkContext();

    ConnectorConfiguration conf = new ConnectorConfiguration();

    InfinispanJavaRDD<String, MyEntity> infinispanRDD = InfinispanJavaRDD.createInfinispanRDD(jsc, conf);

    RemoteCache<String, MyEntity> remoteCache = new RemoteCacheManager().getCache();

    // Assuming MyEntity is already stored in the cache with protobuf encoding, and has protobuf annotations.
    Query query = Search.getQueryFactory(remoteCache).from(MyEntity.class).having("field").equal("value").build();

    JavaPairRDD<String, MyEntity> filtered = infinispanRDD.filterByQuery(query);
Filtering with a pre-built Query Object (Scala)
    import org.infinispan.client.hotrod.{RemoteCache, Search}
    import org.infinispan.spark.rdd.InfinispanRDD

    val rdd: InfinispanRDD[String, MyEntity] = // Initialize your InfinispanRDD here
    val cache: RemoteCache[String, MyEntity] = // Initialize your RemoteCache here

    // Assuming MyEntity is already stored in the cache with protobuf encoding, and has protobuf annotations.
    val query = Search.getQueryFactory(cache).from(classOf[MyEntity]).having("field").equal("value").build()

    val filteredRDD = rdd.filterByQuery(query)
34.5.10. Filtering with an Ickle Query
Filtering with an Ickle Query (Java)
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.infinispan.spark.config.ConnectorConfiguration;
    import org.infinispan.spark.rdd.InfinispanJavaRDD;

    JavaSparkContext jsc = new JavaSparkContext();

    ConnectorConfiguration conf = new ConnectorConfiguration();

    InfinispanJavaRDD<String, MyEntity> infinispanRDD = InfinispanJavaRDD.createInfinispanRDD(jsc, conf);

    JavaPairRDD<String, MyEntity> filtered = infinispanRDD.filterByQuery("From myEntity where field = 'value'");
Filtering with an Ickle Query (Scala)
    import org.infinispan.spark.rdd.InfinispanRDD

    val rdd: InfinispanRDD[String, MyEntity] = // Initialize your InfinispanRDD here

    val filteredRDD = rdd.filterByQuery("FROM MyEntity e where e.field BETWEEN 10 AND 20")
34.5.11. Filtering on the Server
Filtering on the Server (Java)
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.infinispan.spark.config.ConnectorConfiguration;
    import org.infinispan.spark.rdd.InfinispanJavaRDD;

    JavaSparkContext jsc = new JavaSparkContext();

    ConnectorConfiguration conf = new ConnectorConfiguration();

    InfinispanJavaRDD<String, MyEntity> infinispanRDD = InfinispanJavaRDD.createInfinispanRDD(jsc, conf);

    JavaPairRDD<String, MyEntity> filtered = infinispanRDD.filterByCustom("my-filter", "param1", "param2");
Filtering on the Server (Scala)
    import org.infinispan.spark.rdd.InfinispanRDD

    val rdd: InfinispanRDD[String, MyEntity] = // Initialize your InfinispanRDD here

    // The "my-filter-factory" filter converts MyEntity to a Double, and has two parameters
    val filteredRDD = rdd.filterByCustom[Double]("my-filter-factory", "param1", "param2")
34.6. Spark Performance Considerations
By default, the Data Grid Spark connector creates two partitions per Data Grid node; each partition specifies a subset of the data in that particular node.
Those partitions are then sent to the Spark workers that will process them in parallel. If the number of Spark workers is less than the number of Data Grid nodes, some delay can occur since each worker has a maximum capacity of executing tasks in parallel. For this reason it is recommended to have at least the same number of Spark workers as Data Grid nodes to take advantage of the parallelism.
In addition, if a Spark worker is co-located in the same node as the Data Grid node, the connector will distribute tasks so that each worker only processes data found in the local Data Grid node.
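The sizing reasoning in this section can be sketched as simple arithmetic. The model below is a rough estimate under stated assumptions, not how Spark actually schedules tasks: `PartitionPlanning`, `totalPartitions`, and `schedulingWaves` are hypothetical names, `tasksPerWorker` stands in for the number of tasks a worker can run in parallel, and data locality is ignored.

```java
// Rough capacity estimate for reading a cache through the connector.
final class PartitionPlanning {
    // Partitions created by the connector: a fixed number per Data Grid node
    // (two by default, according to this chapter).
    static int totalPartitions(int dataGridNodes, int partitionsPerServer) {
        return dataGridNodes * partitionsPerServer;
    }

    // Number of rounds needed to process all partitions when each worker
    // can run tasksPerWorker tasks at the same time.
    static int schedulingWaves(int partitions, int workers, int tasksPerWorker) {
        int parallelSlots = workers * tasksPerWorker;
        return (partitions + parallelSlots - 1) / parallelSlots; // ceiling division
    }

    public static void main(String[] args) {
        int partitions = totalPartitions(4, 2);
        System.out.println("partitions: " + partitions);
        System.out.println("waves with 2 workers: " + schedulingWaves(partitions, 2, 2));
        System.out.println("waves with 4 workers: " + schedulingWaves(partitions, 4, 2));
    }
}
```

Under these assumptions, four Data Grid nodes produce eight partitions; two workers with two parallel tasks each need two rounds to read them, while four such workers need only one.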