2.2. AMQ Streams에 Debezium 배포
Red Hat OpenShift Container Platform에서 Debezium 커넥터를 설정하려면 AMQ Streams를 사용하여 사용하려는 각 커넥터 플러그인에 대한 커넥터 플러그인이 포함된 Kafka Connect 컨테이너 이미지를 빌드합니다. 커넥터가 시작되면 구성된 데이터베이스에 연결하고 삽입, 업데이트 및 삭제된 행 또는 문서에 대해 변경 이벤트 레코드를 생성합니다.
Debezium 1.7부터 Debezium 커넥터를 배포하는 데 권장되는 방법은 AMQ Streams를 사용하여 커넥터 플러그인이 포함된 Kafka Connect 컨테이너 이미지를 빌드하는 것입니다.
배포 프로세스 중에 다음 사용자 정의 리소스(CR)를 생성하고 사용합니다.
-
Kafka Connect
인스턴스를 정의하는 KafkaConnect CR과 커넥터 아티팩트에 대한 정보를 이미지에 포함해야 합니다. -
커넥터가 소스 데이터베이스에 액세스하는 데 사용하는 정보를 포함하는 세부 정보를 제공하는
KafkaConnector
CR입니다. AMQ Streams가 Kafka Connect Pod를 시작한 후KafkaConnector
CR을 적용하여 커넥터를 시작합니다.
Kafka Connect 이미지의 빌드 사양에서는 배포할 수 있는 커넥터를 지정할 수 있습니다. 각 커넥터 플러그인에 대해 배포에 사용할 다른 구성 요소를 지정할 수도 있습니다. 예를 들어 서비스 레지스트리 아티팩트 또는 Debezium 스크립팅 구성 요소를 추가할 수 있습니다. AMQ Streams는 Kafka Connect 이미지를 빌드할 때 지정된 아티팩트를 다운로드하여 이미지에 통합합니다.
KafkaConnect
CR의 spec.build.output
매개변수는 생성된 Kafka Connect 컨테이너 이미지를 저장할 위치를 지정합니다. 컨테이너 이미지는 Docker 레지스트리 또는 OpenShift ImageStream에 저장할 수 있습니다. 이미지를 ImageStream에 저장하려면 Kafka Connect를 배포하기 전에 ImageStream을 생성해야 합니다. 이미지 스트림은 자동으로 생성되지 않습니다.
KafkaConnect
리소스를 사용하여 클러스터를 생성하는 경우 나중에 Kafka Connect REST API를 사용하여 커넥터를 생성하거나 업데이트할 수 없습니다. REST API를 사용하여 정보를 검색할 수 있습니다.
추가 리소스
- OpenShift에서 AMQ Streams를 사용하여 Kafka 연결 구성.
- OpenShift의 AMQ Streams 배포 및 관리에서 자동으로 새 컨테이너 이미지 빌드.
2.2.1. AMQ Streams를 사용하여 Debezium 배포
동일한 단계에 따라 각 유형의 Debezium 커넥터를 배포합니다. 다음 섹션에서는 Debezium MySQL 커넥터를 배포하는 방법을 설명합니다.
이전 버전의 AMQ Streams에서 OpenShift에 Debezium 커넥터를 배포하려면 먼저 커넥터의 Kafka Connect 이미지를 빌드해야 했습니다. 현재 OpenShift에 커넥터를 배포하는 방법은 AMQ Streams의 빌드 구성을 사용하여 사용하려는 Debezium 커넥터 플러그인이 포함된 Kafka Connect 컨테이너 이미지를 자동으로 빌드하는 것입니다.
빌드 프로세스 중에 AMQ Streams Operator는 Debezium 커넥터 정의를 비롯한 KafkaConnect
사용자 정의 리소스의 입력 매개변수를 Kafka Connect 컨테이너 이미지로 변환합니다. 빌드는 Red Hat Maven 리포지토리 또는 다른 구성된 HTTP 서버에서 필요한 아티팩트를 다운로드합니다.
새로 생성된 컨테이너는 .spec.build.output
에 지정된 컨테이너 레지스트리로 푸시되며 Kafka Connect 클러스터를 배포하는 데 사용됩니다. AMQ Streams는 Kafka Connect 이미지를 빌드한 후 KafkaConnector
사용자 정의 리소스를 생성하여 빌드에 포함된 커넥터를 시작합니다.
사전 요구 사항
- 클러스터 Operator가 설치된 OpenShift 클러스터에 액세스할 수 있습니다.
- AMQ Streams Operator가 실행 중입니다.
- Apache Kafka 클러스터는 OpenShift에서 AMQ Streams 배포 및 관리에 설명된 대로 배포됩니다.
- Kafka Connect는 AMQ Streams에 배포됨
- Red Hat Integration 라이센스가 있습니다.
-
OpenShift
oc
CLI 클라이언트가 설치되어 있거나 OpenShift Container Platform 웹 콘솔에 액세스할 수 있습니다. Kafka Connect 빌드 이미지를 저장하려는 방법에 따라 레지스트리 권한이 필요하거나 ImageStream 리소스를 생성해야 합니다.
- Red Hat Quay.io 또는 Docker Hub와 같은 이미지 레지스트리에 빌드 이미지를 저장하려면 다음을 수행합니다.
- 레지스트리에서 이미지를 생성하고 관리하는 계정 및 권한.
- 빌드 이미지를 기본 OpenShift ImageStream으로 저장하려면 다음을 수행합니다.
- ImageStream 리소스는 새 컨테이너 이미지를 저장하기 위해 클러스터에 배포됩니다. 클러스터에 대한 ImageStream을 명시적으로 생성해야 합니다. 이미지 스트림은 기본적으로 사용할 수 없습니다. ImageStreams에 대한 자세한 내용은 OpenShift Container Platform 설명서의 이미지 스트림 관리를 참조하십시오.
프로세스
- OpenShift 클러스터에 로그인합니다.
커넥터에 대한 Debezium
KafkaConnect
사용자 정의 리소스(CR)를 생성하거나 기존 리소스를 수정합니다. 예를 들어metadata.annotations
및spec.build
속성을 지정하는dbz-connect.yaml
이라는 이름으로KafkaConnect
CR을 생성합니다. 다음 예제에서는KafkaConnect
사용자 정의 리소스를 설명하는dbz-connect.yaml
파일에서 발췌한 내용을 보여줍니다.
예 2.1. Debezium 커넥터를 포함하는
KafkaConnect
사용자 정의 리소스를 정의하는dbz-connect.yaml
파일다음 예제에서는 사용자 정의 리소스가 다음 아티팩트를 다운로드하도록 구성되어 있습니다.
- Debezium 커넥터 아카이브
- 서비스 레지스트리 아카이브입니다. 서비스 레지스트리는 선택적 구성 요소입니다. 커넥터와 함께 Avro serialization을 사용하려는 경우에만 Service Registry 구성 요소를 추가합니다.
- Debezium 스크립팅 SMT 아카이브 및 Debezium 커넥터와 함께 사용하려는 관련 스크립팅 엔진입니다. SMT 아카이브 및 스크립팅 언어 종속 항목은 선택적 구성 요소입니다. Debezium 콘텐츠 기반 라우팅 SMT를 사용하거나 SMT 를 필터링 하려는 경우에만 이러한 구성 요소를 추가합니다.
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: debezium-kafka-connect-cluster annotations: strimzi.io/use-connector-resources: "true" 1 spec: version: 3.5.0 build: 2 output: 3 type: imagestream 4 image: debezium-streams-connect:latest plugins: 5 - name: debezium-connector-mysql artifacts: - type: zip 6 url: https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-mysql/2.3.4.Final-redhat-00001/debezium-connector-mysql-2.3.4.Final-redhat-00001-plugin.zip 7 - type: zip url: https://maven.repository.redhat.com/ga/io/apicurio/apicurio-registry-distro-connect-converter/2.4.4.Final-redhat-<build-number>/apicurio-registry-distro-connect-converter-2.4.4.Final-redhat-<build-number>.zip 8 - type: zip url: https://maven.repository.redhat.com/ga/io/debezium/debezium-scripting/2.3.4.Final-redhat-00001/debezium-scripting-2.3.4.Final-redhat-00001.zip 9 - type: jar url: https://repo1.maven.org/maven2/org/codehaus/groovy/groovy/3.0.11/groovy-3.0.11.jar 10 - type: jar url: https://repo1.maven.org/maven2/org/codehaus/groovy/groovy-jsr223/3.0.11/groovy-jsr223-3.0.11.jar - type: jar url: https://repo1.maven.org/maven2/org/codehaus/groovy/groovy-json3.0.11/groovy-json-3.0.11.jar bootstrapServers: debezium-kafka-cluster-kafka-bootstrap:9093 ...
표 2.1. Kafka Connect 구성 설정에 대한 설명 항목 설명 1
strimzi.io/use-connector-resources
주석을"true"
로 설정하여 Cluster Operator에서KafkaConnector
리소스를 사용하여 이 Kafka Connect 클러스터에서 커넥터를 구성할 수 있습니다.2
spec.build
구성은 빌드 이미지를 저장할 위치를 지정하고 플러그인 아티팩트 위치와 함께 이미지에 포함할 플러그인을 나열합니다.3
build.output
은 새로 빌드된 이미지가 저장되는 레지스트리를 지정합니다.4
이미지 출력의 이름과 이미지 이름을 지정합니다.
output.type
에 유효한 값은 Docker Hub 또는 Quay와 같은 컨테이너 레지스트리로 푸시하는docker
또는이미지
스트림으로 이미지를 내부 OpenShift ImageStream으로 내보내는 이미지 스트림입니다. ImageStream을 사용하려면 ImageStream 리소스를 클러스터에 배포해야 합니다. KafkaConnect 구성에서build.output
을 지정하는 방법에 대한 자세한 내용은 OpenShift에서 AMQ Streams 구성의 AMQ Streams 빌드 스키마 참조를 참조하십시오.5
플러그인
구성에는 Kafka Connect 이미지에 포함할 모든 커넥터가 나열됩니다. 목록의 각 항목에 대해 플러그인이름을
지정하고 커넥터를 빌드하는 데 필요한 아티팩트에 대한 정보를 지정합니다. 선택적으로 각 커넥터 플러그인에 대해 커넥터와 함께 사용할 수 있는 다른 구성 요소를 포함할 수 있습니다. 예를 들어 서비스 레지스트리 아티팩트 또는 Debezium 스크립팅 구성 요소를 추가할 수 있습니다.6
artifacts.type
값은artifacts.url
에 지정된 아티팩트의 파일 유형을 지정합니다. 유효한 유형은zip
,tgz
또는 Cryostat입니다
. Debezium 커넥터 아카이브는.zip
파일 형식으로 제공됩니다.type
값은url
필드에서 참조되는 파일의 유형과 일치해야 합니다.7
artifacts.url
값은 커넥터 아티팩트에 대한 파일을 저장하는 Maven 리포지토리와 같은 HTTP 서버의 주소를 지정합니다. Debezium 커넥터 아티팩트는 Red Hat Maven 리포지토리에서 사용할 수 있습니다. OpenShift 클러스터는 지정된 서버에 액세스할 수 있어야 합니다.8
(선택 사항) Service Registry 구성 요소를 다운로드하기
위한 아티팩트
을 지정합니다. 커넥터가 Apache Avro를 사용하여 기본 JSON 변환기를 사용하는 대신 서비스 레지스트리에서 이벤트 키와 값을 직렬화하려는 경우에만 서비스 레지스트리 아티팩트를 포함합니다.유형
및 URL9
(선택 사항) Debezium 스크립팅 SMT 아카이브와 함께 Debezium 커넥터와 함께 사용할 아티팩트
유형
및URL
을 지정합니다. Debezium 콘텐츠 기반 라우팅 SMT를 사용하거나 SMT 를 필터링하려는 경우에만 스크립팅 SMT 를 포함하려면 groovy와 같은 Cryostat 223 호환 스크립팅 구현도 배포해야 합니다.10
(선택 사항) Debezium 스크립팅 SMT에 필요한 223 호환 스크립팅 구현의 JAR 파일에 대한 아티팩트
유형
및URL
을 지정합니다.중요AMQ Streams를 사용하여 커넥터 플러그인을 Kafka Connect 이미지에 통합하는 경우 필요한 각 스크립팅 언어 구성 요소
artifacts.url
에서 JAR 파일의 위치를 지정해야 하며artifacts.type
값도 Cryostat로 설정되어야 합니다.잘못된 값으로 인해 런타임 시 커넥터가 실패합니다.
스크립팅과 함께 Apache Groovy 언어를 사용하려면 예제의 사용자 정의 리소스는 다음 라이브러리의 JAR 파일을 검색합니다.
-
Groovy
-
groovy-jsr223
(스크립트 에이전트) -
groovy-json
(JSON 문자열 구문 분석용 모듈)
또는 Debezium 스크립팅 SMT는 GraalVM JavaScript의 223 구현도 지원합니다.
다음 명령을 입력하여
KafkaConnect
빌드 사양을 OpenShift 클러스터에 적용합니다.oc create -f dbz-connect.yaml
사용자 정의 리소스에 지정된 구성에 따라 Streams Operator는 Kafka Connect 이미지를 배포하여 준비합니다.
빌드가 완료되면 Operator에서 이미지를 지정된 레지스트리 또는 ImageStream으로 푸시하고 Kafka Connect 클러스터를 시작합니다. 구성에 나열된 커넥터 아티팩트는 클러스터에서 사용할 수 있습니다.KafkaConnector
리소스를 생성하여 배포하려는 각 커넥터의 인스턴스를 정의합니다.
예를 들어 다음KafkaConnector
CR을 생성하고mysql-inventory-connector.yaml
로 저장합니다.예 2.2. Debezium
커넥터에 대한
파일KafkaConnector
사용자 정의 리소스를 정의하는 MySQL-inventory-connector.yamlapiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: labels: strimzi.io/cluster: debezium-kafka-connect-cluster name: inventory-connector-mysql 1 spec: class: io.debezium.connector.mysql.MySqlConnector 2 tasksMax: 1 3 config: 4 schema.history.internal.kafka.bootstrap.servers: debezium-kafka-cluster-kafka-bootstrap.debezium.svc.cluster.local:9092 schema.history.internal.kafka.topic: schema-changes.inventory database.hostname: mysql.debezium-mysql.svc.cluster.local 5 database.port: 3306 6 database.user: debezium 7 database.password: dbz 8 database.server.id: 184054 9 topic.prefix: inventory-connector-mysql 10 table.include.list: inventory.* 11 ...
표 2.2. 커넥터 구성 설정에 대한 설명 항목 설명 1
Kafka Connect 클러스터에 등록할 커넥터의 이름입니다.
2
커넥터 클래스의 이름입니다.
3
동시에 작동할 수 있는 작업 수입니다.
4
커넥터의 구성입니다.
5
호스트 데이터베이스 인스턴스의 주소입니다.
6
데이터베이스 인스턴스의 포트 번호입니다.
7
Debezium이 데이터베이스에 연결하는 데 사용하는 계정의 이름입니다.
8
Debezium이 데이터베이스 사용자 계정에 연결하는 데 사용하는 암호입니다.
9
커넥터의 고유 숫자 ID입니다.
10
데이터베이스 인스턴스 또는 클러스터의 주제 접두사입니다.
지정된 이름은 영숫자 또는 밑줄에서만 구성되어야 합니다.
주제 접두사는 이 커넥터에서 변경 이벤트를 수신하는 Kafka 주제의 접두사로 사용되므로 클러스터의 커넥터 간에 이름은 고유해야 합니다.
이 네임스페이스는 커넥터를 Avro 커넥터와 통합하는 경우 관련 Kafka Connect 스키마의 이름 및 해당 Avro 스키마의 네임스페이스에도 사용됩니다. https://access.redhat.com/documentation/en-us/red_hat_integration/2023.q4/html-single/debezium_user_guide/index#avro-serialization11
커넥터가 변경 이벤트를 캡처하는 테이블 목록입니다.
다음 명령을 실행하여 커넥터 리소스를 생성합니다.
oc create -n <namespace> -f <kafkaConnector>.yaml
예를 들면 다음과 같습니다.
oc create -n debezium -f mysql-inventory-connector.yaml
커넥터는 Kafka Connect 클러스터에 등록되어 있으며
KafkaConnector
CR의spec.config.database.dbname
에 지정된 데이터베이스에 대해 실행하기 시작합니다. 커넥터 Pod가 준비되면 Debezium이 실행됩니다.
이제 Debezium 배포를 확인할 준비가 되었습니다.
2.2.2. Debezium 커넥터가 실행 중인지 확인
커넥터가 오류 없이 올바르게 시작되면 커넥터가 캡처하도록 구성된 각 테이블에 대한 항목이 생성됩니다. 다운스트림 애플리케이션은 이러한 주제를 구독하여 소스 데이터베이스에서 발생하는 정보 이벤트를 검색할 수 있습니다.
커넥터가 실행 중인지 확인하려면 OpenShift Container Platform 웹 콘솔 또는 OpenShift CLI 툴(oc)에서 다음 작업을 수행합니다.
- 커넥터 상태를 확인합니다.
- 커넥터가 주제를 생성하는지 확인합니다.
- 각 테이블의 초기 스냅샷 중에 커넥터가 생성하는 읽기 작업("op":"r")에 대한 이벤트로 항목이 입력되었는지 확인합니다.
사전 요구 사항
- Debezium 커넥터는 OpenShift의 AMQ Streams에 배포됩니다.
-
OpenShift
oc
CLI 클라이언트가 설치되어 있어야 합니다. - OpenShift Container Platform 웹 콘솔에 액세스할 수 있습니다.
프로세스
다음 방법 중 하나를 사용하여
KafkaConnector
리소스의 상태를 확인합니다.OpenShift Container Platform 웹 콘솔에서 다음을 수행합니다.
-
홈
검색으로 이동합니다. -
검색 페이지에서 리소스를 클릭하여 리소스 선택 상자를 연 다음
KafkaConnector
를 입력합니다. - KafkaConnectors 목록에서 확인할 커넥터의 이름을 클릭합니다(예: inventory-connector-mysql ).
- Conditions 섹션의 Type 및 Status 열의 값이 Ready 및 True 로 설정되어 있는지 확인합니다.
-
홈
터미널 창에서 다음을 수행합니다.
다음 명령을 실행합니다.
oc describe KafkaConnector <connector-name> -n <project>
예를 들면 다음과 같습니다.
oc describe KafkaConnector inventory-connector-mysql -n debezium
이 명령은 다음 출력과 유사한 상태 정보를 반환합니다.
예 2.3.
KafkaConnector
리소스 상태Name: inventory-connector-mysql Namespace: debezium Labels: strimzi.io/cluster=debezium-kafka-connect-cluster Annotations: <none> API Version: kafka.strimzi.io/v1beta2 Kind: KafkaConnector ... Status: Conditions: Last Transition Time: 2021-12-08T17:41:34.897153Z Status: True Type: Ready Connector Status: Connector: State: RUNNING worker_id: 10.131.1.124:8083 Name: inventory-connector-mysql Tasks: Id: 0 State: RUNNING worker_id: 10.131.1.124:8083 Type: source Observed Generation: 1 Tasks Max: 1 Topics: inventory-connector-mysql.inventory inventory-connector-mysql.inventory.addresses inventory-connector-mysql.inventory.customers inventory-connector-mysql.inventory.geom inventory-connector-mysql.inventory.orders inventory-connector-mysql.inventory.products inventory-connector-mysql.inventory.products_on_hand Events: <none>
커넥터가 Kafka 주제를 생성했는지 확인합니다.
OpenShift Container Platform 웹 콘솔에서 다음을 수행합니다.
-
홈
검색으로 이동합니다. -
검색 페이지에서 리소스를 클릭하여 리소스 선택 상자를 연 다음
KafkaTopic
을 입력합니다. -
KafkaTopics 목록에서 확인할 주제의 이름을 클릭합니다(예: inventory
-connector-mysql.inventory.orders--ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d
). - Conditions 섹션의 Type 및 Status 열의 값이 Ready 및 True 로 설정되어 있는지 확인합니다.
-
홈
터미널 창에서 다음을 수행합니다.
다음 명령을 실행합니다.
oc get kafkatopics
이 명령은 다음 출력과 유사한 상태 정보를 반환합니다.
예 2.4.
KafkaTopic
리소스 상태NAME CLUSTER PARTITIONS REPLICATION FACTOR READY connect-cluster-configs debezium-kafka-cluster 1 1 True connect-cluster-offsets debezium-kafka-cluster 25 1 True connect-cluster-status debezium-kafka-cluster 5 1 True consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a debezium-kafka-cluster 50 1 True inventory-connector-mysql--a96f69b23d6118ff415f772679da623fbbb99421 debezium-kafka-cluster 1 1 True inventory-connector-mysql.inventory.addresses---1b6beaf7b2eb57d177d92be90ca2b210c9a56480 debezium-kafka-cluster 1 1 True inventory-connector-mysql.inventory.customers---9931e04ec92ecc0924f4406af3fdace7545c483b debezium-kafka-cluster 1 1 True inventory-connector-mysql.inventory.geom---9f7e136091f071bf49ca59bf99e86c713ee58dd5 debezium-kafka-cluster 1 1 True inventory-connector-mysql.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d debezium-kafka-cluster 1 1 True inventory-connector-mysql.inventory.products---df0746db116844cee2297fab611c21b56f82dcef debezium-kafka-cluster 1 1 True inventory-connector-mysql.inventory.products_on_hand---8649e0f17ffcc9212e266e31a7aeea4585e5c6b5 debezium-kafka-cluster 1 1 True schema-changes.inventory debezium-kafka-cluster 1 1 True strimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55 debezium-kafka-cluster 1 1 True strimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b debezium-kafka-cluster 1 1 True
주제 콘텐츠를 확인합니다.
- 터미널 창에서 다음 명령을 입력합니다.
oc exec -n <project> -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=<topic-name>
예를 들면 다음과 같습니다.
oc exec -n debezium -it debezium-kafka-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \ > --bootstrap-server localhost:9092 \ > --from-beginning \ > --property print.key=true \ > --topic=inventory-connector-mysql.inventory.products_on_hand
주제 이름을 지정하는 형식은
oc describe
명령과 동일합니다. 예를 들어inventory-connector-mysql.inventory.addresses
.주제의 각 이벤트에 대해 명령은 다음 출력과 유사한 정보를 반환합니다.
예 2.5. Debezium 변경 이벤트의 내용
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory-connector-mysql.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-mysql.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-mysql.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory-connector-mysql.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"2.3.4.Final-redhat-00001","connector":"mysql","name":"inventory-connector-mysql","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}
이전 예에서
페이로드
값은 테이블inventory.products_on_hand
에서 커넥터 스냅샷이 읽기("op" ="r
") 이벤트를 생성했음을 보여줍니다.product_id
레코드의"이전"
상태는null
이며 레코드에 대해 이전 값이 없음을 나타냅니다."after"
상태는product_id
101
가 있는 항목에 대해3
의수량을
보여줍니다.
여러 Kafka Connect 서비스 클러스터 및 여러 Kafka 클러스터를 사용하여 Debezium을 실행할 수 있습니다. Kafka Connect 클러스터에 배포할 수 있는 커넥터 수는 데이터베이스 이벤트의 볼륨 및 비율에 따라 달라집니다.
다음 단계
특정 커넥터 배포에 대한 자세한 내용은 Debezium 사용자 가이드의 다음 항목을 참조하십시오.