8.3. 커넥터 관리
Kafka Connect REST API는 커넥터를 직접 생성, 업데이트 및 삭제하기 위한 엔드포인트를 제공합니다. API를 사용하여 커넥터의 상태를 확인하거나 로깅 수준을 변경할 수도 있습니다. API를 통해 커넥터를 생성할 때 API 호출의 일부로 커넥터에 대한 구성 세부 정보를 제공합니다.
커넥터를 플러그인으로 추가하고 관리할 수도 있습니다. 플러그인은 Kafka Connect API를 통해 커넥터를 구현하는 클래스를 포함하는 JAR 파일로 패키징됩니다. classpath에서 플러그인을 지정하거나 Kafka Connect의 플러그인 경로에 추가하여 시작 시 커넥터 플러그인을 실행하기만 하면 됩니다.
Kafka Connect REST API 또는 플러그인을 사용하여 커넥터를 관리하는 것 외에도 독립 실행형 모드에서 Kafka Connect를 실행할 때 속성 파일을 사용하여 커넥터 구성을 추가할 수도 있습니다. 이렇게 하려면 Kafka Connect 작업자 프로세스를 시작할 때 속성 파일의 위치를 지정하기만 하면 됩니다. 속성 파일에는 커넥터 클래스, 소스 및 대상 주제, 필요한 인증 또는 직렬화 설정을 포함하여 커넥터에 대한 구성 세부 정보가 포함되어야 합니다.
8.3.1. Kafka Connect API에 대한 액세스 제한
Kafka Connect REST API는 인증된 액세스 권한이 있고 호스트 이름/IP 주소 및 포트 번호를 포함하는 엔드포인트 URL을 알고 있는 모든 사용자가 액세스할 수 있습니다. 무단 조치 및 잠재적인 보안 문제를 방지하기 위해 Kafka Connect API에 대한 액세스를 신뢰할 수 있는 사용자에게만 제한하는 것이 중요합니다.
보안을 강화하려면 Kafka Connect API에 대해 다음 속성을 구성하는 것이 좋습니다.
-
(Kafka 3.4 이상)
org.apache.kafka.disallowed.login.modules
에서 비보안 로그인 모듈을 구체적으로 제외 -
Connector.client.config.override.policy
를NONE
으로 설정하여 커넥터 구성이 Kafka Connect 구성 및 사용하는 소비자 및 생산자를 덮어쓰지 않도록 합니다.
8.3.2. 커넥터 구성
Kafka Connect REST API 또는 속성 파일을 사용하여 커넥터 인스턴스를 생성, 관리 및 모니터링합니다. 독립 실행형 또는 분산 모드에서 Kafka Connect를 사용할 때 REST API를 사용할 수 있습니다. 독립 실행형 모드에서 Kafka Connect를 사용할 때 속성 파일을 사용할 수 있습니다.
8.3.2.1. Kafka Connect REST API를 사용하여 커넥터 관리
Kafka Connect REST API를 사용하는 경우 요청 본문에 커넥터 구성 세부 정보를 지정하여 PUT
또는 POST
HTTP 요청을 Kafka Connect REST API로 전송하여 커넥터를 동적으로 생성할 수 있습니다.
PUT
명령을 사용하면 커넥터를 시작하고 업데이트하는 것과 동일한 명령입니다.
REST 인터페이스는 기본적으로 포트 8083에서 수신 대기하고 다음 엔드포인트를 지원합니다.
GET /connectors
- 기존 커넥터 목록을 반환합니다.
POST /connectors
- 커넥터를 생성합니다. 요청 본문은 커넥터 구성이 포함된 JSON 오브젝트여야 합니다.
GET /connectors/<connector_name>
- 특정 커넥터에 대한 정보를 가져옵니다.
GET /connectors/<connector_name>/config
- 특정 커넥터의 구성을 가져옵니다.
PUT /connectors/<connector_name>/config
- 특정 커넥터의 구성을 업데이트합니다.
GET /connectors/<connector_name>/status
- 특정 커넥터의 상태를 가져옵니다.
GET /connectors/<connector_name>/tasks
- 특정 커넥터의 작업 목록 가져오기
GET /connectors/<connector_name>/tasks/<task_id>/status
- 특정 커넥터에 대한 작업 상태 가져오기
PUT /connectors/<connector_name>/pause
- 커넥터 및 모든 작업을 일시 중지합니다. 커넥터는 모든 메시지 처리를 중지합니다.
PUT /connectors/<connector_name>/stop
- 커넥터와 모든 작업을 중지합니다. 커넥터는 모든 메시지 처리를 중지합니다. 실행으로부터 커넥터를 중지하는 것은 단순히 일시 중지하는 것보다 장기간에 더 적합할 수 있습니다.
PUT /connectors/<connector_name>/resume
- 일시 중지된 커넥터를 다시 시작합니다.
POST /connectors/<connector_name>/restart
- 실패한 경우 커넥터를 다시 시작합니다.
POST /connectors/<connector_name>/tasks/<task_id>/restart
- 특정 작업을 다시 시작합니다.
DELETE /connectors/<connector_name>
- 커넥터를 삭제합니다.
GET /connectors/<connector_name>/topics
- 특정 커넥터에 대한 항목을 가져옵니다.
PUT /connectors/<connector_name>/topics/reset
- 특정 커넥터에 대한 활성 항목 세트를 비웁니다.
GET /connectors/<connector_name>/offsets
- 커넥터의 현재 오프셋을 가져옵니다.
DELETE /connectors/<connector_name>/offsets
- 중지됨 상태에 있어야 하는 커넥터의 오프셋을 재설정합니다.
PATCH /connectors/<connector_name>/offsets
-
중지된 상태에 있어야 하는 커넥터에 대한 오프셋(요청에서
offset
속성 사용)을 조정합니다. GET /connector-plugins
- 지원되는 모든 커넥터 플러그인 목록을 가져옵니다.
GET /connector-plugins/<connector_plugin_type>/config
- 커넥터 플러그인에 대한 구성을 가져옵니다.
PUT /connector-plugins/<connector_type>/config/validate
- 커넥터 구성을 검증합니다.
8.3.2.2. 커넥터 구성 속성 지정
Kafka Connect 커넥터를 구성하려면 소스 또는 싱크 커넥터에 대한 구성 세부 정보를 지정해야 합니다. 이 작업을 수행하는 방법은 두 가지가 있습니다. Kafka Connect REST API를 통해, JSON을 사용하여 구성을 제공하거나 속성 파일을 사용하여 구성 속성을 정의합니다. 각 커넥터 유형에 사용할 수 있는 특정 구성 옵션은 다를 수 있지만 두 방법 모두 필요한 설정을 지정하는 유연한 방법을 제공합니다.
다음 옵션은 모든 커넥터에 적용됩니다.
name
- 현재 Kafka Connect 인스턴스 내에서 고유해야 하는 커넥터의 이름입니다.
connector.class
-
커넥터 플러그인의 클래스입니다. 예를 들어
org.apache.kafka.connect.file.FileStreamSinkConnector
. tasks.max
- 지정된 커넥터가 사용할 수 있는 최대 작업 수입니다. 작업을 사용하면 커넥터가 병렬로 작업을 수행할 수 있습니다. 커넥터는 지정된 것보다 적은 작업을 생성할 수 있습니다.
key.converter
-
메시지 키를 Kafka 형식으로 변환하거나 Kafka 형식으로 변환하는 데 사용되는 클래스입니다. 이렇게 하면 Kafka Connect 구성에 의해 설정된 기본값이 재정의됩니다. 예를 들어
org.apache.kafka.connect.json.JsonConverter
. value.converter
-
Kafka 형식으로 메시지 페이로드를 변환하는 데 사용되는 클래스입니다. 이렇게 하면 Kafka Connect 구성에 의해 설정된 기본값이 재정의됩니다. 예를 들어
org.apache.kafka.connect.json.JsonConverter
.
싱크 커넥터에 대해 다음 옵션 중 하나 이상을 설정해야 합니다.
주제
- 입력으로 사용되는 쉼표로 구분된 주제 목록입니다.
topics.regex
- 입력으로 사용되는 주제의 Java 정규식입니다.
기타 모든 옵션은 Apache Kafka 문서의 커넥터 속성을 참조하십시오.
AMQ Streams에는 AMQ Streams 설치 디렉터리에 커넥터 구성 파일 config/connect-file-sink.properties
및 config/connect-file-source.properties
가 포함되어 있습니다.
8.3.3. Kafka Connect API를 사용하여 커넥터 생성
Kafka Connect REST API를 사용하여 Kafka Connect와 함께 사용할 커넥터를 생성합니다.
사전 요구 사항
- Kafka Connect 설치
프로세스
커넥터 구성을 사용하여 JSON 페이로드를 준비합니다. 예를 들면 다음과 같습니다.
{ "name": "my-connector", "config": { "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max": "1", "topics": "my-topic-1,my-topic-2", "file": "/tmp/output-file.txt" } }
<
KafkaConnectAddress > :8083/connectors
에 POST 요청을 보내 커넥터를 만듭니다. 다음 예제에서는curl
을 사용합니다.curl -X POST -H "Content-Type: application/json" --data @sink-connector.json http://connect0.my-domain.com:8083/connectors
<
KafkaConnectAddress > :8083/connectors
에게 GET 요청을 전송하여 커넥터가 배포되었는지 확인합니다. 다음 예제에서는curl
을 사용합니다.curl http://connect0.my-domain.com:8083/connectors
8.3.4. Kafka Connect API를 사용하여 커넥터 삭제
Kafka Connect REST API를 사용하여 Kafka Connect에서 커넥터를 삭제합니다.
사전 요구 사항
- Kafka Connect 설치
커넥터 삭제
GET
요청을 <KafkaConnectAddress > :8083/connectors/ <ConnectorName
> 에 전송하여 커넥터가 존재하는지 확인합니다. 다음 예제에서는curl
을 사용합니다.curl http://connect0.my-domain.com:8083/connectors
커넥터를 삭제하려면 <
KafkaConnectAddress > :8083/connectors
로DELETE
요청을 보냅니다. 다음 예제에서는curl
을 사용합니다.curl -X DELETE http://connect0.my-domain.com:8083/connectors/my-connector
<
KafkaConnectAddress > :8083/connectors
에게 GET 요청을 전송하여 커넥터가 삭제되었는지 확인합니다. 다음 예제에서는curl
을 사용합니다.curl http://connect0.my-domain.com:8083/connectors
8.3.5. 커넥터 플러그인 추가
Kafka는 커넥터 개발을 위한 시작점으로 사용할 수 있는 예제 커넥터를 제공합니다. 다음 예제 커넥터는 AMQ Streams에 포함되어 있습니다.
- FileStreamSink
- Kafka 주제에서 데이터를 읽고 데이터를 파일에 씁니다.
- FileStreamSource
- 파일에서 데이터를 읽고 데이터를 Kafka 주제로 보냅니다.
두 커넥터 모두 libs/connect-file-<kafka_version>.redhat-<build>.jar
플러그인에 포함되어 있습니다.
Kafka Connect의 커넥터 플러그인을 사용하려면 classpath에 추가하거나 Kafka Connect 속성 파일에서 플러그인 경로를 지정하고 플러그인을 위치에 복사할 수 있습니다.
classpath에 커넥터 예제 지정
CLASSPATH=/opt/kafka/libs/connect-file-<kafka_version>.redhat-<build>.jar opt/kafka/bin/connect-distributed.sh
플러그인 경로 설정
plugin.path=/opt/kafka/connector-plugins,/opt/connectors
plugin.path
구성 옵션에는 쉼표로 구분된 경로 목록이 포함될 수 있습니다.
필요한 경우 더 많은 커넥터 플러그인을 추가할 수 있습니다. Kafka Connect는 시작 시 커넥터 플러그인을 검색하고 실행합니다.
분산 모드에서 Kafka Connect를 실행하는 경우 모든 작업자 노드에서 플러그인을 사용할 수 있어야 합니다.