8.4. Debezium SQL Server コネクターのデプロイ
以下の方法のいずれかを使用して Debezium SQL Server コネクターをデプロイできます。
8.4.1. AMQ Streams を使用した SQL Server コネクターデプロイメント
Debezium 1.7 以降、Debezium コネクターのデプロイに推奨される方法は、AMQ Streams を使用してコネクタープラグインが含まれる Kafka Connect コンテナーイメージをビルドすることです。
デプロイメントプロセス中に、以下のカスタムリソース (CR) を作成し、使用します。
-
Kafka Connect インスタンスを定義し、コネクターアーティファクトに関する情報をイメージに含める必要がある
KafkaConnect
CR。 -
コネクターがソースデータベースにアクセスするために使用する情報を提供する
KafkaConnector
CR。AMQStreams が Kafka Connect Pod を開始し、KafkaConnector
CR を適用してコネクターを開始します。
Kafka Connect イメージのビルド仕様では、デプロイ可能なコネクターを指定できます。各コネクタープラグインに対して、デプロイメントに利用可能にする他のコンポーネントを指定することもできます。たとえば、Service Registry アーティファクトまたは Debezium スクリプトコンポーネントを追加できます。AMQ Streams が Kafka Connect イメージをビルドすると、指定のアーティファクトをダウンロードし、イメージに組み込みます。
Kafka Connect CR の spec.build.output
パラメーターは、生成される KafkaConnect
コンテナーイメージを格納する場所を指定します。コンテナーイメージは Docker レジストリーまたは OpenShift ImageStream に保存できます。イメージを ImageStream に保存するには、Kafka Connect をデプロイする前に ImageStream を作成する必要があります。イメージストリームは自動的に作成されません。
KafkaConnect
リソースを使用してクラスターを作成する場合は、Kafka Connect REST API を使用してコネクターを作成または更新できません。ただし、REST API を使用して情報を取得できます。
関連情報
- AMQ Streams on OpenShift の使用のKafka Connect の設定を参照してください。
- AMQ Streams を使用した新しいコンテナーイメージの自動作成と OpenShift での AMQ Streams のアップグレード
8.4.2. AMQ Streams を使用した Debezium SQL Server コネクターのデプロイ
以前のバージョンの AMQ Streams では、OpenShift に Debezium コネクターをデプロイするには、最初にコネクター用の Kafka Connect イメージをビルドする必要がありました。コネクターを OpenShift にデプロイする場合に現在推奨される方法は、AMQ Streams でビルド設定を使用して、使用する Debezium コネクタープラグインが含まれる Kafka Connect コンテナーイメージを自動的にビルドすることです。
ビルドプロセス中、AMQ Streams Operator は Debezium コネクター定義を含む KafkaConnect
カスタムリソースの入力パラメーターを Kafka Connect コンテナーイメージに変換します。このビルドは、Red Hat Maven リポジトリーまたは別の設定済みの HTTP サーバーから必要なアーティファクトをダウンロードします。新規に作成されたコンテナーは .spec.build.output
で指定されたコンテナーレジストリーにプッシュされ、Kafka Connect Pod のデプロイに使用されます。AMQ Streams が Kafka Connect イメージをビルドしたら、KafkaConnector
カスタムリソースを作成し、ビルドに含まれるコネクターを起動します。
前提条件
- クラスター Operator がインストールされている OpenShift クラスターにアクセスできる。
- AMQ Streams Operator が稼働している。
- Kafka クラスターは、Apache Open Shift での AMQ ストリームのデプロイとアップグレードに記載されているようにデプロイされます。
- Red Hat Integration ライセンスがある。
- Kafka Connect is deployed on AMQ Streams。
-
OpenShift
oc
CLI クライアントがインストールされている、または OpenShift Container Platform Web コンソールにアクセスできる。 Kafka Connect ビルドイメージの保存方法に応じて、レジストリーのパーミッションを用意するか、ImageStream リソースを作成している。
- ビルドイメージを Red Hat Quay.io または Docker Hub などのイメージレジストリーに保存する場合は、以下が必要です。
- レジストリーでイメージを作成し、管理するためのアカウントおよびパーミッション
- ビルドイメージをネイティブ OpenShift ImageStream として保存する場合は、以下が必要です。
- ImageStream リソースがクラスターにデプロイされている。クラスターの ImageStream を明示的に作成している。ImageStreams はデフォルトでは利用できません。
手順
- OpenShift クラスターにログインします。
コネクターの Debezium
KafkaConnect
カスタムリソース (CR) を作成するか、既存のリソースを変更します。たとえば、以下の例のようにmetadata.annotations
およびspec.build
プロパティーを指定するKafkaConnect
CR を作成します。dbz-connect.yaml
などの名前でファイルを保存します。例8.1 Debezium コネクターを含む
KafkaConnect
カスタムリソースを定義するdbz-connect.yaml
ファイルapiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: debezium-kafka-connect-cluster annotations: strimzi.io/use-connector-resources: "true" 1 spec: version: 3.00 build: 2 output: 3 type: imagestream 4 image: debezium-streams-connect:latest plugins: 5 - name: debezium-connector-sqlserver artifacts: - type: zip 6 url: https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-sqlserver/1.7.2.Final-redhat-<build_number>/debezium-connector-sqlserver-1.7.2.Final-redhat-<build_number>-plugin.zip 7 - type: zip url: https://maven.repository.redhat.com/ga/io/apicurio/apicurio-registry-distro-connect-converter/2.0-redhat-<build-number>/apicurio-registry-distro-connect-converter-2.0-redhat-<build-number>.zip - type: zip url: https://maven.repository.redhat.com/ga/io/debezium/debezium-scripting/1.7.2.Final/debezium-scripting-1.7.2.Final.zip bootstrapServers: debezium-kafka-cluster-kafka-bootstrap:9093
表8.13 Kafka Connect 設定の説明 項目 説明 1
strimzi.io/use-connector-resources
アノテーションを"true"
に設定して、クラスター Operator がKafkaConnector
リソースを使用してこの Kafka Connect クラスター内のコネクターを設定できるようにします。2
spec.build
設定は、ビルドイメージの保存場所を指定し、プラグインアーティファクトの場所とともにイメージに追加するプラグインをリストします。3
build.output
は、新しくビルドされたイメージを保存するレジストリーを指定します。4
イメージ出力の名前およびイメージ名を指定します。
output.type
の有効な値は、Docker Hub や Quay などのコンテナーレジストリーにプッシュする場合はdocker
、内部の OpenShift ImageStream にイメージをプッシュする場合はimagestream
です。ImageStream を使用するには、ImageStream リソースをクラスターにデプロイする必要があります。KafkaConnect 設定でbuild.output
の指定に関する詳細は、AMQ Streams Build スキーマ参照 のドキュメントを参照 してください。5
plugins
設定は、Kafka Connect イメージに追加するすべてのコネクターをリストします。リストの各エントリーについて、プラグインname
と、コネクターのビルドに必要なアーティファクトに関する情報を指定します。必要に応じて、各コネクタープラグインに対して、コネクターと使用できる他のコンポーネントを含めることができます。たとえば、Service Registry アーティファクトまたは Debezium スクリプトコンポーネントを追加できます。6
artifacts.type
の値は、artifacts.url
で指定するアーティファクトのファイルタイプを指定します。有効なタイプはzip
、tgz
、またはjar
です。Debezium コネクターアーカイブは、.zip
ファイル形式で提供されます。JDBC ドライバーファイルは.jar
形式です。type
の値は、url
フィールドで参照されるファイルのタイプと一致する必要があります。7
artifacts.url
の値は、コネクターアーティファクトのファイルを格納する Maven リポジトリーなどの HTTP サーバーのアドレスを指定します。OpenShift クラスターが指定されたサーバーにアクセスできる必要があります。以下のコマンドを入力して、
KafkaConnect
ビルド仕様を OpenShift クラスターに適用します。oc create -f dbz-connect.yaml
Streams Operator はカスタムリソースで指定された設定に基づいて、デプロイする Kafka Connect イメージを準備します。
ビルドが完了すると、Operator はイメージを指定されたレジストリーまたは ImageStream にプッシュし、Kafka Connect クラスターを起動します。設定にリスト表示されているコネクターアーティファクトはクラスターで利用できます。KafkaConnector
リソースを作成し、デプロイする各コネクターのインスタンスを定義します。
たとえば、以下のKafkaConnector
CR を作成し、sqlserver-inventory-connector.yaml
として保存します。例8.2 Debezium コネクターの
KafkaConnector
カスタムリソースを定義するsqlserver-inventory-connector.yaml
ファイルapiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: labels: strimzi.io/cluster: debezium-kafka-connect-cluster name: inventory-connector-sqlserver 1 spec: class: io.debezium.connector.sqlserver.SqlServerConnector 2 tasksMax: 1 3 config: 4 database.history.kafka.bootstrap.servers: 'debezium-kafka-cluster-kafka-bootstrap.debezium.svc.cluster.local:9092' database.history.kafka.topic: schema-changes.inventory database.hostname: sqlserver.debezium-sqlserver.svc.cluster.local 5 database.port: 3306 6 database.user: debezium 7 database.password: dbz 8 database.dbname: mydatabase 9 database.server.name: inventory_connector_sqlserver 10 database.include.list: public.inventory 11
表8.14 コネクター設定の説明 項目 説明 1
Kafka Connect クラスターに登録するコネクターの名前。
2
コネクタークラスの名前。
3
同時に動作できるタスクの数。
4
コネクターの設定。
5
ホストデータベースインスタンスのアドレス。
6
データベースインスタンスのポート番号。
7
Debezium がデータベースに接続するユーザーアカウントの名前。
8
データベースユーザーアカウントのパスワード
9
変更をキャプチャーするデータベースの名前。
10
データベースインスタンスまたはクラスターの論理名。
指定の名前は英数字またはアンダースコアからのみ形成する必要があります。
論理名は、このコネクターから変更イベントを受信する Kafka トピックの接頭辞として使用されるため、名前はクラスターのコネクター間で一意である必要があります。
コネクターを Avro コネクターと統合する場合、名前空間は関連する Kafka Connect スキーマの名前や、対応する Avro スキーマの名前空間でも使用されます。11
コネクターが変更イベントをキャプチャーするテーブルのリスト。
以下のコマンドを実行してコネクターリソースを作成します。
oc create -n <namespace> -f <kafkaConnector>.yaml
以下に例を示します。
oc create -n debezium -f {context}-inventory-connector.yaml
コネクターは Kafka Connect クラスターに登録され、
KafkaConnector
CR のspec.config.database.dbname
で指定されたデータベースに対して実行を開始します。コネクター Pod の準備ができると、Debezium が実行されます。
これで、Debezium SQL ディプロイメントを検証する 準備が整いました。
8.4.3. Dockerfile からカスタム Kafka Connect コンテナーイメージをビルドして Debezium SQL Server コネクターのデプロイ
Debezium SQL Server コネクターをデプロイするには、Debezium コネクターアーカイブが含まれるカスタム Kafka Connect コンテナーイメージをビルドし、このコンテナーイメージをコンテナーレジストリーにプッシュする必要があります。次に、以下のカスタムリソース (CR) を作成する必要があります。
-
Kafka Connect インスタンスを定義する
KafkaConnect
CR。image
は Debezium コネクターを実行するために作成したイメージの名前を指定します。この CR を、Red Hat AMQ Streams がデプロイされている OpenShift インスタンスに適用します。AMQ Streams は、Apache Kafka を OpenShift に取り入れる operator およびイメージを提供します。 -
Debezium SQL Server コネクターを定義する
KafkaConnector
CR。この CR をKafkaConnect
CR を適用するのと同じ OpenShift インスタンスに適用します。
前提条件
- SQL Server が稼働し、Debezium コネクターと連携するように SQL Server を設定する手順 が完了済みである必要があります。
- AMQ Streams は OpenShift にデプロイされ、Apache Kafka および Kafka Connect が稼働している必要があります。詳細は、Deploying and Upgrading AMQ Streams on OpenShift を参照してください。
- Podman または Docker がインストールされている。
-
Debezium コネクターを実行するコンテナーを追加する予定のコンテナーレジストリー (
quay.io
やdocker.io
など) でコンテナーを作成および管理するアカウントとパーミッションを持っている。
手順
Kafka Connect の Debezium SQL Server コンテナーを作成します。
- Debezium SQL Server コネクターアーカイブ をダウンロードします。
Debezium SQL Server コネクターアーカイブをデプロイメントして、コネクタープラグインのディレクトリー構造を作成します。以下に例を示します。
./my-plugins/ ├── debezium-connector-sqlserver │ ├── ...
registry.redhat.io/amq7/amq-streams-kafka-30-rhel8:2.0.0
をベースイメージとして使用して、新規の Dockerfile を作成します。たとえば、ターミナルウィンドウから以下のコマンドを入力します。my-plugins
はプラグインディレクトリーの名前に置き換えます。cat <<EOF >debezium-container-for-sqlserver.yaml 1 FROM registry.redhat.io/amq7/amq-streams-kafka-30-rhel8:2.0.0 USER root:root COPY ./<my-plugins>/ /opt/kafka/plugins/ 2 USER 1001 EOF
このコマンドは、現在のディレクトリーに
debezium-container-for-sqlserver.yaml
という名前の Dockerfile を作成します。前のステップで作成した
debezium-container-for-sqlserver.yaml
Docker ファイルからコンテナーイメージをビルドします。ファイルが含まれるディレクトリーから、ターミナルウィンドウを開き、以下のコマンドのいずれかを入力します。podman build -t debezium-container-for-sqlserver:latest .
docker build -t debezium-container-for-sqlserver:latest .
上記のコマンドは、
debezium-container-for-sqlserver
という名前のコンテナーイメージを構築します。カスタムイメージを quay.io などのコンテナーレジストリーまたは内部のコンテナーレジストリーにプッシュします。コンテナーレジストリーは、イメージをデプロイする OpenShift インスタンスで利用できる必要があります。以下のいずれかのコマンドを実行します。
podman push <myregistry.io>/debezium-container-for-sqlserver:latest
docker push <myregistry.io>/debezium-container-for-sqlserver:latest
新しい Debezium SQL Server KafkaConnect カスタムリソース (CR) を作成します。たとえば、以下の例のように
annotations
とimage
プロパティーを指定するdbz-connect.yaml
という名前の KafkaConnect CR を作成します。apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster annotations: strimzi.io/use-connector-resources: "true" 1 spec: #... image: debezium-container-for-sqlserver 2
以下のコマンドを入力して、
KafkaConnect
CR を OpenShift Kafka Connect 環境に適用します。oc create -f dbz-connect.yaml
このコマンドは、Debezium コネクターを実行するために作成したイメージの名前を指定する Kafka Connect インスタンスを追加します。
Debezium SQL Server コネクターインスタンスを設定する
KafkaConnector
カスタムリソースを作成します。通常、コネクターに使用できる設定プロパティーを使用して、
.yaml
ファイルに Debezium SQL Server コネクターを設定します。コネクター設定は、Debezium に対して、スキーマおよびテーブルのサブセットにイベントを生成するよう指示する可能性があり、または機密性の高い、大きすぎる、または不必要な指定のコラムで Debezium が値を無視、マスク、または切り捨てするようにプロパティーを設定する可能性もあります。以下の例では、ポート
1433
で PostgreSQL サーバーホスト192.168.99.100
に接続する Debezium コネクターを設定します。このホストには、testDB
という名前のデータベース、名前がcustomers
というテーブルがあり、fulfillment
がサーバーの論理名です。SQL Server
fulfillment-connector.yaml
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: fulfillment-connector 1 labels: strimzi.io/cluster: my-connect-cluster annotations: strimzi.io/use-connector-resources: 'true' spec: class: io.debezium.connector.sqlserver.SqlServerConnector 2 config: database.hostname: 192.168.99.100 3 database.port: 1433 4 database.user: debezium 5 database.password: dbz 6 database.dbname: testDB 7 database.server.name: fullfullment 8 database.include.list: dbo.customers 9 database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092 10 database.history.kafka.topic: dbhistory.fullfillment 11
表8.15 コネクター設定の説明 項目 説明 1
Kafka Connect サービスに登録する場合のコネクターの名前。
2
この SQL Server コネクタークラスの名前。
3
SQL Server インスタンスのアドレス。
4
SQL Server インスタンスのポート番号。
5
SQL Server ユーザーの名前。
6
SQL Server ユーザーのパスワード。
7
変更をキャプチャーするデータベースの名前。
8
namespace を形成する SQL Server インスタンス/クラスターの論理名で、コネクターが書き込む Kafka トピックの名前、Kafka Connect スキーマ名、および Arvo コンバーターが使用される場合に対応する Avro スキーマの namespace のすべてに使用されます。
9
Debezium が変更をキャプチャーする必要があるすべてのテーブルのリスト。
10
DDL ステートメントをデータベース履歴トピックに書き込み、復元するためにコネクターによって使用される Kafka ブローカーのリスト。
11
コネクターが DDL ステートメントを書き、復元するデータベース履歴トピックの名前。このトピックは内部使用のみを目的としており、コンシューマーが使用しないようにしてください。
Kafka Connect でコネクターインスタンスを作成します。たとえば、
KafkaConnector
リソースをfulfillment-connector.yaml
ファイルに保存した場合は、以下のコマンドを実行します。oc apply -f fulfillment-connector.yaml
上記のコマンドは
fulfillment-connector
を登録し、コネクターはKafkaConnector
CR に定義されているtestDB
データベースに対して実行を開始します。
Debezium SQL Server コネクターが実行していることの確認
コネクターがエラーなしで正常に起動すると、コネクターがキャプチャーするように設定された各テーブルのトピックが作成されます。ダウンストリームアプリケーションは、これらのトピックをサブスクライブして、ソースデータベースで発生する情報イベントを取得できます。
コネクターが実行されていることを確認するには、OpenShift Container Platform Web コンソールまたは OpenShift CLI ツール (oc) から以下の操作を実行します。
- コネクターのステータスを確認します。
- コネクターがトピックを生成していることを確認します。
- 各テーブルの最初のスナップショットの実行中にコネクターが生成する読み取り操作 ("op":"r") のイベントがトピックに反映されていることを確認します。
前提条件
- Debezium コネクターが AMQ Streams on OpenShift にデプロイされている。
-
OpenShift
oc
CLI クライアントがインストールされている。 - OpenShift Container Platform Web コンソールにアクセスできる。
手順
以下の方法のいずれかを使用して
KafkaConnector
リソースのステータスを確認します。OpenShift Container Platform Web コンソールから以下を実行します。
-
Home
Search に移動します。 -
Search ページで Resources をクリックし、Select Resource ボックスを開き、
KafkaConnector
を入力します。 - KafkaConnectors リストから確認するコネクターの名前をクリックします (例: inventory-connector-sqlserver)。
- Conditions セクションで、Type および Status 列の値が Ready および True に設定されていることを確認します。
-
Home
ターミナルウィンドウから以下を実行します。
以下のコマンドを入力します。
oc describe KafkaConnector <connector-name> -n <project>
以下に例を示します。
oc describe KafkaConnector inventory-connector-sqlserver -n debezium
このコマンドは、以下の出力のようなステータス情報を返します。
例8.3
KafkaConnector
リソースのステータスName: inventory-connector-sqlserver Namespace: debezium Labels: strimzi.io/cluster=debezium-kafka-connect-cluster Annotations: <none> API Version: kafka.strimzi.io/v1beta2 Kind: KafkaConnector ... Status: Conditions: Last Transition Time: 2021-12-08T17:41:34.897153Z Status: True Type: Ready Connector Status: Connector: State: RUNNING worker_id: 10.131.1.124:8083 Name: inventory-connector-sqlserver Tasks: Id: 0 State: RUNNING worker_id: 10.131.1.124:8083 Type: source Observed Generation: 1 Tasks Max: 1 Topics: inventory_connector_sqlserver inventory_connector_sqlserver.inventory.addresses inventory_connector_sqlserver.inventory.customers inventory_connector_sqlserver.inventory.geom inventory_connector_sqlserver.inventory.orders inventory_connector_sqlserver.inventory.products inventory_connector_sqlserver.inventory.products_on_hand Events: <none>
コネクターによって Kafka トピックが作成されたことを確認します。
OpenShift Container Platform Web コンソールから以下を実行します。
-
Home
Search に移動します。 -
Search ページで Resources をクリックし、Select Resource ボックスを開き、
KafkaTopic
を入力します。 - KafkaTopics リストから確認するトピックの名前をクリックします (例: inventory-connector-sqlserver.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d)。
- Conditions セクションで、Type および Status 列の値が Ready および True に設定されていることを確認します。
-
Home
ターミナルウィンドウから以下を実行します。
以下のコマンドを入力します。
oc get kafkatopics
このコマンドは、以下の出力のようなステータス情報を返します。
例8.4
KafkaTopic
リソースのステータスNAME CLUSTER PARTITIONS REPLICATION FACTOR READY connect-cluster-configs debezium-kafka-cluster 1 1 True connect-cluster-offsets debezium-kafka-cluster 25 1 True connect-cluster-status debezium-kafka-cluster 5 1 True consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a debezium-kafka-cluster 50 1 True inventory-connector-sqlserver---a96f69b23d6118ff415f772679da623fbbb99421 debezium-kafka-cluster 1 1 True inventory-connector-sqlserver.inventory.addresses---1b6beaf7b2eb57d177d92be90ca2b210c9a56480 debezium-kafka-cluster 1 1 True inventory-connector-sqlserver.inventory.customers---9931e04ec92ecc0924f4406af3fdace7545c483b debezium-kafka-cluster 1 1 True inventory-connector-sqlserver.inventory.geom---9f7e136091f071bf49ca59bf99e86c713ee58dd5 debezium-kafka-cluster 1 1 True inventory-connector-sqlserver.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d debezium-kafka-cluster 1 1 True inventory-connector-sqlserver.inventory.products---df0746db116844cee2297fab611c21b56f82dcef debezium-kafka-cluster 1 1 True inventory-connector-sqlserver.inventory.products-on-hand---8649e0f17ffcc9212e266e31a7aeea4585e5c6b5 debezium-kafka-cluster 1 1 True schema-changes.inventory debezium-kafka-cluster 1 1 True strimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55 debezium-kafka-cluster 1 1 True strimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b debezium-kafka-cluster 1 1 True
トピックの内容を確認します。
- ターミナルウィンドウから、以下のコマンドを入力します。
oc exec -n <project> -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=<topic-name>
以下に例を示します。
oc exec -n debezium -it debezium-kafka-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=inventory_connector_sqlserver.inventory.products_on_hand
トピック名を指定する形式は、手順 1 で返された
oc describe
コマンドと同じです (例:inventory_connector_oracle.inventory.addresses
)。トピックの各イベントについて、このコマンドは、以下の出力のような情報を返します。
例8.5 Debezium 変更イベントの内容
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory_connector_sqlserver.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory_connector_sqlserver.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory_connector_sqlserver.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.sqlserver.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory_connector_sqlserver.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"1.7.2.Final-redhat-00001","connector":"sqlserver","name":"inventory_connector_sqlserver","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"sqlserver-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}
上記の例では、
payload
値は、コネクタースナップショットがテーブルinventory.products_on_hand
から 読み込み (op" ="r"
) イベントを生成したことを示しています。product_id
レコードのbefore
状態はnull
であり、レコードに以前の値が存在しないことを示します。"after"
状態がproduct_id
101
で項目のquantity
を3
で示しています。
Debezium SQL Server コネクターに設定できる設定プロパティーの完全リストは SQL Server コネクタープロパティー を参照してください。
結果
コネクターが起動すると、コネクターが設定された SQL Server データベースの 整合性スナップショットが実行 されます。その後、コネクターは行レベルの操作のデータ変更イベントの生成を開始し、変更イベントレコードを Kafka トピックにストリーミングします。
8.4.4. Debezium SQL Server コネクター設定プロパティーの説明
Debezium SQL Server コネクターには、アプリケーションに適したコネクター動作を実現するために使用できる設定プロパティーが多数あります。多くのプロパティーにはデフォルト値があります。
プロパティーに関する情報は、以下のように設定されています。
- 必要なコネクター設定プロパティー
- 高度なコネクター設定プロパティー
Debezium がデータベース履歴トピックから読み取るイベントを処理する方法を制御する データベース履歴コネクター設定プロパティー。
- データベースドライバーの動作を制御する パススルーデータベースドライバープロパティー
Debezium SQL Server コネクター設定プロパティー (必須)
以下の設定プロパティーは、デフォルト値がない場合は必須です。
プロパティー | デフォルト | 説明 |
---|---|---|
デフォルトなし | コネクターの一意名。同じ名前で再登録を試みると失敗します。(このプロパティーはすべての Kafka Connect コネクターに必要です) | |
デフォルトなし |
コネクターの Java クラスの名前。SQL Server コネクターには、常に | |
| このコネクターのために作成する必要のあるタスクの最大数。SQL Server コネクターは常に単一のタスクを使用するため、この値を使用しません。そのため、デフォルト値は常に許容されます。 | |
デフォルトなし | SQL Server データベースサーバーの IP アドレスまたはホスト名。 | |
| SQL Server データベースサーバーのポート番号 (整数)。 | |
デフォルトなし | SQL Server データベースサーバーへの接続時に使用するユーザー名。 | |
デフォルトなし | SQL Server データベースサーバーへの接続時に使用するパスワード。 | |
デフォルトなし |
変更内容をストリーミングする SQL Server データベースの名前は、 | |
デフォルトなし |
変更をストリームする SQL Server データベースの名前をコンマで区切って指定します。現在は、1 つのデータベース名のみサポートしています。 このオプションは実験的なもので、本番では使用しないでください。これを使用すると、コネクターの動作がデフォルトの設定と互換性がなく、アップグレードやダウングレードのパスがありません。
| |
デフォルトなし | Debezium がキャプチャーする SQL Server データベースサーバーの namespace を識別および提供する論理名。論理名は、他のコネクター全体で一意となる必要があります。これは、このコネクターから生成されるすべての Kafka トピック名の接頭辞として使用されるためです。使用できる文字は、英数字、ハイフン、ドット、アンダースコアのみです。 | |
デフォルトなし |
Debezium がキャプチャーするテーブルの完全修飾テーブル識別子と一致する正規表現のコンマ区切りリスト (任意)。 | |
デフォルトなし |
キャプチャーから除外するテーブルの完全修飾テーブル識別子と一致する正規表現のコンマ区切りリスト (任意)。Debezium は | |
空の文字列 |
変更イベントメッセージの値に含まれる必要がある列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。列の完全修飾名の形式は schemaName.tableName.columnName です。プライマリーキー列は、値に含まれていない場合でもイベントのキーに常に含まれることに注意してください。また、 | |
空の文字列 |
変更イベントメッセージの値から除外される必要がある列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。列の完全修飾名の形式は schemaName.tableName.columnName です。プライマリーキー列は、値から除外される場合でもイベントのキーに常に含まれることに注意してください。また、 | |
該当なし |
文字ベースの列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。列の完全修飾名の形式は `<schemaName>.<tableName>._<columnName>` です。作成された変更イベントレコードでは、指定された列の値は仮名に置き換えられます。
仮名は、指定された hashAlgorithm と salt を適用すると得られるハッシュ化された値で設定されます。使用されるハッシュ関数に基づいて、参照整合性は維持され、列値は仮名に置き換えられます。サポートされるハッシュ関数は、Java Cryptography Architecture Standard Algorithm Name Documentation の MessageDigest section に説明されています。 column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = inventory.orders.customerName, inventory.shipment.customerName
必要な場合は、仮名は自動的に列の長さに短縮されます。コネクター設定には、異なるハッシュアルゴリズムと salt を指定する複数のプロパティーを含めることができます。 | |
|
時間、日付、およびタイムスタンプは、異なる精度の種類で表すことができます。 | |
|
コネクターによる | |
|
コネクターがデータベーススキーマの変更を、データベースサーバー ID と同じ名前の Kafka トピックに公開するかどうかを指定するブール値。各スキーマの変更は、データベース名が含まれるキーと、スキーマ更新を記述する JSON 構造である値で記録されます。これは、コネクターがデータベース履歴を内部で記録する方法には依存しません。デフォルトは | |
|
削除 イベントの後に廃棄 (tombstone) イベントが続くかどうかを制御します。 | |
該当なし | フィールド値が指定された文字数より長い場合に、変更イベントメッセージ値で値を省略する必要がある文字ベースの列の完全修飾名と一致する正規表現のコンマ区切りリスト (任意)。長さが異なる複数のプロパティーを単一の設定で使用できますが、それぞれの長さは正の整数である必要があります。列の完全修飾名の形式は schemaName.tableName.columnName です。 | |
該当なし |
文字ベースの列の完全修飾名にマッチする正規表現のコンマ区切りリスト (オプション) で、変更イベントメッセージの値を、指定された数のアスタリスク ( | |
該当なし |
出力された変更メッセージの該当するフィールドスキーマに元の型および長さをパラメーターとして追加する必要がある列の完全修飾名と一致する、正規表現のコンマ区切りリスト (任意)。スキーマパラメーター ( | |
該当なし |
出力された変更メッセージフィールドスキーマに元の型および長さをパラメーターとして追加する必要がある列のデータベース固有のデータ型名と一致する、正規表現のコンマ区切りリスト (任意)。スキーマパラメーター ( | |
該当なし | 指定のテーブルの Kafka トピックに公開する変更イベントレコードのカスタムメッセージキーを形成するためにコネクターが使用する列を指定する式のリスト。
デフォルトでは、Debezium はテーブルのプライマリーキー列を、出力するレコードのメッセージキーとして使用します。デフォルトの代わりに、またはプライマリーキーのないテーブルのキーを指定するには、1 つ以上の列をもとにカスタムメッセージキーを設定できます。
各完全修飾テーブル名は、以下の形式の正規表現です。 カスタムメッセージキーの作成に使用する列の数に制限はありません。ただし、一意の鍵を指定するために必要な最小数を使用することが推奨されます。 | |
bytes |
バイナリー ( |
高度な SQL Server コネクター設定プロパティー
以下の 高度な 設定プロパティーには、ほとんどの状況で機能する適切なデフォルト設定があるため、コネクターの設定で指定する必要はほとんどありません。
プロパティー | デフォルト | 説明 |
---|---|---|
Initial | キャプチャーされたテーブルの構造 (および必要に応じてデータ) の最初のスナップショットを作成するモード。スナップショットが完了すると、コネクターはデータベースのやり直し (redo) ログから変更イベントの読み取りを続行します。以下の値を使用できます。
| |
|
スナップショットに含めるテーブルの完全修飾名 ( このプロパティーは増分スナップショットの動作には影響しません。 | |
repeatable_read | 使用されるトランザクション分離レベルと、キャプチャー用に指定されたテーブルをコネクターがロックする期間を制御するモード。以下の値を使用できます。
モードの選択は、データの整合性にも影響します。 | |
|
イベントの処理中にコネクターが例外に対応する方法を指定します。 | |
| 各反復処理の実行中に新しい変更イベントが表示されるまでコネクターが待機する時間 (ミリ秒単位) を指定する正の整数値。デフォルトは 1000 ミリ秒 (1 秒) です。 | |
|
データベースログから読み取られた変更イベントが Kafka に書き込まれる前に配置される、ブロッキングキューの最大サイズを指定する正の整数値。このキューは、Kafka への書き込みが遅い場合や Kafka が利用できない場合などに、CDC テーブルリーダーにバックプレシャーを提供できます。キューに発生するイベントは、このコネクターによって定期的に記録されるオフセットには含まれません。デフォルトは 8192 で、常に | |
| このコネクターの反復処理中に処理される必要があるイベントの各バッチの最大サイズを指定する正の整数値。デフォルトは 2048 です。 | |
|
ハートビートメッセージが送信される頻度を制御します。 | |
|
ハートビートメッセージが送信されるトピックの命名を制御します。 | |
デフォルトなし |
コネクターの起動後、スナップショットを取得するまで待機する間隔 (ミリ秒単位)。 | |
| スナップショットの実行中に各テーブルから 1 度に読み取る必要がある行の最大数を指定します。コネクターは、このサイズの複数のバッチでテーブルの内容を読み取ります。デフォルトは 2000 です。 | |
デフォルトなし | 指定のクエリーのデータベースのラウンドトリップごとにフェッチされる行の数を指定します。デフォルトは、JDBC ドライバーのデフォルトのフェッチサイズです。 | |
|
スナップショットの実行時に、テーブルロックを取得するまで待つ最大時間 (ミリ秒単位) を指定する整数値。この時間間隔でテーブルロックを取得できないと、スナップショットは失敗します (スナップショット も参照してください)。 | |
デフォルトなし | スナップショットに追加するテーブル行を指定します。スナップショットにテーブルの行のサブセットのみを含める場合は、プロパティーを使用します。このプロパティーはスナップショットにのみ影響します。コネクターがログから読み取るイベントには影響しません。
プロパティーには、
スナップショットにソフト削除以外のレコードのみを含める場合は、soft-delete 列 ( "snapshot.select.statement.overrides": "customer.orders", "snapshot.select.statement.overrides.customer.orders": "SELECT * FROM [customers].[orders] WHERE delete_flag = 0 ORDER BY id DESC"
作成されるスナップショットでは、コネクターには | |
コネクター設定が、Avro を使用するように | Avro の命名要件に準拠するためにフィールド名がサニタイズされるかどうか。 | |
|
詳細は、トランザクションメタデータ を参照してください。 | |
10000 (10 秒) | 再試行可能なエラーが発生した後にコネクターを再起動するまで待機する時間 (ミリ秒単位)。 | |
デフォルトなし |
ストリーミング中にスキップされる操作タイプのコンマ区切りリスト。操作には、 | |
デフォルト値なし |
シグナルをコネクターに送信するために使用されるデータコレクションの完全修飾名 。 シグナル機能はテクノロジープレビュー機能です。 | |
| 増分スナップショットのチャンクの実行中にコネクターがメモリーを取得して読み取る行の最大数。スナップショットは、サイズが大きいスナップショットの場合にはクエリーが少なくなるため、チャンクサイズを増やすと効率が上がります。ただし、チャンクサイズが大きい場合には、スナップショットデータのバッファーにより多くのメモリーが必要になります。チャンクサイズは、環境で最適なパフォーマンスを発揮できる値に、調整します。 | |
0 |
データベースの複数のテーブルからの変更をストリーミングする際に、メモリーの使用量を削減するために使用する、反復ごとの最大トランザクション数を指定します。 |
Debezium SQL Server コネクターデータベース履歴設定プロパティー
Debezium には、コネクターがスキーマ履歴トピックと対話する方法を制御する database.history.*
プロパティーのセットが含まれています。
以下の表は、Debezium コネクターを設定するための database.history
プロパティーについて説明しています。
プロパティー | デフォルト | 説明 |
---|---|---|
コネクターがデータベーススキーマの履歴を保存する Kafka トピックの完全名。 | ||
Kafka クラスターへの最初の接続を確立するために コネクターが使用するホストとポートのペアのリスト。このコネクションは、コネクターによって以前に保存されたデータベーススキーマ履歴の取得や、ソースデータベースから読み取られる各 DDL ステートメントの書き込みに使用されます。各ペアは、Kafka Connect プロセスによって使用される同じ Kafka クラスターを示す必要があります。 | ||
| 永続化されたデータのポーリングが行われている間にコネクターが起動/回復を待つ最大時間 (ミリ秒単位) を指定する整数値。デフォルトは 100 ミリ秒です。 | |
|
エラーでコネクターのリカバリーが失敗する前に、コネクターが永続化された履歴データの読み取りを試行する最大回数。データが受信されなかった場合に最大待機する時間は、 | |
|
コネクターが不正または不明なデータベースのステートメントを無視するかどうか、または人が問題を修正するために処理を停止するかどうかを指定するブール値。安全なデフォルトは | |
今後のリリースで非推奨になり、削除される予定です。代わりに |
|
コネクターがすべての DDL ステートメントを記録するかどうかを指定するブール値
安全なデフォルトは |
|
コネクターがすべての DDL ステートメントを記録するかどうかを指定するブール値
安全なデフォルトは |
プロデューサーおよびコンシューマークライアントを設定するためのパススルーデータベース履歴プロパティー
Debezium は、Kafka プロデューサーを使用して、データベース履歴トピックにスキーマの変更を書き込みます。同様に、コネクターが起動すると、データベース履歴トピックから読み取る Kafka コンシューマーに依存します。database.history.producer.*
および database.history.consumer.*
接頭辞で始まるパススルー設定プロパティーのセットに値を割り当てて、Kafka プロデューサーおよびコンシューマークライアントの設定を定義します。パススループロデューサーおよびコンシューマーデータベース履歴プロパティーは、以下の例のように Kafka ブローカーとのこれらのクライアントの接続をセキュアにする方法など、さまざまな動作を制御します。
database.history.producer.security.protocol=SSL database.history.producer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks database.history.producer.ssl.keystore.password=test1234 database.history.producer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks database.history.producer.ssl.truststore.password=test1234 database.history.producer.ssl.key.password=test1234 database.history.consumer.security.protocol=SSL database.history.consumer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks database.history.consumer.ssl.keystore.password=test1234 database.history.consumer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks database.history.consumer.ssl.truststore.password=test1234 database.history.consumer.ssl.key.password=test1234
Debezium は、プロパティーを Kafka クライアントに渡す前に、プロパティー名から接頭辞を削除します。
Kafka プロデューサー設定プロパティー および Kafka コンシューマー設定プロパティーの詳細は、Kafka のドキュメントを参照してください。
Debezium SQL Server コネクターパススルーデータベースドライバー設定プロパティー
Debezium コネクターでは、データベースドライバーのパススルー設定が可能です。パススルーデータベースプロパティーは、接頭辞 database.*
で始まります。たとえば、コネクターは database.foobar=false
などのプロパティーを JDBC URL に渡します。
データベース履歴クライアントのパススループロパティー の場合のように、Debezium はプロパティーから接頭辞を削除してからデータベースドライバーに渡します。