検索

AMQ Streams on OpenShift の概要

download PDF
Red Hat AMQ Streams 2.1

OpenShift Container Platform で AMQ Streams2.1 の特徴と機能を発見する

概要

Kafka コンポーネントの機能と、AMQ Streams を使用して OpenShift に Kafka をデプロイおよび管理する方法について説明します。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。まずは、マスター (master)、スレーブ (slave)、ブラックリスト (blacklist)、ホワイトリスト (whitelist) の 4 つの用語の置き換えから始めます。これは大規模な取り組みであるため、これらの変更は今後の複数のリリースで段階的に実施されます。詳細は、Red Hat CTO である Chris Wright のメッセージをご覧ください。

第1章 主な特長

AMQ Streams は、OpenShift クラスターで Apache Kafka を実行するプロセスを簡素化します。

本書は、AMQ Streams を理解するためのスタート地点となるように作成されました。また、本書では AMQ Streams の中心となる Kafka の主要な概念をいくつか紹介し、Kafka コンポーネントの目的を簡単に説明します。Kafka のセキュリティーや監視オプションなど、設定ポイントを概説します。AMQ Streams のディストリビューションでは、Kafka クラスターのデプロイおよび管理ファイルと、デプロイメントの設定およびモニタリングのサンプルファイル を提供します。

一般的な Kafka デプロイメントと、Kafka のデプロイおよび管理に使用するツールについて説明します。

1.1. Kafka の機能

Kafka の基盤のデータストリーム処理機能とコンポーネントアーキテクチャーによって以下が提供されます。

  • スループットが非常に高く、レイテンシーが低い状態でデータを共有するマイクロサービスおよびその他のアプリケーション。
  • メッセージの順序の保証。
  • アプリケーションの状態を再構築するためにデータストレージからメッセージを巻き戻し/再生。
  • キーバリューログの使用時に古いレコードを削除するメッセージ圧縮。
  • クラスター設定での水平スケーラビリティー。
  • 耐障害性を制御するデータのレプリケーション。
  • 即時アクセス用の大量データの保持

1.2. Kafka のユースケース

Kafka の機能は、以下に適しています。

  • イベント駆動型のアーキテクチャー。
  • アプリケーションの状態に加えられた変更をイベントのログとしてキャプチャーするイベントソーシング。
  • メッセージのブローカー
  • Web サイトアクティビティーの追跡
  • メトリクスによる運用上のモニタリング
  • ログの収集および集計
  • 分散システムのコミットログ
  • アプリケーションがリアルタイムでデータに対応できるようにするストリーム処理。

1.3. AMQ Streams による Kafka のサポート

AMQ Streams は、Kafka を OpenShift で実行するためのコンテナーイメージおよび Operator を提供します。AMQ Streams Operator は、AMQ Streams の実行に必要です。AMQ Streams で提供される Operator は、Kafka を効果的に管理するために、専門的なオペレーション情報で目的に合うよう構築されています。

Operator は以下のプロセスを単純化します。

  • Kafka クラスターのデプロイおよび実行。
  • Kafka コンポーネントのデプロイおよび実行。
  • Kafka へアクセスするための設定。
  • Kafka へのアクセスをセキュア化。
  • Kafka のアップグレード。
  • ブローカーの管理。
  • トピックの作成および管理。
  • ユーザーの作成および管理。

第2章 AMQ Streams での Kafka のデプロイメント

Apache Kafka コンポーネントは、AMQ Streams ディストリビューションを使用して OpenShift にデプロイするために提供されます。Kafka コンポーネントは通常、クラスターとして実行され、可用性を確保します。

Kafka コンポーネントが組み込まれた通常のデプロイメントには以下が含まれます。

  • ブローカーノードの Kafka クラスター
  • レプリケートされた ZooKeeper インスタンスの zookeeper クラスター
  • 外部データ接続用の Kafka Connect クラスター
  • セカンダリークラスターで Kafka クラスターをミラーリングする Kafka MirrorMaker クラスター
  • 監視用に追加のKafka メトリクスデータを抽出する Kafka Exporter
  • Kafka クラスターに対して HTTP ベースの要求を行う Kafka Bridge

少なくとも Kafka および ZooKeeper は必要ですが、上記のコンポーネントがすべて必須なわけではありません。MirrorMaker や Kafka Connect など、一部のコンポーネントでは Kafka なしでデプロイできます。

2.1. Kafka コンポーネントのアーキテクチャー

Kafka ブローカーのクラスターがメッセージの配信を処理します。

ブローカーは、設定データの保存やクラスターの調整に Apache ZooKeeper を使用します。Apache Kafka の実行前に、Apache ZooKeeper クラスターを用意する必要があります。

他の Kafka コンポーネントはそれぞれ Kafka クラスターと対話し、特定のロールを実行します。

Kafka コンポーネントの操作

Data flows between several Kafka components and the Kafka cluster. See the component descriptions after this image.

Apache ZooKeeper
Apache ZooKeeper はクラスター調整サービスを提供し、ブローカーおよびコンシューマーのステータスを保存して追跡するので、Kafka のコアとなる依存関係です。ZooKeeper は、コントローラーの選出にも使用されます。
Kafka Connect

Kafka Connect は、Connector プラグインを使用して Kafka ブローカーと他のシステムの間でデータをストリーミングする統合ツールです。Kafka Connect は、Kafka と、データベースなどの外部データソースまたはターゲットと統合するためのフレームワークを提供し、コネクターを使用してデータをインポートまたはエクスポートします。コネクターは、必要な接続設定を提供するプラグインです。

  • ソース コネクターは、外部データを Kafka にプッシュします。
  • sink コネクターは Kafka からデータを抽出します。

    外部データは適切な形式に変換されます。

    データコネクションに必要なコネクタープラグインでコンテナーイメージを自動的にビルドする build 設定で、Kafka Connect をデプロイできます。

Kafka MirrorMaker

Kafka MirrorMaker は、データセンター内またはデータセンター全体の 2 台の Kafka クラスター間でデータをレプリケーションします。

MirrorMaker はソースの Kafka クラスターからメッセージを取得して、ターゲットの Kafka クラスターに書き込みます。

Kafka Bridge
Kafka Bridge には、HTTP ベースのクライアントと Kafka クラスターを統合する API が含まれています。
Kafka Exporter
Kafka Exporter は、Prometheus メトリクス (主にオフセット、コンシューマーグループ、コンシューマーラグおよびトピックに関連するデータ) として分析用にデータを抽出します。コンシューマーラグとは、パーティションに最後に書き込まれたメッセージと、そのパーティションからコンシューマーが現在取得中のメッセージとの間の遅延を指します。

第3章 Kafka の概要

Apache Kafka は、耐障害性のリアルタイムデータフィードを実現する、オープンソースの分散型 publish/subscribe メッセージングシステムです。

Apache Kafka の詳細については、Apache Kafka のドキュメント を参照してください。

3.1. Kafka がメッセージブローカーとしてどのように機能するか

AMQ Streams の使用経験を最大化するには、Kafka がメッセージブローカーとしてどのように動作するかを理解する必要があります。

Kafka クラスターは、複数のブローカーで構成されます。ブローカーには、データの受信およびストアに関するトピックが含まれます。トピックはパーティションに分割され、そこにデータが書き込まれます。パーティションは、耐障害性を確保するため、複数のトピックでレプリケーションされます。

Kafka ブローカーおよびトピック

Kafka brokers and topics inside a Kafka cluster showing the partition leader of each topic

ブローカー
サーバーまたはノードと呼ばれるブローカーは、ストレージとメッセージの受け渡しをオーケストレーションします。
トピック
トピックは、データの保存先を提供します。各トピックは、複数のパーティションに分割されます。
クラスター
Broker インスタンスのグループ
パーティション
トピックパーティションの数は、トピックの PartitionCount (パーティション数) で定義されます。
パーティションリーダー
パーティションリーダーは、トピックの全プロデューサー要求を処理します。
パーティションフォロワー

パーティションフォロワーは、パーティションリーダーのパーティションデータをレプリケーションし、オプションでコンシューマー要求を処理します。

トピックでは、ReplicationFactor (レプリケーション係数) を使用して、クラスター内のパーティションごとのレプリカ数を設定します。トピックは、最低でもパーティション 1 つで構成されます。

