2.2. Kafka Connect のデプロイ


MySQL データベースをデプロイした後、AMQ Streams を使用して、Debezium MySQL コネクタープラグインを含む Kafka Connect コンテナーイメージを構築します。デプロイメントプロセス中に、以下のカスタムリソース (CR) を作成し、使用します。

  • Kafka Connect インスタンスを定義し、MySQL コネクターアーティファクトに関する情報をイメージに含める KafkaConnect CR。
  • MySQL コネクターがソースデータベースにアクセスするために使用する情報を提供する KafkaConnector CR。AMQStreams が Kafka Connect ポッドを開始した後、KafkaConnector CR を適用してコネクターを開始します。

ビルドプロセス中、AMQ Streams Operator は Debezium コネクター定義を含む KafkaConnect カスタムリソースの入力パラメーターを Kafka Connect コンテナーイメージに変換します。このビルドは、Red Hat Maven リポジトリーから必要なアーティファクトをダウンロードし、イメージに組み込みますす。新規に作成されたコンテナーは .spec.build.output に指定されるコンテナーレジストリーにプッシュされ、Kafka Connect Pod のデプロイに使用されます。AMQ Streams が Kafka Connect イメージをビルドした後、 KafkaConnector カスタムリソースを使用してコネクターを開始します。

手順

  1. OpenShift クラスターにログインし、debezium などのプロジェクトを作成またはオープンします。
  2. コネクターの Debezium KafkaConnect カスタムリソース (CR) を作成するか、既存のリソースを変更します。たとえば、以下の例のように metadata.annotations および spec.build プロパティーを指定する KafkaConnect CR を作成します。dbz-connect.yaml などの名前でファイルを保存します。

    例2.1 Debezium コネクターを含む KafkaConnect カスタムリソースを定義する dbz-connect.yaml ファイル

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: my-connect-cluster
      annotations:
        strimzi.io/use-connector-resources: "true" 
    1
    
    spec:
      version: 3.00
      build: 
    2
    
        output: 
    3
    
          type: imagestream  
    4
    
          image: debezium-streams-connect:latest
        plugins: 
    5
    
          - name: debezium-connector-mysql
            artifacts:
              - type: zip 
    6
    
                url: https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-mysql/1.7.2.Final-redhat-<build_number>/debezium-connector-mysql-1.7.2.Final.zip  
    7
    
    
      bootstrapServers: my-cluster-kafka-bootstrap:9093
    Copy to Clipboard Toggle word wrap
    Expand
    表2.1 Kafka Connect 設定の説明
    項目説明

    1

    strimzi.io/use-connector-resources アノテーションを true に設定して、クラスターオペレーターが KafkaConnector リソースを使用してこの Kafka Connect クラスター内のコネクターを構成できるようにします。

    2

    spec.build 設定は、ビルドイメージの保存場所を指定し、プラグインアーティファクトの場所と共にイメージに追加するプラグインを一覧表示します。

    3

    build.output は、新たにビルドされたイメージが保存されるレジストリーを指定します。

    4

    イメージ出力の名前およびイメージ名を指定します。output.type の有効な値は、Docker Hub や Quay などのコンテナレジストリにプッシュする場合は docker、内部の OpenShift ImageStream にイメージをプッシュする場合は imagestream です。ImageStream を使用するには、ImageStream リソースをクラスターにデプロイする必要があります。KafkaConnect 設定で build.output の指定に関する詳細は、AMQ Streams Build スキーマ参照 のドキュメントを参照 してください。

    5

    plugins 設定は、Kafka Connect イメージに追加するすべてのコネクターを一覧表示します。一覧の各エントリーについて、プラグイン name と、コネクターのビルドに必要なアーティファクトに関する情報を指定します。任意で、各コネクタープラグインに対して、コネクターと使用できる他のコンポーネントを含めることができます。たとえば、Service Registry アーティファクトまたは Debezium スクリプトコンポーネントを追加できます。

    6

    artifacts.type の値は、artifacts.url で指定したアーティファクトのファイルタイプを指定します。有効なタイプは ziptgz、または jar です。Debezium コネクターアーカイブは、.zip ファイル形式で提供されます。JDBC ドライバーファイルは .jar 形式です。type の値は、url フィールドで参照されるファイルのタイプと一致する必要があります。

    7

    artifacts.url の値は、コネクターアーティファクトのファイルを格納する Maven リポジトリーなどの HTTP サーバーのアドレスを指定します。OpenShift クラスターは指定されたサーバーにアクセスできる必要があります。

  3. 以下のコマンドを入力して、KafkaConnect ビルド仕様を OpenShift クラスターに適用します。

    oc create -f dbz-connect.yaml
    Copy to Clipboard Toggle word wrap

    Streams Operator はカスタムリソースで指定された設定に基づいて、デプロイする Kafka Connect イメージを準備します。
    ビルドが完了すると、Operator はイメージを指定されたレジストリーまたは ImageStream にプッシュし、Kafka Connect クラスターを起動します。設定に一覧表示されているコネクターアーティファクトはクラスターで利用できます。

  4. KafkaConnector リソースを作成して、MySQL コネクターのインスタンスを定義します。
    たとえば、以下の KafkaConnector CR を作成し、debezium-inventory-connector.yaml として保存します。

    例2.2 Debezium コネクターの KafkaConnector カスタムリソースを定義する mysql-inventory-connector.yaml ファイル

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      labels:
        strimzi.io/cluster: my-connect-cluster
      name: inventory-connector 
    1
    
    spec:
      class: io.debezium.connector.mysql.MySqlConnector 
    2
    
      tasksMax: 1  
    3
    
      config:  
    4
    
        database.hostname: mysql 
    5
    
        database.port: 3306   
    6
    
        database.user: debezium  
    7
    
        database.password: dbz  
    8
    
        database.server.id: 184054
        database.dbname: mydatabase 
    9
    
        database.server.name: dbserver1 
    10
    
        database.include.list: inventory  
    11
    
        database.history.kafka.bootstrap.servers: 'my-cluster-kafka-bootstrap:9092' 
    12
    
        database.history.kafka.topic: schema-changes.inventory
    Copy to Clipboard Toggle word wrap
    Expand
    表2.2 コネクター設定の説明
    項目説明

    1

    Kafka Connect クラスターに登録するコネクターの名前。

    2

    コネクタークラスの名前。

    3

    1 度に 1 つのタスクのみが動作する必要があります。MySQL コネクターは MySQL サーバーの binlog を読み取るため、単一のコネクタータスクを使用して、適切な順序とイベント処理を確保します。Kafka Connect サービスは、コネクターを使用して 1 つ以上のタスクを開始し、作業を完了します。実行中のタスクを Kafka Connect サービスのクラスター全体に自動的に分散します。サービスが停止またはクラッシュした場合、タスクは実行中のサービスに再分散されます。

    4

    コネクターの設定。

    5

    データベースホスト。MySQL サーバー (mysql) を実行するコンテナーの名前です。

    6

    データベースインスタンスのポート番号。

    7

    Debezium がデータベースに接続するユーザーアカウントの名前。

    8

    データベースユーザーアカウントのパスワード

    9

    変更をキャプチャーするデータベースの名前。

    10

    データベースインスタンスまたはクラスターの論理名。サーバー名は、MySQL サーバーまたはサーバーのクラスターの論理識別子です。この名前は、すべての Kafka トピックのプレフィックスとして使用されます。

    11

    コネクターが変更イベントをキャプチャーするテーブルの一覧。コネクターは、インベントリーデータベースの変更のみを検出します。

    12

    コネクターがデータベーススキーマ (イベントの送信先と同じブローカー) の履歴を格納するために使用する Kafka ブローカーとトピックを指定します。再起動後、コネクターは、コネクターが読み取りを再開したときに、binlog 内のポイントに存在していたデータベーススキーマを復元します。

  5. 以下のコマンドを実行してコネクターリソースを作成します。

    oc create -n <namespace> -f <kafkaConnector>.yaml
    Copy to Clipboard Toggle word wrap

    以下に例を示します。

    oc create -n debezium -f mysql-inventory-connector.yaml
    Copy to Clipboard Toggle word wrap

    コネクターは Kafka Connect クラスターに登録され、KafkaConnector CR の spec.config.database.dbname で指定されたデータベースに対して実行を開始します。コネクター Pod の準備ができると、Debezium が実行されます。

これで、コネクターが作成されたことを確認 し、inventory データベースの変更のキャプチャーが開始されたことを 確認する 準備が整いました。

トップに戻る
Red Hat logoGithubredditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。 最新の更新を見る.

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

Theme

© 2025 Red Hat