2.3. Kafka Connect 클러스터 구성


이 섹션에서는 AMQ Streams 클러스터에서 Kafka Connect 배포를 구성하는 방법을 설명합니다.

Kafka Connect는 커넥터 플러그인을 사용하여 Kafka 브로커와 기타 시스템 간에 데이터를 스트리밍하기 위한 통합 툴킷입니다. Kafka Connect는 커넥터를 사용하여 데이터를 가져오거나 내보내기 위해 데이터베이스와 같은 외부 데이터 소스 또는 대상을 통합하기 위한 프레임워크를 제공합니다. Connectors는 필요한 연결 구성을 제공하는 플러그인입니다. KafkaConnect 리소스의 전체 스키마는 12.2.59절. “KafkaConnect 스키마 참조” 에 설명되어 있습니다.

커넥터 플러그인 배포에 대한 자세한 내용은 커넥터 플러그인을 사용하여 Kafka Connect 확장 기능을 참조하십시오.

2.3.1. Kafka Connect 구성

Kafka Connect를 사용하여 Kafka 클러스터에 대한 외부 데이터 연결을 설정합니다. KafkaConnect 리소스의 속성을 사용하여 Kafka Connect 배포를 구성합니다.

KafkaConnector 구성

KafkaConnector 리소스를 사용하면 OpenShift 네이티브 방식으로 Kafka Connect의 커넥터 인스턴스를 생성하고 관리할 수 있습니다.

Kafka Connect 구성에서 strimzi.io/use-connector-resources 주석을 추가하여 Kafka Connect 클러스터에 대한 KafkaConnectors를 활성화합니다. AMQ Streams가 데이터 연결에 필요한 커넥터 플러그인으로 컨테이너 이미지를 자동으로 빌드하도록 빌드 구성을 추가할 수도 있습니다. Kafka Connect 커넥터에 대한 외부 구성은 externalConfiguration 속성을 통해 지정됩니다.

커넥터를 관리하려면 Kafka Connect REST API를 사용하거나 KafkaConnector 사용자 정의 리소스를 사용할 수 있습니다. KafkaConnector 리소스는 연결된 Kafka Connect 클러스터와 동일한 네임스페이스에 배포해야 합니다. 이러한 방법을 사용하여 커넥터를 생성, 재구성 또는 삭제하는 방법에 대한 자세한 내용은 커넥터 생성 및 관리를 참조하십시오.

커넥터 구성은 HTTP 요청의 일부로 Kafka Connect에 전달되어 Kafka 자체에 저장됩니다. ConfigMaps 및 Secrets는 구성 및 기밀 데이터를 저장하는 데 사용되는 표준 OpenShift 리소스입니다. ConfigMaps 및 Secrets를 사용하여 커넥터의 특정 요소를 구성할 수 있습니다. 그런 다음 필요한 경우 구성을 분리하고 더 안전하게 유지하는 HTTP REST 명령에서 구성 값을 참조할 수 있습니다. 이 방법은 특히 사용자 이름, 암호 또는 인증서와 같은 기밀 데이터에 적용됩니다.

대량의 메시지 처리

많은 양의 메시지를 처리하도록 구성을 조정할 수 있습니다. 자세한 내용은 2.7절. “대량의 메시지 처리”의 내용을 참조하십시오.

사전 요구 사항

  • OpenShift 클러스터
  • 실행중인 Cluster Operator

다음을 실행하는 방법에 대한 자세한 내용은 OpenShift에서 AMQ Streams 배포 및 업그레이드 가이드를 참조하십시오.