in-sync レプリカ (ISR) はリーダーと同数のメッセージを保持しています。設定では、メッセージを生成できるように 同期する 必要のあるレプリカ数を定義し、メッセージがレプリカパーティションに正常にコピーされない限りに、コミットされないようにします。こうすることで、リーダーに障害が発生しても、メッセージは失われません。

Kafka ブローカーおよびトピック 図のレプリケーションされたトピック内で、各番号付きのパーティションにはリーダー 1 つとフォロワーが 2 つあることが分かります。

3.2. プロデューサーおよびコンシューマー

プロデューサーおよびコンシューマーは、ブローカー経由でメッセージ (パブリッシュしてサブスクライブ) を送受信します。メッセージは、キー (オプション) と、メッセージデータ、ヘッダー、および関連するメタデータが含まれる で構成されます。キーは、メッセージの件名またはメッセージのプロパティーの特定に使用されます。メッセージはバッチで配信されます。バッチやレコードには、レコードのタイムスタンプやオフセットの位置など、クライアントクライアントのフィルタリングやルーティングに役立つ情報を提供するヘッダーとメタデータが含まれます。

プロデューサーおよびコンシューマー

A producer sends messages through a broker to a topic containing three partitions. Three consumers in a consumer group read the messages from the partitions

プロデューサー
プロデューサーは、メッセージをブローカートピックに送信し、パーティションの終端オフセットに書き込みます。メッセージは、ラウンドロビンベースでプロデューサーにより複数のパーティションに書き込まれるか、メッセージキーベースで特定のパーティションに書き込まれます。
コンシューマー
コンシューマーはトピックをサブスクライブし、トピック、パーティション、およびオフセットをもとにメッセージを読み取ります。
コンシューマーグループ
コンシューマーグループは、特定のトピックから複数のプロデューサーによって生成される、典型的に大量のデータストリームを共有するのに使用します。コンシューマーは group.id でグループ化され、メッセージをメンバー全体に分散できます。グループ内のコンシューマーは、同じパーティションからのデータは読み取りませんが、1 つ以上のパーティションからデータを受信できます。
オフセット

オフセットは、パーティション内のメッセージの位置を表します。特定のパーティションの各メッセージには一意のオフセットがあり、パーティション内のコンシューマーの位置を特定して、消費したレコード数を追跡するのに役立ちます。

コミットされたオフセットは、オフセットコミットログに書き込まれます。_consumer_offsets トピックには、コンシューマーグループをもとに、コミットされたオフセット、最後のオフセットと次のオフセットの位置に関する情報が保存されます。

データの生成および使用

A producer sends a message to a broker topic; the message is written to the end offset (7). A consumer reads messages from offset 5

第4章 Kafka Connect について

Kafka Connect は、Kafka ブローカーと他のシステムの間でデータをストリーミングする統合ツールです。もう 1 つのシステムは通常、データベースなどの外部データソースまたはターゲットです。

Kafka Connect はプラグインアーキテクチャーを使用しています。プラグインは他のシステムへの接続を可能にし、データを操作するための追加の構成を提供します。プラグインには、コネクター や、データコンバータや変換などの他のコンポーネントが含まれます。コネクターは、特定のタイプの外部システムで動作します。各コネクターは、その設定のスキーマを定義します。設定を Kafka Connect に提供して、Kafka Connect 内に コネクターインスタンス を作成します。次に、コネクターインスタンスは、システム間でデータを移動するための一連のタスクを定義します。

AMQ Streams は、分散モード で Kafka Connect を操作し、データストリーミングタスクを 1 つ以上のワーカー Pod に分散します。Kafka Connect クラスターは、ワーカー Pod のグループで構成されます。各コネクターは、1 人のワーカーでインスタンス化されます。各コネクターは、ワーカーのグループ全体に分散される 1 つ以上のタスクで構成されます。ワーカー間での分散により、拡張性の高いパイプラインが可能になります。

ワーカーは、データをある形式からソースシステムまたはターゲットシステムに適した別の形式に変換します。コネクターインスタンスの構成によっては、ワーカーが変換 (単一メッセージ変換 (SMT) とも呼ばれます) を適用する場合もあります。変換は、変換される前に、特定のデータのフィルターリングなどのメッセージを調整します。Kafka Connect にはいくつかの組み込みの変換がありますが、必要に応じてプラグインによって他の変換を提供できます。

4.1. Kafka Connect がデータをストリーミングする方法

Kafka Connect は、コネクターインスタンスを使用して他のシステムと統合し、データをストリーミングします。

Kafka Connect は、起動時に既存のコネクターインスタンスをロードし、データストリーミングタスクとコネクター構成をワーカー Pod 全体に分散します。ワーカーは、コネクターインスタンスのタスクを実行します。各ワーカーは個別の Pod として実行され、Kafka Connect クラスターのフォールトトレラント性を高めます。ワーカーよりも多くのタスクがある場合は、ワーカーには複数のタスクが割り当てられます。ワーカーに障害が発生した場合、そのタスクは Kafka Connect クラスター内のアクティブなワーカーに自動的に割り当てられます。

ストリーミングデータで使用される主な Kafka Connect コンポーネントは次のとおりです。

  • タスクを作成するためのコネクター
  • データを移動するタスク
  • タスクを実行するワーカー
  • データを操作するための変換
  • データを変換するコンバーター

4.1.1. コネクター

コネクターは、次のいずれかのタイプにすることができます。

  • データを Kafka にプッシュするソースコネクター
  • Kafka からデータを抽出するシンクコネクター

プラグインは、Kafka Connect がコネクターインスタンスを実行するための実装を提供します。コネクターインスタンスは、Kafka との間でデータを転送するために必要なタスクを作成します。Kafka Connect ランタイムは、必要な作業をワーカー Pod 間で分割するタスクを調整します。

MirrorMaker 2.0 は、Kafka Connect フレームワークも使用します。この場合、外部データシステムは別の Kafka クラスターです。MirrorMaker 2.0 専用のコネクターは、ソースとターゲットの Kafka クラスター間のデータレプリケーションを管理します。

注記

MirrorMaker 2.0 コネクターに加えて、Kafka は例として 2 つの組み込みコネクターを提供します。

  • File Stream Source Connector は、ワーカーのファイルシステム上のファイルから Kafka にデータをストリーミングし、入力ファイルを読み取り、各行を特定の Kafka トピックに送信します。
  • FileStreamSinkConnector は、Kafka からワーカーのファイルシステムにデータをストリーミングし、Kafka トピックからメッセージを読み取り、出力ファイルにそれぞれの行を書き込みます。

次のソースコネクターの図は、外部データシステムからレコードをストリーミングするソースコネクターのプロセスフローを示しています。Kafka Connect クラスターは、ソースコネクターとシンクコネクターを同時に操作する場合があります。ワーカーはクラスター内で分散モードで実行されています。ワーカーは、複数のコネクターインスタンスに対して 1 つ以上のタスクを実行できます。

Kafka へのソースコネクターストリーミングデータ

Kafka Connect source connector worker interaction in distributed mode

  1. プラグインは、ソースコネクターの実装アーティファクトを提供します
  2. 1 人のワーカーがソースコネクターインスタンスを開始します
  3. ソースコネクターは、データをストリーミングするタスクを作成します
  4. タスクは並行して実行され、外部データシステムをポーリングしてレコードを返します
  5. トランスフォームは、レコードのフィルターリングや再ラベル付けなど、レコードを調整します
  6. コンバーターは、レコードを Kafka に適した形式に変換します
  7. ソースコネクターは、KafkaConnectors または Kafka Connect API を使用して管理されます

次のシンクコネクターの図は、Kafka から外部データシステムにデータをストリーミングするときのプロセスフローを示しています。

Kafka からのシンクコネクターストリーミングデータ

Kafka Connect sink connector worker interaction in distributed mode

  1. プラグインは、シンクコネクターの実装アーティファクトを提供します
  2. 1 人のワーカーがシンクコネクターインスタンスを開始します
  3. シンクコネクターは、データをストリーミングするタスクを作成します
  4. タスクは並行して実行され、Kafka をポーリングしてレコードを返します
  5. Converter は、レコードを外部データシステムに適した形式に変換します
  6. トランスフォームは、レコードのフィルターリングや再ラベル付けなど、レコードを調整します
  7. シンクコネクターは、KafkaConnectors または Kafka Connect API を使用して管理されます

4.1.2. タスク

