14.5. OAuth 2.0 토큰 기반 권한 부여 사용


토큰 기반 인증에 Red Hat Single Sign-On과 함께 OAuth 2.0을 사용하는 경우 Red Hat Single Sign-On을 사용하여 Kafka 브로커에 대한 클라이언트 액세스를 제한하도록 권한 부여 규칙을 구성할 수도 있습니다. 인증은 사용자의 ID를 설정합니다. 권한 부여는 해당 사용자의 액세스 수준을 결정합니다.

AMQ Streams는 Red Hat Single Sign-On Authorization Services 를 통해 OAuth 2.0 토큰 기반 권한 부여를 지원하므로 보안 정책 및 권한을 중앙에서 관리할 수 있습니다.

Red Hat Single Sign-On에 정의된 보안 정책 및 권한은 Kafka 브로커의 리소스에 대한 액세스 권한을 부여하는 데 사용됩니다. 사용자와 클라이언트는 Kafka 브로커에서 특정 작업을 수행하기 위해 액세스를 허용하는 정책과 대조됩니다.

Kafka는 기본적으로 모든 사용자에게 브로커에 대한 전체 액세스를 허용하고, ACL(액세스 제어 목록)을 기반으로 권한을 구성할 수 있도록 AclAuthorizer 플러그인도 제공합니다.

zookeeper는 사용자 이름을 기반으로 리소스에 대한 액세스를 부여하거나 거부하는 ACL 규칙을 저장합니다. 그러나 Red Hat Single Sign-On을 통한 OAuth 2.0 토큰 기반 권한 부여는 Kafka 브로커에 대한 액세스 제어를 구현하는 방법에 대해 훨씬 더 큰 유연성을 제공합니다. 또한 OAuth 2.0 권한 부여 및 ACL을 사용하도록 Kafka 브로커를 구성할 수 있습니다.

14.5.1. OAuth 2.0 인증 메커니즘

AMQ Streams의 OAuth 2.0 권한 부여에서는 Red Hat Single Sign-On 서버 권한 부여 서비스 REST 엔드포인트를 사용하여 특정 사용자에게 정의된 보안 정책을 적용하고 해당 사용자에게 다양한 리소스에 부여된 권한 목록을 제공하여 Red Hat Single Sign-On으로 토큰 기반 인증을 확장합니다. 정책은 역할 및 그룹을 사용하여 사용자와 권한을 일치시킵니다. OAuth 2.0 권한 부여는 Red Hat Single Sign-On 인증 서비스에서 사용자에게 수신된 부여 목록에 따라 권한을 로컬로 적용합니다.

14.5.1.1. Kafka 브로커 사용자 정의 인증

Red Hat Single Sign-On 승인자 (KeycloakAuthorizer)는 AMQ Streams와 함께 제공됩니다. Red Hat Single Sign-On에서 제공하는 인증 서비스에 Red Hat Single Sign-On REST 엔드포인트를 사용하려면 Kafka 브로커에 사용자 정의 인증 기관을 구성합니다.

작성자는 필요에 따라 권한 부여 서버에서 부여된 권한 목록을 가져와서 Kafka 브로커에 로컬로 권한 부여를 적용하여 각 클라이언트 요청에 대해 신속하게 승인 결정을 내립니다.

14.5.2. OAuth 2.0 권한 부여 지원 구성

다음 절차에서는 Red Hat Single Sign-On 인증 서비스를 사용하여 OAuth 2.0 인증을 사용하도록 Kafka 브로커를 구성하는 방법을 설명합니다.

사전 준비 사항

특정 사용자에 대해 요청하거나 제한하려는 액세스를 고려하십시오. Red Hat Single Sign-On 그룹,역할,클라이언트사용자를 결합하여 Red Hat Single Sign-On에서 액세스를 구성할 수 있습니다.

일반적으로 그룹은 조직의 부서 또는 지리적 위치에 따라 사용자와 일치시키는 데 사용됩니다. 및 역할은 해당 기능에 따라 사용자와 일치시키는 데 사용됩니다.

Red Hat Single Sign-On을 사용하면 사용자와 그룹을 LDAP에 저장할 수 있지만 클라이언트와 역할은 이러한 방식으로 저장할 수 없습니다. 권한 부여 정책 구성 방법을 선택하는 데 사용자 데이터에 대한 스토리지 및 액세스 요소가 될 수 있습니다.

참고

Super 사용자는 Kafka 브로커에서 구현된 권한 부여에 관계없이 항상 Kafka 브로커에 대한 무제한 액세스 권한을 갖습니다.

사전 요구 사항

  • AMQ Streams는 토큰 기반 인증에 Red Hat Single Sign-On과 함께 OAuth 2.0을 사용하도록 구성해야 합니다. 권한을 설정할 때 동일한 Red Hat Single Sign-On 서버 엔드포인트를 사용합니다.
  • 재인증을 활성화하려면 OAuth 2.0 인증을 maxSecondsWithoutReauthentication 옵션으로 구성해야 합니다.

