3.2. Kafka Connect 배포


MySQL 데이터베이스를 배포한 후 AMQ Streams를 사용하여 Debezium MySQL 커넥터 플러그인이 포함된 Kafka Connect 컨테이너 이미지를 빌드합니다. 배포 프로세스 중에 다음 사용자 정의 리소스(CR)를 생성하고 사용합니다.

  • Kafka Connect 인스턴스를 정의하고 이미지에 포함할 MySQL 커넥터 아티팩트에 대한 정보를 포함하는 KafkaConnect CR입니다.
  • MySQL 커넥터가 소스 데이터베이스에 액세스하는 데 사용하는 정보를 포함하는 세부 정보를 제공하는 KafkaConnector CR입니다. AMQ Streams가 Kafka Connect Pod를 시작한 후 KafkaConnector CR을 적용하여 커넥터를 시작합니다.

빌드 프로세스 중에 AMQ Streams Operator는 Debezium 커넥터 정의를 비롯한 KafkaConnect 사용자 정의 리소스의 입력 매개변수를 Kafka Connect 컨테이너 이미지로 변환합니다. 이 빌드에서는 Red Hat Maven 리포지토리에서 필요한 아티팩트를 다운로드하여 이미지에 통합합니다. 새로 생성된 컨테이너는 .spec.build.output 에 지정된 컨테이너 레지스트리로 푸시되며 Kafka Connect Pod를 배포하는 데 사용됩니다. AMQ Streams가 Kafka Connect 이미지를 빌드한 후 KafkaConnector 사용자 정의 리소스를 사용하여 커넥터를 시작합니다.

사전 요구 사항

