4장. 변경 이벤트 보기


Debezium MySQL 커넥터를 배포한 후 인벤토리 데이터베이스에 대한 변경 사항 캡처를 시작합니다.

커넥터가 시작되면 MySQL 데이터베이스의 테이블 중 하나를 나타내는 Apache Kafka 주제 집합에 이벤트를 씁니다. 각 항목의 이름은 데이터베이스 서버 dbserver1 의 이름으로 시작됩니다.

커넥터는 다음 Kafka 항목에 씁니다.

dbserver1
변경 사항이 캡처되는 테이블에 적용되는 DDL 문을 작성하는 스키마 변경 주제입니다.
dbserver1.inventory.products
inventory 데이터베이스의 products 테이블에 대한 변경 이벤트 레코드를 수신합니다.
dbserver1.inventory.products_on_hand
inventory 데이터베이스의 products_on_hand 테이블에 대한 변경 이벤트 레코드를 수신합니다.
dbserver1.inventory.customers
inventory 데이터베이스의 customers 테이블에 대한 변경 이벤트 레코드를 받습니다.
dbserver1.inventory.orders
inventory 데이터베이스의 orders 테이블에 대한 변경 이벤트 레코드를 수신합니다.

이 튜토리얼의 나머지 부분에서는 dbserver1.inventory.customers Kafka 주제를 검사합니다. 주제를 더 자세히 살펴보면 다양한 유형의 변경 이벤트를 나타내는 방법을 확인하고 각 이벤트가 캡처된 커넥터에 대한 정보를 찾을 수 있습니다.

이 튜토리얼에는 다음 섹션이 포함되어 있습니다.

4.1. 생성 이벤트 보기

dbserver1.inventory.customers 주제를 보면 MySQL 커넥터가 inventory 데이터베이스에서 생성 이벤트를 캡처하는 방법을 확인할 수 있습니다. 이 경우 생성 이벤트는 데이터베이스에 추가되는 새 고객을 캡처합니다.

