7.7. Kafka Connect 구성
AMQ Streams의 KafkaConnect 리소스를 사용하여 새 Kafka Connect 클러스터를 빠르고 쉽게 생성합니다.
KafkaConnect 리소스를 사용하여 Kafka Connect 를 배포할 때 Kafka 클러스터에 연결하기 위해 부트스트랩 서버 주소 ( spec.bootstrapServers)를 지정합니다. 서버가 다운된 경우 두 개 이상의 주소를 지정할 수 있습니다. 또한 인증 자격 증명 및 TLS 암호화 인증서를 지정하여 보안 연결을 만듭니다.
Kafka 클러스터는 AMQ Streams에서 관리하거나 OpenShift 클러스터에 배포할 필요가 없습니다.
KafkaConnect 리소스를 사용하여 다음을 지정할 수도 있습니다.
- 플러그인을 연결할 플러그인이 포함된 컨테이너 이미지를 빌드하는 플러그인 구성
- Kafka Connect 클러스터에 속하는 작업자 Pod 구성
-
KafkaConnector리소스를 사용하여 플러그인을 관리할 수 있는 주석
Cluster Operator는 KafkaConnector 리소스를 사용하여 생성된 KafkaConnect 리소스 및 커넥터를 사용하여 배포된 Kafka Connect 클러스터를 관리합니다.
플러그인 구성
플러그인은 커넥터 인스턴스를 생성하기 위한 구현을 제공합니다. 플러그인이 인스턴스화되면 특정 유형의 외부 데이터 시스템에 연결하기 위한 구성이 제공됩니다. 플러그인은 지정된 종류의 데이터 소스에 연결하기 위한 커넥터 및 작업 구현을 정의하는 하나 이상의 JAR 파일 세트를 제공합니다. 많은 외부 시스템에 대한 플러그인은 Kafka Connect에서 사용할 수 있습니다. 고유한 플러그인을 만들 수도 있습니다.
이 구성은 Kafka Connect 내외로 피드할 소스 입력 데이터 및 대상 출력 데이터를 설명합니다. 소스 커넥터의 경우 외부 소스 데이터는 메시지를 저장할 특정 주제를 참조해야 합니다. 이 플러그인은 데이터를 변환하는 데 필요한 라이브러리 및 파일을 포함할 수도 있습니다.
Kafka Connect 배포에는 플러그인이 하나 이상 있을 수 있지만 각 플러그인마다 하나의 버전만 있을 수 있습니다.
선택한 플러그인이 포함된 사용자 지정 Kafka Connect 이미지를 생성할 수 있습니다. 다음 두 가지 방법으로 이미지를 생성할 수 있습니다.
컨테이너 이미지를 자동으로 생성하려면 KafkaConnect 리소스의 build 속성을 사용하여 Kafka Connect 클러스터에 추가할 플러그인을 지정합니다. AMQ Streams는 플러그인 아티팩트를 자동으로 다운로드하고 새 컨테이너 이미지에 추가합니다.
플러그인 구성 예
- 1
- 플러그인을 사용하여 컨테이너 이미지를 빌드하기 위한 구성 속성을 자동으로 빌드 합니다.
- 2
- 새 이미지가 푸시되는 컨테이너 레지스트리 구성
출력속성은 이미지의 유형과 이름을 설명하고 선택적으로 컨테이너 레지스트리에 액세스하는 데 필요한 인증 정보가 포함된 시크릿 이름을 설명합니다. - 3
- 새 컨테이너 이미지에 추가할 플러그인 및 해당 아티팩트 목록입니다.
plugins속성은 아티팩트 유형과 아티팩트가 다운로드되는 URL을 설명합니다. 각 플러그인은 하나 이상의 아티팩트로 구성해야 합니다. 또한 SHA-512 체크섬을 지정하여 아티팩트를 압축 해제하기 전에 확인할 수 있습니다.
Dockerfile을 사용하여 이미지를 빌드하는 경우 AMQ Streams의 최신 컨테이너 이미지를 기본 이미지로 사용하여 플러그인 구성 파일을 추가할 수 있습니다.
플러그인 구성 수동 추가 표시 예
FROM registry.redhat.io/amq-streams/kafka-37-rhel8:2.7.0 USER root:root COPY ./my-plugins/ /opt/kafka/plugins/ USER 1001
FROM registry.redhat.io/amq-streams/kafka-37-rhel8:2.7.0
USER root:root
COPY ./my-plugins/ /opt/kafka/plugins/
USER 1001
작업자에 대한 Kafka Connect 클러스터 구성
KafkaConnect 리소스의 config 속성에 작업자의 구성을 지정합니다.
분산 Kafka Connect 클러스터에는 그룹 ID와 내부 구성 주제 세트가 있습니다.
-
group.id -
offset.storage.topic -
config.storage.topic -
status.storage.topic
Kafka Connect 클러스터는 기본적으로 이러한 속성에 대해 동일한 값으로 구성됩니다. Kafka Connect 클러스터는 오류를 생성하기 때문에 그룹 ID 또는 주제 이름을 공유할 수 없습니다. 서로 다른 Kafka Connect 클러스터를 여러 개 사용하는 경우 생성된 각 Kafka Connect 클러스터의 작업자에 대해 이러한 설정을 고유해야 합니다.
각 Kafka Connect 클러스터에서 사용하는 커넥터의 이름도 고유해야 합니다.
다음 예제 작업자 구성에서는 JSON 변환기가 지정됩니다. Kafka Connect에서 사용하는 내부 Kafka 항목에 대한 복제 요인이 설정됩니다. 프로덕션 환경에는 최소 3개 이상 있어야 합니다. 주제를 만든 후 복제 요소를 변경하면 적용되지 않습니다.
작업자 구성 예
- 1
- Kafka 내의 Kafka Connect 클러스터 ID입니다. 각 Kafka Connect 클러스터에 대해 고유해야 합니다.
- 2
- 커넥터 오프셋을 저장하는 Kafka 주제입니다. 각 Kafka Connect 클러스터에 대해 고유해야 합니다.
- 3
- 커넥터 및 작업 상태 구성을 저장하는 Kafka 주제입니다. 각 Kafka Connect 클러스터에 대해 고유해야 합니다.
- 4
- 커넥터 및 작업 상태 업데이트를 저장하는 Kafka 주제입니다. 각 Kafka Connect 클러스터에 대해 고유해야 합니다.
- 5
- message 키를 Kafka의 저장을 위해 JSON 형식으로 변환하기 위한 Cryostat입니다.
- 6
- message 값을 Kafka의 저장을 위해 JSON 형식으로 변환하기 위한 Cryostat입니다.
- 7
- 메시지 키를 구조화된 JSON 형식으로 변환하는 데 사용되는 스키마입니다.
- 8
- 메시지 값을 구조화된 JSON 형식으로 변환하는 데 사용되는 스키마입니다.
- 9
- 커넥터 오프셋을 저장하는 Kafka 항목의 복제 인수입니다.
- 10
- 커넥터 및 작업 상태 구성을 저장하는 Kafka 항목의 복제 인수입니다.
- 11
- 커넥터 및 작업 상태 업데이트를 저장하는 Kafka 항목의 복제 인수입니다.
커넥터의 KafkaConnector 관리
배포의 작업자 Pod에 사용되는 컨테이너 이미지에 플러그인을 추가한 후 AMQ Streams의 KafkaConnector 사용자 정의 리소스 또는 Kafka Connect API를 사용하여 커넥터 인스턴스를 관리할 수 있습니다. 이러한 옵션을 사용하여 새 커넥터 인스턴스를 생성할 수도 있습니다.
KafkaConnector 리소스는 Cluster Operator의 커넥터 관리에 대한 OpenShift 네이티브 접근 방식을 제공합니다. KafkaConnector 리소스로 커넥터를 관리하려면 KafkaConnect 사용자 정의 리소스에 주석을 지정해야 합니다.
KafkaConnectors를 활성화하는 주석
use-connector-resources 를 true 로 설정하면 KafkaConnectors가 커넥터를 생성, 삭제 및 재구성할 수 있습니다.
KafkaConnect 구성에서 use-connector-resources 가 활성화된 경우 KafkaConnector 리소스를 사용하여 커넥터를 정의하고 관리해야 합니다. KafkaConnector 리소스는 외부 시스템에 연결하도록 구성됩니다. 외부 데이터 시스템과 상호 작용하는 Kafka Connect 클러스터 및 Kafka 클러스터와 동일한 OpenShift 클러스터에 배포됩니다.
Kafka 구성 요소는 동일한 OpenShift 클러스터에 포함되어 있습니다.
구성은 커넥터 인스턴스가 인증을 포함하여 외부 데이터 시스템에 연결하는 방법을 지정합니다. 또한 볼 데이터를 설명해야 합니다. 소스 커넥터의 경우 구성에 데이터베이스 이름을 지정할 수 있습니다. 대상 주제 이름을 지정하여 Kafka에 데이터가 있어야 하는 위치를 지정할 수도 있습니다.
tasksMax 를 사용하여 최대 작업 수를 지정합니다. 예를 들어 tasksMax: 2 가 있는 소스 커넥터는 소스 데이터 가져오기를 두 작업으로 분할할 수 있습니다.
KafkaConnector 소스 커넥터 구성 예
- 1
- 커넥터의 이름으로 사용되는
KafkaConnector리소스의 이름입니다. OpenShift 리소스에 유효한 모든 이름을 사용합니다. - 2
- 커넥터 인스턴스를 생성하는 Kafka Connect 클러스터의 이름입니다. 커넥터는 연결된 Kafka Connect 클러스터와 동일한 네임스페이스에 배포해야 합니다.
- 3
- 커넥터 클래스의 전체 이름입니다. Kafka Connect 클러스터에서 사용하는 이미지에 있어야 합니다.
- 4
- 커넥터가 생성할 수 있는 최대 Kafka Connect 작업 수입니다.
- 5
- 실패한 커넥터 및 작업을 자동으로 다시 시작할 수 있습니다. 기본적으로 재시작 수는 indefinite이지만
maxRestarts속성을 사용하여 자동 재시작 횟수에 대해 최대값을 설정할 수 있습니다. - 6
- 연결을 키 -값 쌍으로 연결합니다.
- 7
- 외부 데이터 파일의 위치입니다. 이 예제에서는
/opt/kafka/LICENSE파일에서 읽을 수 있도록FileStreamSourceConnector를 구성하고 있습니다. - 8
- 소스 데이터를 게시하는 Kafka 주제입니다.
OpenShift Secrets 또는 ConfigMaps와 같은 외부 소스에서 커넥터의 기밀 구성 값을 로드 할 수 있습니다.
Kafka 연결 API
Kafka Connect REST API를 KafkaConnector 리소스를 사용하여 커넥터를 관리하는 대신 사용합니다. Kafka Connect REST API는 < connect_cluster_name> -connect-api:8083 에서 실행되는 서비스로 사용할 수 있습니다. 여기서 < connect_cluster_name >은 Kafka Connect 클러스터의 이름입니다.
커넥터 구성을 JSON 오브젝트로 추가합니다.
커넥터 구성을 추가하기 위한 curl 요청 예
KafkaConnectors가 활성화된 경우 Cluster Operator에 의해 Kafka Connect REST API를 사용하여 직접 변경한 수동 변경 사항을 되돌립니다.
REST API에서 지원하는 작업은 Apache Kafka Connect API 설명서에 설명되어 있습니다.
OpenShift 외부에 Kafka Connect API 서비스를 노출할 수 있습니다. 이를 통해 수신 또는 경로와 같이 액세스를 제공하는 연결 메커니즘을 사용하는 서비스를 생성합니다. 연결이 안전하지 않으므로 advisedly를 사용합니다.