Kafka 클라이언트 애플리케이션 개발


Red Hat AMQ Streams 2.5

AMQ Streams를 사용하여 Kafka와 상호 작용할 수 있는 클라이언트 애플리케이션 개발

초록

Kafka 브로커를 통해 메시지를 보내고 받을 수 있는 클라이언트 애플리케이션을 개발합니다. 클라이언트와 브로커 간의 보안 액세스를 설정합니다.

보다 포괄적 수용을 위한 오픈 소스 용어 교체

Red Hat은 코드, 문서, 웹 속성에서 문제가 있는 용어를 교체하기 위해 최선을 다하고 있습니다. 먼저 마스터(master), 슬레이브(slave), 블랙리스트(blacklist), 화이트리스트(whitelist) 등 네 가지 용어를 교체하고 있습니다. 이러한 변경 작업은 작업 범위가 크므로 향후 여러 릴리스에 걸쳐 점차 구현할 예정입니다. 자세한 내용은 CTO Chris Wright의 메시지를 참조하십시오.

1장. 클라이언트 개발 개요

메시지를 생성하거나 메시지를 사용하거나 둘 다 수행할 수 있는 AMQ Streams 설치에 대한 Kafka 클라이언트 애플리케이션을 개발합니다. RHEL의 AMQ Streams 또는 AMQ Streams와 함께 사용할 클라이언트 애플리케이션을 개발할 수 있습니다.

메시지는 선택적 키와 메시지 데이터를 포함하는 값과 헤더 및 관련 메타데이터를 포함합니다. 키는 메시지 제목 또는 메시지의 속성을 식별합니다. 메시지 그룹을 전송과 동일한 순서로 처리해야 하는 경우 동일한 키를 사용해야 합니다.

메시지는 일괄 처리로 제공됩니다. 메시지에는 메시지에 대한 타임스탬프 및 오프셋 위치와 같이 클라이언트의 필터링 및 라우팅에 유용한 세부 정보를 제공하는 헤더와 메타데이터가 포함되어 있습니다.

Kafka는 클라이언트 애플리케이션 개발을 위한 클라이언트 API를 제공합니다. Kafka 생산자 및 소비자 API는 클라이언트 애플리케이션에서 Kafka 클러스터와 상호 작용하는 기본 수단입니다. API는 메시지 흐름을 제어합니다. 생산자 API는 Kafka 항목에 메시지를 보내는 반면 소비자 API는 주제에서 메시지를 읽습니다.

AMQ Streams는 Java로 작성된 클라이언트를 지원합니다. 고객이 고객을 개발하는 방법은 특정 사용 사례에 따라 다릅니다. 데이터 조정은 우선 순위 또는 높은 처리량일 수 있습니다. 이러한 요구 사항은 클라이언트 및 브로커 구성을 통해 충족될 수 있습니다. 그러나 모든 클라이언트는 지정된 Kafka 클러스터의 모든 브로커에 연결할 수 있어야 합니다.

1.1. HTTP 클라이언트 지원

클라이언트에서 Kafka 생산자 및 소비자 API를 사용하는 대신 AMQ Streams Kafka Bridge를 설정하고 사용할 수 있습니다. Kafka 브리지는 HTTP 기반 클라이언트가 Kafka 클러스터와 상호 작용할 수 있는 RESTful 인터페이스를 제공합니다. Kafka 프로토콜을 해석해야 하는 클라이언트 애플리케이션 없이도 Strimzi에 대한 웹 API 연결의 이점을 제공합니다. Kafka는 TCP를 통해 바이너리 프로토콜을 사용합니다.

자세한 내용은 AMQ Streams Kafka Bridge 사용을 참조하십시오.

1.2. 생산자 및 소비자 튜닝

더 많은 구성 속성을 추가하여 Kafka 클라이언트의 성능을 최적화할 수 있습니다. 클라이언트 및 브로커 구성의 성능을 분석할 시간이 있을 때 이 작업을 수행해야 합니다.

자세한 내용은 Kafka 구성 튜닝을 참조하십시오.

1.3. 클라이언트 상호 작용 모니터링

분산 추적은 메시지의 엔드 투 엔드 추적을 용이하게 합니다. Kafka 소비자 및 생산자 클라이언트 애플리케이션에서 추적을 활성화할 수 있습니다.

자세한 내용은 다음 가이드의 분산 추적 설명서를 참조하십시오.

참고

클라이언트 애플리케이션이라는 용어를 사용할 때 Kafka 생산자와 소비자를 사용하여 Kafka 클러스터와 메시지를 보내고 받는 애플리케이션을 구체적으로 참조합니다. 고유한 고유한 사용 사례 및 기능이 있는 Kafka Connect 또는 Kafka Streams와 같은 다른 Kafka 구성 요소를 참조하지 않습니다.

2장. 클라이언트 개발 사전 요구 사항

AMQ Streams와 함께 사용할 클라이언트를 개발하려면 다음 사전 요구 사항이 필요합니다.

  • Red Hat 계정이 있어야 합니다.
  • AMQ Streams에서 Kafka 클러스터가 실행되고 있습니다.
  • Kafka 브로커는 보안 클라이언트 연결을 위해 리스너로 구성됩니다.
  • 클러스터에 대한 주제가 생성되었습니다.
  • 클라이언트를 개발하고 테스트할 IDE가 있습니다.
  • JDK 11 이상이 설치되어 있어야 합니다.

3장. Maven 프로젝트에 클라이언트 종속 항목 추가

Java 기반 Kafka 클라이언트를 개발하는 경우 Kafka Streams를 포함한 Kafka 클라이언트의 Red Hat 종속 항목을 Maven 프로젝트의 pom.xml 파일에 추가할 수 있습니다. Red Hat에서 빌드한 클라이언트 라이브러리만 AMQ Streams에서 지원됩니다.

다음 아티팩트를 종속 항목으로 추가할 수 있습니다.

kafka-clients

Kafka Producer,Consumer, AdminClient API를 포함합니다.

  • Producer API를 사용하면 애플리케이션에서 Kafka 브로커로 데이터를 보낼 수 있습니다.
  • Consumer API를 사용하면 애플리케이션에서 Kafka 브로커의 데이터를 사용할 수 있습니다.
  • AdminClient API는 주제, 브로커 및 기타 구성 요소를 포함하여 Kafka 클러스터 관리를 위한 기능을 제공합니다.
kafka-streams

KafkaStreams API를 포함합니다.

Kafka 스트림을 사용하면 애플리케이션에서 하나 이상의 입력 스트림에서 데이터를 수신할 수 있습니다. 이 API를 사용하여 매핑, 필터링 및 조인과 같은 데이터 스트림에서 일련의 실시간 작업을 실행할 수 있습니다. Kafka Streams를 사용하여 하나 이상의 출력 스트림에 결과를 작성할 수 있습니다. Red Hat Maven 리포지토리에서 사용할 수 있는 kafka-streams JAR 패키지의 일부입니다.

3.1. Maven 프로젝트에 Kafka 클라이언트 종속성 추가

Kafka 클라이언트의 Red Hat 종속성을 Maven 프로젝트에 추가합니다.

사전 요구 사항

  • 기존 pom.xml 이 있는 Maven 프로젝트 .

절차

  1. Maven 프로젝트의 pom.xml 파일의 < repositories > 섹션에 Red Hat Maven 리포지토리를 추가합니다.

    <repositories>
        <repository>
            <id>redhat-maven</id>
            <url>https://maven.repository.redhat.com/ga/</url>
        </repository>
    </repositories>
  2. kafka-clients 를 Maven 프로젝트 pom.xml 파일에 <dependency >로 추가합니다.

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.5.0.redhat-00009</version>
        </dependency>
    </dependencies>
  3. Maven 프로젝트를 빌드하여 Kafka 클라이언트 종속성을 프로젝트에 추가합니다.

3.2. Maven 프로젝트에 Kafka Streams 종속성 추가

Kafka Streams의 Red Hat 종속성을 Maven 프로젝트에 추가합니다.

사전 요구 사항

  • 기존 pom.xml 이 있는 Maven 프로젝트 .

절차

  1. Maven 프로젝트의 pom.xml 파일의 < repositories > 섹션에 Red Hat Maven 리포지토리를 추가합니다.

    <repositories>
        <repository>
            <id>redhat-maven</id>
            <url>https://maven.repository.redhat.com/ga/</url>
        </repository>
    </repositories>
  2. kafka-streams 를 Maven 프로젝트 pom.xml 파일에 <dependency >로 추가합니다.

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>3.5.0.redhat-00009</version>
        </dependency>
    </dependencies>
  3. Maven 프로젝트를 빌드하여 Kafka Streams 종속성을 프로젝트에 추가합니다.

3.3. Maven 프로젝트에 OAuth 2.0 종속성 추가

OAuth 2.0의 Red Hat 종속성을 Maven 프로젝트에 추가합니다.

사전 요구 사항

  • 기존 pom.xml 이 있는 Maven 프로젝트 .

절차

  1. Maven 프로젝트의 pom.xml 파일의 < repositories > 섹션에 Red Hat Maven 리포지토리를 추가합니다.

    <repositories>
        <repository>
            <id>redhat-maven</id>
            <url>https://maven.repository.redhat.com/ga/</url>
        </repository>
    </repositories>
  2. kafka-oauth-client 를 Maven 프로젝트의 pom.xml 파일에 < dependency >로 추가합니다.

    <dependency>
     <groupId>io.strimzi</groupId>
     <artifactId>kafka-oauth-client</artifactId>
     <version>0.13.0.redhat-00008</version>
    </dependency>
  3. 프로젝트에 OAuth 2.0 종속성을 추가하기 위해 Maven 프로젝트를 빌드합니다.

4장. Kafka 클러스터에 연결하기 위한 클라이언트 애플리케이션 구성

Kafka 클러스터에 연결하려면 클라이언트 애플리케이션을 브로커를 식별하고 연결을 활성화하는 최소 속성 세트로 구성해야 합니다. 또한 Kafka에서 사용하는 바이트 배열 형식으로 메시지를 변환하기 위해 serializer/deserializer 메커니즘을 추가해야 합니다. 소비자 클라이언트를 개발할 때 사용 가능한 모든 브로커를 검색하는 데 사용되는 Kafka 클러스터에 초기 연결을 추가하여 시작합니다. 연결을 설정한 경우 Kafka 주제에서 메시지 사용을 시작하거나 메시지를 생성할 수 있습니다.

