8.13. 멀티 캐스트


8.13.1. 개요

그림 8.9. “멀티 캐스트 패턴” 에 표시된 멀티 캐스트 패턴은 InOut 메시지 교환 패턴과 호환되는 고정 대상 패턴을 사용하는 수신자 목록 의 변형입니다. 이는 InOnly 교환 패턴과만 호환되는 수신자 목록과 대조적입니다.

그림 8.9. 멀티 캐스트 패턴

멀티 캐스트 패턴

8.13.2. 사용자 정의 집계 전략이 있는 멀티 캐스트

멀티 캐스트 프로세서는 원래 요청(각 수신자마다 하나씩)에 대한 응답으로 여러 개의 Out 메시지를 수신하는 반면 원래 호출자는 단일 응답만 받을 수 있습니다. 따라서 메시지 교환의 응답 결과에는 고유 한 불일치가 있으며 이러한 불일치를 극복하려면 멀티 캐스트 프로세서에 사용자 지정 집계 전략을 제공해야 합니다. 집계 전략 클래스는 모든 외부 메시지를 단일 응답 메시지로 집계해야 합니다.

판매자가 판매자가 판매자 목록에 판매 항목을 제공하는 전자 유도 서비스의 예를 살펴보십시오. 구매자는 각각 항목에 대한 호의를하고, 판매자는 가장 높은 가격으로 호의를 자동으로 선택합니다. 다음과 같이 multicast() DSL 명령을 사용하여 고정 고객 목록에 제안을 배포하는 논리를 구현할 수 있습니다.

from("cxf:bean:offer").multicast(new HighestBidAggregationStrategy()).
    to("cxf:bean:Buyer1", "cxf:bean:Buyer2", "cxf:bean:Buyer3");

판매자가 엔드포인트, cxf:bean:offer 로 표시된 경우, 구매자는 엔드포인트, cxf:bean:Buyer1,cxf:bean:Buyer2,cxf:bean:Buyer3 로 표시됩니다. 멀티 캐스트 프로세서는 다양한 구매자로부터 수신된 호의를 통합하기 위해 집계 전략인 HighestBidAggregationStrategy 를 사용합니다. 다음과 같이 Java에서 HighestBidAggregationStrategy 를 구현할 수 있습니다.

// Java
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.Exchange;

public class HighestBidAggregationStrategy implements AggregationStrategy {
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        float oldBid = oldExchange.getOut().getHeader("Bid", Float.class);
        float newBid = newExchange.getOut().getHeader("Bid", Float.class);
        return (newBid > oldBid) ? newExchange : oldExchange;
    }
}

구매자가 가맹점 ( Bid )이라는 헤더에 가망 가격을 삽입한다고 가정합니다. 사용자 정의 집계 전략에 대한 자세한 내용은 8.5절. “수집기” 을 참조하십시오.

8.13.3. 병렬 처리

기본적으로 멀티캐스트 프로세서는 수신자 엔드포인트를 각각 차례로 호출합니다( to() 명령에 나열된 순서대로). 경우에 따라 대기 시간이 오래 걸릴 수 있습니다. 이러한 긴 대기 시간을 방지하기 위해 parallelProcessing() 절을 추가하여 병렬 처리를 활성화할 수 있습니다. 예를 들어 전자 유도 예에서 병렬 처리를 활성화하려면 다음과 같이 경로를 정의합니다.

from("cxf:bean:offer")
    .multicast(new HighestBidAggregationStrategy())
        .parallelProcessing()
        .to("cxf:bean:Buyer1", "cxf:bean:Buyer2", "cxf:bean:Buyer3");

여기서 멀티 캐스트 프로세서는 이제 각 끝점에 대해 하나의 스레드가 있는 스레드 풀을 사용하여 구매자 끝점을 호출합니다.

구매자 엔드포인트를 호출하는 스레드 풀의 크기를 사용자 지정하려면 executorService() 메서드를 호출하여 자체 사용자 지정 executor 서비스를 지정할 수 있습니다. 예를 들면 다음과 같습니다.

