6.4. Kafka Connect 배포


Kafka Connect 는 커넥터 플러그인을 사용하여 Kafka 브로커와 기타 시스템 간에 데이터를 스트리밍하기 위한 통합 툴킷입니다. Kafka Connect는 커넥터를 사용하여 데이터를 가져오거나 내보낼 수 있도록 데이터베이스 또는 메시징 시스템과 같은 외부 데이터 소스 또는 대상을 통합하기 위한 프레임워크를 제공합니다. Connectors는 필요한 연결 구성을 제공하는 플러그인입니다.

AMQ Streams에서 Kafka Connect는 분산 모드로 배포됩니다. Kafka Connect는 독립 실행형 모드에서도 작동할 수 있지만 AMQ Streams에서는 지원되지 않습니다.

커넥터 의 개념을 사용하여 Kafka Connect는 확장성과 신뢰성을 유지하면서 Kafka 클러스터 내외로 대량의 데이터를 이동하기 위한 프레임워크를 제공합니다.

Cluster Operator는 KafkaConnector 리소스를 사용하여 생성된 KafkaConnect 리소스 및 커넥터를 사용하여 배포된 Kafka Connect 클러스터를 관리합니다.

Kafka Connect를 사용하려면 다음을 수행해야 합니다.

참고

커넥터 라는 용어는 Kafka Connect 클러스터 또는 커넥터 클래스 내에서 실행되는 커넥터 인스턴스를 의미합니다. 이 가이드에서 커넥터 라는 용어는 컨텍스트에서 의미가 명확해질 때 사용됩니다.

6.4.1. OpenShift 클러스터에 Kafka Connect 배포

다음 절차에서는 Cluster Operator를 사용하여 Kafka Connect 클러스터를 OpenShift 클러스터에 배포하는 방법을 설명합니다.

Kafka Connect 클러스터 배포는 연결 작업의 워크로드를 작업으로 배포하는 구성 가능한 수의 노드( 작업자라고도 함)로 구현되어 메시지 flow가 확장성이 높고 신뢰할 수 있습니다.

배포에서는 YAML 파일을 사용하여 사양을 제공하여 KafkaConnect 리소스를 생성합니다.

AMQ Streams는 구성 파일 예제 를 제공합니다. 이 절차에서는 다음 예제 파일을 사용합니다.

  • examples/connect/kafka-connect.yaml

절차

  1. OpenShift 클러스터에 Kafka Connect를 배포합니다. examples/connect/kafka-connect.yaml 파일을 사용하여 Kafka Connect를 배포합니다.

    oc apply -f examples/connect/kafka-connect.yaml
  2. 배포 상태를 확인합니다.

    oc get pods -n <my_cluster_operator_namespace>

    출력에 배포 이름과 준비 상태가 표시됩니다.

    NAME                                 READY  STATUS   RESTARTS
    my-connect-cluster-connect-<pod_id>  1/1    Running  0

    my-connect-cluster 는 Kafka Connect 클러스터의 이름입니다.

    Pod ID는 생성된 각 Pod를 식별합니다.

    기본 배포를 통해 단일 Kafka Connect Pod를 생성합니다.

    READY 에는 준비되거나 예상된 복제본 수가 표시됩니다. STATUSRunning 으로 표시되면 배포가 성공적으로 수행됩니다.

6.4.2. 여러 인스턴스에 대한 Kafka Connect 구성

Kafka Connect의 여러 인스턴스를 실행하는 경우 다음 구성 속성의 기본 구성을 변경해야 합니다.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect
spec:
  # ...
  config:
    group.id: connect-cluster 
1

    offset.storage.topic: connect-cluster-offsets 
2

    config.storage.topic: connect-cluster-configs 
3

    status.storage.topic: connect-cluster-status  
4

    # ...
# ...
1
Kafka 내의 Kafka Connect 클러스터 ID입니다.
2
커넥터 오프셋을 저장하는 Kafka 주제입니다.
3
커넥터 및 작업 상태 구성을 저장하는 Kafka 주제입니다.
4
커넥터 및 작업 상태 업데이트를 저장하는 Kafka 주제입니다.
참고

세 항목의 값은 동일한 group.id 가 있는 모든 Kafka Connect 인스턴스에 대해 동일해야 합니다.

기본 설정을 변경하지 않으면 동일한 Kafka 클러스터에 연결하는 각 Kafka Connect 인스턴스가 동일한 값으로 배포됩니다. 실제로 모든 인스턴스가 클러스터에서 실행되고 동일한 주제를 사용하도록 연결된 것입니다.

여러 Kafka Connect 클러스터가 동일한 주제를 사용하려고 하면 Kafka Connect가 예상대로 작동하지 않고 오류를 생성합니다.

여러 Kafka Connect 인스턴스를 실행하려면 각 인스턴스에 대해 이러한 속성의 값을 변경합니다.

6.4.3. 커넥터 추가

Kafka Connect는 커넥터를 사용하여 다른 시스템과 통합하여 데이터를 스트리밍합니다. 커넥터는 다음 유형 중 하나일 수 있는 Kafka Connector 클래스의 인스턴스입니다.

소스 커넥터
소스 커넥터는 외부 시스템에서 데이터를 가져와서 메시지로 Kafka에 제공하는 런타임 엔티티입니다.
싱크 커넥터
싱크 커넥터는 Kafka 주제에서 메시지를 가져와서 외부 시스템에 제공하는 런타임 엔티티입니다.

Kafka Connect는 플러그인 아키텍처를 사용하여 커넥터에 대한 구현 아티팩트를 제공합니다. 플러그인을 사용하면 다른 시스템에 연결하고 데이터를 조작할 수 있는 추가 구성을 제공할 수 있습니다. 플러그인에는 커넥터 및 기타 구성 요소(예: 데이터 변환기 및 변환)가 포함됩니다. 커넥터는 특정 유형의 외부 시스템에서 작동합니다. 각 커넥터는 해당 구성의 스키마를 정의합니다. Kafka Connect에 구성을 제공하여 Kafka Connect 내에서 커넥터 인스턴스를 생성합니다. 그런 다음 커넥터 인스턴스는 시스템 간에 데이터를 이동하기 위한 일련의 작업을 정의합니다.

다음 방법 중 하나로 Kafka Connect에 커넥터 플러그인을 추가합니다.

컨테이너 이미지에 플러그인을 추가한 후에는 다음과 같은 방법으로 커넥터 인스턴스를 시작, 중지 및 관리할 수 있습니다.

이러한 옵션을 사용하여 새 커넥터 인스턴스를 만들 수도 있습니다.

AMQ Streams가 추가 커넥터를 사용하여 새 컨테이너 이미지를 자동으로 빌드하도록 Kafka Connect를 구성합니다. KafkaConnect 사용자 정의 리소스의 .spec.build.plugins 속성을 사용하여 커넥터 플러그인을 정의합니다. AMQ Streams는 커넥터 플러그인을 자동으로 다운로드하여 새 컨테이너 이미지에 추가합니다. 컨테이너는 .spec.build.output 에 지정된 컨테이너 리포지토리로 푸시되고 Kafka Connect 배포에 자동으로 사용됩니다.

사전 요구 사항

이미지를 푸시, 저장 및 가져올 수 있는 자체 컨테이너 레지스트리를 제공해야 합니다. AMQ Streams는 프라이빗 컨테이너 레지스트리 및 Quay 또는 Docker Hub 와 같은 공개 레지스트리를 지원합니다.

절차

  1. .spec.build.output 에서 컨테이너 레지스트리를 지정하고 .spec.build.plugins 에서 추가 커넥터를 지정하여 KafkaConnect 사용자 정의 리소스를 구성합니다.

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: my-connect-cluster
    spec: 
    1
    
      #...
      build:
        output: 
    2
    
          type: docker
          image: my-registry.io/my-org/my-connect-cluster:latest
          pushSecret: my-registry-credentials
        plugins: 
    3
    
          - name: debezium-postgres-connector
            artifacts:
              - type: tgz
                url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/2.1.3.Final/debezium-connector-postgres-2.1.3.Final-plugin.tar.gz
                sha512sum: c4ddc97846de561755dc0b021a62aba656098829c70eb3ade3b817ce06d852ca12ae50c0281cc791a5a131cb7fc21fb15f4b8ee76c6cae5dd07f9c11cb7c6e79
          - name: camel-telegram
            artifacts:
              - type: tgz
                url: https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-telegram-kafka-connector/0.11.5/camel-telegram-kafka-connector-0.11.5-package.tar.gz
                sha512sum: d6d9f45e0d1dbfcc9f6d1c7ca2046168c764389c78bc4b867dab32d24f710bb74ccf2a007d7d7a8af2dfca09d9a52ccbc2831fc715c195a3634cca055185bd91
      #...
    1
    2
    (필수) 새 이미지가 푸시되는 컨테이너 레지스트리의 설정(필수)입니다.
    3
    (필수) 새 컨테이너 이미지에 추가할 커넥터 플러그인 및 아티팩트 목록. 각 플러그인은 하나 이상의 아티팩트 로 구성해야 합니다.
  2. 리소스를 생성하거나 업데이트합니다.

    $ oc apply -f <kafka_connect_configuration_file>
  3. 새 컨테이너 이미지가 빌드될 때까지 기다린 후 Kafka Connect 클러스터가 배포될 때까지 기다립니다.
  4. Kafka Connect REST API 또는 KafkaConnector 사용자 정의 리소스를 사용하여 사용자가 추가한 커넥터 플러그인을 사용합니다.

