6.7. ストリーミングアップロードモード


ストリームモードを有効にすると、ユーザーはマルチパートアップロードを利用することで、データの次元を事前に知らなくても S3 にデータをアップロードできます。アップロードは、batchSize が完了したか、batchMessageNumber に達したときに完了します。次の 2 つの命名戦略が考えられます。

  • progressive

    プログレッシブ戦略では、各ファイルには keyName オプションとプログレッシブカウンターで設定された名前が付けられ、最終的にはファイル拡張子 (存在する場合) が付けられます。

  • ランダム

    ランダム戦略では、keyName の後に UUID が追加され、最終的にファイル拡張子が追加されます。

たとえば、以下のようになります。

from(kafka("topic1").brokers("localhost:9092"))
        .log("Kafka Message is: ${body}")
        .to(aws2S3("camel-bucket").streamingUploadMode(true).batchMessageNumber(25).namingStrategy(AWS2S3EndpointBuilderFactory.AWSS3NamingStrategyEnum.progressive).keyName("{{kafkaTopic1}}/{{kafkaTopic1}}.txt"));

from(kafka("topic2").brokers("localhost:9092"))
         .log("Kafka Message is: ${body}")
         .to(aws2S3("camel-bucket").streamingUploadMode(true).batchMessageNumber(25).namingStrategy(AWS2S3EndpointBuilderFactory.AWSS3NamingStrategyEnum.progressive).keyName("{{kafkaTopic2}}/{{kafkaTopic2}}.txt"));
Copy to Clipboard Toggle word wrap

バッチのデフォルトサイズは 1 Mb ですが、必要に応じて調整できます。

producer ルートを停止すると、producer はバッファーリングされた残りのメッセージをフラッシュし、アップロードを完了します。

ストリーミングアップロードでは、producer を離れたところから再開できます。この機能は、プログレッシブ命名戦略を使用する場合にのみ重要であることに注意してください。

restartingPolicy を lastPart に設定することで、ファイルとコンテンツのアップロードを producer が残した最後のパーツ番号から再開します。

  1. プログレッシブ命名戦略でルートを開始し、keyname は camel.txt に等しく、batchMessageNumber は 20 に等しく、restartingPolicy は lastPart に等しい - 70 個のメッセージを送信します。
  2. ルートを停止
  3. S3 バケットには、次の 4 つのファイルが表示されます: * camel.txt

    • camel-1.txt
    • camel-2.txt
    • camel-3.txt

      最初の 3 つには 20 件のメッセージが含まれますが、最後の 1 つには 10 件しかありません。

  4. ルートを再開します。
  5. メッセージを 25 回送信します。
  6. ルートを停止します。
  7. バケットには他に 2 つのファイルがあります。camel-5.txt と camel-6.txt です。最初のファイルには 20 件のメッセージがあり、2 つ目のファイルには 5 件のメッセージがあります。
  8. どうぞ

ランダムな命名戦略を使用する場合、これは必要ありません。

反対に、オーバーライドの restartingPolicy を指定できます。その場合、バケットで以前に書いたものを (その特定のキー名に対して) 上書きすることができます。

注記

ストリーミングアップロードモードでは、考慮される唯一の keyName オプションは endpoint オプションです。ヘッダーを使用すると NPE が出力されますが、これは設計によるものです。ヘッダーを設定すると、各交換でファイル名が変更される可能性があり、これはストリーミングアップロード producer の目的に反します。keyName は固定で静的である必要があります。選択した命名戦略によって、残りの作業が行われます。

もう 1 つの可能性は、batchMessageNumber および batchSize オプションで streamingUploadTimeout を指定することです。このオプションを使用すると、ユーザーは一定の時間が経過した後にファイルのアップロードを完了することができます。このように、アップロードの完了は、タイムアウト、メッセージ数、およびバッチサイズの 3 つの層で渡されます。

たとえば、以下のようになります。

from(kafka("topic1").brokers("localhost:9092"))
        .log("Kafka Message is: ${body}")
        .to(aws2S3("camel-bucket").streamingUploadMode(true).batchMessageNumber(25).streamingUploadTimeout(10000).namingStrategy(AWS2S3EndpointBuilderFactory.AWSS3NamingStrategyEnum.progressive).keyName("{{kafkaTopic1}}/{{kafkaTopic1}}.txt"));
Copy to Clipboard Toggle word wrap

この場合、アップロードは 10 秒後に完了します。

トップに戻る
Red Hat logoGithubredditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。 最新の更新を見る.

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

Theme

© 2025 Red Hat