4장. 변경 이벤트 보기
Debezium MySQL 커넥터를 배포한 후 인벤토리
데이터베이스에 대한 변경 사항을 캡처하기 시작합니다.
커넥터가 시작되면 각 커넥터가 MySQL 데이터베이스의 테이블 중 하나를 나타내는 Apache Kafka 주제 세트에 이벤트를 씁니다. 각 주제의 이름은 데이터베이스 서버 이름, dbserver1
로 시작됩니다.
커넥터는 다음 Kafka 항목에 씁니다.
dbserver1
- 변경 사항을 캡처하는 테이블에 적용되는 DDL 문을 사용하는 스키마 변경 주제입니다.
dbserver1.inventory.products
-
inventory
데이터베이스의products
테이블에 대한 변경 이벤트 레코드를 수신합니다. dbserver1.inventory.products_on_hand
-
inventory
데이터베이스의products_on_hand
테이블에 대한 변경 이벤트 레코드를 수신합니다. dbserver1.inventory.customers
-
inventory
데이터베이스의customers
테이블에 대한 변경 이벤트 레코드를 수신합니다. dbserver1.inventory.orders
-
inventory
데이터베이스의orders
테이블에 대한 변경 이벤트 레코드를 수신합니다.
이 튜토리얼의 나머지 부분에서는 dbserver1.inventory.customers
Kafka 주제를 검사합니다. 주제를 자세히 살펴보면 다양한 유형의 변경 이벤트를 나타내는 방법을 확인하고 각 이벤트를 캡처한 커넥터에 대한 정보를 찾을 수 있습니다.
튜토리얼에는 다음 섹션이 포함되어 있습니다.
4.1. 생성 이벤트 보기
dbserver1.inventory.customers
주제를 보면 MySQL 커넥터가 inventory
데이터베이스에서 이벤트를 캡처 한 방법을 확인할 수 있습니다. 이 경우 생성 이벤트는 새 고객이 데이터베이스에 추가되는 것을 캡처합니다.
프로세스
새 터미널을 열고
kafka-console-consumer
를 사용하여 주제 시작부터dbserver1.inventory.customers
주제를 사용합니다.이 명령은 Kafka(
my-cluster-kafka-0
)를 실행 중인 Pod에서 간단한 소비자(kafka-console-consumer.sh
)를 실행합니다.$ oc exec -it my-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --from-beginning \ --property print.key=true \ --topic dbserver1.inventory.customers
소비자는
customers
테이블의 각 행에 대해 4개의 메시지(JSON 형식)를 반환합니다. 각 메시지에는 해당 테이블 행의 이벤트 레코드가 포함되어 있습니다.각 이벤트에는 두 개의 JSON 문서가 있습니다. 키와 값. 키는 행의 기본 키에 해당하며 값에는 행의 세부 정보(해당 행에 포함된 필드, 각 필드의 값, 행에서 수행된 작업 유형)가 표시됩니다.
마지막 이벤트의 경우 키 세부 정보를 검토합니다.
다음은 마지막 이벤트의 키 세부 정보입니다(읽기 쉽도록 형식 지정).
{ "schema":{ "type":"struct", "fields":[ { "type":"int32", "optional":false, "field":"id" } ], "optional":false, "name":"dbserver1.inventory.customers.Key" }, "payload":{ "id":1004 } }
이벤트에는
스키마
와페이로드
의 두 부분이 있습니다.스키마
에는 페이로드에 무엇이 있는지 설명하는 Kafka Connect 스키마가 포함되어 있습니다. 이 경우 페이로드는 선택 사항이 아니며 하나의 필수 필드(예:int32
)가 있는dbserver1.inventory.customers.Key
라는 구조입니다.페이로드
에는 값이1004
인 단일id
필드가 있습니다.이벤트의 키를 검토하면 이 이벤트가
inventory.customers
테이블의 값에1004
값이 있는 ID 기본 키 열에 적용되는 것을 확인할 수 있습니다.동일한 이벤트의 값 세부 정보를 검토합니다.
이벤트의 값은 행이 생성되었음을 표시하고 포함된 항목(이 경우 ID ,
first_name
,last_name
,email
)을 설명합니다.다음은 마지막 이벤트 값 의 세부 정보입니다(읽기 쉽도록 형식 지정).
{ "schema": { "type": "struct", "fields": [ { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" }, { "type": "string", "optional": false, "field": "last_name" }, { "type": "string", "optional": false, "field": "email" } ], "optional": true, "name": "dbserver1.inventory.customers.Value", "field": "before" }, { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" }, { "type": "string", "optional": false, "field": "last_name" }, { "type": "string", "optional": false, "field": "email" } ], "optional": true, "name": "dbserver1.inventory.customers.Value", "field": "after" }, { "type": "struct", "fields": [ { "type": "string", "optional": true, "field": "version" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "server_id" }, { "type": "int64", "optional": false, "field": "ts_sec" }, { "type": "string", "optional": true, "field": "gtid" }, { "type": "string", "optional": false, "field": "file" }, { "type": "int64", "optional": false, "field": "pos" }, { "type": "int32", "optional": false, "field": "row" }, { "type": "boolean", "optional": true, "field": "snapshot" }, { "type": "int64", "optional": true, "field": "thread" }, { "type": "string", "optional": true, "field": "db" }, { "type": "string", "optional": true, "field": "table" } ], "optional": false, "name": "io.debezium.connector.mysql.Source", "field": "source" }, { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" } ], "optional": false, "name": "dbserver1.inventory.customers.Envelope", "version": 1 }, "payload": { "before": null, "after": { "id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "source": { "version": "2.3.4.Final", "name": "dbserver1", "server_id": 0, "ts_sec": 0, "gtid": null, "file": "mysql-bin.000003", "pos": 154, "row": 0, "snapshot": true, "thread": null, "db": "inventory", "table": "customers" }, "op": "r", "ts_ms": 1486500577691 } }
이벤트의 이 부분은 훨씬 길지만 이벤트의 키와 마찬가지로
스키마
와페이로드
도 있습니다.스키마
에는 5개의 필드를 포함할 수 있는dbserver1.inventory.customers.Envelope
(버전 1)라는 Kafka Connect 스키마가 포함되어 있습니다.op
-
작업 유형을 설명하는 문자열 값이 포함된 필수 필드입니다. MySQL 커넥터의 값은 생성(또는 삽입)용
c
이며 업데이트의 경우u
, delete의 경우d
, 읽기(스냅샷의 경우)r
입니다. 이전
-
존재하는 경우 이벤트 발생 전 행의 상태를 포함하는 선택적 필드입니다. 구조는
dbserver1.inventory.customers.Value
Kafka Connect 스키마로 설명되며,dbserver1
커넥터는inventory.customers
테이블의 모든 행에 사용합니다. 이후
-
존재하는 경우 이벤트가 발생한 후 행의 상태를 포함하는 선택적 필드입니다. 구조는
이전에
사용된 동일한dbserver1.inventory.customers.Value
Kafka Connect 스키마로 설명되어 있습니다. 소스
-
MySQL의 경우 커넥터 이름, 이벤트가 기록된 binlog 파일의 이름, 이벤트가 표시되는 해당
binlog
ts_ms
- 존재하는 경우 커넥터가 이벤트를 처리하는 JVM에서 Kafka Connect 작업을 실행하는 JVM에서 시스템 시계를 사용하는 시간이 포함된 선택적 필드입니다.
참고이벤트의 JSON 표현은 설명하는 행보다 훨씬 길 수 있습니다. 이는 모든 이벤트 키와 값이 있는 Kafka Connect가 페이로드 를 설명하는 스키마 를 제공하기 때문입니다. 시간이 지남에 따라 이러한 구조는 변경될 수 있습니다. 그러나 이벤트 자체에서 키와 값에 대한 스키마를 사용하면 특히 시간이 지남에 따라 진화할 때 애플리케이션이 메시지를 이해하기가 훨씬 쉬워집니다.
Debezium MySQL 커넥터는 데이터베이스 테이블의 구조를 기반으로 이러한 스키마를 구성합니다. DDL 문을 사용하여 MySQL 데이터베이스의 테이블 정의를 변경하는 경우 커넥터는 이러한 DDL 문을 읽고 Kafka Connect 스키마를 업데이트합니다. 이는 각 이벤트가 이벤트가 발생한 시점의 테이블과 정확히 동일하게 구조화되는 유일한 방법입니다. 그러나 단일 테이블에 대한 모든 이벤트를 포함하는 Kafka 항목에는 테이블 정의의 각 상태에 해당하는 이벤트가 있을 수 있습니다.
JSON 변환기는 모든 메시지의 키 및 값 스키마를 포함하므로 매우 자세한 이벤트를 생성합니다.
이벤트의 키 및 값 스키마를
인벤토리
데이터베이스 상태와 비교합니다. MySQL 명령줄 클라이언트를 실행 중인 터미널에서 다음 문을 실행합니다.mysql> SELECT * FROM customers; +------+------------+-----------+-----------------------+ | id | first_name | last_name | email | +------+------------+-----------+-----------------------+ | 1001 | Sally | Thomas | sally.thomas@acme.com | | 1002 | George | Bailey | gbailey@foobar.com | | 1003 | Edward | Walker | ed@walker.com | | 1004 | Anne | Kretchmar | annek@noanswer.org | +------+------------+-----------+-----------------------+ 4 rows in set (0.00 sec)
검토한 이벤트 레코드가 데이터베이스의 레코드와 일치함을 나타냅니다.