16.6. 使用方法
例如,若要从 storageAccount 存储帐户中的队列 messageQueue 获取消息内容,并使用以下代码片段:
from("azure-storage-queue://storageAccount/messageQueue?accessKey=yourAccessKey").
to("file://queuedirectory");
16.6.1. 由组件制作者评估的消息标头 复制链接链接已复制到粘贴板!
| 标头 | 变量名称 | 类型 | 操作 | 描述 |
|---|---|---|---|---|
|
|
|
|
| 列出队列的选项 |
|
|
|
| All | 超过 \{@link RuntimeException} 将引发的可选超时值。 |
|
|
|
|
| 与队列关联的元数据 |
|
|
|
|
| 消息在队列中保持活跃的时间。如果未设置,则默认值为 7 天,如果传递了 -1,则消息将不会过期。生存时间必须为 -1 或任何正数。 |
|
|
|
|
| 队列中消息不可见的超时时间。如果未设置值,则默认为 0,消息将立即可见。超时必须在 0 秒到 7 天之间。 |
|
|
|
|
|
当 设为 |
|
|
|
|
| 必须匹配的唯一标识符,才能删除或更新消息。 |
|
|
|
|
| 要删除或更新的消息 ID。 |
|
|
|
|
| 如果队列中存在的信息数量少于请求的所有消息,则要获取的最大消息数将返回。如果仅检索 1 条消息,则允许的范围为 1 到 32 个消息。 |
|
|
|
| All | 指定要执行的制作者操作,请参阅此页面上的与生成者操作相关的 doc。 |
|
|
|
| All | 覆盖队列名称。 |
16.6.2. 由组件制作者或消费者设置的消息标头 复制链接链接已复制到粘贴板!
| 标头 | 变量名称 | 类型 | 描述 |
|---|---|---|---|
|
|
|
| 发送到队列的消息 ID。 |
|
|
|
| 消息插入到队列中的时间。 |
|
|
|
| Message 将过期并自动删除的时间。 |
|
|
|
| 删除/更新 Message 需要这个值。如果使用这个弹出窗口删除失败,则消息已被另一个客户端取消排队。 |
|
|
|
| 在 Queue 中再次看到消息的时间。 |
|
|
|
| 消息已取消队列的次数。 |
|
|
|
| 返回可由用户使用的未解析的 httpHeaders。 |
16.6.3. 高级 Azure Storage Queue 配置 复制链接链接已复制到粘贴板!
如果您的 Camel 应用程序在防火墙后面运行,或者您需要对 QueueServiceClient 实例配置进行更多控制,您可以创建自己的实例:
StorageSharedKeyCredential credential = new StorageSharedKeyCredential("yourAccountName", "yourAccessKey");
String uri = String.format("https://%s.queue.core.windows.net", "yourAccountName");
QueueServiceClient client = new QueueServiceClientBuilder()
.endpoint(uri)
.credential(credential)
.buildClient();
// This is camel context
context.getRegistry().bind("client", client);
然后,在 Camel azure-storage-queue 组件配置中引用此实例:
from("azure-storage-queue://cameldev/queue1?serviceClient=#client")
.to("file://outputFolder?fileName=output.txt&fileExist=Append");
16.6.4. 在 registry 中自动检测 QueueServiceClient 客户端 复制链接链接已复制到粘贴板!
组件能够检测 QueueServiceClient bean 是否存在。如果这是该类型的唯一实例,它将用作客户端,您不必将其定义为 uri 参数,如上例。这对对端点的智能配置非常有用。
16.6.5. Azure Storage Queue Producer 操作 复制链接链接已复制到粘贴板!
Camel Azure Storage Queue 组件在生成者端提供广泛的操作:
服务级别的操作
对于这些操作,需要 accountName。
| 操作 | 描述 |
|---|---|
|
| 列出存储帐户中的队列,该队列从指定标记开始传递过滤器。 |
队列级别的操作
对于这些操作,需要 accountName 和 queueName。
| 操作 | 描述 |
|---|---|
|
| 创建新队列。 |
|
| 永久删除队列。 |
|
| 删除队列中的所有消息。 |
|
|
默认 Producer 操作 发送带有给定生存时间的消息,以及一个超时时间,其中消息在队列中不可见。消息文本从交换消息正文评估。默认情况下,如果队列不存在,它将首先创建一个空队列。如果要禁用此功能,请将 config |
|
| 删除队列中指定的消息。 |
|
| 从队列中检索最大消息数,并在超时时间内将其隐藏到其他操作中。但是,由于可靠性,它不会从队列中取消排队消息。 |
|
| 从队列前到最大消息数的消息。 |
|
| 使用新消息更新队列中的特定消息,并重置可见性超时。消息文本从交换消息正文评估。 |
请参阅此页面中的示例部分,了解如何在您的 camel 中使用这些操作。
16.6.6. 消费者示例 复制链接链接已复制到粘贴板!
要将队列消耗在一个批处理中最多 5 个信息的文件组件中,可以执行以下操作:
from("azure-storage-queue://cameldev/queue1?serviceClient=#client&maxMessages=5")
.to("file://outputFolder?fileName=output.txt&fileExist=Append");
16.6.7. 生成者操作示例 复制链接链接已复制到粘贴板!
-
listQueues:
from("direct:start")
.process(exchange -> {
// set the header you want the producer to evaluate, refer to the previous
// section to learn about the headers that can be set
// e.g, to only returns list of queues with 'awesome' prefix:
exchange.getIn().setHeader(QueueConstants.QUEUES_SEGMENT_OPTIONS, new QueuesSegmentOptions().setPrefix("awesome"));
})
.to("azure-storage-queue://cameldev?serviceClient=#client&operation=listQueues")
.log("${body}")
.to("mock:result");
-
createQueue:
from("direct:start")
.process(exchange -> {
// set the header you want the producer to evaluate, refer to the previous
// section to learn about the headers that can be set
// e.g:
exchange.getIn().setHeader(QueueConstants.QUEUE_NAME, "overrideName");
})
.to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=createQueue");
-
deleteQueue:
from("direct:start")
.process(exchange -> {
// set the header you want the producer to evaluate, refer to the previous
// section to learn about the headers that can be set
// e.g:
exchange.getIn().setHeader(QueueConstants.QUEUE_NAME, "overrideName");
})
.to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=deleteQueue");
-
clearQueue:
from("direct:start")
.process(exchange -> {
// set the header you want the producer to evaluate, refer to the previous
// section to learn about the headers that can be set
// e.g:
exchange.getIn().setHeader(QueueConstants.QUEUE_NAME, "overrideName");
})
.to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=clearQueue");
-
sendMessage:
from("direct:start")
.process(exchange -> {
// set the header you want the producer to evaluate, refer to the previous
// section to learn about the headers that can be set
// e.g:
exchange.getIn().setBody("message to send");
// we set a visibility of 1min
exchange.getIn().setHeader(QueueConstants.VISIBILITY_TIMEOUT, Duration.ofMinutes(1));
})
.to("azure-storage-queue://cameldev/test?serviceClient=#client");
-
deleteMessage:
from("direct:start")
.process(exchange -> {
// set the header you want the producer to evaluate, refer to the previous
// section to learn about the headers that can be set
// e.g:
// Mandatory header:
exchange.getIn().setHeader(QueueConstants.MESSAGE_ID, "1");
// Mandatory header:
exchange.getIn().setHeader(QueueConstants.POP_RECEIPT, "PAAAAHEEERXXX-1");
})
.to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=deleteMessage");
-
receiveMessages:
from("direct:start")
.to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=receiveMessages")
.process(exchange -> {
final List<QueueMessageItem> messageItems = exchange.getMessage().getBody(List.class);
messageItems.forEach(messageItem -> System.out.println(messageItem.getMessageText()));
})
.to("mock:result");
-
peekMessages:
from("direct:start")
.to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=peekMessages")
.process(exchange -> {
final List<PeekedMessageItem> messageItems = exchange.getMessage().getBody(List.class);
messageItems.forEach(messageItem -> System.out.println(messageItem.getMessageText()));
})
.to("mock:result");
-
updateMessage:
from("direct:start")
.process(exchange -> {
// set the header you want the producer to evaluate, refer to the previous
// section to learn about the headers that can be set
// e.g:
exchange.getIn().setBody("new message text");
// Mandatory header:
exchange.getIn().setHeader(QueueConstants.MESSAGE_ID, "1");
// Mandatory header:
exchange.getIn().setHeader(QueueConstants.POP_RECEIPT, "PAAAAHEEERXXX-1");
// Mandatory header:
exchange.getIn().setHeader(QueueConstants.VISIBILITY_TIMEOUT, Duration.ofMinutes(1));
})
.to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=updateMessage");
16.6.8. 开发备注(Important) 复制链接链接已复制到粘贴板!
在此组件上进行开发时,您需要获取 Azure accessKey 才能运行集成测试。除了模拟的单元测试外,还需要在进行、甚至进行客户端升级的每个更改中运行集成测试,因为 Azure 客户端也可以中断次版本升级的情况。要运行集成测试,请在此组件目录中运行以下 maven 命令:
mvn verify -PfullTests -DaccountName=myacc -DaccessKey=mykey
其中,accountName 是 Azure 帐户名称,accessKey 是从 Azure 门户生成的访问密钥。