로그 및 메트릭 컬렉션에서 클라이언트를 ID로 식별할 수 있도록 고유한 클라이언트 ID를 사용하는 것이 좋습니다.

속성 파일에서 속성을 구성할 수 있습니다. 속성 파일을 사용하면 코드를 다시 컴파일하지 않고도 구성을 수정할 수 있습니다.

예를 들어 다음 코드를 사용하여 Java 클라이언트에서 속성을 로드할 수 있습니다.

클라이언트에 구성 속성 로드

Properties properties = new Properties();
InsetPropertyStream insetPropertyStream = new FileInsetPropertyStream("config.properties");
properties.load(insetPropertyStream);
KafkaProducer<String, String> consumer = new KafkaProducer<>(properties);

구성 개체의 코드에 속성을 직접 추가할 수도 있습니다.You can also use add the properties directly to the code in a configuration object. 예를 들어 Java 클라이언트 애플리케이션에 setProperty() 메서드를 사용할 수 있습니다. 구성할 속성만 있는 경우 속성을 직접 추가하는 것이 유용한 옵션입니다.

4.1. 기본 프로듀서 클라이언트 구성

생산자 클라이언트를 개발할 때 다음을 구성합니다.

  • Kafka 클러스터에 대한 연결
  • Kafka 브로커의 메시지 키를 바이트로 변환하는 직렬화기
  • Kafka 브로커의 메시지 값을 바이트로 변환하는 직렬화기

압축된 메시지를 보내고 저장할 경우 압축 유형을 추가할 수도 있습니다.

기본 프로듀서 클라이언트 구성 속성

client.id = my-producer-id 1
bootstrap.servers = my-cluster-kafka-bootstrap:9092 2
key.serializer = org.apache.kafka.common.serialization.StringSerializer 3
value.serializer = org.apache.kafka.common.serialization.StringSerializer 4

1
클라이언트의 논리 이름입니다.
2
Kafka 클러스터에 초기 연결을 수행할 수 있도록 클라이언트의 부트스트랩 주소입니다.
3
Kafka 브로커로 전송되기 전에 메시지 키를 바이트로 변환하기 위한 직렬화기입니다.
4
Kafka 브로커로 전송되기 전에 메시지 값을 바이트로 변환하기 위한 직렬화기입니다.

코드에 직접 생산자 클라이언트 구성 추가

Properties props = new Properties();
props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "my-producer-id");
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-cluster-kafka-bootstrap:9092");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

KafkaProducer 는 전송하는 메시지에 대한 문자열 키 및 값 유형을 지정합니다. 사용된 serializers는 Kafka로 보내기 전에 지정된 유형의 키와 값을 바이트로 변환할 수 있어야 합니다.

4.2. 기본 소비자 클라이언트 구성

소비자 클라이언트를 개발할 때 다음을 구성합니다.

  • Kafka 클러스터에 대한 연결
  • Kafka 브로커에서 가져온 바이트를 클라이언트 애플리케이션에서 이해할 수 있는 메시지 키로 변환하는 역직렬자
  • Kafka 브로커에서 가져온 바이트를 클라이언트 애플리케이션에서 이해할 수 있는 메시지 값으로 변환하는 역직렬자

일반적으로 소비자 그룹 ID를 추가하여 소비자 그룹과 소비자 그룹을 연결합니다. 소비자 그룹은 하나 이상의 주제에서 병렬 소비자에 대규모 데이터 스트림 처리를 분산하기 위한 논리적 엔티티입니다. Consumer는 group.id 를 사용하여 그룹화되므로 메시지가 멤버 전체에 분산됩니다. 지정된 소비자 그룹에서 각 주제 파티션을 단일 소비자가 읽습니다. 단일 소비자는 여러 파티션을 처리할 수 있습니다. 최대 병렬 처리의 경우 각 파티션에 대해 하나의 소비자를 만듭니다. 파티션보다 많은 소비자가 있는 경우 일부 소비자는 유휴 상태로 유지되고 오류가 발생할 경우 이를 대신할 준비가 되어 있습니다.

기본 소비자 클라이언트 구성 속성

client.id = my-consumer-id 1
group.id = my-group-id 2
bootstrap.servers = my-cluster-kafka-bootstrap:9092 3
key.deserializer = org.apache.kafka.common.serialization.StringDeserializer 4
value.deserializer = org.apache.kafka.common.serialization.StringDeserializer 5

1
클라이언트의 논리 이름입니다.
2
소비자가 특정 소비자 그룹에 참여할 수 있는 그룹 ID입니다.
3
Kafka 클러스터에 초기 연결을 수행할 수 있도록 클라이언트의 부트스트랩 주소입니다.
4
Kafka 브로커에서 가져온 바이트를 메시지 키로 변환하는 Deserializer입니다.
5
Kafka 브로커에서 가져온 바이트를 메시지 값으로 변환하는 Deserializer입니다.

코드에 소비자 클라이언트 구성 직접 추가

Properties props = new Properties();
props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "my-consumer-id");
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-group-id");
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-cluster-kafka-bootstrap:9092");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

KafkaConsumer 는 수신하는 메시지에 대한 문자열 키 및 값 유형을 지정합니다. 사용된 serializers는 Kafka에서 수신한 바이트를 지정된 유형으로 변환할 수 있어야 합니다.

참고

각 소비자 그룹에는 고유한 group.id 가 있어야 합니다. 동일한 group.id 를 사용하여 소비자를 다시 시작하면 중지되기 전에 해제된 위치에서 메시지 사용을 재개합니다.

5장. 보안 연결 구성

Kafka 클러스터와 클라이언트 애플리케이션 간의 연결을 보호하면 클러스터와 클라이언트 간의 통신의 기밀성, 무결성 및 신뢰성을 보장할 수 있습니다.

보안 연결을 위해 인증, 암호화 및 권한 부여와 관련된 구성을 도입할 수 있습니다.

인증
인증 메커니즘을 사용하여 클라이언트 애플리케이션의 ID를 확인합니다.
암호화
SSL/TLS 암호화를 사용하여 클라이언트와 브로커 간 전송 중 데이터 암호화를 활성화합니다.
권한 부여
클라이언트 애플리케이션의 인증된 ID를 기반으로 Kafka 브로커에서 허용된 클라이언트 액세스 및 작업을 제어합니다.

권한 부여는 인증 없이 사용할 수 없습니다. 인증이 활성화되지 않은 경우 클라이언트의 ID를 확인할 수 없으므로 권한 부여 규칙을 적용할 수 없습니다. 즉, 권한 부여 규칙이 정의되어 있어도 인증없이 적용되지 않습니다.

AMQ Streams에서 리스너는 Kafka 브로커와 클라이언트 간의 네트워크 연결을 구성하는 데 사용됩니다. 리스너 구성 옵션은 브로커가 들어오는 클라이언트 연결을 청취하는 방법과 보안 액세스 관리 방법을 결정합니다. 필요한 정확한 구성은 선택한 인증, 암호화 및 권한 부여 메커니즘에 따라 다릅니다.

보안 기능을 활성화하도록 Kafka 브로커 및 클라이언트 애플리케이션을 구성합니다. Kafka 클러스터에 대한 클라이언트 연결을 보호하는 일반적인 개요는 다음과 같습니다.

  1. Kafka 클러스터를 포함하여 AMQ Streams 구성 요소를 설치합니다.
  2. TLS의 경우 각 브로커 및 클라이언트 애플리케이션에 대한 TLS 인증서를 생성합니다.
  3. 보안 연결을 위해 브로커 구성에서 리스너를 구성합니다.
  4. 보안 연결을 위해 클라이언트 애플리케이션을 구성합니다.

Kafka 브로커와 안전하고 인증된 연결을 구축하기 위해 사용하는 메커니즘에 따라 클라이언트 애플리케이션을 구성합니다. Kafka 브로커가 사용하는 인증, 암호화 및 권한은 연결 클라이언트 애플리케이션에서 사용하는 인증, 암호화 및 권한과 일치해야 합니다. 클라이언트 애플리케이션 및 브로커는 보안 통신을 수행하기 위해 보안 프로토콜 및 구성에 동의해야 합니다. 예를 들어 Kafka 클라이언트와 Kafka 브로커는 동일한 TLS 버전 및 암호화 제품군을 사용해야 합니다.

참고

클라이언트와 브로커 간의 보안 구성이 일치하지 않으면 연결에 실패하거나 잠재적인 보안 취약점이 발생할 수 있습니다. 브로커 및 클라이언트 애플리케이션을 신중하게 구성하고 테스트하여 올바르게 보호하고 안전하게 통신할 수 있는지 확인하는 것이 중요합니다.

5.1. 보안 액세스를 위해 브로커 설정

보안 액세스를 위해 클라이언트 애플리케이션을 구성하려면 먼저 사용하려는 보안 메커니즘을 지원하도록 Kafka 클러스터의 브로커를 설정해야 합니다. 보안 연결을 활성화하려면 보안 메커니즘에 대한 적절한 구성으로 리스너를 생성합니다.

5.1.1. RHEL에서 실행되는 Kafka 클러스터에 대한 보안 연결 설정

RHEL에서 AMQ Streams를 사용하는 경우 Kafka 클러스터에 대한 클라이언트 연결을 보호하는 일반적인 개요는 다음과 같습니다.

  1. RHEL 서버에 Kafka 클러스터를 포함한 AMQ Streams 구성 요소를 설치합니다.
  2. TLS의 경우 Kafka 클러스터의 모든 브로커에 대한 TLS 인증서를 생성합니다.
  3. 브로커 구성 속성 파일에서 리스너를 구성합니다.

    • TLS 또는 SASL SCRAM-SHA-512와 같은 Kafka 클러스터 리스너에 대한 인증을 구성합니다.
    • Kafka 클러스터에서 활성화된 모든 리스너(예: 간단한 권한 부여)에 대한 권한을 구성합니다.
  4. TLS의 경우 각 클라이언트 애플리케이션에 대한 TLS 인증서를 생성합니다.
  5. config.properties 파일을 생성하여 클라이언트 애플리케이션에서 사용하는 연결 세부 사항 및 인증 자격 증명을 지정합니다.
  6. Kafka 클라이언트 애플리케이션을 시작하고 Kafka 클러스터에 연결합니다.

    • config.properties 파일에 정의된 속성을 사용하여 Kafka 브로커에 연결합니다.
  7. 클라이언트가 Kafka 클러스터에 성공적으로 연결하고 메시지를 안전하게 소비하고 생성할 수 있는지 확인합니다.

