9.5. 用途
たとえば、storageAccount
ストレージアカウントのキュー messageQueue
からメッセージコンテンツを取得するには、次のスニペットを使用します。
from("azure-storage-queue://storageAccount/messageQueue?accessKey=yourAccessKey"). to("file://queuedirectory");
9.5.1. コンポーネントプロデューサーによって評価されるメッセージヘッダー
ヘッダー | 変数名 | タイプ | 操作 | 説明 |
---|---|---|---|---|
|
|
|
| キューを一覧表示するためのオプション |
|
|
| すべて | それを超えると \{@link RuntimeException} が発生する任意のタイムアウト値。 |
|
|
|
| キューに関連付けるメタデータ |
|
|
|
| メッセージがキュー内で存続する時間。設定されていない場合、値はデフォルトで 7 日になります。-1 が渡されると、メッセージは期限切れになりません。存続時間は -1 または任意の正の数でなければなりません。 |
|
|
|
| メッセージがキューに表示されないタイムアウト期間。設定されていない場合、値はデフォルトで 0 になり、メッセージはすぐに表示されます。タイムアウトは 0 秒から 7 日の間にする必要があります。 |
|
|
|
|
|
|
|
|
| メッセージを削除または更新するために一致する必要がある一意の識別子。 |
|
|
|
| 削除または更新するメッセージの ID。 |
|
|
|
| 取得するメッセージの最大数。要求された数よりもキューに存在するメッセージが少ない場合は、すべてのメッセージが返されます。空のままにすると、1 つのメッセージのみが取得されます。許可される範囲は 1 から 32 のメッセージです。 |
|
|
| すべて | 実行するプロデューサー操作を指定します。プロデューサー操作に関連するこのページのドキュメントを参照してください。 |
|
|
| すべて | キュー名をオーバーライドします。 |
9.5.2. コンポーネントのプロデューサーまたはコンシューマーによって設定されるメッセージヘッダー
ヘッダー | 変数名 | タイプ | 説明 |
---|---|---|---|
|
|
| キューに送信されるメッセージの ID。 |
|
|
| メッセージがキューに挿入された時刻。 |
|
|
| メッセージが期限切れになり、自動的に削除される時間。 |
|
|
| この値は、メッセージを削除/更新するために必要です。この popreceipt を使用して削除に失敗した場合、メッセージは別のクライアントによってキューから取り出されています。 |
|
|
| メッセージが再びキューに表示される時間。 |
|
|
| メッセージがデキューされた回数。 |
|
|
| ユーザーが使用できる解析されていない httpHeaders を返します。 |
9.5.3. 高度な Azure ストレージキューの設定
Camel アプリケーションがファイアウォールの背後で実行されている場合、または QueueServiceClient
インスタンス設定をより詳細に制御する必要がある場合は、独自のインスタンスを作成できます。
StorageSharedKeyCredential credential = new StorageSharedKeyCredential("yourAccountName", "yourAccessKey"); String uri = String.format("https://%s.queue.core.windows.net", "yourAccountName"); QueueServiceClient client = new QueueServiceClientBuilder() .endpoint(uri) .credential(credential) .buildClient(); // This is camel context context.getRegistry().bind("client", client);
次に、Camel azure-storage-queue
コンポーネント設定でこのインスタンスを参照します。
from("azure-storage-queue://cameldev/queue1?serviceClient=#client") .to("file://outputFolder?fileName=output.txt&fileExist=Append");
9.5.4. レジストリー内の QueueServiceClient クライアントの自動検出
このコンポーネントは、レジストリー内の QueueServiceClient Bean の存在を検出できます。そのタイプの唯一のインスタンスである場合、それはクライアントとして使用され、上記の例のように uri パラメーターとして定義する必要はありません。これは、エンドポイントのよりスマートな設定に非常に役立つ場合があります。
9.5.5. Azure Storage Queue Producer の操作
Camel Azure Storage Queue コンポーネントは、プロデューサー側で幅広い操作を提供します。
サービスレベルの操作
これらの操作には、accountName
が 必要 です。
操作 | 説明 |
---|---|
| 指定されたマーカーから開始して、フィルターを通過するストレージアカウント内のキューを一覧表示します。 |
キューレベルでの操作
これらの操作には、accountName
と queueName
が 必要です。
操作 | 説明 |
---|---|
| 新しいキューを作成します。 |
| キューを完全に削除します。 |
| キュー内のすべてのメッセージを削除します.. |
|
デフォルトのプロデューサ操作 指定された存続時間とメッセージがキューに表示されないタイムアウト期間を指定してメッセージを送信します。メッセージテキストは、Exchange メッセージ本文から評価されます。デフォルトでは、キューが存在しない場合、最初に空のキューが作成されます。これを無効にする場合は、config |
| 指定されたメッセージをキューから削除します。 |
| キューからメッセージを最大数まで取得し、タイムアウト期間中は他の操作から非表示にします。ただし、信頼性の理由から、キューからメッセージをデキューしません。 |
| キューの先頭からメッセージの最大数までメッセージをピークします。 |
| キュー内の特定のメッセージを新しいメッセージで更新し、表示タイムアウトをリセットします。メッセージテキストは、Exchange メッセージ本文から評価されます。 |
これらの操作を camel アプリケーションで使用する方法については、このページの例のセクションを参照してください。
9.5.6. コンシューマーの例
1 つのバッチで最大 5 つのメッセージを含むファイルコンポーネントにキューを消費するには、次のようにします。
from("azure-storage-queue://cameldev/queue1?serviceClient=#client&maxMessages=5") .to("file://outputFolder?fileName=output.txt&fileExist=Append");
9.5.7. プロデューサー操作の例
-
listQueues
:
from("direct:start") .process(exchange -> { // set the header you want the producer to evaluate, refer to the previous // section to learn about the headers that can be set // e.g, to only returns list of queues with 'awesome' prefix: exchange.getIn().setHeader(QueueConstants.QUEUES_SEGMENT_OPTIONS, new QueuesSegmentOptions().setPrefix("awesome")); }) .to("azure-storage-queue://cameldev?serviceClient=#client&operation=listQueues") .log("${body}") .to("mock:result");
-
createQueue
:
from("direct:start") .process(exchange -> { // set the header you want the producer to evaluate, refer to the previous // section to learn about the headers that can be set // e.g: exchange.getIn().setHeader(QueueConstants.QUEUE_NAME, "overrideName"); }) .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=createQueue");
-
deleteQueue
:
from("direct:start") .process(exchange -> { // set the header you want the producer to evaluate, refer to the previous // section to learn about the headers that can be set // e.g: exchange.getIn().setHeader(QueueConstants.QUEUE_NAME, "overrideName"); }) .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=deleteQueue");
-
clearQueue
:
from("direct:start") .process(exchange -> { // set the header you want the producer to evaluate, refer to the previous // section to learn about the headers that can be set // e.g: exchange.getIn().setHeader(QueueConstants.QUEUE_NAME, "overrideName"); }) .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=clearQueue");
-
sendMessage
:
from("direct:start") .process(exchange -> { // set the header you want the producer to evaluate, refer to the previous // section to learn about the headers that can be set // e.g: exchange.getIn().setBody("message to send"); // we set a visibility of 1min exchange.getIn().setHeader(QueueConstants.VISIBILITY_TIMEOUT, Duration.ofMinutes(1)); }) .to("azure-storage-queue://cameldev/test?serviceClient=#client");
-
deleteMessage
:
from("direct:start") .process(exchange -> { // set the header you want the producer to evaluate, refer to the previous // section to learn about the headers that can be set // e.g: // Mandatory header: exchange.getIn().setHeader(QueueConstants.MESSAGE_ID, "1"); // Mandatory header: exchange.getIn().setHeader(QueueConstants.POP_RECEIPT, "PAAAAHEEERXXX-1"); }) .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=deleteMessage");
-
receiveMessages
:
from("direct:start") .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=receiveMessages") .process(exchange -> { final List<QueueMessageItem> messageItems = exchange.getMessage().getBody(List.class); messageItems.forEach(messageItem -> System.out.println(messageItem.getMessageText())); }) .to("mock:result");
-
peekMessages
:
from("direct:start") .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=peekMessages") .process(exchange -> { final List<PeekedMessageItem> messageItems = exchange.getMessage().getBody(List.class); messageItems.forEach(messageItem -> System.out.println(messageItem.getMessageText())); }) .to("mock:result");
-
updateMessage
:
from("direct:start") .process(exchange -> { // set the header you want the producer to evaluate, refer to the previous // section to learn about the headers that can be set // e.g: exchange.getIn().setBody("new message text"); // Mandatory header: exchange.getIn().setHeader(QueueConstants.MESSAGE_ID, "1"); // Mandatory header: exchange.getIn().setHeader(QueueConstants.POP_RECEIPT, "PAAAAHEEERXXX-1"); // Mandatory header: exchange.getIn().setHeader(QueueConstants.VISIBILITY_TIMEOUT, Duration.ofMinutes(1)); }) .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=updateMessage");
9.5.8. 開発ノート (重要)
このコンポーネントで開発する場合、統合テストを実行するために Azure accessKey を取得する必要があります。モック単体テストに加えて、マイナーバージョンのアップグレードでも Azure クライアントが問題を起こす可能性があるため、変更を加えたり、クライアントのアップグレードごとに統合テストを実行したりする必要があります。統合テストを実行するには、このコンポーネントディレクトリーで次の maven コマンドを実行します。
mvn verify -PfullTests -DaccountName=myacc -DaccessKey=mykey
ここで、accountName
は Azure アカウント名で、accessKey
は Azure portal から生成されるアクセスキーです。