8.5. 수집기


8.5.1. 개요

그림 8.5. “수집기 패턴” 에 표시된 수집기 패턴을 사용하면 관련 메시지 배치를 단일 메시지에 결합할 수 있습니다.

그림 8.5. 수집기 패턴

수집기 패턴

수집기 동작을 제어하기 위해 Apache Camel을 사용하면 다음과 같이 엔터프라이즈 통합 패턴에 설명된 속성을 지정할 수 있습니다.

  • 상관 관계 표현식 밀리초-단순으로 집계해야 하는 메시지를 결정합니다. 상관관계 표현식은 들어오는 각 메시지에 대해 평가되어 상관관계 키를 생성합니다. 그런 다음 동일한 상관 관계 키를 가진 들어오는 메시지는 동일한 일괄 처리로 그룹화됩니다. 예를 들어 들어오는 모든 메시지를 단일 메시지로 집계하려면 상수 표현식을 사용할 수 있습니다.
  • completeness 조건 Cryo stat- Cryostat 메시지의 일괄 처리가 완료된 시기를 결정합니다. 이를 간단한 크기 제한으로 지정하거나 일반적으로 배치가 완료되면 플래그를 지정하는 서술자 조건을 지정할 수 있습니다.
  • 집계 알고리즘은 단일 상관 키의 메시지 교환을 단일 메시지 교환으로 결합합니다.

예를 들어 초당 Cryostat 메시지를 수신하는 주식 시장 데이터 시스템을 고려해 보십시오. GUI 툴이 이러한 대규모 업데이트 속도로 대처할 수 없는 경우 메시지 흐름을 제한해야 할 수 있습니다. 들어오는 주식 견적은 최신 견적을 선택하고 이전 가격을 폐기하여 간단히 집계 할 수 있습니다. (히스토리 중 일부를 캡처하려는 경우 delta 처리 알고리즘을 적용할 수 있습니다.)

참고

이제 Aggregator가 자세한 정보를 포함하는 ManagedAggregateProcessorMBean 을 사용하여 Cryostat에 등록되었습니다. 이를 통해 집계 컨트롤러를 사용하여 제어할 수 있습니다.

8.5.2. 수집기 작동 방식

그림 8.6. “수집기 구현” 집계기가 작동하는 방식에 대한 개요를 보여줍니다. A, B, C 또는 D와 같은 상관관계 키를 갖는 교환 스트림이 있다고 가정하면 됩니다.

그림 8.6. 수집기 구현

메시지 라우팅 02

그림 8.6. “수집기 구현” 에 표시된 교환의 수신 스트림은 다음과 같이 처리됩니다.

  1. Corre lator 는 상관 관계 키를 기반으로 교환 정렬을 담당합니다. 들어오는 각 교환에 대해 상관관계 표현식이 평가되어 상관 관계 키를 생성합니다. 예를 들어 그림 8.6. “수집기 구현” 에 표시된 교환의 경우 상관 관계 키는 A로 평가됩니다.
  2. 집계 전략은 동일한 상관 관계 키를 사용하여 교환을 병합합니다. 새로운 교환인 A가 들어오면 집계 리포지토리에서 해당 집계 교환, A'를 조회하여 새 교환과 결합합니다.

    특정 집계 사이클이 완료될 때까지 들어오는 교환은 해당 집계 교환과 지속적으로 집계됩니다. 집계 주기는 완료 메커니즘 중 하나에 의해 종료될 때까지 지속됩니다.

    참고

    Camel 2.16에서 새로운 XSLT 집계 전략을 사용하면 두 개의 메시지를 XSLT 파일과 병합할 수 있습니다. toolbox에서 AggregationStrategies.xslt() 파일에 액세스할 수 있습니다.

  3. 완료 서술자가 집계기에 지정된 경우 집계 교환은 경로의 다음 프로세서로 전송할 준비가 되었는지 여부를 확인하기 위해 테스트됩니다. 처리는 다음과 같이 계속됩니다.

    • 완료되면 집계 교환은 경로의 마지막 부분에 의해 처리됩니다. 이에 대한 두 가지 대체 모델이 있습니다. 동기 (기본값)는 호출 스레드를 차단하거나 비동기 ( 병렬 처리가 활성화된 경우)를 실행자 스레드 풀( 그림 8.6. “수집기 구현”에 표시)에 제출합니다.
    • 완료되지 않으면 집계 교환이 다시 집계 리포지토리에 저장됩니다.
  4. 동기 완료 테스트와 동시에 completionTimeout 옵션 또는 completionInterval 옵션을 활성화 하여 비동기 완료 테스트를 활성화할 수 있습니다. 이러한 완료 테스트는 별도의 스레드에서 실행되며 완료 테스트가 충족될 때마다 해당 교환은 완료로 표시되고 경로의 후자 부분에 의해 처리됩니다(행 처리 활성화 여부에 따라 동기적으로 또는 비동기적으로).
  5. 병렬 처리가 활성화된 경우 스레드 풀은 경로의 후자 부분에서 교환 처리를 담당합니다. 기본적으로 이 스레드 풀에는 10개의 스레드가 포함되어 있지만 풀(“스레드 옵션”)을 사용자 정의할 수 있습니다.

8.5.3. Java DSL 예

다음 예제에서는 UseLatestAggregationStrategy 집계 전략을 사용하여 동일한 Cryostat Symbol 헤더 값으로 교환을 집계합니다. 지정된 symbol 값에 대해 해당 상관 관계 키와 마지막 교환이 수신된 이후 3초 이상 경과하면 집계된 교환은 완료된 것으로 간주되고 mock 엔드포인트로 전송됩니다.

