6장. Kafka 관리
추가 구성 속성을 사용하여 AMQ Streams 배포를 유지 관리합니다. AMQ Streams의 성능에 응답하기 위해 설정을 추가하고 조정할 수 있습니다. 예를 들어 처리량 및 데이터 안정성을 개선하기 위해 추가 구성을 도입할 수 있습니다.
6.1. Kafka 구성 튜닝 링크 복사링크가 클립보드에 복사되었습니다!
구성 속성을 사용하여 Kafka 브로커, 생산자 및 소비자의 성능을 최적화합니다.
최소 구성 속성 세트가 필요하지만 속성을 추가하거나 조정하여 생산자 및 소비자가 Kafka 브로커와 상호 작용하는 방식을 변경할 수 있습니다. 예를 들어 클라이언트가 데이터에 실시간으로 응답할 수 있도록 메시지의 대기 시간과 처리량을 조정할 수 있습니다.
메트릭을 분석하여 초기 구성을 만들 위치를 측정한 다음 필요한 구성이 있을 때까지 증분 변경 및 메트릭을 추가로 비교할 수 있습니다.
Apache Kafka 구성 속성에 대한 자세한 내용은 Apache Kafka 설명서 를 참조하십시오.
6.1.1. Kafka 브로커 구성 튜닝 링크 복사링크가 클립보드에 복사되었습니다!
구성 속성을 사용하여 Kafka 브로커의 성능을 최적화합니다. AMQ Streams에서 직접 관리하는 속성을 제외하고 표준 Kafka 브로커 구성 옵션을 사용할 수 있습니다.
6.1.1.1. 기본 브로커 구성 링크 복사링크가 클립보드에 복사되었습니다!
기본 구성에는 브로커를 식별하고 보안 액세스를 제공하는 다음 속성이 포함됩니다.
-
broker.id는 Kafka 브로커의 ID입니다. -
log.dirs는 로그 데이터의 디렉터리입니다. -
Zookeeper.connect는 Kafka를 Zoo Cryostat와 연결할 수 있는 구성입니다. -
리스너는 Kafka 클러스터를 클라이언트에 노출 -
권한 부여메커니즘은 사용자가 실행하는 작업을 허용하거나 거부합니다. -
인증메커니즘은 Kafka에 액세스해야 하는 사용자의 ID를 증명합니다.
기본 구성 옵션에 대한 자세한 내용은 Kafka 구성에서 확인할 수 있습니다.
일반적인 브로커 구성에는 주제, 스레드 및 로그와 관련된 속성 설정도 포함됩니다.
기본 브로커 구성 속성
6.1.1.2. 고가용성을 위한 주제 복제 링크 복사링크가 클립보드에 복사되었습니다!
기본 주제 속성은 주제가 자동으로 생성되는 경우를 포함하여 이러한 속성을 명시적으로 설정하지 않고 생성된 항목에 적용되는 파티션 수와 복제 요소를 설정합니다.
auto.create.topics.enable 속성은 아직 존재하지 않는 항목이 생산자와 소비자에 의해 필요할 때 자동으로 생성되도록 기본적으로 활성화됩니다. 자동 주제 생성을 사용하는 경우 num.partitions 를 사용하여 주제의 기본 파티션 수를 설정할 수 있습니다. 그러나 일반적으로 이 속성은 명시적 주제 생성을 통해 주제를 통해 더 많은 제어가 제공되도록 비활성화되어 있습니다.
고가용성 환경의 경우 복제 요소를 주제의 경우 3 이상으로 늘리고 복제 요인보다 1 미만으로 필요한 최소 in-sync 복제본 수를 설정하는 것이 좋습니다.
데이터 지속성 의 경우 주제 구성에 min.insync.replicas 를 설정하고 생산자 구성에서 acks=all 을 사용하여 메시지 전달을 설정해야 합니다.
리더 파티션을 복제하는 각 후속자가 가져온 메시지의 최대 크기(바이트)를 설정하려면 replica.fetch.max.bytes 를 사용합니다. 평균 메시지 크기 및 처리량에 따라 이 값을 변경합니다. 읽기/쓰기 버퍼링에 필요한 총 메모리 할당을 고려할 때 사용 가능한 메모리는 모든 구현자를 곱할 때 최대 복제 메시지 크기를 수용할 수 있어야 합니다. 모든 메시지를 복제할 수 있도록 크기도 message.max.bytes 보다 커야 합니다.
항목을 삭제할 수 있도록 delete.topic.enable 속성은 기본적으로 활성화됩니다. 프로덕션 환경에서는 실수로 주제 삭제를 방지하기 위해 이 속성을 비활성화하여 데이터가 손실됩니다. 그러나 일시적으로 활성화한 후 주제를 삭제한 다음 다시 비활성화할 수 있습니다.
# ... auto.create.topics.enable=false delete.topic.enable=true # ...
# ...
auto.create.topics.enable=false
delete.topic.enable=true
# ...
6.1.1.3. 트랜잭션 및 커밋에 대한 내부 주제 설정 링크 복사링크가 클립보드에 복사되었습니다!
트랜잭션을 사용하여 생산자의 파티션에 대한 atomic 쓰기를 활성화하는 경우 트랜잭션 상태는 내부 __ Cryostat_state 항목에 저장됩니다. 기본적으로 브로커는 이 항목의 복제 요소 3과 최소 2개의 동기화 복제본으로 구성됩니다. 즉, Kafka 클러스터에 최소 3개의 브로커가 필요합니다.
# ... transaction.state.log.replication.factor=3 transaction.state.log.min.isr=2 # ...
# ...
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
# ...
마찬가지로 소비자 상태를 저장하는 내부 __consumer_offsets 주제에는 파티션 수 및 복제 요인에 대한 기본 설정이 있습니다.
# ... offsets.topic.num.partitions=50 offsets.topic.replication.factor=3 # ...
# ...
offsets.topic.num.partitions=50
offsets.topic.replication.factor=3
# ...
프로덕션 환경에서 이러한 설정을 줄이지 마십시오. 프로덕션 환경에서 설정을 늘릴 수 있습니다. 예외적으로 단일broker 테스트 환경에서 설정을 줄일 수 있습니다.
6.1.1.4. I/O 스레드를 늘림으로써 요청 처리 처리량 개선 링크 복사링크가 클립보드에 복사되었습니다!
네트워크 스레드는 클라이언트 애플리케이션에서 요청을 생성하고 가져오는 등 Kafka 클러스터에 대한 요청을 처리합니다. 요청 생성은 요청 큐에 배치됩니다. 응답은 응답 큐에 배치됩니다.
네트워크 스레드 수는 복제 요소 및 Kafka 클러스터와 상호 작용하는 클라이언트 생산자 및 소비자의 활동 수준을 반영해야 합니다. 요청이 많은 경우 스레드 수를 늘릴 수 있습니다. 시간 스레드의 양을 사용하여 스레드를 추가할 시기를 결정합니다.
혼잡을 줄이고 요청 트래픽을 규제하기 위해 네트워크 스레드가 차단되기 전에 요청 큐에서 허용되는 요청 수를 제한할 수 있습니다.
I/O 스레드는 요청 대기열에서 요청을 가져와 처리합니다. 스레드를 추가하면 처리량이 향상될 수 있지만 CPU 코어 및 디스크 대역폭의 수는 실용적인 상한을 부과합니다. 최소한 I/O 스레드 수는 스토리지 볼륨 수와 같아야 합니다.
모든 브로커의 스레드 풀에 대한 구성 업데이트는 클러스터 수준에서 동적으로 발생할 수 있습니다. 이러한 업데이트는 현재 크기의 절반과 현재 크기의 두 배 사이로 제한됩니다.
Kafka 브로커 메트릭은 필요한 스레드 수를 처리하는 데 도움이 될 수 있습니다. 예를 들어, 네트워크 스레드가 유휴 상태인 평균 시간(kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent)은 사용된 리소스의 백분율을 나타냅니다. 유휴 시간이 0%인 경우 모든 리소스가 사용 중이므로 더 많은 스레드를 추가하는 것이 유용할 수 있습니다.
디스크 수로 인해 스레드가 느리거나 제한되는 경우 처리량을 개선하기 위해 네트워크 요청의 버퍼 크기를 늘릴 수 있습니다.
# ... replica.socket.receive.buffer.bytes=65536 # ...
# ...
replica.socket.receive.buffer.bytes=65536
# ...
또한 수신할 수 있는 최대 바이트 수를 늘립니다.
# ... socket.request.max.bytes=104857600 # ...
# ...
socket.request.max.bytes=104857600
# ...
6.1.1.5. 대기 시간이 긴 연결에 대한 대역폭 증가 링크 복사링크가 클립보드에 복사되었습니다!
Kafka는 데이터 센터 연결과 같이 Kafka에서 클라이언트로의 대기 시간이 긴 연결을 통해 합리적인 처리량을 달성하기 위해 데이터를 배치합니다. 그러나 대기 시간이 길면 메시지를 보내고 받기 위한 버퍼 크기를 늘릴 수 있습니다.
# ... socket.send.buffer.bytes=1048576 socket.receive.buffer.bytes=1048576 # ...
# ...
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
# ...
대역폭 지연 제품 계산을 사용하여 버퍼의 최적 크기를 추정할 수 있습니다. 이 계산은 라운드트립 지연 (초 단위)과 링크의 최대 대역폭을 곱하여 최대 처리량을 유지하기 위해 버퍼의 크기를 추정할 수 있습니다.
6.1.1.6. 데이터 보존 정책을 사용하여 로그 관리 링크 복사링크가 클립보드에 복사되었습니다!
Kafka는 로그를 사용하여 메시지 데이터를 저장합니다. 로그는 다양한 인덱스와 연결된 일련의 세그먼트입니다. 새 메시지는 활성 세그먼트에 기록되며 이후에 수정되지 않습니다. 소비자의 가져오기 요청을 제공할 때 세그먼트를 읽습니다. 활성 세그먼트는 주기적으로 롤백 되어 읽기 전용이 되고 이를 대체하기 위해 새 활성 세그먼트가 생성됩니다. 한 번에 하나의 세그먼트만 활성화됩니다. 이전 세그먼트는 삭제할 수 있을 때까지 유지됩니다.
브로커 수준의 구성은 로그 세그먼트의 최대 크기(바이트)와 활성 세그먼트가 롤아웃되기 전의 시간(밀리초)을 설정합니다.
# ... log.segment.bytes=1073741824 log.roll.ms=604800000 # ...
# ...
log.segment.bytes=1073741824
log.roll.ms=604800000
# ...
segment.bytes 및 segment.ms 를 사용하여 주제 수준에서 이러한 설정을 재정의할 수 있습니다. 이러한 값을 낮추거나 늘릴 필요가 있는지 여부는 세그먼트 삭제 정책에 따라 다릅니다. 크기가 클수록 활성 세그먼트에 더 많은 메시지가 포함되어 있으며 덜 자주 롤백됩니다. 또한 세그먼트는 덜 자주 삭제할 수 있습니다.
로그를 관리할 수 있도록 시간 기반 또는 크기 기반 로그 보존 및 정리 정책을 설정할 수 있습니다. 요구 사항에 따라 로그 보존 구성을 사용하여 이전 세그먼트를 삭제할 수 있습니다. 로그 보존 정책이 사용되는 경우 보존 제한에 도달하면 비활성 로그 세그먼트가 제거됩니다. 이전 세그먼트를 삭제하면 로그에 필요한 스토리지 공간이 바인딩되어 디스크 용량을 초과하지 않습니다.
시간 기반 로그 보존의 경우 시간, 분 및 밀리초를 기준으로 보존 기간을 설정합니다. 보존 기간은 메시지가 세그먼트에 추가된 시간을 기반으로 합니다.
밀리초 단위 구성의 우선 순위는 분보다 우선하며, 이는 몇 시간보다 우선 순위가 높습니다. 분 및 밀리초 구성은 기본적으로 null이지만 세 가지 옵션은 유지하려는 데이터에 대한 상당한 수준의 제어를 제공합니다. 동적으로 업데이트할 수 있는 세 가지 속성 중 하나만이므로 기본 설정을 밀리초 구성에 제공해야 합니다.
# ... log.retention.ms=1680000 # ...
# ...
log.retention.ms=1680000
# ...
log.retention.ms 가 -1로 설정된 경우 로그 보존에 시간 제한이 적용되지 않으므로 모든 로그가 유지됩니다. 디스크 사용량은 항상 모니터링해야 하지만 -1 설정은 일반적으로 전체 디스크에 문제가 발생하여 수정하기 어려울 수 있으므로 권장되지 않습니다.
크기 기반 로그 보존의 경우 최대 로그 크기(로그의 모든 세그먼트)를 바이트 단위로 설정합니다.
# ... log.retention.bytes=1073741824 # ...
# ...
log.retention.bytes=1073741824
# ...
즉, 로그에는 steady 상태가 되면 일반적으로 약 log.retention.bytes/log.segment.bytes 세그먼트가 있습니다. 최대 로그 크기에 도달하면 이전 세그먼트가 제거됩니다.
최대 로그 크기를 사용할 때 발생할 수 있는 문제는 시간 메시지가 세그먼트에 추가되었음을 고려하지 않는다는 것입니다. 정리 정책에 시간 기반 및 크기 기반 로그 보존을 사용하여 필요한 균형을 얻을 수 있습니다. 먼저 도달한 임계값 중 정리를 트리거하는 것은 무엇입니까.
세그먼트 파일이 시스템에서 삭제되기 전에 시간 지연을 추가하려면 브로커 수준 또는 file.delete.delay.ms 의 모든 항목에 log.segment.delete.delay.ms 를 사용하여 해당 주제의 특정 항목에 대해 지연을 추가할 수 있습니다.
# ... log.segment.delete.delay.ms=60000 # ...
# ...
log.segment.delete.delay.ms=60000
# ...
6.1.1.7. 정리 정책을 사용하여 로그 데이터 제거 링크 복사링크가 클립보드에 복사되었습니다!
이전 로그 데이터를 제거하는 방법은 로그 정리 구성에 따라 결정됩니다.
기본적으로 브로커에 대해 로그 정리가 활성화됩니다.
# ... log.cleaner.enable=true # ...
# ...
log.cleaner.enable=true
# ...
주제 또는 브로커 수준에서 정리 정책을 설정할 수 있습니다. 브로커 수준 구성은 정책이 설정되지 않은 항목의 기본값입니다.
로그, 컴팩트 로그를 삭제하거나 둘 다 수행하는 정책을 설정할 수 있습니다.
# ... log.cleanup.policy=compact,delete # ...
# ...
log.cleanup.policy=compact,delete
# ...
삭제 정책은 데이터 보존 정책을 사용하여 로그를 관리하는 데 해당합니다. 데이터가 영구적으로 유지될 필요가 없을 때 적합합니다. 컴팩트한 정책은 각 메시지 키에 대한 최신 메시지를 유지할 수 있도록 보장합니다. 로그 압축은 메시지 값을 변경할 수 있고 최신 업데이트를 유지해야 하는 경우에 적합합니다.
정리 정책이 로그를 삭제하도록 설정된 경우 로그 보존 제한을 기반으로 이전 세그먼트가 삭제됩니다. 그렇지 않으면 로그 정리기가 활성화되지 않고 로그 보존 제한이 없는 경우 로그가 계속 증가합니다.
정리 정책이 로그 압축용으로 설정된 경우 로그 헤드 는 새 메시지에 대한 쓰기 순서에 따라 표준 Kafka 로그로 작동합니다. 로그 정리기가 작동하는 컴팩트한 로그의 tail 에서는 로그에서 동일한 키가 있는 다른 레코드가 나중에 발생하면 레코드가 삭제됩니다. null 값이 있는 메시지도 삭제됩니다. 키를 사용하지 않는 경우 관련 메시지를 식별하는 데 키가 필요하므로 압축 기능을 사용할 수 없습니다. Kafka는 각 키의 최신 메시지가 유지되도록 보장하지만 전체 압축된 로그에는 중복이 포함되지 않도록 보장할 수는 없습니다.
그림 6.1. 압축하기 전에 오프셋 위치가 있는 키 값 쓰기를 표시하는 로그
Kafka 압축은 메시지를 식별하기 위해 키를 사용하여 특정 메시지 키에 대해 최신 메시지(가장 높은 오프셋을 사용하여)를 유지하여 결국 동일한 키가 있는 이전 메시지를 삭제합니다. 즉, latest 상태의 메시지는 항상 사용할 수 있으며 로그 정리기가 실행될 때 특정 메시지의 최신 레코드가 결국 제거됩니다. 메시지를 다시 이전 상태로 복원할 수 있습니다.
레코드는 주변 레코드가 삭제되는 경우에도 원래 오프셋을 유지합니다. 결과적으로 tail은 연속적이지 않은 오프셋을 가질 수 있습니다. tail에서 더 이상 사용할 수 없는 오프셋을 사용하면 다음 상위 오프셋이 있는 레코드가 검색됩니다.
그림 6.2. 압축 후 로그
컴팩트한 정책만 선택하면 로그가 임의로 커질 수 있습니다. 이 경우 로그를 압축 및 삭제하도록 정책을 설정할 수 있습니다. 컴팩트하고 삭제하도록 선택하면 먼저 로그 데이터가 압축되어 로그 헤드에 키가 있는 레코드를 제거합니다. 그런 다음 로그 보존 임계값 이전에 속하는 데이터가 삭제됩니다.
그림 6.3. 로그 보존 지점 및 압축 지점
로그에서 정리를 확인하는 빈도를 밀리초 단위로 설정합니다.
# ... log.retention.check.interval.ms=300000 # ...
# ...
log.retention.check.interval.ms=300000
# ...
로그 보존 설정과 관련된 로그 보존 확인 간격을 조정합니다. 보존 크기가 작으면 더 자주 점검해야 할 수 있습니다.
정리 빈도는 디스크 공간을 관리하기에 충분한 경우가 많지만 주제의 성능에 영향을 미치는 경우가 많지는 않습니다.
정리할 로그가 없는 경우 대기 상태에 더 많은 시간을 배치하도록 시간(밀리초)을 설정할 수도 있습니다.
# ... log.cleaner.backoff.ms=15000 # ...
# ...
log.cleaner.backoff.ms=15000
# ...
이전 로그 데이터를 삭제하도록 선택하는 경우 마침표를 밀리초 단위로 설정하여 삭제된 데이터를 제거하기 전에 유지할 수 있습니다.
# ... log.cleaner.delete.retention.ms=86400000 # ...
# ...
log.cleaner.delete.retention.ms=86400000
# ...
삭제된 데이터 보존 기간은 데이터가 삭제되기 전에 데이터가 손실되었음을 알리는 시간을 제공합니다.
특정 키와 관련된 모든 메시지를 삭제하려면 생산자가 tombstone 메시지를 보낼 수 있습니다. tombstone 에는 null 값이 있으며 사용자에게 값을 삭제하는 마커 역할을 합니다. 압축 후 tombstone만 유지되며, 이는 소비자가 메시지가 삭제되었음을 알 수 있도록 충분한 기간 동안 유지되어야 합니다. 이전 메시지가 삭제되면 값이 없으면 tombstone 키도 파티션에서 삭제됩니다.
6.1.1.8. 디스크 사용률 관리 링크 복사링크가 클립보드에 복사되었습니다!
로그 정리와 관련된 다른 많은 구성 설정이 있지만 특히 메모리 할당은 중요합니다.
중복 제거 속성은 모든 로그 정리 스레드에서 정리에 대한 총 메모리를 지정합니다. 버퍼 로드 요소를 통해 사용된 메모리의 백분율에 대한 상한을 설정할 수 있습니다.
# ... log.cleaner.dedupe.buffer.size=134217728 log.cleaner.io.buffer.load.factor=0.9 # ...
# ...
log.cleaner.dedupe.buffer.size=134217728
log.cleaner.io.buffer.load.factor=0.9
# ...
각 로그 항목은 정확히 24바이트를 사용하므로 버퍼가 단일 실행에서 처리할 수 있는 로그 항목 수를 확인하고 그에 따라 설정을 조정할 수 있습니다.
가능한 경우 로그 정리 시간을 줄이려는 경우 로그 정리 스레드 수를 늘리는 것이 좋습니다.
# ... log.cleaner.threads=8 # ...
# ...
log.cleaner.threads=8
# ...
100% 디스크 대역폭 사용량에 문제가 발생하면 로그 정리 I/O를 제한하여 작업을 수행하는 디스크 기능에 따라 읽기/쓰기 작업의 합계가 지정된 이중 값보다 작도록 할 수 있습니다.
# ... log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 # ...
# ...
log.cleaner.io.max.bytes.per.second=1.7976931348623157E308
# ...
6.1.1.9. 큰 메시지 크기 처리 링크 복사링크가 클립보드에 복사되었습니다!
메시지의 기본 배치 크기는 1MB이며 대부분의 사용 사례에서 최대 처리량에 적합합니다. Kafka는 적절한 디스크 용량을 가정하면 처리량이 감소된 대규모 배치를 수용할 수 있습니다.
큰 메시지 크기는 다음 네 가지 방법으로 처리됩니다.
- 생산자 측 메시지 압축 은 압축 메시지를 로그에 씁니다.
- 참조 기반 메시징은 메시지의 값에 다른 시스템에 저장된 데이터에 대한 참조만 보냅니다.
- 인라인 메시징은 메시지를 동일한 키를 사용하는 청크로 분할한 다음 Kafka Streams와 같은 스트림 프로세서를 사용하여 출력에 결합됩니다.
- 더 큰 메시지 크기를 처리하도록 구축된 브로커 및 생산자/소유 클라이언트 애플리케이션 구성입니다.
참조 기반 메시징 및 메시지 압축 옵션이 권장되며 대부분의 상황을 다룹니다. 이러한 옵션을 사용하면 성능 문제가 발생하지 않도록 주의해야 합니다.
생산자 측 압축
생산자 구성의 경우 생산자가 생성한 데이터의 배치에 적용되는 Gzip과 같은 compression.type 을 지정합니다. 브로커는 브로커 구성 compression.type=producer 를 사용하여 생산자가 사용한 압축을 유지합니다. 생산자 및 주제 압축이 일치하지 않을 때마다 브로커는 로그에 추가하기 전에 일괄 처리를 다시 압축해야 브로커 성능에 영향을 미칩니다.
압축은 또한 소비자에 생산자 및 압축 해제 오버헤드에 추가 처리 오버헤드를 추가하지만 배치에 더 많은 데이터를 포함하므로 메시지 데이터가 잘 압축될 때 처리량에 도움이 되는 경우가 많습니다.
생산자 측 압축을 배치 크기의 미세 조정과 결합하여 처리량을 극대화할 수 있습니다. 메트릭을 사용하면 필요한 평균 배치 크기를 측정하는 데 도움이 됩니다.
참조 기반 메시징
참조 기반 메시징은 메시지의 규모를 모르는 경우 데이터 복제에 유용합니다. 외부 데이터 저장소는 이 구성이 작동하려면 빠르고 사용 가능한 고가용성이어야 합니다. 데이터는 데이터 저장소에 기록되고 데이터에 대한 참조가 반환됩니다. 생산자는 Kafka에 대한 참조가 포함된 메시지를 보냅니다. 소비자는 메시지에서 참조를 가져와서 이를 사용하여 데이터 저장소에서 데이터를 가져옵니다.
그림 6.4. 참조 기반 메시징 흐름
메시지 전달에는 더 많은 여행이 필요하므로 엔드 투 엔드 대기 시간이 증가합니다. 이 접근 방식의 또 다른 중요한 단점은 Kafka 메시지가 정리될 때 외부 시스템의 데이터를 자동으로 정리하지 않는다는 것입니다. 하이브리드 접근 방식은 대규모 메시지를 데이터 저장소로만 전송하고 표준 크기의 메시지를 직접 처리하는 것입니다.
인라인 메시징
인라인 메시징은 복잡하지만 참조 기반 메시징과 같은 외부 시스템에 따라 의 오버헤드가 없습니다.
생성 클라이언트 애플리케이션은 메시지가 너무 크면 데이터를 직렬화한 다음 청크해야 합니다. 그런 다음 생산자는 Kafka Cryostat ArraySerializer 를 사용하거나 각 청크를 보내기 전에 다시 직렬화하는 것과 유사합니다. 소비자는 전체 메시지가 있을 때까지 메시지 및 버퍼 청크를 추적합니다. 클라이언트 애플리케이션을 사용하는 애플리케이션은 역직렬화 전에 어셈블되는 청크를 수신합니다. 전체 메시지는 청크된 각 메시지 세트에 대한 첫 번째 또는 마지막 청크 오프셋에 따라 소비되는 애플리케이션의 나머지 부분에 전달됩니다. 리밸런스 중 중복을 방지하기 위해 전체 메시지를 오프셋 메타데이터에 대해 성공적으로 전달할 수 있습니다.
그림 6.5. 인라인 메시징 흐름
인라인 메시징은 특히 일련의 대규모 메시지를 병렬로 처리할 때 필요한 버퍼링 때문에 소비자 측에서 성능 오버헤드가 발생합니다. 버퍼에 있는 다른 대규모 메시지의 청크가 불완전하면 메시지의 모든 청크가 사용되었을 때 커밋할 수 없도록 대규모 메시지의 청크가 인터리브될 수 있습니다. 이러한 이유로 메시지 청크를 유지하거나 커밋 논리를 구현하여 버퍼링이 일반적으로 지원됩니다.
더 큰 메시지를 처리하는 구성
더 큰 메시지를 방지할 수 없고 메시지 흐름의 어느 시점에서든 블록을 피하기 위해 메시지 제한을 늘릴 수 있습니다. 이렇게 하려면 주제 수준에서 message.max.bytes 를 구성하여 개별 주제의 최대 레코드 배치 크기를 설정합니다. 브로커 수준에서 message.max.bytes 를 설정하면 모든 주제에 대해 더 큰 메시지가 허용됩니다.
브로커는 message.max.bytes 로 설정된 제한보다 큰 메시지를 거부합니다. 생산자(max.request.size) 및 소비자(message.max.bytes)의 버퍼 크기는 더 큰 메시지를 수용할 수 있어야 합니다.
6.1.1.10. 메시지 데이터의 로그 플러시 제어 링크 복사링크가 클립보드에 복사되었습니다!
로그 플러시 속성은 캐시된 메시지 데이터의 주기적인 쓰기를 디스크에 제어합니다. 스케줄러는 로그 캐시의 검사 빈도를 밀리초 단위로 지정합니다.
# ... log.flush.scheduler.interval.ms=2000 # ...
# ...
log.flush.scheduler.interval.ms=2000
# ...
메시지가 메모리에 보관되는 최대 시간과 디스크에 쓰기 전에 로그의 최대 메시지 수에 따라 플러시 빈도를 제어할 수 있습니다.
# ... log.flush.interval.ms=50000 log.flush.interval.messages=100000 # ...
# ...
log.flush.interval.ms=50000
log.flush.interval.messages=100000
# ...
플러시 간 대기에는 플러시를 수행하기 전에 검사를 수행하는 시간과 지정된 간격이 포함됩니다. 플러시 빈도를 늘리면 처리량에 영향을 미칠 수 있습니다.
일반적으로 명시적 플러시 임계값을 설정하지 않고 운영 체제가 기본 설정을 사용하여 백그라운드 플러시를 수행하도록 하는 것이 좋습니다. 파티션 복제는 실패한 브로커가 동기화된 복제본에서 복구할 수 있으므로 단일 디스크에 쓰기보다 데이터 지속성을 향상시킵니다.
애플리케이션 플러시 관리를 사용하는 경우 더 빠른 디스크를 사용하는 경우 플러시 임계값을 낮추는 것이 적합할 수 있습니다.
6.1.1.11. 가용성에 대한 파티션 재조정 링크 복사링크가 클립보드에 복사되었습니다!
내결함성을 위해 브로커 간에 파티션을 복제할 수 있습니다. 지정된 파티션의 경우 하나의 브로커가 리더로 선택되고 모든 생성 요청을 처리합니다(로그에 쓰기). 다른 브로커의 파티션 팀은 리더가 실패할 경우 데이터 안정성을 위해 파티션 리더의 파티션 데이터를 복제합니다.
erser는 일반적으로 클라이언트를 제공하지 않지만 broker.rack 을 사용하면 Kafka 클러스터가 여러 데이터 센터에 걸쳐 있을 때 소비자가 가장 가까운 복제본에서 메시지를 사용할 수 있습니다. 차선은 파티션 리더의 메시지를 복제하고 리더가 실패할 경우 복구를 허용하기 위해서만 작동합니다. 복구에는 동기화 내 후속 조치가 필요합니다. 조각화는 리더에게 페치 요청을 보내 동기화 상태를 유지하며, 이 리더에게 메시지를 순서대로 반환합니다. 팔로워는 리더에서 가장 최근에 커밋된 메시지와 함께 배치된 경우 동기화된 것으로 간주됩니다. 리더는 후속자가 요청한 마지막 오프셋을 확인하여 이를 확인합니다. 예기치 않은 리더 선택이 허용되지 않는 한 현재 리더가 실패해야 하는 리더로서 일반적으로 동기화되지 않은 후속 조치를 취할 수 없습니다.
후속 작업이 동기화되지 않은 것으로 간주되기 전에 지연 시간을 조정할 수 있습니다.
# ... replica.lag.time.max.ms=30000 # ...
# ...
replica.lag.time.max.ms=30000
# ...
지연 시간이 동기화된 모든 복제본에 메시지를 복제하는 시간과 생산자가 승인을 기다려야 하는 기간을 지정합니다. 후속 프로그램이 가져오기 요청을 작성하고 지정된 지연 시간 내에 최신 메시지를 catch하지 못하면 동기화 내 복제본에서 제거됩니다. 실패한 복제본을 더 빨리 감지할 수 있는 지연 시간을 줄일 수 있지만 이렇게 하면 불필요하게 동기화되지 않는 자국 수를 늘릴 수 있습니다. 올바른 지연 시간 값은 네트워크 대기 시간과 브로커 디스크 대역폭에 따라 다릅니다.
리더 파티션을 더 이상 사용할 수 없는 경우 동기화 중인 복제본 중 하나가 새 리더로 선택됩니다. 파티션의 복제본 목록의 첫 번째 브로커를 선호하는 리더라고 합니다. 기본적으로 Kafka는 리더 배포 주기에 따라 자동 파티션 리더 재조정에 대해 활성화됩니다. 즉 Kafka에서 기본 리더가 현재 리더인지 확인합니다. 리밸런스를 사용하면 브로커와 브로커 간에 리더가 균등하게 분배되지 않습니다.
AMQ Streams에 대한 Cruise Control을 사용하여 클러스터 전체에서 부하를 균등하게 조정하는 브로커에 대한 복제본 할당을 파악할 수 있습니다. 그 계산은 리더와 팔로워가 경험하는 다양한 부하를 고려합니다. 나머지 브로커는 주요 추가 파티션의 추가 작업을 받기 때문에 실패한 리더는 Kafka 클러스터의 균형에 영향을 미칩니다.
Cruise Control이 실제로 균형을 맞추려면 기본 리더가 파티션을 이끌 필요가 있습니다. Kafka는 기본 리더가 사용되도록 자동으로 확인할 수 있으며 필요한 경우 현재 리더를 변경할 수 있습니다. 이렇게 하면 클러스터가 Cruise Control에서 찾은 균형 있는 상태로 유지됩니다.
리밸런스 검사의 빈도(초)와 리밸런스가 트리거되기 전에 브로커에 허용되는 최대 불균형 비율을 제어할 수 있습니다.
#... auto.leader.rebalance.enable=true leader.imbalance.check.interval.seconds=300 leader.imbalance.per.broker.percentage=10 #...
#...
auto.leader.rebalance.enable=true
leader.imbalance.check.interval.seconds=300
leader.imbalance.per.broker.percentage=10
#...
브로커의 백분율 리더 불균형은 브로커가 현재 리더인 현재 파티션 수와 기본 리더인 파티션 수 간의 비율입니다. 백분율을 0으로 설정하여 선호되는 리더가 항상 동기화 중이라고 가정하도록 할 수 있습니다.
리밸런스에 대한 재조정에 더 많은 제어가 필요한 경우 자동 리밸런스를 비활성화할 수 있습니다. 그런 다음 kafka-leader-election.sh 명령줄 도구를 사용하여 리밸런스를 트리거할 시기를 선택할 수 있습니다.
AMQ Streams와 함께 제공되는 Grafana 대시보드는 활성 리더가 없는 비복제 파티션 및 파티션에 대한 지표를 보여줍니다.
6.1.1.12. 불명확한 리더 선택 링크 복사링크가 클립보드에 복사되었습니다!
동기화되지 않은 복제본의 리더는 데이터 손실을 보장하므로 정리된 것으로 간주됩니다. 이 작업은 기본적으로 수행됩니다. 그러나 리더십을 위해 동기화되지 않은 복제본이 없으면 어떻게 됩니까? ISR(In-sync) 복제본에는 리더 디스크가 사라진 경우에만 리더가 포함되어 있을 수 있습니다. 최소 동기화 내 복제본 수를 설정하지 않고 하드 드라이브가 실패할 때 파티션 리더와 동기화되는 팔로워가 없는 경우 데이터가 이미 손실됩니다. 그뿐만 아니라 동기화 되지 않은 자국이 없기 때문에 새로운 리더를 선택할 수 없습니다.
Kafka에서 리더 실패를 처리하는 방법을 구성할 수 있습니다.
# ... unclean.leader.election.enable=false # ...
# ...
unclean.leader.election.enable=false
# ...
불명확한 리더 선택은 기본적으로 비활성화되어 있습니다. 즉, 동기화되지 않은 복제본이 리더가 될 수 없습니다. 명확한 리더 선택을 통해 이전 리더가 손실되었을 때 다른 브로커가 ISR에 없는 경우 Kafka는 메시지를 쓰거나 읽을 수 있기 전에 해당 리더가 다시 온라인 상태가 될 때까지 기다립니다. 비정형 리더 선택이란 동기화되지 않은 복제본이 리더가 될 수 있지만 메시지를 손실할 위험이 있습니다. 고객의 요구 사항이 가용성 또는 지속성을 선호하는지 여부에 따라 선택할 수 있습니다.
주제 수준에서 특정 주제의 기본 구성을 재정의할 수 있습니다. 데이터 손실 위험을 감수할 수 없는 경우 기본 구성을 그대로 둡니다.
6.1.1.13. 불필요한 소비자 그룹 재조정 방지 링크 복사링크가 클립보드에 복사되었습니다!
새 소비자 그룹에 가입하는 소비자의 경우 지연을 추가하여 브로커에 불필요한 재조정을 피할 수 있습니다.
# ... group.initial.rebalance.delay.ms=3000 # ...
# ...
group.initial.rebalance.delay.ms=3000
# ...
지연은 코디네이터가 멤버가 참여할 때까지 대기하는 시간입니다. 지연이 길어질수록 모든 멤버가 제 시간에 참여하고 재조정을 피할 가능성이 높습니다. 그러나 지연은 기간이 종료될 때까지 그룹이 소비되지 않도록합니다.
6.1.2. Kafka 생산자 구성 튜닝 링크 복사링크가 클립보드에 복사되었습니다!
특정 사용 사례에 맞는 선택적 속성이 포함된 기본 생산자 구성을 사용합니다.
처리량을 극대화하도록 구성을 조정하면 대기 시간이 증가하거나 그 반대의 경우도 발생할 수 있습니다. 필요한 균형을 유지하기 위해 생산자 구성을 실험하고 튜닝해야 합니다.
6.1.2.1. 기본 생산자 구성 링크 복사링크가 클립보드에 복사되었습니다!
모든 생산자에 연결 및 serializer 속성이 필요합니다. 일반적으로 추적을 위해 클라이언트 ID를 추가하고 생산자에 압축을 사용하여 요청의 배치 크기를 줄이는 것이 좋습니다.
기본 생산자 구성에서 다음을 수행합니다.
- 파티션의 메시지 순서가 보장되지 않습니다.
- 브로커에 도달하는 메시지에 대한 승인은 지속성을 보장하지 않습니다.
기본 생산자 구성 속성
- 1
- (필수) Kafka 브로커의 host:port 부트스트랩 서버 주소를 사용하여 Kafka 클러스터에 연결하도록 생산자를 Tells합니다. 생산자는 주소를 사용하여 클러스터의 모든 브로커를 검색하고 연결합니다. 서버가 다운된 경우 쉼표로 구분된 목록을 사용하여 두 개 또는 세 개의 주소를 지정하지만 클러스터의 모든 브로커 목록을 제공할 필요는 없습니다.
- 2
- (필수) 각 메시지의 키를 브로커로 보내기 전에 바이트로 변환하는 Serializer.
- 3
- (필수) 각 메시지의 값을 브로커로 보내기 전에 바이트로 변환하는 Serializer.
- 4
- (선택 사항) 요청 소스를 식별하기 위해 로그 및 메트릭에 사용되는 클라이언트의 논리 이름입니다.
- 5
- (선택 사항) 전송된 메시지를 압축하고 압축 형식으로 저장한 다음 소비자에 도달할 때 압축을 풉니다. 압축은 처리량을 개선하고 스토리지의 부하를 줄이는 데 유용하지만 압축 또는 압축 해제 비용이 금지될 수 있는 짧은 대기 시간 애플리케이션에는 적합하지 않을 수 있습니다.
6.1.2.2. 데이터 지속성 링크 복사링크가 클립보드에 복사되었습니다!
메시지 전송 승인을 사용하여 메시지가 손실될 가능성을 최소화하기 위해 더 큰 데이터 지속성을 적용할 수 있습니다.
# ... acks=all # ...
# ...
acks=all
# ...
- 1
acks=all을 지정하면 메시지 요청이 성공적으로 수신되었음을 확인하기 전에 파티션 리더가 특정 수의 팔로우에 메시지를 복제해야 합니다. 추가 검사로 인해acks=all은 메시지를 전송하고 승인을 수신하는 생산자 간의 대기 시간을 늘립니다.
승인이 생산자로 전송되기 전에 로그에 메시지를 추가해야 하는 브로커 수는 주제의 min.insync.replicas 구성에 따라 결정됩니다. 일반적인 시작 지점은 다른 브로커에 두 개의 동기화 내 복제본이 있는 3의 주제 복제 인수를 갖는 것입니다. 이 구성에서 단일 브로커를 사용할 수 없는 경우 생산자는 영향을 받지 않을 수 있습니다. 두 번째 브로커를 사용할 수 없게 되면 생산자는 승인을 받지 못하며 더 많은 메시지를 생성할 수 없습니다.
acks=all을 지원하는 주제 구성
# ... min.insync.replicas=2 # ...
# ...
min.insync.replicas=2
# ...
- 1
2개의 동기화된 복제본을 사용합니다. 기본값은1입니다.
시스템이 실패하면 버퍼에 의도하지 않은 데이터가 손실될 위험이 있습니다.
6.1.2.3. 주문 배송 링크 복사링크가 클립보드에 복사되었습니다!
멱등 생산자는 메시지가 정확히 한 번 전송되므로 중복을 방지합니다. ID 및 순서 번호는 실패 시에도 전달 순서를 보장하기 위해 메시지에 할당됩니다. 데이터 일관성을 위해 acks=all 을 사용하는 경우, 주문된 전송에 멱등을 활성화하는 것이 적절합니다.
idempotency로 주문된 제공
성능 비용 때문에 acks=all 및 idempotency를 사용하지 않는 경우 순서를 유지하기 위해 in-flight(알림되지 않음) 요청 수를 1로 설정합니다. 그렇지 않으면 Message-B 가 이미 브로커에 기록된 후에만 Message-A 가 성공하지 못하는 상황이 발생할 수 있습니다.
idempotency 없이 주문된 제공
# ... enable.idempotence=false max.in.flight.requests.per.connection=1 retries=2147483647 # ...
# ...
enable.idempotence=false
max.in.flight.requests.per.connection=1
retries=2147483647
# ...
6.1.2.4. 신뢰성 보장 링크 복사링크가 클립보드에 복사되었습니다!
멱등은 단일 파티션에 정확히 한 번 쓰는 데 유용합니다. 멱등과 함께 사용되는 트랜잭션은 여러 파티션에 정확히 한 번의 쓰기를 허용합니다.
트랜잭션을 사용하면 동일한 트랜잭션 ID를 사용하는 메시지가 한 번 생성되고 모두 해당 로그에 성공적으로 기록되거나 그 중 하나가 표시되지 않습니다.
transactional.id 를 선택하는 것은 트랜잭션 보장이 유지되는 순서대로 중요합니다. 각 트랜잭션 ID는 고유한 주제 파티션 집합에 사용해야 합니다. 예를 들어, 이는 주제 파티션 이름의 외부 매핑을 트랜잭션 ID에 연결하거나 충돌을 방지하는 함수를 사용하여 주제 파티션 이름에서 트랜잭션 ID를 계산하여 수행할 수 있습니다.
6.1.2.5. 처리량 및 대기 시간 최적화 링크 복사링크가 클립보드에 복사되었습니다!
일반적으로 시스템의 요구 사항은 지정된 대기 시간 내의 메시지 비율에 대한 특정 처리량 대상을 충족하기 위한 것입니다. 예를 들어 초당 500,000개의 메시지를 대상으로 하는 메시지 중 95%가 2초 이내에 승인됩니다.
생산자의 메시징 의미 체계(메시지 순서 및 지속성)는 애플리케이션의 요구 사항에 따라 정의됩니다. 예를 들어 애플리케이션에서 제공하는 중요한 속성이나 보장을 손상시키지 않고 acks=0 또는 acks=1 을 사용할 수 있는 옵션이 없을 수 있습니다.
브로커 재시작은 높은 백분위 통계에 큰 영향을 미칩니다. 예를 들어 오랜 기간 동안 99번째 백분위 대기 시간이 브로커 재시작에 대한 동작으로 지배됩니다. 이는 벤치마크를 설계하거나 벤치마킹에서 성능 번호를 프로덕션에서 볼 수 있는 성능 수와 비교할 때 고려해야 합니다.
Kafka는 목표에 따라 처리량 및 대기 시간을 위해 생산자 성능 튜닝을 위한 다양한 구성 매개변수와 기술을 제공합니다.
- 메시지 일괄 처리(
linger.ms및batch.size) -
메시지 일괄 처리는 동일한 브로커로 향하는 더 많은 메시지가 전송되어 단일 생성 요청에 일괄 처리할 수 있기를 바랍니다. 일괄 처리는 처리량이 증가하기 위해 대기 시간이 길기 때문에 지연 시간이 길어집니다.Batching is a compromise between higher latency in return for higher throughput. 시간 기반 일괄 처리는
linger.ms를 사용하여 구성되며 크기 기반 일괄 처리는batch.size를 사용하여 구성됩니다. - 압축(
compression.type) -
메시지 압축은 생산자에 대기 시간을 추가하지만(메시지 압축에 사용된 CPU 시간) 요청(및 잠재적으로 디스크 쓰기)을 줄여 처리량을 높일 수 있습니다. 압축 여부, 사용할 최상의 압축은 전송되는 메시지에 따라 달라집니다.
KafkaProducer.send()를 호출하는 스레드에서 압축이 수행되므로 이 방법의 대기 시간이 애플리케이션에 중요한 경우 더 많은 스레드 사용을 고려해야 합니다. - pipelining (
max.in.flight.requests.per.connection) - pipelining은 이전 요청에 대한 응답이 수신되기 전에 더 많은 요청을 보내는 것을 의미합니다. 일반적으로 파이프라이닝은 더 나은 처리량을 의미하며, 이로 인해 더 심각한 일괄 처리와 같은 다른 효과의 경우 처리량에 대한 영향을 대응하기 시작합니다.
대기 시간 단축
애플리케이션이 KafkaProducer.send() 를 호출할 때 메시지는 다음과 같습니다.
- 인터셉터에서 처리
- 직렬화됨
- 파티션에 할당됨
- 압축
- 파티션별 큐의 메시지 배치에 추가됨
이 시점에서 send() 메서드가 반환됩니다. 따라서 send() 가 차단되는 시간은 다음에 따라 결정됩니다.
- 인터셉터, serializers 및 partitioner에 소요된 시간
- 사용된 압축 알고리즘
- 버퍼가 압축에 사용할 때까지 대기하는 시간
배치는 다음 중 하나가 발생할 때까지 큐에 유지됩니다.
-
배치가 가득 차 있습니다 (
batch.size에 따라 ) -
linger.ms에 의해 도입된 지연이 통과되었습니다. - 발신자는 다른 파티션의 메시지 일괄 처리를 동일한 브로커로 보내려고 하며 이 배치도 추가할 수 있습니다.
- 프로듀서가 플러시되거나 닫히고 있습니다.
대기 시간에 대한 send() 차단의 영향을 완화하기 위한 일괄 처리 및 버퍼링 구성을 확인합니다.
# ... linger.ms=100 batch.size=16384 buffer.memory=33554432 # ...
# ...
linger.ms=100
batch.size=16384
buffer.memory=33554432
# ...
처리량 증가
메시지가 전달되고 전송 요청을 완료할 때까지 대기할 최대 시간을 조정하여 메시지 요청의 처리량을 개선합니다.
또한 기본값을 대체하기 위해 사용자 지정 partitioner를 작성하여 지정된 파티션으로 메시지를 보낼 수도 있습니다.
# ... delivery.timeout.ms=120000 partitioner.class=my-custom-partitioner # ...
# ...
delivery.timeout.ms=120000
partitioner.class=my-custom-partitioner
# ...
6.1.3. Kafka 소비자 구성 튜닝 링크 복사링크가 클립보드에 복사되었습니다!
특정 사용 사례에 맞는 선택적 속성이 포함된 기본 소비자 구성을 사용합니다.
소비자를 튜닝할 때 주요 우려 사항은 수집된 데이터 양으로 효율적으로 처리되도록 하는 것입니다. 생산자 튜닝과 마찬가지로 소비자가 예상대로 작동할 때까지 점진적으로 변경할 준비가 되어 있어야 합니다.
6.1.3.1. 기본 소비자 구성 링크 복사링크가 클립보드에 복사되었습니다!
모든 소비자에 연결 및 역직렬러 속성이 필요합니다. 일반적으로 추적을 위해 클라이언트 ID를 추가하는 것이 좋습니다.
후속 구성과 관계없이 소비자 구성에서 다음을 수행합니다.
- 소비자는 지정된 오프셋에서 메시지를 가져오고 메시지를 건너뛰거나 다시 읽도록 오프셋이 변경되지 않는 한 순서대로 메시지를 사용합니다.
- 오프셋이 클러스터의 다른 브로커로 전송될 수 있기 때문에 Kafka에 오프셋을 커밋하는 경우에도 브로커가 응답을 처리했는지 여부를 알 수 없습니다.
기본 소비자 구성 속성
- 1
- (필수) Kafka 브로커의 host:port 부트스트랩 서버 주소를 사용하여 Kafka 클러스터에 연결하도록 소비자를 Tells합니다. 소비자는 주소를 사용하여 클러스터의 모든 브로커를 검색하고 연결합니다. 서버가 다운된 경우 쉼표로 구분된 목록을 사용하여 두 개 또는 세 개의 주소를 지정하지만 클러스터의 모든 브로커 목록을 제공할 필요는 없습니다. 로드 밸런서 서비스를 사용하여 Kafka 클러스터를 노출하는 경우 로드 밸런서에서 가용성을 처리하므로 서비스의 주소만 있으면 됩니다.
- 2
- (필수) Kafka 브로커에서 가져온 바이트를 메시지 키로 변환하는 Deserializer입니다.
- 3
- (필수) Kafka 브로커에서 가져온 바이트를 메시지 값으로 변환하는 Deserializer입니다.
- 4
- (선택 사항) 요청 소스를 식별하기 위해 로그 및 메트릭에 사용되는 클라이언트의 논리 이름입니다. id는 또한 처리 시간 할당량을 기반으로 소비자를 차단하는 데 사용할 수 있습니다.
- 5
- (조건) 소비자가 소비자 그룹에 가입하려면 그룹 ID가 필요합니다.
6.1.3.2. 소비자 그룹을 사용하여 데이터 사용량 확장 링크 복사링크가 클립보드에 복사되었습니다!
소비자 그룹은 지정된 주제에서 하나 이상의 생산자가 생성한 일반적으로 대규모 데이터 스트림을 공유합니다. 소비자는 group.id 속성을 사용하여 그룹화되므로 메시지를 멤버에 분산할 수 있습니다. 그룹의 소비자 중 한 명이 리더를 선택하고 그룹의 사용자에게 파티션을 할당하는 방법을 결정합니다. 각 파티션은 단일 소비자에만 할당할 수 있습니다.
파티션만큼 많은 소비자가 없는 경우 동일한 group.id 를 사용하여 더 많은 소비자 인스턴스를 추가하여 데이터 소비를 확장할 수 있습니다. 파티션이 있는 것보다 더 많은 소비자를 그룹에 추가하면 처리량은 도움이 되지 않지만 대기 상태의 소비자가 작동을 중지해야 함을 의미합니다. 더 적은 소비자로 처리량 목표를 달성할 수 있는 경우 리소스를 절약할 수 있습니다.
동일한 소비자 그룹 내의 소비자는 오프셋 커밋과 하트비트를 동일한 브로커에 보냅니다. 따라서 그룹의 소비자 수가 많을수록 브로커에 요청이 로드됩니다.
# ... group.id=my-group-id # ...
# ...
group.id=my-group-id
# ...
- 1
- 그룹 ID를 사용하여 소비자 그룹에 소비자를 추가합니다.
6.1.3.3. 메시지 순서 확인 링크 복사링크가 클립보드에 복사되었습니다!
Kafka 브로커는 브로커에 주제, 파티션 및 오프셋 위치 목록에서 메시지를 전송하도록 요청하는 소비자의 가져오기 요청을 받습니다.
소비자는 브로커에 커밋된 것과 동일한 순서로 단일 파티션의 메시지를 관찰합니다. 즉 Kafka는 단일 파티션의 메시지에 대한 순서 만 보장합니다. 반대로, 소비자가 여러 파티션의 메시지를 사용하는 경우 소비자가 관찰한 여러 파티션의 메시지 순서가 전송된 순서를 반영하지 않습니다.
한 주제에서 메시지 순서를 엄격하게 지정하려면 소비자당 하나의 파티션을 사용합니다.
6.1.3.4. 처리량 및 대기 시간 최적화 링크 복사링크가 클립보드에 복사되었습니다!
클라이언트 애플리케이션이 KafkaConsumer.poll() 을 호출할 때 반환된 메시지 수를 제어합니다.
fetch.max.wait.ms 및 fetch.min.bytes 속성을 사용하여 Kafka 브로커에서 소비자가 가져온 최소 데이터 양을 늘립니다. 시간 기반 일괄 처리는 fetch.max.wait.ms 를 사용하여 구성되며 크기 기반 일괄 처리는 fetch.min.bytes 를 사용하여 구성됩니다.
소비자 또는 브로커의 CPU 사용률이 높으면 소비자의 요청이 너무 많기 때문일 수 있습니다. 더 적은 요청 및 메시지가 더 큰 배치로 전달되도록 fetch.max.wait.ms 및 fetch.min.bytes 속성을 더 높게 조정할 수 있습니다. 높은 조정을 통해 처리량은 약간의 대기 시간으로 개선됩니다. 또한 생성되는 데이터 양이 낮은 경우 더 높게 조정할 수도 있습니다.
예를 들어 fetch.max.wait.ms 를 500ms로 설정하고 fetch.min.bytes 를 16384바이트로 설정하면 Kafka에서 소비자로부터 가져오기 요청을 수신하면 두 임계값 중 첫 번째에 도달할 때 응답합니다.
반대로 fetch.max.wait.ms 및 fetch.min.bytes 속성을 더 낮게 조정하여 엔드 투 엔드 대기 시간을 개선할 수 있습니다.
# ... fetch.max.wait.ms=500 fetch.min.bytes=16384 # ...
# ...
fetch.max.wait.ms=500
fetch.min.bytes=16384
# ...
가져오기 요청 크기를 늘려 대기 시간 단축
fetch.max.bytes 및 max.partition.fetch.bytes 속성을 사용하여 Kafka 브로커에서 소비자가 가져온 최대 데이터 양을 늘립니다.
fetch.max.bytes 속성은 브로커에서 한 번에 가져온 데이터 양에 최대 제한을 바이트 단위로 설정합니다.
max.partition.fetch.bytes 는 각 파티션에 대해 반환되는 데이터의 양에 대한 최대 제한을 바이트 단위로 설정합니다. 이 제한은 항상 브로커에 설정된 바이트 수 또는 max.message.bytes 의 주제 구성보다 커야 합니다.
클라이언트가 사용할 수 있는 최대 메모리 양은 대략적으로 다음과 같이 계산됩니다.
NUMBER-OF-BROKERS * fetch.max.bytes and NUMBER-OF-PARTITIONS * max.partition.fetch.bytes
NUMBER-OF-BROKERS * fetch.max.bytes and NUMBER-OF-PARTITIONS * max.partition.fetch.bytes
메모리 사용량이 이를 수용할 수 있는 경우 이 두 속성의 값을 늘릴 수 있습니다. 각 요청에서 더 많은 데이터를 허용하면 가져오기 요청이 줄어들기 때문에 대기 시간이 향상됩니다.
# ... fetch.max.bytes=52428800 max.partition.fetch.bytes=1048576 # ...
# ...
fetch.max.bytes=52428800
max.partition.fetch.bytes=1048576
# ...
6.1.3.5. 오프셋을 커밋할 때 데이터 손실 또는 중복 방지 링크 복사링크가 클립보드에 복사되었습니다!
Kafka 자동 커밋 메커니즘을 사용하면 소비자가 메시지의 오프셋을 자동으로 커밋할 수 있습니다. 활성화하면 소비자는 5000ms 간격으로 브로커 폴링에서 수신된 오프셋을 커밋합니다.
자동 커밋 메커니즘은 편리하지만 데이터 손실 및 복제 위험이 발생합니다. 소비자가 여러 메시지를 가져와서 변환했지만 자동 커밋을 수행할 때 시스템이 소비자 버퍼에서 처리된 메시지와 함께 충돌하면 해당 데이터가 손실됩니다. 메시지를 처리한 후 시스템이 충돌하지만 자동 커밋을 수행하기 전에 시스템의 재조정 후 다른 소비자 인스턴스에서 데이터가 복제됩니다.
자동 커밋은 브로커의 다음 폴링 전에 모든 메시지가 처리되거나 소비자가 닫히기 전에 모든 메시지가 처리되는 경우에만 데이터 손실을 방지할 수 있습니다.
데이터 손실 또는 복제 가능성을 최소화하기 위해 enable.auto.commit 를 false 로 설정하고 클라이언트 애플리케이션을 개발하여 오프셋을 더 많이 제어할 수 있습니다. 또는 auto.commit.interval.ms 를 사용하여 커밋 간 간격을 줄일 수 있습니다.
# ... enable.auto.commit=false # ...
# ...
enable.auto.commit=false
# ...
- 1
- 커밋 오프셋을 더 많이 제어하려면 자동 커밋이 false로 설정됩니다.
enable.auto.commit 를 false 로 설정하면 모든 처리가 수행되고 메시지가 사용된 후 오프셋을 커밋할 수 있습니다. 예를 들어 Kafka commitSync 및 commitAsync 커밋 API를 호출하도록 애플리케이션을 설정할 수 있습니다.
commitSync API는 폴링에서 반환된 메시지 배치의 오프셋을 커밋합니다. 일괄 처리의 모든 메시지 처리를 완료하면 API를 호출합니다. commitSync API를 사용하는 경우 배치의 마지막 오프셋이 커밋될 때까지 애플리케이션은 새 메시지를 폴링하지 않습니다. 처리량에 부정적인 영향을 미치는 경우 더 자주 커밋하거나 commitAsync API를 사용할 수 있습니다. commitAsync API는 브로커가 커밋 요청에 응답할 때까지 기다리지 않지만 재조정 시 더 많은 중복을 생성할 위험이 있습니다. 일반적인 접근 방식은 애플리케이션에서 커밋 API를 모두 결합한 후 소비자를 종료하거나 재조정하기 전에 사용된 commitSync API를 결합하여 최종 커밋이 성공했는지 확인하는 것입니다.
6.1.3.5.1. 트랜잭션 메시지 제어 링크 복사링크가 클립보드에 복사되었습니다!
트랜잭션 ID를 사용하고 생산자 측에서 멱등(enable.idempotence=true)을 활성화하여 정확하게 제공되도록 하는 것이 좋습니다. 소비자 측에서 isolation.level 속성을 사용하여 소비자가 트랜잭션 메시지를 읽는 방법을 제어할 수 있습니다.
isolation.level 속성에는 다음 두 가지 유효한 값이 있습니다.
-
read_committed -
read_uncommitted(기본값)
read_committed 를 사용하여 커밋된 트랜잭션 메시지만 소비자가 읽도록 합니다. 그러나 이로 인해 소비자가 브로커에서 트랜잭션 결과를 기록하는 트랜잭션 마커를 작성할 때까지 메시지를 반환할 수 없기 때문에 엔드 투 엔드 대기 시간이 증가합니다(커밋 또는 중단됨).
# ... enable.auto.commit=false isolation.level=read_committed # ...
# ...
enable.auto.commit=false
isolation.level=read_committed
# ...
- 1
- 사용자가 커밋한 메시지만 읽도록
read_committed로 설정합니다.
6.1.3.6. 데이터 손실을 방지하기 위해 실패에서 복구 링크 복사링크가 클립보드에 복사되었습니다!
session.timeout.ms 및 heartbeat.interval.ms 속성을 사용하여 소비자 그룹 내의 소비자 실패를 확인하고 복구하는 데 걸리는 시간을 구성합니다.
session.timeout.ms 속성은 소비자 그룹 내의 소비자가 비활성으로 간주되기 전에 브로커와 연결할 수 없는 최대 시간(밀리초)을 지정하고 그룹의 활성 소비자 간에 재조정 이 트리거됩니다. 그룹이 재조정되면 그룹이 그룹 멤버에 다시 할당됩니다.
heartbeat.interval.ms 속성은 소비자가 활성 상태이고 연결되어 있음을 나타내는 소비자 그룹 코디네이터와 하트비트 검사 사이의 간격(밀리초)을 지정합니다. 하트비트 간격은 세션 시간 제한 간격보다 일반적으로 3 일까지 작아야 합니다.
session.timeout.ms 속성을 더 낮게 설정하면 실패한 소비자가 이전에 감지되고 재조정이 더 빨라질 수 있습니다. 그러나 브로커가 제 시간에 하트비트를 수신하지 못하고 불필요한 재조정을 트리거하도록 시간 초과를 설정하지 않도록 주의하십시오.
하트비트 간격을 줄이면 우발적인 재조정 가능성이 줄어들지만 더 자주 하트비트는 브로커 리소스에 대한 오버헤드를 늘립니다.
6.1.3.7. 오프셋 정책 관리 링크 복사링크가 클립보드에 복사되었습니다!
auto.offset.reset 속성을 사용하여 오프셋이 커밋되지 않은 경우 소비자가 작동하는 방식을 제어하거나 커밋된 오프셋이 더 이상 유효하지 않거나 삭제되지 않습니다.
소비자 애플리케이션을 처음 배포하고 기존 주제에서 메시지를 읽습니다. group.id 가 처음 사용되므로 __consumer_offsets 항목에 이 애플리케이션에 대한 오프셋 정보가 포함되어 있지 않습니다. 새 애플리케이션은 로그 시작부터 모든 기존 메시지 처리를 시작하거나 새 메시지만 처리할 수 있습니다. 기본 재설정 값은 파티션 끝에 시작되는 latest 이므로 일부 메시지가 누락됨을 의미합니다. 데이터 손실을 피하되 처리 양을 늘리려면 auto.offset.reset 을 가장 빨리 설정하여 파티션 시작 부분에 시작합니다.
또한 브로커에 대해 구성된 오프셋 보존 기간(offsets.retention.minutes)이 종료될 때 메시지가 손실되지 않도록 가장 초기 옵션을 사용하는 것이 좋습니다. 소비자 그룹 또는 독립 실행형 소비자가 비활성 상태이고 보존 기간 동안 오프셋이 없는 경우 이전에 커밋된 오프셋은 __consumer_offsets 에서 삭제됩니다.
# ... heartbeat.interval.ms=3000 session.timeout.ms=10000 auto.offset.reset=earliest # ...
# ...
heartbeat.interval.ms=3000
session.timeout.ms=10000
auto.offset.reset=earliest
# ...
단일 가져오기 요청에서 반환된 데이터 양이 크면 소비자가 처리하기 전에 시간 초과가 발생할 수 있습니다. 이 경우 max.partition.fetch.bytes 를 낮추거나 session.timeout.ms 를 늘릴 수 있습니다.
6.1.3.8. 리밸런스의 영향을 최소화 링크 복사링크가 클립보드에 복사되었습니다!
그룹의 활성 소비자 간의 파티션 재조정은 다음 작업에 걸리는 시간입니다.
- 사용자가 오프셋을 커밋
- 구성할 새 소비자 그룹입니다.
- 그룹 구성원에 파티션을 할당하는 그룹 리더
- 그룹의 소비자가 할당을 수신하고 가져오기 시작합니다.
명확하게, 이 프로세스는 특히 소비자 그룹 클러스터를 롤링 재시작하는 동안 반복적으로 발생하는 경우 서비스의 다운타임을 늘립니다.
이 경우 정적 멤버십 개념을 사용하여 리밸런스 수를 줄일 수 있습니다. 재조정은 소비자 그룹 멤버 간에 주제 파티션을 균등하게 할당합니다. 정적 멤버십에서는 세션 시간 초과 후 다시 시작하는 동안 소비자 인스턴스를 인식할 수 있도록 지속성을 사용합니다.
소비자 그룹 코디네이터는 group.instance.id 속성을 사용하여 지정된 고유 ID를 사용하여 새 소비자 인스턴스를 식별할 수 있습니다. 다시 시작하는 동안 소비자에는 새 멤버 ID가 할당되지만 정적 멤버에서는 동일한 인스턴스 ID를 계속 사용하고 주제 파티션의 동일한 할당이 수행됩니다.
소비자 애플리케이션에서 적어도 모든 max.poll.interval.ms 밀리초를 폴링하도록 호출하지 않으면 소비자가 실패한 것으로 간주되어 재조정이 발생합니다. 애플리케이션에서 폴링에서 반환된 모든 레코드를 시간 단위로 처리할 수 없는 경우 max.poll.interval.ms 속성을 사용하여 리밸런스(rebalance)를 방지하여 소비자의 새 메시지에 대한 폴링 간격(밀리초)을 지정할 수 있습니다. 또는 max.poll.records 속성을 사용하여 소비자 버퍼에서 반환된 레코드 수에 대한 최대 제한을 설정하여 애플리케이션이 max.poll.interval.ms 제한 내에서 더 적은 레코드를 처리할 수 있습니다.
# ... group.instance.id=UNIQUE-ID max.poll.interval.ms=300000 max.poll.records=500 # ...
# ...
group.instance.id=UNIQUE-ID
max.poll.interval.ms=300000
max.poll.records=500
# ...