Kafka Connect 기본 이미지에서 커넥터 플러그인을 사용하여 사용자 지정 Docker 이미지를 /opt/kafka/plugins 디렉터리에 추가합니다.

Red Hat Ecosystem Catalog 의 Kafka 컨테이너 이미지를 기본 이미지로 사용하여 추가 커넥터 플러그인으로 자체 사용자 지정 이미지를 생성할 수 있습니다.

시작 시 Kafka Connect의 AMQ Streams 버전은 /opt/kafka/plugins 디렉터리에 포함된 타사 커넥터 플러그인을 로드합니다.

절차

  1. registry.redhat.io/amq-streams/kafka-35-rhel8:2.5.1 을 기본 이미지로 사용하여 새 Dockerfile 을 생성합니다.

    FROM registry.redhat.io/amq-streams/kafka-35-rhel8:2.5.1
    USER root:root
    COPY ./my-plugins/ /opt/kafka/plugins/
    USER 1001

    plugins 파일 예

    $ tree ./my-plugins/
    ./my-plugins/
    ├── debezium-connector-mongodb
    │   ├── bson-<version>.jar
    │   ├── CHANGELOG.md
    │   ├── CONTRIBUTE.md
    │   ├── COPYRIGHT.txt
    │   ├── debezium-connector-mongodb-<version>.jar
    │   ├── debezium-core-<version>.jar
    │   ├── LICENSE.txt
    │   ├── mongodb-driver-core-<version>.jar
    │   ├── README.md
    │   └── # ...
    ├── debezium-connector-mysql
    │   ├── CHANGELOG.md
    │   ├── CONTRIBUTE.md
    │   ├── COPYRIGHT.txt
    │   ├── debezium-connector-mysql-<version>.jar
    │   ├── debezium-core-<version>.jar
    │   ├── LICENSE.txt
    │   ├── mysql-binlog-connector-java-<version>.jar
    │   ├── mysql-connector-java-<version>.jar
    │   ├── README.md
    │   └── # ...
    └── debezium-connector-postgres
        ├── CHANGELOG.md
        ├── CONTRIBUTE.md
        ├── COPYRIGHT.txt
        ├── debezium-connector-postgres-<version>.jar
        ├── debezium-core-<version>.jar
        ├── LICENSE.txt
        ├── postgresql-<version>.jar
        ├── protobuf-java-<version>.jar
        ├── README.md
        └── # ...

    COPY 명령은 컨테이너 이미지에 복사할 플러그인 파일을 가리킵니다.

    이 예제에서는 모든 파일이 간결하게 나열되지는 않지만 Debezium 커넥터(MongoDB, MySQL 및 PostgreSQL)에 대한 플러그인을 추가합니다. Kafka Connect에서 실행 중인 Debezium은 다른 Kafka Connect 작업과 동일합니다.

  2. 컨테이너 이미지를 빌드합니다.
  3. 사용자 정의 이미지를 컨테이너 레지스트리로 내보냅니다.
  4. 새 컨테이너 이미지를 가리킵니다.

    다음 방법 중 하나로 이미지를 가리킬 수 있습니다.

    • KafkaConnect 사용자 정의 리소스의 KafkaConnect.spec.image 속성을 편집합니다.

      설정된 경우 이 속성은 Cluster Operator의 STRIMZI_KAFKA_CONNECT_IMAGES 환경 변수를 덮어씁니다.

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnect
      metadata:
        name: my-connect-cluster
      spec: 
      1
      
        #...
        image: my-new-container-image 
      2
      
        config: 
      3
      
          #...
      1
      2
      포드의 Docker 이미지입니다.
      3
      Kafka Connect 작업자 구성(연결이 아님).
    • install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml 파일에서 STRIMZI_KAFKA_CONNECT_IMAGES 환경 변수를 편집하여 새 컨테이너 이미지를 가리키는 다음 Cluster Operator를 다시 설치합니다.

