4.5. Debezium MongoDB コネクターのデプロイ


Debezium MongoDB コネクターをデプロイするには、コネクターファイルを Kafka Connect に追加し、コネクターを実行するカスタムコンテナーを作成して、コネクター設定をコンテナーに追加します。詳細は以下を参照してください。

4.5.1. Debezium MongoDB コネクターのデプロイ

Debezium MongoDB コネクターをデプロイするには、Debezium コネクターアーカイブが含まれるカスタム Kafka Connect コンテナーイメージをビルドし、続いてこのコンテナーイメージをコンテナーレジストリーにプッシュする必要があります。その後、2 つのカスタムリソース (CR) を作成します。

  • Kafka Connect インスタンスを定義する KafkaConnect CR。CR の image プロパティーは、Debezium コネクターを実行するために作成するコンテナーイメージの名前を指定します。この CR を、Red Hat AMQ Streams がデプロイされている OpenShift インスタンスに適用します。AMQ Streams は、Apache Kafka を OpenShift に取り入れる Operator およびイメージを提供します。
  • Debezium MongoDB コネクターを定義する KafkaConnector CR。この CR を KafkaConnect CR を適用するのと同じ OpenShift インスタンスに適用します。

前提条件

  • MongoDB が稼働し、MongoDB を設定して Debezium コネクターと連携する 手順が完了済みである必要があります。
  • AMQ Streams が OpenShift にデプロイされ、Apache Kafka および Kafka Connect を実行している。詳細は、『Deploying and Upgrading AMQ Streams on OpenShift』を参照してください。
  • Podman または Docker がインストールされている。
  • Debezium コネクターを実行するコンテナーを追加する予定のコンテナーレジストリー( quay.io または docker.ioなど)でコンテナーを作成および管理するアカウントおよびパーミッションがある。

