314.5. RDD 작업
RDD 작업을 호출하려면 다음 URI를 사용합니다.
스파크 RDD 프로듀서
spark:rdd?rdd=#testFileRdd&rddCallback=#transformation
Where `rdd` option refers to the name of an RDD instance (subclass of `org.apache.spark.api.java.JavaRDDLike`) from a Camel registry, while `rddCallback` refers to the implementation of `org.apache.camel.component.spark.RddCallback` interface (also from a registry). RDD callback provides a single method used to apply incoming messages against the given RDD. Results of callback computations are saved as a body to an exchange.
스파크 RDD 콜백
public interface RddCallback<T> { T onRdd(JavaRDDLike rdd, Object... payloads); }
다음 스니펫에서는 메시지를 작업에 입력으로 보내고 결과를 반환하는 방법을 보여줍니다.
스파크 작업 호출
String pattern = "job input"; long linesCount = producerTemplate.requestBody("spark:rdd?rdd=#myRdd&rddCallback=#countLinesContaining", pattern, long.class);
Springans로 등록된 코드 조각의 RDD 콜백은 다음과 같습니다.
스파크 RDD 콜백
@Bean RddCallback<Long> countLinesContaining() { return new RddCallback<Long>() { Long onRdd(JavaRDDLike rdd, Object... payloads) { String pattern = (String) payloads[0]; return rdd.filter({line -> line.contains(pattern)}).count(); } } }
Spring의 RDD 정의는 다음과 같습니다.
스파크 RDD 정의
@Bean JavaRDDLike myRdd(JavaSparkContext sparkContext) { return sparkContext.textFile("testrdd.txt"); }
314.5.1. void RDD 콜백
RDD 콜백에서 Camel 파이프라인으로 값을 반환하지 않으면 null
값을 반환하거나 VoidRddCallback
기본 클래스를 사용할 수 있습니다.
스파크 RDD 정의
@Bean RddCallback<Void> rddCallback() { return new VoidRddCallback() { @Override public void doOnRdd(JavaRDDLike rdd, Object... payloads) { rdd.saveAsTextFile(output.getAbsolutePath()); } }; }
314.5.2. RDD 콜백 변환
RDD 콜백으로 전송할 입력 데이터의 유형을 알고 있는 경우 ConvertingRddCallback
을 사용하고 Camel에 콜백을 삽입하기 전에 수신 메시지를 자동으로 변환할 수 있습니다.
스파크 RDD 정의
@Bean RddCallback<Long> rddCallback(CamelContext context) { return new ConvertingRddCallback<Long>(context, int.class, int.class) { @Override public Long doOnRdd(JavaRDDLike rdd, Object... payloads) { return rdd.count() * (int) payloads[0] * (int) payloads[1]; } }; }; }
314.5.3. 주석이 달린 RDD 콜백
RDD 콜백으로 작업하는 가장 쉬운 방법은 @RddCallback
주석으로 표시된 메서드를 클래스에 제공하는 것입니다.
주석이 달린 RDD 콜백 정의
import static org.apache.camel.component.spark.annotations.AnnotatedRddCallback.annotatedRddCallback; @Bean RddCallback<Long> rddCallback() { return annotatedRddCallback(new MyTransformation()); } ... import org.apache.camel.component.spark.annotation.RddCallback; public class MyTransformation { @RddCallback long countLines(JavaRDD<String> textFile, int first, int second) { return textFile.count() * first * second; } }
주석이 달린 RDD 콜백 팩토리 메서드에 CamelContext를 전달하면 주석이 달린 메서드의 매개변수와 일치하도록 생성된 콜백은 들어오는 페이로드를 변환할 수 있습니다.
주석이 달린 RDD 콜백을 위한 본문 변환
import static org.apache.camel.component.spark.annotations.AnnotatedRddCallback.annotatedRddCallback; @Bean RddCallback<Long> rddCallback(CamelContext camelContext) { return annotatedRddCallback(new MyTransformation(), camelContext); } ... import org.apache.camel.component.spark.annotation.RddCallback; public class MyTransformation { @RddCallback long countLines(JavaRDD<String> textFile, int first, int second) { return textFile.count() * first * second; } } ... // Convert String "10" to integer long result = producerTemplate.requestBody("spark:rdd?rdd=#rdd&rddCallback=#rddCallback" Arrays.asList(10, "10"), long.class);