6.4.3.3. KafkaConnector 리소스 배포

KafkaConnector 리소스를 배포하여 커넥터를 관리합니다. KafkaConnector 사용자 정의 리소스는 Cluster Operator의 커넥터 관리에 OpenShift 네이티브 접근 방식을 제공합니다. Kafka Connect REST API와 마찬가지로 커넥터를 관리하기 위해 HTTP 요청을 보낼 필요가 없습니다. 해당 KafkaConnector 리소스를 업데이트한 다음 업데이트를 적용하여 실행 중인 커넥터 인스턴스를 관리합니다. Cluster Operator는 실행 중인 커넥터 인스턴스의 구성을 업데이트합니다. 해당 KafkaConnector 를 삭제하여 커넥터를 제거합니다.

KafkaConnector 리소스를 연결하는 Kafka Connect 클러스터와 동일한 네임스페이스에 배포해야 합니다.

이 절차에 표시된 구성에서 autoRestart 속성은 true 로 설정됩니다. 이를 통해 실패한 커넥터 및 작업을 자동으로 다시 시작할 수 있습니다. 재시작 횟수는 최대 7개까지 수행되며, 이 후에는 수동으로 다시 시작해야 합니다. KafkaConnector 리소스에 커넥터 를 다시 시작하거나 커넥터 작업을 수동으로 다시 시작하도록 주석을 답니다.

커넥터 예

자체 커넥터를 사용하거나 AMQ Streams에서 제공하는 예제를 시도할 수 있습니다. Apache Kafka 3.1.0까지 예제 파일 커넥터 플러그인이 Apache Kafka에 포함되었습니다. Apache Kafka의 3.1.1 및 3.2.0 릴리스에서 시작하여 다른 커넥터와 함께 예제를 플러그인 경로에 추가해야 합니다.

AMQ Streams는 예제 파일 커넥터 플러그인에 대한 KafkaConnector 구성 파일 (examples/connect/source-connector.yaml) 예를 제공합니다. 이 파일은 다음 커넥터 인스턴스를 KafkaConnector 리소스로 생성합니다.

  • Kafka 라이센스 파일(소스)에서 각 행을 읽고 해당 데이터를 단일 Kafka 항목에 메시지로 기록하는 instance입니다.
  • Kafka 주제에서 메시지를 읽고 메시지를 임시 파일(스케크)에 쓰는 instance입니다.

이 절차에서는 예제 파일을 사용하여 커넥터를 생성합니다.

참고

예제 커넥터는 프로덕션 환경에서 사용하기 위한 것이 아닙니다.

사전 요구 사항

  • Kafka Connect 배포
  • Cluster Operator가 실행 중입니다.

