2.3. PostgreSQL コネクターのデプロイ
PostgreSQL コネクターのインストールは、JAR をダウンロードして Kafka Connect 環境に抽出し、プラグインの親ディレクトリーが Kafka Connect 環境に指定されていることを確認する必要がある単純なプロセスです。
前提条件
- Zookeeper、Kafka、および Kafka Connect がインストールされている。
- PostgreSQL がインストールされ、設定している。
手順
- Debezium PostgreSQL コネクター をダウンロードします。
- ファイルを Kafka Connect 環境に展開します。
プラグインの親ディレクトリーを Kafka Connect プラグインパスに追加します。
plugin.path=/kafka/connect
上記の例では、Debezium PostgreSQL コネクターを /kafka/connect/Debezium-connector-postgresql
パスに展開したことを前提としています。
- Kafka Connect プロセスを再起動します。これにより、新しい JAR が確実に選択されるようになります。
関連情報
デプロイメントプロセス、および AMQ Streams でのコネクターのデプロイに関する詳細は、Debezium のインストールガイドを参照してください。
2.3.1. 設定例
コネクターを使用して特定の PostgreSQL サーバーまたはクラスターの変更イベントを生成するには、以下を実行します。
- 論理デコードプラグインのインストール
- 論理レプリケーションをサポートするように PostgreSQL サーバー を設定する
- PostgreSQL コネクターの設定ファイルを作成します。
コネクターが起動すると、PostgreSQL サーバーのデータベースの整合性スナップショットを取得し、変更のストリーミングを開始し、挿入、更新、削除されたすべての行に対してイベントを生成します。スキーマおよびテーブルのサブセットに対してイベントを生成することもできます。必要に応じて、機密、大きすぎる列、または不要な列を無視、マスク、または切り捨てます。
以下は、192.168.99.100 のポート 5432 で PostgreSQL サーバーを監視する PostgreSQL コネクターの設定例で、これは fullfillment
という論理的な名前になります。通常、コネクターに使用できる設定プロパティーを使用して、.yaml
ファイルに Debezium PostgreSQL コネクターを設定します。
apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnector metadata: name: inventory-connector 1 labels: strimzi.io/cluster: my-connect-cluster spec: class: io.debezium.connector.postgresql.PostgresConnector tasksMax: 1 2 config: 3 database.hostname: postgresqldb 4 database.port: 5432 database.user: debezium database.password: dbz database.dbname: postgres database.server.name: fullfillment 5 database.whitelist: public.inventory 6
- 1
- コネクターの名前。
- 2
- 1 度に 1 つのタスクのみが動作する必要があります。PostgreSQL コネクターは PostgreSQL サーバーの
192.168.99.100
を読み取るため、単一のコネクタータスクを使用することで、順序とイベントの処理が適切に行われるようになります。Kafka Connect サービスはコネクターを使用して作業を行う 1 つ以上のタスクを開始し、実行中のタスクを自動的に Kafka Connect サービスのクラスター全体に分散します。いずれかのサービスが停止またはクラッシュすると、これらのタスクは稼働中のサービスに再分散されます。 - 3
- コネクターの設定。
- 4
- データベースホスト。PostgreSQL サーバーを実行しているコンテナーの名前
()です
。 - 5
- 一意のサーバー名。サーバー名は、PostgreSQL サーバーまたはサーバーのクラスターの論理識別子です。この名前は、すべての Kafka トピックの接頭辞として使用されます。
- 6
public.inventory
データベースの変更のみが検出されます。
これら の設定で指定できる コネクタープロパティーの完全リスト を参照してください。
この設定は、POST 経由で稼働中の Kafka Connect サービスに送信できます。その後、設定を記録し、PostgreSQL データベースに接続し、イベントを Kafka トピックに記録します。