절차

  1. KafkaConnect 리소스의 사양 속성을 편집합니다.

    구성할 수 있는 속성은 이 예제 구성에 표시됩니다.

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect 
    1
    
    metadata:
      name: my-connect-cluster
      annotations:
        strimzi.io/use-connector-resources: "true" 
    2
    
    spec:
      replicas: 3 
    3
    
      authentication: 
    4
    
        type: tls
        certificateAndKey:
          certificate: source.crt
          key: source.key
          secretName: my-user-source
      bootstrapServers: my-cluster-kafka-bootstrap:9092 
    5
    
      tls: 
    6
    
        trustedCertificates:
          - secretName: my-cluster-cluster-cert
            certificate: ca.crt
          - secretName: my-cluster-cluster-cert
            certificate: ca2.crt
      config: 
    7
    
        group.id: my-connect-cluster
        offset.storage.topic: my-connect-cluster-offsets
        config.storage.topic: my-connect-cluster-configs
        status.storage.topic: my-connect-cluster-status
        key.converter: org.apache.kafka.connect.json.JsonConverter
        value.converter: org.apache.kafka.connect.json.JsonConverter
        key.converter.schemas.enable: true
        value.converter.schemas.enable: true
        config.storage.replication.factor: 3
        offset.storage.replication.factor: 3
        status.storage.replication.factor: 3
      build: 
    8
    
        output: 
    9
    
          type: docker
          image: my-registry.io/my-org/my-connect-cluster:latest
          pushSecret: my-registry-credentials
        plugins: 
    10
    
          - name: debezium-postgres-connector
            artifacts:
              - type: tgz
                url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.3.1.Final/debezium-connector-postgres-1.3.1.Final-plugin.tar.gz
                sha512sum: 962a12151bdf9a5a30627eebac739955a4fd95a08d373b86bdcea2b4d0c27dd6e1edd5cb548045e115e33a9e69b1b2a352bee24df035a0447cb820077af00c03
          - name: camel-telegram
            artifacts:
              - type: tgz
                url: https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-telegram-kafka-connector/0.7.0/camel-telegram-kafka-connector-0.7.0-package.tar.gz
                sha512sum: a9b1ac63e3284bea7836d7d24d84208c49cdf5600070e6bd1535de654f6920b74ad950d51733e8020bf4187870699819f54ef5859c7846ee4081507f48873479
      externalConfiguration: 
    11
    
        env:
          - name: AWS_ACCESS_KEY_ID
            valueFrom:
              secretKeyRef:
                name: aws-creds
                key: awsAccessKey
          - name: AWS_SECRET_ACCESS_KEY
            valueFrom:
              secretKeyRef:
                name: aws-creds
                key: awsSecretAccessKey
      resources: 
    12
    
        requests:
          cpu: "1"
          memory: 2Gi
        limits:
          cpu: "2"
          memory: 2Gi
      logging: 
    13
    
        type: inline
        loggers:
          log4j.rootLogger: "INFO"
      readinessProbe: 
    14
    
        initialDelaySeconds: 15
        timeoutSeconds: 5
      livenessProbe:
        initialDelaySeconds: 15
        timeoutSeconds: 5
      metricsConfig: 
    15
    
        type: jmxPrometheusExporter
        valueFrom:
          configMapKeyRef:
            name: my-config-map
            key: my-key
      jvmOptions: 
    16
    
        "-Xmx": "1g"
        "-Xms": "1g"
      image: my-org/my-image:latest 
    17
    
      rack:
        topologyKey: topology.kubernetes.io/zone 
    18
    
      template: 
    19
    
        pod:
          affinity:
            podAntiAffinity:
              requiredDuringSchedulingIgnoredDuringExecution:
                - labelSelector:
                    matchExpressions:
                      - key: application
                        operator: In
                        values:
                          - postgresql
                          - mongodb
                  topologyKey: "kubernetes.io/hostname"
        connectContainer: 
    20
    
          env:
            - name: JAEGER_SERVICE_NAME
              value: my-jaeger-service
            - name: JAEGER_AGENT_HOST
              value: jaeger-agent-name
            - name: JAEGER_AGENT_PORT
              value: "6831"
    Copy to Clipboard Toggle word wrap
    1
    KafkaConnect 를 사용합니다.
    2
    Kafka Connect 클러스터의 KafkaConnectors를 활성화합니다.
    3
    작업을 실행하는 작업자 의 복제본 노드 수입니다.
    4
    여기에 표시된 대로, OAuth 전달 토큰 을 사용하거나 SASL 기반 SCRAM-SHA-256/SCRAM-SHA-512 또는 PLAIN 메커니즘을 사용하여 Kafka Connect 클러스터에 대한 인증입니다. ??? 기본적으로 Kafka Connect는 일반 텍스트 연결을 사용하여 Kafka 브로커에 연결합니다.
    5
    Kafka Connect 클러스터 연결을 위한 부트스트랩 서버 입니다.
    6
    TLS 인증서가 클러스터의 X.509 형식으로 저장되는 키 이름으로 TLS 암호화 인증서가 동일한 시크릿에 저장된 경우 여러 번 나열할 수 있습니다.
    7
    작업자의 Kafka Connect 구성 (연결이 아님). 표준 Apache Kafka 구성은 AMQ Streams에서 직접 관리하지 않는 속성으로 제한될 수 있습니다.
    8
    커넥터 플러그인을 사용하여 컨테이너 이미지를 빌드하기 위한 구성 속성을 자동으로 빌드 합니다.
    9
    (필수) 새 이미지를 내보내는 컨테이너 레지스트리의 구성입니다.
    10
    (필수) 새 컨테이너 이미지에 추가할 커넥터 플러그인 및 해당 아티팩트 목록입니다. 각 플러그인은 하나 이상의 아티팩트 로 구성해야 합니다.
    11
    여기 또는 볼륨과 같이 환경 변수를 사용하여 Kafka 커넥터에 대한 외부 구성입니다. 구성 공급자 플러그인 을 사용하여 외부 소스에서 구성 값을 로드할 수도 있습니다.
    12
    지원되는 리소스 예약 요청, 현재 cpumemory, 사용할 수 있는 최대 리소스를 지정합니다.
    13
    지정된 Kafka Connect 로거 및 로그 수준이 직접(인라인) 또는 ConfigMap을 통해 간접적으로(외부)됩니다. 사용자 정의 ConfigMap은 log4j.properties 또는 log4j2.properties 키 아래에 배치해야 합니다. Kafka Connect log4j.rootLogger 로거의 경우 로그 수준을 INFO, ERROR, WARN, TRACE, DEBUG, FATAL 또는 OFF로 설정할 수 있습니다.
    14
    컨테이너를 다시 시작할 시기(라이브)와 컨테이너가 트래픽을 허용할 시기(준비)를 확인할 상태 점검 입니다.
    15
    Prometheus 지표: 이 예제에서 Prometheus Cryostat 내보내기에 대한 구성이 포함된 ConfigMap을 참조하여 활성화됩니다. metricsConfig.valueFrom.configMapKeyRef.key 아래에 빈 파일이 포함된 ConfigMap에 대한 참조를 사용하여 추가 구성 없이 메트릭을 활성화할 수 있습니다.
    16
    17
    18
    SPECIALIZED OPTION: 배포에 대한 Rack 인식 구성입니다. 이는 지역이 아닌 동일한 위치 내의 배포를 위한 특수 옵션입니다. 리더 복제본 대신 커넥터가 가장 가까운 복제본에서 사용할 수 있도록 하려면 이 옵션을 사용합니다. 경우에 따라 가장 가까운 복제본에서 소비하면 네트워크 사용률을 개선하거나 비용을 절감할 수 있습니다. topologyKey 는 랙 ID가 포함된 노드 레이블과 일치해야 합니다. 이 구성에 사용된 예제에서는 표준 topology.kubernetes.io/zone 레이블을 사용하는 영역을 지정합니다. 가장 가까운 복제본에서 사용하려면 Kafka 브로커 구성에서 RackAwareReplicaSelector 를 활성화합니다.
    19
    템플릿 사용자 지정. 여기에서 Pod는 유사성 방지를 사용하여 예약되므로 이름이 동일한 노드에 Pod가 예약되지 않습니다.
    20
  2. 리소스를 생성하거나 업데이트합니다.

    oc apply -f KAFKA-CONNECT-CONFIG-FILE
    Copy to Clipboard Toggle word wrap
  3. Kafka Connect에 대한 권한 부여가 활성화된 경우 Kafka Connect 사용자를 구성하여 Kafka Connect 소비자 그룹 및 항목에 대한 액세스를 활성화합니다.