from("cxf:bean:offer")
    .multicast(new HighestBidAggregationStrategy())
        .executorService(MyExecutor)
        .to("cxf:bean:Buyer1", "cxf:bean:Buyer2", "cxf:bean:Buyer3");

여기서 MyExecutorjava.util.concurrent.ExecutorService 유형의 인스턴스입니다.

교환에 InOut 패턴이 있으면 집계 전략이 사용하여 응답 메시지를 집계합니다. 기본 집계 전략에서는 최신 응답 메시지를 사용하고 이전 응답을 삭제합니다. 예를 들어 다음 경로에서 사용자 정의 전략 MyAggregationStrategy 는 끝점, direct:a,direct:bdirect:c 의 응답을 집계하는 데 사용됩니다.

from("direct:start")
  .multicast(new MyAggregationStrategy())
      .parallelProcessing()
      .timeout(500)
      .to("direct:a", "direct:b", "direct:c")
  .end()
  .to("mock:result");

8.13.4. XML 구성 예

다음 예제에서는 XML에서 유사한 경로를 구성하는 방법을 보여줍니다. 여기서 경로는 사용자 지정 집계 전략 및 사용자 지정 스레드 executor를 사용합니다.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
    ">

  <camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
      <from uri="cxf:bean:offer"/>
      <multicast strategyRef="highestBidAggregationStrategy"
                 parallelProcessing="true"
                 threadPoolRef="myThreadExcutor">
         <to uri="cxf:bean:Buyer1"/>
         <to uri="cxf:bean:Buyer2"/>
         <to uri="cxf:bean:Buyer3"/>
      </multicast>
    </route>
  </camelContext>

  <bean id="highestBidAggregationStrategy" class="com.acme.example.HighestBidAggregationStrategy"/>
  <bean id="myThreadExcutor" class="com.acme.example.MyThreadExcutor"/>

</beans>

여기서 parallelProcessing 특성과 threadPoolRef 속성은 선택 사항입니다. 멀티 캐스트 프로세서의 스레드 동작을 사용자 지정하려는 경우에만 설정해야 합니다.

8.13.5. 발신 메시지에 사용자 정의 처리 적용

멀티 캐스트 패턴은 소스 Exchange를 복사하고 복사본을 멀티 캐스트합니다. 기본적으로 라우터는 소스 메시지의 단순 복사본을 만듭니다. 부분 복사에서는 원본 메시지의 헤더와 페이로드가 참조로만 복사되므로 원본 메시지의 복사본이 연결됩니다. 멀티캐스트 메시지의 부분 복사본이 연결되므로 메시지 본문을 변경할 수 있는 경우 사용자 지정 처리를 적용할 수 없습니다. 하나의 엔드포인트로 전송된 사본에 적용되는 사용자 지정 처리는 다른 모든 엔드포인트에 전송된 사본에도 적용됩니다.

참고

멀티 캐스트 구문을 사용하면 멀티캐스트 절에서 프로세스 DSL 명령을 호출할 수 있지만 의미가 없으며 onPrepare 와 동일한 효과가 없습니다 (실제로 프로세스 DSL 명령은 영향을 미치지 않습니다).

8.13.6. onPrepare를 사용하여 메시지를 준비할 때 사용자 정의 논리 실행

각 메시지 복제본에 사용자 지정 처리를 엔드포인트로 보내기 전에 적용하려면 multicast 절에서 onPrepare DSL 명령을 호출할 수 있습니다. onPrepare 명령은 메시지가 부분 복사된 직후 그리고 메시지가 끝점으로 디스패치되기 직전에 사용자 지정 프로세서를 삽입합니다. 예를 들어 다음 경로에서 CustomProc 프로세서는 direct:a 로 전송된 메시지에 대해 호출되고 CustomProc 프로세서는 direct:b 로 전송된 메시지에서도 호출됩니다.

from("direct:start")
  .multicast().onPrepare(new CustomProc())
  .to("direct:a").to("direct:b");

