第15章 Kafka への接続
Apache Kafka はデータの取得およびパブリッシュに使用できる分散ストリーミングプラットフォームです。インテグレーションでは、指定した Kafka トピックからデータをサブスクライブでき、または指定した Kafka トピックにデータを公開することもできます。そのためには、Kafka へのコネクションを作成し、そのコネクションをインテグレーションフローに追加します。詳細は以下を参照してください。
15.1. Kafka ブローカーへのコネクションの作成
インテグレーションで、Kafka トピックからデータをサブスクライブしたり、Kafka トピックにデータをパブリッシュするには、Kafka へのコネクションを作成し、そのコネクションをインテグレーションに追加します。
前提条件
- Transport Layer Security (TLS) を使用してデータを暗号化する場合は、Kafka ブローカーの PEM 形式の証明書テキストが必要です。通常、Kafka サーバー管理者からブローカー証明書テキストを取得します。
手順
- Fuse Online の左パネルで Connections をクリックし、利用可能なコネクションを表示します。
- Create Connection をクリックしてコネクターを表示します。
- Kafka Message Broker コネクターをクリックします。
- Kafka broker URIs フィールドで、このコネクションがアクセスするブローカーを選択するか、Kafka ブローカー URI のコンマ区切りリストを入力します。各 URI は host:port 形式で指定する必要があります。
Transport Protocol フィールドでは、以下のオプションのいずれかを選択します。
- 転送中のデータを保護するためにデータを暗号化する場合は、TLS (Transport Layer Security) を選択します。
- データを暗号化しない場合は、Plain を選択し、7. に進みます。
- 手順 5. で TLS を選択した場合、Broker certificate フィールドに Kafka ブローカーの PEM 形式の証明書テキストを貼り付けます。
任意手順:Add をクリックして
key:value
ペアを指定し、Kafka プロデューサーおよびコンシューマーオプションを設定します。たとえば、新しいインテグレーションがトピックから古いメッセージを消費できるようにするには、Key フィールドに auto.offset.reset を、Value フィールドに earliest を入力し、
auto.offset.reset
の値をデフォルト値 (latest
) からearliest
に変更します。Kafka プロデューサー設定オプションの詳細は、https://kafka.apache.org/documentation/#producerconfigs を参照してください。
Kafka コンシューマー設定オプションの詳細は、https://kafka.apache.org/documentation/#consumerconfigs を参照してください。
注記: 設定属性を追加する場合、Fuse Online は次のステップで設定属性を検証プロセスの一部として含めません。
- Validate をクリックします。Fuse Online は即座にコネクションを検証しようとし、検証の成功または失敗を示すメッセージを表示します。検証に失敗した場合は、入力パラメーターを訂正し、再度検証を行います。
- 検証に成功した場合は Next をクリックします。
- Name フィールドに、このコネクションを別のコネクションと区別するために使用する名前を入力します。たとえば、Kafka Test と入力できます。
- Description は任意のフィールドで、このコネクションに関する便利な情報を入力します。
- Save をクリックし、作成したコネクションが利用可能になったことを確認します。上記の例の名前を入力した場合は、インテグレーションに追加できるコネクションとして Kafka West が表示されます。