프로세스

  1. OpenShift 클러스터에 로그인하여 프로젝트를 생성하거나 엽니다(예: debezium ).
  2. 커넥터에 대한 Debezium KafkaConnect 사용자 정의 리소스(CR)를 생성하거나 기존 리소스를 수정합니다.
    다음 예제에서는 KafkaConnect 사용자 정의 리소스를 설명하는 dbz-connect.yaml 파일에서 발췌한 내용을 보여줍니다.
    metadata.annotationsspec.build 속성이 필요합니다.

    예 3.1. Debezium 커넥터를 포함하는 KafkaConnect 사용자 정의 리소스를 정의하는 dbz-connect.yaml 파일

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: my-connect-cluster
      annotations:
        strimzi.io/use-connector-resources: "true" 1
    spec:
      replicas: 1
      version: 3.5.0
      build: 2
        output: 3
          type: imagestream  4
          image: debezium-streams-connect:latest
        plugins: 5
          - name: debezium-connector-mysql
            artifacts:
              - type: zip 6
                url: https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-mysql/2.3.4.Final-redhat-00001/debezium-connector-mysql-2.3.4.Final-redhat-00001-plugin.zip 7
      bootstrapServers: my-cluster-kafka-bootstrap:9093
    
    ...
    표 3.1. Kafka Connect 구성 설정에 대한 설명
    항목설명

    1

    strimzi.io/use-connector-resources 주석을 "true" 로 설정하여 Cluster Operator에서 KafkaConnector 리소스를 사용하여 이 Kafka Connect 클러스터에서 커넥터를 구성할 수 있습니다.

    2

    spec.build 구성은 빌드 이미지를 저장할 위치를 지정하고 플러그인 아티팩트 위치와 함께 이미지에 포함할 플러그인을 나열합니다.

    3

    build.output 은 새로 빌드된 이미지가 저장되는 레지스트리를 지정합니다.

    4

    이미지 출력의 이름과 이미지 이름을 지정합니다. output.type 에 유효한 값은 Docker Hub 또는 Quay와 같은 컨테이너 레지스트리로 푸시하는 docker 또는 이미지를 내부 OpenShift ImageStream으로 내보내는 이미지 스트림 입니다. ImageStream을 사용하려면 ImageStream 리소스를 클러스터에 배포해야 합니다. KafkaConnect 구성에서 build.output 을 지정하는 방법에 대한 자세한 내용은 AMQ Streams Build 스키마 참조 설명서를 참조하십시오.

    5

    플러그인 구성에는 Kafka Connect 이미지에 포함할 모든 커넥터가 나열됩니다. 목록의 각 항목에 대해 플러그인 이름을 지정하고 커넥터를 빌드하는 데 필요한 아티팩트에 대한 정보를 지정합니다. 선택적으로 각 커넥터 플러그인에 대해 커넥터와 함께 사용할 수 있는 다른 구성 요소를 포함할 수 있습니다. 예를 들어 서비스 레지스트리 아티팩트 또는 Debezium 스크립팅 구성 요소를 추가할 수 있습니다.

    6

    artifacts.type 값은 artifacts.url 에 지정된 아티팩트의 파일 유형을 지정합니다. 유효한 유형은 zip,tgz 또는 Cryostat 입니다. Debezium 커넥터 아카이브는 .zip 파일 형식으로 제공됩니다. JDBC 드라이버 파일은 .jar 형식입니다. type 값은 url 필드에서 참조되는 파일의 유형과 일치해야 합니다.

    7

    artifacts.url 값은 커넥터 아티팩트에 대한 파일을 저장하는 Maven 리포지토리와 같은 HTTP 서버의 주소를 지정합니다. OpenShift 클러스터는 지정된 서버에 액세스할 수 있어야 합니다.

  3. 다음 명령을 입력하여 KafkaConnect 빌드 사양을 OpenShift 클러스터에 적용합니다.

    oc create -f dbz-connect.yaml

    사용자 정의 리소스에 지정된 구성에 따라 AMQ Streams Operator는 Kafka Connect 이미지를 배포하여 준비합니다.
    빌드가 완료되면 Operator에서 이미지를 지정된 레지스트리 또는 ImageStream으로 푸시하고 Kafka Connect 클러스터를 시작합니다. 구성에 나열된 커넥터 아티팩트는 클러스터에서 사용할 수 있습니다.

  4. KafkaConnector 리소스를 생성하여 MySQL 커넥터의 인스턴스를 정의합니다.
    예를 들어 다음 KafkaConnector CR을 생성하고 debezium-inventory-connector.yaml로 저장합니다.

    예 3.2. Debezium 커넥터에 대한 KafkaConnector 사용자 정의 리소스를 정의하는 mysql-inventory-connector.yaml 파일

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      labels:
        strimzi.io/cluster: my-connect-cluster
      name: inventory-connector 1
    spec:
      class: io.debezium.connector.mysql.MySqlConnector 2
      tasksMax: 1  3
      config:  4
        database.hostname: mysql 5
        database.port: 3306   6
        database.user: debezium  7
        database.password: dbz  8
        database.server.id: 184054
        topic.prefix: dbserver1  9
        table.include.list: inventory.*  10
        schema.history.internal.kafka.bootstrap.servers: 'my-cluster-kafka-bootstrap:9092' 11
        schema.history.internal.kafka.topic: schema-changes.inventory 12
    표 3.2. 커넥터 구성 설정에 대한 설명
    항목설명

    1

    Kafka Connect 클러스터에 등록할 커넥터의 이름입니다.

    2

    커넥터 클래스의 이름입니다.

    3

    한 번에 하나의 작업만 작동해야 합니다. MySQL 커넥터가 MySQL 서버의 binlog 를 읽을 때 적절한 순서 및 이벤트 처리를 보장하기 위해 단일 커넥터 작업을 사용합니다. Kafka Connect 서비스는 커넥터를 사용하여 작업을 완료하기 위해 하나 이상의 작업을 시작합니다. Kafka Connect 서비스의 클러스터에서 실행 중인 작업을 자동으로 배포합니다. 서비스가 중지되거나 충돌하면 작업이 실행 중인 서비스에 재배포됩니다.

    4

    커넥터의 구성입니다.

    5

    MySQL 데이터베이스 인스턴스의 호스트 이름 또는 주소입니다.

    6

    데이터베이스 인스턴스의 포트 번호입니다.

    7

    Debezium이 데이터베이스에 연결되는 사용자 계정의 이름입니다.

    8

    Debezium이 데이터베이스 사용자 계정에 연결하는 데 사용하는 암호입니다.

    9

    MySQL 서버 또는 클러스터의 주제 접두사입니다. 이 문자열은 커넥터가 이벤트 레코드를 보내는 모든 Kafka 항목의 이름을 접두사로 지정합니다.

    10

    커넥터가 변경 이벤트를 캡처하는 테이블 목록입니다. 커넥터는 인벤토리 테이블에서 발생하는 경우에만 변경 사항을 탐지합니다.

    11

    커넥터가 DDL 문을 작성하고 복구하는 데 사용하는 Kafka 브로커 목록입니다. 커넥터가 변경 이벤트 레코드를 보내는 브로커와 동일합니다. 다시 시작한 후 커넥터는 커넥터가 읽기를 다시 시작할 때 binlog의 시점에 존재하는 데이터베이스 스키마를 복구합니다.

    12

    데이터베이스 스키마 기록 주제의 이름입니다. 이 주제는 내부 용도로만 사용되며 소비자가 사용해서는 안 됩니다.

  5. 다음 명령을 실행하여 커넥터 리소스를 생성합니다.

    oc create -n <namespace> -f <kafkaConnector>.yaml

    예를 들면 다음과 같습니다.

    oc create -n debezium -f mysql-inventory-connector.yaml

    커넥터는 Kafka Connect 클러스터에 등록되어 있으며 KafkaConnector CR의 spec.config.database.dbname 에 지정된 데이터베이스에 대해 실행하기 시작합니다. 커넥터 Pod가 준비되면 Debezium이 실행됩니다.

이제 커넥터가 생성되었고 inventory 데이터베이스의 변경 사항을 캡처하기 시작했는지 확인할 수 있습니다.

Red Hat logoGithubRedditYoutubeTwitter

자세한 정보

평가판, 구매 및 판매

커뮤니티

Red Hat 문서 정보

Red Hat을 사용하는 고객은 신뢰할 수 있는 콘텐츠가 포함된 제품과 서비스를 통해 혁신하고 목표를 달성할 수 있습니다.

보다 포괄적 수용을 위한 오픈 소스 용어 교체

Red Hat은 코드, 문서, 웹 속성에서 문제가 있는 언어를 교체하기 위해 최선을 다하고 있습니다. 자세한 내용은 다음을 참조하세요.Red Hat 블로그.

Red Hat 소개

Red Hat은 기업이 핵심 데이터 센터에서 네트워크 에지에 이르기까지 플랫폼과 환경 전반에서 더 쉽게 작업할 수 있도록 강화된 솔루션을 제공합니다.

© 2024 Red Hat, Inc.