11.5. 使用方法


例如,若要从 storageAccount 存储帐户中的队列 消息Queue 获取消息内容,请使用以下代码片段:

from("azure-storage-queue://storageAccount/messageQueue?accessKey=yourAccessKey").
to("file://queuedirectory");
Copy to Clipboard Toggle word wrap

11.5.1. 组件制作者评估的消息标头

Expand
标头变量名称类型操作描述

CamelAzureStorageQueueSegmentOptions

QueueConstants.QUEUES_SEGMENT_OPTIONS

QueuesSegmentOptions

listQueues

列出队列的选项

CamelAzureStorageQueueTimeout

QueueConstants.TIMEOUT

Duration

All

可选的超时值,超过 \{@link RuntimeException} 将引发。

CamelAzureStorageQueueMetadata

QueueConstants.METADATA

Map<String,String>

createQueue

与队列关联的元数据

CamelAzureStorageQueueTimeToLive

QueueConstants.TIME_TO_LIVE

Duration

sendMessage

消息将在队列中保持活跃的时长。如果未设置该值,则默认值为 7 天,如果 -1 传递,则消息不会过期。生存时间必须是 -1 或任意正数。

CamelAzureStorageQueueVisibilityTimeout

QueueConstants.VISIBILITY_TIMEOUT

Duration

sendMessage,receiveMessages,updateMessage

消息在队列中不可见的超时时间。如果未设置该值,则消息将立即可见。超时时间必须在 0 秒和 7 天之间。

CamelAzureStorageQueueCreateQueue

QueueConstants.CREATE_QUEUE

布尔值

sendMessage

当设置为 true 时,在发送消息到队列时会自动创建队列。

CamelAzureStorageQueuePopReceipt

QueueConstants.POP_RECEIPT

字符串

DeleteMessage,updateMessage

必须匹配的唯一标识符才能删除或更新消息。

CamelAzureStorageQueueMessageId

QueueConstants.MESSAGE_ID

字符串

DeleteMessage,updateMessage

要删除或更新的消息的 ID。

CamelAzureStorageQueueMaxMessages

QueueConstants.MAX_MESSAGES

整数

receiveMessages,peekMessages

如果队列中存在比请求所有消息少的消息数量,则需要获取的最大消息数。如果只检索空 1 个消息,则允许的范围为 1 到 32 个消息。

CamelAzureStorageQueueOperation

QueueConstants.QUEUE_OPERATION

QueueOperationDefinition

All

指定要执行的制作者操作,请参阅此页面上与制作者操作相关的 doc。

CamelAzureStorageQueueName

QueueConstants.QUEUE_NAME

字符串

All

覆盖队列名称。

11.5.2. 由组件制作者或消费者设置的消息标头

Expand
标头变量名称类型描述

CamelAzureStorageQueueMessageId

QueueConstants.MESSAGE_ID

字符串

发送到队列的消息 ID。

CamelAzureStorageQueueInsertionTime

QueueConstants.INSERTION_TIME

OffsetDateTime

消息插入到 Queue 的时间。

CamelAzureStorageQueueExpirationTime

QueueConstants.EXPIRATION_TIME

OffsetDateTime

消息将过期并自动删除的时间。

CamelAzureStorageQueuePopReceipt

QueueConstants.POP_RECEIPT

字符串

删除/更新消息需要这个值。如果删除失败,则此弹出信息已被另一个客户端取消队列。

CamelAzureStorageQueueTimeNextVisible

QueueConstants.TIME_NEXT_VISIBLE

OffsetDateTime

消息再次在 Queue 中可见的时间。

CamelAzureStorageQueueDequeueCount

QueueConstants.DEQUEUE_COUNT

long

消息被取消队列的次数。

CamelAzureStorageQueueRawHttpHeaders

QueueConstants.RAW_HTTP_HEADERS

httpHeaders

返回用户可以使用的未解析 httpHeaders。

11.5.3. 高级 Azure Storage Queue 配置

如果您的 Camel 应用程序在防火墙后面运行,或者需要对 QueueServiceClient 实例配置拥有更多控制,您可以创建自己的实例:

StorageSharedKeyCredential credential = new StorageSharedKeyCredential("yourAccountName", "yourAccessKey");
String uri = String.format("https://%s.queue.core.windows.net", "yourAccountName");

QueueServiceClient client = new QueueServiceClientBuilder()
                          .endpoint(uri)
                          .credential(credential)
                          .buildClient();
// This is camel context
context.getRegistry().bind("client", client);
Copy to Clipboard Toggle word wrap

然后,在 Camel azure-storage-queue 组件配置中引用这个实例:

from("azure-storage-queue://cameldev/queue1?serviceClient=#client")
.to("file://outputFolder?fileName=output.txt&fileExist=Append");
Copy to Clipboard Toggle word wrap

组件能够检测将 QueueServiceClient bean 是否存在到 registry 中。如果这是该类型的唯一实例,它将用作客户端,并且您不必将其定义为 uri 参数,如上例所示。这对端点的智能配置可能非常有用。

11.5.5. Azure Storage Queue Producer 操作

Camel Azure Storage Queue 组件在制作者端提供广泛的操作:

对服务级别的操作

对于这些操作,需要 accountName

Expand
操作描述

listQueues

列出存储帐户中从指定标记开始的过滤器的队列。

对队列级别的操作

对于这些操作,需要 accountNamequeueName

Expand
操作描述

createQueue

创建新队列。

deleteQueue

永久删除队列。

clearQueue

删除队列中的所有消息。

sendMessage

默认 Producer 操作 会发送一个带有给定生存时间的消息,以及消息在队列中不可见的超时周期。消息文本从交换消息正文评估。默认情况下,如果队列不存在,它将首先创建一个空队列。如果要禁用此功能,请将 config createQueue 或标头 CamelAzureStorageQueueCreateQueue 设置为 false