브로커 설정에 대한 자세한 내용은 RHEL에서 AMQ Streams 사용을 참조하십시오.

5.1.2. RHEL에서 Kafka 클러스터의 보안 리스너 구성

구성 속성 파일을 사용하여 Kafka에서 리스너를 구성합니다. Kafka 브로커에 대한 보안 연결을 구성하려면 이 파일에서 TLS, SASL 및 기타 보안 관련 구성에 대한 관련 속성을 설정합니다.

다음은 PKCS#12 형식의 키 저장소 및 신뢰 저장소를 사용하여 Kafka 브로커의 server.properties 구성 파일에 지정된 TLS 리스너 구성의 예입니다.

server.properties의 리스너 구성 예

listeners = listener_1://0.0.0.0:9093, listener_2://0.0.0.0:9094
listener.security.protocol.map = listener_1:SSL, listener_2:PLAINTEXT
ssl.keystore.type = PKCS12
ssl.keystore.location = /path/to/keystore.p12
ssl.keystore.password = <password>
ssl.truststore.type = PKCS12
ssl.truststore.location = /path/to/truststore.p12
ssl.truststore.password = <password>
ssl.client.auth = required
authorizer.class.name = kafka.security.auth.SimpleAclAuthorizer.
super.users = User:superuser

listeners 속성은 각 리스너 이름을 지정하고 브로커가 수신 대기하는 IP 주소와 포트를 지정합니다. 프로토콜 맵은 listener_1 리스너가 TLS 암호화를 사용하는 클라이언트에 대해 SSL 프로토콜을 사용하도록 지시합니다. listener_2 는 TLS 암호화를 사용하지 않는 클라이언트에 대해 PLAINTEXT 연결을 제공합니다. 키 저장소에는 브로커의 개인 키와 인증서가 포함되어 있습니다. 신뢰 저장소에는 클라이언트 애플리케이션의 ID를 확인하는 데 사용되는 신뢰할 수 있는 인증서가 포함되어 있습니다. ssl.client.auth 속성은 클라이언트 인증을 적용합니다.

Kafka 클러스터는 간단한 권한 부여를 사용합니다. 작성자는 SimpleAclAuthorizer 로 설정됩니다. 단일 슈퍼 사용자는 모든 리스너에 대한 제약되지 않은 액세스에 대해 정의됩니다. AMQ Streams는 Kafka SimpleAclAuthorizer 및 사용자 정의 인증 관리자 플러그인을 지원합니다.

구성 속성 앞에 listener.name.<name_of_listener >를 지정하면 구성이 해당 리스너와 관련이 있습니다.

이는 샘플 구성일 뿐입니다. 일부 구성 옵션은 리스너 유형에 따라 다릅니다. OAuth 2.0 또는OPA(Open Policy Agent)를 사용하는 경우 특정 리스너에서 권한 부여 서버 또는 OPA 서버에 대한 액세스도 구성해야 합니다. 특정 요구 사항 및 환경에 따라 리스너를 생성할 수 있습니다.

리스너 구성에 대한 자세한 내용은 Apache Kafka 설명서 를 참조하십시오.

ACL을 사용하여 액세스 세부 조정

ACL(액세스 제어 목록)을 사용하여 Kafka 클러스터에 대한 액세스를 미세 조정할 수 있습니다. ACL(액세스 제어 목록)을 생성하고 관리하려면 kafka-acls.sh 명령줄 도구를 사용합니다. ACL은 클라이언트 애플리케이션에 액세스 규칙을 적용합니다.

다음 예에서 첫 번째 ACL은 my-topic 이라는 특정 항목에 대한 읽기 및 설명 권한을 부여합니다. resource.patternType리터럴 로 설정되며, 이는 리소스 이름이 정확히 일치해야 함을 의미합니다.

두 번째 ACL은 my-group 이라는 특정 소비자 그룹에 대해 읽기 권한을 부여합니다. resource.patternType접두사 로 설정되어 리소스 이름이 접두사와 일치해야 함을 의미합니다.

ACL 구성 예

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add \
--allow-principal User:my-user --operation Read --operation Describe --topic my-topic --resource-pattern-type literal \
--allow-principal User:my-user --operation Read --group my-group --resource-pattern-type prefixed

5.1.3. OpenShift에서 실행되는 Kafka 클러스터에 대한 보안 연결 설정

OpenShift에서 AMQ Streams를 사용하는 경우 Kafka 클러스터에 대한 클라이언트 연결을 보호하는 일반적인 개요는 다음과 같습니다.

  1. Cluster Operator를 사용하여 OpenShift 환경에 Kafka 클러스터를 배포합니다. Kafka 사용자 지정 리소스를 사용하여 클러스터를 구성 및 설치하고 리스너를 생성합니다.

    • TLS 또는 SASL SCRAM-SHA-512와 같은 리스너에 대한 인증을 구성합니다. Cluster Operator는 Kafka 브로커의 ID를 확인하기 위해 클러스터 CA 인증서가 포함된 보안을 생성합니다.
    • 간단한 권한 부여와 같이 활성화된 모든 리스너에 대한 권한 부여를 구성합니다.
  2. User Operator를 사용하여 클라이언트를 나타내는 Kafka 사용자를 생성합니다. KafkaUser 사용자 지정 리소스를 사용하여 사용자를 구성하고 생성합니다.

    • 리스너의 인증 메커니즘과 일치하는 Kafka 사용자(클라이언트)에 대한 인증을 구성합니다. User Operator는 클라이언트가 Kafka 클러스터와의 인증에 사용할 클라이언트 인증서 및 개인 키가 포함된 보안을 생성합니다.
    • 리스너의 권한 부여 메커니즘과 일치하는 Kafka 사용자(클라이언트)의 권한을 구성합니다. 권한 부여 규칙을 사용하면 Kafka 클러스터에서 특정 작업을 수행할 수 있습니다.
  3. config.properties 파일을 생성하여 클러스터에 연결하기 위해 클라이언트 애플리케이션에 필요한 연결 세부 정보 및 인증 자격 증명을 지정합니다.
  4. Kafka 클라이언트 애플리케이션을 시작하고 Kafka 클러스터에 연결합니다.

    • config.properties 파일에 정의된 속성을 사용하여 Kafka 브로커에 연결합니다.
  5. 클라이언트가 Kafka 클러스터에 성공적으로 연결하고 메시지를 안전하게 소비하고 생성할 수 있는지 확인합니다.

브로커 설정에 대한 자세한 내용은 OpenShift에서 AMQ Streams 구성을 참조하십시오.

5.1.4. OpenShift에서 Kafka 클러스터의 보안 리스너 구성

AMQ Streams를 사용하여 Kafka 사용자 정의 리소스를 배포할 때 Kafka 사양에 리스너 구성을 추가합니다. Kafka에서 연결 보안을 위해 리스너 구성을 사용합니다. Kafka 브로커에 대한 보안 연결을 구성하려면 리스너 수준에서 TLS, SASL 및 기타 보안 관련 구성의 관련 속성을 설정합니다.

외부 리스너는 OpenShift 클러스터 외부에서 Kafka 클러스터에 대한 클라이언트 액세스를 제공합니다. AMQ Streams는 구성 기반 Kafka 클러스터에 액세스할 수 있도록 리스너 서비스 및 부트스트랩 주소를 생성합니다. 예를 들어 다음 연결 메커니즘을 사용하는 외부 리스너를 생성할 수 있습니다.

  • 노드 포트
  • loadbalancers
  • OpenShift 경로

다음은 Kafka 리소스에 대한 노드 포트 리스너 구성의 예입니다.

Kafka 리소스의 리스너 구성 예

apiVersion: {KafkaApiVersion}
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    # ...
    listeners:
      - name: plaintext
        port: 9092
        type: internal
        tls: false
        configuration:
          useServiceDnsDomain: true
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: tls
      - name: external
        port: 9094
        type: route
        tls: true
        authentication:
          type: tls
    authorization:
      type: simple
      superUsers:
        - CN=superuser
  # ...

listeners 속성은 일반 텍스트 ,tls 외부의 세 개의 리스너로 구성됩니다. 외부 리스너는 nodeport 유형이며 암호화 및 인증에 TLS를 사용합니다. Cluster Operator를 사용하여 Kafka 클러스터를 생성하면 CA 인증서가 자동으로 생성됩니다. 클라이언트 애플리케이션의 신뢰 저장소에 클러스터 CA를 추가하여 Kafka 브로커의 ID를 확인합니다. 또는 브로커 또는 리스너 수준에서 자체 인증서를 사용하도록 AMQ Streams를 구성할 수 있습니다. 클라이언트 애플리케이션에 다른 보안 구성이 필요한 경우 리스너 수준에서 인증서를 사용해야 할 수 있습니다. 리스너 수준에서 인증서를 사용하면 추가 제어 및 보안 계층도 추가됩니다.

작은 정보

구성 공급자 플러그인을 사용하여 구성 데이터를 생산자 및 소비자 클라이언트에 로드합니다. 구성 공급자 플러그인은 시크릿 또는 ConfigMap에서 구성 데이터를 로드합니다. 예를 들어 Strimzi 시크릿에서 인증서를 자동으로 가져오도록 공급자에게 지시할 수 있습니다. 자세한 내용은 OpenShift에서 실행하기 위한 AMQ Streams 설명서를 참조하십시오.

Kafka 클러스터는 간단한 권한 부여를 사용합니다. 권한 부여 속성 유형은 simple 로 설정됩니다. 단일 슈퍼 사용자는 모든 리스너에 대한 제약되지 않은 액세스에 대해 정의됩니다. AMQ Streams는 Kafka SimpleAclAuthorizer 및 사용자 정의 인증 관리자 플러그인을 지원합니다.

이는 샘플 구성일 뿐입니다. 일부 구성 옵션은 리스너 유형에 따라 다릅니다. OAuth 2.0 또는OPA(Open Policy Agent)를 사용하는 경우 특정 리스너에서 권한 부여 서버 또는 OPA 서버에 대한 액세스도 구성해야 합니다. 특정 요구 사항 및 환경에 따라 리스너를 생성할 수 있습니다.

리스너 구성에 대한 자세한 내용은 GenericKafkaListener 스키마 참조를 참조하십시오.

참고

