Chapter 34. Integration with Apache Spark
34.1. The JBoss Data Grid Apache Spark Connector
JBoss Data Grid includes a Spark connector, providing tight integration with Apache Spark, and allowing applications written either in Java or Scala to utilize JBoss Data Grid as a backing data store.
There are two connectors: one supports Apache Spark 1.6.x (built against Scala 2.10.x) and one supports Apache Spark 2.x (built against Scala 2.11.x). Both connectors are shipped separately from the main distribution.
The Apache Spark 1.6 connector includes support for the following:
- Create an RDD from any cache
- Write a key/value RDD to a cache
- Create a DStream from cache-level events
- Write a key/value DStream to a cache
In addition to the above, the Apache Spark 2 connector supports the following features:
- Use JDG server-side filters to create a cache-based RDD
- Spark serializer based on JBoss Marshalling
- Dataset API with support for push-down predicates
Support for Apache Spark is only available in Remote Client-Server Mode.
34.2. Spark Dependencies
The following Maven configuration should be used depending on the desired version of Apache Spark:
pom.xml for Spark 1.6.x
<dependency>
    <groupId>org.infinispan</groupId>
    <artifactId>infinispan-spark</artifactId>
    <version>0.3.0.Final-redhat-2</version>
</dependency>
pom.xml for Spark 2.x
<dependency>
    <groupId>org.infinispan</groupId>
    <artifactId>infinispan-spark</artifactId>
    <version>0.6.0.Final-redhat-9</version>
</dependency>
34.3. Configuring the Spark Connector
The Apache Spark version 1.6 and version 2 connectors do not use the same interfaces for configuration. The version 1.6 connector uses properties, and the version 2 connector uses methods.
34.3.1. Properties to Configure the Version 1.6 Connector
| Property Name | Description | Default Value |
|---|---|---|
| infinispan.client.hotrod.server_list | List of JBoss Data Grid nodes | localhost:11222 |
| infinispan.rdd.cacheName | The name of the cache that will back the RDD | default cache |
| infinispan.rdd.read_batch_size | Batch size (number of entries) when reading from the cache | 10000 |
| infinispan.rdd.write_batch_size | Batch size (number of entries) when writing to the cache | 500 |
| infinispan.rdd.number_server_partitions | Number of partitions created per JBoss Data Grid server | 2 |
| infinispan.rdd.query.proto.protofiles | Map with protobuf file names and contents | Can be omitted if entities are annotated with protobuf encoding information. Protobuf encoding is required to filter the RDD by Query. |
| infinispan.rdd.query.proto.marshallers | List of protostream marshaller classes for the objects in the cache | Can be omitted if entities are annotated with protobuf encoding information. Protobuf encoding is required to filter the RDD by Query. |
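For illustration, a minimal sketch of a Properties instance using these keys (the server address and cache name below are placeholders):
import java.util.Properties
// Placeholder server list and cache name; adjust to your environment
val infinispanProperties = new Properties
infinispanProperties.put("infinispan.client.hotrod.server_list", "jdg-node1:11222;jdg-node2:11222")
infinispanProperties.put("infinispan.rdd.cacheName", "exampleCache")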
34.3.2. Methods to Configure the Version 2 Connector
The following methods can be used to configure the version 2 connector. They are provided by the org.infinispan.spark.config.ConnectorConfiguration class.
| Method Name | Description | Default Value |
|---|---|---|
| setServerList(String) | List of JBoss Data Grid nodes | localhost:11222 |
| setCacheName(String) | The name of the cache that will back the RDD | default cache |
| setReadBatchSize(Integer) | Batch size (number of entries) when reading from the cache | 10000 |
| setWriteBatchSize(Integer) | Batch size (number of entries) when writing to the cache | 500 |
| setPartitions(Integer) | Number of partitions created per JBoss Data Grid server | 2 |
| addProtoFile(String name, String contents) | Map with protobuf file names and contents | Can be omitted if entities are annotated with protobuf encoding information. Protobuf encoding is required to filter the RDD by Query. |
| addMessageMarshaller(Class) | List of protostream marshaller classes for the objects in the cache | Can be omitted if entities are annotated with protobuf encoding information. Protobuf encoding is required to filter the RDD by Query. |
| addProtoAnnotatedClass(Class) | Registers a Class containing protobuf annotations | Alternative to using addProtoFile and addMessageMarshaller when entities are annotated. |
| setAutoRegisterProto() | Causes automatic registration of protobuf schemas in the server | None |
| addHotRodClientProperty(key, value) | Configures additional Hot Rod client properties when contacting the JBoss Data Grid Server | None |
| setTargetEntity(Class) | Used in conjunction with the Dataset API to specify the Query target | If omitted, and in case there is only one class annotated with protobuf configured, it will choose that class. |
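For illustration, a minimal sketch of a ConnectorConfiguration built with these methods (the server address and cache name below are placeholders):
import org.infinispan.spark.config.ConnectorConfiguration
// Placeholder server list and cache name; adjust to your environment
val config = new ConnectorConfiguration()
  .setServerList("jdg-node1:11222")
  .setCacheName("exampleCache")
  .setReadBatchSize(5000)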
34.3.3. Connecting to a Secured JDG Cluster
If the JDG cluster is secured, Hot Rod must be configured with security for the Spark connector to work. Several Hot Rod properties can be set to configure Hot Rod security; they are described in the Hot Rod Properties table in the Appendix of the Administration and Configuration Guide, starting with infinispan.client.hotrod.use_ssl.
These properties are exclusive to the version 2 Apache Spark connector.
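As a sketch, with the version 2 connector these Hot Rod properties can be supplied through the addHotRodClientProperty method; the server address, truststore path, and password below are placeholders:
import org.infinispan.spark.config.ConnectorConfiguration
// Placeholder truststore location and password; adjust to your environment
val secureConfig = new ConnectorConfiguration()
  .setServerList("jdg-node1:11222")
  .addHotRodClientProperty("infinispan.client.hotrod.use_ssl", "true")
  .addHotRodClientProperty("infinispan.client.hotrod.trust_store_file_name", "/path/to/truststore.jks")
  .addHotRodClientProperty("infinispan.client.hotrod.trust_store_password", "changeme")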
34.4. Code Examples for Spark 1.6
34.4.1. Code Examples for Spark 1.6
Since the connector for Apache Spark 1.6 uses a different configuration mechanism than the version 2 connector, and also because the version 2 connector supports some features that 1.6 does not, the code examples for each version are separated into their own sections. The following code examples work with version 1.6 of the Spark connector; see Code Examples for Spark 2 for the version 2 examples.
34.4.2. Creating and Using RDDs
With the Apache Spark 1.6 connector, RDDs are created by specifying a Properties instance with the configurations described in Properties to Configure the Version 1.6 Connector, and then using it together with the Spark context to create an InfinispanRDD that is used with the normal Spark operations.
An example of this is below in both Java and Scala:
34.4.3. Creating an RDD
Creating an RDD (Java)
Creating an RDD (Scala)
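A minimal Scala sketch, assuming a Book entity class and a cache named bookCache (both placeholders):
import java.util.Properties
import org.apache.spark.{SparkConf, SparkContext}
import org.infinispan.spark.rdd.InfinispanRDD

val sparkConf = new SparkConf().setAppName("infinispan-rdd-example")
val sc = new SparkContext(sparkConf)

// Connector configuration, using the properties described earlier
val infinispanProperties = new Properties
infinispanProperties.put("infinispan.client.hotrod.server_list", "jdg-node1:11222")
infinispanProperties.put("infinispan.rdd.cacheName", "bookCache")

// Create the RDD backed by the cache; key and value types match the cache contents
val infinispanRDD = new InfinispanRDD[Integer, Book](sc, infinispanProperties)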
34.4.4. Querying an RDD
Once the RDD is available, entries in the backing cache may be obtained by using either the Spark RDD operations or Spark's SQL support. The above example is expanded to count the entries, per author, in the resulting RDD with an SQL query:
Querying an RDD (Java)
Querying an RDD (Scala)
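A Scala sketch of that per-author count, assuming the infinispanRDD and sc from the previous example and a Book entity with an author field:
import org.apache.spark.sql.SQLContext

// Register the cache values as a temporary table for Spark SQL
val sqlContext = new SQLContext(sc)
val booksDF = sqlContext.createDataFrame(infinispanRDD.values, classOf[Book])
booksDF.registerTempTable("books")

// Count the entries per author
val rows = sqlContext.sql("SELECT author, COUNT(*) AS total FROM books GROUP BY author ORDER BY total DESC").collect()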
34.4.5. Writing an RDD to the Cache
Any key/value based RDD can be written to the Data Grid cache by using the static InfinispanJavaRDD.write() method. This will copy the contents of the RDD to the cache:
Writing an RDD (Java)
Writing an RDD (Scala)
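In Scala, the same can be sketched with the implicit writeToInfinispan decoration added by the connector; the key/value RDD below is illustrative, and sc and infinispanProperties are assumed from the earlier example:
import org.infinispan.spark._

// A simple key/value RDD of (Int, String) pairs
val simpleRdd = sc.parallelize((1 to 1000).map(i => (i, s"value$i")))

// Copy the contents of the RDD into the configured cache
simpleRdd.writeToInfinispan(infinispanProperties)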
34.4.5.1. Creating and Using DStreams
DStreams represent a continuous stream of data, and are internally represented by a continuous series of RDDs, with each RDD containing data from a specific time interval.
To create a DStream, a StreamingContext is passed in along with the StorageLevel and the JBoss Data Grid RDD configuration, as seen in the example below:
Creating a DStream (Scala)
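A Scala sketch, reusing sc and infinispanProperties from the RDD example:
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.infinispan.spark.stream._

// Micro-batch interval of one second
val ssc = new StreamingContext(sc, Seconds(1))

// Each element of the stream is a (key, value, event type) tuple produced from cache-level events
val stream = new InfinispanInputDStream[Integer, Book](ssc, StorageLevel.MEMORY_ONLY, infinispanProperties)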
The InfinispanInputDStream can be transformed using the many DStream operations available in Spark, and the processing will occur after calling "start" on the StreamingContext. For example, to display every 10 seconds the number of books inserted in the cache in the last 30 seconds:
Processing a DStream (Scala)
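A sketch of that windowed count, assuming the stream and ssc created above:
import org.apache.spark.streaming.Seconds
import org.infinispan.client.hotrod.event.ClientEvent

// Windowed counting needs a checkpoint directory (placeholder path)
ssc.checkpoint("/tmp/spark-checkpoint")

// Keep only newly created entries, count them over a 30 second window sliding every 10 seconds, and print the result
stream
  .filter { case (_, _, eventType) => eventType == ClientEvent.Type.CLIENT_CACHE_ENTRY_CREATED }
  .countByWindow(Seconds(30), Seconds(10))
  .print()

ssc.start()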
Writing to JBoss Data Grid with DStreams
Any DStream of key/value type can be written to JBoss Data Grid through the InfinispanJavaDStream.writeToInfinispan() Java method, or in Scala by using the implicit writeToInfinispan(properties) method directly on the DStream instance. Both methods take the JBoss Data Grid RDD configuration as input and write each RDD contained within the DStream.
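A Scala sketch, assuming a key/value DStream named keyValueDStream (a hypothetical name) and the infinispanProperties from earlier:
import org.infinispan.spark.stream._

// Write every RDD contained in the DStream to the configured cache
keyValueDStream.writeToInfinispan(infinispanProperties)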
34.4.6. Using the Infinispan Query DSL with Spark
The Infinispan Query DSL can be used as a filter for the InfinispanRDD, allowing data to be pre-filtered at the source rather than at RDD level.
Data in the cache must have been encoded with protobuf for the querying DSL to function correctly. Instructions on using protobuf encoding are found in Protobuf Encoding.
Consider the following example, which retrieves a list of books that includes any author whose name contains Doe:
34.4.7. Filtering by a Query
Filtering by a Query (Scala)
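A sketch, assuming a remote Query object named query has already been built with the Query DSL against the books cache (restricting authors.name to values containing Doe); the exact filterByQuery signature may differ between connector releases:
// Push the query down to the server so only matching Book entries reach Spark
val filteredRDD = infinispanRDD.filterByQuery[Book](query, classOf[Book])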
Projections are also fully supported; for instance, the above example may be adjusted to obtain only the title and publication year, sorting on the latter field:
34.4.8. Filtering with a Projection
Filtering with a Projection (Scala)
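A sketch along the same lines, assuming query now selects the title and publication year and orders by the latter; each element of the resulting RDD is an array holding the projected fields:
// Projections return arrays of the selected fields instead of whole Book instances
val projectedRDD = infinispanRDD.filterByQuery[Array[AnyRef]](query, classOf[Book])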
In addition, if a filter has already been deployed to the JBoss Data Grid server it may be referenced by name, as seen below:
34.4.9. Filtering with a Deployed Filter
Filtering with a Deployed Filter (Scala)
val rdd: InfinispanRDD[String, Book] = // Initialize your InfinispanRDD here
// "my-filter-factory" filters and converts Book to a String, and takes two parameters
val filteredRDD = rdd.filterByCustom[String]("my-filter-factory", "param1", "param2")
34.5. Code Examples for Spark 2
34.5.1. Code Examples for Spark 2
Since the connector for Apache Spark 2 uses a different configuration mechanism than the 1.6 connector, and also because the version 2 connector supports some features that 1.6 does not, the code examples for each version are separated into their own sections. The following code examples work with version 2 of the Spark connector; see Code Examples for Spark 1.6 for the version 1.6 examples.
34.5.2. Creating and Using RDDs
In Apache Spark 2, Resilient Distributed Datasets (RDDs) are created by specifying a ConnectorConfiguration instance with configurations described in the table from Methods to Configure the Version 2 Connector. Examples of this in both Java and Scala are below:
34.5.3. Creating an RDD
Creating an RDD (Java)
Creating an RDD (Scala)
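A minimal Scala sketch, assuming a Book entity class and a cache named bookCache (both placeholders):
import org.apache.spark.{SparkConf, SparkContext}
import org.infinispan.spark.config.ConnectorConfiguration
import org.infinispan.spark.rdd.InfinispanRDD

val sparkConf = new SparkConf().setAppName("infinispan-rdd-spark2")
val sc = new SparkContext(sparkConf)

// Connector configuration, built with the methods described earlier
val config = new ConnectorConfiguration()
  .setServerList("jdg-node1:11222")
  .setCacheName("bookCache")

// Create the RDD backed by the cache
val infinispanRDD = new InfinispanRDD[Integer, Book](sc, config)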
34.5.4. Querying an RDD
34.5.4.1. SparkSQL Queries
Using SparkSQL Queries (Java)
Using SparkSQL Queries (Scala)
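A Scala sketch, assuming the infinispanRDD and sparkConf from the previous example and a Book entity with an author field:
import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

// Register the cache values as a temporary view and query it with SparkSQL
val booksDF = sparkSession.createDataFrame(infinispanRDD.values, classOf[Book])
booksDF.createOrReplaceTempView("books")

val rows = sparkSession.sql("SELECT author, COUNT(*) AS total FROM books GROUP BY author ORDER BY total DESC").collect()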
34.5.5. Writing an RDD to the Cache
Any key/value based RDD can be written to the Data Grid cache by using the static InfinispanJavaRDD.write() method. This will copy the contents of the RDD to the cache:
Writing an RDD (Java)
Writing an RDD (Scala)
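In Scala, the same can be sketched with the implicit writeToInfinispan decoration, this time taking the ConnectorConfiguration (sc and config are assumed from the earlier example):
import org.infinispan.spark._

// A simple key/value RDD copied into the configured cache
val simpleRdd = sc.parallelize((1 to 1000).map(i => (i, s"value$i")))
simpleRdd.writeToInfinispan(config)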
34.5.6. Creating DStreams
DStreams represent a continuous stream of data, and are internally represented by a continuous series of RDDs, with each RDD containing data from a specific time interval.
To create a DStream, a StreamingContext is passed in along with the StorageLevel and the JBoss Data Grid RDD configuration, as seen in the example below:
Creating a DStream (Java)
Creating a DStream (Scala)
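A Scala sketch, reusing sc and config from the RDD example:
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.infinispan.spark.stream._

// Micro-batch interval of one second
val ssc = new StreamingContext(sc, Seconds(1))

// Each element of the stream is a (key, value, event type) tuple produced from cache-level events
val stream = new InfinispanInputDStream[Integer, Book](ssc, StorageLevel.MEMORY_ONLY, config)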
Writing to JBoss Data Grid with DStreams
Any DStream of key/value type can be written to JBoss Data Grid through the InfinispanJavaDStream.writeToInfinispan() Java method, or in Scala by using the implicit writeToInfinispan(properties) method directly on the DStream instance. Both methods take the JBoss Data Grid RDD configuration as input and write each RDD contained within the DStream.
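A Scala sketch, assuming a key/value DStream named keyValueDStream (a hypothetical name) and the ConnectorConfiguration from earlier:
import org.infinispan.spark.stream._

// Write every RDD contained in the DStream to the configured cache
keyValueDStream.writeToInfinispan(config)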
34.5.7. Using The Apache Spark Dataset API
In addition to the Resilient Distributed Dataset (RDD) programming interface, JBoss Data Grid includes the Apache Spark Dataset API, with support for pushing down predicates, similar to rdd.filterByQuery.
Dataset API Example (Java)
Dataset API Example (Scala)
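A Scala sketch, assuming config registers the protobuf schema for Book (for example via addProtoAnnotatedClass and setAutoRegisterProto) and that Book has a publicationYear field; the "infinispan" data source name and toStringsMap are taken from the connector's Dataset support and should be treated as assumptions:
import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

// Load the cache as a DataFrame; the filter below is pushed down to the server as a predicate
val df = sparkSession.read.format("infinispan").options(config.toStringsMap).load()

val rows = df.filter(df("publicationYear").gt(2015)).collect()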
34.5.8. Using the Infinispan Query DSL with Spark
The Infinispan Query DSL can be used as a filter for the InfinispanRDD, allowing data to be pre-filtered at the source rather than at RDD level.
Data in the cache must have been encoded with protobuf for the querying DSL to function correctly. Instructions on using protobuf encoding are found in Protobuf Encoding.
34.5.9. Filtering with a pre-built Query Object
Filtering with a pre-built Query Object (Java)
Filtering with a pre-built Query Object (Scala)
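A Scala sketch, assuming a remote Query object named query built with the Query DSL for MyEntity and an rdd initialized as in the earlier examples:
// Only entries matching the pre-built query are fetched from the server
val filteredRDD = rdd.filterByQuery(query)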
34.5.10. Filtering with an Ickle Query
Filtering with an Ickle Query (Java)
Filtering with an Ickle Query (Scala)
import org.infinispan.spark.rdd.InfinispanRDD
val rdd: InfinispanRDD[String, MyEntity] = // Initialize your InfinispanRDD here
val filteredRDD = rdd.filterByQuery("FROM MyEntity e where e.field BETWEEN 10 AND 20")
34.5.11. Filtering on the Server
Filtering on the Server (Java)
Filtering on the Server (Scala)
import org.infinispan.spark.rdd.InfinispanRDD
val rdd: InfinispanRDD[String, MyEntity] = // Initialize your InfinispanRDD here
// "my-filter-factory" filters and converts MyEntity to a Double, and takes two parameters
val filteredRDD = rdd.filterByCustom[Double]("my-filter-factory", "param1", "param2")
34.6. Spark Performance Considerations
By default, the Data Grid Spark connector creates two partitions per Data Grid node; each partition specifies a subset of the data in that particular node.
Those partitions are then sent to the Spark workers, which process them in parallel. If the number of Spark workers is less than the number of Data Grid nodes, some delay can occur, since each worker has a maximum capacity for executing tasks in parallel. For this reason, it is recommended to have at least the same number of Spark workers as Data Grid nodes to take advantage of the parallelism.
In addition, if a Spark worker is co-located on the same host as a Data Grid node, the connector will distribute tasks so that each worker only processes data found in the local Data Grid node.