5.7. 스트리밍 업로드 모드
스트림 모드를 활성화하면 사용자는 다중 파트 업로드를 활용하여 데이터의 차원을 미리 모르는 채 S3에 데이터를 업로드할 수 있습니다. batchSize가 완료되거나 batchMessageNumber에 도달하면 업로드가 완료됩니다. 두 가지 가능한 명명 전략이 있습니다.
Progressive
프로그레시브 전략을 사용하면 각 파일의 이름이 keyName 옵션과 프로그레시브 카운터로 구성되고 결국 파일 확장자 (있는 경우)가 있습니다.
random.
임의의 전략을 사용하면 keyName 뒤에 UUID가 추가되고 결국 파일 확장자가 추가됩니다.
예를 들면 다음과 같습니다.
from(kafka("topic1").brokers("localhost:9092"))
.log("Kafka Message is: ${body}")
.to(aws2S3("camel-bucket").streamingUploadMode(true).batchMessageNumber(25).namingStrategy(AWS2S3EndpointBuilderFactory.AWSS3NamingStrategyEnum.progressive).keyName("{{kafkaTopic1}}/{{kafkaTopic1}}.txt"));
from(kafka("topic2").brokers("localhost:9092"))
.log("Kafka Message is: ${body}")
.to(aws2S3("camel-bucket").streamingUploadMode(true).batchMessageNumber(25).namingStrategy(AWS2S3EndpointBuilderFactory.AWSS3NamingStrategyEnum.progressive).keyName("{{kafkaTopic2}}/{{kafkaTopic2}}.txt"));
배치의 기본 크기는 1Mb이지만 요구 사항에 따라 조정할 수 있습니다.
생산자 경로를 중지하면 생산자가 나머지 버퍼링된 메시지를 플러시하고 업로드를 완료합니다.
Streaming upload를 사용하면 남은 지점에서 생산자를 다시 시작할 수 있습니다. 이 기능은 점차적인 명명 전략을 사용할 때만 중요합니다.
restartingPolicy를 lastPart로 설정하면 생산자가 남은 마지막 파트 번호에서 파일 및 콘텐츠 업로드를 다시 시작합니다.
예제
- 점진적으로 이름 지정 전략을 사용하여 경로를 시작하고 keyname은 camel.txt와 동일하며 batchMessageNumber가 20이고 restartingPolicy는 lastPart - Send 70 메시지와 동일합니다.
- 경로 중지
S3 버킷에 4 개의 파일이 표시됩니다. * camel.txt
- camel-1.txt
- camel-2.txt
camel-3.txt
처음 세 개는 20 개의 메시지를 가지고 있으며 마지막 10 개의 메시지 만 있습니다.
- 경로를 다시 시작합니다.
- 25개의 메시지를 보냅니다.
- 경로를 중지합니다.
- 이제 버킷에 다른 두 개의 파일, camel-5.txt 및 camel-6.txt, 첫 번째 파일에는 20 개의 메시지가 있고 두 번째는 5 개의 메시지가 있습니다.
- 계속 진행하십시오.
임의의 이름 지정 전략을 사용할 때는 필요하지 않습니다.
그 반대에서는 override restartPolicy를 지정할 수 있습니다. 이 경우 버킷의 (특정 keyName) 이전에 작성한 모든 항목을 재정의할 수 있습니다.
업로드 모드에서 고려해야 할 유일한 keyName 옵션은 끝점 옵션입니다. 헤더를 사용하면 NPE가 throw되고 이는 설계에 의해 수행됩니다. 헤더를 설정하면 각 교환에서 파일 이름을 변경할 수 있으며 이는 스트리밍 업로드 생산자와 반대됩니다. keyName은 고정 및 고정되어야 합니다. 선택한 명명 전략은 작업의 나머지 부분을 수행합니다.
또 다른 가능성은 batchMessageNumber 및 batchSize 옵션을 사용하여 streamingUploadTimeout을 지정하는 것입니다. 이 옵션을 사용하면 사용자는 특정 시간이 지난 후 파일 업로드를 완료할 수 있습니다. 이러한 방식으로 업로드 완료는 시간 초과, 메시지 수 및 배치 크기의 세 가지 계층에 전달됩니다.
예를 들면 다음과 같습니다.
from(kafka("topic1").brokers("localhost:9092"))
.log("Kafka Message is: ${body}")
.to(aws2S3("camel-bucket").streamingUploadMode(true).batchMessageNumber(25).streamingUploadTimeout(10000).namingStrategy(AWS2S3EndpointBuilderFactory.AWSS3NamingStrategyEnum.progressive).keyName("{{kafkaTopic1}}/{{kafkaTopic1}}.txt"));
이 경우 업로드는 10초 후에 완료됩니다.