6.7. ストリーミングアップロードモード
ストリームモードを有効にすると、ユーザーはマルチパートアップロードを利用することで、データの次元を事前に知らなくても S3 にデータをアップロードできます。アップロードは、batchSize が完了したか、batchMessageNumber に達したときに完了します。次の 2 つの命名戦略が考えられます。
progressive
プログレッシブ戦略では、各ファイルには keyName オプションとプログレッシブカウンターで設定された名前が付けられ、最終的にはファイル拡張子 (存在する場合) が付けられます。
ランダム
ランダム戦略では、keyName の後に UUID が追加され、最終的にファイル拡張子が追加されます。
たとえば、以下のようになります。
バッチのデフォルトサイズは 1 Mb ですが、必要に応じて調整できます。
producer ルートを停止すると、producer はバッファーリングされた残りのメッセージをフラッシュし、アップロードを完了します。
ストリーミングアップロードでは、producer を離れたところから再開できます。この機能は、プログレッシブ命名戦略を使用する場合にのみ重要であることに注意してください。
restartingPolicy を lastPart に設定することで、ファイルとコンテンツのアップロードを producer が残した最後のパーツ番号から再開します。
例
- プログレッシブ命名戦略でルートを開始し、keyname は camel.txt に等しく、batchMessageNumber は 20 に等しく、restartingPolicy は lastPart に等しい - 70 個のメッセージを送信します。
- ルートを停止
S3 バケットには、次の 4 つのファイルが表示されます: * camel.txt
- camel-1.txt
- camel-2.txt
camel-3.txt
最初の 3 つには 20 件のメッセージが含まれますが、最後の 1 つには 10 件しかありません。
- ルートを再開します。
- メッセージを 25 回送信します。
- ルートを停止します。
- バケットには他に 2 つのファイルがあります。camel-5.txt と camel-6.txt です。最初のファイルには 20 件のメッセージがあり、2 つ目のファイルには 5 件のメッセージがあります。
- どうぞ
ランダムな命名戦略を使用する場合、これは必要ありません。
反対に、オーバーライドの restartingPolicy を指定できます。その場合、バケットで以前に書いたものを (その特定のキー名に対して) 上書きすることができます。
ストリーミングアップロードモードでは、考慮される唯一の keyName オプションは endpoint オプションです。ヘッダーを使用すると NPE が出力されますが、これは設計によるものです。ヘッダーを設定すると、各交換でファイル名が変更される可能性があり、これはストリーミングアップロード producer の目的に反します。keyName は固定で静的である必要があります。選択した命名戦略によって、残りの作業が行われます。
もう 1 つの可能性は、batchMessageNumber および batchSize オプションで streamingUploadTimeout を指定することです。このオプションを使用すると、ユーザーは一定の時間が経過した後にファイルのアップロードを完了することができます。このように、アップロードの完了は、タイムアウト、メッセージ数、およびバッチサイズの 3 つの層で渡されます。
たとえば、以下のようになります。
from(kafka("topic1").brokers("localhost:9092")) .log("Kafka Message is: ${body}") .to(aws2S3("camel-bucket").streamingUploadMode(true).batchMessageNumber(25).streamingUploadTimeout(10000).namingStrategy(AWS2S3EndpointBuilderFactory.AWSS3NamingStrategyEnum.progressive).keyName("{{kafkaTopic1}}/{{kafkaTopic1}}.txt"));
from(kafka("topic1").brokers("localhost:9092"))
.log("Kafka Message is: ${body}")
.to(aws2S3("camel-bucket").streamingUploadMode(true).batchMessageNumber(25).streamingUploadTimeout(10000).namingStrategy(AWS2S3EndpointBuilderFactory.AWSS3NamingStrategyEnum.progressive).keyName("{{kafkaTopic1}}/{{kafkaTopic1}}.txt"));
この場合、アップロードは 10 秒後に完了します。