311.3.3. 쿼리 매개변수(6 매개변수):
| 이름 | 설명 | 기본값 | 유형 |
|---|---|---|---|
| 수집 (producer) | 결과를 수집하거나 계산해야 하는지 여부를 나타냅니다. | true | boolean |
| DataFrame (producer) | 계산할 데이터 프레임입니다. | DataFrame | |
| dataFrameCallback (producer) | 데이터 프레임에 대한 작업 수행. | DataFrameCallback | |
| RDD (producer) | 계산하기 위한 RDD입니다. | JavaRDDLike | |
| rddCallback (producer) | RDD에 대해 작업을 수행합니다. | RddCallback | |
| synchronous (advanced) | 동기 처리를 엄격하게 사용해야 하는지 또는 Camel이 비동기 처리를 사용할 수 있는지 여부를 설정합니다(지원되는 경우). | false | boolean |
# RDD 작업
RDD 작업을 호출하려면 다음 URI를 사용합니다.
Spark RDD 생산자
spark:rdd?rdd=#testFileRdd&rddCallback=#transformation
여기서 rdd 옵션은 Camel 레지스트리에서 RDD 인스턴스의 이름을 나타내는 반면, rddCallback 은 org.apache. camel.component.spark.RddCallCallCallback 인터페이스 (또는 레지스트리에서도 마찬가지)의 구현을 나타냅니다. RDD 콜백은 지정된 RDD에 수신되는 메시지를 적용하는 데 사용되는 단일 메서드를 제공합니다. 콜백 계산의 결과는 교환에 본문으로 저장됩니다.
Spark RDD 콜백
public interface RddCallback<T> {
T onRdd(JavaRDDLike rdd, Object... payloads);
}
다음 코드 조각은 메시지를 작업에 입력으로 보내고 결과를 반환하는 방법을 보여줍니다.
spark 작업 호출
String pattern = "job input";
long linesCount = producerTemplate.requestBody("spark:rdd?rdd=#myRdd&rddCallback=#countLinesContaining", pattern, long.class);
Spring 빈으로 등록된 위의 코드 조각에 대한 RDD 콜백은 다음과 같습니다.
Spark RDD 콜백
@Bean
RddCallback<Long> countLinesContaining() {
return new RddCallback<Long>() {
Long onRdd(JavaRDDLike rdd, Object... payloads) {
String pattern = (String) payloads[0];
return rdd.filter({line -> line.contains(pattern)}).count();
}
}
}
Spring의 RDD 정의는 다음과 같습니다.
Spark RDD 정의
@Bean
JavaRDDLike myRdd(JavaSparkContext sparkContext) {
return sparkContext.textFile("testrdd.txt");
}