搜索

14.6. 使用方法

download PDF

例如,若要从 storageAccount 存储帐户中的队列 messageQueue 获取消息内容,请使用以下代码片段:

from("azure-storage-queue://storageAccount/messageQueue?accessKey=yourAccessKey").
to("file://queuedirectory");

14.6.1. 由组件制作者评估的消息标头

标头变量名称类型操作描述

CamelAzureStorageQueueSegmentOptions

QueueConstants.QUEUES_SEGMENT_OPTIONS

QueuesSegmentOptions

listQueues

列出队列的选项

CamelAzureStorageQueueTimeout

QueueConstants.TIMEOUT

Duration

All

将引发 \{@link RuntimeException} 以外的可选超时值。

CamelAzureStorageQueueMetadata

QueueConstants.METADATA

Map<String,String>

createQueue

与队列关联的元数据

CamelAzureStorageQueueTimeToLive

QueueConstants.TIME_TO_LIVE

Duration

sendMessage

消息在队列中保持活动状态的时长。如果未设置该值将默认为 7 天,如果传递 -1,则消息不会过期。生存时间必须是 -1 或任意正数。

CamelAzureStorageQueueVisibilityTimeout

QueueConstants.VISIBILITY_TIMEOUT

Duration

sendMessage, receiveMessages, updateMessage

消息在队列中不可见的超时时间。如果未设置该值将默认为 0,且信息将立即可见。超时必须在 0 秒和 7 天之间。

CamelAzureStorageQueueCreateQueue

QueueConstants.CREATE_QUEUE

布尔值

sendMessage

当设置为 true 时,将在发送消息到队列时自动创建队列。

CamelAzureStorageQueuePopReceipt

QueueConstants.POP_RECEIPT

字符串

deleteMessage, updateMessage

必须匹配删除或更新消息的唯一标识符。

CamelAzureStorageQueueMessageId

QueueConstants.MESSAGE_ID

字符串

deleteMessage, updateMessage

要删除或更新的消息 ID。

CamelAzureStorageQueueMaxMessages

QueueConstants.MAX_MESSAGES

整数

receiveMessages, peekMessages

如果要获取的最大消息数,如果队列中存在的消息数量少于请求的所有消息,则返回。如果仅检索 1 个消息,则允许的范围为 1 到 32 个消息。

CamelAzureStorageQueueOperation

QueueConstants.QUEUE_OPERATION

QueueOperationDefinition

All

指定要执行的制作者操作,请参阅此页面上的与制作者操作相关的文档。

CamelAzureStorageQueueName

QueueConstants.QUEUE_NAME

字符串

All

覆盖队列名称。

14.6.2. 由组件制作者或消费者设置的消息标头

标头变量名称类型描述

CamelAzureStorageQueueMessageId

QueueConstants.MESSAGE_ID

字符串

发送到队列的消息 ID。

CamelAzureStorageQueueInsertionTime

QueueConstants.INSERTION_TIME

OffsetDateTime

Message 插入到队列中的时间。

CamelAzureStorageQueueExpirationTime

QueueConstants.EXPIRATION_TIME

OffsetDateTime

消息将过期并自动删除的时间。

CamelAzureStorageQueuePopReceipt

QueueConstants.POP_RECEIPT

字符串

删除/更新消息需要这个值。如果删除失败,使用这个 popreceipt,则消息已被另一个客户端取消队列。

CamelAzureStorageQueueTimeNextVisible

QueueConstants.TIME_NEXT_VISIBLE

OffsetDateTime

消息再次在 Queue 中可见的时间。

CamelAzureStorageQueueDequeueCount

QueueConstants.DEQUEUE_COUNT

long

消息已排队的次数。

CamelAzureStorageQueueRawHttpHeaders

QueueConstants.RAW_HTTP_HEADERS

httpHeaders

返回可供用户使用的非稀疏 httpHeaders。

14.6.3. 高级 Azure Storage Queue 配置

如果您的 Camel 应用程序在防火墙后面运行,或者需要对 QueueServiceClient 实例配置有更多控制,您可以创建自己的实例:

StorageSharedKeyCredential credential = new StorageSharedKeyCredential("yourAccountName", "yourAccessKey");
String uri = String.format("https://%s.queue.core.windows.net", "yourAccountName");

QueueServiceClient client = new QueueServiceClientBuilder()
                          .endpoint(uri)
                          .credential(credential)
                          .buildClient();
// This is camel context
context.getRegistry().bind("client", client);

然后,在 Camel azure-storage-queue 组件配置中引用此实例:

from("azure-storage-queue://cameldev/queue1?serviceClient=#client")
.to("file://outputFolder?fileName=output.txt&fileExist=Append");

14.6.4. 在 registry 中自动检测 QueueServiceClient 客户端

组件能够检测在 registry 中存在 QueueServiceClient bean。如果这是该类型的唯一实例,它将用作客户端,您不必将其定义为 uri 参数,如上例所示。这对端点的智能配置非常有用。

14.6.5. Azure Storage Queue Producer 操作

Camel Azure Storage Queue 组件在生成者端提供广泛的操作:

服务级别的操作

对于这些操作,需要 accountName

操作描述

listQueues

列出存储帐户中的队列,该帐户通过指定标记开始的过滤器。

队列级别的操作

对于这些操作,需要 accountNamequeueName

操作描述

createQueue

创建新队列。

deleteQueue

永久删除队列。

clearQueue

删除队列中的所有消息。

