6.4. Kafka 구성
Kafka는 속성 파일을 사용하여 정적 구성을 저장합니다. 구성 파일의 권장 위치는 /opt/kafka/config/server.properties
입니다. kafka
사용자가 구성 파일을 읽을 수 있어야 합니다.
AMQ Streams는 제품의 다양한 기본 및 고급 기능을 강조하는 구성 파일 예제를 제공합니다. AMQ Streams 설치 디렉터리의 config/server.properties
에서 확인할 수 있습니다.
이 장에서는 가장 중요한 구성 옵션에 대해 설명합니다.
6.4.1. ZooKeeper
Kafka 브로커를 사용하려면 ZooKeeper가 클러스터를 조정하고 클러스터를 조정하기 위해 ZooKeeper가 필요합니다 (예: 어떤 노드가 파티션의 리더인지 결정). ZooKeeper 클러스터에 대한 연결 세부 정보는 구성 파일에 저장됩니다. zookeeper.connect
필드에는 콤마로 구분된 호스트 이름 목록과 zookeeper 클러스터 멤버의 포트가 포함되어 있습니다.
예를 들면 다음과 같습니다.
zookeeper.connect=zoo1.my-domain.com:2181,zoo2.my-domain.com:2181,zoo3.my-domain.com:2181
Kafka는 이러한 주소를 사용하여 ZooKeeper 클러스터에 연결합니다. 이 구성을 사용하면 모든 Kafka znodes
가 ZooKeeper 데이터베이스의 루트에서 직접 생성됩니다. 따라서 이러한 ZooKeeper 클러스터는 단일 Kafka 클러스터에만 사용할 수 있습니다. 단일 ZooKeeper 클러스터를 사용하도록 여러 Kafka 클러스터를 구성하려면 Kafka 구성 파일에서 ZooKeeper 연결 문자열 끝에 기본(prefix) 경로를 지정합니다.
zookeeper.connect=zoo1.my-domain.com:2181,zoo2.my-domain.com:2181,zoo3.my-domain.com:2181/my-cluster-1
6.4.2. 리스너
리스너는 Kafka 브로커에 연결하는 데 사용됩니다. 각 Kafka 브로커는 여러 리스너를 사용하도록 구성할 수 있습니다. 각 리스너에는 다른 포트 또는 네트워크 인터페이스에서 청취할 수 있도록 다른 구성이 필요합니다.
리스너를 구성하려면 구성 파일에서 리스너
속성을 편집합니다(/opt/kafka/config/server.properties
). 리스너 속성에 리스너
를 쉼표로 구분된 목록으로 추가합니다. 다음과 같이 각 속성을 구성합니다.
<listenerName>://<hostname>:<port>
& lt;hostname
>이 비어 있으면 Kafka는 java.net.InetAddress.getCanonicalHostName()
클래스를 호스트 이름으로 사용합니다.
여러 리스너 구성의 예
listeners=internal-1://:9092,internal-2://:9093,replication://:9094
Kafka 클라이언트가 Kafka 클러스터에 연결하려는 경우 먼저 클러스터 노드 중 하나인 부트스트랩 서버에 연결합니다. 부트스트랩 서버는 클라이언트에 클러스터의 모든 브로커 목록을 제공하고 클라이언트는 각각에 개별적으로 연결됩니다. 브로커 목록은 구성된 리스너
를 기반으로 합니다.
공개된 리스너
필요한 경우 advertised.listeners
속성을 사용하여 리스너 속성에 지정된 것과 다른 리스너 주소 세트를 클라이언트에 제공할 수 있습니다. 프록시와 같은 추가 네트워크 인프라가 클라이언트와 브로커 사이에 있거나 IP 주소 대신 외부 DNS 이름을 사용하는 경우 유용합니다.
advertised.listeners
속성은 listeners
속성과 동일한 방식으로 포맷됩니다.
공개된 리스너 구성 예
listeners=internal-1://:9092,internal-2://:9093 advertised.listeners=internal-1://my-broker-1.my-domain.com:1234,internal-2://my-broker-1.my-domain.com:1235
공개된 리스너의 이름은 리스너
속성에 나열된 이름과 일치해야 합니다.
inter-broker 리스너
inter-broker 리스너 는 Kafka 브로커 간의 통신에 사용됩니다. 다음을 위해 inter-broker 통신이 필요합니다.
- 다른 브로커 간 워크로드 조정
- 다른 브로커에 저장된 파티션 간 메시지 복제
- 파티션 리더십 변경과 같은 컨트롤러의 관리 작업 처리
interbroker 리스너를 선택한 포트에 할당할 수 있습니다. 여러 리스너가 구성되면 inter.broker.listener.name
속성에서broker 간 리스너의 이름을 정의할 수 있습니다.
여기서 inter-broker 리스너의 이름은 REPLICATION
으로 지정됩니다.
listeners=REPLICATION://0.0.0.0:9091 inter.broker.listener.name=REPLICATION
컨트롤 플레인 리스너
기본적으로 컨트롤러와 다른 브로커 간의 통신은 broker 간 리스너 를 사용합니다. 컨트롤러는 파티션 리더십 변경과 같은 관리 작업을 조정해야 합니다.
컨트롤러 연결에 전용 컨트롤 플레인 리스너 를 활성화할 수 있습니다. 컨트롤 플레인 리스너를 선택한 포트에 할당할 수 있습니다.
컨트롤 플레인 리스너를 활성화하려면 리스너 이름으로 control.plane.listener.name
속성을 구성합니다.
listeners=CONTROLLER://0.0.0.0:9090,REPLICATION://0.0.0.0:9091 ... control.plane.listener.name=CONTROLLER
컨트롤러 통신은 브로커 간의 데이터 복제를 통해 지연되지 않으므로 컨트롤 플레인 리스너를 활성화하면 클러스터 성능이 향상될 수 있습니다. 데이터 복제는 브로커 간 리스너를 통해 계속됩니다.
control.plane.listener
가 구성되지 않은 경우 컨트롤러 연결에서는 broker 간 리스너 를 사용합니다.
6.4.3. 커밋 로그
Apache Kafka는 커밋 로그에서 생산자로부터 수신하는 모든 레코드를 저장합니다. 커밋 로그에는 Kafka가 제공해야 하는 레코드 형식의 실제 데이터가 포함됩니다. 이는 브로커가 수행하는 작업을 기록하는 애플리케이션 로그 파일이 아닙니다.
로그 디렉터리
하나 이상의 로그 디렉터리에 커밋 로그를 저장하도록 log.dirs
속성 파일을 사용하여 로그 디렉터리를 구성할 수 있습니다. 설치 중에 생성된 /var/lib/kafka
디렉터리로 설정해야 합니다.
log.dirs=/var/lib/kafka
성능상의 이유로 log.dirs를 여러 디렉터리에 구성하고 각각 다른 물리적 장치에 배치하여 디스크 I/O 성능을 개선할 수 있습니다. 예를 들면 다음과 같습니다.
log.dirs=/var/lib/kafka1,/var/lib/kafka2,/var/lib/kafka3
6.4.4. Broker ID
브로커 ID는 클러스터의 각 브로커의 고유 식별자입니다. 0보다 크거나 같은 정수를 브로커 ID로 할당할 수 있습니다. 브로커 ID는 재시작 또는 충돌 후 브로커를 식별하는 데 사용되며 따라서 ID가 안정적이고 시간이 지남에 따라 변경되지 않는 것이 중요합니다. 브로커 ID는 브로커 속성 파일에 구성됩니다.
broker.id=1
6.4.5. zookeeper 인증
기본적으로 ZooKeeper와 Kafka 간의 연결은 인증되지 않습니다. 그러나 Kafka 및 ZooKeeper는 SASL(Simple Authentication and Security Layer)을 사용하여 인증을 설정하는 데 사용할 수 있는 JDK(Java Authentication and Authorization Service)를 지원합니다. zookeeper는 로컬에 저장된 인증 정보가 있는 DIGEST-knative5 SASL 메커니즘을 사용한 인증을 지원합니다.
6.4.5.1. CloudEvent 구성
ZooKeeper 연결에 대한 SASL 인증은 10.0.0.1 구성 파일에서 구성해야 합니다. 기본적으로 Kafka는 ZooKeeper에 연결하기 위해 Client
라는 context를 사용합니다. 클라이언트
컨텍스트는 /opt/kafka/config/jass.conf
파일에 구성해야 합니다. 컨텍스트는 다음 예와 같이 PLAIN
SASL 인증을 활성화해야 합니다.
Client { org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="123456"; };
6.4.5.2. ZooKeeper 인증 활성화
다음 절차에서는 ZooKeeper에 연결할 때 SASL DIGEST-ECDHE5 메커니즘을 사용하여 인증을 활성화하는 방법을 설명합니다.
사전 요구 사항
- ZooKeeper에서 클라이언트 간 인증이 사용 가능
SASL DIGEST- CHAP5 인증 활성화
모든 Kafka 브로커 노드에서
/opt/kafka/config/jaas.conf
10.0.0.1 구성 파일을 생성하거나 편집하고 다음 컨텍스트를 추가합니다.Client { org.apache.kafka.common.security.plain.PlainLoginModule required username="<Username>" password="<Password>"; };
사용자 이름과 암호는 ZooKeeper에서 구성한 것과 동일해야 합니다.
다음 예제에서는
클라이언트
컨텍스트를 보여줍니다.Client { org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="123456"; };
모든 Kafka 브로커 노드를 하나씩 재시작합니다. 10.0.0.1 구성을 Kafka 브로커에 전달하려면
KAFKA_OPTS
환경 변수를 사용합니다.su - kafka export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/jaas.conf"; /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
다중 노드 클러스터에서 브로커를 다시 시작하는 방법에 대한 자세한 내용은 4.3절. “Kafka 브로커를 정상적으로 다시 시작 수행” 을 참조하십시오.
추가 리소스
6.4.6. 권한 부여
Kafka 브로커의 권한 부여는 인증 기관 플러그인을 사용하여 구현됩니다.
이 섹션에서는 Kafka와 함께 제공되는 AclAuthorizer
플러그인 사용 방법에 대해 설명합니다.
또는 사용자 고유의 권한 부여 플러그인을 사용할 수 있습니다. 예를 들어 OAuth 2.0 토큰 기반 인증을 사용하는 경우 OAuth 2.0 권한 부여 를 사용할 수 있습니다.
6.4.6.1. 간단한 ACL 인증 프로그램
AclAuthorizer
를 포함한 Authorr 플러그인은 authorizer.class.name
속성을 통해 활성화됩니다.
authorizer.class.name=kafka.security.auth.AclAuthorizer
선택한 작성자에게 정규화된 이름이 필요합니다. AclAuthorizer
의 경우 정규화된 이름은 kafka.security.auth.AclAuthorizer
입니다.
6.4.6.1.1. ACL 규칙
AclAuthorizer
는 ACL 규칙을 사용하여 Kafka 브로커에 대한 액세스를 관리합니다.
ACL 규칙은 형식으로 정의됩니다.
principal P is allowed / denied operation O on Kafka resource R from host H
예를 들어 다음 사용자가 규칙을 설정할 수 있습니다.
Jane은 호스트 127.0.0.1에서 주제 댓글 을 볼 수 있습니다.
Host는 Jane이 연결 중인 머신의 IP 주소입니다.
대부분의 경우 사용자는 생산자 또는 소비자 애플리케이션입니다.
Consumer01 은 호스트 127.0.0.1에서 소비자 그룹 계정에 쓸 수 있습니다.
ACL 규칙이 없는 경우
지정된 리소스에 대해 ACL 규칙이 없으면 모든 작업이 거부됩니다. Kafka 설정 파일 /opt/kafka/config/server.properties
에서 allow.everyone.if.no.acl.found
속성을 true
로 설정하여 이 동작을 변경할 수 있습니다.
6.4.6.1.2. 주체
주체 는 사용자의 ID를 나타냅니다. ID 형식은 클라이언트가 Kafka에 연결하는 데 사용하는 인증 메커니즘에 따라 다릅니다.
-
user:ANONYMOUS
: 인증없이 연결됩니다. user:<username
> PLAIN 또는 SCRAM과 같은 간단한 인증 메커니즘을 사용하여 연결할 때.예:
User:admin
또는User:user1
.TLS 클라이언트 인증을 사용하여 연결된 경우 user
:<DistinguishedName
>예:
User:CN=user1,O=MyCompany,L=Prague,C=CZ
.-
사용자: Kerberos를 사용하여 연결할 때<Kerberos 사용자
이름>입니다.
DistinguishedName 은 클라이언트 인증서와 고유 이름입니다.
Kerberos 사용자 이름은 Kerberos로 연결할 때 기본적으로 사용되는 Kerberos 사용자의 기본 부분입니다. sasl.kerberos.principal.to.local.rules
속성을 사용하여 Kerberos 보안 주체에서 Kafka 주체를 빌드하는 방법을 구성할 수 있습니다.
6.4.6.1.3. 사용자 인증
권한을 사용하려면 클라이언트가 인증을 활성화하고 사용해야 합니다. 그렇지 않으면 모든 연결에 주체 User:ANONYMOUS
가 포함됩니다.
인증 방법에 대한 자세한 내용은 암호화 및 인증을 참조하십시오.
6.4.6.1.4. Super 사용자
Super 사용자는 ACL 규칙에 관계없이 모든 작업을 수행할 수 있습니다.
슈퍼 사용자는 속성 super.users
를 사용하여 Kafka 구성 파일에 정의됩니다.
예를 들면 다음과 같습니다.
super.users=User:admin,User:operator
6.4.6.1.5. 복제본 브로커 인증
인증이 활성화되면 모든 리스너 및 모든 연결에 적용됩니다. 여기에는 브로커 간 데이터 복제에 사용되는 inter-broker 연결이 포함됩니다. 따라서 인증을 활성화하면 브로커 간 연결에 인증을 사용하고 브로커에 충분한 권한이 있는 사용자에게 부여해야 합니다. 예를 들어 브로커 간 인증이 kafka-broker
사용자를 사용하는 경우 슈퍼 사용자 구성에는 사용자 이름 super.users=User:kafka-broker
가 포함되어야 합니다.
6.4.6.1.6. 지원되는 리소스
이러한 유형의 리소스에 Kafka ACL을 적용할 수 있습니다.
- 주제
- 소비자 그룹
- 클러스터
- TransactionId
- DelegationToken
6.4.6.1.7. 지원되는 작업
AclAuthorizer
는 리소스에 대한 작업을 인증합니다.
다음 표에 X
가 있는 필드는 각 리소스에 대해 지원되는 작업을 표시합니다.
주제 | 소비자 그룹 | Cluster | |
---|---|---|---|
read | X | X | |
write | X | ||
개발 | X | ||
delete | X | ||
변경 | X | ||
describe | X | X | X |
ClusterAction | X | ||
All | X | X | X |
6.4.6.1.8. ACL 관리 옵션
ACL 규칙은 Kafka 배포 패키지의 일부로 제공되는 bin/kafka-acls.sh
유틸리티를 사용하여 관리됩니다.
kafka-acls.sh
매개변수 옵션을 사용하여 ACL 규칙을 추가, 나열 및 제거하고 기타 기능을 수행합니다.
매개 변수에는 --add
와 같은 이중 암호 규칙이 필요합니다.
옵션 | 유형 | 설명 | Default |
---|---|---|---|
| 동작 | ACL 규칙을 추가합니다. | |
| 동작 | ACL 규칙 제거. | |
| 동작 | ACL 규칙을 나열합니다. | |
| 동작 | 인증자의 정규화된 클래스 이름입니다. |
|
| 설정 | 초기화를 위해 인증자에게 전달된 키/값 쌍입니다.
| |
| 리소스 | Kafka 클러스터에 연결할 호스트/포트 쌍입니다. |
이 옵션 또는 |
| 리소스 |
| |
| 리소스 | 클러스터를 ACL 리소스로 지정합니다. | |
| 리소스 | 주제 이름을 ACL 리소스로 지정합니다.
와일드카드로 사용되는 별표(
단일 명령으로 여러 | |
| 리소스 | 소비자 그룹 이름을 ACL 리소스로 지정합니다.
단일 명령으로 여러 개의 | |
| 리소스 | 트랜잭션 ID를 ACL 리소스로 지정합니다. 트랜잭션 전달은 생산자가 여러 파티션에 전송한 모든 메시지를 성공적으로 전달하거나 전달하지 않아야 함을 의미합니다.
와일드카드로 사용되는 별표( | |
| 리소스 | ACL 리소스로 위임 토큰을 지정합니다.
와일드카드로 사용되는 별표( | |
| 설정 |
리소스 이름에
리소스 패턴 필터 값 또는 특정 패턴 유형 필터로 또는 |
|
| principal | 허용 ACL 규칙에 주체가 추가되었습니다.
단일 명령으로 여러 | |
| principal | 거부 ACL 규칙에 주체가 추가되었습니다.
단일 명령으로 여러 개의 | |
| principal |
보안 주체의 ACL
단일 명령으로 여러 | |
| 호스트 |
호스트 이름 또는 CIDR 범위는 지원되지 않습니다. |
|
| 호스트 |
호스트 이름 또는 CIDR 범위는 지원되지 않습니다. |
|
| 작업 | 작업을 허용하거나 거부합니다.
단일 명령으로 여러Multiple | All |
| 바로 가기 | 메시지 프로듀서에 필요한 모든 작업을 허용하거나 거부할 수 있는 바로 가기(주제에 대한 WRITE 및 DESCRIBE on topic, CREATE on cluster). | |
| 바로 가기 | 메시지 소비자에 필요한 모든 작업을 허용하거나 거부하는 바로 가기(READ 및 DESCRIBE on 주제, 소비자 그룹의 준비). | |
| 바로 가기 |
생산자가 특정 트랜잭션 ID를 기반으로 메시지를 보낼 권한이 있는 경우 Idepmotence가 자동으로 활성화됩니다. | |
| 바로 가기 | 모든 쿼리를 수락하고 프롬프트를 표시하지 않는 바로 가기입니다. |
6.4.6.2. 권한 부여 활성화
다음 절차에서는 Kafka 브로커에서 권한 부여를 위해 AclAuthorizer
플러그인을 활성화하는 방법을 설명합니다.
절차
AclAuthorizer
를 사용하도록/opt/kafka/config/server.properties
Kafka 구성 파일을 편집합니다.authorizer.class.name=kafka.security.auth.AclAuthorizer
- Kafka 브로커를 시작합니다.
6.4.6.3. ACL 규칙 추가
AclAuthorizer
플러그인을 사용하여 ACL(액세스 제어 목록)을 기반으로 Kafka 브로커에 대한 액세스를 제어하는 경우 kafka-acls.sh
유틸리티를 사용하여 새 ACL 규칙을 추가할 수 있습니다.
사전 요구 사항
- 사용자가 Kafka 리소스에 액세스할 수 있는 적절한 권한을 생성하고 부여했습니다.
- AMQ Streams는 Kafka 브로커로 사용되는 모든 호스트에 설치됩니다.
- Kafka 브로커에서 권한 부여가 활성화됩니다.
절차
--add
옵션을 사용하여kafka-acls.sh
를 실행합니다.예:
user1
및user2
가MyConsumerGroup
소비자 그룹을 사용하여myTopic
에서 읽을 수 있도록 허용합니다.opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --operation Read --topic myTopic --allow-principal User:user1 --allow-principal User:user2 opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --operation Describe --topic myTopic --allow-principal User:user1 --allow-principal User:user2 opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --operation Read --operation Describe --group MyConsumerGroup --allow-principal User:user1 --allow-principal User:user2
IP 주소 호스트
127.0.0.1
에서 읽기myTopic
에 대한user1
액세스를 거부합니다.opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --operation Describe --operation Read --topic myTopic --group MyConsumerGroup --deny-principal User:user1 --deny-host 127.0.0.1
MyConsumerGroup
에서myTopic
의 소비자로user1
을 추가합니다.opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --consumer --topic myTopic --group MyConsumerGroup --allow-principal User:user1
추가 리소스
6.4.6.4. ACL 규칙 나열
AclAuthorizer
플러그인을 사용하여 ACL(액세스 제어 목록)을 기반으로 Kafka 브로커에 대한 액세스를 제어하는 경우 kafka-acls.sh
유틸리티를 사용하여 기존 ACL 규칙을 나열할 수 있습니다.
사전 요구 사항
절차
--list
옵션을 사용하여kafka-acls.sh
를 실행합니다.예를 들면 다음과 같습니다.
opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --list --topic myTopic Current ACLs for resource `Topic:myTopic`: User:user1 has Allow permission for operations: Read from hosts: * User:user2 has Allow permission for operations: Read from hosts: * User:user2 has Deny permission for operations: Read from hosts: 127.0.0.1 User:user1 has Allow permission for operations: Describe from hosts: * User:user2 has Allow permission for operations: Describe from hosts: * User:user2 has Deny permission for operations: Describe from hosts: 127.0.0.1
추가 리소스
6.4.6.5. ACL 규칙 제거
AclAuthorizer
플러그인을 사용하여 ACL(액세스 제어 목록)을 기반으로 Kafka 브로커에 대한 액세스를 제어하는 경우 kafka-acls.sh
유틸리티를 사용하여 기존 ACL 규칙을 제거할 수 있습니다.
사전 요구 사항
절차
--remove
옵션을 사용하여kafka-acls.sh
를 실행합니다.예:
Allow
user1
및user2
액세스를 허용하는 ACL을 제거하여MyConsumerGroup
소비자 그룹을 사용하여myTopic
에서 읽을 수 있습니다.opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --operation Read --topic myTopic --allow-principal User:user1 --allow-principal User:user2 opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --operation Describe --topic myTopic --allow-principal User:user1 --allow-principal User:user2 opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --operation Read --operation Describe --group MyConsumerGroup --allow-principal User:user1 --allow-principal User:user2
MyConsumerGroup
에서myTopic
의 소비자로user1
을 추가하는 ACL을 제거합니다.opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --consumer --topic myTopic --group MyConsumerGroup --allow-principal User:user1
IP 주소 호스트
127.0.0.1
에서 읽기myTopic
에 대한user1
액세스를 거부하는 ACL을 제거합니다.opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --operation Describe --operation Read --topic myTopic --group MyConsumerGroup --deny-principal User:user1 --deny-host 127.0.0.1
추가 리소스
6.4.7. zookeeper 권한 부여
Kafka와 ZooKeeper 간에 인증이 활성화되면 ZooKeeper 액세스 제어 목록(ACL) 규칙을 사용하여 ZooKeeper에 저장된 Kafka 메타데이터에 대한 액세스를 자동으로 제어할 수 있습니다.
6.4.7.1. ACL 설정
ZooKeeper ACL 규칙의 적용은 config/server.properties
Kafka 구성 파일의 zookeeper.set.acl
속성에 의해 제어됩니다.
속성은 기본적으로 비활성화되어 있으며 true
로 설정하여 활성화됩니다.
zookeeper.set.acl=true
ACL 규칙이 활성화된 경우 znode
를 ZooKeeper에서 만든 경우에만 수정하거나 삭제할 수 있습니다. 다른 모든 사용자에게는 읽기 전용 액세스 권한이 있습니다.
Kafka는 새로 생성된 ZooKeeper znodes
에 대해서만 ACL 규칙을 설정합니다. ACL이 클러스터를 처음 시작한 후에만 활성화된 경우 zookeeper-security-migration.sh
툴은 모든 기존 znode에 ACL을 설정할 수 있습니다
.
ZooKeeper의 데이터 기밀성
ZooKeeper에 저장된 데이터는 다음과 같습니다.
- 주제 이름 및 해당 구성
- SASL SCRAM 인증이 사용될 때 축소 및 해시된 사용자 인증 정보가 사용됩니다.
그러나 ZooKeeper는 Kafka를 사용하여 전송 및 수신된 레코드를 저장하지 않습니다. ZooKeeper에 저장된 데이터는 기밀이 아닌 것으로 간주됩니다.
데이터가 기밀로 분류되는 경우(예: 주제 이름에 고객 ID가 포함되어 있기 때문에) 보호 가능한 유일한 옵션은 네트워크 수준에서 ZooKeeper를 격리하고 Kafka 브로커에 대한 액세스만 허용하는 것입니다.
6.4.7.2. 새 Kafka 클러스터에 대한 ZooKeeper ACL 활성화
다음 절차에서는 새 Kafka 클러스터에 대한 Kafka 구성에서 ZooKeeper ACL을 활성화하는 방법을 설명합니다. Kafka 클러스터를 처음 시작하기 전에만 이 절차를 사용하십시오. 이미 실행 중인 클러스터에서 ZooKeeper ACL을 활성화하려면 6.4.7.3절. “기존 Kafka 클러스터에서 ZooKeeper ACL 활성화” 을 참조하십시오.
사전 요구 사항
- AMQ Streams는 Kafka 브로커로 사용할 모든 호스트에 설치됩니다.
- zookeeper 클러스터가 구성 및 실행 중입니다.
- ZooKeeper에서 클라이언트 간 인증이 활성화됩니다.
- Kafka 브로커에서 zookeeper 인증을 사용할 수 있습니다.
- Kafka 브로커는 아직 시작되지 않았습니다.
절차
/opt/kafka/config/server.properties
Kafka 구성 파일을 편집하여 모든 클러스터 노드에서zookeeper.set.acl
필드를true
로 설정합니다.zookeeper.set.acl=true
- Kafka 브로커를 시작합니다.
6.4.7.3. 기존 Kafka 클러스터에서 ZooKeeper ACL 활성화
다음 절차에서는 실행 중인 Kafka 클러스터의 Kafka 구성에서 ZooKeeper ACL을 활성화하는 방법을 설명합니다. zookeeper-security-migration.sh
도구를 사용하여 기존의 모든 znode에 ZooKeeper ACL을 설정합니다
. zookeeper-security-migration.sh
는 AMQ Streams의 일부로 사용할 수 있으며 bin
디렉토리에서 찾을 수 있습니다.
사전 요구 사항
- Kafka 클러스터가 구성되고 실행 중입니다.
ZooKeeper ACL 활성화
/opt/kafka/config/server.properties
Kafka 구성 파일을 편집하여 모든 클러스터 노드에서zookeeper.set.acl
필드를true
로 설정합니다.zookeeper.set.acl=true
모든 Kafka 브로커를 하나씩 다시 시작합니다.
다중 노드 클러스터에서 브로커를 다시 시작하는 방법에 대한 자세한 내용은 4.3절. “Kafka 브로커를 정상적으로 다시 시작 수행” 을 참조하십시오.
zookeeper-security-migration.sh
도구를 사용하여 기존의 모든 ZooKeeperznodes
에 ACL을 설정합니다.su - kafka cd /opt/kafka KAFKA_OPTS="-Djava.security.auth.login.config=./config/jaas.conf"; ./bin/zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connect=<ZooKeeperURL> exit
예를 들면 다음과 같습니다.
su - kafka cd /opt/kafka KAFKA_OPTS="-Djava.security.auth.login.config=./config/jaas.conf"; ./bin/zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connect=zoo1.my-domain.com:2181 exit
6.4.8. 암호화 및 인증
AMQ Streams는 리스너 구성의 일부로 구성된 암호화 및 인증을 지원합니다.
6.4.8.1. 리스너 구성
Kafka 브로커의 암호화 및 인증은 리스너별로 구성됩니다. Kafka 리스너 구성에 대한 자세한 내용은 6.4.2절. “리스너” 을 참조하십시오.
Kafka 브로커의 각 리스너는 자체 보안 프로토콜로 구성됩니다. 구성 속성 listener.security.protocol.map
은 어떤 리스너가 어떤 보안 프로토콜을 사용하는지 정의합니다. 각 리스너 이름을 보안 프로토콜에 매핑합니다. 지원되는 보안 프로토콜은 다음과 같습니다.
PLAINTEXT
- 암호화 또는 인증이 없는 리스너입니다.
SSL
- TLS 암호화를 사용하는 리스너 및 선택적으로 TLS 클라이언트 인증서를 사용한 인증입니다.
SASL_PLAINTEXT
- SASL 기반 인증이 있지만 암호화가 없는 리스너입니다.
SASL_SSL
- TLS 기반 암호화 및 SASL 기반 인증을 사용한 리스너
다음과 같은 리스너
구성이 제공됩니다.
listeners=INT1://:9092,INT2://:9093,REPLICATION://:9094
listener.security.protocol.map
은 다음과 같을 수 있습니다.
listener.security.protocol.map=INT1:SASL_PLAINTEXT,INT2:SASL_SSL,REPLICATION:SSL
이렇게 하면 SASL 인증과 함께 암호화되지 않은 연결을 사용하고 리스너 INT2
가 SASL 인증과 함께 암호화된 연결을 사용하고 REPLICATION
인터페이스를 사용하여 TLS 암호화를 사용하도록 리스너 INT1
을 구성합니다. 동일한 보안 프로토콜을 여러 번 사용할 수 있습니다. 다음 예제도 유효한 구성입니다.
listener.security.protocol.map=INT1:SSL,INT2:SSL,REPLICATION:SSL
이러한 구성은 모든 인터페이스에 TLS 암호화 및 TLS 인증을 사용합니다. 다음 장에서는 TLS 및 SASL을 구성하는 방법을 자세히 설명합니다.
6.4.8.2. TLS 암호화
Kafka는 Kafka 클라이언트와의 통신을 암호화하기 위해 TLS를 지원합니다.
TLS 암호화 및 서버 인증을 사용하려면 개인 키와 공개 키가 포함된 키 저장소를 제공해야 합니다. 일반적으로 JDK(Java Keystore) 형식의 파일을 사용하여 수행됩니다. 이 파일의 경로는 ssl.keystore.location
속성에 설정됩니다. 키 저장소를 보호하는 암호를 설정하는 데 ssl.keystore.password
속성을 사용해야 합니다. 예를 들면 다음과 같습니다.
ssl.keystore.location=/path/to/keystore/server-1.jks ssl.keystore.password=123456
개인 키를 보호하는 데 추가 암호가 사용되는 경우도 있습니다. 이러한 암호는 ssl.key.password
속성을 사용하여 설정할 수 있습니다.
Kafka는 인증 기관에서 서명한 키와 자체 서명된 키를 사용할 수 있습니다. 인증 기관에서 서명한 키를 사용하는 것이 항상 선호되는 방법입니다. 클라이언트가 연결 중인 Kafka 브로커의 ID를 확인할 수 있도록 하려면 인증서에는 항상 CN(Common Name) 또는 SAN(Subject Alternative Names)에 공개된 호스트 이름이 포함되어야 합니다.
다른 리스너에 다른 SSL 구성을 사용할 수 있습니다. ssl.
로 시작하는 모든 옵션 앞에 listener.name.<NameOfTheListener>.
. 여기서 리스너의 이름은 항상 소문자여야 합니다. 이렇게 하면 해당 특정 리스너의 기본 SSL 구성이 재정의됩니다. 다음 예제에서는 다른 리스너에 다른 SSL 구성을 사용하는 방법을 보여줍니다.
listeners=INT1://:9092,INT2://:9093,REPLICATION://:9094 listener.security.protocol.map=INT1:SSL,INT2:SSL,REPLICATION:SSL # Default configuration - will be used for listeners INT1 and INT2 ssl.keystore.location=/path/to/keystore/server-1.jks ssl.keystore.password=123456 # Different configuration for listener REPLICATION listener.name.replication.ssl.keystore.location=/path/to/keystore/server-1.jks listener.name.replication.ssl.keystore.password=123456
추가 TLS 구성 옵션
Kafka는 위에서 설명한 기본 TLS 구성 옵션 외에도 TLS 구성을 미세 조정할 수 있는 다양한 옵션을 지원합니다. 예를 들어 TLS/SSL 프로토콜 또는 암호화 제품군을 활성화하거나 비활성화하려면 다음을 수행합니다.
ssl.cipher.suites
- 활성화된 암호화 제품군 목록입니다. 각 암호화 제품군은 TLS 연결에 사용되는 인증, 암호화, MAC 및 키 교환 알고리즘의 조합입니다. 기본적으로 사용 가능한 모든 암호화 제품군이 활성화됩니다.
ssl.enabled.protocols
-
활성화된 TLS/SSL 프로토콜 목록입니다. 기본값은
TLSv1.2,TLSv1.1,TLSv1
입니다.
6.4.8.3. TLS 암호화 활성화
다음 절차에서는 Kafka 브로커에서 암호화를 활성화하는 방법을 설명합니다.
사전 요구 사항
- AMQ Streams는 Kafka 브로커로 사용할 모든 호스트에 설치됩니다.
절차
- 클러스터의 모든 Kafka 브로커에 대한 TLS 인증서를 생성합니다. 인증서의 공개 및 부트스트랩 주소는 Common Name 또는 Subject Alternative Name에 있어야 합니다.
다음을 위해 모든 클러스터 노드에서
/opt/kafka/config/server.properties
Kafka 구성 파일을 편집합니다.-
listener.security.protocol.map
필드를 변경하여 TLS 암호화를 사용하려는 리스너의SSL
프로토콜을 지정합니다. -
ssl.keystore.location
옵션을 브로커 인증서가 있는 JKS 키 저장소의 경로로 설정합니다. ssl.keystore.password
옵션을 키 저장소를 보호하는 데 사용한 암호로 설정합니다.예를 들면 다음과 같습니다.
listeners=UNENCRYPTED://:9092,ENCRYPTED://:9093,REPLICATION://:9094 listener.security.protocol.map=UNENCRYPTED:PLAINTEXT,ENCRYPTED:SSL,REPLICATION:PLAINTEXT ssl.keystore.location=/path/to/keystore/server-1.jks ssl.keystore.password=123456
-
- (re) Kafka 브로커를 시작합니다.
6.4.8.4. 인증
인증의 경우 다음을 사용할 수 있습니다.
- 암호화된 연결의 X.509 인증서를 기반으로 하는 TLS 클라이언트 인증
- 지원되는 Kafka SASL(Simple Authentication and Security Layer) 메커니즘
- OAuth 2.0 토큰 기반 인증
6.4.8.4.1. TLS 클라이언트 인증
TLS 클라이언트 인증은 이미 TLS 암호화를 사용하는 연결에서만 사용할 수 있습니다. TLS 클라이언트 인증을 사용하려면 공개 키가 있는 신뢰 저장소를 브로커에 제공할 수 있습니다. 이러한 키는 브로커에 연결하는 클라이언트를 인증하는 데 사용할 수 있습니다. 신뢰 저장소는 Java Keystore (JKS) 형식으로 제공해야 하며 인증 기관의 공개 키를 포함해야 합니다. 신뢰 저장소에 포함된 인증 기관 중 하나에 의해 서명된 공개 및 개인 키를 가진 모든 클라이언트가 인증됩니다. 신뢰 저장소의 위치는 ssl.truststore.location
필드를 사용하여 설정됩니다. 신뢰 저장소가 암호로 보호되는 경우 암호는 ssl.truststore.password
속성에 설정해야 합니다. 예를 들면 다음과 같습니다.
ssl.truststore.location=/path/to/keystore/server-1.jks ssl.truststore.password=123456
신뢰 저장소를 구성하고 나면 ssl.client.auth
속성을 사용하여 TLS 클라이언트 인증을 활성화해야 합니다. 이 속성은 세 가지 값 중 하나로 설정할 수 있습니다.
none
- TLS 클라이언트 인증이 꺼집니다. (기본값)
requested
- TLS 클라이언트 인증은 선택 사항입니다. 클라이언트는 TLS 클라이언트 인증서를 사용하여 인증하라는 요청을 받지만, 선택하지 않도록 선택할 수 있습니다.
필수 항목
- 클라이언트는 TLS 클라이언트 인증서를 사용하여 인증해야 합니다.
클라이언트가 TLS 클라이언트 인증을 사용하여 인증할 때 인증된 사용자 이름은 인증된 클라이언트 인증서와 고유 이름입니다. 예를 들어 CN=someuser라는 이름의 인증서가 있는 사용자는 다음과 같은 주요
. TLS 클라이언트 인증이 사용되지 않고 SASL이 비활성화되면 주체 이름은 CN=someuser
,OU=Unknown,O=Unknown,ST=Unknown,ST=Unknown,C=UnknownANONYMOUS
가 됩니다.
6.4.8.4.2. SASL 인증
SASL 인증은 Java Authentication and Authorization Service(JAAS)를 사용하여 구성됩니다. 또한 Kafka와 ZooKeeper 간의 연결 인증에 사용됩니다. CloudEvent는 자체 구성 파일을 사용합니다. 이 파일의 권장 위치는 /opt/kafka/config/jaas.conf
입니다. kafka
사용자가 파일을 읽을 수 있어야 합니다. Kafka를 실행할 때 Java 시스템 속성 java.security.auth.login.config
를 사용하여 이 파일의 위치가 지정됩니다. 이 속성은 브로커 노드를 시작할 때 Kafka로 전달되어야 합니다.
KAFKA_OPTS="-Djava.security.auth.login.config=/path/to/my/jaas.config"; bin/kafka-server-start.sh
SASL 인증은 일반 암호화되지 않은 연결과 TLS 연결을 통해 지원됩니다. SASL은 각 리스너에 대해 개별적으로 활성화할 수 있습니다. 이를 활성화하려면 listener.security.protocol.map
의 보안 프로토콜이 SASL_PLAINTEXT
또는 SASL_SSL
이어야 합니다.
Kafka의 SASL 인증은 다음과 같은 여러 메커니즘을 지원합니다.
PLAIN
- 사용자 이름 및 암호를 기반으로 인증을 구현합니다. 사용자 이름과 암호는 Kafka 구성에 로컬로 저장됩니다.
SCRAM-SHA-256
및SCRAM-SHA-512
- SCRAM(Salted Challenge Response Authentication Mechanism)을 사용하여 인증을 구현합니다. SCRAM 인증 정보는 ZooKeeper에 중앙에서 저장됩니다. SCRAM은 ZooKeeper 클러스터 노드가 개인 네트워크에서 격리된 상태에서 사용될 수 있습니다.
GSSAPI
- Kerberos 서버에 대한 인증을 구현합니다.
PLAIN
메커니즘은 암호화되지 않은 형식으로 네트워크를 통해 사용자 이름과 암호를 전송합니다. 따라서 TLS 암호화와 함께만 사용해야 합니다.
SASL 메커니즘은 10.0.0.1 구성 파일을 통해 구성됩니다. Kafka는 KafkaServer
라는 10.0.0.1 컨텍스트를 사용합니다. ScanSetting에서 구성된 후에는 Kafka 구성에서 SASL 메커니즘을 활성화해야 합니다. 이는 sasl.enabled.mechanisms
속성을 사용하여 수행됩니다. 이 속성에는 쉼표로 구분된 활성화된 메커니즘 목록이 포함되어 있습니다.
sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512
broker 간 통신에 사용되는 리스너가 SASL을 사용하는 경우 속성 sasl.mechanism.inter.broker.protocol
을 사용하여 사용해야 하는 SASL 메커니즘을 지정해야 합니다. 예를 들면 다음과 같습니다.
sasl.mechanism.inter.broker.protocol=PLAIN
broker 간 통신에 사용할 사용자 이름과 암호는 사용자 이름 및 암호를 사용하여 KafkaServer
ECDHE 컨텍스트에서 지정해야 합니다.
SASL PLAIN
PLAIN 메커니즘을 사용하려면 연결할 수 있는 사용자 이름과 암호가 CloudEvent 컨텍스트에서 직접 지정됩니다. 다음 예제는 SASL PLAIN 인증에 대해 구성된 컨텍스트를 보여줍니다. 예제에서는 세 가지 사용자를 구성합니다.
-
admin
-
user1
-
user2
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required user_admin="123456" user_user1="123456" user_user2="123456"; };
user 데이터베이스와의 configuration 파일은 모든 Kafka 브로커에서 동기화된 상태로 유지해야 합니다.
SASL PLAIN도broker 간 인증에 사용되는 경우 사용자 이름
및 암호
속성이 10.0.0.1 컨텍스트에 포함되어야 합니다.
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="123456" user_admin="123456" user_user1="123456" user_user2="123456"; };
SASL SCRAM
Kafka의 SCRAM 인증은 SCRAM-SHA-256
및 SCRAM-SHA-512
의 두 가지 메커니즘으로 구성됩니다. 이러한 메커니즘은 사용된 해시 알고리즘( SHA-256과 강력한 SHA-512)에서만 다릅니다. SCRAM 인증을 사용하려면 CloudEvent 구성 파일에 다음 구성을 포함해야 합니다.
KafkaServer { org.apache.kafka.common.security.scram.ScramLoginModule required; };
Kafka 구성 파일에서 SASL 인증을 활성화하면 두 SCRAM 메커니즘을 나열할 수 있습니다. 그러나 해당 중 하나만 inter-broker 통신을 위해 선택할 수 있습니다. 예를 들면 다음과 같습니다.
sasl.enabled.mechanisms=SCRAM-SHA-256,SCRAM-SHA-512 sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
SCRAM 메커니즘에 대한 사용자 자격 증명은 ZooKeeper에 저장됩니다. kafka-configs.sh
도구를 사용하여 관리할 수 있습니다. 예를 들어 다음 명령을 실행하여 암호 123456으로 사용자 user1을 추가합니다.
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'SCRAM-SHA-256=[password=123456],SCRAM-SHA-512=[password=123456]' --entity-type users --entity-name user1
사용자 인증 정보를 삭제하려면 다음을 사용합니다.
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name user1
SASL GSSAPI
Kerberos를 사용한 인증에 사용되는 SASL 메커니즘을 GSSAPI
라고 합니다. Kerberos SASL 인증을 구성하려면 다음 구성을 10.0.0.1 구성 파일에 추가해야 합니다.
KafkaServer { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/etc/security/keytabs/kafka_server.keytab" principal="kafka/kafka1.hostname.com@EXAMPLE.COM"; };
Kerberos 사용자의 도메인 이름은 항상 대문자여야 합니다.
iPXE 구성 외에도 Kafka 구성의 sasl.kerberos.service.name
속성에 Kerberos 서비스 이름을 지정해야 합니다.
sasl.enabled.mechanisms=GSSAPI sasl.mechanism.inter.broker.protocol=GSSAPI sasl.kerberos.service.name=kafka
다중 SASL 메커니즘
Kafka는 여러 SASL 메커니즘을 동시에 사용할 수 있습니다. 서로 다른Configuration 구성을 모두 동일한 컨텍스트에 추가할 수 있습니다.
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required user_admin="123456" user_user1="123456" user_user2="123456"; com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/etc/security/keytabs/kafka_server.keytab" principal="kafka/kafka1.hostname.com@EXAMPLE.COM"; org.apache.kafka.common.security.scram.ScramLoginModule required; };
여러 메커니즘이 활성화되면 클라이언트는 사용하려는 메커니즘을 선택할 수 있습니다.
6.4.8.5. TLS 클라이언트 인증 활성화
다음 절차에서는 Kafka 브로커에서 TLS 클라이언트 인증을 활성화하는 방법을 설명합니다.
사전 요구 사항
- AMQ Streams는 Kafka 브로커로 사용할 모든 호스트에 설치됩니다.
- TLS 암호화가 활성화되어 있습니다.
절차
- 사용자 인증서에 서명하는 데 사용된 인증 기관의 공개 키가 포함된 JKS 신뢰 저장소를 준비합니다.
다음을 위해 모든 클러스터 노드에서
/opt/kafka/config/server.properties
Kafka 구성 파일을 편집합니다.-
ssl.truststore.location
옵션을 사용자 인증서의 인증 기관이 있는 JKS 신뢰 저장소의 경로로 설정합니다. -
ssl.truststore.password
옵션을 신뢰 저장소를 보호하는 데 사용한 암호로 설정합니다. ssl.client.auth
옵션을필수로
설정합니다.예를 들면 다음과 같습니다.
ssl.truststore.location=/path/to/truststore.jks ssl.truststore.password=123456 ssl.client.auth=required
-
- (re) Kafka 브로커를 시작합니다.
6.4.8.6. SASL PLAIN 인증 활성화
다음 절차에서는 Kafka 브로커에서 SASL PLAIN 인증을 활성화하는 방법을 설명합니다.
사전 요구 사항
- AMQ Streams는 Kafka 브로커로 사용할 모든 호스트에 설치됩니다.
절차
/opt/kafka/config/jaas.conf
10.0.0.1 구성 파일을 편집하거나 생성합니다. 이 파일에는 모든 사용자 및 해당 암호가 포함되어야 합니다. 이 파일이 모든 Kafka 브로커에서 동일한지 확인합니다.예를 들면 다음과 같습니다.
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required user_admin="123456" user_user1="123456" user_user2="123456"; };
다음을 위해 모든 클러스터 노드에서
/opt/kafka/config/server.properties
Kafka 구성 파일을 편집합니다.-
listener.security.protocol.map
필드를 변경하여 SASL PLAIN 인증을 사용하려는 리스너의SASL_PLAINTEXT
또는SASL_SSL
프로토콜을 지정합니다. sasl.enabled.mechanisms
옵션을PLAIN
으로 설정합니다.예를 들면 다음과 같습니다.
listeners=INSECURE://:9092,AUTHENTICATED://:9093,REPLICATION://:9094 listener.security.protocol.map=INSECURE:PLAINTEXT,AUTHENTICATED:SASL_PLAINTEXT,REPLICATION:PLAINTEXT sasl.enabled.mechanisms=PLAIN
-
KAFKA_OPTS 환경 변수를 사용하여 Kafka 브로커를 시작하여 10.0.0.1 구성을 Kafka 브로커에 전달합니다.
su - kafka export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/jaas.conf"; /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
6.4.8.7. SASL SCRAM 인증 활성화
다음 절차에서는 Kafka 브로커에서 SASL SCRAM 인증을 활성화하는 방법을 설명합니다.
사전 요구 사항
- AMQ Streams는 Kafka 브로커로 사용할 모든 호스트에 설치됩니다.
절차
/opt/kafka/config/jaas.conf
10.0.0.1 구성 파일을 편집하거나 생성합니다.KafkaServer
컨텍스트에 대한ScramLoginModule
을 활성화합니다. 이 파일이 모든 Kafka 브로커에서 동일한지 확인합니다.예를 들면 다음과 같습니다.
KafkaServer { org.apache.kafka.common.security.scram.ScramLoginModule required; };
다음을 위해 모든 클러스터 노드에서
/opt/kafka/config/server.properties
Kafka 구성 파일을 편집합니다.-
listener.security.protocol.map
필드를 변경하여 SASL SCRAM 인증을 사용하려는 리스너의SASL_PLAINTEXT
또는SASL_SSL
프로토콜을 지정합니다. sasl.enabled.mechanisms
옵션을SCRAM-SHA-256
또는SCRAM-SHA-512
로 설정합니다.예를 들면 다음과 같습니다.
listeners=INSECURE://:9092,AUTHENTICATED://:9093,REPLICATION://:9094 listener.security.protocol.map=INSECURE:PLAINTEXT,AUTHENTICATED:SASL_PLAINTEXT,REPLICATION:PLAINTEXT sasl.enabled.mechanisms=SCRAM-SHA-512
-
KAFKA_OPTS 환경 변수를 사용하여 Kafka 브로커를 시작하여 10.0.0.1 구성을 Kafka 브로커에 전달합니다.
su - kafka export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/jaas.conf"; /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
6.4.8.8. SASL SCRAM 사용자 추가
다음 절차에서는 SASL SCRAM을 사용하여 인증을 위해 새 사용자를 추가하는 방법을 설명합니다.
사전 요구 사항
- AMQ Streams는 Kafka 브로커로 사용할 모든 호스트에 설치됩니다.
- SASL SCRAM 인증이 활성화되어 있습니다.
절차
kafka-configs.sh
도구를 사용하여 새 SASL SCRAM 사용자를 추가합니다.bin/kafka-configs.sh --bootstrap-server <broker_address> --alter --add-config 'SCRAM-SHA-512=[password=<Password>]' --entity-type users --entity-name <Username>
예를 들면 다음과 같습니다.
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'SCRAM-SHA-512=[password=123456]' --entity-type users --entity-name user1
6.4.8.9. SASL SCRAM 사용자 삭제
다음 절차에서는 SASL SCRAM 인증을 사용할 때 사용자를 제거하는 방법을 설명합니다.
사전 요구 사항
- AMQ Streams는 Kafka 브로커로 사용할 모든 호스트에 설치됩니다.
- SASL SCRAM 인증이 활성화되어 있습니다.
절차
kafka-configs.sh
도구를 사용하여 SASL SCRAM 사용자를 삭제합니다./opt/kafka/bin/kafka-configs.sh --bootstrap-server <broker_address> --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name <Username>
예를 들면 다음과 같습니다.
/opt/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name user1
6.4.9. OAuth 2.0 토큰 기반 인증 사용
AMQ Streams는 OAUTHB>-<ER 및 PLAIN 메커니즘을 사용한 OAuth 2.0 인증 사용을 지원합니다.
OAuth 2.0을 사용하면 리소스에 대한 제한된 액세스 권한을 부여하는 토큰을 발행하기 위한 중앙 권한 부여 서버를 사용하여 애플리케이션 간 표준화된 토큰 기반 인증 및 권한 부여가 가능합니다.
OAuth 2.0 인증을 구성한 다음 OAuth 2.0 권한 부여 를 설정할 수 있습니다.
OAuth 2.0을 사용하도록 Kafka 브로커 및 클라이언트를 구성해야 합니다. OAuth 2.0 인증은 단순
또는 OPA 기반 Kafka 권한 부여와 함께 사용할 수도 있습니다.
애플리케이션 클라이언트는 OAuth 2.0 인증을 사용하여 계정 자격 증명을 노출하지 않고 애플리케이션 서버( 리소스 서버라고 함)의 리소스에 액세스할 수 있습니다.
애플리케이션 클라이언트는 권한 부여 수준을 결정하는 데 사용할 수 있는 인증 수단으로 액세스 토큰을 전달합니다. 권한 부여 서버는 액세스 권한 부여 및 문의를 처리합니다.
AMQ Streams의 컨텍스트에서 다음을 수행합니다.
- Kafka 브로커는 OAuth 2.0 리소스 서버 역할을 합니다.
- Kafka 클라이언트는 OAuth 2.0 애플리케이션 클라이언트 역할을 합니다.
Kafka 클라이언트는 Kafka 브로커에 대해 인증합니다. 브로커 및 클라이언트는 필요한 경우 OAuth 2.0 권한 부여 서버와 통신하여 액세스 토큰을 얻거나 검증합니다.
AMQ Streams 배포를 위해 OAuth 2.0 통합은 다음을 제공합니다.
- Kafka 브로커에 대한 서버 측 OAuth 2.0 지원
- Kafka MirrorMaker, Kafka Connect 및 Kafka Bridge에 대한 클라이언트 측 OAuth 2.0 지원
RHEL의 AMQ Streams에는 두 개의 OAuth 2.0 라이브러리가 포함되어 있습니다.
kafka-oauth-client
-
io.strimzi.kafka.oauth.client.JaasClientOauthLogin>-<backHandler라는 사용자 지정 로그인 콜백
처리기 클래스를 제공합니다.OAUTHBISTRAER
인증 메커니즘을 처리하려면 Apache Kafka에서 제공하는OAuthBearerLoginModule
을 사용하여 로그인 콜백 처리기를 사용합니다. kafka-oauth-common
-
kafka-oauth-client
라이브러리에 필요한 일부 기능을 제공하는 도우미 라이브러리입니다.
제공된 클라이언트 라이브러리는 keycloak-core
,jackson-databind
, slf4j-api
와 같은 일부 추가 타사 라이브러리에 대한 종속성도 있습니다.
모든 종속성 라이브러리가 포함되도록 Maven 프로젝트를 사용하여 클라이언트를 패키징하는 것이 좋습니다. 종속성 라이브러리는 향후 버전에서 변경될 수 있습니다.
추가 리소스
6.4.9.1. OAuth 2.0 인증 메커니즘
AMQ Streams는 OAuth 2.0 인증을 위해 OAUTHBISTRAER 및 PLAIN 메커니즘을 지원합니다. 두 메커니즘을 모두 사용하면 Kafka 클라이언트가 Kafka 브로커와 인증 세션을 설정할 수 있습니다. 클라이언트, 권한 부여 서버 및 Kafka 브로커 간의 인증 흐름은 각 메커니즘마다 다릅니다.
가능하면 언제든지 OAUTHBECDHEER를 사용하도록 클라이언트를 구성하는 것이 좋습니다. OAUTHBLOGER는 클라이언트 자격 증명이 Kafka 브로커와 공유되지 않기 때문에 PLAIN보다 높은 수준의 보안을 제공합니다. OAUTHB topicER를 지원하지 않는 Kafka 클라이언트에만 PLAIN을 사용하는 것이 좋습니다.
클라이언트 연결에 OAuth 2.0 인증을 사용하도록 Kafka 브로커 리스너를 구성합니다. 필요한 경우 동일한 oauth
리스너에서 OAUTHBISTRAER 및 PLAIN 메커니즘을 사용할 수 있습니다. 각 메커니즘을 지원하는 속성을 oauth
리스너 구성에 명시적으로 지정해야 합니다.
AUTHBLOGER 개요
OAUTHBLOGER를 사용하려면 Kafka 브로커의 OAuth 인증 리스너 구성에서 sasl.enabled.mechanisms
를 OAUTHBECDHEER
로 설정합니다. 자세한 구성은 6.4.9.2절. “OAuth 2.0 Kafka 브로커 구성” 에서 참조하십시오.
listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER
많은 Kafka 클라이언트 도구는 프로토콜 수준에서 OAUTHB topicER에 대한 기본 지원을 제공하는 라이브러리를 사용합니다. 애플리케이션 개발을 지원하기 위해 AMQ Streams는 업스트림 Kafka 클라이언트 Java 라이브러리에 대한 OAuth 콜백 핸들러 를 제공합니다(다른 라이브러리에서는 제외). 따라서 자체 콜백 핸들러를 작성할 필요가 없습니다. 애플리케이션 클라이언트는 콜백 처리기를 사용하여 액세스 토큰을 제공할 수 있습니다. Go와 같은 다른 언어로 작성된 클라이언트는 사용자 정의 코드를 사용하여 권한 부여 서버에 연결하고 액세스 토큰을 받아야 합니다.
OAUTHBLOGER를 사용하면 클라이언트는 인증 정보 교환을 위해 Kafka 브로커와 세션을 시작합니다. 여기서 자격 증명은 콜백 처리기에서 제공하는 전달자 토큰의 형식을 취합니다. 콜백을 사용하면 다음 세 가지 방법 중 하나로 토큰 프로비저닝을 구성할 수 있습니다.
- 클라이언트 ID 및 시크릿( OAuth 2.0 클라이언트 인증 정보 메커니즘 사용)
- 구성 시 수동으로 가져온 장기 액세스 토큰
- 구성 시 수동으로 가져온 장기 새로 고침 토큰
프로토콜 수준에서 OAUTHBREER 메커니즘을 지원하는 Kafka 클라이언트에서만 인증을 사용할 수 있습니다.
PLAIN 개요
PLAIN
을 사용하려면 sasl.enabled.mechanisms
의 값에 PLAIN을 추가합니다.
listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER,PLAIN
PLAIN은 모든 Kafka 클라이언트 툴에서 사용하는 간단한 인증 메커니즘입니다. PLAIN을 OAuth 2.0 인증과 함께 사용할 수 있도록 AMQ Streams는 PLAIN 서버 측 콜백을 통해 OAuth 2.0 을 제공합니다.
PLAIN의 AMQ Streams 구현을 사용하면 클라이언트 인증 정보가 ZooKeeper에 저장되지 않습니다. 대신 클라이언트 자격 증명은 OAUTHB learnER 인증이 사용될 때와 유사하게 호환 권한 부여 서버 뒤에 중앙에서 처리됩니다.
OAuth 2.0 over PLAIN 콜백과 함께 사용하면 Kafka 클라이언트는 다음 방법 중 하나를 사용하여 Kafka 브로커로 인증합니다.
- 클라이언트 ID 및 시크릿( OAuth 2.0 클라이언트 인증 정보 메커니즘 사용)
- 구성 시 수동으로 가져온 장기 액세스 토큰
두 방법 모두 클라이언트는 PLAIN 사용자 이름
및 암호
속성을 제공하여 인증서를 Kafka 브로커에 전달해야 합니다. 클라이언트는 이러한 속성을 사용하여 클라이언트 ID와 시크릿 또는 사용자 이름 및 액세스 토큰을 전달합니다.
클라이언트 ID 및 시크릿은 액세스 토큰을 가져오는 데 사용됩니다.
액세스 토큰은 암호
속성 값으로 전달됩니다. $accessToken:
접두사를 사용하거나 사용하지 않고 액세스 토큰을 전달합니다.
-
리스너 구성에서 토큰 끝점(
oauth.token.endpoint.uri
)을 구성하는 경우 접두사가 필요합니다. -
리스너 구성에서 토큰 끝점(
oauth.token.endpoint.uri
)을 구성하지 않으면 접두사가 필요하지 않습니다. Kafka 브로커는 암호를 원시 액세스 토큰으로 해석합니다.
암호
가 액세스 토큰으로 설정된 경우 사용자
이름을 Kafka 브로커가 액세스 토큰에서 가져오는 동일한 보안 이름으로 설정해야 합니다. oauth.username.claim
, oauth.entryback.username.claim ,oauth. goesback.username.
prefix ,
추출 옵션을 지정할 수 있습니다. 사용자 이름 추출 프로세스는 권한 부여 서버(특히 클라이언트 ID를 계정 이름에 매핑하는 방법)에 따라 달라집니다.
oauth.userinfo.endpoint.uri
속성을 사용하여 리스너에서 사용자 이름
PLAIN을 통한 OAuth는 (더 이상 사용되지 않음) OAuth 2.0 암호 부여 메커니즘을 사용하여 사용자 이름과 암호(암호 부여) 전달을 지원하지 않습니다.
6.4.9.1.1. 속성 또는 변수를 사용하여 OAuth 2.0 구성
JDK(Java Authentication and Authorization Service) 속성 또는 환경 변수를 사용하여 OAuth 2.0 설정을 구성할 수 있습니다.
-
CloudEvent 속성은
server.properties
구성 파일에서 구성되고listener.name.LISTENER-NAME.oauthbearer.sas.jaas.config
속성의 키-값 쌍으로 전달됩니다. 환경 변수를 사용하는 경우 여전히
listener.name.LISTENER-NAME.oauthbearer.sasl.jaas.config
속성을server.properties
파일에 제공해야 하지만 다른ECDHE 속성을 생략할 수 있습니다.대문자 또는 대문자 환경 변수 이름 지정 규칙을 사용할 수 있습니다.
AMQ Streams OAuth 2.0 라이브러리는 다음으로 시작하는 속성을 사용합니다.
-
OAuth.
인증 구성 -
Strimzi
: OAuth 2.0 권한 부여 구성
추가 리소스
6.4.9.2. OAuth 2.0 Kafka 브로커 구성
OAuth 2.0 인증에 대한 Kafka 브로커 구성은 다음과 같습니다.
- 권한 부여 서버에서 OAuth 2.0 클라이언트 생성
- Kafka 클러스터에서 OAuth 2.0 인증 구성
권한 부여 서버와 관련하여 Kafka 브로커 및 Kafka 클라이언트는 모두 OAuth 2.0 클라이언트로 간주됩니다.
6.4.9.2.1. 권한 부여 서버의 OAuth 2.0 클라이언트 구성
세션 시작 중에 수신된 토큰을 확인하도록 Kafka 브로커를 구성하려면 다음 클라이언트 인증 정보를 사용하여 권한 부여 서버에 OAuth 2.0 클라이언트 정의를 생성하는 것이 좋습니다.
-
kafka-broker
의 클라이언트 ID (예:) - 인증 메커니즘으로서의 클라이언트 ID 및 시크릿
권한 부여 서버의 공용이 아닌 인트로스펙션 끝점을 사용하는 경우에만 클라이언트 ID와 시크릿을 사용해야 합니다. 빠른 로컬 JWT 토큰 검증과 마찬가지로 공용 권한 부여 서버 끝점을 사용할 때는 일반적으로 자격 증명이 필요하지 않습니다.
6.4.9.2.2. Kafka 클러스터의 OAuth 2.0 인증 구성
Kafka 클러스터에서 OAuth 2.0 인증을 사용하려면 Kafka server.properties
파일에서 Kafka 클러스터에 대한 OAuth 인증 리스너 구성을 활성화합니다. 최소 구성이 필요합니다. 또한 TLS가 중앙 간 통신에 사용되는 TLS 리스너를 구성할 수도 있습니다.
다음 방법 중 하나를 사용하여 권한 부여 서버에서 토큰 검증을 위해 브로커를 구성할 수 있습니다.
- 빠른 로컬 토큰 검증: 서명된 JWT 형식의 액세스 토큰과 결합된 JWKS 끝점
- 인트로스펙션 끝점
OAUTHBISTRAER 또는 PLAIN 인증 또는 둘 다 구성할 수 있습니다.
다음 예제에서는 전역 리스너 구성을 적용하는 최소 구성을 보여줍니다. 즉, 브로커 간 통신이 애플리케이션 클라이언트와 동일한 리스너를 거칩니다.
예제에서는 sasl.enabled.mechanis
를 지정하는 특정 리스너에 대한 OAuth 2.0 구성을 보여줍니다. LISTENER-NAME 은 리스너의 대소문자를 구분하지 않는 이름입니다. 여기서는 리스너 ms
대신 listener.name.LISTENER-NAME.sasl.enabled.mechanismsCLIENT
의 이름을 지정하므로 속성 이름은 listener.name.client.sasl.enabled.mechanisms
입니다.
이 예제에서는 OAUTHBDiscoveryER 인증을 사용합니다.
예: JWKS 끝점을 사용한 OAuth 2.0 인증에 대한 최소 리스너 구성
sasl.enabled.mechanisms=OAUTHBEARER 1 listeners=CLIENT://0.0.0.0:9092 2 listener.security.protocol.map=CLIENT:SASL_PLAINTEXT 3 listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER 4 sasl.mechanism.inter.broker.protocol=OAUTHBEARER 5 inter.broker.listener.name=CLIENT 6 listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler 7 listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ 8 oauth.valid.issuer.uri="https://AUTH-SERVER-ADDRESS" \ 9 oauth.jwks.endpoint.uri="https://AUTH-SERVER-ADDRESS/jwks" \ 10 oauth.username.claim="preferred_username" \ 11 oauth.client.id="kafka-broker" \ 12 oauth.client.secret="kafka-secret" \ 13 oauth.token.endpoint.uri="https://AUTH-SERVER-ADDRESS/token" ; 14 listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler 15 listener.name.client.oauthbearer.connections.max.reauth.ms=3600000 16
- 1
- SASL을 통해 자격 증명을 교환하기 위한 OAUTHBLOGER 메커니즘을 활성화합니다.
- 2
- 클라이언트 애플리케이션이 연결할 수 있도록 리스너를 구성합니다. 시스템
호스트 이름은
재연결을 위해 클라이언트가 확인해야 하는 공개된 호스트 이름으로 사용됩니다. 이 예제에서 리스너의 이름은CLIENT
로 지정됩니다. - 3
- 리스너의 채널 프로토콜을 지정합니다.
SASL_SSL
은 TLS용입니다.SASL_PLAINTEXT
는 암호화되지 않은 연결(TLS 없음)에 사용되지만 TCP 연결 계층에서 차단 및 가로채기 위험이 있습니다. - 4
- CLIENT 리스너를 위한 OAUTHBLOGER 메커니즘을 지정합니다. 클라이언트 이름(
CLIENT
)은 일반적으로listeners
속성의 소문자,listener.name
.client 에 대해 소문자로 지정되고listener.name. client
.*
속성의 일부가 될 때 소문자로 지정됩니다. - 5
- 신호 간 통신을 위한 OAUTHBLOGER 메커니즘을 지정합니다.
- 6
- inter-broker 통신을 위한 리스너를 지정합니다. 구성을 활성화하려면 사양이 필요합니다.
- 7
- 클라이언트 리스너에서 OAuth 2.0 인증을 구성합니다.
- 8
- 클라이언트 및broker 간 통신에 대한 인증 설정을 구성합니다.
oauth.client.id
,oauth.client.secret
,auth.token.endpoint.uri
속성은broker 간 구성과 관련되어 있습니다. - 9
- 유효한 발급자 URI입니다. 이 발행자에서 발행한 액세스 토큰만 허용됩니다. 예: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME.
- 10
- JWKS 끝점 URL입니다. 예: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/certs.
- 11
- 토큰에 실제 사용자 이름을 포함하는 토큰 클레임(또는 키)입니다. 사용자 이름은 사용자를 식별하는 데 사용되는 보안 주체 입니다. 값은 인증 흐름과 사용된 권한 부여 서버에 따라 달라집니다.
- 12
- 모든 브로커에 대해 동일한 Kafka 브로커의 클라이언트 ID입니다.
kafka-broker
로 권한 부여 서버에 등록된 클라이언트 입니다. - 13
- Kafka 브로커의 시크릿(모든 브로커에 대해 동일합니다.
- 14
- 권한 부여 서버에 대한 OAuth 2.0 토큰 끝점 URL입니다. 프로덕션의 경우 항상
https://
URL을 사용합니다. 예: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/token. - 15
- 중앙 통신 간 통신을 위해 OAuth 2.0 인증을 활성화(및 필수 항목)합니다.
- 16
- (선택 사항) 토큰이 만료될 때 세션 만료를 강제 적용하고 Kafka 재인증 메커니즘 도 활성화합니다. 지정된 값이 액세스 토큰이 만료되기 위해 남은 시간보다 작으면 실제 토큰 만료 전에 클라이언트가 다시 인증해야 합니다. 기본적으로 액세스 토큰이 만료되면 세션이 만료되지 않으며 클라이언트는 재인증을 시도하지 않습니다.
다음 예제에서는 TLS가 중앙 간 통신에 사용되는 TLS 리스너에 대한 최소 구성을 보여줍니다.
예: OAuth 2.0 인증에 대한 TLS 리스너 구성
listeners=REPLICATION://kafka:9091,CLIENT://kafka:9092 1 listener.security.protocol.map=REPLICATION:SSL,CLIENT:SASL_PLAINTEXT 2 listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER inter.broker.listener.name=REPLICATION listener.name.replication.ssl.keystore.password=KEYSTORE-PASSWORD 3 listener.name.replication.ssl.truststore.password=TRUSTSTORE-PASSWORD listener.name.replication.ssl.keystore.type=JKS listener.name.replication.ssl.truststore.type=JKS listener.name.replication.ssl.secure.random.implementation=SHA1PRNG 4 listener.name.replication.ssl.endpoint.identification.algorithm=HTTPS 5 listener.name.replication.ssl.keystore.location=PATH-TO-KEYSTORE 6 listener.name.replication.ssl.truststore.location=PATH-TO-TRUSTSTORE 7 listener.name.replication.ssl.client.auth=required 8 listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.valid.issuer.uri="https://AUTH-SERVER-ADDRESS" \ oauth.jwks.endpoint.uri="https://AUTH-SERVER-ADDRESS/jwks" \ oauth.username.claim="preferred_username" ; 9
- 1
- 브로커 간 통신 및 클라이언트 애플리케이션에는 별도의 구성이 필요합니다.
- 2
- TLS를 사용하도록 REPLICATION 리스너와 CLIENT 리스너가 암호화되지 않은 채널에서 SASL을 사용하도록 구성합니다. 클라이언트는 프로덕션 환경에서 암호화된 채널(
SASL_SSL
)을 사용할 수 있습니다. - 3
ssl.
속성은 TLS 구성을 정의합니다.- 4
- 난수 생성기 구현입니다. 설정되지 않은 경우 Java 플랫폼 SDK 기본값이 사용됩니다.
- 5
- 호스트 이름 확인. 빈 문자열로 설정하면 호스트 이름 확인이 해제됩니다. 설정되지 않은 경우 기본값은
HTTPS
로, 서버 인증서에 대한 호스트 이름 확인을 적용합니다. - 6
- 리스너의 키 저장소 경로입니다.
- 7
- 리스너의 신뢰 저장소 경로입니다.
- 8
- 연결 간 연결에 사용되는 경우 REPLICATION 리스너의 클라이언트가 클라이언트 인증서로 인증하도록 지정합니다.
- 9
- OAuth 2.0에 대한 CLIENT 리스너를 구성합니다. 권한 부여 서버와의 연결은 보안 HTTPS 연결을 사용해야 합니다.
다음 예제에서는 SASL을 통해 자격 증명을 교환하기 위한 PLAIN 인증 메커니즘을 사용하는 OAuth 2.0 인증에 대한 최소 구성을 보여줍니다. 빠른 로컬 토큰 검증이 사용됩니다.
예: PLAIN 인증에 대한 최소 리스너 구성
listeners=CLIENT://0.0.0.0:9092 1 listener.security.protocol.map=CLIENT:SASL_PLAINTEXT 2 listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER,PLAIN 3 sasl.mechanism.inter.broker.protocol=OAUTHBEARER 4 inter.broker.listener.name=CLIENT 5 listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler 6 listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ 7 oauth.valid.issuer.uri="http://AUTH_SERVER/auth/realms/REALM" \ 8 oauth.jwks.endpoint.uri="https://AUTH_SERVER/auth/realms/REALM/protocol/openid-connect/certs" \ 9 oauth.username.claim="preferred_username" \ 10 oauth.client.id="kafka-broker" \ 11 oauth.client.secret="kafka-secret" \ 12 oauth.token.endpoint.uri="https://AUTH-SERVER-ADDRESS/token" ; 13 listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler 14 listener.name.client.plain.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.plain.JaasServerOauthOverPlainValidatorCallbackHandler 15 listener.name.client.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ 16 oauth.valid.issuer.uri="https://AUTH-SERVER-ADDRESS" \ 17 oauth.jwks.endpoint.uri="https://AUTH-SERVER-ADDRESS/jwks" \ 18 oauth.username.claim="preferred_username" \ 19 oauth.token.endpoint.uri="http://AUTH_SERVER/auth/realms/REALM/protocol/openid-connect/token" ; 20 connections.max.reauth.ms=3600000 21
- 1
- 클라이언트 애플리케이션이 연결할 수 있도록 리스너(이 예에서는
CLIENT
라고 함)를 구성합니다. 시스템호스트 이름은
재연결을 위해 클라이언트가 확인해야 하는 공개된 호스트 이름으로 사용됩니다. 이는 유일하게 구성된 리스너이므로broker 간 통신에도 사용됩니다. - 2
- 암호화되지 않은 채널에서 SASL을 사용하도록
CLIENT
리스너 예제를 구성합니다. 프로덕션 환경에서 클라이언트는 TCP 연결 계층에서 도방 및 가로채기를 방지하기 위해 암호화된 채널(SASL_SSL
)을 사용해야 합니다. - 3
- SASL을 통해 자격 증명을 교환할 수 있는 PLAIN 인증 메커니즘과 OAUTHBECDHEER를 활성화합니다. OAUTHBISTRAER는 브로커 간 통신에 필요하므로 지정됩니다. Kafka 클라이언트는 연결하는 데 사용할 메커니즘을 선택할 수 있습니다.
- 4
- 신호 간 통신을 위한 OAUTHBLOGER 인증 메커니즘을 지정합니다.
- 5
- broker 간 통신을 위해 리스너(이 예에서는
CLIENT
라고 함)를 지정합니다. 구성이 유효한 데 필요합니다. - 6
- OAUTHBREER 메커니즘에 대한 서버 콜백 처리기를 구성합니다.
- 7
- OAUTHBECDHEER 메커니즘을 사용하여 클라이언트 및 media 간 통신에 대한 인증 설정을 구성합니다.
oauth.client.id
,oauth.client.secret
,oauth.token.endpoint.uri
속성은broker 간 구성과 관련이 있습니다. - 8
- 유효한 발급자 URI입니다. 이 발행자의 액세스 토큰만 허용됩니다. 예: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME
- 9
- JWKS 끝점 URL입니다. 예: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/certs
- 10
- 토큰에 실제 사용자 이름을 포함하는 토큰 클레임(또는 키)입니다. 사용자 이름은 사용자를 식별하는 보안 주체 입니다. 값은 인증 흐름과 사용되는 권한 부여 서버에 따라 다릅니다.
- 11
- 모든 브로커에 대해 동일한 Kafka 브로커의 클라이언트 ID입니다.
kafka-broker
로 권한 부여 서버에 등록된 클라이언트 입니다. - 12
- Kafka 브로커의 시크릿(모든 브로커에 대해 동일합니다.
- 13
- 권한 부여 서버에 대한 OAuth 2.0 토큰 끝점 URL입니다. 프로덕션의 경우 항상
https://
URL을 사용합니다. 예: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/token - 14
- broker 간 통신을 위한 OAuth 2.0 인증을 활성화합니다.
- 15
- PLAIN 인증에 대한 서버 콜백 처리기를 구성합니다.
- 16
- PLAIN 인증을 사용하여 클라이언트 통신에 대한 인증 설정을 구성합니다.
OAuth.token.endpoint.uri
는 OAuth 2.0 클라이언트 인증 정보 메커니즘을 사용하여 PLAIN을 통해 OAuth 2.0 을 활성화하는 선택적 속성입니다. - 17
- 유효한 발급자 URI입니다. 이 발행자의 액세스 토큰만 허용됩니다. 예: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME
- 18
- JWKS 끝점 URL입니다. 예: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/certs
- 19
- 토큰에 실제 사용자 이름을 포함하는 토큰 클레임(또는 키)입니다. 사용자 이름은 사용자를 식별하는 보안 주체 입니다. 값은 인증 흐름과 사용된 권한 부여 서버에 따라 다릅니다.
- 20
- 권한 부여 서버에 대한 OAuth 2.0 토큰 끝점 URL입니다. PLAIN 메커니즘에 대한 추가 구성입니다. 지정된 경우 클라이언트는
$accessToken:
접두사를 사용하여 액세스 토큰을 암호로
전달하여 PLAIN을 통해 인증할 수 있습니다.프로덕션의 경우 항상
https://
URL을 사용합니다. 예: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/token. - 21
- (선택 사항) 토큰이 만료될 때 세션 만료를 강제 적용하고 Kafka 재인증 메커니즘 도 활성화합니다. 지정된 값이 액세스 토큰이 만료되기 위해 남은 시간보다 작으면 실제 토큰 만료 전에 클라이언트가 다시 인증해야 합니다. 기본적으로 액세스 토큰이 만료되면 세션이 만료되지 않으며 클라이언트는 재인증을 시도하지 않습니다.
6.4.9.2.3. 빠른 로컬 JWT 토큰 검증 구성
빠른 로컬 JWT 토큰 검증은 JWT 토큰 서명을 로컬에서 확인합니다.
로컬 검사에서 토큰을 확인합니다.
-
액세스 토큰에 대한
Bearer
의 (typ) 클레임 값을 포함하여 유형을 준수합니다. - 유효 ( 만료되지 않음)
-
유효한IssuerURI
와 일치하는 발급자가 있습니다.
권한 부여 서버에서 발행하지 않은 토큰이 거부되도록 리스너를 구성할 때 유효한 발급자 URI 를 지정합니다.
빠른 로컬 JWT 토큰 검증 중에 권한 부여 서버에 연결할 필요가 없습니다. OAuth 2.0 인증 서버에서 노출하는 JWKs 끝점 URI 를 지정하여 빠른 로컬 JWT 토큰 검증을 활성화합니다. 엔드포인트에는 Kafka 클라이언트에서 인증 정보로 전송되는 서명된 JWT 토큰의 유효성을 확인하는 데 사용되는 공개 키가 포함되어 있습니다.
권한 부여 서버와의 모든 통신은 HTTPS를 사용하여 수행해야 합니다.
TLS 리스너의 경우 인증서 신뢰 저장소를 구성하고 신뢰 저장소 파일을 가리킬 수 있습니다.
빠른 로컬 JWT 토큰 검증을 위한 속성의 예
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.valid.issuer.uri="https://AUTH-SERVER-ADDRESS" \ 1 oauth.jwks.endpoint.uri="https://AUTH-SERVER-ADDRESS/jwks" \ 2 oauth.jwks.refresh.seconds="300" \ 3 oauth.jwks.refresh.min.pause.seconds="1" \ 4 oauth.jwks.expiry.seconds="360" \ 5 oauth.username.claim="preferred_username" \ 6 oauth.ssl.truststore.location="PATH-TO-TRUSTSTORE-P12-FILE" \ 7 oauth.ssl.truststore.password="TRUSTSTORE-PASSWORD" \ 8 oauth.ssl.truststore.type="PKCS12" ; 9
- 1
- 유효한 발급자 URI입니다. 이 발행자에서 발행한 액세스 토큰만 허용됩니다. 예: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME.
- 2
- JWKS 끝점 URL입니다. 예: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/certs.
- 3
- 끝점 간의 기간(기본값 300)
- 4
- 연속한 시도 사이에 최소 일시 정지(초)는 JWKS 공개 키를 새로 고침합니다. 알 수 없는 서명 키가 발생하면 JWKS 키 새로 고침은 마지막 새로 고침 시도 이후 지정된 일시 중지를 포함하여 정기적인 일정 외부에서 예약됩니다. 키를 새로 고치면 지수 백오프 규칙을 따르며
oauth.jwks.refresh.refresh.seconds
에 도달할 때까지 일시 중지를 계속 늘리면 실패한 새로 고침을 다시 시도합니다. 기본값은 1입니다. - 5
- JWK 인증서가 만료되기 전에 유효한 것으로 간주되는 기간입니다. 기본값은
360
초입니다. 더 긴 시간을 지정하는 경우 취소된 인증서에 대한 액세스를 허용할 위험을 고려하십시오. - 6
- 토큰에 실제 사용자 이름을 포함하는 토큰 클레임(또는 키)입니다. 사용자 이름은 사용자를 식별하는 데 사용되는 보안 주체 입니다. 값은 인증 흐름과 사용된 권한 부여 서버에 따라 달라집니다.
- 7
- TLS 구성에 사용된 신뢰 저장소의 위치입니다.
- 8
- 신뢰 저장소에 액세스하기 위한 암호입니다.
- 9
- PKCS #12 형식의 truststore 유형입니다.
6.4.9.2.4. OAuth 2.0 인트로스펙션 엔드 포인트 구성
OAuth 2.0 인트로스펙션 끝점을 사용한 토큰 검증은 수신된 액세스 토큰을 opaque로 처리합니다. Kafka 브로커는 유효성 검사에 필요한 토큰 정보로 응답하는 인트로스펙션 끝점에 액세스 토큰을 보냅니다. 중요한 것은 특정 액세스 토큰이 유효하고 토큰 만료 시기에 대한 최신 정보를 반환하는 것입니다.
OAuth 2.0 인트로스펙션 기반 검증을 구성하려면 빠른 로컬 JWT 토큰 검증을 위해 JWKs 끝점 URI 대신 인트로스펙션 끝점 URI를 지정합니다. 일반적으로 인트로스펙션 끝점이 보호되므로 권한 부여 서버에 따라 클라이언트 ID 및 클라이언트 시크릿 을 지정해야 합니다.
인트로스펙션 끝점의 속성 예
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.introspection.endpoint.uri="https://AUTH-SERVER-ADDRESS/introspection" \ 1 oauth.client.id="kafka-broker" \ 2 oauth.client.secret="kafka-broker-secret" \ 3 oauth.ssl.truststore.location="PATH-TO-TRUSTSTORE-P12-FILE" \ 4 oauth.ssl.truststore.password="TRUSTSTORE-PASSWORD" \ 5 oauth.ssl.truststore.type="PKCS12" \ 6 oauth.username.claim="preferred_username" ; 7
- 1
- OAuth 2.0 인트로스펙션 엔드 포인트 URI입니다. 예: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/token/introspect.
- 2
- Kafka 브로커의 클라이언트 ID입니다.
- 3
- Kafka 브로커의 시크릿입니다.
- 4
- TLS 구성에 사용된 신뢰 저장소의 위치입니다.
- 5
- 신뢰 저장소에 액세스하기 위한 암호입니다.
- 6
- PKCS #12 형식의 truststore 유형입니다.
- 7
- 토큰에 실제 사용자 이름을 포함하는 토큰 클레임(또는 키)입니다. 사용자 이름은 사용자를 식별하는 데 사용되는 보안 주체 입니다.
oauth.username.claim
값은 사용된 권한 부여 서버에 따라 다릅니다.
6.4.9.3. Kafka 브로커에 대한 세션 재인증
Kafka 클라이언트와 Kafka 브로커 간의 OAuth 2.0 세션에 대한 Kafka 세션 재인증 을 사용하도록 OAuth 리스너를 구성할 수 있습니다. 이 메커니즘은 정의된 기간 후에 클라이언트와 브로커 간에 인증된 세션의 만료를 적용합니다. 세션이 만료되면 클라이언트는 기존 연결을 삭제하는 대신 기존 연결을 재사용하여 새 세션을 즉시 시작합니다.
세션 재인증은 기본적으로 비활성화되어 있습니다. server.properties
파일에서 활성화할 수 있습니다. SASL 메커니즘으로 OAUTHB>-<ER 또는 PLAIN이 활성화된 TLS 리스너의 connections.max.reauth.ms
속성을 설정합니다.
리스너당 세션 재인증을 지정할 수 있습니다. 예를 들면 다음과 같습니다.
listener.name.client.oauthbearer.connections.max.reauth.ms=3600000
세션 재인증은 클라이언트에서 사용하는 Kafka 클라이언트 라이브러리에서 지원해야 합니다.
세션 재인증은 빠른 로컬 JWT 또는 인트로스펙션 끝점 토큰 검증과 함께 사용할 수 있습니다.
클라이언트 재인증
브로커의 인증된 세션이 만료되면 클라이언트는 연결을 삭제하지 않고 유효한 새 액세스 토큰을 브로커에 전송하여 기존 세션으로 다시 인증해야 합니다.
토큰 검증에 성공하면 기존 연결을 사용하여 새 클라이언트 세션이 시작됩니다. 클라이언트가 다시 인증되지 않으면 브로커는 메시지 전송 또는 수신을 위해 추가 시도를 수행하면 연결을 종료합니다. 브로커에서 재인증 메커니즘이 활성화된 경우 Kafka 클라이언트 라이브러리 2.2 이상을 사용하는 Java 클라이언트는 자동으로 다시 인증됩니다.
세션 재인증은 사용하는 경우 새로 고침 토큰에도 적용됩니다. 세션이 만료되면 클라이언트는 새로 고침 토큰을 사용하여 액세스 토큰을 새로 고칩니다. 그런 다음 클라이언트는 새 액세스 토큰을 사용하여 기존 연결을 다시 인증합니다.
OAUTHBREER 및 PLAIN의 세션 만료
세션 재인증이 구성되면 세션 만료가 OAUTHB topicER 및 PLAIN 인증에 다르게 작동합니다.
OAUTHBREER 및 PLAIN의 경우 클라이언트 ID 및 시크릿 방법을 사용합니다.
-
브로커의 인증된 세션은 구성된
연결.max.reauth.ms
에서 만료됩니다. - 구성된 시간 전에 액세스 토큰이 만료되면 세션이 일찍 만료됩니다.
수 명이 긴 액세스 토큰 방법을 사용하는 PLAIN의 경우:
-
브로커의 인증된 세션은 구성된
연결.max.reauth.ms
에서 만료됩니다. - 구성된 시간 이전에 액세스 토큰이 만료되면 재인증이 실패합니다. 세션 재인증이 시도되지만 PLAIN에는 토큰을 새로 고치는 메커니즘이 없습니다.
connections.max.reauth.ms
가 구성되지 않은 경우 다시 인증되지 않고도 OAUTHBLOGER 및 PLAIN 클라이언트는 브로커에 연결되지 않은 상태로 유지될 수 있습니다. 인증된 세션은 액세스 토큰 만료로 끝나지 않습니다. 그러나 이는 예를 들어 keycloak
인증을 사용하거나 사용자 정의 인증 기관을 설치하여 권한을 구성할 때 고려할 수 있습니다.
6.4.9.4. OAuth 2.0 Kafka 클라이언트 구성
Kafka 클라이언트는 다음 중 하나로 구성됩니다.
- 권한 부여 서버에서 유효한 액세스 토큰을 가져오는 데 필요한 인증 정보(클라이언트 ID 및 시크릿)
- 권한 부여 서버에서 제공하는 툴을 사용하여 얻은 유효한 장기 액세스 토큰 또는 새로 고침 토큰
Kafka 브로커에 전송된 유일한 정보는 액세스 토큰입니다. 액세스 토큰을 얻기 위해 권한 부여 서버를 통해 인증하는 데 사용되는 인증 정보는 브로커로 전송되지 않습니다.
클라이언트에서 액세스 토큰을 가져오는 경우 권한 부여 서버와 더 이상 통신할 필요가 없습니다.
가장 간단한 메커니즘은 클라이언트 ID 및 시크릿을 사용한 인증입니다. 장기 액세스 토큰 또는 수명이 긴 새로 고침 토큰을 사용하면 권한 부여 서버 도구에 대한 추가 종속성이 있으므로 복잡성이 추가됩니다.
수명이 긴 액세스 토큰을 사용하는 경우 토큰의 최대 수명을 늘리도록 권한 부여 서버에서 클라이언트를 구성해야 할 수 있습니다.
Kafka 클라이언트가 액세스 토큰으로 직접 구성되지 않은 경우 클라이언트는 권한 부여 서버에 연결하여 Kafka 세션 시작 중에 액세스 토큰에 대한 인증 정보를 전송합니다. Kafka 클라이언트가 다음 중 하나를 교환합니다.
- 클라이언트 ID 및 시크릿
- 클라이언트 ID, 새로 고침 토큰 및 (선택 사항) 시크릿
- 사용자 이름 및 암호, 클라이언트 ID 및 (선택 사항) 시크릿
6.4.9.5. OAuth 2.0 클라이언트 인증 흐름
OAuth 2.0 인증 흐름은 기본 Kafka 클라이언트 및 Kafka 브로커 구성에 따라 다릅니다. 사용된 권한 부여 서버에서도 이 흐름을 지원해야 합니다.
Kafka 브로커 리스너 구성은 클라이언트가 액세스 토큰을 사용하여 인증하는 방법을 결정합니다. 클라이언트는 클라이언트 ID와 시크릿을 전달하여 액세스 토큰을 요청할 수 있습니다.
리스너가 PLAIN 인증을 사용하도록 구성된 경우 클라이언트는 클라이언트 ID와 시크릿 또는 사용자 이름 및 액세스 토큰으로 인증할 수 있습니다. 이러한 값은 PLAIN 메커니즘의 사용자 이름
및 암호
속성으로 전달됩니다.
리스너 구성은 다음 토큰 검증 옵션을 지원합니다.
- 권한 부여 서버에 연결하지 않고도 JWT 서명 확인 및 로컬 토큰 인트로스펙션을 기반으로 빠른 로컬 토큰 검증을 사용할 수 있습니다. 권한 부여 서버는 토큰의 서명을 확인하는 데 사용되는 공용 인증서가 포함된 JWKS 엔드포인트를 제공합니다.
- 권한 부여 서버에서 제공하는 토큰 인트로스펙션 끝점에 대한 호출을 사용할 수 있습니다. 새 Kafka 브로커 연결이 설정될 때마다 브로커는 클라이언트에서 인증된 액세스 토큰을 권한 부여 서버로 전달합니다. Kafka 브로커는 응답을 확인하여 토큰이 유효한지 여부를 확인합니다.
권한 부여 서버에서는 불투명 액세스 토큰만 사용할 수 있으므로 로컬 토큰 검증이 불가능합니다.
Kafka 클라이언트 인증 정보는 다음 유형의 인증에 대해 구성할 수도 있습니다.
- 이전에 생성된 장기 액세스 토큰을 사용한 직접 로컬 액세스
- 새 액세스 토큰이 발행될 수 있도록 권한 서버와 연결합니다(클라이언트 ID와 시크릿 또는 새로 고침 토큰 또는 사용자 이름 및 암호 사용)
6.4.9.5.1. SASL OAUTHB>-<ER 메커니즘을 사용하는 클라이언트 인증 흐름의 예
SASL OAUTHB topicER 메커니즘을 사용하여 Kafka 인증에 대해 다음 통신 흐름을 사용할 수 있습니다.
클라이언트 ID 및 보안을 사용하는 클라이언트, 브로커가 권한 부여 서버에 대한 검증을 위임함
- Kafka 클라이언트는 클라이언트 ID 및 시크릿과 선택적으로 새로 고침 토큰을 사용하여 권한 부여 서버에서 액세스 토큰을 요청합니다. 또는 클라이언트는 사용자 이름과 암호를 사용하여 인증할 수 있습니다.
- 권한 부여 서버는 새 액세스 토큰을 생성합니다.
- Kafka 클라이언트는 SASL OAUTHB>-<ER 메커니즘을 사용하여 액세스 토큰을 전달하는 Kafka 브로커로 인증합니다.
- Kafka 브로커는 자체 클라이언트 ID 및 시크릿을 사용하여 권한 부여 서버에서 토큰 인트로스펙션 끝점을 호출하여 액세스 토큰을 검증합니다.
- 토큰이 유효한 경우 Kafka 클라이언트 세션이 설정됩니다.
빠른 로컬 토큰 검증을 수행하는 브로커가 있는 클라이언트 ID 및 시크릿을 사용하는 클라이언트
- Kafka 클라이언트는 클라이언트 ID 및 시크릿을 사용하여 토큰 끝점에서 권한 부여 서버와 선택적으로 새로 고침 토큰을 사용하여 인증합니다. 또는 클라이언트는 사용자 이름과 암호를 사용하여 인증할 수 있습니다.
- 권한 부여 서버는 새 액세스 토큰을 생성합니다.
- Kafka 클라이언트는 SASL OAUTHB>-<ER 메커니즘을 사용하여 액세스 토큰을 전달하는 Kafka 브로커로 인증합니다.
- Kafka 브로커는 JWT 토큰 서명 확인 및 로컬 토큰 인트로스펙션을 사용하여 액세스 토큰을 로컬에서 검증합니다.
장기 액세스 토큰을 사용하는 클라이언트 및 인증 서버에 대한 검증을 위임하는 브로커
- Kafka 클라이언트는 SASL OAUTHB>-<ER 메커니즘을 사용하여 장기간의 액세스 토큰을 전달하는 Kafka 브로커로 인증합니다.
- Kafka 브로커는 자체 클라이언트 ID 및 시크릿을 사용하여 권한 부여 서버에서 토큰 인트로스펙션 끝점을 호출하여 액세스 토큰을 검증합니다.
- 토큰이 유효한 경우 Kafka 클라이언트 세션이 설정됩니다.
빠른 로컬 검증 수행 브로커를 통해 장기 액세스 토큰을 사용하는 클라이언트
- Kafka 클라이언트는 SASL OAUTHB>-<ER 메커니즘을 사용하여 장기간의 액세스 토큰을 전달하는 Kafka 브로커로 인증합니다.
- Kafka 브로커는 JWT 토큰 서명 확인 및 로컬 토큰 인트로스펙션을 사용하여 로컬에서 액세스 토큰을 검증합니다.
빠른 로컬 JWT 토큰 서명 검증은 토큰이 취소된 경우 권한 부여 서버에 검사가 없기 때문에 수명이 짧은 토큰에만 적합합니다. 토큰 만료는 토큰에 기록되지만 해지는 언제든지 발생할 수 있으므로 권한 부여 서버에 연결하지 않으면 이를 고려할 수 없습니다. 발행된 모든 토큰은 만료될 때까지 유효한 것으로 간주됩니다.
6.4.9.5.2. SASL PLAIN 메커니즘을 사용하는 클라이언트 인증 흐름의 예
OAuth PLAIN 메커니즘을 사용하여 Kafka 인증에 대해 다음 통신 흐름을 사용할 수 있습니다.
클라이언트 ID 및 시크릿을 사용하는 클라이언트 및 브로커에서 클라이언트의 액세스 토큰을 가져옵니다.
-
Kafka 클라이언트는
clientId
를 사용자 이름으로,시크릿
을 암호로 전달합니다. -
Kafka 브로커는 토큰 끝점을 사용하여
clientId
및시크릿
을 권한 부여 서버에 전달합니다. - 클라이언트 인증 정보가 유효하지 않은 경우 권한 부여 서버에서 새 액세스 토큰 또는 오류를 반환합니다.
Kafka 브로커는 다음 방법 중 하나로 토큰을 검증합니다.
- 토큰 인트로스펙션 엔드 포인트가 지정되면 Kafka 브로커는 권한 부여 서버에서 끝점을 호출하여 액세스 토큰을 검증합니다. 토큰 검증에 성공하면 세션이 설정됩니다.
- 로컬 토큰 인트로스펙션이 사용되는 경우 권한 부여 서버에 대한 요청이 수행되지 않습니다. Kafka 브로커는 JWT 토큰 서명 검사를 사용하여 로컬에서 액세스 토큰을 검증합니다.
클라이언트 ID 및 시크릿 없이 장기 액세스 토큰을 사용하는 클라이언트
- Kafka 클라이언트는 사용자 이름과 암호를 전달합니다. password는 클라이언트를 실행하기 전에 수동으로 가져와 구성한 액세스 토큰의 값을 제공합니다.
암호는 Kafka 브로커 리스너가 인증을 위해 토큰 끝점으로 구성되었는지에 따라
$accessToken:
문자열 접두사를 사용하여 전달됩니다.-
토큰 끝점이 구성된 경우 브로커에 클라이언트 시크릿 대신 액세스 토큰이 포함되어 있음을 알리기 위해 암호 앞에
$accessToken:
접두사가 추가해야 합니다. Kafka 브로커는 사용자 이름을 계정 사용자 이름으로 해석합니다. -
Kafka 브로커 리스너에 토큰 끝점이 구성되지 않은 경우(
no-client-credentials 모드
강화) 암호는 접두사 없이 액세스 토큰을 제공해야 합니다. Kafka 브로커는 사용자 이름을 계정 사용자 이름으로 해석합니다. 이 모드에서 클라이언트는 클라이언트 ID와 시크릿을 사용하지 않으며password
매개변수는 항상 원시 액세스 토큰으로 해석됩니다.
-
토큰 끝점이 구성된 경우 브로커에 클라이언트 시크릿 대신 액세스 토큰이 포함되어 있음을 알리기 위해 암호 앞에
Kafka 브로커는 다음 방법 중 하나로 토큰을 검증합니다.
- 토큰 인트로스펙션 엔드 포인트가 지정되면 Kafka 브로커는 권한 부여 서버에서 끝점을 호출하여 액세스 토큰을 검증합니다. 토큰 검증에 성공하면 세션이 설정됩니다.
- 로컬 토큰 인트로스펙션을 사용하는 경우 권한 부여 서버에 대한 요청이 없습니다. Kafka 브로커는 JWT 토큰 서명 검사를 사용하여 로컬에서 액세스 토큰을 검증합니다.
6.4.9.6. OAuth 2.0 인증 구성
OAuth 2.0은 Kafka 클라이언트와 AMQ Streams 구성 요소 간의 상호 작용에 사용됩니다.
AMQ Streams에 OAuth 2.0을 사용하려면 다음을 수행해야 합니다.
6.4.9.6.1. Red Hat Single Sign-On을 OAuth 2.0 인증 서버로 구성
다음 절차에서는 Red Hat Single Sign-On을 권한 부여 서버로 배포하고 AMQ Streams와 통합하도록 구성하는 방법을 설명합니다.
권한 부여 서버는 인증 및 권한 부여를 위한 중앙 지점과 사용자, 클라이언트 및 권한 관리를 제공합니다. Red Hat Single Sign-On에는 영역이 별도의 사용자, 클라이언트, 권한 및 기타 구성을 나타내는 영역에 대한 개념이 있습니다. 기본 마스터 영역을 사용하거나 새 마스터 영역을 만들 수 있습니다. 각 영역은 자체 OAuth 2.0 엔드포인트를 노출합니다. 즉, 애플리케이션 클라이언트 및 애플리케이션 서버는 모두 동일한 영역을 사용해야 합니다.
AMQ Streams에서 OAuth 2.0을 사용하려면 Red Hat Single Sign-On 배포를 사용하여 인증 영역을 생성하고 관리합니다.
Red Hat Single Sign-On을 이미 배포한 경우 배포 단계를 건너뛰고 현재 배포를 사용할 수 있습니다.
사전 준비 사항
Red Hat Single Sign-On 사용에 대해 잘 알고 있어야 합니다.
설치 및 관리 지침은 다음을 참조하십시오.
사전 요구 사항
- AMQ Streams 및 Kafka가 실행 중입니다.
Red Hat Single Sign-On 배포의 경우:
절차
Red Hat Single Sign-On 설치.
ZIP 파일에서 또는 RPM을 사용하여 설치할 수 있습니다.
Red Hat Single Sign-On 관리 콘솔에 로그인하여 AMQ Streams에 대한 OAuth 2.0 정책을 생성합니다.
Red Hat Single Sign-On을 배포할 때 로그인 세부 정보가 제공됩니다.
영역을 생성하고 활성화합니다.
기존 마스터 영역을 사용할 수 있습니다.
- 필요한 경우 영역의 세션 및 토큰 타임아웃을 조정합니다.
-
kafka-broker
라는 클라이언트를 생성합니다. 설정 탭에서 다음을 설정합니다.
-
기밀성에 대한 액세스 유형
-
이 클라이언트에 대한 웹 로그인을 비활성화하려면 Standard Flow Enabled to
OFF
-
이 클라이언트가 자체 이름으로 인증할 수 있도록 서비스 계정이
ON
에 활성화됨
-
기밀성에 대한 액세스 유형
- 계속하기 전에 저장 을 클릭합니다.
- 탭에서 AMQ Streams Kafka 클러스터 구성에서 사용할 시크릿을 기록합니다.
Kafka 브로커에 연결할 모든 애플리케이션 클라이언트에 대해 클라이언트 생성 단계를 반복합니다.
각 새 클라이언트에 대한 정의를 생성합니다.
구성의 클라이언트 ID로 이름을 사용합니다.
다음에 수행할 작업
권한 부여 서버를 배포 및 구성한 후 OAuth 2.0을 사용하도록 Kafka 브로커를 구성합니다.
6.4.9.6.2. Kafka 브로커에 대한 OAuth 2.0 지원 구성
다음 절차에서는 인증 서버를 사용하여 브로커 리스너가 OAuth 2.0 인증을 사용하도록 활성화되도록 Kafka 브로커를 구성하는 방법을 설명합니다.
TLS 리스너 구성을 통해 암호화된 인터페이스를 통해 OAuth 2.0을 사용하는 것이 좋습니다. 일반 리스너는 권장되지 않습니다.
선택한 권한 부여 서버를 지원하는 속성과 구현 중인 권한 부여 유형을 사용하여 Kafka 브로커를 구성합니다.
시작하기 전에
Kafka 브로커 리스너의 구성 및 인증에 대한 자세한 내용은 다음을 참조하십시오.
리스너 구성에 사용되는 속성에 대한 설명은 다음을 참조하십시오.
사전 요구 사항
- AMQ Streams 및 Kafka가 실행 중입니다.
- OAuth 2.0 인증 서버가 배포됨
절차
server.properties
파일에서 Kafka 브로커 리스너 구성을 구성합니다.예를 들어 OAUTHBLOGER 메커니즘을 사용합니다.
sasl.enabled.mechanisms=OAUTHBEARER listeners=CLIENT://0.0.0.0:9092 listener.security.protocol.map=CLIENT:SASL_PLAINTEXT listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER sasl.mechanism.inter.broker.protocol=OAUTHBEARER inter.broker.listener.name=CLIENT listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ; listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
broker 연결 설정을
listener.name.client.oauthbearer.sasl.jaas.config
의 일부로 구성합니다.다음 예제에서는 연결 구성 옵션을 보여줍니다.
예 1: JWKS 끝점 구성을 사용한 로컬 토큰 검증
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.valid.issuer.uri="https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME" \ oauth.jwks.endpoint.uri="https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/certs" \ oauth.jwks.refresh.seconds="300" \ oauth.jwks.refresh.min.pause.seconds="1" \ oauth.jwks.expiry.seconds="360" \ oauth.username.claim="preferred_username" \ oauth.ssl.truststore.location="PATH-TO-TRUSTSTORE-P12-FILE" \ oauth.ssl.truststore.password="TRUSTSTORE-PASSWORD" \ oauth.ssl.truststore.type="PKCS12" ; listener.name.client.oauthbearer.connections.max.reauth.ms=3600000
예 2: OAuth 2.0 인트로스펙션 끝점을 통해 인증 서버에 토큰 검증 제공
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.introspection.endpoint.uri="https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/introspection" \ # ...
필요한 경우 권한 부여 서버에 대한 액세스를 구성합니다.
서비스 메시 와 같은 기술을 사용하여 컨테이너 외부의 보안 채널을 구성하지 않는 한, 프로덕션 환경에 이 단계가 일반적으로 필요합니다.
보안 권한 부여 서버에 연결하기 위한 사용자 정의 신뢰 저장소를 제공합니다. 권한 부여 서버에 액세스하려면 항상 SSL이 필요합니다.
속성을 설정하여 신뢰 저장소를 구성합니다.
예를 들면 다음과 같습니다.
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ # ... oauth.client.id="kafka-broker" \ oauth.client.secret="kafka-broker-secret" \ oauth.ssl.truststore.location="PATH-TO-TRUSTSTORE-P12-FILE" \ oauth.ssl.truststore.password="TRUSTSTORE-PASSWORD" \ oauth.ssl.truststore.type="PKCS12" ;
인증서 호스트 이름이 액세스 URL 호스트 이름과 일치하지 않으면 인증서 호스트 이름 검증을 비활성화할 수 있습니다.
oauth.ssl.endpoint.identification.algorithm=""
이 검사를 통해 권한 부여 서버에 대한 클라이언트 연결이 인증됨을 확인합니다. 프로덕션 환경 외 환경에서 검증을 끄는 것이 좋습니다.
선택한 인증 흐름에 따라 추가 속성을 구성합니다.
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ # ... oauth.token.endpoint.uri="https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/token" \ 1 oauth.custom.claim.check="@.custom == 'custom-value'" \ 2 oauth.scope="SCOPE" \ 3 oauth.check.audience="true" \ 4 oauth.audience="AUDIENCE" \ 5 oauth.valid.issuer.uri="https://https://AUTH-SERVER-ADDRESS/auth/REALM-NAME" \ 6 oauth.client.id="kafka-broker" \ 7 oauth.client.secret="kafka-broker-secret" \ 8 oauth.connect.timeout.seconds=60 \ 9 oauth.read.timeout.seconds=60 \ 10 oauth.http.retries=2 \ 11 oauth.http.retry.pause.millis=300 \ 12 oauth.groups.claim="$.groups" \ 13 oauth.groups.claim.delimiter="," ; 14
- 1
- 권한 부여 서버에 대한 OAuth 2.0 토큰 끝점 URL입니다. 프로덕션의 경우 항상
https://
URL을 사용합니다.KeycloakRBACAuthorizer
를 사용하거나 OAuth 2.0이 활성화된 리스너가 연결 간 통신에 사용되는 경우 필수입니다. - 2
- (선택 사항) 사용자 정의 클레임 확인. 검증 중에 추가 사용자 지정 규칙을 JWT 액세스 토큰에 적용하는 JsonPath 필터 쿼리입니다. 액세스 토큰에 필요한 데이터가 포함되어 있지 않으면 거부됩니다. 인트로스펙션 끝점 방법을 사용하는 경우 사용자 정의 검사가 인트로스펙션 끝점 응답 JSON에 적용됩니다.
- 3
- (선택 사항) 토큰 끝점에 전달되는
범위
매개변수입니다. 범위는broker 간 인증을 위한 액세스 토큰을 가져올 때 사용됩니다. 또한clientId
및시크릿
을 사용하여 PLAIN 클라이언트 인증을 통해 OAuth 2.0에 대한 클라이언트 이름에 사용됩니다. 권한 부여 서버에 따라 토큰과 토큰의 콘텐츠만 가져올 수 있습니다. 리스너의 토큰 검증 규칙에는 영향을 미치지 않습니다. - 4
- (선택 사항) #177 검사. 권한 부여 서버에서
aud
(audience) 클레임을 제공하고 대상 검사를 적용하려면ouath.check.audience
를true
로 설정합니다. 대상 검사에서 의도된 토큰 수신자를 식별합니다. 결과적으로 Kafka 브로커는aud
클레임에clientId
가 없는 토큰을 거부합니다. 기본값은false
입니다. - 5
- (선택 사항) 토큰 끝점에 전달된
대상
매개변수입니다. 구독자 간 인증을 위한 액세스 토큰을 가져올 때 대상 사용자가 사용됩니다.An audience is used when obtaining an access token for inter-broker authentication. 또한clientId
및시크릿
을 사용하여 PLAIN 클라이언트 인증을 통해 OAuth 2.0에 대한 클라이언트 이름에 사용됩니다. 권한 부여 서버에 따라 토큰과 토큰의 콘텐츠만 가져올 수 있습니다. 리스너의 토큰 검증 규칙에는 영향을 미치지 않습니다. - 6
- 유효한 발급자 URI입니다. 이 발행자에서 발행한 액세스 토큰만 허용됩니다. (알림이 필요합니다.)
- 7
- 모든 브로커에 대해 동일한 Kafka 브로커의 구성된 클라이언트 ID입니다.
kafka-broker
로 권한 부여 서버에 등록된 클라이언트 입니다. 인트로스펙션 엔드 포인트가 토큰 검증에 사용되거나KeycloakRBACAuthorizer
가 사용되는 경우에 필요합니다. - 8
- 모든 브로커에 대해 동일한 Kafka 브로커에 대해 구성된 시크릿입니다. 브로커가 권한 부여 서버에 인증해야 하는 경우 클라이언트 시크릿, 액세스 토큰 또는 새로 고침 토큰을 지정해야 합니다.
- 9
- (선택 사항) 권한 부여 서버에 연결할 때 연결 제한 시간(초)입니다. 기본값은 60입니다.
- 10
- (선택 사항) 권한 부여 서버에 연결할 때 읽기 제한 시간(초)입니다. 기본값은 60입니다.
- 11
- 권한 부여 서버에 실패한 HTTP 요청을 재시도하는 최대 횟수입니다. 기본값은 0이며, 즉 재시도를 수행하지 않습니다. 이 옵션을 효과적으로 사용하려면
oauth.connect.timeout.seconds
및oauth.read.timeout.seconds
옵션에 대한 시간 초과 시간을 줄이는 것이 좋습니다. 그러나 재시도하면 현재 작업자 스레드를 다른 요청에서 사용할 수 없게 할 수 있으며 요청이 너무 많으면 Kafka 브로커가 응답하지 않을 수 있습니다. - 12
- 권한 부여 서버에 실패한 HTTP 요청의 다른 재시도를 시도하기 전에 대기하는 시간입니다. 기본적으로 이 시간은 0으로 설정되어 일시 중지가 적용되지 않습니다. 실패한 요청으로 인해 많은 문제가 요청별 네트워크 결함 또는 프록시 문제로 신속하게 해결될 수 있기 때문입니다. 그러나 권한 부여 서버가 과부하 상태에 있거나 트래픽이 높은 경우 이 옵션을 100ms 이상으로 설정하여 서버의 부하를 줄이고 재시도 횟수 가능성을 높일 수 있습니다.
- 13
- JWT 토큰 또는 인트로스펙션 끝점 응답에서 그룹 정보를 추출하는 데 사용되는 JsonPath 쿼리입니다. 기본적으로 설정되지 않습니다. 사용자 정의 작성자가 사용자 그룹을 기반으로 권한 결정을 내리는 데 사용할 수 있습니다.
- 14
- 구분된 단일 문자열로 반환되는 경우 그룹 정보를 구문 분석하는 데 사용되는 구분 기호입니다. 기본값은 ','(comma)입니다.
OAuth 2.0 인증 및 사용 권한 부여 서버 유형에 따라 추가 구성 설정을 추가합니다.
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ # ... oauth.check.issuer=false \ 1 oauth.fallback.username.claim="CLIENT-ID" \ 2 oauth.fallback.username.prefix="CLIENT-ACCOUNT" \ 3 oauth.valid.token.type="bearer" \ 4 oauth.userinfo.endpoint.uri="https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/userinfo" ; 5
- 1
- 권한 부여 서버에서
iss
클레임을 제공하지 않으면 발급자 검사를 수행할 수 없습니다. 이 경우oauth.check.issuer
를false
로 설정하고oauth.valid.issuer.uri
를 지정하지 마십시오. 기본값은true
입니다. - 2
- 권한 부여 서버는 일반 사용자와 클라이언트를 모두 식별하는 단일 속성을 제공하지 않을 수 있습니다. 클라이언트가 자체 이름으로 인증되면 서버에서 클라이언트 ID 를 제공할 수 있습니다. 사용자 이름과 암호를 사용하여 인증할 때 새로 고침 토큰 또는 액세스 토큰을 얻기 위해 서버는 클라이언트 ID 외에도 username 속성을 제공할 수 있습니다. 기본 사용자 ID 속성을 사용할 수 없는 경우 사용할 사용자 이름 클레임(attribute)을 지정하려면 이 대체 옵션을 사용합니다.
- 3
oauth. fallbackback.username.claim
이 적용되는 경우 사용자 이름 클레임 값과 대체 사용자 이름 클레임의 값 간에 이름 충돌을 방지해야 할 수도 있습니다. 생산자라는 클라이언트뿐만 아니라생산자
라는 일반 사용자도 존재하는 상황을 고려하십시오.이 속성을 사용하여 클라이언트의 사용자 ID에 접두사를 추가할 수 있습니다.You can use this property to add a prefix to the user ID of the client.
- 4
- (사용 중인 권한 부여 서버에 따라
oauth.introspection.endpoint.uri
를 사용하는 경우에만 적용 가능) 인트로스펙션 끝점이 토큰 유형 특성을 반환하거나 반환하지 않거나 다른 값을 포함할 수 있습니다. 인트로스펙션 끝점의 응답에 포함해야 하는 유효한 토큰 유형 값을 지정할 수 있습니다. - 5
- (
oauth.introspection.endpoint.uri
사용 시에만 적용 가능) 권한 부여 서버가 인트로스펙션 끝점 응답에서 식별 가능한 정보를 제공하지 않도록 구성하거나 구현할 수 있습니다. 사용자 ID를 가져오려면userinfo
끝점의 URI를 폴백으로 구성할 수 있습니다.oauth.knativeback.username.claim
,oauth.entryback.username.claim
,oauth. fallbackback.username.prefix
설정이userinfo
끝점의 응답에 적용됩니다.
다음에 수행할 작업
6.4.9.6.3. OAuth 2.0을 사용하도록 Kafka Java 클라이언트 구성
Kafka 브로커와의 상호 작용에 OAuth 2.0을 사용하도록 Kafka 생산자 및 소비자 API를 구성합니다. 클라이언트 pom.xml
파일에 콜백 플러그인을 추가한 다음 OAuth 2.0에 대한 클라이언트를 구성합니다.
클라이언트 구성에 다음을 지정합니다.
SASL(Simple Authentication and Security Layer) 보안 프로토콜:
-
TLS 암호화 연결을 통한 인증에 필요한
SASL_SSL
암호화되지 않은 연결을 통한 인증을 위한
SASL_PLAINTEXT
프로덕션에는
SASL_SSL
과 로컬 개발에SASL_PLAINTEXT
를 사용하십시오.SASL_SSL
을 사용하는 경우 추가ssl.truststore
구성이 필요합니다. OAuth 2.0 권한 부여 서버에 대한 보안 연결(http://
)에는 truststore 구성이 필요합니다. OAuth 2.0 권한 부여 서버를 확인하려면 권한 부여 서버의 CA 인증서를 클라이언트 구성의 신뢰 저장소에 추가합니다. PEM 또는 PKCS #12 형식으로 신뢰 저장소를 구성할 수 있습니다.
-
TLS 암호화 연결을 통한 인증에 필요한
Kafka SASL 메커니즘:
-
전달자 토큰
을
사용하여 자격 증명 교환을 위한 태그 -
클라이언트 인증 정보(clientId + secret) 또는 액세스 토큰을 전달하는
PLAIN
-
전달자 토큰
SASL 메커니즘을 구현하는 module(Java Authentication and Authorization Service) 모듈:
-
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
-
org.apache.kafka.common.security.plain.PlainLoginModule
은 PLAIN 메커니즘을 구현합니다.
-
다음 인증 방법을 지원하는 SASL 인증 속성:
- OAuth 2.0 클라이언트 자격 증명
- OAuth 2.0 암호 부여(더 이상 사용되지 않음)
- 액세스 토큰
- 토큰 새로 고침
SASL 인증 속성을 10.0.0.1 구성(sasl.jaas.config
)으로 추가합니다. 인증 속성을 구성하는 방법은 OAuth 2.0 권한 부여 서버에 액세스하는 데 사용 중인 인증 방법에 따라 다릅니다. 이 절차에서는 속성 파일에 속성이 지정된 다음 클라이언트 구성으로 로드됩니다.
인증 속성을 환경 변수로 또는 Java 시스템 속성으로 지정할 수도 있습니다. Java 시스템 속성의 경우 setProperty
를 사용하여 -D
옵션을 사용하여 명령줄에서 설정할 수 있습니다.
사전 요구 사항
- AMQ Streams 및 Kafka가 실행 중입니다.
- OAuth 2.0 인증 서버가 Kafka 브로커에 대한 OAuth 액세스를 위해 배포 및 구성
- Kafka 브로커는 OAuth 2.0에 대해 구성됩니다.
절차
OAuth 2.0 지원이 포함된 클라이언트 라이브러리를 Kafka 클라이언트의
pom.xml
파일에 추가합니다.<dependency> <groupId>io.strimzi</groupId> <artifactId>kafka-oauth-client</artifactId> <version>0.12.0.redhat-00006</version> </dependency>
속성 파일에 다음 구성을 지정하여 클라이언트 속성을 구성합니다.
- 보안 프로토콜
- SASL 메커니즘
사용 중인 방법에 따른 module 및 인증 속성
예를 들어
client.properties
파일에 다음을 추가할 수 있습니다.클라이언트 자격 증명 메커니즘 속성
security.protocol=SASL_SSL 1 sasl.mechanism=OAUTHBEARER 2 ssl.truststore.location=/tmp/truststore.p12 3 ssl.truststore.password=$STOREPASS ssl.truststore.type=PKCS12 sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.token.endpoint.uri="<token_endpoint_url>" \ 4 oauth.client.id="<client_id>" \ 5 oauth.client.secret="<client_secret>" \ 6 oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \ 7 oauth.ssl.truststore.password="$STOREPASS" \ 8 oauth.ssl.truststore.type="PKCS12" \ 9 oauth.scope="<scope>" \ 10 oauth.audience="<audience>" ; 11
- 1
- TLS 암호화 연결을 위한
SASL_SSL
보안 프로토콜. 로컬 개발에만 암호화되지 않은 연결에서는SASL_PLAINTEXT
를 사용하십시오. - 2
- SASL 메커니즘을
OAUTHB learnER 또는
PLAIN
으로 지정합니다. - 3
- Kafka 클러스터에 대한 보안 액세스를 위한 신뢰 저장소 구성입니다.
- 4
- 권한 부여 서버 토큰 끝점의 URI입니다.
- 5
- 클라이언트 ID: 권한 부여 서버에서 클라이언트를 생성할 때 사용되는 이름입니다.
- 6
- 권한 부여 서버에서 클라이언트를 생성할 때 생성된 클라이언트 시크릿입니다.
- 7
- 위치에는 권한 부여 서버의 공개 키 인증서(
truststore.p12
)가 포함됩니다. - 8
- 신뢰 저장소에 액세스하기 위한 암호입니다.
- 9
- truststore 유형입니다.
- 10
- (선택 사항) 토큰 끝점에서 토큰을 요청하는
범위입니다
. 권한 부여 서버에 범위를 지정하기 위해 클라이언트가 필요할 수 있습니다. - 11
- (선택 사항) 토큰 끝점에서 토큰을 요청하는 대상입니다.
권한 부여 서버에 대상을 지정하기 위해 클라이언트가 필요할 수 있습니다.
password grant mechanism properties
security.protocol=SASL_SSL sasl.mechanism=OAUTHBEARER ssl.truststore.location=/tmp/truststore.p12 ssl.truststore.password=$STOREPASS ssl.truststore.type=PKCS12 sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.token.endpoint.uri="<token_endpoint_url>" \ oauth.client.id="<client_id>" \ 1 oauth.client.secret="<client_secret>" \ 2 oauth.password.grant.username="<username>" \ 3 oauth.password.grant.password="<password>" \ 4 oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \ oauth.ssl.truststore.password="$STOREPASS" \ oauth.ssl.truststore.type="PKCS12" \ oauth.scope="<scope>" \ oauth.audience="<audience>" ;
- 1
- 클라이언트 ID: 권한 부여 서버에서 클라이언트를 생성할 때 사용되는 이름입니다.
- 2
- (선택 사항) 권한 부여 서버에서 클라이언트를 생성할 때 생성된 클라이언트 시크릿입니다.
- 3
- 암호 부여 인증에 대한 사용자 이름입니다. OAuth 암호 부여 구성(사용자 이름 및 암호)은 OAuth 2.0 암호 부여 방법을 사용합니다. 암호 부여를 사용하려면 제한된 권한이 있는 권한 부여 서버에서 클라이언트에 대한 사용자 계정을 만듭니다. 계정이 서비스 계정처럼 작동해야 합니다. 인증에 사용자 계정이 필요하지만 새로 고침 토큰을 사용하는 것이 필요한 환경에서 사용합니다.
- 4
- 암호 부여 인증의 암호입니다.참고
SASL PLAIN은 OAuth 2.0 암호 부여 방법을 사용하여 사용자 이름과 암호(암호 부여) 전달을 지원하지 않습니다.
액세스 토큰 속성
security.protocol=SASL_SSL sasl.mechanism=OAUTHBEARER ssl.truststore.location=/tmp/truststore.p12 ssl.truststore.password=$STOREPASS ssl.truststore.type=PKCS12 sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.token.endpoint.uri="<token_endpoint_url>" \ oauth.access.token="<access_token>" ; 1 oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \ oauth.ssl.truststore.password="$STOREPASS" \ oauth.ssl.truststore.type="PKCS12" \
- 1
- Kafka 클라이언트의 장기 액세스 토큰입니다.
토큰 속성 새로 고침
security.protocol=SASL_SSL sasl.mechanism=OAUTHBEARER ssl.truststore.location=/tmp/truststore.p12 ssl.truststore.password=$STOREPASS ssl.truststore.type=PKCS12 sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.token.endpoint.uri="<token_endpoint_url>" \ oauth.client.id="<client_id>" \ 1 oauth.client.secret="<client_secret>" \ 2 oauth.refresh.token="<refresh_token>" ; 3 oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \ oauth.ssl.truststore.password="$STOREPASS" \ oauth.ssl.truststore.type="PKCS12" \
OAUTH 2.0 인증에 대한 클라이언트 속성을 Java 클라이언트 코드로 입력합니다.
클라이언트 속성 입력 표시 예
Properties props = new Properties(); try (FileReader reader = new FileReader("client.properties", StandardCharsets.UTF_8)) { props.load(reader); }
- Kafka 클라이언트가 Kafka 브로커에 액세스할 수 있는지 확인합니다.
6.4.10. OAuth 2.0 토큰 기반 권한 부여 사용
AMQ Streams는 Red Hat Single Sign-On Authorization Services 를 통해 OAuth 2.0 토큰 기반 권한 부여를 지원하므로 보안 정책 및 권한을 중앙에서 관리할 수 있습니다.
Red Hat Single Sign-On에 정의된 보안 정책 및 권한은 Kafka 브로커의 리소스에 대한 액세스 권한을 부여하는 데 사용됩니다. 사용자와 클라이언트는 Kafka 브로커에서 특정 작업을 수행하기 위해 액세스를 허용하는 정책과 대조됩니다.
Kafka는 기본적으로 모든 사용자에게 브로커에 대한 전체 액세스를 허용하고, ACL(액세스 제어 목록)을 기반으로 권한을 구성할 수 있도록 AclAuthorizer
플러그인도 제공합니다.
zookeeper는 사용자 이름을 기반으로 리소스에 대한 액세스를 부여하거나 거부하는 ACL 규칙을 저장합니다. 그러나 Red Hat Single Sign-On을 통한 OAuth 2.0 토큰 기반 권한 부여는 Kafka 브로커에 대한 액세스 제어를 구현하는 방법에 대해 훨씬 더 큰 유연성을 제공합니다. 또한 OAuth 2.0 권한 부여 및 ACL을 사용하도록 Kafka 브로커를 구성할 수 있습니다.
6.4.10.1. OAuth 2.0 인증 메커니즘
AMQ Streams의 OAuth 2.0 권한 부여에서는 Red Hat Single Sign-On 서버 권한 부여 서비스 REST 엔드포인트를 사용하여 특정 사용자에게 정의된 보안 정책을 적용하고 해당 사용자에게 다양한 리소스에 부여된 권한 목록을 제공하여 Red Hat Single Sign-On으로 토큰 기반 인증을 확장합니다. 정책은 역할 및 그룹을 사용하여 사용자와 권한을 일치시킵니다. OAuth 2.0 권한 부여는 Red Hat Single Sign-On 인증 서비스에서 사용자에게 수신된 부여 목록에 따라 권한을 로컬로 적용합니다.
6.4.10.1.1. Kafka 브로커 사용자 정의 인증
Red Hat Single Sign-On 작성자(KeycloakRBACAuthorizer
)는 AMQ Streams를 통해 제공됩니다. Red Hat Single Sign-On에서 제공하는 인증 서비스에 Red Hat Single Sign-On REST 엔드포인트를 사용하려면 Kafka 브로커에 사용자 정의 인증 기관을 구성합니다.
작성자는 필요에 따라 권한 부여 서버에서 부여된 권한 목록을 가져와서 Kafka 브로커에 로컬로 권한 부여를 적용하여 각 클라이언트 요청에 대해 신속하게 승인 결정을 내립니다.
6.4.10.2. OAuth 2.0 권한 부여 지원 구성
다음 절차에서는 Red Hat Single Sign-On 인증 서비스를 사용하여 OAuth 2.0 인증을 사용하도록 Kafka 브로커를 구성하는 방법을 설명합니다.
사전 준비 사항
특정 사용자에 대해 요청하거나 제한하려는 액세스를 고려하십시오. Red Hat Single Sign-On 그룹,역할,클라이언트 및 사용자를 결합하여 Red Hat Single Sign-On에서 액세스를 구성할 수 있습니다.
일반적으로 그룹은 조직의 부서 또는 지리적 위치에 따라 사용자와 일치시키는 데 사용됩니다. 및 역할은 해당 기능에 따라 사용자와 일치시키는 데 사용됩니다.
Red Hat Single Sign-On을 사용하면 사용자와 그룹을 LDAP에 저장할 수 있지만 클라이언트와 역할은 이러한 방식으로 저장할 수 없습니다. 권한 부여 정책 구성 방법을 선택하는 데 사용자 데이터에 대한 스토리지 및 액세스 요소가 될 수 있습니다.
Super 사용자에게 는 Kafka 브로커에 구현된 권한 부여와 관계없이 Kafka 브로커에 대한 제한되지 않은 액세스 권한이 항상 있습니다.
사전 요구 사항
절차
- Red Hat Single Sign-On 관리 콘솔에 액세스하거나 OAuth 2.0 인증을 설정할 때 생성한 Kafka 브로커 클라이언트에 대한 인증 서비스를 활성화하려면 Red Hat Single Sign-On Admin CLI를 사용합니다.
- 권한 부여 서비스를 사용하여 클라이언트에 대한 리소스, 권한 부여 범위, 정책 및 권한을 정의합니다.
- 역할 및 그룹을 할당하여 사용자 및 클라이언트에 권한을 바인딩합니다.
Red Hat Single Sign-On 권한을 사용하도록 Kafka 브로커를 구성합니다.
Kafka
server.properties
구성 파일에 다음을 추가하여 Kafka에서 작성자를 설치합니다.authorizer.class.name=io.strimzi.kafka.oauth.server.authorizer.KeycloakRBACAuthorizer principal.builder.class=io.strimzi.kafka.oauth.server.OAuthKafkaPrincipalBuilder
Kafka 브로커의 구성을 추가하여 권한 부여 서버 및 권한 부여 서비스에 액세스합니다.
여기서는
server.properties
에 대한 추가 속성으로 추가된 구성 예제를 보여주지만 대문자 또는 대문자 이름 지정 규칙을 사용하여 환경 변수로 정의할 수도 있습니다.strimzi.authorization.token.endpoint.uri="https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/token" 1 strimzi.authorization.client.id="kafka" 2
(선택 사항) 특정 Kafka 클러스터에 대한 구성을 추가합니다.
예를 들면 다음과 같습니다.
strimzi.authorization.kafka.cluster.name="kafka-cluster" 1
- 1
- 특정 Kafka 클러스터의 이름입니다. 이름은 권한을 대상으로 하는 데 사용되어 동일한 Red Hat Single Sign-On 영역 내에서 여러 클러스터를 관리할 수 있습니다. 기본값은
kafka-cluster
입니다.
(선택 사항) 간단한 권한 부여를 사용합니다.
예를 들면 다음과 같습니다.
strimzi.authorization.delegate.to.kafka.acl="false" 1
- 1
- Red Hat Single Sign-On 인증 서비스 정책에서 액세스를 거부하는 경우 Kafka
AclAuthorizer
에 권한을 위임합니다. 기본값은false
입니다.
(선택 사항) 권한 부여 서버에 TLS 연결 구성을 추가합니다.
예를 들면 다음과 같습니다.
strimzi.authorization.ssl.truststore.location=<path-to-truststore> 1 strimzi.authorization.ssl.truststore.password=<my-truststore-password> 2 strimzi.authorization.ssl.truststore.type=JKS 3 strimzi.authorization.ssl.secure.random.implementation=SHA1PRNG 4 strimzi.authorization.ssl.endpoint.identification.algorithm=HTTPS 5
(선택 사항) 권한 부여 서버에서 권한 부여 새로 고침을 구성합니다. grants refresh 작업은 활성 토큰을 할당하고 각각에 대한 최신 부여를 요청하는 방식으로 작동합니다.
예를 들면 다음과 같습니다.
strimzi.authorization.grants.refresh.period.seconds="120" 1 strimzi.authorization.grants.refresh.pool.size="10" 2
(선택 사항) 권한 부여 서버와 통신할 때 네트워크 시간 초과를 구성합니다.
예를 들면 다음과 같습니다.
strimzi.authorization.connect.timeout.seconds="60" 1 strimzi.authorization.read.timeout.seconds="60" 2 strimzi.authorization.http.retries="2" 3
- 1
- Red Hat Single Sign-On 토큰 엔드포인트에 연결할 때 몇 초 내에 연결 제한 시간(초)입니다. 기본값은
60
입니다. - 2
- Red Hat Single Sign-On 토큰 엔드포인트에 연결할 때 몇 초 내에 읽기 제한 시간(초)입니다. 기본값은
60
입니다. - 3
- 권한 부여 서버에 실패한 HTTP 요청을 재시도하지 않고 재시도할 최대 횟수입니다. 기본값은
0
입니다. 즉, 재시도가 수행되지 않습니다. 이 옵션을 효과적으로 사용하려면strimzi.authorization.connect.timeout.seconds
및strimzi.authorization.read.timeout.seconds
옵션에 대한 시간 초과 시간을 줄이는 것이 좋습니다. 그러나 재시도하면 현재 작업자 스레드를 다른 요청에서 사용할 수 없게 할 수 있으며 요청이 너무 많으면 Kafka 브로커가 응답하지 않을 수 있습니다.
- Kafka 브로커에 특정 역할을 가진 클라이언트 또는 사용자로 액세스하여 구성된 권한을 확인하고, 필요한 액세스 권한이 있는지 확인하거나, 보유하지 않은 권한이 없는지 확인합니다.
6.4.11. OPA 정책 기반 권한 부여 사용
OPA(Open Policy Agent)는 오픈 소스 정책 엔진입니다. OPA를 AMQ Streams와 통합하여 Kafka 브로커에서 클라이언트 작업을 허용하는 정책 기반 권한 부여 메커니즘으로 작동할 수 있습니다.
클라이언트에서 요청이 생성되면, OPA는 Kafka 액세스에 대해 정의된 정책에 대한 요청을 평가한 다음 요청을 허용하거나 거부합니다.
Red Hat은 OPA 서버를 지원하지 않습니다.
추가 리소스
6.4.11.1. OPA 정책 정의
OPA를 AMQ Streams와 통합하기 전에 정책을 정의하여 세분화된 액세스 제어를 제공하는 방법을 고려하십시오.
Kafka 클러스터, 소비자 그룹 및 항목에 대한 액세스 제어를 정의할 수 있습니다. 예를 들어 생산자 클라이언트에서 특정 브로커 주제로 쓰기 액세스를 허용하는 권한 부여 정책을 정의할 수 있습니다.
이를 위해 정책은 다음을 지정할 수 있습니다.
- 생산자 클라이언트와 연결된 사용자 주체 및 호스트 주소
- 클라이언트에 허용된 작업
-
정책이 적용되는 리소스 유형 (
주제
) 및 리소스 이름
허용 및 거부 결정이 정책에 기록되며 제공된 요청 및 클라이언트 식별 데이터를 기반으로 응답이 제공됩니다.
이 예제에서는 생산자 클라이언트가 주제에 쓸 수 있는 정책을 충족해야 합니다.
6.4.11.2. OPA에 연결
Kafka가 OPA 정책 엔진에 액세스하여 액세스 제어 정책을 쿼리할 수 있도록 하려면 Kafka server.properties
파일에서 사용자 정의 OPA 인증 프로그램 플러그인(kafka-authorizer-opa-VERSION.jar
)을 구성합니다.
클라이언트에서 요청을 수행하면 OPA 정책 엔진은 지정된 URL 주소와 정의된 정책의 이름이어야 하는 REST 엔드포인트를 사용하여 플러그인에서 쿼리합니다.
플러그인은 정책에 대해 확인할 JSON 형식으로 클라이언트 요청(사용자 주체, 작업 및 리소스)의 세부 정보를 제공합니다. 세부 정보에는 클라이언트의 고유 ID가 포함됩니다. 예를 들어 TLS 인증이 사용되는 경우 클라이언트 인증서에서 고유 이름을 사용합니다.
OPA는 데이터를 사용하여 요청을 허용하거나 거부하기 위해 플러그인에 true 또는 false 응답을 제공합니다.
6.4.11.3. OPA 권한 부여 지원 구성
다음 절차에서는 OPA 인증을 사용하도록 Kafka 브로커를 구성하는 방법을 설명합니다.
사전 준비 사항
특정 사용자에 대해 요청하거나 제한하려는 액세스를 고려하십시오. 사용자와 Kafka 리소스를 조합하여 OPA 정책을 정의할 수 있습니다.
LDAP 데이터 소스에서 사용자 정보를 로드하도록 OPA를 설정할 수 있습니다.
Super 사용자에게 는 Kafka 브로커에 구현된 권한 부여와 관계없이 Kafka 브로커에 대한 제한되지 않은 액세스 권한이 항상 있습니다.
사전 요구 사항
- 연결에 OPA 서버를 사용할 수 있어야 합니다.
- Kafka용 OPA 인증 프로그램 플러그인
절차
Kafka 브로커에서 작업을 수행하기 위해 클라이언트 요청을 승인하는 데 필요한 OPA 정책을 작성합니다.
OPA 정책 정의를 참조하십시오.
이제 OPA를 사용하도록 Kafka 브로커를 구성합니다.
Kafka에 대한 OPA 인증 프로그램 플러그인을 설치합니다.
OPA 연결을 참조하십시오.
플러그인 파일이 Kafka 클래스 경로에 포함되어 있는지 확인합니다.
OPA 플러그인을 활성화하려면 Kafka
server.properties
구성 파일에 다음을 추가합니다.authorizer.class.name: com.bisnode.kafka.authorization.OpaAuthorizer
Kafka 브로커가 OPA 정책 엔진 및 정책에 액세스할 수 있도록
server.properties
에 구성을 추가합니다.예를 들면 다음과 같습니다.
opa.authorizer.url=https://OPA-ADDRESS/allow 1 opa.authorizer.allow.on.error=false 2 opa.authorizer.cache.initial.capacity=50000 3 opa.authorizer.cache.maximum.size=50000 4 opa.authorizer.cache.expire.after.seconds=600000 5 super.users=User:alice;User:bob 6
- 1
- (필수) 작성자 플러그인이 쿼리할 정책의 OAuth 2.0 토큰 끝점 URL입니다. 이 예제에서는 정책을
allow
라고 합니다. - 2
- 인증 플러그인이 OPA 정책 엔진에 연결하지 못하는 경우 클라이언트가 기본적으로 허용 또는 거부되는지 여부를 지정하는 플래그입니다.
- 3
- 로컬 캐시의 초기 용량(바이트)입니다. 플러그인이 모든 요청에 대해 OPA 정책 엔진을 쿼리할 필요가 없도록 캐시가 사용됩니다.
- 4
- 로컬 캐시의 최대 용량(바이트)입니다.
- 5
- OPA 정책 엔진에서 다시 로드하여 로컬 캐시가 새로 고쳐지는 시간(밀리초)입니다.
- 6
- Open Policy Agent 정책을 쿼리하지 않고 항상 허용되도록 사용자 보안 주체 목록을 슈퍼 사용자로 취급합니다.
인증 및 권한 부여 옵션에 대한 정보는 Open Policy Agent 웹 사이트를 참조하십시오.
- 및 올바른 권한이 없는 클라이언트를 사용하여 Kafka 브로커에 액세스하여 구성된 권한을 확인합니다.