9.5. 用途


たとえば、storageAccount ストレージアカウントのキュー messageQueue からメッセージコンテンツを取得するには、次のスニペットを使用します。

from("azure-storage-queue://storageAccount/messageQueue?accessKey=yourAccessKey").
to("file://queuedirectory");

9.5.1. コンポーネントプロデューサーによって評価されるメッセージヘッダー

ヘッダー変数名タイプ操作説明

CamelAzureStorageQueueSegmentOptions

QueueConstants.QUEUES_SEGMENT_OPTIONS

QueuesSegmentOptions

listQueues

キューを一覧表示するためのオプション

CamelAzureStorageQueueTimeout

QueueConstants.TIMEOUT

期間

すべて

それを超えると \{@link RuntimeException} が発生する任意のタイムアウト値。

CamelAzureStorageQueueMetadata

QueueConstants.METADATA

Map<String,String>

createQueue

キューに関連付けるメタデータ

CamelAzureStorageQueueTimeToLive

QueueConstants.TIME_TO_LIVE

期間

sendMessage

メッセージがキュー内で存続する時間。設定されていない場合、値はデフォルトで 7 日になります。-1 が渡されると、メッセージは期限切れになりません。存続時間は -1 または任意の正の数でなければなりません。

CamelAzureStorageQueueVisibilityTimeout

QueueConstants.VISIBILITY_TIMEOUT

期間

sendMessage, receiveMessages, updateMessage

メッセージがキューに表示されないタイムアウト期間。設定されていない場合、値はデフォルトで 0 になり、メッセージはすぐに表示されます。タイムアウトは 0 秒から 7 日の間にする必要があります。

CamelAzureStorageQueueCreateQueue

QueueConstants.CREATE_QUEUE

boolean

sendMessage

true に設定すると、メッセージを送信するときにキューが自動的に作成されます。

CamelAzureStorageQueuePopReceipt

QueueConstants.POP_RECEIPT

String

deleteMessage, updateMessage

メッセージを削除または更新するために一致する必要がある一意の識別子。

CamelAzureStorageQueueMessageId

QueueConstants.MESSAGE_ID

String

deleteMessage, updateMessage

削除または更新するメッセージの ID。

CamelAzureStorageQueueMaxMessages

QueueConstants.MAX_MESSAGES

Integer

receiveMessagespeekMessages

取得するメッセージの最大数。要求された数よりもキューに存在するメッセージが少ない場合は、すべてのメッセージが返されます。空のままにすると、1 つのメッセージのみが取得されます。許可される範囲は 1 から 32 のメッセージです。

CamelAzureStorageQueueOperation

QueueConstants.QUEUE_OPERATION

QueueOperationDefinition

すべて

実行するプロデューサー操作を指定します。プロデューサー操作に関連するこのページのドキュメントを参照してください。

CamelAzureStorageQueueName

QueueConstants.QUEUE_NAME

String

すべて

キュー名をオーバーライドします。

9.5.2. コンポーネントのプロデューサーまたはコンシューマーによって設定されるメッセージヘッダー

ヘッダー変数名タイプ説明

CamelAzureStorageQueueMessageId

QueueConstants.MESSAGE_ID

String

キューに送信されるメッセージの ID。

CamelAzureStorageQueueInsertionTime

QueueConstants.INSERTION_TIME

OffsetDateTime

メッセージがキューに挿入された時刻。

CamelAzureStorageQueueExpirationTime

QueueConstants.EXPIRATION_TIME

OffsetDateTime

メッセージが期限切れになり、自動的に削除される時間。

CamelAzureStorageQueuePopReceipt

QueueConstants.POP_RECEIPT

String

この値は、メッセージを削除/更新するために必要です。この popreceipt を使用して削除に失敗した場合、メッセージは別のクライアントによってキューから取り出されています。

CamelAzureStorageQueueTimeNextVisible

QueueConstants.TIME_NEXT_VISIBLE

OffsetDateTime

メッセージが再びキューに表示される時間。

