6.7. 스트리밍 업로드 모드
스트림 모드를 사용하면 멀티 파트 업로드를 활용하여 미리 데이터 차원을 인식하지 않고도 S3에 데이터를 업로드할 수 있습니다. batchSize가 완료되었거나 batchMessageNumber에 도달한 경우 업로드가 완료됩니다. 이름 지정 전략에는 다음 두 가지가 있습니다.
Progressive
프로그레시브 전략을 사용하면 각 파일에 keyName 옵션과 프로그레시브 카운터로 구성된 이름과 결국 파일 확장(있는 경우)이 있습니다.
random.
임의의 전략을 사용하면 keyName 후에 UUID가 추가되고 결국 파일 확장자가 추가됩니다.
예를 들면 다음과 같습니다.
from(kafka("topic1").brokers("localhost:9092"))
.log("Kafka Message is: ${body}")
.to(aws2S3("camel-bucket").streamingUploadMode(true).batchMessageNumber(25).namingStrategy(AWS2S3EndpointBuilderFactory.AWSS3NamingStrategyEnum.progressive).keyName("{{kafkaTopic1}}/{{kafkaTopic1}}.txt"));
from(kafka("topic2").brokers("localhost:9092"))
.log("Kafka Message is: ${body}")
.to(aws2S3("camel-bucket").streamingUploadMode(true).batchMessageNumber(25).namingStrategy(AWS2S3EndpointBuilderFactory.AWSS3NamingStrategyEnum.progressive).keyName("{{kafkaTopic2}}/{{kafkaTopic2}}.txt"));
배치의 기본 크기는 1Mb이지만 요구 사항에 따라 조정할 수 있습니다.
생산자 경로를 중지하면 생산자가 버퍼링된 나머지 메시지를 플러시하고 업로드를 완료합니다.
스트리밍 업로드에서는 생산자가 남아 있는 시점에서 생산자를 다시 시작할 수 있습니다. 이 기능은 프로그레시브 이름 지정 전략을 사용하는 경우에만 중요합니다.
restartingPolicy를 lastPart로 설정하면 생산자가 남아 있는 마지막 부분 번호에서 파일 및 콘텐츠 업로드를 다시 시작합니다.
예제
- 프로그레시브 이름 지정 전략과 키 이름이 camel.txt로 경로를 시작합니다. batchMessageNumber는 20이고 restartPolicy는 lastPart - Send 70 메시지와 동일합니다.
- 경로 중지
S3 버킷에 4개의 파일이 표시됩니다. * camel.txt
- camel-1.txt
- camel-2.txt
camel-3.txt
처음 3개의 메시지는 20개, 마지막 메시지는 10개입니다.
- 경로를 다시 시작합니다.
- 25 개의 메시지를 보냅니다.
- 경로를 중지합니다.
- 이제 버킷에 두 개의 다른 파일인 camel-5.txt 및 camel-6.txt가 있고 20개의 메시지가 있는 첫 번째 파일과 5개의 메시지가 있습니다.
- 진행 중
이는 임의의 이름 지정 전략을 사용할 때 필요하지 않습니다.
반대로 override restartPolicy를 지정할 수 있습니다. 이 경우 버킷에서 (해당 특정 keyName) 이전에 작성한 모든 항목을 덮어쓸 수 있습니다.
스트리밍 업로드 모드에서 고려해야 할 유일한 keyName 옵션은 끝점 옵션입니다. 헤더를 사용하면 NPE가 throw되며 설계에 의해 수행됩니다. 헤더를 설정하면 각 교환에서 파일 이름을 잠재적으로 변경할 수 있으며 이는 스트리밍 업로드 프로듀서의 목표에 해당합니다. keyName은 고정되고 정적이어야 합니다. 선택한 이름 지정 전략이 나머지 작업을 수행합니다.
또 다른 가능성은 batchMessageNumber 및 batchSize 옵션을 사용하여 streamingUploadTimeout을 지정하는 것입니다. 이 옵션을 사용하면 특정 시간이 경과한 후 파일 업로드를 완료할 수 있습니다. 이러한 방식으로 업로드 완료는 시간 초과, 메시지 수 및 배치 크기의 세 가지 계층에 전달됩니다.
예를 들면 다음과 같습니다.
from(kafka("topic1").brokers("localhost:9092"))
.log("Kafka Message is: ${body}")
.to(aws2S3("camel-bucket").streamingUploadMode(true).batchMessageNumber(25).streamingUploadTimeout(10000).namingStrategy(AWS2S3EndpointBuilderFactory.AWSS3NamingStrategyEnum.progressive).keyName("{{kafkaTopic1}}/{{kafkaTopic1}}.txt"));
이 경우 업로드는 10초 후에 완료됩니다.