3.6. Debezium PostgreSQL コネクターのデプロイメント


Debezium PostgreSQL コネクターをデプロイするには、コネクターファイルを Kafka Connect に追加し、コネクターを実行するカスタムコンテナーを作成して、コネクター設定をコンテナーに追加します。詳細は以下を参照してください。

3.6.1. Debezium PostgreSQL コネクターのデプロイ

Debezium PostgreSQL コネクターをデプロイするには、Debezium コネクターアーカイブが含まれるカスタム Kafka Connect コンテナーイメージをビルドし、このコンテナーイメージをコンテナーレジストリーにプッシュする必要があります。次に、2 つのカスタムリソース (CR) を作成する必要があります。

  • Kafka Connector を設定し、Debezium コネクターを実行するために作成したイメージの名前を指定する KafkaConnect CR。この CR を OpenShift Kafka インスタンスに適用します。
  • Debezium PostgreSQL コネクターを設定する KafkaConnector CR。この CR を、Red Hat AMQ Streams がデプロイされている OpenShift インスタンスに適用します。

前提条件

  • PostgreSQL が実行され、PostgreSQL を設定して Debezium コネクターを実行 する手順が実行済みである。
  • Red Hat AMQ Streams を使用して、OpenShift で Apache Kafka および Kafka Connect を設定し、実行済みである。AMQ Streams は、Kafka を OpenShift に取り入れる operator およびイメージを提供します。
  • Podman または Docker がインストールされている。
  • Debezium コネクターを実行するコンテナーを追加する予定のコンテナーレジストリー (quay.iodocker.ioなど) でコンテナーを作成および管理するアカウントとパーミッションを持っている。

