Debezium 시작하기


Red Hat Integration 2023.q2

Red Hat Integration 2.1.4와 함께 사용할 수 있습니다.

Red Hat Integration Documentation Team

초록

이 가이드에서는 Red Hat Integration 사용을 시작하는 방법을 설명합니다.

머리말

이 튜토리얼에서는 Debezium을 사용하여 MySQL 데이터베이스에서 업데이트를 캡처하는 방법을 설명합니다. 데이터베이스의 데이터가 변경되면 결과 이벤트 스트림을 볼 수 있습니다.

보다 포괄적 수용을 위한 오픈 소스 용어 교체

Red Hat은 코드, 문서, 웹 속성에서 문제가 있는 용어를 교체하기 위해 최선을 다하고 있습니다. 먼저 마스터(master), 슬레이브(slave), 블랙리스트(blacklist), 화이트리스트(whitelist) 등 네 가지 용어를 교체하고 있습니다. 이러한 변경 작업은 작업 범위가 크므로 향후 여러 릴리스에 걸쳐 점차 구현할 예정입니다. 자세한 내용은 CTO Chris Wright의 메시지를 참조하십시오.

Red Hat 문서에 관한 피드백 제공

문서 개선을 위한 의견에 감사드립니다. 피드백을 제공하려면 문서의 텍스트를 강조 표시하고 주석을 추가합니다.

사전 요구 사항

  • Red Hat 고객 포털에 로그인되어 있습니다.
  • Red Hat 고객 포털에서 문서는 다중 페이지 HTML 보기 형식으로 되어 있습니다.

절차

피드백을 제공하려면 다음 단계를 수행합니다.

  1. 문서의 오른쪽 상단에 있는 피드백 버튼을 클릭하여 기존 피드백을 확인합니다.

    참고

    피드백 기능은 다중 페이지 HTML 형식에서만 활성화됩니다.

  2. 피드백을 제공하려는 문서의 섹션을 강조 표시합니다.
  3. 강조 표시된 텍스트 옆에 표시되는 피드백 추가 팝업을 클릭합니다.

    페이지 오른쪽에 있는 피드백 섹션에 입력란이 표시됩니다.

  4. 텍스트 상자에 피드백을 입력하고 제출을 클릭합니다.

    문서 문제가 생성됩니다.

  5. 문제를 보려면 피드백 보기에서 문제 링크를 클릭합니다.

1장. 이 튜토리얼 정보

이 튜토리얼에는 다음 단계가 포함됩니다.

  • 간단한 예제 데이터베이스를 사용하여 OpenShift에 MySQL 데이터베이스 서버를 배포합니다.
  • AMQ Streams에서 사용자 정의 리소스를 적용하여 Debezium MySQL 커넥터 플러그인을 포함하는 Kafka Connect 컨테이너 이미지를 자동으로 빌드합니다.
  • Debezium MySQL 커넥터 리소스를 생성하여 데이터베이스의 변경 사항을 캡처합니다.
  • 커넥터 배포를 확인합니다.
  • 커넥터가 데이터베이스에서 Kafka 주제로 내보내는 변경 이벤트를 확인합니다.

사전 요구 사항

  • OpenShift 및 AMQ Streams에 대해 잘 알고 있습니다.
  • 클러스터 Operator가 설치된 OpenShift 클러스터에 액세스할 수 있습니다.
  • AMQ Streams Operator가 실행 중입니다.
  • Apache Kafka 클러스터는 OpenShift에서 AMQ Streams 배포 및 업그레이드에 설명된 대로 배포됩니다.
  • 고객님은 Red Hat Integration 라이센스가 있습니다.
  • OpenShift 관리 툴 사용 방법을 알고 있습니다. OpenShift oc CLI 클라이언트가 설치 되었거나 OpenShift Container Platform 웹 콘솔에 액세스할 수 있습니다.
  • Kafka Connect 빌드 이미지를 저장하는 방법에 따라 컨테이너 레지스트리에 액세스할 수 있는 권한이 있어야 합니다. 또는 OpenShift에서 ImageStream 리소스를 생성해야 합니다.

    Red Hat Quay.io 또는 Docker Hub와 같은 이미지 레지스트리에 빌드 이미지를 저장하려면 다음을 수행합니다.
    • 레지스트리에서 이미지를 생성하고 관리하는 계정 및 권한입니다.
    빌드 이미지를 네이티브 OpenShift ImageStream으로 저장하려면 다음을 수행합니다.
    • ImageStream 리소스는 새 컨테이너 이미지를 저장하기 위해 클러스터에 배포됩니다. 클러스터의 ImageStream을 명시적으로 생성해야 합니다. 이미지 스트림은 기본적으로 사용할 수 없습니다.

2장. Debezium 소개

Debezium은 기존 데이터베이스의 정보를 이벤트 스트림으로 변환하여 애플리케이션을 탐지하고 데이터베이스의 행 수준 변경에 즉시 응답할 수 있도록 하는 분산 플랫폼입니다.

Debezium은 Apache Kafka 를 기반으로 하며 Kafka Connect 호환 커넥터 세트를 제공합니다. 각 커넥터는 특정 DBMS(Database Management System)에서 작동합니다. 커넥터는 변경 사항을 감지하여 DBMS의 데이터 변경 내역을 기록하고 각 변경 이벤트의 레코드를 Kafka 주제로 스트리밍합니다. 그런 다음 애플리케이션을 사용하는 경우 Kafka 주제에서 결과 이벤트 레코드를 읽을 수 있습니다.

Kafka의 안정적인 스트리밍 플랫폼을 활용함으로써 Debezium을 사용하면 애플리케이션이 데이터베이스에서 발생하는 변경 사항을 정확하고 완전하게 사용할 수 있습니다. 애플리케이션이 예기치 않게 중지되거나 연결이 끊어도 중단 중에 발생하는 이벤트를 놓치지 않습니다. 애플리케이션을 다시 시작한 후 해제된 지점에서 주제에서 읽기를 다시 시작합니다.

다음 튜토리얼에서는 간단한 구성으로 Debezium MySQL 커넥터 를 배포하고 사용하는 방법을 보여줍니다. Debezium 커넥터 배포 및 사용에 대한 자세한 내용은 커넥터 설명서를 참조하십시오.

3장. 서비스 시작

Debezium을 사용하려면 Kafka 및 Kafka Connect, 데이터베이스 및 Debezium 커넥터 서비스와 함께 AMQ Streams가 필요합니다. 이 튜토리얼에 대한 서비스를 실행하려면 다음을 수행해야합니다.

3.1. MySQL 데이터베이스 배포

데이터로 미리 채워진 여러 테이블을 포함하는 inventory 데이터베이스 예제를 포함하는 MySQL 데이터베이스 서버를 배포합니다. Debezium MySQL 커넥터는 샘플 테이블에서 발생하는 변경 사항을 캡처하고 변경 사항 이벤트 레코드를 Apache Kafka 항목에 전송합니다.

