第15章 Kafka への接続


Apache Kafka はデータの取得およびパブリッシュに使用できる分散ストリーミングプラットフォームです。インテグレーションでは、指定した Kafka トピックからデータをサブスクライブでき、または指定した Kafka トピックにデータを公開することもできます。そのためには、Kafka へのコネクションを作成し、そのコネクションをインテグレーションフローに追加します。詳細は以下を参照してください。

15.1. Kafka ブローカーへのコネクションの作成

インテグレーションで、Kafka トピックからデータをサブスクライブしたり、Kafka トピックにデータを公開するには、Kafka へのコネクションを作成し、そのコネクションをインテグレーションに追加します。

前提条件

  • Red Hat マネージド Kafka の場合:

    • Red Hat OpenShift Streams for Apache Kafka の使用 で説明されているように、Kafka インスタンスを作成し、サービスアカウントを作成し、Kafka トピックを設定しました。サービスアカウントのクライアント ID とクライアントシークレットを知っている。
    • Kafka インスタンスの Bootstrap サーバー URI を知っている。Bootstrap サーバーの URI を取得するには:

      1. Red Hat Managed Services Web コンソールにログインします。
      2. Web コンソールの Kafka インスタンス ページで、接続先の関連する Kafka インスタンスについて、オプションアイコン (3 つの縦のドット) を選択し、Connection をクリックして Bootstrap サーバー URI を表示します。
  • PLAIN SSL メカニズムの場合、ユーザー名とパスワードはわかっています。
  • Transport Layer Security (TLS) を使用してデータを暗号化する場合は、Kafka ブローカーの PEM 形式の証明書テキストが必要です。通常、Kafka サーバー管理者からブローカー証明書テキストを取得します。

手順

  1. Fuse Online の左パネルで Connections をクリックし、利用可能なコネクションを表示します。
  2. Create Connection をクリックしてコネクターを表示します。
  3. Kafka Message Broker コネクターをクリックします。
  4. Kafka broker URIs フィールドで、このコネクションがアクセスするブローカーを入力または選択するか、Kafka ブローカー URI のコンマ区切りリストを入力します。各 URI は host:port 形式で指定する必要があります。

    Red Hat Managed Kafka の場合は、Managed Kafka インスタンスの Bootstrap サーバー URI を入力します。OpenShift クラスターに Strimzi Operator または AMQ Streams Operator をインストールした場合、URI は自動検出され、選択できることに注意してください。

  5. セキュリティープロトコル フィールドで、次のいずれかのオプションを選択します。

    • 転送中のデータを保護するためにデータを暗号化する場合は、TLS (Transport Layer Security) を選択します。ステップ 7 に進みます。
    • SASL で認証し、SSL (Red Hat Managed Kafka など) でデータを暗号化する場合は、SASL_SSL を選択します。
    • データを暗号化しない場合は、Plain を選択し、8 に進みます。
  6. Security Protocol として SASL_SSL を選択した場合は、認証情報を設定するために次の 2 つのオプションから選択する必要があります。

    • PLAINSASL メカニズム として使用する場合は、ユーザー名パスワード のフィールドを設定する必要があります。
    • OAUTHBEARERSASL メカニズム として使用する場合は、次のフィールドを設定する必要があります。

      • OAuth クライアント ID を持つ Username
      • OAUth クライアントシークレットを含む Password
      • SASL ログインコールバックハンドラークラス kafka-clients 2.5 バージョンまたは Strimzi プロジェクトの kafka-oauth-client の任意のコールバックハンドラークラスを使用できます。Red Hat Managed Kafka に接続するには、以下を使用できます。

        io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler

      • プロバイダーから提供された /oauth/token エンドポイント URI を使用した OAuth トークンエンドポイント URI
  7. 手順 5. で TLS を選択した場合、Broker certificate フィールドに Kafka ブローカーの PEM 形式の証明書テキストを貼り付けます。
  8. 任意手順:カスタムプロパティーを追加 をクリックして key:value のペアを指定し、Kafka のプロデューサーとコンシューマーのオプションを設定します。

    たとえば、新しいインテグレーションがトピックから古いメッセージを消費できるようにするには、Key フィールドに auto.offset.reset を、Value フィールドに earliest を入力し、auto.offset.reset の値をデフォルト値 (latest) から earliest に変更します。

    Kafka プロデューサー設定オプションの詳細は、https://kafka.apache.org/documentation/#producerconfigs を参照してください。

    Kafka コンシューマー設定オプションの詳細は、https://kafka.apache.org/documentation/#consumerconfigs を参照してください。

    注記: 設定属性を追加する場合、Fuse Online は次のステップで設定属性を検証プロセスの一部として含めません。

  9. Validate をクリックします。Fuse Online は即座にコネクションを検証しようとし、検証の成功または失敗を示すメッセージを表示します。検証に失敗した場合は、入力パラメーターを訂正し、再度検証を行います。
  10. 検証に成功した場合は Next をクリックします。
  11. Name フィールドに、このコネクションを別のコネクションと区別するために使用する名前を入力します。たとえば、Kafka Test と入力できます。
  12. Description は任意のフィールドで、このコネクションに関する便利な情報を入力します。
  13. Save をクリックし、作成したコネクションが利用可能になったことを確認します。上記の例の名前を入力した場合は、インテグレーションに追加できるコネクションとして Kafka Test が表示されます。
Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。 最新の更新を見る.

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.