sendMessage

默认 Producer Operation Sends a message with a given time-to-live and a timeout period,其中消息在队列中不可见。消息文本从交换消息正文评估。默认情况下,如果队列不存在,它将首先创建一个空队列。如果要禁用此功能,请将 config createQueue 或 header CamelAzureStorageQueueCreateQueue 设置为 false

deleteMessage

删除队列中的指定消息。

receiveMessages

最多从队列中检索消息数量,并在超时时间内从其他操作中隐藏它们。但是,由于可靠性的原因,它不会从队列中分离消息。

peekMessages

从队列前到最大消息数的 peek 消息。

updateMessage

使用新消息更新队列中的特定消息,并重置可见性超时。消息文本从交换消息正文评估。

请参阅此页面中的示例部分,了解如何在 camel 应用程序中使用这些操作。

14.6.6. 消费者示例

要将队列消耗到一个批处理中最多 5 个消息的文件组件中,您可以执行以下操作:

from("azure-storage-queue://cameldev/queue1?serviceClient=#client&maxMessages=5")
.to("file://outputFolder?fileName=output.txt&fileExist=Append");

14.6.7. 制作者操作示例

  • listQueues:
from("direct:start")
    .process(exchange -> {
      // set the header you want the producer to evaluate, refer to the previous
      // section to learn about the headers that can be set
      // e.g, to only returns list of queues with 'awesome' prefix:
      exchange.getIn().setHeader(QueueConstants.QUEUES_SEGMENT_OPTIONS, new QueuesSegmentOptions().setPrefix("awesome"));
     })
    .to("azure-storage-queue://cameldev?serviceClient=#client&operation=listQueues")
    .log("${body}")
    .to("mock:result");
  • createQueue
from("direct:start")
    .process(exchange -> {
      // set the header you want the producer to evaluate, refer to the previous
      // section to learn about the headers that can be set
      // e.g:
      exchange.getIn().setHeader(QueueConstants.QUEUE_NAME, "overrideName");
     })
    .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=createQueue");
  • deleteQueue:
from("direct:start")
    .process(exchange -> {
      // set the header you want the producer to evaluate, refer to the previous
      // section to learn about the headers that can be set
      // e.g:
      exchange.getIn().setHeader(QueueConstants.QUEUE_NAME, "overrideName");
     })
    .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=deleteQueue");
  • clearQueue
from("direct:start")
    .process(exchange -> {
      // set the header you want the producer to evaluate, refer to the previous
      // section to learn about the headers that can be set
      // e.g:
      exchange.getIn().setHeader(QueueConstants.QUEUE_NAME, "overrideName");
     })
    .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=clearQueue");
  • sendMessage:
from("direct:start")
    .process(exchange -> {
      // set the header you want the producer to evaluate, refer to the previous
      // section to learn about the headers that can be set
      // e.g:
      exchange.getIn().setBody("message to send");
      // we set a visibility of 1min
      exchange.getIn().setHeader(QueueConstants.VISIBILITY_TIMEOUT, Duration.ofMinutes(1));
     })
    .to("azure-storage-queue://cameldev/test?serviceClient=#client");
  • deleteMessage:
from("direct:start")
    .process(exchange -> {
      // set the header you want the producer to evaluate, refer to the previous
      // section to learn about the headers that can be set
      // e.g:
      // Mandatory header:
      exchange.getIn().setHeader(QueueConstants.MESSAGE_ID, "1");
      // Mandatory header:
      exchange.getIn().setHeader(QueueConstants.POP_RECEIPT, "PAAAAHEEERXXX-1");
     })
    .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=deleteMessage");
  • receiveMessages:
from("direct:start")
    .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=receiveMessages")
    .process(exchange -> {
        final List<QueueMessageItem> messageItems = exchange.getMessage().getBody(List.class);
        messageItems.forEach(messageItem -> System.out.println(messageItem.getMessageText()));
    })
   .to("mock:result");
  • peekMessages:
from("direct:start")
    .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=peekMessages")
    .process(exchange -> {
        final List<PeekedMessageItem> messageItems = exchange.getMessage().getBody(List.class);
        messageItems.forEach(messageItem -> System.out.println(messageItem.getMessageText()));
    })
   .to("mock:result");
  • updateMessage:
from("direct:start")
   .process(exchange -> {
       // set the header you want the producer to evaluate, refer to the previous
       // section to learn about the headers that can be set
       // e.g:
       exchange.getIn().setBody("new message text");
       // Mandatory header:
       exchange.getIn().setHeader(QueueConstants.MESSAGE_ID, "1");
       // Mandatory header:
       exchange.getIn().setHeader(QueueConstants.POP_RECEIPT, "PAAAAHEEERXXX-1");
       // Mandatory header:
       exchange.getIn().setHeader(QueueConstants.VISIBILITY_TIMEOUT, Duration.ofMinutes(1));
    })
    .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=updateMessage");

14.6.8. 开发注意事项(Important)

当在这个组件上开发时,您需要获取您的 Azure accessKey 来运行集成测试。除了模拟的单元测试外,还需要在每次进行更改时运行集成测试,甚至进行客户端升级,因为 Azure 客户端也可以在次版本升级时中断操作。要运行集成测试,在此组件目录中运行以下 maven 命令:

mvn verify -PfullTests -DaccountName=myacc -DaccessKey=mykey

其中 accountName 是您的 Azure 帐户名称,accessKey 是从 Azure 门户生成的访问密钥。

Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.