4장. 변경 이벤트 보기


Debezium MySQL 커넥터를 배포한 후 인벤토리 데이터베이스에 대한 변경 사항을 캡처하기 시작합니다.

커넥터가 시작되면 각 커넥터가 MySQL 데이터베이스의 테이블 중 하나를 나타내는 Apache Kafka 주제 세트에 이벤트를 씁니다. 각 주제의 이름은 데이터베이스 서버 이름, dbserver1 로 시작됩니다.

커넥터는 다음 Kafka 항목에 씁니다.

dbserver1
변경 사항을 캡처하는 테이블에 적용되는 DDL 문을 사용하는 스키마 변경 주제입니다.
dbserver1.inventory.products
inventory 데이터베이스의 products 테이블에 대한 변경 이벤트 레코드를 수신합니다.
dbserver1.inventory.products_on_hand
inventory 데이터베이스의 products_on_hand 테이블에 대한 변경 이벤트 레코드를 수신합니다.
dbserver1.inventory.customers
inventory 데이터베이스의 customers 테이블에 대한 변경 이벤트 레코드를 수신합니다.
dbserver1.inventory.orders
inventory 데이터베이스의 orders 테이블에 대한 변경 이벤트 레코드를 수신합니다.

이 튜토리얼의 나머지 부분에서는 dbserver1.inventory.customers Kafka 주제를 검사합니다. 주제를 자세히 살펴보면 다양한 유형의 변경 이벤트를 나타내는 방법을 확인하고 각 이벤트를 캡처한 커넥터에 대한 정보를 찾을 수 있습니다.

튜토리얼에는 다음 섹션이 포함되어 있습니다.

4.1. 생성 이벤트 보기

dbserver1.inventory.customers 주제를 보면 MySQL 커넥터가 inventory 데이터베이스에서 이벤트를 캡처 방법을 확인할 수 있습니다. 이 경우 생성 이벤트는 새 고객이 데이터베이스에 추가되는 것을 캡처합니다.

