5.4. Debezium SQL Server コネクターのデプロイ
Debezium SQL Server コネクターをデプロイするには、コネクターファイルを Kafka Connect に追加し、コネクターを実行するカスタムコンテナーを作成してから、コネクター設定をコンテナーに追加します。Debezium SQL Server コネクターのデプロイに関する詳細は、以下を参照してください。
5.4.1. Debezium SQL Server コネクターのデプロイ
Debezium SQL Server コネクターをデプロイするには、Debezium コネクターアーカイブが含まれるカスタム Kafka Connect コンテナーイメージをビルドし、このコンテナーイメージをコンテナーレジストリーにプッシュする必要があります。次に、以下のカスタムリソース (CR) を作成する必要があります。
-
Kafka Connect インスタンスを定義する
KafkaConnect
CR。image
は Debezium コネクターを実行するために作成したイメージの名前を指定します。この CR を、Red Hat AMQ Streams がデプロイされている OpenShift インスタンスに適用します。AMQ Streams は、Apache Kafka を OpenShift に取り入れる operator およびイメージを提供します。 -
Debezium SQL Server コネクターを定義する
KafkaConnector
CR。この CR をKafkaConnect
CR を適用するのと同じ OpenShift インスタンスに適用します。
前提条件
- SQL Server が稼働し、Debezium コネクターと連携するように SQL Server を設定する手順 が完了済みである必要があります。
- AMQ Streams が OpenShift にデプロイされ、Apache Kafka および Kafka Connect を実行している。詳細は、OpenShift に Debezium をインストールする を参照してください。
- Podman または Docker がインストールされている。
-
Debezium コネクターを実行するコンテナーを追加する予定のコンテナーレジストリー (
quay.io
やdocker.io
など) でコンテナーを作成および管理するアカウントとパーミッションを持っている。
手順
Kafka Connect の Debezium SQL Server コンテナーを作成します。
- Debezium SQL Server コネクターアーカイブ をダウンロードします。
Debezium SQL Server コネクターアーカイブを展開して、コネクタープラグインのディレクトリー構造を作成します。以下に例を示します。
./my-plugins/ ├── debezium-connector-sqlserver │ ├── ...
registry.redhat.io/amq7/amq-streams-kafka-26-rhel7:1.6.0
をベースイメージとして使用する Docker ファイルを作成します。たとえば、ターミナルウィンドウから以下のコマンドを入力します。my-plugins
はプラグインディレクトリーの名前に置き換えます。cat <<EOF >debezium-container-for-sqlserver.yaml 1 FROM registry.redhat.io/amq7/amq-streams-kafka-26-rhel7:1.6.0 USER root:root COPY ./<my-plugins>/ /opt/kafka/plugins/ 2 USER 1001 EOF
このコマンドは、現在のディレクトリーに
debezium-container-for-sqlserver.yaml
という名前の Docker ファイルを作成します。前のステップで作成した
debezium-container-for-sqlserver.yaml
Docker ファイルからコンテナーイメージをビルドします。ファイルが含まれるディレクトリーから、ターミナルウィンドウを開き、以下のコマンドのいずれかを入力します。podman build -t debezium-container-for-sqlserver:latest .
docker build -t debezium-container-for-sqlserver:latest .
上記のコマンドは、
debezium-container-for-sqlserver
という名前のコンテナーイメージを構築します。カスタムイメージを quay.io などのコンテナーレジストリーまたは内部のコンテナーレジストリーにプッシュします。コンテナーレジストリーは、イメージをデプロイする OpenShift インスタンスで利用できる必要があります。以下のいずれかのコマンドを実行します。
podman push <myregistry.io>/debezium-container-for-sqlserver:latest
docker push <myregistry.io>/debezium-container-for-sqlserver:latest
新しい Debezium SQL Server KafkaConnect カスタムリソース (CR) を作成します。たとえば、以下の例のように
annotations
とimage
プロパティーを指定するdbz-connect.yaml
という名前の KafkaConnect CR を作成します。apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnect metadata: name: my-connect-cluster annotations: strimzi.io/use-connector-resources: "true" 1 spec: #... image: debezium-container-for-sqlserver 2
以下のコマンドを入力して、
KafkaConnect
CR を OpenShift Kafka Connect 環境に適用します。oc create -f dbz-connect.yaml
このコマンドは、Debezium コネクターを実行するために作成したイメージの名前を指定する Kafka Connect インスタンスを追加します。
Debezium SQL Server コネクターインスタンスを設定する
KafkaConnector
カスタムリソースを作成します。通常、コネクターに使用できる設定プロパティーを使用して、
.yaml
ファイルに Debezium SQL Server コネクターを設定します。コネクター設定は、Debezium に対して、スキーマおよびテーブルのサブセットにイベントを生成するよう指示する可能性があり、または機密性の高い、大きすぎる、または不必要な指定のコラムで Debezium が値を無視、マスク、または切り捨てするようにプロパティーを設定する可能性もあります。以下の例では、ポート
1433
で PostgreSQL サーバーホスト192.168.99.100
に接続する Debezium コネクターを設定します。このホストには、testDB
という名前のデータベース、名前がcustomers
というテーブルがあり、fulfillment
がサーバーの論理名です。SQL Server
fulfillment-connector.yaml
apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnector metadata: name: fulfillment-connector 1 labels: strimzi.io/cluster: my-connect-cluster annotations: strimzi.io/use-connector-resources: 'true' spec: class: io.debezium.connector.sqlserver.SqlServerConnector 2 config: database.hostname: 192.168.99.100 3 database.port: 1433 4 database.user: debezium 5 database.password: dbz 6 database.dbname: testDB 7 database.server.name: fullfullment 8 database.include.list: dbo.customers 9 database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092 10 database.history.kafka.topic: dbhistory.fullfillment 11
表5.9 コネクター設定の説明 項目 説明 1
Kafka Connect サービスに登録する場合のコネクターの名前。
2
この SQL Server コネクタークラスの名前。
3
SQL Server インスタンスのアドレス。
4
SQL Server インスタンスのポート番号。
5
SQL Server ユーザーの名前。
6
SQL Server ユーザーのパスワード。
7
変更をキャプチャーするデータベースの名前。
8
namespace を形成する SQL Server インスタンス/クラスターの論理名で、コネクターが書き込む Kafka トピックの名前、Kafka Connect スキーマ名、および Arvo コンバーターが使用される場合に対応する Avro スキーマの namespace のすべてに使用されます。
9
Debezium が変更をキャプチャーする必要があるすべてのテーブルのリスト。
10
DDL ステートメントをデータベース履歴トピックに書き込み、復元するためにコネクターによって使用される Kafka ブローカーのリスト。
11
コネクターが DDL ステートメントを書き、復元するデータベース履歴トピックの名前。このトピックは内部使用のみを目的としており、コンシューマーが使用しないようにしてください。
Kafka Connect でコネクターインスタンスを作成します。たとえば、
KafkaConnector
リソースをfulfillment-connector.yaml
ファイルに保存した場合は、以下のコマンドを実行します。oc apply -f fulfillment-connector.yaml
上記のコマンドは
fulfillment-connector
を登録し、コネクターはKafkaConnector
CR に定義されているtestDB
データベースに対して実行を開始します。コネクターが作成され、起動されたことを確認します。
Kafka Connect ログ出力を表示して、コネクターが作成され、指定データベースの変更のキャプチャーが開始されたことを確認します。
oc logs $(oc get pods -o name -l strimzi.io/cluster=my-connect-cluster)
ログの出力を確認し、Debezium により初回のスナップショットが実行されたことを確認します。ログには、以下のメッセージと同様の出力が表示されます。
... INFO Starting snapshot for ... ... INFO Snapshot is using user 'debezium' ...
コネクターがエラーがなく正常に起動すると、コネクターが変更をキャプチャーする各テーブルのトピックが作成されます。CR のサンプルでは、
include.list
プロパティーに指定されたテーブルのトピックがあります。ダウンストリームアプリケーションは、これらのトピックをサブスクライブできます。以下のコマンドを実行して、コネクターによってトピックが作成されたことを検証します。
oc get kafkatopics
Debezium SQL Server コネクターに設定できる設定プロパティーの完全リストは SQL Server コネクタープロパティー を参照してください。
結果
コネクターが起動すると、コネクターが設定された SQL Server データベースの 整合性スナップショットが実行 されます。その後、コネクターは行レベルの操作のデータ変更イベントの生成を開始し、変更イベントレコードを Kafka トピックにストリーミングします。