절차

  1. Red Hat Single Sign-On 관리 콘솔에 액세스하거나 OAuth 2.0 인증을 설정할 때 생성한 Kafka 브로커 클라이언트에 대한 인증 서비스를 활성화하려면 Red Hat Single Sign-On Admin CLI를 사용합니다.
  2. 권한 부여 서비스를 사용하여 클라이언트에 대한 리소스, 권한 부여 범위, 정책 및 권한을 정의합니다.
  3. 역할 및 그룹을 할당하여 사용자 및 클라이언트에 권한을 바인딩합니다.
  4. 편집기에서 Kafka 리소스의 Kafka 브로커 구성(Kafka.spec.kafka)을 업데이트하여 Red Hat Single Sign-On 인증을 사용하도록 Kafka 브로커를 구성합니다.

    oc edit kafka my-cluster
  5. keycloak 권한을 사용하고 권한 부여 서버 및 권한 부여 서비스에 액세스할 수 있도록 Kafka 브로커 kafka 구성을 구성합니다.

    예를 들면 다음과 같습니다.

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
    spec:
      kafka:
        # ...
        authorization:
          type: keycloak 
    1
    
          tokenEndpointUri: <https://<auth-server-address>/auth/realms/external/protocol/openid-connect/token> 
    2
    
          clientId: kafka 
    3
    
          delegateToKafkaAcls: false 
    4
    
          disableTlsHostnameVerification: false 
    5
    
          superUsers: 
    6
    
          - CN=fred
          - sam
          - CN=edward
          tlsTrustedCertificates: 
    7
    
          - secretName: oauth-server-cert
            certificate: ca.crt
          grantsRefreshPeriodSeconds: 60 
    8
    
          grantsRefreshPoolSize: 5 
    9
    
          grantsMaxIdleSeconds: 300 
    10
    
          grantsGcPeriodSeconds: 300 
    11
    
          grantsAlwaysLatest: false 
    12
    
          connectTimeoutSeconds: 60 
    13
    
          readTimeoutSeconds: 60 
    14
    
          httpRetries: 2 
    15
    
          enableMetrics: false 
    16
    
        #...
    1
    유형 keycloak 을 사용하면 Red Hat Single Sign-On 인증을 사용할 수 있습니다.
    2
    Red Hat Single Sign-On 토큰 끝점의 URI입니다. 프로덕션의 경우 항상 https:// URL을 사용합니다. 토큰 기반 oauth 인증을 구성할 때 jwksEndpointUri 를 로컬 JWT 검증을 위한 URI로 지정합니다. tokenEndpointUri URI의 호스트 이름은 동일해야 합니다.
    3
    권한 부여 서비스가 활성화된 Red Hat Single Sign-On의 OAuth 2.0 클라이언트 정의의 클라이언트 ID입니다. 일반적으로 kafka 는 ID로 사용됩니다.
    4
    (선택 사항) Red Hat Single Sign-On 인증 서비스 정책에서 액세스를 거부하는 경우 Kafka AclAuthorizer 에 대한 권한 부여 기본값은 false 입니다.
    5
    (선택 사항) TLS 호스트 이름 확인을 비활성화합니다. 기본값은 false 입니다.
    6
    (선택 사항) 지정된 수퍼 사용자입니다.
    7
    (선택 사항) 권한 부여 서버에 대한 TLS 연결에 필요한 신뢰할 수 있는 인증서입니다.
    8
    (선택 사항) 연속 두 개의 연속 부여 새로 고침 실행 사이의 시간입니다. 즉 활성 세션에서 Red Hat Single Sign-On에서 사용자의 권한 변경을 감지할 수 있는 최대 시간입니다. 기본값은 60입니다.
    9
    (선택 사항) 활성 세션에 대한 부여를 새로 고치는 데 사용할 스레드 수입니다. 기본값은 5입니다.
    10
    (선택 사항) 캐시의 유휴 부여를 제거할 수 있는 시간(초)입니다. 기본값은 300입니다.
    11
    (선택 사항) 캐시에서 오래된 권한을 정리하는 작업의 연속 실행 사이의 시간(초)입니다. 기본값은 300입니다.
    12
    (선택 사항) 새 세션에 대한 최신 권한을 가져올지 여부를 제어합니다. 활성화하면 Red Hat Single Sign-On에서 권한을 검색하고 사용자에게 캐시됩니다. 기본값은 false입니다.
    13
    (선택 사항) Red Hat Single Sign-On 토큰 엔드포인트에 연결할 때 연결 제한 시간(초)입니다. 기본값은 60입니다.
    14
    (선택 사항) Red Hat Single Sign-On 토큰 엔드포인트에 연결할 때 읽기 제한 시간(초)입니다. 기본값은 60입니다.
    15
    (선택 사항) 권한 부여 서버에 실패한 HTTP 요청을 재시도하지 않고 재시도할 최대 횟수입니다. 기본값은 0 입니다. 즉, 재시도가 수행되지 않습니다. 이 옵션을 효과적으로 사용하려면 connectTimeoutSecondsreadTimeoutSeconds 옵션에 대한 시간 초과 시간을 줄이는 것이 좋습니다. 그러나 재시도하면 현재 작업자 스레드를 다른 요청에서 사용할 수 없게 할 수 있으며 요청이 너무 많으면 Kafka 브로커가 응답하지 않을 수 있습니다.
    16
    (선택 사항) OAuth 메트릭을 활성화하거나 비활성화합니다. 기본값은 false입니다.
  6. 편집기를 저장하고 종료한 다음 롤링 업데이트가 완료될 때까지 기다립니다.
  7. 로그 또는 Pod 상태 전환을 확인하여 업데이트를 확인합니다.

    oc logs -f ${POD_NAME} -c kafka
    oc get pod -w

    롤링 업데이트는 브로커가 OAuth 2.0 인증을 사용하도록 구성합니다.

  8. Kafka 브로커에 특정 역할을 가진 클라이언트 또는 사용자로 액세스하여 구성된 권한을 확인하고, 필요한 액세스 권한이 있는지 확인하거나, 보유하지 않은 권한이 없는지 확인합니다.

14.5.3. Red Hat Single Sign-On 권한 부여 서비스에서 정책 및 권한 관리

이 섹션에서는 Red Hat Single Sign-On Authorization Services 및 Kafka에서 사용하는 권한 부여 모델에 대해 설명하고 각 모델에서 중요한 개념을 정의합니다.

Kafka에 액세스할 수 있는 권한을 부여하려면 Red Hat Single Sign-On에서 OAuth 클라이언트 사양 을 생성하여 Red Hat Single Sign-On 인증 서비스 오브젝트를 Kafka 리소스에 매핑할 수 있습니다. Kafka 권한은 Red Hat Single Sign-On 인증 서비스 규칙을 사용하여 사용자 계정 또는 서비스 계정에 부여됩니다.

다음은 주제 생성 및 나열과 같은 공통 Kafka 작업에 필요한 다양한 사용자 권한의 예입니다.

14.5.3.1. Kafka 및 Red Hat Single Sign-On 권한 부여 모델 개요

Kafka 및 Red Hat Single Sign-On 인증 서비스는 다른 권한 부여 모델을 사용합니다.

Kafka 인증 모델

Kafka의 권한 부여 모델은 리소스 유형을 사용합니다. Kafka 클라이언트가 브로커에서 작업을 수행할 때 브로커는 구성된 KeycloakAuthorizer 를 사용하여 작업 및 리소스 유형에 따라 클라이언트의 권한을 확인합니다.

Kafka는 5가지 리소스 유형을 사용하여 액세스를 제어합니다. Topic,Group,Cluster,TransactionalId, DelegationToken. 각 리소스 유형에는 사용 가능한 권한 집합이 있습니다.

주제

  • 개발
  • write
  • read
  • delete
  • describe
  • DescribeConfigs
  • 변경
  • AlterConfigs

그룹

  • read
  • describe
  • delete

Cluster

  • 개발
  • describe
  • 변경
  • DescribeConfigs
  • AlterConfigs
  • IdempotentWrite
  • ClusterAction

TransactionalId

  • describe
  • write

DelegationToken

  • describe
Red Hat Single Sign-On 인증 서비스 모델

Red Hat Single Sign-On Authorization Services 모델에는 권한 정의 및 부여 범위, 권한 부여범위,정책권한 부여 에 대한 4가지 개념이 있습니다.

Resources
리소스는 허용된 작업과 리소스를 일치시키는 데 사용되는 리소스 정의 집합입니다. 리소스는 개별 항목(예: 또는 동일한 접두사로 시작하는 이름이 있는 모든 항목)일 수 있습니다. 리소스 정의는 사용 가능한 권한 부여 범위 집합과 연관되며, 리소스에서 사용 가능한 모든 작업 집합을 나타냅니다. 이러한 작업의 하위 집합만 실제로 허용됩니다.
권한 부여 범위
권한 부여 범위는 특정 리소스 정의에서 사용 가능한 모든 작업의 집합입니다. 새 리소스를 정의할 때 모든 범위 집합의 범위를 추가합니다.
Policies

정책은 계정 목록과 일치하는 조건을 사용하는 권한 부여 규칙입니다. 정책은 다음과 일치할 수 있습니다.

  • 클라이언트 ID 또는 역할을 기반으로 하는 서비스 계정
  • 사용자 이름, 그룹 또는 역할을 기반으로 하는 사용자 계정 입니다.
권한
권한은 특정 리소스 정의의 권한 부여 범위 하위 집합을 사용자 집합에 부여합니다.

14.5.3.2. Red Hat Single Sign-On 인증 서비스를 Kafka 인증 모델에 매핑

Kafka 권한 부여 모델은 Kafka에 대한 액세스를 제어하는 Red Hat Single Sign-On 역할 및 리소스를 정의하는 기반으로 사용됩니다.

