3.2. Kafka Connect のデプロイ


MySQL データベースをデプロイした後、AMQ Streams を使用して、Debezium MySQL コネクタープラグインを含む Kafka Connect コンテナーイメージをビルドします。デプロイメントプロセス中に、以下のカスタムリソース (CR) を作成し、使用します。

  • Kafka Connect インスタンスを定義し、MySQL コネクターアーティファクトに関する情報をイメージに含める KafkaConnect CR。
  • MySQL コネクターがソースデータベースにアクセスするために使用する情報を提供する KafkaConnector CR。AMQ Streams が Kafka Connect Pod を起動した後、KafkaConnector CR を適用してコネクターを起動します。

ビルドプロセス中、AMQ Streams Operator は Debezium コネクター定義を含む KafkaConnect カスタムリソースの入力パラメーターを Kafka Connect コンテナーイメージに変換します。このビルドでは、Red Hat Maven リポジトリーから必要なアーティファクトをダウンロードし、イメージに組み込みます。新規に作成されたコンテナーは .spec.build.output で指定されたコンテナーレジストリーにプッシュされ、Kafka Connect Pod のデプロイに使用されます。AMQ Streams が Kafka Connect イメージをビルドした後、KafkaConnector カスタムリソースを使用してコネクターを起動します。

前提条件

