6.5. Kafka Connect コネクターの追加
Kafka Connect はコネクターを使用して他のシステムと統合し、データをストリーミングします。コネクターは Kafka Connector
クラスのインスタンスであり、次のいずれかのタイプになります。
- ソースコネクター
- ソースコネクターは、外部システムからデータを取得し、それをメッセージとして Kafka に提供するランタイムエンティティーです。
- シンクコネクター
- シンクコネクターは、Kafka トピックからメッセージを取得し、外部システムに提供するランタイムエンティティーです。
Kafka Connect はプラグインアーキテクチャーを使用して、コネクターの実装アーティファクトを提供します。プラグインは他のシステムへの接続を可能にし、データを操作するための追加の設定を提供します。プラグインには、コネクターや、データコンバーターや変換などの他のコンポーネントが含まれます。コネクターは、特定のタイプの外部システムで動作します。各コネクターは、その設定のスキーマを定義します。設定を Kafka Connect に指定して、Kafka Connect 内にコネクターインスタンスを作成します。次に、コネクターインスタンスは、システム間でデータを移動するための一連のタスクを定義します。
次のいずれかの方法で、コネクタープラグインを Kafka Connect に追加します。
コンテナーイメージにプラグインを追加したら、次の方法でコネクターインスタンスを開始、停止、および管理できます。
これらのオプションを使用して、新しいコネクターインスタンスを作成することもできます。
6.5.1. コネクタープラグインを使用して新しいコンテナーイメージを自動的にビルドする
Streams for Apache Kafka が追加のコネクターを使用して新しいコンテナーイメージを自動的にビルドするように Kafka Connect を設定します。コネクタープラグインは、KafkaConnect
カスタムリソースの .spec.build.plugins
プロパティーを使用して定義します。Streams for Apache Kafka はコネクタープラグインを自動的にダウンロードし、新しいコンテナーイメージに追加します。コンテナーは、.spec.build.output
に指定されたコンテナーリポジトリーにプッシュされ、Kafka Connect デプロイメントで自動的に使用されます。
前提条件
- Cluster Operator がデプロイされている。
- コンテナーレジストリー。
イメージをプッシュ、保存、およびプルできる独自のコンテナーレジストリーを提供する必要があります。Streams for Apache Kafka は、プライベートコンテナーレジストリーだけでなく、Quay や Docker Hub などのパブリックレジストリーもサポートします。
手順
.spec.build.output
でコンテナーレジストリーを、.spec.build.plugins
で追加のコネクターを指定して、KafkaConnect
カスタムリソースを設定します。apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster spec: 1 #... build: output: 2 type: docker image: my-registry.io/my-org/my-connect-cluster:latest pushSecret: my-registry-credentials plugins: 3 - name: connector-1 artifacts: - type: tgz url: <url_to_download_connector_1_artifact> sha512sum: <SHA-512_checksum_of_connector_1_artifact> - name: connector-2 artifacts: - type: jar url: <url_to_download_connector_2_artifact> sha512sum: <SHA-512_checksum_of_connector_2_artifact> #...
リソースを作成または更新します。
$ oc apply -f <kafka_connect_configuration_file>
- 新しいコンテナーイメージがビルドされ、Kafka Connect クラスターがデプロイされるまで待ちます。
-
Kafka Connect REST API または
KafkaConnector
カスタムリソースを使用して、追加したコネクタープラグインを使用します。
6.5.2. Kafka Connect ベースイメージからコネクタープラグインを使用して新しいコンテナーイメージをビルドする
Kafka Connect 基本イメージからコネクタープラグインを使用してカスタム Docker イメージを作成します。カスタムイメージを /opt/kafka/plugins
ディレクトリーに追加します。
Red Hat Ecosystem Catalog の Kafka コンテナーイメージは、追加のコネクタープラグインで独自のカスタムイメージを作成するためのベースイメージとして使用できます。
起動時に、Streams for Apache Kafka バージョンの Kafka Connect は、/opt/kafka/plugins
ディレクトリーに含まれるサードパーティーのコネクタープラグインをロードします。
手順
ベースイメージとして
registry.redhat.io/amq-streams/kafka-37-rhel9:2.7.0
を使用して、新しいDockerfile
を作成します。FROM registry.redhat.io/amq-streams/kafka-37-rhel9:2.7.0 USER root:root COPY ./my-plugins/ /opt/kafka/plugins/ USER 1001
プラグインファイルの例
$ tree ./my-plugins/ ./my-plugins/ ├── debezium-connector-mongodb │ ├── bson-<version>.jar │ ├── CHANGELOG.md │ ├── CONTRIBUTE.md │ ├── COPYRIGHT.txt │ ├── debezium-connector-mongodb-<version>.jar │ ├── debezium-core-<version>.jar │ ├── LICENSE.txt │ ├── mongodb-driver-core-<version>.jar │ ├── README.md │ └── # ... ├── debezium-connector-mysql │ ├── CHANGELOG.md │ ├── CONTRIBUTE.md │ ├── COPYRIGHT.txt │ ├── debezium-connector-mysql-<version>.jar │ ├── debezium-core-<version>.jar │ ├── LICENSE.txt │ ├── mysql-binlog-connector-java-<version>.jar │ ├── mysql-connector-java-<version>.jar │ ├── README.md │ └── # ... └── debezium-connector-postgres ├── CHANGELOG.md ├── CONTRIBUTE.md ├── COPYRIGHT.txt ├── debezium-connector-postgres-<version>.jar ├── debezium-core-<version>.jar ├── LICENSE.txt ├── postgresql-<version>.jar ├── protobuf-java-<version>.jar ├── README.md └── # ...
COPY コマンドは、コンテナーイメージにコピーするプラグインファイルを指します。
この例では、Debezium コネクター (MongoDB、MySQL、および PostgreSQL) のプラグインを追加しますが、簡潔にするためにすべてのファイルがリストされているわけではありません。Kafka Connect で実行されている Debezium は、他の Kafka Connect タスクと同じように表示されます。
- コンテナーイメージをビルドします。
- カスタムイメージをコンテナーレジストリーにプッシュします。
新しいコンテナーイメージを示します。
次のいずれかの方法でイメージを指定できます。
KafkaConnect
カスタムリソースのKafkaConnect.spec.image
プロパティーを編集します。設定されている場合、このプロパティーは Cluster Operator の
STRIMZI_KAFKA_CONNECT_IMAGES
環境変数をオーバーライドします。apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster spec: 1 #... image: my-new-container-image 2 config: 3 #...
-
install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml
ファイルのSTRIMZI_KAFKA_CONNECT_IMAGES
環境変数を編集して、新しいコンテナーイメージを指すようにし、Cluster Operator を再インストールします。
6.5.3. KafkaConnector リソースのデプロイ
コネクターを管理するために KafkaConnector
リソースをデプロイします。KafkaConnector
カスタムリソースは、Cluster Operator によるコネクターの管理に OpenShift ネイティブのアプローチを提供します。Kafka Connect REST API のように、コネクターを管理するために HTTP 要求を送信する必要はありません。該当する KafkaConnector
リソースを更新して稼働中のコネクターインスタンスを管理した後、更新を適用します。Cluster Operator は、実行中のコネクターインスタンスの設定を更新します。該当する KafkaConnector
を削除して、コネクターを削除します。
KafkaConnector
リソースは、リンク先の Kafka Connect クラスターと同じ namespace にデプロイする必要があります。
この手順で示す設定では、失敗したコネクターとタスクを自動的に再起動するために autoRestart
機能が有効になっています (enabled: true
)。KafkaConnector
リソースにアノテーションを付けて、コネクターを 再起動 するか、コネクタータスクの再起動 を手動で行うこともできます。
コネクターの例
独自のコネクターを使用することも、Streams for Apache Kafka に付属する例を試すこともできます。Apache Kafka 3.1.0 までは、サンプルファイルコネクタープラグインが Apache Kafka に含まれていました。Apache Kafka の 3.1.1 および 3.2.0 リリースから、例を他のコネクターと同様にプラグインパスに追加する必要があります。
Streams for Apache Kafka には、サンプルファイルコネクタープラグイン用の サンプル KafkaConnector
設定ファイル (examples/connect/source-connector.yaml
) が用意されています。このファイルでは、次のコネクターインスタンスが KafkaConnector
リソースとして作成されます。
-
Kafka ライセンスファイル (ソース) から各行を読み取り、データをメッセージとして単一の Kafka トピックに書き込む
FileStreamSourceConnector
インスタンス。 -
Kafka トピックからメッセージを読み取り、メッセージを一時ファイル (シンク) に書き込む
FileStreamSinkConnector
インスタンス。
この手順では、サンプルファイルを使用してコネクターを作成します。
サンプルコネクターは、運用環境での使用を意図したものではありません。
前提条件
- Kafka Connect デプロイメント。
- Cluster Operator が稼働している。
手順
次のいずれかの方法で、
FileStreamSourceConnector
およびFileStreamSinkConnector
プラグインを Kafka Connect に追加します。Kafka Connect 設定で
strimzi.io/use-connector-resources annotation
をtrue
に設定します。apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster annotations: strimzi.io/use-connector-resources: "true" spec: # ...
KafkaConnector
リソースを有効にすると、Cluster Operator はそれらを監視します。examples/connect/source-connector.yaml
ファイルを編集します。KafkaConnector ソースコネクター設定の例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: my-source-connector 1 labels: strimzi.io/cluster: my-connect-cluster 2 spec: class: org.apache.kafka.connect.file.FileStreamSourceConnector 3 tasksMax: 2 4 autoRestart: 5 enabled: true config: 6 file: "/opt/kafka/LICENSE" 7 topic: my-topic 8 # ...
- 1
- コネクターの名前として使用される
KafkaConnector
リソースの名前。OpenShift リソースで有効な名前を使用します。 - 2
- コネクターインスタンスを作成する Kafka Connect クラスターの名前。コネクターは、リンク先の Kafka Connect クラスターと同じ namespace にデプロイする必要があります。
- 3
- コネクタークラスのフルネーム。これは、Kafka Connect クラスターによって使用されているイメージに存在するはずです。
- 4
- コネクターが作成できる Kafka Connect タスクの最大数。
- 5
- 失敗したコネクターとタスクの自動再起動を有効にします。デフォルトでは、再起動の回数は無制限ですが、
maxRestarts
プロパティーを使用して自動再起動の最大回数を設定できます。 - 6
- キーと値のペア形式の コネクター設定。
- 7
- 外部データファイルの場所。この例では、
/opt/kafka/LICENSE
ファイルから読み取るようにFileStreamSourceConnector
を設定しています。 - 8
- ソースデータのパブリッシュ先となる Kafka トピック。
OpenShift クラスターでソース
KafkaConnector
を作成します。oc apply -f examples/connect/source-connector.yaml
examples/connect/sink-connector.yaml
ファイルを作成します。touch examples/connect/sink-connector.yaml
以下の YAML を
sink-connector.yaml
ファイルに貼り付けます。apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: my-sink-connector labels: strimzi.io/cluster: my-connect spec: class: org.apache.kafka.connect.file.FileStreamSinkConnector 1 tasksMax: 2 config: 2 file: "/tmp/my-file" 3 topics: my-topic 4
OpenShift クラスターにシンク
KafkaConnector
を作成します。oc apply -f examples/connect/sink-connector.yaml
コネクターリソースが作成されたことを確認します。
oc get kctr --selector strimzi.io/cluster=<my_connect_cluster> -o name my-source-connector my-sink-connector
<my_connect_cluster> を Kafka Connect クラスターの名前に置き換えます。
コンテナーで、
kafka -console-consumer.sh
を実行して、ソースコネクターによってトピックに書き込まれたメッセージを読み取ります。oc exec <my_kafka_cluster>-kafka-0 -i -t -- bin/kafka-console-consumer.sh --bootstrap-server <my_kafka_cluster>-kafka-bootstrap.NAMESPACE.svc:9092 --topic my-topic --from-beginning
<my_kafka_cluster> を Kafka クラスターの名前に置き換えます。
ソースおよびシンクコネクターの設定オプション
コネクター設定は、KafkaConnector
リソースの spec.config
プロパティーで定義されます。
FileStreamSourceConnector
クラスおよび FileStreamSinkConnector
クラスは、Kafka Connect REST API と同じ設定オプションをサポートします。他のコネクターは異なる設定オプションをサポートします。
名前 | 型 | デフォルト値 | 説明 |
---|---|---|---|
| 文字列 | Null | メッセージを書き込むソースファイル。指定のない場合は、標準入力が使用されます。 |
| List | Null | データのパブリッシュ先となる Kafka トピック。 |
名前 | 型 | デフォルト値 | 説明 |
---|---|---|---|
| 文字列 | Null | メッセージを書き込む宛先ファイル。指定のない場合は標準出力が使用されます。 |
| List | Null | データの読み取り元となる 1 つ以上の Kafka トピック。 |
| 文字列 | Null | データの読み取り元となる 1 つ以上の Kafka トピックと一致する正規表現。 |
6.5.4. Kafka Connect API の公開
KafkaConnector
リソースを使用してコネクターを管理する代わりに、Kafka Connect REST API を使用します。Kafka Connect REST API は、<connect_cluster_name>-connect-api:8083
で実行しているサービスとして利用できます。ここで、<connect_cluster_name> は、お使いの Kafka Connect クラスターの名前になります。サービスは、Kafka Connect インスタンスの作成時に作成されます。
Kafka Connect REST API でサポートされる操作は、Apache Kafka Connect API のドキュメント で説明されています。
strimzi.io/use-connector-resources
アノテーションは KafkaConnectors を有効にします。アノテーションを KafkaConnect
リソース設定に適用した場合、そのアノテーションを削除して Kafka Connect API を使用する必要があります。それ以外の場合、Kafka Connect REST API を使用して直接行われた手動による変更は、 Cluster Operator によって元に戻されます。
コネクター設定を JSON オブジェクトとして追加できます。
コネクター設定を追加するための curl 要求の例
curl -X POST \ http://my-connect-cluster-connect-api:8083/connectors \ -H 'Content-Type: application/json' \ -d '{ "name": "my-source-connector", "config": { "connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector", "file": "/opt/kafka/LICENSE", "topic":"my-topic", "tasksMax": "4", "type": "source" } }'
API には OpenShift クラスター内でのみアクセスできます。OpenShift クラスター外部で実行しているアプリケーションに Kafka Connect API がアクセスできるようにする場合は、以下の機能のいずれかを使用して Kafka Connect API を手動で公開できます。
-
LoadBalancer
またはNodePort
タイプのサービス -
Ingress
リソース (Kubernetes のみ) - OpenShift ルート (OpenShift のみ)
接続はセキュアではないため、外部からのアクセスはよく考えてから許可してください。
サービスを作成する場合には、<connect_cluster_name>-connect-api
サービスの selector
からラベルを使用して、サービスがトラフィックをルーティングする Pod を設定します。
サービスのセレクター設定
# ... selector: strimzi.io/cluster: my-connect-cluster 1 strimzi.io/kind: KafkaConnect strimzi.io/name: my-connect-cluster-connect 2 #...
また、外部クライアントからの HTTP 要求を許可する NetworkPolicy
を作成する必要もあります。
Kafka Connect API への要求を許可する NetworkPolicy の例
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: my-custom-connect-network-policy
spec:
ingress:
- from:
- podSelector: 1
matchLabels:
app: my-connector-manager
ports:
- port: 8083
protocol: TCP
podSelector:
matchLabels:
strimzi.io/cluster: my-connect-cluster
strimzi.io/kind: KafkaConnect
strimzi.io/name: my-connect-cluster-connect
policyTypes:
- Ingress
- 1
- API への接続が許可される Pod のラベル。
クラスター外でコネクター設定を追加するには、curl コマンドで API を公開するリソースの URL を使用します。
6.5.5. Kafka Connect API へのアクセスの制限
Kafka Connect API へのアクセスを信頼できるユーザーのみに制限して、不正なアクションや潜在的なセキュリティーの問題を防ぐことが重要です。Kafka Connect API は、コネクター設定を変更するための広範な機能を提供するため、セキュリティー対策を講じることがさらに重要になります。管理者によりセキュアであると想定されている機密情報が、Kafka Connect API にアクセスできるユーザーに取得されてしまう可能性があります。
Kafka Connect REST API には、OpenShift クラスターへのアクセスが認証されており、ホスト名/IP アドレス、ポート番号など、エンドポイント URL を知っている場合には、アクセスできます。
たとえば、組織が Kafka Connect クラスターとコネクターを使用して、機密データを顧客データベースから中央データベースにストリーミングするとします。管理者は設定プロバイダープラグインを使用して、顧客データベースと中央データベースへの接続に関連する機密情報 (データベース接続の詳細や認証情報など) を保存します。設定プロバイダーは、この機密情報が許可されていないユーザーに公開されるのを防ぎます。ただし、Kafka Connect API にアクセスできるユーザーは、管理者の同意なしに顧客データベースにアクセスできます。これを行うには、偽のデータベースをセットアップし、それに接続するコネクターを設定します。次に、顧客データベースを参照するようにコネクター設定を変更しますが、データを中央データベースに送信する代わりに、偽のデータベースに送信します。偽のデータベースに接続するようにコネクターを設定すると、設定プロバイダーにセキュアに保存されているにもかかわらず、顧客データベースに接続するためのログインの詳細と認証情報が傍受されます。
KafkaConnector
カスタムリソースを使用している場合、デフォルトでは、OpenShift RBAC ルールにより、OpenShift クラスター管理者のみがコネクターに変更を加えることが許可されます。Streams for Apache Kafka リソースを管理するクラスター管理者以外のユーザーを指定 することもできます。Kafka Connect 設定で KafkaConnector
リソースを有効にすると、Kafka Connect REST API を使用して直接行われた変更は Cluster Operator によって元に戻されます。KafkaConnector
リソースを使用していない場合、デフォルトの RBAC ルールは Kafka Connect API へのアクセスを制限しません。OpenShift RBAC を使用して Kafka Connect REST API への直接アクセスを制限する場合は、KafkaConnector
リソースを有効にして使用する必要があります。
セキュリティーを強化するために、Kafka Connect API の次のプロパティーを設定することを推奨します。
org.apache.kafka.disallowed.login.modules
(Kafka 3.4 以降)
org.apache.kafka.disallowed.login.modules
Java システムプロパティーを設定して、セキュアではないログインモジュールの使用を防止します。たとえば、com.sun.security.auth.module.JndiLoginModule
を指定すると、KafkaJndiLoginModule
が使用できなくなります。ログインモジュールを禁止する設定例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster annotations: strimzi.io/use-connector-resources: "true" spec: # ... jvmOptions: javaSystemProperties: - name: org.apache.kafka.disallowed.login.modules value: com.sun.security.auth.module.JndiLoginModule, org.apache.kafka.common.security.kerberos.KerberosLoginModule # ...
信頼できるログインモジュールのみを許可し、使用しているバージョンに対する Kafka からの最新のアドバイスに従ってください。ベストプラクティスとして、
org.apache.kafka.disallowed.login.modules
システムプロパティーを使用して、Kafka Connect 設定でセキュアではないログインモジュールを明示的に禁止する必要があります。connector.client.config.override.policy
connector.client.config.override.policy
プロパティーをNone
に設定して、コネクター設定が Kafka Connect 設定とそれが使用するコンシューマーおよびプロデューサーをオーバーライドしないようにします。コネクターオーバーライドポリシーを指定する設定例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster annotations: strimzi.io/use-connector-resources: "true" spec: # ... config: connector.client.config.override.policy: None # ...
6.5.6. Kafka Connect API の使用から KafkaConnector カスタムリソースの使用への切り替え
Kafka Connect API の使用から KafkaConnector
カスタムリソースの使用に切り替えて、コネクターを管理できます。スイッチの作成は、次の作業を以下の順序で行います。
-
設定で
KafkaConnector
リソースをデプロイし、コネクターインスタンスを作成します。 -
strimzi.io/use-connector-resources
アノテーションをtrue
に設定して、Kafka Connect 設定でKafkaConnector
リソースを有効にします。
作成する前に KafkaConnector
リソースを有効にすると、すべてのコネクターが削除されます。
KafkaConnector
リソースの使用から Kafka Connect API の使用に切り替えるには、最初に KafkaConnector
リソースを有効にするアノテーションを Kafka Connect 設定から削除します。それ以外の場合、Kafka Connect REST API を使用して直接行われた手動による変更は、 Cluster Operator によって元に戻されます。
切り替えを行うときは、KafkaConnect
リソースのステータスを確認 してください。metadata.generation
(デプロイの現在のバージョン) の値は、status.observedGeneration
(リソースの最新の調整) と一致する必要があります。Kafka Connect クラスターが Ready
になったら、KafkaConnector
リソースを削除できます。