25장. 대량의 메시지 처리


AMQ Streams 배포에서 대량의 메시지를 처리해야 하는 경우 구성 옵션을 사용하여 처리량과 대기 시간을 최적화할 수 있습니다.

생산자 및 소비자 구성은 Kafka 브로커에 대한 요청의 크기와 빈도를 제어하는 데 도움이 될 수 있습니다. 구성 옵션에 대한 자세한 내용은 다음을 참조하십시오.

Kafka Connect 런타임 소스 커넥터(MirrorMaker 2) 및 싱크 커넥터에서 사용하는 생산자 및 소비자와 동일한 구성 옵션을 사용할 수도 있습니다.

소스 커넥터
  • Kafka Connect 런타임의 생산자는 Kafka 클러스터로 메시지를 보냅니다.
  • MirrorMaker 2의 경우 소스 시스템이 Kafka이므로 소비자는 소스 Kafka 클러스터에서 메시지를 검색합니다.
싱크 커넥터
  • Kafka Connect 런타임의 소비자는 Kafka 클러스터에서 메시지를 검색합니다.

소비자의 경우 단일 가져오기 요청에서 가져온 데이터 양을 늘려 대기 시간을 줄일 수 있습니다. fetch.max.bytesmax.partition.fetch.bytes 속성을 사용하여 가져오기 요청 크기를 늘립니다. max.poll.records 속성을 사용하여 소비자 버퍼에서 반환된 메시지 수에 대한 최대 제한을 설정할 수도 있습니다.

MirrorMaker 2의 경우 소스의 메시지를 가져오는 특정 소비자와 관련된 소스 커넥터 수준(consumer.*)에서 fetch.max.bytes , max.poll.records 값을 구성합니다.

생산자의 경우 단일 생성 요청에 전송된 메시지 일괄 처리 크기를 늘릴 수 있습니다. batch.size 속성을 사용하여 배치 크기를 늘립니다. 배치 크기가 클수록 전송 준비가 된 미결 메시지 수와 메시지 큐의 백로그 크기를 줄일 수 있습니다. 동일한 파티션에 전송되는 메시지가 함께 배치됩니다. 배치 크기에 도달하면 생성 요청이 대상 클러스터로 전송됩니다. 배치 크기를 늘리면 생성 요청이 지연되고 더 많은 메시지가 일괄 처리에 추가되어 브로커에게 동시에 전송됩니다. 이렇게 하면 많은 수의 메시지를 처리하는 몇 가지 주제 파티션이 있는 경우 처리량이 향상될 수 있습니다.

생산자가 적절한 생산자 배치 크기에 대해 처리하는 레코드의 수와 크기를 고려합니다.

linger.ms 를 사용하여 생산자 로드가 감소할 때 요청을 지연하기 위해 대기 시간을 밀리초 단위로 추가합니다. 지연은 최대 배치 크기 미만인 경우 배치에 더 많은 레코드를 추가할 수 있음을 의미합니다.

대상 Kafka 클러스터로 메시지를 보내는 특정 생산자와 관련된 소스 커넥터 수준(producer.override.*)에서 batch.sizelinger.ms 값을 구성합니다.

Kafka Connect 소스 커넥터의 경우 대상 Kafka 클러스터에 대한 데이터 스트리밍 파이프라인은 다음과 같습니다.

Kafka Connect 소스 커넥터의 데이터 스트리밍 파이프라인

외부 데이터 소스 (Kafka Connect 작업) 소스 메시지 큐 생산자 버퍼 대상 Kafka 주제

Kafka Connect 싱크 커넥터의 경우 대상 외부 데이터 소스에 대한 데이터 스트리밍 파이프라인은 다음과 같습니다.

Kafka Connect 싱크 커넥터의 데이터 스트리밍 파이프라인

소스 Kafka 주제 (Kafka Connect 작업) 싱크 메시지 큐 소비자 버퍼 외부 데이터 소스

MirrorMaker 2의 경우 대상 Kafka 클러스터에 대한 데이터 미러링 파이프라인은 다음과 같습니다.

MirrorMaker 2의 데이터 미러링 파이프라인

소스 Kafka 주제 (Kafka Connect 작업) 소스 메시지 큐 생산자 버퍼 대상 Kafka 주제

생산자는 버퍼의 메시지를 대상 Kafka 클러스터의 항목으로 보냅니다. 이러한 상황이 발생하는 동안 Kafka Connect 작업은 데이터 소스를 계속 폴링하여 소스 메시지 큐에 메시지를 추가합니다.

소스 커넥터의 생산자 버퍼 크기는 producer.override.buffer.memory 속성을 사용하여 설정됩니다. 작업은 버퍼가 플러시되기 전에 지정된 시간 초과 기간(offset.flush.timeout.ms)을 기다립니다. 이는 전송된 메시지가 브로커에 의해 승인되고 커밋된 데이터를 오프셋할 수 있는 충분한 시간이어야 합니다. 소스 작업에서는 종료 중을 제외하고 생산자가 오프셋을 커밋하기 전에 메시지 큐를 비우기를 기다리지 않습니다.