사용자 계정 또는 서비스 계정에 Kafka 권한을 부여하려면 먼저 Kafka 브로커에 대한 Red Hat Single Sign-On에 OAuth 클라이언트 사양 을 생성합니다. 그런 다음 클라이언트에서 Red Hat Single Sign-On 인증 서비스 규칙을 지정합니다. 일반적으로 브로커를 나타내는 OAuth 클라이언트의 클라이언트 ID는 kafka 입니다. AMQ Streams와 함께 제공되는 설정 파일의 예는 kafka 를 OAuth 클라이언트 ID로 사용합니다.

참고

Kafka 클러스터가 여러 개인 경우 모든 클러스터에 단일 OAuth 클라이언트(Kafka)를 사용할 수 있습니다. 권한 부여 규칙을 정의하고 관리할 수 있는 단일 통합 공간을 제공합니다. 그러나 서로 다른 OAuth 클라이언트 ID(예: my-cluster-kafka 또는 cluster-dev-kafka)를 사용하고 각 클라이언트 구성 내에서 각 클러스터에 대한 권한 부여 규칙을 정의할 수도 있습니다.

kafka 클라이언트 정의에 Red Hat Single Sign-On 관리 콘솔에서 Authorization Enabled 옵션이 활성화되어 있어야 합니다.

모든 권한은 kafka 클라이언트의 범위 내에 있습니다. 다른 OAuth 클라이언트 ID로 구성된 Kafka 클러스터가 서로 다른 경우 동일한 Red Hat Single Sign-On 영역의 일부인 경우에도 각각 별도의 권한 집합이 필요합니다.

Kafka 클라이언트가 OAUTHBEARER 인증을 사용하는 경우 Red Hat Single Sign-On 승인자(KeycloakAuthorizer)는 현재 세션의 액세스 토큰을 사용하여 Red Hat Single Sign-On 서버에서 권한 부여 목록을 검색합니다. 권한을 검색하기 위해 작성자는 Red Hat Single Sign-On Authorization Services 정책 및 권한을 평가합니다.

Kafka 권한의 권한 부여 범위

초기 Red Hat Single Sign-On 구성은 일반적으로 각 Kafka 리소스 유형에서 수행할 수 있는 모든 가능한 작업 목록을 생성하기 위해 권한 부여 범위를 업로드하는 것입니다. 이 단계는 권한을 정의하기 전에 한 번만 수행됩니다. 권한 부여 범위를 업로드하는 대신 수동으로 추가할 수 있습니다.

권한 부여 범위에는 리소스 유형에 관계없이 가능한 모든 Kafka 권한이 포함되어야 합니다.

  • 개발
  • write
  • read
  • delete
  • describe
  • 변경
  • DescribeConfig
  • AlterConfig
  • ClusterAction
  • IdempotentWrite
참고

권한이 필요하지 않은 경우(예: IdempotentWrite), 권한 부여 범위 목록에서 생략할 수 있습니다. 그러나 이 권한은 Kafka 리소스에서 대상으로 할 수 없습니다.

권한 검사에 대한 리소스 패턴

리소스 패턴은 권한 검사를 수행할 때 대상 리소스와 일치하는 패턴에 사용됩니다. 일반 패턴 형식은 RESOURCE-TYPE:PATTERN-NAME 입니다.

리소스 유형은 Kafka 권한 부여 모델을 미러링합니다. 이 패턴은 두 가지 일치 옵션을 허용합니다.

  • 정확한 일치 (지표가 *로 끝나지 않는 경우)
  • 접두사 일치 (일정이 *로 끝나는 경우)

리소스의 패턴 예

Topic:my-topic
Topic:orders-*
Group:orders-*
Cluster:*

또한 일반 패턴 형식 앞에 kafka-cluster:CLUSTER-NAME 을 붙일 수 있습니다. 여기서 CLUSTER-NAME 은 Kafka 사용자 정의 리소스의 metadata.name 을 나타냅니다.

클러스터 접두사가 있는 리소스의 패턴 예

kafka-cluster:my-cluster,Topic:*
kafka-cluster:*,Group:b_*

kafka-cluster 접두사가 없는 경우 kafka-cluster:* 로 간주됩니다.

리소스를 정의할 때 리소스와 관련된 가능한 권한 부여 범위 목록과 연결할 수 있습니다. 대상 리소스 유형에 적합한 모든 작업을 설정합니다.

모든 리소스에 권한 부여 범위를 추가할 수 있지만 리소스 유형에서 지원하는 범위만 액세스 제어로 간주됩니다.

액세스 권한 적용을 위한 정책

정책은 하나 이상의 사용자 계정 또는 서비스 계정에 대한 권한을 대상으로 하는 데 사용됩니다. 대상은 다음을 참조할 수 있습니다.

  • 특정 사용자 또는 서비스 계정
  • 영역 역할 또는 클라이언트 역할
  • 사용자 그룹
  • 클라이언트 IP 주소와 일치하는 JavaScript 규칙

정책에는 고유한 이름이 지정되고 재사용하여 여러 리소스에 대한 여러 권한을 대상으로 할 수 있습니다.

액세스 권한을 부여할 수 있는 권한

세분화된 권한을 사용하여 사용자에게 액세스 권한을 부여하는 정책, 리소스 및 권한 부여 범위를 함께 가져옵니다.

각 권한의 이름은 어떤 사용자에게 부여하는 권한을 명확하게 정의해야 합니다. 예를 들어 Dev Team B는 x로 시작하는 주제에서 읽을 수 있습니다.

14.5.3.3. Kafka 작업에 필요한 권한 예

다음 예제에서는 Kafka에서 공통 작업을 수행하는 데 필요한 사용자 권한을 보여줍니다.

주제 만들기

주제를 생성하려면 특정 주제 또는 Cluster:kafka-cluster 에 대해 Create 권한이 필요합니다.

bin/kafka-topics.sh --create --topic my-topic \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties

주제 나열

사용자에게 지정된 항목에 대한 Describe 권한이 있는 경우 해당 항목이 나열됩니다.

bin/kafka-topics.sh --list \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties

주제 세부 정보 표시

주제 세부 사항을 표시하려면 항목에 대해 설명 및 설명 권한이 필요합니다.

bin/kafka-topics.sh --describe --topic my-topic \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties

항목에 대한 메시지 생성

항목에 메시지를 생성하려면 항목에 대해 설명쓰기 권한이 필요합니다.

주제가 아직 생성되지 않았으며 자동 생성 항목이 활성화되어 있으면 주제를 만들 수 있는 권한이 필요합니다.

bin/kafka-console-producer.sh  --topic my-topic \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --producer.config=/tmp/config.properties

주제의 메시지 사용

주제의 메시지를 사용하려면 항목에 대해 설명읽기 권한이 필요합니다. 주제에서 사용하는 것은 일반적으로 소비자 그룹에 소비자 오프셋을 저장하는 데 의존하며, 소비자 그룹에 대한 추가 설명읽기 권한이 필요합니다.

일치하려면 두 개의 리소스 가 필요합니다. 예를 들면 다음과 같습니다.

Topic:my-topic
Group:my-group-*
bin/kafka-console-consumer.sh --topic my-topic --group my-group-1 --from-beginning \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --consumer.config /tmp/config.properties

idempotent 생산자를 사용하여 항목에 메시지 생성

주제로 생성할 수 있는 권한뿐만 아니라 Cluster:kafka-cluster 리소스에 추가 IdempotentWrite 권한이 필요합니다.

일치하려면 두 개의 리소스 가 필요합니다. 예를 들면 다음과 같습니다.