from("direct:start")
    .aggregate(header("id"), new UseLatestAggregationStrategy())
        .completionTimeout(3000)
    .to("mock:aggregated");

8.5.4. XML DSL 예

다음 예제에서는 XML로 동일한 경로를 구성하는 방법을 보여줍니다.

<camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="direct:start"/>
        <aggregate strategyRef="aggregatorStrategy"
                   completionTimeout="3000">
            <correlationExpression>
                <simple>header.StockSymbol</simple>
            </correlationExpression>
            <to uri="mock:aggregated"/>
        </aggregate>
    </route>
</camelContext>

<bean id="aggregatorStrategy"
      class="org.apache.camel.processor.aggregate.UseLatestAggregationStrategy"/>

8.5.5. 상관 관계 표현식 지정

Java DSL에서 상관 관계 표현식은 항상 aggregate() DSL 명령에 첫 번째 인수로 전달됩니다. 여기에서 Simple Expression Language를 사용하는 것은 아닙니다. expressions 언어 또는 스크립팅 언어(예: Cryostat, XQuery, SQL 등)를 사용하여 상관 관계 표현식을 지정할 수 있습니다.

Exampe의 경우 Cryostat 표현식을 사용하여 서로 상호 작용하기 위해 다음 Java DSL 경로를 사용할 수 있습니다.

from("direct:start")
    .aggregate(xpath("/stockQuote/@symbol"), new UseLatestAggregationStrategy())
        .completionTimeout(3000)
    .to("mock:aggregated");

특정 들어오는 교환에서 상관 관계 표현식을 평가할 수 없는 경우 집계기가 기본적으로 CamelExchangeException 을 throw합니다. ignoreInvalidCorrelationKeys 옵션을 설정하여 이 예외를 비활성화할 수 있습니다. 예를 들어 Java DSL에서는 다음과 같습니다.

from(...).aggregate(...).ignoreInvalidCorrelationKeys()

XML DSL에서 ignoreInvalidCorrelationKeys 옵션은 다음과 같이 속성으로 설정할 수 있습니다.

<aggregate strategyRef="aggregatorStrategy"
           ignoreInvalidCorrelationKeys="true"
           ...>
    ...
</aggregate>

8.5.6. 집계 전략 지정

Java DSL에서는 집계 전략을 두 번째 인수로 aggregate() DSL 명령에 전달하거나 aggregationStrategy() 절을 사용하여 지정할 수 있습니다. 예를 들어 aggregationStrategy() 절을 다음과 같이 사용할 수 있습니다.

from("direct:start")
    .aggregate(header("id"))
        .aggregationStrategy(new UseLatestAggregationStrategy())
        .completionTimeout(3000)
    .to("mock:aggregated");

Apache Camel은 다음과 같은 기본 집계 전략을 제공합니다(클래스가 org.apache.camel.processor.aggregate Java 패키지에 속하는 경우).

UseLatestAggregationStrategy
지정된 상관 관계 키에 대한 마지막 교환을 반환하여 이 키와 이전 교환을 모두 삭제합니다. 예를 들어, 이 전략은 특정 주식 기호의 최신 가격을 알고 싶은 주식 교환에서 피드를 제한하는 데 유용할 수 있습니다.
UseOriginalAggregationStrategy
지정된 상관 관계 키에 대한 첫 번째 교환을 반환하여 이 키와 이후의 모든 교환을 삭제합니다. 이 전략을 사용하려면 UseOriginalAggregationStrategy.setOriginal() 를 호출하여 첫 번째 교환을 설정해야 합니다.
GroupedExchangeAggregationStrategy
지정된 상관 관계 키에 대한 모든 교환을 목록에 연결합니다. 이는 Exchange.GROUPED_EXCHANGE 교환 속성에 저장됩니다. “그룹화된 교환”을 참조하십시오.

8.5.7. 사용자 정의 집계 전략 구현

다른 집계 전략을 적용하려면 다음 집계 전략 기본 인터페이스 중 하나를 구현할 수 있습니다.

org.apache.camel.processor.aggregate.AggregationStrategy
기본 집계 전략 인터페이스입니다.
org.apache.camel.processor.aggregate.TimeoutAwareAggregationStrategy

집계 주기 시간이 초과될 때 구현에서 알림을 수신하려면 이 인터페이스를 구현합니다. 시간 초과 알림 방법에는 다음과 같은 서명이 있습니다.

void timeout(Exchange oldExchange, int index, int total, long timeout)
org.apache.camel.processor.aggregate.CompletionAwareAggregationStrategy

집계 주기가 정상적으로 완료되면 구현에서 알림을 수신하도록 하려면 이 인터페이스를 구현합니다. 알림 방법에는 다음과 같은 서명이 있습니다.

void onCompletion(Exchange exchange)

