229.7. 소비자


다음과 같은 여러 유형의 소비자가 있습니다.

229.7.1. 트릴 수 있는 Cursor Consumer

MongoDB는 *nix 시스템의 tail -f 명령처럼 커서를 열어 두어 컬렉션에서 지속적인 데이터를 즉시 소비할 수 있는 메커니즘을 제공합니다. 이 메커니즘은 클라이언트가 새 데이터를 가져오기 위해 예약된 간격으로 다시 ping하는 대신 서버가 사용 가능하게 되는 대로 새 데이터를 클라이언트에 푸시하기 때문에 예약된 폴링보다 훨씬 효율적입니다. 또한 중복된 네트워크 트래픽도 줄입니다.

tailable 커서를 사용해야 하는 것은 하나만 있습니다. 컬렉션에는 "capped collection"이어야 합니다. 즉, N개의 오브젝트만 보유하고 제한에 도달하면 원래 순서에 따라 이전 오브젝트를 플러시합니다. 자세한 내용은 http://www.mongodb.org/display/DOCS/Tailable+Cursors 에서 참조하십시오.

Camel MongoDB 구성 요소는 tailable 커서 소비자를 구현하므로 Camel 경로에서 이 기능을 사용할 수 있습니다. 새로운 객체가 삽입되면 MongoDB는 문서 를 던질 수 있는 커서 소비자로 만들고 이를 교환으로 변환하고 경로 논리를 트리거합니다.

229.7.1.1. tailable 커서 소비자 작동 방식

커서를 tailable 커서로 전환하기 위해 먼저 커서를 생성할 때 몇 가지 특수 플래그가 MongoDB에 신호를 보냅니다. 생성된 커서는 열린 상태로 유지되며 새 데이터가 도달할 때까지 MongoCursor.next() 메서드를 호출하면 차단됩니다. 그러나 MongoDB 서버는 결정되지 않은 기간 후에 새 데이터가 나타나지 않는 경우 커서를 종료할 수 있는 권한을 예약합니다. 새 데이터를 계속 사용하고자 하는 경우 커서를 다시 생성해야 합니다. 그리고 이렇게하려면, 왼쪽 위치가 없거나 맨 위에서 다시 소비하기 시작할 위치를 명심해야합니다.

Camel MongoDB tailable 커서 소비자는 이러한 모든 작업을 처리합니다. 증가한 특성의 데이터에 키를 제공해야 합니다. 이러한 키는 재생성될 때마다 커서를 배치하는 마커 역할을 합니다(예: 타임스탬프, 순차 ID 등). MongoDB에서 지원하는 모든 데이터 유형일 수 있습니다. 날짜, 문자열 및 정수가 제대로 작동하는 것으로 확인됩니다. 이 구성 요소의 컨텍스트에서 이 메커니즘을 "tail tracking"이라고 합니다.

소비자는 이 필드의 마지막 값을 기억할 것이며 커서가 다시 생성될 때마다 increaseField > lastValue 과 같은 필터를 사용하여 쿼리를 실행하여 읽지 않은 데이터만 사용합니다.

increasing 필드 설정: 끝점 URI tailingIncreasingField 옵션에서 increasing 필드의 키를 설정합니다. Camel 2.10에서는 이 필드의 중첩된 탐색이 아직 지원되지 않으므로 데이터의 최상위 필드여야 합니다. 즉, "timestamp" 필드는 부드럽지만 "nested.timestamp"는 작동하지 않습니다. 중첩된 증가하는 필드에 대한 지원이 필요한 경우 Camel JIRA에서 티켓을 작성하십시오.

커서 재생성 지연: 초기 시 새 데이터를 사용할 수 없는 경우 MongoDB에서 커서를 즉시 종료합니다. 이 경우 서버를 재정의하지 않기 때문에 필요에 맞게 수정할 수 있는 cursorRegenerationDelay 옵션이 도입되었습니다(기본값: 1000ms.).

예를 들면 다음과 같습니다.

from("mongodb3:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime")
    .id("tailableCursorConsumer1")
    .autoStartup(false)
    .to("mock:test");

위의 경로는 기본 재생성 커서 지연이 1000ms인 "departureTime" 필드로 "departureTime"을 사용하여 "flights.cancellations"에서 사용됩니다.

229.7.1.2. 영구 테일 추적

표준 테일 추적은 volatile이며 마지막 값은 메모리에만 유지됩니다. 그러나 실제로는 이제 Camel 컨테이너를 다시 시작해야 하지만 마지막 값이 손실되고 tailable 커서 소비자가 상단에서 다시 소비되기 시작하여 경로에 중복 레코드를 보낼 가능성이 높습니다.