Kafka Connect ランタイムによって調整されたデータ転送は、並行して実行されるタスクに分割されます。タスクは、コネクターインスタンスによって提供される構成を使用して開始されます。Kafka Connect は、タスク構成をワーカーに配布します。ワーカーは、タスクをインスタンス化して実行します。

  • ソースコネクタータスクは外部データシステムをポーリングし、ワーカーが Kafka ブローカーに送信するレコードのリストを返します。
  • シンクコネクタータスクは、外部データシステムに書き込むためにワーカーから Kafka レコードを受信します。

シンクコネクターの場合、作成されるタスクの数は、消費されるパーティションの数に関連します。ソースコネクターの場合、ソースデータの分割方法はコネクターによって定義されます。コネクター設定で tasksMax を設定することにより、並行して実行できるタスクの最大数を制御できます。コネクターは、最大設定よりも少ないタスクを作成する可能性があります。たとえば、ソースデータをその数のパーティションに分割できない場合、コネクターは作成するタスクが少なくなる可能性があります。

注記

Kafka Connect のコンテキストでは、partition は、外部システムのトピックパーティションまたは shard of data を意味する場合があります。

4.1.3. ワーカー

ワーカーは、Kafka Connect クラスターにデプロイされたコネクター設定を採用します。構成は、Kafka Connect によって使用される内部 Kafka トピックに保存されます。ワーカーは、コネクターとそのタスクも実行します。

Kafka Connect クラスターには、同じ group.id を持つワーカーのグループが含まれています。ID は、Kafka 内のクラスターを識別します。ID は、KafkaConnect リソースを介してワーカー設定で割り当てられます。ワーカー設定では、Kafka Connect の内部トピックの名前も指定されます。トピックには、コネクター設定、オフセット、およびステータス情報が格納されます。これらのトピックのグループ ID と名前も、Kafka Connect クラスターに固有である必要があります。

ワーカーには、1 つ以上のコネクターインスタンスとタスクが割り当てられます。Kafka Connect を展開するための分散型アプローチは、フォールトトレラントでスケーラブルです。ワーカー Pod に障害が発生した場合、実行中のタスクはアクティブなワーカーに再割り当てされます。Kafka Connect リソースの replicas プロパティーを設定することで、ワーカー Pod のグループに追加できます。

4.1.4. 変換

Kafka Connect は、外部データを変換します。シングルメッセージは、変更メッセージをターゲットの宛先に適した形式に変換します。たとえば、変換によってフィールドが挿入または名前変更される場合があります。変換では、データをフィルターリングしてルーティングすることもできます。プラグインには、ワーカーが 1 つ以上の変換を実行するために必要な実装が含まれています。

  • ソースコネクターは、データを Kafka でサポートされている形式に変換する前に変換を適用します。
  • シンクコネクターは、データを外部データシステムに適した形式に変換した後に変換を適用します。

トランスフォームは、コネクタープラグインに含めるために JAR ファイルにパッケージ化された Java クラスファイルのセットで構成されます。Kafka Connect は一連の標準変換を提供しますが、独自の変換を作成することもできます。

4.1.5. コンバーター

ワーカーはデータを受信すると、コンバーターを使用してデータを適切な形式に変換します。KafkaConnect リソースのワーカー config でワーカーのコンバーターを指定します。

Kafka Connect は、JSON や Avro などの Kafka でサポートされている形式との間でデータを変換できます。また、データを構造化するためのスキーマもサポートしています。データを構造化形式に変換しない場合は、スキーマを有効にする必要はありません。

注記

特定のコネクターのコンバータを指定して、すべてのワーカーに適用される一般的な Kafka Connect ワーカー設定をオーバーライドすることもできます。

第5章 Kafka Bridge インターフェース

Kafka Bridge では、HTTP ベースのクライアントと Kafka クラスターとの対話を可能にする RESTful インターフェースが提供されます。  また、クライアントアプリケーションによる Kafka プロトコルの変換は必要なく、Web API コネクションの利点が AMQ Streams に提供されます。

API には consumerstopics の 2 つの主なリソースがあります。これらのリソースは、Kafka クラスターでコンシューマーおよびプロデューサーと対話するためにエンドポイント経由で公開され、アクセスが可能になります。リソースと関係があるのは Kafka ブリッジのみで、Kafka に直接接続されたコンシューマーやプロデューサーとは関係はありません。

5.1. HTTP リクエスト

Kafka Bridge は、以下の方法で Kafka クラスターへの HTTP リクエストをサポートします。

  • トピックにメッセージを送信する。
  • トピックからメッセージを取得する。
  • トピックのパーティションリストを取得する。
  • コンシューマーを作成および削除する。
  • コンシューマーをトピックにサブスクライブし、このようなトピックからメッセージを受信できるようにする。
  • コンシューマーがサブスクライブしているトピックの一覧を取得する。
  • トピックからコンシューマーのサブスクライブを解除する。
  • パーティションをコンシューマーに割り当てる。
  • コンシューマーオフセットの一覧をコミットする。
  • パーティションで検索して、コンシューマーが最初または最後のオフセットの位置、または指定のオフセットの位置からメッセージを受信できるようにする。

上記の方法で、JSON 応答と HTTP 応答コードのエラー処理を行います。メッセージは JSON またはバイナリー形式で送信できます。

クライアントは、ネイティブの Kafka プロトコルを使用する必要なくメッセージを生成して使用できます。

関連情報

5.2. Kafka Bridge でサポートされるクライアント

Kafka Bridge を使用して、内部および外部の HTTP クライアントアプリケーションの両方を Kafka クラスターに統合できます。

内部クライアント
内部クライアントとは、Kafka Bridge 自体と同じ OpenShift クラスターで実行されるコンテナーベースの HTTP クライアントのことです。内部クライアントは、ホストの Kafka Bridge および KafkaBridge のカスタムリソースで定義されたポートにアクセスできます。
外部クライアント
外部クライアントとは、Kafka Bridge がデプロイおよび実行される OpenShift クラスター外部で実行される HTTP クライアントのことです。外部クライアントは、OpenShift Route、ロードバランサーサービス、または Ingress を使用して Kafka Bridge にアクセスできます。

HTTP 内部および外部クライアントの統合

Internal and external HTTP producers and consumers exchange data with the Kafka brokers through the Kafka Bridge

第6章 AMQ Streams の Operator

AMQ Streams では Operator を使用して Kafka をサポートし、Kafka のコンポーネントおよび依存関係を OpenShift にデプロイして管理します。

Operator は、OpenShift アプリケーションのパッケージ化、デプロイメント、および管理を行う方法です。AMQ Streams Operator は OpenShift の機能を拡張し、Kafka デプロイメントに関連する共通タスクや複雑なタスクを自動化します。Kafka 操作の情報をコードに実装することで、Kafka の管理タスクは簡素化され、必要な手動の作業が少なくなります。

Operator

AMQ Streams は、OpenShift クラスター内で実行中の Kafka クラスターを管理するための Operator を提供します。

Cluster Operator
Apache Kafka クラスター、Kafka Connect、Kafka MirrorMaker、Kafka Bridge、Kafka Exporter、Cruise Control、および Entity Operator をデプロイおよび管理します。
Entitiy Operator
Topic Operator および User Operator を構成します。
Topic Operator
Kafka トピックを管理します。
User Operator
Kafka ユーザーを管理します。

Cluster Operator は、Kafka クラスターと同時に、Topic Operator および User Operator を Entity Operator 設定の一部としてデプロイできます。

AMQ Streams アーキテクチャー内の Operator

Operators within the AMQ Streams architecture

6.1. Cluster Operator

AMQ Streams では、Cluster Operator を使用して以下のクラスターをデプロイおよび管理します。

  • Kafka (ZooKeeper、Entity Operator、Kafka Exporter、Cruise Control を含む)
  • Kafka Connect
  • Kafka MirrorMaker
  • Kafka Bridge

クラスターのデプロイメントにはカスタムリソースが使用されます。

たとえば、以下のように Kafka クラスターをデプロイします。

  • クラスター設定のある Kafka リソースが OpenShift クラスター内で作成されます。
  • Kafka リソースに宣言された内容を基にして、該当する Kafka クラスターが Cluster Operator によってデプロイされます。

Cluster Operator で以下もデプロイできます (Kafka リソースの設定より)。

  • KafkaTopic カスタムリソースより Operator スタイルのトピック管理を提供する Topic Operator
  • KafkaUser カスタムリソースより Operator スタイルのユーザー管理を提供する User Operator

デプロイメントの Entity Operator 内の Topic Operator および User Operator 関数。

AMQ Streams Drain Cleaner のデプロイメントで Cluster Operator を使用すると、Pod のエビクションに役立ちます。AMQ Streams Drain Cleaner をデプロイすることで、Cluster Operator を使用して OpenShift ではなく Kafka Pod を移動できます。AMQ Streams Drain Cleaner は、エビクトされる Pod に、ローリングアップデートのアノテーションを付けます。このアノテーションは、Cluster Operator にローリングアップデートを実行するように通知します。

Cluster Operator のアーキテクチャー例

The Cluster Operator creates and deploys Kafka and ZooKeeper clusters

6.2. Topic Operator

Topic Operator は、OpenShift リソースより Kafka クラスターのトピックを管理する方法を提供します。

Topic Operator のアーキテクチャー例

The Topic Operator manages topics for a Kafka cluster via KafkaTopic resources

Topic Operator の役割は、対応する Kafka トピックと同期して Kafka トピックを記述する KafkaTopic OpenShift リソースのセットを保持することです。

KafkaTopic とトピックの関係は次のとおりです。

  • KafkaTopic が作成されると、Topic Operator によってトピックが作成されます。
  • 削除されると、Topic Operator によってトピックが削除されます。
  • が変更されると、Topick Operator によってトピックが更新されます。

上記と逆になるトピックと KafkaTopic の関係は次のとおりです。

  • トピックが Kafka クラスター内で作成されると、Operator によって KafkaTopic が作成されます。
  • トピックが Kafka クラスターから削除されると、Operator によって KafkaTopic が削除されます。
  • トピックが Kafka クラスターで変更されると、Operator によって KafkaTopic が更新されます。

このため、KafkaTopic をアプリケーションのデプロイメントの一部として宣言でき、トピックの作成は Topic Operator によって行われます。アプリケーションは、必要なトピックからの作成または消費のみに対処する必要があります。

Topic Operator は、各トピックの情報を トピックストア で維持します。トピックストアは、Kafka トピックまたは OpenShift KafkaTopic カスタムリソースからの更新と継続的に同期されます。ローカルのインメモリートピックストアに適用される操作からの更新は、ディスク上のバックアップトピックストアに永続化されます。トピックが再設定されたり、別のブローカーに再割り当てされた場合、KafkaTopic は常に最新の状態になります。

6.3. User Operator

User Operator は、Kafka ユーザーが記述される KafkaUser リソースを監視して Kafka クラスターの Kafka ユーザーを管理し、Kafka ユーザーが Kafka クラスターで適切に設定されるようにします。

たとえば、KafkaUser とユーザーの関係は次のようになります。

  • KafkaUser が作成されると、User Operator によって記述されるユーザーが作成されます。
  • 削除されると、User Operator によって記述されるユーザーが削除されます。
  • 変更されると、User Operator によって記述されるユーザーが更新されます。

User Operator は Topic Operator とは異なり、Kafka クラスターからの変更は OpenShift リソースと同期されません。アプリケーションで直接 Kafka トピックを Kafka で作成することは可能ですが、ユーザーが User Operator と同時に直接 Kafka クラスターで管理されることは想定されません。

User Operator では、アプリケーションのデプロイメントの一部として KafkaUser リソースを宣言できます。ユーザーの認証および承認メカニズムを指定できます。たとえば、ユーザーがブローカーへのアクセスを独占しないようにするため、Kafka リソースの使用を制御する ユーザークォータ を設定することもできます。

ユーザーが作成されると、ユーザークレデンシャルが Secret に作成されます。アプリケーションはユーザーとそのクレデンシャルを使用して、認証やメッセージの生成または消費を行う必要があります。

User Operator は 認証のクレデンシャルを管理する他に、KafkaUser 宣言にユーザーのアクセス権限の記述を含めることで承認も管理します。

6.4. AMQ Streams operator のフィーチャーゲート

フィーチャーゲートを使用して、operator の一部の機能を有効または無効にすることができます。

フィーチャーゲートは Operator の設定で指定され、alpha、beta、または General Availability (GA) の 3 段階の成熟度があります。

詳細は「Feature gates」を参照してください。

第7章 Kafka の設定

AMQ Streams を使用した Kafka コンポーネントの OpenShift クラスターへのデプロイメントは、カスタムリソースの適用により高度な設定が可能です。カスタムリソースは、OpenShift リソースを拡張するために CRD (カスタムリソース定義、Custom Resource Definition) によって追加される API のインスタンスとして作成されます。

CRD は、OpenShift クラスターでカスタムリソースを記述するための設定手順として機能し、デプロイメントで使用する Kafka コンポーネントごとに AMQ Streams で提供されます。CRD およびカスタムリソースは YAML ファイルとして定義されます。YAML ファイルのサンプルは AMQ Streams ディストリビューションに同梱されています。

また、CRD を使用すると、CLI へのアクセスや設定検証などのネイティブ OpenShift 機能を AMQ Streams リソースで活用することもできます。

本項では、カスタムリソースを使用して Kafka のコンポーネントを設定する方法を見ていきます。まず、一般的な設定のポイント、次にコンポーネント固有の重要な設定に関する考慮事項について説明します。

AMQ Streams は、デプロイメント用の独自の Kafka コンポーネント設定を構築する際の開始点として役立つ 構成ファイルの例 を提供します。

7.1. カスタムリソース

CRD をインストールして新規カスタムリソースタイプをクラスターに追加した後に、その仕様に基づいてリソースのインスタンスを作成できます。

AMQ Streams コンポーネントのカスタムリソースには、specで定義される共通の設定プロパティーがあります。

Kafka トピックカスタムリソースからのこの抜粋では、apiVersion および kind プロパティーを使用して、関連付けられた CRD を識別します。Spec プロパティーは、トピックのパーティションおよびレプリカの数を定義する設定を示しています。

Kafka トピックカスタムリソース
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 1
  replicas: 1
  # ...

共通の設定、特定のコンポーネントに特有の設定など、他にも YAML 定義に組み込むことができる設定オプションが多数あります。

7.2. 共通の設定

複数のリソースに共通する設定オプションの一部が以下に記載されています。セキュリティー および メトリクスコレクション も採用できます (該当する場合)。

ブートストラップサーバー

ブートストラップサーバーは、以下の Kafka クラスターに対するホスト/ポート接続に使用されます。

  • Kafka Connect
  • Kafka Bridge
  • Kafka MirrorMaker プロデューサーおよびコンシューマー
CPU およびメモリーリソース

コンポーネントの CPU およびメモリーリソースを要求します。制限は、特定のコンテナーが消費できる最大リソースを指定します。

Topic Operator および User Operator のリソース要求および制限は Kafka リソースに設定されます。

ロギング
コンポーネントのロギングレベルを定義します。ロギングは直接 (インライン) または外部で Config Map を使用して定義できます。
ヘルスチェック
ヘルスチェックの設定では、liveness および readiness プローブが導入され、コンテナーを再起動するタイミング (liveliness) と、コンテナーがトラフィック (readiness) を受け入れるタイミングが分かります。
JVM オプション
JVM オプションでは、メモリー割り当ての最大と最小を指定し、実行するプラットフォームに応じてコンポーネントのパフォーマンスを最適化します。
Pod のスケジューリング
Pod スケジュールは アフィニティー/非アフィニティールール を使用して、どのような状況で Pod がノードにスケジューリングされるかを決定します。
共通設定の YAML 例
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-cluster
spec:
  # ...
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  resources:
    requests:
      cpu: 12
      memory: 64Gi
    limits:
      cpu: 12
      memory: 64Gi
  logging:
    type: inline
    loggers:
      connect.root.logger.level: "INFO"
  readinessProbe:
    initialDelaySeconds: 15
    timeoutSeconds: 5
  livenessProbe:
    initialDelaySeconds: 15
    timeoutSeconds: 5
  jvmOptions:
    "-Xmx": "2g"
    "-Xms": "2g"
  template:
    pod:
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
              - matchExpressions:
                  - key: node-type
                    operator: In
                    values:
                      - fast-network
  # ...

7.3. Kafka クラスターの設定

Kafka クラスターは、1 つまたは複数のブローカーで構成されます。プロデューサーおよびコンシューマーがブローカー内のトピックにアクセスできるようにするには、Kafka 設定でクラスターへのデータの保存方法、およびデータへのアクセス方法を定義する必要があります。ラック 全体で複数のブローカーノードを使用して Kafka クラスターを実行するように設定できます。

ストレージ

Kafka および ZooKeeper は、ディスクにデータを格納します。

AMQ Streams は、StorageClass でプロビジョニングされるブロックストレージが必要です。ストレージ用のファイルシステム形式は XFS または EXT4 である必要があります。3 種類のデータストレージがサポートされます。

一時データストレージ (開発用のみで推奨されます)
一時ストレージは、インスタンスの有効期間についてのデータを格納します。インスタンスを再起動すると、データは失われます。
永続ストレージ
永続ストレージは、インスタンスのライフサイクルとは関係なく長期のデータストレージに関連付けられます。
JBOD (Just a Bunch of Disks、Kafka のみに適しています)
JBOD では、複数のディスクを使用して各ブローカーにコミットログを保存できます。

既存の Kafka クラスターが使用するディスク容量は、増やすことができます (インフラストラクチャーでサポートされる場合)。

リスナー

リスナーは、クライアントが Kafka クラスターに接続する方法を設定します。

Kafka クラスター内の各リスナーに一意の名前とポートを指定することで、複数のリスナーを設定できます。

以下のタイプのリスナーがサポートされます。

  • OpenShift 内でのアクセスに使用する 内部リスナー
  • OpenShift 外からアクセスするときに使用する 外部リスナー

リスナーの TLS 暗号化を有効にし、認証 を設定できます。

内部リスナーは、内部 タイプを使用して指定されます。

外部リスナーは、外部用 type を指定して Kafka を公開します。

  • OpenShift ルートおよびデフォルトの HAProxy ルーターを使用する route
  • ロードバランサーサービスを使用する loadbalancer
  • OpenShift ノードのポートを使用する nodeport
  • OpenShift IngressNGINX Ingress Controller for Kubernetes を使用する ingress

トークンベースの認証に OAuth 2.0 を使用している場合は、リスナーが承認サーバーを使用するように設定できます。

ラックアウェアネス
ラックアウェアネス (rack awareness) は、Kafka ブローカーの Pod とトピックレプリカを racks 全体に分散する設定機能です。ラックとは、データセンターまたは、データセンター内のラック、アベイラビリティーゾーンを表します。
Kafka 設定の YAML 例
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
    listeners:
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: tls
      - name: external1
        port: 9094
        type: route
        tls: true
        authentication:
          type: tls
    # ...
    storage:
      type: persistent-claim
      size: 10000Gi
    # ...
    rack:
      topologyKey: topology.kubernetes.io/zone
    # ...

7.4. Kafka MirrorMaker の設定

MirrorMaker を設定するには、ソースおよびターゲット (宛先) の Kafka クラスターが実行中である必要があります。

従来のバージョンの MirrorMaker のサポートも継続されますが、AMQ Streams で MirrorMaker 2.0 を使用することもできます。

MirrorMaker 2.0

MirrorMaker 2.0 は Kafka Connect フレームワークをベースとし、コネクターによってクラスター間のデータ転送が管理されます。

MirrorMaker 2.0 は以下を使用します。

  • ソースクラスターからデータを消費するソースクラスターの設定。
  • データをターゲットクラスターに出力するターゲットクラスターの設定。
クラスターの設定

active/passive または active/active クラスター設定で MirrorMaker 2.0 を使用できます。

  • active/active 設定では、両方のクラスターがアクティブで、同じデータを同時に提供します。これは、地理的に異なる場所で同じデータをローカルで利用可能にする場合に便利です。
  • active/passive 設定では、アクティブなクラスターからのデータはパッシブなクラスターで複製され、たとえば、システム障害時のデータ復旧などでスタンバイ状態を維持します。

KafkaMirrorMaker2 カスタムリソースを設定し、ソースおよびターゲットクラスターの接続詳細を含む Kafka Connect デプロイメントを定義します。次に、複数の MirrorMaker 2.0 コネクターを実行し、接続を確立します。

トピックの設定は、KafkaMirrorMaker2 カスタムリソースに定義されたトピックに従って、ソースクラスターとターゲットクラスターの間で自動的に同期化されます。設定の変更はリモートトピックに伝播されるため、新しいトピックおよびパーティションは削除および作成されます。トピックのレプリケーションは、トピックを許可または拒否するために、正規表現パターンを使用して定義されます。

以下の MirrorMaker 2.0 コネクターおよび関連する内部トピックは、クラスター間でのデータの転送および同期を管理するのに役立ちます。

MirrorSourceConnector
MirrorSourceConnector は、ソースクラスターからリモートトピックを作成します。
MirrorCheckpointConnector
MirrorCheckpointConnector は、オフセット同期 (offset sync) トピックと チェックポイント (checkpoint) トピックを使用して、指定のコンシューマーグループのオフセットを追跡し、マッピングします。オフセット同期トピックは、複製されたトピックパーティションのソースおよびターゲットオフセットをレコードメタデータからマッピングします。チェックポイントは、各ソースクラスターから生成され、チェックポイントトピックを介してターゲットクラスターでレプリケートされます。チェックポイントトピックは、各コンシューマーグループのレプリケートされたトピックパーティションのソースおよびターゲットクラスターで最後にコミットされたオフセットをマッピングします。
MirrorHeartbeatConnector
MirrorHeartbeatConnector は、クラスター間の接続を定期的に確認します。ハートビートは、ローカルクラスターで作成される ハートビート (heartbeat) トピックで、MirrorHeartbeatConnector によって毎秒作成されます。MirrorMaker 2.0 がリモートとローカルの両方にある場合、リモートで MirrorHeartbeatConnector によって生成されるハートビートはリモートトピックと同様に処理され、ローカルクラスターで MirrorSourceConnector によってミラーリングされます。ハートビートトピックによって、リモートクラスターが利用可能で、クラスターが接続されていることを簡単にチェックできます。障害が発生した場合、ハートビートトピックのオフセットポジションとタイムスタンプは復旧と診断に役立ちます。

図7.1 2 つのクラスターにおけるレプリケーション

Region 1 の Kafka クラスターと Region 2 の Kafka クラスター間の MirrorMaker2.0 レプリケーション
2 つのクラスターにおける双方向レプリケーション

MirrorMaker 2.0 アーキテクチャーは、アクティブ/アクティブ クラスター設定で双方向レプリケーションをサポートするため、両方のクラスターがアクティブになり、同じデータを同時に提供します。MirrorMaker 2.0 クラスターは、ターゲット宛先ごとに必要です。

リモートトピックは、クラスター名をトピック名に追加する、自動名前変更によって区別されます。これは、同じデータを地理的に異なる場所でローカルで使用できるようにする場合に便利です。

ただし、active/passive クラスター設定でデータをバックアップまたは移行する場合は、トピックの元の名前を維持することが望ましい場合があります。その場合は、自動名前変更を無効にするように MirrorMaker 2.0 を設定できます。

図7.2 双方向レプリケーション

MirrorMaker 2.0 双方向アーキテクチャー
MirrorMaker 2.0 設定の YAML の例
  apiVersion: kafka.strimzi.io/v1beta2
  kind: KafkaMirrorMaker2
  metadata:
    name: my-mirror-maker2
    spec:
      version: 3.1.0
      connectCluster: "my-cluster-target"
      clusters:
      - alias: "my-cluster-source"
        bootstrapServers: my-cluster-source-kafka-bootstrap:9092
      - alias: "my-cluster-target"
        bootstrapServers: my-cluster-target-kafka-bootstrap:9092
      mirrors:
      - sourceCluster: "my-cluster-source"
        targetCluster: "my-cluster-target"
        sourceConnector: {}
      topicsPattern: ".*"
      groupsPattern: "group1|group2|group3"
MirrorMaker

従来のバージョンの MirrorMaker では、プロデューサーとコンシューマーを使用して、クラスターにまたがってデータをレプリケートします。

MirrorMaker は以下を使用します。

  • ソースクラスターからデータを使用するコンシューマーの設定。
  • データをターゲットクラスターに出力するプロデューサーの設定。

コンシューマーおよびプロデューサー設定には、認証および暗号化設定が含まれます。

include フィールドは、ソースからターゲットクラスターにミラーリングするトピックを定義します。

主なコンシューマー設定
コンシューマーグループ ID
使用するメッセージがコンシューマーグループに割り当てられるようにするための MirrorMaker コンシューマーのコンシューマーグループ ID。
コンシューマーストリームの数
メッセージを並行して使用するコンシューマーグループ内のコンシューマー数を決定する値。
オフセットコミットの間隔
メッセージの使用とメッセージのコミットの期間を設定するオフセットコミットの間隔。
キープロデューサーの設定
送信失敗のキャンセルオプション
メッセージ送信の失敗を無視するか、または MirrorMaker を終了して再作成するかを定義できます。
MirrorMaker 設定の YAML 例
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker
metadata:
  name: my-mirror-maker
spec:
  # ...
  consumer:
    bootstrapServers: my-source-cluster-kafka-bootstrap:9092
    groupId: "my-group"
    numStreams: 2
    offsetCommitInterval: 120000
    # ...
  producer:
    # ...
    abortOnSendFailure: false
    # ...
  include: "my-topic|other-topic"
  # ...

7.5. Kafka Connect の設定

AMQ Streams の KafkaConnect リソースを使用して、新しい Kafka Connect クラスターをすばやく簡単に作成します。

KafkaConnect リソースを使用して Kafka Connect をデプロイする場合は、Kafka クラスターに接続するためのブートストラップサーバーアドレス (spec.bootstrapServers で) を指定します。サーバーがダウンした場合に備えて、複数のアドレスを指定できます。また、認証情報と TLS 暗号化証明書を指定して、安全な接続を確立します。

注記

Kafka クラスターは、AMQ Streams で管理したり、OpenShift クラスターにデプロイしたりする必要はありません。

KafkaConnect リソースを使用して、以下を指定することもできます。

  • 接続を確立するためのプラグインを含むコンテナーイメージを構築するためのプラグイン構成
  • Kafka Connect クラスターに属するワーカー Pod の構成
  • KafkaConnector リソースを使用してプラグインを管理できるようにするアノテーション

Cluster Operator は、KafkaConnect リソースを使用してデプロイされた Kafka Connect クラスターと、KafkaConnector リソースを使用して作成されたコネクターを管理します。

プラグイン設定

プラグインは、コネクターインスタンスを作成するための実装を提供します。プラグインがインスタンス化されると、特定のタイプの外部データシステムに接続するための構成が提供されます。プラグインは、特定の種類のデータソースに接続するためのコネクターとタスクの実装を定義する 1 つ以上の JAR ファイルのセットを提供します。多くの外部システム用のプラグインは、Kafka Connect で使用できます。独自のプラグインを作成することもできます。

この設定では、Kafka Connect にフィードするソース入力データおよびターゲット出力データを記述します。ソースコネクターの場合、外部ソースデータは、メッセージを格納する特定のトピックを参照する必要があります。プラグインには、データの変換に必要なライブラリーとファイルが含まれている場合もあります。

Kafka Connect デプロイメントには、1 つ以上のプラグインを含めることができますが、各プラグインのバージョンは 1 つだけです。

選択したプラグインを含むカスタム Kafka Connect イメージを作成できます。イメージは次の 2 つの方法で作成できます。

コンテナーイメージを自動的に作成するには、KafkaConnect リソースの build プロパティーを使用して、Kafka Connect クラスターに追加するプラグインを指定します。AMQ Streams は、プラグインアーティファクトを自動的にダウンロードして新しいコンテナーイメージに追加します。

プラグイン設定の例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  # ...
  build: 1
    output: 2
      type: docker
      image: my-registry.io/my-org/my-connect-cluster:latest
      pushSecret: my-registry-credentials
    plugins: 3
      - name: debezium-postgres-connector
        artifacts:
          - type: tgz
            url: https://ARTIFACT-ADDRESS.tgz
            sha512sum: HASH-NUMBER-TO-VERIFY-ARTIFACT
      # ...
  # ...

1
コネクタープラグインで自動的にコンテナーイメージをビルドするための ビルド設定プロパティー
2
新しいイメージがプッシュされるコンテナーレジストリーの設定。output プロパティーは、イメージのタイプおよび名前を記述し、任意でコンテナーレジストリーへのアクセスに必要なクレデンシャルが含まれる Secret の名前を記述します。
3
新しいコンテナーイメージに追加するプラグインとそのアーティファクトのリスト。plugins プロパティーは、アーティファクトのタイプとアーティファクトのダウンロード元となる URL を記述します。各プラグインは、1 つ以上のアーティファクトで設定する必要があります。さらに、SHA-512 チェックサムを指定して、アーティファクトを展開する前に検証することもできます。

Dockerfile を使用してイメージをビルドしている場合は、AMQ Streams の最新のコンテナーイメージをベースイメージとして使用して、プラグイン設定ファイルを追加できます。

プラグイン設定の手動追加を示す例

FROM registry.redhat.io/amq7/amq-streams-kafka-31-rhel8:2.1.0
USER root:root
COPY ./my-plugins/ /opt/kafka/plugins/
USER 1001

ワーカー用の Kafka Connect クラスター設定

ワーカーの構成は、KafkaConnect リソースの config プロパティーで指定します。

分散型 Kafka Connect クラスターには、グループ ID と一連の内部設定トピックがあります。

  • group.id
  • offset.storage.topic
  • config.storage.topic
  • status.storage.topic

Kafka Connect クラスターは、デフォルトでこれらのプロパティーに同じ値で構成されています。Kafka Connect クラスターは、エラーを作成するため、グループ ID またはトピック名を共有できません。複数の異なる Kafka Connect クラスターを使用する場合、これらの設定は、作成された各 Kafka Connect クラスターのワーカーに対して一意である必要があります。

各 Kafka Connect クラスターで使用されるコネクターの名前も一意である必要があります。

次のワーカー設定の例では、JSON コンバーターが指定されています。レプリケーション係数は、Kafka Connect によって使用される内部 Kafka トピックに設定されます。これは、実稼働環境では少なくとも 3 である必要があります。トピックの作成後にレプリケーション係数を変更しても効果はありません。

ワーカー設定の例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
# ...
spec:
  config:
    # ...
    group.id: my-connect-cluster 1
    offset.storage.topic: my-connect-cluster-offsets 2
    config.storage.topic: my-connect-cluster-configs 3
    status.storage.topic: my-connect-cluster-status 4
    key.converter: org.apache.kafka.connect.json.JsonConverter 5
    value.converter: org.apache.kafka.connect.json.JsonConverter 6
    key.converter.schemas.enable: true 7
    value.converter.schemas.enable: true 8
    config.storage.replication.factor: 3 9
    offset.storage.replication.factor: 3 10
    status.storage.replication.factor: 3 11
  # ...

1
Kafka 内の Kafka Connect クラスター ID。Kafka Connect クラスターごとに一意である必要があります。
2
コネクターオフセットを保存する Kafka トピック。Kafka Connect クラスターごとに一意である必要があります。
3
コネクターおよびタスクステータスの設定を保存する Kafka トピック。Kafka Connect クラスターごとに一意である必要があります。
4
コネクターおよびタスクステータスの更新を保存する Kafka トピック。Kafka Connect クラスターごとに一意である必要があります。
5
Kafka に保存するためにメッセージキーを JSON 形式に変換するコンバーター。
6
Kafka に保存するためにメッセージ値を JSON 形式に変換するコンバーター。
7
メッセージキーを構造化された JSON 形式に変換できるスキーマ。
8
メッセージ値を構造化された JSON 形式に変換できるスキーマ。
9
コネクターオフセットを保存する Kafka トピックのレプリケーション係数。
10
コネクターとタスクのステータス設定を保存する Kafka トピックのレプリケーション係数。
11
コネクターとタスクのステータスの更新を保存する Kafka トピックのレプリケーション係数。
コネクターの Kafka Connector 管理

デプロイメントでワーカー Pod に使用されるコンテナーイメージにプラグインが追加されたら、AMQ Streams のKafkaConnector カスタムリソースまたは Kafka Connect API を使用してコネクターインスタンスを管理できます。これらのオプションを使用して、新しいコネクターインスタンスを作成することもできます。

