第15章 Kafka への接続
Apache Kafka はデータの取得およびパブリッシュに使用できる分散ストリーミングプラットフォームです。インテグレーションでは、指定した Kafka トピックからデータをサブスクライブでき、または指定した Kafka トピックにデータを公開することもできます。そのためには、Kafka へのコネクションを作成し、そのコネクションをインテグレーションフローに追加します。詳細は以下を参照してください。
15.1. Kafka ブローカーへのコネクションの作成
インテグレーションで、Kafka トピックからデータをサブスクライブしたり、Kafka トピックにデータを公開するには、Kafka へのコネクションを作成し、そのコネクションをインテグレーションに追加します。
前提条件
Red Hat マネージド Kafka の場合:
- Red Hat OpenShift Streams for Apache Kafka の使用 で説明されているように、Kafka インスタンスを作成し、サービスアカウントを作成し、Kafka トピックを設定しました。サービスアカウントのクライアント ID とクライアントシークレットを知っている。
Kafka インスタンスの Bootstrap サーバー URI を知っている。Bootstrap サーバーの URI を取得するには:
- Red Hat Managed Services Web コンソールにログインします。
- Web コンソールの Kafka インスタンス ページで、接続先の関連する Kafka インスタンスについて、オプションアイコン (3 つの縦のドット) を選択し、Connection をクリックして Bootstrap サーバー URI を表示します。
- PLAIN SSL メカニズムの場合、ユーザー名とパスワードはわかっています。
- Transport Layer Security (TLS) を使用してデータを暗号化する場合は、Kafka ブローカーの PEM 形式の証明書テキストが必要です。通常、Kafka サーバー管理者からブローカー証明書テキストを取得します。
手順
- Fuse Online の左パネルで Connections をクリックし、利用可能なコネクションを表示します。
- Create Connection をクリックしてコネクターを表示します。
- Kafka Message Broker コネクターをクリックします。
Kafka broker URIs フィールドで、このコネクションがアクセスするブローカーを入力または選択するか、Kafka ブローカー URI のコンマ区切りリストを入力します。各 URI は
host:port
形式で指定する必要があります。Red Hat Managed Kafka の場合は、Managed Kafka インスタンスの Bootstrap サーバー URI を入力します。OpenShift クラスターに Strimzi Operator または AMQ Streams Operator をインストールした場合、URI は自動検出され、選択できることに注意してください。
セキュリティープロトコル フィールドで、次のいずれかのオプションを選択します。
- 転送中のデータを保護するためにデータを暗号化する場合は、TLS (Transport Layer Security) を選択します。ステップ 7 に進みます。
- SASL で認証し、SSL (Red Hat Managed Kafka など) でデータを暗号化する場合は、SASL_SSL を選択します。
- データを暗号化しない場合は、Plain を選択し、8 に進みます。
Security Protocol として SASL_SSL を選択した場合は、認証情報を設定するために次の 2 つのオプションから選択する必要があります。
- PLAIN を SASL メカニズム として使用する場合は、ユーザー名 と パスワード のフィールドを設定する必要があります。
OAUTHBEARER を SASL メカニズム として使用する場合は、次のフィールドを設定する必要があります。
- OAuth クライアント ID を持つ Username。
- OAUth クライアントシークレットを含む Password。
SASL ログインコールバックハンドラークラス kafka-clients 2.5 バージョンまたは Strimzi プロジェクトの kafka-oauth-client の任意のコールバックハンドラークラスを使用できます。Red Hat Managed Kafka に接続するには、以下を使用できます。
io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
-
プロバイダーから提供された
/oauth/token
エンドポイント URI を使用した OAuth トークンエンドポイント URI。
- 手順 5. で TLS を選択した場合、Broker certificate フィールドに Kafka ブローカーの PEM 形式の証明書テキストを貼り付けます。
任意手順:カスタムプロパティーを追加 をクリックして
key:value
のペアを指定し、Kafka のプロデューサーとコンシューマーのオプションを設定します。たとえば、新しいインテグレーションがトピックから古いメッセージを消費できるようにするには、Key フィールドに auto.offset.reset を、Value フィールドに earliest を入力し、
auto.offset.reset
の値をデフォルト値 (latest
) からearliest
に変更します。Kafka プロデューサー設定オプションの詳細は、https://kafka.apache.org/documentation/#producerconfigs を参照してください。
Kafka コンシューマー設定オプションの詳細は、https://kafka.apache.org/documentation/#consumerconfigs を参照してください。
注記: 設定属性を追加する場合、Fuse Online は次のステップで設定属性を検証プロセスの一部として含めません。
- Validate をクリックします。Fuse Online は即座にコネクションを検証しようとし、検証の成功または失敗を示すメッセージを表示します。検証に失敗した場合は、入力パラメーターを訂正し、再度検証を行います。
- 検証に成功した場合は Next をクリックします。
- Name フィールドに、このコネクションを別のコネクションと区別するために使用する名前を入力します。たとえば、Kafka Test と入力できます。
- Description は任意のフィールドで、このコネクションに関する便利な情報を入力します。
- Save をクリックし、作成したコネクションが利用可能になったことを確認します。上記の例の名前を入力した場合は、インテグレーションに追加できるコネクションとして Kafka Test が表示されます。