절차

  1. 다음 방법 중 하나로 Kafka Connect에 ScanSetting SourceConnector 및 ScanSettingSink Connector 플러그인을 추가합니다.

  2. Kafka Connect 구성에서 strimzi.io/use-connector-resources 주석을 true 로 설정합니다.

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: my-connect-cluster
      annotations:
        strimzi.io/use-connector-resources: "true"
    spec:
        # ...

    KafkaConnector 리소스를 활성화하면 Cluster Operator에서 해당 리소스를 감시합니다.

  3. examples/connect/source-connector.yaml 파일을 편집합니다.

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: my-source-connector 
    1
    
      labels:
        strimzi.io/cluster: my-connect-cluster 
    2
    
    spec:
      class: org.apache.kafka.connect.file.FileStreamSourceConnector 
    3
    
      tasksMax: 2 
    4
    
      autoRestart: 
    5
    
        enabled: true
      config: 
    6
    
        file: "/opt/kafka/LICENSE" 
    7
    
        topic: my-topic 
    8
    
        # ...
    1
    커넥터의 이름으로 사용되는 KafkaConnector 리소스의 이름입니다. OpenShift 리소스에 유효한 이름을 사용합니다.
    2
    Kafka Connect 클러스터의 이름이 에서 커넥터 인스턴스를 생성합니다. 커넥터를 연결하는 Kafka Connect 클러스터와 동일한 네임스페이스에 배포해야 합니다.
    3
    커넥터 클래스의 전체 이름 또는 별칭입니다. Kafka Connect 클러스터에서 사용하는 이미지에 있어야 합니다.
    4
    커넥터가 생성할 수 있는 최대 Kafka Connect 작업 수입니다.
    5
    실패한 커넥터 및 작업의 자동 재시작을 활성화합니다.
    6
    커넥터 구성은 키-값 쌍으로 설정됩니다.
    7
    이 예제 소스 커넥터 구성은 /opt/kafka/LICENSE 파일에서 데이터를 읽습니다.
    8
    소스 데이터를 게시하는 Kafka 주제입니다.
  4. OpenShift 클러스터에서 소스 KafkaConnector 를 생성합니다.

    oc apply -f examples/connect/source-connector.yaml
  5. examples/connect/sink-connector.yaml 파일을 생성합니다.

    touch examples/connect/sink-connector.yaml
  6. 다음 YAML을 sink-connector.yaml 파일에 붙여넣습니다.

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: my-sink-connector
      labels:
        strimzi.io/cluster: my-connect
    spec:
      class: org.apache.kafka.connect.file.FileStreamSinkConnector 
    1
    
      tasksMax: 2
      config: 
    2
    
        file: "/tmp/my-file" 
    3
    
        topics: my-topic 
    4
    1
    커넥터 클래스의 전체 이름 또는 별칭입니다. Kafka Connect 클러스터에서 사용하는 이미지에 있어야 합니다.
    2
    커넥터 구성은 키-값 쌍으로 설정됩니다.
    3
    소스 데이터를 게시하는 임시 파일입니다.
    4
    Kafka 주제에서 소스 데이터를 읽습니다.
  7. OpenShift 클러스터에 싱크 KafkaConnector 를 생성합니다.

    oc apply -f examples/connect/sink-connector.yaml
  8. 커넥터 리소스가 생성되었는지 확인합니다.

    oc get kctr --selector strimzi.io/cluster=<my_connect_cluster> -o name
    
    my-source-connector
    my-sink-connector

    <my_connect_cluster>를 Kafka Connect 클러스터 이름으로 바꿉니다.

  9. 컨테이너에서 kafka-console-consumer.sh 를 실행하여 소스 커넥터가 항목에 기록된 메시지를 읽습니다.

    oc exec <my_kafka_cluster>-kafka-0 -i -t -- bin/kafka-console-consumer.sh --bootstrap-server <my_kafka_cluster>-kafka-bootstrap.NAMESPACE.svc:9092 --topic my-topic --from-beginning

    <my_kafka_cluster>를 Kafka 클러스터 이름으로 바꿉니다.

소스 및 싱크 커넥터 구성 옵션

커넥터 구성은 KafkaConnector 리소스의 spec.config 속성에 정의되어 있습니다.

ECDHE SourceConnectorECDHESinkConnector 클래스는 Kafka Connect REST API와 동일한 구성 옵션을 지원합니다. 기타 커넥터는 다양한 구성 옵션을 지원합니다.

Expand
표 6.1. 10.0.0.1 Source 커넥터 클래스에 대한 구성 옵션
이름유형기본값설명

file

문자열

null

메시지를 작성할 소스 파일입니다. 지정하지 않으면 표준 입력이 사용됩니다.

주제

list

null

데이터를 게시하는 Kafka 주제입니다.

Expand
표 6.2. ScanSetting SinkConnector 클래스에 대한 구성 옵션
이름유형기본값설명

file

문자열

null

메시지를 작성할 대상 파일입니다. 지정하지 않으면 표준 출력이 사용됩니다.

주제

list

null

데이터를 읽을 하나 이상의 Kafka 주제입니다.

topics.regex

문자열

null

데이터를 읽을 하나 이상의 Kafka 주제와 일치하는 정규식입니다.

6.4.3.4. 커넥터 수동 다시 시작

KafkaConnector 리소스를 사용하여 커넥터를 관리하는 경우 restart 주석을 사용하여 커넥터 재시작을 수동으로 트리거합니다.

사전 요구 사항

  • Cluster Operator가 실행 중입니다.

절차

  1. 재시작하려는 Kafka 커넥터를 제어하는 KafkaConnector 사용자 정의 리소스의 이름을 찾습니다.

    oc get KafkaConnector
  2. OpenShift에서 KafkaConnector 리소스에 주석을 달아 커넥터를 다시 시작합니다.

    oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart=true

    restart 주석은 true 로 설정됩니다.

  3. 다음 조정이 수행될 때까지 기다립니다(기본적으로 2분 마다).

    조정 프로세스에서 주석이 감지된 한 Kafka 커넥터가 재시작됩니다. Kafka Connect에서 재시작 요청을 수락하면 주석이 KafkaConnector 사용자 정의 리소스에서 제거됩니다.

