8.3. 커넥터 관리


Kafka Connect REST API는 커넥터를 직접 생성, 업데이트 및 삭제하기 위한 끝점을 제공합니다. API를 사용하여 커넥터의 상태를 확인하거나 로깅 수준을 변경할 수도 있습니다. API를 통해 커넥터를 생성할 때 API 호출의 일부로 커넥터에 대한 구성 세부 정보를 제공합니다.

커넥터를 플러그인으로 추가하고 관리할 수도 있습니다. 플러그인은 Kafka Connect API를 통해 커넥터를 구현하는 클래스가 포함된 JAR 파일로 패키징됩니다. classpath에서 플러그인을 지정하거나 Kafka Connect가 시작시 커넥터 플러그인을 실행할 수 있도록 플러그인 경로에 추가해야 합니다.

Kafka Connect REST API 또는 플러그인을 사용하여 커넥터를 관리하는 것 외에도 독립 실행형 모드에서 Kafka Connect를 실행할 때 속성 파일을 사용하여 커넥터 구성을 추가할 수도 있습니다. 이렇게 하려면 Kafka Connect 작업자 프로세스를 시작할 때 properties 파일의 위치를 지정하기만 하면 됩니다. 속성 파일에는 커넥터 클래스, 소스 및 대상 항목, 필요한 인증 또는 직렬화 설정을 포함한 커넥터의 구성 세부 정보가 포함되어야 합니다.

8.3.1. Kafka Connect API에 대한 액세스 제한

Kafka Connect REST API는 인증된 액세스 권한이 있고 호스트 이름/IP 주소 및 포트 번호를 포함하는 끝점 URL을 알고 있는 모든 사용자가 액세스할 수 있습니다. 인증되지 않은 작업 및 잠재적 보안 문제를 방지하기 위해 Kafka Connect API에 대한 액세스를 신뢰할 수 있는 사용자에게만 제한하는 것이 중요합니다.

보안을 개선하기 위해 Kafka Connect API에 대한 다음 속성을 구성하는 것이 좋습니다.

  • (Kafka 3.4 이상) org.apache.kafka.disallowed.login.modules 로 비보안 로그인 모듈을 특별히 제외
  • 커넥터.client.config.override.policyNONE 으로 설정하여 커넥터 구성이 Kafka Connect 구성 및 사용하는 소비자 및 생산자를 덮어쓰지 않도록 합니다.

8.3.2. 커넥터 구성

Kafka Connect REST API 또는 속성 파일을 사용하여 커넥터 인스턴스를 생성, 관리 및 모니터링합니다. 독립 실행형 또는 분산 모드에서 Kafka Connect를 사용하는 경우 REST API를 사용할 수 있습니다. 독립 실행형 모드에서 Kafka Connect를 사용할 때 속성 파일을 사용할 수 있습니다.

8.3.2.1. Kafka Connect REST API를 사용하여 커넥터 관리

Kafka Connect REST API를 사용하는 경우 요청 본문에 커넥터 구성 세부 정보를 지정하여 PUT 또는 POST HTTP 요청을 Kafka Connect REST API로 전송하여 커넥터를 동적으로 생성할 수 있습니다.

작은 정보

PUT 명령을 사용하면 커넥터를 시작하고 업데이트하는 것과 동일한 명령입니다.

REST 인터페이스는 기본적으로 포트 8083에서 수신 대기하고 다음 끝점을 지원합니다.

