3.4. Kafka Connect サービスの再起動


Debezium MySQL コネクターが作成、更新、および削除イベントをキャプチャーする方法を確認しました。次に、コネクターが稼働していない場合でもどのように変更イベントをキャプチャーするかを見てみましょう。

Kafka Connect サービスは、登録されたコネクターのタスクを自動的に管理します。そのため、Kafka Connect サービスがオフラインになった場合は、サービスの再起動時に、実行されていないタスクをすべて開始します。つまり、Debezium が稼働していない場合でも、サービスは変更をデータベースに報告できます。

この手順では、Kafka Connect を停止し、データベースのデータを一部変更した後、Kafka Connect を再起動して変更イベントを確認します。

手順

  1. Kafka Connect サービスを停止します。

    1. Kafka Connect サービスのデプロイメント設定を開きます。

      $ oc edit dc/my-connect-cluster-connect

      デプロイメントの設定が表示されます。

      apiVersion: apps.openshift.io/v1
      kind: DeploymentConfig
      metadata:
        ...
      spec:
        replicas: 1
      ...
    2. spec.replicas の値を 0 に変更します。
    3. デプロイメント設定を保存します。
    4. Kafka Connect サービスが停止したことを確認します。

      このコマンドを実行すると、Kafka Connect サービスが完了し、稼働している Pod がないことを確認できます。

      $ oc get pods -l strimzi.io/name=my-connect-cluster-connect
      NAME                                          READY   STATUS      RESTARTS   AGE
      my-connect-cluster-connect-1-dxcs9            0/1     Completed   0          7h
  2. Kafka Connect サービスが停止している間に、MySQL クライアントを実行しているターミナルに切り替え、新しいレコードをデータベースに追加します。

    mysql> INSERT INTO customers VALUES (default, "Sarah", "Thompson", "kitt@acme.com");
  3. Kafka Connect サービスを再起動します。

    1. Kafka Connect サービスのデプロイメント設定を開きます。

      $ oc edit dc/my-connect-cluster-connect

      デプロイメントの設定が表示されます。

      apiVersion: apps.openshift.io/v1
      kind: DeploymentConfig
      metadata:
        ...
      spec:
        replicas: 0
      ...
    2. spec.replicas の値を 1 に変更します。
    3. デプロイメント設定を保存します。
    4. Kafka Connect サービスが再起動したことを確認します。

      このコマンドを実行すると、Kafka Connect サービスが稼働中で、Pod の準備ができていることを確認できます。

      $ oc get pods -l strimzi.io/name=my-connect-cluster-connect
      NAME                                          READY   STATUS      RESTARTS   AGE
      my-connect-cluster-connect-2-q9kkl            1/1     Running     0          74s
  4. kafka-console-consumer.sh を実行しているターミナルに切り替えます。新しいイベントを受け取ると表示されます。
  5. Kafka Connect がオフラインだったときに作成したレコードを確認します (書式を調整して読みやすくしてあります)。

    {
      ...
      "payload":{
        "id":1005
      }
    }
    {
      ...
      "payload":{
        "before":null,
        "after":{
           "id":1005,
           "first_name":"Sarah",
           "last_name":"Thompson",
           "email":"kitt@acme.com"
        },
        "source":{
           "version":"1.7.2.Final",
           "connector":"mysql",
           "name":"dbserver1",
           "ts_ms":1582581502000,
           "snapshot":"false",
           "db":"inventory",
           "table":"customers",
           "server_id":223344,
           "gtid":null,
           "file":"mysql-bin.000004",
           "pos":364,
           "row":0,
           "thread":5,
           "query":null
        },
        "op":"c",
        "ts_ms":1582581502317
      }
    }
Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.