5.7. 流上传模式
启用流模式后,用户可以通过利用多部分上传将数据上传到 S3,而无需提前提前知道数据量。当: batchSize 已完成,或已达到 batchMessageNumber 时,上传已完成。有两种可用的命名策略:
渐进
使用不断进化的策略,每个文件的名称由 keyName 选项和逐渐计数器组成,最终文件扩展(若有)
随机.
使用随机策略时,UUID 将在 keyName 之后添加,并且最终文件扩展名将被附加。
例如:
from(kafka("topic1").brokers("localhost:9092")) .log("Kafka Message is: ${body}") .to(aws2S3("camel-bucket").streamingUploadMode(true).batchMessageNumber(25).namingStrategy(AWS2S3EndpointBuilderFactory.AWSS3NamingStrategyEnum.progressive).keyName("{{kafkaTopic1}}/{{kafkaTopic1}}.txt")); from(kafka("topic2").brokers("localhost:9092")) .log("Kafka Message is: ${body}") .to(aws2S3("camel-bucket").streamingUploadMode(true).batchMessageNumber(25).namingStrategy(AWS2S3EndpointBuilderFactory.AWSS3NamingStrategyEnum.progressive).keyName("{{kafkaTopic2}}/{{kafkaTopic2}}.txt"));
批处理的默认大小为 1 Mb,但您可以根据要求进行调整。
当您停止制作者路由时,制作者将负责清除剩余的缓冲区消息并完成上传。
在流上传中,您可以从其离开的地方重启制作者。务必要注意,只有在使用先进命名策略时,此功能才至关重要。
通过将 restartPolicy 设置为 lastPart,您可以重新启动从最后一部分上传文件和内容。
示例
- 使用先进命名策略和 keyname 等于 camel.txt 启动路由,batchMessageNumber 等于 20,restartPolicy 等于 lastPart - Send 70 消息。
- 停止路由
在 S3 存储桶中,您应该看到 4 个文件:* camel.txt
- camel-1.txt
- camel-2.txt
camel-3.txt
前三个将有 20 个消息,而最后一条消息仅有 10 个。
- 重启路由。
- 发送 25 个消息。
- 停止路由。
- 现在,您的存储桶中有 2 个其他文件:aml-5.txt 和 camel-6.txt,第一个包含 20 个消息,第二个显示 5 个消息。
- 进入
使用随机命名策略时不需要这一步。
相反,您可以指定覆盖 restartPolicy。在这种情况下,您可以覆盖存储桶(针对该特定 keyName)之前写入的任何信息。
在流上传模式中,考虑的唯一 keyName 选项是 endpoint 选项。使用标题会抛出 NPE,具体根据设计来实现。设置标头意味着可能会更改每个交换上的文件名,这与流上传制作者的目的相同。keyName 需要被修复并需要静态。所选的命名策略将执行剩余的工作。
另一种可能是通过 batchMessageNumber 和 batchSize 选项指定 streamingUploadTimeout。通过此选项,用户可以在经过特定时间后完成文件上传。这样,上传完成的过程将在三个层上传递:超时、消息的数量和批处理大小。
例如:
from(kafka("topic1").brokers("localhost:9092")) .log("Kafka Message is: ${body}") .to(aws2S3("camel-bucket").streamingUploadMode(true).batchMessageNumber(25).streamingUploadTimeout(10000).namingStrategy(AWS2S3EndpointBuilderFactory.AWSS3NamingStrategyEnum.progressive).keyName("{{kafkaTopic1}}/{{kafkaTopic1}}.txt"));
在这种情况下,上传将在 10 秒后完成。