6.7. 스트리밍 업로드 모드
스트림 모드를 사용하면 멀티 파트 업로드를 활용하여 미리 데이터 차원을 인식하지 않고도 S3에 데이터를 업로드할 수 있습니다. batchSize가 완료되었거나 batchMessageNumber에 도달한 경우 업로드가 완료됩니다. 이름 지정 전략에는 다음 두 가지가 있습니다.
Progressive
프로그레시브 전략을 사용하면 각 파일에 keyName 옵션과 프로그레시브 카운터로 구성된 이름과 결국 파일 확장(있는 경우)이 있습니다.
random.
임의의 전략을 사용하면 keyName 후에 UUID가 추가되고 결국 파일 확장자가 추가됩니다.
예를 들면 다음과 같습니다.
배치의 기본 크기는 1Mb이지만 요구 사항에 따라 조정할 수 있습니다.
생산자 경로를 중지하면 생산자가 버퍼링된 나머지 메시지를 플러시하고 업로드를 완료합니다.
스트리밍 업로드에서는 생산자가 남아 있는 시점에서 생산자를 다시 시작할 수 있습니다. 이 기능은 프로그레시브 이름 지정 전략을 사용하는 경우에만 중요합니다.
restartingPolicy를 lastPart로 설정하면 생산자가 남아 있는 마지막 부분 번호에서 파일 및 콘텐츠 업로드를 다시 시작합니다.
예제
- 프로그레시브 이름 지정 전략과 키 이름이 camel.txt로 경로를 시작합니다. batchMessageNumber는 20이고 restartPolicy는 lastPart - Send 70 메시지와 동일합니다.
- 경로 중지
S3 버킷에 4개의 파일이 표시됩니다. * camel.txt
- camel-1.txt
- camel-2.txt
camel-3.txt
처음 3개의 메시지는 20개, 마지막 메시지는 10개입니다.
- 경로를 다시 시작합니다.
- 25 개의 메시지를 보냅니다.
- 경로를 중지합니다.
- 이제 버킷에 두 개의 다른 파일인 camel-5.txt 및 camel-6.txt가 있고 20개의 메시지가 있는 첫 번째 파일과 5개의 메시지가 있습니다.
- 진행 중
이는 임의의 이름 지정 전략을 사용할 때 필요하지 않습니다.
반대로 override restartPolicy를 지정할 수 있습니다. 이 경우 버킷에서 (해당 특정 keyName) 이전에 작성한 모든 항목을 덮어쓸 수 있습니다.
스트리밍 업로드 모드에서 고려해야 할 유일한 keyName 옵션은 끝점 옵션입니다. 헤더를 사용하면 NPE가 throw되며 설계에 의해 수행됩니다. 헤더를 설정하면 각 교환에서 파일 이름을 잠재적으로 변경할 수 있으며 이는 스트리밍 업로드 프로듀서의 목표에 해당합니다. keyName은 고정되고 정적이어야 합니다. 선택한 이름 지정 전략이 나머지 작업을 수행합니다.
또 다른 가능성은 batchMessageNumber 및 batchSize 옵션을 사용하여 streamingUploadTimeout을 지정하는 것입니다. 이 옵션을 사용하면 특정 시간이 경과한 후 파일 업로드를 완료할 수 있습니다. 이러한 방식으로 업로드 완료는 시간 초과, 메시지 수 및 배치 크기의 세 가지 계층에 전달됩니다.
예를 들면 다음과 같습니다.
from(kafka("topic1").brokers("localhost:9092")) .log("Kafka Message is: ${body}") .to(aws2S3("camel-bucket").streamingUploadMode(true).batchMessageNumber(25).streamingUploadTimeout(10000).namingStrategy(AWS2S3EndpointBuilderFactory.AWSS3NamingStrategyEnum.progressive).keyName("{{kafkaTopic1}}/{{kafkaTopic1}}.txt"));
from(kafka("topic1").brokers("localhost:9092"))
.log("Kafka Message is: ${body}")
.to(aws2S3("camel-bucket").streamingUploadMode(true).batchMessageNumber(25).streamingUploadTimeout(10000).namingStrategy(AWS2S3EndpointBuilderFactory.AWSS3NamingStrategyEnum.progressive).keyName("{{kafkaTopic1}}/{{kafkaTopic1}}.txt"));
이 경우 업로드는 10초 후에 완료됩니다.