이 상황을 극복하기 위해 영구 테일 추적 기능을 활성화하여 MongoDB 데이터베이스 내부의 특수 컬렉션에서 마지막으로 소비되는 값을 추적할 수 있습니다. 소비자가 다시 초기화하면 마지막으로 추적한 값을 복원하고 아무 일도 없던 것처럼 계속됩니다.

마지막 읽기 값은 두 번 실행됩니다. 커서가 재생될 때마다 그리고 소비자가 종료될 때입니다. 수요가 있는 경우 향후 (5초 마다 유입) 일정한 간격으로 유지되는 것을 고려할 수 있습니다. 이 기능을 요청하려면 Camel JIRA에서 티켓을 작성하십시오.

229.7.1.3. 영구 테일 추적 활성화

이 기능을 활성화하려면 끝점 URI에서 다음 옵션을 최소한으로 설정합니다.

  • persistentTailECDHE ing 옵션 true
  • 많은 소비자에서 동일한 컬렉션을 재사용할 수 있도록 이 소비자의 고유 식별자에 대한 persistentId 옵션

또한 tailECDHE Db , tail ECDHEECDHE 및 tail ECDHEField 옵션을 설정하여 런타임 정보가 저장될 위치를 사용자 지정할 수 있습니다. 각 옵션에 대한 설명은 이 페이지 상단에 있는 끝점 옵션 테이블을 참조하십시오.

예를 들어 다음 경로는 증가하는 필드로 "departureTime"을 사용하여 1000ms의 기본 재생 커서 지연과 함께 1000ms의 "flights.cancellations"에서 소비되고 "flight.camelTail wishing" ID에 따라 유지됩니다. 마지막으로 처리된 값을 "최종" 필드에 저장(camelTailECDHEing 및 lastECDHE ingValue 은 기본값임).

from("mongodb3:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime&persistentTailTracking=true" +
     "&persistentId=cancellationsTracker")
    .id("tailableCursorConsumer2")
    .autoStartup(false)
    .to("mock:test");

다음은 위의 예와 동일하지만 영구 테일 추적 런타임 정보는 "lastProcessedDepartureTime" 필드의 "trackers.camelECDHEers" 컬렉션에 저장됩니다.

from("mongodb3:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime&persistentTailTracking=true" +
     "&persistentId=cancellationsTracker&tailTrackDb=trackers&tailTrackCollection=camelTrackers" +
     "&tailTrackField=lastProcessedDepartureTime")
    .id("tailableCursorConsumer3")
    .autoStartup(false)
    .to("mock:test");

229.7.2. Change Streams Consumer

Change Streams를 사용하면 애플리케이션이 MongoDB oplog의 복잡성과 위험없이 실시간 데이터 변경에 액세스할 수 있습니다. 애플리케이션에서는 변경 스트림을 사용하여 컬렉션의 모든 데이터 변경 사항을 구독하고 즉시 대응할 수 있습니다. 변경 스트림은 집계 프레임워크를 사용하므로 애플리케이션은 특정 변경 사항에 대해 필터링하거나 알림을 원하는 대로 변환할 수도 있습니다.

Change Streams Consumer를 구성하려면 이벤트를 필터링하려면 consumerType,database,collection 및 선택적 JSON 속성 streamFilter 를 지정해야 합니다. 해당 JSON 속성은 표준 MongoDB $match 집계입니다. XML DSL 구성을 사용하여 쉽게 지정할 수 있습니다.

<route id="filterConsumer">
    <from uri="mongodb3:myDb?consumerType=changeStreams&amp;database=flights&amp;collection=tickets"/>
    <to uri="mock:test"/>

    <routeProperty key="streamFilter" value="{'$match':{'$or':[{'fullDocument.stringValue': 'specificValue'}]}}"/>
</route>

Java 구성:

from("mongodb3:myDb?consumerType=changeStreams&database=flights&collection=tickets")
    .routeProperty("streamFilter", "{'$match':{'$or':[{'fullDocument.stringValue': 'specificValue'}]}}")
    .to("mock:test");
Red Hat logoGithubRedditYoutubeTwitter

자세한 정보

평가판, 구매 및 판매

커뮤니티

Red Hat 문서 정보

Red Hat을 사용하는 고객은 신뢰할 수 있는 콘텐츠가 포함된 제품과 서비스를 통해 혁신하고 목표를 달성할 수 있습니다. 최신 업데이트를 확인하세요.

보다 포괄적 수용을 위한 오픈 소스 용어 교체

Red Hat은 코드, 문서, 웹 속성에서 문제가 있는 언어를 교체하기 위해 최선을 다하고 있습니다. 자세한 내용은 다음을 참조하세요.Red Hat 블로그.

Red Hat 소개

Red Hat은 기업이 핵심 데이터 센터에서 네트워크 에지에 이르기까지 플랫폼과 환경 전반에서 더 쉽게 작업할 수 있도록 강화된 솔루션을 제공합니다.

© 2024 Red Hat, Inc.