Topic:my-topic
Cluster:kafka-cluster
bin/kafka-console-producer.sh  --topic my-topic \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --producer.config=/tmp/config.properties --producer-property enable.idempotence=true --request-required-acks -1

소비자 그룹 나열

소비자 그룹을 나열할 때 사용자에게 Describe 권한이 있는 그룹만 반환됩니다. 또는 사용자에게 Cluster:kafka-cluster 에 대한 Describe 권한이 있는 경우 모든 소비자 그룹이 반환됩니다.

bin/kafka-consumer-groups.sh --list \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties

소비자 그룹 세부 정보 표시

소비자 그룹의 세부 사항을 표시하려면 그룹 및 그룹과 연결된 항목에 대한 설명 권한이 필요합니다.

bin/kafka-consumer-groups.sh --describe --group my-group-1 \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties

주제 구성 변경

주제 구성을 변경하려면 항목에 대해 설명 대체 권한이 필요합니다.

bin/kafka-topics.sh --alter --topic my-topic --partitions 2 \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties

Kafka 브로커 구성 표시

kafka-configs.sh 를 사용하여 브로커 구성을 가져오려면 Cluster:kafka-clusterDescribeConfigs 권한이 필요합니다.

bin/kafka-configs.sh --entity-type brokers --entity-name 0 --describe --all \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties

Kafka 브로커 구성 변경

Kafka 브로커 구성을 변경하려면 Cluster:kafka-cluster 에 대해 DescribeConfigsAlterConfigs 권한이 필요합니다.

bin/kafka-configs --entity-type brokers --entity-name 0 --alter --add-config log.cleaner.threads=2 \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties

주제 삭제

주제를 삭제하려면 해당 항목에 대한 설명삭제 권한이 필요합니다.

bin/kafka-topics.sh --delete --topic my-topic \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config=/tmp/config.properties

작업자 파티션 선택

주제 파티션에 대해 리더 선택을 실행하려면 Cluster:kafka-clusterAlter 권한이 필요합니다.

bin/kafka-leader-election.sh --topic my-topic --partition 0 --election-type PREFERRED  /
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --admin.config /tmp/config.properties

파티션 다시 할당

파티션 재할당 파일을 생성하려면 관련 항목에 대한 권한 설명이 필요합니다.

bin/kafka-reassign-partitions.sh --topics-to-move-json-file /tmp/topics-to-move.json --broker-list "0,1" --generate \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config /tmp/config.properties > /tmp/partition-reassignment.json

파티션 재할당을 실행하려면 Cluster:kafka-cluster대해 설명 및 대체 권한이 필요합니다. 또한 사용 권한에 대해 설명해야 합니다.Also, Describe permissions are required on the topics involved.

bin/kafka-reassign-partitions.sh --reassignment-json-file /tmp/partition-reassignment.json --execute \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config /tmp/config.properties

파티션 재할당을 확인하려면 Cluster:kafka-cluster 및 관련 주제 각각에 대해 파티션 할당을 확인하고 AlterConfigs 권한이 필요합니다.

bin/kafka-reassign-partitions.sh --reassignment-json-file /tmp/partition-reassignment.json --verify \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 --command-config /tmp/config.properties

14.5.4. Red Hat Single Sign-On 인증 서비스 사용

이 예에서는 키cloak 권한 부여와 함께 Red Hat Single Sign-On 인증 서비스를 사용하는 방법을 설명합니다. Kafka 클라이언트에 대한 액세스 제한을 적용하려면 Red Hat Single Sign-On 인증 서비스를 사용합니다. Red Hat Single Sign-On Authorization Services는 권한 부여 범위, 정책 및 권한을 사용하여 리소스에 대한 액세스 제어를 정의하고 적용합니다.

Red Hat Single Sign-On Authorization Services REST 엔드포인트는 인증된 사용자를 위한 리소스에 대한 승인된 권한 목록을 제공합니다. Kafka 클라이언트가 인증된 세션을 설정한 후 Red Hat Single Sign-On 서버에서 첫 번째 조치로 부여(권한) 목록은 Red Hat Single Sign-On 서버에서 가져옵니다. 권한 부여 변경 사항이 감지되도록 목록이 백그라운드에서 새로 고쳐집니다. 권한 부여는 빠른 권한 결정을 제공하기 위해 각 사용자 세션에 대해 Kafka 브로커에 로컬로 캐시되고 적용됩니다.

AMQ Streams는 구성 파일 예제 를 제공합니다. 다음은 Red Hat Single Sign-On 설정을 위한 다음 예제 파일이 있습니다.

kafka-ephemeral-oauth-single-keycloak-authz.yaml
Red Hat Single Sign-On을 사용하여 OAuth 2.0 토큰 기반 인증에 대해 구성된 Kafka 사용자 지정 리소스의 예입니다. 사용자 정의 리소스를 사용하여 키cloak 권한 부여 및 토큰 기반 oauth 인증을 사용하는 Kafka 클러스터를 배포할 수 있습니다.
kafka-authz-realm.json
샘플 그룹, 사용자, 역할 및 클라이언트로 구성된 Red Hat Single Sign-On 영역의 예입니다. 영역을 Red Hat Single Sign-On 인스턴스로 가져와서 Kafka에 액세스할 수 있는 세분화된 권한을 설정할 수 있습니다.

Red Hat Single Sign-On으로 예제를 시도하려면 다음 파일을 사용하여 표시된 순서대로 이 섹션에 설명된 작업을 수행합니다.

인증

토큰 기반 oauth 인증을 구성할 때 jwksEndpointUri 를 로컬 JWT 검증을 위한 URI로 지정합니다. keycloak 인증을 구성할 때 tokenEndpointUri 를 Red Hat Single Sign-On 토큰 끝점의 URI로 지정합니다. 두 URI의 호스트 이름은 동일해야 합니다.

그룹 또는 역할 정책이 포함된 대상 권한

Red Hat Single Sign-On에서 서비스 계정이 있는 기밀 클라이언트는 클라이언트 ID와 시크릿을 사용하여 자체 이름으로 서버에 인증할 수 있습니다. 이 기능은 일반적으로 특정 사용자(예: 웹 사이트)의 에이전트가 아닌 자체 이름으로 작동하는 마이크로 서비스에 편리합니다. 서비스 계정에는 일반 사용자와 같은 역할이 할당될 수 있습니다. 그러나 그룹을 할당할 수는 없습니다. 결과적으로 서비스 계정을 사용하여 마이크로 서비스에 대한 권한을 대상으로 지정하려면 그룹 정책을 사용할 수 없으며 대신 역할 정책을 사용해야 합니다. 반대로 사용자 이름 및 암호를 사용한 인증이 필요한 일반 사용자 계정으로만 특정 권한을 제한하려면 역할 정책이 아닌 그룹 정책을 사용하여 이를 수행할 수 있습니다. 이는 ClusterManager 로 시작하는 권한에 대해 이 예에서 사용되는 것입니다. 일반적으로 CLI 툴을 사용하여 클러스터 관리 수행은 대화식으로 수행됩니다. 결과 액세스 토큰을 사용하여 Kafka 브로커에 인증하기 전에 사용자가 로그인해야 하는 것이 좋습니다. 이 경우 액세스 토큰은 클라이언트 애플리케이션이 아닌 특정 사용자를 나타냅니다.

14.5.4.1. Red Hat Single Sign-On 관리 콘솔에 액세스

