8.5. 使用方法
例如,要从 camelazure
存储帐户中的 container1
上的 block blob hello.txt
下载 blob 内容,请使用以下代码片段:
from("azure-storage-blob://camelazure/container1?blobName=hello.txt&accessKey=yourAccessKey"). to("file://blobdirectory");
8.5.1. 由组件制作者评估的消息标头
标头 | 变量名称 | 类型 | 操作 | 描述 |
---|---|---|---|---|
|
|
| All | 将引发 {@link RuntimeException} 的可选超时值。 |
|
|
| 与容器和 blob 相关的操作 | 与容器或 blob 关联的元数据。 |
|
|
|
|
指定此容器中的数据如何可供公共使用。传递 |
|
|
| 与容器和 blob 相关的操作 | 这将包含值,这会将各种请求的成功操作限制为存在的条件。这些条件完全是可选的。 |
|
|
|
| 列出特定 Blob 的详细信息 |
|
|
|
| 过滤结果,以仅返回名称以指定前缀开头的 Blob。可能为空,返回所有 Blob。 |
|
|
|
| 指定要返回的最大 Blob 数量,包括所有 BlobPrefix 元素。如果请求没有指定 maxResultsPerPage 或指定大于 5,000 的值,服务器将返回到 5,000 个项目。 |
|
|
|
| 定义可用于配置 {@link BlobContainerClient} 对象上对 listBlobsFlatSegment 的调用行为的选项。 |
|
|
|
| 一组操作的其他参数。 |
|
|
|
| 定义 AccessTier 的值。 |
|
|
| 大多数与上传 blob 相关的操作 | 块内容的 MD5 哈希。此哈希用于在传输过程中验证块的完整性。当指定此标头时,存储服务会将已到达此标头值的内容的哈希值进行比较。请注意,这个 MD5 哈希没有使用 blob 存储。如果两个哈希不匹配,则操作将失败。 |
|
|
| 与页面 blob 相关的操作 | {@link PageRange} 对象。由于页面必须与 512 字节边界一致,启动偏移必须是 modulus 为 512,最终偏移必须是 modulus 为 512 - 1。有效字节范围的示例为 0-511、512-1023 等。 |
|
|
|
|
当设置为 |
|
|
|
|
当设置为 |
|
|
|
|
当设置为 |
|
|
|
| 指定要返回的块类型。 |
|
|
|
| 指定页面 blob 的最大大小,最多 8 TB。页面 blob 大小必须与 512 字节边界一致。 |
|
|
|
| 用户控制的值,可用于跟踪请求。序列号的值必须在 0 到 2^63 - 1 之间。默认值为 0。 |
|
|
|
| 指定删除此 blob 上的快照的行为。\{@code Include} 将删除基础 blob 和所有快照。\{@code Only} 将只删除快照。如果删除了快照,则必须传递 null。 |
|
|
|
| {@link ListBlobContainersOptions},用于指定服务应返回哪些数据。 |
|
|
|
| {@link ParallelTransferOptions} 用于下载至文件。并行传输参数的数量将被忽略。 |
|
|
|
| 下载 Blob 将保存到的项目目录。 |
|
|
|
| 覆盖 URL 下载链接的默认过期时间(millis)。 |
|
|
| 与 blob 相关的操作 | 覆盖/设置交换标头中的 Blob 名称。 |
|
|
| 与容器和 blob 相关的操作 | 覆盖/设置交换标头中的容器名称。 |
|
|
| All | 指定要执行的制作者操作,请参阅此页面中的与制作者操作相关的 doc。 |
|
|
|
| 过滤结果,以仅返回名称与指定正则表达式匹配的 Blob。可能为空以返回所有.如果同时设置了 prefix 和 regex,则 regex 将采用优先级和前缀。 |
|
|
|
| 它过滤结果,以大约在开始时间后返回事件。注意:也可以返回属于上小时的几个事件。可以缺少属于此小时的几个事件;若要确保所有事件都返回,每小时舍入开始时间。 |
|
|
|
| 它过滤结果,以大约在结束时间前返回事件。注意:也可以返回属于下一小时的几个事件。可以缺少属于此小时的几个事件;若要确保所有事件都返回,每小时舍入结束时间。 |
|
|
|
| 这提供了在服务调用过程中通过 Http 管道传递的额外上下文。 |
|
|
|
| 在 copy blob 操作中用作源帐户名称的源 blob 帐户名称 |
|
|
|
| 在 copy blob 操作中用作源容器名称的源 blob 容器名称 |
8.5.2. 由组件制作者或消费者设置的消息标头
标头 | 变量名称 | 类型 | 描述 |
---|---|---|---|
|
|
| blob 的访问层。 |
|
|
| blob 最后更改的访问层时的日期。 |
|
|
| blob 的归档状态。 |
|
|
| blob 的创建时间。 |
|
|
| 页面 blob 的当前序列号。 |
|
|
| blob 的大小。 |
|
|
| blob 的类型。 |
|
|
| 为 blob 指定的缓存控制。 |
|
|
| 提交到附加 blob 的块数 |
|
|
| 为 blob 指定的内容分布。 |
|
|
| 为 blob 指定的内容编码。 |
|
|
| 为 blob 指定的内容语言。 |
|
|
| 为 blob 指定的内容 MD5。 |
|
|
| 为 blob 指定的内容类型。 |
|
|
| blob 已完成上一次副本操作时。 |
|
|
| blob 最后一次增量复制快照的快照标识符。 |
|
|
| blob 上执行的最后一个复制操作的标识符。 |
|
|
| 对 blob 执行的最后一个复制操作的进度。 |
|
|
| 对 blob 执行的最后一个复制操作的源。 |
|
|
| 对 blob 执行的最后一个复制操作的状态。 |
|
|
| blob 上最后一次复制操作的描述。 |
|
|
| blob 的 E Tag |
|
|
| 表示 blob 的访问层是否从 blob 的属性中推断出来。 |
|
|
| 指明 blob 是否递增复制的标记。 |
|
|
| 指明 blob 内容是否在服务器中加密的标志。 |
|
|
| 最后一次修改 blob 时的日期时间。 |
|
|
| blob 中的租期类型。 |
|
|
| blob 上租期的状态。 |
|
|
| blob 上租期的状态。 |
|
|
| 与 blob 关联的其他元数据。 |
|
|
| 块提交到块 blob 的偏移。 |
|
|
|
从操作 |
|
|
|
由 |
|
|
| 返回可由用户使用的非解析的 httpHeaders。 |
8.5.3. 高级 Azure Storage Blob 配置
如果您的 Camel 应用程序在防火墙后面运行,或者需要对 BlobServiceClient
实例配置拥有更多控制,您可以创建自己的实例:
StorageSharedKeyCredential credential = new StorageSharedKeyCredential("yourAccountName", "yourAccessKey"); String uri = String.format("https://%s.blob.core.windows.net", "yourAccountName"); BlobServiceClient client = new BlobServiceClientBuilder() .endpoint(uri) .credential(credential) .buildClient(); // This is camel context context.getRegistry().bind("client", client);
然后,在 Camel azure-storage-blob
组件配置中引用这个实例:
from("azure-storage-blob://cameldev/container1?blobName=myblob&serviceClient=#client") .to("mock:result");
8.5.4. 在 registry 中自动检测 BlobServiceClient 客户端
组件可以在 registry 中检测 BlobServiceClient bean 的存在。如果是该类型的唯一实例,它将用作客户端,并且不必将它定义为 uri 参数,如上例所示。这可能对端点的更智能配置非常有用。
8.5.5. Azure Storage Blob Producer 操作
Camel Azure Storage Blob 组件在制作者端提供广泛的操作:
服务级别的操作
对于这些操作,需要 accountName
。
操作 | 描述 |
---|---|
| 获取 blob 的内容。您可以将此操作的输出限制为 blob 范围。 |
| 返回存储帐户中所有更改的事务日志,以及您的存储帐户中的 blob 元数据。更改源提供有顺序、有保证、持久、不可变的、这些更改的只读日志。 |
容器级别的操作
对于这些操作,需要 accountName
和 containerName
。
操作 | 描述 |
---|---|
| 在存储帐户内创建新容器。如果存在具有相同名称的容器,则制作者将忽略它。 |
| 删除存储帐户中指定的容器。如果容器不存在,操作会失败。 |
| 返回此容器中的 Blob 列表,以及文件夹结构扁平化。 |
blob 级别的操作
对于这些操作,需要 accountName
、containerName
和 blobName
。
操作 | blob 类型 | 描述 |
---|---|---|
| Common | 获取 blob 的内容。您可以将此操作的输出限制为 blob 范围。 |
| Common | 删除 blob。 |
| Common | 将整个 blob 下载到路径指定的文件中。如果文件已存在一个 {@link FileAlreadyExistsException},则文件必须不存在。 |
| Common | 使用共享访问签名(SAS)为指定的 blob 生成下载链接。默认情况下,此限制仅限制为 1 小时允许访问。但是,您可以通过标头覆盖默认的过期持续时间。 |
| BlockBlob | 创建新的块 blob,或更新现有块 blob 的内容。更新现有的块 blob 覆盖 blob 上的任何现有元数据。PutBlob 不支持部分更新;新内容会覆盖现有 blob 的内容。 |
|
|
将指定的块上传到块 blob 的"staging 区域",以便稍后由调用提交到 commitBlobBlockList。但是,如果标头 |
|
|
通过指定要组成 blob 的块 ID 列表来写入 blob。要作为 blob 的一部分编写,块必须在之前的 |
|
| 使用指定的块列表过滤器,返回已作为块 blob 一部分的块列表。 |
|
| 创建一个 0-length 附加 blob。调用 commitAppendBlo'b 操作,将数据附加到附加 blob 中。 |
|
|
将新数据块提交到现有 append blob 的末尾。如果标头 |
|
|
创建指定长度的页面 blob。调用 |
|
|
将一个或多个页面写入页面 blob。写入大小必须是 512 的倍数。如果标头 |
|
| 将页面 blob 调整为指定大小(必须是 512 的倍数)。 |
|
| 从 blob 页面释放指定的页面。范围的大小必须是 512 的倍数。 |
|
| 返回页面 blob 或页面 blob 快照的有效页面范围列表。 |
|
| 将 blob 从一个容器复制到另一个容器,即使来自不同帐户。 |
请参阅此页面中的示例部分,了解如何在您的 camel 应用程序中使用这些操作。
8.5.6. 消费者示例
要使用文件组件将 blob 消耗到一个文件中,如下所示:
from("azure-storage-blob://camelazure/container1?blobName=hello.txt&accountName=yourAccountName&accessKey=yourAccessKey"). to("file://blobdirectory");
但是,您还可以直接写入文件,而无需使用文件组件,您将需要指定 fileDir
文件夹路径,以便在机器中保存 blob。
from("azure-storage-blob://camelazure/container1?blobName=hello.txt&accountName=yourAccountName&accessKey=yourAccessKey&fileDir=/var/to/awesome/dir"). to("mock:results");
此外,组件支持批处理使用者,因此您可以使用只指定容器名称的多个 Blob,因此消费者将根据容器中的 Blob 数量返回多个交换。
示例
from("azure-storage-blob://camelazure/container1?accountName=yourAccountName&accessKey=yourAccessKey&fileDir=/var/to/awesome/dir"). to("mock:results");
8.5.7. 制作者操作示例
-
listBlobContainers
from("direct:start") .process(exchange -> { // set the header you want the producer to evaluate, refer to the previous // section to learn about the headers that can be set // e.g: exchange.getIn().setHeader(BlobConstants.LIST_BLOB_CONTAINERS_OPTIONS, new ListBlobContainersOptions().setMaxResultsPerPage(10)); }) .to("azure-storage-blob://camelazure?operation=listBlobContainers&client&serviceClient=#client") .to("mock:result");
-
createBlobContainer
from("direct:start") .process(exchange -> { // set the header you want the producer to evaluate, refer to the previous // section to learn about the headers that can be set // e.g: exchange.getIn().setHeader(BlobConstants.BLOB_CONTAINER_NAME, "newContainerName"); }) .to("azure-storage-blob://camelazure/container1?operation=createBlobContainer&serviceClient=#client") .to("mock:result");
-
deleteBlobContainer
:
from("direct:start") .process(exchange -> { // set the header you want the producer to evaluate, refer to the previous // section to learn about the headers that can be set // e.g: exchange.getIn().setHeader(BlobConstants.BLOB_CONTAINER_NAME, "overridenName"); }) .to("azure-storage-blob://camelazure/container1?operation=deleteBlobContainer&serviceClient=#client") .to("mock:result");
-
listBlobs
:
from("direct:start") .process(exchange -> { // set the header you want the producer to evaluate, refer to the previous // section to learn about the headers that can be set // e.g: exchange.getIn().setHeader(BlobConstants.BLOB_CONTAINER_NAME, "overridenName"); }) .to("azure-storage-blob://camelazure/container1?operation=listBlobs&serviceClient=#client") .to("mock:result");
-
getBlob
:
我们可以在交换正文中设置 outputStream
,并将数据写入其中。例如:
from("direct:start") .process(exchange -> { // set the header you want the producer to evaluate, refer to the previous // section to learn about the headers that can be set // e.g: exchange.getIn().setHeader(BlobConstants.BLOB_CONTAINER_NAME, "overridenName"); // set our body exchange.getIn().setBody(outputStream); }) .to("azure-storage-blob://camelazure/container1?blobName=blob&operation=getBlob&serviceClient=#client") .to("mock:result");
如果没有设置正文,则此操作会给我们提供一个 InputStream
实例,该实例可以进行进一步的下游:
from("direct:start") .to("azure-storage-blob://camelazure/container1?blobName=blob&operation=getBlob&serviceClient=#client") .process(exchange -> { InputStream inputStream = exchange.getMessage().getBody(InputStream.class); // We use Apache common IO for simplicity, but you are free to do whatever dealing // with inputStream System.out.println(IOUtils.toString(inputStream, StandardCharsets.UTF_8.name())); }) .to("mock:result");
-
deleteBlob
:
from("direct:start") .process(exchange -> { // set the header you want the producer to evaluate, refer to the previous // section to learn about the headers that can be set // e.g: exchange.getIn().setHeader(BlobConstants.BLOB_NAME, "overridenName"); }) .to("azure-storage-blob://camelazure/container1?blobName=blob&operation=deleteBlob&serviceClient=#client") .to("mock:result");
-
downloadBlobToFile
:
from("direct:start") .process(exchange -> { // set the header you want the producer to evaluate, refer to the previous // section to learn about the headers that can be set // e.g: exchange.getIn().setHeader(BlobConstants.BLOB_NAME, "overridenName"); }) .to("azure-storage-blob://camelazure/container1?blobName=blob&operation=downloadBlobToFile&fileDir=/var/mydir&serviceClient=#client") .to("mock:result");
-
downloadLink
from("direct:start") .to("azure-storage-blob://camelazure/container1?blobName=blob&operation=downloadLink&serviceClient=#client") .process(exchange -> { String link = exchange.getMessage().getHeader(BlobConstants.DOWNLOAD_LINK, String.class); System.out.println("My link " + link); }) .to("mock:result");
-
uploadBlockBlob
from("direct:start") .process(exchange -> { // set the header you want the producer to evaluate, refer to the previous // section to learn about the headers that can be set // e.g: exchange.getIn().setHeader(BlobConstants.BLOB_NAME, "overridenName"); exchange.getIn().setBody("Block Blob"); }) .to("azure-storage-blob://camelazure/container1?blobName=blob&operation=uploadBlockBlob&serviceClient=#client") .to("mock:result");
-
stageBlockBlobList
from("direct:start") .process(exchange -> { final List<BlobBlock> blocks = new LinkedList<>(); blocks.add(BlobBlock.createBlobBlock(new ByteArrayInputStream("Hello".getBytes()))); blocks.add(BlobBlock.createBlobBlock(new ByteArrayInputStream("From".getBytes()))); blocks.add(BlobBlock.createBlobBlock(new ByteArrayInputStream("Camel".getBytes()))); exchange.getIn().setBody(blocks); }) .to("azure-storage-blob://camelazure/container1?blobName=blob&operation=stageBlockBlobList&serviceClient=#client") .to("mock:result");
-
commitBlockBlobList
from("direct:start") .process(exchange -> { // We assume here you have the knowledge of these blocks you want to commit final List<Block> blocksIds = new LinkedList<>(); blocksIds.add(new Block().setName("id-1")); blocksIds.add(new Block().setName("id-2")); blocksIds.add(new Block().setName("id-3")); exchange.getIn().setBody(blocksIds); }) .to("azure-storage-blob://camelazure/container1?blobName=blob&operation=commitBlockBlobList&serviceClient=#client") .to("mock:result");
-
getBlobBlockList
from("direct:start") .to("azure-storage-blob://camelazure/container1?blobName=blob&operation=getBlobBlockList&serviceClient=#client") .log("${body}") .to("mock:result");
-
createAppendBlob
from("direct:start") .to("azure-storage-blob://camelazure/container1?blobName=blob&operation=createAppendBlob&serviceClient=#client") .to("mock:result");
-
commitAppendBlob
from("direct:start") .process(exchange -> { final String data = "Hello world from my awesome tests!"; final InputStream dataStream = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8)); exchange.getIn().setBody(dataStream); // of course you can set whatever headers you like, refer to the headers section to learn more }) .to("azure-storage-blob://camelazure/container1?blobName=blob&operation=commitAppendBlob&serviceClient=#client") .to("mock:result");
-
createPageBlob
from("direct:start") .to("azure-storage-blob://camelazure/container1?blobName=blob&operation=createPageBlob&serviceClient=#client") .to("mock:result");
-
uploadPageBlob
from("direct:start") .process(exchange -> { byte[] dataBytes = new byte[512]; // we set range for the page from 0-511 new Random().nextBytes(dataBytes); final InputStream dataStream = new ByteArrayInputStream(dataBytes); final PageRange pageRange = new PageRange().setStart(0).setEnd(511); exchange.getIn().setHeader(BlobConstants.PAGE_BLOB_RANGE, pageRange); exchange.getIn().setBody(dataStream); }) .to("azure-storage-blob://camelazure/container1?blobName=blob&operation=uploadPageBlob&serviceClient=#client") .to("mock:result");
-
resizePageBlob
from("direct:start") .process(exchange -> { final PageRange pageRange = new PageRange().setStart(0).setEnd(511); exchange.getIn().setHeader(BlobConstants.PAGE_BLOB_RANGE, pageRange); }) .to("azure-storage-blob://camelazure/container1?blobName=blob&operation=resizePageBlob&serviceClient=#client") .to("mock:result");
-
clearPageBlob
from("direct:start") .process(exchange -> { final PageRange pageRange = new PageRange().setStart(0).setEnd(511); exchange.getIn().setHeader(BlobConstants.PAGE_BLOB_RANGE, pageRange); }) .to("azure-storage-blob://camelazure/container1?blobName=blob&operation=clearPageBlob&serviceClient=#client") .to("mock:result");
-
getPageBlobRanges
from("direct:start") .process(exchange -> { final PageRange pageRange = new PageRange().setStart(0).setEnd(511); exchange.getIn().setHeader(BlobConstants.PAGE_BLOB_RANGE, pageRange); }) .to("azure-storage-blob://camelazure/container1?blobName=blob&operation=getPageBlobRanges&serviceClient=#client") .log("${body}") .to("mock:result");
-
copyBlob
from("direct:copyBlob") .process(exchange -> { exchange.getIn().setHeader(BlobConstants.BLOB_NAME, "file.txt"); exchange.getMessage().setHeader(BlobConstants.SOURCE_BLOB_CONTAINER_NAME, "containerblob1"); exchange.getMessage().setHeader(BlobConstants.SOURCE_BLOB_ACCOUNT_NAME, "account"); }) .to("azure-storage-blob://account/containerblob2?operation=copyBlob&sourceBlobAccessKey=RAW(accessKey)") .to("mock:result");
这样,帐户 'account' 的容器 containerblob1 中的 file.txt 将复制到同一帐户的容器 containerblob2 中。
8.5.8. 开发备注(Important)
所有集成测试都使用 Testcontainers 并默认运行。需要获取 Azure accessKey 和 accountName,才能使用 Azure 服务运行所有集成测试。除了模拟的单元测试外,还需要针对您进行的每个更改运行集成测试,甚至客户端升级,因为 Azure 客户端即使在次版本升级时也会出现问题。要运行集成测试,在这个组件目录中运行以下 maven 命令:
mvn verify -PfullTests -DaccountName=myacc -DaccessKey=mykey
其中,by accountName
是 Azure 帐户名称,accessKey
是从 Azure 门户生成的访问密钥。