26.4. Creating and Using DStreams
DStreams represent a continuous stream of data, and are internally represented by a continuous series of RDDs, with each RDD containing data from a specific time interval.
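The idea of one RDD per time interval can be illustrated without Spark. The following is a minimal plain-Scala sketch, not the Spark implementation: the `MicroBatchSketch` object, the `toBatches` helper, and the sample events are all hypothetical, and each resulting group only plays the role that an RDD plays inside a DStream.

```scala
object MicroBatchSketch {
  // Group (timestampMillis, value) events into consecutive intervals of
  // `intervalMs`. Each group stands in for one RDD of the DStream.
  // Returns (interval start in ms, values that arrived in that interval).
  def toBatches[A](events: Seq[(Long, A)], intervalMs: Long): Seq[(Long, Seq[A])] =
    events
      .groupBy { case (ts, _) => ts / intervalMs }
      .toSeq
      .sortBy(_._1)
      .map { case (idx, evs) => (idx * intervalMs, evs.map(_._2)) }

  def main(args: Array[String]): Unit = {
    // Hypothetical events: (arrival time in ms, book title)
    val events = Seq((100L, "A"), (900L, "B"), (1200L, "C"), (2500L, "D"))
    toBatches(events, intervalMs = 1000L).foreach { case (start, titles) =>
      println(s"batch starting at ${start}ms: ${titles.mkString(", ")}")
    }
  }
}
```

Unlike this sketch, which simply omits intervals with no events, Spark Streaming still produces an (empty) RDD for an interval in which nothing arrived.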
To create a DStream, pass a StreamingContext along with a StorageLevel and the JBoss Data Grid RDD configuration, as in the following example:
Example 26.7. Creating a DStream (Scala)
import org.infinispan.spark.stream._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import java.util.Properties

// Spark context
val sc = ...

// java.util.Properties with Infinispan RDD configuration
val props = ...

val ssc = new StreamingContext(sc, Seconds(1))
val stream = new InfinispanInputDStream[String, Book](ssc, StorageLevel.MEMORY_ONLY, props)
The InfinispanInputDStream can be transformed using any of Spark's DStream operations; processing begins once start() is called on the StreamingContext. For example, to display every 10 seconds the number of books inserted into the cache in the last 30 seconds:
Example 26.8. Processing a DStream (Scala)
import org.infinispan.spark.stream._
import org.infinispan.client.hotrod.event.ClientEvent.Type
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.DStream

val stream = ... // From previous sample

// Filter only created entries
val createdBooksRDD = stream.filter { case (_, _, t) => t == Type.CLIENT_CACHE_ENTRY_CREATED }

// Reduce last 30 seconds of data, every 10 seconds
val windowedRDD: DStream[Long] = createdBooksRDD.count().reduceByWindow(_ + _, Seconds(30), Seconds(10))

// Print the results, counting the number of occurrences in each individual RDD
windowedRDD.foreachRDD { rdd => println(rdd.reduce(_ + _)) }

// Start the processing
ssc.start()
ssc.awaitTermination()
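To make the window arithmetic concrete, the sketch below reproduces the 30-second window and 10-second slide in plain Scala, with no Spark dependency. It assumes a 1-second batch interval, so one list element corresponds to one per-batch count; the `WindowSketch` object, the `windowedSums` helper, and the input data are hypothetical and only approximate what count() followed by reduceByWindow computes.

```scala
object WindowSketch {
  // Sum the most recent `window` per-batch counts, emitting one result
  // every `slide` batches. Early windows cover only the batches seen so far.
  def windowedSums(counts: Vector[Long], window: Int, slide: Int): Vector[Long] =
    (slide to counts.length by slide).toVector.map { end =>
      counts.slice(math.max(0, end - window), end).sum
    }

  def main(args: Array[String]): Unit = {
    // Hypothetical input: 40 one-second batches, one created entry in each
    val perBatchCounts = Vector.fill(40)(1L)
    // Prints 10, 20, 30, 30 (one value per line)
    windowedSums(perBatchCounts, window = 30, slide = 10).foreach(println)
  }
}
```

The first two results are smaller than 30 because fewer than 30 seconds of data exist at that point; from the third result onward each value covers a full 30-second window, and consecutive windows overlap by 20 seconds.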
Writing to JBoss Data Grid with DStreams
Any DStream of key/value type can be written to JBoss Data Grid, either through the InfinispanJavaDStream.writeToInfinispan() method in Java or, in Scala, through the implicit writeToInfinispan(properties) method called directly on the DStream instance. Both methods take the JBoss Data Grid RDD configuration as input and write each RDD contained within the DStream.