2.3.2. 여러 인스턴스에 대한 Kafka Connect 구성

Kafka Connect의 여러 인스턴스를 실행하는 경우 다음 구성 속성의 기본 구성을 변경해야 합니다.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect
spec:
  # ...
  config:
    group.id: connect-cluster 
1

    offset.storage.topic: connect-cluster-offsets 
2

    config.storage.topic: connect-cluster-configs 
3

    status.storage.topic: connect-cluster-status  
4

    # ...
# ...
Copy to Clipboard Toggle word wrap
1
Kafka 내의 Kafka Connect 클러스터 ID입니다.
2
커넥터 오프셋을 저장하는 Kafka 주제입니다.
3
커넥터 및 작업 상태 구성을 저장하는 Kafka 주제입니다.
4
커넥터 및 작업 상태 업데이트를 저장하는 Kafka 주제입니다.
참고

세 주제의 값은 동일한 group.id 가 있는 모든 Kafka Connect 인스턴스에 대해 동일해야 합니다.

기본 설정을 변경하지 않으면 동일한 Kafka 클러스터에 연결하는 각 Kafka Connect 인스턴스가 동일한 값으로 배포됩니다. 실제로 모든 인스턴스가 클러스터에서 실행되고 동일한 주제를 사용하기 위해 결합됩니다.

