5.5. Kafka Connect の設定
Kafka Connect の基本設定には、Kafka クラスターに接続するブートストラップアドレスと、暗号化および認証の詳細が必要です。
Kafka Connect インスタンスはデフォルトでは、以下が同じ値で設定されます。
- Kafka Connect クラスターのグループ ID
- コネクターオフセットを保存する Kafka トピック
- コネクターおよびタスクステータス設定を保存する Kafka トピック
- コネクターおよびタスクステータスの更新情報を保存する Kafka トピック
複数の異なる Kafka Connect インスタンスが使用されている場合には、上記の設定はインスタンスごとに反映する必要があります。
Kafka Connect 設定の YAML 例
apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnect metadata: name: my-connect spec: # ... config: group.id: my-connect-cluster offset.storage.topic: my-connect-cluster-offsets config.storage.topic: my-connect-cluster-configs status.storage.topic: my-connect-cluster-status # ...
コネクター
コネクターは Kafka Connect とは別に設定されます。この設定では、Kafka Connect にフィードするソース入力データおよびターゲット出力データを記述します。外部ソースデータは、対象のメッセージを格納する特定のトピックを参照する必要があります。
Kafka には、以下のようにビルトインコネクターが 2 つあります。
-
FileStreamSourceConnector
は、外部システムから Kafka にデータをストリーミングし、入力ソースから行を読み取り、各行を Kafka トピックに送信します。 -
FileStreamSinkConnector
は、Kafka から外部システムにデータをストリーミングし、Kafka トピックからメッセージを読み取り、出力ファイルにメッセージごとに 1 行を作成します。
コネクタープラグインを使用して他のコネクターを追加できます。コネクタープラグインは、JAR ファイルのセットで、特定タイプの外部システムへの接続に必要な実装を定義します。
新しい Kafka Connect プラグインを使用するカスタム Kafka Connect イメージを作成します。
イメージを作成するには、以下を使用します。
- ベースイメージとしての Red Hat Ecosystem Catalog の Kafka コンテナーイメージ
- 新規コンテナーイメージを作成する OpenShift ビルド と S2I (Source-to-Image) フレームワーク
コネクターの管理
KafkaConnector
リソースまたは Kafka Connect REST API を使用し、Kafka Connect クラスターでコネクターインスタンスを作成して管理できます。KafkaConnector
リソースでは OpenShift ネイティブのアプローチを利用でき、このリソースは Cluster Operator で管理されます。
KafkaConnector
の spec
では、コネクタークラスと設定オプション、データを処理するコネクター タスク の最大数を指定します。
KafkaConnector 設定の YAML 例
apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnector metadata: name: my-source-connector labels: strimzi.io/cluster: my-connect-cluster spec: class: org.apache.kafka.connect.file.FileStreamSourceConnector tasksMax: 2 config: file: "/opt/kafka/LICENSE" topic: my-topic # ...
KafkaConnector
を有効するには、アノテーションを KafkaConnect
リソースに追加します。
KafkaConnector を有効にするアノテーションの YAML 例
apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnect metadata: name: my-connect annotations: strimzi.io/use-connector-resources: "true" # ...