Red Hat Single Sign-On을 설정한 다음 관리 콘솔에 연결하고 사전 구성된 영역을 추가합니다. 예제 kafka-authz-realm.json 파일을 사용하여 영역을 가져옵니다. Admin Console에서 영역에 대해 정의된 권한 부여 규칙을 확인할 수 있습니다. 규칙은 예제 Red Hat Single Sign-On 영역을 사용하도록 구성된 Kafka 클러스터의 리소스에 대한 액세스 권한을 부여합니다.

사전 요구 사항

  • 실행 중인 OpenShift 클러스터.
  • AMQ Streams examples/security/keycloak-authorization/kafka-authz-realm.json 파일은 사전 구성된 영역을 포함합니다.

절차

  1. Red Hat Single Sign-On 설명서의 서버 설치 및 구성에 설명된 대로 Red Hat Single Sign-On Operator를 사용하여 Red Hat Single Sign-On 서버를 설치합니다.
  2. Red Hat Single Sign-On 인스턴스가 실행될 때까지 기다립니다.
  3. 관리 콘솔에 액세스할 수 있도록 외부 호스트 이름을 가져옵니다.

    NS=sso
    oc get ingress keycloak -n $NS

    이 예에서는 Red Hat Single Sign-On 서버가 sso 네임스페이스에서 실행되고 있다고 가정합니다.

  4. admin 사용자의 암호를 가져옵니다.

    oc get -n $NS pod keycloak-0 -o yaml | less

    암호는 시크릿으로 저장되므로 Red Hat Single Sign-On 인스턴스의 구성 YAML 파일을 가져와 시크릿 이름을 확인합니다(secretKeyRef.name).

  5. 보안 이름을 사용하여 일반 텍스트 암호를 가져옵니다.

    SECRET_NAME=credential-keycloak
    oc get -n $NS secret $SECRET_NAME -o yaml | grep PASSWORD | awk '{print $2}' | base64 -D

    이 예제에서는 시크릿 이름이 credentials -keycloak 이라고 가정합니다.

  6. 사용자 이름 admin 과 가져온 암호를 사용하여 관리 콘솔에 로그인합니다.

    https://HOSTNAME 을 사용하여 Kubernetes Ingress 에 액세스합니다.

    이제 Admin Console을 사용하여 예제 영역을 Red Hat Single Sign-On에 업로드할 수 있습니다.

  7. Add CloudEvent를 클릭하여 예제 영역을 가져옵니다.
  8. examples/security/keycloak-authorization/kafka-authz-realm.json 파일을 추가한 다음 Create 를 클릭합니다.

    이제 관리 콘솔의 현재 영역으로 kafka-authz 가 있습니다.

    기본 보기는 마스터 영역을 표시합니다.

  9. Red Hat Single Sign-On 관리 콘솔에서 클라이언트 > kafka > Authorization > Settings 로 이동하여 결정 전략이 승인으로 설정되어 있는지 확인합니다.

    검증 정책은 클라이언트가 Kafka 클러스터에 액세스하기 위해 하나 이상의 정책을 충족해야 함을 의미합니다.

  10. Red Hat Single Sign-On 관리 콘솔에서 그룹,사용자,역할, 클라이언트로 이동 하여 영역 구성을 확인합니다.

    그룹
    그룹은 사용자 그룹을 생성하고 사용자 권한을 설정하는 데 사용됩니다. groups은 이름이 할당된 사용자 집합입니다. 사용자를 지역, 조직 또는 부서 단위로 구분하는 데 사용됩니다. 그룹을 LDAP ID 공급자에 연결할 수 있습니다. 예를 들어 Kafka 리소스에 대한 권한을 부여하기 위해 사용자 정의 LDAP 서버 관리자 사용자 인터페이스를 통해 사용자를 그룹 멤버로 만들 수 있습니다.
    사용자
    사용자는 사용자를 생성하는 데 사용됩니다. 이 예제에서는 alicebob 가 정의됩니다. AliceClusterManager 그룹의 멤버이며 bobClusterManager-my-cluster 그룹의 멤버입니다. 사용자는 LDAP ID 공급자에 저장할 수 있습니다.
    역할
    역할은 사용자 또는 클라이언트를 특정 권한을 갖는 것으로 표시합니다. 역할은 그룹과 유사한 개념입니다. 일반적으로 조직의 역할을 가진 사용자를 태그 하고 필수 권한을 갖는 데 사용됩니다. 역할은 LDAP ID 공급자에 저장할 수 없습니다. LDAP가 요구 사항인 경우 대신 그룹을 사용하고 Red Hat Single Sign-On 역할을 그룹에 추가하여 사용자에게 해당 역할을 할당받을 수 있습니다.
    클라이언트

    클라이언트에 는 특정 구성이 있을 수 있습니다. 예에서는 kafka,kafka-cli,team-a-clientteam-b-client 클라이언트가 구성됩니다.

    • kafka 클라이언트는 Kafka 브로커가 액세스 토큰 검증에 필요한 OAuth 2.0 통신을 수행하는 데 사용됩니다. 이 클라이언트에는 Kafka 브로커에서 권한을 수행하는 데 사용되는 권한 부여 서비스 리소스 정의, 정책 및 권한 부여 범위도 포함되어 있습니다. 권한 부여 구성은 Authorization EnabledSettings 탭에서 전환될 때 표시되는 Authorization 탭의 kafka 클라이언트에 정의되어 있습니다.
    • kafka-cli 클라이언트는 액세스 토큰 또는 새로 고침 토큰을 가져오기 위해 사용자 이름 및 암호로 인증할 때 Kafka 명령줄 툴에서 사용하는 공용 클라이언트입니다.
    • team-a-clientteam-b-client 클라이언트는 특정 Kafka 항목에 대한 부분 액세스 권한이 있는 서비스를 나타내는 기밀 클라이언트입니다.
  11. Red Hat Single Sign-On 관리 콘솔에서 권한 부여 > 권한으로 이동하여 영역에 정의된 리소스 및 정책을 사용하는 승인된 권한을 확인합니다.

    예를 들어 kafka 클라이언트에는 다음과 같은 권한이 있습니다.

    Dev Team A can write to topics that start with x_ on any cluster
    Dev Team B can read from topics that start with x_ on any cluster
    Dev Team B can update consumer group offsets that start with x_ on any cluster
    ClusterManager of my-cluster Group has full access to cluster config on my-cluster
    ClusterManager of my-cluster Group has full access to consumer groups on my-cluster
    ClusterManager of my-cluster Group has full access to topics on my-cluster
    Dev Team A
    Dev Team A realm 역할은 모든 클러스터에서 x_ 로 시작하는 항목에 쓸 수 있습니다. 이를 통해 Topic:x_*, DescribeWrite 범위, Dev Team A 정책이 결합되어 있습니다. Dev Team A 정책은 Dev Team A 라는 영역 역할이 있는 모든 사용자와 일치합니다.
    Dev Team B
    Dev Team B 영역 역할은 모든 클러스터에서 x_ 로 시작하는 주제에서 읽을 수 있습니다. 이를 통해 Topic:x_*, Group:x_* 리소스, 설명읽기 범위, Dev Team B 정책이 결합되어 있습니다. Dev Team B 정책은 Dev Team B 라는 영역 역할이 있는 모든 사용자와 일치합니다. 사용자 및 클라이언트 일치에는 주제에서 읽고 x_ 로 시작하는 이름이 있는 주제 및 소비자 그룹에 대해 소비된 오프셋을 업데이트할 수 있습니다.

14.5.4.2. Red Hat Single Sign-On 인증을 사용하여 Kafka 클러스터 배포

