3.2. Kafka Connect 배포
MySQL 데이터베이스를 배포한 후 AMQ Streams를 사용하여 Debezium MySQL 커넥터 플러그인이 포함된 Kafka Connect 컨테이너 이미지를 빌드합니다. 배포 프로세스 중에 다음 사용자 정의 리소스(CR)를 생성하고 사용합니다.
-
Kafka Connect
인스턴스를 정의하고 이미지에 포함할 MySQL 커넥터 아티팩트에 대한 정보를 포함하는 KafkaConnect CR입니다. -
MySQL 커넥터가 소스 데이터베이스에 액세스하는 데 사용하는 정보를 포함하는 세부 정보를 제공하는
KafkaConnector
CR입니다. AMQ Streams가 Kafka Connect Pod를 시작한 후KafkaConnector
CR을 적용하여 커넥터를 시작합니다.
빌드 프로세스 중에 AMQ Streams Operator는 Debezium 커넥터 정의를 비롯한 KafkaConnect
사용자 정의 리소스의 입력 매개변수를 Kafka Connect 컨테이너 이미지로 변환합니다. 이 빌드에서는 Red Hat Maven 리포지토리에서 필요한 아티팩트를 다운로드하여 이미지에 통합합니다. 새로 생성된 컨테이너는 .spec.build.output
에 지정된 컨테이너 레지스트리로 푸시되며 Kafka Connect Pod를 배포하는 데 사용됩니다. AMQ Streams가 Kafka Connect 이미지를 빌드한 후 KafkaConnector
사용자 정의 리소스를 사용하여 커넥터를 시작합니다.
사전 요구 사항
- AMQ Streams는 OpenShift 클러스터에서 실행되고 있습니다.
- AMQ Streams Cluster Operator 가 OpenShift 클러스터에 설치되어 있습니다.
- Apache Kafka 및 Kafka Connect 는 AMQ Streams에서 실행되고 있습니다.
프로세스
-
OpenShift 클러스터에 로그인하여 프로젝트를 생성하거나 엽니다(예:
debezium
). 커넥터에 대한 Debezium
KafkaConnect
사용자 정의 리소스(CR)를 생성하거나 기존 리소스를 수정합니다.
다음 예제에서는KafkaConnect
사용자 정의 리소스를 설명하는dbz-connect.yaml
파일에서 발췌한 내용을 보여줍니다.
metadata.annotations
및spec.build
속성이 필요합니다.예 3.1. Debezium 커넥터를 포함하는
KafkaConnect
사용자 정의 리소스를 정의하는dbz-connect.yaml
파일apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster annotations: strimzi.io/use-connector-resources: "true" 1 spec: replicas: 1 version: 3.5.0 build: 2 output: 3 type: imagestream 4 image: debezium-streams-connect:latest plugins: 5 - name: debezium-connector-mysql artifacts: - type: zip 6 url: https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-mysql/2.3.4.Final-redhat-00001/debezium-connector-mysql-2.3.4.Final-redhat-00001-plugin.zip 7 bootstrapServers: my-cluster-kafka-bootstrap:9093 ...
표 3.1. Kafka Connect 구성 설정에 대한 설명 항목 설명 1
strimzi.io/use-connector-resources
주석을"true"
로 설정하여 Cluster Operator에서KafkaConnector
리소스를 사용하여 이 Kafka Connect 클러스터에서 커넥터를 구성할 수 있습니다.2
spec.build
구성은 빌드 이미지를 저장할 위치를 지정하고 플러그인 아티팩트 위치와 함께 이미지에 포함할 플러그인을 나열합니다.3
build.output
은 새로 빌드된 이미지가 저장되는 레지스트리를 지정합니다.4
이미지 출력의 이름과 이미지 이름을 지정합니다.
output.type
에 유효한 값은 Docker Hub 또는 Quay와 같은 컨테이너 레지스트리로 푸시하는docker
또는 이미지를 내부 OpenShift ImageStream으로 내보내는 이미지스트림
입니다. ImageStream을 사용하려면 ImageStream 리소스를 클러스터에 배포해야 합니다. KafkaConnect 구성에서build.output
을 지정하는 방법에 대한 자세한 내용은 AMQ Streams Build 스키마 참조 설명서를 참조하십시오.5
플러그인
구성에는 Kafka Connect 이미지에 포함할 모든 커넥터가 나열됩니다. 목록의 각 항목에 대해 플러그인이름을
지정하고 커넥터를 빌드하는 데 필요한 아티팩트에 대한 정보를 지정합니다. 선택적으로 각 커넥터 플러그인에 대해 커넥터와 함께 사용할 수 있는 다른 구성 요소를 포함할 수 있습니다. 예를 들어 서비스 레지스트리 아티팩트 또는 Debezium 스크립팅 구성 요소를 추가할 수 있습니다.6
artifacts.type
값은artifacts.url
에 지정된 아티팩트의 파일 유형을 지정합니다. 유효한 유형은zip
,tgz
또는 Cryostat입니다
. Debezium 커넥터 아카이브는.zip
파일 형식으로 제공됩니다. JDBC 드라이버 파일은.jar
형식입니다.type
값은url
필드에서 참조되는 파일의 유형과 일치해야 합니다.7
artifacts.url
값은 커넥터 아티팩트에 대한 파일을 저장하는 Maven 리포지토리와 같은 HTTP 서버의 주소를 지정합니다. OpenShift 클러스터는 지정된 서버에 액세스할 수 있어야 합니다.다음 명령을 입력하여
KafkaConnect
빌드 사양을 OpenShift 클러스터에 적용합니다.oc create -f dbz-connect.yaml
사용자 정의 리소스에 지정된 구성에 따라 AMQ Streams Operator는 Kafka Connect 이미지를 배포하여 준비합니다.
빌드가 완료되면 Operator에서 이미지를 지정된 레지스트리 또는 ImageStream으로 푸시하고 Kafka Connect 클러스터를 시작합니다. 구성에 나열된 커넥터 아티팩트는 클러스터에서 사용할 수 있습니다.KafkaConnector
리소스를 생성하여 MySQL 커넥터의 인스턴스를 정의합니다.
예를 들어 다음KafkaConnector
CR을 생성하고debezium-inventory-connector.yaml
로 저장합니다.예 3.2. Debezium 커넥터에 대한
KafkaConnector
사용자 정의 리소스를 정의하는mysql-inventory-connector.yaml
파일apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: labels: strimzi.io/cluster: my-connect-cluster name: inventory-connector 1 spec: class: io.debezium.connector.mysql.MySqlConnector 2 tasksMax: 1 3 config: 4 database.hostname: mysql 5 database.port: 3306 6 database.user: debezium 7 database.password: dbz 8 database.server.id: 184054 topic.prefix: dbserver1 9 table.include.list: inventory.* 10 schema.history.internal.kafka.bootstrap.servers: 'my-cluster-kafka-bootstrap:9092' 11 schema.history.internal.kafka.topic: schema-changes.inventory 12
표 3.2. 커넥터 구성 설정에 대한 설명 항목 설명 1
Kafka Connect 클러스터에 등록할 커넥터의 이름입니다.
2
커넥터 클래스의 이름입니다.
3
한 번에 하나의 작업만 작동해야 합니다. MySQL 커넥터가 MySQL 서버의
binlog
를 읽을 때 적절한 순서 및 이벤트 처리를 보장하기 위해 단일 커넥터 작업을 사용합니다. Kafka Connect 서비스는 커넥터를 사용하여 작업을 완료하기 위해 하나 이상의 작업을 시작합니다. Kafka Connect 서비스의 클러스터에서 실행 중인 작업을 자동으로 배포합니다. 서비스가 중지되거나 충돌하면 작업이 실행 중인 서비스에 재배포됩니다.4
커넥터의 구성입니다.
5
MySQL 데이터베이스 인스턴스의 호스트 이름 또는 주소입니다.
6
데이터베이스 인스턴스의 포트 번호입니다.
7
Debezium이 데이터베이스에 연결되는 사용자 계정의 이름입니다.
8
Debezium이 데이터베이스 사용자 계정에 연결하는 데 사용하는 암호입니다.
9
MySQL 서버 또는 클러스터의 주제 접두사입니다. 이 문자열은 커넥터가 이벤트 레코드를 보내는 모든 Kafka 항목의 이름을 접두사로 지정합니다.
10
커넥터가 변경 이벤트를 캡처하는 테이블 목록입니다. 커넥터는
인벤토리
테이블에서 발생하는 경우에만 변경 사항을 탐지합니다.11
커넥터가 DDL 문을 작성하고 복구하는 데 사용하는 Kafka 브로커 목록입니다. 커넥터가 변경 이벤트 레코드를 보내는 브로커와 동일합니다. 다시 시작한 후 커넥터는 커넥터가 읽기를 다시 시작할 때 binlog의 시점에 존재하는 데이터베이스 스키마를 복구합니다.
12
데이터베이스 스키마 기록 주제의 이름입니다. 이 주제는 내부 용도로만 사용되며 소비자가 사용해서는 안 됩니다.
다음 명령을 실행하여 커넥터 리소스를 생성합니다.
oc create -n <namespace> -f <kafkaConnector>.yaml
예를 들면 다음과 같습니다.
oc create -n debezium -f mysql-inventory-connector.yaml
커넥터는 Kafka Connect 클러스터에 등록되어 있으며
KafkaConnector
CR의spec.config.database.dbname
에 지정된 데이터베이스에 대해 실행하기 시작합니다. 커넥터 Pod가 준비되면 Debezium이 실행됩니다.
이제 커넥터가 생성되었고 inventory
데이터베이스의 변경 사항을 캡처하기 시작했는지 확인할 수 있습니다.