第3章 インベントリー データベースを監視するコネクターの作成


Kafka、Debezium、および MySQL サービスを起動すると、コネクターインスタンスを作成して インベントリー データベースを監視する準備が整います。

この手順では、コネクターインスタンスを定義する KafkaConnector カスタムリソース(CR)を作成して適用することで、コネクターインスタンスを作成します。CR の適用後、コネクターインスタンスは インベントリー データベースの binlog の監視を開始します。binlog は、データベースのトランザクション(個別の行の変更やスキーマの変更など)をすべて記録します。データベースの行が変更されると、Debezium は変更イベントを生成します。

注記

通常、Kafka ツールを使用して、レプリカ数の指定など、必要なトピックを手動で作成します。ただし、このチュートリアルでは、1 つのレプリカのみでトピックを自動作成するように Kafka が設定されています。

手順

  1. Kafka Connect のデプロイに使用した examples/kafka-connect/kafka-connect-s2i-single-node-kafka.yaml ファイルを開きます。

    MySQL コネクターインスタンスを作成する前に、KafkaConnectS2I カスタムリソース(CR)でコネクターリソースを有効にする必要があります。

  2. metadata.annotations セクションで、Kafka Connect がコネクターリソースを使用できるようにします。

    この例では、アノテーションを examples/kafka-connect/kafka-connect-s2i-single-node-kafka.yaml ファイルに追加します。

    kafka-connect-s2i-single-node-kafka.yaml

    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaConnectS2I
    metadata:
      name: my-connect-cluster
      annotations:
        strimzi.io/use-connector-resources: "true"
    spec:
       ...
    Copy to Clipboard

  3. 更新された kafka-connect-s2i-single-node-kafka.yaml ファイルを適用して KafkaConnectS2I CR を更新します。

    $ oc apply -f kafka-connect-s2i-single-node-kafka.yaml
    Copy to Clipboard
  4. MySQL コネクターインスタンスを作成し、インベントリー データベースを監視します。

    この例では、MySQL コネクターインスタンスを定義する KafkaConnector CR を作成します。

    inventory-connector.yaml

      apiVersion: kafka.strimzi.io/v1alpha1
      kind: KafkaConnector
      metadata:
        name: inventory-connector  
    1
    
        labels:
          strimzi.io/cluster: my-connect-cluster
      spec:
        class: io.debezium.connector.mysql.MySqlConnector
        tasksMax: 1  
    2
    
        config:  
    3
    
          database.hostname: mysql  
    4
    
          database.port: 3306
          database.user: debezium
          database.password: dbz
          database.server.id: 184054  
    5
    
          database.server.name: dbserver1  
    6
    
          database.whitelist: inventory  
    7
    
          database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092  
    8
    
          database.history.kafka.topic: schema-changes.inventory  
    9
    Copy to Clipboard

    1
    コネクターの名前。
    2
    1 度に 1 つのタスクのみが動作する必要があります。MySQL コネクターは MySQL サーバーの binlog を読み取るため、単一のコネクタータスクを使用することで、順序とイベントの処理が適切に行われるようになります。Kafka Connect サービスはコネクターを使用して作業を行う 1 つ以上のタスクを開始し、実行中のタスクを自動的に Kafka Connect サービスのクラスター全体に分散します。いずれかのサービスが停止またはクラッシュすると、これらのタスクは稼働中のサービスに再分散されます。
    3
    コネクターの設定。
    4
    データベースホスト。これは、MySQL サーバーを実行しているコンテナーの名前です (mysql)。
    5 6
    一意なサーバー ID および名前。サーバー名は、MySQL サーバーまたはサーバーのクラスターの論理識別子です。この名前は、すべての Kafka トピックのプレフィックスとして使用されます。
    7
    inventory データベースの変更のみが検出されます。
    8 9
    コネクターは、このブローカー (イベントの送信先となるブローカーと同じ) とトピック名を使用して、データベーススキーマの履歴を Kafka に保存します。再起動時に、コネクターは、コネクターが読み取りを開始すべき時点で binlog に存在したデータベースのスキーマを復元します。
  5. コネクターインスタンスを適用します。

    $ oc apply -f inventory-connector.yaml
    Copy to Clipboard

    inventory-connector コネクターは登録され、inventory データベースに対して実行が開始されます。

  6. inventory-connector が作成され、インベントリー データベースの監視を開始したことを確認します。

    Kafka Connect のログ出力を inventory-connector の起動時に監視して、コネクターインスタンスを確認することができます。

    1. Kafka Connect のログ出力を表示します。

      $ oc logs $(oc get pods -o name -l strimzi.io/name=my-connect-cluster-connect)
      Copy to Clipboard
    2. ログの出力を確認し、初回のスナップショットが実行されたことを確認します。

      以下の行は、初回のスナップショットが開始されたことを表しています。

      ...
      2020-02-21 17:57:30,801 INFO Starting snapshot for jdbc:mysql://mysql:3306/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&connectTimeout=30000 with user 'debezium' with locking mode 'minimal' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:30,805 INFO Snapshot is using user 'debezium' with these MySQL grants: (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      ...
      Copy to Clipboard

      スナップショットには、複数のステップが関係します。

      ...
      2020-02-21 17:57:30,822 INFO Step 0: disabling autocommit, enabling repeatable read transactions, and setting lock wait timeout to 10 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:30,836 INFO Step 1: flush and obtain global read lock to prevent writes to database (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:30,839 INFO Step 2: start transaction with consistent snapshot (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:30,840 INFO Step 3: read binlog position of MySQL master (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:30,843 INFO 	 using binlog 'mysql-bin.000003' at position '154' and gtid '' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      ...
      2020-02-21 17:57:34,423 INFO Step 9: committing transaction (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:34,424 INFO Completed snapshot in 00:00:03.632 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      ...
      Copy to Clipboard

      スナップショットの完了後、Debezium は変更イベントについて インベントリー データベースの binlog の監視を開始します。

      ...
      2020-02-21 17:57:35,584 INFO Transitioning from the snapshot reader to the binlog reader (io.debezium.connector.mysql.ChainedReader) [task-thread-inventory-connector-0]
      2020-02-21 17:57:35,613 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [task-thread-inventory-connector-0]
      2020-02-21 17:57:35,630 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [blc-mysql:3306]
      Feb 21, 2020 5:57:35 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect
      INFO: Connected to mysql:3306 at mysql-bin.000003/154 (sid:184054, cid:5)
      2020-02-21 17:57:35,775 INFO Connected to MySQL binlog at mysql:3306, starting at binlog file 'mysql-bin.000003', pos=154, skipping 0 events plus 0 rows (io.debezium.connector.mysql.BinlogReader) [blc-mysql:3306]
      ...
      Copy to Clipboard
トップに戻る
Red Hat logoGithubredditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。 最新の更新を見る.

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

Theme

© 2025 Red Hat