Debezium 시작하기
Red Hat Integration 2.1.4와 함께 사용할 수 있습니다.
초록
머리말 링크 복사링크가 클립보드에 복사되었습니다!
이 튜토리얼에서는 Debezium을 사용하여 MySQL 데이터베이스에서 업데이트를 캡처하는 방법을 설명합니다. 데이터베이스의 데이터가 변경되면 결과 이벤트 스트림을 볼 수 있습니다.
보다 포괄적 수용을 위한 오픈 소스 용어 교체
Red Hat은 코드, 문서, 웹 속성에서 문제가 있는 용어를 교체하기 위해 최선을 다하고 있습니다. 먼저 마스터(master), 슬레이브(slave), 블랙리스트(blacklist), 화이트리스트(whitelist) 등 네 가지 용어를 교체하고 있습니다. 이러한 변경 작업은 작업 범위가 크므로 향후 여러 릴리스에 걸쳐 점차 구현할 예정입니다. 자세한 내용은 CTO Chris Wright의 메시지를 참조하십시오.
Red Hat 문서에 관한 피드백 제공
문서 개선을 위한 의견에 감사드립니다. 피드백을 제공하려면 문서의 텍스트를 강조 표시하고 주석을 추가합니다.
사전 요구 사항
- Red Hat 고객 포털에 로그인되어 있습니다.
- Red Hat 고객 포털에서 문서는 다중 페이지 HTML 보기 형식으로 되어 있습니다.
절차
피드백을 제공하려면 다음 단계를 수행합니다.
문서의 오른쪽 상단에 있는 피드백 버튼을 클릭하여 기존 피드백을 확인합니다.
참고피드백 기능은 다중 페이지 HTML 형식에서만 활성화됩니다.
- 피드백을 제공하려는 문서의 섹션을 강조 표시합니다.
강조 표시된 텍스트 옆에 표시되는 피드백 추가 팝업을 클릭합니다.
페이지 오른쪽에 있는 피드백 섹션에 입력란이 표시됩니다.
텍스트 상자에 피드백을 입력하고 제출을 클릭합니다.
문서 문제가 생성됩니다.
- 문제를 보려면 피드백 보기에서 문제 링크를 클릭합니다.
1장. 이 튜토리얼 정보 링크 복사링크가 클립보드에 복사되었습니다!
이 튜토리얼에는 다음 단계가 포함됩니다.
- 간단한 예제 데이터베이스를 사용하여 OpenShift에 MySQL 데이터베이스 서버를 배포합니다.
- AMQ Streams에서 사용자 정의 리소스를 적용하여 Debezium MySQL 커넥터 플러그인을 포함하는 Kafka Connect 컨테이너 이미지를 자동으로 빌드합니다.
- Debezium MySQL 커넥터 리소스를 생성하여 데이터베이스의 변경 사항을 캡처합니다.
- 커넥터 배포를 확인합니다.
- 커넥터가 데이터베이스에서 Kafka 주제로 내보내는 변경 이벤트를 확인합니다.
사전 요구 사항
- OpenShift 및 AMQ Streams에 대해 잘 알고 있습니다.
- 클러스터 Operator가 설치된 OpenShift 클러스터에 액세스할 수 있습니다.
- AMQ Streams Operator가 실행 중입니다.
- Apache Kafka 클러스터는 OpenShift에서 AMQ Streams 배포 및 업그레이드에 설명된 대로 배포됩니다.
- 고객님은 Red Hat Integration 라이센스가 있습니다.
-
OpenShift 관리 툴 사용 방법을 알고 있습니다. OpenShift
ocCLI 클라이언트가 설치 되었거나 OpenShift Container Platform 웹 콘솔에 액세스할 수 있습니다. Kafka Connect 빌드 이미지를 저장하는 방법에 따라 컨테이너 레지스트리에 액세스할 수 있는 권한이 있어야 합니다. 또는 OpenShift에서 ImageStream 리소스를 생성해야 합니다.
- Red Hat Quay.io 또는 Docker Hub와 같은 이미지 레지스트리에 빌드 이미지를 저장하려면 다음을 수행합니다.
- 레지스트리에서 이미지를 생성하고 관리하는 계정 및 권한입니다.
- 빌드 이미지를 네이티브 OpenShift ImageStream으로 저장하려면 다음을 수행합니다.
- ImageStream 리소스는 새 컨테이너 이미지를 저장하기 위해 클러스터에 배포됩니다. 클러스터의 ImageStream을 명시적으로 생성해야 합니다. 이미지 스트림은 기본적으로 사용할 수 없습니다.
2장. Debezium 소개 링크 복사링크가 클립보드에 복사되었습니다!
Debezium은 기존 데이터베이스의 정보를 이벤트 스트림으로 변환하여 애플리케이션을 탐지하고 데이터베이스의 행 수준 변경에 즉시 응답할 수 있도록 하는 분산 플랫폼입니다.
Debezium은 Apache Kafka 를 기반으로 하며 Kafka Connect 호환 커넥터 세트를 제공합니다. 각 커넥터는 특정 DBMS(Database Management System)에서 작동합니다. 커넥터는 변경 사항을 감지하여 DBMS의 데이터 변경 내역을 기록하고 각 변경 이벤트의 레코드를 Kafka 주제로 스트리밍합니다. 그런 다음 애플리케이션을 사용하는 경우 Kafka 주제에서 결과 이벤트 레코드를 읽을 수 있습니다.
Kafka의 안정적인 스트리밍 플랫폼을 활용함으로써 Debezium을 사용하면 애플리케이션이 데이터베이스에서 발생하는 변경 사항을 정확하고 완전하게 사용할 수 있습니다. 애플리케이션이 예기치 않게 중지되거나 연결이 끊어도 중단 중에 발생하는 이벤트를 놓치지 않습니다. 애플리케이션을 다시 시작한 후 해제된 지점에서 주제에서 읽기를 다시 시작합니다.
다음 튜토리얼에서는 간단한 구성으로 Debezium MySQL 커넥터 를 배포하고 사용하는 방법을 보여줍니다. Debezium 커넥터 배포 및 사용에 대한 자세한 내용은 커넥터 설명서를 참조하십시오.
3장. 서비스 시작 링크 복사링크가 클립보드에 복사되었습니다!
Debezium을 사용하려면 Kafka 및 Kafka Connect, 데이터베이스 및 Debezium 커넥터 서비스와 함께 AMQ Streams가 필요합니다. 이 튜토리얼에 대한 서비스를 실행하려면 다음을 수행해야합니다.
3.1. MySQL 데이터베이스 배포 링크 복사링크가 클립보드에 복사되었습니다!
데이터로 미리 채워진 여러 테이블을 포함하는 inventory 데이터베이스 예제를 포함하는 MySQL 데이터베이스 서버를 배포합니다. Debezium MySQL 커넥터는 샘플 테이블에서 발생하는 변경 사항을 캡처하고 변경 사항 이벤트 레코드를 Apache Kafka 항목에 전송합니다.
절차
다음 명령을 실행하여 MySQL 데이터베이스를 시작합니다. 이 명령은 예제
inventory데이터베이스로 구성된 MySQL 데이터베이스 서버를 시작합니다.oc new-app -l app=mysql --name=mysql quay.io/debezium/example-mysql:latest
$ oc new-app -l app=mysql --name=mysql quay.io/debezium/example-mysql:latestCopy to Clipboard Copied! Toggle word wrap Toggle overflow MySQL 데이터베이스의 배포 구성을 업데이트하여 사용자 이름과 암호를 추가하는 다음 명령을 실행하여 MySQL 데이터베이스에 대한 자격 증명을 구성합니다.
oc set env deployment/mysql MYSQL_ROOT_PASSWORD=debezium MYSQL_USER=mysqluser MYSQL_PASSWORD=mysqlpw
$ oc set env deployment/mysql MYSQL_ROOT_PASSWORD=debezium MYSQL_USER=mysqluser MYSQL_PASSWORD=mysqlpwCopy to Clipboard Copied! Toggle word wrap Toggle overflow 다음 명령을 호출하여 MySQL 데이터베이스가 실행 중인지 확인합니다. 출력 뒤에 MySQL 데이터베이스가 실행 중이고 포드가 준비되었음을 보여줍니다.
oc get pods -l app=mysql NAME READY STATUS RESTARTS AGE mysql-1-2gzx5 1/1 Running 1 23s
$ oc get pods -l app=mysql NAME READY STATUS RESTARTS AGE mysql-1-2gzx5 1/1 Running 1 23sCopy to Clipboard Copied! Toggle word wrap Toggle overflow 새 터미널을 열고 샘플
인벤토리데이터베이스에 로그인합니다.이 명령은 MySQL 데이터베이스를 실행하는 포드에서 MySQL 명령줄 클라이언트를 엽니다. 클라이언트는 이전에 구성한 사용자 이름과 암호를 사용합니다.
Copy to Clipboard Copied! Toggle word wrap Toggle overflow inventory데이터베이스의 테이블을 나열합니다.Copy to Clipboard Copied! Toggle word wrap Toggle overflow 데이터베이스를 살펴보고 포함된 데이터를 확인합니다(예:
customers테이블을 확인합니다.Copy to Clipboard Copied! Toggle word wrap Toggle overflow
3.2. Kafka Connect 배포 링크 복사링크가 클립보드에 복사되었습니다!
MySQL 데이터베이스를 배포한 후 AMQ Streams를 사용하여 Debezium MySQL 커넥터 플러그인이 포함된 Kafka Connect 컨테이너 이미지를 빌드합니다. 배포 프로세스 중에 다음 사용자 정의 리소스(CR)를 생성하고 사용합니다.
-
Kafka Connect인스턴스를 정의하고 이미지에 포함할 MySQL 커넥터 아티팩트에 대한 정보를 포함하는 KafkaConnect CR입니다. -
MySQL 커넥터가 소스 데이터베이스에 액세스하는 데 사용하는 정보를 포함하는 세부 정보를 제공하는
KafkaConnectorCR입니다. AMQ Streams가 Kafka Connect Pod를 시작한 후KafkaConnectorCR을 적용하여 커넥터를 시작합니다.
빌드 프로세스 중에 AMQ Streams Operator는 Debezium 커넥터 정의를 비롯한 KafkaConnect 사용자 정의 리소스의 입력 매개변수를 Kafka Connect 컨테이너 이미지로 변환합니다. 이 빌드는 Red Hat Maven 리포지토리에서 필요한 아티팩트를 다운로드하여 이미지에 통합합니다. 새로 생성된 컨테이너는 .spec.build.output 에 지정된 컨테이너 레지스트리로 푸시되며 Kafka Connect Pod를 배포하는 데 사용됩니다. AMQ Streams가 Kafka Connect 이미지를 빌드한 후 KafkaConnector 사용자 정의 리소스를 사용하여 커넥터를 시작합니다.
절차
-
OpenShift 클러스터에 로그인하고 프로젝트를 생성하거나 엽니다(예:
debezium). 커넥터에 대한
KafkaConnect사용자 정의 리소스(CR)를 생성하거나 기존 리소스를 수정합니다.
다음 예제는KafkaConnect사용자 정의 리소스를 설명하는dbz-connect.yaml파일에서 발췌한 내용을 보여줍니다.
metadata.annotations및spec.build속성이 필요합니다.예 3.1. Debezium 커넥터를 포함하는
KafkaConnect사용자 정의 리소스를 정의하는dbz-connect.yaml파일Copy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 표 3.1. Kafka Connect 구성 설정에 대한 설명 항목 설명 1
Cluster Operator가
KafkaConnector리소스를 사용하여 이 Kafka Connect 클러스터에서 커넥터를 구성할 수 있도록strimzi.io/use-connector-resources주석을"true"로 설정합니다.2
spec.build구성은 빌드 이미지를 저장할 위치를 지정하고 플러그인 아티팩트의 위치와 함께 이미지에 포함할 플러그인을 나열합니다.3
build.output은 새로 빌드된 이미지가 저장된 레지스트리를 지정합니다.4
이미지 출력의 이름과 이미지 이름을 지정합니다.
output.type에 유효한 값은 Docker Hub 또는 Quay와 같은 컨테이너 레지스트리로 푸시하거나 이미지를 내부 OpenShift ImageStream으로 푸시하는 이미지스트림입니다.ImageStream을 사용하려면 ImageStream 리소스를 클러스터에 배포해야 합니다. KafkaConnect 구성에서build.output을 지정하는 방법에 대한 자세한 내용은 AMQ Streams Build 스키마 참조 설명서를 참조하십시오.5
plugins구성에는 Kafka Connect 이미지에 포함하려는 모든 커넥터가 나열됩니다. 목록의 각 항목에 대해 플러그인이름을지정하고 커넥터를 빌드하는 데 필요한 아티팩트에 대한 정보를 지정합니다. 선택적으로 각 커넥터 플러그인에 대해 커넥터와 함께 사용할 수 있는 다른 구성 요소를 포함할 수 있습니다. 예를 들어 서비스 레지스트리 아티팩트 또는 Debezium 스크립팅 구성 요소를 추가할 수 있습니다.6
artifacts.type값은artifacts.url에 지정된 아티팩트의 파일 유형을 지정합니다. 유효한 유형은zip,tgz, 또는 10.0.0.1입니다. Debezium 커넥터 아카이브는.zip파일 형식으로 제공됩니다. JDBC 드라이버 파일은.jar형식입니다.type값은url필드에서 참조되는 파일의 유형과 일치해야 합니다.7
artifacts.url값은 커넥터 아티팩트의 파일을 저장하는 Maven 리포지토리와 같은 HTTP 서버의 주소를 지정합니다. OpenShift 클러스터는 지정된 서버에 액세스할 수 있어야 합니다.다음 명령을 입력하여
KafkaConnect빌드 사양을 OpenShift 클러스터에 적용합니다.oc create -f dbz-connect.yaml
oc create -f dbz-connect.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow 사용자 정의 리소스에 지정된 구성에 따라 AMQ Streams Operator는 배포할 Kafka Connect 이미지를 준비합니다.
빌드가 완료되면 Operator에서 이미지를 지정된 레지스트리 또는 ImageStream으로 푸시하고 Kafka Connect 클러스터를 시작합니다. 구성에 나열된 커넥터 아티팩트는 클러스터에서 사용할 수 있습니다.KafkaConnector리소스를 생성하여 MySQL 커넥터의 인스턴스를 정의합니다.
예를 들어 다음KafkaConnectorCR을 생성하여debezium-inventory-connector.yaml로 저장합니다.예 3.2. Debezium 커넥터의
KafkaConnector사용자 정의 리소스를 정의하는mysql-inventory-connector.yaml파일Copy to Clipboard Copied! Toggle word wrap Toggle overflow Expand 표 3.2. 커넥터 구성 설정에 대한 설명 항목 설명 1
Kafka Connect 클러스터에 등록할 커넥터의 이름입니다.
2
커넥터 클래스의 이름입니다.
3
한 번에 하나의 작업만 작동해야 합니다. MySQL 커넥터가 MySQL 서버의
binlog를 읽을 때 단일 커넥터 작업을 사용하여 적절한 순서 및 이벤트 처리를 확인합니다. Kafka Connect 서비스는 커넥터를 사용하여 작업을 완료하기 위해 하나 이상의 작업을 시작합니다. Kafka Connect 서비스 클러스터에 실행 중인 작업을 자동으로 배포합니다. 서비스가 중지되거나 중단되면 작업이 실행 중인 서비스에 재배포됩니다.4
커넥터의 구성입니다.
5
MySQL 데이터베이스 인스턴스의 호스트 이름 또는 주소입니다.
6
데이터베이스 인스턴스의 포트 번호입니다.
7
Debezium이 데이터베이스에 연결하는 사용자 계정의 이름입니다.
8
Debezium에서 데이터베이스 사용자 계정에 연결하는 데 사용하는 암호입니다.
9
MySQL 서버 또는 클러스터의 주제 접두사입니다. 이 문자열은 커넥터가 이벤트 레코드를 전송하는 모든 Kafka 항목의 이름을 접두사로 지정합니다.
10
커넥터가 변경 이벤트를 캡처하는 테이블 목록입니다. 커넥터는
인벤토리테이블에서 발생하는 경우에만 변경 사항을 탐지합니다.11
커넥터가 DDL 문을 데이터베이스 스키마 기록 항목에 쓰고 복구하는 데 사용하는 Kafka 브로커 목록입니다. 커넥터가 변경 이벤트 레코드를 전송하는 브로커와 동일합니다. 다시 시작한 후 커넥터는 커넥터가 읽기를 재개할 때 binlog의 시점에 존재하는 데이터베이스 스키마를 복구합니다.
12
데이터베이스 스키마 기록 항목의 이름입니다. 이 주제는 내부 용도로만 사용되며 소비자가 사용해서는 안 됩니다.
다음 명령을 실행하여 커넥터 리소스를 생성합니다.
oc create -n <namespace> -f <kafkaConnector>.yaml
oc create -n <namespace> -f <kafkaConnector>.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow 예를 들면 다음과 같습니다.
oc create -n debezium -f mysql-inventory-connector.yaml
oc create -n debezium -f mysql-inventory-connector.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow 커넥터는 Kafka Connect 클러스터에 등록되며
KafkaConnectorCR에서spec.config.database.dbname에 지정된 데이터베이스에 대해 실행되기 시작합니다. 커넥터 Pod가 준비되면 Debezium이 실행됩니다.
이제 커넥터가 생성되었고 inventory 데이터베이스의 변경 사항을 캡처하기 시작했는지 확인할 준비가 되었습니다.
3.3. 커넥터 배포 확인 링크 복사링크가 클립보드에 복사되었습니다!
커넥터가 오류 없이 올바르게 시작되면 커넥터가 캡처하도록 구성된 각 테이블에 대해 항목이 생성됩니다. 다운스트림 애플리케이션은 이러한 주제를 구독하여 소스 데이터베이스에서 발생하는 정보 이벤트를 검색할 수 있습니다.
커넥터가 실행 중인지 확인하려면 OpenShift Container Platform 웹 콘솔 또는 OpenShift CLI 툴(oc)에서 다음 작업을 수행합니다.
- 커넥터 상태를 확인합니다.
- 커넥터가 주제를 생성하는지 확인합니다.
- 커넥터가 각 테이블의 초기 스냅샷 중에 생성하는 읽기 작업("op":"r")에 대해 주제가 채워지는지 확인합니다.
사전 요구 사항
- Debezium 커넥터는 OpenShift의 AMQ Streams에 배포됩니다.
-
OpenShift
ocCLI 클라이언트가 설치되어 있어야 합니다. - OpenShift Container Platform 웹 콘솔에 액세스할 수 있습니다.
절차
다음 방법 중 하나를 사용하여
KafkaConnector리소스의 상태를 확인합니다.OpenShift Container Platform 웹 콘솔에서 다음을 수행합니다.
- 홈 → 검색으로 이동합니다.
-
검색 페이지에서 리소스를 클릭하여 리소스 선택 상자를 연 다음
KafkaConnector를 입력합니다. - KafkaConnectors 목록에서 확인할 커넥터의 이름을 클릭합니다(예: inventory-connector ).
- Conditions 섹션에서 Type 및 Status 열의 값이 Ready 및 True 로 설정되어 있는지 확인합니다.
터미널 창에서 다음을 수행합니다.
다음 명령을 실행합니다.
oc describe KafkaConnector <connector-name> -n <project>
oc describe KafkaConnector <connector-name> -n <project>Copy to Clipboard Copied! Toggle word wrap Toggle overflow 예를 들면 다음과 같습니다.
oc describe KafkaConnector inventory-connector -n debezium
oc describe KafkaConnector inventory-connector -n debeziumCopy to Clipboard Copied! Toggle word wrap Toggle overflow 이 명령은 다음 출력과 유사한 상태 정보를 반환합니다.
예 3.3.
KafkaConnector리소스 상태Copy to Clipboard Copied! Toggle word wrap Toggle overflow
커넥터가 Kafka 주제를 생성했는지 확인합니다.
OpenShift Container Platform 웹 콘솔에서 다음을 수행합니다.
- 홈 → 검색으로 이동합니다.
-
검색 페이지에서 리소스를 클릭하여 리소스 선택 상자를 연 다음
KafkaTopic을 입력합니다. - KafkaTopics 목록에서 dbserver1.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d 항목을 클릭합니다.
- Conditions 섹션에서 Type 및 Status 열의 값이 Ready 및 True 로 설정되어 있는지 확인합니다.
터미널 창에서 다음을 수행합니다.
다음 명령을 실행합니다.
oc get kafkatopics
oc get kafkatopicsCopy to Clipboard Copied! Toggle word wrap Toggle overflow 이 명령은 다음 출력과 유사한 상태 정보를 반환합니다.
예 3.4.
KafkaTopic리소스 상태Copy to Clipboard Copied! Toggle word wrap Toggle overflow
주제 콘텐츠를 확인합니다.
- 터미널 창에서 다음 명령을 입력합니다.
oc exec -n <project> -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=<topic-name>
oc exec -n <project> -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=<topic-name>Copy to Clipboard Copied! Toggle word wrap Toggle overflow 예를 들면 다음과 같습니다.
oc exec -n debezium -it my-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=dbserver1.inventory.products_on_hand
oc exec -n debezium -it my-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=dbserver1.inventory.products_on_handCopy to Clipboard Copied! Toggle word wrap Toggle overflow 주제 이름을 지정하는 형식은
oc describe명령과 1단계에서 반환하는 것과 동일합니다(예:dbserver1.inventory.addresses).주제의 각 이벤트에 대해 명령에서 다음 출력과 유사한 정보를 반환합니다.
예 3.5. Debezium 변경 이벤트의 내용
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"dbserver1.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"dbserver1.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"dbserver1.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"2.1.4.Final-redhat-00001","connector":"mysql","name":"dbserver1","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"dbserver1.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"dbserver1.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"dbserver1.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"2.1.4.Final-redhat-00001","connector":"mysql","name":"dbserver1","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}Copy to Clipboard Copied! Toggle word wrap Toggle overflow 위 예제에서
페이로드값은 커넥터 스냅샷이dbserver1.anchor_on_hand에서 읽기("op" ="r") 이벤트를 생성했음을 보여줍니다.product_id레코드의"before"상태는 레코드에 이전 값이 없음을 나타내는null입니다."after"상태는product_id101이 있는 항목에 대한수량을 표시합니다.
4장. 변경 이벤트 보기 링크 복사링크가 클립보드에 복사되었습니다!
Debezium MySQL 커넥터를 배포한 후 인벤토리 데이터베이스에 대한 변경 사항 캡처를 시작합니다.
커넥터가 시작되면 MySQL 데이터베이스의 테이블 중 하나를 나타내는 Apache Kafka 주제 집합에 이벤트를 씁니다. 각 항목의 이름은 데이터베이스 서버 dbserver1 의 이름으로 시작됩니다.
커넥터는 다음 Kafka 항목에 씁니다.
dbserver1- 변경 사항이 캡처되는 테이블에 적용되는 DDL 문을 작성하는 스키마 변경 주제입니다.
dbserver1.inventory.products-
inventory데이터베이스의products테이블에 대한 변경 이벤트 레코드를 수신합니다. dbserver1.inventory.products_on_hand-
inventory데이터베이스의products_on_hand테이블에 대한 변경 이벤트 레코드를 수신합니다. dbserver1.inventory.customers-
inventory데이터베이스의customers테이블에 대한 변경 이벤트 레코드를 받습니다. dbserver1.inventory.orders-
inventory데이터베이스의orders테이블에 대한 변경 이벤트 레코드를 수신합니다.
이 튜토리얼의 나머지 부분에서는 dbserver1.inventory.customers Kafka 주제를 검사합니다. 주제를 더 자세히 살펴보면 다양한 유형의 변경 이벤트를 나타내는 방법을 확인하고 각 이벤트가 캡처된 커넥터에 대한 정보를 찾을 수 있습니다.
이 튜토리얼에는 다음 섹션이 포함되어 있습니다.
4.1. 생성 이벤트 보기 링크 복사링크가 클립보드에 복사되었습니다!
dbserver1.inventory.customers 주제를 보면 MySQL 커넥터가 inventory 데이터베이스에서 생성 이벤트를 캡처하는 방법을 확인할 수 있습니다. 이 경우 생성 이벤트는 데이터베이스에 추가되는 새 고객을 캡처합니다.
절차
새 터미널을 열고
kafka-console-consumer를 사용하여 주제 시작부터dbserver1.inventory.customers주제를 사용합니다.이 명령은 Kafka(
my-cluster-kafka-0)를 실행하는 Pod에서 간단한 소비자()를 실행합니다.kafka-console-consumer.shoc exec -it my-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --from-beginning \ --property print.key=true \ --topic dbserver1.inventory.customers
$ oc exec -it my-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --from-beginning \ --property print.key=true \ --topic dbserver1.inventory.customersCopy to Clipboard Copied! Toggle word wrap Toggle overflow 소비자는
customers테이블의 각 행에 대해 4개의 메시지(JSON 형식)를 반환합니다. 각 메시지에는 해당 테이블 행에 대한 이벤트 레코드가 포함되어 있습니다.각 이벤트에는 키 와 값 의 두 개의 JSON 문서가 있습니다. 키는 행의 기본 키에 해당하고, 값은 행의 세부 정보(행이 포함된 필드, 각 필드의 값, 행에서 수행된 작업 유형)를 표시합니다.
마지막 이벤트의 경우 키 의 세부 정보를 검토합니다.
다음은 마지막 이벤트의 키 (읽성을 위해 포맷됨)에 대한 세부 정보입니다.
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 이벤트에는
스키마와페이로드의 두 부분이 있습니다.스키마에는 페이로드에 있는 내용을 설명하는 Kafka Connect 스키마가 포함되어 있습니다. 이 경우 페이로드는 선택 사항이 아니며 필수 필드(int32유형의id)가 있는dbserver1.inventory.customers.Key라는 구조입니다.페이로드에는 값이1004인 단일id필드가 있습니다.이벤트 키 를 검토하면
id기본 키 열에 값이1004인inventory.customers테이블의 행에 이 이벤트가 적용되는 것을 확인할 수 있습니다.동일한 이벤트의 값 의 세부 사항을 검토합니다.
이벤트 값은 행이 생성되었으며 포함된 내용(이 경우
id,first_name,last_name, 삽입된 행의이메일)을 설명합니다.다음은 마지막 이벤트의 값 (읽성을 위해 포맷됨)에 대한 세부 정보입니다.
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 이벤트의 이 부분은 훨씬 길지만 이벤트의 키 처럼
스키마와페이로드도 있습니다.스키마에는 5개의 필드를 포함할 수 있는dbserver1.inventory.customers.Envelope(버전 1)라는 Kafka Connect 스키마가 포함되어 있습니다.op-
작업 유형을 설명하는 문자열 값이 포함된 필수 필드입니다. MySQL 커넥터의 값은 생성(또는 삽입), update의 경우
u, delete는d, 읽기(스냅샷의 경우)는r입니다. before-
이벤트가 발생하기 전에 행의 상태를 포함하는 선택적 필드입니다. 이 구조는
dbserver1커넥터가inventory.customers테이블의 모든 행에 사용하는dbserver1.inventory.customers.ValueKafka Connect 스키마로 설명합니다. 후-
이벤트가 발생한 후 행의 상태를 포함하는 선택적 필드입니다. 구조는
이전에사용된 것과 동일한dbserver1.inventory.customers.ValueKafka Connect 스키마로 설명되어 있습니다. source-
이벤트에 대한 소스 메타데이터를 설명하는 구조를 포함하는 필수 필드에는 MySQL의 경우 커넥터 이름, 이벤트가 기록된 binlog 파일의 이름, 이벤트가 표시된 해당
파일의 위치, 이벤트 내의 행(두 개 이상 있는 경우), 영향을 받는 데이터베이스 및 테이블의 이름, 영향을 받는 데이터베이스 및 테이블의 이름이 포함되어 있습니다. MySQL 스레드 ID(이 이벤트가 스냅샷의 일부인지 여부), 사용 가능한 경우 MySQL 서버 ID 및 타임스탬프(초)입니다.binlog ts_ms- 커넥터가 이벤트를 처리한 경우 Kafka Connect 작업을 실행하는 JVM에서 시스템 시계를 사용하는 선택적 필드입니다.
참고이벤트의 JSON 표현은 설명하는 행보다 훨씬 길습니다. 이는 모든 이벤트 키와 값과 함께 Kafka Connect가 페이로드 를 설명하는 스키마 를 제공하기 때문입니다. 시간이 지남에 따라 이 구조는 변경될 수 있습니다. 그러나 이벤트 자체의 키와 값에 대한 스키마를 보유하면 특히 시간이 지남에 따라 발전할 때 애플리케이션을 사용하는 것이 메시지를 훨씬 쉽게 이해할 수 있습니다.
Debezium MySQL 커넥터는 데이터베이스 테이블 구조에 따라 이러한 스키마를 구성합니다. DDL 문을 사용하여 MySQL 데이터베이스의 테이블 정의를 변경하는 경우 커넥터는 이러한 DDL 문을 읽고 Kafka Connect 스키마를 업데이트합니다. 이는 각 이벤트가 이벤트가 발생한 시점에서 시작된 테이블과 정확히 동일하게 구조화되는 유일한 방법입니다. 그러나 단일 테이블에 대한 모든 이벤트를 포함하는 Kafka 항목에 테이블 정의의 각 상태에 해당하는 이벤트가 있을 수 있습니다.
JSON 컨버터에는 모든 메시지에 키 및 값 스키마가 포함되어 있으므로 매우 자세한 이벤트를 생성합니다.
이벤트의 키 와 값 스키마를
inventory데이터베이스의 상태와 비교합니다. MySQL 명령줄 클라이언트를 실행하는 터미널에서 다음 문을 실행합니다.Copy to Clipboard Copied! Toggle word wrap Toggle overflow 이는 검토한 이벤트 레코드가 데이터베이스의 레코드와 일치함을 보여줍니다.
4.2. 데이터베이스 업데이트 및 업데이트 이벤트 보기 링크 복사링크가 클립보드에 복사되었습니다!
이제 Debezium MySQL 커넥터가 인벤토리 데이터베이스에서 create 이벤트를 캡처한 방법을 보냈으므로 이제 레코드 중 하나를 변경하고 커넥터가 이를 캡처하는 방법을 확인할 수 있습니다.
이 절차를 완료하면 데이터베이스 커밋에서 변경된 사항에 대한 세부 정보와 변경 이벤트를 비교하여 다른 변경 사항과 관련하여 변경이 발생한 시기를 결정하는 방법을 알아봅니다.
절차
MySQL 명령줄 클라이언트를 실행하는 터미널에서 다음 문을 실행합니다.
mysql> UPDATE customers SET first_name='Anne Marie' WHERE id=1004; Query OK, 1 row affected (0.05 sec) Rows matched: 1 Changed: 1 Warnings: 0
mysql> UPDATE customers SET first_name='Anne Marie' WHERE id=1004; Query OK, 1 row affected (0.05 sec) Rows matched: 1 Changed: 1 Warnings: 0Copy to Clipboard Copied! Toggle word wrap Toggle overflow 업데이트된
고객테이블을 확인합니다.Copy to Clipboard Copied! Toggle word wrap Toggle overflow kafka-console-consumer를 실행하는 터미널로 전환하여 5번째 새 이벤트를 확인합니다.고객테이블의 레코드를 변경하면 Debezium MySQL 커넥터가 새 이벤트를 생성했습니다. 두 개의 새로운 JSON 문서, 즉 하나는 이벤트의 키 용이고 다른 하나는 새 이벤트의 값 이어야 합니다.다음은 업데이트 이벤트의 키 세부 정보(읽성을 위해 포맷됨)입니다.
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 이 키는 이전 이벤트의 키 와 동일합니다.
다음은 이 새로운 이벤트의 가치입니다.
스키마섹션에는 변경 사항이 없으므로페이로드섹션만 표시됩니다( 읽기 쉽도록 포맷됨).Copy to Clipboard Copied! Toggle word wrap Toggle overflow - 1 1 1
- 이제
before필드에는 데이터베이스가 커밋되기 전에 값이 포함된 행 상태가 됩니다. - 2 2 2
after필드에는 이제 행의 업데이트된 상태가 되고first_name값은 이제Anne Marie입니다.- 3 3 3
소스필드 구조에는ts_sec및pos필드가 변경되었음을 제외하고 이전과 동일한 값이 많이 있습니다(다른 상황에서파일이변경될 수 있음).- 4 4 4
- 이제
op필드 값이u이므로 이 행이 업데이트로 인해 변경되었음을 나타냅니다. - 5 5 5
ts_ms필드는 Debezium이 이벤트를 처리할 때의 타임 스탬프를 보여줍니다.
페이로드섹션을 보면 업데이트 이벤트에 대한 몇 가지 중요한 사항을 확인할 수 있습니다.-
이전구조와이후구조를 비교하면 커밋으로 인해 영향을 받는 행에서 실제로 변경된 내용을 확인할 수 있습니다. -
소스구조를 검토하여 변경 사항의 MySQL 레코드에 대한 정보를 찾을 수 있습니다(추적 추적 기능 제공). -
이벤트의
페이로드섹션을 동일한 주제(또는 다른 주제)의 다른 이벤트와 비교하면 이벤트가 다른 이벤트와 동일한 MySQL 커밋 전, 이후 또는 일부인지 확인할 수 있습니다.
4.3. 데이터베이스에서 레코드를 삭제하고 삭제 이벤트 보기 링크 복사링크가 클립보드에 복사되었습니다!
이제 Debezium MySQL 커넥터가 인벤토리 데이터베이스에서 생성 및 업데이트 이벤트를 캡처하는 방법을 보냈으므로 이제 레코드 중 하나를 삭제하고 커넥터가 이를 캡처하는 방법을 확인할 수 있습니다.
이 절차를 완료하면 삭제 이벤트 관련 세부 정보를 찾는 방법과 Kafka가 로그 압축 을 사용하여 사용자가 모든 이벤트를 가져올 수 있도록 삭제 이벤트 수를 줄이는 방법을 알아봅니다.
절차
MySQL 명령줄 클라이언트를 실행하는 터미널에서 다음 문을 실행합니다.
mysql> DELETE FROM customers WHERE id=1004; Query OK, 1 row affected (0.00 sec)
mysql> DELETE FROM customers WHERE id=1004; Query OK, 1 row affected (0.00 sec)Copy to Clipboard Copied! Toggle word wrap Toggle overflow 참고외부 키 제약 조건 위반으로 위의 명령이 실패하는 경우 다음 문을 사용하여 고객 주소 참조를 주소 표에서 제거해야 합니다.
mysql> DELETE FROM addresses WHERE customer_id=1004;
mysql> DELETE FROM addresses WHERE customer_id=1004;Copy to Clipboard Copied! Toggle word wrap Toggle overflow kafka-console-consumer를 실행하는 터미널로 전환하여 새 이벤트 두 개를 확인합니다.고객테이블의 행을 삭제하여 Debezium MySQL 커넥터는 두 개의 새로운 이벤트를 생성했습니다.첫 번째 새 이벤트의 키 와 값을 검토합니다.
다음은 첫 번째 새 이벤트의 키 세부 정보(읽성을 위해 포맷됨)입니다.
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 이 키는 이전 두 이벤트의 키 와 동일합니다.
다음은 첫 번째 새 이벤트의 값 (읽성을 위해 포맷됨)입니다.
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 따라서 이 이벤트는 소비자에게 행 제거를 처리하는 데 필요한 정보를 제공합니다. 일부 소비자는 제거를 적절히 처리해야 할 수 있으므로 이전 값도 제공됩니다.
두 번째 새 이벤트의 키 와 값을 검토합니다.
다음은 두 번째 새 이벤트의 키입니다 (읽성을 위해 포맷됨).
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 다시 한번, 이 키는 이전의 세 이벤트에서와 정확히 동일한 키입니다.
동일한 이벤트의 값 (읽성을 위해 포맷됨)은 다음과 같습니다.
{ "schema": null, "payload": null }{ "schema": null, "payload": null }Copy to Clipboard Copied! Toggle word wrap Toggle overflow Kafka가 컴팩트한 로그 로 설정된 경우 동일한 키가 있는 항목의 뒷부분에 메시지가 한 개 이상 있는 경우 주제에서 이전 메시지가 제거됩니다. 이 마지막 이벤트는 키와 빈 값이 있기 때문에 tombstone 이벤트라고 합니다. 즉 Kafka는 동일한 키를 사용하여 이전 메시지를 모두 제거합니다. 이전 메시지가 제거되지만 tombstone 이벤트는 소비자가 처음부터 주제를 계속 읽고 이벤트를 놓치지 않을 수 있음을 의미합니다.
4.4. Kafka Connect 서비스 다시 시작 링크 복사링크가 클립보드에 복사되었습니다!
이제 Debezium MySQL 커넥터가 생성, 업데이트 및 삭제 이벤트를 캡처하는 방법을 확인했으므로 실행 중이 아닌 경우에도 변경 이벤트를 캡처하는 방법을 확인할 수 있습니다.
Kafka Connect 서비스는 등록된 커넥터에 대한 작업을 자동으로 관리합니다. 따라서 오프라인으로 전환되면 다시 시작하면 실행 중이 아닌 작업이 시작됩니다. 즉, Debezium이 실행되고 있지 않더라도 데이터베이스의 변경 사항을 보고할 수 있습니다.
이 절차에서는 Kafka Connect를 중지하고 데이터베이스의 일부 데이터를 변경한 다음 Kafka Connect를 다시 시작하여 변경 이벤트를 확인합니다.
절차
Kafka Connect 서비스를 중지합니다.
Kafka Connect 배포에 대한 구성을 엽니다.
oc edit deployment/my-connect-cluster-connect
$ oc edit deployment/my-connect-cluster-connectCopy to Clipboard Copied! Toggle word wrap Toggle overflow 배포 구성이 열립니다.
Copy to Clipboard Copied! Toggle word wrap Toggle overflow -
spec.replicas값을0으로 변경합니다. - 설정을 저장합니다.
Kafka Connect 서비스가 중지되었는지 확인합니다.
이 명령은 Kafka Connect 서비스가 완료되고 실행 중인 Pod가 없음을 보여줍니다.
oc get pods -l strimzi.io/name=my-connect-cluster-connect NAME READY STATUS RESTARTS AGE my-connect-cluster-connect-1-dxcs9 0/1 Completed 0 7h
$ oc get pods -l strimzi.io/name=my-connect-cluster-connect NAME READY STATUS RESTARTS AGE my-connect-cluster-connect-1-dxcs9 0/1 Completed 0 7hCopy to Clipboard Copied! Toggle word wrap Toggle overflow
Kafka Connect 서비스가 다운되는 동안 MySQL 클라이언트를 실행하는 터미널로 전환하고 데이터베이스에 새 레코드를 추가합니다.
mysql> INSERT INTO customers VALUES (default, "Sarah", "Thompson", "kitt@acme.com");
mysql> INSERT INTO customers VALUES (default, "Sarah", "Thompson", "kitt@acme.com");Copy to Clipboard Copied! Toggle word wrap Toggle overflow Kafka Connect 서비스를 다시 시작합니다.
Kafka Connect 서비스에 대한 배포 구성을 엽니다.
oc edit deployment/my-connect-cluster-connect
$ oc edit deployment/my-connect-cluster-connectCopy to Clipboard Copied! Toggle word wrap Toggle overflow 배포 구성이 열립니다.
Copy to Clipboard Copied! Toggle word wrap Toggle overflow -
spec.replicas값을1로 변경합니다. - 배포 구성을 저장합니다.
Kafka Connect 서비스가 다시 시작되었는지 확인합니다.
이 명령은 Kafka Connect 서비스가 실행 중이며 Pod가 준비되었음을 보여줍니다.
oc get pods -l strimzi.io/name=my-connect-cluster-connect NAME READY STATUS RESTARTS AGE my-connect-cluster-connect-2-q9kkl 1/1 Running 0 74s
$ oc get pods -l strimzi.io/name=my-connect-cluster-connect NAME READY STATUS RESTARTS AGE my-connect-cluster-connect-2-q9kkl 1/1 Running 0 74sCopy to Clipboard Copied! Toggle word wrap Toggle overflow
-
kafka-console-consumer.sh를 실행하는 터미널로 전환합니다. 도착하면 새로운 이벤트가 나타납니다. Kafka Connect가 오프라인 상태일 때 생성한 레코드를 검사합니다(방문성을 위해 포맷됨).
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
5장. 다음 단계 링크 복사링크가 클립보드에 복사되었습니다!
자습서를 완료한 후 다음 단계를 고려하십시오.
튜토리얼을 더 자세히 살펴보십시오.
MySQL 명령줄 클라이언트를 사용하여 데이터베이스 테이블의 행을 추가, 수정, 제거하고 해당 항목에 미치는 영향을 확인합니다. 외부 키로 참조되는 행은 제거할 수 없습니다.
Debezium 배포 계획.
OpenShift 또는 Red Hat Enterprise Linux에 Debezium을 설치할 수 있습니다. 자세한 내용은 다음을 참조하십시오.
2023-09-19에 최종 업데이트된 문서