절차

  1. 다음 명령을 실행하여 MySQL 데이터베이스를 시작합니다. 이 명령은 예제 inventory 데이터베이스로 구성된 MySQL 데이터베이스 서버를 시작합니다.

    $ oc new-app -l app=mysql --name=mysql quay.io/debezium/example-mysql:latest
    Copy to Clipboard Toggle word wrap
  2. MySQL 데이터베이스의 배포 구성을 업데이트하여 사용자 이름과 암호를 추가하는 다음 명령을 실행하여 MySQL 데이터베이스에 대한 자격 증명을 구성합니다.

    $ oc set env deployment/mysql MYSQL_ROOT_PASSWORD=debezium MYSQL_USER=mysqluser MYSQL_PASSWORD=mysqlpw
    Copy to Clipboard Toggle word wrap
  3. 다음 명령을 호출하여 MySQL 데이터베이스가 실행 중인지 확인합니다. 출력 뒤에 MySQL 데이터베이스가 실행 중이고 포드가 준비되었음을 보여줍니다.

    $ oc get pods -l app=mysql
    NAME            READY   STATUS    RESTARTS   AGE
    mysql-1-2gzx5   1/1     Running   1          23s
    Copy to Clipboard Toggle word wrap
  4. 새 터미널을 열고 샘플 인벤토리 데이터베이스에 로그인합니다.

    이 명령은 MySQL 데이터베이스를 실행하는 포드에서 MySQL 명령줄 클라이언트를 엽니다. 클라이언트는 이전에 구성한 사용자 이름과 암호를 사용합니다.

    $ oc exec mysql-1-2gzx5 -it -- mysql -u mysqluser -pmysqlpw inventory
    mysql: [Warning] Using a password on the command line interface can be insecure.
    Reading table information for completion of table and column names
    You can turn off this feature to get a quicker startup with -A
    
    Welcome to the MySQL monitor.  Commands end with ; or \g.
    Your MySQL connection id is 7
    Server version: 5.7.29-log MySQL Community Server (GPL)
    
    Copyright (c) 2000, 2020, Oracle and/or its affiliates. All rights reserved.
    
    Oracle is a registered trademark of Oracle Corporation and/or its
    affiliates. Other names may be trademarks of their respective
    owners.
    
    Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
    
    mysql>
    Copy to Clipboard Toggle word wrap
  5. inventory 데이터베이스의 테이블을 나열합니다.

    mysql> show tables;
    +---------------------+
    | Tables_in_inventory |
    +---------------------+
    | addresses           |
    | customers           |
    | geom                |
    | orders              |
    | products            |
    | products_on_hand    |
    +---------------------+
    6 rows in set (0.00 sec)
    Copy to Clipboard Toggle word wrap
  6. 데이터베이스를 살펴보고 포함된 데이터를 확인합니다(예: customers 테이블을 확인합니다.

    mysql> select * from customers;
    +------+------------+-----------+-----------------------+
    | id   | first_name | last_name | email                 |
    +------+------------+-----------+-----------------------+
    | 1001 | Sally      | Thomas    | sally.thomas@acme.com |
    | 1002 | George     | Bailey    | gbailey@foobar.com    |
    | 1003 | Edward     | Walker    | ed@walker.com         |
    | 1004 | Anne       | Kretchmar | annek@noanswer.org    |
    +------+------------+-----------+-----------------------+
    4 rows in set (0.00 sec)
    Copy to Clipboard Toggle word wrap

3.2. Kafka Connect 배포

MySQL 데이터베이스를 배포한 후 AMQ Streams를 사용하여 Debezium MySQL 커넥터 플러그인이 포함된 Kafka Connect 컨테이너 이미지를 빌드합니다. 배포 프로세스 중에 다음 사용자 정의 리소스(CR)를 생성하고 사용합니다.

  • Kafka Connect 인스턴스를 정의하고 이미지에 포함할 MySQL 커넥터 아티팩트에 대한 정보를 포함하는 KafkaConnect CR입니다.
  • MySQL 커넥터가 소스 데이터베이스에 액세스하는 데 사용하는 정보를 포함하는 세부 정보를 제공하는 KafkaConnector CR입니다. AMQ Streams가 Kafka Connect Pod를 시작한 후 KafkaConnector CR을 적용하여 커넥터를 시작합니다.

빌드 프로세스 중에 AMQ Streams Operator는 Debezium 커넥터 정의를 비롯한 KafkaConnect 사용자 정의 리소스의 입력 매개변수를 Kafka Connect 컨테이너 이미지로 변환합니다. 이 빌드는 Red Hat Maven 리포지토리에서 필요한 아티팩트를 다운로드하여 이미지에 통합합니다. 새로 생성된 컨테이너는 .spec.build.output 에 지정된 컨테이너 레지스트리로 푸시되며 Kafka Connect Pod를 배포하는 데 사용됩니다. AMQ Streams가 Kafka Connect 이미지를 빌드한 후 KafkaConnector 사용자 정의 리소스를 사용하여 커넥터를 시작합니다.

절차

  1. OpenShift 클러스터에 로그인하고 프로젝트를 생성하거나 엽니다(예: debezium ).
  2. 커넥터에 대한 KafkaConnect 사용자 정의 리소스(CR)를 생성하거나 기존 리소스를 수정합니다.
    다음 예제는 KafkaConnect 사용자 정의 리소스를 설명하는 dbz-connect.yaml 파일에서 발췌한 내용을 보여줍니다.
    metadata.annotationsspec.build 속성이 필요합니다.

    예 3.1. Debezium 커넥터를 포함하는 KafkaConnect 사용자 정의 리소스를 정의하는 dbz-connect.yaml 파일

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: my-connect-cluster
      annotations:
        strimzi.io/use-connector-resources: "true" 
    1
    
    spec:
      replicas: 1
      version: 3.3.1
      build: 
    2
    
        output: 
    3
    
          type: imagestream  
    4
    
          image: debezium-streams-connect:latest
        plugins: 
    5
    
          - name: debezium-connector-mysql
            artifacts:
              - type: zip 
    6
    
                url: https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-mysql/2.1.4.Final-redhat-00001/debezium-connector-mysql-2.1.4.Final-redhat-00001-plugin.zip 
    7
    
      bootstrapServers: my-cluster-kafka-bootstrap:9093
    
    ...
    Copy to Clipboard Toggle word wrap
    Expand
    표 3.1. Kafka Connect 구성 설정에 대한 설명
    항목설명

    1

    Cluster Operator가 KafkaConnector 리소스를 사용하여 이 Kafka Connect 클러스터에서 커넥터를 구성할 수 있도록 strimzi.io/use-connector-resources 주석을 "true" 로 설정합니다.

    2

    spec.build 구성은 빌드 이미지를 저장할 위치를 지정하고 플러그인 아티팩트의 위치와 함께 이미지에 포함할 플러그인을 나열합니다.

    3

    build.output 은 새로 빌드된 이미지가 저장된 레지스트리를 지정합니다.

    4

    이미지 출력의 이름과 이미지 이름을 지정합니다. output.type 에 유효한 값은 Docker Hub 또는 Quay와 같은 컨테이너 레지스트리로 푸시하거나 이미지를 내부 OpenShift ImageStream으로 푸시하는 이미지 스트림 입니다. ImageStream을 사용하려면 ImageStream 리소스를 클러스터에 배포해야 합니다. KafkaConnect 구성에서 build.output 을 지정하는 방법에 대한 자세한 내용은 AMQ Streams Build 스키마 참조 설명서를 참조하십시오.

    5

    plugins 구성에는 Kafka Connect 이미지에 포함하려는 모든 커넥터가 나열됩니다. 목록의 각 항목에 대해 플러그인 이름을 지정하고 커넥터를 빌드하는 데 필요한 아티팩트에 대한 정보를 지정합니다. 선택적으로 각 커넥터 플러그인에 대해 커넥터와 함께 사용할 수 있는 다른 구성 요소를 포함할 수 있습니다. 예를 들어 서비스 레지스트리 아티팩트 또는 Debezium 스크립팅 구성 요소를 추가할 수 있습니다.

    6

    artifacts.type 값은 artifacts.url 에 지정된 아티팩트의 파일 유형을 지정합니다. 유효한 유형은 zip,tgz, 또는 10.0.0.1 입니다. Debezium 커넥터 아카이브는 .zip 파일 형식으로 제공됩니다. JDBC 드라이버 파일은 .jar 형식입니다. type 값은 url 필드에서 참조되는 파일의 유형과 일치해야 합니다.

    7

    artifacts.url 값은 커넥터 아티팩트의 파일을 저장하는 Maven 리포지토리와 같은 HTTP 서버의 주소를 지정합니다. OpenShift 클러스터는 지정된 서버에 액세스할 수 있어야 합니다.

  3. 다음 명령을 입력하여 KafkaConnect 빌드 사양을 OpenShift 클러스터에 적용합니다.

    oc create -f dbz-connect.yaml
    Copy to Clipboard Toggle word wrap

    사용자 정의 리소스에 지정된 구성에 따라 AMQ Streams Operator는 배포할 Kafka Connect 이미지를 준비합니다.
    빌드가 완료되면 Operator에서 이미지를 지정된 레지스트리 또는 ImageStream으로 푸시하고 Kafka Connect 클러스터를 시작합니다. 구성에 나열된 커넥터 아티팩트는 클러스터에서 사용할 수 있습니다.

  4. KafkaConnector 리소스를 생성하여 MySQL 커넥터의 인스턴스를 정의합니다.
    예를 들어 다음 KafkaConnector CR을 생성하여 debezium-inventory-connector.yaml로 저장합니다.

    예 3.2. Debezium 커넥터의 KafkaConnector 사용자 정의 리소스를 정의하는 mysql-inventory-connector.yaml 파일

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      labels:
        strimzi.io/cluster: my-connect-cluster
      name: inventory-connector 
    1
    
    spec:
      class: io.debezium.connector.mysql.MySqlConnector 
    2
    
      tasksMax: 1  
    3
    
      config:  
    4
    
        database.hostname: mysql 
    5
    
        database.port: 3306   
    6
    
        database.user: debezium  
    7
    
        database.password: dbz  
    8
    
        database.server.id: 184054
        topic.prefix: dbserver1  
    9
    
        table.include.list: inventory.*  
    10
    
        schema.history.internal.kafka.bootstrap.servers: 'my-cluster-kafka-bootstrap:9092' 
    11
    
        schema.history.internal.kafka.topic: schema-changes.inventory 
    12
    Copy to Clipboard Toggle word wrap
    Expand
    표 3.2. 커넥터 구성 설정에 대한 설명
    항목설명

    1

    Kafka Connect 클러스터에 등록할 커넥터의 이름입니다.

    2

    커넥터 클래스의 이름입니다.

    3

    한 번에 하나의 작업만 작동해야 합니다. MySQL 커넥터가 MySQL 서버의 binlog 를 읽을 때 단일 커넥터 작업을 사용하여 적절한 순서 및 이벤트 처리를 확인합니다. Kafka Connect 서비스는 커넥터를 사용하여 작업을 완료하기 위해 하나 이상의 작업을 시작합니다. Kafka Connect 서비스 클러스터에 실행 중인 작업을 자동으로 배포합니다. 서비스가 중지되거나 중단되면 작업이 실행 중인 서비스에 재배포됩니다.

    4

    커넥터의 구성입니다.

    5

    MySQL 데이터베이스 인스턴스의 호스트 이름 또는 주소입니다.

    6

    데이터베이스 인스턴스의 포트 번호입니다.

    7

    Debezium이 데이터베이스에 연결하는 사용자 계정의 이름입니다.

    8

    Debezium에서 데이터베이스 사용자 계정에 연결하는 데 사용하는 암호입니다.

    9

    MySQL 서버 또는 클러스터의 주제 접두사입니다. 이 문자열은 커넥터가 이벤트 레코드를 전송하는 모든 Kafka 항목의 이름을 접두사로 지정합니다.

    10

    커넥터가 변경 이벤트를 캡처하는 테이블 목록입니다. 커넥터는 인벤토리 테이블에서 발생하는 경우에만 변경 사항을 탐지합니다.

    11

    커넥터가 DDL 문을 데이터베이스 스키마 기록 항목에 쓰고 복구하는 데 사용하는 Kafka 브로커 목록입니다. 커넥터가 변경 이벤트 레코드를 전송하는 브로커와 동일합니다. 다시 시작한 후 커넥터는 커넥터가 읽기를 재개할 때 binlog의 시점에 존재하는 데이터베이스 스키마를 복구합니다.

    12

    데이터베이스 스키마 기록 항목의 이름입니다. 이 주제는 내부 용도로만 사용되며 소비자가 사용해서는 안 됩니다.

  5. 다음 명령을 실행하여 커넥터 리소스를 생성합니다.

    oc create -n <namespace> -f <kafkaConnector>.yaml
    Copy to Clipboard Toggle word wrap

    예를 들면 다음과 같습니다.

    oc create -n debezium -f mysql-inventory-connector.yaml
    Copy to Clipboard Toggle word wrap

    커넥터는 Kafka Connect 클러스터에 등록되며 KafkaConnector CR에서 spec.config.database.dbname 에 지정된 데이터베이스에 대해 실행되기 시작합니다. 커넥터 Pod가 준비되면 Debezium이 실행됩니다.

이제 커넥터가 생성되었고 inventory 데이터베이스의 변경 사항을 캡처하기 시작했는지 확인할 준비가 되었습니다.

3.3. 커넥터 배포 확인

커넥터가 오류 없이 올바르게 시작되면 커넥터가 캡처하도록 구성된 각 테이블에 대해 항목이 생성됩니다. 다운스트림 애플리케이션은 이러한 주제를 구독하여 소스 데이터베이스에서 발생하는 정보 이벤트를 검색할 수 있습니다.

커넥터가 실행 중인지 확인하려면 OpenShift Container Platform 웹 콘솔 또는 OpenShift CLI 툴(oc)에서 다음 작업을 수행합니다.

  • 커넥터 상태를 확인합니다.
  • 커넥터가 주제를 생성하는지 확인합니다.
  • 커넥터가 각 테이블의 초기 스냅샷 중에 생성하는 읽기 작업("op":"r")에 대해 주제가 채워지는지 확인합니다.

사전 요구 사항

  • Debezium 커넥터는 OpenShift의 AMQ Streams에 배포됩니다.
  • OpenShift oc CLI 클라이언트가 설치되어 있어야 합니다.
  • OpenShift Container Platform 웹 콘솔에 액세스할 수 있습니다.

절차

  1. 다음 방법 중 하나를 사용하여 KafkaConnector 리소스의 상태를 확인합니다.

    • OpenShift Container Platform 웹 콘솔에서 다음을 수행합니다.

      1. 홈 → 검색으로 이동합니다.
      2. 검색 페이지에서 리소스를 클릭하여 리소스 선택 상자를 연 다음 KafkaConnector 를 입력합니다.
      3. KafkaConnectors 목록에서 확인할 커넥터의 이름을 클릭합니다(예: inventory-connector ).
      4. Conditions 섹션에서 TypeStatus 열의 값이 ReadyTrue 로 설정되어 있는지 확인합니다.
    • 터미널 창에서 다음을 수행합니다.

      1. 다음 명령을 실행합니다.

        oc describe KafkaConnector <connector-name> -n <project>
        Copy to Clipboard Toggle word wrap

        예를 들면 다음과 같습니다.

        oc describe KafkaConnector inventory-connector -n debezium
        Copy to Clipboard Toggle word wrap

        이 명령은 다음 출력과 유사한 상태 정보를 반환합니다.

        예 3.3. KafkaConnector 리소스 상태

        Name:         inventory-connector
        Namespace:    debezium
        Labels:       strimzi.io/cluster=my-connect-cluster
        Annotations:  <none>
        API Version:  kafka.strimzi.io/v1beta2
        Kind:         KafkaConnector
        
        ...
        
        Status:
          Conditions:
            Last Transition Time:  2021-12-08T17:41:34.897153Z
            Status:                True
            Type:                  Ready
          Connector Status:
            Connector:
              State:      RUNNING
              worker_id:  10.131.1.124:8083
            Name:         inventory-connector
            Tasks:
              Id:               0
              State:            RUNNING
              worker_id:        10.131.1.124:8083
            Type:               source
          Observed Generation:  1
          Tasks Max:            1
          Topics:
            dbserver1
            dbserver1.inventory.addresses
            dbserver1.inventory.customers
            dbserver1.inventory.geom
            dbserver1.inventory.orders
            dbserver1.inventory.products
            dbserver1.inventory.products_on_hand
        Events:  <none>
        Copy to Clipboard Toggle word wrap
  2. 커넥터가 Kafka 주제를 생성했는지 확인합니다.

    • OpenShift Container Platform 웹 콘솔에서 다음을 수행합니다.

      1. 홈 → 검색으로 이동합니다.
      2. 검색 페이지에서 리소스를 클릭하여 리소스 선택 상자를 연 다음 KafkaTopic 을 입력합니다.
      3. KafkaTopics 목록에서 dbserver1.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d 항목을 클릭합니다.
      4. Conditions 섹션에서 TypeStatus 열의 값이 ReadyTrue 로 설정되어 있는지 확인합니다.
    • 터미널 창에서 다음을 수행합니다.

      1. 다음 명령을 실행합니다.

        oc get kafkatopics
        Copy to Clipboard Toggle word wrap

        이 명령은 다음 출력과 유사한 상태 정보를 반환합니다.

        예 3.4. KafkaTopic 리소스 상태

        NAME                   CLUSTER  PARTITIONS  REPLICATION FACTOR  READY
        connect-cluster-configs  my-cluster   1        1            True
        connect-cluster-offsets  my-cluster   25       1            True
        connect-cluster-status   my-cluster   5        1            True
        consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a my-cluster 50 1 True
        dbserver1---a96f69b23d6118ff415f772679da623fbbb99421 my-cluster 1 1 True
        dbserver1.inventory.addresses---1b6beaf7b2eb57d177d92be90ca2b210c9a56480  my-cluster 1 1 True
        dbserver1.inventory.customers---9931e04ec92ecc0924f4406af3fdace7545c483b  my-cluster 1 1   True
        dbserver1.inventory.geom---9f7e136091f071bf49ca59bf99e86c713ee58dd5  my-cluster 1 1   True
        dbserver1.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d my-cluster 1 1   True
        dbserver1.inventory.products---df0746db116844cee2297fab611c21b56f82dcef my-cluster 1 1   True
        dbserver1.inventory.products-on-hand---8649e0f17ffcc9212e266e31a7aeea4585e5c6b5  my-cluster 1  1 True
        schema-changes.inventory my-cluster    1           1       True
        strimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55                  my-cluster   1    1  True
        strimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b my-cluster 1    1  True
        Copy to Clipboard Toggle word wrap
  3. 주제 콘텐츠를 확인합니다.

    • 터미널 창에서 다음 명령을 입력합니다.
    oc exec -n <project>  -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \
    >     --bootstrap-server localhost:9092 \
    >     --from-beginning \
    >     --property print.key=true \
    >     --topic=<topic-name>
    Copy to Clipboard Toggle word wrap

    예를 들면 다음과 같습니다.

     oc exec -n debezium  -it my-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \
    >     --bootstrap-server localhost:9092 \
    >     --from-beginning \
    >     --property print.key=true \
    >     --topic=dbserver1.inventory.products_on_hand
    Copy to Clipboard Toggle word wrap

    주제 이름을 지정하는 형식은 oc describe 명령과 1단계에서 반환하는 것과 동일합니다(예: dbserver1.inventory.addresses ).

    주제의 각 이벤트에 대해 명령에서 다음 출력과 유사한 정보를 반환합니다.

    예 3.5. Debezium 변경 이벤트의 내용

    {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"dbserver1.inventory.products_on_hand.Key"},"payload":{"product_id":101}}	{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"dbserver1.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"dbserver1.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"2.1.4.Final-redhat-00001","connector":"mysql","name":"dbserver1","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}
    Copy to Clipboard Toggle word wrap

    위 예제에서 페이로드 값은 커넥터 스냅샷이 dbserver1.anchor_on_hand 에서 읽기("op" ="r") 이벤트를 생성했음을 보여줍니다. product_id 레코드의 "before" 상태는 레코드에 이전 값이 없음을 나타내는 null 입니다. "after" 상태는 product_id 101 이 있는 항목에 대한 수량 을 표시합니다.

이제 Debezium 커넥터가 인벤토리 데이터베이스에서 캡처하는 변경 이벤트를 볼 수 있습니다.

4장. 변경 이벤트 보기

Debezium MySQL 커넥터를 배포한 후 인벤토리 데이터베이스에 대한 변경 사항 캡처를 시작합니다.

커넥터가 시작되면 MySQL 데이터베이스의 테이블 중 하나를 나타내는 Apache Kafka 주제 집합에 이벤트를 씁니다. 각 항목의 이름은 데이터베이스 서버 dbserver1 의 이름으로 시작됩니다.

커넥터는 다음 Kafka 항목에 씁니다.

dbserver1
변경 사항이 캡처되는 테이블에 적용되는 DDL 문을 작성하는 스키마 변경 주제입니다.
dbserver1.inventory.products
inventory 데이터베이스의 products 테이블에 대한 변경 이벤트 레코드를 수신합니다.
dbserver1.inventory.products_on_hand
inventory 데이터베이스의 products_on_hand 테이블에 대한 변경 이벤트 레코드를 수신합니다.
dbserver1.inventory.customers
inventory 데이터베이스의 customers 테이블에 대한 변경 이벤트 레코드를 받습니다.
dbserver1.inventory.orders
inventory 데이터베이스의 orders 테이블에 대한 변경 이벤트 레코드를 수신합니다.

이 튜토리얼의 나머지 부분에서는 dbserver1.inventory.customers Kafka 주제를 검사합니다. 주제를 더 자세히 살펴보면 다양한 유형의 변경 이벤트를 나타내는 방법을 확인하고 각 이벤트가 캡처된 커넥터에 대한 정보를 찾을 수 있습니다.

이 튜토리얼에는 다음 섹션이 포함되어 있습니다.

4.1. 생성 이벤트 보기

dbserver1.inventory.customers 주제를 보면 MySQL 커넥터가 inventory 데이터베이스에서 생성 이벤트를 캡처하는 방법을 확인할 수 있습니다. 이 경우 생성 이벤트는 데이터베이스에 추가되는 새 고객을 캡처합니다.

절차

  1. 새 터미널을 열고 kafka-console-consumer 를 사용하여 주제 시작부터 dbserver1.inventory.customers 주제를 사용합니다.

    이 명령은 Kafka(my-cluster-kafka-0)를 실행하는 Pod에서 간단한 소비자(kafka-console-consumer.sh)를 실행합니다.

    $ oc exec -it my-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \
      --bootstrap-server localhost:9092 \
      --from-beginning \
      --property print.key=true \
      --topic dbserver1.inventory.customers
    Copy to Clipboard Toggle word wrap

    소비자는 customers 테이블의 각 행에 대해 4개의 메시지(JSON 형식)를 반환합니다. 각 메시지에는 해당 테이블 행에 대한 이벤트 레코드가 포함되어 있습니다.

    각 이벤트에는 의 두 개의 JSON 문서가 있습니다. 키는 행의 기본 키에 해당하고, 값은 행의 세부 정보(행이 포함된 필드, 각 필드의 값, 행에서 수행된 작업 유형)를 표시합니다.

  2. 마지막 이벤트의 경우 의 세부 정보를 검토합니다.

    다음은 마지막 이벤트의 (읽성을 위해 포맷됨)에 대한 세부 정보입니다.

    {
      "schema":{
        "type":"struct",
          "fields":[
            {
              "type":"int32",
              "optional":false,
              "field":"id"
            }
          ],
        "optional":false,
        "name":"dbserver1.inventory.customers.Key"
      },
      "payload":{
        "id":1004
      }
    }
    Copy to Clipboard Toggle word wrap

    이벤트에는 스키마페이로드 의 두 부분이 있습니다. 스키마에 는 페이로드에 있는 내용을 설명하는 Kafka Connect 스키마가 포함되어 있습니다. 이 경우 페이로드는 선택 사항이 아니며 필수 필드( int32유형의id )가 있는 dbserver1.inventory.customers.Key 라는 구조입니다.

    페이로드 에는 값이 1004 인 단일 id 필드가 있습니다.

    이벤트 를 검토하면 id 기본 키 열에 값이 1004inventory.customers 테이블의 행에 이 이벤트가 적용되는 것을 확인할 수 있습니다.

  3. 동일한 이벤트의 의 세부 사항을 검토합니다.

    이벤트 값은 행이 생성되었으며 포함된 내용(이 경우 id,first_name,last_name, 삽입된 행의 이메일 )을 설명합니다.

    다음은 마지막 이벤트의 (읽성을 위해 포맷됨)에 대한 세부 정보입니다.

    {
      "schema": {
        "type": "struct",
        "fields": [
          {
            "type": "struct",
            "fields": [
              {
                "type": "int32",
                "optional": false,
                "field": "id"
              },
              {
                "type": "string",
                "optional": false,
                "field": "first_name"
              },
              {
                "type": "string",
                "optional": false,
                "field": "last_name"
              },
              {
                "type": "string",
                "optional": false,
                "field": "email"
              }
            ],
            "optional": true,
            "name": "dbserver1.inventory.customers.Value",
            "field": "before"
          },
          {
            "type": "struct",
            "fields": [
              {
                "type": "int32",
                "optional": false,
                "field": "id"
              },
              {
                "type": "string",
                "optional": false,
                "field": "first_name"
              },
              {
                "type": "string",
                "optional": false,
                "field": "last_name"
              },
              {
                "type": "string",
                "optional": false,
                "field": "email"
              }
            ],
            "optional": true,
            "name": "dbserver1.inventory.customers.Value",
            "field": "after"
          },
          {
            "type": "struct",
            "fields": [
              {
                "type": "string",
                "optional": true,
                "field": "version"
              },
              {
                "type": "string",
                "optional": false,
                "field": "name"
              },
              {
                "type": "int64",
                "optional": false,
                "field": "server_id"
              },
              {
                "type": "int64",
                "optional": false,
                "field": "ts_sec"
              },
              {
                "type": "string",
                "optional": true,
                "field": "gtid"
              },
              {
                "type": "string",
                "optional": false,
                "field": "file"
              },
              {
                "type": "int64",
                "optional": false,
                "field": "pos"
              },
              {
                "type": "int32",
                "optional": false,
                "field": "row"
              },
              {
                "type": "boolean",
                "optional": true,
                "field": "snapshot"
              },
              {
                "type": "int64",
                "optional": true,
                "field": "thread"
              },
              {
                "type": "string",
                "optional": true,
                "field": "db"
              },
              {
                "type": "string",
                "optional": true,
                "field": "table"
              }
            ],
            "optional": false,
            "name": "io.debezium.connector.mysql.Source",
            "field": "source"
          },
          {
            "type": "string",
            "optional": false,
            "field": "op"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "ts_ms"
          }
        ],
        "optional": false,
        "name": "dbserver1.inventory.customers.Envelope",
        "version": 1
      },
      "payload": {
        "before": null,
        "after": {
          "id": 1004,
          "first_name": "Anne",
          "last_name": "Kretchmar",
          "email": "annek@noanswer.org"
        },
        "source": {
          "version": "2.1.4.Final",
          "name": "dbserver1",
          "server_id": 0,
          "ts_sec": 0,
          "gtid": null,
          "file": "mysql-bin.000003",
          "pos": 154,
          "row": 0,
          "snapshot": true,
          "thread": null,
          "db": "inventory",
          "table": "customers"
        },
        "op": "r",
        "ts_ms": 1486500577691
      }
    }
    Copy to Clipboard Toggle word wrap

    이벤트의 이 부분은 훨씬 길지만 이벤트의 처럼 스키마페이로드 도 있습니다. 스키마에 는 5개의 필드를 포함할 수 있는 dbserver1.inventory.customers.Envelope (버전 1)라는 Kafka Connect 스키마가 포함되어 있습니다.

    op
    작업 유형을 설명하는 문자열 값이 포함된 필수 필드입니다. MySQL 커넥터의 값은 생성(또는 삽입), update의 경우 u, delete는 d, 읽기(스냅샷의 경우)는 r 입니다.
    before
    이벤트가 발생하기 전에 행의 상태를 포함하는 선택적 필드입니다. 이 구조는 dbserver1 커넥터가 inventory.customers 테이블의 모든 행에 사용하는 dbserver1.inventory.customers.Value Kafka Connect 스키마로 설명합니다.
    이벤트가 발생한 행의 상태를 포함하는 선택적 필드입니다. 구조는 이전에 사용된 것과 동일한 dbserver1.inventory.customers.Value Kafka Connect 스키마로 설명되어 있습니다.
    source
    이벤트에 대한 소스 메타데이터를 설명하는 구조를 포함하는 필수 필드에는 MySQL의 경우 커넥터 이름, 이벤트가 기록된 binlog 파일의 이름, 이벤트가 표시된 해당 binlog 파일의 위치, 이벤트 내의 행(두 개 이상 있는 경우), 영향을 받는 데이터베이스 및 테이블의 이름, 영향을 받는 데이터베이스 및 테이블의 이름이 포함되어 있습니다. MySQL 스레드 ID(이 이벤트가 스냅샷의 일부인지 여부), 사용 가능한 경우 MySQL 서버 ID 및 타임스탬프(초)입니다.
    ts_ms
    커넥터가 이벤트를 처리한 경우 Kafka Connect 작업을 실행하는 JVM에서 시스템 시계를 사용하는 선택적 필드입니다.
    참고

    이벤트의 JSON 표현은 설명하는 행보다 훨씬 길습니다. 이는 모든 이벤트 키와 값과 함께 Kafka Connect가 페이로드 를 설명하는 스키마 를 제공하기 때문입니다. 시간이 지남에 따라 이 구조는 변경될 수 있습니다. 그러나 이벤트 자체의 키와 값에 대한 스키마를 보유하면 특히 시간이 지남에 따라 발전할 때 애플리케이션을 사용하는 것이 메시지를 훨씬 쉽게 이해할 수 있습니다.

    Debezium MySQL 커넥터는 데이터베이스 테이블 구조에 따라 이러한 스키마를 구성합니다. DDL 문을 사용하여 MySQL 데이터베이스의 테이블 정의를 변경하는 경우 커넥터는 이러한 DDL 문을 읽고 Kafka Connect 스키마를 업데이트합니다. 이는 각 이벤트가 이벤트가 발생한 시점에서 시작된 테이블과 정확히 동일하게 구조화되는 유일한 방법입니다. 그러나 단일 테이블에 대한 모든 이벤트를 포함하는 Kafka 항목에 테이블 정의의 각 상태에 해당하는 이벤트가 있을 수 있습니다.

    JSON 컨버터에는 모든 메시지에 키 및 값 스키마가 포함되어 있으므로 매우 자세한 이벤트를 생성합니다.

  4. 이벤트의 스키마를 inventory 데이터베이스의 상태와 비교합니다. MySQL 명령줄 클라이언트를 실행하는 터미널에서 다음 문을 실행합니다.

    mysql> SELECT * FROM customers;
    +------+------------+-----------+-----------------------+
    | id   | first_name | last_name | email                 |
    +------+------------+-----------+-----------------------+
    | 1001 | Sally      | Thomas    | sally.thomas@acme.com |
    | 1002 | George     | Bailey    | gbailey@foobar.com    |
    | 1003 | Edward     | Walker    | ed@walker.com         |
    | 1004 | Anne       | Kretchmar | annek@noanswer.org    |
    +------+------------+-----------+-----------------------+
    4 rows in set (0.00 sec)
    Copy to Clipboard Toggle word wrap

    이는 검토한 이벤트 레코드가 데이터베이스의 레코드와 일치함을 보여줍니다.

4.2. 데이터베이스 업데이트 및 업데이트 이벤트 보기

이제 Debezium MySQL 커넥터가 인벤토리 데이터베이스에서 create 이벤트를 캡처한 방법을 보냈으므로 이제 레코드 중 하나를 변경하고 커넥터가 이를 캡처하는 방법을 확인할 수 있습니다.

이 절차를 완료하면 데이터베이스 커밋에서 변경된 사항에 대한 세부 정보와 변경 이벤트를 비교하여 다른 변경 사항과 관련하여 변경이 발생한 시기를 결정하는 방법을 알아봅니다.

절차

  1. MySQL 명령줄 클라이언트를 실행하는 터미널에서 다음 문을 실행합니다.

    mysql> UPDATE customers SET first_name='Anne Marie' WHERE id=1004;
    Query OK, 1 row affected (0.05 sec)
    Rows matched: 1  Changed: 1  Warnings: 0
    Copy to Clipboard Toggle word wrap
  2. 업데이트된 고객 테이블을 확인합니다.

    mysql> SELECT * FROM customers;
    +------+------------+-----------+-----------------------+
    | id   | first_name | last_name | email                 |
    +------+------------+-----------+-----------------------+
    | 1001 | Sally      | Thomas    | sally.thomas@acme.com |
    | 1002 | George     | Bailey    | gbailey@foobar.com    |
    | 1003 | Edward     | Walker    | ed@walker.com         |
    | 1004 | Anne Marie | Kretchmar | annek@noanswer.org    |
    +------+------------+-----------+-----------------------+
    4 rows in set (0.00 sec)
    Copy to Clipboard Toggle word wrap
  3. kafka-console-consumer 를 실행하는 터미널로 전환하여 5번째 이벤트를 확인합니다.

    고객 테이블의 레코드를 변경하면 Debezium MySQL 커넥터가 새 이벤트를 생성했습니다. 두 개의 새로운 JSON 문서, 즉 하나는 이벤트의 용이고 다른 하나는 새 이벤트의 이어야 합니다.

    다음은 업데이트 이벤트의 세부 정보(읽성을 위해 포맷됨)입니다.

      {
        "schema": {
          "type": "struct",
          "name": "dbserver1.inventory.customers.Key"
          "optional": false,
          "fields": [
            {
              "field": "id",
              "type": "int32",
              "optional": false
            }
          ]
        },
        "payload": {
          "id": 1004
        }
      }
    Copy to Clipboard Toggle word wrap

    키는 이전 이벤트의 와 동일합니다.

    다음은 이 새로운 이벤트의 가치입니다. 스키마 섹션에는 변경 사항이 없으므로 페이로드 섹션만 표시됩니다( 읽기 쉽도록 포맷됨).

    {
      "schema": {...},
      "payload": {
        "before": {  
    1
    
          "id": 1004,
          "first_name": "Anne",
          "last_name": "Kretchmar",
          "email": "annek@noanswer.org"
        },
        "after": {  
    2
    
          "id": 1004,
          "first_name": "Anne Marie",
          "last_name": "Kretchmar",
          "email": "annek@noanswer.org"
        },
        "source": {  
    3
    
          "name": "2.1.4.Final",
          "name": "dbserver1",
          "server_id": 223344,
          "ts_sec": 1486501486,
          "gtid": null,
          "file": "mysql-bin.000003",
          "pos": 364,
          "row": 0,
          "snapshot": null,
          "thread": 3,
          "db": "inventory",
          "table": "customers"
        },
        "op": "u",  
    4
    
        "ts_ms": 1486501486308  
    5
    
      }
    }
    Copy to Clipboard Toggle word wrap
    1 1 1
    이제 before 필드에는 데이터베이스가 커밋되기 전에 값이 포함된 행 상태가 됩니다.
    2 2 2
    after 필드에는 이제 행의 업데이트된 상태가 되고 first_name 값은 이제 Anne Marie 입니다.
    3 3 3
    소스 필드 구조에는 ts_secpos 필드가 변경되었음을 제외하고 이전과 동일한 값이 많이 있습니다(다른 상황에서 파일이 변경될 수 있음).
    4 4 4
    이제 op 필드 값이 u 이므로 이 행이 업데이트로 인해 변경되었음을 나타냅니다.
    5 5 5
    ts_ms 필드는 Debezium이 이벤트를 처리할 때의 타임 스탬프를 보여줍니다.

    페이로드 섹션을 보면 업데이트 이벤트에 대한 몇 가지 중요한 사항을 확인할 수 있습니다.

    • 이전 구조와 이후 구조를 비교하면 커밋으로 인해 영향을 받는 행에서 실제로 변경된 내용을 확인할 수 있습니다.
    • 소스 구조를 검토하여 변경 사항의 MySQL 레코드에 대한 정보를 찾을 수 있습니다(추적 추적 기능 제공).
    • 이벤트의 페이로드 섹션을 동일한 주제(또는 다른 주제)의 다른 이벤트와 비교하면 이벤트가 다른 이벤트와 동일한 MySQL 커밋 전, 이후 또는 일부인지 확인할 수 있습니다.

4.3. 데이터베이스에서 레코드를 삭제하고 삭제 이벤트 보기

이제 Debezium MySQL 커넥터가 인벤토리 데이터베이스에서 생성업데이트 이벤트를 캡처하는 방법을 보냈으므로 이제 레코드 중 하나를 삭제하고 커넥터가 이를 캡처하는 방법을 확인할 수 있습니다.

이 절차를 완료하면 삭제 이벤트 관련 세부 정보를 찾는 방법과 Kafka가 로그 압축 을 사용하여 사용자가 모든 이벤트를 가져올 수 있도록 삭제 이벤트 수를 줄이는 방법을 알아봅니다.

절차

  1. MySQL 명령줄 클라이언트를 실행하는 터미널에서 다음 문을 실행합니다.

    mysql> DELETE FROM customers WHERE id=1004;
    Query OK, 1 row affected (0.00 sec)
    Copy to Clipboard Toggle word wrap
    참고

    외부 키 제약 조건 위반으로 위의 명령이 실패하는 경우 다음 문을 사용하여 고객 주소 참조를 주소 표에서 제거해야 합니다.

    mysql> DELETE FROM addresses WHERE customer_id=1004;
    Copy to Clipboard Toggle word wrap
  2. kafka-console-consumer 를 실행하는 터미널로 전환하여 새 이벤트 개를 확인합니다.

    고객 테이블의 행을 삭제하여 Debezium MySQL 커넥터는 두 개의 새로운 이벤트를 생성했습니다.

  3. 첫 번째 새 이벤트의 값을 검토합니다.

    다음은 첫 번째 새 이벤트의 세부 정보(읽성을 위해 포맷됨)입니다.

    {
      "schema": {
        "type": "struct",
        "name": "dbserver1.inventory.customers.Key"
        "optional": false,
        "fields": [
          {
            "field": "id",
            "type": "int32",
            "optional": false
          }
        ]
      },
      "payload": {
        "id": 1004
      }
    }
    Copy to Clipboard Toggle word wrap

    키는 이전 두 이벤트의 와 동일합니다.

    다음은 첫 번째 새 이벤트의 (읽성을 위해 포맷됨)입니다.

    {
      "schema": {...},
      "payload": {
        "before": {  
    1
    
          "id": 1004,
          "first_name": "Anne Marie",
          "last_name": "Kretchmar",
          "email": "annek@noanswer.org"
        },
        "after": null,  
    2
    
        "source": {  
    3
    
          "name": "2.1.4.Final",
          "name": "dbserver1",
          "server_id": 223344,
          "ts_sec": 1486501558,
          "gtid": null,
          "file": "mysql-bin.000003",
          "pos": 725,
          "row": 0,
          "snapshot": null,
          "thread": 3,
          "db": "inventory",
          "table": "customers"
        },
        "op": "d",  
    4
    
        "ts_ms": 1486501558315  
    5
    
      }
    }
    Copy to Clipboard Toggle word wrap
    1
    이제 before 필드에 데이터베이스 커밋으로 삭제된 행의 상태가 있습니다.
    2
    after 필드는 행이 더 이상 존재하지 않기 때문에 null 입니다.
    3
    소스 필드 구조는 ts_secpos 필드가 변경되었습니다(다른 상황에서 파일이 변경될 수 있음).
    4
    이제 op 필드 값이 d 이 행이 삭제되었음을 나타냅니다.
    5
    ts_ms 필드는 Debezium이 이벤트를 처리할 때의 타임 스탬프를 보여줍니다.

    따라서 이 이벤트는 소비자에게 행 제거를 처리하는 데 필요한 정보를 제공합니다. 일부 소비자는 제거를 적절히 처리해야 할 수 있으므로 이전 값도 제공됩니다.

  4. 두 번째 새 이벤트의 값을 검토합니다.

    다음은 두 번째 새 이벤트의 키입니다 (읽성을 위해 포맷됨).

      {
        "schema": {
          "type": "struct",
          "name": "dbserver1.inventory.customers.Key"
          "optional": false,
          "fields": [
            {
              "field": "id",
              "type": "int32",
              "optional": false
            }
          ]
        },
        "payload": {
          "id": 1004
        }
      }
    Copy to Clipboard Toggle word wrap

    다시 한번, 이 키는 이전의 세 이벤트에서와 정확히 동일한 키입니다.

    동일한 이벤트의 (읽성을 위해 포맷됨)은 다음과 같습니다.

    {
      "schema": null,
      "payload": null
    }
    Copy to Clipboard Toggle word wrap

    Kafka가 컴팩트한 로그 로 설정된 경우 동일한 키가 있는 항목의 뒷부분에 메시지가 한 개 이상 있는 경우 주제에서 이전 메시지가 제거됩니다. 이 마지막 이벤트는 키와 빈 값이 있기 때문에 tombstone 이벤트라고 합니다. 즉 Kafka는 동일한 키를 사용하여 이전 메시지를 모두 제거합니다. 이전 메시지가 제거되지만 tombstone 이벤트는 소비자가 처음부터 주제를 계속 읽고 이벤트를 놓치지 않을 수 있음을 의미합니다.

4.4. Kafka Connect 서비스 다시 시작

이제 Debezium MySQL 커넥터가 생성, 업데이트 및 삭제 이벤트를 캡처하는 방법을 확인했으므로 실행 중이 아닌 경우에도 변경 이벤트를 캡처하는 방법을 확인할 수 있습니다.

Kafka Connect 서비스는 등록된 커넥터에 대한 작업을 자동으로 관리합니다. 따라서 오프라인으로 전환되면 다시 시작하면 실행 중이 아닌 작업이 시작됩니다. 즉, Debezium이 실행되고 있지 않더라도 데이터베이스의 변경 사항을 보고할 수 있습니다.

이 절차에서는 Kafka Connect를 중지하고 데이터베이스의 일부 데이터를 변경한 다음 Kafka Connect를 다시 시작하여 변경 이벤트를 확인합니다.

절차

  1. Kafka Connect 서비스를 중지합니다.

    1. Kafka Connect 배포에 대한 구성을 엽니다.

      $ oc edit deployment/my-connect-cluster-connect
      Copy to Clipboard Toggle word wrap

      배포 구성이 열립니다.

      apiVersion: apps.openshift.io/v1
      kind: Deployment
      metadata:
        ...
      spec:
        replicas: 1
      ...
      Copy to Clipboard Toggle word wrap
    2. spec.replicas 값을 0 으로 변경합니다.
    3. 설정을 저장합니다.
    4. Kafka Connect 서비스가 중지되었는지 확인합니다.

      이 명령은 Kafka Connect 서비스가 완료되고 실행 중인 Pod가 없음을 보여줍니다.

      $ oc get pods -l strimzi.io/name=my-connect-cluster-connect
      NAME                                          READY   STATUS      RESTARTS   AGE
      my-connect-cluster-connect-1-dxcs9            0/1     Completed   0          7h
      Copy to Clipboard Toggle word wrap
  2. Kafka Connect 서비스가 다운되는 동안 MySQL 클라이언트를 실행하는 터미널로 전환하고 데이터베이스에 새 레코드를 추가합니다.

    mysql> INSERT INTO customers VALUES (default, "Sarah", "Thompson", "kitt@acme.com");
    Copy to Clipboard Toggle word wrap
  3. Kafka Connect 서비스를 다시 시작합니다.

    1. Kafka Connect 서비스에 대한 배포 구성을 엽니다.

      $ oc edit deployment/my-connect-cluster-connect
      Copy to Clipboard Toggle word wrap

      배포 구성이 열립니다.

      apiVersion: apps.openshift.io/v1
      kind: Deployment
      metadata:
        ...
      spec:
        replicas: 0
      ...
      Copy to Clipboard Toggle word wrap
    2. spec.replicas 값을 1 로 변경합니다.
    3. 배포 구성을 저장합니다.
    4. Kafka Connect 서비스가 다시 시작되었는지 확인합니다.

      이 명령은 Kafka Connect 서비스가 실행 중이며 Pod가 준비되었음을 보여줍니다.

      $ oc get pods -l strimzi.io/name=my-connect-cluster-connect
      NAME                                          READY   STATUS      RESTARTS   AGE
      my-connect-cluster-connect-2-q9kkl            1/1     Running     0          74s
      Copy to Clipboard Toggle word wrap
  4. kafka-console-consumer.sh 를 실행하는 터미널로 전환합니다. 도착하면 새로운 이벤트가 나타납니다.
  5. Kafka Connect가 오프라인 상태일 때 생성한 레코드를 검사합니다(방문성을 위해 포맷됨).

    {
      ...
      "payload":{
        "id":1005
      }
    }
    {
      ...
      "payload":{
        "before":null,
        "after":{
           "id":1005,
           "first_name":"Sarah",
           "last_name":"Thompson",
           "email":"kitt@acme.com"
        },
        "source":{
           "version":"2.1.4.Final",
           "connector":"mysql",
           "name":"dbserver1",
           "ts_ms":1582581502000,
           "snapshot":"false",
           "db":"inventory",
           "table":"customers",
           "server_id":223344,
           "gtid":null,
           "file":"mysql-bin.000004",
           "pos":364,
           "row":0,
           "thread":5,
           "query":null
        },
        "op":"c",
        "ts_ms":1582581502317
      }
    }
    Copy to Clipboard Toggle word wrap

5장. 다음 단계

자습서를 완료한 후 다음 단계를 고려하십시오.

  • 튜토리얼을 더 자세히 살펴보십시오.

    MySQL 명령줄 클라이언트를 사용하여 데이터베이스 테이블의 행을 추가, 수정, 제거하고 해당 항목에 미치는 영향을 확인합니다. 외부 키로 참조되는 행은 제거할 수 없습니다.

  • Debezium 배포 계획.

    OpenShift 또는 Red Hat Enterprise Linux에 Debezium을 설치할 수 있습니다. 자세한 내용은 다음을 참조하십시오.

2023-09-19에 최종 업데이트된 문서

법적 공지

Copyright © 2023 Red Hat, Inc.
The text of and illustrations in this document are licensed by Red Hat under a Creative Commons Attribution–Share Alike 3.0 Unported license ("CC-BY-SA"). An explanation of CC-BY-SA is available at http://creativecommons.org/licenses/by-sa/3.0/. In accordance with CC-BY-SA, if you distribute this document or an adaptation of it, you must provide the URL for the original version.
Red Hat, as the licensor of this document, waives the right to enforce, and agrees not to assert, Section 4d of CC-BY-SA to the fullest extent permitted by applicable law.
Red Hat, Red Hat Enterprise Linux, the Shadowman logo, the Red Hat logo, JBoss, OpenShift, Fedora, the Infinity logo, and RHCE are trademarks of Red Hat, Inc., registered in the United States and other countries.
Linux® is the registered trademark of Linus Torvalds in the United States and other countries.
Java® is a registered trademark of Oracle and/or its affiliates.
XFS® is a trademark of Silicon Graphics International Corp. or its subsidiaries in the United States and/or other countries.
MySQL® is a registered trademark of MySQL AB in the United States, the European Union and other countries.
Node.js® is an official trademark of Joyent. Red Hat is not formally related to or endorsed by the official Joyent Node.js open source or commercial project.
The OpenStack® Word Mark and OpenStack logo are either registered trademarks/service marks or trademarks/service marks of the OpenStack Foundation, in the United States and other countries and are used with the OpenStack Foundation's permission. We are not affiliated with, endorsed or sponsored by the OpenStack Foundation, or the OpenStack community.
All other trademarks are the property of their respective owners.
맨 위로 이동
Red Hat logoGithubredditYoutubeTwitter

자세한 정보

평가판, 구매 및 판매

커뮤니티

Red Hat 문서 정보

Red Hat을 사용하는 고객은 신뢰할 수 있는 콘텐츠가 포함된 제품과 서비스를 통해 혁신하고 목표를 달성할 수 있습니다. 최신 업데이트를 확인하세요.

보다 포괄적 수용을 위한 오픈 소스 용어 교체

Red Hat은 코드, 문서, 웹 속성에서 문제가 있는 언어를 교체하기 위해 최선을 다하고 있습니다. 자세한 내용은 다음을 참조하세요.Red Hat 블로그.

Red Hat 소개

Red Hat은 기업이 핵심 데이터 센터에서 네트워크 에지에 이르기까지 플랫폼과 환경 전반에서 더 쉽게 작업할 수 있도록 강화된 솔루션을 제공합니다.

Theme

© 2026 Red Hat