Red Hat Single Sign-On 서버에 연결하도록 구성된 Kafka 클러스터를 배포합니다. 예제 kafka-ephemeral-oauth-single-keycloak-authz.yaml 파일을 사용하여 Kafka 사용자 정의 리소스로 Kafka 클러스터를 배포합니다. 이 예제에서는 keycloak 권한 부여 및 oauth 인증을 사용하여 단일 노드 Kafka 클러스터를 배포합니다.

사전 요구 사항

  • Red Hat Single Sign-On 권한 부여 서버는 OpenShift 클러스터에 배포되어 예제 영역을 사용하여 로드됩니다.
  • Cluster Operator가 OpenShift 클러스터에 배포됩니다.
  • AMQ Streams 예제/security/keycloak-authorization/kafka-ephemeral-oauth-single-keycloak-authz.yaml 사용자 정의 리소스

절차

  1. 배포된 Red Hat Single Sign-On 인스턴스의 호스트 이름을 사용하여 Kafka 브로커가 Red Hat Single Sign-On 서버와 통신할 수 있도록 신뢰 저장소 인증서를 준비합니다.

    SSO_HOST=SSO-HOSTNAME
    SSO_HOST_PORT=$SSO_HOST:443
    STOREPASS=storepass
    
    echo "Q" | openssl s_client -showcerts -connect $SSO_HOST_PORT 2>/dev/null | awk ' /BEGIN CERTIFICATE/,/END CERTIFICATE/ { print $0 } ' > /tmp/sso.pem

    Kubernetes Ingress 를 사용하여 보안(HTTPS)을 연결하면 인증서가 필요합니다.

    일반적으로 단일 인증서는 없지만 인증서 체인이 있습니다. /tmp/sso.pem 파일에 마지막으로 나열된 최상위 발급자 CA만 제공해야 합니다. 수동으로 추출하거나 다음 명령을 사용할 수 있습니다.

    인증서 체인에서 최상위 CA 인증서를 추출하는 명령 예

    split -p "-----BEGIN CERTIFICATE-----" sso.pem sso-
    for f in $(ls sso-*); do mv $f $f.pem; done
    cp $(ls sso-* | sort -r | head -n 1) sso-ca.crt

    참고

    신뢰할 수 있는 CA 인증서는 일반적으로 openssl 명령을 사용하는 것이 아니라 신뢰할 수 있는 소스에서 가져옵니다.

  2. OpenShift에 인증서를 시크릿으로 배포합니다.

    oc create secret generic oauth-server-cert --from-file=/tmp/sso-ca.crt -n $NS
  3. 호스트 이름을 환경 변수로 설정

    SSO_HOST=SSO-HOSTNAME
  4. 예제 Kafka 클러스터를 생성하고 배포합니다.

    cat examples/security/keycloak-authorization/kafka-ephemeral-oauth-single-keycloak-authz.yaml | sed -E 's#\${SSO_HOST}'"#$SSO_HOST#" | oc create -n $NS -f -

14.5.4.3. CLI Kafka 클라이언트 세션에 대한 TLS 연결 준비

대화형 CLI 세션에 대한 새 Pod를 생성합니다. TLS 연결을 위해 Red Hat Single Sign-On 인증서가 있는 신뢰 저장소를 설정합니다. 신뢰 저장소는 Red Hat Single Sign-On 및 Kafka 브로커에 연결하는 것입니다.

사전 요구 사항

  • Red Hat Single Sign-On 권한 부여 서버는 OpenShift 클러스터에 배포되어 예제 영역을 사용하여 로드됩니다.

    Red Hat Single Sign-On 관리 콘솔에서 클라이언트에 할당된 역할이 클라이언트 > 서비스 계정 역할에 표시되는지 확인합니다.

  • Red Hat Single Sign-On과 연결하도록 구성된 Kafka 클러스터는 OpenShift 클러스터에 배포됩니다.

절차

  1. AMQ Streams Kafka 이미지를 사용하여 새 대화형 Pod 컨테이너를 실행하여 실행 중인 Kafka 브로커에 연결합니다.

    NS=sso
    oc run -ti --restart=Never --image=registry.redhat.io/amq-streams/kafka-35-rhel8:2.5.1 kafka-cli -n $NS -- /bin/sh
    참고

    이미지 다운로드에서 oc 시간이 초과되면 후속 시도로 인해 AlreadyExists 오류가 발생할 수 있습니다.

  2. Pod 컨테이너에 연결합니다.

    oc attach -ti kafka-cli -n $NS
  3. Red Hat Single Sign-On 인스턴스의 호스트 이름을 사용하여 TLS를 사용한 클라이언트 연결 인증서를 준비합니다.

    SSO_HOST=SSO-HOSTNAME
    SSO_HOST_PORT=$SSO_HOST:443
    STOREPASS=storepass
    
    echo "Q" | openssl s_client -showcerts -connect $SSO_HOST_PORT 2>/dev/null | awk ' /BEGIN CERTIFICATE/,/END CERTIFICATE/ { print $0 } ' > /tmp/sso.pem

    일반적으로 단일 인증서는 없지만 인증서 체인이 있습니다. /tmp/sso.pem 파일에 마지막으로 나열된 최상위 발급자 CA만 제공해야 합니다. 수동으로 추출하거나 다음 명령을 사용할 수 있습니다.

    인증서 체인에서 최상위 CA 인증서를 추출하는 명령 예

    split -p "-----BEGIN CERTIFICATE-----" sso.pem sso-
    for f in $(ls sso-*); do mv $f $f.pem; done
    cp $(ls sso-* | sort -r | head -n 1) sso-ca.crt

    참고

    신뢰할 수 있는 CA 인증서는 일반적으로 openssl 명령을 사용하는 것이 아니라 신뢰할 수 있는 소스에서 가져옵니다.

  4. Kafka 브로커에 대한 TLS 연결에 대한 신뢰 저장소를 생성합니다.

    keytool -keystore /tmp/truststore.p12 -storetype pkcs12 -alias sso -storepass $STOREPASS -import -file /tmp/sso-ca.crt -noprompt
  5. Kafka 부트스트랩 주소를 Kafka 브로커의 호스트 이름으로 사용하고 tls 리스너 포트(9093)를 사용하여 Kafka 브로커에 대한 인증서를 준비합니다.

    KAFKA_HOST_PORT=my-cluster-kafka-bootstrap:9093
    STOREPASS=storepass
    
    echo "Q" | openssl s_client -showcerts -connect $KAFKA_HOST_PORT 2>/dev/null | awk ' /BEGIN CERTIFICATE/,/END CERTIFICATE/ { print $0 } ' > /tmp/my-cluster-kafka.pem

    가져온 .pem 파일은 일반적으로 단일 인증서가 아니라 인증서 체인입니다. /tmp/my-cluster-kafka.pem 파일에 마지막으로 나열된 최상위 발급자 CA만 제공해야 합니다. 수동으로 추출하거나 다음 명령을 사용할 수 있습니다.

    인증서 체인에서 최상위 CA 인증서를 추출하는 명령 예

    split -p "-----BEGIN CERTIFICATE-----" /tmp/my-cluster-kafka.pem kafka-
    for f in $(ls kafka-*); do mv $f $f.pem; done
    cp $(ls kafka-* | sort -r | head -n 1) my-cluster-kafka-ca.crt

    참고

    신뢰할 수 있는 CA 인증서는 일반적으로 openssl 명령을 사용하는 것이 아니라 신뢰할 수 있는 소스에서 가져옵니다. 이 예제에서는 클라이언트가 Kafka 클러스터가 배포된 동일한 네임스페이스의 Pod에서 실행되고 있다고 가정합니다. 클라이언트가 OpenShift 클러스터 외부에서 Kafka 클러스터에 액세스하는 경우 먼저 부트스트랩 주소를 확인해야 합니다. 이 경우 OpenShift 시크릿에서 직접 클러스터 인증서를 가져올 수도 있으며 openssl 이 필요하지 않습니다. 자세한 내용은 13장. Kafka 클러스터에 대한 클라이언트 액세스 설정의 내용을 참조하십시오.

  6. Kafka 브로커의 인증서를 신뢰 저장소에 추가합니다.

    keytool -keystore /tmp/truststore.p12 -storetype pkcs12 -alias my-cluster-kafka -storepass $STOREPASS -import -file /tmp/my-cluster-kafka-ca.crt -noprompt

    권한이 부여된 액세스 권한을 확인하기 위해 세션을 열린 상태로 유지합니다.

