8.6. Resequencer
8.6.1. 개요
그림 8.7. “Resequencer 패턴” 에 표시된 resequencer 패턴을 사용하면 sequencing 표현식에 따라 메시지를 다시 시작할 수 있습니다. sequencing 식에 대한 낮은 값을 생성하는 메시지는 높은 값을 생성하는 일괄 처리 및 메시지 앞으로 이동됩니다.
그림 8.7. Resequencer 패턴
Apache Camel은 다음 두 가지 resequencing 알고리즘을 지원합니다.
- 일괄 처리 resequencing ECDHE - 메시지를 일괄 처리로 수집, 메시지를 정렬하고 출력으로 보냅니다.
- 메시지 간 격차 감지 를 기반으로 메시지 스트림을 재시동(continuous)하는 스트림 입니다.
기본적으로 resequencer는 중복 메시지를 지원하지 않으며 메시지가 동일한 메시지 표현식으로 도달하는 경우 마지막 메시지만 유지합니다. 그러나 배치 모드에서는 resequencer를 활성화하여 중복을 허용할 수 있습니다.
8.6.2. 일괄 재쿼싱
배치 resequencing 알고리즘은 기본적으로 활성화되어 있습니다. 예를 들어 TimeStamp
헤더에 포함된 타임스탬프의 값에 따라 들어오는 메시지 배치를 다시 얻으려면 Java DSL에서 다음 경로를 정의할 수 있습니다.
from("direct:start").resequence(header("TimeStamp")).to("mock:result");
기본적으로 일괄 처리는 최대 100개의 메시지(기본 일괄 처리 크기)까지 1000밀리초(기본 배치 시간 초과)의 시간 간격에 도달하는 들어오는 모든 메시지를 수집하여 가져옵니다. BatchResequencerConfig
인스턴스를 유일한 인수로 사용하는 batch()
DSL 명령을 추가하여 배치 제한 시간 및 배치 크기의 값을 사용자 지정할 수 있습니다. 예를 들어, 배치가 4000밀리 초의 시간 창에서 수집된 메시지로 구성되도록 이전 경로를 수정하려면 최대 300개의 메시지까지 다음과 같이 Java DSL 경로를 정의할 수 있습니다.
import org.apache.camel.model.config.BatchResequencerConfig; RouteBuilder builder = new RouteBuilder() { public void configure() { from("direct:start").resequence(header("TimeStamp")).batch(new BatchResequencerConfig(300,4000L)).to("mock:result"); } };
XML 구성을 사용하여 배치 resequencer 패턴을 지정할 수도 있습니다. 다음 예제에서는 배치 크기가 300이고 일괄 시간 초과는 4000밀리초인 배치 resequencer를 정의합니다.
<camelContext id="resequencerBatch" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start" /> <resequence> <!-- batch-config can be omitted for default (batch) resequencer settings --> <batch-config batchSize="300" batchTimeout="4000" /> <simple>header.TimeStamp</simple> <to uri="mock:result" /> </resequence> </route> </camelContext>
8.6.3. 배치 옵션
표 8.2. “일괄 복원기 옵션” 배치 모드에서만 사용할 수 있는 옵션을 표시합니다.
Java DSL | XML DSL | 기본 | 설명 |
---|---|---|---|
|
|
|
|
|
|
|
|
예를 들어 JMSPriority
를 기반으로 JMS 대기열에서 메시지를 분리하려면 다음과 같이 옵션 allowDuplicates
및 reverse
를 결합해야 합니다.
from("jms:queue:foo") // sort by JMSPriority by allowing duplicates (message can have same JMSPriority) // and use reverse ordering so 9 is first output (most important), and 0 is last // use batch mode and fire every 3th second .resequence(header("JMSPriority")).batch().timeout(3000).allowDuplicates().reverse() .to("mock:result");
8.6.4. 스트림 resequencing
스트림 resequencing 알고리즘을 활성화하려면 stream()
을 resequence()
DSL 명령에 추가해야 합니다. 예를 들어 seqnum
헤더의 시퀀스 번호 값을 기반으로 들어오는 메시지를 다시 만들려면 다음과 같이 DSL 경로를 정의합니다.
from("direct:start").resequence(header("seqnum")).stream().to("mock:result");
스트림 처리 resequencer 알고리즘은 고정된 배치 크기가 아닌 메시지 스트림의 격차 감지를 기반으로 합니다. 시간 초과와 함께 갭 탐지는 시퀀스의 메시지 수(즉, 배치 크기)를 미리 알아야 하는 제약 조건을 제거합니다. 메시지에는 고유한 시퀀스 번호가 포함되어야 하며 이 시퀀스 번호는 다음과 같이 알려져 있어야 합니다. 예를 들어, 시퀀스 번호 3
이 있는 메시지는 시퀀스 번호 2
와 시퀀스 번호 4
가 있는 연속 메시지인 메시지입니다. 메시지 시퀀스 2,3,5
는 3
의 후속 작업이 누락되었기 때문에 차이가 있습니다. 따라서 resequencer는 메시지 4
가 도착하거나 시간 초과가 발생할 때까지 메시지 5
를 유지해야 합니다.
기본적으로 스트림 resequencer는 시간 초과 1000밀리초와 최대 메시지 용량 100으로 구성됩니다. 스트림의 시간 제한 및 메시지 용량을 사용자 지정하려면 StreamResequencerConfig
오브젝트를 stream()
에 대한 인수로 전달할 수 있습니다. 예를 들어 메시지 용량이 5000이고 시간 초과를 4000밀리초로 스트림 재시저를 구성하려면 다음과 같이 경로를 정의합니다.
// Java import org.apache.camel.model.config.StreamResequencerConfig; RouteBuilder builder = new RouteBuilder() { public void configure() { from("direct:start").resequence(header("seqnum")). stream(new StreamResequencerConfig(5000, 4000L)). to("mock:result"); } };
메시지 스트림에서 연속 메시지(즉, 인접한 시퀀스 번호가 있는 메시지) 간의 최대 시간 지연이 알려지면 resequencer의 timeout 매개변수를 이 값으로 설정해야 합니다. 이 경우 스트림의 모든 메시지가 올바른 순서로 다음 프로세서로 전달되도록 보장할 수 있습니다. out-of-sequence 시간 차이에 비해 시간 초과 값이 낮을수록 resequencer가 시퀀스에서 메시지를 전달할 가능성이 높습니다. 용량 매개변수가 충분히 높은 용량 값에 의해 대규모 시간 초과 값을 지원해야 합니다. 여기서 resequencer가 메모리가 부족하지 않도록 합니다.
long
이외의 일부 유형의 시퀀스 번호를 사용하려면 다음과 같이 사용자 지정 비교기를 정의해야 합니다.
// Java ExpressionResultComparator<Exchange> comparator = new MyComparator(); StreamResequencerConfig config = new StreamResequencerConfig(5000, 4000L, comparator); from("direct:start").resequence(header("seqnum")).stream(config).to("mock:result");
XML 구성을 사용하여 스트림 resequencer 패턴을 지정할 수도 있습니다. 다음 예제에서는 메시지 용량이 5000이고 시간 초과가 4000밀리초인 스트림 resequencer를 정의합니다.
<camelContext id="resequencerStream" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> <resequence> <stream-config capacity="5000" timeout="4000"/> <simple>header.seqnum</simple> <to uri="mock:result" /> </resequence> </route> </camelContext>
8.6.5. 잘못된 교환 무시
resequencer EIP는 들어오는 교환이 유효하지 않은 경우, 어떤 이유로든 sequencing 표현식을 평가할 수 없는 경우(예: 누락된 헤더로 인해) CamelExchangeException
예외를 발생시킵니다. ignoreInvalidExchanges
옵션을 사용하여 이러한 예외를 무시할 수 있습니다. 즉, resequencer는 잘못된 교환을 건너뜁니다.
from("direct:start")
.resequence(header("seqno")).batch().timeout(1000)
// ignore invalid exchanges (they are discarded)
.ignoreInvalidExchanges()
.to("mock:result");
8.6.6. 이전 메시지 거부
rejectOld
옵션을 사용하면 메시지를 다시 시작하는 데 사용되는 메커니즘에 관계없이 메시지가 순서대로 전송되는 것을 방지할 수 있습니다. rejectOld
옵션이 활성화되면 resequencer는 수신되는 메시지가 마지막 전달된 메시지보다 오래된 메시지(현재 비교기에 의해 정의됨)인 경우 수신되는 메시지를 거부합니다( MessageRejectedException
예외를 throw).
from("direct:start")
.onException(MessageRejectedException.class).handled(true).to("mock:error").end()
.resequence(header("seqno")).stream().timeout(1000).rejectOld()
.to("mock:result");