6.4.3.5. Kafka 커넥터 작업 수동 다시 시작

KafkaConnector 리소스를 사용하여 커넥터를 관리하는 경우 restart-task 주석을 사용하여 커넥터 작업 재시작을 수동으로 트리거합니다.

사전 요구 사항

  • Cluster Operator가 실행 중입니다.

절차

  1. 재시작하려는 Kafka 커넥터 작업을 제어하는 KafkaConnector 사용자 정의 리소스의 이름을 찾습니다.

    oc get KafkaConnector
  2. KafkaConnector 사용자 정의 리소스에서 재시작할 작업의 ID를 찾습니다. 작업 ID는 0부터 시작하여 음수가 아닌 정수입니다.

    oc describe KafkaConnector <kafka_connector_name>
  3. OpenShift에서 KafkaConnector 리소스에 주석을 달아 ID를 사용하여 커넥터 작업을 다시 시작합니다.

    oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart-task=0

    이 예에서는 작업 0 이 다시 시작됩니다.

  4. 다음 조정이 수행될 때까지 기다립니다(기본적으로 2분 마다).

    조정 프로세스에서 주석을 탐지한 경우 Kafka 커넥터 작업이 재시작됩니다. Kafka Connect에서 재시작 요청을 수락하면 주석이 KafkaConnector 사용자 정의 리소스에서 제거됩니다.

6.4.3.6. Kafka Connect API 노출

KafkaConnector 리소스를 사용하여 커넥터를 관리하는 대신 Kafka Connect REST API를 사용합니다. Kafka Connect REST API는 < connect_cluster_name> -connect-api:8083 에서 실행되는 서비스로 사용할 수 있습니다. 여기서 < connect_cluster_name >은 Kafka Connect 클러스터의 이름입니다. 이 서비스는 Kafka Connect 인스턴스를 생성할 때 생성됩니다.

Kafka Connect REST API에서 지원하는 작업은 Apache Kafka Connect API 설명서에 설명되어 있습니다.

참고

strimzi.io/use-connector-resources 주석은 KafkaConnectors를 활성화합니다. KafkaConnect 리소스 구성에 주석을 적용한 경우 Kafka Connect API를 사용하려면 해당 주석을 제거해야 합니다. 그렇지 않으면 Kafka Connect REST API를 직접 사용하여 수행한 수동 변경 사항을 Cluster Operator에 의해 되돌립니다.

커넥터 구성을 JSON 오브젝트로 추가할 수 있습니다.

커넥터 구성 추가에 대한 curl 요청의 예

curl -X POST \
  http://my-connect-cluster-connect-api:8083/connectors \
  -H 'Content-Type: application/json' \
  -d '{ "name": "my-source-connector",
    "config":
    {
      "connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector",
      "file": "/opt/kafka/LICENSE",
      "topic":"my-topic",
      "tasksMax": "4",
      "type": "source"
    }
}'

API는 OpenShift 클러스터 내에서만 액세스할 수 있습니다. OpenShift 클러스터 외부에서 실행되는 애플리케이션에서 Kafka Connect API에 액세스하도록 하려면 다음 기능 중 하나를 생성하여 수동으로 노출할 수 있습니다.

  • LoadBalancer 또는 NodePort 유형 서비스
  • Ingress 리소스(Kubernetes만 해당)
  • OpenShift 경로(OpenShift만 해당)
참고

연결이 안전하지 않으므로 외부 액세스를 권장합니다.

서비스를 생성하기로 결정하는 경우 < connect_cluster_name> -connect-api 서비스의 선택기 에서 레이블을 사용하여 서비스에서 트래픽을 라우팅할 Pod를 구성합니다.

서비스에 대한 선택기 구성

# ...
selector:
  strimzi.io/cluster: my-connect-cluster 
1

  strimzi.io/kind: KafkaConnect
  strimzi.io/name: my-connect-cluster-connect 
2

#...

1
OpenShift 클러스터에서 Kafka Connect 사용자 정의 리소스의 이름입니다.
2
Cluster Operator가 생성한 Kafka Connect 배포의 이름입니다.

외부 클라이언트의 HTTP 요청을 허용하는 NetworkPolicy 도 생성해야 합니다.

Kafka Connect API에 대한 요청을 허용하는 NetworkPolicy의 예

apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: my-custom-connect-network-policy
spec:
  ingress:
  - from:
    - podSelector: 
1

        matchLabels:
          app: my-connector-manager
    ports:
    - port: 8083
      protocol: TCP
  podSelector:
    matchLabels:
      strimzi.io/cluster: my-connect-cluster
      strimzi.io/kind: KafkaConnect
      strimzi.io/name: my-connect-cluster-connect
  policyTypes:
  - Ingress

1
API에 연결할 수 있는 Pod의 레이블입니다.

클러스터 외부에 커넥터 구성을 추가하려면 curl 명령에서 API를 노출하는 리소스의 URL을 사용합니다.

6.4.3.7. Kafka Connect API에 대한 액세스 제한

인증되지 않은 작업 및 잠재적 보안 문제를 방지하기 위해 Kafka Connect API에 대한 액세스를 신뢰할 수 있는 사용자에게만 제한하는 것이 중요합니다. Kafka Connect API는 커넥터 구성을 변경하기 위한 광범위한 기능을 제공하므로 보안 예방 조치를 취하는 것이 더 중요합니다. Kafka Connect API에 액세스할 수 있는 사용자는 관리자가 안전한 것으로 가정할 수 있는 중요한 정보를 얻을 수 있습니다.

Kafka Connect REST API는 OpenShift 클러스터에 대한 인증 액세스 권한이 있고 호스트 이름/IP 주소 및 포트 번호를 포함하는 끝점 URL을 알고 있는 모든 사용자가 액세스할 수 있습니다.

예를 들어 조직이 Kafka Connect 클러스터 및 커넥터를 사용하여 중요한 데이터를 고객 데이터베이스에서 중앙 데이터베이스로 스트리밍한다고 가정합니다. 관리자는 구성 공급자 플러그인을 사용하여 고객 데이터베이스 및 중앙 데이터베이스 연결(예: 데이터베이스 연결 세부 정보 및 인증 자격 증명)과 관련된 중요한 정보를 저장합니다. 구성 공급자는 이 민감한 정보가 인증되지 않은 사용자에게 노출되는 것을 보호합니다. 그러나 Kafka Connect API에 액세스할 수 있는 사용자는 관리자의 동의 없이 고객 데이터베이스에 계속 액세스할 수 있습니다. 이는 페이크 데이터베이스를 설정하고 연결하도록 커넥터를 구성하여 수행할 수 있습니다. 그런 다음 고객 데이터베이스를 가리키도록 커넥터 구성을 수정하지만 데이터를 중앙 데이터베이스로 보내는 대신 페이크 데이터베이스로 전송합니다. 페이크 데이터베이스에 연결하도록 커넥터를 구성하면 구성 공급자에 안전하게 저장된 경우에도 고객 데이터베이스에 연결하는 데 필요한 로그인 세부 정보와 인증 정보가 가로채집니다.

KafkaConnector 사용자 정의 리소스를 사용하는 경우 기본적으로 OpenShift RBAC 규칙에서는 OpenShift 클러스터 관리자만 커넥터를 변경할 수 있습니다. AMQ Streams 리소스를 관리하기 위해 비 클러스터 관리자를 지정할 수도 있습니다. Kafka Connect 구성에서 KafkaConnector 리소스를 활성화하면 Kafka Connect REST API를 사용하여 직접 변경한 사항이 Cluster Operator에 의해 되돌아갑니다. KafkaConnector 리소스를 사용하지 않는 경우 기본 RBAC 규칙은 Kafka Connect API에 대한 액세스를 제한하지 않습니다. OpenShift RBAC를 사용하여 Kafka Connect REST API에 대한 직접 액세스를 제한하려면 KafkaConnector 리소스를 활성화하고 사용해야 합니다.

보안을 개선하기 위해 Kafka Connect API에 대한 다음 속성을 구성하는 것이 좋습니다.

org.apache.kafka.disallowed.login.modules

(Kafka 3.4 이상) org.apache.kafka.disallowed.login.modules Java 시스템 속성을 설정하여 비보안 로그인 모듈을 사용하지 않도록 합니다. 예를 들어 com.sun.security.auth.module.JndiLoginModule 을 지정하면 Kafka JndiLoginModule 이 사용되지 않습니다.

로그인 모듈을 허용하지 않는 설정 예

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  # ...
  jvmOptions:
    javaSystemProperties:
      - name: org.apache.kafka.disallowed.login.modules
        value: com.sun.security.auth.module.JndiLoginModule, org.apache.kafka.common.security.kerberos.KerberosLoginModule