대화형 CLI 세션을 사용하여 Red Hat Single Sign-On 영역을 통해 적용된 권한 부여 규칙을 확인합니다. Kafka의 예제 생산자 및 소비자 클라이언트를 사용하여 점검을 적용하여 액세스 수준이 다른 사용자 및 서비스 계정에 주제를 생성합니다.

team-a-clientteam-b-client 클라이언트를 사용하여 권한 부여 규칙을 확인합니다. alice admin 사용자를 사용하여 Kafka에서 추가 관리 작업을 수행합니다.

이 예제에서 사용된 AMQ Streams Kafka 이미지에는 Kafka 생산자 및 소비자 바이너리가 포함되어 있습니다.

사전 요구 사항

클라이언트 및 관리자 구성 설정

  1. team-a-client 클라이언트에 대한 인증 속성을 사용하여 Kafka 구성 파일을 준비합니다.

    SSO_HOST=SSO-HOSTNAME
    
    cat > /tmp/team-a-client.properties << EOF
    security.protocol=SASL_SSL
    ssl.truststore.location=/tmp/truststore.p12
    ssl.truststore.password=$STOREPASS
    ssl.truststore.type=PKCS12
    sasl.mechanism=OAUTHBEARER
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
      oauth.client.id="team-a-client" \
      oauth.client.secret="team-a-client-secret" \
      oauth.ssl.truststore.location="/tmp/truststore.p12" \
      oauth.ssl.truststore.password="$STOREPASS" \
      oauth.ssl.truststore.type="PKCS12" \
      oauth.token.endpoint.uri="https://$SSO_HOST/auth/realms/kafka-authz/protocol/openid-connect/token" ;
    sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
    EOF

    SASL OAUTHBLOGER 메커니즘이 사용됩니다. 이 메커니즘을 사용하려면 클라이언트 ID 및 클라이언트 보안이 필요합니다. 즉, 클라이언트가 먼저 Red Hat Single Sign-On 서버에 연결하여 액세스 토큰을 가져옵니다. 그러면 클라이언트는 Kafka 브로커에 연결하고 액세스 토큰을 사용하여 인증합니다.

  2. team-b-client 클라이언트에 대한 인증 속성을 사용하여 Kafka 구성 파일을 준비합니다.

    cat > /tmp/team-b-client.properties << EOF
    security.protocol=SASL_SSL
    ssl.truststore.location=/tmp/truststore.p12
    ssl.truststore.password=$STOREPASS
    ssl.truststore.type=PKCS12
    sasl.mechanism=OAUTHBEARER
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
      oauth.client.id="team-b-client" \
      oauth.client.secret="team-b-client-secret" \
      oauth.ssl.truststore.location="/tmp/truststore.p12" \
      oauth.ssl.truststore.password="$STOREPASS" \
      oauth.ssl.truststore.type="PKCS12" \
      oauth.token.endpoint.uri="https://$SSO_HOST/auth/realms/kafka-authz/protocol/openid-connect/token" ;
    sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
    EOF
  3. curl 을 사용하여 admin 사용자 alice 를 인증하고 암호 부여 인증을 수행하여 새로 고침 토큰을 가져옵니다.

    USERNAME=alice
    PASSWORD=alice-password
    
    GRANT_RESPONSE=$(curl -X POST "https://$SSO_HOST/auth/realms/kafka-authz/protocol/openid-connect/token" -H 'Content-Type: application/x-www-form-urlencoded' -d "grant_type=password&username=$USERNAME&password=$PASSWORD&client_id=kafka-cli&scope=offline_access" -s -k)
    
    REFRESH_TOKEN=$(echo $GRANT_RESPONSE | awk -F "refresh_token\":\"" '{printf $2}' | awk -F "\"" '{printf $1}')

    새로 고침 토큰은 수명이 길며 만료되지 않는 오프라인 토큰입니다.

  4. admin 사용자의 인증 속성을 사용하여 Kafka 구성 파일을 준비합니다.

    cat > /tmp/alice.properties << EOF
    security.protocol=SASL_SSL
    ssl.truststore.location=/tmp/truststore.p12
    ssl.truststore.password=$STOREPASS
    ssl.truststore.type=PKCS12
    sasl.mechanism=OAUTHBEARER
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
      oauth.refresh.token="$REFRESH_TOKEN" \
      oauth.client.id="kafka-cli" \
      oauth.ssl.truststore.location="/tmp/truststore.p12" \
      oauth.ssl.truststore.password="$STOREPASS" \
      oauth.ssl.truststore.type="PKCS12" \
      oauth.token.endpoint.uri="https://$SSO_HOST/auth/realms/kafka-authz/protocol/openid-connect/token" ;
    sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
    EOF

    kafka-cli 공용 클라이언트는 sasl.jaas.configoauth.client.id 에 사용됩니다. 이는 공용 클라이언트이므로 보안이 필요하지 않습니다. 클라이언트는 이전 단계에서 인증된 새로 고침 토큰으로 인증됩니다. 새로 고침 토큰은 인증을 위해 Kafka 브로커로 전송되는 뒤에서 액세스 토큰을 요청합니다.

권한 있는 액세스 권한으로 메시지 생성

team-a-client 구성을 사용하여 a_ 또는 x_ 로 시작하는 항목에 메시지를 생성할 수 있는지 확인합니다.

  1. my-topic 에 주제를 작성합니다.

    bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic my-topic \
      --producer.config=/tmp/team-a-client.properties
    First message

    이 요청은 주제에 액세스할 수 있는 Not authorized to access topics: [my-topic] 오류를 반환합니다.

    team-a-client 에는 a_ 로 시작하는 항목에 대해 지원되는 모든 작업을 수행할 수 있는 권한을 부여하는 Dev Team A 역할이 있지만 x_ 로 시작하는 항목에만 쓸 수 있습니다. my-topic 이라는 주제는 해당 규칙과 일치하지 않습니다.

  2. a_ECDHE에 주제를 작성합니다.

    bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic a_messages \
      --producer.config /tmp/team-a-client.properties
    First message
    Second message

    Kafka에 메시지가 성공적으로 생성됩니다.

  3. Ctrl+C를 눌러 CLI 애플리케이션을 종료합니다.
  4. 요청에 대한 Authorization GRANTED 의 디버그 로그가 Kafka 컨테이너 로그를 확인합니다.

    oc logs my-cluster-kafka-0 -f -n $NS

권한 있는 액세스로 메시지 사용

