第9章 Kafka Connect


Kafka Connect は、Apache Kafka と外部システムとの間でデータをストリーミングするためのツールです。スケーラビリティーと信頼性を維持しながら、大量のデータを移動するためのフレームワークが提供されます。通常、Kafka Connect は、Kafka クラスター外部のデータベース、ストレージ、メッセージングシステムと統合するために使用されます。

Kafka Connect は、異なるタイプの外部システムへの接続を実装するコネクタープラグインを使用します。シンクコネクターには、sink および source の 2 つのタイプがあります。シンクコネクターは、Kafka から外部システムにデータをストリーミングします。ソースコネクターは、外部システムから Kafka にデータをストリーミングします。

Kafka Connect はスタンドアロンまたは分散モードで実行できます。

スタンドアロンモード
スタンドアロンモードでは、Kafka Connect はプロパティーファイルから読み取られたユーザー定義の設定を持つ単一ノードで実行されます。
分散モード
Distributed モードでは、Kafka Connect は 1 つまたは複数のワーカーノードで実行され、ワークロードはそれらのワーカーノード間で分散されます。HTTP REST インターフェースを使用して、コネクターとその設定を管理します。

9.1. スタンドアロンモードでの Kafka Connect

スタンドアロンモードでは、Kafka Connect は単一ノードで単一のプロセスとして実行されます。プロパティーファイルを使用してスタンドアロンモードの設定を管理します。

9.1.1. スタンドアロンモードでの Kafka Connect の設定

スタンドアロンモードで Kafka Connect を設定するには、config/connect-standalone.properties 設定ファイルを編集します。以下のオプションは最も重要なオプションです。

bootstrap.servers
Kafka へのブートストラップ接続として使用される Kafka ブローカーアドレスの一覧。たとえば、kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092 のようになります。
key.converter
メッセージキーを Kafka 形式との間で変換するために使用されるクラス。たとえば、org.apache.kafka.connect.json.JsonConverter のようになります。
value.converter
メッセージペイロードの Kafka 形式への変換に使用されるクラス。たとえば、org.apache.kafka.connect.json.JsonConverter のようになります。
offset.storage.file.filename
オフセットデータが保存されるファイルを指定します。

設定ファイルの例は、config/connect-standalone.properties のインストールディレクトリーで提供されます。サポートされるすべての Kafka Connect 設定オプションの完全リストは、[kafka-connect-configuration-parameters-str] を参照してください。

コネクタープラグインは、ブートストラップアドレスを使用して Kafka ブローカーへのクライアント接続を開きます。これらの接続を設定するには、producer. または consumer. で始まる標準の Kafka プロデューサーおよびコンシューマー設定オプションを使用します。

Kafka プロデューサーおよびコンシューマーの設定に関する詳細は、以下を参照してください。

9.1.2. スタンドアロンモードでの Kafka Connect でのコネクターの設定

プロパティーファイルを使用すると、スタンドアロンモードで Kafka Connect のコネクタープラグインを設定できます。ほとんどの設定オプションは、各コネクターに固有のものです。以下のオプションはすべてのコネクターに適用されます。

name
現在の Kafka Connect インスタンス内で一意である必要があります。
connector.class
コネクタープラグインのクラス。たとえば、org.apache.kafka.connect.file.FileStreamSinkConnector のようになります。
tasks.max
指定されたコネクターが使用可能なタスクの最大数。タスクを使用すると、コネクターが並行して機能できるようになります。コネクターは指定された値よりも少ないタスクが作成される可能性があります。
key.converter
メッセージキーを Kafka 形式との間で変換するために使用されるクラス。これにより、Kafka Connect 設定によって設定されるデフォルト値が上書きされます。たとえば、org.apache.kafka.connect.json.JsonConverter のようになります。
value.converter
メッセージペイロードの Kafka 形式への変換に使用されるクラス。これにより、Kafka Connect 設定によって設定されるデフォルト値が上書きされます。たとえば、org.apache.kafka.connect.json.JsonConverter のようになります。

さらに、シンクコネクターの以下のオプションのいずれかを設定する必要があります。

topics
入力に使用されるトピックのコンマ区切りリスト。
topics.regex
入力に使用されるトピックの Java 正規表現。

他のすべてのオプションは、利用可能なコネクターのドキュメントを参照してください。

AMQ Streams には、コネクター設定ファイルのサンプルが含まれています。AMQ Streams インストールディレクトリーの config/connect-file-sink.properties および config/connect-file-source.properties を参照してください。

9.1.3. スタンドアロンモードでの Kafka Connect の実行

この手順では、スタンドアロンモードで Kafka Connect を設定して実行する方法を説明します。

前提条件

  • AMQ Streams} クラスターがインストールされ、実行されている。

手順

  1. /opt/kafka/config/connect-standalone.properties Kafka Connect 設定ファイルを編集し、bootstrap.server を設定して Kafka ブローカーを示すように設定します。以下に例を示します。

    bootstrap.servers=kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092
  2. 設定ファイルを使用して Kafka Connect を開始し、1 つ以上のコネクター設定を指定します。

    su - kafka
    /opt/kafka/bin/connect-standalone.sh /opt/kafka/config/connect-standalone.properties connector1.properties
    [connector2.properties ...]
  3. Kafka Connect が実行されていることを確認します。

       jcmd | grep ConnectStandalone

関連情報

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.