第8章 Kafka Connect


Kafka Connect は、Apache Kafka と外部システムとの間でデータをストリーミングするためのツールです。スケーラビリティーと信頼性を維持しながら大量のデータを移動するためのフレームワークが提供されます。Kafka Connect は通常、Kafka を Kafka クラスター外のデータベース、ストレージシステム、およびメッセージングシステムと統合するために使用されます。

Kafka Connect は、さまざまな種類の外部システムへの接続を実装するコネクタープラグインを使用します。コネクタープラグインには、シンクとソースの 2 つのタイプがあります。シンクコネクターは、Kafka から外部システムにデータをストリーミングします。ソースコネクターは、外部システムから Kafka にデータをストリーミングします。

Kafka Connect はスタンドアロンまたは分散モードで実行できます。

スタンドアロンモード
スタンドアロンモードでは、Kafka Connect はプロパティーファイルから読み込んだユーザー定義の設定を持つ単一ノードで実行されます。
分散モード
分散モードでは、Kafka Connect は 1 つまたは複数のワーカーノードで実行され、ワークロードはワーカーノード間で分散されます。コネクターとその設定は、HTTP REST インターフェイスを使用して管理します。

8.1. スタンドアロンモードでの Kafka Connect

スタンドアロンモードでは、Kafka Connect は単一ノードで単一のプロセスとして実行されます。スタンドアロンモードの設定は、プロパティーファイルを使用して管理します。

8.1.1. スタンドアロンモードでの Kafka Connect の設定

Kafka Connect をスタンドアロンモードで設定するには、config/connect-standalone.properties 設定ファイルを編集します。以下のオプションが最も重要です。

bootstrap.servers
Kafka へのブートストラップ接続として使用される Kafka ブローカーアドレスのリスト。たとえば、kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092 です。
key.converter
メッセージキーを Kafka 形式との間で変換するために使用されるクラス。たとえば、org.apache.kafka.connect.json.JsonConverter です。
value.converter
メッセージペイロードを Kafka 形式との間で変換するために使用されるクラス。たとえば、org.apache.kafka.connect.json.JsonConverter です。
offset.storage.file.filename
オフセットデータが保存されるファイルを指定します。

設定ファイルの例は、config/connect-standalone.properties のインストールディレクトリーにあります。サポートされるすべての Kafka Connect 設定オプションの完全リストは、[kafka-connect-configuration-parameters-str] を参照してください。

コネクタープラグインは、ブートストラップアドレスを使用して Kafka ブローカーへのクライアント接続を開きます。これらの接続を設定するには、標準的な Kafka のプロデューサーとコンシューマーの設定オプションを使用し、producer. または consumer. 接頭辞を付けます。

Kafka プロデューサーおよびコンシューマーの設定に関する詳細は、以下を参照してください。

8.1.2. スタンドアロンモードでの Kafka Connect でのコネクターの設定

プロパティーファイルを使用すると、スタンドアロンモードで Kafka Connect のコネクタープラグインを設定できます。ほとんどの設定オプションは、各コネクターに固有のものです。以下のオプションはすべてのコネクターに適用されます。

name
現在の Kafka Connect インスタンス内で一意である必要があるコネクターの名前。
connector.class
コネクタープラグインのクラス。たとえば、org.apache.kafka.connect.file.FileStreamSinkConnector です。
tasks.max
指定のコネクターが使用できるタスクの最大数。タスクにより、コネクターは並行して作業を実行できます。コネクターは、指定された数よりも少ないタスクを作成する可能性があります。
key.converter
メッセージキーを Kafka 形式との間で変換するために使用されるクラス。これにより、Kafka Connect 設定によって設定されたデフォルト値がオーバーライドされます。たとえば、org.apache.kafka.connect.json.JsonConverter です。
value.converter
メッセージペイロードを Kafka 形式との間で変換するために使用されるクラス。これにより、Kafka Connect 設定によって設定されたデフォルト値がオーバーライドされます。たとえば、org.apache.kafka.connect.json.JsonConverter です。

さらに、シンクコネクターには以下のオプションの 1 つ以上を設定する必要があります。

topics
入力として使用されるトピックのコンマ区切りリスト。
topics.regex
入力として使用されるトピックの Java 正規表現。

その他のオプションについては、利用可能なコネクターのドキュメントを参照してください。

AMQ Streams には、コネクター設定ファイルの例が含まれています。AMQ Streams のインストール・ディレクトリーにある config/connect-file-sink.properties および config/connect-file-source.properties を参照してください。

8.1.3. スタンドアロンモードでの Kafka Connect の実行

この手順では、スタンドアロンモードで Kafka Connect を設定し、実行する方法を説明します。

前提条件

  • インストールされ、実行されている AMQ Streams クラスター。

手順

  1. /opt/kafka/config/connect-standalone.properties Kafka Connect 設定ファイルを編集し、bootstrap.server が Kafka ブローカーを指すように設定します。以下に例を示します。

    bootstrap.servers=kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092
  2. 設定ファイルで Kafka Connect を起動し、1 つ以上のコネクター設定を指定します。

    su - kafka
    /opt/kafka/bin/connect-standalone.sh /opt/kafka/config/connect-standalone.properties connector1.properties
    [connector2.properties ...]
  3. KafkaConnect が実行されていることを確認します。

       jcmd | grep ConnectStandalone

関連情報

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.