230.7. 소비자
소비자에는 여러 종류가 있습니다.
230.7.1. Tailable Cursor 소비자
MongoDB는 *nix 시스템의 tail -f
명령과 마찬가지로 커서를 열어 두어 컬렉션의 지속적인 데이터를 즉시 소비하는 메커니즘을 제공합니다. 이 메커니즘은 클라이언트가 예약된 간격으로 다시 ping하여 새 데이터를 가져오는 대신 서버가 클라이언트에 새 데이터를 푸시한다는 사실 때문에 예약된 폴링보다 훨씬 효율적입니다. 또한 중복 네트워크 트래픽도 줄일 수 있습니다.
테일러 커서를 사용해야 하는 필수 조건은 하나뿐입니다. 즉, 컬렉션은 N 오브젝트만 보유해야 하며, 제한에 도달하면 MongoDB 플러시는 원래 삽입된 순서와 동일한 순서로 이전 오브젝트를 플러시합니다. 자세한 내용은 http://www.mongodb.org/display/DOCS/Tailable+Cursors 을 참조하십시오.
Camel MongoDB 구성 요소는 테일 가능 커서 소비자를 구현하여 Camel 경로에서 이 기능을 사용할 수 있도록 합니다. 새 개체가 삽입되면 MongoDB는 테일 수 있는 커서 소비자에 자연적으로 문서로 푸시하여 교환으로 변환하고 경로 논리를 트리거합니다.
230.7.1.1. tailable 커서 소비자의 작동 방식
커서를 테일 수 있는 커서로 전환하려면 커서를 처음 생성할 때 몇 가지 특수 플래그를 MongoDB로 표시해야 합니다. 생성된 후 커서는 열린 상태로 유지되며 새 데이터가 전달될 때까지 MongoCursor.next()
메서드를 호출하면 차단됩니다. 그러나 MongoDB 서버는 결정되지 않은 기간이 지나면 새 데이터가 나타나지 않으면 커서를 종료할 수 있는 권한을 보유합니다. 새 데이터를 계속 사용하려면 커서를 다시 생성해야 합니다. 그리고 그렇게하기 위해, 당신은 당신이 떠나거나 다른 곳에서는 다시 맨 위에서 소비하기 시작할 위치를 기억해야합니다.
Camel MongoDB tailable 커서 소비자는 이러한 모든 작업을 처리합니다. 증가하는 데이터의 일부 필드에 키를 제공해야 합니다. 이는 타임스탬프, 순차적 ID 등과 같이 다시 생성될 때마다 커서를 배치하는 마커 역할을 합니다. MongoDB에서 지원하는 모든 데이터 유형일 수 있습니다. 날짜, 문자열 및 정수가 제대로 작동하는 것으로 확인됩니다. 이 메커니즘을 이 구성 요소의 컨텍스트에서 "일시 추적"이라고 합니다.
소비자는 이 필드의 마지막 값을 기억하고 커서를 재생성할 때마다 increasingField > lastValue
와 같은 필터를 사용하여 쿼리를 실행하여 읽지 않은 데이터만 사용합니다.
증가 필드 설정: 엔드포인트 URI tailTrackingIncreasingField
옵션에서 증가 필드의 키를 설정합니다. Camel 2.10에서는 이 필드에 대한 중첩된 탐색이 아직 지원되지 않으므로 데이터의 최상위 필드여야 합니다. 즉, "timestamp" 필드는 무의미하지만 "nested.timestamp"는 작동하지 않습니다. 중첩된 증가하는 필드에 대한 지원이 필요한 경우 Camel JIRA에서 티켓을 여십시오.
커서 다시 생성 지연: 초기화 시 새 데이터를 사용할 수 없는 경우 MongoDB가 커서를 즉시 종료한다는 점에 유의해야 합니다. 이 경우 서버를 덮어쓰지 않으므로 필요에 맞게 수정할 수 있는 cursorRegenerationDelay
옵션이 도입되었습니다(기본값: 1000ms).
예를 들면 다음과 같습니다.
from("mongodb3:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime") .id("tailableCursorConsumer1") .autoStartup(false) .to("mock:test");
위의 경로는 "flights.cancellations" capped 컬렉션에서 사용되며, 증가 필드로 "departureTime"을 사용하고 기본 재생성 커서 지연은 1000ms입니다.
230.7.1.2. 영구 테일 추적
표준 tail 추적은 volatile이며 마지막 값은 메모리에만 유지됩니다. 그러나 실제로 언제든지 Camel 컨테이너를 다시 시작해야 하지만 마지막 값이 손실되고 테일 수 있는 커서 소비자가 다시 사용하기 시작합니다. 따라서 중복 레코드를 경로로 보낼 가능성이 큽니다.
이러한 상황을 극복하기 위해 지속적인 테일 추적 기능을 활성화하여 MongoDB 데이터베이스 내부의 특수 컬렉션에서도 마지막으로 소비되는 증가 값을 추적할 수 있습니다. 소비자가 다시 초기화하면 마지막으로 추적된 값을 복원하고 아무 일도 발생하지 않은 것처럼 계속합니다.
마지막 읽기 값은 두 번 유지됩니다. 커서가 다시 생성될 때마다 소비자가 종료됩니다. 요구가 있는 경우 강력한 안정성을 위해 향후 (5초마다 플러시) 정기적으로 지속하는 것을 고려할 수 있습니다. 이 기능을 요청하려면 Camel JIRA에서 티켓을 여십시오.
230.7.1.3. 영구 테일 추적 활성화
이 기능을 활성화하려면 끝점 URI에서 최소한 다음 옵션을 설정합니다.
-
persistentTailTracking
옵션을true
로 설정 -
이 소비자의 고유 식별자에 대한
persistentId
옵션을 사용하면 많은 소비자에서 동일한 컬렉션을 재사용할 수 있습니다.
또한 tailTrackDb
,tailTrackCollection
및 tailTrackField
옵션을 설정하여 런타임 정보가 저장되는 위치를 사용자 지정할 수 있습니다. 각 옵션에 대한 설명은 이 페이지 상단에 있는 끝점 옵션 표를 참조하십시오.
예를 들어, 다음 경로는 "flights.cancellations" capped 컬렉션에서 소비하고, 증가 필드로 "departureTime"을 사용하고, 1000ms의 기본 다시 생성 커서 지연과 함께 영구 tail 추적이 켜지고 "flights.camelTailTracking"의 "cancellationsTracker" ID 아래에 유지됩니다. "lastTrackingValue" 필드에 마지막으로 처리된 값을 저장합니다(camelTailTracking
및 lastTrackingValue
는 기본값임).
from("mongodb3:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime&persistentTailTracking=true" + "&persistentId=cancellationsTracker") .id("tailableCursorConsumer2") .autoStartup(false) .to("mock:test");
다음은 위의 항목과 동일한 예입니다. 그러나 영구 테일 추적 런타임 정보는 "마지막ProcessedDepartureTime" 필드의 "trackers.camelTrackers" 컬렉션에 저장됩니다.
from("mongodb3:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime&persistentTailTracking=true" + "&persistentId=cancellationsTracker&tailTrackDb=trackers&tailTrackCollection=camelTrackers" + "&tailTrackField=lastProcessedDepartureTime") .id("tailableCursorConsumer3") .autoStartup(false) .to("mock:test");
230.7.2. 스트림 소비자 변경
Change Streams를 사용하면 애플리케이션이 MongoDB oplog를 테일링하는 복잡성과 위험 없이 실시간 데이터 변경에 액세스할 수 있습니다. 애플리케이션은 변경 스트림을 사용하여 컬렉션의 모든 데이터 변경 사항을 구독하고 즉시 응답할 수 있습니다. 변경 스트림은 집계 프레임워크를 사용하므로 애플리케이션은 특정 변경 사항을 필터링하거나 원하는 대로 알림을 변환할 수도 있습니다.
Change Streams Consumer를 구성하려면 이벤트를 필터링하기 위해 consumerType
,데이터베이스
,컬렉션
및 선택적 JSON 속성 streamFilter
를 지정해야 합니다. JSON 속성은 표준 MongoDB $match
집계입니다. XML DSL 구성을 사용하여 쉽게 지정할 수 있습니다.
<route id="filterConsumer"> <from uri="mongodb3:myDb?consumerType=changeStreams&database=flights&collection=tickets"/> <to uri="mock:test"/> <routeProperty key="streamFilter" value="{'$match':{'$or':[{'fullDocument.stringValue': 'specificValue'}]}}"/> </route>
Java 구성:
from("mongodb3:myDb?consumerType=changeStreams&database=flights&collection=tickets") .routeProperty("streamFilter", "{'$match':{'$or':[{'fullDocument.stringValue': 'specificValue'}]}}") .to("mock:test");