deleteMessage

删除队列中指定的消息。

receiveMessages

最多从队列检索最大消息数,并从超时时间的其他操作中隐藏它们。但是,由于可靠性的原因,它不会将消息从队列中分离。

peekMessages

来自队列前面的消息,最高到消息的最大数量。

updateMessage

使用新消息更新队列中的特定消息,并重置可见性超时。消息文本从交换消息正文评估。

请参阅此页面中的示例部分,了解如何在 camel 应用程序中使用这些操作。

11.5.6. 消费者示例

要在一个批处理中最多 5 个消息的文件中消耗队列,如下所示:

from("azure-storage-queue://cameldev/queue1?serviceClient=#client&maxMessages=5")
.to("file://outputFolder?fileName=output.txt&fileExist=Append");
Copy to Clipboard Toggle word wrap

11.5.7. 生成者操作示例

  • listQueues:
from("direct:start")
    .process(exchange -> {
      // set the header you want the producer to evaluate, refer to the previous
      // section to learn about the headers that can be set
      // e.g, to only returns list of queues with 'awesome' prefix:
      exchange.getIn().setHeader(QueueConstants.QUEUES_SEGMENT_OPTIONS, new QueuesSegmentOptions().setPrefix("awesome"));
     })
    .to("azure-storage-queue://cameldev?serviceClient=#client&operation=listQueues")
    .log("${body}")
    .to("mock:result");
Copy to Clipboard Toggle word wrap
  • createQueue:
from("direct:start")
    .process(exchange -> {
      // set the header you want the producer to evaluate, refer to the previous
      // section to learn about the headers that can be set
      // e.g:
      exchange.getIn().setHeader(QueueConstants.QUEUE_NAME, "overrideName");
     })
    .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=createQueue");
Copy to Clipboard Toggle word wrap
  • deleteQueue:
from("direct:start")
    .process(exchange -> {
      // set the header you want the producer to evaluate, refer to the previous
      // section to learn about the headers that can be set
      // e.g:
      exchange.getIn().setHeader(QueueConstants.QUEUE_NAME, "overrideName");
     })
    .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=deleteQueue");
Copy to Clipboard Toggle word wrap
  • 清除Queue
from("direct:start")
    .process(exchange -> {
      // set the header you want the producer to evaluate, refer to the previous
      // section to learn about the headers that can be set
      // e.g:
      exchange.getIn().setHeader(QueueConstants.QUEUE_NAME, "overrideName");
     })
    .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=clearQueue");
Copy to Clipboard Toggle word wrap
  • sendMessage
from("direct:start")
    .process(exchange -> {
      // set the header you want the producer to evaluate, refer to the previous
      // section to learn about the headers that can be set
      // e.g:
      exchange.getIn().setBody("message to send");
      // we set a visibility of 1min
      exchange.getIn().setHeader(QueueConstants.VISIBILITY_TIMEOUT, Duration.ofMinutes(1));
     })
    .to("azure-storage-queue://cameldev/test?serviceClient=#client");
Copy to Clipboard Toggle word wrap
  • deleteMessage:
from("direct:start")
    .process(exchange -> {
      // set the header you want the producer to evaluate, refer to the previous
      // section to learn about the headers that can be set
      // e.g:
      // Mandatory header:
      exchange.getIn().setHeader(QueueConstants.MESSAGE_ID, "1");
      // Mandatory header:
      exchange.getIn().setHeader(QueueConstants.POP_RECEIPT, "PAAAAHEEERXXX-1");
     })
    .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=deleteMessage");
Copy to Clipboard Toggle word wrap
  • receiveMessages
from("direct:start")
    .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=receiveMessages")
    .process(exchange -> {
        final List<QueueMessageItem> messageItems = exchange.getMessage().getBody(List.class);
        messageItems.forEach(messageItem -> System.out.println(messageItem.getMessageText()));
    })
   .to("mock:result");
Copy to Clipboard Toggle word wrap
  • peekMessages
from("direct:start")
    .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=peekMessages")
    .process(exchange -> {
        final List<PeekedMessageItem> messageItems = exchange.getMessage().getBody(List.class);
        messageItems.forEach(messageItem -> System.out.println(messageItem.getMessageText()));
    })
   .to("mock:result");
Copy to Clipboard Toggle word wrap
  • updateMessage
from("direct:start")
   .process(exchange -> {
       // set the header you want the producer to evaluate, refer to the previous
       // section to learn about the headers that can be set
       // e.g:
       exchange.getIn().setBody("new message text");
       // Mandatory header:
       exchange.getIn().setHeader(QueueConstants.MESSAGE_ID, "1");
       // Mandatory header:
       exchange.getIn().setHeader(QueueConstants.POP_RECEIPT, "PAAAAHEEERXXX-1");
       // Mandatory header:
       exchange.getIn().setHeader(QueueConstants.VISIBILITY_TIMEOUT, Duration.ofMinutes(1));
    })
    .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=updateMessage");
Copy to Clipboard Toggle word wrap

11.5.8. 开发备注(Important)

在此组件上开发时,您需要获取 Azure accessKey 才能运行集成测试。除了模拟单元测试外,还需要使用您进行的每个更改来运行集成测试,甚至客户端升级,因为 Azure 客户端可能会在次版本升级时破坏问题。要运行集成测试,在这个组件目录中运行以下 maven 命令:

mvn verify -PfullTests -DaccountName=myacc -DaccessKey=mykey
Copy to Clipboard Toggle word wrap

其中by accountName 是您的 Azure 帐户名称,accessKey 是从 Azure 门户生成的访问密钥。

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat