splitter 는 들어오는 메시지를 일련의 발신 메시지로 분할하는 라우터 유형입니다. 발신 메시지의 각 메시지에는 원본 메시지의 일부가 포함되어 있습니다. Apache Camel에서 그림 8.4. “Splitter 패턴” 에 표시된 splitter 패턴은 split() Java DSL 명령으로 구현됩니다.
Splitter/aggregator kafka- Cryostat는 메시지 조각이 처리된 후 메시지의 조각이 다시 조합되도록 집계기 패턴과 함께 분할 패턴을 결합합니다.
splitter가 원본 메시지를 여러 부분으로 분리하기 전에 원본 메시지의 부분 복사본을 만듭니다. 단순 복사본에서는 원본 메시지의 헤더와 페이로드가 참조로만 복사됩니다. splitter는 결과 메시지 부분을 다른 끝점으로 라우팅하지 않지만 분할 메시지의 일부는 보조 라우팅을 받을 수 있습니다.
메시지 부분은 부분 복사본이므로 원래 메시지에 연결된 상태로 유지됩니다. 따라서 독립적으로 수정할 수 없습니다. 일련의 엔드포인트로 라우팅하기 전에 메시지 부분의 다른 사본에 사용자 지정 논리를 적용하려면 splitter 절에서 onPrepareRef DSL 옵션을 사용하여 원래 메시지의 깊은 복사본을 만들어야 합니다. 옵션 사용에 대한 자세한 내용은 “옵션” 을 참조하십시오.
다음 예제에서는 들어오는 메시지의 각 행을 별도의 발신 메시지로 변환하여 메시지를 분할하는 seda:a 에서 seda:b 로 경로를 정의합니다.
RouteBuilder builder = new RouteBuilder() {
public void configure() {
from("seda:a")
.split(bodyAs(String.class).tokenize("\n"))
.to("seda:b");
}
};
RouteBuilder builder = new RouteBuilder() {
public void configure() {
from("seda:a")
.split(bodyAs(String.class).tokenize("\n"))
.to("seda:b");
}
};
Copy to ClipboardCopied!Toggle word wrapToggle overflow
splitter는 모든 표현식 언어를 사용할 수 있으므로 Cryostat, XQuery 또는 SQL과 같은 지원되는 스크립팅 언어를 사용하여 메시지를 분할할 수 있습니다( II 부. 라우팅 표현식 및 서술자 언어참조). 다음 예제에서는 들어오는 메시지에서 bar 요소를 추출하여 별도의 발신 메시지에 삽입합니다.
Copy to ClipboardCopied!Toggle word wrapToggle overflow
XML DSL에서 tokenize 표현식을 사용하여 토큰을 사용하여 본문 또는 헤더를 분할할 수 있습니다. 여기서 tokenize 표현식은 tokenize 요소를 사용하여 정의됩니다. 다음 예에서 메시지 본문은 \n 구분 기호 문자를 사용하여 토큰화됩니다. 정규식 패턴을 사용하려면 tokenize 요소에서 regex=true 를 설정합니다.
from("direct:start")
// split by new line and group by 3, and skip the very first element
.split().tokenize("\n", 3, true).streaming()
.to("mock:group");
from("direct:start")
// split by new line and group by 3, and skip the very first element
.split().tokenize("\n", 3, true).streaming()
.to("mock:group");
Copy to ClipboardCopied!Toggle word wrapToggle overflow
splitter는 임의의 표현식을 사용하여 분할을 수행할 수 있으므로 method() 표현식을 호출하여 빈을 사용하여 분할을 수행할 수 있습니다. Cryostat는 java.util.Collection,java.util.Iterator 또는 배열과 같은 반복 가능한 값을 반환해야 합니다.
다음 경로는 my CryostatterBean -20 instances에서 메서드를 호출하는 method() 표현식을 정의합니다.
from("direct:body")
// here we use a POJO bean mySplitterBean to do the split of the payload
.split()
.method("mySplitterBean", "splitBody")
.to("mock:result");
from("direct:message")
// here we use a POJO bean mySplitterBean to do the split of the message
// with a certain header value
.split()
.method("mySplitterBean", "splitMessage")
.to("mock:result");
from("direct:body")
// here we use a POJO bean mySplitterBean to do the split of the payload
.split()
.method("mySplitterBean", "splitBody")
.to("mock:result");
from("direct:message")
// here we use a POJO bean mySplitterBean to do the split of the message
// with a certain header value
.split()
.method("mySplitterBean", "splitMessage")
.to("mock:result");
Copy to ClipboardCopied!Toggle word wrapToggle overflow
여기서 my CryostatterBean 은 다음과 같이 정의된 My CryostatterBean 클래스의 인스턴스입니다.
public class MySplitterBean {
/**
* The split body method returns something that is iteratable such as a java.util.List.
*
* @param body the payload of the incoming message
* @return a list containing each part split
*/
public List<String> splitBody(String body) {
// since this is based on an unit test you can of couse
// use different logic for splitting as {router} have out
// of the box support for splitting a String based on comma
// but this is for show and tell, since this is java code
// you have the full power how you like to split your messages
List<String> answer = new ArrayList<String>();
String[] parts = body.split(",");
for (String part : parts) {
answer.add(part);
}
return answer;
}
/**
* The split message method returns something that is iteratable such as a java.util.List.
*
* @param header the header of the incoming message with the name user
* @param body the payload of the incoming message
* @return a list containing each part split
*/
public List<Message> splitMessage(@Header(value = "user") String header, @Body String body) {
// we can leverage the Parameter Binding Annotations
// http://camel.apache.org/parameter-binding-annotations.html
// to access the message header and body at same time,
// then create the message that we want, splitter will
// take care rest of them.
// *NOTE* this feature requires {router} version >= 1.6.1
List<Message> answer = new ArrayList<Message>();
String[] parts = header.split(",");
for (String part : parts) {
DefaultMessage message = new DefaultMessage();
message.setHeader("user", part);
message.setBody(body);
answer.add(message);
}
return answer;
}
}
public class MySplitterBean {
/**
* The split body method returns something that is iteratable such as a java.util.List.
*
* @param body the payload of the incoming message
* @return a list containing each part split
*/
public List<String> splitBody(String body) {
// since this is based on an unit test you can of couse
// use different logic for splitting as {router} have out
// of the box support for splitting a String based on comma
// but this is for show and tell, since this is java code
// you have the full power how you like to split your messages
List<String> answer = new ArrayList<String>();
String[] parts = body.split(",");
for (String part : parts) {
answer.add(part);
}
return answer;
}
/**
* The split message method returns something that is iteratable such as a java.util.List.
*
* @param header the header of the incoming message with the name user
* @param body the payload of the incoming message
* @return a list containing each part split
*/
public List<Message> splitMessage(@Header(value = "user") String header, @Body String body) {
// we can leverage the Parameter Binding Annotations
// http://camel.apache.org/parameter-binding-annotations.html
// to access the message header and body at same time,
// then create the message that we want, splitter will
// take care rest of them.
// *NOTE* this feature requires {router} version >= 1.6.1
List<Message> answer = new ArrayList<Message>();
String[] parts = header.split(",");
for (String part : parts) {
DefaultMessage message = new DefaultMessage();
message.setHeader("user", part);
message.setBody(body);
answer.add(message);
}
return answer;
}
}
Copy to ClipboardCopied!Toggle word wrapToggle overflow
Splitter EIP와 함께 CryostatIO Cryostatter 오브젝트를 사용하면 전체 콘텐츠를 메모리에 읽지 않도록 스트림 모드를 사용하여 큰 페이로드를 분할할 수 있습니다. 다음 예제에서는 classpath에서 로드된 매핑 파일을 사용하여 Cryostat IO Cryostatter 오브젝트를 설정하는 방법을 보여줍니다.
참고
Camel 2.18의 새로운 클래스입니다. Camel 2.17에서는 사용할 수 없습니다.
BeanIOSplitter splitter = new BeanIOSplitter();
splitter.setMapping("org/apache/camel/dataformat/beanio/mappings.xml");
splitter.setStreamName("employeeFile");
// Following is a route that uses the beanio data format to format CSV data
// in Java objects:
from("direct:unmarshal")
// Here the message body is split to obtain a message for each row:
.split(splitter).streaming()
.to("log:line")
.to("mock:beanio-unmarshal");
BeanIOSplitter splitter = new BeanIOSplitter();
splitter.setMapping("org/apache/camel/dataformat/beanio/mappings.xml");
splitter.setStreamName("employeeFile");
// Following is a route that uses the beanio data format to format CSV data
// in Java objects:
from("direct:unmarshal")
// Here the message body is split to obtain a message for each row:
.split(splitter).streaming()
.to("log:line")
.to("mock:beanio-unmarshal");
Copy to ClipboardCopied!Toggle word wrapToggle overflow
다음 예제에서는 모든 메시지 조각이 처리된 후 사용자 정의 집계 전략을 사용하여 분할 메시지를 다시 가져오는 방법을 보여줍니다.
from("direct:start")
.split(body().tokenize("@"), new MyOrderStrategy())
// each split message is then send to this bean where we can process it
.to("bean:MyOrderService?method=handleOrder")
// this is important to end the splitter route as we do not want to do more routing
// on each split message
.end()
// after we have split and handled each message we want to send a single combined
// response back to the original caller, so we let this bean build it for us
// this bean will receive the result of the aggregate strategy: MyOrderStrategy
.to("bean:MyOrderService?method=buildCombinedResponse")
from("direct:start")
.split(body().tokenize("@"), new MyOrderStrategy())
// each split message is then send to this bean where we can process it
.to("bean:MyOrderService?method=handleOrder")
// this is important to end the splitter route as we do not want to do more routing
// on each split message
.end()
// after we have split and handled each message we want to send a single combined
// response back to the original caller, so we let this bean build it for us
// this bean will receive the result of the aggregate strategy: MyOrderStrategy
.to("bean:MyOrderService?method=buildCombinedResponse")
Copy to ClipboardCopied!Toggle word wrapToggle overflow
이전 경로에 사용되는 사용자 정의 집계 전략 MyOrderStrategy 는 다음과 같이 구현됩니다.
/**
* This is our own order aggregation strategy where we can control
* how each split message should be combined. As we do not want to
* lose any message, we copy from the new to the old to preserve the
* order lines as long we process them
*/
public static class MyOrderStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
// put order together in old exchange by adding the order from new exchange
if (oldExchange == null) {
// the first time we aggregate we only have the new exchange,
// so we just return it
return newExchange;
}
String orders = oldExchange.getIn().getBody(String.class);
String newLine = newExchange.getIn().getBody(String.class);
LOG.debug("Aggregate old orders: " + orders);
LOG.debug("Aggregate new order: " + newLine);
// put orders together separating by semi colon
orders = orders + ";" + newLine;
// put combined order back on old to preserve it
oldExchange.getIn().setBody(orders);
// return old as this is the one that has all the orders gathered until now
return oldExchange;
}
}
/**
* This is our own order aggregation strategy where we can control
* how each split message should be combined. As we do not want to
* lose any message, we copy from the new to the old to preserve the
* order lines as long we process them
*/
public static class MyOrderStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
// put order together in old exchange by adding the order from new exchange
if (oldExchange == null) {
// the first time we aggregate we only have the new exchange,
// so we just return it
return newExchange;
}
String orders = oldExchange.getIn().getBody(String.class);
String newLine = newExchange.getIn().getBody(String.class);
LOG.debug("Aggregate old orders: " + orders);
LOG.debug("Aggregate new order: " + newLine);
// put orders together separating by semi colon
orders = orders + ";" + newLine;
// put combined order back on old to preserve it
oldExchange.getIn().setBody(orders);
// return old as this is the one that has all the orders gathered until now
return oldExchange;
}
}
Copy to ClipboardCopied!Toggle word wrapToggle overflow
병렬 처리가 활성화되면 이론적으로 이전 메시지 조각이 이전 문서보다 먼저 집계할 수 있습니다. 즉, 메시지 조각이 순서대로 수집기에 도달할 수 있습니다. 기본적으로 분할자 구현은 메시지를 수집기로 전달하기 전에 메시지를 원래 순서로 다시 정렬하므로 이러한 상황이 발생하지 않습니다.
메시지 조각을 준비한 즉시 집계하려는 경우 다음과 같이 스트리밍 옵션을 활성화할 수 있습니다.
from("direct:streaming")
.split(body().tokenize(","), new MyOrderStrategy())
.parallelProcessing()
.streaming()
.to("activemq:my.parts")
.end()
.to("activemq:all.parts");
from("direct:streaming")
.split(body().tokenize(","), new MyOrderStrategy())
.parallelProcessing()
.streaming()
.to("activemq:my.parts")
.end()
.to("activemq:all.parts");
Copy to ClipboardCopied!Toggle word wrapToggle overflow
하위 메시지의 응답을 8.4절. “Splitter” 에서 보내는 단일 메시지로 어셈블하는 데 사용되는 집계Strategy 를 나타냅니다. 기본적으로 사용되는 항목에 대해서는 아래 분할자 반환의 제목 섹션 을 참조하십시오.
strategyMethodName
이 옵션을 사용하면 Cryostat를 AggregationStrategy 로 사용할 메서드 이름을 명시적으로 지정할 수 있습니다.
strategyMethodAllowNull
false
이 옵션은 Cryostat를 AggregationStrategy 로 사용할 때 사용할 수 있습니다. false 인 경우 보강할 데이터가 없는 경우 집계 방법이 사용되지 않습니다. true 인 경우 보강할 데이터가 없는 경우 oldExchange 에null 값이 사용됩니다.
parallelProcessing
false
이 옵션을 사용하면 하위 메시지 처리가 동시에 수행됩니다. 호출자 스레드는 계속 진행하기 전에 모든 하위 메시지가 완전히 처리될 때까지 대기합니다.
parallelAggregate
false
활성화하면 AggregationStrategy 의 집계 메서드를 동시에 호출할 수 있습니다. 이를 위해서는 스레드로부터 안전한 AggregationStrategy 를 구현해야 합니다. 기본적으로 이 옵션은 false 입니다. 즉, Camel이 집계 메서드에 자동으로 호출을 동기화합니다. 그러나 일부 사용 사례에서는 AggregationStrategy 를 스레드로부터 안전한 것으로 구현하고 이 옵션을 true 로 설정하여 성능을 향상시킬 수 있습니다.
executorServiceRef
병렬 처리에 사용할 사용자 지정 스레드 풀을 나타냅니다.Indicates to a custom Thread Pool to be used for parallel processing. 이 옵션을 설정하면 병렬 처리가 자동으로 표시되고 해당 옵션도 활성화할 필요가 없습니다.
stopOnException
false
Camel 2.2: 예외가 발생한 경우 즉시 처리를 중지해야 하는지의 여부입니다. disable인 경우 Camel은 실패했는지에 관계없이 하위 메시지를 계속 분할하고 처리합니다. 이를 처리하는 방법을 완전히 제어하는 AggregationStrategy 클래스에서 예외를 처리할 수 있습니다.
streaming
false
Camel이 활성화된 경우 Camel은 스트리밍 방식으로 분할되므로 입력 메시지를 청크로 분할합니다. 이렇게 하면 메모리 오버헤드가 줄어듭니다. 예를 들어 큰 메시지를 분할하는 경우 스트리밍을 활성화하는 것이 좋습니다. 스트리밍이 활성화되면 하위 메시지 응답이 순서대로 집계됩니다(예: 반환 순서에 따라). 비활성화된 경우 Camel은 분할된 위치와 동일한 순서로 하위 메시지 응답을 처리합니다.