5.7. 流上传模式


启用流模式后,用户可以通过利用多部分上传将数据上传到 S3,而无需提前提前知道数据量。当: batchSize 已完成,或已达到 batchMessageNumber 时,上传已完成。有两种可用的命名策略:

  • 渐进

    使用不断进化的策略,每个文件的名称由 keyName 选项和逐渐计数器组成,最终文件扩展(若有)

  • 随机.

    使用随机策略时,UUID 将在 keyName 之后添加,并且最终文件扩展名将被附加。

例如:

from(kafka("topic1").brokers("localhost:9092"))
        .log("Kafka Message is: ${body}")
        .to(aws2S3("camel-bucket").streamingUploadMode(true).batchMessageNumber(25).namingStrategy(AWS2S3EndpointBuilderFactory.AWSS3NamingStrategyEnum.progressive).keyName("{{kafkaTopic1}}/{{kafkaTopic1}}.txt"));

from(kafka("topic2").brokers("localhost:9092"))
         .log("Kafka Message is: ${body}")
         .to(aws2S3("camel-bucket").streamingUploadMode(true).batchMessageNumber(25).namingStrategy(AWS2S3EndpointBuilderFactory.AWSS3NamingStrategyEnum.progressive).keyName("{{kafkaTopic2}}/{{kafkaTopic2}}.txt"));

批处理的默认大小为 1 Mb,但您可以根据要求进行调整。

当您停止制作者路由时,制作者将负责清除剩余的缓冲区消息并完成上传。

在流上传中,您可以从其离开的地方重启制作者。务必要注意,只有在使用先进命名策略时,此功能才至关重要。

通过将 restartPolicy 设置为 lastPart,您可以重新启动从最后一部分上传文件和内容。

示例

  1. 使用先进命名策略和 keyname 等于 camel.txt 启动路由,batchMessageNumber 等于 20,restartPolicy 等于 lastPart - Send 70 消息。
  2. 停止路由
  3. 在 S3 存储桶中,您应该看到 4 个文件:* camel.txt

    • camel-1.txt
    • camel-2.txt
    • camel-3.txt

      前三个将有 20 个消息,而最后一条消息仅有 10 个。

  4. 重启路由。
  5. 发送 25 个消息。
  6. 停止路由。
  7. 现在,您的存储桶中有 2 个其他文件:aml-5.txt 和 camel-6.txt,第一个包含 20 个消息,第二个显示 5 个消息。
  8. 进入

使用随机命名策略时不需要这一步。

相反,您可以指定覆盖 restartPolicy。在这种情况下,您可以覆盖存储桶(针对该特定 keyName)之前写入的任何信息。

注意

在流上传模式中,考虑的唯一 keyName 选项是 endpoint 选项。使用标题会抛出 NPE,具体根据设计来实现。设置标头意味着可能会更改每个交换上的文件名,这与流上传制作者的目的相同。keyName 需要被修复并需要静态。所选的命名策略将执行剩余的工作。

另一种可能是通过 batchMessageNumber 和 batchSize 选项指定 streamingUploadTimeout。通过此选项,用户可以在经过特定时间后完成文件上传。这样,上传完成的过程将在三个层上传递:超时、消息的数量和批处理大小。

例如:

from(kafka("topic1").brokers("localhost:9092"))
        .log("Kafka Message is: ${body}")
        .to(aws2S3("camel-bucket").streamingUploadMode(true).batchMessageNumber(25).streamingUploadTimeout(10000).namingStrategy(AWS2S3EndpointBuilderFactory.AWSS3NamingStrategyEnum.progressive).keyName("{{kafkaTopic1}}/{{kafkaTopic1}}.txt"));

在这种情况下,上传将在 10 秒后完成。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.