KafkaConnector リソースは、Cluster Operator によるコネクターの管理に OpenShift ネイティブのアプローチを提供します。KafkaConnector リソースを使用してコネクターを管理するには、KafkaConnect カスタムリソースで注釈を指定する必要があります。

KafkaConnectors を有効にするための注釈

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
  # ...

use-connector-resourcestrue に設定すると、KafkaConnectors はコネクターを作成、削除、および再設定できます。

KafkaConnect 設定で use-connector-resources が有効になっている場合は、KafkaConnector リソースを使用してコネクターを定義および管理する必要があります。KafkaConnector リソースは、外部システムに接続するように構成されています。これらは、外部データシステムと相互作用する Kafka Connect クラスターおよび Kafka クラスターと同じ OpenShift クラスターにデプロイされます。

Kafka コンポーネントは同じ OpenShift クラスターに含まれています

Kafka and Kafka Connect clusters

設定は、認証を含め、コネクターインスタンスが外部データシステムに接続する方法を指定します。また、監視するデータを指定する必要があります。ソースコネクターの場合は、設定でデータベース名を指定できます。ターゲットトピック名を指定することで、Kafka のどこにデータを配置するかを指定することもできます。

タスクの最大数を指定するには、tasksMax を使用します。たとえば、tasksMax:2 ソースデータのインポートを 2 つのタスクに分割する場合があります。

KafkaConnector ソースコネクター構成の例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-source-connector  1
  labels:
    strimzi.io/cluster: my-connect-cluster 2
spec:
  class: org.apache.kafka.connect.file.FileStreamSourceConnector 3
  tasksMax: 2 4
  config: 5
    file: "/opt/kafka/LICENSE" 6
    topic: my-topic 7
    # ...

1
コネクターの名前として使用される KafkaConnector リソースの名前。OpenShift リソースで有効な名前を使用します。
2
コネクターインスタンスを作成する Kafka Connect クラスターの名前。コネクターは、リンク先の Kafka Connect クラスターと同じ namespace にデプロイする必要があります。
3
コネクタークラスのフルネーム。これは、Kafka Connect クラスターによって使用されているイメージに存在するはずです。
4
コネクターが作成できる Kafka Connect タスクの最大数。
5
キーと値のペアとしてのコネクター設定
6
外部データファイルの場所。この例では、/opt/kafka/LICENSE ファイルから読み取るように FileStreamSourceConnector を設定しています。
7
ソースデータのパブリッシュ先となる Kafka トピック。
注記

OpenShift Secrets または ConfigMaps から コネクターの機密設定値をロード できます。

Kafka Connect API

KafkaConnector リソースを使用してコネクターを管理する代わりに、Kafka Connect REST API を使用します。Kafka Connect REST API は、<connect_cluster_name>-connect-api:8083 で実行しているサービスとして利用できます。ここで、<connect_cluster_name> は、お使いの Kafka Connect クラスターの名前になります。

コネクター設定を JSON オブジェクトとして追加します。

コネクター設定を追加するための curl 要求の例

curl -X POST \
  http://my-connect-cluster-connect-api:8083/connectors \
  -H 'Content-Type: application/json' \
  -d '{ "name": "my-source-connector",
    "config":
    {
      "connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector",
      "file": "/opt/kafka/LICENSE",
      "topic":"my-topic",
      "tasksMax": "4",
      "type": "source"
    }
}'

KafkaConnectors が有効になっている場合、Kafka Connect REST API に直接手作業で追加された変更は Cluster Operator によって元に戻されます。

REST API でサポートされる操作は、Apache Kafka のドキュメント を参照してください。

注記

Kafka Connect API サービスを OpenShift の外部に公開できます。これを行うには、入力やルートなどのアクセスを提供する接続メカニズムを使用するサービスを作成します。接続が安全でないため、慎重に使用してください。

7.6. Kafka Bridge の設定

Kafka Bridge 設定には、接続先の Kafka クラスターのブートストラップサーバー仕様と、必須の暗号化および認証オプションが必要になります。

コンシューマーの Apache Kafka 設定ドキュメント およびプロデューサーの Apache Kafka 設定ドキュメント で説明されているように、Kafka Bridge コンシューマーおよびプロデューサー設定は標準です。

HTTP 関連の設定オプションでは、サーバーがリッスンするポート接続を設定します。

CORS

Kafka Bridge では、CORS (Cross-Origin Resource Sharing) の使用がサポートされます。CORS は、複数のオリジンから指定のリソースにブラウザーでアクセスできるようにする HTTP メカニズムです (たとえば、異なるドメイン上のリソースへのアクセス)。CORS を使用する場合、Kafka Bridge を通じた Kafka クラスターとの対話用に、許可されるリソースオリジンおよび HTTP メソッドのリストを定義できます。リストは、Kafka Bridge 設定の http 仕様で定義されます。

CORS では、異なるドメイン上のオリジンソース間での シンプルな リクエストおよび プリフライト リクエストが可能です。

  • シンプルなリクエストは、そのヘッダーで許可されるオリジンを定義する必要のある HTTP リクエストです。
  • プリフライトリクエストでは、オリジンとメソッドが許可されることを確認するために、実際のリクエストの前に初期 OPTIONS HTTP リクエストが送信されます。
Kafka ブリッジ設定の YAML 例
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaBridge
metadata:
  name: my-bridge
spec:
  # ...
  bootstrapServers: my-cluster-kafka:9092
  http:
    port: 8080
    cors:
      allowedOrigins: "https://strimzi.io"
      allowedMethods: "GET,POST,PUT,DELETE,OPTIONS,PATCH"
  consumer:
    config:
      auto.offset.reset: earliest
  producer:
    config:
      delivery.timeout.ms: 300000
  # ...

関連情報

第8章 Kafka のセキュリティー

AMQ Streams のセキュアなデプロイメントには、以下が含めることができます。

  • データ交換の暗号化
  • アイデンティティー証明に使用する認証
  • ユーザーが実行するアクションを許可または拒否する認可

8.1. 暗号化

AMQ Streams は、暗号化通信用のプロトコルである Transport Layer Security (TLS) をサポートします。

通信は、以下の間で常に暗号化されます。

  • Kafka ブローカー
  • ZooKeeper ノード
  • Operator および Kafka ブローカー
  • Operator および ZooKeeper ノード
  • Kafka Exporter

また、Kafka ブローカーのリスナーに TLS 暗号化を適用して、Kafka ブローカーとクライアント間で TLS を設定することもできます。外部リスナーの設定時に外部クライアントに TLS を指定します。

AMQ Streams コンポーネントおよび Kafka クライアントは、暗号化にデジタル署名を使用します。Cluster Operator は、証明書を設定し、Kafka クラスター内で暗号化を有効にします。Kafka クライアントと Kafka ブローカーの通信やクラスター間の通信に、Kafka リスナー証明書と呼ばれる独自のサーバー証明書を指定できます。

AMQ Streams は シークレット を使用して、TLS に必要な証明書および秘密鍵を PEM および PKCS #12 形式で保存します。

TLS 認証局 (CA) は、証明書を発行してコンポーネントのアイデンティティーを認証します。AMQ Streams は、CA 証明書に対してコンポーネントの証明書を検証します。

  • AMQ Streams コンポーネントは、クラスター CA 証明局 (CA) に対して検証されます。
  • Kafka クライアントは クライアント CA 証明局 (CA) に対して検証されます。

8.2. 認証

Kafka リスナーは認証を使用して、Kafka クラスターへのクライアント接続のセキュリティーを確保します。

サポート対象の認証メカニズム:

  • 相互 TLS クライアント認証 (TLS 暗号化が有効なリスナーの場合)
  • SASL SCRAM-SHA-512
  • OAuth 2.0 のトークンベースの認証
  • カスタム認証

User Operator では TLS および SCRAM 認証のユーザー認証情報は管理対象ですが、OAuth 2.0 は管理対象ではありません。たとえば、User Operator を使用して、Kafka クラスターにアクセスする必要があるクライアントに対応するユーザーを作成し、認証タイプとして TLS を指定できます。

OAuth 2.0 トークンベースの認証を使用すると、アプリケーションクライアントは、アカウント認証情報を公開せずに Kafka ブローカーにアクセスできます。承認サーバーは、アクセスの付与とアクセスに関する問い合わせを処理します。

カスタム認証では、kafka でサポートされているあらゆるタイプの認証が可能です。柔軟性を高めることができますが、複雑さも増します。