여러 Kafka Connect 클러스터가 동일한 주제를 사용하려고 하면 Kafka Connect가 예상대로 작동하지 않고 오류를 생성합니다.

여러 Kafka Connect 인스턴스를 실행하려면 각 인스턴스의 이러한 속성 값을 변경합니다.

2.3.3. Kafka Connect 사용자 권한 부여 구성

다음 절차에서는 Kafka Connect에 대한 사용자 액세스 권한을 부여하는 방법을 설명합니다.

Kafka에서 모든 유형의 권한 부여를 사용하는 경우 Kafka Connect 사용자에게 소비자 그룹에 대한 읽기/쓰기 액세스 권한과 Kafka Connect의 내부 주제가 필요합니다.

소비자 그룹 및 내부 주제의 속성은 AMQ Streams에 의해 자동으로 구성되거나 KafkaConnect 리소스의 사양에 명시적으로 지정할 수 있습니다.

KafkaConnect 리소스의 구성 속성 예

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect
spec:
  # ...
  config:
    group.id: my-connect-cluster 
1

    offset.storage.topic: my-connect-cluster-offsets 
2

    config.storage.topic: my-connect-cluster-configs 
3

    status.storage.topic: my-connect-cluster-status 
4

    # ...
  # ...
Copy to Clipboard Toggle word wrap

1
Kafka 내의 Kafka Connect 클러스터 ID입니다.
2
커넥터 오프셋을 저장하는 Kafka 주제입니다.
3
커넥터 및 작업 상태 구성을 저장하는 Kafka 주제입니다.
4
커넥터 및 작업 상태 업데이트를 저장하는 Kafka 주제입니다.

다음 절차에서는 간단한 권한 부여를 사용할 때 액세스 제공 방법을 보여줍니다.

간단한 인증에서는 Kafka AclAuthorizer 플러그인에서 처리하는 ACL 규칙을 사용하여 올바른 액세스 수준을 제공합니다. 간단한 인증을 사용하도록 KafkaUser 리소스를 구성하는 방법에 대한 자세한 내용은 AclRule 스키마 참조를 참조하십시오.

참고

소비자 그룹 및 주제의 기본값은 여러 인스턴스를 실행할 때 다릅니다.

사전 요구 사항

  • OpenShift 클러스터
  • 실행중인 Cluster Operator

절차

  1. KafkaUser 리소스에서 권한 부여 속성을 편집하여 사용자에게 액세스 권한을 제공합니다.

    다음 예에서 액세스 권한은 리터럴 이름 값을 사용하여 Kafka Connect 주제 및 소비자 그룹에 대해 구성됩니다.

    Expand
    속성이름

    offset.storage.topic

    connect-cluster-offsets

    status.storage.topic

    connect-cluster-status

    config.storage.topic

    connect-cluster-configs

    group

    connect-cluster

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaUser
    metadata:
      name: my-user
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      # ...
      authorization:
        type: simple
        acls:
          # access to offset.storage.topic
          - resource:
              type: topic
              name: connect-cluster-offsets
              patternType: literal
            operation: Write
            host: "*"
          - resource:
              type: topic
              name: connect-cluster-offsets
              patternType: literal
            operation: Create
            host: "*"
          - resource:
              type: topic
              name: connect-cluster-offsets
              patternType: literal
            operation: Describe
            host: "*"
          - resource:
              type: topic
              name: connect-cluster-offsets
              patternType: literal
            operation: Read
            host: "*"
          # access to status.storage.topic
          - resource:
              type: topic
              name: connect-cluster-status
              patternType: literal
            operation: Write
            host: "*"
          - resource:
              type: topic
              name: connect-cluster-status
              patternType: literal
            operation: Create
            host: "*"
          - resource:
              type: topic
              name: connect-cluster-status
              patternType: literal
            operation: Describe
            host: "*"
          - resource:
              type: topic
              name: connect-cluster-status
              patternType: literal
            operation: Read
            host: "*"
          # access to config.storage.topic
          - resource:
              type: topic
              name: connect-cluster-configs
              patternType: literal
            operation: Write
            host: "*"
          - resource:
              type: topic
              name: connect-cluster-configs
              patternType: literal
            operation: Create
            host: "*"
          - resource:
              type: topic
              name: connect-cluster-configs
              patternType: literal
            operation: Describe
            host: "*"
          - resource:
              type: topic
              name: connect-cluster-configs
              patternType: literal
            operation: Read
            host: "*"
          # consumer group
          - resource:
              type: group
              name: connect-cluster
              patternType: literal
            operation: Read
            host: "*"
    Copy to Clipboard Toggle word wrap
  2. 리소스를 생성하거나 업데이트합니다.

    oc apply -f KAFKA-USER-CONFIG-FILE
    Copy to Clipboard Toggle word wrap