OpenShift에서 Kafka 클러스터에 대한 클라이언트 액세스에 대해 경로 유형 리스너를 사용하는 경우 TLS 패스스루 기능이 활성화됩니다. OpenShift 경로는 HTTP 프로토콜과 함께 작동하도록 설계되었지만 Apache Kafka에서 사용하는 Kafka 프로토콜을 포함하여 다른 프로토콜의 네트워크 트래픽을 프록시하는 데 사용할 수도 있습니다. 클라이언트는 경로에 대한 연결을 설정하고 경로는 TLS 서버 이름 표시(SNI) 확장을 사용하여 대상 호스트 이름을 가져오는 OpenShift 클러스터에서 실행 중인 브로커로 트래픽을 전달합니다. SNI 확장을 사용하면 경로가 각 연결에 대한 대상 브로커를 올바르게 식별할 수 있습니다.

ACL을 사용하여 액세스 세부 조정

ACL(액세스 제어 목록)을 사용하여 Kafka 클러스터에 대한 액세스를 미세 조정할 수 있습니다. ACL(액세스 제어 목록)을 추가하려면 KafkaUser 사용자 정의 리소스를 구성합니다. KafkaUser 를 생성하면 AMQ Streams에서 자동으로 ACL 생성 및 업데이트를 관리합니다. ACL은 클라이언트 애플리케이션에 액세스 규칙을 적용합니다.

다음 예에서 첫 번째 ACL은 my-topic 이라는 특정 항목에 대한 읽기 및 설명 권한을 부여합니다. resource.patternType리터럴 로 설정되며, 이는 리소스 이름이 정확히 일치해야 함을 의미합니다.

두 번째 ACL은 my-group 이라는 특정 소비자 그룹에 대해 읽기 권한을 부여합니다. resource.patternType접두사 로 설정되어 리소스 이름이 접두사와 일치해야 함을 의미합니다.

KafkaUser 리소스의 ACL 구성 예

apiVersion: {KafkaUserApiVersion}
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster
spec:
  # ...
  authorization:
    type: simple
    acls:
      - resource:
          type: topic
          name: my-topic
          patternType: literal
        operations:
          - Read
          - Describe
      - resource:
          type: group
          name: my-group
          patternType: prefix
        operations:
          - Read

참고

Kafka 사용자를 구성할 때 tls-external 을 인증 옵션으로 지정하는 경우 User Operator에서 생성한 클라이언트 인증서 대신 자체 클라이언트 인증서를 사용할 수 있습니다.

5.2. 보안 액세스를 위해 클라이언트 설정

Kafka 브로커에 보안 연결을 지원하도록 리스너를 설정한 후 다음 단계는 Kafka 클러스터와 통신하도록 이러한 리스너를 사용하도록 클라이언트 애플리케이션을 구성하는 것입니다. 이를 위해서는 리스너에 구성된 보안 메커니즘에 따라 클러스터 인증을 위해 각 클라이언트에 적절한 보안 설정을 제공해야 합니다.

5.2.1. 보안 프로토콜 구성

Kafka 브로커 리스너에 구성된 프로토콜과 일치하도록 클라이언트 애플리케이션에서 사용하는 보안 프로토콜을 구성합니다. 예를 들어 TLS 인증에 SSL (Secure Sockets Layer)을 사용하거나 TLS 암호화를 사용하여 SASL (Simple Authentication and Security Layer over SSL) 인증에 대해 SSL(Secure Sockets Layer)을 사용합니다. Kafka 클러스터에 액세스하는 데 필요한 인증 메커니즘을 지원하는 클라이언트 구성에 신뢰 저장소 및 키 저장소를 추가합니다.

truststore
신뢰 저장소에는 Kafka 브로커의 진위 여부를 확인하는 데 사용되는 신뢰할 수 있는 CA(인증 기관)의 공용 인증서가 포함되어 있습니다. 클라이언트에서 보안 Kafka 브로커에 연결하면 브로커의 ID를 확인해야 할 수 있습니다.
키 저장소
키 저장소에는 클라이언트의 개인 키와 공개 인증서가 포함됩니다. 클라이언트가 브로커에 자신을 인증하려는 경우 자체 인증서를 제공합니다.

TLS 인증을 사용하는 경우 Kafka 클라이언트 구성에 Kafka 클러스터에 연결하기 위한 신뢰 저장소 및 키 저장소가 필요합니다. SASL SCRAM-SHA-512를 사용하는 경우 디지털 인증서가 아닌 사용자 이름과 암호 자격 증명을 교환하여 인증을 수행하므로 키 저장소가 필요하지 않습니다. SCRAM-SHA-512는 더 간단한 메커니즘이지만 인증서 기반 인증을 사용하는 것만큼 안전하지는 않습니다.

참고

자체 인증서 인프라가 있고 타사 CA의 인증서를 사용하는 경우 클라이언트의 기본 신뢰 저장소에 이미 공용 CA 인증서가 포함되어 클라이언트의 신뢰 저장소에 추가할 필요가 없습니다. 기본 신뢰 저장소에 이미 포함된 공용 CA 인증서 중 하나에서 서명한 경우 클라이언트는 서버의 인증서를 자동으로 신뢰합니다.

config.properties 파일을 생성하여 클라이언트 애플리케이션에서 사용하는 인증 자격 증명을 지정할 수 있습니다.

다음 예에서 security.protocol 은 클라이언트와 브로커 간의 TLS 인증 및 암호화를 활성화하기 위해 SSL 로 설정됩니다.

ssl.truststore.locationssl.truststore.password 속성은 신뢰 저장소의 위치와 암호를 지정합니다. ssl.keystore.locationssl.keystore.password 속성은 키 저장소의 위치와 암호를 지정합니다.

PKCS #12(Public-Key Cryptography Standards #12) 파일 형식이 사용됩니다. base64로 인코딩된 PEM(Privacy Enhanced mail) 형식을 사용할 수도 있습니다.

TLS 인증을 위한 클라이언트 구성 속성 예

bootstrap.servers = my-cluster-kafka-bootstrap:9093
security.protocol = SSL
ssl.truststore.location = /path/to/ca.p12
ssl.truststore.password = truststore-password
ssl.keystore.location = /path/to/user.p12
ssl.keystore.password = keystore-password
client.id = my-client

다음 예에서 security.protocolSASL_SSL 로 설정되어 클라이언트와 브로커 간의 TLS 암호화로 SASL 인증을 활성화합니다. 인증만 필요하며 암호화가 필요한 경우 SASL 프로토콜을 사용할 수 있습니다. 인증을 위해 지정된 SASL 메커니즘은 SCRAM-SHA-512 입니다. 다양한 인증 메커니즘을 사용할 수 있습니다. SASL.jaas.config 속성은 인증 자격 증명을 지정합니다.

SCRAM-SHA-512 인증에 대한 클라이언트 구성 속성의 예

bootstrap.servers = my-cluster-kafka-bootstrap:9093
security.protocol = SASL_SSL
sasl.mechanism = SCRAM-SHA-512
sasl.jaas.config = org.apache.kafka.common.security.scram.ScramLoginModule required \
  username = "user" \
  password = "secret";
ssl.truststore.location = path/to/truststore.p12
ssl.truststore.password = truststore_password
ssl.truststore.type = PKCS12
client.id = my-client

참고

PEM 형식을 지원하지 않는 애플리케이션의 경우 OpenSSL과 같은 도구를 사용하여 PEM 파일을 PKCS #12 형식으로 변환할 수 있습니다.

5.2.2. 허용된 TLS 버전 및 암호화 제품군 구성

SSL 구성 및 암호화 제품군을 통합하여 클라이언트 애플리케이션과 Kafka 클러스터 간의 TLS 기반 통신을 추가로 보호할 수 있습니다. Kafka 브로커 구성에서 지원되는 TLS 버전 및 암호화 제품군을 지정합니다. 사용하는 TLS 버전 및 암호화 제품군을 제한하려면 클라이언트에 구성을 추가할 수도 있습니다. 클라이언트의 구성은 브로커에서 활성화된 프로토콜 및 암호화 제품군만 사용해야 합니다.

다음 예에서 SSL은 Kafka 브로커와 클라이언트 애플리케이션 간의 통신에 security.protocol 을 사용하여 활성화됩니다. 암호화 제품군을 쉼표로 구분된 목록으로 지정합니다. ssl.cipher.suites 속성은 클라이언트가 사용할 수 있는 쉼표로 구분된 암호화 제품군 목록입니다.

Kafka 브로커의 SSL 구성 속성 예

security.protocol: "SSL"
ssl.enabled.protocols: "TLSv1.3", "TLSv1.2"
ssl.protocol: "TLSv1.3"
ssl.cipher.suites: "TLS_AES_256_GCM_SHA384"

ssl.enabled.protocols 속성은 클러스터와 해당 클라이언트 간의 보안 통신에 사용할 수 있는 사용 가능한 TLS 버전을 지정합니다. 이 경우 TLSv1.3TLSv1.2 가 모두 활성화됩니다. ssl.protocol 속성은 모든 연결에 대한 기본 TLS 버전을 설정하며 활성화된 프로토콜에서 선택해야 합니다. 기본적으로 클라이언트는 TLSv1.3 을 사용하여 통신합니다. 클라이언트가 TLSv1.2만 지원하는 경우 브로커에 계속 연결되고 지원되는 버전을 사용하여 통신할 수 있습니다. 마찬가지로 구성이 클라이언트에 있고 브로커가 TLSv1.2만 지원하는 경우 클라이언트는 지원되는 버전을 사용합니다.

Apache Kafka에서 지원하는 암호화 제품군은 사용 중인 Kafka 버전과 기본 환경에 따라 다릅니다. 가장 높은 수준의 보안을 제공하는 지원되는 최신 암호화 제품군을 확인합니다.

5.2.3. ACL(액세스 제어 목록) 사용

클라이언트 애플리케이션의 ACLS에 대해 명시적으로 구성할 필요는 없습니다. ACL은 Kafka 브로커에 의해 서버 측에서 적용됩니다. 클라이언트에서 데이터를 생성하거나 사용하기 위해 서버에 요청을 보내면 서버는 ACL을 확인하여 클라이언트(사용자)가 요청된 작업을 수행할 수 있는지 확인합니다. 클라이언트가 권한이 부여된 경우 요청이 처리되고, 그렇지 않으면 요청이 거부되고 오류가 반환됩니다. 그러나 Kafka 클러스터와의 보안 연결을 활성화하려면 클라이언트를 계속 인증하고 적절한 보안 프로토콜을 사용해야 합니다.

