第3章 AMQ への接続
インテグレーションでは、Red Hat AMQ ブローカーからメッセージを取得したり、Red Hat AMQ ブローカーへメッセージをパブリッシュすることができます。Red Hat AMQ は、クライアントとメッセージブローカーとの間の通信に OpenWire プロトコルを使用します。以下のブローカータイプと通信するには、Red Hat AMQ コネクターを使用して、対象のブローカーへのコネクションを作成します。
- AMQP をサポートしない Apache ActiveMQ ブローカー
- AMQ 6 ブローカー
以下のブローカータイプの 1 つと通信するには、AMQP コネクターを使用 して、対象のブローカーへのコネクションを作成します。
- AMQP をサポートする Apache ActiveMQ ブローカー
- Apache ActiveMQ Artemis
- AMQ 7 ブローカー
- EnMasse (オープンソースのメッセージングプラットフォーム)
Red Hat AMQ コネクターを使用するには、以下を参照してください。
3.1. AMQ コネクションの作成
インテグレーションでは、以下からメッセージを取得したり、以下にメッセージをパブリッシュすることがあります。
- AMQP をサポートしない Apache ActiveMQ ブローカー
- AMQ 6 ブローカー
このような場合、インテグレーションに追加できる Red Hat AMQ コネクションを作成します。
前提条件
接続する Red Hat AMQ ブローカーには以下が必要となります。
- ブローカーの URL
- ユーザーアカウントのクレデンシャル
- ブローカーの PEM 形式の証明書
手順
- Fuse Online の左パネルで Connections をクリックし、利用可能なコネクションを表示します。
- 右上の Create Connection をクリックしてコネクターを表示します。
- Red Hat AMQ コネクターを表示します。
以下を入力してコネクションを設定します。
-
Broker URL フィールドにデータの送信先またはデータの取得元の場所を入力します (例:
tcp://localhost:61616
)。 - User Name フィールドに、このブローカーへアクセスするために使用するアカウントのユーザー名を入力します。
- Password フィールドに、このブローカーへアクセスするために使用するアカウントのパスワードを入力します。
- Client ID フィールドに、メッセージを見逃さずにコネクションを開閉できるようにする ID を入力します。宛先のタイプはトピックにする必要があります。
- このコネクションが開発環境で使用される場合は、Check Certificates を無効にすると時間を一部節約することができます。証明書確認の無効化は開発環境で便利です。セキュアな本番環境では、常に Check Certificates を有効にするようにしてください。
- Broker Certificate フィールドに Red Hat AMQ ブローカーの PEM 形式の証明書テキストを貼り付けます。これは、証明書確認を無効にした場合以外は必須です。
- Client Certificate フィールドに Red Hat クライアントの PEM 形式の証明書テキストを貼り付けます。このフィールドは常に任意です。
-
Broker URL フィールドにデータの送信先またはデータの取得元の場所を入力します (例:
- Validate をクリックします。Fuse Online は即座にコネクションを検証しようとし、検証の成功または失敗を示すメッセージを表示します。検証に失敗した場合は、必要に応じて設定の詳細を修正し、再度検証を行います。
- 検証に成功した場合は Next をクリックします。
-
Connection Name フィールドに、このコネクションを別のコネクションと区別するために使用する名前を入力します。たとえば、
Red Hat AMQ 1
を入力します。 -
Description は任意のフィールドで、このコネクションに関する便利な情報を入力します。たとえば、
Sample Red Hat AMQ connection that uses a provided broker.
を入力します。 - 右上にある Create をクリックし、作成したコネクションが利用可能になったことを確認します。上記の例の名前を入力した場合は、インテグレーションに追加できるコネクションとして Red Hat AMQ 1 が表示されます。
3.2. AMQ コネクションを追加してメッセージ受信時にインテグレーションの実行をトリガー
Red Hat AMQ ブローカーからメッセージを受信したときにインテグレーションの実行をトリガーするには、Red Hat AMQ コネクションを最初のコネクションとして追加します。
前提条件
メッセージの取得元となる Red Hat AMQ ブローカーへの Red Hat AMQ コネクションが作成されている必要があります。
手順
- Fuse Online パネルの左側にある Integrations をクリックします。
- Create Integration をクリックします。
- Choose a Start Connection ページで、インテグレーションを開始するために使用する Red Hat AMQ コネクションをクリックします。
- Choose an Action ページで、Subscribe for messages アクションをクリックし、指定したキューまたはトピックからメッセージを受信します。
アクションを設定するには以下を行います。
- Destination Name フィールドに、データの受信元となるキューまたはトピックの名前を入力します。
- Destination Type では Queue を許可するか、Topic を選択します。
- メッセージを見逃さずにコネクションを開閉できるようにするため、Durable Subscription ID フィールドに永続サブスクリプション ID を入力します。宛先のタイプはトピックにする必要があります。
特定の条件を満たすデータのみを受信する場合は、Message Selector フィールドにフィルター式を入力します。
メッセージセレクターは式が含まれる文字列です。式の構文は、SQL92 条件式構文のサブセットを基にしています。以下の例のメッセージセレクターは、値が
Sports
またはOpinion
に設定されたNewsType
プロパティーがあるメッセージをすべて選択します。NewsType = ’Sports’ OR NewsType = ’Opinion’
メッセージコンシューマーは、ヘッダーとプロパティーがメッセージセレクターの式と一致するメッセージのみを受信します。メッセージセレクターは、メッセージのボディー部の内容を基にしてメッセージを選択することはできません。
- Next をクリックして、アクションの出力タイプを指定します。
データタイプの認識が必要がない場合は、Select Type フィールドで Type specification not required を受け入れ、下部にある Done をクリックします。残りの手順を行う必要はありません。
その他の場合は、以下の 1 つをスキーマタイプとして選択します。
-
JSON schema は JSON データの構造を記述するドキュメントです。ドキュメントのメディアタイプは
application/schema+json
です。 -
JSON instance は JSON データが含まれるドキュメントです。ドキュメントのメディアタイプは
application/json
です。 -
XML schema は XML データの構造を記述するドキュメントです。このドキュメントのファイル拡張子は
.xsd
です。 -
XML instance は XML データが含まれるドキュメントです。このドキュメントのファイル拡張子は
.xml
です。
-
JSON schema は JSON データの構造を記述するドキュメントです。ドキュメントのメディアタイプは
-
Definition 入力ボックスに、選択したスキーマタイプに準拠する定義を貼り付けます。たとえば、JSON schema を選択した場合は、メディアタイプが
application/schema+json
の JSON スキーマファイルの内容を貼り付けます。 Data Type Name フィールドにはデータタイプに選択した名前を入力します。たとえば、ベンダーの JSON スキーマを指定する場合は、
Vendor
をデータタイプ名として指定することができます。このタイプを指定するコネクションを使用するインテグレーションを作成または編集するときにこのデータタイプ名が表示されます。Fuse Online では、インテグレーション仮想化パネルとデータマッパーにタイプ名が表示されます。
- Data Type Description フィールドには、このタイプを区別するのに便利な情報を提供します。データマッパーでこのタイプを処理するステップにカーソルを合わせると、この説明が表示されます。
- Done をクリックします。
結果
インテグレーションフローの最初にコネクションが表示されます。
3.3. AMQ メッセージをパブリッシュしてインテグレーションを終了
メッセージを Red Hat AMQ ブローカーにパブリッシュしてシンプルなインテグレーションを終了するには、Red Hat AMQ コネクションをシンプルなインテグレーションの最後のコネクションとして追加します。
前提条件
- メッセージのパブリッシュ先となる Red Hat AMQ ブローカーへのコネクションが作成されている必要があります。
- インテグレーションを作成することになり、Fuse Online は最後のコネクションの選択を要求します。または、インテグレーションを変更して最後のコネクションを変更することになります。
手順
- Choose a Finish Connection ページで、インテグレーションの終了に使用する Red Hat AMQ コネクションをクリックします。
- Choose an Action ページで、Publish messages アクションをクリックし、指定したキューまたはトピックにメッセージをパブリッシュします。
- Destination Name フィールドに、メッセージの送信先となるキューまたはトピックの名前を入力します。
- Destination Type では Queue を許可するか、Topic を選択します。
- Persistent を選択し、コネクションが失敗した場合でもメッセージが確実に送信されるようにします。
- Next をクリックし、アクションの入力および出力タイプを指定します。
データタイプの認識が必要がない場合は、Select Type フィールドで Type specification not required を受け入れ、下部にある Done をクリックします。残りの手順を行う必要はありません。
その他の場合は、以下の 1 つをスキーマタイプとして選択します。
-
JSON schema は JSON データの構造を記述するドキュメントです。ドキュメントのメディアタイプは
application/schema+json
です。 -
JSON instance は JSON データが含まれるドキュメントです。ドキュメントのメディアタイプは
application/json
です。 -
XML schema は XML データの構造を記述するドキュメントです。このドキュメントのファイル拡張子は
.xsd
です。 -
XML instance は XML データが含まれるドキュメントです。このドキュメントのファイル拡張子は
.xml
です。
-
JSON schema は JSON データの構造を記述するドキュメントです。ドキュメントのメディアタイプは
-
Definition 入力ボックスに、選択したスキーマタイプに準拠する定義を貼り付けます。たとえば、JSON schema を選択した場合は、メディアタイプが
application/schema+json
の JSON スキーマファイルの内容を貼り付けます。 Data Type Name フィールドにはデータタイプに選択した名前を入力します。たとえば、ベンダーの JSON スキーマを指定する場合は、
Vendor
をデータタイプ名として指定することができます。このタイプを指定するコネクションを使用するインテグレーションを作成または編集するときにこのデータタイプ名が表示されます。Fuse Online では、インテグレーション仮想化パネルとデータマッパーにタイプ名が表示されます。
- Data Type Description フィールドには、このタイプを区別するのに便利な情報を提供します。データマッパーでこのタイプを処理するステップにカーソルを合わせると、この説明が表示されます。
- Done をクリックします。
結果
インテグレーションフローの最後にコネクションが表示されます。
3.4. インテグレーションの途中に AMQ メッセージをパブリッシュ
インテグレーションの途中に Red Hat AMQ ブローカーにメッセージをパブリッシュするには、Red Hat AMQ コネクションをフローの途中に追加します。
前提条件
- メッセージのパブリッシュ先となる Red Hat AMQ ブローカーへのコネクションが作成されている必要があります。
- フローを作成または編集することになります。Fuse Online はステップの選択を要求します。または、Fuse Online で最後のコネクションを選択するよう要求されます。
手順
- フローの途中に追加する Red Hat AMQ コネクションをクリックします。
Choose an Action ページで、以下のアクションの 1 つを選択します。
Publish messages アクションは指定するキューまたはトピックにメッセージをパブリッシュします。このアクションを設定するには、以下を行います。
- Destination Name フィールドに、メッセージの送信先となるキューまたはトピックの名前を入力します。
- Destination Type では Queue を許可するか、Topic を選択します。
- Persistent を選択し、コネクションが失敗した場合でもメッセージが確実に送信されるようにします。
Request response using messages で、応答の受信に指定した JMS 宛先にメッセージを送ります。このアクションを設定するには、以下を行います。
- Destination Name フィールドに、メッセージの送信先となるキューまたはトピックの名前を入力します。
- Destination Type では Queue を許可するか、Topic を選択します。
特定の条件を満たす応答のみを受信する場合は、Message Selector フィールドにフィルター式を入力します。
メッセージセレクターは式が含まれる文字列です。式の構文は、SQL92 条件式構文のサブセットを基にしています。以下の例のメッセージセレクターは、値が
Sports
またはOpinion
に設定されたNewsType
プロパティーがあるメッセージをすべて選択します。NewsType = ’Sports’ OR NewsType = ’Opinion’
メッセージコンシューマーは、ヘッダーとプロパティーがメッセージセレクターの式と一致するメッセージのみを受信します。メッセージセレクターは、メッセージのボディー部の内容を基にしてメッセージを選択することはできません。
- Named Reply To フィールドには、キューまたはトピックの名前を入力します。宛先によって、応答がこのキューまたはトピックに送信されます。
- Persistent を選択し、コネクションが失敗した場合でもメッセージが確実に送信されるようにします。
- Response Time Out フィールドには、ランタイム例外をスローする前にこのコネクションが応答メッセージを待つ期間 (ミリ秒単位) を指定します。デフォルトは 5000 ミリ秒 (5 秒)です。
- Next をクリックし、アクションの入力タイプを指定してから、アクションの出力タイプを指定します。
データタイプの認識が必要がない場合は、Select Type フィールドで Type specification not required を受け入れ、下部にある Done をクリックします。残りの手順を行う必要はありません。
その他の場合は、以下の 1 つをスキーマタイプとして選択します。
-
JSON schema は JSON データの構造を記述するドキュメントです。ドキュメントのメディアタイプは
application/schema+json
です。 -
JSON instance は JSON データが含まれるドキュメントです。ドキュメントのメディアタイプは
application/json
です。 -
XML schema は XML データの構造を記述するドキュメントです。このドキュメントのファイル拡張子は
.xsd
です。 -
XML instance は XML データが含まれるドキュメントです。このドキュメントのファイル拡張子は
.xml
です。
-
JSON schema は JSON データの構造を記述するドキュメントです。ドキュメントのメディアタイプは
-
Definition 入力ボックスに、選択したスキーマタイプに準拠する定義を貼り付けます。たとえば、JSON schema を選択した場合は、メディアタイプが
application/schema+json
の JSON スキーマファイルの内容を貼り付けます。 Data Type Name フィールドにはデータタイプに選択した名前を入力します。たとえば、ベンダーの JSON スキーマを指定する場合は、
Vendor
をデータタイプ名として指定することができます。このタイプを指定するコネクションを使用するインテグレーションを作成または編集するときにこのデータタイプ名が表示されます。Fuse Online では、インテグレーション仮想化パネルとデータマッパーにタイプ名が表示されます。
- Data Type Description フィールドには、このタイプを区別するのに便利な情報を提供します。データマッパーでこのタイプを処理するステップにカーソルを合わせると、この説明が表示されます。
- Done をクリックします。
結果
インテグレーションフローで追加した場所にコネクションが表示されます。