第12章 Kafka への接続
Apache Kafka はデータの取得および公開に使用できる分散ストリーミングプラットフォームです。インテグレーションでは、指定した Kafka トピックからデータをサブスクライブでき、または指定した Kafka トピックにデータを公開することもできます。これには、Kafka へのコネクションを作成し、そのコネクションをインテグレーションフローに追加します。詳細は以下を参照してください。
12.1. Kafka ブローカーへのコネクションの作成 リンクのコピーリンクがクリップボードにコピーされました!
インテグレーションで、Kafka トピックからデータをサブスクライブしたり、Kafka トピックにデータを公開するには、Kafka へのコネクションを作成し、そのコネクションをインテグレーションに追加します。
本リリースでは、Kafka へのコネクションは SSL をサポートしません。これは、今後のリリースで変更される予定です。
前提条件
接続する Kafka ブローカーの URI を知っている必要があります。
手順
- Fuse Online の左パネルで Connections をクリックし、利用可能なコネクションを表示します。
- 右上の Create Connection をクリックしてコネクターを表示します。
- Kafka コネクターをクリックします。
- Kafka bootstraps URI フィールドに、Kafka ブローカー URI のコンマ区切りリストを入力します。各 URI は host:port 形式で指定する必要があります。
- Validate をクリックします。Fuse Online は即座にコネクションを検証しようとし、検証の成功または失敗を示すメッセージを表示します。検証に失敗した場合は、入力パラメーターを訂正し、再度検証を行います。
- 検証に成功した場合は Next をクリックします。
-
Name フィールドに、このコネクションを別のコネクションと区別するために使用する名前を入力します。たとえば、
Kafka Westを入力します。 - Description は任意のフィールドで、このコネクションに関する便利な情報を入力します。
- Save をクリックし、作成したコネクションが利用可能になったことを確認します。上記の例の名前を入力した場合は、インテグレーションに追加できるコネクションとして Kafka West が表示されます。
12.2. Kafka ブローカーからデータを取得してインテグレーションの実行をトリガー リンクのコピーリンクがクリップボードにコピーされました!
Kafka ブローカーからデータを受信したときにインテグレーションの実行をトリガーするには、Kafka コネクションを最初のコネクションとして追加します。インテグレーションの実行中、Kafka コネクションは指定した Kafka トピックでデータを継続的に監視します。コネクションが新しいデータを見つけると、そのデータをインテグレーションの次のステップに渡します。
前提条件
Kafka ブローカーへのコネクションが作成されている必要があります。
手順
- Fuse Online パネルの左側にある Integrations をクリックします。
- Create Integration をクリックします。
- Choose a connection ページで、インテグレーションを開始するために使用する Kafka コネクションをクリックします。
- Choose an action ページで、Subscribe アクションを選択し、指定したトピックからデータを受信します。
- Topic Name フィールドで、下向きの矢印をクリックしてトピックのリストを表示し、サブスクライブするトピックをクリックします。
- Next をクリックして、アクションの出力タイプを指定します。
データタイプの認識が必要がない場合は、Select Type フィールドに Type specification not required を使用し、Next をクリックします。残りの手順を行う必要はありません。
これ以外の場合は、Select Type フィールドをクリックし、以下の 1 つをスキーマタイプとして選択します。
-
JSON schema は JSON データの構造を記述するドキュメントです。ドキュメントのメディアタイプは
application/schema+jsonです。 -
JSON instance は JSON データが含まれるドキュメントです。ドキュメントのメディアタイプは
application/jsonです。 -
XML schema は XML データの構造を記述するドキュメントです。このドキュメントのファイル拡張子は
.xsdです。 -
XML instance は XML データが含まれるドキュメントです。このドキュメントのファイル拡張子は
.xmlです。
-
JSON schema は JSON データの構造を記述するドキュメントです。ドキュメントのメディアタイプは
-
Definition 入力ボックスに、選択したスキーマタイプに準拠する定義を貼り付けます。たとえば、JSON schema を選択した場合は、メディアタイプが
application/schema+jsonの JSON スキーマファイルの内容を貼り付けます。 Data Type Name フィールドにはデータタイプに選択した名前を入力します。たとえば、ベンダーの JSON スキーマを指定する場合は、
Vendorをデータタイプ名として指定することができます。このタイプを指定するコネクションを使用するインテグレーションを作成または編集するときにこのデータタイプ名が表示されます。Fuse Online では、インテグレーションビジュアライゼーションとデータマッパーにタイプ名が表示されます。
- Data Type Description フィールドには、このタイプを区別するのに便利な情報を提供します。データマッパーでこのタイプを処理するステップにカーソルを合わせると、この説明が表示されます。
- Next をクリックします。
結果
インテグレーションに最初のコネクションが追加され、Fuse Online は最後のコネクションを選択するよう要求します。
12.3. Kafka ブローカーへのデータの公開 リンクのコピーリンクがクリップボードにコピーされました!
インテグレーションでは、フローの途中で Kafka ブローカーにデータを公開したり、シンプルなインテグレーションを終了するためにデータを公開したりすることができます。これには、Kafka コネクションをフローの途中に追加するか、シンプルなインテグレーションの最後のコネクションとして追加します。
前提条件
- Kafka ブローカーへのコネクションが作成されている必要があります。
- フローを作成または編集することになり、Fuse Online でインテグレーションに追加するよう要求されます。または、Fuse Online で最後のコネクションを選択するよう要求されます。
手順
- Add to Integration ページで、Kafka コネクションの追加先のプラス記号をクリックします。シンプルなインテグレーションの最後のコネクションを追加する場合は、このステップをスキップします。
- メッセージの公開に使用する Kafka コネクションをクリックします。
- Choose an action ページで Publish を選択します。
- Topic Name フィールドで、下向きの矢印をクリックしてトピックのリストを表示し、公開するトピックをクリックします。
- Next をクリックしてアクションの入力タイプを指定します。
データタイプの認識が必要がない場合は、Select Type フィールドに Type specification not required を使用し、Next をクリックします。残りの手順を行う必要はありません。
これ以外の場合は、Select Type フィールドをクリックし、以下の 1 つをスキーマタイプとして選択します。
-
JSON schema は JSON データの構造を記述するドキュメントです。ドキュメントのメディアタイプは
application/schema+jsonです。 -
JSON instance は JSON データが含まれるドキュメントです。ドキュメントのメディアタイプは
application/jsonです。 -
XML schema は XML データの構造を記述するドキュメントです。このドキュメントのファイル拡張子は
.xsdです。 -
XML instance は XML データが含まれるドキュメントです。このドキュメントのファイル拡張子は
.xmlです。
-
JSON schema は JSON データの構造を記述するドキュメントです。ドキュメントのメディアタイプは
-
Definition 入力ボックスに、選択したスキーマタイプに準拠する定義を貼り付けます。たとえば、JSON schema を選択した場合は、メディアタイプが
application/schema+jsonの JSON スキーマファイルの内容を貼り付けます。 Data Type Name フィールドにはデータタイプに選択した名前を入力します。たとえば、ベンダーの JSON スキーマを指定する場合は、
Vendorをデータタイプ名として指定することができます。このタイプを指定するコネクションを使用するインテグレーションを作成または編集するときにこのデータタイプ名が表示されます。Fuse Online では、インテグレーションビジュアライゼーションとデータマッパーにタイプ名が表示されます。
- Data Type Description フィールドには、このタイプを区別するのに便利な情報を提供します。データマッパーでこのタイプを処理するステップにカーソルを合わせると、この説明が表示されます。
- Next をクリックします。
結果
インテグレーションビジュアライゼーションで追加した場所にコネクションが表示されます。