第12章 アプリケーション用 Debezium コネクターの設定
Debezium コネクターのデフォルトの動作がアプリケーションに適していない場合、以下の Debezium 機能を使用して必要な動作を設定することができます。
- Kafka Connect 自動トピック作成
- Connect が実行時にトピックを作成し、トピックの名前に基づいて設定をトピックに適用するのを許可します。
- Avro シリアライゼーション
- Debezium PostgreSQL、MongoDB、または SQL Server コネクターが Avro を使用してメッセージのキーと値をシリアライズする設定をサポートします。これにより、変更イベントレコードのコンシューマーがレコードスキーマの変更に容易に適応できるようにします。
- xref:configuring-notifications-to-report-connector-status
- 設定可能なチャネルセットを介してコネクターのステータス情報を公開するメカニズムを提供します。
- CloudEvents コンバーター
- Debezium コネクターが CloudEvents 仕様に準拠する変更イベントレコードを出力できるようにします。
- Debezium コネクターへのシグナル送信
- コネクターの動作を変更したり、アドホックスナップショットの開始などのアクションをトリガーしたりする方法を提供します。
12.1. Kafka Connect 自動トピック作成のカスタマイズ
Kafka には、トピックを自動的に作成するメカニズムが 2 つ用意されています。Kafka ブローカーの自動トピック作成を有効にすることができます。また、Kafka 2.6.0 以降では、Kafka Connect のトピック作成を有効にすることもできます。Kafka ブローカーは、auto.create.topics.enable
プロパティーを使用してトピックの自動作成を制御します。Kafka Connect では、topic.creation.enable
プロパティーで、Kafka Connect がトピックを作成することを許可するかどうかを指定します。いずれの場合も、プロパティーのデフォルト設定ではトピックの自動作成が有効です。
トピックの自動作成が有効な場合、Debezium ソースコネクターがまだルーティング先トピックが存在しないテーブルの変更イベントレコードを出力すると、イベントレコードが Kafka に取り込まれる際にトピックが実行時に作成されます。
ブローカーと Kafka Connect での自動トピック作成の違い
ブローカーが作成するトピックは、1 つのデフォルト設定の共有に制限されます。ブローカーは、異なるトピックまたはトピックのセットに一意の設定を適用することはできません。対照的に、Kafka Connect では、トピックの作成時に任意のさまざまな設定を適用し、Debezium コネクター設定で指定したレプリケーション係数、パーティション数、およびその他のトピック固有の設定を定義することができます。コネクター設定はトピック作成グループのセットを定義し、トピック設定のプロパティーセットを各グループに関連付けます。
ブローカー設定と Kafka Connect 設定は、互いに独立しています。ブローカーでトピック作成を無効にしたかどうかに関係なく、Kafka Connect はトピックを作成することができます。ブローカーと Kafka Connect の両方でトピックの自動作成を有効にすると Connect の設定が優先され、Kafka Connect のいずれの設定も適用されない場合に限り、ブローカーはトピックを作成します。
詳細は、以下のトピックを参照してください。
12.1.1. Kafka ブローカーの自動トピック作成の無効化
デフォルトでは、トピックがまだ存在しない場合、Kafka ブローカー設定によりブローカーは実行時にトピックを作成することができます。ブローカーによって作成されたトピックにカスタムプロパティーを設定することはできません。2.6.0 より前のバージョンの Kafka を使用し、特定の設定でトピックを作成する場合は、ブローカーの自動トピック作成を無効にし、手動またはカスタムデプロイプロセスのいずれにより明示的にトピックを作成する必要があります。
手順
-
ブローカーの設定で、
auto.create.topics.enable
の値をfalse
にします。
12.1.2. Kafka Connect の自動トピック作成の設定
Kafka Connect でのトピックの自動作成は、topic.creation.enable
プロパティーによって制御されます。次の例に示すように、プロパティーのデフォルト値は true
であり、トピックの自動作成を有効にします。
topic.creation.enable = true
topic.creation.enable
プロパティーの設定は、Connect クラスター内のすべてのワーカーに適用されます。
Kafka Connect の自動トピック作成では、トピックの作成時に Kafka Connect が適用する設定プロパティーを定義する必要があります。トピックグループを定義して Debezium コネクター設定でトピックの設定プロパティーを指定し、続いてそれぞれのグループに適用するプロパティーを指定します。コネクター設定では、デフォルトのトピック作成グループ、およびオプションで 1 つまたは複数のカスタムトピック作成グループを定義します。カスタムトピック作成グループは、トピック名パターンのリストを使用してグループの設定が適用されるトピックを指定します。
Kafka Connect がどのようにトピックをトピック作成グループと照合するかの詳細は、トピック作成グループ を参照してください。設定プロパティーがどのようにグループに割り当てられるかの詳細は、トピック作成グループの設定プロパティー を参照してください。
デフォルトでは、Kafka Connect が作成するトピックは、パターンserver.schema.table
に基づいて名前が付けられます (例: dbserver.myschema.inventory
)。
手順
-
Kafka Connect がトピックを自動的に作成しないようにするには、次の例のように、
Kafka Connect
カスタムリソースで topic.creation.enable の値をfalse
に設定します。
apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnect metadata: name: my-connect-cluster ... spec: config: topic.creation.enable: "false"
Kafka Connect の自動トピック作成では、replication.factor
プロパティーと partitions
プロパティーを少なくとも default
のトピック作成グループに設定する必要があります。グループは、Kafka ブローカーのデフォルト値から必要なプロパティーの値を取得することができます。
12.1.3. 自動的に作成されたトピックの設定
Kafka Connect でトピックを自動的に作成するには、トピックの作成時に適用する設定プロパティーに関するソースコネクターからの情報が必要です。それぞれの Debezium コネクターの設定で、トピックの作成を制御するプロパティーを定義します。Kafka Connect がコネクターから出力されるイベントレコード用のトピックを作成すると、作成されるトピックは該当するグループから設定を取得します。この設定は、そのコネクターによって出力されたイベントレコードにのみ適用されます。
12.1.3.1. トピック作成グループ
トピックプロパティーのセットが、トピック作成グループに関連付けられます。少なくとも default
トピック作成グループを定義し、その設定プロパティーを指定する必要があります。それ以外に、オプションとして 1 つまたは複数のカスタムトピック作成グループを定義し、それぞれに一意のプロパティーを指定することができます。
カスタムトピック作成グループを作成する場合、トピック名パターンに基づいて各グループにメンバートピックを定義します。各グループに含めるトピックまたはグループから除外するトピックを記述する命名パターンを指定することができます。include
および exclude
プロパティーには、トピック名パターンを定義する正規表現のコンマ区切りリストを指定します。たとえば、文字列 dbserver1.inventory
で始まるすべてのトピックをグループに含める場合は、その topic.creation.inventory.include
プロパティーの値をdbserver1\\.inventory\\.*
に設定します。
カスタムトピックグループに include
および exclude
プロパティーの両方を指定すると、除外ルールが優先され包含ルールがオーバーライドされます。
12.1.3.2. トピック作成グループの設定プロパティー
default
トピック作成グループおよびそれぞれのカスタムグループは、一意の設定プロパティーのセットに関連付けられます。任意の Kafka トピックレベルの設定プロパティー をグループの設定に含めることができます。たとえば、トピックグループに 古いトピックセグメントのクリーンアップポリシー、保持時間、または トピックの圧縮タイプ を指定することができます。少なくとも、作成するトピックの設定を記述するプロパティーの最小セットを定義する必要があります。
カスタムグループが登録されていない場合、または登録されているグループの include
パターンが作成するトピックの名前とマッチしない場合、Kafka Connect は default
グループの設定を使用してトピックを作成します。
トピック設定についての概要は、Debezium の OpenShift へのインストールの Kafka トピック作成に関する推奨事項 を参照してください。
12.1.3.3. Debezium デフォルトトピック作成グループ設定の指定
Kafka Connect の自動トピック作成を使用するためには、デフォルトのトピック作成グループを作成し、その設定を定義する必要があります。デフォルトのトピック作成グループの設定は、カスタムトピック作成グループの include
リストのパターンにマッチしない名前のすべてのトピックに適用されます。
前提条件
Kafka Connect のカスタムリソースで、
metadata.annotations
のuse-connector-resources
の値により、クラスターの Operator が KafkaConnector カスタムリソースを使用してクラスター内のコネクターを設定するように指定されている。以下に例を示します。... metadata: name: my-connect-cluster annotations: strimzi.io/use-connector-resources: "true" ...
手順
topic.creation.default
グループのプロパティーを定義するには、以下の例に示すように、コネクターのカスタムリソースのspec.config
にプロパティーを追加します。apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnector metadata: name: inventory-connector labels: strimzi.io/cluster: my-connect-cluster spec: ... config: ... topic.creation.default.replication.factor: 3 1 topic.creation.default.partitions: 10 2 topic.creation.default.cleanup.policy: compact 3 topic.creation.default.compression.type: lz4 4 ...
任意の Kafka トピックレベルの設定プロパティー を
default
グループの設定に含めることができます。
項目 | 説明 |
---|---|
1 |
|
2 |
|
3 |
|
4 |
|
カスタムグループは、必要なreplication.factor
および partitions
プロパティーのみに対して、default
グループの設定が戻ります。カスタムトピックグループ設定の他のプロパティーが定義されていない場合、default
グループで指定された値は適用されません。
12.1.3.4. Debezium カスタムトピック作成グループ設定の指定
複数のカスタムトピックグループを、それぞれ個別の設定で定義することができます。
手順
カスタムトピックグループを定義するには、コネクターのカスタムリソースの
spec.config
にtopic.creation.<group_name>.include
プロパティーを追加し、続いてカスタムグループのトピックに適用する設定プロパティーを定義します。次の例では、カスタムトピック作成グループ
inventory
とapplicationlogs
を定義するカスタムリソースの抜粋を示しています。apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnector metadata: name: inventory-connector ... spec: ... config: ... 1 topic.creation.inventory.include: dbserver1\\.inventory\\.* 2 topic.creation.inventory.partitions: 20 topic.creation.inventory.cleanup.policy: compact topic.creation.inventory.delete.retention.ms: 7776000000 3 topic.creation.applicationlogs.include: dbserver1\\.logs\\.applog-.* 4 topic.creation.applicationlogs.exclude": dbserver1\\.logs\\.applog-old-.* 5 topic.creation.applicationlogs.replication.factor: 1 topic.creation.applicationlogs.partitions: 20 topic.creation.applicationlogs.cleanup.policy: delete topic.creation.applicationlogs.retention.ms: 7776000000 topic.creation.applicationlogs.compression.type: lz4 ... ...
項目 | 説明 |
---|---|
1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
12.1.3.5. Debezium カスタムトピック作成グループの登録
カスタムトピック作成グループの設定を指定したら、グループを登録します。
手順
カスタムグループを登録するには、コネクターのカスタムリソースに
topic.creation.groups
プロパティーを追加し、カスタムトピック作成グループをコンマで区切って指定します。カスタムトピック作成グループ
inventory
とapplicationlogs
を登録するコネクターカスタムリソースの抜粋を以下に示します。apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnector metadata: name: inventory-connector ... spec: ... config: topic.creation.groups: inventory,applicationlogs ...
設定の完了
default
トピックグループの設定に加えて inventory
および applicationlogs
カスタムトピック作成グループの設定が含まれる完了した設定の例を以下に示します。
例: デフォルトのトピック作成グループおよび 2 つのカスタムグループの設定
apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnector metadata: name: inventory-connector ... spec: ... config: ... topic.creation.default.replication.factor: 3, topic.creation.default.partitions: 10, topic.creation.default.cleanup.policy: compact topic.creation.default.compression.type: lz4 topic.creation.groups: inventory,applicationlogs topic.creation.inventory.include: dbserver1\\.inventory\\.* topic.creation.inventory.partitions: 20 topic.creation.inventory.cleanup.policy: compact topic.creation.inventory.delete.retention.ms: 7776000000 topic.creation.applicationlogs.include: dbserver1\\.logs\\.applog-.* topic.creation.applicationlogs.exclude": dbserver1\\.logs\\.applog-old-.* topic.creation.applicationlogs.replication.factor: 1 topic.creation.applicationlogs.partitions: 20 topic.creation.applicationlogs.cleanup.policy: delete topic.creation.applicationlogs.retention.ms: 7776000000 topic.creation.applicationlogs.compression.type: lz4 ...