CamelAzureStorageQueueDequeueCount

QueueConstants.DEQUEUE_COUNT

long

メッセージがデキューされた回数。

CamelAzureStorageQueueRawHttpHeaders

QueueConstants.RAW_HTTP_HEADERS

HttpHeaders

ユーザーが使用できる解析されていない httpHeaders を返します。

9.5.3. 高度な Azure ストレージキューの設定

Camel アプリケーションがファイアウォールの背後で実行されている場合、または QueueServiceClient インスタンス設定をより詳細に制御する必要がある場合は、独自のインスタンスを作成できます。

StorageSharedKeyCredential credential = new StorageSharedKeyCredential("yourAccountName", "yourAccessKey");
String uri = String.format("https://%s.queue.core.windows.net", "yourAccountName");

QueueServiceClient client = new QueueServiceClientBuilder()
                          .endpoint(uri)
                          .credential(credential)
                          .buildClient();
// This is camel context
context.getRegistry().bind("client", client);

次に、Camel azure-storage-queue コンポーネント設定でこのインスタンスを参照します。

from("azure-storage-queue://cameldev/queue1?serviceClient=#client")
.to("file://outputFolder?fileName=output.txt&fileExist=Append");

9.5.4. レジストリー内の QueueServiceClient クライアントの自動検出

このコンポーネントは、レジストリー内の QueueServiceClient Bean の存在を検出できます。そのタイプの唯一のインスタンスである場合、それはクライアントとして使用され、上記の例のように uri パラメーターとして定義する必要はありません。これは、エンドポイントのよりスマートな設定に非常に役立つ場合があります。

9.5.5. Azure Storage Queue Producer の操作

Camel Azure Storage Queue コンポーネントは、プロデューサー側で幅広い操作を提供します。

サービスレベルの操作

これらの操作には、accountName必要 です。

操作説明

listQueues

指定されたマーカーから開始して、フィルターを通過するストレージアカウント内のキューを一覧表示します。

キューレベルでの操作

これらの操作には、accountNamequeueName必要です

操作説明

createQueue

新しいキューを作成します。

deleteQueue

キューを完全に削除します。

clearQueue

キュー内のすべてのメッセージを削除します..

sendMessage

デフォルトのプロデューサ操作 指定された存続時間とメッセージがキューに表示されないタイムアウト期間を指定してメッセージを送信します。メッセージテキストは、Exchange メッセージ本文から評価されます。デフォルトでは、キューが存在しない場合、最初に空のキューが作成されます。これを無効にする場合は、config createQueue またはヘッダー CamelAzureStorageQueueCreateQueuefalse に設定します。

deleteMessage

指定されたメッセージをキューから削除します。

receiveMessages

キューからメッセージを最大数まで取得し、タイムアウト期間中は他の操作から非表示にします。ただし、信頼性の理由から、キューからメッセージをデキューしません。

peekMessages

キューの先頭からメッセージの最大数までメッセージをピークします。

updateMessage

キュー内の特定のメッセージを新しいメッセージで更新し、表示タイムアウトをリセットします。メッセージテキストは、Exchange メッセージ本文から評価されます。

これらの操作を camel アプリケーションで使用する方法については、このページの例のセクションを参照してください。

9.5.6. コンシューマーの例

1 つのバッチで最大 5 つのメッセージを含むファイルコンポーネントにキューを消費するには、次のようにします。

from("azure-storage-queue://cameldev/queue1?serviceClient=#client&maxMessages=5")
.to("file://outputFolder?fileName=output.txt&fileExist=Append");

9.5.7. プロデューサー操作の例

  • listQueues:
from("direct:start")
    .process(exchange -> {
      // set the header you want the producer to evaluate, refer to the previous
      // section to learn about the headers that can be set
      // e.g, to only returns list of queues with 'awesome' prefix:
      exchange.getIn().setHeader(QueueConstants.QUEUES_SEGMENT_OPTIONS, new QueuesSegmentOptions().setPrefix("awesome"));
     })
    .to("azure-storage-queue://cameldev?serviceClient=#client&operation=listQueues")
    .log("${body}")
    .to("mock:result");
  • createQueue:
from("direct:start")
    .process(exchange -> {
      // set the header you want the producer to evaluate, refer to the previous
      // section to learn about the headers that can be set
      // e.g:
      exchange.getIn().setHeader(QueueConstants.QUEUE_NAME, "overrideName");
     })
    .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=createQueue");
  • deleteQueue:
from("direct:start")
    .process(exchange -> {
      // set the header you want the producer to evaluate, refer to the previous
      // section to learn about the headers that can be set
      // e.g:
      exchange.getIn().setHeader(QueueConstants.QUEUE_NAME, "overrideName");
     })
    .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=deleteQueue");
  • clearQueue:
from("direct:start")
    .process(exchange -> {
      // set the header you want the producer to evaluate, refer to the previous
      // section to learn about the headers that can be set
      // e.g:
      exchange.getIn().setHeader(QueueConstants.QUEUE_NAME, "overrideName");
     })
    .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=clearQueue");
  • sendMessage:
from("direct:start")
    .process(exchange -> {
      // set the header you want the producer to evaluate, refer to the previous
      // section to learn about the headers that can be set
      // e.g:
      exchange.getIn().setBody("message to send");
      // we set a visibility of 1min
      exchange.getIn().setHeader(QueueConstants.VISIBILITY_TIMEOUT, Duration.ofMinutes(1));
     })
    .to("azure-storage-queue://cameldev/test?serviceClient=#client");
  • deleteMessage:
from("direct:start")
    .process(exchange -> {
      // set the header you want the producer to evaluate, refer to the previous
      // section to learn about the headers that can be set
      // e.g:
      // Mandatory header:
      exchange.getIn().setHeader(QueueConstants.MESSAGE_ID, "1");
      // Mandatory header:
      exchange.getIn().setHeader(QueueConstants.POP_RECEIPT, "PAAAAHEEERXXX-1");
     })
    .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=deleteMessage");
  • receiveMessages:
from("direct:start")
    .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=receiveMessages")
    .process(exchange -> {
        final List<QueueMessageItem> messageItems = exchange.getMessage().getBody(List.class);
        messageItems.forEach(messageItem -> System.out.println(messageItem.getMessageText()));
    })
   .to("mock:result");
  • peekMessages:
from("direct:start")
    .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=peekMessages")
    .process(exchange -> {
        final List<PeekedMessageItem> messageItems = exchange.getMessage().getBody(List.class);
        messageItems.forEach(messageItem -> System.out.println(messageItem.getMessageText()));
    })
   .to("mock:result");
  • updateMessage:
from("direct:start")
   .process(exchange -> {
       // set the header you want the producer to evaluate, refer to the previous
       // section to learn about the headers that can be set
       // e.g:
       exchange.getIn().setBody("new message text");
       // Mandatory header:
       exchange.getIn().setHeader(QueueConstants.MESSAGE_ID, "1");
       // Mandatory header:
       exchange.getIn().setHeader(QueueConstants.POP_RECEIPT, "PAAAAHEEERXXX-1");
       // Mandatory header:
       exchange.getIn().setHeader(QueueConstants.VISIBILITY_TIMEOUT, Duration.ofMinutes(1));
    })
    .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=updateMessage");

9.5.8. 開発ノート (重要)

このコンポーネントで開発する場合、統合テストを実行するために Azure accessKey を取得する必要があります。モック単体テストに加えて、マイナーバージョンのアップグレードでも Azure クライアントが問題を起こす可能性があるため、変更を加えたり、クライアントのアップグレードごとに統合テストを実行したりする必要があります。統合テストを実行するには、このコンポーネントディレクトリーで次の maven コマンドを実行します。

mvn verify -PfullTests -DaccountName=myacc -DaccessKey=mykey

ここで、accountName は Azure アカウント名で、accessKey は Azure portal から生成されるアクセスキーです。

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.