예를 들어 다음 코드는 StringAggregationStrategy 및 Cryostat AggregationStrategy: 두 가지 다른 사용자 정의 집계 전략입니다.

 //simply combines Exchange String body values using '' as a delimiter
 class StringAggregationStrategy implements AggregationStrategy {

     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
         if (oldExchange == null) {
             return newExchange;
         }

         String oldBody = oldExchange.getIn().getBody(String.class);
         String newBody = newExchange.getIn().getBody(String.class);
         oldExchange.getIn().setBody(oldBody + "" + newBody);
         return oldExchange;
     }
 }

 //simply combines Exchange body values into an ArrayList<Object>
 class ArrayListAggregationStrategy implements AggregationStrategy {

     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
 	    Object newBody = newExchange.getIn().getBody();
     	ArrayList<Object> list = null;
         if (oldExchange == null) {
 		    list = new ArrayList<Object>();
 		    list.add(newBody);
 		    newExchange.getIn().setBody(list);
 		    return newExchange;
         } else {
 	        list = oldExchange.getIn().getBody(ArrayList.class);
 	    	list.add(newBody);
 		    return oldExchange;
 	    }
     }
 }
참고

Apache Camel 2.0부터 AggregationStrategy.aggregate() 콜백 방법도 첫 번째 교환에 대해 호출됩니다. 집계 메서드를 처음 호출하면 oldExchange 매개 변수는 null 이고 newExchange 매개 변수에는 처음 들어오는 교환이 포함되어 있습니다.

사용자 정의 전략 클래스를 사용하여 메시지를 집계하려면 Cryostat AggregationStrategy 에서는 다음과 같은 경로를 정의합니다.

from("direct:start")
    .aggregate(header("StockSymbol"), new ArrayListAggregationStrategy())
    .completionTimeout(3000)
    .to("mock:result");

다음과 같이 XML에서 사용자 정의 집계 전략을 사용하여 경로를 구성할 수도 있습니다.

<camelContext xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <aggregate strategyRef="aggregatorStrategy"
               completionTimeout="3000">
      <correlationExpression>
        <simple>header.StockSymbol</simple>
      </correlationExpression>
      <to uri="mock:aggregated"/>
    </aggregate>
  </route>
</camelContext>

<bean id="aggregatorStrategy" class="com.my_package_name.ArrayListAggregationStrategy"/>

8.5.8. 사용자 정의 집계 전략의 라이프사이클 제어

라이프사이클이 이를 제어하는 엔터프라이즈 통합 패턴의 라이프사이클과 일치하도록 사용자 정의 집계 전략을 구현할 수 있습니다. 이는 집계 전략이 정상적으로 종료될 수 있는지 확인하는 데 유용할 수 있습니다.

라이프사이클 지원을 통해 집계 전략을 구현하려면 org.apache.camel.Service 인터페이스( AggregationStrategy 인터페이스 외에도)를 구현하고 start()stop() 라이프사이클 메서드 구현을 제공해야 합니다. 예를 들어 다음 코드 예제에서는 라이프사이클 지원이 포함된 집계 전략에 대한 개요를 보여줍니다.

// Java
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.Service;
import java.lang.Exception;
...
class MyAggStrategyWithLifecycleControl
       implements AggregationStrategy, Service {

    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // Implementation not shown...
        ...
    }

    public void start() throws Exception {
        // Actions to perform when the enclosing EIP starts up
        ...
    }

    public void stop() throws Exception {
        // Actions to perform when the enclosing EIP is stopping
        ...
    }
}

8.5.9. 속성 교환

다음 속성은 집계된 각 교환에 대해 설정됩니다.

헤더유형집계된 교환 속성 설명

Exchange.AGGREGATED_SIZE

int

이 교환에 포함된 총 거래 거래 수입니다.

Exchange.AGGREGATED_COMPLETED_BY

문자열

집계 교환을 완료하는 메커니즘을 나타냅니다. 가능한 값은 서술자,크기,시간 초과,간격 또는 소비자 입니다.

다음 속성은 SQL 구성 요소 집계 리포지터리에 의해 재전송되는 교환에 설정되어 있습니다( “영구 집계 리포지토리”참조).

헤더유형Redelivered Exchange Properties 설명

Exchange.REDELIVERY_COUNTER

int

현재 재전송 시도의 시퀀스 번호( 1에서 시작)입니다.

8.5.10. 완료 조건 지정

집계 교환이 수집기를 떠나 경로의 다음 노드로 진행하는 시기를 결정하는 완료 조건을 하나 이상 지정해야 합니다. 다음 완료 조건을 지정할 수 있습니다.

completionPredicate
각 교환이 집계된 후 서술자를 평가하여 완전성을 결정합니다. true 값은 집계 교환이 완료되었음을 나타냅니다. 또는 이 옵션을 설정하는 대신 Predicate 인터페이스를 구현하는 사용자 정의 AggregationStrategy 를 정의할 수 있습니다. 이 경우 AggregationStrategy 가 completion 서술자로 사용됩니다.
completionSize
지정된 수의 교환이 집계된 후 집계 교환을 완료합니다.
completionTimeout

( completionInterval과 호환되지 않음) 지정된 타임아웃 내에 들어오는 교환이 집계되지 않는 경우 집계 교환을 완료합니다.

즉, 타임아웃 메커니즘은 상관 관계 키 값에 대한 타임아웃을 추적합니다. 클록은 특정 키 값을 사용한 최신 교환이 수신된 후 진드기가 시작됩니다. 동일한 키 값을 가진 다른 교환이 지정된 시간 내에 수신 되지 않으면 해당 집계 교환이 완료로 표시되고 경로의 다음 노드로 전송됩니다.

completionInterval

( completionTimeout과 호환되지 않음) 각 시간 간격 (특정 길이)이 경과한 후 모든 미결한 집계 교환을 완료합니다.