手順

  1. Kafka Connect の Debezium PostgreSQL コンテナーを作成します。

    1. Debezium PostgreSQL コネクターアーカイブ をダウンロードします。
    2. Debezium PostgreSQL コネクターアーカイブを展開して、コネクタープラグインのディレクトリー構造を作成します。以下に例を示します。

      ./my-plugins/
      ├── debezium-connector-postgresql
      │   ├── ...
    3. registry.redhat.io/amq7/amq-streams-kafka-26-rhel7:1.6.0 をベースイメージとして使用する Docker ファイルを作成します。たとえば、ターミナルウィンドウに以下を入力します。

      cat <<EOF >debezium-container-for-postgresql.yaml 1
      FROM {DockerKafkaConnect}
      USER root:root
      COPY ./my-plugins/ /opt/kafka/plugins/ 2
      USER 1001
      EOF
      1
      任意のファイル名を指定できます。
      2
      my-plugins は、プラグインディレクトリーの名前に置き換えます。

      このコマンドは、現在のディレクトリーに debezium-container-for-postgresql.yaml という名前の Docker ファイルを作成します。

    4. 前のステップで作成した debezium-container-for-postgresql.yaml Docker ファイルからコンテナーイメージをビルドします。ファイルが含まれるディレクトリーから、以下のコマンドを実行します。

      podman build -t debezium-container-for-postgresql:latest .
      docker build -t debezium-container-for-postgresql:latest .

      build コマンドは、debezium-container-for-postgresql という名前のコンテナーイメージを構築します。

    5. カスタムイメージを quay.io などのコンテナーレジストリーまたは内部のコンテナーレジストリーにプッシュします。このレジストリーが OpenShift インスタンスからアクセス可能であることを確認します。以下に例を示します。

      podman push debezium-container-for-postgresql:latest
    6. 新しい Debezium PostgreSQL KafkaConnect カスタムリソース (CR) を作成します。たとえば、以下の例のように annotations および image プロパティーを指定する dbz-connect.yaml という名前の KafkaConnect CR を作成します。

      apiVersion: kafka.strimzi.io/v1beta1
      kind: KafkaConnect
      metadata:
        name: my-connect-cluster
        annotations: strimzi.io/use-connector-resources: "true" 1
      spec:
        image: debezium-container-for-postgresql 2
      1
      KafkaConnector リソースはこの Kafka Connect クラスターでコネクターを設定するために使用されることを、metadata.annotations は Cluster Operator に示します。
      2
      spec.image は Debezium コネクターを実行するために作成したイメージの名前を指定します。設定された場合、このプロパティーによって Cluster Operator の STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE 変数がオーバーライドされます。
    7. 以下のコマンドを実行して、KafkaConnect CR を OpenShift Kafka インスタンスに適用します。

      oc create -f dbz-connect.yaml

      これにより、OpenShift の Kafka Connect 環境が更新され、Debezium コネクターを実行するために作成したイメージの名前を指定する Kafka Connector インスタンスが追加されます。

  2. Debezium PostgreSQL コネクターインスタンスを設定する KafkaConnector カスタムリソースを作成します。

    コネクター設定プロパティーを設定する .yaml ファイルに Debezium PostgreSQL コネクターを設定します。コネクター設定は、Debezium に対して、スキーマおよびテーブルのサブセットにイベントを生成するよう指示する可能性があり、または機密性の高い、大きすぎる、または不必要な指定のコラムで Debezium が値を無視、マスク、または切り捨てするようにプロパティーを設定する可能性もあります。これらの設定で指定できる PostgreSQL コネクタープロパティーの完全リスト を参照してください。

    以下の例では、ポート 5432 で PostgreSQL サーバーホスト 192.168.99.100 に接続する Debezium コネクターを設定します。このホストには、sampledb という名前のデータベース、public という名前のスキーマがあり、fulfillment はサーバーの論理名です。

    fulfillment-connector.yaml

    apiVersion: kafka.strimzi.io/v1beta1
      kind: KafkaConnector
      metadata:
        name: fulfillment-connector  1
        labels:
          strimzi.io/cluster: my-connect-cluster
      spec:
        class: io.debezium.connector.postgresql.PostgresConnector
        tasksMax: 1  2
        config:  3
          database.hostname: 192.168.99.100   4
          database.port: 5432
          database.user: debezium
          database.password: dbz
          database.dbname: sampledb
          database.server.name: fulfillment   5
          schema.include.list: public   6
          plugin.name: pgoutput    7

    1
    コネクターの名前。
    2
    1 度に 1 つのタスクのみが動作する必要があります。PostgreSQL コネクターは PostgreSQL サーバーの 192.168.99.100 を読み取るため、単一のコネクタータスクを使用することで、順序とイベントの処理が適切に行われるようになります。Kafka Connect サービスはコネクターを使用して作業を行う 1 つ以上のタスクを開始し、実行中のタスクを自動的に Kafka Connect サービスのクラスター全体に分散します。いずれかのサービスが停止またはクラッシュすると、これらのタスクは稼働中のサービスに再分散されます。
    3
    コネクターの設定。
    4
    PostgreSQL サーバーを実行しているデータベースホストの名前。この例では、データベースのホスト名は 192.168.99.100 です。
    5
    一意のサーバー名。サーバー名は、PostgreSQL サーバーまたはサーバーのクラスターの論理識別子です。この名前は、変更イベントレコードを受信するすべての Kafka トピックの接頭辞として使用されます。
    6
    コネクターは public スキーマでのみ変更をキャプチャーします。選択したテーブルでのみ変更をキャプチャーするようにコネクターを設定できます。table.include.list コネクター設定プロパティーを参照してください。
    7
    PostgreSQL サーバーにインストールされている PostgreSQL 論理デコードプラグイン の名前。Postgre SQL 10 以降でサポートされている値は pgoutput のみですが、明示的に plugin.namepgoutput に設定する必要があります。
  3. Kafka Connect でコネクターインスタンスを作成します。たとえば、KafkaConnector リソースを fulfillment-connector.yaml ファイルに保存した場合は、以下のコマンドを実行します。

    oc apply -f fulfillment-connector.yaml

    このコマンドは meetment-connector を登録して、コネクターが KafkaConnector CR に定義されている sampledb データベースに対して実行を開始します。

  4. コネクターが作成され、起動されたことを確認します。

    1. Kafka Connect ログ出力を表示して、コネクターが作成され、指定データベースの変更のキャプチャーが開始されたことを確認します。

      oc logs $(oc get pods -o name -l strimzi.io/cluster=my-connect-cluster)
    2. ログの出力を確認し、初回のスナップショットが実行されたことを確認します。以下のような出力が表示されるはずです。

      ... INFO Starting snapshot for ...
      ... INFO Snapshot is using user 'debezium' ...

      コネクターがエラーがなく正常に起動すると、コネクターが変更をキャプチャーする各テーブルのトピックが作成されます。CR のサンプルでは、public スキーマの各テーブルにトピックがあります。ダウンストリームアプリケーションは、これらのトピックをサブスクライブできます。

    3. 以下のコマンドを実行して、コネクターによってトピックが作成されたことを検証します。

      oc get kafkatopics

結果

コネクターが起動すると、コネクターが設定された PostgreSQL サーバーデータベースの 整合性スナップショットが実行 されます。その後、コネクターは行レベルの操作のデータ変更イベントの生成を開始し、変更イベントレコードを Kafka トピックにストリーミングします。

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.