手順

  1. OpenShift クラスターにログインし、debezium などのプロジェクトを作成または開きます。
  2. コネクターの Debezium KafkaConnect カスタムリソース (CR) を作成するか、既存のリソースを変更します。
    以下の例は、KafkaConnect カスタムリソースを記述した dbz-connect.yaml ファイルからの抜粋を示しています。
    metadata.annotations および spec.build プロパティーが必要です。

    例3.1 Debezium コネクターを含む KafkaConnect カスタムリソースを定義した dbz-connect.yaml ファイル

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: my-connect-cluster
      annotations:
        strimzi.io/use-connector-resources: "true" 1
    spec:
      replicas: 1
      version: 3.5.0
      build: 2
        output: 3
          type: imagestream  4
          image: debezium-streams-connect:latest
        plugins: 5
          - name: debezium-connector-mysql
            artifacts:
              - type: zip 6
                url: https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-mysql/2.3.7.Final-redhat-00001/debezium-connector-mysql-2.3.7.Final-redhat-00001-plugin.zip 7
      bootstrapServers: my-cluster-kafka-bootstrap:9093
    
    ...
    表3.1 Kafka Connect 設定の説明
    項目説明

    1

    strimzi.io/use-connector-resources アノテーションを "true" に設定して、クラスター Operator が KafkaConnector リソースを使用してこの Kafka Connect クラスター内のコネクターを設定できるようにします。

    2

    spec.build 設定は、ビルドイメージの保存場所を指定し、プラグインアーティファクトの場所とともにイメージに追加するプラグインをリストします。

    3

    build.output は、新しくビルドされたイメージを保存するレジストリーを指定します。

    4

    イメージ出力の名前およびイメージ名を指定します。output.type の有効な値は、Docker Hub や Quay などのコンテナーレジストリーにプッシュする場合は docker、内部の OpenShift ImageStream にイメージをプッシュする場合は imagestream です。ImageStream を使用するには、ImageStream リソースをクラスターにデプロイする必要があります。KafkaConnect 設定で build.output の指定に関する詳細は、AMQ Streams Build スキーマ参照 のドキュメントを参照 してください。

    5

    plugins 設定は、Kafka Connect イメージに追加するすべてのコネクターをリストします。リストの各エントリーについて、プラグイン name と、コネクターのビルドに必要なアーティファクトに関する情報を指定します。必要に応じて、各コネクタープラグインに対して、コネクターと使用できる他のコンポーネントを含めることができます。たとえば、Service Registry アーティファクトまたは Debezium スクリプトコンポーネントを追加できます。

    6

    artifacts.type の値は、artifacts.url で指定するアーティファクトのファイルタイプを指定します。有効なタイプは ziptgz、または jar です。Debezium コネクターアーカイブは、.zip ファイル形式で提供されます。JDBC ドライバーファイルは .jar 形式です。type の値は、url フィールドで参照されるファイルのタイプと一致する必要があります。

    7

    artifacts.url の値は、コネクターアーティファクトのファイルを格納する Maven リポジトリーなどの HTTP サーバーのアドレスを指定します。OpenShift クラスターが指定されたサーバーにアクセスできる必要があります。

  3. 以下のコマンドを入力して、KafkaConnect ビルド仕様を OpenShift クラスターに適用します。

    oc create -f dbz-connect.yaml

    AMQ Streams Operator がカスタムリソースで指定された設定に基づいて、デプロイする Kafka Connect イメージを準備します。
    ビルドが完了すると、Operator はイメージを指定されたレジストリーまたは ImageStream にプッシュし、Kafka Connect クラスターを起動します。設定にリストしたコネクターアーティファクトがクラスターで利用可能になります。

  4. KafkaConnector リソースを作成して、MySQL コネクターのインスタンスを定義します。
    たとえば、以下の KafkaConnector CR を作成し、debezium-inventory-connector.yaml として保存します。

    例3.2 Debezium コネクターの KafkaConnector カスタムリソースを定義した mysql-inventory-connector.yaml ファイル

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      labels:
        strimzi.io/cluster: my-connect-cluster
      name: inventory-connector 1
    spec:
      class: io.debezium.connector.mysql.MySqlConnector 2
      tasksMax: 1  3
      config:  4
        database.hostname: mysql 5
        database.port: 3306   6
        database.user: debezium  7
        database.password: dbz  8
        database.server.id: 184054
        topic.prefix: dbserver1  9
        table.include.list: inventory.*  10
        schema.history.internal.kafka.bootstrap.servers: 'my-cluster-kafka-bootstrap:9092' 11
        schema.history.internal.kafka.topic: schema-changes.inventory 12
    表3.2 コネクター設定の説明
    項目説明

    1

    Kafka Connect クラスターに登録するコネクターの名前。

    2

    コネクタークラスの名前。

    3

    一度に 1 つのタスクのみを実行します。MySQL コネクターは MySQL サーバーの binlog を読み取るため、単一のコネクタータスクを使用して、適切な順序とイベント処理を確保します。Kafka Connect サービスは、コネクターを使用して 1 つ以上のタスクを開始し、作業を完了します。実行中のタスクを Kafka Connect サービスのクラスター全体に自動的に分散します。サービスが停止またはクラッシュした場合、タスクは実行中のサービスに再分散されます。

    4

    コネクターの設定。

    5

    MySQL データベースインスタンスのホスト名またはアドレス。

    6

    データベースインスタンスのポート番号。

    7

    Debezium がデータベースに接続するユーザーアカウントの名前。

    8

    Debezium がデータベースユーザーアカウントに接続するために使用するパスワード。

    9

    MySQL サーバーまたはクラスターのトピック接頭辞。この文字列は、コネクターがイベントレコードを送信する全 Kafka トピックの名前の前に付けます。

    10

    コネクターが変更イベントをキャプチャーするテーブルのリスト。コネクターは、inventory テーブルで発生した場合にのみ、変更を検出します。

    11

    DDL ステートメントをデータベーススキーマの履歴トピックに書き込み、復元するためにコネクターによって使用される Kafka ブローカーのリスト。これは、コネクターが変更イベントレコードを送信するブローカーと同じです。再起動後、コネクターは、コネクターが読み取りを再開する時点で binlog に存在しているデータベーススキーマを復元します。

    12

    データベーススキーマ履歴トピックの名前。このトピックは内部使用のみを目的としており、コンシューマーが使用しないようにしてください。

  5. 以下のコマンドを実行してコネクターリソースを作成します。

    oc create -n <namespace> -f <kafkaConnector>.yaml

    以下に例を示します。

    oc create -n debezium -f mysql-inventory-connector.yaml

    コネクターは Kafka Connect クラスターに登録され、KafkaConnector CR の spec.config.database.dbname で指定されたデータベースに対して実行を開始します。コネクター Pod の準備ができると、Debezium が実行されます。

これで、コネクターが作成されたことを確認 し、inventory データベースの変更のキャプチャーが開始したことを確認する準備が整いました。

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.