Kafka 브로커에서 ACL(액세스 제어 목록)을 사용하는 경우 제어하려는 주제 및 작업에 대한 클라이언트 액세스를 제한하도록 ACL이 올바르게 설정되어 있는지 확인합니다. OPA(Open Policy Agent) 정책을 사용하여 액세스를 관리하는 경우 권한 부여 규칙이 정책에 구성되어 있으므로 Kafka 브로커에 대한 ACL을 지정할 필요가 없습니다. OAuth 2.0은 몇 가지 유연성을 제공합니다. OAuth 2.0 공급자를 사용하여 ACL을 관리하거나 OAuth 2.0 및 Kafka의 간단한 권한을 사용하여 ACL을 관리할 수 있습니다.

참고

ACL은 대부분의 요청에 적용되며 작업 생성 및 사용으로 제한되지 않습니다. 예를 들어 ACLS를 새 주제 생성과 같은 주제 또는 쓰기 작업 설명과 같은 읽기 작업에 적용할 수 있습니다.

5.2.4. 토큰 기반 액세스에 OAuth 2.0 사용

OAuth 2.0 공급자를 통해 권한 부여 제어를 적용하려면 AMQ Streams에서 OAuth 2.0 공개 표준을 사용합니다. OAuth 2.0은 애플리케이션이 다른 시스템에 저장된 사용자 데이터에 안전하게 액세스할 수 있는 방법을 제공합니다. 권한 부여 서버는 Kafka 클러스터에 대한 액세스 권한을 부여하는 클라이언트 애플리케이션에 액세스 토큰을 발행할 수 있습니다.

다음 단계에서는 토큰 검증에 OAuth 2.0을 설정하고 사용하는 일반적인 접근 방식을 설명합니다.

  1. 클라이언트 ID 및 시크릿과 같은 브로커 및 클라이언트 자격 증명을 사용하여 권한 부여 서버를 구성합니다.
  2. 권한 부여 서버에서 OAuth 2.0 자격 증명을 가져옵니다.
  3. OAuth 2.0 자격 증명을 사용하여 Kafka 브로커에서 리스너를 구성하고 권한 부여 서버와 상호 작용합니다.
  4. Oauth 2.0 종속성을 클라이언트 라이브러리에 추가합니다.
  5. OAuth 2.0 자격 증명으로 Kafka 클라이언트를 구성하고 권한 부여 서버와 상호 작용합니다.
  6. 런타임에 OAuth 2.0 공급자로 클라이언트를 인증하는 액세스 토큰을 가져옵니다.

Kafka 브로커에 OAuth 2.0에 대한 리스너가 구성된 경우 OAuth 2.0을 사용하도록 클라이언트 애플리케이션을 설정할 수 있습니다. Kafka 클러스터에 액세스하기 위한 표준 Kafka 클라이언트 구성 외에도 OAuth 2.0 인증에 대한 특정 구성을 포함해야 합니다. 또한 사용하는 권한 부여 서버가 Kafka 클러스터 및 클라이언트 애플리케이션에서 액세스할 수 있는지 확인해야 합니다.

SASL(Simple Authentication and Security Layer) 보안 프로토콜 및 메커니즘을 지정합니다. 프로덕션 환경에서는 다음 설정이 권장됩니다.

  • TLS 암호화 연결에 대한 SASL_SSL 프로토콜입니다.
  • 전달자 토큰을 사용하여 자격 증명 교환을 위한 OAUTHBLOGER 메커니즘

Java 인증 및 권한 부여 서비스(Java Authentication and Authorization Service) 모듈은 SASL 메커니즘을 구현합니다. 메커니즘 구성은 사용 중인 인증 방법에 따라 다릅니다. 예를 들어 인증 정보 교환을 사용하면 OAuth 2.0 액세스 토큰 끝점, 액세스 토큰, 클라이언트 ID 및 클라이언트 시크릿을 추가할 수 있습니다. 클라이언트는 권한 부여 서버의 토큰 끝점(URL)에 연결하여 토큰이 여전히 유효한지 확인합니다. 인증된 액세스를 위해 권한 부여 서버의 공개 키 인증서가 포함된 신뢰 저장소도 필요합니다.

OAauth 2.0의 클라이언트 구성 속성 예

bootstrap.servers = my-cluster-kafka-bootstrap:9093
security.protocol = SASL_SSL
sasl.mechanism = OAUTHBEARER
# ...
sasl.jaas.config = org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
    oauth.token.endpoint.uri = "https://localhost:9443/oauth2/token" \
    oauth.access.token = <access_token> \
    oauth.client.id = "<client_id>" \
    oauth.client.secret = "<client_secret>" \
    oauth.ssl.truststore.location = "/<truststore_location>/oauth-truststore.p12" \
    oauth.ssl.truststore.password = "<truststore_password>" \
    oauth.ssl.truststore.type = "PKCS12" \

OAuth 2.0을 사용하도록 브로커를 설정하는 방법에 대한 자세한 내용은 다음 가이드를 참조하십시오.

5.2.5. OPA(Open Policy Agent) 액세스 정책 사용

AMQ Streams와 함께OPA(Open Policy Agent) 정책 에이전트를 사용하여 액세스 정책에 대해 Kafka 클러스터에 연결하는 요청을 평가합니다. OPA(Open Policy Agent)는 권한 부여 정책을 관리하는 정책 엔진입니다. 정책은 클라이언트 애플리케이션을 변경하지 않고도 액세스 제어를 중앙 집중화하고 동적으로 업데이트할 수 있습니다. 예를 들어 특정 사용자(클라이언트)만 특정 항목에 대한 메시지를 생성하고 사용할 수 있는 정책을 만들 수 있습니다.

AMQ Streams는 Kafka 권한 부여에 대한 Open Policy Agent 플러그인을 인증자로 사용합니다.

다음 단계에서는 OPA를 설정하고 사용하는 일반적인 접근 방식을 설명합니다.

  1. OPA 서버의 인스턴스를 설정합니다.
  2. Kafka 클러스터에 대한 액세스를 제어하는 권한 부여 규칙을 제공하는 정책을 정의합니다.
  3. Kafka 브로커가 OPA 인증을 수락하고 OPA 서버와 상호 작용할 수 있도록 구성을 생성합니다.
  4. Kafka 클러스터에 대한 권한 있는 액세스에 대한 인증 정보를 제공하도록 Kafka 클라이언트를 구성합니다.

Kafka 브로커에 OPA용으로 구성된 리스너가 있는 경우 OPA를 사용하도록 클라이언트 애플리케이션을 설정할 수 있습니다. 리스너 구성에서 OPA 서버에 연결할 URL을 지정하고 클라이언트 애플리케이션을 인증합니다. Kafka 클러스터에 액세스하기 위한 표준 Kafka 클라이언트 구성 외에도 Kafka 브로커로 인증할 인증 정보를 추가해야 합니다. 브로커는 인증 정책을 평가하기 위해 OPA 서버에 요청을 전송하여 클라이언트에게 요청된 작업을 수행하는 데 필요한 권한이 있는지 확인합니다. 정책 엔진에서 권한 부여 정책을 적용하기 때문에 신뢰 저장소 또는 키 저장소가 통신을 보호할 필요가 없습니다.

OPA 권한 부여의 클라이언트 구성 속성 예

bootstrap.servers = my-cluster-kafka-bootstrap:9093
security.protocol = SASL_SSL
sasl.mechanism = SCRAM-SHA-512
sasl.jaas.config = org.apache.kafka.common.security.scram.ScramLoginModule required \
  username = "user" \
  password = "secret";
# ...

참고

Red Hat은 OPA 서버를 지원하지 않습니다.

OPA를 사용하도록 브로커를 설정하는 방법에 대한 자세한 내용은 다음 가이드를 참조하십시오.

5.2.6. 메시지 스트리밍 시 트랜잭션 사용

브로커 및 생산자 클라이언트 애플리케이션에 트랜잭션 속성을 구성하면 메시지가 단일 트랜잭션에서 처리되도록 할 수 있습니다. 트랜잭션은 메시지 스트리밍에 신뢰성과 일관성을 추가합니다.

트랜잭션은 항상 브로커에서 활성화됩니다. 다음 속성을 사용하여 기본 구성을 변경할 수 있습니다.

트랜잭션에 대한 Kafka 브로커 구성 속성의 예

transaction.state.log.replication.factor = 3
transaction.state.log.min.isr = 2
transaction.abort.timed.out.transaction.cleanup.interval.ms = 3600000

이는 내부 __ECDHE_state 항목에 대한 복제본 3개를 생성하는 프로덕션 환경의 일반적인 구성입니다. \__ECDHE_state 주제는 진행 중인 트랜잭션에 대한 정보를 저장합니다. 트랜잭션 로그에 최소 2개의 동기화 복제본이 필요합니다. 정리 간격은 시간 초과 트랜잭션을 점검하고 해당 트랜잭션 로그를 정리하는 시간입니다.

클라이언트 구성에 트랜잭션 속성을 추가하려면 생산자 및 소비자에 대해 다음 속성을 설정해야 합니다.

트랜잭션에 대한 생산자 클라이언트 구성 속성의 예

transactional.id = unique-transactional-id
enable.idempotence = true
max.in.flight.requests.per.connection = 5
acks = all
transaction.timeout.ms = 30000
delivery.timeout = 25000

트랜잭션 ID를 사용하면 Kafka 브로커가 트랜잭션을 추적할 수 있습니다. 이는 생산자의 고유 식별자이며 특정 파티션 세트와 함께 사용해야 합니다. 여러 파티션 집합에 대해 트랜잭션을 수행해야 하는 경우 각 세트에 대해 다른 트랜잭션 ID를 사용해야 합니다. 생산자 인스턴스가 중복 메시지를 생성하지 않도록 멱등이 활성화됩니다. idempotence를 사용하면 생산자 ID와 시퀀스 번호를 사용하여 메시지가 추적됩니다. 브로커가 메시지를 수신하면 생산자 ID와 시퀀스 번호를 확인합니다. 동일한 생산자 ID와 시퀀스 번호가 있는 메시지가 이미 수신되면 브로커는 중복 메시지를 삭제합니다.

최대 진행 중인 요청 수는 트랜잭션이 전송되는 순서대로 처리되도록 5로 설정됩니다. 파티션은 메시지 순서를 손상시키지 않고 최대 5개의 진행 중인 요청을 포함할 수 있습니다.

acks모든 것으로 설정하면 생산자는 트랜잭션을 완료로 간주하기 전에 작성 중인 주제 파티션의 모든 동기화 내 복제본에서 승인이 완료될 때까지 기다립니다. 이렇게 하면 Kafka 클러스터에 메시지가 기록(커밋)되고 브로커 오류가 발생한 경우에도 메시지가 손실되지 않습니다.