onPrepare DSL 명령의 일반적인 사용 사례는 메시지의 일부 또는 모든 요소의 깊은 복사본을 수행하는 것입니다. 예를 들어 다음 CustomProc 프로세서 클래스는 메시지 본문의 깊은 복사본을 수행합니다. 여기서 메시지 본문은 유형인 BodyType.deepCopy()로 추정되고 깊은 복사는 메서드인 BodyType.deepCopy() 에 의해 수행됩니다.

// Java
import org.apache.camel.*;
...
public class CustomProc implements Processor {

    public void process(Exchange exchange) throws Exception {
        BodyType body = exchange.getIn().getBody(BodyType.class);

        // Make a _deep_ copy of of the body object
        BodyType clone =  BodyType.deepCopy();
        exchange.getIn().setBody(clone);

        // Headers and attachments have already been
        // shallow-copied. If you need deep copies,
        // add some more code here.
    }
}

onPrepare 를 사용하여 Exchange 가 멀티 캐스트이기 전에 실행할 모든 종류의 사용자 지정 논리를 구현할 수 있습니다.

참고

변경할 수 없는 오브젝트를 설계하는 것이 좋습니다.

예를 들어 이 Animal 클래스로 변경할 수 있는 메시지 본문이 있는 경우 다음을 수행합니다.

public class Animal implements Serializable {

     private int id;
     private String name;

     public Animal() {
     }

     public Animal(int id, String name) {
         this.id = id;
         this.name = name;
     }

     public Animal deepClone() {
         Animal clone = new Animal();
         clone.setId(getId());
         clone.setName(getName());
         return clone;
     }

     public int getId() {
         return id;
     }

     public void setId(int id) {
         this.id = id;
     }

     public String getName() {
         return name;
     }

     public void setName(String name) {
         this.name = name;
     }

     @Override
     public String toString() {
         return id + " " + name;
     }
 }

그런 다음 메시지 본문을 복제하는 깊은 복제 프로세서를 생성할 수 있습니다.

public class AnimalDeepClonePrepare implements Processor {

     public void process(Exchange exchange) throws Exception {
         Animal body = exchange.getIn().getBody(Animal.class);

         // do a deep clone of the body which wont affect when doing multicasting
         Animal clone = body.deepClone();
         exchange.getIn().setBody(clone);
     }
 }

그런 다음 onPrepare 옵션을 사용하여 멀티 캐스트 경로에서 AnimalDeepClonePrepare 클래스를 사용할 수 있습니다.

from("direct:start")
     .multicast().onPrepare(new AnimalDeepClonePrepare()).to("direct:a").to("direct:b");

XML DSL의 동일한 예

<camelContext xmlns="http://camel.apache.org/schema/spring">
     <route>
         <from uri="direct:start"/>
         <!-- use on prepare with multicast -->
         <multicast onPrepareRef="animalDeepClonePrepare">
             <to uri="direct:a"/>
             <to uri="direct:b"/>
         </multicast>
     </route>

     <route>
         <from uri="direct:a"/>
         <process ref="processorA"/>
         <to uri="mock:a"/>
     </route>
     <route>
         <from uri="direct:b"/>
         <process ref="processorB"/>
         <to uri="mock:b"/>
     </route>
 </camelContext>

 <!-- the on prepare Processor which performs the deep cloning -->
 <bean id="animalDeepClonePrepare" class="org.apache.camel.processor.AnimalDeepClonePrepare"/>

 <!-- processors used for the last two routes, as part of unit test -->
 <bean id="processorA" class="org.apache.camel.processor.MulticastOnPrepareTest$ProcessorA"/>
 <bean id="processorB" class="org.apache.camel.processor.MulticastOnPrepareTest$ProcessorB"/>

8.13.7. 옵션

멀티 캐스트 DSL 명령은 다음 옵션을 지원합니다.

이름

기본값

설명

strategyRef

 

멀티 캐스트의 응답을 멀티 캐스트에서 보내는 단일 메시지로 어셈블하는 데 사용되는 집계Strategy 를 나타냅니다. ??? 기본적으로 Camel은 마지막 응답을 발신 메시지로 사용합니다.

strategyMethodName

 

이 옵션을 사용하면 Cryostat를 AggregationStrategy 로 사용할 메서드 이름을 명시적으로 지정할 수 있습니다.

