6.4. Kafka 구성
Kafka는 속성 파일을 사용하여 정적 구성을 저장합니다. 구성 파일에 권장되는 위치는 /opt/kafka/config/server.properties
입니다. kafka
사용자가 구성 파일을 읽을 수 있어야 합니다.
AMQ Streams는 제품의 다양한 기본 및 고급 기능을 강조하는 구성 파일 예제를 제공합니다. 이 파일은 AMQ Streams 설치 디렉터리의 config/server.properties
에서 확인할 수 있습니다.
이 장에서는 가장 중요한 구성 옵션에 대해 설명합니다.
6.4.1. ZooKeeper
Kafka 브로커는 구성의 일부를 저장하고 클러스터를 조정하고 (예: 어떤 노드가 어떤 파티션의 리더인지 결정하는 데) Zoo Cryostat가 필요합니다. Zoo Cryostat 클러스터의 연결 세부 정보는 구성 파일에 저장됩니다. zookeeper.connect
필드에는 콤마로 구분된 호스트 이름 목록과 zookeeper 클러스터 멤버의 포트가 포함되어 있습니다.
예를 들면 다음과 같습니다.
zookeeper.connect=zoo1.my-domain.com:2181,zoo2.my-domain.com:2181,zoo3.my-domain.com:2181
Kafka는 이러한 주소를 사용하여 Zoo Cryostat 클러스터에 연결합니다. 이 구성을 사용하면 모든 Kafka znodes
가 Zoo Cryostat 데이터베이스의 루트에서 직접 생성됩니다. 따라서 이러한 Zoo Cryostat 클러스터는 단일 Kafka 클러스터에만 사용할 수 있습니다. 단일 Zoo Cryostat 클러스터를 사용하도록 여러 Kafka 클러스터를 구성하려면 Kafka 구성 파일의 Zoo Cryostat 연결 문자열 끝에 기본(접두사) 경로를 지정합니다.
zookeeper.connect=zoo1.my-domain.com:2181,zoo2.my-domain.com:2181,zoo3.my-domain.com:2181/my-cluster-1
6.4.2. 리스너
리스너는 Kafka 브로커에 연결하는 데 사용됩니다. 각 Kafka 브로커는 여러 리스너를 사용하도록 구성할 수 있습니다. 각 리스너에는 다른 포트 또는 네트워크 인터페이스에서 수신 대기할 수 있도록 다른 구성이 필요합니다.
리스너를 구성하려면 구성 파일(/opt/kafka/config/server.properties
)에서 listeners
속성을 편집합니다. listeners
속성에 쉼표로 구분된 목록으로 리스너를 추가합니다. 다음과 같이 각 속성을 구성합니다.
<listenerName>://<hostname>:<port>
< hostname
>이 비어 있으면 Kafka에서 java.net.InetAddress.getCanonicalHostName()
클래스를 호스트 이름으로 사용합니다.
여러 리스너 구성 예
listeners=internal-1://:9092,internal-2://:9093,replication://:9094
Kafka 클라이언트가 Kafka 클러스터에 연결하려고 하면 먼저 클러스터 노드 중 하나인 부트스트랩 서버에 연결합니다. 부트스트랩 서버는 클러스터에 있는 모든 브로커 목록을 클라이언트에 제공하고 클라이언트는 각각 개별적으로 연결합니다. 브로커 목록은 구성된 리스너
를 기반으로 합니다.
공개된 리스너
선택적으로 advertise .listeners
속성을 사용하여 클라이언트에 listeners
속성에 지정된 것과 다른 리스너 주소 집합을 제공할 수 있습니다. 프록시와 같은 추가 네트워크 인프라가 클라이언트와 브로커 간이거나 IP 주소 대신 외부 DNS 이름을 사용하는 경우 유용합니다.
advertise .listeners
속성은 listeners
속성과 동일한 방식으로 포맷됩니다.
광고된 리스너에 대한 구성 예
listeners=internal-1://:9092,internal-2://:9093 advertised.listeners=internal-1://my-broker-1.my-domain.com:1234,internal-2://my-broker-1.my-domain.com:1235
공개된 리스너의 이름은 listeners
속성에 나열된 리스너와 일치해야 합니다.
broker 리스너
broker 리스너 는 Kafka 브로커 간 통신에 사용됩니다. 다음의 경우broker 간 통신이 필요합니다.
- 서로 다른 브로커 간 워크로드 조정
- 다른 브로커에 저장된 파티션 간 메시지 복제
- 파티션 리더십 변경과 같은 컨트롤러의 관리 작업 처리
broker 리스너는 선택한 포트에 할당할 수 있습니다. 여러 리스너가 구성된 경우 inter.broker.listener .name 속성에서broker 리스너의 이름을
정의할 수 있습니다.
여기에서는broker 리스너의 이름이 REPLICATION
으로 지정됩니다.
listeners=REPLICATION://0.0.0.0:9091 inter.broker.listener.name=REPLICATION
컨트롤 플레인 리스너
기본적으로 컨트롤러와 다른 브로커 간의 통신은 브루커 리스너를 사용합니다. 컨트롤러는 파티션 리더십 변경과 같은 관리 작업을 조정합니다.
컨트롤러 연결에 대한 전용 컨트롤 플레인 리스너 를 활성화할 수 있습니다. 선택한 포트에 컨트롤 플레인 리스너를 할당할 수 있습니다.
컨트롤 플레인 리스너를 활성화하려면 리스너 이름으로 control.plane.listener.name
속성을 구성합니다.
listeners=CONTROLLER://0.0.0.0:9090,REPLICATION://0.0.0.0:9091 ... control.plane.listener.name=CONTROLLER
컨트롤러 통신은 브로커 간 데이터 복제에 의해 지연되지 않으므로 컨트롤 플레인 리스너를 활성화하면 클러스터 성능이 향상될 수 있습니다. 데이터 복제는 inter-broker 리스너를 통해 계속됩니다.
control.plane.listener
가 구성되지 않은 경우 컨트롤러 연결은 inter-broker 리스너 를 사용합니다.
6.4.3. 커밋 로그
Apache Kafka는 커밋 로그에서 생산자에서 수신하는 모든 레코드를 저장합니다. 커밋 로그에는 Kafka가 제공해야 하는 레코드 형식의 실제 데이터가 포함됩니다. 이는 브로커가 수행하는 작업을 기록하는 애플리케이션 로그 파일이 아닙니다.
로그 디렉터리
log.dirs
속성 파일을 사용하여 로그 디렉터리를 구성하여 하나 이상의 로그 디렉터리에 커밋 로그를 저장할 수 있습니다. 설치 중에 생성된 /var/lib/kafka
디렉터리로 설정해야 합니다.
log.dirs=/var/lib/kafka
성능상의 이유로 log.dirs를 여러 디렉터리로 구성하고 각각 물리적 장치를 다른 물리적 장치에 배치하여 디스크 I/O 성능을 향상시킬 수 있습니다. 예를 들면 다음과 같습니다.
log.dirs=/var/lib/kafka1,/var/lib/kafka2,/var/lib/kafka3
6.4.4. 브로커 ID
broker ID는 클러스터의 각 브로커의 고유 식별자입니다. 브로커 ID로 0보다 크거나 같은 정수를 할당할 수 있습니다. 브로커 ID는 재시작 또는 충돌 후 브로커를 식별하는 데 사용되며, 따라서 ID가 안정적이며 시간이 지남에 따라 변경되지 않는 것이 중요합니다. 브로커 ID는 브로커 속성 파일에 구성되어 있습니다.
broker.id=1
6.4.5. Zookeeper 인증
기본적으로 Zoo Cryostat와 Kafka 간 연결은 인증되지 않습니다. 그러나 Kafka 및 Zoo Cryostat는 SAS(Simple Authentication and Security Layer)를 사용하여 인증을 설정하는 데 사용할 수 있는 JAAS(Java Authentication and Authorization Service)를 지원합니다. Zookeeper는 로컬에 저장된 인증 정보가 있는 vmwareGEST-MD5 SASL 메커니즘을 사용한 인증을 지원합니다.
6.4.5.1. JAAS 구성
Zoo Cryostat 연결에 대한 SASL 인증은 JAAS 구성 파일에서 구성해야 합니다. 기본적으로 Kafka는 Zoo Cryostat에 연결하기 위해 Client
라는 JAAS 컨텍스트를 사용합니다. 클라이언트
컨텍스트는 /opt/kafka/config/jass.conf
파일에서 구성해야 합니다. 컨텍스트는 다음 예와 같이 PLAIN
SASL 인증을 활성화해야 합니다.
Client { org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="123456"; };
6.4.5.2. Zoo Cryostat 인증 활성화
다음 절차에서는 Zoo Cryostat에 연결할 때 SASL hieradataGEST-MD5 메커니즘을 사용하여 인증을 활성화하는 방법을 설명합니다.
사전 요구 사항
- Zoo Cryostat에서 클라이언트-서버 간 인증이 활성화됨
SASL>-<GEST-MD5 인증 활성화
모든 Kafka 브로커 노드에서
/opt/kafka/config/jaas.conf
JAAS 구성 파일을 생성하거나 편집하고 다음 컨텍스트를 추가합니다.Client { org.apache.kafka.common.security.plain.PlainLoginModule required username="<Username>" password="<Password>"; };
사용자 이름과 암호는 Zoo Cryostat에 구성된 것과 동일해야 합니다.
다음 예제에서는
클라이언트
컨텍스트를 보여줍니다.Client { org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="123456"; };
모든 Kafka 브로커 노드를 하나씩 다시 시작합니다. JAAS 구성을 Kafka 브로커에 전달하려면
KAFKA_OPTS
환경 변수를 사용합니다.su - kafka export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/jaas.conf"; /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
멀티 노드 클러스터에서 브로커를 다시 시작하는 방법에 대한 자세한 내용은 4.3절. “Kafka 브로커의 정상 롤링 재시작 수행” 을 참조하십시오.
추가 리소스
6.4.6. 권한 부여
Kafka 브로커의 권한 부여는 승인자 플러그인을 사용하여 구현됩니다.
이 섹션에서는 Kafka와 함께 제공된 AclAuthorizer
플러그인을 사용하는 방법을 설명합니다.
또는 고유한 권한 부여 플러그인을 사용할 수 있습니다. 예를 들어 OAuth 2.0 토큰 기반 인증을 사용하는 경우 OAuth 2.0 인증을 사용할 수 있습니다.
6.4.6.1. 간단한 ACL 승인자
AclAuthorizer
를 포함한 승인자 플러그인은 authorizer.class.name
속성을 통해 활성화됩니다.
authorizer.class.name=kafka.security.auth.AclAuthorizer
선택한 승인자에게 정규화된 이름이 필요합니다. AclAuthorizer
의 경우 정규화된 이름은 kafka.security.auth.AclAuthorizer
입니다.
6.4.6.1.1. ACL 규칙
AclAuthorizer
는 ACL 규칙을 사용하여 Kafka 브로커에 대한 액세스를 관리합니다.
ACL 규칙은 형식으로 정의됩니다.
호스트 H에서 Kafka 리소스 R 에서 주체 P 가 허용 / 거부된 작업 O
예를 들어, 사용자를 위해 규칙을 설정할 수 있습니다.
존은 host 127.0.0.1의 주제 주석 을 볼 수 있습니다.
host는 존스가 연결 중인 시스템의 IP 주소입니다.
대부분의 경우 사용자는 생산자 또는 소비자 애플리케이션입니다.
Consumer01 은 호스트 127.0.0.1에서 소비자 그룹 계정에 쓸 수 있습니다.
ACL 규칙이 없는 경우
지정된 리소스에 ACL 규칙이 없으면 모든 작업이 거부됩니다. Kafka 구성 파일 /opt/kafka/config/server.properties
에서 allow.everyone.if.no.acl.found
속성을 true
로 설정하여 이 동작을 변경할 수 있습니다.
6.4.6.1.2. 보안 주체
보안 주체 는 사용자의 ID를 나타냅니다. ID 형식은 클라이언트가 Kafka에 연결하는 데 사용하는 인증 메커니즘에 따라 달라집니다.
-
인증 없이 연결할 때
사용자:ANONYMOUS
. user:<username
>은 PLAIN 또는 SCRAM과 같은 간단한 인증 메커니즘을 사용하여 연결된 경우입니다.예:
User:admin
또는User:user1
.TLS 클라이언트 인증을 사용하여 연결된 경우 user
:<DistinguishedName
>입니다.예를 들면
User:CN=user1,O=MyCompany,L=Prague,C=CZ
입니다.-
User:<Kerberos username&
gt; when connected using Kerberos.
DistinguishedName 은 클라이언트 인증서와 고유 이름입니다.
Kerberos 사용자 이름은 Kerberos 주체의 기본 부분이며 Kerberos를 사용하여 연결할 때 기본적으로 사용됩니다. sasl.kerberos.principal.to.local.rules
속성을 사용하여 Kerberos 주체에서 Kafka 주체를 빌드하는 방법을 구성할 수 있습니다.
6.4.6.1.3. 사용자 인증
승인을 사용하려면 클라이언트가 인증을 활성화하고 사용해야 합니다. 그렇지 않으면 모든 연결에 주체 User:ANONYMOUS
가 있습니다.
인증 방법에 대한 자세한 내용은 암호화 및 인증을 참조하십시오.
6.4.6.1.4. 슈퍼유저
슈퍼유저는 ACL 규칙에 관계없이 모든 작업을 수행할 수 있습니다.
슈퍼 사용자는 super.users
속성을 사용하여 Kafka 구성 파일에 정의되어 있습니다.
예를 들면 다음과 같습니다.
super.users=User:admin,User:operator
6.4.6.1.5. 복제본 브로커 인증
권한 부여가 활성화되면 모든 리스너 및 모든 연결에 적용됩니다. 여기에는 브로커 간 데이터 복제에 사용되는 inter-broker 연결이 포함됩니다. 따라서 권한 부여를 활성화하는 경우 브랜드 간 연결에 인증을 사용하고 브로커에서 사용하는 사용자에게 충분한 권한을 부여해야 합니다. 예를 들어 브로커 간 인증이 kafka-broker
사용자를 사용하는 경우 슈퍼 사용자 구성에 사용자 이름 super.users=User:kafka-broker
가 포함되어야 합니다.
6.4.6.1.6. 지원되는 리소스
이러한 유형의 리소스에 Kafka ACL을 적용할 수 있습니다.
- 주제
- 소비자 그룹
- 클러스터
- TransactionId
- DelegationToken
6.4.6.1.7. 지원되는 작업
AclAuthorizer
는 리소스에 대한 작업을 인증합니다.
다음 표에 X
가 있는 필드는 각 리소스에 대해 지원되는 작업을 나타냅니다.
주제 | 소비자 그룹 | Cluster | |
---|---|---|---|
읽기 | X | X | |
쓰기 | X | ||
개발 | X | ||
delete | X | ||
변경 | X | ||
describe | X | X | X |
ClusterAction | X | ||
All | X | X | X |
6.4.6.1.8. ACL 관리 옵션
ACL 규칙은 Kafka 배포 패키지의 일부로 제공되는 bin/kafka-acls.sh
유틸리티를 사용하여 관리됩니다.
kafka-acls.sh
매개변수 옵션을 사용하여 ACL 규칙을 추가, 나열 및 제거하고 기타 기능을 수행합니다.
매개 변수에는 --add 와 같은 이중-하이프 규칙(예: --add
)이 필요합니다.
옵션 | 유형 | 설명 | Default |
---|---|---|---|
| 동작 | ACL 규칙을 추가합니다. | |
| 동작 | ACL 규칙을 제거합니다. | |
| 동작 | ACL 규칙을 나열합니다. | |
| 동작 | 승인자의 정규화된 클래스 이름입니다. |
|
| 설정 | 초기화를 위해 승인자에게 전달된 키/값 쌍입니다.
| |
| 리소스 | Kafka 클러스터에 연결할 호스트/포트 쌍입니다. |
이 옵션 또는 |
| 리소스 |
| |
| 리소스 | 클러스터를 ACL 리소스로 지정합니다. | |
| 리소스 | 주제 이름을 ACL 리소스로 지정합니다.
와일드카드로 사용되는 별표(
단일 명령은 여러 | |
| 리소스 | 소비자 그룹 이름을 ACL 리소스로 지정합니다.
단일 명령은 여러 | |
| 리소스 | 트랜잭션 ID를 ACL 리소스로 지정합니다. 트랜잭션 전달은 생산자가 여러 파티션으로 전송하거나 해당 파티션을 성공적으로 전달하지 않아야 함을 의미합니다.
와일드카드로 사용되는 별표( | |
| 리소스 | 위임 토큰을 ACL 리소스로 지정합니다.
와일드카드로 사용되는 별표( | |
| 설정 |
리소스 이름에 대한 리소스 패턴 유형으로
리소스 패턴 필터 값 또는 특정 패턴 유형 필터로 일부 또는 |
|
| 주체 | 주체가 허용 ACL 규칙에 추가되었습니다.
단일 명령은 | |
| 주체 | 거부 ACL 규칙에 보안 주체가 추가되었습니다.
단일 명령은 | |
| 주체 |
principal에 대한 ACL
단일 명령은 | |
| 호스트 |
호스트 이름 또는 CIDR 범위는 지원되지 않습니다. |
|
| 호스트 |
호스트 이름 또는 CIDR 범위는 지원되지 않습니다. |
|
| 작업 | 작업을 허용하거나 거부합니다.
단일 명령은 단일 명령에서 multipleMultiple | All |
| 바로 가기 | 메시지 프로듀서(WRITE 및 DESCRIBE, 클러스터의 CREATE)에 필요한 모든 작업을 허용하거나 거부하는 바로 가기입니다. | |
| 바로 가기 | 메시지 소비자(READ 및 DESCRIBE on topic, READ on consumer group)에 필요한 모든 작업을 허용하거나 거부하는 바로 가기입니다. | |
| 바로 가기 |
Idepmotence는 생산자가 특정 트랜잭션 ID를 기반으로 메시지를 보낼 수 있는 권한이 부여된 경우 자동으로 활성화됩니다. | |
| 바로 가기 | 모든 쿼리를 수락하고 묻지 않는 바로 가기입니다. |
6.4.6.2. 권한 부여 활성화
다음 절차에서는 Kafka 브로커에서 승인을 위해 AclAuthorizer
플러그인을 활성화하는 방법을 설명합니다.
프로세스
AclAuthorizer
를 사용하도록/opt/kafka/config/server.properties
Kafka 구성 파일을 편집합니다.authorizer.class.name=kafka.security.auth.AclAuthorizer
- Kafka 브로커를 시작합니다.
6.4.6.3. ACL 규칙 추가
AclAuthorizer
플러그인을 사용하여 ACL(Access Control Lists)을 기반으로 Kafka 브로커에 대한 액세스를 제어하는 경우 kafka-acls.sh
유틸리티를 사용하여 새 ACL 규칙을 추가할 수 있습니다.
사전 요구 사항
- 사용자가 생성되어 Kafka 리소스에 액세스할 수 있는 적절한 권한이 부여되었습니다.
- AMQ Streams는 Kafka 브로커로 사용되는 모든 호스트에 설치됩니다.
- Kafka 브로커에서 권한 부여가 활성화됩니다.
프로세스
--add
옵션을 사용하여kafka-acls.sh
를 실행합니다.예:
MyConsumerGroup
소비자 그룹을 사용하여user1
및user2
액세스가myTopic
에서 읽을 수 있도록 허용합니다.opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --operation Read --topic myTopic --allow-principal User:user1 --allow-principal User:user2 opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --operation Describe --topic myTopic --allow-principal User:user1 --allow-principal User:user2 opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --operation Read --operation Describe --group MyConsumerGroup --allow-principal User:user1 --allow-principal User:user2
user1
액세스를 거부하여 IP 주소127.0.0.1
에서myTopic
을 읽습니다.opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --operation Describe --operation Read --topic myTopic --group MyConsumerGroup --deny-principal User:user1 --deny-host 127.0.0.1
MyConsumerGroup
을 사용하여myTopic
의 소비자로user1
을 추가합니다.opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --consumer --topic myTopic --group MyConsumerGroup --allow-principal User:user1
추가 리소스
6.4.6.4. ACL 규칙 나열
AclAuthorizer
플러그인을 사용하여 ACL(Access Control Lists)을 기반으로 Kafka 브로커에 대한 액세스를 제어하는 경우 kafka-acls.sh
유틸리티를 사용하여 기존 ACL 규칙을 나열할 수 있습니다.
사전 요구 사항
프로세스
--list
옵션을 사용하여kafka-acls.sh
를 실행합니다.예를 들면 다음과 같습니다.
opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --list --topic myTopic Current ACLs for resource `Topic:myTopic`: User:user1 has Allow permission for operations: Read from hosts: * User:user2 has Allow permission for operations: Read from hosts: * User:user2 has Deny permission for operations: Read from hosts: 127.0.0.1 User:user1 has Allow permission for operations: Describe from hosts: * User:user2 has Allow permission for operations: Describe from hosts: * User:user2 has Deny permission for operations: Describe from hosts: 127.0.0.1
추가 리소스
6.4.6.5. ACL 규칙 제거
AclAuthorizer
플러그인을 사용하여 ACL(Access Control Lists)을 기반으로 Kafka 브로커에 대한 액세스를 제어하는 경우 kafka-acls.sh
유틸리티를 사용하여 기존 ACL 규칙을 제거할 수 있습니다.
사전 요구 사항
프로세스
--remove
옵션을 사용하여kafka-acls.sh
를 실행합니다.예:
MyConsumerGroup
소비자 그룹을 사용하여myTopic
에서 Allowuser1
및user2
액세스를 읽을 수 있도록 ACL을 제거합니다.opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --operation Read --topic myTopic --allow-principal User:user1 --allow-principal User:user2 opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --operation Describe --topic myTopic --allow-principal User:user1 --allow-principal User:user2 opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --operation Read --operation Describe --group MyConsumerGroup --allow-principal User:user1 --allow-principal User:user2
MyConsumerGroup
을 사용하여myTopic
의 소비자로user1
을 추가하는 ACL을 제거합니다.opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --consumer --topic myTopic --group MyConsumerGroup --allow-principal User:user1
IP 주소 호스트
127.0.0.1
에서myTopic
읽기를 위해user1
액세스를 거부하는 ACL을 제거합니다.opt/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --operation Describe --operation Read --topic myTopic --group MyConsumerGroup --deny-principal User:user1 --deny-host 127.0.0.1
추가 리소스
6.4.7. Zookeeper 권한 부여
Kafka와 Zoo Cryostat 간에 인증이 활성화되면 Zoo Cryostat Access Control List(ACL) 규칙을 사용하여 Zoo Cryostat에 저장된 Kafka의 메타데이터에 대한 액세스를 자동으로 제어할 수 있습니다.
6.4.7.1. ACL 구성
Zoo Cryostat ACL 규칙 적용은 config/server
속성에 의해 제어됩니다.
.properties
Kafka 구성 파일의 Zookeeper.set.acl
속성은 기본적으로 비활성화되어 있으며 true
로 설정하여 활성화됩니다.
zookeeper.set.acl=true
ACL 규칙이 활성화되면 znode
가 Zoo Cryostat에서 생성되는 경우 이를 생성한 Kafka 사용자만 수정하거나 삭제할 수 있습니다. 다른 모든 사용자는 읽기 전용 액세스 권한을 갖습니다.
Kafka는 새로 생성된 Zoo Cryostat znodes
에만 ACL 규칙을 설정합니다. ACL이 클러스터를 처음 시작한 후에만 활성화된 경우 zookeeper-security-migration.sh
툴에서 모든 기존 znodes
에 ACL을 설정할 수 있습니다.
Zoo Cryostat의 데이터 기밀성
Zoo Cryostat에 저장된 데이터는 다음과 같습니다.
- 주제 이름 및 구성
- SASL SCRAM 인증을 사용할 때 Salted 및 hashed 사용자 자격 증명입니다.
그러나 Zoo Cryostat는 Kafka를 사용하여 전송 및 수신한 레코드를 저장하지 않습니다. Zoo Cryostat에 저장된 데이터는 기밀로 간주됩니다.
데이터가 기밀로 간주되는 경우(예: 주제 이름에 고객 ID가 포함되어 있기 때문에) 보호에 사용할 수 있는 유일한 옵션은 네트워크 수준에서 Zoo Cryostat를 분리하고 Kafka 브로커에만 액세스를 허용하는 것입니다.
6.4.7.2. 새 Kafka 클러스터에 대해 Zoo Cryostat ACL 활성화
다음 절차에서는 새 Kafka 클러스터의 Kafka 구성에서 Zoo Cryostat ACL을 활성화하는 방법을 설명합니다. Kafka 클러스터를 처음 시작하기 전에 이 절차를 사용하십시오. 이미 실행 중인 클러스터에서 Zoo Cryostat ACL을 활성화하려면 6.4.7.3절. “기존 Kafka 클러스터에서 Zoo Cryostat ACL 활성화” 을 참조하십시오.
사전 요구 사항
- AMQ Streams는 Kafka 브로커로 사용할 모든 호스트에 설치됩니다.
- Zookeeper 클러스터가 구성되어 실행 중입니다.
- 클라이언트-서버 간 인증은 Zoo Cryostat에서 활성화됩니다.
- Zookeeper 인증은 Kafka 브로커에서 사용할 수 있습니다.
- Kafka 브로커는 아직 시작되지 않았습니다.
프로세스
/opt/kafka/config/server.properties
Kafka 구성 파일을 편집하여 모든 클러스터 노드에서zookeeper.set.acl
필드를true
로 설정합니다.zookeeper.set.acl=true
- Kafka 브로커를 시작합니다.
6.4.7.3. 기존 Kafka 클러스터에서 Zoo Cryostat ACL 활성화
다음 절차에서는 실행 중인 Kafka 클러스터의 Kafka 구성에서 Zoo Cryostat ACL을 활성화하는 방법을 설명합니다. Zookeeper -security-migration.sh
도구를 사용하여 기존 znodes
에 Zoo Cryostat ACL을 설정합니다. Zookeeper -security-migration.sh
는 AMQ Streams의 일부로 사용할 수 있으며 bin
디렉토리에서 찾을 수 있습니다.
사전 요구 사항
- Kafka 클러스터가 구성되어 실행 중입니다.
Zoo Cryostat ACL 활성화
/opt/kafka/config/server.properties
Kafka 구성 파일을 편집하여 모든 클러스터 노드에서zookeeper.set.acl
필드를true
로 설정합니다.zookeeper.set.acl=true
모든 Kafka 브로커를 하나씩 다시 시작합니다.
멀티 노드 클러스터에서 브로커를 다시 시작하는 방법에 대한 자세한 내용은 4.3절. “Kafka 브로커의 정상 롤링 재시작 수행” 을 참조하십시오.
Zookeeper
-security-migration.sh 도구를 사용하여 기존의 모든 Zoo Cryostat
znodes
에서 ACL을 설정합니다.su - kafka cd /opt/kafka KAFKA_OPTS="-Djava.security.auth.login.config=./config/jaas.conf"; ./bin/zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connect=<ZooKeeperURL> exit
예를 들면 다음과 같습니다.
su - kafka cd /opt/kafka KAFKA_OPTS="-Djava.security.auth.login.config=./config/jaas.conf"; ./bin/zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connect=zoo1.my-domain.com:2181 exit
6.4.8. 암호화 및 인증
AMQ Streams는 리스너 구성의 일부로 구성된 암호화 및 인증을 지원합니다.
6.4.8.1. 리스너 구성
Kafka 브로커의 암호화 및 인증은 리스너별로 구성됩니다. Kafka 리스너 구성에 대한 자세한 내용은 6.4.2절. “리스너” 을 참조하십시오.
Kafka 브로커의 각 리스너는 자체 보안 프로토콜로 구성됩니다. 구성 속성 listener.security.protocol.map
은 보안 프로토콜을 사용하는 리스너를 정의합니다. 각 리스너 이름을 보안 프로토콜에 매핑합니다. 지원되는 보안 프로토콜은 다음과 같습니다.
일반 텍스트
- 암호화 또는 인증이 없는 리스너입니다.
SSL
- TLS 암호화를 사용하는 리스너 및 필요한 경우 TLS 클라이언트 인증서를 사용한 인증입니다.
SASL_PLAINTEXT
- 암호화가 없지만 SASL 기반 인증을 사용하는 리스너입니다.
SASL_SSL
- TLS 기반 암호화 및 SASL 기반 인증을 사용하는 리스너입니다.
다음 리스너 구성이
제공됩니다.
listeners=INT1://:9092,INT2://:9093,REPLICATION://:9094
listener.security.protocol.map
은 다음과 같을 수 있습니다.
listener.security.protocol.map=INT1:SASL_PLAINTEXT,INT2:SASL_SSL,REPLICATION:SSL
이렇게 하면 SASL 인증과 함께 암호화되지 않은 연결을 사용하고, 리스너 INT2
가 SASL 인증과 함께 암호화된 연결을 사용하고 REPLICATION
인터페이스를 사용하여 TLS 암호화(TLS 클라이언트 인증과 함께)를 사용하도록 리스너 INT1
을 구성합니다. 동일한 보안 프로토콜을 여러 번 사용할 수 있습니다. 다음 예도 유효한 구성입니다.
listener.security.protocol.map=INT1:SSL,INT2:SSL,REPLICATION:SSL
이러한 구성은 모든 인터페이스에 TLS 암호화 및 TLS 인증을 사용합니다. 다음 장에서는 TLS 및 SASL 구성 방법을 자세히 설명합니다.
6.4.8.2. TLS 암호화
Kafka는 Kafka 클라이언트와의 통신을 암호화하기 위해 TLS를 지원합니다.
TLS 암호화 및 서버 인증을 사용하려면 개인 키와 공개 키가 포함된 키 저장소를 제공해야 합니다. 일반적으로 JKS(Java Keystore) 형식으로 파일을 사용하여 수행합니다. 이 파일의 경로는 ssl.keystore.location
속성에 설정됩니다. 키 저장소를 보호하는 암호를 설정하는 데 ssl.keystore.password
속성을 사용해야 합니다. 예를 들면 다음과 같습니다.
ssl.keystore.location=/path/to/keystore/server-1.jks ssl.keystore.password=123456
경우에 따라 개인 키를 보호하는 데 추가 암호가 사용됩니다. 이러한 암호는 ssl.key.password
속성을 사용하여 설정할 수 있습니다.
Kafka는 인증 기관에서 서명한 키와 자체 서명 키를 사용할 수 있습니다. 인증 기관에서 서명한 키를 사용하는 것이 항상 선호되는 방법이어야 합니다. 클라이언트가 연결 중인 Kafka 브로커의 ID를 확인할 수 있도록 하려면 인증서에서 항상 공개된 호스트 이름을 CN(일반 이름) 또는 SAN(주체 대체 이름)으로 포함해야 합니다.
다양한 리스너에 다양한 SSL 구성을 사용할 수 있습니다. ssl으로 시작하는 모든 옵션 앞에
여기서 리스너 이름은 항상 소문자여야 합니다. 이렇게 하면 해당 특정 리스너에 대한 기본 SSL 구성이 재정의됩니다. 다음 예제에서는 다양한 리스너에 다양한 SSL 구성을 사용하는 방법을 보여줍니다.
listener.
name.<NameOfTheListener> 접두사를 지정할 수 있습니다.
listeners=INT1://:9092,INT2://:9093,REPLICATION://:9094 listener.security.protocol.map=INT1:SSL,INT2:SSL,REPLICATION:SSL # Default configuration - will be used for listeners INT1 and INT2 ssl.keystore.location=/path/to/keystore/server-1.jks ssl.keystore.password=123456 # Different configuration for listener REPLICATION listener.name.replication.ssl.keystore.location=/path/to/keystore/server-1.jks listener.name.replication.ssl.keystore.password=123456
추가 TLS 구성 옵션
위에서 설명한 기본 TLS 구성 옵션 외에도 Kafka는 TLS 구성을 미세 조정하기 위한 다양한 옵션을 지원합니다. 예를 들어 TLS / SSL 프로토콜 또는 암호화 제품군을 활성화하거나 비활성화하려면 다음을 수행합니다.
ssl.cipher.suites
- 활성화된 암호화 제품군 목록입니다. 각 암호화 제품군은 TLS 연결에 사용되는 인증, 암호화, MAC 및 키 교환 알고리즘의 조합입니다. 기본적으로 사용 가능한 모든 암호화 제품군이 활성화됩니다.
ssl.enabled.protocols
-
활성화된 TLS/SSL 프로토콜 목록입니다. 기본값은
TLSv1.2,TLSv1.1,TLSv1
입니다.
6.4.8.3. TLS 암호화 활성화
다음 절차에서는 Kafka 브로커에서 암호화를 활성화하는 방법을 설명합니다.
사전 요구 사항
- AMQ Streams는 Kafka 브로커로 사용할 모든 호스트에 설치됩니다.
프로세스
- 클러스터의 모든 Kafka 브로커에 대한 TLS 인증서를 생성합니다. 인증서에는 일반 이름 또는 주체 대체 이름에 광고 및 부트스트랩 주소가 있어야 합니다.
다음을 위해 모든 클러스터 노드에서
/opt/kafka/config/server.properties
Kafka 구성 파일을 편집합니다.-
listener.security.protocol.map
필드를 변경하여 TLS 암호화를 사용하려는 리스너의SSL
프로토콜을 지정합니다. -
브로커 인증서와 함께 JKS 키 저장소의 경로로
ssl.keystore.location
옵션을 설정합니다. ssl.keystore.password
옵션을 키 저장소를 보호하는 데 사용한 암호로 설정합니다.예를 들면 다음과 같습니다.
listeners=UNENCRYPTED://:9092,ENCRYPTED://:9093,REPLICATION://:9094 listener.security.protocol.map=UNENCRYPTED:PLAINTEXT,ENCRYPTED:SSL,REPLICATION:PLAINTEXT ssl.keystore.location=/path/to/keystore/server-1.jks ssl.keystore.password=123456
-
- Kafka 브로커 시작
6.4.8.4. 인증
인증의 경우 다음을 사용할 수 있습니다.
- 암호화된 연결에 대한 X.509 인증서를 기반으로 하는 TLS 클라이언트 인증
- 지원되는 Kafka SASL(Simple Authentication and Security Layer) 메커니즘
- OAuth 2.0 토큰 기반 인증
6.4.8.4.1. TLS 클라이언트 인증
TLS 클라이언트 인증은 이미 TLS 암호화를 사용하는 연결에서만 사용할 수 있습니다. TLS 클라이언트 인증을 사용하려면 공개 키가 있는 신뢰 저장소를 브로커에 제공할 수 있습니다. 이러한 키는 브로커에 연결하는 클라이언트를 인증하는 데 사용할 수 있습니다. 신뢰 저장소는 JKS(Java Keystore) 형식으로 제공해야 하며 인증 기관의 공개 키가 포함되어야 합니다. 신뢰 저장소에 포함된 인증 기관 중 하나에 의해 서명된 공개 및 개인 키가 있는 모든 클라이언트가 인증됩니다. 신뢰 저장소의 위치는 ssl.truststore.location
필드를 사용하여 설정됩니다. 신뢰 저장소가 암호로 보호되는 경우 암호는 ssl.truststore.password
속성에 설정해야 합니다. 예를 들면 다음과 같습니다.
ssl.truststore.location=/path/to/keystore/server-1.jks ssl.truststore.password=123456
truststore가 구성되면 ssl.client.auth
속성을 사용하여 TLS 클라이언트 인증을 활성화해야 합니다. 이 속성은 다음 세 가지 값 중 하나로 설정할 수 있습니다.
none
- TLS 클라이언트 인증이 꺼집니다. (기본값)
requested
- TLS 클라이언트 인증은 선택 사항입니다. 클라이언트는 TLS 클라이언트 인증서를 사용하여 인증하라는 메시지가 표시되지만 선택할 수는 없습니다.
필수 항목
- 클라이언트는 TLS 클라이언트 인증서를 사용하여 인증하는 데 필요합니다.
클라이언트가 TLS 클라이언트 인증을 사용하여 인증하면 인증된 주체 이름이 인증된 클라이언트 인증서와 고유합니다. 예를 들어 고유 이름이 CN=someuser
인 인증서가 있는 사용자는 다음 주체 CN=someuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown
로 인증됩니다. TLS 클라이언트 인증을 사용하지 않고 SASL이 비활성화되면 주 이름은 ANONYMOUS
입니다.
6.4.8.4.2. SASL 인증
SASL 인증은 JAAS(Java Authentication and Authorization Service)를 사용하여 구성됩니다. JAAS는 Kafka와 Zoo Cryostat 간의 연결 인증에도 사용됩니다. JAAS는 자체 구성 파일을 사용합니다. 이 파일의 권장 위치는 /opt/kafka/config/jaas.conf
입니다. 이 파일은 kafka
사용자가 읽을 수 있어야 합니다. Kafka를 실행하는 경우 Java 시스템 속성 java.security.auth.login.config
를 사용하여 이 파일의 위치가 지정됩니다. 브로커 노드를 시작할 때 이 속성을 Kafka로 전달해야 합니다.
KAFKA_OPTS="-Djava.security.auth.login.config=/path/to/my/jaas.config"; bin/kafka-server-start.sh
SASL 인증은 일반 암호화되지 않은 연결과 TLS 연결을 통해 모두 지원됩니다. 각 리스너에 대해 SASL을 개별적으로 활성화할 수 있습니다. 이를 활성화하려면 listener.security.protocol.map
의 보안 프로토콜이 SASL_PLAINTEXT
또는 SASL_SSL
이어야 합니다.
Kafka의 SASL 인증은 몇 가지 다른 메커니즘을 지원합니다.
PLAIN
- 사용자 이름과 암호를 기반으로 인증을 구현합니다. 사용자 이름과 암호는 Kafka 구성에 로컬로 저장됩니다.
SCRAM-SHA-256
및SCRAM-SHA-512
- SCRAM(Salted Challenge Response Authentication Mechanism)을 사용하여 인증을 구현합니다. SCRAM 자격 증명은 중앙 Zoo Cryostat에 저장됩니다. SCRAM은 Zoo Cryostat 클러스터 노드가 프라이빗 네트워크에서 격리된 상태로 실행되는 상황에서 사용할 수 있습니다.
GSSAPI
- Kerberos 서버에 대한 인증을 구현합니다.
PLAIN
메커니즘은 암호화되지 않은 형식으로 네트워크를 통해 사용자 이름과 암호를 전송합니다. 따라서 TLS 암호화와 함께만 사용해야 합니다.
SASL 메커니즘은 JAAS 구성 파일을 통해 구성됩니다. Kafka는 KafkaServer
라는 JAAS 컨텍스트를 사용합니다. JAAS에서 구성한 후 Kafka 구성에서 SASL 메커니즘을 활성화해야 합니다. 이 작업은 sasl.enabled.mechanisms
속성을 사용하여 수행됩니다. 이 속성에는 활성화된 메커니즘의 쉼표로 구분된 목록이 포함되어 있습니다.
sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512
broker 간 통신에 사용되는 리스너가 SASL을 사용하는 경우 속성 sasl.mechanism.inter.broker.protocol
을 사용하여 사용해야 하는 SASL 메커니즘을 지정해야 합니다. 예를 들면 다음과 같습니다.
sasl.mechanism.inter.broker.protocol=PLAIN
broker 간 통신에 사용할 사용자 이름과 암호는 사용자 이름 및 암호 를 사용하여
. KafkaServer
JAAS 컨텍스트에 지정해야 합니다
SASL PLAIN
PLAIN 메커니즘을 사용하려면 연결할 수 있는 사용자 이름과 암호가 JAAS 컨텍스트에서 직접 지정됩니다. 다음 예제에서는 SASL PLAIN 인증에 대해 구성된 컨텍스트를 보여줍니다. 이 예제에서는 세 개의 다른 사용자를 구성합니다.
-
admin
-
user1
-
user2
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required user_admin="123456" user_user1="123456" user_user2="123456"; };
사용자 데이터베이스가 있는 JAAS 구성 파일은 모든 Kafka 브로커에서 동기화되어 있어야 합니다.
broker 간 인증에 SASL PLAIN도 사용하는 경우 사용자 이름과
암호
속성을 JAAS 컨텍스트에 포함해야 합니다.
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="123456" user_admin="123456" user_user1="123456" user_user2="123456"; };
SASL SCRAM
Kafka의 SCRAM 인증은 SCRAM-SHA-256
및 SCRAM-SHA-512
의 두 가지 메커니즘으로 구성됩니다. 이러한 메커니즘은 SHA-256과 SHA-512 강화의 해시 알고리즘에서만 다릅니다. SCRAM 인증을 활성화하려면 JAAS 구성 파일에 다음 구성을 포함해야 합니다.
KafkaServer { org.apache.kafka.common.security.scram.ScramLoginModule required; };
Kafka 구성 파일에서 SASL 인증을 활성화할 때 두 SCRAM 메커니즘을 모두 나열할 수 있습니다. 그러나 그 중 하나만 브루커 간 통신을 위해 선택할 수 있습니다. 예를 들면 다음과 같습니다.
sasl.enabled.mechanisms=SCRAM-SHA-256,SCRAM-SHA-512 sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
SCRAM 메커니즘에 대한 사용자 자격 증명은 Zoo Cryostat에 저장됩니다. kafka-configs.sh
도구를 사용하여 관리할 수 있습니다. 예를 들어 다음 명령을 실행하여 암호 123456이 있는 user1 사용자를 추가합니다.
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'SCRAM-SHA-256=[password=123456],SCRAM-SHA-512=[password=123456]' --entity-type users --entity-name user1
사용자 인증 정보를 삭제하려면 다음을 사용합니다.
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name user1
SASL GSSAPI
Kerberos를 사용한 인증에 사용되는 SASL 메커니즘을 GSSAPI
라고 합니다. Kerberos SASL 인증을 구성하려면 JAAS 구성 파일에 다음 구성을 추가해야 합니다.
KafkaServer { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/etc/security/keytabs/kafka_server.keytab" principal="kafka/kafka1.hostname.com@EXAMPLE.COM"; };
Kerberos 주체의 도메인 이름은 항상 대문자여야 합니다.
JAAS 구성 외에도 Kafka 구성의 sasl.kerberos.service.name
속성에 Kerberos 서비스 이름을 지정해야 합니다.
sasl.enabled.mechanisms=GSSAPI sasl.mechanism.inter.broker.protocol=GSSAPI sasl.kerberos.service.name=kafka
여러 SASL 메커니즘
Kafka는 여러 SASL 메커니즘을 동시에 사용할 수 있습니다. 서로 다른 JAAS 구성은 모두 동일한 컨텍스트에 추가할 수 있습니다.
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required user_admin="123456" user_user1="123456" user_user2="123456"; com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/etc/security/keytabs/kafka_server.keytab" principal="kafka/kafka1.hostname.com@EXAMPLE.COM"; org.apache.kafka.common.security.scram.ScramLoginModule required; };
여러 메커니즘이 활성화되면 클라이언트는 사용하려는 메커니즘을 선택할 수 있습니다.
6.4.8.5. TLS 클라이언트 인증 활성화
다음 절차에서는 Kafka 브로커에서 TLS 클라이언트 인증을 활성화하는 방법을 설명합니다.
사전 요구 사항
- AMQ Streams는 Kafka 브로커로 사용할 모든 호스트에 설치됩니다.
- TLS 암호화가 활성화되어 있습니다.
프로세스
- 사용자 인증서에 서명하는 데 사용되는 인증 기관의 공개 키가 포함된 JKS 신뢰 저장소를 준비합니다.
다음을 위해 모든 클러스터 노드에서
/opt/kafka/config/server.properties
Kafka 구성 파일을 편집합니다.-
사용자 인증서의 인증 기관이 있는 JKS 신뢰 저장소 경로로
ssl.truststore.location
옵션을 설정합니다. -
ssl.truststore.password
옵션을 신뢰 저장소를 보호하는 데 사용한 암호로 설정합니다. ssl.client.auth
옵션을required
로 설정합니다.예를 들면 다음과 같습니다.
ssl.truststore.location=/path/to/truststore.jks ssl.truststore.password=123456 ssl.client.auth=required
-
사용자 인증서의 인증 기관이 있는 JKS 신뢰 저장소 경로로
- Kafka 브로커 시작
6.4.8.6. SASL PLAIN 인증 활성화
다음 절차에서는 Kafka 브로커에서 SASL PLAIN 인증을 활성화하는 방법을 설명합니다.
사전 요구 사항
- AMQ Streams는 Kafka 브로커로 사용할 모든 호스트에 설치됩니다.
프로세스
/opt/kafka/config/jaas.conf
JAAS 구성 파일을 편집하거나 만듭니다. 이 파일에는 모든 사용자와 암호가 포함되어야 합니다. 이 파일이 모든 Kafka 브로커에서 동일한지 확인합니다.예를 들면 다음과 같습니다.
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required user_admin="123456" user_user1="123456" user_user2="123456"; };
다음을 위해 모든 클러스터 노드에서
/opt/kafka/config/server.properties
Kafka 구성 파일을 편집합니다.-
SASL PLAIN 인증을 사용하려는 리스너의
SASL_PLAINTEXT
또는SASL_SSL
프로토콜을 지정하도록listener.security.protocol.map
필드를 변경합니다. sasl.enabled.mechanisms
옵션을PLAIN
으로 설정합니다.예를 들면 다음과 같습니다.
listeners=INSECURE://:9092,AUTHENTICATED://:9093,REPLICATION://:9094 listener.security.protocol.map=INSECURE:PLAINTEXT,AUTHENTICATED:SASL_PLAINTEXT,REPLICATION:PLAINTEXT sasl.enabled.mechanisms=PLAIN
-
SASL PLAIN 인증을 사용하려는 리스너의
(re) KAFKA_OPTS 환경 변수를 사용하여 Kafka 브로커를 시작하여 JAAS 구성을 Kafka 브로커에 전달합니다.
su - kafka export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/jaas.conf"; /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
6.4.8.7. SASL SCRAM 인증 활성화
다음 절차에서는 Kafka 브로커에서 SASL SCRAM 인증을 활성화하는 방법을 설명합니다.
사전 요구 사항
- AMQ Streams는 Kafka 브로커로 사용할 모든 호스트에 설치됩니다.
프로세스
/opt/kafka/config/jaas.conf
JAAS 구성 파일을 편집하거나 만듭니다.KafkaServer
컨텍스트에 대해ScramLoginModule
을 활성화합니다. 이 파일이 모든 Kafka 브로커에서 동일한지 확인합니다.예를 들면 다음과 같습니다.
KafkaServer { org.apache.kafka.common.security.scram.ScramLoginModule required; };
다음을 위해 모든 클러스터 노드에서
/opt/kafka/config/server.properties
Kafka 구성 파일을 편집합니다.-
SASL SCRAM 인증을 사용하려는 리스너의
SASL_PLAINTEXT
또는SASL_SSL
프로토콜을 지정하도록listener.security.protocol.map
필드를 변경합니다. sasl.enabled.mechanisms
옵션을SCRAM-SHA-256
또는SCRAM-SHA-512
로 설정합니다.예를 들면 다음과 같습니다.
listeners=INSECURE://:9092,AUTHENTICATED://:9093,REPLICATION://:9094 listener.security.protocol.map=INSECURE:PLAINTEXT,AUTHENTICATED:SASL_PLAINTEXT,REPLICATION:PLAINTEXT sasl.enabled.mechanisms=SCRAM-SHA-512
-
SASL SCRAM 인증을 사용하려는 리스너의
(re) KAFKA_OPTS 환경 변수를 사용하여 Kafka 브로커를 시작하여 JAAS 구성을 Kafka 브로커에 전달합니다.
su - kafka export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/jaas.conf"; /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
6.4.8.8. SASL SCRAM 사용자 추가
이 절차에서는 SASL SCRAM을 사용하여 인증을 위해 새 사용자를 추가하는 방법을 설명합니다.
프로세스
kafka-configs.sh
도구를 사용하여 새 SASL SCRAM 사용자를 추가합니다.bin/kafka-configs.sh --bootstrap-server <broker_address> --alter --add-config 'SCRAM-SHA-512=[password=<Password>]' --entity-type users --entity-name <Username>
예를 들면 다음과 같습니다.
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'SCRAM-SHA-512=[password=123456]' --entity-type users --entity-name user1
6.4.8.9. SASL SCRAM 사용자 삭제
다음 절차에서는 SASL SCRAM 인증을 사용할 때 사용자를 제거하는 방법을 설명합니다.
프로세스
kafka-configs.sh
도구를 사용하여 SASL SCRAM 사용자를 삭제합니다./opt/kafka/bin/kafka-configs.sh --bootstrap-server <broker_address> --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name <Username>
예를 들면 다음과 같습니다.
/opt/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name user1
6.4.9. OAuth 2.0 토큰 기반 인증 사용
AMQ Streams는 OAUTHBEARER 및 PLAIN 메커니즘을 사용한 OAuth 2.0 인증 사용을 지원합니다.
OAuth 2.0은 중앙 권한 부여 서버를 사용하여 리소스에 대한 제한된 액세스 권한을 부여하는 토큰을 발행하여 애플리케이션 간에 표준화된 토큰 기반 인증과 권한을 부여할 수 있습니다.
OAuth 2.0 인증을 구성한 다음 OAuth 2.0 인증을 구성할 수 있습니다.
Kafka 브로커 및 클라이언트는 모두 OAuth 2.0을 사용하도록 구성해야 합니다. OAuth 2.0 인증은 단순
또는 OPA 기반 Kafka 권한 부여와 함께 사용할 수도 있습니다.
애플리케이션 클라이언트는 OAuth 2.0 인증을 사용하여 계정 자격 증명을 노출하지 않고도 애플리케이션 서버( 리소스 서버라고 함)의 리소스에 액세스할 수 있습니다.
애플리케이션 클라이언트는 인증 수단으로 액세스 토큰을 전달합니다. 이 방법으로 부여할 액세스 수준을 결정하는 데 사용할 애플리케이션 서버가 사용할 수 있습니다. 권한 부여 서버는 액세스 권한 부여 및 액세스 관련 문의를 처리합니다.
AMQ Streams의 컨텍스트에서 다음을 수행합니다.
- Kafka 브로커는 OAuth 2.0 리소스 서버 역할을 합니다.
- Kafka 클라이언트가 OAuth 2.0 애플리케이션 클라이언트 역할을 합니다.
Kafka 클라이언트는 Kafka 브로커에 인증합니다. 브로커 및 클라이언트는 필요에 따라 OAuth 2.0 권한 부여 서버와 통신하여 액세스 토큰을 얻거나 검증합니다.
AMQ Streams 배포를 위해 OAuth 2.0 통합은 다음을 제공합니다.
- Kafka 브로커에 대한 서버 측 OAuth 2.0 지원
- Kafka MirrorMaker, Kafka Connect 및 Kafka Bridge에 대한 클라이언트 측 OAuth 2.0 지원
RHEL의 AMQ Streams에는 두 개의 OAuth 2.0 라이브러리가 포함되어 있습니다.
kafka-oauth-client
-
io.strimzi.kafka.client.client.JaasClientOauthLoginCallbackHandler
라는 사용자 정의 로그인 콜백 처리기 클래스를 제공합니다.OAUTHBEARER
인증 메커니즘을 처리하려면 Apache Kafka에서 제공하는OAuthBearerLoginModule
과 함께 로그인 콜백 핸들러를 사용합니다. kafka-oauth-common
-
kafka-oauth-client
라이브러리에 필요한 일부 기능을 제공하는 도우미 라이브러리입니다.
제공된 클라이언트 라이브러리는 keycloak-core
,jackson-databind
, slf4j-api
와 같은 일부 추가 타사 라이브러리에 대한 종속성도 있습니다.
모든 종속성 라이브러리가 포함되도록 Maven 프로젝트를 사용하여 클라이언트를 패키징하는 것이 좋습니다. 종속성 라이브러리는 향후 버전에서 변경될 수 있습니다.
추가 리소스
6.4.9.1. OAuth 2.0 인증 메커니즘
AMQ Streams는 OAuth 2.0 인증을 위한 OAUTHBEARER 및 PLAIN 메커니즘을 지원합니다. 두 메커니즘을 통해 Kafka 클라이언트가 Kafka 브로커를 사용하여 인증된 세션을 설정할 수 있습니다. 클라이언트, 권한 부여 서버 및 Kafka 브로커 간의 인증 흐름은 각 메커니즘마다 다릅니다.
가능한 경우 OAUTHBEARER를 사용하도록 클라이언트를 구성하는 것이 좋습니다. OAUTHBEARER는 클라이언트 인증 정보가 Kafka 브로커와 공유 되지 않기 때문에 PLAIN보다 높은 수준의 보안을 제공합니다. OAUTHBEARER를 지원하지 않는 Kafka 클라이언트에서만 PLAIN을 사용하는 것이 좋습니다.
클라이언트 연결에 OAuth 2.0 인증을 사용하도록 Kafka 브로커 리스너를 구성합니다. 필요한 경우 동일한 oauth
리스너에서 OAUTHBEARER 및 PLAIN 메커니즘을 사용할 수 있습니다. 각 메커니즘을 지원하는 속성은 oauth
리스너 구성에 명시적으로 지정해야 합니다.
OAUTHBEARER 개요
OAUTHBEARER
를 사용하려면 Kafka 브로커의 OAuth 인증 리스너 구성에서 sasl.enabled.mechanisms
를 OAUTHBEARER로 설정합니다. 자세한 구성은 6.4.9.2절. “OAuth 2.0 Kafka 브로커 구성” 에서 참조하십시오.
listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER
많은 Kafka 클라이언트 툴은 프로토콜 수준에서 OAUTHBEARER에 대한 기본 지원을 제공하는 라이브러리를 사용합니다. 애플리케이션 개발을 지원하기 위해 AMQ Streams는 업스트림 Kafka 클라이언트 Java 라이브러리에 대한 OAuth 콜백 처리기 를 제공합니다(다른 라이브러리는 아님). 따라서 자체 콜백 처리기를 작성할 필요가 없습니다. 애플리케이션 클라이언트는 콜백 처리기를 사용하여 액세스 토큰을 제공할 수 있습니다. Go와 같은 다른 언어로 작성된 클라이언트는 사용자 지정 코드를 사용하여 권한 부여 서버에 연결하고 액세스 토큰을 받아야 합니다.
클라이언트는 OAUTHBEARER를 사용하여 인증 정보 교환을 위해 Kafka 브로커로 세션을 시작합니다. 여기서 인증 정보는 콜백 처리기에서 제공하는 전달자 토큰의 형태를 취합니다. 콜백을 사용하면 다음 세 가지 방법 중 하나로 토큰 프로비저닝을 구성할 수 있습니다.
- 클라이언트 ID 및 시크릿( OAuth 2.0 클라이언트 인증 정보 메커니즘 사용)
- 구성 시 수동으로 얻은 장기 액세스 토큰
- 구성 시 수동으로 가져온 장기 새로 고침 토큰
OAUTHBEARER 인증은 프로토콜 수준에서 OAUTHBEARER 메커니즘을 지원하는 Kafka 클라이언트에서만 사용할 수 있습니다.
PLAIN 개요
PLAIN을 사용하려면 sasl.enabled.mechanisms
값에 PLAIN
을 추가합니다.
listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER,PLAIN
PLAIN은 모든 Kafka 클라이언트 툴에서 사용하는 간단한 인증 메커니즘입니다. OAuth 2.0 인증에 사용할 PLAIN을 활성화하기 위해 AMQ Streams는 PLAIN 서버 측 콜백을 통해 OAuth 2.0 을 제공합니다.
PLAIN의 AMQ Streams 구현을 사용하면 클라이언트 인증 정보가 Zoo Cryostat에 저장되지 않습니다. 대신 OAUTHBEARER 인증이 사용되는 경우와 유사하게 클라이언트 자격 증명은 호환 권한 부여 서버 뒤에서 중앙에서 처리됩니다.
PLAIN 콜백을 통해 OAuth 2.0과 함께 사용하면 Kafka 클라이언트가 다음 방법 중 하나를 사용하여 Kafka 브로커로 인증합니다.
- 클라이언트 ID 및 시크릿( OAuth 2.0 클라이언트 인증 정보 메커니즘 사용)
- 구성 시 수동으로 얻은 장기 액세스 토큰
두 방법 모두 클라이언트는 Kafka 브로커에 인증 정보를 전달하기 위해 PLAIN 사용자 이름과
암호
속성을 제공해야 합니다. 클라이언트는 이러한 속성을 사용하여 클라이언트 ID와 시크릿 또는 사용자 이름 및 액세스 토큰을 전달합니다.
클라이언트 ID와 시크릿은 액세스 토큰을 가져오는 데 사용됩니다.
액세스 토큰은 암호
속성 값으로 전달됩니다. $accessToken:
접두사를 사용하거나 사용하지 않고 액세스 토큰을 전달합니다.
-
리스너 구성에서 토큰 끝점(
oauth.token.endpoint.uri
)을 구성하는 경우 접두사가 필요합니다. -
리스너 구성에서 토큰 끝점(
oauth.token.endpoint.uri
)을 구성하지 않으면 접두사가 필요하지 않습니다. Kafka 브로커는 암호를 원시 액세스 토큰으로 해석합니다.
암호
가 액세스 토큰으로 설정된 경우 사용자
이름은 Kafka 브로커가 액세스 토큰에서 얻는 것과 동일한 주체 이름으로 설정해야 합니다. oauth.username.claim
,oauth.fallback.username.claim
,oauth.fallback.username.prefix
, oauth.userinfo.endpoint.uri
속성을 사용하여 리스너에서 사용자 이름 추출 옵션을 지정할 수 있습니다. 사용자 이름 추출 프로세스는 권한 부여 서버(특히 클라이언트 ID를 계정 이름에 매핑하는 방법)에 따라 다릅니다.
PLAIN을 통한 OAuth는 (더 이상 사용되지 않는) OAuth 2.0 암호 부여 메커니즘을 사용하여 사용자 이름과 암호(암호 부여) 전달을 지원하지 않습니다.
6.4.9.1.1. 속성 또는 변수를 사용하여 OAuth 2.0 구성
JAAS(Java Authentication and Authorization Service) 속성 또는 환경 변수를 사용하여 OAuth 2.0 설정을 구성할 수 있습니다.
-
JAAS 속성은
server.properties
구성 파일에 구성되어listener.name의 키-값 쌍으로 전달됩니다.LISTENER-NAME.oauthbearer.sasl.jaas.config
속성. 환경 변수를 사용하는 경우에도
listener.name.LISTENER-NAME.oauthbearer.sasl.jaas.config
속성을server.properties
파일에 제공해야 하지만 다른 JAAS 속성을 생략할 수 있습니다.대문자 또는 대문자 환경 변수 이름 지정 규칙을 사용할 수 있습니다.
AMQ Streams OAuth 2.0 라이브러리는 다음으로 시작하는 속성을 사용합니다.
-
OAuth.
인증 구성 -
Strimzi.
OAuth 2.0 인증 구성
추가 리소스
6.4.9.2. OAuth 2.0 Kafka 브로커 구성
OAuth 2.0 인증에 대한 Kafka 브로커 구성은 다음과 같습니다.
- 권한 부여 서버에서 OAuth 2.0 클라이언트 생성
- Kafka 클러스터에서 OAuth 2.0 인증 구성
권한 부여 서버와 관련하여 Kafka 브로커 및 Kafka 클라이언트는 모두 OAuth 2.0 클라이언트로 간주됩니다.
6.4.9.2.1. 권한 부여 서버의 OAuth 2.0 클라이언트 구성
세션 시작 중에 수신된 토큰을 확인하도록 Kafka 브로커를 구성하려면 다음 클라이언트 인증 정보가 활성화된 권한 부여 서버로 구성된 OAuth 2.0 클라이언트 정의를 생성하는 것이 좋습니다.
-
kafka-broker
의 클라이언트 ID(예:) - 인증 메커니즘으로 클라이언트 ID 및 시크릿
권한 부여 서버의 비공개 인트로스펙션 엔드포인트를 사용하는 경우에만 클라이언트 ID와 시크릿을 사용해야 합니다. 인증 정보는 빠른 로컬 JWT 토큰 검증과 마찬가지로 공용 권한 부여 서버 끝점을 사용할 때 필요하지 않습니다.
6.4.9.2.2. Kafka 클러스터의 OAuth 2.0 인증 구성
Kafka 클러스터에서 OAuth 2.0 인증을 사용하려면 Kafka server.properties
파일에서 Kafka 클러스터에 대한 OAuth 인증 리스너 구성을 활성화합니다. 최소 구성이 필요합니다. TLS가 브루커 간 통신에 사용되는 TLS 리스너를 구성할 수도 있습니다.
다음 방법 중 하나를 사용하여 권한 부여 서버에서 토큰 검증을 위해 브로커를 구성할 수 있습니다.
- 빠른 로컬 토큰 검증: 서명된 JWT 형식의 액세스 토큰과 함께 JWKS 끝점
- 인트로스펙션 끝점
OAUTHBEARER 또는 PLAIN 인증을 구성하거나 둘 다 구성할 수 있습니다.
다음 예제에서는 글로벌 리스너 구성을 적용하는 최소 구성을 보여줍니다. 즉, 애플리케이션 클라이언트와 동일한 리스너 간 통신이 진행됩니다.
이 예제에서는 또한 listener.name을 지정하는 특정 리스너에 대한 OAuth 2.0 구성을 보여줍니다. sasl.enabled.mechanisms 대신LISTENER-NAME
. sasl.enabled.mechanisms
. LISTENER-NAME 은 리스너의 대소문자를 구분하지 않는 이름입니다. 여기서는 리스너 CLIENT
의 이름을 지정하므로 속성 이름은 listener.name.client.sasl.enabled.mechanisms
입니다.
이 예에서는 OAUTHBEARER 인증을 사용합니다.
예: JWKS 엔드포인트를 사용한 OAuth 2.0 인증에 대한 최소 리스너 구성
sasl.enabled.mechanisms=OAUTHBEARER 1 listeners=CLIENT://0.0.0.0:9092 2 listener.security.protocol.map=CLIENT:SASL_PLAINTEXT 3 listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER 4 sasl.mechanism.inter.broker.protocol=OAUTHBEARER 5 inter.broker.listener.name=CLIENT 6 listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler 7 listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ 8 oauth.valid.issuer.uri="https://<oauth_server_address>" \ 9 oauth.jwks.endpoint.uri="https://<oauth_server_address>/jwks" \ 10 oauth.username.claim="preferred_username" \ 11 oauth.client.id="kafka-broker" \ 12 oauth.client.secret="kafka-secret" \ 13 oauth.token.endpoint.uri="https://<oauth_server_address>/token" ; 14 listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler 15 listener.name.client.oauthbearer.connections.max.reauth.ms=3600000 16
- 1
- SASL을 통해 인증 정보를 교환하기 위한 OAUTHBEARER 메커니즘을 활성화합니다.
- 2
- 클라이언트 애플리케이션이 연결할 리스너를 구성합니다. 시스템
호스트
이름은 공개된 호스트 이름으로 사용되며, 다시 연결하기 위해 클라이언트가 확인해야 합니다. 이 예제에서는 리스너의 이름이CLIENT
로 지정됩니다. - 3
- 리스너의 채널 프로토콜을 지정합니다.
SASL_SSL
은 TLS용입니다.SASL_PLAINTEXT
는 암호화되지 않은 연결(TLS 없음)에 사용되지만 TCP 연결 계층에서 도청 및 인터셉션의 위험이 있습니다. - 4
- CLIENT 리스너의 OAUTHBEARER 메커니즘을 지정합니다. 클라이언트 이름(
CLIENT
)은 일반적으로listeners
속성에 대문자로 지정되며,listener.name
.client 속성의 경우 소문자(listener.name.client
) 및listener.name.client.*
속성의 일부인 경우 소문자로 지정됩니다. - 5
- broker 간 통신을 위한 OAUTHBEARER 메커니즘을 지정합니다.
- 6
- broker 간 통신을 위한 리스너를 지정합니다. 구성이 유효하려면 사양이 필요합니다.
- 7
- 클라이언트 리스너에 OAuth 2.0 인증을 구성합니다.
- 8
- 클라이언트 및 브랜드 간 통신에 대한 인증 설정을 구성합니다.
oauth.client.id
,oauth.client.secret
및auth.token.endpoint.uri
속성은broker 간 구성과 관련이 있습니다. - 9
- 유효한 발행자 URI입니다. 이 발행자가 발행한 액세스 토큰만 허용됩니다. 예: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME.
- 10
- JWKS 엔드포인트 URL입니다. 예: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/certs.
- 11
- 토큰에 실제 사용자 이름이 포함된 토큰 클레임(또는 키)입니다. 사용자 이름은 사용자를 식별하는 데 사용되는 주체 입니다. 값은 인증 흐름 및 사용된 권한 부여 서버에 따라 달라집니다. 필요한 경우
"['user.info'].['user.id']"
와 같은 JsonPath 표현식을 사용하여 토큰 내의 중첩된 JSON 속성에서 사용자 이름을 검색할 수 있습니다. - 12
- 모든 브로커에 대해 동일한 Kafka 브로커의 클라이언트 ID입니다. 이 클라이언트는 권한 부여 서버에
kafka-broker
로 등록된 클라이언트 입니다. - 13
- 모든 브로커에 대해 동일한 Kafka 브로커의 시크릿입니다.
- 14
- 권한 부여 서버의 OAuth 2.0 토큰 끝점 URL입니다. 프로덕션의 경우 항상
https://
urls를 사용합니다. 예: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/token. - 15
- broker 간 통신을 위해 OAuth 2.0 인증에만 필요합니다.
- 16
- (선택 사항) 토큰이 만료될 때 세션 만료를 강제 적용하고 Kafka 재인증 메커니즘 도 활성화합니다. 지정된 값이 액세스 토큰이 만료될 때까지 남은 시간보다 작으면 클라이언트는 실제 토큰 만료 전에 다시 인증해야 합니다. 기본적으로 액세스 토큰이 만료되고 클라이언트가 재인증을 시도하지 않으면 세션이 만료되지 않습니다.
다음 예제에서는 TLS 리스너에 대한 최소 구성을 보여줍니다. 여기서 TLS는 브루커 간 통신에 사용됩니다.
예: OAuth 2.0 인증을 위한 TLS 리스너 구성
listeners=REPLICATION://kafka:9091,CLIENT://kafka:9092 1 listener.security.protocol.map=REPLICATION:SSL,CLIENT:SASL_PLAINTEXT 2 listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER inter.broker.listener.name=REPLICATION listener.name.replication.ssl.keystore.password=<keystore_password> 3 listener.name.replication.ssl.truststore.password=<truststore_password> listener.name.replication.ssl.keystore.type=JKS listener.name.replication.ssl.truststore.type=JKS listener.name.replication.ssl.secure.random.implementation=SHA1PRNG 4 listener.name.replication.ssl.endpoint.identification.algorithm=HTTPS 5 listener.name.replication.ssl.keystore.location=<path_to_keystore> 6 listener.name.replication.ssl.truststore.location=<path_to_truststore> 7 listener.name.replication.ssl.client.auth=required 8 listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ 9 oauth.valid.issuer.uri="https://<oauth_server_address>" \ oauth.jwks.endpoint.uri="https://<oauth_server_address>/jwks" \ oauth.username.claim="preferred_username" ;
- 1
- broker 간 통신 및 클라이언트 애플리케이션에는 별도의 구성이 필요합니다.
- 2
- TLS를 사용하도록 REPLICATION 리스너와 CLIENT 리스너가 암호화되지 않은 채널을 통해 SASL을 사용하도록 구성합니다. 클라이언트는 프로덕션 환경에서 암호화된 채널(
SASL_SSL
)을 사용할 수 있습니다. - 3
ssl.
속성은 TLS 구성을 정의합니다.- 4
- 임의 번호 생성기 구현. 설정하지 않으면 Java 플랫폼 SDK 기본값이 사용됩니다.
- 5
- 호스트 이름 확인 빈 문자열로 설정하면 호스트 이름 확인이 해제됩니다. 설정되지 않은 경우 기본값은
HTTPS
로, 서버 인증서에 대한 호스트 이름 확인을 적용합니다. - 6
- 리스너의 키 저장소 경로입니다.
- 7
- 리스너의 신뢰 저장소 경로입니다.
- 8
- TLS 연결을 설정할 때 REPLICATION 리스너의 클라이언트가 클라이언트 인증서로 인증되도록 지정합니다(브러 간 연결에 사용됨).
- 9
- OAuth 2.0의 CLIENT 리스너를 구성합니다. 권한 부여 서버와의 연결은 보안 HTTPS 연결을 사용해야 합니다.
다음 예제에서는 SASL을 통해 인증 정보를 교환하기 위한 PLAIN 인증 메커니즘을 사용한 OAuth 2.0 인증에 대한 최소 구성을 보여줍니다. 빠른 로컬 토큰 검증이 사용됩니다.
예: PLAIN 인증을 위한 최소 리스너 구성
listeners=CLIENT://0.0.0.0:9092 1 listener.security.protocol.map=CLIENT:SASL_PLAINTEXT 2 listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER,PLAIN 3 sasl.mechanism.inter.broker.protocol=OAUTHBEARER 4 inter.broker.listener.name=CLIENT 5 listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler 6 listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ 7 oauth.valid.issuer.uri="http://<auth_server>/auth/realms/<realm>" \ 8 oauth.jwks.endpoint.uri="https://<auth_server>/auth/realms/<realm>/protocol/openid-connect/certs" \ 9 oauth.username.claim="preferred_username" \ 10 oauth.client.id="kafka-broker" \ 11 oauth.client.secret="kafka-secret" \ 12 oauth.token.endpoint.uri="https://<oauth_server_address>/token" ; 13 listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler 14 listener.name.client.plain.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.plain.JaasServerOauthOverPlainValidatorCallbackHandler 15 listener.name.client.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ 16 oauth.valid.issuer.uri="https://<oauth_server_address>" \ 17 oauth.jwks.endpoint.uri="https://<oauth_server_address>/jwks" \ 18 oauth.username.claim="preferred_username" \ 19 oauth.token.endpoint.uri="http://<auth_server>/auth/realms/<realm>/protocol/openid-connect/token" ; 20 connections.max.reauth.ms=3600000 21
- 1
- 클라이언트 애플리케이션이 연결할 리스너(이 예에서는
CLIENT
)를 구성합니다. 시스템호스트
이름은 공개된 호스트 이름으로 사용되며, 다시 연결하기 위해 클라이언트가 확인해야 합니다. 이 리스너는 유일하게 구성된 리스너이므로 상호 통신에도 사용됩니다. - 2
- 암호화되지 않은 채널을 통해 SASL을 사용하도록 예제
CLIENT
리스너를 구성합니다. 프로덕션 환경에서 클라이언트는 TCP 연결 계층에서 도청 및 인터셉션을 방지하기 위해 암호화된 채널(SASL_SSL
)을 사용해야 합니다. - 3
- SASL 및 OAUTHBEARER 를 통해 인증 정보를 교환하는 데 PLAIN 인증 메커니즘을 활성화합니다. OAUTHBEARER는 또한 상호 통신에 필요하기 때문에 지정됩니다. Kafka 클라이언트는 연결할 메커니즘을 선택할 수 있습니다.
- 4
- broker 간 통신을 위한 OAUTHBEARER 인증 메커니즘을 지정합니다.
- 5
- broker 간 통신을 위한 리스너(이 예에서는
CLIENT
)를 지정합니다. 구성이 유효한 데 필요합니다. - 6
- OAUTHBEARER 메커니즘에 대한 서버 콜백 핸들러를 구성합니다.
- 7
- OAUTHBEARER 메커니즘을 사용하여 클라이언트 및 브랜드 간 통신에 대한 인증 설정을 구성합니다.
oauth.client.id
,oauth.client.secret
및oauth.token.endpoint.uri
속성은broker 간 구성과 관련이 있습니다. - 8
- 유효한 발행자 URI입니다. 이 발행자의 액세스 토큰만 허용됩니다. 예: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME
- 9
- JWKS 엔드포인트 URL입니다. 예: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/certs
- 10
- 토큰에 실제 사용자 이름이 포함된 토큰 클레임(또는 키)입니다. 사용자 이름은 사용자를 식별하는 데 사용되는 주체 입니다. 값은 인증 흐름 및 사용된 권한 부여 서버에 따라 달라집니다. 필요한 경우
"['user.info'].['user.id']"
와 같은 JsonPath 표현식을 사용하여 토큰 내의 중첩된 JSON 속성에서 사용자 이름을 검색할 수 있습니다. - 11
- 모든 브로커에 대해 동일한 Kafka 브로커의 클라이언트 ID입니다. 이 클라이언트는 권한 부여 서버에
kafka-broker
로 등록된 클라이언트 입니다. - 12
- Kafka 브로커의 시크릿(모든 브로커에 대해 동일)
- 13
- 권한 부여 서버의 OAuth 2.0 토큰 끝점 URL입니다. 프로덕션의 경우 항상
https://
urls를 사용합니다. 예: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/token - 14
- 브랜드 간 통신을 위해 OAuth 2.0 인증을 활성화합니다.
- 15
- PLAIN 인증에 대한 서버 콜백 처리기를 구성합니다.
- 16
- PLAIN 인증을 사용하여 클라이언트 통신에 대한 인증 설정을 구성합니다.
OAuth.token.endpoint.uri
는 OAuth 2.0 클라이언트 인증 정보 메커니즘을 사용하여 PLAIN을 통해 OAuth 2.0 을 활성화하는 선택적 속성입니다. - 17
- 유효한 발행자 URI입니다. 이 발행자의 액세스 토큰만 허용됩니다. 예: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME
- 18
- JWKS 엔드포인트 URL입니다. 예: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/certs
- 19
- 토큰에 실제 사용자 이름이 포함된 토큰 클레임(또는 키)입니다. 사용자 이름은 사용자를 식별하는 데 사용되는 주체 입니다. 값은 인증 흐름 및 사용된 권한 부여 서버에 따라 달라집니다. 필요한 경우
"['user.info'].['user.id']"
와 같은 JsonPath 표현식을 사용하여 토큰 내의 중첩된 JSON 속성에서 사용자 이름을 검색할 수 있습니다. - 20
- 권한 부여 서버의 OAuth 2.0 토큰 끝점 URL입니다. PLAIN 메커니즘에 대한 추가 구성입니다. 지정된 경우 클라이언트는
$accessToken:
접두사를 사용하여암호로
액세스 토큰을 전달하여 PLAIN을 통해 인증할 수 있습니다.프로덕션의 경우 항상
https://
urls를 사용합니다. 예: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/token. - 21
- (선택 사항) 토큰이 만료될 때 세션 만료를 강제 적용하고 Kafka 재인증 메커니즘 도 활성화합니다. 지정된 값이 액세스 토큰이 만료될 때까지 남은 시간보다 작으면 클라이언트는 실제 토큰 만료 전에 다시 인증해야 합니다. 기본적으로 액세스 토큰이 만료되고 클라이언트가 재인증을 시도하지 않으면 세션이 만료되지 않습니다.
6.4.9.2.3. 빠른 로컬 JWT 토큰 검증 구성
빠른 로컬 JWT 토큰 검증은 JWT 토큰 서명을 로컬로 확인합니다.
로컬 검사에서는 토큰이 있는지 확인합니다.
-
액세스 토큰에 대한
Bearer
의 (typ) 클레임 값을 포함하여 유형 준수 - 유효함 (종료되지 않음)
-
유효한IssuerURI
와 일치하는 발행자가 있음
권한 부여 서버에서 발행하지 않은 토큰이 거부되도록 리스너를 구성할 때 유효한 발행자 URI 를 지정합니다.
빠른 로컬 JWT 토큰 검증 중에 권한 부여 서버에 연결할 필요가 없습니다. OAuth 2.0 권한 부여 서버에서 노출하는 JWKs 엔드포인트 URI 를 지정하여 빠른 로컬 JWT 토큰 검증을 활성화합니다. 엔드포인트에는 Kafka 클라이언트가 인증 정보로 전송되는 서명된 JWT 토큰의 유효성을 검사하는 데 사용되는 공개 키가 포함되어 있습니다.
권한 부여 서버와의 모든 통신은 HTTPS를 사용하여 수행해야 합니다.
TLS 리스너의 경우 인증서 신뢰 저장소를 구성하고 truststore 파일을 가리킬 수 있습니다.
빠른 로컬 JWT 토큰 검증을 위한 속성의 예
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.valid.issuer.uri="https://<oauth_server_address>" \ 1 oauth.jwks.endpoint.uri="https://<oauth_server_address>/jwks" \ 2 oauth.jwks.refresh.seconds="300" \ 3 oauth.jwks.refresh.min.pause.seconds="1" \ 4 oauth.jwks.expiry.seconds="360" \ 5 oauth.username.claim="preferred_username" \ 6 oauth.ssl.truststore.location="<path_to_truststore_p12_file>" \ 7 oauth.ssl.truststore.password="<truststore_password>" \ 8 oauth.ssl.truststore.type="PKCS12" ; 9
- 1
- 유효한 발행자 URI입니다. 이 발행자가 발행한 액세스 토큰만 허용됩니다. 예: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME.
- 2
- JWKS 엔드포인트 URL입니다. 예: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/certs.
- 3
- 엔드포인트 새로 고침 간격(기본값 300).
- 4
- JWKS 공개 키를 새로 고치는 시도 사이의 최소 일시 중지 시간(초)입니다. 알 수 없는 서명 키가 발생하면 JWKS 키 새로 고침이 마지막 새로 고침 시도 이후 지정된 일시 중지를 사용하여 정기적인 스케줄 외부에 예약됩니다. 키를 새로 고치는 것은 exponential backoff 규칙을 따르고,
oauth.jwks.refresh.seconds
에 도달할 때까지 일시 중지를 늘리면 실패한 새로 고침을 다시 시도합니다. 기본값은 1입니다. - 5
- JWKs 인증서가 만료되기 전에 유효한 것으로 간주됩니다. 기본값은
360
초입니다. 더 긴 시간을 지정하면 취소된 인증서에 대한 액세스를 허용할 위험을 고려하십시오. - 6
- 토큰에 실제 사용자 이름이 포함된 토큰 클레임(또는 키)입니다. 사용자 이름은 사용자를 식별하는 데 사용되는 주체 입니다. 값은 인증 흐름 및 사용된 권한 부여 서버에 따라 달라집니다. 필요한 경우
"['user.info'].['user.id']"
와 같은 JsonPath 표현식을 사용하여 토큰 내의 중첩된 JSON 속성에서 사용자 이름을 검색할 수 있습니다. - 7
- TLS 구성에 사용되는 신뢰 저장소의 위치입니다.
- 8
- 신뢰 저장소에 액세스하기 위한 암호입니다.
- 9
- PKCS #12 형식의 truststore 유형입니다.
6.4.9.2.4. OAuth 2.0 인트로스펙션 엔드 포인트 구성
OAuth 2.0 인트로스펙션 엔드포인트를 사용하는 토큰 검증은 수신된 액세스 토큰을 불투명으로 처리합니다. Kafka 브로커는 검증에 필요한 토큰 정보로 응답하는 인트로스펙션 엔드포인트에 액세스 토큰 토큰을 보냅니다. 중요한 것은 특정 액세스 토큰이 유효한 경우 최신 정보와 토큰이 만료되는 시기에 대한 정보를 반환합니다.
OAuth 2.0 인트로스펙션 기반 검증을 구성하려면 빠른 로컬 JWT 토큰 검증에 지정된 JWKs 엔드포인트 URI 대신 인트로스펙션 엔드포인트 URI를 지정합니다. 인증 서버에 따라 인트로스펙션 엔드포인트가 일반적으로 보호되므로 일반적으로 클라이언트 ID 및 클라이언트 시크릿 을 지정해야 합니다.
인트로스펙션 끝점의 속성 예
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.introspection.endpoint.uri="https://<oauth_server_address>/introspection" \ 1 oauth.client.id="kafka-broker" \ 2 oauth.client.secret="kafka-broker-secret" \ 3 oauth.ssl.truststore.location="<path_to_truststore_p12_file>" \ 4 oauth.ssl.truststore.password="<truststore_password>" \ 5 oauth.ssl.truststore.type="PKCS12" \ 6 oauth.username.claim="preferred_username" ; 7
- 1
- OAuth 2.0 인트로스펙션 엔드포인트 URI입니다. 예: https://AUTH-SERVER-ADDRESS/auth/realms/REALM-NAME/protocol/openid-connect/token/introspect.
- 2
- Kafka 브로커의 클라이언트 ID입니다.
- 3
- Kafka 브로커의 시크릿입니다.
- 4
- TLS 구성에 사용되는 신뢰 저장소의 위치입니다.
- 5
- 신뢰 저장소에 액세스하기 위한 암호입니다.
- 6
- PKCS #12 형식의 truststore 유형입니다.
- 7
- 토큰에 실제 사용자 이름이 포함된 토큰 클레임(또는 키)입니다. 사용자 이름은 사용자를 식별하는 데 사용되는 주체 입니다. 값은 인증 흐름 및 사용된 권한 부여 서버에 따라 달라집니다. 필요한 경우
"['user.info'].['user.id']"
와 같은 JsonPath 표현식을 사용하여 토큰 내의 중첩된 JSON 속성에서 사용자 이름을 검색할 수 있습니다.
6.4.9.3. Kafka 브로커에 대한 세션 재인증
Kafka 클라이언트와 Kafka 브로커 간의 OAuth 2.0 세션에 Kafka 세션 재인증 을 사용하도록 OAuth 리스너를 구성할 수 있습니다. 이 메커니즘은 정의된 기간 후에 클라이언트와 브로커 간에 인증된 세션 만료를 적용합니다. 세션이 만료되면 클라이언트는 기존 연결을 삭제하는 대신 다시 사용하여 새 세션을 즉시 시작합니다.
세션 재인증은 기본적으로 비활성화되어 있습니다. server.properties
파일에서 이를 활성화할 수 있습니다. OAUTHBEARER 또는 PLAIN이 SASL 메커니즘으로 활성화된 TLS 리스너에 대해 connections.max.reauth.ms
속성을 설정합니다.
리스너당 세션 재인증을 지정할 수 있습니다. 예를 들면 다음과 같습니다.
listener.name.client.oauthbearer.connections.max.reauth.ms=3600000
세션 재인증은 클라이언트에서 사용하는 Kafka 클라이언트 라이브러리에서 지원해야 합니다.
세션 재인증은 빠른 로컬 JWT 또는 인트로스펙션 엔드포인트 토큰 검증과 함께 사용할 수 있습니다.
클라이언트 재인증
브로커의 인증된 세션이 만료되면 연결을 삭제하지 않고 새 유효한 액세스 토큰을 브로커에 전송하여 클라이언트가 기존 세션으로 다시 인증해야 합니다.
토큰 유효성 검사가 성공하면 기존 연결을 사용하여 새 클라이언트 세션이 시작됩니다. 클라이언트가 다시 인증되지 않으면 메시지를 보내거나 수신하려고 하면 브로커가 연결을 종료합니다. Kafka 클라이언트 라이브러리 2.2 이상을 사용하는 Java 클라이언트는 브로커에서 재인증 메커니즘이 활성화된 경우 자동으로 다시 인증됩니다.
세션 재인증은 사용되는 경우 토큰 새로 고침에도 적용됩니다. 세션이 만료되면 클라이언트는 새로 고침 토큰을 사용하여 액세스 토큰을 새로 고칩니다. 그런 다음 클라이언트는 새 액세스 토큰을 사용하여 기존 연결을 다시 인증합니다.
OAUTHBEARER 및 PLAIN의 세션 만료
세션 재인증이 구성되면 OAUTHBEARER 및 PLAIN 인증에 대해 세션 만료가 다르게 작동합니다.
OAUTHBEARER 및 PLAIN의 경우 클라이언트 ID 및 시크릿 방법을 사용합니다.
-
브로커의 인증된 세션은 구성된
connections.max.reauth.ms
에서 만료됩니다. - 구성된 시간 전에 액세스 토큰이 만료되면 세션이 이전에 만료됩니다.
장기 액세스 토큰 방법을 사용하는 PLAIN의 경우:
-
브로커의 인증된 세션은 구성된
connections.max.reauth.ms
에서 만료됩니다. - 구성된 시간 전에 액세스 토큰이 만료되면 재인증이 실패합니다. 세션 재인증이 시도되었지만 PLAIN에는 토큰을 새로 고치는 메커니즘이 없습니다.
connections.max.reauth.ms
가 구성되지 않은 경우 OAUTHBEARER 및 PLAIN 클라이언트는 다시 인증할 필요 없이 브로커에 무기한 연결을 유지할 수 있습니다. 인증된 세션은 액세스 토큰 만료로 끝나지 않습니다. 그러나 예를 들어 keycloak
권한 부여를 사용하거나 사용자 정의 승인기를 설치하여 권한을 구성할 때 이를 고려할 수 있습니다.
6.4.9.4. OAuth 2.0 Kafka 클라이언트 구성
Kafka 클라이언트는 다음 중 하나로 구성됩니다.
- 권한 부여 서버에서 유효한 액세스 토큰을 가져오는 데 필요한 인증 정보(클라이언트 ID 및 시크릿)
- 권한 부여 서버에서 제공하는 툴을 사용하여 얻은 유효한 장기 액세스 토큰 또는 새로 고침 토큰
Kafka 브로커로 전송된 유일한 정보는 액세스 토큰입니다. 액세스 토큰을 얻기 위해 권한 부여 서버로 인증하는 데 사용되는 인증 정보는 브로커로 전송되지 않습니다.
클라이언트에서 액세스 토큰을 가져올 때 권한 부여 서버와의 추가 통신이 필요하지 않습니다.
가장 간단한 메커니즘은 클라이언트 ID 및 시크릿을 사용한 인증입니다. 장기 액세스 토큰 또는 수명이 긴 새로 고침 토큰을 사용하면 권한 부여 서버 도구에 대한 추가 종속성이 있기 때문에 복잡성이 향상됩니다.
장기 액세스 토큰을 사용하는 경우 토큰의 최대 수명을 늘리려면 권한 부여 서버에서 클라이언트를 구성해야 할 수 있습니다.
Kafka 클라이언트가 액세스 토큰으로 직접 구성되지 않은 경우 클라이언트는 권한 부여 서버에 연결하여 Kafka 세션 시작 중에 액세스 토큰에 대한 인증 정보를 교환합니다. Kafka 클라이언트 교환:
- 클라이언트 ID 및 시크릿
- 클라이언트 ID, 새로 고침 토큰, 시크릿(선택 사항)
- 사용자 이름과 암호, 클라이언트 ID 및 시크릿 사용(선택 사항)
6.4.9.5. OAuth 2.0 클라이언트 인증 흐름
OAuth 2.0 인증 흐름은 기본 Kafka 클라이언트 및 Kafka 브로커 구성에 따라 다릅니다. 이 흐름은 사용된 권한 부여 서버에서도 지원해야 합니다.
Kafka 브로커 리스너 구성은 클라이언트가 액세스 토큰을 사용하여 인증하는 방법을 결정합니다. 클라이언트는 클라이언트 ID와 시크릿을 전달하여 액세스 토큰을 요청할 수 있습니다.
리스너가 PLAIN 인증을 사용하도록 구성된 경우 클라이언트는 클라이언트 ID 및 시크릿 또는 사용자 이름 및 액세스 토큰으로 인증할 수 있습니다. 이러한 값은 PLAIN 메커니즘의 사용자 이름
및 암호
속성으로 전달됩니다.
리스너 구성은 다음 토큰 검증 옵션을 지원합니다.
- 권한 부여 서버에 연결하지 않고도 JWT 서명 확인 및 로컬 토큰 인트로스펙션에 따라 빠른 로컬 토큰 검증을 사용할 수 있습니다. 권한 부여 서버는 토큰의 서명의 유효성을 검사하는 데 사용되는 공용 인증서가 있는 JWKS 엔드포인트를 제공합니다.
- 권한 부여 서버에서 제공하는 토큰 인트로스펙션 엔드포인트를 호출할 수 있습니다. 새 Kafka 브로커 연결이 설정될 때마다 브로커는 클라이언트에서 수신한 액세스 토큰을 권한 부여 서버로 전달합니다. Kafka 브로커는 토큰이 유효한지 여부를 확인하기 위해 응답을 확인합니다.
권한 부여 서버는 불투명 액세스 토큰만 사용할 수 있으므로 로컬 토큰 유효성 검사는 사용할 수 없습니다.
다음과 같은 인증 유형에 대해 Kafka 클라이언트 인증 정보를 구성할 수도 있습니다.
- 이전에 생성된 장기 액세스 토큰을 사용하여 로컬 액세스 직접
- 새 액세스 토큰을 발행하기 위해 권한 부여 서버와 연락합니다(클라이언트 ID와 시크릿 또는 새로 고침 토큰 또는 사용자 이름과 암호 사용)
6.4.9.5.1. SASL OAUTHBEARER 메커니즘을 사용한 클라이언트 인증 흐름의 예
SASL OAUTHBEARER 메커니즘을 사용하여 Kafka 인증에 다음 통신 흐름을 사용할 수 있습니다.
브로커에서 인증 서버에 검증을 위임하는 클라이언트 ID와 시크릿을 사용하는 클라이언트
- Kafka 클라이언트는 클라이언트 ID 및 시크릿을 사용하여 권한 부여 서버에서 액세스 토큰을 요청하고 선택적으로 새로 고침 토큰을 요청합니다. 또는 클라이언트는 사용자 이름과 암호를 사용하여 인증할 수 있습니다.
- 권한 부여 서버에서 새 액세스 토큰을 생성합니다.
- Kafka 클라이언트는 SASL OAUTHBEARER 메커니즘을 사용하여 액세스 토큰을 전달하여 Kafka 브로커로 인증합니다.
- Kafka 브로커는 자체 클라이언트 ID와 시크릿을 사용하여 권한 부여 서버에서 토큰 인트로스펙션 엔드포인트를 호출하여 액세스 토큰을 검증합니다.
- 토큰이 유효한 경우 Kafka 클라이언트 세션이 설정됩니다.
브로커가 빠른 로컬 토큰 검증을 수행하는 클라이언트 ID 및 시크릿을 사용하는 클라이언트
- Kafka 클라이언트는 클라이언트 ID 및 시크릿을 사용하여 토큰 끝점의 권한 부여 서버와 필요한 경우 새로 고침 토큰을 사용하여 인증합니다. 또는 클라이언트는 사용자 이름과 암호를 사용하여 인증할 수 있습니다.
- 권한 부여 서버에서 새 액세스 토큰을 생성합니다.
- Kafka 클라이언트는 SASL OAUTHBEARER 메커니즘을 사용하여 액세스 토큰을 전달하여 Kafka 브로커로 인증합니다.
- Kafka 브로커는 JWT 토큰 서명 검사 및 로컬 토큰 인트로스펙션을 사용하여 액세스 토큰을 로컬로 검증합니다.
브로커로 권한 부여 서버에 검증을 위임하는 장기 액세스 토큰을 사용하는 클라이언트
- Kafka 클라이언트는 SASL OAUTHBEARER 메커니즘을 사용하여 Kafka 브로커로 인증하여 장기 액세스 토큰을 전달합니다.
- Kafka 브로커는 자체 클라이언트 ID와 시크릿을 사용하여 권한 부여 서버에서 토큰 인트로스펙션 엔드포인트를 호출하여 액세스 토큰을 검증합니다.
- 토큰이 유효한 경우 Kafka 클라이언트 세션이 설정됩니다.
브로커가 빠른 로컬 유효성 검사를 수행하는 장기 액세스 토큰을 사용하는 클라이언트
- Kafka 클라이언트는 SASL OAUTHBEARER 메커니즘을 사용하여 Kafka 브로커로 인증하여 장기 액세스 토큰을 전달합니다.
- Kafka 브로커는 JWT 토큰 서명 검사 및 로컬 토큰 인트로스펙션을 사용하여 액세스 토큰을 로컬로 검증합니다.
빠른 로컬 JWT 토큰 서명 검증은 토큰이 취소된 경우 권한 부여 서버와 확인되지 않으므로 수명이 짧은 토큰에만 적합합니다. 토큰 만료는 토큰에 기록되지만 취소는 언제든지 발생할 수 있으므로 권한 부여 서버에 연결하지 않아도 됩니다. 발행된 토큰은 만료될 때까지 유효한 것으로 간주됩니다.
6.4.9.5.2. SASL PLAIN 메커니즘을 사용한 클라이언트 인증 흐름의 예
OAuth PLAIN 메커니즘을 사용하여 Kafka 인증에 다음 통신 흐름을 사용할 수 있습니다.
브로커가 클라이언트의 액세스 토큰을 가져오는 클라이언트 ID와 시크릿을 사용하는 클라이언트
-
Kafka 클라이언트는 사용자 이름으로
clientId
를 전달하고시크릿
을 암호로 전달합니다. -
Kafka 브로커는 토큰 끝점을 사용하여
clientId
및secret
을 권한 부여 서버에 전달합니다. - 인증 서버는 새로운 액세스 토큰을 반환하거나 클라이언트 자격 증명이 유효하지 않은 경우 오류를 반환합니다.
Kafka 브로커는 다음 방법 중 하나로 토큰을 검증합니다.
- 토큰 인트로스펙션 엔드포인트가 지정된 경우 Kafka 브로커는 권한 부여 서버에서 엔드포인트를 호출하여 액세스 토큰을 검증합니다. 토큰 유효성 검사가 성공하면 세션이 설정됩니다.
- 로컬 토큰 인트로스펙션이 사용되는 경우 권한 부여 서버에 대한 요청이 이루어지지 않습니다. Kafka 브로커는 JWT 토큰 서명 검사를 사용하여 액세스 토큰을 로컬로 검증합니다.
클라이언트 ID 및 시크릿 없이 장기 액세스 토큰을 사용하는 클라이언트
- Kafka 클라이언트는 사용자 이름과 암호를 전달합니다. 암호는 수동으로 가져와 클라이언트를 실행하기 전에 구성한 액세스 토큰의 값을 제공합니다.
이 암호는 Kafka 브로커 리스너가 인증을 위해 토큰 끝점으로 구성되었는지 여부에 따라
$accessToken:
문자열 접두사와 함께 전달됩니다.-
토큰 끝점이 구성된 경우 브로커에 password 매개변수에 클라이언트 시크릿이 아닌 액세스 토큰이 포함되어 있음을 알 수 있도록 암호 앞에
$accessToken:
가 붙여야 합니다. Kafka 브로커는 사용자 이름을 계정 사용자 이름으로 해석합니다. -
토큰 끝점이 Kafka 브로커 리스너에 구성되지 않은 경우(
no-client-credentials 모드
사용) 암호는 접두사 없이 액세스 토큰을 제공해야 합니다. Kafka 브로커는 사용자 이름을 계정 사용자 이름으로 해석합니다. 이 모드에서 클라이언트는 클라이언트 ID와 시크릿을 사용하지 않으며password
매개변수는 항상 원시 액세스 토큰으로 해석됩니다.
-
토큰 끝점이 구성된 경우 브로커에 password 매개변수에 클라이언트 시크릿이 아닌 액세스 토큰이 포함되어 있음을 알 수 있도록 암호 앞에
Kafka 브로커는 다음 방법 중 하나로 토큰을 검증합니다.
- 토큰 인트로스펙션 엔드포인트가 지정된 경우 Kafka 브로커는 권한 부여 서버에서 엔드포인트를 호출하여 액세스 토큰을 검증합니다. 토큰 유효성 검사가 성공하면 세션이 설정됩니다.
- 로컬 토큰 인트로스펙션이 사용되는 경우 권한 부여 서버에 대한 요청이 없습니다. Kafka 브로커는 JWT 토큰 서명 검사를 사용하여 액세스 토큰을 로컬로 검증합니다.
6.4.9.6. OAuth 2.0 인증 구성
OAuth 2.0은 Kafka 클라이언트와 AMQ Streams 구성 요소 간의 상호 작용에 사용됩니다.
AMQ Streams에 OAuth 2.0을 사용하려면 다음을 수행해야 합니다.
6.4.9.6.1. Red Hat Single Sign-On을 OAuth 2.0 인증 서버로 구성
다음 절차에서는 Red Hat Single Sign-On을 인증 서버로 배포하고 AMQ Streams와의 통합을 위해 구성하는 방법을 설명합니다.
권한 부여 서버는 인증 및 권한 부여와 사용자, 클라이언트 및 권한 관리를 위한 중앙 지점을 제공합니다. Red Hat Single Sign-On에는 영역이 별도의 사용자, 클라이언트, 권한 및 기타 구성을 나타내는 영역 개념이 있습니다. 기본 마스터 영역을 사용하거나 새 마스터 영역을 생성할 수 있습니다. 각 영역은 자체 OAuth 2.0 엔드포인트를 노출합니다. 즉, 애플리케이션 클라이언트와 애플리케이션 서버가 모두 동일한 영역을 사용해야 합니다.
AMQ Streams와 함께 OAuth 2.0을 사용하려면 Red Hat Single Sign-On을 배포하여 인증 영역을 생성하고 관리합니다.
Red Hat Single Sign-On이 이미 배포된 경우 배포 단계를 건너뛰고 현재 배포를 사용할 수 있습니다.
사전 준비 사항
Red Hat Single Sign-On 사용에 대해 잘 알고 있어야 합니다.
설치 및 관리 지침은 다음을 참조하십시오.
사전 요구 사항
- AMQ Streams 및 Kafka가 실행 중입니다.
Red Hat Single Sign-On 배포의 경우:
프로세스
Red Hat Single Sign-On 설치.
ZIP 파일에서 설치하거나 RPM을 사용하여 설치할 수 있습니다.
Red Hat Single Sign-On 관리 콘솔에 로그인하여 AMQ Streams에 대한 OAuth 2.0 정책을 생성합니다.
Red Hat Single Sign-On을 배포할 때 로그인 세부 정보가 제공됩니다.
영역을 생성하고 활성화합니다.
기존 마스터 영역을 사용할 수 있습니다.
- 필요한 경우 영역의 세션 및 토큰 타임아웃을 조정합니다.
-
kafka-broker
라는 클라이언트를 생성합니다. 설정 탭에서 다음을 설정합니다.
-
액세스 유형 (
Confidential
) -
이 클라이언트에 대한 웹 로그인을 비활성화하려면 표준 흐름을
OFF
로 활성화 -
이 클라이언트가 고유한 이름으로 인증할 수 있도록 활성화됨
-
액세스 유형 (
- 계속하기 전에 저장을 클릭합니다.
- 탭에서 AMQ Streams Kafka 클러스터 구성에서 사용할 시크릿을 기록합니다.
Kafka 브로커에 연결할 모든 애플리케이션 클라이언트에 대해 클라이언트 생성 단계를 반복합니다.
각 새 클라이언트에 대한 정의를 생성합니다.
구성에서 이름을 클라이언트 ID로 사용합니다.
다음에 수행할 작업
권한 부여 서버를 배포하고 구성한 후 OAuth 2.0을 사용하도록 Kafka 브로커를 구성합니다.
6.4.9.6.2. Kafka 브로커에 대한 OAuth 2.0 지원 구성
다음 절차에서는 브로커 리스너가 권한 부여 서버를 사용하여 OAuth 2.0 인증을 사용하도록 Kafka 브로커를 구성하는 방법을 설명합니다.
TLS 리스너 구성을 통해 암호화된 인터페이스를 통해 OAuth 2.0을 사용하는 것이 좋습니다. 일반 리스너는 권장되지 않습니다.
선택한 권한 부여 서버를 지원하는 속성과 구현 중인 권한 부여 유형을 사용하여 Kafka 브로커를 구성합니다.
시작하기 전
Kafka 브로커 리스너의 구성 및 인증에 대한 자세한 내용은 다음을 참조하십시오.
리스너 구성에 사용되는 속성에 대한 설명은 다음을 참조하십시오.
사전 요구 사항
- AMQ Streams 및 Kafka가 실행 중입니다.
- OAuth 2.0 인증 서버가 배포됨
프로세스
server.properties
파일에서 Kafka 브로커 리스너 구성을 구성합니다.예를 들어 OAUTHBEARER 메커니즘을 사용합니다.
sasl.enabled.mechanisms=OAUTHBEARER listeners=CLIENT://0.0.0.0:9092 listener.security.protocol.map=CLIENT:SASL_PLAINTEXT listener.name.client.sasl.enabled.mechanisms=OAUTHBEARER sasl.mechanism.inter.broker.protocol=OAUTHBEARER inter.broker.listener.name=CLIENT listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ; listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
broker 연결 설정을
listener.name.client.oauthbearer.sasl.jaas.config
의 일부로 구성합니다.이 예제에서는 연결 구성 옵션을 보여줍니다.
예 1: JWKS 엔드포인트 구성을 사용한 로컬 토큰 검증
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.valid.issuer.uri="https://<oauth_server_address>/auth/realms/<realm_name>" \ oauth.jwks.endpoint.uri="https://<oauth_server_address>/auth/realms/<realm_name>/protocol/openid-connect/certs" \ oauth.jwks.refresh.seconds="300" \ oauth.jwks.refresh.min.pause.seconds="1" \ oauth.jwks.expiry.seconds="360" \ oauth.username.claim="preferred_username" \ oauth.ssl.truststore.location="<path_to_truststore_p12_file>" \ oauth.ssl.truststore.password="<truststore_password>" \ oauth.ssl.truststore.type="PKCS12" ; listener.name.client.oauthbearer.connections.max.reauth.ms=3600000
예 2: OAuth 2.0 인트로스펙션 끝점을 통해 인증 서버에 토큰 검증 위임
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.introspection.endpoint.uri="https://<oauth_server_address>/auth/realms/<realm_name>/protocol/openid-connect/introspection" \ # ...
필요한 경우 권한 부여 서버에 대한 액세스를 구성합니다.
서비스 메시 와 같은 기술을 사용하여 컨테이너 외부의 보안 채널을 구성하지 않는 한 일반적으로 프로덕션 환경에 이 단계가 필요합니다.
보안 권한 부여 서버에 연결하기 위한 사용자 정의 신뢰 저장소를 제공합니다. 권한 부여 서버에 액세스하려면 항상 SSL이 필요합니다.
속성을 설정하여 신뢰 저장소를 구성합니다.
예를 들면 다음과 같습니다.
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ # ... oauth.client.id="kafka-broker" \ oauth.client.secret="kafka-broker-secret" \ oauth.ssl.truststore.location="<path_to_truststore_p12_file>" \ oauth.ssl.truststore.password="<truststore_password>" \ oauth.ssl.truststore.type="PKCS12" ;
인증서 호스트 이름이 액세스 URL 호스트 이름과 일치하지 않으면 인증서 호스트 이름 검증을 끌 수 있습니다.
oauth.ssl.endpoint.identification.algorithm=""
검사를 통해 권한 부여 서버에 대한 클라이언트 연결이 정품인지 확인합니다. 비프로덕션 환경에서 검증을 해제할 수 있습니다.
선택한 인증 흐름에 따라 추가 속성을 구성합니다.
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ # ... oauth.token.endpoint.uri="https://<oauth_server_address>/auth/realms/<realm_name>/protocol/openid-connect/token" \ 1 oauth.custom.claim.check="@.custom == 'custom-value'" \ 2 oauth.scope="<scope>" \ 3 oauth.check.audience="true" \ 4 oauth.audience="<audience>" \ 5 oauth.valid.issuer.uri="https://https://<oauth_server_address>/auth/<realm_name>" \ 6 oauth.client.id="kafka-broker" \ 7 oauth.client.secret="kafka-broker-secret" \ 8 oauth.connect.timeout.seconds=60 \ 9 oauth.read.timeout.seconds=60 \ 10 oauth.http.retries=2 \ 11 oauth.http.retry.pause.millis=300 \ 12 oauth.groups.claim="$.groups" \ 13 oauth.groups.claim.delimiter="," \ 14 oauth.include.accept.header="false" ; 15
- 1
- 권한 부여 서버의 OAuth 2.0 토큰 끝점 URL입니다. 프로덕션의 경우 항상
https://
urls를 사용합니다.KeycloakAuthorizer
를 사용하거나 OAuth 2.0이 활성화된 리스너가 뇌 간 통신에 사용되는 경우 필요합니다. - 2
- (선택 사항) 사용자 정의 클레임 검사. 검증 중에 JWT 액세스 토큰에 추가 사용자 지정 규칙을 적용하는 JsonPath 필터 쿼리입니다. 액세스 토큰에 필요한 데이터가 포함되어 있지 않으면 거부됩니다. 인트로스펙션 엔드포인트 방법을 사용하는 경우 사용자 정의 검사가 인트로스펙션 엔드포인트 응답 JSON에 적용됩니다.
- 3
- (선택 사항) 토큰 끝점에 전달되는
scope
매개변수입니다. 범위는 broker 간 인증을 위한 액세스 토큰을 가져올 때 사용됩니다.clientId
및secret
을 사용하는 PLAIN 클라이언트 인증의 OAuth 2.0용 클라이언트 이름에도 사용됩니다. 이는 권한 부여 서버에 따라 토큰과 토큰의 콘텐츠에만 영향을 미칩니다. 리스너의 토큰 검증 규칙에는 영향을 미치지 않습니다. - 4
- (선택 사항) Audience 확인 권한 부여 서버가
aud
(audience) 클레임을 제공하고 대상 검사를 적용하려는 경우ouath.check.audience
를true
로 설정합니다. 대상 검사에서는 의도된 토큰 수신자를 식별합니다. 결과적으로 Kafka 브로커는aud
클레임에clientId
가 없는 토큰을 거부합니다. 기본값은false
입니다. - 5
- (선택 사항) 토큰 끝점에 전달되는
대상
매개변수입니다. 대상 은broker 간 인증을 위한 액세스 토큰을 가져올 때 사용됩니다.clientId
및secret
을 사용하는 PLAIN 클라이언트 인증의 OAuth 2.0용 클라이언트 이름에도 사용됩니다. 이는 권한 부여 서버에 따라 토큰과 토큰의 콘텐츠에만 영향을 미칩니다. 리스너의 토큰 검증 규칙에는 영향을 미치지 않습니다. - 6
- 유효한 발행자 URI입니다. 이 발행자가 발행한 액세스 토큰만 허용됩니다. (필수 사항)
- 7
- 모든 브로커에 대해 동일한 Kafka 브로커의 구성된 클라이언트 ID입니다. 이 클라이언트는 권한 부여 서버에
kafka-broker
로 등록된 클라이언트 입니다. 인트로스펙션 엔드포인트가 토큰 검증에 사용되거나KeycloakAuthorizer
가 사용되는 경우 필요합니다. - 8
- 모든 브로커에 대해 동일한 Kafka 브로커에 대해 구성된 시크릿입니다. 브로커가 권한 부여 서버에 인증해야 하는 경우 클라이언트 시크릿에 액세스 토큰 또는 새로 고침 토큰을 지정해야 합니다.
- 9
- (선택 사항) 권한 부여 서버에 연결할 때 연결 시간 초과(초)입니다. 기본값은 60입니다.
- 10
- (선택 사항) 권한 부여 서버에 연결할 때 읽기 제한 시간(초)입니다. 기본값은 60입니다.
- 11
- 권한 부여 서버에 실패한 HTTP 요청을 재시도하는 최대 횟수입니다. 기본값은 0입니다. 즉, 재시도가 수행되지 않습니다. 이 옵션을 효과적으로 사용하려면
oauth.connect.timeout.seconds
및oauth.read.timeout.seconds
옵션의 시간 초과 시간을 줄이는 것이 좋습니다. 그러나 재시도하면 현재 작업자 스레드를 다른 요청에 사용할 수 없게 될 수 있으며 요청이 너무 많으면 Kafka 브로커가 응답하지 않을 수 있습니다. - 12
- 권한 부여 서버에 실패한 HTTP 요청을 다시 시도하기 전에 대기하는 시간입니다. 기본적으로 이 시간은 0으로 설정되어 있으므로 일시 중지가 적용되지 않습니다. 이는 실패한 요청을 유발하는 많은 문제가 요청당 네트워크 결함 또는 프록시 문제로 인해 신속하게 해결할 수 있기 때문입니다. 그러나 권한 부여 서버가 과부하가 발생하거나 트래픽이 많은 경우 이 옵션을 100ms 이상으로 설정하여 서버의 부하를 줄이고 성공적인 재시도 가능성을 높일 수 있습니다.
- 13
- JWT 토큰 또는 인트로스펙션 엔드포인트 응답에서 그룹 정보를 추출하는 데 사용되는 JsonPath 쿼리입니다. 기본적으로 설정되지 않습니다. 사용자 지정 작성자가 사용자 그룹을 기반으로 권한 부여 결정을 내리는 데 사용할 수 있습니다.
- 14
- 그룹으로 구분된 단일 문자열로 반환되는 경우 그룹 정보를 구문 분석하는 데 사용되는 구분 기호입니다. 기본값은 ',' (comma)입니다.
- 15
- (선택 사항)
oauth.include.accept.header
를false
로 설정하여 요청에서Accept
헤더를 제거합니다. 헤더를 포함하면 권한 부여 서버와 통신할 때 문제가 발생하는 경우 이 설정을 사용할 수 있습니다.
OAuth 2.0 인증을 적용하는 방법과 사용 중인 인증 서버 유형에 따라 추가 구성 설정을 추가합니다.
listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ # ... oauth.check.issuer=false \ 1 oauth.fallback.username.claim="<client_id>" \ 2 oauth.fallback.username.prefix="<client_account>" \ 3 oauth.valid.token.type="bearer" \ 4 oauth.userinfo.endpoint.uri="https://<oauth_server_address>/auth/realms/<realm_name>/protocol/openid-connect/userinfo" ; 5
- 1
- 권한 부여 서버에서
iss
클레임을 제공하지 않으면 발급자 검사를 수행할 수 없습니다. 이 경우oauth.check.issuer
를false
로 설정하고oauth.valid.issuer.uri
를 지정하지 마십시오. 기본값은true
입니다. - 2
- 권한 부여 서버는 일반 사용자와 클라이언트를 모두 식별하는 단일 속성을 제공하지 않을 수 있습니다. 클라이언트가 자체 이름으로 인증하면 서버에서 클라이언트 ID 를 제공할 수 있습니다. 사용자가 사용자 이름과 암호를 사용하여 인증하는 경우 새로 고침 토큰 또는 액세스 토큰을 가져오기 위해 서버에서 클라이언트 ID 외에 username 속성을 제공할 수 있습니다. 기본 사용자 ID 속성을 사용할 수 없는 경우 사용할 사용자 이름 클레임(attribute)을 지정하려면 이 대체 옵션을 사용합니다. 필요한 경우
"['client.info'].['client.id']"
와 같은 JsonPath 표현식을 사용하여 토큰 내의 중첩된 JSON 속성에서 대체 사용자 이름을 검색할 수 있습니다. - 3
oauth.fallback.username.claim
이 적용되는 경우 사용자 이름 클레임 값과 대체 사용자 이름 클레임의 이름 충돌을 방지할 수도 있습니다.producer
라는 클라이언트가 존재하지만producer
라는 일반 사용자도 존재하는 상황을 고려하십시오. 이 속성을 사용하여 클라이언트의 사용자 ID에 접두사를 추가할 수 있습니다.- 4
- (
oauth.introspection.endpoint.uri
) 사용 중인 권한 부여 서버에 따라 인트로스펙션 엔드 포인트가 토큰 유형 속성을 반환하거나 반환하지 않거나 다른 값을 포함할 수 있습니다. 인트로스펙션 끝점의 응답에 포함되어야 하는 유효한 토큰 유형 값을 지정할 수 있습니다. - 5
- (
oauth.introspection.endpoint.uri
) 사용 시에만 적용할 수 있습니다. 인증 서버는 인트로스펙션 엔드포인트 응답에서 식별 가능한 정보를 제공하지 않기 위해 인증 서버를 구성하거나 구현할 수 있습니다. 사용자 ID를 얻으려면userinfo
끝점의 URI를 폴백으로 구성할 수 있습니다.oauth.fallback.username.claim
,oauth.fallback.username.claim
,oauth.fallback.username.prefix
설정은userinfo
엔드포인트 응답에 적용됩니다.
다음에 수행할 작업
6.4.9.6.3. OAuth 2.0을 사용하도록 Kafka Java 클라이언트 구성
Kafka 브로커와의 상호 작용에 OAuth 2.0을 사용하도록 Kafka 생산자 및 소비자 API를 구성합니다. 클라이언트 pom.xml
파일에 콜백 플러그인을 추가한 다음 OAuth 2.0에 대해 클라이언트를 구성합니다.
클라이언트 구성에 다음을 지정합니다.
SASL(Simple Authentication and Security Layer) 보안 프로토콜:
-
TLS 암호화 연결을 통한 인증을 위한
SASL_SSL
암호화되지 않은 연결을 통한 인증을 위한
SASL_PLAINTEXT
프로덕션에는
SASL_SSL
을 사용하고 로컬 개발에는SASL_PLAINTEXT
를 사용하십시오.SASL_SSL
을 사용하는 경우 추가ssl.truststore
구성이 필요합니다. OAuth 2.0 권한 부여 서버에 대한 보안 연결(https://
)을 위해서는 truststore 구성이 필요합니다. OAuth 2.0 권한 부여 서버를 확인하려면 인증 서버의 CA 인증서를 클라이언트 구성의 신뢰 저장소에 추가합니다. PEM 또는 PKCS #12 형식으로 신뢰 저장소를 구성할 수 있습니다.
-
TLS 암호화 연결을 통한 인증을 위한
Kafka SASL 메커니즘:
-
전달자 토큰을 사용하여 인증 정보 교환을 위한
OAUTHBEARER
-
클라이언트 인증 정보(clientId + 시크릿) 또는 액세스 토큰을 전달하는
PLAIN
-
전달자 토큰을 사용하여 인증 정보 교환을 위한
SASL 메커니즘을 구현하는 JAAS(Java Authentication and Authorization Service) 모듈:
-
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
은 OAuthbearer 메커니즘을 구현합니다. -
org.apache.kafka.common.security.plain.PlainLoginModule
은 일반 메커니즘을 구현합니다.
OAuthbearer 메커니즘을 사용할 수 있으려면 사용자 정의
io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
클래스를 콜백 처리기로 추가해야 합니다.JaasClientOauthLoginCallbackHandler
는 클라이언트 로그인 중에 토큰을 액세스하기 위해 권한 부여 서버로의 OAuth 콜백을 처리합니다. 이를 통해 자동 토큰 갱신을 통해 사용자 개입 없이 지속적인 인증을 보장할 수 있습니다. 또한 OAuth 2.0 암호 부여 방법을 사용하여 클라이언트의 로그인 인증 정보를 처리합니다.-
SASL 인증 속성은 다음 인증 방법을 지원합니다.
- OAuth 2.0 클라이언트 인증 정보
- OAuth 2.0 암호 부여(더 이상 사용되지 않음)
- 액세스 토큰
- 토큰 새로 고침
SASL 인증 속성을 JAAS 구성(
sasl.jaas.config
및sasl.login.callback.handler.class
)으로 추가합니다. 인증 속성을 구성하는 방법은 OAuth 2.0 권한 부여 서버에 액세스하기 위해 사용 중인 인증 방법에 따라 다릅니다. 이 절차에서 속성은 속성 파일에 지정 된 다음 클라이언트 구성에 로드됩니다.In this procedure, the properties are specified in a properties file, then loaded into the client configuration.
인증 속성을 환경 변수로 지정하거나 Java 시스템 속성으로 지정할 수도 있습니다. Java 시스템 속성의 경우 setProperty
를 사용하여 설정하고 -D
옵션을 사용하여 명령줄에서 전달할 수 있습니다.
사전 요구 사항
- AMQ Streams 및 Kafka가 실행 중입니다.
- OAuth 2.0 권한 부여 서버가 Kafka 브로커에 대한 OAuth 액세스용으로 배포 및 구성됨
- Kafka 브로커는 OAuth 2.0으로 구성되어 있습니다.
프로세스
OAuth 2.0 지원이 포함된 클라이언트 라이브러리를 Kafka 클라이언트의
pom.xml
파일에 추가합니다.<dependency> <groupId>io.strimzi</groupId> <artifactId>kafka-oauth-client</artifactId> <version>0.14.0.redhat-00006</version> </dependency>
속성 파일에 다음 구성을 지정하여 클라이언트 속성을 구성합니다.
- 보안 프로토콜
- SASL 메커니즘
사용 방법에 따른 JAAS 모듈 및 인증 속성
예를 들어
client.properties
파일에 다음을 추가할 수 있습니다.클라이언트 인증 메커니즘 속성
security.protocol=SASL_SSL 1 sasl.mechanism=OAUTHBEARER 2 ssl.truststore.location=/tmp/truststore.p12 3 ssl.truststore.password=$STOREPASS ssl.truststore.type=PKCS12 sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.token.endpoint.uri="<token_endpoint_url>" \ 4 oauth.client.id="<client_id>" \ 5 oauth.client.secret="<client_secret>" \ 6 oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \ 7 oauth.ssl.truststore.password="$STOREPASS" \ 8 oauth.ssl.truststore.type="PKCS12" \ 9 oauth.scope="<scope>" \ 10 oauth.audience="<audience>" ; 11 sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
- 1
- TLS 암호화 연결을 위한
SASL_SSL
보안 프로토콜. 로컬 개발을 위해 암호화되지 않은 연결보다SASL_PLAINTEXT
를 사용하십시오. - 2
OAUTHBEARER
또는PLAIN
으로 지정된 SASL 메커니즘 .- 3
- Kafka 클러스터에 대한 보안 액세스를 위한 신뢰 저장소 구성입니다.
- 4
- 권한 부여 서버 토큰 끝점의 URI입니다.
- 5
- 권한 부여 서버에서 클라이언트를 생성할 때 사용되는 이름인 클라이언트 ID입니다.
- 6
- 권한 부여 서버에서 클라이언트를 생성할 때 생성된 클라이언트 시크릿입니다.
- 7
- 위치에는 권한 부여 서버의 공개 키 인증서(
truststore.p12
)가 포함되어 있습니다. - 8
- truststore에 액세스하기 위한 암호입니다.
- 9
- truststore 유형입니다.
- 10
- (선택 사항) 토큰 끝점에서 토큰을 요청하는
범위
입니다. 권한 부여 서버에는 클라이언트가 범위를 지정해야 할 수 있습니다. - 11
- (선택 사항) 토큰 끝점에서 토큰을 요청하는 대상입니다.
권한 부여 서버에는 클라이언트가 대상을 지정해야 할 수 있습니다.
암호는 메커니즘 속성 부여
security.protocol=SASL_SSL sasl.mechanism=OAUTHBEARER ssl.truststore.location=/tmp/truststore.p12 ssl.truststore.password=$STOREPASS ssl.truststore.type=PKCS12 sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.token.endpoint.uri="<token_endpoint_url>" \ oauth.client.id="<client_id>" \ 1 oauth.client.secret="<client_secret>" \ 2 oauth.password.grant.username="<username>" \ 3 oauth.password.grant.password="<password>" \ 4 oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \ oauth.ssl.truststore.password="$STOREPASS" \ oauth.ssl.truststore.type="PKCS12" \ oauth.scope="<scope>" \ oauth.audience="<audience>" ; sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
- 1
- 권한 부여 서버에서 클라이언트를 생성할 때 사용되는 이름인 클라이언트 ID입니다.
- 2
- (선택 사항) 권한 부여 서버에서 클라이언트를 생성할 때 생성된 클라이언트 시크릿입니다.
- 3
- 암호 부여 인증의 사용자 이름입니다. OAuth 암호 부여 구성(사용자 이름 및 암호)은 OAuth 2.0 암호 부여 방법을 사용합니다. 암호 부여를 사용하려면 제한된 권한이 있는 권한 부여 서버에서 클라이언트에 대한 사용자 계정을 만듭니다. 계정은 서비스 계정처럼 작동해야 합니다. 인증에 사용자 계정이 필요하지만 새로 고침 토큰을 사용하는 것이 좋습니다.
- 4
- 암호 부여 인증입니다.참고
SASL PLAIN은 OAuth 2.0 암호 부여 방법을 사용하여 사용자 이름과 암호(암호 부여)를 전달하는 것을 지원하지 않습니다.
토큰 속성에 액세스
security.protocol=SASL_SSL sasl.mechanism=OAUTHBEARER ssl.truststore.location=/tmp/truststore.p12 ssl.truststore.password=$STOREPASS ssl.truststore.type=PKCS12 sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.token.endpoint.uri="<token_endpoint_url>" \ oauth.access.token="<access_token>" \ 1 oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \ oauth.ssl.truststore.password="$STOREPASS" \ oauth.ssl.truststore.type="PKCS12" ; sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
- 1
- Kafka 클라이언트용 장기 액세스 토큰입니다.
토큰 속성 새로 고침
security.protocol=SASL_SSL sasl.mechanism=OAUTHBEARER ssl.truststore.location=/tmp/truststore.p12 ssl.truststore.password=$STOREPASS ssl.truststore.type=PKCS12 sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.token.endpoint.uri="<token_endpoint_url>" \ oauth.client.id="<client_id>" \ 1 oauth.client.secret="<client_secret>" \ 2 oauth.refresh.token="<refresh_token>" \ 3 oauth.ssl.truststore.location="/tmp/oauth-truststore.p12" \ oauth.ssl.truststore.password="$STOREPASS" \ oauth.ssl.truststore.type="PKCS12" ; sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
OAUTH 2.0 인증의 클라이언트 속성을 Java 클라이언트 코드에 입력합니다.
클라이언트 속성의 입력 표시 예
Properties props = new Properties(); try (FileReader reader = new FileReader("client.properties", StandardCharsets.UTF_8)) { props.load(reader); }
- Kafka 클라이언트가 Kafka 브로커에 액세스할 수 있는지 확인합니다.
6.4.10. OAuth 2.0 토큰 기반 권한 부여 사용
AMQ Streams는 Red Hat Single Sign-On 인증 서비스를 통해 OAuth 2.0 토큰 기반 권한 사용을 지원하므로 보안 정책과 권한을 중앙에서 관리할 수 있습니다.
Red Hat Single Sign-On에 정의된 보안 정책 및 권한은 Kafka 브로커의 리소스에 대한 액세스 권한을 부여하는 데 사용됩니다. 사용자와 클라이언트는 Kafka 브로커에서 특정 작업을 수행하기 위해 액세스를 허용하는 정책과 일치합니다.
Kafka는 기본적으로 모든 사용자에게 브로커에 대한 전체 액세스를 허용하며 AclAuthorizer
및 StandardAuthorizer
플러그인도 제공하여 ACL(Access Control Lists)을 기반으로 권한 부여를 구성합니다. 이러한 플러그인에서 관리하는 ACL 규칙은 사용자 이름을 기반으로 리소스에 대한 액세스 권한을 부여하거나 거부하는 데 사용되며 이러한 규칙은 Kafka 클러스터 자체에 저장됩니다. 그러나 Red Hat Single Sign-On을 사용한 OAuth 2.0 토큰 기반 권한 부여는 Kafka 브로커에 대한 액세스 제어를 구현하는 방법에 대한 유연성을 훨씬 높여 줍니다. 또한 OAuth 2.0 권한 부여 및 ACL을 사용하도록 Kafka 브로커를 구성할 수 있습니다.
6.4.10.1. OAuth 2.0 인증 메커니즘
AMQ Streams의 OAuth 2.0 인증에서는 Red Hat Single Sign-On 서버 권한 부여 REST 끝점을 사용하여 특정 사용자에게 정의된 보안 정책을 적용하고 해당 사용자의 다양한 리소스에 부여된 권한 목록을 제공하여 Red Hat Single Sign-On으로 토큰 기반 인증을 확장합니다. 정책은 역할과 그룹을 사용하여 사용자와 권한을 일치시킵니다. OAuth 2.0 권한 부여는 Red Hat Single Sign-On 인증 서비스에서 사용자에 대한 수신된 권한 목록을 기반으로 권한을 로컬로 적용합니다.
6.4.10.1.1. Kafka 브로커 사용자 정의 승인
Red Hat Single Sign-On 승인자 (KeycloakAuthorizer
)는 AMQ Streams와 함께 제공됩니다. Red Hat Single Sign-On에서 제공하는 권한 부여 서비스에 Red Hat Single Sign-On REST 엔드포인트를 사용하려면 Kafka 브로커에서 사용자 지정 승인자를 구성합니다.
인증자는 필요에 따라 권한 부여 서버에서 부여된 권한 목록을 가져와서 Kafka 브로커에 로컬로 권한 부여를 적용하여 각 클라이언트 요청에 대해 신속하게 권한 부여 결정을 내립니다.
6.4.10.2. OAuth 2.0 권한 부여 지원 구성
다음 절차에서는 Red Hat Single Sign-On 인증 서비스를 사용하여 OAuth 2.0 인증을 사용하도록 Kafka 브로커를 구성하는 방법을 설명합니다.
사전 준비 사항
필요한 액세스 또는 특정 사용자에 대해 제한하려는 경우를 고려하십시오. Red Hat Single Sign-On 그룹,역할,클라이언트 및 사용자를 결합하여 Red Hat Single Sign-On에서 액세스를 구성할 수 있습니다.
일반적으로 그룹은 조직 부서 또는 지리적 위치를 기반으로 사용자와 일치시키는 데 사용됩니다. 및 역할은 해당 기능을 기반으로 사용자와 일치시키는 데 사용됩니다.
Red Hat Single Sign-On을 사용하면 사용자와 그룹을 LDAP에 저장할 수 있지만 클라이언트와 역할은 이러한 방식으로 저장할 수 없습니다. 사용자 데이터에 대한 스토리지 및 액세스는 권한 부여 정책 구성 방법의 요소가 될 수 있습니다.
Super 사용자는 Kafka 브로커에서 구현된 권한 부여에 관계없이 항상 Kafka 브로커에 대한 무제한 액세스 권한을 갖습니다.
사전 요구 사항
프로세스
- Red Hat Single Sign-On 관리 콘솔에 액세스하거나 Red Hat Single Sign-On 관리 CLI를 사용하여 OAuth 2.0 인증을 설정할 때 생성한 Kafka 브로커 클라이언트에 대한 인증 서비스를 활성화합니다.
- 인증 서비스를 사용하여 클라이언트에 대한 리소스, 권한 부여 범위, 정책 및 권한을 정의합니다.
- 역할 및 그룹을 할당하여 사용자 및 클라이언트에 권한을 바인딩합니다.
Red Hat Single Sign-On 권한을 사용하도록 Kafka 브로커를 구성합니다.
Kafka
server.properties
구성 파일에 다음을 추가하여 Kafka에 인증자를 설치합니다.authorizer.class.name=io.strimzi.kafka.oauth.server.authorizer.KeycloakAuthorizer principal.builder.class=io.strimzi.kafka.oauth.server.OAuthKafkaPrincipalBuilder
Kafka 브로커에 대한 구성을 추가하여 권한 부여 서버 및 권한 부여 서비스에 액세스합니다.
여기서는
server.properties
에 추가 속성을 추가하지만 대문자 또는 대문자 이름 지정 규칙을 사용하여 환경 변수로 정의할 수도 있습니다.strimzi.authorization.token.endpoint.uri="https://<auth_server_address>/auth/realms/REALM-NAME/protocol/openid-connect/token" 1 strimzi.authorization.client.id="kafka" 2
(선택 사항) 특정 Kafka 클러스터에 대한 구성을 추가합니다.
예를 들면 다음과 같습니다.
strimzi.authorization.kafka.cluster.name="kafka-cluster" 1
- 1
- 특정 Kafka 클러스터의 이름입니다. 이름은 권한을 대상으로 지정하는 데 사용되며 동일한 Red Hat Single Sign-On 영역에서 여러 클러스터를 관리할 수 있습니다. 기본값은
kafka-cluster
입니다.
(선택 사항) 간단한 인증으로 위임합니다.
strimzi.authorization.delegate.to.kafka.acl="true" 1
- 1
- Red Hat Single Sign-On 인증 서비스 정책에서 액세스가 거부되는 경우 Kafka
AclAuthorizer
에 권한을 위임합니다. 기본값은false
입니다.
(선택 사항) 권한 부여 서버에 TLS 연결에 대한 구성을 추가합니다.
예를 들면 다음과 같습니다.
strimzi.authorization.ssl.truststore.location=<path_to_truststore> 1 strimzi.authorization.ssl.truststore.password=<my_truststore_password> 2 strimzi.authorization.ssl.truststore.type=JKS 3 strimzi.authorization.ssl.secure.random.implementation=SHA1PRNG 4 strimzi.authorization.ssl.endpoint.identification.algorithm=HTTPS 5
(선택 사항) 권한 부여 서버에서 권한 부여 새로 고침을 구성합니다. 부여 새로 고침 작업은 활성 토큰을 열거하고 각각에 대한 최신 권한을 요청하여 작동합니다.
예를 들면 다음과 같습니다.
strimzi.authorization.grants.refresh.period.seconds="120" 1 strimzi.authorization.grants.refresh.pool.size="10" 2 strimzi.authorization.grants.max.idle.time.seconds="300" 3 strimzi.authorization.grants.gc.period.seconds="300" 4 strimzi.authorization.reuse.grants="false" 5
- 1
- 권한 부여 서버의 권한 부여 목록이 새로 고침되는 빈도(기본적으로 분당 시간)를 지정합니다. 디버깅 목적으로 새로 고침을 비활성화하려면
"0"
으로 설정합니다. - 2
- grants 새로 고침 작업에 사용되는 스레드 풀의 크기(병합 정도)를 지정합니다. 기본값은
"5"
입니다. - 3
- 캐시의 유휴 부여를 제거할 수 있는 시간(초)입니다. 기본값은 300입니다.
- 4
- 캐시에서 오래된 권한을 정리하는 작업의 연속 실행 사이의 시간(초)입니다. 기본값은 300입니다.
- 5
- 최신 부여를 새 세션에 대해 가져올지 여부를 제어합니다. 비활성화되면 Red Hat Single Sign-On에서 부여가 검색되고 사용자에게 캐시됩니다. 기본값은
true
입니다.
(선택 사항) 권한 부여 서버와 통신할 때 네트워크 시간 초과를 구성합니다.
예를 들면 다음과 같습니다.
strimzi.authorization.connect.timeout.seconds="60" 1 strimzi.authorization.read.timeout.seconds="60" 2 strimzi.authorization.http.retries="2" 3
- 1
- Red Hat Single Sign-On 토큰 끝점에 연결할 때 연결 시간(초)입니다. 기본값은
60
입니다. - 2
- Red Hat Single Sign-On 토큰 끝점에 연결할 때 읽기 제한 시간(초)입니다. 기본값은
60
입니다. - 3
- 권한 부여 서버에 대한 실패한 HTTP 요청을 재시도하지 않고 재시도할 최대 횟수입니다. 기본값은
0
입니다. 즉, 재시도가 수행되지 않습니다. 이 옵션을 효과적으로 사용하려면strimzi.authorization.connect.timeout.seconds
및strimzi.authorization.read.timeout.seconds
옵션의 시간 초과 시간을 줄이는 것이 좋습니다. 그러나 재시도하면 현재 작업자 스레드를 다른 요청에 사용할 수 없게 될 수 있으며 요청이 너무 많으면 Kafka 브로커가 응답하지 않을 수 있습니다.
(선택 사항) 토큰 검증 및 권한 부여에 OAuth 2.0 메트릭을 활성화합니다.
oauth.enable.metrics="true" 1
- 1
- OAuth 메트릭을 활성화하거나 비활성화할지 여부를 제어합니다. 기본값은
false
입니다.
(선택 사항) 요청에서
Accept
헤더를 제거합니다.oauth.include.accept.header="false" 1
- 1
- 헤더를 포함하는 경우
false
로 설정하면 권한 부여 서버와 통신할 때 문제가 발생합니다. 기본값은true
입니다.
- Kafka 브로커에 특정 역할이 있는 클라이언트 또는 사용자로 액세스하여 구성된 권한에 액세스하고 필요한 액세스 권한이 있는지 또는 권한이 없는지 확인합니다.
6.4.11. OPA 정책 기반 권한 부여 사용
OPA(Open Policy Agent)는 오픈 소스 정책 엔진입니다. OPA를 AMQ Streams와 통합하여 Kafka 브로커에서 클라이언트 작업을 허용하는 정책 기반 권한 부여 메커니즘 역할을 할 수 있습니다.
클라이언트에서 요청이 생성되면 OPA는 Kafka 액세스에 대해 정의된 정책에 대한 요청을 평가한 다음 요청을 허용하거나 거부합니다.
Red Hat은 OPA 서버를 지원하지 않습니다.
추가 리소스
6.4.11.1. OPA 정책 정의
OPA를 AMQ Streams와 통합하기 전에 세분화된 액세스 제어를 제공하기 위해 정책을 정의하는 방법을 고려하십시오.
Kafka 클러스터, 소비자 그룹 및 주제에 대한 액세스 제어를 정의할 수 있습니다. 예를 들어 생산자 클라이언트에서 특정 브로커 주제로 쓰기 액세스를 허용하는 권한 부여 정책을 정의할 수 있습니다.
이 경우 정책은 다음을 지정할 수 있습니다.
- 생산자 클라이언트와 연결된 사용자 주체 및 호스트 주소
- 클라이언트에 허용되는 작업
-
정책이 적용되는 리소스
유형
(주체 ) 및 리소스 이름
허용 및 거부 결정은 정책에 기록되며, 제공되는 요청 및 클라이언트 식별 데이터에 따라 응답이 제공됩니다.
이 예제에서는 생산자 클라이언트가 해당 항목에 쓸 수 있도록 정책을 충족해야 합니다.
6.4.11.2. OPA에 연결
Kafka가 OPA 정책 엔진에 액세스하여 액세스 제어 정책을 쿼리할 수 있도록 Kafka server.properties
파일에서 사용자 지정 OPA 승인자 플러그인(kafka-authorizer-opa-VERSION.jar
)을 구성합니다.
클라이언트에서 요청을 수행하면 지정된 URL 주소와 정의된 정책의 이름이어야 하는 REST 끝점을 사용하여 플러그인에서 OPA 정책 엔진을 쿼리합니다.
플러그인은 클라이언트 요청(사용자 주체, 작업 및 리소스)의 세부 정보를 JSON 형식으로 제공하여 정책에 대해 확인합니다. 세부 정보에는 클라이언트의 고유 ID가 포함됩니다. 예를 들어 TLS 인증이 사용되는 경우 클라이언트 인증서와 고유 이름을 사용합니다.
OPA는 데이터를 사용하여 요청을 허용하거나 거부하기 위해 플러그인에 true 또는 false - 응답을 제공합니다.
6.4.11.3. OPA 권한 부여 지원 구성
다음 절차에서는 OPA 인증을 사용하도록 Kafka 브로커를 구성하는 방법을 설명합니다.
사전 준비 사항
필요한 액세스 또는 특정 사용자에 대해 제한하려는 경우를 고려하십시오. 사용자와 Kafka 리소스 를 결합하여 OPA 정책을 정의할 수 있습니다.
LDAP 데이터 소스에서 사용자 정보를 로드하도록 OPA를 설정할 수 있습니다.
Super 사용자는 Kafka 브로커에서 구현된 권한 부여에 관계없이 항상 Kafka 브로커에 대한 무제한 액세스 권한을 갖습니다.
사전 요구 사항
- OPA 서버를 연결할 수 있어야 합니다.
- OPA authorizer plugin for Kafka
프로세스
Kafka 브로커에서 작업을 수행하기 위해 클라이언트 요청을 승인하는 데 필요한 OPA 정책을 작성합니다.
OPA 정책 정의를 참조하십시오.
이제 OPA를 사용하도록 Kafka 브로커를 구성합니다.
Kafka에 대한 OPA 승인자 플러그인을 설치합니다.
OPA에 연결을 참조하십시오.
플러그인 파일이 Kafka 클래스 경로에 포함되어 있는지 확인합니다.
Kafka
server.properties
구성 파일에 다음을 추가하여 OPA 플러그인을 활성화합니다.authorizer.class.name: com.bisnode.kafka.authorization.OpaAuthorizer
Kafka 브로커의
server.properties
에 구성을 추가하여 OPA 정책 엔진 및 정책에 액세스합니다.예를 들면 다음과 같습니다.
opa.authorizer.url=https://OPA-ADDRESS/allow 1 opa.authorizer.allow.on.error=false 2 opa.authorizer.cache.initial.capacity=50000 3 opa.authorizer.cache.maximum.size=50000 4 opa.authorizer.cache.expire.after.seconds=600000 5 super.users=User:alice;User:bob 6
- 1
- (필수) 승인자 플러그인이 쿼리할 정책의 OAuth 2.0 토큰 끝점 URL입니다. 이 예에서는 정책을
allow
이라고 합니다. - 2
- 승인자 플러그인이 OPA 정책 엔진과 연결하지 못하는 경우 클라이언트가 기본적으로 액세스가 허용되거나 거부되는지 여부를 지정하는 플래그입니다.
- 3
- 로컬 캐시의 초기 용량(바이트)입니다. 플러그인이 모든 요청에 대해 OPA 정책 엔진을 쿼리할 필요가 없도록 캐시가 사용됩니다.
- 4
- 로컬 캐시의 최대 용량(바이트)입니다.
- 5
- OPA 정책 엔진에서 다시 로드하여 로컬 캐시를 새로 고치는 시간(밀리초)입니다.
- 6
- Open Policy Agent 정책을 쿼리하지 않고 항상 허용되도록 슈퍼 사용자로 취급되는 사용자 주체 목록입니다.
인증 및 권한 부여 옵션에 대한 정보는 Open Policy Agent 웹 사이트를 참조하십시오.
- 및 올바른 권한이 없는 클라이언트를 사용하여 Kafka 브로커에 액세스하여 구성된 권한을 확인합니다.