GET /connectors
기존 커넥터 목록을 반환합니다.
POST /connectors
커넥터를 만듭니다. 요청 본문은 커넥터 구성이 있는 JSON 오브젝트여야 합니다.
GET /connectors/<connector_name>
특정 커넥터에 대한 정보를 가져옵니다.
GET /connectors/<connector_name>/config
특정 커넥터의 구성을 가져옵니다.
PUT /connectors/<connector_name>/config
특정 커넥터의 구성을 업데이트합니다.
GET /connectors/<connector_name>/status
특정 커넥터의 상태를 가져옵니다.
GET /connectors/<connector_name>/tasks
특정 커넥터에 대한 작업 목록 가져오기
GET /connectors/<connector_name>/tasks/<task_id>/status
특정 커넥터에 대한 작업 상태를 가져옵니다.
PUT /connectors/<connector_name>/pause
커넥터 및 모든 작업을 일시 중지합니다. 커넥터는 모든 메시지 처리를 중지합니다.
PUT /connectors/<connector_name>/resume
일시 중지된 커넥터를 다시 시작합니다.
POST /connectors/<connector_name>/restart
실패한 경우 커넥터를 다시 시작합니다.
POST /connectors/<connector_name>/tasks/<task_id>/restart
특정 작업을 재시작합니다.
DELETE /connectors/<connector_name>
커넥터를 삭제합니다.
GET /connectors/<connector_name>/topics
특정 커넥터의 주제를 가져옵니다.
PUT /connectors/<connector_name>/topics/reset
특정 커넥터에 대한 활성 항목 집합을 비우십시오.
GET /connector-plugins
지원되는 모든 커넥터 플러그인 목록을 가져옵니다.
PUT /connector-plugins/<connector_type>/config/validate
커넥터 구성을 검증합니다.

8.3.2.2. 커넥터 구성 속성 지정

Kafka Connect 커넥터를 구성하려면 소스 또는 싱크 커넥터의 구성 세부 정보를 지정해야 합니다. 이 작업을 수행하는 방법에는 Kafka Connect REST API를 통해 JSON을 사용하여 구성을 제공하거나 속성 파일을 사용하여 구성 속성을 정의하는 두 가지 방법이 있습니다. 각 커넥터 유형에 사용할 수 있는 특정 구성 옵션은 다를 수 있지만 두 방법 모두 필요한 설정을 지정할 수 있는 유연한 방법을 제공합니다.

다음 옵션이 모든 커넥터에 적용됩니다.

name
현재 Kafka Connect 인스턴스 내에서 고유해야 하는 커넥터의 이름입니다.
connector.class
커넥터 플러그인의 클래스입니다. 예를 들면 org.apache.kafka.connect.file.FileStreamSinkConnector 입니다.
tasks.max
지정된 커넥터에서 사용할 수 있는 최대 작업 수입니다. 작업을 사용하면 커넥터가 병렬 작업을 수행할 수 있습니다. 커넥터는 지정된 작업보다 적은 수의 작업을 생성할 수 있습니다.
key.converter
Kafka 형식으로 메시지 키를 변환하는 데 사용되는 클래스입니다. 이렇게 하면 Kafka Connect 구성에서 설정한 기본값을 덮어씁니다. 예를 들면 org.apache.kafka.connect.json.JsonConverter 입니다.
value.converter
Kafka 형식으로 메시지 페이로드를 변환하는 데 사용되는 클래스입니다. 이렇게 하면 Kafka Connect 구성에서 설정한 기본값을 덮어씁니다. 예를 들면 org.apache.kafka.connect.json.JsonConverter 입니다.

싱크 커넥터에 대해 다음 옵션 중 하나를 설정해야 합니다.

주제
입력으로 사용되는 쉼표로 구분된 항목 목록입니다.
topics.regex
입력으로 사용되는 항목에 대한 Java 정규식입니다.

다른 모든 옵션은 Apache Kafka 설명서의 커넥터 속성을 참조하십시오.

참고

AMQ Streams에는 AMQ Streams 설치 디렉터리에 커넥터 구성 파일 config/connect-file-sink.propertiesconfig/connect-file-source.properties 가 포함되어 있습니다.

8.3.3. Kafka Connect API를 사용하여 커넥터 생성

Kafka Connect REST API를 사용하여 Kafka Connect에서 사용할 커넥터를 생성합니다.

사전 요구 사항

  • Kafka Connect 설치

