19.2. 파티션을 다시 할당하기 위한 JSON 파일 재 할당 생성
kafka-reassign-partitions.sh 도구를 사용하여 JSON 파일을 다시 할당하여 Kafka 클러스터를 확장한 후 파티션을 다시 할당합니다. 브로커를 추가하거나 제거해도 기존 파티션을 자동으로 재배포하지 않습니다. 파티션 배포의 균형을 조정하고 새 브로커를 최대한 활용하기 위해 kafka-reassign-partitions.sh 도구를 사용하여 파티션을 다시 할당할 수 있습니다.
Kafka 클러스터에 연결된 대화형 Pod 컨테이너에서 툴을 실행합니다.
다음 절차에서는 mTLS를 사용하는 보안 재 할당 프로세스를 설명합니다. TLS 암호화 및 mTLS 인증을 사용하는 Kafka 클러스터가 필요합니다.
연결을 설정하려면 다음이 필요합니다.
- Kafka 클러스터가 생성될 때 Cluster Operator가 생성한 클러스터 CA 인증서 및 암호
- Kafka 클러스터에 대한 클라이언트 액세스를 위해 사용자가 생성할 때 User Operator가 생성한 사용자 CA 인증서 및 암호
이 절차에서는 CA 인증서 및 해당 암호가 PKCS #12(.p12 및 .password) 형식으로 포함된 사용자 시크릿과 클러스터에서 추출됩니다. 암호를 사용하면 인증서가 포함된 .p12 저장소에 액세스할 수 있습니다. .p12 저장소를 사용하여 Kafka 클러스터에 대한 연결을 인증하는 신뢰 저장소 및 키 저장소를 지정합니다.
사전 요구 사항
- 실행 중인 Cluster Operator가 있어야 합니다.
내부 TLS 암호화 및 mTLS 인증으로 구성된
Kafka리소스를 기반으로 실행 중인 Kafka 클러스터가 있습니다.TLS 암호화 및 mTLS 인증을 통한 Kafka 구성
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: my-cluster spec: kafka: # ... listeners: # ... - name: tls port: 9093 type: internal tls: true1 authentication: type: tls2 # ...실행 중인 Kafka 클러스터에는 재배치할 주제와 파티션 세트가 포함되어 있습니다.
my-topic에 대한 주제 구성 예apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: my-topic labels: strimzi.io/cluster: my-cluster spec: partitions: 10 replicas: 3 config: retention.ms: 7200000 segment.bytes: 1073741824 # ...Kafka 브로커에서 주제를 생성하고 사용할 수 있는 권한을 지정하는 ACL 규칙으로
KafkaUser가 구성되어 있습니다.my-topic및my-cluster에 대한 작업을 허용하는 ACL 규칙이 있는 Kafka 사용자 구성의 예apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaUser metadata: name: my-user labels: strimzi.io/cluster: my-cluster spec: authentication:1 type: tls authorization: type: simple2 acls: # access to the topic - resource: type: topic name: my-topic operations: - Create - Describe - Read - AlterConfigs host: "*" # access to the cluster - resource: type: cluster operations: - Alter - AlterConfigs host: "*" # ... # ...
절차
Kafka 클러스터의 <cluster
_name> -cluster-ca-cert 시크릿에서 클러스터CA 인증서 및 암호를 추출합니다.oc get secret <cluster_name>-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12oc get secret <cluster_name>-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d > ca.password& lt;cluster_name >을 Kafka 클러스터 이름으로 바꿉니다. Kafka 리소스를 사용하여 Kafka를 배포하면
Kafka클러스터 이름( <cluster_name> -cluster-ca-cert )을 사용하여 클러스터CA 인증서가 있는 보안이 생성됩니다. 예:my-cluster-cluster-ca-cert.AMQ Streams Kafka 이미지를 사용하여 새 대화형 Pod 컨테이너를 실행하여 실행 중인 Kafka 브로커에 연결합니다.
oc run --restart=Never --image=registry.redhat.io/amq-streams/kafka-35-rhel8:2.5.1 <interactive_pod_name> -- /bin/sh -c "sleep 3600"& lt;interactive_pod_name& gt;을 Pod 이름으로 바꿉니다.
클러스터 CA 인증서를 대화형 Pod 컨테이너에 복사합니다.
oc cp ca.p12 <interactive_pod_name>:/tmpKafka 브로커에 액세스할 수 있는 권한이 있는 Kafka 사용자의 시크릿에서 사용자 CA 인증서 및 암호를 추출합니다.
oc get secret <kafka_user> -o jsonpath='{.data.user\.p12}' | base64 -d > user.p12oc get secret <kafka_user> -o jsonpath='{.data.user\.password}' | base64 -d > user.password& lt;kafka_user >를 Kafka 사용자의 이름으로 바꿉니다.
KafkaUser리소스를 사용하여 Kafka 사용자를 생성하면 Kafka 사용자 이름으로 사용자 CA 인증서가 있는 시크릿이 생성됩니다. 예를 들면my-user입니다.사용자 CA 인증서를 대화형 Pod 컨테이너에 복사합니다.
oc cp user.p12 <interactive_pod_name>:/tmpCA 인증서를 사용하면 대화형 Pod 컨테이너가 TLS를 사용하여 Kafka 브로커에 연결할 수 있습니다.
config.properties파일을 생성하여 Kafka 클러스터에 대한 연결을 인증하는 데 사용되는 신뢰 저장소 및 키 저장소를 지정합니다.이전 단계에서 추출한 인증서와 암호를 사용합니다.
bootstrap.servers=<kafka_cluster_name>-kafka-bootstrap:90931 security.protocol=SSL2 ssl.truststore.location=/tmp/ca.p123 ssl.truststore.password=<truststore_password>4 ssl.keystore.location=/tmp/user.p125 ssl.keystore.password=<keystore_password>6 config.properties파일을 대화형 Pod 컨테이너에 복사합니다.oc cp config.properties <interactive_pod_name>:/tmp/config.properties이동할 주제를 지정하는
topics.json이라는 JSON 파일을 준비합니다.주제 이름을 쉼표로 구분된 목록으로 지정합니다.
my-topic의 모든 파티션을 다시 할당할 JSON 파일의 예{ "version": 1, "topics": [ { "topic": "my-topic"} ] }이 파일을 사용하여 주제의 복제 요소를 변경할 수도 있습니다.
topics.json파일을 대화형 Pod 컨테이너에 복사합니다.oc cp topics.json <interactive_pod_name>:/tmp/topics.json대화형 Pod 컨테이너에서 쉘 프로세스를 시작합니다.
oc exec -n <namespace> -ti <interactive_pod_name> /bin/bash& lt;namespace& gt;를 Pod가 실행 중인 OpenShift 네임스페이스로 바꿉니다.
kafka-reassign-partitions.sh명령을 사용하여 reassignment JSON을 생성합니다.my-topic의 파티션을 지정된 브로커로 이동하는 명령의 예bin/kafka-reassign-partitions.sh --bootstrap-server my-cluster-kafka-bootstrap:9093 \ --command-config /tmp/config.properties \ --topics-to-move-json-file /tmp/topics.json \ --broker-list 0,1,2,3,4 \ --generate