시간 간격은 각 집계 교환에 맞게 조정 되지 않습니다. 이 메커니즘은 모든 미결 집계 교환을 동시에 완료하도록 합니다. 따라서 경우에 따라 이 메커니즘은 집계를 시작한 직후 집계 교환을 완료할 수 있습니다.

completionFromBatchConsumer
배치 소비자 메커니즘을 지원하는 소비자 끝점과 함께 사용되는 경우, 이 완료 옵션은 소비자 끝점에서 수신하는 정보를 기반으로 현재 교환 처리가 완료된 시기를 자동으로 파악합니다. “배치 소비자”을 참조하십시오.
forceCompletionOnStop
이 옵션을 사용하면 현재 경로 컨텍스트가 중지될 때 모든 미결 집계 교환이 강제 완료됩니다.

이전 완료 조건은 동시에 활성화할 수 없는 completionTimeoutcompletionInterval 조건을 제외하고 임의로 결합할 수 있습니다. 조건이 함께 사용되는 경우 일반적인 규칙은 트리거하는 첫 번째 완료 조건이 효과적인 완료 조건이라는 것입니다.

8.5.11. 완료 서술자 지정

집계된 교환이 완료된 시기를 결정하는 임의의 서술자 표현식을 지정할 수 있습니다. 서술자 표현식을 평가하는 방법은 다음 두 가지가 있습니다.

  • 최신 집계 교환에서 기본 동작입니다.
  • 가장 최근에 들어오는 교환 Cryostat-침반 동작에서는 eagerCheckCompletion 옵션을 활성화할 때 이 동작이 선택됩니다.

예를 들어 ALERT 메시지를 받을 때마다 주식 따옴표의 스트림을 종료하려는 경우(최신 들어오는 교환에서 MsgType 헤더 값으로 표시됨) 다음과 같은 경로를 정의할 수 있습니다.

from("direct:start")
    .aggregate(
      header("id"),
      new UseLatestAggregationStrategy()
    )
        .completionPredicate(
          header("MsgType").isEqualTo("ALERT")
         )
        .eagerCheckCompletion()
    .to("mock:result");

다음 예제에서는 XML을 사용하여 동일한 경로를 구성하는 방법을 보여줍니다.

<camelContext xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <aggregate strategyRef="aggregatorStrategy"
               eagerCheckCompletion="true">
      <correlationExpression>
          <simple>header.StockSymbol</simple>
      </correlationExpression>
      <completionPredicate>
          <simple>$MsgType = 'ALERT'</simple>
      </completionPredicate>
      <to uri="mock:result"/>
    </aggregate>
  </route>
</camelContext>

<bean id="aggregatorStrategy"
      class="org.apache.camel.processor.aggregate.UseLatestAggregationStrategy"/>

8.5.12. 동적 완료 시간 지정

들어오는 모든 교환에 대해 시간 초과 값이 다시 계산되는 동적 완료 시간 초과를 지정할 수 있습니다. 예를 들어 들어오는 각 교환에서 시간 초과 값을 설정하려면 다음과 같이 경로를 정의할 수 있습니다.

from("direct:start")
    .aggregate(header("StockSymbol"), new UseLatestAggregationStrategy())
        .completionTimeout(header("timeout"))
    .to("mock:aggregated");

다음과 같이 XML DSL에서 동일한 경로를 구성할 수 있습니다.

<camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="direct:start"/>
        <aggregate strategyRef="aggregatorStrategy">
            <correlationExpression>
                <simple>header.StockSymbol</simple>
            </correlationExpression>
            <completionTimeout>
                <header>timeout</header>
            </completionTimeout>
            <to uri="mock:aggregated"/>
        </aggregate>
    </route>
</camelContext>

<bean id="aggregatorStrategy"
      class="org.apache.camel.processor.UseLatestAggregationStrategy"/>
참고

동적 값이 null 또는 0 인 경우 Apache Camel이 이 값을 사용하도록 고정 시간 초과 값을 추가할 수도 있습니다.

8.5.13. 동적 완료 크기 지정

들어오는 모든 교환에 대해 완료 크기가 다시 계산되는 동적 완료 크기를 지정할 수 있습니다. 예를 들어 들어오는 각 교환에서 mySize 헤더에서 완료 크기를 설정하려면 다음과 같이 경로를 정의할 수 있습니다.

from("direct:start")
    .aggregate(header("StockSymbol"), new UseLatestAggregationStrategy())
        .completionSize(header("mySize"))
    .to("mock:aggregated");

Spring XML을 사용하는 것과 동일한 예:

<camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="direct:start"/>
        <aggregate strategyRef="aggregatorStrategy">
            <correlationExpression>
                <simple>header.StockSymbol</simple>
            </correlationExpression>
            <completionSize>
                <header>mySize</header>
            </completionSize>
            <to uri="mock:aggregated"/>
        </aggregate>
    </route>
</camelContext>

<bean id="aggregatorStrategy"
      class="org.apache.camel.processor.UseLatestAggregationStrategy"/>
참고

동적 값이 null 또는 0 인 경우 Apache Camel이 이 값을 사용하도록 고정 크기 값을 추가할 수도 있습니다.

8.5.14. AggregationStrategy 내에서 단일 그룹 강제 완료

사용자 정의 집계Strategy 클래스를 구현하는 경우 AggregationStrategy. aggregate() 메서드에서 반환된 교환 시 Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP 교환 속성을 true 로 설정하여 현재 메시지 그룹을 강제로 완료할 수 있는 메커니즘이 있습니다. 이 메커니즘은 현재 그룹에 영향을 미칩니다. 다른 메시지 그룹(다른 상관관계 ID 포함)은 완료 하지 않습니다. 이 메커니즘은 서술자, 크기, 시간 초과 등과 같은 기타 완료 메커니즘을 재정의합니다.