8.3. 承認

Kafka クラスターは、承認メカニズムを使用して特定のクライアントまたはユーザーによって Kafka ブローカーで許可される操作を制御します。承認は、Kafka クラスターに適用されると、クライアント接続に使用する全リスナーに対して有効になります。

ユーザーを Kafka ブローカー設定の スーパーユーザー のリストに追加すると、承認メカニズムにより適用される承認制約に関係なく、そのユーザーにはクラスターへのアクセスが無制限に許可されます。

サポート対象の承認メカニズム:

  • 簡易承認
  • OAuth 2.0 での承認 (OAuth 2.0 トークンベースの認証を使用している場合)
  • Open Policy Agent (OPA) での承認
  • カスタム承認

簡易承認では、デフォルトの Kafka 承認プラグインである AclAuthorizer が使用されます。AclAuthorizer は、アクセス制御リスト (ACL) を使用して、どのユーザーがどのリソースにアクセスできるかを定義します。カスタム認証の場合は、ACL ルールを適用するように独自の Authorizer プラグインを構成します。

OAuth 2.0 および OPA は、承認サーバーからのポリシーベースの制御を提供します。Kafka ブローカーのリソースへのアクセス権限を付与するのに使用されるセキュリティーポリシーおよびパーミッションは、承認サーバーで定義されます。

承認サーバーに接続し、クライアントまたはユーザーがリクエストした操作が許可または拒否されることを検証するのに、URL が使用されます。ユーザーとクライアントは、承認サーバーで作成されるポリシーと照合され、Kafka ブローカーで特定のアクションを実行するためのアクセスが許可されます。

第9章 モニタリング

モニタリングデータでは、AMQ Streams のパフォーマンスおよびヘルスを監視できます。分析および通知のメトリクスデータを取得するようにデプロイメントを設定できます。

メトリクスデータは、接続性およびデータ配信の問題を調査するときに役立ちます。たとえば、メトリクスデータを使用すると、更新されていないパーティションや、メッセージの消費速度を特定できます。アラートルールでは、指定した通信チャネルを使用して、このようなメトリクスに関するタイムクリティカルな通知を送信できます。モニタリングの視覚化では、デプロイメントの設定更新のタイミングと方法を判別できるように、リアルタイムのメトリクスデータを表示します。メトリクス設定ファイルのサンプルは AMQ Streams に同梱されています。

分散トレースは、AMQ Streams でメッセージのエンドツーエンドのトレース機能を提供することで、メトリクスデータの収集を補完します。

Cruise Control は、ワークロードのデータを基づく Kafka クラスターのリバランスをサポートします。

メトリクスおよびモニタリングツール

AMQ Streams では、メトリクスおよびモニタリングに以下のツールを使用できます。

  • Prometheus は、Kafka、ZooKeeper、および Kafka Connect クラスターからメトリクスをプルします。Prometheus の Alertmanager プラグインはアラートを処理して、そのアラートを通知サービスにルーティングします。
  • Kafka Exporter は、さらに Prometheus メトリクスを追加します。
  • Grafana は、ダッシュボードで Prometheus メトリクスを視覚化できます。
  • Jaeger は、分散トレースをサポートし、アプリケーション間のトランザクションをトレースします。
  • Cruise Control は、Kafka クラスター全体に渡るデータを分散します。

メトリクスおよびモニタリングツールのサポートドキュメント

メトリクスおよびモニタリングツールの詳細は、サポートドキュメントを参照してください。

9.1. Prometheus

Prometheus は、Kafka コンポーネントおよび AMQ Streams Operator からメトリクスデータを抽出できます。

Prometheus を使用してメトリクスデータを取得し、アラートを発行するには、Prometheus および Prometheus Alertmanager プラグインをデプロイする必要があります。メトリクスデータを公開するには、Kafka リソースもメトリクス設定でデプロイまたは再デプロイする必要があります。

Prometheus は、公開されたメトリクスデータをモニタリング用に収集します。Alertmanager は、事前に定義されたアラートルールをもとに、条件が問題発生の可能性を示した場合に、アラートを発行します。

メトリクスおよびアラートルール設定ファイルのサンプルは AMQ Streams に同梱されています。AMQ Streams に含まれるアラートメカニズムのサンプルは、通知を Slack チャネルに送信するように設定されています。

9.2. Grafana

Grafana は Prometheus によって公開されるメトリクスデータを使用して、モニタリングできるように、ダッシュボードを視覚化して表示します。

データソースとして Prometheus を追加している場合には、Grafana のデプロイメントが必要です。ダッシュボードの例 (AMQ Streams JSON ファイルとして提供) は、モニタリングデータを表示するために、Grafana インターフェースを使用してインポートされます。

9.3. Kafka Exporter

Kafka Exporter は、Apache Kafka ブローカーおよびクライアントのモニタリングを強化するオープンソースプロジェクトです。Kafka Exporter は、Kafka クラスターとともにデプロイされ、オフセット、コンシューマーグループ、コンシューマーラグ、およびトピックに関連する Kafka ブローカーからの Prometheus メトリクスデータを追加で抽出します。提供される Grafana ダッシュボードを使用して、Prometheus が Kafka Exporter から収集したデータを可視化することができます。

サンプル設定ファイル、アラートルール、および Kafka Exporter の Grafana ダッシュボードは AMQ Streams で提供されます。

9.4. 分散トレース

Kafka デプロイメントでは、以下に対して、Jaeger を使用した分散トレースがサポートされます。

  • ソースクラスターからターゲットクラスターへのメッセージをトレースする MirrorMaker
  • Kafka Connect が使用して生成したメッセージをトレースする Kafka Connect
  • Kafka Bridge が使用して生成したメッセージと、クライアントアプリケーションからの HTTP 要求をトレースする Kafka Bridge

テンプレート設定プロパティーは、Kafka リソース用に設定され、環境変数のトレースを記述します。

Kafka クライアントのトレース

Kafka プロデューサーやコンシューマーなどのクライアントアプリケーションも、トランザクションをモニタリングするように設定できます。クライアントはトレースプロファイルで設定され、トレーサーはクライアントアプリケーションが使用するように初期化されます。

9.5. Cruise Control

Cruise Control は、Kafka クラスター全体に渡るデータの監視および分散を単純化するオープンソースプロジェクトです。Cruise Control は Kafka クラスターと共にデプロイされ、そのトラフィックを監視し、よりバランスの取れたパーティション割り当てを提案し、その提案をもとにしたパーティション再割り当てのトリガーになります。

Control はリソースの使用状況に関する情報を収集し、Kafka クラスターのワークロードをモデル化および分析します。定義済みの 最適化ゴール を基にして、Cruise Control はクラスターを効果的にリバランスする方法に関する 最適化プロポーザル を生成します。最適化プロポーザル が承認されると、Cruise Control はプロポーザルに示されるリバランスを適用します。

Prometheus は、Cruise Control のメトリクスデータを抽出できます。これには、最適化プロポーザルおよびリバランス操作に関連するデータが含まれます。サンプル設定ファイルおよび Cruise Control の Grafana ダッシュボードは、AMQ Streams で提供されます。

付録A サブスクリプションの使用

AMQ Streams は、ソフトウェアサブスクリプションから提供されます。サブスクリプションを管理するには、Red Hat カスタマーポータルでアカウントにアクセスします。

アカウントへのアクセス

  1. access.redhat.com に移動します。
  2. アカウントがない場合は、作成します。
  3. アカウントにログインします。

サブスクリプションのアクティベート

  1. access.redhat.com に移動します。
  2. サブスクリプション に移動します。
  3. Activate a subscription に移動し、16 桁のアクティベーション番号を入力します。

Zip および Tar ファイルのダウンロード

zip または tar ファイルにアクセスするには、カスタマーポータルを使用して、ダウンロードする関連ファイルを検索します。RPM パッケージを使用している場合は、この手順は必要ありません。

  1. ブラウザーを開き、access.redhat.com/downloads で Red Hat カスタマーポータルの Product Downloads ページにログインします。
  2. INTEGRATION AND AUTOMATION カテゴリーで、AMQ Streams for Apache Kafka エントリーを見つけます。
  3. 必要な AMQ Streams 製品を選択します。Software Downloads ページが開きます。
  4. コンポーネントの Download リンクをクリックします。

改訂日時: 2022-07-12 13:25:43 +1000

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.