11.5. 使用方法
例如,若要从 storageAccount
存储帐户中的队列 消息Queue
获取消息内容,请使用以下代码片段:
from("azure-storage-queue://storageAccount/messageQueue?accessKey=yourAccessKey"). to("file://queuedirectory");
11.5.1. 组件制作者评估的消息标头
标头 | 变量名称 | 类型 | 操作 | 描述 |
---|---|---|---|---|
|
|
|
| 列出队列的选项 |
|
|
| All | 可选的超时值,超过 \{@link RuntimeException} 将引发。 |
|
|
|
| 与队列关联的元数据 |
|
|
|
| 消息将在队列中保持活跃的时长。如果未设置该值,则默认值为 7 天,如果 -1 传递,则消息不会过期。生存时间必须是 -1 或任意正数。 |
|
|
|
| 消息在队列中不可见的超时时间。如果未设置该值,则消息将立即可见。超时时间必须在 0 秒和 7 天之间。 |
|
|
|
|
当设置为 |
|
|
|
| 必须匹配的唯一标识符才能删除或更新消息。 |
|
|
|
| 要删除或更新的消息的 ID。 |
|
|
|
| 如果队列中存在比请求所有消息少的消息数量,则需要获取的最大消息数。如果只检索空 1 个消息,则允许的范围为 1 到 32 个消息。 |
|
|
| All | 指定要执行的制作者操作,请参阅此页面上与制作者操作相关的 doc。 |
|
|
| All | 覆盖队列名称。 |
11.5.2. 由组件制作者或消费者设置的消息标头
标头 | 变量名称 | 类型 | 描述 |
---|---|---|---|
|
|
| 发送到队列的消息 ID。 |
|
|
| 消息插入到 Queue 的时间。 |
|
|
| 消息将过期并自动删除的时间。 |
|
|
| 删除/更新消息需要这个值。如果删除失败,则此弹出信息已被另一个客户端取消队列。 |
|
|
| 消息再次在 Queue 中可见的时间。 |
|
|
| 消息被取消队列的次数。 |
|
|
| 返回用户可以使用的未解析 httpHeaders。 |
11.5.3. 高级 Azure Storage Queue 配置
如果您的 Camel 应用程序在防火墙后面运行,或者需要对 QueueServiceClient
实例配置拥有更多控制,您可以创建自己的实例:
StorageSharedKeyCredential credential = new StorageSharedKeyCredential("yourAccountName", "yourAccessKey"); String uri = String.format("https://%s.queue.core.windows.net", "yourAccountName"); QueueServiceClient client = new QueueServiceClientBuilder() .endpoint(uri) .credential(credential) .buildClient(); // This is camel context context.getRegistry().bind("client", client);
然后,在 Camel azure-storage-queue
组件配置中引用这个实例:
from("azure-storage-queue://cameldev/queue1?serviceClient=#client") .to("file://outputFolder?fileName=output.txt&fileExist=Append");
11.5.4. 在 registry 中自动检测 QueueServiceClient 客户端
组件能够检测将 QueueServiceClient bean 是否存在到 registry 中。如果这是该类型的唯一实例,它将用作客户端,并且您不必将其定义为 uri 参数,如上例所示。这对端点的智能配置可能非常有用。
11.5.5. Azure Storage Queue Producer 操作
Camel Azure Storage Queue 组件在制作者端提供广泛的操作:
对服务级别的操作
对于这些操作,需要 accountName
。
操作 | 描述 |
---|---|
| 列出存储帐户中从指定标记开始的过滤器的队列。 |
对队列级别的操作
对于这些操作,需要 accountName
和 queueName
。
操作 | 描述 |
---|---|
| 创建新队列。 |
| 永久删除队列。 |
| 删除队列中的所有消息。 |
|
默认 Producer 操作 会发送一个带有给定生存时间的消息,以及消息在队列中不可见的超时周期。消息文本从交换消息正文评估。默认情况下,如果队列不存在,它将首先创建一个空队列。如果要禁用此功能,请将 config |
| 删除队列中指定的消息。 |
| 最多从队列检索最大消息数,并从超时时间的其他操作中隐藏它们。但是,由于可靠性的原因,它不会将消息从队列中分离。 |
| 来自队列前面的消息,最高到消息的最大数量。 |
| 使用新消息更新队列中的特定消息,并重置可见性超时。消息文本从交换消息正文评估。 |
请参阅此页面中的示例部分,了解如何在 camel 应用程序中使用这些操作。
11.5.6. 消费者示例
要在一个批处理中最多 5 个消息的文件中消耗队列,如下所示:
from("azure-storage-queue://cameldev/queue1?serviceClient=#client&maxMessages=5") .to("file://outputFolder?fileName=output.txt&fileExist=Append");
11.5.7. 生成者操作示例
-
listQueues
:
from("direct:start") .process(exchange -> { // set the header you want the producer to evaluate, refer to the previous // section to learn about the headers that can be set // e.g, to only returns list of queues with 'awesome' prefix: exchange.getIn().setHeader(QueueConstants.QUEUES_SEGMENT_OPTIONS, new QueuesSegmentOptions().setPrefix("awesome")); }) .to("azure-storage-queue://cameldev?serviceClient=#client&operation=listQueues") .log("${body}") .to("mock:result");
-
createQueue
:
from("direct:start") .process(exchange -> { // set the header you want the producer to evaluate, refer to the previous // section to learn about the headers that can be set // e.g: exchange.getIn().setHeader(QueueConstants.QUEUE_NAME, "overrideName"); }) .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=createQueue");
-
deleteQueue
:
from("direct:start") .process(exchange -> { // set the header you want the producer to evaluate, refer to the previous // section to learn about the headers that can be set // e.g: exchange.getIn().setHeader(QueueConstants.QUEUE_NAME, "overrideName"); }) .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=deleteQueue");
-
清除Queue
:
from("direct:start") .process(exchange -> { // set the header you want the producer to evaluate, refer to the previous // section to learn about the headers that can be set // e.g: exchange.getIn().setHeader(QueueConstants.QUEUE_NAME, "overrideName"); }) .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=clearQueue");
-
sendMessage
:
from("direct:start") .process(exchange -> { // set the header you want the producer to evaluate, refer to the previous // section to learn about the headers that can be set // e.g: exchange.getIn().setBody("message to send"); // we set a visibility of 1min exchange.getIn().setHeader(QueueConstants.VISIBILITY_TIMEOUT, Duration.ofMinutes(1)); }) .to("azure-storage-queue://cameldev/test?serviceClient=#client");
-
deleteMessage
:
from("direct:start") .process(exchange -> { // set the header you want the producer to evaluate, refer to the previous // section to learn about the headers that can be set // e.g: // Mandatory header: exchange.getIn().setHeader(QueueConstants.MESSAGE_ID, "1"); // Mandatory header: exchange.getIn().setHeader(QueueConstants.POP_RECEIPT, "PAAAAHEEERXXX-1"); }) .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=deleteMessage");
-
receiveMessages
:
from("direct:start") .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=receiveMessages") .process(exchange -> { final List<QueueMessageItem> messageItems = exchange.getMessage().getBody(List.class); messageItems.forEach(messageItem -> System.out.println(messageItem.getMessageText())); }) .to("mock:result");
-
peekMessages
:
from("direct:start") .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=peekMessages") .process(exchange -> { final List<PeekedMessageItem> messageItems = exchange.getMessage().getBody(List.class); messageItems.forEach(messageItem -> System.out.println(messageItem.getMessageText())); }) .to("mock:result");
-
updateMessage
:
from("direct:start") .process(exchange -> { // set the header you want the producer to evaluate, refer to the previous // section to learn about the headers that can be set // e.g: exchange.getIn().setBody("new message text"); // Mandatory header: exchange.getIn().setHeader(QueueConstants.MESSAGE_ID, "1"); // Mandatory header: exchange.getIn().setHeader(QueueConstants.POP_RECEIPT, "PAAAAHEEERXXX-1"); // Mandatory header: exchange.getIn().setHeader(QueueConstants.VISIBILITY_TIMEOUT, Duration.ofMinutes(1)); }) .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=updateMessage");
11.5.8. 开发备注(Important)
在此组件上开发时,您需要获取 Azure accessKey 才能运行集成测试。除了模拟单元测试外,还需要使用您进行的每个更改来运行集成测试,甚至客户端升级,因为 Azure 客户端可能会在次版本升级时破坏问题。要运行集成测试,在这个组件目录中运行以下 maven 命令:
mvn verify -PfullTests -DaccountName=myacc -DaccessKey=mykey
其中by accountName
是您的 Azure 帐户名称,accessKey
是从 Azure 门户生成的访问密钥。