예를 들어 다음 샘플 AggregationStrategy 클래스는 메시지 본문 크기가 5보다 큰 경우 현재 그룹을 완료합니다.

// Java
public final class MyCompletionStrategy implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (oldExchange == null) {
            return newExchange;
        }
        String body = oldExchange.getIn().getBody(String.class) + "+"
            + newExchange.getIn().getBody(String.class);
        oldExchange.getIn().setBody(body);
        if (body.length() >= 5) {
            oldExchange.setProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP, true);
        }
        return oldExchange;
    }
}

8.5.15. 특수 메시지로 모든 그룹 강제 완료

특수 헤더가 있는 메시지를 경로로 전송하여 모든 미해결 집계 메시지를 강제로 완료할 수 있습니다. 강제 완료에 사용할 수 있는 두 가지 대체 헤더 설정이 있습니다.

Exchange.AGGREGATION_COMPLETE_ALL_GROUPS
현재 집계 주기를 강제 완료하려면 true 로 설정합니다. 이 메시지는 순전히 신호 역할을 하며 집계 주기에 포함되지 않습니다. 이 신호 메시지를 처리하면 메시지의 콘텐츠가 삭제됩니다.
Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE
현재 집계 주기를 강제 완료하려면 true 로 설정합니다. 이 메시지는 현재 집계 주기에 포함됩니다.

8.5.16. AggregateController 사용

org.apache.camel.processor.aggregate.AggregateController 를 사용하면 Java 또는 Cryostat API를 사용하여 런타임 시 집계를 제어할 수 있습니다. 이는 교환 그룹을 강제 수행하거나 현재 런타임 통계를 쿼리하는 데 사용할 수 있습니다.

사용자 지정이 구성되지 않은 경우 집계기는 getAggregateController() 메서드를 사용하여 액세스할 수 있는 기본 구현을 제공합니다. 그러나 aggregateController를 사용하여 경로에서 컨트롤러를 쉽게 구성할 수 있습니다.

private AggregateController controller = new DefaultAggregateController();

from("direct:start")
   .aggregate(header("id"), new MyAggregationStrategy()).completionSize(10).id("myAggregator")
      .aggregateController(controller)
      .to("mock:aggregated");

또한 AggregateController에서 API를 사용하여 강제로 완료할 수 있습니다. 예를 들어, foo 키가 있는 그룹을 완료하려면 다음을 수행합니다.

int groups = controller.forceCompletionOfGroup("foo");

반환되는 수는 완료된 그룹 수입니다. 다음은 모든 그룹을 완료하는 API입니다.

 int groups = controller.forceCompletionOfAllGroups();

8.5.17. 고유한 상관 관계 키 강제 적용

일부 집계 시나리오에서는 상관 관계 키가 교환 배치마다 고유한 조건을 적용해야 할 수 있습니다. 즉, 특정 상관 키의 집계 교환이 완료되면 해당 상관 키와의 집계 교환이 더 이상 진행되지 않도록 해야 합니다. 예를 들어 경로의 후자 부분이 고유한 상관관계 키 값으로 교환을 처리할 것으로 예상되는 경우 이 조건을 시행할 수 있습니다.

완료 조건을 구성하는 방법에 따라 특정 상관 관계를 사용하여 여러 집계 교환이 생성될 위험이 있을 수 있습니다. 예를 들어 특정 상관 키와의 모든 교환이 수신될 때까지 대기하도록 설계된 완료 서술자를 정의할 수 있지만 해당 키로의 모든 교환이 도달하기 전에 실행될 수 있는 완료 시간 초과를 정의할 수도 있습니다. 이 경우 늦은 교환으로 인해 동일한 상관 관계 키 값을 사용하여 두 번째 집계 교환이 발생할 수 있습니다.

이러한 시나리오의 경우 closeCorrelationKeyOnCompletion 옵션을 설정하여 이전 상관 키 값을 복제하는 집계 교환을 억제하도록 집계기를 구성할 수 있습니다. 중복된 상관관계 키 값을 억제하려면 집계자가 캐시에 이전 상관관계 키 값을 기록해야 합니다. 이 캐시의 크기(캐시된 상관 키 수)는 closeCorrelationKeyOnCompletion() DSL 명령에 대한 인수로 지정됩니다. 무제한 크기의 캐시를 지정하려면 0 또는 음수 정수 값을 전달할 수 있습니다. 예를 들어 캐시 크기를 10000 키 값으로 지정하려면 다음을 수행합니다.

from("direct:start")
    .aggregate(header("UniqueBatchID"), new MyConcatenateStrategy())
        .completionSize(header("mySize"))
        .closeCorrelationKeyOnCompletion(10000)
    .to("mock:aggregated");

집계 교환이 중복된 상관관계 키 값으로 완료되면 수집기에서 ClosedCorrelationKeyException 예외를 throw합니다.

8.5.18. 단순 표현식을 사용한 스트림 기반 처리

스트리밍 모드에서 tokenize XML 하위 명령을 사용하여 간단한 언어 표현식을 토큰으로 사용할 수 있습니다. 간단한 언어 표현식을 사용하면 동적 토큰을 지원할 수 있습니다.

예를 들어 Java를 사용하여 태그 사람이 구분하는 일련의 이름을 분할하기 위해 tokenize XML files 및 Simple 언어 토큰을 사용하여 파일을 name 요소로 분할할 수 있습니다.

