4.3. Debezium JDBC コネクターのデプロイメント
Debezium JDBC コネクターをデプロイするには、Debezium JDBC コネクターアーカイブをインストールし、コネクターを設定し、その設定を Kafka Connect に追加してコネクターを起動します。
前提条件
- Apache ZooKeeper、Apache Kafka、および Kafka Connect がインストールされている。
- 宛先データベースがインストールされ、JDBC 接続を受け入れるように設定されている。
手順
- Debezium JDBC connector plug-in archive をダウンロードします。
- ファイルを Kafka Connect 環境にデプロイメントします。
必要に応じて、Maven Central から JDBC ドライバーをダウンロードし、ダウンロードしたドライバーファイルを JDBC シンクコネクター JAR ファイルが含まれるディレクトリーに展開します。
注記Oracle および Db2 用のドライバーは、JDBC シンクコネクターには含まれていません。ドライバーをダウンロードして手動でインストールする必要があります。
- JDBC シンクコネクターがインストールされているパスにドライバー JAR ファイルを追加します。
-
JDBC シンクコネクターをインストールするパスが Kafka Connect
plugin.path
の一部であることを確認してください。 - Kafka Connect プロセスを再起動し、新しい JAR ファイルを取得します。
4.3.1. Debezium JDBC コネクターの設定
通常、Debezium JDBC コネクターを登録するには、コネクターの設定プロパティーを指定する JSON リクエストを送信します。次の例は、最も一般的な設定を使用して、orders
というトピックからのイベントを消費する Debezium JDBC シンクコネクターのインスタンスを登録する JSON リクエストを示しています。
例: Debezium JDBC コネクターの設定
{ "name": "jdbc-connector", "config": { "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector", "tasks.max": "1", "connection.url": "jdbc:postgresql://localhost/db", "connection.username": "pguser", "connection.password": "pgpassword", "insert.mode": "upsert", "delete.enabled": "true", "primary.key.mode": "record_key", "schema.evolution": "basic", "database.time_zone": "UTC" } }
{
"name": "jdbc-connector",
"config": {
"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"connection.url": "jdbc:postgresql://localhost/db",
"connection.username": "pguser",
"connection.password": "pgpassword",
"insert.mode": "upsert",
"delete.enabled": "true",
"primary.key.mode": "record_key",
"schema.evolution": "basic",
"database.time_zone": "UTC"
}
}
項目 | 説明 |
---|---|
1 | Kafka Connect サービスにコネクターを登録するときにコネクターに割り当てられる名前。 |
2 | JDBC シンクコネクタークラスの名前。 |
3 | このコネクターに作成するタスクの最大数。 |
4 | コネクターが書き込み先のシンクデータベースへの接続に使用する JDBC URL。 |
5 | 認証に使用されるデータベースユーザーの名前。 |
6 | 認証に使用されるデータベースユーザーのパスワード。 |
7 | コネクターが使用する insert.mode。 |
8 | データベース内のレコードの削除を有効にします。詳細は、delete.enabled 設定プロパティーを参照してください。 |
9 | プライマリーキー列の解決に使用する方法を指定します。詳細は、primary.key.mode 設定プロパティーを参照してください。 |
10 | コネクターが宛先データベースのスキーマを進化できるようにします。詳細は、schema.evolution 設定プロパティーを参照してください。 |
11 | 時間フィールドタイプを書き込むときに使用されるタイムゾーンを指定します。 |
12 | 消費するトピックのコンマ区切りのリスト。 |
Debezium JDBC コネクターに設定できる設定プロパティーの完全なリストは、JDBC コネクターのプロパティー を参照してください。
POST
コマンドを使用して、この設定を実行中の Kafka Connect サービスに送信できます。サービスは設定を記録し、次の操作を実行するシンクコネクタータスクを開始します。
- データベースに接続します。
- サブスクライブされた Kafka トピックからのイベントを消費します。
- 設定されたデータベースにイベントを書き込みます。