4.5. Deploying the MongoDB connector
Debezium MongoDB コネクターをデプロイするには、Debezium MongoDB コネクターアーカイブをインストールして、コネクターを設定し、その設定を Kafka Connect に追加してコネクターを起動します。
MongoDB コネクターをインストールするには、Debezium の OpenShift へのインストール の手順に従います。主なステップは以下のとおりです。
- Red Hat AMQ Streams を使用して OpenShift で Apache Kafka および Kafka Connect を設定します。AMQ Streams は、Kafka を OpenShift に取り入れる operator およびイメージを提供します。
- Debezium MongoDB コネクター をダウンロードします。
- コネクターのファイルを Kafka Connect 環境に展開します。
コネクタープラグインの親ディレクトリーを Kafka Connect
plugin.path
に追加します。以下に例を示します。plugin.path=/kafka/connect
上記の例では、Debezium MongoDB コネクターを
/kafka/connect/Debezium-connector-mongodb
パスに展開したことを前提としています。- Kafka Connect プロセスを再起動して、新しい JAR ファイルが確実に取得されるようにします。
また、MongoDB を設定 して Debezium コネクターを実行する必要もあります。
その他のリソース
デプロイメントプロセスや AMQ Streams でのコネクターのデプロイに関する詳細は、Debezium のインストールガイドを参照してください。
4.5.1. 設定例
コネクターを使用して特定の MongoDB レプリカセットまたはシャードクラスターの変更イベントを生成するには、JSON で設定ファイルを作成します。コネクターが起動すると、MongoDB レプリカセットでコレクションのスナップショットを実行し、レプリカセットの oplogs の読み取りを開始して、挿入、更新、削除されたすべてのドキュメントのイベントを生成します。任意で、不必要なコレクションを除外します。
以下は、MongoDB レプリカセット rs0
を 192.168.99.100 のポート 27017 でモニターする MongoDB コネクターの設定例です (論理名は fullfillment
)。通常、コネクターに使用できる設定プロパティーを使用して、.yaml
ファイルに Debezium MongoDB コネクターを設定します。
apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnector metadata: name: inventory-connector 1 labels: strimzi.io/cluster: my-connect-cluster spec: class: io.debezium.connector.mongodb.MongoDbConnector 2 config: mongodb.hosts: rs0/192.168.99.100:27017 3 mongodb.name: fulfillment 4 collection.include.list: inventory[.]* 5
- 1 1 1 1 1 1
- Kafka Connect サービスに登録する場合のコネクターの名前。
- 2 2 2 2 2 2
- MongoDB コネクタークラスの名前。
- 3 3 3 3 3 3
- MongoDB レプリカセットへの接続に使用するホストアドレス。
- 4 4 4 4 4 4
- 生成されたイベントの namespace を形成する MongoDB レプリカセットの論理名。コネクターが書き込む Kafka トピックの名前、Kafka Connect スキーマ名、および Arvo コンバーターが使用される場合に対応する Avro スキーマの namespace のすべてに使用されます。
- 5 5 5 5
- 監視するすべてのコレクションのコレクション namespace (例: <dbName>.<collectionName>) と一致する正規表現のリスト。これは任意です。
これらの設定で指定できる コネクタープロパティーの完全リスト を参照してください。
この設定は、POST 経由で稼働中の Kafka Connect サービスに送信できます。その後、設定を記録し、MongoDB レプリカセットまたはシャードクラスターに接続するコネクタータスクを 1 つ起動して、各レプリカセットにタスクを割り当てます。必要に応じてスナップショットを実行し、oplog を読み取り、Kafka トピックへのイベントを記録します。