9장. 대량의 메시지 처리
AMQ Streams 배포에서 대량의 메시지를 처리해야 하는 경우 구성 옵션을 사용하여 처리량 및 대기 시간을 최적화할 수 있습니다.
Kafka 생산자 및 소비자 구성은 Kafka 브로커에 대한 요청 크기와 빈도를 제어하는 데 도움이 될 수 있습니다. 구성 옵션에 대한 자세한 내용은 다음을 참조하십시오.
Kafka Connect 런타임 소스 커넥터(MirrorMaker 2.0 포함) 및 싱크 커넥터에서 사용하는 생산자 및 소비자와 동일한 구성 옵션을 사용할 수도 있습니다.
- 소스 커넥터
- Kafka Connect 런타임의 생산자는 Kafka 클러스터에 메시지를 보냅니다.
- MirrorMaker 2.0의 경우 소스 시스템이 Kafka이므로 소비자는 소스 Kafka 클러스터에서 메시지를 검색합니다.
- 싱크 커넥터
- Kafka Connect 런타임의 소비자는 Kafka 클러스터에서 메시지를 검색합니다.
소비자 구성(consumer.*)의 경우 대기 시간을 줄이기 위해 단일 가져오기 요청에서 가져온 데이터 양을 늘릴 수 있습니다. fetch.max.bytes 및 max.partition.fetch.bytes 속성을 사용하여 가져오기 요청 크기를 늘립니다. max.poll.records 속성을 사용하여 소비자 버퍼에서 반환된 메시지 수에 대한 최대 제한을 설정할 수도 있습니다.
생산자 구성(producer.*)의 경우 단일 생성 요청으로 전송된 메시지 배치 크기를 늘릴 수 있습니다. batch.size 속성을 사용하여 배치 크기를 늘립니다. 대규모 배치 크기는 전송할 준비가 된 미해결 메시지 수와 메시지 큐의 백로그 크기를 줄입니다. 동일한 파티션으로 전송되는 메시지는 함께 일괄 처리됩니다. 배치 크기에 도달하면 생성 요청이 대상 클러스터로 전송됩니다. 배치 크기를 늘리면 생성 요청이 지연되고 더 많은 메시지가 일괄 처리에 추가되고 동시에 브로커로 전송됩니다. 이렇게 하면 많은 수의 메시지를 처리하는 몇 가지 주제 파티션이 있는 경우 처리량이 향상될 수 있습니다.
생산자가 적절한 생산자 배치 크기에 대해 처리하는 레코드의 수와 크기를 고려하십시오.
linger.ms 를 사용하여 생산자 로드가 감소할 때 요청 생성을 지연하기 위해 대기 시간(밀리초)을 추가합니다. 지연은 최대 배치 크기에 있는 경우 배치에 더 많은 레코드를 추가할 수 있음을 의미합니다.
Kafka Connect 소스 커넥터의 경우 대상 Kafka 클러스터에 대한 데이터 스트리밍 파이프라인은 다음과 같습니다.
Kafka Connect 소스 커넥터용 데이터 스트리밍 파이프라인
외부 데이터 소스
Kafka Connect 싱크 커넥터의 경우 대상 외부 데이터 소스에 대한 데이터 스트리밍 파이프라인은 다음과 같습니다.
Kafka Connect 싱크 커넥터의 데이터 스트리밍 파이프라인
소스 Kafka 주제
MirrorMaker 2.0의 경우 대상 Kafka 클러스터에 대한 데이터 미러링 파이프라인은 다음과 같습니다.
MirrorMaker 2.0의 데이터 미러링 파이프라인
소스 Kafka 주제
생산자는 버퍼의 메시지를 대상 Kafka 클러스터의 주제로 보냅니다. 이러한 상황이 발생하는 동안 Kafka Connect 작업은 데이터 소스를 폴링하여 소스 메시지 큐에 메시지를 추가합니다.
소스 커넥터의 생산자 버퍼 크기는 buffer.memory 속성을 사용하여 설정됩니다. 작업은 버퍼가 플러시되기 전에 지정된 시간 초과 기간(offset.flush.timeout.ms)을 기다립니다. 전송된 메시지가 브로커에 의해 승인되고 커밋된 데이터 오프셋에 충분한 시간이 되어야 합니다. 소스 작업에서는 종료 중을 제외하고 오프셋을 커밋하기 전에 생산자가 메시지 큐를 비우기를 기다리지 않습니다.
생산자가 소스 메시지 큐의 메시지 처리량을 유지할 수 없는 경우 max.block.ms 가 바인딩된 시간 내에 버퍼에 사용 가능한 공간이 있을 때까지 버퍼링이 차단됩니다. 이 기간 동안 승인되지 않은 모든 메시지가 버퍼에 계속 전송됩니다. 이러한 메시지가 승인되고 플러시될 때까지 새 메시지가 버퍼에 추가되지 않습니다.
다음 구성 변경을 시도하여 미해결 메시지의 기본 소스 메시지 큐를 관리 가능한 크기로 유지할 수 있습니다.
-
offset.flush.timeout.ms의 기본값을 밀리초 단위로 늘리기 - CPU 및 메모리 리소스가 충분한지 확인
다음을 수행하여 병렬로 실행되는 작업 수를 늘립니다.
-
tasks.max속성을 사용하여 병렬로 실행되는 작업 수 증가 - 작업을 실행하는 작업자의 노드 수 증가
-
사용 가능한 CPU 및 메모리 리소스 및 작업자 노드 수에 따라 병렬로 실행할 수 있는 작업 수를 고려하십시오. 원하는 효과를 얻을 때까지 구성 값을 계속 조정해야 할 수 있습니다.
9.1. 대용량 메시지에 대한 Kafka Connect 구성 링크 복사링크가 클립보드에 복사되었습니다!
Kafka Connect는 소스 외부 데이터 시스템에서 데이터를 가져와서 Kafka Connect 런타임 생산자에 전달하여 대상 클러스터에 복제됩니다.
다음 예제에서는 Kafka Connect 소스 커넥터에 대한 구성을 보여줍니다.
대량의 메시지를 처리하기 위한 소스 커넥터 구성의 예
# ...
producer.batch.size=327680
producer.linger.ms=100
# ...
tasks.max = 2
싱크 커넥터에 대한 소비자 구성이 추가됩니다.
대량의 메시지를 처리하기 위한 싱크 커넥터 구성의 예
# ...
consumer.fetch.max.bytes=52428800
consumer.max.partition.fetch.bytes=1048576
consumer.max.poll.records=500
# ...
tasks.max = 2