Chapter 31. Integration with Apache Spark
31.1. Integration with Apache Spark
JBoss Data Grid includes a Spark connector, providing tight integration with Apache Spark and allowing applications written in either Java or Scala to use JBoss Data Grid as a backing data store. This connector includes support for the following:
- Creating an RDD from any cache
- Writing a key/value RDD to a cache
- Creating a DStream from cache-level events
- Writing a key/value DStream to a cache
Support for Apache Spark is only available in Remote Client-Server Mode.
31.2. Spark Dependencies
JBoss Data Grid provides two versions of the Apache Spark connector, one for Spark 1.6 and one for Spark 2.0, supporting Scala 2.10 and 2.11 respectively. Both versions are shipped separately from the main distribution.
The following Maven configuration should be used depending on the desired version of Apache Spark:
pom.xml for Spark 1.6
<dependency>
<groupId>org.infinispan</groupId>
<artifactId>infinispan-spark</artifactId>
<version>0.3.0.Final-redhat-2</version>
</dependency>
pom.xml for Spark 2.0
<dependency>
<groupId>org.infinispan</groupId>
<artifactId>infinispan-spark</artifactId>
<version>0.4.0.Final-redhat-2</version>
</dependency>
31.3. Supported Spark Configuration Parameters
The following table shows the Spark configuration parameters that are supported by both versions:
| Parameter Name | Description | Default Value |
| --- | --- | --- |
| | List of JBoss Data Grid nodes | localhost:11222 |
| | The name of the cache that will back the RDD | default cache |
| | Batch size (number of entries) when reading from the cache | 10000 |
| | Batch size (number of entries) when writing to the cache | 500 |
| | Number of partitions created per JBoss Data Grid server | 2 |
| | Map with protobuf file names and contents | Can be omitted if entities are annotated with protobuf encoding information. Protobuf encoding is required to filter the RDD by Query. |
| | List of protostream marshaller classes for the objects in the cache | Can be omitted if entities are annotated with protobuf encoding information. Protobuf encoding is required to filter the RDD by Query. |
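These parameters are passed to the connector as a standard java.util.Properties instance. The following Scala sketch illustrates the idea; the property key names used here (infinispan.client.hotrod.server_list and infinispan.rdd.cacheName) are assumptions based on the Hot Rod client and connector naming conventions, and the server addresses and cache name are placeholders.

import java.util.Properties

// A minimal configuration sketch; the key names below are assumptions and should
// be checked against the table above for the connector version in use.
val properties = new Properties
properties.put("infinispan.client.hotrod.server_list", "server1:11222;server2:11222")
properties.put("infinispan.rdd.cacheName", "bookCache")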
The following is a list of configuration parameters supported exclusively by the Spark 2.0 connector:
| Parameter Name | Description | Default Value |
| --- | --- | --- |
| | Enable SSL | false |
| | The JKS keystore file name, required when mutual SSL authentication is enabled in the Infinispan server. Can be either a file path or a class path resource. | |
| | The JKS trust store path or class path resource containing server certificates | |
| | Password for the key store | |
| | Password for the trust store | |
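Continuing the previous sketch, the SSL-related settings for the Spark 2.0 connector might be added to the same Properties instance as shown below; the key names follow the Hot Rod client property naming convention and, like the store locations and passwords, are assumptions rather than confirmed values.

// Hypothetical SSL settings; key names, store locations and passwords are placeholders.
properties.put("infinispan.client.hotrod.use_ssl", "true")
properties.put("infinispan.client.hotrod.key_store_file_name", "keystore.jks")
properties.put("infinispan.client.hotrod.key_store_password", "secret")
properties.put("infinispan.client.hotrod.trust_store_file_name", "truststore.jks")
properties.put("infinispan.client.hotrod.trust_store_password", "secret")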
31.4. Creating and Using RDDs
RDDs are created by specifying a Properties instance with the configuration described in Supported Spark Configuration Parameters (Section 31.3), and then using it together with the Spark context to create an InfinispanRDD that is used with the normal Spark operations. An example of this is shown below in both Java and Scala:
Creating an RDD (Java)
Creating an RDD (Scala)
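As an illustration only, a minimal Scala sketch of this pattern might look like the following; it assumes an InfinispanRDD constructor that accepts the SparkContext and the Properties, reuses the property keys discussed in Section 31.3, and uses a hypothetical Book entity:

import java.util.Properties
import org.apache.spark.{SparkConf, SparkContext}
import org.infinispan.spark.rdd.InfinispanRDD

// A hypothetical entity stored in the cache.
case class Book(title: String, author: String, publicationYear: Int)

// Standard Spark setup.
val conf = new SparkConf().setAppName("jdg-spark-rdd-example")
val sc = new SparkContext(conf)

// Connector configuration; the property key names are assumptions (see Section 31.3).
val properties = new Properties
properties.put("infinispan.client.hotrod.server_list", "server:11222")
properties.put("infinispan.rdd.cacheName", "bookCache")

// Create an RDD backed by the remote cache and use it with normal Spark operations.
val infinispanRDD = new InfinispanRDD[String, Book](sc, properties)
val totalEntries = infinispanRDD.count()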
Once the RDD is available, entries in the backing cache may be obtained by using either the Spark RDD operations or Spark's SQL support. The above example is expanded to count the entries per author in the resulting RDD with an SQL query:
Querying with an RDD (Java)
Querying with an RDD (Scala)
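A sketch of the Scala variant, assuming Spark 1.6's SQLContext API and reusing sc, infinispanRDD and the hypothetical Book case class from the previous sketch:

import org.apache.spark.sql.SQLContext

// Expose the cache values as a DataFrame and register it as a temporary table.
val sqlContext = new SQLContext(sc)
val booksDF = sqlContext.createDataFrame(infinispanRDD.values)
booksDF.registerTempTable("books")

// Count the entries per author with an SQL query.
val countsPerAuthor = sqlContext.sql("SELECT author, count(*) AS total FROM books GROUP BY author")
countsPerAuthor.show()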
Writing to JBoss Data Grid
Any key/value based RDD can be written to the Data Grid cache by using the static InfinispanJavaRDD.write() method. This will copy the contents of the RDD to the cache:
Writing with an RDD (Java)
Writing with an RDD (Scala)
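On the Scala side, a minimal sketch might rely on an implicit writeToInfinispan(properties) extension on pair RDDs, mirroring the Java InfinispanJavaRDD.write() method described above; the import that is assumed to bring the implicit into scope and the sample entries are illustrative.

// The org.infinispan.spark package object is assumed to provide the implicit.
import org.infinispan.spark._

// Build a key/value RDD with illustrative entries and copy it to the cache;
// sc, properties and Book come from the earlier sketches.
val booksToWrite = sc.parallelize(Seq(
  "id1" -> Book("Example Title One", "John Doe", 2015),
  "id2" -> Book("Example Title Two", "Jane Doe", 2016)
))
booksToWrite.writeToInfinispan(properties)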
31.5. Creating and Using DStreams
DStreams represent a continuous stream of data, and are internally represented by a continuous series of RDDs, with each RDD containing data from a specific time interval.
To create a DStream, a StreamingContext is passed in along with a StorageLevel and the JBoss Data Grid RDD configuration, as seen in the example below:
Creating a DStream (Scala)
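A sketch of this step, assuming the InfinispanInputDStream constructor takes the StreamingContext, a StorageLevel and the connector Properties, and reusing sc, properties and Book from the earlier sketches:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.infinispan.spark.stream.InfinispanInputDStream

// One-second micro-batches; each stream element is assumed to carry the key,
// the value and the cache event type that produced it.
val ssc = new StreamingContext(sc, Seconds(1))
val stream = new InfinispanInputDStream[String, Book](ssc, StorageLevel.MEMORY_ONLY, properties)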
The InfinispanInputDStream can be transformed using many of Spark's DStream operations, and the processing will occur after calling start on the StreamingContext. For example, to display every 10 seconds the number of books inserted in the cache in the last 30 seconds:
Processing a DStream (Scala)
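A sketch of such processing, assuming each stream element is a (key, value, eventType) tuple and that newly inserted entries are marked with the Hot Rod CLIENT_CACHE_ENTRY_CREATED event type:

import org.apache.spark.streaming.Seconds
import org.infinispan.client.hotrod.event.ClientEvent.Type.CLIENT_CACHE_ENTRY_CREATED

// Keep only the entries that were newly created in the cache.
val created = stream.filter { case (_, _, eventType) => eventType == CLIENT_CACHE_ENTRY_CREATED }

// Every 10 seconds, print how many books were inserted during the last 30 seconds.
created.window(Seconds(30), Seconds(10)).count().print()

// Processing starts only once the StreamingContext is started.
ssc.start()
ssc.awaitTermination()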
Writing to JBoss Data Grid with DStreams
Any DStream of key/value type can be written to JBoss Data Grid through the InfinispanJavaDStream.writeToInfinispan() Java method, or in Scala by using the implicit writeToInfinispan(properties) method directly on the DStream instance. Both methods take the JBoss Data Grid RDD configuration as input and will write each RDD contained within the DStream to the cache.
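As a sketch, the Scala variant might look as follows; the import that is assumed to bring the implicit into scope, and the shape of the stream elements, follow the earlier sketches.

// The org.infinispan.spark.stream package object is assumed to provide the implicit.
import org.infinispan.spark.stream._

// Drop the event type and write the remaining key/value pairs to the cache.
val keyValueStream = stream.map { case (key, value, _) => (key, value) }
keyValueStream.writeToInfinispan(properties)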
31.6. Using the Infinispan Query DSL with Spark
The Infinispan Query DSL can be used as a filter for the InfinispanRDD, allowing data to be pre-filtered at the source rather than at the RDD level.
Data in the cache must have been encoded with protobuf for the querying DSL to function correctly. Instructions on using protobuf encoding are found in Protobuf Encoding.
Consider the following example, which retrieves a list of books that have any author whose name contains Doe:
Filtering by a Query (Scala)
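A sketch of this example, building the query with the remote Query DSL and reusing infinispanRDD and Book from the earlier sketches; the field path authors.name, the filterByQuery signature and the exact DSL chaining differ between client and connector versions, so treat them as assumptions.

import org.infinispan.client.hotrod.{RemoteCacheManager, Search}
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder

// Obtain a query factory from a Hot Rod remote cache.
val remoteCacheManager = new RemoteCacheManager(
  new ConfigurationBuilder().addServer().host("server").port(11222).build())
val queryFactory = Search.getQueryFactory(remoteCacheManager.getCache[String, Book]("bookCache"))

// Books having any author whose name contains "Doe"; "authors.name" is a
// hypothetical field path on the Book entity.
val builder = queryFactory.from(classOf[Book])
builder.having("authors.name").like("%Doe%")
val query = builder.build()

// Pre-filter the data at the source; filterByQuery is assumed to accept the built
// query (some connector versions also take the entity class as a further argument).
val filteredRDD = infinispanRDD.filterByQuery[Book](query)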
Projections are also fully supported; for instance, the above example may be adjusted to obtain only the title and publication year, sorting on the latter field:
Filtering with a Projection (Scala)
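Continuing the previous sketch, the projection and sort might look as follows; the field names and the shape of the projected results (arrays of the selected fields) are assumptions.

import org.infinispan.query.dsl.SortOrder

// Select only the title and publication year, ordered by publication year.
val projectionBuilder = queryFactory.from(classOf[Book])
  .select("title", "publicationYear")
  .orderBy("publicationYear", SortOrder.ASC)
projectionBuilder.having("authors.name").like("%Doe%")
val projectedQuery = projectionBuilder.build()

// Each result is expected to be a projection, that is, an array of the selected fields.
val projectedRDD = infinispanRDD.filterByQuery[Array[AnyRef]](projectedQuery)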
In addition, if a filter has already been deployed to the JBoss Data Grid server it may be referenced by name, as seen below:
Filtering with a Deployed Filter (Scala)
val rdd: InfinispanRDD[String, Book] = ....
// The "my-filter-factory" filter converts Book to a String and takes two parameters
val filteredRDD = rdd.filterByCustom[String]("my-filter-factory", "param1", "param2")
31.7. Spark Performance Considerations
By default, the Data Grid Spark connector creates two partitions per Data Grid node, with each partition specifying a subset of the data in that particular node.
Those partitions are then sent to the Spark workers, which process them in parallel. If the number of Spark workers is less than the number of Data Grid nodes, some delay can occur, since each worker has a maximum capacity for executing tasks in parallel. For this reason, it is recommended to have at least the same number of Spark workers as Data Grid nodes to take advantage of the parallelism.
In addition, if a Spark worker is co-located on the same node as a Data Grid node, the connector will distribute tasks so that each worker only processes data found in the local Data Grid node.