생산자가 소스 메시지 큐의 메시지 처리량을 유지할 수 없는 경우 max.block.ms 에 의해 바인딩된 시간 내에 버퍼에 사용 가능한 공간이 있을 때까지 버퍼가 차단됩니다. 이 기간 동안 버퍼에 아직 승인되지 않은 모든 메시지가 전송됩니다. 이러한 메시지가 승인되고 플러시될 때까지 새 메시지가 버퍼에 추가되지 않습니다.

다음 구성 변경을 시도하여 미해결 메시지의 기본 소스 메시지 대기열을 관리 가능한 크기로 유지할 수 있습니다.

  • offset.flush.timeout.ms의 기본값(밀리초)을 늘립니다.
  • CPU 및 메모리 리소스가 충분한지 확인
  • 다음을 수행하여 병렬로 실행되는 작업 수를 늘립니다.

    • tasksMax 속성을 사용하여 병렬로 실행되는 작업 수 늘리기
    • replicas 속성을 사용하여 작업을 실행하는 작업자 노드 수 늘리기

사용 가능한 CPU 및 메모리 리소스 및 작업자 노드 수에 따라 병렬로 실행할 수 있는 작업 수를 고려하십시오. 원하는 효과가 있을 때까지 구성 값을 계속 조정해야 할 수 있습니다.

25.1. 대용량 메시지에 대한 Kafka Connect 구성

Kafka Connect가 소스 외부 데이터 시스템에서 데이터를 가져와서 Kafka Connect 런타임 생산자에 전달하여 대상 클러스터에 복제되도록 합니다.

다음 예제는 KafkaConnect 사용자 정의 리소스를 사용하는 Kafka Connect 구성을 보여줍니다.

대량의 메시지를 처리하기 위한 Kafka Connect 구성의 예

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  replicas: 3
  config:
    offset.flush.timeout.ms: 10000
    # ...
  resources:
    requests:
      cpu: "1"
      memory: 2Gi
    limits:
      cpu: "2"
      memory: 2Gi
  # ...

KafkaConnector 사용자 정의 리소스를 사용하여 관리하는 소스 커넥터에 대한 생산자 구성이 추가됩니다.

대량의 메시지를 처리하기 위한 소스 커넥터 구성 예

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.file.FileStreamSourceConnector
  tasksMax: 2
  config:
    producer.override.batch.size: 327680
    producer.override.linger.ms: 100
    # ...

참고

ScanSettingSourceConnectorScanSettingSinkConnector 는 예제 커넥터로 제공됩니다. KafkaConnector 리소스로 배포하는 방법에 대한 자세한 내용은 6.4.3.3절. “KafkaConnector 리소스 배포” 을 참조하십시오.

싱크 커넥터에 대한 소비자 구성이 추가되었습니다.

대량의 메시지를 처리하기 위한 싱크 커넥터 구성의 예

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-sink-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.file.FileStreamSinkConnector
  tasksMax: 2
  config:
    consumer.fetch.max.bytes: 52428800
    consumer.max.partition.fetch.bytes: 1048576
    consumer.max.poll.records: 500
    # ...

KafkaConnector 사용자 지정 리소스 대신 Kafka Connect API를 사용하여 커넥터를 관리하는 경우 커넥터 구성을 JSON 오브젝트로 추가할 수 있습니다.

대량의 메시지를 처리하기 위한 소스 커넥터 구성 추가에 대한 curl 요청의 예

curl -X POST \
  http://my-connect-cluster-connect-api:8083/connectors \
  -H 'Content-Type: application/json' \
  -d '{ "name": "my-source-connector",
    "config":
    {
      "connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector",
      "file": "/opt/kafka/LICENSE",
      "topic":"my-topic",
      "tasksMax": "4",
      "type": "source"
      "producer.override.batch.size": 327680
      "producer.override.linger.ms": 100
    }
}'

Red Hat logoGithubredditYoutubeTwitter

자세한 정보

평가판, 구매 및 판매

커뮤니티

Red Hat 문서 정보

Red Hat을 사용하는 고객은 신뢰할 수 있는 콘텐츠가 포함된 제품과 서비스를 통해 혁신하고 목표를 달성할 수 있습니다. 최신 업데이트를 확인하세요.

보다 포괄적 수용을 위한 오픈 소스 용어 교체

Red Hat은 코드, 문서, 웹 속성에서 문제가 있는 언어를 교체하기 위해 최선을 다하고 있습니다. 자세한 내용은 다음을 참조하세요.Red Hat 블로그.

Red Hat 소개

Red Hat은 기업이 핵심 데이터 센터에서 네트워크 에지에 이르기까지 플랫폼과 환경 전반에서 더 쉽게 작업할 수 있도록 강화된 솔루션을 제공합니다.

Theme

© 2026 Red Hat
맨 위로 이동