6.7. 流上传模式
启用流模式后,用户可以通过多部分上传将数据上传到 S3,而无需提前了解数据维度的时间。上传将在完成后完成: batchSize 已完成,或者达到 batchMessageNumber。有两个可能的命名策略:
progressive
使用 progressive 策略,每个文件的名称都由 keyName 选项和一个 progressive 计数器组成,最终文件扩展名(若有)
random。
使用随机策略时,将在 keyName 后添加 UUID,最终会附加文件扩展名。
例如:
from(kafka("topic1").brokers("localhost:9092")) .log("Kafka Message is: ${body}") .to(aws2S3("camel-bucket").streamingUploadMode(true).batchMessageNumber(25).namingStrategy(AWS2S3EndpointBuilderFactory.AWSS3NamingStrategyEnum.progressive).keyName("{{kafkaTopic1}}/{{kafkaTopic1}}.txt")); from(kafka("topic2").brokers("localhost:9092")) .log("Kafka Message is: ${body}") .to(aws2S3("camel-bucket").streamingUploadMode(true).batchMessageNumber(25).namingStrategy(AWS2S3EndpointBuilderFactory.AWSS3NamingStrategyEnum.progressive).keyName("{{kafkaTopic2}}/{{kafkaTopic2}}.txt"));
批处理的默认大小为 1 Mb,但您可以根据您的要求进行调整。
当您停止生成者路由时,生成者将负责刷新剩余的缓冲消息,并完成上传。
在流上传中,您将能够从离开的时间点重新启动生成者。务必要注意,只有在使用进度命名策略时,此功能才至关重要。
通过将 restartPolicy 设置为 lastPart,您将重启从制作者左侧最后一个部分编号上传文件和内容。
示例
- 使用 progressive naming strategy 和 keyname 等于 camel.txt 来启动路由,batchMessageNumber 等于 20,restartPolicy 等于 lastPart - Send 70 消息。
- 停止路由
在您的 S3 存储桶中,您现在应该看到 4 个文件:* camel.txt
- camel-1.txt
- camel-2.txt
camel-3.txt
前三个消息将有 20 个消息,而最后一个消息仅有 10 个。
- 重新启动路由。
- 发送 25 个消息。
- 停止路由。
- 您的存储桶中现在有 2 个其他文件: camel-5.txt 和 camel-6.txt,第一个带有 20 个消息,第二个文件为 5 个信息。
- 继续
使用随机命名策略时不需要这样做。
相反,您可以指定覆盖重启策略。在这种情况下,您可以覆盖您在存储桶上之前(用于该特定 keyName)写入的任何内容。
在流上传模式中,将考虑的唯一 keyName 选项是端点选项。使用标头将抛出 NPE,这由设计完成。设置标头意味着可能会更改每个交换上的文件名,这针对流上传制作者的动画。keyName 需要修复和静态。所选命名策略将执行其余工作。
另一个可能是使用 batchMessageNumber 和 batchSize 选项指定 streamingUploadTimeout。使用此选项时,用户可以在特定时间通过后完成文件上传。这样,上传完成将在三个层上传递:超时、消息数和批处理大小。
例如:
from(kafka("topic1").brokers("localhost:9092")) .log("Kafka Message is: ${body}") .to(aws2S3("camel-bucket").streamingUploadMode(true).batchMessageNumber(25).streamingUploadTimeout(10000).namingStrategy(AWS2S3EndpointBuilderFactory.AWSS3NamingStrategyEnum.progressive).keyName("{{kafkaTopic1}}/{{kafkaTopic1}}.txt"));
在这种情况下,上传将在 10 秒后完成。