strategyMethodAllowNull

false

이 옵션은 Cryostat를 AggregationStrategy 로 사용할 때 사용할 수 있습니다. false 인 경우 보강할 데이터가 없는 경우 집계 방법이 사용되지 않습니다. true 인 경우 보강할 데이터가 없는 경우 oldExchangenull 값이 사용됩니다.

parallelProcessing

false

활성화된 경우 멀티캐스트로 메시지를 동시에 보냅니다. 호출자 스레드는 계속 진행하기 전에 모든 메시지가 완전히 처리될 때까지 대기합니다. 동시에 발생하는 멀티 캐스트에서 응답을 전송 및 처리하는 경우에만 해당합니다.

parallelAggregate

false

활성화하면 AggregationStrategy집계 메서드를 동시에 호출할 수 있습니다. 이를 위해서는 스레드로부터 안전한 AggregationStrategy 를 구현해야 합니다. 기본적으로 이 옵션은 false 입니다. 즉, Camel이 집계 메서드에 자동으로 호출을 동기화합니다. 그러나 일부 사용 사례에서는 AggregationStrategy 를 스레드로부터 안전한 것으로 구현하고 이 옵션을 true 로 설정하여 성능을 향상시킬 수 있습니다.

executorServiceRef

 

병렬 처리에 사용할 사용자 지정 스레드 풀을 나타냅니다.Indicates to a custom Thread Pool to be used for parallel processing. 이 옵션을 설정하면 병렬 처리가 자동으로 표시되고 해당 옵션도 활성화할 필요가 없습니다.

stopOnException

false

Camel 2.2: 예외가 발생한 경우 즉시 처리를 중지해야 하는지의 여부입니다. disable인 경우 Camel은 실패했는지에 관계없이 모든 멀티캐스트로 메시지를 보냅니다. 이를 처리하는 방법을 완전히 제어하는 AggregationStrategy 클래스에서 예외를 처리할 수 있습니다.

스트리밍

false

활성화된 경우 Camel은 주문 외 응답(예: 다시 들어오는 순서)을 처리합니다. 비활성화된 경우 Camel은 멀티 캐스트와 동일한 순서로 응답을 처리합니다.

timeout

 

Camel 2.5: 밀리초 단위로 지정된 총 타임아웃을 설정합니다. 멀티캐스트 가 지정된 시간 내에 모든 응답을 전송 및 처리할 수 없는 경우 시간 초과 트리거와 멀티캐스트 가 중단되고 계속됩니다. TimeoutAwareAggregationStrategy 를 제공하면 중단하기 전에 timeout 메서드가 호출됩니다.

onPrepareRef

 

Camel 2.8: 사용자 지정 프로세서를 참조하여 각 멀티 캐스트가 수신하는 교환 사본을 준비합니다. 이를 통해 필요한 경우 메시지 페이로드를 깊이 복제하는 등 사용자 지정 논리를 수행할 수 있습니다.

shareUnitOfWork

false

Camel 2.8: 작업 단위를 공유해야 하는지 여부입니다. 자세한 내용은 8.4절. “Splitter” 에서 동일한 옵션을 참조하십시오.

Red Hat logoGithubRedditYoutubeTwitter

자세한 정보

평가판, 구매 및 판매

커뮤니티

Red Hat 문서 정보

Red Hat을 사용하는 고객은 신뢰할 수 있는 콘텐츠가 포함된 제품과 서비스를 통해 혁신하고 목표를 달성할 수 있습니다.

보다 포괄적 수용을 위한 오픈 소스 용어 교체

Red Hat은 코드, 문서, 웹 속성에서 문제가 있는 언어를 교체하기 위해 최선을 다하고 있습니다. 자세한 내용은 다음을 참조하세요.Red Hat 블로그.

Red Hat 소개

Red Hat은 기업이 핵심 데이터 센터에서 네트워크 에지에 이르기까지 플랫폼과 환경 전반에서 더 쉽게 작업할 수 있도록 강화된 솔루션을 제공합니다.

© 2024 Red Hat, Inc.