5.2. 보안 액세스를 위해 클라이언트 설정
Kafka 브로커에 보안 연결을 지원하도록 리스너를 설정한 후 다음 단계는 Kafka 클러스터와 통신하도록 이러한 리스너를 사용하도록 클라이언트 애플리케이션을 구성하는 것입니다. 이를 위해서는 리스너에 구성된 보안 메커니즘에 따라 클러스터 인증을 위해 각 클라이언트에 적절한 보안 설정을 제공해야 합니다.
5.2.1. 보안 프로토콜 구성
Kafka 브로커 리스너에 구성된 프로토콜과 일치하도록 클라이언트 애플리케이션에서 사용하는 보안 프로토콜을 구성합니다. 예를 들어 TLS 인증에 SSL
(Secure Sockets Layer)을 사용하거나 TLS 암호화를 사용하여 SASL
(Simple Authentication and Security Layer over SSL) 인증에 대해 SSL(Secure Sockets Layer)을 사용합니다. Kafka 클러스터에 액세스하는 데 필요한 인증 메커니즘을 지원하는 클라이언트 구성에 신뢰 저장소 및 키 저장소를 추가합니다.
- truststore
- 신뢰 저장소에는 Kafka 브로커의 진위 여부를 확인하는 데 사용되는 신뢰할 수 있는 CA(인증 기관)의 공용 인증서가 포함되어 있습니다. 클라이언트에서 보안 Kafka 브로커에 연결하면 브로커의 ID를 확인해야 할 수 있습니다.
- 키 저장소
- 키 저장소에는 클라이언트의 개인 키와 공개 인증서가 포함됩니다. 클라이언트가 브로커에 자신을 인증하려는 경우 자체 인증서를 제공합니다.
TLS 인증을 사용하는 경우 Kafka 클라이언트 구성에 Kafka 클러스터에 연결하기 위한 신뢰 저장소 및 키 저장소가 필요합니다. SASL SCRAM-SHA-512를 사용하는 경우 디지털 인증서가 아닌 사용자 이름과 암호 자격 증명을 교환하여 인증을 수행하므로 키 저장소가 필요하지 않습니다. SCRAM-SHA-512는 더 간단한 메커니즘이지만 인증서 기반 인증을 사용하는 것만큼 안전하지는 않습니다.
자체 인증서 인프라가 있고 타사 CA의 인증서를 사용하는 경우 클라이언트의 기본 신뢰 저장소에 이미 공용 CA 인증서가 포함되어 클라이언트의 신뢰 저장소에 추가할 필요가 없습니다. 기본 신뢰 저장소에 이미 포함된 공용 CA 인증서 중 하나에서 서명한 경우 클라이언트는 서버의 인증서를 자동으로 신뢰합니다.
config.properties
파일을 생성하여 클라이언트 애플리케이션에서 사용하는 인증 자격 증명을 지정할 수 있습니다.
다음 예에서 security.protocol
은 클라이언트와 브로커 간의 TLS 인증 및 암호화를 활성화하기 위해 SSL
로 설정됩니다.
ssl.truststore.location
및 ssl.truststore.password
속성은 신뢰 저장소의 위치와 암호를 지정합니다. ssl.keystore.location
및 ssl.keystore.password
속성은 키 저장소의 위치와 암호를 지정합니다.
PKCS #12(Public-Key Cryptography Standards #12) 파일 형식이 사용됩니다. base64로 인코딩된 PEM(Privacy Enhanced mail) 형식을 사용할 수도 있습니다.
TLS 인증을 위한 클라이언트 구성 속성 예
bootstrap.servers = my-cluster-kafka-bootstrap:9093 security.protocol = SSL ssl.truststore.location = /path/to/ca.p12 ssl.truststore.password = truststore-password ssl.keystore.location = /path/to/user.p12 ssl.keystore.password = keystore-password client.id = my-client
다음 예에서 security.protocol
은 SASL_SSL
로 설정되어 클라이언트와 브로커 간의 TLS 암호화로 SASL 인증을 활성화합니다. 인증만 필요하며 암호화가 필요한 경우 SASL
프로토콜을 사용할 수 있습니다. 인증을 위해 지정된 SASL 메커니즘은 SCRAM-SHA-512
입니다. 다양한 인증 메커니즘을 사용할 수 있습니다. SASL.jaas.config
속성은 인증 자격 증명을 지정합니다.
SCRAM-SHA-512 인증에 대한 클라이언트 구성 속성의 예
bootstrap.servers = my-cluster-kafka-bootstrap:9093 security.protocol = SASL_SSL sasl.mechanism = SCRAM-SHA-512 sasl.jaas.config = org.apache.kafka.common.security.scram.ScramLoginModule required \ username = "user" \ password = "secret"; ssl.truststore.location = path/to/truststore.p12 ssl.truststore.password = truststore_password ssl.truststore.type = PKCS12 client.id = my-client
PEM 형식을 지원하지 않는 애플리케이션의 경우 OpenSSL과 같은 도구를 사용하여 PEM 파일을 PKCS #12 형식으로 변환할 수 있습니다.
5.2.2. 허용된 TLS 버전 및 암호화 제품군 구성
SSL 구성 및 암호화 제품군을 통합하여 클라이언트 애플리케이션과 Kafka 클러스터 간의 TLS 기반 통신을 추가로 보호할 수 있습니다. Kafka 브로커 구성에서 지원되는 TLS 버전 및 암호화 제품군을 지정합니다. 사용하는 TLS 버전 및 암호화 제품군을 제한하려면 클라이언트에 구성을 추가할 수도 있습니다. 클라이언트의 구성은 브로커에서 활성화된 프로토콜 및 암호화 제품군만 사용해야 합니다.
다음 예에서 SSL은 Kafka 브로커와 클라이언트 애플리케이션 간의 통신에 security.protocol
을 사용하여 활성화됩니다. 암호화 제품군을 쉼표로 구분된 목록으로 지정합니다. ssl.cipher.suites 속성은
클라이언트가 사용할 수 있는 쉼표로 구분된 암호화 제품군 목록입니다.
Kafka 브로커의 SSL 구성 속성 예
security.protocol: "SSL" ssl.enabled.protocols: "TLSv1.3", "TLSv1.2" ssl.protocol: "TLSv1.3" ssl.cipher.suites: "TLS_AES_256_GCM_SHA384"
ssl.enabled.protocols
속성은 클러스터와 해당 클라이언트 간의 보안 통신에 사용할 수 있는 사용 가능한 TLS 버전을 지정합니다. 이 경우 TLSv1.3
및 TLSv1.2
가 모두 활성화됩니다. ssl.protocol
속성은 모든 연결에 대한 기본 TLS 버전을 설정하며 활성화된 프로토콜에서 선택해야 합니다. 기본적으로 클라이언트는 TLSv1.3
을 사용하여 통신합니다. 클라이언트가 TLSv1.2만 지원하는 경우 브로커에 계속 연결되고 지원되는 버전을 사용하여 통신할 수 있습니다. 마찬가지로 구성이 클라이언트에 있고 브로커가 TLSv1.2만 지원하는 경우 클라이언트는 지원되는 버전을 사용합니다.
Apache Kafka에서 지원하는 암호화 제품군은 사용 중인 Kafka 버전과 기본 환경에 따라 다릅니다. 가장 높은 수준의 보안을 제공하는 지원되는 최신 암호화 제품군을 확인합니다.
5.2.3. ACL(액세스 제어 목록) 사용
클라이언트 애플리케이션의 ACLS에 대해 명시적으로 구성할 필요는 없습니다. ACL은 Kafka 브로커에 의해 서버 측에서 적용됩니다. 클라이언트에서 데이터를 생성하거나 사용하기 위해 서버에 요청을 보내면 서버는 ACL을 확인하여 클라이언트(사용자)가 요청된 작업을 수행할 수 있는지 확인합니다. 클라이언트가 권한이 부여된 경우 요청이 처리되고, 그렇지 않으면 요청이 거부되고 오류가 반환됩니다. 그러나 Kafka 클러스터와의 보안 연결을 활성화하려면 클라이언트를 계속 인증하고 적절한 보안 프로토콜을 사용해야 합니다.
Kafka 브로커에서 ACL(액세스 제어 목록)을 사용하는 경우 제어하려는 주제 및 작업에 대한 클라이언트 액세스를 제한하도록 ACL이 올바르게 설정되어 있는지 확인합니다. OPA(Open Policy Agent) 정책을 사용하여 액세스를 관리하는 경우 권한 부여 규칙이 정책에 구성되어 있으므로 Kafka 브로커에 대한 ACL을 지정할 필요가 없습니다. OAuth 2.0은 몇 가지 유연성을 제공합니다. OAuth 2.0 공급자를 사용하여 ACL을 관리하거나 OAuth 2.0 및 Kafka의 간단한
권한을 사용하여 ACL을 관리할 수 있습니다.
ACL은 대부분의 요청에 적용되며 작업 생성 및 사용으로 제한되지 않습니다. 예를 들어 ACLS를 새 주제 생성과 같은 주제 또는 쓰기 작업 설명과 같은 읽기 작업에 적용할 수 있습니다.
5.2.4. 토큰 기반 액세스에 OAuth 2.0 사용
OAuth 2.0 공급자를 통해 권한 부여 제어를 적용하려면 AMQ Streams에서 OAuth 2.0 공개 표준을 사용합니다. OAuth 2.0은 애플리케이션이 다른 시스템에 저장된 사용자 데이터에 안전하게 액세스할 수 있는 방법을 제공합니다. 권한 부여 서버는 Kafka 클러스터에 대한 액세스 권한을 부여하는 클라이언트 애플리케이션에 액세스 토큰을 발행할 수 있습니다.
다음 단계에서는 토큰 검증에 OAuth 2.0을 설정하고 사용하는 일반적인 접근 방식을 설명합니다.
- 클라이언트 ID 및 시크릿과 같은 브로커 및 클라이언트 자격 증명을 사용하여 권한 부여 서버를 구성합니다.
- 권한 부여 서버에서 OAuth 2.0 자격 증명을 가져옵니다.
- OAuth 2.0 자격 증명을 사용하여 Kafka 브로커에서 리스너를 구성하고 권한 부여 서버와 상호 작용합니다.
- Oauth 2.0 종속성을 클라이언트 라이브러리에 추가합니다.
- OAuth 2.0 자격 증명으로 Kafka 클라이언트를 구성하고 권한 부여 서버와 상호 작용합니다.
- 런타임에 OAuth 2.0 공급자로 클라이언트를 인증하는 액세스 토큰을 가져옵니다.
Kafka 브로커에 OAuth 2.0에 대한 리스너가 구성된 경우 OAuth 2.0을 사용하도록 클라이언트 애플리케이션을 설정할 수 있습니다. Kafka 클러스터에 액세스하기 위한 표준 Kafka 클라이언트 구성 외에도 OAuth 2.0 인증에 대한 특정 구성을 포함해야 합니다. 또한 사용하는 권한 부여 서버가 Kafka 클러스터 및 클라이언트 애플리케이션에서 액세스할 수 있는지 확인해야 합니다.
SASL(Simple Authentication and Security Layer) 보안 프로토콜 및 메커니즘을 지정합니다. 프로덕션 환경에서는 다음 설정이 권장됩니다.
-
TLS 암호화 연결에 대한
SASL_SSL
프로토콜입니다. -
전달자 토큰을 사용하여 자격 증명 교환을 위한
OAUTHBLOGER
메커니즘
Java 인증 및 권한 부여 서비스(Java Authentication and Authorization Service) 모듈은 SASL 메커니즘을 구현합니다. 메커니즘 구성은 사용 중인 인증 방법에 따라 다릅니다. 예를 들어 인증 정보 교환을 사용하면 OAuth 2.0 액세스 토큰 끝점, 액세스 토큰, 클라이언트 ID 및 클라이언트 시크릿을 추가할 수 있습니다. 클라이언트는 권한 부여 서버의 토큰 끝점(URL)에 연결하여 토큰이 여전히 유효한지 확인합니다. 인증된 액세스를 위해 권한 부여 서버의 공개 키 인증서가 포함된 신뢰 저장소도 필요합니다.
OAauth 2.0의 클라이언트 구성 속성 예
bootstrap.servers = my-cluster-kafka-bootstrap:9093 security.protocol = SASL_SSL sasl.mechanism = OAUTHBEARER # ... sasl.jaas.config = org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.token.endpoint.uri = "https://localhost:9443/oauth2/token" \ oauth.access.token = <access_token> \ oauth.client.id = "<client_id>" \ oauth.client.secret = "<client_secret>" \ oauth.ssl.truststore.location = "/<truststore_location>/oauth-truststore.p12" \ oauth.ssl.truststore.password = "<truststore_password>" \ oauth.ssl.truststore.type = "PKCS12" \
OAuth 2.0을 사용하도록 브로커를 설정하는 방법에 대한 자세한 내용은 다음 가이드를 참조하십시오.
5.2.5. OPA(Open Policy Agent) 액세스 정책 사용
AMQ Streams와 함께OPA(Open Policy Agent) 정책 에이전트를 사용하여 액세스 정책에 대해 Kafka 클러스터에 연결하는 요청을 평가합니다. OPA(Open Policy Agent)는 권한 부여 정책을 관리하는 정책 엔진입니다. 정책은 클라이언트 애플리케이션을 변경하지 않고도 액세스 제어를 중앙 집중화하고 동적으로 업데이트할 수 있습니다. 예를 들어 특정 사용자(클라이언트)만 특정 항목에 대한 메시지를 생성하고 사용할 수 있는 정책을 만들 수 있습니다.
AMQ Streams는 Kafka 권한 부여에 대한 Open Policy Agent 플러그인을 인증자로 사용합니다.
다음 단계에서는 OPA를 설정하고 사용하는 일반적인 접근 방식을 설명합니다.
- OPA 서버의 인스턴스를 설정합니다.
- Kafka 클러스터에 대한 액세스를 제어하는 권한 부여 규칙을 제공하는 정책을 정의합니다.
- Kafka 브로커가 OPA 인증을 수락하고 OPA 서버와 상호 작용할 수 있도록 구성을 생성합니다.
- Kafka 클러스터에 대한 권한 있는 액세스에 대한 인증 정보를 제공하도록 Kafka 클라이언트를 구성합니다.
Kafka 브로커에 OPA용으로 구성된 리스너가 있는 경우 OPA를 사용하도록 클라이언트 애플리케이션을 설정할 수 있습니다. 리스너 구성에서 OPA 서버에 연결할 URL을 지정하고 클라이언트 애플리케이션을 인증합니다. Kafka 클러스터에 액세스하기 위한 표준 Kafka 클라이언트 구성 외에도 Kafka 브로커로 인증할 인증 정보를 추가해야 합니다. 브로커는 인증 정책을 평가하기 위해 OPA 서버에 요청을 전송하여 클라이언트에게 요청된 작업을 수행하는 데 필요한 권한이 있는지 확인합니다. 정책 엔진에서 권한 부여 정책을 적용하기 때문에 신뢰 저장소 또는 키 저장소가 통신을 보호할 필요가 없습니다.
OPA 권한 부여의 클라이언트 구성 속성 예
bootstrap.servers = my-cluster-kafka-bootstrap:9093 security.protocol = SASL_SSL sasl.mechanism = SCRAM-SHA-512 sasl.jaas.config = org.apache.kafka.common.security.scram.ScramLoginModule required \ username = "user" \ password = "secret"; # ...
Red Hat은 OPA 서버를 지원하지 않습니다.
OPA를 사용하도록 브로커를 설정하는 방법에 대한 자세한 내용은 다음 가이드를 참조하십시오.
5.2.6. 메시지 스트리밍 시 트랜잭션 사용
브로커 및 생산자 클라이언트 애플리케이션에 트랜잭션 속성을 구성하면 메시지가 단일 트랜잭션에서 처리되도록 할 수 있습니다. 트랜잭션은 메시지 스트리밍에 신뢰성과 일관성을 추가합니다.
트랜잭션은 항상 브로커에서 활성화됩니다. 다음 속성을 사용하여 기본 구성을 변경할 수 있습니다.
트랜잭션에 대한 Kafka 브로커 구성 속성의 예
transaction.state.log.replication.factor = 3 transaction.state.log.min.isr = 2 transaction.abort.timed.out.transaction.cleanup.interval.ms = 3600000
이는 내부 __ECDHE_state
항목에 대한 복제본 3개를 생성하는 프로덕션 환경의 일반적인 구성입니다. \__ECDHE_state
주제는 진행 중인 트랜잭션에 대한 정보를 저장합니다. 트랜잭션 로그에 최소 2개의 동기화 복제본이 필요합니다. 정리 간격은 시간 초과 트랜잭션을 점검하고 해당 트랜잭션 로그를 정리하는 시간입니다.
클라이언트 구성에 트랜잭션 속성을 추가하려면 생산자 및 소비자에 대해 다음 속성을 설정해야 합니다.
트랜잭션에 대한 생산자 클라이언트 구성 속성의 예
transactional.id = unique-transactional-id enable.idempotence = true max.in.flight.requests.per.connection = 5 acks = all transaction.timeout.ms = 30000 delivery.timeout = 25000
트랜잭션 ID를 사용하면 Kafka 브로커가 트랜잭션을 추적할 수 있습니다. 이는 생산자의 고유 식별자이며 특정 파티션 세트와 함께 사용해야 합니다. 여러 파티션 집합에 대해 트랜잭션을 수행해야 하는 경우 각 세트에 대해 다른 트랜잭션 ID를 사용해야 합니다. 생산자 인스턴스가 중복 메시지를 생성하지 않도록 멱등이 활성화됩니다. idempotence를 사용하면 생산자 ID와 시퀀스 번호를 사용하여 메시지가 추적됩니다. 브로커가 메시지를 수신하면 생산자 ID와 시퀀스 번호를 확인합니다. 동일한 생산자 ID와 시퀀스 번호가 있는 메시지가 이미 수신되면 브로커는 중복 메시지를 삭제합니다.
최대 진행 중인 요청 수는 트랜잭션이 전송되는 순서대로 처리되도록 5로 설정됩니다. 파티션은 메시지 순서를 손상시키지 않고 최대 5개의 진행 중인 요청을 포함할 수 있습니다.
acks
를 모든
것으로 설정하면 생산자는 트랜잭션을 완료로 간주하기 전에 작성 중인 주제 파티션의 모든 동기화 내 복제본에서 승인이 완료될 때까지 기다립니다. 이렇게 하면 Kafka 클러스터에 메시지가 기록(커밋)되고 브로커 오류가 발생한 경우에도 메시지가 손실되지 않습니다.
트랜잭션 시간 초과는 클라이언트가 시간 초과 전에 트랜잭션을 완료해야 하는 최대 시간을 지정합니다. 전달 시간 초과는 생산자가 시간 초과 전에 브로커가 메시지 전달을 승인할 때까지 대기하는 최대 시간을 지정합니다. 메시지가 트랜잭션 기간 내에 전달되도록 하려면 전달 시간 초과를 트랜잭션 시간 초과보다 적게 설정합니다. 네트워크 대기 시간 및 메시지 처리량을 고려하고 재시도 횟수를 지정할 때 임시 오류를 허용합니다.
트랜잭션에 대한 소비자 클라이언트 구성 속성 예
group.id = my-group-id isolation.level = read_committed enable.auto.commit = false
read_committed
격리 수준은 소비자가 성공적으로 완료된 트랜잭션에 대한 메시지만 읽을 수 있도록 지정합니다. 소비자는 진행 중이거나 실패한 트랜잭션의 일부인 메시지를 처리하지 않습니다. 이를 통해 소비자는 완전히 완료된 트랜잭션의 일부인 메시지만 읽습니다.
트랜잭션을 사용하여 메시지를 스트리밍할 때 enable.auto.commit
을 false
로 설정하는 것이 중요합니다. true
로 설정하면 소비자는 트랜잭션을 고려하지 않고 오프셋을 정기적으로 커밋합니다. 즉, 소비자가 트랜잭션이 완전히 완료되기 전에 메시지를 커밋할 수 있습니다. enable.auto.commit
을 false
로 설정하면 소비자가 완전히 기록되고 트랜잭션의 일부로 해당 항목에 커밋된 메시지만 읽고 커밋합니다.