team-a-client 구성을 사용하여 a_ECDHE 주제의 메시지를 사용합니다.

  1. 주제 a_ECDHE에서 메시지를 가져옵니다.

    bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic a_messages \
      --from-beginning --consumer.config /tmp/team-a-client.properties

    team-a-clientDev Team A 역할은 a_ 로 시작하는 이름이 있는 소비자 그룹에만 액세스할 수 있기 때문에 요청이 오류를 반환합니다.

  2. team-a-client 속성을 업데이트하여 사용할 수 있는 사용자 지정 소비자 그룹을 지정합니다.

    bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic a_messages \
      --from-beginning --consumer.config /tmp/team-a-client.properties --group a_consumer_group_1

    소비자는 a_ECDHE 주제에서 모든 메시지를 수신합니다.

권한 있는 액세스로 Kafka 삭제

team-a-client 는 클러스터 수준 액세스 권한이 없는 계정이지만 일부 관리 작업과 함께 사용할 수 있습니다.

  1. 주제 나열.

    bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/team-a-client.properties --list

    a_ECDHE 주제가 반환됩니다.

  2. 소비자 그룹을 나열합니다.

    bin/kafka-consumer-groups.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/team-a-client.properties --list

    a_consumer_group_1 소비자 그룹이 반환됩니다.

    클러스터 구성에 대한 세부 정보를 가져옵니다.

    bin/kafka-configs.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/team-a-client.properties \
      --entity-type brokers --describe --entity-default

    작업에 team-a-client 가 없는 클러스터 수준 권한이 필요하므로 요청이 오류를 반환합니다.

다른 권한이 있는 클라이언트 사용

team-b-client 구성을 사용하여 b_ 로 시작하는 항목에 대한 메시지를 생성합니다.

  1. a_ECDHE에 주제를 작성합니다.

    bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic a_messages \
      --producer.config /tmp/team-b-client.properties
    Message 1

    이 요청은 항목에 액세스할 수 있는 Not authorized to access topics: [a_ECDHE] 오류를 반환합니다.

  2. 기록 b_ECDHE.

    bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic b_messages \
      --producer.config /tmp/team-b-client.properties
    Message 1
    Message 2
    Message 3

    Kafka에 메시지가 성공적으로 생성됩니다.

  3. x_ECDHE 주제를 작성할 수 있습니다.

    bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \
      --producer.config /tmp/team-b-client.properties
    Message 1

    항목에 액세스할 수 있는 권한이 없음: [x_ECDHE] 오류가 반환되고 team-b-client 는 주제 x_ECDHE 에서만 읽을 수 있습니다.

  4. team-a-client 를 사용하여 x_ ECDHE 주제를 작성합니다.

    bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \
      --producer.config /tmp/team-a-client.properties
    Message 1

    이 요청은 항목에 액세스할 수 있는 Not authorized to access topics: [x_ECDHE] 오류를 반환합니다. team-a-clientx_ ECDHE 항목에 쓸 수 있지만 아직 없는 경우 주제를 만들 수 있는 권한이 없습니다. team-a-clientx_ ECDHE 항목에 쓰기 전에 관리자의 고급 사용자가 파티션 및 복제본 수와 같은 올바른 구성으로 생성해야 합니다.

권한이 있는 admin 사용자로 Kafka 관리

admin 사용자 alice 를 사용하여 Kafka를 관리합니다. Alice 는 Kafka 클러스터의 모든 항목을 관리할 수 있는 전체 액세스 권한을 갖습니다.

  1. x_ECDHE 주제를 alice 로 만듭니다.

    bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/alice.properties \
      --topic x_messages --create --replication-factor 1 --partitions 1

    주제가 성공적으로 생성되었습니다.

  2. 모든 주제를 alice 로 나열합니다.

    bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/alice.properties --list
    bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/team-a-client.properties --list
    bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/team-b-client.properties --list

    admin 사용자 alice 는 모든 주제를 나열할 수 있지만 team-a-clientteam-b-client 는 액세스할 수 있는 주제만 나열할 수 있습니다.

    Dev Team ADev Team B 역할에는 모두 x_ 로 시작하는 항목에 대한 자세한 권한이 있지만 해당 역할에 대한 자세한 권한이 없기 때문에 다른 팀의 주제를 볼 수 없습니다.

  3. team-a-client 를 사용하여 x_ECDHE 항목에 대한 메시지를 생성합니다.

    bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \
      --producer.config /tmp/team-a-client.properties
    Message 1
    Message 2
    Message 3

    alicex_ECDHE 주제를 만들었으므로 Kafka에 메시지가 성공적으로 생성됩니다.

  4. team-b-client 를 사용하여 x_ECDHE 항목에 대한 메시지를 생성합니다.

    bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \
      --producer.config /tmp/team-b-client.properties
    Message 4
    Message 5

    이 요청은 항목에 액세스할 수 있는 Not authorized to access topics: [x_ECDHE] 오류를 반환합니다.

  5. team-b-client 를 사용하여 x_ECDHE 주제의 메시지를 사용합니다.

    bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \
      --from-beginning --consumer.config /tmp/team-b-client.properties --group x_consumer_group_b

    소비자는 x_ECDHE 주제에서 모든 메시지를 수신합니다.

  6. team-a-client 를 사용하여 x_ECDHE 주제의 메시지를 사용합니다.

    bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \
      --from-beginning --consumer.config /tmp/team-a-client.properties --group x_consumer_group_a

    이 요청은 항목에 액세스할 수 있는 Not authorized to access topics: [x_ECDHE] 오류를 반환합니다.

  7. team-a-client 를 사용하여 a_ 로 시작하는 소비자 그룹의 메시지를 사용합니다.

    bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \
      --from-beginning --consumer.config /tmp/team-a-client.properties --group a_consumer_group_a

    이 요청은 항목에 액세스할 수 있는 Not authorized to access topics: [x_ECDHE] 오류를 반환합니다.

    Dev Team A 에는 x_ 로 시작하는 항목에 대한 읽기 액세스 권한이 없습니다.

  8. alice 를 사용하여 x_ECDHE 항목에 대한 메시지를 생성합니다.

    bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --topic x_messages \
      --from-beginning --consumer.config /tmp/alice.properties

    Kafka에 메시지가 성공적으로 생성됩니다.

    Alice 는 모든 주제에서 읽거나 쓸 수 있습니다.

  9. alice 를 사용하여 클러스터 구성을 읽습니다.

    bin/kafka-configs.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 --command-config /tmp/alice.properties \
      --entity-type brokers --describe --entity-default

    이 예제의 클러스터 구성은 비어 있습니다.

Red Hat logoGithubredditYoutubeTwitter

자세한 정보

평가판, 구매 및 판매

커뮤니티

Red Hat 문서 정보

Red Hat을 사용하는 고객은 신뢰할 수 있는 콘텐츠가 포함된 제품과 서비스를 통해 혁신하고 목표를 달성할 수 있습니다. 최신 업데이트를 확인하세요.

보다 포괄적 수용을 위한 오픈 소스 용어 교체

Red Hat은 코드, 문서, 웹 속성에서 문제가 있는 언어를 교체하기 위해 최선을 다하고 있습니다. 자세한 내용은 다음을 참조하세요.Red Hat 블로그.

Red Hat 소개

Red Hat은 기업이 핵심 데이터 센터에서 네트워크 에지에 이르기까지 플랫폼과 환경 전반에서 더 쉽게 작업할 수 있도록 강화된 솔루션을 제공합니다.

Theme

© 2026 Red Hat
맨 위로 이동