절차

  1. 커넥터 구성을 사용하여 JSON 페이로드를 준비합니다. 예를 들면 다음과 같습니다.

    {
      "name": "my-connector",
      "config": {
      "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "tasks.max": "1",
        "topics": "my-topic-1,my-topic-2",
        "file": "/tmp/output-file.txt"
      }
    }
  2. POST 요청을 < KafkaConnectAddress>:8083/connectors 로 보내 커넥터를 생성합니다. 다음 예제에서는 curl 을 사용합니다.

    curl -X POST -H "Content-Type: application/json" --data @sink-connector.json http://connect0.my-domain.com:8083/connectors
  3. < KafkaConnectAddress > :8083/connectors로 GET 요청을 전송하여 커넥터가 배포되었는지 확인합니다. 다음 예제에서는 curl 을 사용합니다.

    curl http://connect0.my-domain.com:8083/connectors

8.3.4. Kafka Connect API를 사용하여 커넥터 삭제

Kafka Connect REST API를 사용하여 Kafka Connect에서 커넥터를 삭제합니다.

사전 요구 사항

  • Kafka Connect 설치

커넥터 삭제

  1. < KafkaConnectAddress > :8083/connectors/ <ConnectorName>으로 GET 요청을 전송하여 커넥터가 존재하는지 확인합니다. 다음 예제에서는 curl 을 사용합니다.

    curl http://connect0.my-domain.com:8083/connectors
  2. 커넥터를 삭제하려면 DELETE 요청을 < KafkaConnectAddress> :808 3/connectors 로 보냅니다. 다음 예제에서는 curl 을 사용합니다.

    curl -X DELETE http://connect0.my-domain.com:8083/connectors/my-connector
  3. < KafkaConnectAddress > :8083/connectors 로 GET 요청을 전송하여 커넥터가 삭제되었는지 확인합니다. 다음 예제에서는 curl 을 사용합니다.

    curl http://connect0.my-domain.com:8083/connectors

8.3.5. 커넥터 플러그인 추가

Kafka는 커넥터 개발을 위한 시작점으로 사용할 예제 커넥터를 제공합니다. 다음 예제 커넥터는 AMQ Streams에 포함되어 있습니다.

FileStreamSink
Kafka 주제에서 데이터를 읽고 데이터를 파일에 씁니다.
FileStreamSource
파일에서 데이터를 읽고 Kafka 주제로 데이터를 보냅니다.

두 커넥터 모두 libs/connect-file-<kafka_version>.redhat-<build>.jar 플러그인에 포함되어 있습니다.

Kafka Connect에서 커넥터 플러그인을 사용하려면 classpath에 추가하거나 Kafka Connect 속성 파일에서 플러그인 경로를 지정하고 플러그인을 해당 위치에 복사할 수 있습니다.

classpath에서 커넥터 예제 지정

CLASSPATH=/opt/kafka/libs/connect-file-<kafka_version>.redhat-<build>.jar opt/kafka/bin/connect-distributed.sh

플러그인 경로 설정

plugin.path=/opt/kafka/connector-plugins,/opt/connectors

plugin.path 구성 옵션에는 쉼표로 구분된 경로 목록을 포함할 수 있습니다.

필요한 경우 커넥터 플러그인을 추가할 수 있습니다. Kafka Connect는 시작 시 커넥터 플러그인을 검색하고 실행합니다.

참고

분산 모드에서 Kafka Connect를 실행하는 경우 모든 작업자 노드에서 플러그인을 사용할 수 있어야 합니다.

Red Hat logoGithubRedditYoutubeTwitter

자세한 정보

평가판, 구매 및 판매

커뮤니티

Red Hat 문서 정보

Red Hat을 사용하는 고객은 신뢰할 수 있는 콘텐츠가 포함된 제품과 서비스를 통해 혁신하고 목표를 달성할 수 있습니다. 최신 업데이트를 확인하세요.

보다 포괄적 수용을 위한 오픈 소스 용어 교체

Red Hat은 코드, 문서, 웹 속성에서 문제가 있는 언어를 교체하기 위해 최선을 다하고 있습니다. 자세한 내용은 다음을 참조하세요.Red Hat 블로그.

Red Hat 소개

Red Hat은 기업이 핵심 데이터 센터에서 네트워크 에지에 이르기까지 플랫폼과 환경 전반에서 더 쉽게 작업할 수 있도록 강화된 솔루션을 제공합니다.

© 2024 Red Hat, Inc.