절차

  1. 새 터미널을 열고 kafka-console-consumer 를 사용하여 주제 시작부터 dbserver1.inventory.customers 주제를 사용합니다.

    이 명령은 Kafka(my-cluster-kafka-0)를 실행하는 Pod에서 간단한 소비자(kafka-console-consumer.sh)를 실행합니다.

    $ oc exec -it my-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \
      --bootstrap-server localhost:9092 \
      --from-beginning \
      --property print.key=true \
      --topic dbserver1.inventory.customers
    Copy to Clipboard Toggle word wrap

    소비자는 customers 테이블의 각 행에 대해 4개의 메시지(JSON 형식)를 반환합니다. 각 메시지에는 해당 테이블 행에 대한 이벤트 레코드가 포함되어 있습니다.

    각 이벤트에는 의 두 개의 JSON 문서가 있습니다. 키는 행의 기본 키에 해당하고, 값은 행의 세부 정보(행이 포함된 필드, 각 필드의 값, 행에서 수행된 작업 유형)를 표시합니다.

  2. 마지막 이벤트의 경우 의 세부 정보를 검토합니다.

    다음은 마지막 이벤트의 (읽성을 위해 포맷됨)에 대한 세부 정보입니다.

    {
      "schema":{
        "type":"struct",
          "fields":[
            {
              "type":"int32",
              "optional":false,
              "field":"id"
            }
          ],
        "optional":false,
        "name":"dbserver1.inventory.customers.Key"
      },
      "payload":{
        "id":1004
      }
    }
    Copy to Clipboard Toggle word wrap

    이벤트에는 스키마페이로드 의 두 부분이 있습니다. 스키마에 는 페이로드에 있는 내용을 설명하는 Kafka Connect 스키마가 포함되어 있습니다. 이 경우 페이로드는 선택 사항이 아니며 필수 필드( int32유형의id )가 있는 dbserver1.inventory.customers.Key 라는 구조입니다.

    페이로드 에는 값이 1004 인 단일 id 필드가 있습니다.

    이벤트 를 검토하면 id 기본 키 열에 값이 1004inventory.customers 테이블의 행에 이 이벤트가 적용되는 것을 확인할 수 있습니다.

  3. 동일한 이벤트의 의 세부 사항을 검토합니다.

    이벤트 값은 행이 생성되었으며 포함된 내용(이 경우 id,first_name,last_name, 삽입된 행의 이메일 )을 설명합니다.

    다음은 마지막 이벤트의 (읽성을 위해 포맷됨)에 대한 세부 정보입니다.

    {
      "schema": {
        "type": "struct",
        "fields": [
          {
            "type": "struct",
            "fields": [
              {
                "type": "int32",
                "optional": false,
                "field": "id"
              },
              {
                "type": "string",
                "optional": false,
                "field": "first_name"
              },
              {
                "type": "string",
                "optional": false,
                "field": "last_name"
              },
              {
                "type": "string",
                "optional": false,
                "field": "email"
              }
            ],
            "optional": true,
            "name": "dbserver1.inventory.customers.Value",
            "field": "before"
          },
          {
            "type": "struct",
            "fields": [
              {
                "type": "int32",
                "optional": false,
                "field": "id"
              },
              {
                "type": "string",
                "optional": false,
                "field": "first_name"
              },
              {
                "type": "string",
                "optional": false,
                "field": "last_name"
              },
              {
                "type": "string",
                "optional": false,
                "field": "email"
              }
            ],
            "optional": true,
            "name": "dbserver1.inventory.customers.Value",
            "field": "after"
          },
          {
            "type": "struct",
            "fields": [
              {
                "type": "string",
                "optional": true,
                "field": "version"
              },
              {
                "type": "string",
                "optional": false,
                "field": "name"
              },
              {
                "type": "int64",
                "optional": false,
                "field": "server_id"
              },
              {
                "type": "int64",
                "optional": false,
                "field": "ts_sec"
              },
              {
                "type": "string",
                "optional": true,
                "field": "gtid"
              },
              {
                "type": "string",
                "optional": false,
                "field": "file"
              },
              {
                "type": "int64",
                "optional": false,
                "field": "pos"
              },
              {
                "type": "int32",
                "optional": false,
                "field": "row"
              },
              {
                "type": "boolean",
                "optional": true,
                "field": "snapshot"
              },
              {
                "type": "int64",
                "optional": true,
                "field": "thread"
              },
              {
                "type": "string",
                "optional": true,
                "field": "db"
              },
              {
                "type": "string",
                "optional": true,
                "field": "table"
              }
            ],
            "optional": false,
            "name": "io.debezium.connector.mysql.Source",
            "field": "source"
          },
          {
            "type": "string",
            "optional": false,
            "field": "op"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "ts_ms"
          }
        ],
        "optional": false,
        "name": "dbserver1.inventory.customers.Envelope",
        "version": 1
      },
      "payload": {
        "before": null,
        "after": {
          "id": 1004,
          "first_name": "Anne",
          "last_name": "Kretchmar",
          "email": "annek@noanswer.org"
        },
        "source": {
          "version": "2.1.4.Final",
          "name": "dbserver1",
          "server_id": 0,
          "ts_sec": 0,
          "gtid": null,
          "file": "mysql-bin.000003",
          "pos": 154,
          "row": 0,
          "snapshot": true,
          "thread": null,
          "db": "inventory",
          "table": "customers"
        },
        "op": "r",
        "ts_ms": 1486500577691
      }
    }
    Copy to Clipboard Toggle word wrap

    이벤트의 이 부분은 훨씬 길지만 이벤트의 처럼 스키마페이로드 도 있습니다. 스키마에 는 5개의 필드를 포함할 수 있는 dbserver1.inventory.customers.Envelope (버전 1)라는 Kafka Connect 스키마가 포함되어 있습니다.

    op
    작업 유형을 설명하는 문자열 값이 포함된 필수 필드입니다. MySQL 커넥터의 값은 생성(또는 삽입), update의 경우 u, delete는 d, 읽기(스냅샷의 경우)는 r 입니다.
    before
    이벤트가 발생하기 전에 행의 상태를 포함하는 선택적 필드입니다. 이 구조는 dbserver1 커넥터가 inventory.customers 테이블의 모든 행에 사용하는 dbserver1.inventory.customers.Value Kafka Connect 스키마로 설명합니다.
    이벤트가 발생한 행의 상태를 포함하는 선택적 필드입니다. 구조는 이전에 사용된 것과 동일한 dbserver1.inventory.customers.Value Kafka Connect 스키마로 설명되어 있습니다.
    source
    이벤트에 대한 소스 메타데이터를 설명하는 구조를 포함하는 필수 필드에는 MySQL의 경우 커넥터 이름, 이벤트가 기록된 binlog 파일의 이름, 이벤트가 표시된 해당 binlog 파일의 위치, 이벤트 내의 행(두 개 이상 있는 경우), 영향을 받는 데이터베이스 및 테이블의 이름, 영향을 받는 데이터베이스 및 테이블의 이름이 포함되어 있습니다. MySQL 스레드 ID(이 이벤트가 스냅샷의 일부인지 여부), 사용 가능한 경우 MySQL 서버 ID 및 타임스탬프(초)입니다.
    ts_ms
    커넥터가 이벤트를 처리한 경우 Kafka Connect 작업을 실행하는 JVM에서 시스템 시계를 사용하는 선택적 필드입니다.
    참고

    이벤트의 JSON 표현은 설명하는 행보다 훨씬 길습니다. 이는 모든 이벤트 키와 값과 함께 Kafka Connect가 페이로드 를 설명하는 스키마 를 제공하기 때문입니다. 시간이 지남에 따라 이 구조는 변경될 수 있습니다. 그러나 이벤트 자체의 키와 값에 대한 스키마를 보유하면 특히 시간이 지남에 따라 발전할 때 애플리케이션을 사용하는 것이 메시지를 훨씬 쉽게 이해할 수 있습니다.

    Debezium MySQL 커넥터는 데이터베이스 테이블 구조에 따라 이러한 스키마를 구성합니다. DDL 문을 사용하여 MySQL 데이터베이스의 테이블 정의를 변경하는 경우 커넥터는 이러한 DDL 문을 읽고 Kafka Connect 스키마를 업데이트합니다. 이는 각 이벤트가 이벤트가 발생한 시점에서 시작된 테이블과 정확히 동일하게 구조화되는 유일한 방법입니다. 그러나 단일 테이블에 대한 모든 이벤트를 포함하는 Kafka 항목에 테이블 정의의 각 상태에 해당하는 이벤트가 있을 수 있습니다.

    JSON 컨버터에는 모든 메시지에 키 및 값 스키마가 포함되어 있으므로 매우 자세한 이벤트를 생성합니다.

  4. 이벤트의 스키마를 inventory 데이터베이스의 상태와 비교합니다. MySQL 명령줄 클라이언트를 실행하는 터미널에서 다음 문을 실행합니다.

    mysql> SELECT * FROM customers;
    +------+------------+-----------+-----------------------+
    | id   | first_name | last_name | email                 |
    +------+------------+-----------+-----------------------+
    | 1001 | Sally      | Thomas    | sally.thomas@acme.com |
    | 1002 | George     | Bailey    | gbailey@foobar.com    |
    | 1003 | Edward     | Walker    | ed@walker.com         |
    | 1004 | Anne       | Kretchmar | annek@noanswer.org    |
    +------+------------+-----------+-----------------------+
    4 rows in set (0.00 sec)
    Copy to Clipboard Toggle word wrap

    이는 검토한 이벤트 레코드가 데이터베이스의 레코드와 일치함을 보여줍니다.

Red Hat logoGithubredditYoutubeTwitter

자세한 정보

평가판, 구매 및 판매

커뮤니티

Red Hat 문서 정보

Red Hat을 사용하는 고객은 신뢰할 수 있는 콘텐츠가 포함된 제품과 서비스를 통해 혁신하고 목표를 달성할 수 있습니다. 최신 업데이트를 확인하세요.

보다 포괄적 수용을 위한 오픈 소스 용어 교체

Red Hat은 코드, 문서, 웹 속성에서 문제가 있는 언어를 교체하기 위해 최선을 다하고 있습니다. 자세한 내용은 다음을 참조하세요.Red Hat 블로그.

Red Hat 소개

Red Hat은 기업이 핵심 데이터 센터에서 네트워크 에지에 이르기까지 플랫폼과 환경 전반에서 더 쉽게 작업할 수 있도록 강화된 솔루션을 제공합니다.

Theme

© 2026 Red Hat
맨 위로 이동