public void testTokenizeXMLPairSimple() throws Exception {
        Expression exp = TokenizeLanguage.tokenizeXML("${header.foo}", null);

< person>로 구분되는 이름의 입력 문자열을 가져오고 토큰으로 < person >를 설정합니다.

        exchange.getIn().setHeader("foo", "<person>");
        exchange.getIn().setBody("<persons><person>James</person><person>Claus</person><person>Jonathan</person><person>Hadrian</person></persons>");

입력에서 분리된 이름을 나열합니다.

        List<?> names = exp.evaluate(exchange, List.class);
        assertEquals(4, names.size());

        assertEquals("<person>James</person>", names.get(0));
        assertEquals("<person>Claus</person>", names.get(1));
        assertEquals("<person>Jonathan</person>", names.get(2));
        assertEquals("<person>Hadrian</person>", names.get(3));
    }

8.5.19. 그룹화된 교환

발신 배치에서 집계된 모든 교환을 단일 org.apache.camel.impl.GroupedExchange 홀더 클래스로 결합할 수 있습니다. 그룹화된 교환을 활성화하려면 다음 Java DSL 경로에 표시된 것처럼 group exchanges() 옵션을 지정합니다.

from("direct:start")
    .aggregate(header("StockSymbol"))
        .completionTimeout(3000)
        .groupExchanges()
    .to("mock:result");

mock:result 로 전송된 그룹화된 교환에는 메시지 본문에 집계된 교환 목록이 포함되어 있습니다. 다음 코드 줄은 후속 프로세서가 목록 형태로 그룹화된 교환의 콘텐츠에 액세스하는 방법을 보여줍니다.

// Java
List<Exchange> grouped = ex.getIn().getBody(List.class);
참고

그룹화된 교환 기능을 활성화하면 집계 전략을 구성 해서는 안 됩니다(그룹화된 교환 기능은 집계 전략 자체임).

참고

발신 교환의 자산에서 그룹화된 교환에 액세스하는 오래된 접근 방식은 더 이상 사용되지 않으며 향후 릴리스에서 제거됩니다.

8.5.20. 배치 소비자

집계기를 배치 소비자 패턴과 함께 작동하여 배치 소비자가 보고한 총 메시지 수를 집계할 수 있습니다(배치 소비자 엔드포인트는 들어오는 교환에 CamelBatchSize,CamelBatchIndex , CamelBatchComplete 속성 설정). 예를 들어 File 소비자 엔드포인트에서 찾은 모든 파일을 집계하려면 다음과 같은 경로를 사용할 수 있습니다.

from("file://inbox")
    .aggregate(xpath("//order/@customerId"), new AggregateCustomerOrderStrategy())
    .completionFromBatchConsumer()
    .to("bean:processOrder");

현재 다음 엔드포인트는 파일, FTP, 메일, iBatis 및 JPA와 같은 배치 소비자 메커니즘을 지원합니다.

8.5.21. 영구 집계 리포지토리

기본 집계자는 메모리 내 전용 AggregationRepository 를 사용합니다. 보류 중인 집계된 리테일링을 영구적으로 저장하려면 SQL 구성 요소를 영구 집계 리포지토리로 사용할 수 있습니다. SQL 구성 요소에는 집계된 메시지를 on-the-fly로 유지하고 메시지를 손실하지 않는 JdbcAggregationRepository 가 포함되어 있습니다.

교환이 성공적으로 처리되면 리포지토리에서 확인 메서드를 호출할 때 완료로 표시됩니다. 즉, 동일한 교환이 다시 실패하면 성공할 때까지 다시 시도됩니다.

camel-sql에 대한 종속성 추가

SQL 구성 요소를 사용하려면 프로젝트에 camel-sql 에 대한 종속성을 포함해야 합니다. 예를 들어 Maven pom.xml 파일을 사용하는 경우:

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-sql</artifactId>
    <version>x.x.x</version>
    <!-- use the same version as your Camel core version -->
</dependency>

집계 데이터베이스 테이블 생성

지속성을 위해 별도의 집계 및 완료된 데이터베이스 테이블을 생성해야 합니다. 예를 들어 다음 쿼리는 my_aggregation_repo 라는 데이터베이스에 대한 테이블을 생성합니다.

CREATE TABLE my_aggregation_repo (
 id varchar(255) NOT NULL,
 exchange blob NOT NULL,
 constraint aggregation_pk PRIMARY KEY (id)
);

CREATE TABLE my_aggregation_repo_completed (
 id varchar(255) NOT NULL,
 exchange blob NOT NULL,
 constraint aggregation_completed_pk PRIMARY KEY (id)
);
}

집계 리포지토리 구성

프레임워크 XML 파일에서 집계 리포지토리를 구성해야 합니다(예: Spring 또는 블루프린트).

<bean id="my_repo"
    class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
    <property name="repositoryName" value="my_aggregation_repo"/>
    <property name="transactionManager" ref="my_tx_manager"/>
    <property name="dataSource" ref="my_data_source"/>
    ...
</bean>

repositoryName,transactionManagerdataSource 속성이 필요합니다. 영구 집계 리포지토리에 대한 자세한 내용은 Apache Camel 구성 요소 참조 가이드의 SQL 구성 요소를 참조하십시오.

8.5.22. 스레드 옵션