프로세스

  1. 새 터미널을 열고 kafka-console-consumer 를 사용하여 주제 시작부터 dbserver1.inventory.customers 주제를 사용합니다.

    이 명령은 Kafka(my-cluster-kafka-0)를 실행 중인 Pod에서 간단한 소비자(kafka-console-consumer.sh)를 실행합니다.

    $ oc exec -it my-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \
      --bootstrap-server localhost:9092 \
      --from-beginning \
      --property print.key=true \
      --topic dbserver1.inventory.customers

    소비자는 customers 테이블의 각 행에 대해 4개의 메시지(JSON 형식)를 반환합니다. 각 메시지에는 해당 테이블 행의 이벤트 레코드가 포함되어 있습니다.

    각 이벤트에는 두 개의 JSON 문서가 있습니다. 키와 . 키는 행의 기본 키에 해당하며 값에는 행의 세부 정보(해당 행에 포함된 필드, 각 필드의 값, 행에서 수행된 작업 유형)가 표시됩니다.

  2. 마지막 이벤트의 경우 세부 정보를 검토합니다.

    다음은 마지막 이벤트의 세부 정보입니다(읽기 쉽도록 형식 지정).

    {
      "schema":{
        "type":"struct",
          "fields":[
            {
              "type":"int32",
              "optional":false,
              "field":"id"
            }
          ],
        "optional":false,
        "name":"dbserver1.inventory.customers.Key"
      },
      "payload":{
        "id":1004
      }
    }

    이벤트에는 스키마페이로드 의 두 부분이 있습니다. 스키마 에는 페이로드에 무엇이 있는지 설명하는 Kafka Connect 스키마가 포함되어 있습니다. 이 경우 페이로드는 선택 사항이 아니며 하나의 필수 필드(예: int32)가 있는 dbserver1.inventory.customers.Key 라는 구조입니다.

    페이로드 에는 값이 1004 인 단일 id 필드가 있습니다.

    이벤트의 키를 검토하면 이 이벤트가 inventory.customers 테이블의 값에 1004 값이 있는 ID 기본 키 열에 적용되는 것을 확인할 수 있습니다.

  3. 동일한 이벤트의 세부 정보를 검토합니다.

    이벤트의 값은 행이 생성되었음을 표시하고 포함된 항목(이 경우 ID ,first_name,last_name, email )을 설명합니다.

    다음은 마지막 이벤트 의 세부 정보입니다(읽기 쉽도록 형식 지정).

    {
      "schema": {
        "type": "struct",
        "fields": [
          {
            "type": "struct",
            "fields": [
              {
                "type": "int32",
                "optional": false,
                "field": "id"
              },
              {
                "type": "string",
                "optional": false,
                "field": "first_name"
              },
              {
                "type": "string",
                "optional": false,
                "field": "last_name"
              },
              {
                "type": "string",
                "optional": false,
                "field": "email"
              }
            ],
            "optional": true,
            "name": "dbserver1.inventory.customers.Value",
            "field": "before"
          },
          {
            "type": "struct",
            "fields": [
              {
                "type": "int32",
                "optional": false,
                "field": "id"
              },
              {
                "type": "string",
                "optional": false,
                "field": "first_name"
              },
              {
                "type": "string",
                "optional": false,
                "field": "last_name"
              },
              {
                "type": "string",
                "optional": false,
                "field": "email"
              }
            ],
            "optional": true,
            "name": "dbserver1.inventory.customers.Value",
            "field": "after"
          },
          {
            "type": "struct",
            "fields": [
              {
                "type": "string",
                "optional": true,
                "field": "version"
              },
              {
                "type": "string",
                "optional": false,
                "field": "name"
              },
              {
                "type": "int64",
                "optional": false,
                "field": "server_id"
              },
              {
                "type": "int64",
                "optional": false,
                "field": "ts_sec"
              },
              {
                "type": "string",
                "optional": true,
                "field": "gtid"
              },
              {
                "type": "string",
                "optional": false,
                "field": "file"
              },
              {
                "type": "int64",
                "optional": false,
                "field": "pos"
              },
              {
                "type": "int32",
                "optional": false,
                "field": "row"
              },
              {
                "type": "boolean",
                "optional": true,
                "field": "snapshot"
              },
              {
                "type": "int64",
                "optional": true,
                "field": "thread"
              },
              {
                "type": "string",
                "optional": true,
                "field": "db"
              },
              {
                "type": "string",
                "optional": true,
                "field": "table"
              }
            ],
            "optional": false,
            "name": "io.debezium.connector.mysql.Source",
            "field": "source"
          },
          {
            "type": "string",
            "optional": false,
            "field": "op"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "ts_ms"
          }
        ],
        "optional": false,
        "name": "dbserver1.inventory.customers.Envelope",
        "version": 1
      },
      "payload": {
        "before": null,
        "after": {
          "id": 1004,
          "first_name": "Anne",
          "last_name": "Kretchmar",
          "email": "annek@noanswer.org"
        },
        "source": {
          "version": "2.3.4.Final",
          "name": "dbserver1",
          "server_id": 0,
          "ts_sec": 0,
          "gtid": null,
          "file": "mysql-bin.000003",
          "pos": 154,
          "row": 0,
          "snapshot": true,
          "thread": null,
          "db": "inventory",
          "table": "customers"
        },
        "op": "r",
        "ts_ms": 1486500577691
      }
    }

    이벤트의 이 부분은 훨씬 길지만 이벤트의 키와 마찬가지로 스키마페이로드 도 있습니다. 스키마 에는 5개의 필드를 포함할 수 있는 dbserver1.inventory.customers.Envelope (버전 1)라는 Kafka Connect 스키마가 포함되어 있습니다.

    op
    작업 유형을 설명하는 문자열 값이 포함된 필수 필드입니다. MySQL 커넥터의 값은 생성(또는 삽입)용 c 이며 업데이트의 경우 u, delete의 경우 d, 읽기(스냅샷의 경우) r 입니다.
    이전
    존재하는 경우 이벤트 발생 행의 상태를 포함하는 선택적 필드입니다. 구조는 dbserver1.inventory.customers.Value Kafka Connect 스키마로 설명되며, dbserver1 커넥터는 inventory.customers 테이블의 모든 행에 사용합니다.
    이후
    존재하는 경우 이벤트가 발생한 행의 상태를 포함하는 선택적 필드입니다. 구조는 이전에 사용된 동일한 dbserver1.inventory.customers.Value Kafka Connect 스키마로 설명되어 있습니다.
    소스
    MySQL의 경우 커넥터 이름, 이벤트가 기록된 binlog 파일의 이름, 이벤트가 표시되는 해당 binlog 파일의 위치, 영향을 받는 데이터베이스 및 테이블의 이름, 영향을 받는 데이터베이스 및 테이블의 이름 등 여러 필드를 포함하는 이벤트에 대한 소스 메타데이터가 포함된 필수 필드입니다. 변경을 수행한 MySQL 스레드 ID, 이 이벤트가 스냅샷의 일부인지 여부, 사용 가능한 경우 MySQL 서버 ID 및 타임스탬프(초)입니다.
    ts_ms
    존재하는 경우 커넥터가 이벤트를 처리하는 JVM에서 Kafka Connect 작업을 실행하는 JVM에서 시스템 시계를 사용하는 시간이 포함된 선택적 필드입니다.
    참고

    이벤트의 JSON 표현은 설명하는 행보다 훨씬 길 수 있습니다. 이는 모든 이벤트 키와 값이 있는 Kafka Connect가 페이로드 를 설명하는 스키마 를 제공하기 때문입니다. 시간이 지남에 따라 이러한 구조는 변경될 수 있습니다. 그러나 이벤트 자체에서 키와 값에 대한 스키마를 사용하면 특히 시간이 지남에 따라 진화할 때 애플리케이션이 메시지를 이해하기가 훨씬 쉬워집니다.

    Debezium MySQL 커넥터는 데이터베이스 테이블의 구조를 기반으로 이러한 스키마를 구성합니다. DDL 문을 사용하여 MySQL 데이터베이스의 테이블 정의를 변경하는 경우 커넥터는 이러한 DDL 문을 읽고 Kafka Connect 스키마를 업데이트합니다. 이는 각 이벤트가 이벤트가 발생한 시점의 테이블과 정확히 동일하게 구조화되는 유일한 방법입니다. 그러나 단일 테이블에 대한 모든 이벤트를 포함하는 Kafka 항목에는 테이블 정의의 각 상태에 해당하는 이벤트가 있을 수 있습니다.

    JSON 변환기는 모든 메시지의 키 및 값 스키마를 포함하므로 매우 자세한 이벤트를 생성합니다.

  4. 이벤트의 스키마를 인벤토리 데이터베이스 상태와 비교합니다. MySQL 명령줄 클라이언트를 실행 중인 터미널에서 다음 문을 실행합니다.

    mysql> SELECT * FROM customers;
    +------+------------+-----------+-----------------------+
    | id   | first_name | last_name | email                 |
    +------+------------+-----------+-----------------------+
    | 1001 | Sally      | Thomas    | sally.thomas@acme.com |
    | 1002 | George     | Bailey    | gbailey@foobar.com    |
    | 1003 | Edward     | Walker    | ed@walker.com         |
    | 1004 | Anne       | Kretchmar | annek@noanswer.org    |
    +------+------------+-----------+-----------------------+
    4 rows in set (0.00 sec)

    검토한 이벤트 레코드가 데이터베이스의 레코드와 일치함을 나타냅니다.

Red Hat logoGithubRedditYoutubeTwitter

자세한 정보

평가판, 구매 및 판매

커뮤니티

Red Hat 문서 정보

Red Hat을 사용하는 고객은 신뢰할 수 있는 콘텐츠가 포함된 제품과 서비스를 통해 혁신하고 목표를 달성할 수 있습니다.

보다 포괄적 수용을 위한 오픈 소스 용어 교체

Red Hat은 코드, 문서, 웹 속성에서 문제가 있는 언어를 교체하기 위해 최선을 다하고 있습니다. 자세한 내용은 다음을 참조하세요.Red Hat 블로그.

Red Hat 소개

Red Hat은 기업이 핵심 데이터 센터에서 네트워크 에지에 이르기까지 플랫폼과 환경 전반에서 더 쉽게 작업할 수 있도록 강화된 솔루션을 제공합니다.

© 2024 Red Hat, Inc.