트랜잭션 시간 초과는 클라이언트가 시간 초과 전에 트랜잭션을 완료해야 하는 최대 시간을 지정합니다. 전달 시간 초과는 생산자가 시간 초과 전에 브로커가 메시지 전달을 승인할 때까지 대기하는 최대 시간을 지정합니다. 메시지가 트랜잭션 기간 내에 전달되도록 하려면 전달 시간 초과를 트랜잭션 시간 초과보다 적게 설정합니다. 네트워크 대기 시간 및 메시지 처리량을 고려하고 재시도 횟수를 지정할 때 임시 오류를 허용합니다.

트랜잭션에 대한 소비자 클라이언트 구성 속성 예

group.id = my-group-id
isolation.level = read_committed
enable.auto.commit = false

read_committed 격리 수준은 소비자가 성공적으로 완료된 트랜잭션에 대한 메시지만 읽을 수 있도록 지정합니다. 소비자는 진행 중이거나 실패한 트랜잭션의 일부인 메시지를 처리하지 않습니다. 이를 통해 소비자는 완전히 완료된 트랜잭션의 일부인 메시지만 읽습니다.

트랜잭션을 사용하여 메시지를 스트리밍할 때 enable.auto.commitfalse 로 설정하는 것이 중요합니다. true 로 설정하면 소비자는 트랜잭션을 고려하지 않고 오프셋을 정기적으로 커밋합니다. 즉, 소비자가 트랜잭션이 완전히 완료되기 전에 메시지를 커밋할 수 있습니다. enable.auto.commitfalse 로 설정하면 소비자가 완전히 기록되고 트랜잭션의 일부로 해당 항목에 커밋된 메시지만 읽고 커밋합니다.

6장. Kafka 클라이언트 개발

선호하는 프로그래밍 언어로 Kafka 클라이언트를 생성하고 AMQ Streams에 연결합니다.

Kafka 클러스터와 상호 작용하려면 클라이언트 애플리케이션이 메시지를 생성하고 사용할 수 있어야 합니다. 기본 Kafka 클라이언트 애플리케이션을 개발하고 구성하려면 최소한 다음을 수행해야 합니다.

  • Kafka 클러스터에 연결할 구성 설정
  • 생산자와 소비자를 사용하여 메시지를 보내고 받을 수 있습니다.

Kafka 클러스터에 연결하고 생산자와 소비자를 사용하는 데 필요한 기본 구성을 설정하는 것이 Kafka 클라이언트 개발의 첫 번째 단계입니다. 그런 다음 클라이언트 애플리케이션의 입력, 보안, 성능, 오류 처리 및 기능을 개선할 수 있습니다.

사전 요구 사항

다음에 대한 속성 값이 포함된 클라이언트 속성 파일을 생성할 수 있습니다.

절차

  1. 프로그래밍 언어용 Kafka 클라이언트 라이브러리(예: Java, Python, .NET 등)를 선택합니다.
  2. 패키지 관리자를 통해 또는 해당 소스에서 라이브러리를 다운로드하여 수동으로 라이브러리를 설치합니다.
  3. 코드에서 Kafka 클라이언트에 필요한 클래스 및 종속 항목을 가져옵니다.
  4. 생성할 클라이언트 유형에 따라 Kafka 소비자 또는 생산자 오브젝트를 생성합니다.

    둘 다 수행하는 클라이언트를 가질 수 있습니다.

  5. 필요한 경우 브로커 주소, 포트 및 인증 정보를 포함하여 Kafka 클러스터에 연결할 구성 속성을 제공합니다.
  6. Kafka 소비자 또는 생산자 오브젝트를 사용하여 주제를 구독하거나, 메시지를 생성하거나, Kafka 클러스터에서 메시지를 검색할 수 있습니다.
  7. AMQ Streams와의 연결 또는 통신 중에 발생할 수 있는 모든 오류를 처리합니다.

6.1. Kafka 생산자 클라이언트의 예

이 Java 기반 Kafka 생산자 클라이언트는 Kafka 항목에 메시지를 생성하는 자체 포함 애플리케이션의 예입니다. 클라이언트는 Kafka Producer API를 사용하여 일부 오류 처리와 함께 비동기적으로 메시지를 보냅니다.

클라이언트는 메시지 처리를 위해 콜백 인터페이스를 구현합니다.

Kafka 생산자 클라이언트를 실행하려면 Producer 클래스에서 기본 메서드를 실행합니다. 클라이언트는 randomBytes 메서드를 사용하여 임의의 바이트 배열을 메시지 페이로드로 생성합니다. 클라이언트는 NUM_MESSAGES 메시지(예 구성의 100)가 전송될 때까지 Kafka 항목에 메시지를 생성합니다. 생산자는 스레드로부터 안전하므로 여러 스레드가 단일 생산자 인스턴스를 사용할 수 있습니다.

이 예제 클라이언트는 특정 사용 사례에 대해 보다 복잡한 Kafka 생산자를 구축하기 위한 기본 기반을 제공합니다. 로깅 프레임워크와의 통합과 같은 추가 기능을 통합할 수 있습니다.

참고

각 클라이언트에 SLF4J 바인딩을 추가하여 클라이언트 API 로그를 확인할 수 있습니다.

사전 요구 사항

  • 지정된 BOOTSTRAP_SERVERS에서 실행되는 Kafka 브로커
  • message가 생성되는 Kafka 주제로 to pl_NAME 이라는 Kafka 주제입니다.

설정

Producer 클래스에 지정된 다음 상수를 통해 생산자 클라이언트를 구성할 수 있습니다.

BOOTSTRAP_SERVERS
Kafka 브로커에 연결할 주소와 포트입니다(예: localhost:9092).
TOPIC_NAME
메시지를 생성할 Kafka 주제의 이름입니다.
NUM_MESSAGES
중지하기 전에 생성할 메시지 수입니다.
MESSAGE_SIZE_BYTES
각 메시지의 크기(바이트)입니다.
PROCESSING_DELAY_MS
메시지를 전송하는 사이의 지연 시간(밀리초)입니다. 이는 메시지 처리 시간을 시뮬레이션할 수 있으며 테스트에 유용합니다.

생산자 클라이언트의 예

import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.LongSerializer;

public class Producer implements Callback {
    private static final Random RND = new Random(0);
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String TOPIC_NAME = "my-topic";
    private static final long NUM_MESSAGES = 100;
    private static final int MESSAGE_SIZE_BYTES = 100;
    private static final long PROCESSING_DELAY_MS = 0L;

    protected AtomicLong messageCount = new AtomicLong(0);

    public static void main(String[] args) {
        new Producer().run();
    }

    public void run() {
        System.out.println("Running producer");
        try (var producer = createKafkaProducer()) {  1
            byte[] value = randomBytes(MESSAGE_SIZE_BYTES); 2
            while (messageCount.get() < NUM_MESSAGES) { 3
                sleep(PROCESSING_DELAY_MS); 4
                producer.send(new ProducerRecord<>(TOPIC_NAME, messageCount.get(), value), this); 5
                messageCount.incrementAndGet();
            }
        }
    }