2.3.4. Kafka Connect 클러스터 리소스 목록

다음 리소스는 OpenShift 클러스터의 Cluster Operator에 의해 생성됩니다.

connect-cluster-name-connect
Kafka Connect 작업자 노드 Pod를 생성합니다.
connect-cluster-name-connect-api
Kafka Connect 클러스터 관리를 위한 REST 인터페이스를 노출하는 서비스입니다.
connect-cluster-name-config
Kafka Connect ancillary 구성이 포함되어 Kafka 브로커 Pod를 통해 볼륨으로 마운트되는 ConfigMap입니다.
connect-cluster-name-connect
Kafka Connect 작업자 노드에 대해 구성된 Pod 중단 예산입니다.

2.3.5. 변경 데이터 캡처를 위한 Debezium 통합

Red Hat Debezium은 분산 변경 데이터 캡처 플랫폼입니다. 데이터베이스의 행 수준 변경 사항을 캡처하고 변경 이벤트 레코드를 생성하며 Kafka 주제로 레코드를 스트리밍합니다. Debezium은 Apache Kafka를 기반으로 합니다. Debezium을 AMQ Streams와 배포 및 통합할 수 있습니다. AMQ Streams를 배포한 후 Kafka Connect를 통해 Debezium을 커넥터 구성으로 배포합니다. Debezium은 OpenShift의 AMQ Streams에 변경 이벤트 레코드를 전달합니다. 애플리케이션은 이러한 변경 이벤트 스트림을 읽고 해당 스트림이 발생한 순서대로 변경 이벤트에 액세스할 수 있습니다.

Debezium은 다음을 포함하여 여러 가지 용도가 있습니다.

  • 데이터 복제
  • 캐시 및 검색 인덱스 업데이트
  • 모놀리식 애플리케이션 간소화
  • 데이터 통합
  • 스트리밍 쿼리 활성화

데이터베이스 변경 사항을 캡처하려면 Debezium 데이터베이스 커넥터를 사용하여 Kafka Connect를 배포합니다. 커넥터 인스턴스를 정의하도록 KafkaConnector 리소스를 구성합니다.

AMQ Streams를 사용하여 Debezium을 배포하는 방법에 대한 자세한 내용은 제품 설명서 를 참조하십시오. Debezium 설명서에는 데이터베이스 업데이트의 변경 이벤트 레코드를 보는 데 필요한 서비스 및 커넥터 설정 프로세스를 안내하는 Debezium 가이드가 포함되어 있습니다.

Red Hat logoGithubredditYoutubeTwitter

자세한 정보

평가판, 구매 및 판매

커뮤니티

Red Hat 문서 정보

Red Hat을 사용하는 고객은 신뢰할 수 있는 콘텐츠가 포함된 제품과 서비스를 통해 혁신하고 목표를 달성할 수 있습니다. 최신 업데이트를 확인하세요.

보다 포괄적 수용을 위한 오픈 소스 용어 교체

Red Hat은 코드, 문서, 웹 속성에서 문제가 있는 언어를 교체하기 위해 최선을 다하고 있습니다. 자세한 내용은 다음을 참조하세요.Red Hat 블로그.

Red Hat 소개

Red Hat은 기업이 핵심 데이터 센터에서 네트워크 에지에 이르기까지 플랫폼과 환경 전반에서 더 쉽게 작업할 수 있도록 강화된 솔루션을 제공합니다.

Theme

© 2026 Red Hat
맨 위로 이동