第8章 Kafka Connect


Kafka Connect は、Apache Kafka と外部システムとの間でデータをストリーミングするためのツールです。スケーラビリティーと信頼性を維持しながら大量のデータを移動するためのフレームワークを提供します。Kafka Connect は通常、Kafka を Kafka クラスター外部のデータベース、ストレージ、メッセージングシステムと統合するために使用されます。

Kafka Connect は、異なるタイプの外部システムの接続を実装するコネクタープラグインを使用します。シンクおよびソースには、2 種類のコネクタープラグインがあります。シンクコネクターは、データを Kafka から外部システムにストリーミングします。ソースコネクターは、外部システムから Kafka にデータをストリーミングします。

Kafka Connect は、スタンドアロンまたは分散モードで実行できます。

スタンドアロンモード
スタンドアロンモードでは、Kafka Connect はプロパティーファイルから読み取られたユーザー定義の設定を持つ単一ノードで実行されます。
分散モード
Distributed モードでは、Kafka Connect は 1 つまたは複数のワーカーノードで実行され、ワークロードはそれら間に分散されます。HTTP REST インターフェースを使用して、コネクターおよびその設定を管理します。

8.1. スタンドアロンモードでの Kafka Connect

スタンドアロンモードでは、Kafka Connect は単一のノードで単一のプロセスとして実行されます。プロパティーファイルを使用してスタンドアロンモードで設定を管理します。

8.1.1. スタンドアロンモードでの Kafka Connect の設定

スタンドアロンモードで Kafka Connect を設定するには、config/connect-standalone.properties 設定ファイルを編集します。以下のオプションは最も重要なオプションです。

bootstrap.servers
Kafka へのブートストラップ接続として使用される Kafka ブローカーアドレスのリスト。例: kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092
key.converter
メッセージキーの Kafka 形式への変換に使用されるクラス。例: org.apache.kafka.connect.json.JsonConverter
value.converter
メッセージペイロードの Kafka 形式への変換に使用されるクラス。例: org.apache.kafka.connect.json.JsonConverter
offset.storage.file.filename
オフセットデータが保存されるファイルを指定します。

設定ファイルの例は、config/connect-standalone.properties のインストールディレクトリーにあります。サポートされる Kafka Connect 設定オプションの完全リストは、[kafka-connect-configuration-parameters-str] を参照してください。

コネクタープラグインは、ブートストラップアドレスを使用して Kafka ブローカーへのクライアント接続を開きます。これらの接続を設定するには、producer. または consumer. で始まる標準の Kafka プロデューサーおよびコンシューマー設定オプションを使用します。

Kafka プロデューサーおよびコンシューマーの設定に関する詳細は、以下を参照してください。

8.1.2. スタンドアロンモードでの Kafka Connect でのコネクターの設定

プロパティーファイルを使用すると、スタンドアロンモードで Kafka Connect のコネクタープラグインを設定できます。ほとんどの設定オプションは各コネクターに固有のものです。以下のオプションはすべてのコネクターに適用されます。

name
コネクターの名前。現在の Kafka Connect インスタンス内で一意である必要があります。
connector.class
コネクタープラグインのクラス。例: org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max
指定のコネクターが使用できるタスクの最大数。タスクを使用すると、コネクターが並行して作業を実行できるようにします。コネクターによって指定の数よりも少ないタスクが作成される可能性があります。
key.converter
メッセージキーの Kafka 形式への変換に使用されるクラス。これにより、Kafka Connect 設定によって設定されたデフォルト値が上書きされます。例: org.apache.kafka.connect.json.JsonConverter
value.converter
メッセージペイロードの Kafka 形式への変換に使用されるクラス。これにより、Kafka Connect 設定によって設定されたデフォルト値が上書きされます。例: org.apache.kafka.connect.json.JsonConverter

さらに、シンクコネクターに以下のオプションのいずれかを設定する必要があります。

topics
入力として使用されるトピックのコンマ区切りリスト。
topics.regex
入力として使用するトピックの Java 正規表現。

その他のオプションは、使用できるコネクターのドキュメントを参照してください。

AMQ Streams には、コネクター設定ファイルのサンプルが含まれています。AMQ Streams インストールディレクトリーの config/connect-file-sink.properties および config/connect-file-source.properties を参照してください。

8.1.3. スタンドアロンモードでの Kafka Connect の実行

この手順では、スタンドアロンモードで Kafka Connect を設定し、実行する方法を説明します。

前提条件

  • AMQ Streams} クラスターがインストールされ、実行している。

手順

  1. /opt/kafka/config/connect-standalone.properties Kafka Connect 設定ファイルを編集し、bootstrap.server が Kafka ブローカーを示すように設定します。以下に例を示します。

    bootstrap.servers=kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092
  2. 設定ファイルで Kafka Connect を起動し、1 つ以上のコネクター設定を指定します。

    su - kafka
    /opt/kafka/bin/connect-standalone.sh /opt/kafka/config/connect-standalone.properties connector1.properties
    [connector2.properties ...]
  3. Kafka Connect が稼働していることを確認します。

       jcmd | grep ConnectStandalone

その他のリソース

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.