    private KafkaProducer<Long, byte[]> createKafkaProducer() {
        Properties props = new Properties(); 6
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); 7
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID()); 8
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class); 9
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        return new KafkaProducer<>(props);
    }

    private void sleep(long ms) { 10
        try {
            TimeUnit.MILLISECONDS.sleep(ms);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private byte[] randomBytes(int size) { 11
        if (size <= 0) {
            throw new IllegalArgumentException("Record size must be greater than zero");
        }
        byte[] payload = new byte[size];
        for (int i = 0; i < payload.length; ++i) {
            payload[i] = (byte) (RND.nextInt(26) + 65);
        }
        return payload;
    }

    private boolean retriable(Exception e) { 12
        if (e == null) {
            return false;
        } else if (e instanceof IllegalArgumentException
            || e instanceof UnsupportedOperationException
            || !(e instanceof RetriableException)) {
            return false;
        } else {
            return true;
        }
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception e) { 13
        if (e != null) {
            System.err.println(e.getMessage());
            if (!retriable(e)) {
                e.printStackTrace();
                System.exit(1);
            }
        } else {
            System.out.printf("Record sent to %s-%d with offset %d%n",
                metadata.topic(), metadata.partition(), metadata.offset());
        }
    }
}

1
클라이언트는 createKafkaProducer 메서드를 사용하여 Kafka 생산자를 생성합니다. 생산자는 메시지를 Kafka 주제로 비동기적으로 보냅니다.
2
바이트 배열은 Kafka 항목에 전송된 각 메시지의 페이로드로 사용됩니다.
3
전송되는 최대 메시지 수는 NUM_MESSAGES 상수 값에 따라 결정됩니다.
4
메시지 속도는 전송된 각 메시지 사이의 지연으로 제어됩니다.
5
생산자는 주제 이름, 메시지 수 값 및 메시지 값을 전달합니다.
6
클라이언트는 제공된 구성을 사용하여 KafkaProducer 인스턴스를 생성합니다. 속성 파일을 사용하거나 구성을 직접 추가할 수 있습니다. 기본 구성에 대한 자세한 내용은 4장. Kafka 클러스터에 연결하기 위한 클라이언트 애플리케이션 구성 을 참조하십시오.
7
Kafka 브로커에 대한 연결입니다.
8
임의로 생성된 UUID를 사용하는 생산자의 고유한 클라이언트 ID입니다. 클라이언트 ID는 필요하지 않지만 요청 소스를 추적하는 것이 유용합니다.
9
키와 값을 바이트 배열로 처리하는 데 필요한 적절한 serializer 클래스입니다.
10
지정된 밀리초 수에 대한 메시지 전송 프로세스에 지연을 도입합니다. 일시 중지된 동안 메시지를 보내는 스레드가 중단되면 InterruptedException 오류가 발생합니다.
11
Kafka 항목에 전송된 각 메시지의 페이로드 역할을 하는 특정 크기의 임의 바이트 배열을 생성하는 방법입니다. 이 방법은 임의의 정수를 생성하고 65 를 추가하여 ascii 코드에서 대문자를 나타냅니다(65는 A, 66은 B 이므로). ascii 코드는 페이로드 배열에 단일 바이트로 저장됩니다. 페이로드 크기가 0보다 크면 IllegalArgumentException 이 발생합니다.
12
예외적으로 메시지를 다시 보낼지 여부를 확인하는 방법입니다. null 및 지정된 예외는 재시도되지 않으며 RetriableException 인터페이스를 구현하지 않는 예외도 아닙니다. 다른 오류를 포함하도록 이 방법을 사용자 지정할 수 있습니다.
13
Kafka 브로커가 메시지를 승인할 때 호출됩니다. 성공 시 메시지의 주제, 파티션 및 오프셋 위치에 대한 세부 정보가 포함된 메시지가 출력됩니다. 메시지를 전송할 때 오류가 발생한 경우 오류 메시지가 출력됩니다. 이 방법은 예외를 확인하고 치명적 또는 치명적 오류인지 여부에 따라 적절한 조치를 취합니다. 오류가 치명적이 아닌 경우 메시지 전송 프로세스가 계속됩니다. 오류가 치명적이면 스택 추적이 출력되고 생산자가 종료됩니다.

오류 처리

생산자 클라이언트가 catch하는 치명적인 예외는 다음과 같습니다.

InterruptedException
일시 중지된 동안 현재 스레드가 중단될 때 발생하는 오류입니다. 중단은 일반적으로 생산자를 중지하거나 종료할 때 발생합니다. 예외는 생산자를 종료하는 RuntimeException 으로 다시 탐색됩니다.
IllegalArgumentException
생산자가 유효하지 않거나 부적절한 인수를 수신할 때 발생하는 오류입니다. 예를 들어 항목이 누락된 경우 예외가 발생합니다.
UnsupportedOperationException
작업이 지원되지 않거나 메서드가 구현되지 않은 경우 발생하는 오류입니다. 예를 들어 지원되지 않는 프로듀서 구성을 사용하려고 하거나 KafkaProducer 클래스에서 지원하지 않는 메서드를 호출하면 예외가 발생합니다.

생산자 클라이언트가 catch한 치명적이 아닌 예외는 다음과 같습니다.

RetriableException
Kafka 클라이언트 라이브러리에서 제공하는 RetriableException 인터페이스를 구현하는 예외에 대해 발생한 오류입니다.

치명적이 아닌 오류가 있는 경우 생산자는 계속 메시지를 보냅니다.

6.2. Kafka 소비자 클라이언트의 예

이 Java 기반 Kafka 소비자 클라이언트는 Kafka 주제의 메시지를 사용하는 자체 포함 애플리케이션의 예입니다. 클라이언트는 Kafka Consumer API를 사용하여 지정된 주제에서 일부 오류 처리와 함께 비동기적으로 메시지를 가져오고 처리합니다. 메시지를 성공적으로 처리한 후 오프셋을 커밋하여 At-least-once 의미 체계를 따릅니다.

클라이언트는 파티션 처리를 위해 ConsumerRebalanceListener 인터페이스와 오프셋 커밋을 위한 OffsetCommitCallback 인터페이스를 구현합니다.

Kafka 소비자 클라이언트를 실행하려면 Consumer 클래스에서 기본 메서드를 실행합니다. 클라이언트는 NUM_MESSAGES 메시지(예 구성의 100)가 사용될 때까지 Kafka 주제의 메시지를 사용합니다. 소비자는 여러 스레드에서 동시에 안전하게 액세스하도록 설계되지 않았습니다.

이 예제 클라이언트는 특정 사용 사례에 맞게 더 복잡한 Kafka 소비자를 구축하기 위한 기본 기반을 제공합니다. 로깅 프레임워크와의 통합과 같은 추가 기능을 통합할 수 있습니다.

참고

각 클라이언트에 SLF4J 바인딩을 추가하여 클라이언트 API 로그를 확인할 수 있습니다.

사전 요구 사항

  • 지정된 BOOTSTRAP_SERVERS에서 실행되는 Kafka 브로커
  • message가 사용되는 Kafka 주제로 , message is used from which messages are used.

설정

Consumer 클래스에 지정된 다음 상수를 통해 소비자 클라이언트를 구성할 수 있습니다.

BOOTSTRAP_SERVERS
Kafka 브로커에 연결할 주소와 포트입니다(예: localhost:9092).
GROUP_ID
소비자 그룹 식별자입니다.
POLL_TIMEOUT_MS
각 폴링 중에 새 메시지를 대기하는 최대 시간입니다.
TOPIC_NAME
메시지를 사용할 Kafka 주제의 이름입니다.
NUM_MESSAGES
중지하기 전에 사용할 메시지 수입니다.
PROCESSING_DELAY_MS
메시지를 전송하는 사이의 지연 시간(밀리초)입니다. 이는 메시지 처리 시간을 시뮬레이션할 수 있으며 테스트에 유용합니다.

소비자 클라이언트의 예

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.LongDeserializer;

import static java.time.Duration.ofMillis;
import static java.util.Collections.singleton;

public class Consumer implements ConsumerRebalanceListener, OffsetCommitCallback {
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String GROUP_ID = "my-group";
    private static final long POLL_TIMEOUT_MS = 1_000L;
    private static final String TOPIC_NAME = "my-topic";
    private static final long NUM_MESSAGES = 100;
    private static final long PROCESSING_DELAY_MS = 0L;

    private KafkaConsumer<Long, byte[]> kafkaConsumer;
    protected AtomicLong messageCount = new AtomicLong(0);
    private Map<TopicPartition, OffsetAndMetadata> pendingOffsets = new HashMap<>();

    public static void main(String[] args) {
        new Consumer().run();
    }

    public void run() {
        System.out.println("Running consumer");
        try (var consumer = createKafkaConsumer()) { 1
            kafkaConsumer = consumer;
            consumer.subscribe(singleton(TOPIC_NAME), this); 2
            System.out.printf("Subscribed to %s%n", TOPIC_NAME);
            while (messageCount.get() < NUM_MESSAGES) { 3
                try {
                    ConsumerRecords<Long, byte[]> records = consumer.poll(ofMillis(POLL_TIMEOUT_MS)); 4
                    if (!records.isEmpty()) { 5
                        for (ConsumerRecord<Long, byte[]> record : records) {
                            System.out.printf("Record fetched from %s-%d with offset %d%n",
                                record.topic(), record.partition(), record.offset());
                            sleep(PROCESSING_DELAY_MS); 6

                            pendingOffsets.put(new TopicPartition(record.topic(), record.partition()), 7
                                new OffsetAndMetadata(record.offset() + 1, null));
                            if (messageCount.incrementAndGet() == NUM_MESSAGES) {
                                break;
                            }
                        }
                        consumer.commitAsync(pendingOffsets, this); 8
                        pendingOffsets.clear();
                    }
                } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) { 9
                    System.out.println("Invalid or no offset found, using latest");
                    consumer.seekToEnd(e.partitions());
                    consumer.commitSync();
                } catch (Exception e) {
                    System.err.println(e.getMessage());
                    if (!retriable(e)) {
                        e.printStackTrace();
                        System.exit(1);
                    }
                }
            }
        }
    }

    private KafkaConsumer<Long, byte[]> createKafkaConsumer() {
        Properties props = new Properties(); 10
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); 11
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID()); 12
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID); 13
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); 14
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 15
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 16
        return new KafkaConsumer<>(props);
    }

    private void sleep(long ms) { 17
        try {
            TimeUnit.MILLISECONDS.sleep(ms);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private boolean retriable(Exception e) { 18
        if (e == null) {
            return false;
        } else if (e instanceof IllegalArgumentException
            || e instanceof UnsupportedOperationException
            || !(e instanceof RebalanceInProgressException)
            || !(e instanceof RetriableException)) {
            return false;
        } else {
            return true;
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) { 19
        System.out.printf("Assigned partitions: %s%n", partitions);
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) { 20
        System.out.printf("Revoked partitions: %s%n", partitions);
        kafkaConsumer.commitSync(pendingOffsets);
        pendingOffsets.clear();
    }

    @Override
    public void onPartitionsLost(Collection<TopicPartition> partitions) { 21
        System.out.printf("Lost partitions: {}", partitions);
    }

    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) { 22
        if (e != null) {
            System.err.println("Failed to commit offsets");
            if (!retriable(e)) {
                e.printStackTrace();
                System.exit(1);
            }
        }
    }
}

1
클라이언트는 createKafkaConsumer 방법을 사용하여 Kafka 소비자를 생성합니다.
2
소비자는 특정 주제를 구독합니다. 주제를 구독하면 확인 메시지가 출력됩니다.
3
소비되는 최대 메시지 수는 NUM_MESSAGES 상수 값에 따라 결정됩니다.
4
가져오기를 위한 다음 폴링은 rebalance를 방지하려면 session.timeout.ms 내에서 호출해야 합니다.
5
Kafka에서 가져온 배치 메시지가 포함된 레코드 오브젝트가 비어 있지 않은지 확인하는 조건입니다. 레코드 오브젝트가 비어 있으면 처리할 새 메시지가 없으며 프로세스를 건너뜁니다.
6
지정된 밀리초 수에 대한 메시지 가져오기 프로세스에 지연을 도입하는 메서드입니다.
7
소비자는 pendingOffsets 맵을 사용하여 커밋해야 하는 소비된 메시지의 오프셋을 저장합니다.
8
일련의 메시지를 처리한 후 소비자는 commitAsync 메서드를 사용하여 오프셋을 비동기식으로 커밋하고 at-least-once 시맨틱을 구현합니다.
9
메시지를 사용할 때 치명적이지 않은 오류를 처리합니다. 치명적이 아닌 오류의 경우 소비자는 파티션의 끝을 찾고 사용 가능한 최신 오프셋에서 사용을 시작합니다. 예외를 다시 시도할 수 없는 경우 스택 추적이 출력되고 소비자가 종료됩니다.
10
클라이언트는 제공된 구성을 사용하여 KafkaConsumer 인스턴스를 생성합니다. 속성 파일을 사용하거나 구성을 직접 추가할 수 있습니다. 기본 구성에 대한 자세한 내용은 4장. Kafka 클러스터에 연결하기 위한 클라이언트 애플리케이션 구성 을 참조하십시오.
11
Kafka 브로커에 대한 연결입니다.
12
임의로 생성된 UUID를 사용하는 생산자의 고유한 클라이언트 ID입니다. 클라이언트 ID는 필요하지 않지만 요청 소스를 추적하는 것이 유용합니다.
13
파티션에 대한 할당의 소비자 조정을 위한 그룹 ID입니다.
14
키와 값을 바이트 배열로 처리하기 위한 적절한 역직자 클래스입니다.
15
자동 오프셋 커밋을 비활성화하는 구성입니다.
16
파티션에 대한 커밋된 오프셋이 없는 경우 가장 빠른 오프셋에서 메시지 사용을 시작하도록 소비자에 대한 구성입니다.
17
지정된 밀리초 수에 대한 메시지 사용 프로세스에 지연을 도입하는 메서드입니다. 일시 중지된 동안 메시지를 보내는 스레드가 중단되면 InterruptedException 오류가 발생합니다.
18
예외에 따라 메시지를 다시 커밋할지 여부를 확인하는 방법입니다. null 및 지정된 예외는 재시도되지 않으며 RebalanceInProgressException 또는 RetriableException 인터페이스를 구현하지 않는 예외도 아닙니다. 다른 오류를 포함하도록 이 방법을 사용자 지정할 수 있습니다.
19
사용자에게 할당된 파티션 목록을 나타내는 메시지를 콘솔에 출력하는 방법입니다.
20
소비자 그룹 리밸런스 중에 소비자가 파티션의 소유권을 잃고자 할 때 호출되는 방법입니다. 이 메서드는 소비자에서 취소되는 파티션 목록을 출력합니다. 보류 중인 오프셋이 커밋됩니다.
21
소비자 그룹 리밸런스 중에 소비자가 파티션의 소유권이 손실되었지만 보류 중인 오프셋을 커밋하지 못한 경우 호출되는 메서드입니다. 이 메서드는 소비자가 손실한 파티션 목록을 출력합니다.
22
소비자가 Kafka에 오프셋을 커밋할 때 호출되는 메서드입니다. 오프셋을 커밋할 때 오류가 발생한 경우 오류 메시지가 출력됩니다. 이 방법은 예외를 확인하고 치명적 또는 치명적 오류인지 여부에 따라 적절한 조치를 취합니다. 오류가 치명적이 아닌 경우 오프셋 커밋 프로세스가 계속됩니다. 오류가 치명적이면 스택 추적이 출력되고 소비자가 종료됩니다.

오류 처리

소비자 클라이언트가 catch하는 치명적인 예외는 다음과 같습니다.

InterruptedException
일시 중지된 동안 현재 스레드가 중단될 때 발생하는 오류입니다. 중단은 일반적으로 소비자를 중지하거나 종료할 때 발생합니다. 예외는 소비자를 종료하는 RuntimeException 으로 다시 검색됩니다.
IllegalArgumentException
소비자가 유효하지 않거나 부적절한 인수를 수신할 때 발생하는 오류입니다. 예를 들어 항목이 누락된 경우 예외가 발생합니다.
UnsupportedOperationException
작업이 지원되지 않거나 메서드가 구현되지 않은 경우 발생하는 오류입니다. 예를 들어 지원되지 않는 소비자 구성을 사용하려는 경우 예외가 발생하거나 KafkaConsumer 클래스에서 지원하지 않는 메서드를 호출합니다.

소비자 클라이언트가 위반하지 않는 예외는 다음과 같습니다.

OffsetOutOfRangeException
일반적으로 오프셋이 해당 파티션에 유효한 오프셋 범위를 벗어나는 경우 소비자가 파티션의 잘못된 오프셋을 검색하려고 할 때 발생하는 오류입니다.
NoOffsetForPartitionException
파티션에 대한 커밋된 오프셋이 없고 자동 재설정 정책이 활성화되지 않았거나 요청된 오프셋이 유효하지 않을 때 발생하는 오류입니다.
RebalanceInProgressException
파티션이 할당될 때 소비자 그룹 리밸런스 중에 발생한 오류입니다. 소비자가 리밸런싱을 수행하는 경우 오프셋 커밋을 완료할 수 없습니다.
RetriableException
Kafka 클라이언트 라이브러리에서 제공하는 RetriableException 인터페이스를 구현하는 예외에 대해 발생한 오류입니다.

치명적이 아닌 오류가 있는 경우 소비자는 계속 메시지를 처리합니다.

6.3. 소비자와의 협업 재조정 사용

Kafka 소비자는 재조정 프로토콜에 따라 결정된 파티션 할당 전략을 사용합니다. 기본적으로 Kafka는 RangeAssignor 프로토콜을 사용하므로 리밸런스 중에 사용자가 파티션 할당을 재조정하여 잠재적인 서비스 중단을 초래합니다.

효율성을 개선하고 다운타임을 줄이기 위해 협력적 재조정 접근 방식인 CooperativeStickyAssignor 프로토콜로 전환할 수 있습니다. 기본 프로토콜과 달리 협업 재조정을 통해 소비자는 함께 작업할 수 있으며 리밸런스 중에 파티션 할당을 유지하고 소비자 그룹 내에서 균형을 유지하는 데 필요한 경우에만 파티션을 릴리스할 수 있습니다.

절차

  1. 소비자 구성에서 partition.assignment.strategy 속성을 사용하여 프로토콜로 CooperativeStickyAssignor 를 사용하도록 전환합니다. 예를 들어 현재 구성이 partition.assignment.strategy=RangeAssignor인 CooperativeStickyAssignor 인 경우 partition.assignment.strategy=CooperativeStickyAssignor 로 업데이트합니다.

    소비자 구성 파일을 직접 수정하는 대신 소비자 애플리케이션 코드에서 props.put 을 사용하여 파티션 할당 전략을 설정할 수도 있습니다.

    # ...
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
    # ...
  2. 그룹의 각 소비자를 한 번에 하나씩 다시 시작하여 다시 시작한 후 그룹에 다시 참여할 수 있습니다.
주의

CooperativeStickyAssignor 프로토콜로 전환한 후 소비자 재조정 중에 RebalanceInProgressException 이 발생하여 동일한 소비자 그룹에 있는 여러 Kafka 클라이언트의 예기치 않은 중지 페이지가 발생할 수 있습니다. 또한 이 문제로 인해 Kafka 소비자가 재조정하는 동안 파티션 할당을 변경하지 않은 경우에도 커밋되지 않은 메시지가 복제될 수 있습니다. 자동 오프셋 커밋(enable.auto.commit=true)을 사용하는 경우 변경할 필요가 없습니다. 오프셋을 수동으로 커밋하고(enable.auto.commit=false)하고 수동 커밋 중에 RebalanceInProgressException 이 발생하는 경우 다음 루프에서 consumer 구현을 변경하여 소비자 재조정 프로세스를 완료합니다. 자세한 내용은 고객 포털의 CooperativeStickyAssignor 문서를 참조하십시오.

부록 A. 서브스크립션 사용

AMQ Streams는 소프트웨어 서브스크립션을 통해 제공됩니다. 서브스크립션을 관리하려면 Red Hat 고객 포털에서 계정에 액세스하십시오.

귀하의 계정에 액세스

  1. access.redhat.com 으로 이동합니다.
  2. 아직 계정이 없는 경우 계정을 생성합니다.
  3. 계정에 로그인합니다.

서브스크립션 활성화

  1. access.redhat.com 으로 이동합니다.
  2. 내 서브스크립션으로 이동합니다.
  3. 서브스크립션을 활성화하여 16자리 활성화 번호를 입력합니다.

Zip 및 Tar 파일 다운로드

zip 또는 tar 파일에 액세스하려면 고객 포털을 사용하여 다운로드할 관련 파일을 찾습니다. RPM 패키지를 사용하는 경우에는 이 단계가 필요하지 않습니다.

  1. 브라우저를 열고 access.redhat.com/downloads 에서 Red Hat 고객 포털 제품 다운로드 페이지에 로그인합니다.
  2. INTEGRAT ION 및 AUTOMATION 카테고리에서 Apache Kafka의 AMQ Streams 를 찾습니다.
  3. 원하는 AMQ Streams 제품을 선택합니다. Software Download 페이지가 열립니다.
  4. 구성 요소에 대한 다운로드 링크를 클릭합니다.

DNF로 패키지 설치

패키지 및 모든 패키지 종속 항목을 설치하려면 다음을 사용합니다.

dnf install <package_name>

로컬 디렉터리에서 이전에 다운로드한 패키지를 설치하려면 다음을 사용합니다.

dnf install <path_to_download_package>

2023-11-22에 최종 업데이트된 문서

법적 공지

Copyright © 2023 Red Hat, Inc.
The text of and illustrations in this document are licensed by Red Hat under a Creative Commons Attribution–Share Alike 3.0 Unported license ("CC-BY-SA"). An explanation of CC-BY-SA is available at http://creativecommons.org/licenses/by-sa/3.0/. In accordance with CC-BY-SA, if you distribute this document or an adaptation of it, you must provide the URL for the original version.
Red Hat, as the licensor of this document, waives the right to enforce, and agrees not to assert, Section 4d of CC-BY-SA to the fullest extent permitted by applicable law.
Red Hat, Red Hat Enterprise Linux, the Shadowman logo, the Red Hat logo, JBoss, OpenShift, Fedora, the Infinity logo, and RHCE are trademarks of Red Hat, Inc., registered in the United States and other countries.
Linux® is the registered trademark of Linus Torvalds in the United States and other countries.
Java® is a registered trademark of Oracle and/or its affiliates.
XFS® is a trademark of Silicon Graphics International Corp. or its subsidiaries in the United States and/or other countries.
MySQL® is a registered trademark of MySQL AB in the United States, the European Union and other countries.
Node.js® is an official trademark of Joyent. Red Hat is not formally related to or endorsed by the official Joyent Node.js open source or commercial project.
The OpenStack® Word Mark and OpenStack logo are either registered trademarks/service marks or trademarks/service marks of the OpenStack Foundation, in the United States and other countries and are used with the OpenStack Foundation's permission. We are not affiliated with, endorsed or sponsored by the OpenStack Foundation, or the OpenStack community.
All other trademarks are the property of their respective owners.
Red Hat logoGithubRedditYoutubeTwitter

자세한 정보

평가판, 구매 및 판매

커뮤니티

Red Hat 문서 정보

Red Hat을 사용하는 고객은 신뢰할 수 있는 콘텐츠가 포함된 제품과 서비스를 통해 혁신하고 목표를 달성할 수 있습니다.

보다 포괄적 수용을 위한 오픈 소스 용어 교체

Red Hat은 코드, 문서, 웹 속성에서 문제가 있는 언어를 교체하기 위해 최선을 다하고 있습니다. 자세한 내용은 다음을 참조하세요.Red Hat 블로그.

Red Hat 소개

Red Hat은 기업이 핵심 데이터 센터에서 네트워크 에지에 이르기까지 플랫폼과 환경 전반에서 더 쉽게 작업할 수 있도록 강화된 솔루션을 제공합니다.

© 2024 Red Hat, Inc.