7.2. 분산 모드에서 Kafka 연결
분산 모드에서 Kafka Connect는 하나 이상의 작업자 노드에서 실행되며 워크로드가 그 사이에 배포됩니다. HTTP REST 인터페이스를 사용하여 커넥터 플러그인 및 해당 구성을 관리합니다.
7.2.1. 분산 모드에서 Kafka 연결 구성
Kafka Connect를 분산 모드로 구성하려면 config/connect-distributed.properties
구성 파일을 편집합니다. 다음 옵션이 가장 중요합니다.
bootstrap.servers
-
Kafka에 대한 부트스트랩 연결로 사용되는 Kafka 브로커 주소 목록입니다. 예를 들어
kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092
. key.converter
-
메시지 키를 Kafka 형식으로 변환하거나 Kafka 형식으로 변환하는 데 사용되는 클래스입니다. 예를 들어
org.apache.kafka.connect.json.JsonConverter
. value.converter
-
Kafka 형식으로 메시지 페이로드를 변환하는 데 사용되는 클래스입니다. 예를 들어
org.apache.kafka.connect.json.JsonConverter
. group.id
-
분산 Kafka Connect 클러스터의 이름입니다. 이는 고유해야 하며 다른 소비자 그룹 ID와 충돌해서는 안 됩니다. 기본값은
connect-cluster
입니다. config.storage.topic
-
커넥터 구성을 저장하는 데 사용되는 Kafka 주제입니다. 기본값은
connect-configs
입니다. offset.storage.topic
-
오프셋을 저장하는 데 사용되는 Kafka 주제입니다. 기본값은
connect-offset
입니다. status.storage.topic
-
작업자 노드 상태에 사용되는 Kafka 주제입니다. 기본값은
connect-status
입니다.
AMQ Streams에는 분산 모드에서 Kafka Connect에 대한 구성 파일이 포함되어 있습니다. AMQ Streams 설치 디렉터리의 config/connect-distributed.properties
를 참조하십시오.
Connector 플러그인은 부트스트랩 주소를 사용하여 Kafka 브로커에 대한 오픈 클라이언트 연결을 제공합니다. 이러한 연결을 구성하려면 생산자 또는 소비자 접두사가 붙은 표준 Kafka 생산자
및 소비자 구성 옵션을 사용합니다.
7.2.2. 분산 Kafka Connect에서 커넥터 구성
HTTP REST 인터페이스
분산 Kafka Connect용 커넥터는 HTTP REST 인터페이스를 사용하여 구성됩니다. REST 인터페이스는 기본적으로 포트 8083에서 수신 대기합니다. 다음 끝점을 지원합니다.
GET /connectors
- 기존 커넥터 목록을 반환합니다.
POST /connectors
- 커넥터를 생성합니다. 요청 본문은 커넥터 구성이 포함된 JSON 오브젝트여야 합니다.
GET /connectors/ <name>
- 특정 커넥터에 대한 정보를 가져옵니다.
GET /connectors/ <name> /config
- 특정 커넥터의 구성을 가져옵니다.
PUT /connectors/ <name> /config
- 특정 커넥터의 구성을 업데이트합니다.
GET /connectors/ <name> /status
- 특정 커넥터의 상태를 가져옵니다.
PUT /connectors/ <name> /pause
- 커넥터 및 모든 작업을 일시 중지합니다. 커넥터는 모든 메시지 처리를 중지합니다.
PUT /connectors/ <name> /resume
- 일시 중지된 커넥터를 다시 시작합니다.
POST /connectors/<name>/restart
- 실패한 경우 커넥터를 다시 시작합니다.
DELETE /connectors/ <name>
- 커넥터를 삭제합니다.
GET /connector-plugins
- 지원되는 모든 커넥터 플러그인 목록을 가져옵니다.
커넥터 구성
대부분의 구성 옵션은 커넥터별 커넥터이며 커넥터의 문서에 포함되어 있습니다. 다음 필드는 모든 커넥터에 공통입니다.
name
- 커넥터의 이름입니다. 지정된 Kafka Connect 인스턴스 내에서 고유해야 합니다.
connector.class
-
커넥터 플러그인의 클래스입니다. 예:
org.apache.kafka.connect.file.FileStreamSinkConnector
. tasks.max
- 이 커넥터에서 사용하는 최대 작업 수입니다. 작업은 커넥터가 작업을 병렬화하는 데 사용됩니다. 연결자는 지정된 것보다 적은 작업을 생성할 수 있습니다.
key.converter
-
Kafka 형식에서 메시지 키를 변환하는 데 사용되는 클래스입니다. 이렇게 하면 Kafka Connect 구성에 의해 설정된 기본값이 재정의됩니다. 예를 들어
org.apache.kafka.connect.json.JsonConverter
. value.converter
-
Kafka 형식으로 메시지 페이로드를 변환하는 데 사용되는 클래스입니다. 이렇게 하면 Kafka Connect 구성에 의해 설정된 기본값이 재정의됩니다. 예를 들어
org.apache.kafka.connect.json.JsonConverter
.
또한 싱크 커넥터에 대해 다음 옵션 중 하나를 설정해야 합니다.
주제
- 입력으로 사용되는 쉼표로 구분된 주제 목록입니다.
topics.regex
- 입력으로 사용되는 주제의 Java 정규식입니다.
기타 모든 옵션은 특정 커넥터에 대한 설명서를 참조하십시오.
AMQ Streams에는 커넥터 구성 파일 예제가 포함되어 있습니다. 해당 파일은 AMQ Streams 설치 디렉터리의 config/connect-file-sink.properties
및 config/connect-file-source.properties
에서 찾을 수 있습니다.
7.2.3. 분산 Kafka 연결 실행
다음 절차에서는 분산 모드에서 Kafka Connect를 구성하고 실행하는 방법을 설명합니다.
사전 요구 사항
- 설치되어 실행 중인 AMQ Streams 클러스터입니다.
클러스터 실행
모든 Kafka Connect 작업자 노드에서
/opt/kafka/config/connect-distributed.properties
Kafka Connect 구성 파일을 편집합니다.-
Kafka 브로커를 가리키도록
bootstrap.server
옵션을 설정합니다. -
group.id
옵션을 설정합니다. -
config.storage.topic
옵션을 설정합니다. -
offset.storage.topic
옵션을 설정합니다. status.storage.topic
옵션을 설정합니다.예를 들면 다음과 같습니다.
bootstrap.servers=kafka0.my-domain.com:9092,kafka1.my-domain.com:9092,kafka2.my-domain.com:9092 group.id=my-group-id config.storage.topic=my-group-id-configs offset.storage.topic=my-group-id-offsets status.storage.topic=my-group-id-status
-
Kafka 브로커를 가리키도록
모든 Kafka Connect 노드에서
/opt/kafka/config/connect-distributed.properties
구성 파일을 사용하여 Kafka Connect 작업자를 시작합니다.su - kafka /opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
Kafka Connect가 실행 중인지 확인합니다.
jcmd | grep ConnectDistributed
7.2.4. 커넥터 생성
다음 절차에서는 Kafka Connect REST API를 사용하여 분산 모드에서 Kafka Connect와 함께 사용할 커넥터 플러그인을 생성하는 방법을 설명합니다.
사전 요구 사항
- 분산 모드에서 실행되는 Kafka Connect 설치입니다.
프로세스
커넥터 구성을 사용하여 JSON 페이로드를 준비합니다. 예를 들면 다음과 같습니다.
{ "name": "my-connector", "config": { "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max": "1", "topics": "my-topic-1,my-topic-2", "file": "/tmp/output-file.txt" } }
<
KafkaConnectAddress > :8083/connectors
에 POST 요청을 보내 커넥터를 만듭니다. 다음 예제에서는curl
을 사용합니다.curl -X POST -H "Content-Type: application/json" --data @sink-connector.json http://connect0.my-domain.com:8083/connectors
<
KafkaConnectAddress > :8083/connectors
에게 GET 요청을 전송하여 커넥터가 배포되었는지 확인합니다. 다음 예제에서는curl
을 사용합니다.curl http://connect0.my-domain.com:8083/connectors
7.2.5. 커넥터 삭제
다음 절차에서는 Kafka Connect REST API를 사용하여 분산 모드에서 Kafka Connect에서 커넥터 플러그인을 삭제하는 방법을 설명합니다.
사전 요구 사항
- 분산 모드에서 실행되는 Kafka Connect 설치입니다.
커넥터 삭제
GET
요청을 <KafkaConnectAddress > :8083/connectors/ <ConnectorName
> 에 전송하여 커넥터가 존재하는지 확인합니다. 다음 예제에서는curl
을 사용합니다.curl http://connect0.my-domain.com:8083/connectors
커넥터를 삭제하려면 <
KafkaConnectAddress > :8083/connectors
로DELETE
요청을 보냅니다. 다음 예제에서는curl
을 사용합니다.curl -X DELETE http://connect0.my-domain.com:8083/connectors/my-connector
<
KafkaConnectAddress > :8083/connectors
에게 GET 요청을 전송하여 커넥터가 삭제되었는지 확인합니다. 다음 예제에서는curl
을 사용합니다.curl http://connect0.my-domain.com:8083/connectors