第4章 AMQP への接続
インテグレーションでは、AMQP (Advanced Message Queue Protocol) ブローカーからメッセージを取得したり、ANQP ブローカーへメッセージを公開することができます。AMQP は、クライアントとメッセージブローカーとの間の通信を定義します。以下のブローカータイプで通信するには、AMQP コネクターを使用して対象のブローカーへのコネクションを作成します。
- AMQP をサポートする Apache ActiveMQ ブローカー
- Apache ActiveMQ Artemis
- AMQ 7 ブローカー
- EnMasse (オープンソースのメッセージングプラットフォーム)
以下のブローカータイプの 1 つと通信するには、Red Hat AMQ コネクターを使用 して、対象のブローカーへのコネクションを作成します。
- AMQP をサポートしない Apache ActiveMQ ブローカー
- AMQ 6 ブローカー
AMQP コネクターを使用して AMQP をサポートしない Apache ActiveMQ ブローカーへのコネクションや AMQ 6 ブローカーへのコネクションを作成できます。これを行うには、ブローカーのトランスポートを設定する必要があります。ブローカーの設定に関する情報は、『Red Hat JBoss A-MQ Managing and Monitoring Brokers』の「Adding Client Connection Points」 を参照してください。指定する設定値に関する情報は、『Red Hat JBoss A-MQ Connection Reference』の「Advanced Message Queuing Protocol (AMQP)」 を参照してください。
AMQP コネクターを使用するには、以下を参照してください。
4.1. AMQP コネクションの作成
インテグレーションで AMQP ブローカーからメッセージを取得したり AMQP ブローカーへメッセージを公開する場合は、インテグレーションに追加できる AMQP コネクションを作成します。
前提条件
接続する AMQP ブローカーに以下が必要となります。
- URI
- ユーザーアカウントのクレデンシャル
- PEM 形式の証明書テキスト
手順
- Fuse Online の左パネルで Connections をクリックし、利用可能なコネクションを表示します。
- 右上の Create Connection をクリックしてコネクターを表示します。
- AMQP Message Broker コネクターをクリックします。
コネクションを設定します。
- Connection URI フィールドにデータの送信先または取得元となる場所を入力します。
- User name フィールドに、このブローカーへアクセスするために使用するアカウントのユーザー名を入力します。
- Password フィールドに、このブローカーへアクセスするために使用するアカウントのパスワードを入力します。
- Client ID フィールドに、メッセージを見逃さずにコネクションを開閉できるようにする ID を入力します。宛先のタイプはトピックにする必要があります。
- このコネクションが開発環境で使用される場合は、Check certificates を無効にすると時間を一部節約することができます。証明書確認の無効化は開発環境で便利です。セキュアな本番環境では、常に Check certificates を有効にするようにしてください。
- Broker certificate フィールドにブローカーの PEM 形式の証明書テキストを貼り付けます。これは、証明書の確認を無効にした場合以外は必須になります。
- Client certificate フィールドにクライアントの PEM 形式の証明書テキストを貼り付けます。このフィールドは常に任意です。
- Validate をクリックします。Fuse Online は即座にコネクションを検証しようとし、検証の成功または失敗を示すメッセージを表示します。検証に失敗した場合は、必要に応じて設定の詳細を修正し、再度検証を行います。
- 検証に成功した場合は Next をクリックします。
-
Name フィールドに、このコネクションを別のコネクションと区別するために使用する名前を入力します。たとえば、
AMQP 1
を入力します。 -
Description は任意のフィールドで、このコネクションに関する便利な情報を入力します。たとえば、
Sample AMQP connection
を入力します。 - Save をクリックし、作成したコネクションが利用可能になったことを確認します。上記の例の名前を入力した場合は、インテグレーションに追加できるコネクションとして AMQP 1 が表示されます。
4.2. AMQP コネクションを追加してメッセージ受信時にインテグレーションの実行をトリガー
AMQP ブローカーからメッセージを受信したときにインテグレーションの実行をトリガーするには、AMQP コネクションをインテグレーションの最初のコネクションとして追加します。
前提条件
- メッセージの受信元となる AMQP ブローカーへのコネクションが作成されている必要があります。
- インテグレーションを作成することになります。Fuse Online は最初のコネクションの選択を要求します。
手順
- Choose a connection ページで、インテグレーションを開始するために使用する AMQP コネクションをクリックします。
- Choose an action ページで、Subscribe for messages アクションを選択し、指定したキューまたはトピックからメッセージを受信します。
アクションを設定するには以下を行います。
- Destination name フィールドに、データの受信元となるキューまたはトピックの名前を入力します。
- Destination type では Queue を許可するか、Topic を選択します。
- メッセージを見逃さずにコネクションを開閉できるようにするため、Durable subscription ID フィールドに永続サブスクリプション ID を入力します。宛先のタイプはトピックにする必要があります。
特定の条件を満たすデータのみを受信する場合は、Message selector フィールドにフィルター式を入力します。
メッセージセレクターは式が含まれる文字列です。式の構文は、SQL92 条件式構文のサブセットを基にしています。以下の例のメッセージセレクターは、値が
Sports
またはOpinion
に設定されたNewsType
プロパティーがあるメッセージをすべて選択します。NewsType = ’Sports’ OR NewsType = ’Opinion’
メッセージコンシューマーは、ヘッダーとプロパティーがメッセージセレクターの式と一致するメッセージのみを受信します。メッセージセレクターは、メッセージのボディー部の内容を基にしてメッセージを選択することはできません。
- Next をクリックして、アクションの出力タイプを指定します。
データタイプの認識が必要がない場合は、Select Type フィールドに Type specification not required を使用し、Next をクリックします。残りの手順を行う必要はありません。
これ以外の場合は、Select Type フィールドをクリックし、以下の 1 つをスキーマタイプとして選択します。
-
JSON schema は JSON データの構造を記述するドキュメントです。ドキュメントのメディアタイプは
application/schema+json
です。 -
JSON instance は JSON データが含まれるドキュメントです。ドキュメントのメディアタイプは
application/json
です。 -
XML schema は XML データの構造を記述するドキュメントです。このドキュメントのファイル拡張子は
.xsd
です。 -
XML instance は XML データが含まれるドキュメントです。このドキュメントのファイル拡張子は
.xml
です。
-
JSON schema は JSON データの構造を記述するドキュメントです。ドキュメントのメディアタイプは
-
Definition 入力ボックスに、選択したスキーマタイプに準拠する定義を貼り付けます。たとえば、JSON schema を選択した場合は、メディアタイプが
application/schema+json
の JSON スキーマファイルの内容を貼り付けます。 Data Type Name フィールドにはデータタイプに選択した名前を入力します。たとえば、ベンダーの JSON スキーマを指定する場合は、
Vendor
をデータタイプ名として指定することができます。このタイプを指定するコネクションを使用するインテグレーションを作成または編集するときにこのデータタイプ名が表示されます。Fuse Online では、インテグレーションビジュアライゼーションとデータマッパーにタイプ名が表示されます。
- Data Type Description フィールドには、このタイプを区別するのに便利な情報を提供します。データマッパーでこのタイプを処理するステップにカーソルを合わせると、この説明が表示されます。
- Next をクリックします。
結果
インテグレーションフローの最初にコネクションが表示されます。
4.3. フローの途中で AMQP へメッセージを公開または AMQP へメッセージを公開してインテグレーションを終了
フローの途中で AMQP ブローカーへメッセージを公開したり、AMQP ブローカーへメッセージを公開してシンプルなインテグレーションを終了することができます。これには、AMQP コネクションをフローの途中に追加するか、AMQP コネクションをシンプルなインテグレーションの最後のコネクションとして追加します。
前提条件
- メッセージの公開先となる AMQP ブローカーへのコネクションが作成されている必要があります。
- フローを作成または編集することになり、Fuse Online でインテグレーションに追加するよう要求されます。または、Fuse Online でシンプルなインテグレーションの最後のコネクションを選択するよう要求されます。
手順
- Add to Integration ページで、コネクションの追加先のプラス記号をクリックします。Fuse Online が最後のコネクションを選択するよう要求する場合は、このステップをスキップします。
- メッセージの公開に使用する AMQP コネクションをクリックします。
Choose an action ページで、以下のアクションの 1 つを選択します。
Publish messages で応答を受信せずに、指定するキューまたはトピックへメッセージを公開します。このアクションを設定するには、以下を行います。
- Destination name フィールドに、メッセージの送信先となるキューまたはトピックの名前を入力します。
- Destination type では Queue を許可するか、Topic を選択します。
- Persistent を選択し、コネクションが失敗した場合でもメッセージが確実に送信されるようにします。
Request response using messages で、指定するキューまたはトピックへメッセージを公開し、応答を受信します。
注記シンプルなインテグレーションの最後のコネクションが Request response using messages アクションを実行する AMQP コネクションである場合、コネクションはメッセージを公開しますが、応答は破棄されます。応答を保持するには、Request response using messages アクションを実行する途中のコネクションとして AMQP コネクションを追加し、ログステップでシンプルなインテグレーションを終了します。
このアクションを設定するには、以下を行います。
- Destination name フィールドに、メッセージの送信先となるキューまたはトピックの名前を入力します。
- Destination type では Queue を許可するか、Topic を選択します。
- メッセージを見逃さずにコネクションを開閉できるようにするため、Durable subscription ID フィールドに永続サブスクリプション ID を入力します。宛先のタイプはトピックにする必要があります。
特定の条件を満たす応答のみを受信する場合は、Message selector フィールドにフィルター式を入力します。
メッセージセレクターは式が含まれる文字列です。式の構文は、SQL92 条件式構文のサブセットを基にしています。以下の例のメッセージセレクターは、値が
Sports
またはOpinion
に設定されたNewsType
プロパティーがあるメッセージをすべて選択します。NewsType = ’Sports’ OR NewsType = ’Opinion’
メッセージコンシューマーは、ヘッダーとプロパティーがメッセージセレクターの式と一致するメッセージのみを受信します。メッセージセレクターは、メッセージのボディー部の内容を基にしてメッセージを選択することはできません。
- Next をクリックし、アクションの入力および出力タイプを指定します。
データタイプの認識が必要がない場合は、Select Type フィールドに Type specification not required を使用し、Next をクリックします。残りの手順を行う必要はありません。
これ以外の場合は、Select Type フィールドをクリックし、以下の 1 つをスキーマタイプとして選択します。
-
JSON schema は JSON データの構造を記述するドキュメントです。ドキュメントのメディアタイプは
application/schema+json
です。 -
JSON instance は JSON データが含まれるドキュメントです。ドキュメントのメディアタイプは
application/json
です。 -
XML schema は XML データの構造を記述するドキュメントです。このドキュメントのファイル拡張子は
.xsd
です。 -
XML instance は XML データが含まれるドキュメントです。このドキュメントのファイル拡張子は
.xml
です。
-
JSON schema は JSON データの構造を記述するドキュメントです。ドキュメントのメディアタイプは
-
Definition 入力ボックスに、選択したスキーマタイプに準拠する定義を貼り付けます。たとえば、JSON schema を選択した場合は、メディアタイプが
application/schema+json
の JSON スキーマファイルの内容を貼り付けます。 Data Type Name フィールドにはデータタイプに選択した名前を入力します。たとえば、ベンダーの JSON スキーマを指定する場合は、
Vendor
をデータタイプ名として指定することができます。このタイプを指定するコネクションを使用するインテグレーションを作成または編集するときにこのデータタイプ名が表示されます。Fuse Online では、インテグレーションビジュアライゼーションとデータマッパーにタイプ名が表示されます。
- Data Type Description フィールドには、このタイプを区別するのに便利な情報を提供します。データマッパーでこのタイプを処理するステップにカーソルを合わせると、この説明が表示されます。
- Next をクリックします。
結果
インテグレーションビジュアライゼーションで追加した場所にコネクションが表示されます。