10.9. Kafka Connect コネクターの設定
KafkaConnector リソースは、Cluster Operator によるコネクターの管理に OpenShift ネイティブのアプローチを提供します。KafkaConnector リソースを使用してコネクターを作成、削除、または再設定するには、KafkaConnect カスタムリソースで use-connector-resources アノテーションを true に設定する必要があります。
KafkaConnectors を有効にするためのアノテーション
KafkaConnect 設定で use-connector-resources アノテーションが有効になっている場合は、KafkaConnector リソースを使用してコネクターを定義および管理する必要があります。
または、KafkaConnector リソースの代わりに Kafka Connect REST API を使用してコネクターを管理することもできます。API を使用するには、KafkaConnect リソースで KafkaConnector リソースを使用するために strimzi.io/use-connector-resources アノテーションを削除する必要があります。
KafkaConnector リソースは、Kafka Connect クラスター内でコネクターを作成するために必要な設定を提供します。Kafka Connect クラスターは、KafkaConnect 設定で指定されたとおりに Kafka クラスターとやり取りします。Kafka クラスターは、Streams for Apache Kafka によって管理する必要はなく、OpenShift クラスターにデプロイする必要もありません。
同じ OpenShift クラスター内に含まれる Kafka コンポーネント
設定では、必要な認証方法など、コネクターインスタンスが外部データシステムとやり取りする方法も指定します。さらに、監視するデータを定義する必要があります。たとえば、データベースからデータを読み取るソースコネクターでは、設定にデータベース名を含める場合があります。ターゲットトピック名を指定して、このデータを Kafka のどこに配置するかを定義することもできます。
タスクの最大数を指定するには、tasksMax プロパティーを使用します。たとえば、ソースコネクターに tasksMax: 2 を使用すると、ソースデータのインポートを 2 つのタスクに分割できます。
ソースコネクター設定の例
- 1
- コネクターの名前として使用される
KafkaConnectorリソースの名前。OpenShift リソースで有効な名前を使用します。 - 2
- コネクターインスタンスを作成する Kafka Connect クラスターの名前。コネクターは、リンク先の Kafka Connect クラスターと同じ namespace にデプロイする必要があります。
- 3
- コネクタークラスのフルネーム。これは、Kafka Connect クラスターによって使用されているイメージに存在するはずです。
- 4
- コネクターが作成できる Kafka Connect タスクの最大数。
- 5
- 失敗したコネクターとタスクの自動再起動を有効にします。デフォルトでは、再起動の回数は無制限ですが、
maxRestartsプロパティーを使用して自動再起動の最大回数を設定できます。 - 6
- キーと値のペア形式の コネクター設定。
- 7
- 外部データファイルの場所。この例では、
/opt/kafka/LICENSEファイルから読み取るようにFileStreamSourceConnectorを設定しています。 - 8
- ソースデータのパブリッシュ先となる Kafka トピック。
外部コネクター設定 (シークレットに保存されているユーザーアクセス認証情報など) を含めるには、KafkaConnect リソースの template プロパティーを使用します。設定プロバイダー を使用して値を読み込むこともできます。
10.9.1. Kafka Connect コネクターの手動停止または一時停止 リンクのコピーリンクがクリップボードにコピーされました!
KafkaConnector リソースを使用してコネクターを設定している場合は、state 設定を使用してコネクターを停止または一時停止します。コネクターとタスクがインスタンス化されたままになる一時停止状態とは対照的に、コネクターを停止すると設定のみが保持され、アクティブなプロセスは保持されません。コネクター実行の停止は、単に一時停止するよりも長時間停止する場合に適しています。一時停止されたコネクターはすぐに再開されますが、停止されたコネクターにはメモリーとリソースが解放されるという利点があります。
state 設定は、KafkaConnectorSpec スキーマの (非推奨) pause 設定を置き換えるもので、コネクターでの一時停止を許可します。これまでに pause 設定を使用してコネクターを一時停止した場合には、競合の回避目的にのみ state 設定の使用に移行することを推奨します。
前提条件
- Cluster Operator が稼働中である。
手順
一時停止または停止するコネクターを制御する
KafkaConnectorカスタムリソースの名前を見つけます。oc get KafkaConnector
oc get KafkaConnectorCopy to Clipboard Copied! Toggle word wrap Toggle overflow KafkaConnectorリソースを編集して、コネクターを停止または一時停止します。Kafka Connect コネクターを停止する設定例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow state設定をstoppedまたはpausedに変更します。このプロパティーが設定されていない場合のコネクターのデフォルトの状態はrunningです。変更を
KafkaConnector設定に適用します。stateをrunningに変更するか、設定を削除して、コネクターを再開できます。
または、Kafka Connect API を公開 し、stop エンドポイントと pause エンドポイントを使用してコネクターの実行を停止することもできます。たとえば、PUT /connectors/<connector_name>/stop などです。その後、resume エンドポイントを使用して再起動できます。
10.9.2. Kafka Connect コネクターの手動での再起動 リンクのコピーリンクがクリップボードにコピーされました!
KafkaConnector リソースを使用してコネクターを管理している場合は、strimzi.io/restart アノテーションを使用してコネクターの再起動を手動でトリガーします。
前提条件
- Cluster Operator が稼働中である。
手順
再起動する Kafka コネクターを制御する
KafkaConnectorカスタムリソースの名前を見つけます。oc get KafkaConnector
oc get KafkaConnectorCopy to Clipboard Copied! Toggle word wrap Toggle overflow OpenShift で
KafkaConnectorリソースにアノテーションを付けて、コネクターを再起動します。oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart="true"
oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart="true"Copy to Clipboard Copied! Toggle word wrap Toggle overflow restartアノテーションはtrueに設定されています。次の調整が発生するまで待ちます (デフォルトでは 2 分ごとです)。
アノテーションが調整プロセスで検出されれば、Kafka コネクターは再起動されます。Kafka Connect が再起動リクエストを受け入れると、アノテーションは
KafkaConnectorカスタムリソースから削除されます。
10.9.3. Kafka Connect コネクタータスクの手動での再起動 リンクのコピーリンクがクリップボードにコピーされました!
KafkaConnector リソースを使用してコネクターを管理している場合は、strimzi.io/restart-task アノテーションを使用して、コネクタータスクの再起動を手動でトリガーします。
前提条件
- Cluster Operator が稼働中である。
手順
再起動する Kafka コネクタータスクを制御する
KafkaConnectorカスタムリソースの名前を見つけます。oc get KafkaConnector
oc get KafkaConnectorCopy to Clipboard Copied! Toggle word wrap Toggle overflow KafkaConnectorカスタムリソースから再起動するタスクの ID を検索します。oc describe KafkaConnector <kafka_connector_name>
oc describe KafkaConnector <kafka_connector_name>Copy to Clipboard Copied! Toggle word wrap Toggle overflow タスク ID は 0 から始まる負の値ではない整数です。
OpenShift で
KafkaConnectorリソースにアノテーションを付けて、ID を使用してコネクタータスクを再開します。oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart-task="0"
oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart-task="0"Copy to Clipboard Copied! Toggle word wrap Toggle overflow この例では、タスク
0が再起動されます。次の調整が発生するまで待ちます (デフォルトでは 2 分ごとです)。
アノテーションが調整プロセスで検出されれば、Kafka コネクタータスクは再起動されます。Kafka Connect が再起動リクエストを受け入れると、アノテーションは
KafkaConnectorカスタムリソースから削除されます。
10.9.4. コネクターオフセットのリスト表示 リンクのコピーリンクがクリップボードにコピーされました!
KafkaConnector リソースを使用してコネクターのオフセットを追跡するには、listOffsets 設定を追加します。データの流れを追跡するオフセットは、設定で指定された config map に書き込まれます。config map が存在しない場合は、Streams for Apache Kafka が作成します。
設定が完了したら、KafkaConnector リソースにアノテーションを付けて、リストを config map に書き込みます。
シンクコネクターは Kafka の標準コンシューマーオフセットメカニズムを使用し、ソースコネクターは Kafka トピック内にカスタム形式でオフセットを保存します。
- シンクコネクターの場合、リストには Kafka トピックパーティションと、各パーティションの最後にコミットされたオフセットが表示されます。
- ソースコネクターの場合、リストにはソースシステムのパーティションと最後に処理されたオフセットが表示されます。
前提条件
- Cluster Operator が稼働中である。
手順
コネクターの
KafkaConnectorリソースを、listOffsets設定が含まれるように編集します。オフセットをリストするための設定例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow KafkaConnectorリソースにアノテーションを付けて、リストを config map に書き込むコマンドを実行します。oc annotate kafkaconnector my-source-connector strimzi.io/connector-offsets=list -n <namespace>
oc annotate kafkaconnector my-source-connector strimzi.io/connector-offsets=list -n <namespace>Copy to Clipboard Copied! Toggle word wrap Toggle overflow アノテーションは、リスト操作が成功するか、リソースから手動で削除されるまで残ります。
KafkaConnectorリソースを更新した後に、次のコマンドを使用して、オフセットを含む config map が作成されたか確認します。oc get configmap my-connector-offsets -n <namespace>
oc get configmap my-connector-offsets -n <namespace>Copy to Clipboard Copied! Toggle word wrap Toggle overflow config map の内容を調べて、オフセットがリストされていることを確認します。
oc describe configmap my-connector-offsets -n <namespace>
oc describe configmap my-connector-offsets -n <namespace>Copy to Clipboard Copied! Toggle word wrap Toggle overflow Streams for Apache Kafka は、オフセット情報を
offsets.jsonプロパティーに置きます。既存 config map の更新時に他のプロパティーは上書きされません。ソースコネクターオフセットリストの例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow シンクコネクターオフセットリストの例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
10.9.5. コネクターオフセットの変更 リンクのコピーリンクがクリップボードにコピーされました!
KafkaConnector リソースを使用してコネクターのオフセットを変更するには、コネクターを停止するようにリソースを設定し、alterOffsets 設定を追加して config map でオフセットの変更を指定します。オフセットをリストする ために使用したのと同じ config map を再利用できます。
コネクターが停止し、設定が完了したら、オフセットの変更を適用するように KafkaConnector リソースにアノテーションを付け、コネクターを再起動します。
コネクターオフセットの変更は、たとえば poison レコードをスキップしたり、レコードを再生したりする場合に役立ちます。
この手順では、my-source-connector という名前のソースコネクターのオフセット位置を変更します。
前提条件
- Cluster Operator が稼働中である。
手順
KafkaConnectorリソースを編集してコネクターを停止し、alterOffsets設定を追加します。コネクターを停止してオフセットを変更する設定例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 変更を加えるために、config map を編集します。
この例では、ソースコネクターのオフセット位置を 15000 にリセットします。
ソースコネクターオフセットリストの設定例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow KafkaConnectorリソースにアノテーションを付けて、オフセット位置を更新するコマンドを実行します。oc annotate kafkaconnector my-source-connector strimzi.io/connector-offsets=alter -n <namespace>
oc annotate kafkaconnector my-source-connector strimzi.io/connector-offsets=alter -n <namespace>Copy to Clipboard Copied! Toggle word wrap Toggle overflow アノテーションは、更新操作が成功するか、リソースから手動で削除されるまで残ります。
- コネクターオフセットのリスト表示 の手順を使用して変更を確認します。
状態を
runningに変更してコネクターを再起動します。コネクターを起動するための設定例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
10.9.6. コネクターオフセットのリセット リンクのコピーリンクがクリップボードにコピーされました!
KafkaConnector リソースを使用してコネクターオフセットをリセットするには、コネクターを停止するようにリソースを設定します。
コネクターの停止後、KafkaConnector リソースにアノテーションを付けてオフセットをクリアし、コネクターを再起動します。
この手順では、my-source-connector という名前のソースコネクターのオフセット位置をリセットします。
前提条件
- Cluster Operator が稼働中である。
手順
コネクターを停止するように、
KafkaConnectorリソースを編集します。コネクターを停止するための設定例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - 1
- コネクターの状態を
stoppedに変更します。このプロパティーが設定されていない場合のコネクターのデフォルトの状態はrunningです。
KafkaConnectorリソースにアノテーションを付けて、オフセット位置をリセットするコマンドを実行します。oc annotate kafkaconnector my-source-connector strimzi.io/connector-offsets=reset -n <namespace>
oc annotate kafkaconnector my-source-connector strimzi.io/connector-offsets=reset -n <namespace>Copy to Clipboard Copied! Toggle word wrap Toggle overflow アノテーションは、リセット操作が成功するか、リソースから手動で削除されるまで残ります。
コネクターオフセットのリスト表示 の手順を使用して変更を確認します。
リセット後、
offsets.jsonプロパティーは空になります。ソースコネクターオフセットリストの例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 状態を
runningに変更してコネクターを再起動します。コネクターを起動するための設定例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow