10.5. 使用方法
例如,要从 camelazure
存储帐户中的 container1
上的块 blob hello.txt
下载 blob 内容,请使用以下代码片段:
from("azure-storage-blob://camelazure/container1?blobName=hello.txt&accessKey=yourAccessKey"). to("file://blobdirectory");
10.5.1. 组件制作者评估的消息标头
标头 | 变量名称 | 类型 | 操作 | 描述 |
---|---|---|---|---|
|
|
| All | 可选的超时值,超过 {@link RuntimeException} 将引发 {@link RuntimeException}。 |
|
|
| 与容器和 Blob 相关的操作 | 与容器或 blob 关联的元数据。 |
|
|
|
|
指定此容器中的数据对公共使用的方式。如果无公共访问,传递 |
|
|
| 与容器和 Blob 相关的操作 | 它包含值,将限制各种请求的成功操作到存在的条件。这些条件完全是可选的。 |
|
|
|
| 列出特定 Blob 的详细信息 |
|
|
|
| 过滤结果,以仅返回名称以指定前缀开头的 Blob。可以是 null,以返回所有 Blob。 |
|
|
|
| 指定要返回的 Blob 数量上限,包括所有 BlobPrefix 元素。如果请求没有指定 maxResultsPerPage 或指定大于 5,000 的值,服务器将返回最多 5,000 个项目。 |
|
|
|
| 定义可用于配置 {@link BlobContainerClient} 对象上 listBlobsFlatSegment 的调用行为的选项。 |
|
|
|
| 一组操作的额外参数。 |
|
|
|
| 定义 AccessTier 的值。 |
|
|
| 与上传 Blob 相关的大多数操作 | 块内容的 MD5 哈希。此哈希用于在传输过程中验证块的完整性。当指定此标头时,存储服务会比较与此标头值到达的内容的哈希值。请注意,这个 MD5 哈希不使用 blob 存储。如果两个哈希不匹配,则操作将失败。 |
|
|
| 与页面 Blob 相关的操作 | {@link PageRange} 对象。假设页面必须与 512 字节界限一致,开始偏移必须是 512ulus 的 modulus,结束偏移必须是 512 - 1 的 modulus。有效字节范围的示例为 0-511、512-1023 等。 |
|
|
|
|
当设置为 |
|
|
|
|
当设置为 |
|
|
|
|
当设置为 |
|
|
|
| 指定要返回的块类型。 |
|
|
|
| 指定页面 blob 的最大大小,最多 8 TB。页面 blob 大小必须与 512 字节边界一致。 |
|
|
|
| 用户控制的值,可用于跟踪请求。序列号的值必须在 0 到 2^63 到 1 之间。默认值为 0。 |
|
|
|
| 指定删除此 blob 上快照的行为。\{@code Include} 将删除基本 blob 和所有快照。\{@code Only} 将只删除快照。如果快照被删除,您必须传递 null。 |
|
|
|
| {@link ListBlobContainersOptions},用于指定该服务应返回哪些数据。 |
|
|
|
| 用于下载到文件的 {@link ParallelTransferOptions}。忽略并行传输参数的数量。 |
|
|
|
| 下载的 Blob 将保存到的文件的目录。 |
|
|
|
| 覆盖 URL 下载链接的默认过期时间(millis)。 |
|
|
| 与 blob 相关的操作 | 覆盖/设置交换标头中的 blob 名称。 |
|
|
| 与容器和 Blob 相关的操作 | 覆盖/设置交换标头中的容器名称。 |
|
|
| All | 指定要执行的制作者操作,请参阅此页面上与制作者操作相关的 doc。 |
|
|
|
| 过滤结果,以仅返回名称与指定正则表达式匹配的 Blob。可以是 null 以返回所有。如果同时设置了 prefix 和 regex,则 regex 会忽略 priority 和 prefix。 |
|
|
|
| 它过滤结果以在开始时间后大约返回事件。注: 也可以返回属于上一个小时的一些事件。属于此小时的几个事件可能会缺失;为了确保返回来自小时的所有事件,按一小时循环开始时间。 |
|
|
|
| 它过滤结果以在结束时间之前返回事件。注: 也可以返回属于下一个小时的一些事件。属于此小时的几个事件可能会缺失;为了确保返回小时中的所有事件,以一小时结束时间。 |
|
|
|
| 这提供了在服务调用期间通过 Http 管道传递的额外上下文。 |
|
|
|
| 在复制 blob 操作中用作源帐户名称的源 Blob 帐户名称 |
|
|
|
| 在复制 blob 操作中用作源容器名称的源 Blob 容器名称 |
10.5.2. 由组件制作者或消费者设置的消息标头
标头 | 变量名称 | 类型 | 描述 |
---|---|---|---|
|
|
| 访问 blob 的层。 |
|
|
| 当 blob 的访问层最后一次更改时,日期时间。 |
|
|
| blob 的归档状态。 |
|
|
| blob 创建时间。 |
|
|
| 页面 blob 的当前序列号。 |
|
|
| blob 的大小。 |
|
|
| blob 的类型。 |
|
|
| 为 blob 指定的缓存控制。 |
|
|
| 提交到附加 Blob 的块数 |
|
|
| 为 blob 指定的内容分布。 |
|
|
| 为 blob 指定的内容编码。 |
|
|
| 为 blob 指定的内容语言。 |
|
|
| 为 blob 指定的内容 MD5。 |
|
|
| 为 blob 指定的内容类型。 |
|
|
| 当 blob 上的最后一次复制操作完成后,日期时间。 |
|
|
| blob 最后一次增量副本快照的快照标识符。 |
|
|
| blob 上执行的最后复制操作的标识符。 |
|
|
| 在 blob 上执行最后一次复制操作的进度。 |
|
|
| 在 blob 上执行最后一次复制操作的来源。 |
|
|
| blob 上执行的最后复制操作的状态。 |
|
|
| blob 上最后一个复制操作的描述。 |
|
|
| blob 的 E Tag |
|
|
| 表示 blob 的访问层是否从 blob 的属性中推断出来的标志。 |
|
|
| 表示 blob 是否已递增复制的标志。 |
|
|
| 表示 blob 的内容是否在服务器上加密的标志。 |
|
|
| 当 Blob 最后一次修改时,日期时间。 |
|
|
| blob 上的租期类型。 |
|
|
| blob 上的租期状态。 |
|
|
| blob 上租期的状态。 |
|
|
| 与 blob 关联的其他元数据。 |
|
|
| 块提交到块 Blob 的偏移。 |
|
|
|
从操作下载的文件名 |
|
|
|
由 |
|
|
| 返回用户可以使用的未解析 httpHeaders。 |
10.5.3. 高级 Azure Storage Blob 配置
如果您的 Camel 应用程序在防火墙后面运行,或者需要对 BlobServiceClient
实例配置拥有更多控制,您可以创建自己的实例:
StorageSharedKeyCredential credential = new StorageSharedKeyCredential("yourAccountName", "yourAccessKey"); String uri = String.format("https://%s.blob.core.windows.net", "yourAccountName"); BlobServiceClient client = new BlobServiceClientBuilder() .endpoint(uri) .credential(credential) .buildClient(); // This is camel context context.getRegistry().bind("client", client);
然后,在 Camel azure-storage-blob
组件配置中引用这个实例:
from("azure-storage-blob://cameldev/container1?blobName=myblob&serviceClient=#client") .to("mock:result");
10.5.4. 在 registry 中自动检测 BlobServiceClient 客户端
组件能够检测 registry 中的 BlobServiceClient bean 存在。如果这是该类型的唯一实例,它将用作客户端,并且您不必将其定义为 uri 参数,如上例所示。这对端点的智能配置可能非常有用。
10.5.5. Azure Storage Blob Producer 操作
Camel Azure Storage Blob 组件在制作者端提供广泛的操作:
对服务级别的操作
对于这些操作,需要 accountName
。
操作 | 描述 |
---|---|
| 获取 blob 的内容。您可以将此操作的输出限制为 blob 范围。 |
| 返回对存储帐户中的 blob 和 blob 元数据的所有更改的事务日志。更改源提供这些更改的排序、保证、持久性、不可变、只读日志。 |
容器级别的操作
对于这些操作,需要 accountName
和 containerName
。
操作 | 描述 |
---|---|
| 在存储帐户中创建新容器。如果存在具有相同名称的容器,则制作者将忽略它。 |
| 删除存储帐户中指定的容器。如果容器不存在,则操作会失败。 |
| 返回此容器中的 blob 列表,文件夹结构扁平化。 |
对 blob 级别的操作
对于这些操作,需要 accountName
、containerName
和 blobName
。
操作 | blob 类型 | 描述 |
---|---|---|
| Common | 获取 blob 的内容。您可以将此操作的输出限制为 blob 范围。 |
| Common | 删除 blob。 |
| Common | 将整个 blob 下载到路径指定的文件中。如果文件已存在 {@link FileAlreadyExistsException},则该文件必须不存在。 |
| Common | 使用共享访问令牌(SAS)为指定的 blob 生成下载链接。默认情况下,仅允许 1 小时访问。但是,您可以通过标头覆盖默认的过期持续时间。 |
| BlockBlob | 创建新的块 blob,或更新现有块 Blob 的内容。更新现有块 Blob 覆盖 blob 上的任何现有元数据。PutBlob 不支持部分更新,现有 Blob 的内容会被新内容覆盖。 |
|
|
将指定块上传到块 blob 的"staging 区域",以便稍后由对 commitBlobBlockList 的调用提交。但是,如果标头 |
|
|
通过指定要组成 blob 的块 ID 列表来写入 blob。为了作为 blob 的一部分编写,必须在之前的 |
|
| 使用指定的块列表过滤器返回作为块 blob 的一部分上传的块列表。 |
|
| 创建 0-length 附加 blob。调用 commitAppendBlo'b 操作,将数据附加到附加 Blob。 |
|
|
向现有附加 Blob 末尾提交一个新的数据块。如果标头 |
|
|
创建指定长度的页面 blob。调用 |
|
|
将一个或多个页面写入页面 blob。写入大小必须是 512 的倍数。如果标头 |
|
| 将页面 blob 调整为指定大小(必须是 512 的倍数)。 |
|
| 从页面 blob 中释放指定的页面。范围的大小必须是 512 的倍数。 |
|
| 返回页面 blob 或页面 blob 快照的有效页面范围列表。 |
|
| 将 blob 从一个容器复制到另一个容器,即使来自不同帐户。 |
请参阅此页面中的示例部分,了解如何在 camel 应用程序中使用这些操作。
10.5.6. 消费者示例
要使用文件组件将 blob 消耗到文件中,如下所示:
from("azure-storage-blob://camelazure/container1?blobName=hello.txt&accountName=yourAccountName&accessKey=yourAccessKey"). to("file://blobdirectory");
但是,您还可以直接在不使用文件组件的情况下写入文件,您需要指定 fileDir
文件夹路径,以便在您的机器中保存 blob。
from("azure-storage-blob://camelazure/container1?blobName=hello.txt&accountName=yourAccountName&accessKey=yourAccessKey&fileDir=/var/to/awesome/dir"). to("mock:results");
另外,组件支持批处理消费者,因此您可以消耗多个 Blob,它只指定容器名称,使用者将根据容器中的 Blob 数量返回多个交换。
示例
from("azure-storage-blob://camelazure/container1?accountName=yourAccountName&accessKey=yourAccessKey&fileDir=/var/to/awesome/dir"). to("mock:results");
10.5.7. 生成者操作示例
-
listBlobContainers
from("direct:start") .process(exchange -> { // set the header you want the producer to evaluate, refer to the previous // section to learn about the headers that can be set // e.g: exchange.getIn().setHeader(BlobConstants.LIST_BLOB_CONTAINERS_OPTIONS, new ListBlobContainersOptions().setMaxResultsPerPage(10)); }) .to("azure-storage-blob://camelazure?operation=listBlobContainers&client&serviceClient=#client") .to("mock:result");
-
createBlobContainer
from("direct:start") .process(exchange -> { // set the header you want the producer to evaluate, refer to the previous // section to learn about the headers that can be set // e.g: exchange.getIn().setHeader(BlobConstants.BLOB_CONTAINER_NAME, "newContainerName"); }) .to("azure-storage-blob://camelazure/container1?operation=createBlobContainer&serviceClient=#client") .to("mock:result");
-
deleteBlobContainer
:
from("direct:start") .process(exchange -> { // set the header you want the producer to evaluate, refer to the previous // section to learn about the headers that can be set // e.g: exchange.getIn().setHeader(BlobConstants.BLOB_CONTAINER_NAME, "overridenName"); }) .to("azure-storage-blob://camelazure/container1?operation=deleteBlobContainer&serviceClient=#client") .to("mock:result");
-
listBlobs
:
from("direct:start") .process(exchange -> { // set the header you want the producer to evaluate, refer to the previous // section to learn about the headers that can be set // e.g: exchange.getIn().setHeader(BlobConstants.BLOB_CONTAINER_NAME, "overridenName"); }) .to("azure-storage-blob://camelazure/container1?operation=listBlobs&serviceClient=#client") .to("mock:result");
-
getBlob
:
我们都可以在交换正文中设置 outputStream
,并将数据写入其中。例如:
from("direct:start") .process(exchange -> { // set the header you want the producer to evaluate, refer to the previous // section to learn about the headers that can be set // e.g: exchange.getIn().setHeader(BlobConstants.BLOB_CONTAINER_NAME, "overridenName"); // set our body exchange.getIn().setBody(outputStream); }) .to("azure-storage-blob://camelazure/container1?blobName=blob&operation=getBlob&serviceClient=#client") .to("mock:result");
如果没有设置正文,则此操作将为我们提供一个 InputStream
实例,以便进一步下游:
from("direct:start") .to("azure-storage-blob://camelazure/container1?blobName=blob&operation=getBlob&serviceClient=#client") .process(exchange -> { InputStream inputStream = exchange.getMessage().getBody(InputStream.class); // We use Apache common IO for simplicity, but you are free to do whatever dealing // with inputStream System.out.println(IOUtils.toString(inputStream, StandardCharsets.UTF_8.name())); }) .to("mock:result");
-
deleteBlob
:
from("direct:start") .process(exchange -> { // set the header you want the producer to evaluate, refer to the previous // section to learn about the headers that can be set // e.g: exchange.getIn().setHeader(BlobConstants.BLOB_NAME, "overridenName"); }) .to("azure-storage-blob://camelazure/container1?blobName=blob&operation=deleteBlob&serviceClient=#client") .to("mock:result");
-
下载BlobToFile
:
from("direct:start") .process(exchange -> { // set the header you want the producer to evaluate, refer to the previous // section to learn about the headers that can be set // e.g: exchange.getIn().setHeader(BlobConstants.BLOB_NAME, "overridenName"); }) .to("azure-storage-blob://camelazure/container1?blobName=blob&operation=downloadBlobToFile&fileDir=/var/mydir&serviceClient=#client") .to("mock:result");
-
downloadLink
from("direct:start") .to("azure-storage-blob://camelazure/container1?blobName=blob&operation=downloadLink&serviceClient=#client") .process(exchange -> { String link = exchange.getMessage().getHeader(BlobConstants.DOWNLOAD_LINK, String.class); System.out.println("My link " + link); }) .to("mock:result");
-
uploadBlockBlob
from("direct:start") .process(exchange -> { // set the header you want the producer to evaluate, refer to the previous // section to learn about the headers that can be set // e.g: exchange.getIn().setHeader(BlobConstants.BLOB_NAME, "overridenName"); exchange.getIn().setBody("Block Blob"); }) .to("azure-storage-blob://camelazure/container1?blobName=blob&operation=uploadBlockBlob&serviceClient=#client") .to("mock:result");
-
stageBlockBlobList
from("direct:start") .process(exchange -> { final List<BlobBlock> blocks = new LinkedList<>(); blocks.add(BlobBlock.createBlobBlock(new ByteArrayInputStream("Hello".getBytes()))); blocks.add(BlobBlock.createBlobBlock(new ByteArrayInputStream("From".getBytes()))); blocks.add(BlobBlock.createBlobBlock(new ByteArrayInputStream("Camel".getBytes()))); exchange.getIn().setBody(blocks); }) .to("azure-storage-blob://camelazure/container1?blobName=blob&operation=stageBlockBlobList&serviceClient=#client") .to("mock:result");
-
commitBlockBlobList
from("direct:start") .process(exchange -> { // We assume here you have the knowledge of these blocks you want to commit final List<Block> blocksIds = new LinkedList<>(); blocksIds.add(new Block().setName("id-1")); blocksIds.add(new Block().setName("id-2")); blocksIds.add(new Block().setName("id-3")); exchange.getIn().setBody(blocksIds); }) .to("azure-storage-blob://camelazure/container1?blobName=blob&operation=commitBlockBlobList&serviceClient=#client") .to("mock:result");
-
getBlobBlockList
from("direct:start") .to("azure-storage-blob://camelazure/container1?blobName=blob&operation=getBlobBlockList&serviceClient=#client") .log("${body}") .to("mock:result");
-
createAppendBlob
from("direct:start") .to("azure-storage-blob://camelazure/container1?blobName=blob&operation=createAppendBlob&serviceClient=#client") .to("mock:result");
-
commitAppendBlob
from("direct:start") .process(exchange -> { final String data = "Hello world from my awesome tests!"; final InputStream dataStream = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8)); exchange.getIn().setBody(dataStream); // of course you can set whatever headers you like, refer to the headers section to learn more }) .to("azure-storage-blob://camelazure/container1?blobName=blob&operation=commitAppendBlob&serviceClient=#client") .to("mock:result");
-
createPageBlob
from("direct:start") .to("azure-storage-blob://camelazure/container1?blobName=blob&operation=createPageBlob&serviceClient=#client") .to("mock:result");
-
uploadPageBlob
from("direct:start") .process(exchange -> { byte[] dataBytes = new byte[512]; // we set range for the page from 0-511 new Random().nextBytes(dataBytes); final InputStream dataStream = new ByteArrayInputStream(dataBytes); final PageRange pageRange = new PageRange().setStart(0).setEnd(511); exchange.getIn().setHeader(BlobConstants.PAGE_BLOB_RANGE, pageRange); exchange.getIn().setBody(dataStream); }) .to("azure-storage-blob://camelazure/container1?blobName=blob&operation=uploadPageBlob&serviceClient=#client") .to("mock:result");
-
resizePageBlob
from("direct:start") .process(exchange -> { final PageRange pageRange = new PageRange().setStart(0).setEnd(511); exchange.getIn().setHeader(BlobConstants.PAGE_BLOB_RANGE, pageRange); }) .to("azure-storage-blob://camelazure/container1?blobName=blob&operation=resizePageBlob&serviceClient=#client") .to("mock:result");
-
clearPageBlob
from("direct:start") .process(exchange -> { final PageRange pageRange = new PageRange().setStart(0).setEnd(511); exchange.getIn().setHeader(BlobConstants.PAGE_BLOB_RANGE, pageRange); }) .to("azure-storage-blob://camelazure/container1?blobName=blob&operation=clearPageBlob&serviceClient=#client") .to("mock:result");
-
getPageBlobRanges
from("direct:start") .process(exchange -> { final PageRange pageRange = new PageRange().setStart(0).setEnd(511); exchange.getIn().setHeader(BlobConstants.PAGE_BLOB_RANGE, pageRange); }) .to("azure-storage-blob://camelazure/container1?blobName=blob&operation=getPageBlobRanges&serviceClient=#client") .log("${body}") .to("mock:result");
-
copyBlob
from("direct:copyBlob") .process(exchange -> { exchange.getIn().setHeader(BlobConstants.BLOB_NAME, "file.txt"); exchange.getMessage().setHeader(BlobConstants.SOURCE_BLOB_CONTAINER_NAME, "containerblob1"); exchange.getMessage().setHeader(BlobConstants.SOURCE_BLOB_ACCOUNT_NAME, "account"); }) .to("azure-storage-blob://account/containerblob2?operation=copyBlob&sourceBlobAccessKey=RAW(accessKey)") .to("mock:result");
这样,帐户 'account' 的容器容器容器中的 file.txt 将被复制到同一帐户的容器容器blob2 中。
10.5.8. 开发备注(Important)
所有集成测试都使用 Testcontainers 并运行。需要获取 Azure accessKey 和 accountName 才能使用 Azure 服务运行所有集成测试。除了模拟单元测试外,还需要使用您进行的每个更改来运行集成测试,甚至客户端升级,因为 Azure 客户端可能会在次版本升级时破坏问题。要运行集成测试,在这个组件目录中运行以下 maven 命令:
mvn verify -PfullTests -DaccountName=myacc -DaccessKey=mykey
其中by accountName
是您的 Azure 帐户名称,accessKey
是从 Azure 门户生成的访问密钥。