6.5. Kafka Connect 커넥터 추가
Kafka Connect는 커넥터를 사용하여 데이터를 스트리밍하기 위해 다른 시스템과 통합합니다. 커넥터는 Kafka Connector 클래스의 인스턴스이며 다음 유형 중 하나일 수 있습니다.
- 소스 커넥터
- 소스 커넥터는 외부 시스템에서 데이터를 가져와서 Kafka에 메시지로 제공하는 런타임 엔티티입니다.
- 싱크 커넥터
- 싱크 커넥터는 Kafka 주제에서 메시지를 가져와서 외부 시스템에 제공하는 런타임 엔티티입니다.
Kafka Connect는 플러그인 아키텍처를 사용하여 커넥터에 구현 아티팩트를 제공합니다. 플러그인을 사용하면 다른 시스템에 대한 연결을 허용하고 데이터를 조작하기 위한 추가 구성을 제공합니다. 플러그인에는 커넥터 및 데이터 변환기 및 변환과 같은 기타 구성 요소가 포함됩니다. 커넥터는 특정 유형의 외부 시스템에서 작동합니다. 각 커넥터는 구성에 사용할 스키마를 정의합니다. Kafka Connect에 구성을 제공하여 Kafka Connect 내에서 커넥터 인스턴스를 생성합니다. 그런 다음 Connector 인스턴스는 시스템 간에 데이터를 이동하기 위한 일련의 작업을 정의합니다.
다음 방법 중 하나로 Kafka Connect에 커넥터 플러그인을 추가합니다.
컨테이너 이미지에 플러그인을 추가한 후 다음과 같은 방법으로 커넥터 인스턴스를 시작, 중지 및 관리할 수 있습니다.
이러한 옵션을 사용하여 새 커넥터 인스턴스를 생성할 수도 있습니다.
6.5.1. 커넥터 플러그인을 사용하여 새 컨테이너 이미지 빌드 링크 복사링크가 클립보드에 복사되었습니다!
AMQ Streams가 추가 커넥터를 사용하여 새 컨테이너 이미지를 자동으로 빌드하도록 Kafka Connect를 구성합니다. KafkaConnect 사용자 정의 리소스의 .spec.build.plugins 속성을 사용하여 커넥터 플러그인을 정의합니다. AMQ Streams는 커넥터 플러그인을 자동으로 다운로드하여 새 컨테이너 이미지에 추가합니다. 컨테이너는 .spec.build.output 에 지정된 컨테이너 리포지토리로 푸시되고 Kafka Connect 배포에서 자동으로 사용됩니다.
사전 요구 사항
- Cluster Operator를 배포해야 합니다.
- 컨테이너 레지스트리.
이미지를 푸시, 저장 및 가져올 수 있는 자체 컨테이너 레지스트리를 제공해야 합니다. AMQ Streams는 Quay 또는 Docker Hub 와 같은 공용 레지스트리뿐만 아니라 개인 컨테이너 레지스트리를 지원합니다.
프로세스
.spec.build.output에 컨테이너 레지스트리를 지정하고.spec.build.plugins: 추가 커넥터를 지정하여KafkaConnect사용자 지정 리소스를 구성합니다.Copy to Clipboard Copied! Toggle word wrap Toggle overflow 리소스를 생성하거나 업데이트합니다.
oc apply -f <kafka_connect_configuration_file>
$ oc apply -f <kafka_connect_configuration_file>Copy to Clipboard Copied! Toggle word wrap Toggle overflow - 새 컨테이너 이미지가 빌드되고 Kafka Connect 클러스터가 배포될 때까지 기다립니다.
-
추가한 커넥터 플러그인을 사용하려면 Kafka Connect REST API 또는
KafkaConnector사용자 정의 리소스를 사용합니다.
6.5.2. Kafka Connect 기본 이미지의 커넥터 플러그인을 사용하여 새 컨테이너 이미지 빌드 링크 복사링크가 클립보드에 복사되었습니다!
Kafka Connect 기본 이미지의 커넥터 플러그인을 사용하여 사용자 지정 Docker 이미지를 생성합니다. 사용자 지정 이미지를 /opt/kafka/plugins 디렉터리에 추가합니다.
Red Hat Ecosystem Catalog 에서 Kafka 컨테이너 이미지를 추가 커넥터 플러그인으로 자체 사용자 정의 이미지를 생성하기 위한 기본 이미지로 사용할 수 있습니다.
시작 시 Kafka Connect의 AMQ Streams 버전은 /opt/kafka/plugins 디렉터리에 포함된 타사 커넥터 플러그인을 로드합니다.
사전 요구 사항
프로세스
registry.redhat.io/amq-streams/kafka-37-rhel8:2.7.0을 기본 이미지로 사용하여 새Dockerfile을 생성합니다.FROM registry.redhat.io/amq-streams/kafka-37-rhel8:2.7.0 USER root:root COPY ./my-plugins/ /opt/kafka/plugins/ USER 1001
FROM registry.redhat.io/amq-streams/kafka-37-rhel8:2.7.0 USER root:root COPY ./my-plugins/ /opt/kafka/plugins/ USER 1001Copy to Clipboard Copied! Toggle word wrap Toggle overflow 플러그인 파일 예
Copy to Clipboard Copied! Toggle word wrap Toggle overflow COPY 명령은 컨테이너 이미지에 복사할 플러그인 파일을 가리킵니다.
이 예제에서는 Debezium 커넥터(MongoDB, MySQL 및 PostgreSQL)용 플러그인을 추가하지만 모든 파일이 간결하게 나열되지는 않습니다. Kafka Connect에서 실행되는 Debezium은 다른 Kafka Connect 작업과 동일합니다.
- 컨테이너 이미지를 빌드합니다.
- 사용자 정의 이미지를 컨테이너 레지스트리로 내보냅니다.
새 컨테이너 이미지를 가리킵니다.
다음 방법 중 하나로 이미지를 가리킬 수 있습니다.
KafkaConnect사용자 정의 리소스의KafkaConnect.spec.image속성을 편집합니다.설정된 경우 이 속성은 Cluster Operator의
STRIMZI_KAFKA_CONNECT_IMAGES환경 변수를 재정의합니다.Copy to Clipboard Copied! Toggle word wrap Toggle overflow -
install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml파일에서STRIMZI_KAFKA_CONNECT_IMAGES환경 변수를 편집한 다음 Cluster Operator를 다시 설치합니다.
6.5.3. KafkaConnector 리소스 배포 링크 복사링크가 클립보드에 복사되었습니다!
KafkaConnector 리소스를 배포하여 커넥터를 관리합니다. KafkaConnector 사용자 정의 리소스는 Cluster Operator의 커넥터 관리에 대한 OpenShift 네이티브 접근 방식을 제공합니다. Kafka Connect REST API와 마찬가지로 커넥터를 관리하기 위해 HTTP 요청을 보낼 필요가 없습니다. 해당 KafkaConnector 리소스를 업데이트한 다음 업데이트를 적용하여 실행 중인 커넥터 인스턴스를 관리합니다. Cluster Operator는 실행 중인 커넥터 인스턴스의 구성을 업데이트합니다. 해당 KafkaConnector 를 삭제하여 커넥터를 제거합니다.
KafkaConnector 리소스는 연결된 Kafka Connect 클러스터와 동일한 네임스페이스에 배포해야 합니다.
이 절차에 표시된 구성에서 실패한 커넥터 및 작업을 자동으로 다시 시작하기 위해 autoRestart 기능이 활성화됨(enabled: true)입니다. KafkaConnector 리소스에 주석을 추가하여 커넥터 를 다시 시작하거나 커넥터 작업을 수동으로 다시 시작할 수도 있습니다.
커넥터 예
자체 커넥터를 사용하거나 AMQ Streams에서 제공하는 예제를 시도할 수 있습니다. Apache Kafka 3.1.0까지 파일 커넥터 플러그인 예제가 Apache Kafka에 포함되었습니다. Apache Kafka의 3.1.1 및 3.2.0 릴리스부터는 다른 커넥터로 플러그인 경로에 예제를 추가해야 합니다.
AMQ Streams는 예제 파일 커넥터 플러그인에 대한 KafkaConnector 구성 파일 (examples/connect/source-connector.yaml) 예를 제공합니다. 이 파일은 다음 커넥터 인스턴스를 KafkaConnector 리소스로 생성합니다.
-
Kafka 라이센스 파일(소스)에서 각 행을 읽고 데이터를 하나의 Kafka 항목에 메시지로 쓰는
FileStreamSourceConnector인스턴스입니다. -
Kafka 주제에서 메시지를 읽고 메시지를 임시 파일( sink)에 쓰는
FileStreamSinkConnector인스턴스입니다.
이 절차에서는 예제 파일을 사용하여 커넥터를 만듭니다.
예제 커넥터는 프로덕션 환경에서 사용하기 위한 것이 아닙니다.
사전 요구 사항
- Kafka Connect 배포
- Cluster Operator가 실행 중입니다
프로세스
다음 방법 중 하나로 Kafka Connect에
FileStreamSourceConnector및FileStreamSinkConnector플러그인을 추가합니다.Kafka Connect 구성에서
strimzi.io/use-connector-resources 주석을true로 설정합니다.Copy to Clipboard Copied! Toggle word wrap Toggle overflow KafkaConnector리소스가 활성화된 상태에서 Cluster Operator는 이를 감시합니다.examples/connect/source-connector.yaml파일을 편집합니다.KafkaConnector 소스 커넥터 구성 예
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - 1
- 커넥터의 이름으로 사용되는
KafkaConnector리소스의 이름입니다. OpenShift 리소스에 유효한 모든 이름을 사용합니다. - 2
- 커넥터 인스턴스를 생성하는 Kafka Connect 클러스터의 이름입니다. 커넥터는 연결된 Kafka Connect 클러스터와 동일한 네임스페이스에 배포해야 합니다.
- 3
- 커넥터 클래스의 전체 이름입니다. Kafka Connect 클러스터에서 사용하는 이미지에 있어야 합니다.
- 4
- 커넥터가 생성할 수 있는 최대 Kafka Connect 작업 수입니다.
- 5
- 실패한 커넥터 및 작업을 자동으로 다시 시작할 수 있습니다. 기본적으로 재시작 수는 indefinite이지만
maxRestarts속성을 사용하여 자동 재시작 횟수에 대해 최대값을 설정할 수 있습니다. - 6
- 연결을 키 -값 쌍으로 연결합니다.
- 7
- 외부 데이터 파일의 위치입니다. 이 예제에서는
/opt/kafka/LICENSE파일에서 읽을 수 있도록FileStreamSourceConnector를 구성하고 있습니다. - 8
- 소스 데이터를 게시하는 Kafka 주제입니다.
OpenShift 클러스터에서 소스
KafkaConnector를 생성합니다.oc apply -f examples/connect/source-connector.yaml
oc apply -f examples/connect/source-connector.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow examples/connect/sink-connector.yaml파일을 생성합니다.touch examples/connect/sink-connector.yaml
touch examples/connect/sink-connector.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow 다음 YAML을
sink-connector.yaml파일에 붙여넣습니다.Copy to Clipboard Copied! Toggle word wrap Toggle overflow OpenShift 클러스터에서 싱크
KafkaConnector를 생성합니다.oc apply -f examples/connect/sink-connector.yaml
oc apply -f examples/connect/sink-connector.yamlCopy to Clipboard Copied! Toggle word wrap Toggle overflow 커넥터 리소스가 생성되었는지 확인합니다.
oc get kctr --selector strimzi.io/cluster=<my_connect_cluster> -o name my-source-connector my-sink-connector
oc get kctr --selector strimzi.io/cluster=<my_connect_cluster> -o name my-source-connector my-sink-connectorCopy to Clipboard Copied! Toggle word wrap Toggle overflow <my_connect_cluster>를 Kafka Connect 클러스터 이름으로 바꿉니다.
컨테이너에서
kafka-console-consumer.sh를 실행하여 소스 커넥터가 해당 항목에 기록된 메시지를 읽습니다.oc exec <my_kafka_cluster>-kafka-0 -i -t -- bin/kafka-console-consumer.sh --bootstrap-server <my_kafka_cluster>-kafka-bootstrap.NAMESPACE.svc:9092 --topic my-topic --from-beginning
oc exec <my_kafka_cluster>-kafka-0 -i -t -- bin/kafka-console-consumer.sh --bootstrap-server <my_kafka_cluster>-kafka-bootstrap.NAMESPACE.svc:9092 --topic my-topic --from-beginningCopy to Clipboard Copied! Toggle word wrap Toggle overflow <my_kafka_cluster>를 Kafka 클러스터 이름으로 바꿉니다.
소스 및 싱크 커넥터 구성 옵션
커넥터 구성은 KafkaConnector 리소스의 spec.config 속성에 정의됩니다.
FileStreamSourceConnector 및 FileStreamSinkConnector 클래스는 Kafka Connect REST API와 동일한 구성 옵션을 지원합니다. 기타 커넥터는 다양한 구성 옵션을 지원합니다.
| 이름 | 유형 | 기본값 | 설명 |
|---|---|---|---|
|
| 문자열 | null | 메시지를 작성할 소스 파일입니다. 지정하지 않으면 표준 입력이 사용됩니다. |
|
| list | null | 데이터를 게시하는 Kafka 주제입니다. |
| 이름 | 유형 | 기본값 | 설명 |
|---|---|---|---|
|
| 문자열 | null | 메시지를 쓸 대상 파일입니다. 지정하지 않으면 표준 출력이 사용됩니다. |
|
| list | null | 데이터를 읽을 하나 이상의 Kafka 주제입니다. |
|
| 문자열 | null | 데이터를 읽을 하나 이상의 Kafka 주제와 일치하는 정규식입니다. |
6.5.4. Kafka Connect API 노출 링크 복사링크가 클립보드에 복사되었습니다!
Kafka Connect REST API를 KafkaConnector 리소스를 사용하여 커넥터를 관리하는 대신 사용합니다. Kafka Connect REST API는 < connect_cluster_name> -connect-api:8083 에서 실행되는 서비스로 사용할 수 있습니다. 여기서 < connect_cluster_name >은 Kafka Connect 클러스터의 이름입니다. 이 서비스는 Kafka Connect 인스턴스를 생성할 때 생성됩니다.
Kafka Connect REST API에서 지원하는 작업은 Apache Kafka Connect API 설명서에 설명되어 있습니다.
strimzi.io/use-connector-resources 주석은 KafkaConnectors를 활성화합니다. KafkaConnect 리소스 구성에 주석을 적용한 경우 Kafka Connect API를 사용하도록 제거해야 합니다. 그러지 않으면 Kafka Connect REST API를 사용하여 직접 변경한 수동 변경 사항은 Cluster Operator에서 되돌립니다.
커넥터 구성을 JSON 오브젝트로 추가할 수 있습니다.
커넥터 구성을 추가하기 위한 curl 요청 예
API는 OpenShift 클러스터 내에서만 액세스할 수 있습니다. OpenShift 클러스터 외부에서 실행되는 애플리케이션에 Kafka Connect API를 액세스하려면 다음 기능 중 하나를 생성하여 수동으로 노출할 수 있습니다.
-
LoadBalancer또는NodePort유형 서비스 -
Ingress리소스(Kubernetes만 해당) - OpenShift 경로(OpenShift만 해당)
연결이 안전하지 않으므로 외부 액세스를 권장합니다.
서비스를 생성하려면 < connect_cluster_name> -connect-api 서비스의 선택기 에서 레이블을 사용하여 서비스가 트래픽을 라우팅할 Pod를 구성합니다.
서비스의 선택기 구성
외부 클라이언트의 HTTP 요청을 허용하는 NetworkPolicy 도 생성해야 합니다.
Kafka Connect API에 대한 요청을 허용하는 NetworkPolicy의 예
- 1
- API에 연결할 수 있는 Pod의 레이블입니다.
클러스터 외부에 커넥터 구성을 추가하려면 curl 명령에 API를 노출하는 리소스의 URL을 사용합니다.
6.5.5. Kafka Connect API에 대한 액세스 제한 링크 복사링크가 클립보드에 복사되었습니다!
무단 조치 및 잠재적인 보안 문제를 방지하기 위해 Kafka Connect API에 대한 액세스를 신뢰할 수 있는 사용자에게만 제한하는 것이 중요합니다. Kafka Connect API는 커넥터 구성을 변경하기 위한 광범위한 기능을 제공하므로 보안 예방 조치를 취하는 것이 더 중요합니다. Kafka Connect API에 대한 액세스 권한이 있는 사용자는 관리자가 안전한 것으로 가정할 수 있는 중요한 정보를 얻을 수 있습니다.
Kafka Connect REST API는 OpenShift 클러스터에 대한 액세스 권한이 있고 호스트 이름/IP 주소 및 포트 번호를 포함하는 엔드포인트 URL을 알고 있는 모든 사용자가 액세스할 수 있습니다.
예를 들어 조직이 Kafka Connect 클러스터 및 커넥터를 사용하여 고객 데이터베이스에서 중앙 데이터베이스로 중요한 데이터를 스트리밍한다고 가정합니다. 관리자는 구성 공급자 플러그인을 사용하여 고객 데이터베이스 및 데이터베이스 연결 세부 정보 및 인증 자격 증명과 같은 중앙 데이터베이스를 연결하는 것과 관련된 중요한 정보를 저장합니다. 구성 공급자는 이러한 민감한 정보가 권한이 없는 사용자에게 노출되지 않도록 보호합니다. 그러나 Kafka Connect API에 액세스할 수 있는 사용자는 관리자의 동의 없이 고객 데이터베이스에 대한 액세스 권한을 계속 받을 수 있습니다. 페이크 데이터베이스를 설정하고 연결하도록 커넥터를 구성하여 이 작업을 수행할 수 있습니다. 그런 다음 고객 데이터베이스를 가리키도록 커넥터 구성을 수정하지만 데이터를 중앙 데이터베이스로 전송하는 대신 페이크 데이터베이스로 보냅니다. 페이크 데이터베이스에 연결하도록 커넥터를 구성하면 구성 공급자에 안전하게 저장되더라도 고객 데이터베이스에 연결하기 위한 로그인 세부 정보와 인증 정보가 가로채어집니다.
KafkaConnector 사용자 지정 리소스를 사용하는 경우 기본적으로 OpenShift RBAC 규칙에 따라 OpenShift 클러스터 관리자만 커넥터를 변경할 수 있습니다. 비 클러스터 관리자를 지정하여 AMQ Streams 리소스를 관리할 수도 있습니다. Kafka Connect 구성에서 KafkaConnector 리소스를 활성화하면 Cluster Operator에서 Kafka Connect REST API를 사용하여 직접 변경한 내용을 되돌립니다. KafkaConnector 리소스를 사용하지 않는 경우 기본 RBAC 규칙에서 Kafka Connect API에 대한 액세스를 제한하지 않습니다. OpenShift RBAC를 사용하여 Kafka Connect REST API에 대한 직접 액세스를 제한하려면 KafkaConnector 리소스를 활성화하고 사용해야 합니다.
보안을 강화하려면 Kafka Connect API에 대해 다음 속성을 구성하는 것이 좋습니다.
org.apache.kafka.disallowed.login.modules(Kafka 3.4 이상) 비보안 로그인 모듈을 사용하지 않도록
org.apache.kafka.disallowed.login.modulesJava 시스템 속성을 설정합니다. 예를 들어com.sun.security.auth.module.JndiLoginModule을 지정하면 KafkaJndiLoginModule을 사용할 수 없습니다.로그인 모듈 비활성화를 위한 설정 예
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 신뢰할 수 있는 로그인 모듈만 허용하고 사용 중인 버전에 대해 Kafka의 최신 조언을 따르십시오. 가장 좋은 방법은
org.apache.kafka.disallowed.login.modules시스템 속성을 사용하여 Kafka Connect 구성에서 비보안 로그인 모듈을 명시적으로 허용하지 않아야 합니다.connector.client.config.override.policy커넥터 구성이 Kafka Connect 구성과 사용하는 소비자 및 생산자를 덮어쓰지 않도록
connector.client.config.override.policy속성을None으로 설정합니다.커넥터 덮어쓰기 정책을 지정하는 구성 예
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
6.5.6. Kafka Connect API를 사용하여 KafkaConnector 사용자 정의 리소스 사용으로 전환 링크 복사링크가 클립보드에 복사되었습니다!
Kafka Connect API를 사용하여 KafkaConnector 사용자 정의 리소스를 사용하여 커넥터를 관리할 수 있습니다. 전환하려면 표시된 순서대로 다음을 수행합니다.
-
구성과 함께
KafkaConnector리소스를 배포하여 커넥터 인스턴스를 생성합니다. -
strimzi.io/use-connector-resources주석을true로 설정하여 Kafka Connect 구성에서KafkaConnector리소스를 활성화합니다.
KafkaConnector 리소스를 생성하기 전에 활성화하면 모든 커넥터를 삭제합니다.
KafkaConnector 리소스를 사용하여 Kafka Connect API를 사용하여 전환하려면 먼저 Kafka Connect 구성에서 KafkaConnector 리소스를 활성화하는 주석을 제거합니다. 그러지 않으면 Kafka Connect REST API를 사용하여 직접 변경한 수동 변경 사항은 Cluster Operator에서 되돌립니다.
전환을 수행할 때 KafkaConnect 리소스의 상태를 확인합니다. metadata.generation (현재 배포 버전)의 값은 status.observedGeneration (리소스의 최신 조정)과 일치해야 합니다. Kafka Connect 클러스터가 준비 되면 KafkaConnector 리소스를 삭제할 수 있습니다.