그림 8.6. “수집기 구현” 에 표시된 대로 집계기는 경로의 후자 부분과 분리되며, 경로의 후자 부분으로 전송된 교환은 전용 스레드 풀에 의해 처리됩니다. 기본적으로 이 풀에는 단일 스레드만 포함됩니다. 여러 스레드가 있는 풀을 지정하려면 다음과 같이 parallelProcessing 옵션을 활성화합니다.

from("direct:start")
    .aggregate(header("id"), new UseLatestAggregationStrategy())
        .completionTimeout(3000)
        .parallelProcessing()
    .to("mock:aggregated");

기본적으로 10개의 작업자 스레드가 있는 풀이 생성됩니다.

생성된 스레드 풀을 더 많이 제어하려면 executorService 옵션을 사용하여 사용자 지정 java.util.concurrent.ExecutorService 인스턴스를 지정합니다(이 경우 병렬Processing 옵션을 활성화할 필요가 없음).

8.5.23. 목록에 집계

일반적인 집계 시나리오에서는 일련의 들어오는 메시지 본문을 List 오브젝트에 집계해야 합니다. 이 시나리오를 용이하게 하기 위해 Apache Camel은 AbstractListAggregationStrategy 추상 클래스를 제공합니다. 이 클래스를 빠르게 확장하여 이 사례에 대한 집계 전략을 생성할 수 있습니다. 유형 T 의 들어오는 메시지 본문은 List<T> 유형의 메시지 본문을 사용하여 완료된 교환으로 집계됩니다.

예를 들어 일련의 Integer 메시지 본문을 List <Integer> 오브젝트에 집계하려면 다음과 같이 정의된 집계 전략을 사용할 수 있습니다.

import org.apache.camel.processor.aggregate.AbstractListAggregationStrategy;
...
/**
 * Strategy to aggregate integers into a List<Integer>.
 */
public final class MyListOfNumbersStrategy extends AbstractListAggregationStrategy<Integer> {

    @Override
    public Integer getValue(Exchange exchange) {
        // the message body contains a number, so just return that as-is
        return exchange.getIn().getBody(Integer.class);
    }
}

8.5.24. 수집기 옵션

수집기는 다음 옵션을 지원합니다.

표 8.1. 수집기 옵션
옵션Default설명

correlationExpression

 

집계에 사용할 상관관계 키를 평가하는 필수 표현식입니다. 동일한 상관관계 키가 있는 교환은 함께 집계됩니다. correlation 키를 평가할 수 없는 경우 예외가 발생합니다. ignoreBadCorrelationKeys 옵션을 사용하여 이를 비활성화할 수 있습니다.

aggregationStrategy

 

들어오는 교환을 이미 병합한 교환과 병합 하는 데 사용되는 필수 집계Strategy 입니다. 처음 호출 시 oldExchange 매개변수는 null 입니다. 이후 호출 시 oldExchange 에는 병합된 교환이 포함되어 있으며 newExchange 는 물론 새로운 들어오는 교환입니다. Camel 2.9.2 이후의 전략은 선택 옵션으로 시간 초과 콜백을 지원하는 TimeoutAwareAggregationStrategy 구현일 수 있습니다. Camel 2.16 이후부터 이 전략은 PreCompletionAwareAggregationStrategy 구현일 수도 있습니다. 사전 완료 모드에서 완료 검사를 실행합니다.

strategyRef

 

레지스트리에서 집계Strategy 를 조회하는 참조입니다.

completionSize

 

집계가 완료되기 전에 집계된 메시지 수입니다. 이 옵션은 고정 값으로 설정하거나 동적으로 크기를 평가할 수 있는 Expression을 사용하여 설정할 수 있으며 결과적으로 Integer 를 사용합니다. If both are set Camel will fallback to use the fixed value if the expression result was null or 0.

completionTimeout

 

집계된 교환이 완료되기 전에 비활성화되어야 하는 시간(밀리초)입니다. 이 옵션을 고정 값으로 설정하거나 시간 초과를 동적으로 평가할 수 있는 Expression을 사용하면 으로 사용할 수 있습니다. If both are set Camel will fallback to use the fixed value if the expression result was null or 0. completionInterval과 함께 이 옵션을 사용할 수 없으며 둘 중 하나만 사용할 수 있습니다.

completionInterval

 

집계자가 현재 집계된 모든 교환을 완료하는 밀리 초 단위의 반복 기간입니다. Camel에는 매번 트리거되는 백그라운드 작업이 있습니다. completionTimeout과 함께 이 옵션을 사용할 수 없으며 그 중 하나만 사용할 수 있습니다.

completionPredicate

 

집계된 교환이 완료되면 신호인 org.apache.camel.Predicate 유형의 서술자를 지정합니다. 또는 이 옵션을 설정하는 대신 Predicate 인터페이스를 구현하는 사용자 정의 AggregationStrategy 를 정의할 수 있습니다. 이 경우 AggregationStrategy 가 completion 서술자로 사용됩니다.

completionFromBatchConsumer

false

이 옵션은 교환이 배치 소비자에서 오는 경우입니다. 그런 다음 8.5절. “수집기” 는 메시지 헤더 CamelBatchSize 에서 Batch Consumer에 의해 결정된 배치 크기를 사용합니다. 자세한 내용은 Batch Consumer를 참조하십시오. 이는 지정된 폴링에서 사용되는 모든 파일을 참조하는 파일 끝점에서 집계하는 데 사용할 수 있습니다.

eagerCheckCompletion

false

