9.6. Kafka Connect 구성
KafkaConnect 사용자 정의 리소스의 spec 속성을 업데이트하여 Kafka Connect 배포를 구성합니다.
Kafka Connect를 사용하여 Kafka 클러스터에 대한 외부 데이터 연결을 설정합니다. KafkaConnect 리소스의 속성을 사용하여 Kafka Connect 배포를 구성합니다.
Kafka Connect 클러스터 구성 옵션을 자세히 이해하려면 AMQ Streams 사용자 정의 리소스 API 참조를 참조하십시오.
KafkaConnector 구성
KafkaConnector 리소스를 사용하면 OpenShift 네이티브 방식으로 Kafka Connect의 커넥터 인스턴스를 생성하고 관리할 수 있습니다.
Kafka Connect 구성에서 strimzi.io/use-connector-resources 주석을 추가하여 Kafka Connect 클러스터에 대한 KafkaConnectors를 활성화합니다. AMQ Streams가 데이터 연결에 필요한 커넥터 플러그인으로 컨테이너 이미지를 자동으로 빌드하도록 빌드 구성을 추가할 수도 있습니다. Kafka Connect 커넥터에 대한 외부 구성은 externalConfiguration 속성을 통해 지정됩니다.
커넥터를 관리하려면 KafkaConnector 사용자 정의 리소스 또는 Kafka Connect REST API를 사용할 수 있습니다. KafkaConnector 리소스는 연결된 Kafka Connect 클러스터와 동일한 네임스페이스에 배포해야 합니다. 이러한 방법을 사용하여 커넥터를 생성, 재구성 또는 삭제하는 방법에 대한 자세한 내용은 커넥터 추가 를 참조하십시오.
커넥터 구성은 HTTP 요청의 일부로 Kafka Connect에 전달되어 Kafka 자체에 저장됩니다. ConfigMaps 및 Secrets는 구성 및 기밀 데이터를 저장하는 데 사용되는 표준 OpenShift 리소스입니다. ConfigMaps 및 Secrets를 사용하여 커넥터의 특정 요소를 구성할 수 있습니다. 그런 다음 필요한 경우 구성을 분리하고 더 안전하게 유지하는 HTTP REST 명령에서 구성 값을 참조할 수 있습니다. 이 방법은 특히 사용자 이름, 암호 또는 인증서와 같은 기밀 데이터에 적용됩니다.
대량의 메시지 처리
많은 양의 메시지를 처리하도록 구성을 조정할 수 있습니다. 자세한 내용은 많은 양의 메시지 처리를 참조하십시오.
KafkaConnect 사용자 정의 리소스 구성 예
- 1
KafkaConnect를 사용합니다.- 2
- Kafka Connect 클러스터의 KafkaConnectors를 활성화합니다.
- 3
- 작업을 실행하는 작업자의 복제본 노드 수입니다.
- 4
- mTLS, 토큰 기반 OAuth, SASL 기반 SCRAM-SHA-256/SCRAM-SHA-512 또는 PLAIN으로 지정된 Kafka Connect 클러스터에 대한 인증입니다. 기본적으로 Kafka Connect는 일반 텍스트 연결을 사용하여 Kafka 브로커에 연결합니다.
- 5
- Kafka 클러스터에 연결하기 위한 부트스트랩 서버입니다.
- 6
- TLS 인증서가 클러스터의 X.509 형식으로 저장되는 키 이름으로 TLS 암호화 인증서가 동일한 시크릿에 저장된 경우 여러 번 나열할 수 있습니다.
- 7
- 작업자의 Kafka Connect 구성(연결이 아님). 표준 Apache Kafka 구성은 AMQ Streams에서 직접 관리하지 않는 속성으로 제한될 수 있습니다.
- 8
- 커넥터 플러그인을 사용하여 컨테이너 이미지를 빌드하기 위한 구성 속성을 자동으로 빌드합니다.
- 9
- (필수) 새 이미지를 내보내는 컨테이너 레지스트리의 구성입니다.
- 10
- (필수) 새 컨테이너 이미지에 추가할 커넥터 플러그인 및 해당 아티팩트 목록입니다. 각 플러그인은 하나 이상의
아티팩트로 구성해야 합니다. - 11
- 다음과 같이 환경 변수를 사용하는 커넥터의 외부 구성 또는 볼륨. 구성 공급자 플러그인을 사용하여 외부 소스에서 구성 값을 로드할 수도 있습니다.
- 12
- 지원되는 리소스(현재
cpu및memory) 예약 요청 및 사용할 수 있는 최대 리소스를 지정합니다. - 13
- 지정된 Kafka Connect 로거 및 로그 수준이 직접(
인라인) 또는 ConfigMap을 통해 간접적으로(외부)됩니다. 사용자 정의 Log4j 구성은 ConfigMap의log4j.properties또는log4j2.properties키 아래에 배치해야 합니다. Kafka Connectlog4j.rootLogger로거의 경우 로그 수준을 INFO, ERROR, WARN, TRACE, DEBUG, FATAL 또는 OFF로 설정할 수 있습니다. - 14
- 컨테이너를 다시 시작할 시기(라이브)와 컨테이너가 트래픽을 허용할 시기(준비)를 확인할 상태 점검입니다.
- 15
- Prometheus 지표: 이 예제에서 Prometheus Cryostat 내보내기에 대한 구성이 포함된 ConfigMap을 참조하여 활성화됩니다.
metricsConfig.valueFrom.configMapKeyRef.key아래에 빈 파일이 포함된 ConfigMap에 대한 참조를 사용하여 추가 구성 없이 메트릭을 활성화할 수 있습니다. - 16
- Kafka Connect를 실행하는 VM(가상 머신)의 성능을 최적화하는 JVM 구성 옵션입니다.
- 17
- ADVANCED OPTION: 특수 상황에서만 권장되는 컨테이너 이미지 구성입니다.
- 18
- SPECIALIZED OPTION: 배포에 대한 Rack 인식 구성입니다. 이는 지역이 아닌 동일한 위치 내의 배포를 위한 특수 옵션입니다. 리더 복제본 대신 커넥터가 가장 가까운 복제본에서 사용할 수 있도록 하려면 이 옵션을 사용합니다. 경우에 따라 가장 가까운 복제본에서 소비하면 네트워크 사용률을 개선하거나 비용을 절감할 수 있습니다.
topologyKey는 랙 ID가 포함된 노드 레이블과 일치해야 합니다. 이 구성에 사용된 예제에서는 표준topology.kubernetes.io/zone레이블을 사용하는 영역을 지정합니다. 가장 가까운 복제본에서 사용하려면 Kafka 브로커 구성에서RackAwareReplicaSelector를 활성화합니다. - 19
- 템플릿 사용자 지정. 여기에서 Pod는 유사성 방지를 사용하여 예약되므로 이름이 동일한 노드에 Pod가 예약되지 않습니다.
- 20
- 환경 변수는 분산 추적에 대해 설정됩니다.
- 21
- OpenTelemetry를 사용하여 분산 추적을 활성화합니다.
9.6.1. 여러 인스턴스에 대한 Kafka Connect 구성 링크 복사링크가 클립보드에 복사되었습니다!
기본적으로 AMQ Streams는 Kafka Connect에서 사용하는 내부 주제의 그룹 ID와 이름을 구성합니다. Kafka Connect의 여러 인스턴스를 실행하는 경우 다음 구성 속성을 사용하여 이러한 기본 설정을 변경해야 합니다.
세 주제의 값은 동일한 group.id 가 있는 모든 인스턴스에 대해 동일해야 합니다.
이러한 기본 설정을 수정하지 않으면 동일한 Kafka 클러스터에 연결하는 각 인스턴스가 동일한 값으로 배포됩니다. 실제로 모든 인스턴스가 클러스터를 형성하고 동일한 내부 주제를 사용합니다.
동일한 내부 주제를 사용하려는 여러 인스턴스가 예기치 않은 오류가 발생하므로 각 인스턴스에 대한 이러한 속성 값을 변경해야 합니다.
9.6.2. Kafka Connect 사용자 권한 부여 구성 링크 복사링크가 클립보드에 복사되었습니다!
Kafka에서 권한을 사용하는 경우 Kafka Connect 사용자는 클러스터 그룹 및 Kafka Connect의 내부 항목에 대한 읽기/쓰기 액세스 권한이 필요합니다. 다음 절차에서는 간단한 권한 부여 및 ACL을 사용하여 액세스 권한을 부여하는 방법을 간략하게 설명합니다.
Kafka Connect 클러스터 그룹 ID 및 내부 주제의 속성은 기본적으로 AMQ Streams에 의해 구성됩니다. 또는 KafkaConnect 리소스의 사양에 명시적으로 정의할 수 있습니다. 이 기능은 그룹 ID 및 주제의 값이 여러 Kafka Connect 인스턴스를 실행할 때 달라야 하므로 여러 인스턴스에 대해 Kafka Connect를 구성할 때 유용합니다.
간단한 인증에서는 Kafka AclAuthorizer 및 StandardAuthorizer 플러그인에서 관리하는 ACL 규칙을 사용하여 적절한 액세스 수준을 보장합니다. 간단한 인증을 사용하도록 KafkaUser 리소스를 구성하는 방법에 대한 자세한 내용은 AclRule 스키마 참조를 참조하십시오.
사전 요구 사항
- OpenShift 클러스터
- 실행중인 Cluster Operator
프로세스
KafkaUser리소스에서권한 부여속성을 편집하여 사용자에게 액세스 권한을 제공합니다.액세스 권한은
리터럴이름 값을 사용하여 Kafka Connect 주제 및 클러스터 그룹에 대해 구성됩니다. 다음 표에서는 주제 및 클러스터 그룹 ID에 대해 구성된 기본 이름을 보여줍니다.Expand 표 9.2. 액세스 권한 구성의 이름 속성 이름 offset.storage.topicconnect-cluster-offsetsstatus.storage.topicconnect-cluster-statusconfig.storage.topicconnect-cluster-configsgroupconnect-cluster이 예제 구성에서 기본 이름은 액세스 권한을 지정하는 데 사용됩니다. Kafka Connect 인스턴스에 다른 이름을 사용하는 경우 ACL 구성에서 해당 이름을 사용합니다.
간단한 권한 부여를 위한 구성 예
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 리소스를 생성하거나 업데이트합니다.
oc apply -f KAFKA-USER-CONFIG-FILE
oc apply -f KAFKA-USER-CONFIG-FILECopy to Clipboard Copied! Toggle word wrap Toggle overflow
9.6.3. Kafka Connect 커넥터를 수동으로 중지하거나 일시 중지 링크 복사링크가 클립보드에 복사되었습니다!
KafkaConnector 리소스를 사용하여 커넥터를 구성하는 경우 상태 구성을 사용하여 커넥터를 중지하거나 일시 중지합니다. 커넥터 및 작업이 인스턴스화되는 일시 중지된 상태와 달리 커넥터를 중지하면 활성 프로세스가 없는 구성만 유지됩니다. 실행으로부터 커넥터를 중지하는 것은 단순히 일시 중지하는 것보다 장기간에 더 적합할 수 있습니다. 일시 중지된 커넥터를 다시 시작하는 속도가 빨라지지만 중지된 커넥터는 메모리와 리소스를 확보할 수 있는 이점이 있습니다.
상태 구성은 KafkaConnectorSpec 스키마의 (더 이상 사용되지 않음) 일시 중지 구성을 교체하여 커넥터를 일시 중지할 수 있습니다. 이전에 일시 중지 구성을 사용하여 커넥터를 일시 중지한 경우 충돌을 방지하기 위해 상태 구성만 사용하도록 전환하는 것이 좋습니다.
사전 요구 사항
- Cluster Operator가 실행 중입니다.
프로세스
일시 중지 또는 중지하려는 커넥터를 제어하는
KafkaConnector사용자 정의 리소스의 이름을 찾습니다.oc get KafkaConnector
oc get KafkaConnectorCopy to Clipboard Copied! Toggle word wrap Toggle overflow KafkaConnector리소스를 편집하여 커넥터를 중지하거나 일시 중지합니다.Kafka Connect 커넥터를 중지하는 구성의 예
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 상태구성을중지 또는 일시중지하도록 변경합니다. 이 속성이실행 중이아닌 경우 커넥터의 기본 상태입니다.KafkaConnector구성에 변경 사항을 적용합니다.상태를실행중이거나 구성을 제거하여 커넥터를 다시 시작할 수 있습니다.
또는 Kafka Connect API를 노출 하고 중지 및 일시 중지 를 사용하여 커넥터가 실행되지 않도록 중지할 수 있습니다. 예를 들어 PUT /connectors/<connector_name>/stop. 그런 다음 resume 끝점을 사용하여 다시 시작할 수 있습니다.
9.6.4. Kafka Connect 커넥터 수동 재시작 링크 복사링크가 클립보드에 복사되었습니다!
KafkaConnector 리소스를 사용하여 커넥터를 관리하는 경우 strimzi.io/restart 주석을 사용하여 커넥터 재시작을 수동으로 트리거합니다.
사전 요구 사항
- Cluster Operator가 실행 중입니다.
프로세스
재시작할 Kafka 커넥터를 제어하는
KafkaConnector사용자 정의 리소스의 이름을 찾습니다.oc get KafkaConnector
oc get KafkaConnectorCopy to Clipboard Copied! Toggle word wrap Toggle overflow OpenShift에서
KafkaConnector리소스에 주석을 달아 커넥터를 다시 시작합니다.oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart="true"
oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart="true"Copy to Clipboard Copied! Toggle word wrap Toggle overflow 재시작주석은true로 설정됩니다.다음 조정이 발생할 때까지 기다립니다(기본적으로 2분마다).
조정 프로세스에서 주석을 감지한 한 Kafka 커넥터가 다시 시작됩니다. Kafka Connect에서 재시작 요청을 수락하면
KafkaConnector사용자 정의 리소스에서 주석이 제거됩니다.
9.6.5. Kafka Connect 커넥터 작업 수동 재시작 링크 복사링크가 클립보드에 복사되었습니다!
KafkaConnector 리소스를 사용하여 커넥터를 관리하는 경우 strimzi.io/restart-task 주석을 사용하여 커넥터 작업 재시작을 수동으로 트리거합니다.
사전 요구 사항
- Cluster Operator가 실행 중입니다.
프로세스
재시작할 Kafka 커넥터 작업을 제어하는
KafkaConnector사용자 정의 리소스의 이름을 찾습니다.oc get KafkaConnector
oc get KafkaConnectorCopy to Clipboard Copied! Toggle word wrap Toggle overflow KafkaConnector사용자 정의 리소스에서 재시작할 작업의 ID를 찾습니다.oc describe KafkaConnector <kafka_connector_name>
oc describe KafkaConnector <kafka_connector_name>Copy to Clipboard Copied! Toggle word wrap Toggle overflow 작업 ID는 0부터 시작하여 음수가 아닌 정수입니다.
OpenShift에서
KafkaConnector리소스에 주석을 달아 커넥터 작업을 다시 시작하려면 ID를 사용합니다.oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart-task="0"
oc annotate KafkaConnector <kafka_connector_name> strimzi.io/restart-task="0"Copy to Clipboard Copied! Toggle word wrap Toggle overflow 이 예에서는 작업
0이 다시 시작됩니다.다음 조정이 발생할 때까지 기다립니다(기본적으로 2분마다).
조정 프로세스에서 주석을 감지한 한 Kafka 커넥터 작업이 다시 시작됩니다. Kafka Connect에서 재시작 요청을 수락하면
KafkaConnector사용자 정의 리소스에서 주석이 제거됩니다.