手順

  1. Kafka Connect の Debezium MongoDB コンテナーを作成します。

    1. Debezium MongoDB コネクターアーカイブ をダウンロードします。
    2. Debezium MongoDB コネクターアーカイブを展開して、コネクタープラグインのディレクトリー構造を作成します。以下に例を示します。

      ./my-plugins/
      ├── debezium-connector-mongodb
      │   ├── ...
    3. registry.redhat.io/amq7/amq-streams-kafka-28-rhel8:1.8.0 をベースイメージとして使用する Docker ファイルを作成します。たとえば、ターミナルウィンドウに以下のコマンドを入力します。my -plugins はプラグインディレクトリーの名前に置き換えます。

      cat <<EOF >debezium-container-for-mongodb.yaml 1
      FROM registry.redhat.io/amq7/amq-streams-kafka-28-rhel8:1.8.0
      USER root:root
      COPY ./<my-plugins>/ /opt/kafka/plugins/ 2
      USER 1001
      EOF
      1 1 1 1 1 1 1
      任意のファイル名を指定できます。
      2 2 2 2 2 2 2
      my-plugins は、プラグインディレクトリーの名前に置き換えます。

      このコマンドは、現在のディレクトリーに debezium-container-for-mongodb.yaml という名前の Docker ファイルを作成します。

    4. 前の手順で作成した debezium-container-for-mongodb.yaml Docker ファイルからコンテナーイメージをビルドします。ファイルを含むディレクトリーから、ターミナルウィンドウを開き、以下のコマンドのいずれかを入力します。

      podman build -t debezium-container-for-mongodb:latest .
      docker build -t debezium-container-for-mongodb:latest .

      上記のコマンドは、debezium-container-for-mongodb という名前のコンテナーイメージを構築します。

    5. カスタムイメージを quay.io または内部コンテナーレジストリーなどのコンテナーレジストリーにプッシュします。コンテナーレジストリーは、イメージをデプロイする OpenShift インスタンスで利用できる必要があります。以下のいずれかのコマンドを実行します。

      podman push <myregistry.io>/debezium-container-for-mongodb:latest
      docker push <myregistry.io>/debezium-container-for-mongodb:latest
    6. 新しい Debezium MongoDB KafkaConnect カスタムリソース(CR)を作成します。たとえば、以下の例のように アノテーション およびイメージ プロパティーを指定する dbz-connect.yaml という名前の KafkaConnect CR を作成します。

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnect
      metadata:
        name: my-connect-cluster
        annotations:
          strimzi.io/use-connector-resources: "true" 1
      spec:
        #...
        image: debezium-container-for-mongodb  2
      1
      metadata.annotations は、KafkaConnectors リソースがこの Kafka Connect クラスターでコネクター を設定するために使用される ことを Cluster Operator に示します。
      2
      spec.image は、Debezium コネクターを実行するために作成したイメージの名前を指定します。このプロパティーは、Cluster Operator の STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE 変数を上書きします。
    7. 以下のコマンドを入力して、KafkaConnect CR を OpenShift Kafka Connect 環境に適用します。

      oc create -f dbz-connect.yaml

      このコマンドは、Debezium コネクターを実行するために作成したイメージの名前を指定する Kafka Connect インスタンスを追加します。

  2. Debezium MongoDB コネクターインスタンスを設定する KafkaConnector カスタムリソースを作成します。

    コネクターの設定プロパティーを指定する a .yaml ファイルで Debezium MongoDB コネクターを設定します。コネクター設定は、Debezium に対して MongoDB レプリカセットまたはシャードクラスターのサブセットの変更イベントを生成するよう指示することがあります。任意で、不要なコレクションをフィルタリングするプロパティーを設定できます。

    以下の例では、MongoDB レプリカセット rs0192.168.99.100 のポート 27017 に接続し、インベントリー コレクションで生じる変更をキャプチャーする Debezium コネクターを設定します。完全な入力は、レプリカセットの論理名です。

    MongoDB inventory-connector.yaml

    apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnector
      metadata:
        name: inventory-connector 1
        labels: strimzi.io/cluster: my-connect-cluster
      spec:
        class: io.debezium.connector.mongodb.MongoDbConnector 2
        config:
         mongodb.hosts: rs0/192.168.99.100:27017 3
         mongodb.name: fulfillment 4
         collection.include.list: inventory[.]* 5

    1
    コネクターを Kafka Connect に登録するために使用される名前。
    2
    MongoDB コネクタークラスの名前。
    3
    MongoDB レプリカセットへの接続に使用するホストアドレス。
    4
    生成されたイベントの namespace を形成する MongoDB レプリカセットの論理名。コネクターが書き込む Kafka トピックの名前、Kafka Connect スキーマ名、および Arvo コンバーターが使用される場合に対応する Avro スキーマの namespace のすべてに使用されます。
    5
    監視するすべてのコレクションのコレクション namespace (例: <dbName>.<collectionName>) と一致する正規表現のオプションリスト。
  3. Kafka Connect でコネクターインスタンスを作成します。たとえば、KafkaConnector リソースを inventory-connector.yaml ファイルに保存した場合、以下のコマンドを実行します。

    oc apply -f inventory-connector.yaml

    上記のコマンドは inventory-connector を登録し、コネクターは KafkaConnector CR に定義されている インベントリー コレクションに対して実行を開始します。

  4. コネクターが作成され、起動されたことを確認します。

    1. Kafka Connect ログ出力を表示して、コネクターが作成され、指定データベースの変更のキャプチャーが開始されたことを確認します。

      oc logs $(oc get pods -o name -l strimzi.io/cluster=my-connect-cluster)
    2. ログの出力を確認し、Debezium が初回のスナップショットを実行することを確認します。ログには、以下のメッセージと同様の出力が表示されます。

      ... INFO Starting snapshot for ...
      ... INFO Snapshot is using user 'debezium' ...

      コネクターがエラーがなく正常に起動すると、コネクターが変更をキャプチャーする各コレクションのトピックが作成されます。前述の例の CR の場合、collection. include.list プロパティーに指定されたコレクションの トピックがあります。ダウンストリームアプリケーションは、コネクターによって作成されるトピックをサブスクライブできます。

    3. 以下のコマンドを実行して、コネクターによってトピックが作成されたことを検証します。

      oc get kafkatopics

Debezium MongoDB コネクターに設定できる設定プロパティーの完全リストは、MongoDB コネクター設定プロパティーを参照してください。

結果

コネクターが起動すると、以下のアクションを完了します。

  • MongoDB レプリカセットでコレクションの スナップショット 一貫性をもたせて実行する。
  • レプリカセットの oplogs を読み取ります。
  • 挿入、更新、および削除されたすべてのドキュメントの変更イベントを生成します。
  • 変更イベントレコードを Kafka トピックへストリーミングします。
Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.