새로 들어오는 교환이 수신될 때 완료를 신속하게 점검할지 여부입니다. 이 옵션은 그에 따라 변경 사항이 전달될 때 completionPredicate 옵션의 동작에 영향을 미칩니다. Predicate에서 전달된 교환이 집계된 교환인 경우, AggregationStrategy 에서 집계된 교환에 저장할 수 있는 모든 정보를 서술자에 사용할 수 있습니다. Predicate에서 전달된 교환이 true 인 경우 들어오는 교환이 됩니다. 즉, 들어오는 교환에서 데이터에 액세스할 수 있습니다.

forceCompletionOnStop

false

true 인 경우 현재 경로 컨텍스트가 중지되면 집계된 모든 교환을 완료합니다.

groupExchanges

false

활성화된 경우 Camel은 집계된 모든 교환을 집계된 모든 교환을 하나의 결합된 org.apache.camel.impl.GroupedExchange 홀더 클래스로 그룹화합니다. 결과적으로 수집기에서 하나의 교환만 전송됩니다. 사용자 지정 집계Strategy 를 코딩하지 않고 들어오는 많은 교환을 단일 출력 교환으로 결합하는 데 사용할 수 있습니다.

ignoreInvalidCorrelationKeys

false

값으로 평가할 수 없는 상관관계 키를 무시할지 여부입니다. 기본적으로 Camel은 예외를 발생하지만 이 옵션을 활성화하고 대신 이 상황을 무시할 수 있습니다.

closeCorrelationKeyOnCompletion

 

늦은 교환을 수락하거나 허용하지 않는지 여부입니다. 이를 통해 상관관계 키가 이미 완료된 경우 동일한 상관 관계 키를 가진 새로운 교환이 거부됨을 나타낼 수 있습니다. 그런 다음 Camel은 closedCorrelationKeyException 예외를 throw합니다. 이 옵션을 사용할 때 마지막 X의 닫힌 상관 키 수를 유지하는 LRUCache의 숫자인 정수 를 전달합니다. 0 또는 음수 값을 전달하여 바인딩되지 않은 캐시를 표시할 수 있습니다. 숫자를 전달하면 서로 다른 상관 키의 로그를 사용하는 경우 캐시가 너무 커지지 않도록 합니다.

discardOnCompletionTimeout

false

Camel 2.5: 시간 초과로 인해 완료되는 교환이 취소되어야 하는지의 여부입니다. 활성화하면 시간 초과가 발생하면 집계된 메시지가 전송 되지 않고 삭제되지 않습니다 (거부됨).

aggregationRepository

 

org.apache.camel.spi.AggregationRepository 의 자체 구현을 연결하여 현재 진행 중인 집계 교환을 추적할 수 있습니다. Camel은 기본적으로 메모리 기반 구현을 사용합니다.

aggregationRepositoryRef

 

레지스트리에서 aggregationRepository 조회에 대한 참조입니다.

parallelProcessing

false

집계가 완료되면 수집기에서 전송됩니다. 이 옵션은 Camel에서 동시성에 여러 스레드가 있는 스레드 풀을 사용해야 하는지 여부를 나타냅니다. 사용자 지정 스레드 풀이 지정되지 않은 경우 Camel은 10개의 동시 스레드가 있는 기본 풀을 생성합니다.

executorService

 

parallelProcessing 을 사용하는 경우 사용할 사용자 지정 스레드 풀을 지정할 수 있습니다. 실제로 병렬Processing 이 사용자 정의 스레드 풀을 사용하지 않는 경우에도 집계된 교환을 보내는 데도 사용됩니다.

executorServiceRef

 

레지스트리에서 executorService 조회에 대한 참조

timeoutCheckerExecutorService

 

completionTimeout , completionTimeout Expression 또는 completionInterval 옵션 중 하나를 사용하는 경우 백그라운드 스레드가 생성되어 모든 수집기가 완료되었는지 확인합니다. 모든 수집기에 대해 새 스레드를 생성하는 대신 사용할 사용자 지정 스레드 풀을 제공하려면 이 옵션을 설정합니다.

timeoutCheckerExecutorServiceRef

 

레지스트리에서 timeoutCheckerExecutorService 를 조회하는 데 대한 참조입니다.

completeAllOnStop

 

Aggregator를 중지하면 이 옵션을 사용하면 집계 리포지토리에서 보류 중인 모든 리스터를 완료할 수 있습니다.

optimisticLocking

false

집계 리포지토리와 함께 사용할 수 있는 최적의 잠금을 설정합니다.

optimisticLockRetryPolicy

 

최적화 잠금에 대한 재시도 정책을 구성합니다.

Red Hat logoGithubRedditYoutubeTwitter

자세한 정보

평가판, 구매 및 판매

커뮤니티

Red Hat 문서 정보

Red Hat을 사용하는 고객은 신뢰할 수 있는 콘텐츠가 포함된 제품과 서비스를 통해 혁신하고 목표를 달성할 수 있습니다.

보다 포괄적 수용을 위한 오픈 소스 용어 교체

Red Hat은 코드, 문서, 웹 속성에서 문제가 있는 언어를 교체하기 위해 최선을 다하고 있습니다. 자세한 내용은 다음을 참조하세요.Red Hat 블로그.

Red Hat 소개

Red Hat은 기업이 핵심 데이터 센터에서 네트워크 에지에 이르기까지 플랫폼과 환경 전반에서 더 쉽게 작업할 수 있도록 강화된 솔루션을 제공합니다.

© 2024 Red Hat, Inc.