# ...

신뢰할 수 있는 로그인 모듈만 허용하고 사용하는 버전에 대해 Kafka의 최신 조언을 따릅니다. 가장 좋은 방법은 org.apache.kafka.disallowed.login.modules 시스템 속성을 사용하여 Kafka Connect 구성에서 비보안 로그인 모듈을 명시적으로 허용하지 않아야 합니다.

connector.client.config.override.policy

커넥터 구성이 Kafka Connect 구성 및 사용하는 소비자 및 생산자를 덮어쓰지 않도록 connector.client.config.override.policy 속성을 None 으로 설정합니다.

커넥터 덮어쓰기 정책 지정 구성 예

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  # ...
  config:
    connector.client.config.override.policy: None
# ...

Kafka Connect API를 사용하여 KafkaConnector 사용자 지정 리소스를 사용하여 커넥터를 관리할 수 있습니다. 전환하려면 표시된 순서대로 다음을 수행합니다.

  1. 구성과 함께 KafkaConnector 리소스를 배포하여 커넥터 인스턴스를 생성합니다.
  2. strimzi.io/use-connector-resources 주석을 true 로 설정하여 Kafka Connect 구성에서 KafkaConnector 리소스를 활성화합니다.
주의

만들기 전에 KafkaConnector 리소스를 활성화하면 모든 커넥터를 삭제합니다.

KafkaConnector 리소스를 Kafka Connect API를 사용하도록 전환하려면 먼저 Kafka Connect 구성에서 KafkaConnector 리소스를 활성화하는 주석을 제거합니다. 그렇지 않으면 Kafka Connect REST API를 직접 사용하여 수행한 수동 변경 사항을 Cluster Operator에 의해 되돌립니다.

전환할 때 KafkaConnect 리소스의 상태를 확인합니다. metadata.generation 값(현재 배포 버전)은 status.observedGeneration (리소스의 최신 조정)과 일치해야 합니다. Kafka Connect 클러스터가 준비 상태이면 KafkaConnector 리소스를 삭제할 수 있습니다.

6.4.4. Kafka Connect 클러스터 리소스 목록

다음 리소스는 OpenShift 클러스터의 Cluster Operator에 의해 생성됩니다.

connect-cluster-name-connect

다음 Kafka Connect 리소스에 지정된 이름입니다.

  • Kafka Connect 작업자 노드 Pod를 생성하는 배포( StableConnectIdentities 기능 게이트가 비활성화된 경우).
  • Kafka Connect 작업자 노드 Pod를 생성하는 StrimziPodSet( StableConnectIdentities 기능 게이트가 활성화된 경우)
  • Connect Pod에 안정적인 DNS 이름을 제공하는 헤드리스 서비스( StableConnectIdentities 기능 게이트가 활성화된 경우).
  • Kafka Connect 작업자 노드에 대해 구성된 Pod 중단 예산입니다.
connect-cluster-name-connect-idx
Kafka Connect StrimziPodSet에서 생성한 Pod( StableConnectIdentities 기능 게이트가 활성화된 경우)
connect-cluster-name-connect-api
Kafka Connect 클러스터 관리를 위한 REST 인터페이스를 노출하는 서비스입니다.
connect-cluster-name-config
Kafka Connect ancillary 구성이 포함되어 Kafka 브로커 Pod를 통해 볼륨으로 마운트되는 ConfigMap입니다.
Red Hat logoGithubredditYoutubeTwitter

자세한 정보

평가판, 구매 및 판매

커뮤니티

Red Hat 문서 정보

Red Hat을 사용하는 고객은 신뢰할 수 있는 콘텐츠가 포함된 제품과 서비스를 통해 혁신하고 목표를 달성할 수 있습니다. 최신 업데이트를 확인하세요.

보다 포괄적 수용을 위한 오픈 소스 용어 교체

Red Hat은 코드, 문서, 웹 속성에서 문제가 있는 언어를 교체하기 위해 최선을 다하고 있습니다. 자세한 내용은 다음을 참조하세요.Red Hat 블로그.

Red Hat 소개

Red Hat은 기업이 핵심 데이터 센터에서 네트워크 에지에 이르기까지 플랫폼과 환경 전반에서 더 쉽게 작업할 수 있도록 강화된 솔루션을 제공합니다.

Theme

© 2026 Red Hat
맨 위로 이동