AMQ Streams Kafka 브리지 사용


Red Hat AMQ Streams 2.6

AMQ Streams Kafka 브리지를 사용하여 Kafka 클러스터 연결

초록

AMQ Streams Kafka 브리지는 HTTP 기반 클라이언트가 Kafka 클러스터와 상호 작용할 수 있는 RESTful 인터페이스를 제공합니다.

보다 포괄적 수용을 위한 오픈 소스 용어 교체

Red Hat은 코드, 문서, 웹 속성에서 문제가 있는 용어를 교체하기 위해 최선을 다하고 있습니다. 먼저 마스터(master), 슬레이브(slave), 블랙리스트(blacklist), 화이트리스트(whitelist) 등 네 가지 용어를 교체하고 있습니다. 이러한 변경 작업은 작업 범위가 크므로 향후 여러 릴리스에 걸쳐 점차 구현할 예정입니다. 자세한 내용은 CTO Chris Wright의 메시지를 참조하십시오.

1장. Kafka 브리지 개요

AMQ Streams Kafka Bridge를 사용하여 Kafka 클러스터에 HTTP 요청을 만듭니다.

Kafka 브리지를 사용하여 HTTP 클라이언트 애플리케이션을 Kafka 클러스터와 통합할 수 있습니다.

HTTP 클라이언트 통합

Internal and external HTTP producers and consumers exchange data with the Kafka brokers through the Kafka Bridge

1.1. Kafka 브리지 실행

AMQ Streams Kafka Bridge를 설치하여 Kafka 클러스터와 동일한 환경에서 실행됩니다.

호스트 머신에 Kafka Bridge 설치 아티팩트를 다운로드하여 추가할 수 있습니다. 로컬 환경에서 Kafka 브리지를 시도하려면 Kafka Bridge 빠른 시작을 참조하십시오.

Kafka 브리지의 각 인스턴스는 HTTP 클라이언트를 대신하여 Kafka 브로커에 연결하는 자체 메모리 내 소비자(및 서브스크립션) 세트를 유지 관리합니다. 즉, 생성된 서브스크립션에 액세스하려면 각 HTTP 클라이언트에서 동일한 Kafka Bridge 인스턴스에 대한 선호도를 유지해야 합니다. 또한 Kafka Bridge 인스턴스가 다시 시작되면 메모리 내 소비자 및 서브스크립션이 손실됩니다. Kafka Bridge가 다시 시작되면 HTTP 클라이언트에서 소비자와 서브스크립션을 다시 생성해야 합니다.

1.1.1. OpenShift에서 Kafka 브리지 실행

OpenShift에 AMQ Streams를 배포한 경우 AMQ Streams Cluster Operator를 사용하여 Kafka 브리지를 OpenShift 클러스터에 배포할 수 있습니다. Kafka 브리지를 KafkaBridge 리소스로 구성하고 배포합니다. OpenShift 네임스페이스에서 Cluster Operator가 배포한 실행 중인 Kafka 클러스터가 필요합니다. OpenShift 클러스터 외부에서 Kafka Bridge에 액세스하도록 배포를 구성할 수 있습니다.

HTTP 클라이언트는 생성한 모든 소비자 또는 서브스크립션에 액세스하려면 Kafka 브리지의 동일한 인스턴스에 대한 선호도를 유지해야 합니다. 따라서 OpenShift 배포당 Kafka 브리지의 여러 복제본을 실행하는 것은 권장되지 않습니다. Kafka Bridge 포드가 다시 시작되면(예: OpenShift에서 워크로드를 다른 노드로 재배치하기 때문에) HTTP 클라이언트는 소비자 또는 서브스크립션을 다시 생성해야 합니다.

Kafka Bridge 를 KafkaBridge 리소스로 배포하고 구성하는 방법에 대한 자세한 내용은 AMQ Streams 설명서 를 참조하십시오.

1.2. Kafka 브리지 인터페이스

Kafka 브리지는 HTTP 기반 클라이언트가 Kafka 클러스터와 상호 작용할 수 있는 RESTful 인터페이스를 제공합니다.  클라이언트 애플리케이션에서 Kafka 프로토콜을 해석할 필요 없이 AMQ Streams에 대한 웹 API 연결의 이점을 제공합니다.

API에는 Kafka 클러스터의 소비자 및 생산자와 상호 작용하기 위해 엔드포인트를 통해 노출되고 액세스할 수 있는 소비자와 주제의 두 가지 주요 리소스가 있습니다. 리소스는 Kafka 브리지에 직접 연결된 소비자 및 생산자가 아닌 Kafka 브리지와만 관련이 있습니다.

1.2.1. HTTP 요청

Kafka 브리지는 다음과 같은 방법으로 Kafka 클러스터에 대한 HTTP 요청을 지원합니다.

  • 메시지를 주제로 보냅니다.
  • 주제에서 메시지를 검색합니다.
  • 주제의 파티션 목록을 검색합니다.
  • 소비자를 생성하고 삭제합니다.
  • 해당 주제에서 메시지를 수신하기 시작할 수 있도록 소비자를 구독합니다.
  • 소비자가 구독한 주제 목록을 검색합니다.
  • 주제에서 소비자를 서브스크립션 해제합니다.
  • 소비자에게 파티션을 할당합니다.
  • 소비자 오프셋 목록을 커밋합니다.
  • 소비자가 첫 번째 또는 마지막 오프셋 위치 또는 지정된 오프셋 위치에서 메시지를 수신하기 시작할 수 있도록 파티션을 찾습니다.

이 메서드는 JSON 응답 및 HTTP 응답 코드 오류 처리를 제공합니다. 메시지는 JSON 또는 바이너리 형식으로 보낼 수 있습니다.

클라이언트는 네이티브 Kafka 프로토콜을 사용해야 하는 요구 사항 없이 메시지를 생성하고 사용할 수 있습니다.

1.3. Kafka 브리지 OpenAPI 사양

Kafka Bridge API는 OAS(OpenAPI Specification)를 사용합니다. OAS는 HTTP API를 설명하고 구현하기 위한 표준 프레임워크를 제공합니다.

Kafka Bridge OpenAPI 사양은 JSON 형식입니다. OpenAPI JSON 파일은 Kafka Bridge 소스 다운로드 파일의 src/main/resources/ 폴더에서 찾을 수 있습니다. 다운로드 파일은 고객 포털에서 사용할 수 있습니다.

GET /openapi 메서드 를 사용하여 JSON 형식으로 OpenAPI v2 사양을 검색할 수도 있습니다.

1.4. Kafka 클러스터에 대한 연결 보안

Kafka 브리지와 Kafka 클러스터 간에 다음을 구성할 수 있습니다.

  • TLS 또는 SASL 기반 인증
  • TLS 암호화 연결

속성 파일을 통해 인증을 위해 Kafka 브리지를 구성합니다.

Kafka 브로커의 ACL을 사용하여 Kafka 브리지를 사용하여 사용하고 생성할 수 있는 주제를 제한할 수도 있습니다.

참고

1.5. Kafka Bridge HTTP 인터페이스 보안

HTTP 클라이언트와 Kafka 브리지 간의 인증 및 암호화는 Kafka 브리지에서 직접 지원하지 않습니다. 클라이언트에서 Kafka 브리지로 전송된 요청은 인증 또는 암호화 없이 전송됩니다. 요청은 HTTPS 대신 HTTP를 사용해야 합니다.

Kafka 브리지를 다음 툴과 결합하여 보안을 유지할 수 있습니다.

  • Kafka 브릿지에 액세스할 수 있는 Pod를 정의하는 네트워크 정책 및 방화벽
  • 역방향 프록시(예: OAuth 2.0)
  • API 게이트웨이

1.6. Kafka 브릿지 요청

데이터 형식 및 HTTP 헤더를 지정하여 유효한 요청이 Kafka 브리지에 제출되도록 합니다.

1.6.1. 콘텐츠 유형 헤더

API 요청 및 응답 본문은 항상 JSON으로 인코딩됩니다.

  • 소비자 작업을 수행할 때 비어 있지 않은 본문이 있는 경우 POST 요청은 다음 Content-Type 헤더를 제공해야 합니다.

    Content-Type: application/vnd.kafka.v2+json
    Copy to Clipboard Toggle word wrap
  • 생산자 작업을 수행할 때 POST 요청은 생성된 메시지의 포함된 데이터 형식을 지정하는 Content-Type 헤더를 제공해야 합니다. 이는 json 또는 binary 일 수 있습니다.

    Expand
    포함된 데이터 형식content-Type 헤더

    JSON

    content-Type: application/vnd.kafka.json.v2+json

    바이너리

    content-Type: application/vnd.kafka.binary.v2+json

포함된 데이터 형식은 다음 섹션에 설명된 대로 소비자별로 설정됩니다.

POST 요청에 빈 본문이 있는 경우 Content-Type 을 설정하지 않아야 합니다. 빈 본문을 사용하여 기본값이 있는 소비자를 생성할 수 있습니다.

1.6.2. 포함된 데이터 형식

포함된 데이터 형식은 HTTP를 통해 Kafka 브리지를 사용하여 생산자에서 소비자로 전송되는 Kafka 메시지의 형식입니다. 두 가지 임베디드 데이터 형식(JSON 및 바이너리)이 지원됩니다.

/consumers/groupid 엔드포인트를 사용하여 소비자를 생성할 때 POST 요청 본문은 JSON 또는 바이너리의 포함된 데이터 형식을 지정해야 합니다. 이는 format 필드에 지정됩니다. 예를 들면 다음과 같습니다.

{
  "name": "my-consumer",
  "format": "binary", 
1

  # ...
}
Copy to Clipboard Toggle word wrap
1
바이너리 포함된 데이터 형식입니다.

소비자를 생성할 때 지정된 포함된 데이터 형식은 사용할 Kafka 메시지의 데이터 형식과 일치해야 합니다.

바이너리 포함 데이터 형식을 지정하도록 선택하는 경우 후속 생산자 요청은 요청 본문에 바이너리 데이터를 Base64로 인코딩된 문자열로 제공해야 합니다. 예를 들어 /topics/topicname 엔드포인트를 사용하여 메시지를 보낼 때 records.value 는 Base64로 인코딩되어야 합니다.

{
  "records": [
    {
      "key": "my-key",
      "value": "ZWR3YXJkdGhldGhyZWVsZWdnZWRjYXQ="
    },
  ]
}
Copy to Clipboard Toggle word wrap

생산자 요청은 포함된 데이터 형식에 해당하는 Content-Type 헤더(예: Content-Type: application/vnd.kafka.binary.v2+json )도 제공해야 합니다.

1.6.3. 메시지 형식

/topics 엔드포인트를 사용하여 메시지를 보낼 때 요청 본문에 메시지 페이로드를 records 매개변수에 입력합니다.

records 매개변수는 다음 선택적 필드를 포함할 수 있습니다.

  • 메시지 헤더
  • 메시지
  • 메시지
  • 대상 파티션

/topics에 대한 POST 요청의 예

curl -X POST \
  http://localhost:8080/topics/my-topic \
  -H 'content-type: application/vnd.kafka.json.v2+json' \
  -d '{
    "records": [
        {
            "key": "my-key",
            "value": "sales-lead-0001",
            "partition": 2,
            "headers": [
              {
                "key": "key1",
                "value": "QXBhY2hlIEthZmthIGlzIHRoZSBib21iIQ==" 
1

              }
            ]
        }
    ]
}'
Copy to Clipboard Toggle word wrap

1
바이너리 형식의 헤더 값과 Base64로 인코딩됩니다.

1.6.4. 헤더 수락

소비자를 생성한 후 모든 후속 GET 요청은 다음 형식으로 Accept 헤더를 제공해야 합니다.

Accept: application/vnd.kafka.EMBEDDED-DATA-FORMAT.v2+json
Copy to Clipboard Toggle word wrap

EMBEDDED-DATA-FORMATjson 또는 바이너리 입니다.

예를 들어 JSON의 포함된 데이터 형식을 사용하여 구독된 소비자에 대한 레코드를 검색할 때 다음 Accept 헤더를 포함합니다.

Accept: application/vnd.kafka.json.v2+json
Copy to Clipboard Toggle word wrap

1.7. CORS

일반적으로 HTTP 클라이언트는 다양한 도메인에서 요청을 발행할 수 없습니다.

예를 들어 Kafka 클러스터와 함께 배포한 Kafka 브릿지는 http://my-bridge.io 도메인을 사용하여 액세스할 수 있다고 가정합니다. HTTP 클라이언트는 URL을 사용하여 Kafka 브리지와 상호 작용하고 Kafka 클러스터를 통해 메시지를 교환할 수 있습니다. 그러나 클라이언트는 http://my-web-application.io 도메인에서 웹 애플리케이션으로 실행되고 있습니다. 클라이언트(소스) 도메인은 Kafka 브리지(대상) 도메인과 다릅니다. 동일한 원본 정책 제한으로 인해 클라이언트의 요청이 실패합니다. CORS(Cross-Origin Resource Sharing)를 사용하여 이러한 상황을 방지할 수 있습니다.

CORS를 사용하면 다른 도메인의 원본 소스 간에 간단하고 미리 제공되는 요청을 수행할 수 있습니다.

간단한 요청은 GET,HEAD,POST 메서드를 사용하는 표준 요청에 적합합니다.

보류 중인 요청은 HTTP OPTIONS 요청을 실제 요청이 안전한지 초기 검사로 보냅니다. 확인 시 실제 요청이 전송됩니다. preflight 요청은 PUTDELETE 와 같은 더 큰 보호 장치가 필요한 메서드에 적합하며 비표준 헤더를 사용합니다.

모든 요청에는 HTTP 요청의 소스인 헤더의 origins 값이 필요합니다.

CORS를 사용하면 Kafka Bridge HTTP 구성에서 Kafka 클러스터에 액세스하기 위해 허용된 메서드 및 원래 URL을 지정할 수 있습니다.

Kafka 브리지의 CORS 구성 예

# ...
http.cors.enabled=true
http.cors.allowedOrigins=http://my-web-application.io
http.cors.allowedMethods=GET,POST,PUT,DELETE,OPTIONS,PATCH
Copy to Clipboard Toggle word wrap

1.7.1. 간단한 요청

예를 들어 이 간단한 요청 헤더는 원본을 http://my-web-application.io 로 지정합니다.

Origin: http://my-web-application.io
Copy to Clipboard Toggle word wrap

레코드를 사용하기 위해 헤더 정보가 요청에 추가됩니다.

curl -v -X GET HTTP-BRIDGE-ADDRESS/consumers/my-group/instances/my-consumer/records \
-H 'Origin: http://my-web-application.io'\
-H 'content-type: application/vnd.kafka.v2+json'
Copy to Clipboard Toggle word wrap

Kafka 브리지의 응답에서 Access-Control-Allow-Origin 헤더가 반환됩니다. HTTP 요청을 브리지에 발행할 수 있는 도메인 목록이 포함되어 있습니다.

HTTP/1.1 200 OK
Access-Control-Allow-Origin: * 
1
Copy to Clipboard Toggle word wrap
1
별표(*)를 반환하면 모든 도메인에서 리소스에 액세스할 수 있습니다.

1.7.2. Preflighted 요청

초기 preflight 요청은 OPTIONS 방법을 사용하여 Kafka Bridge로 전송됩니다. HTTP OPTIONS 요청은 헤더 정보를 전송하여 Kafka 브리지에서 실제 요청을 허용하는지 확인합니다.

여기서 preflight 요청은 http://my-web-application.io 에서 POST 요청이 유효한지 확인합니다.

OPTIONS /my-group/instances/my-consumer/subscription HTTP/1.1
Origin: http://my-web-application.io
Access-Control-Request-Method: POST 
1

Access-Control-Request-Headers: Content-Type 
2
Copy to Clipboard Toggle word wrap
1
Kafka 브리지는 실제 요청이 POST 요청임을 경고합니다.
2
실제 요청은 Content-Type 헤더와 함께 전송됩니다.

OPTIONS 는 preflight 요청의 헤더 정보에 추가됩니다.

curl -v -X OPTIONS -H 'Origin: http://my-web-application.io' \
-H 'Access-Control-Request-Method: POST' \
-H 'content-type: application/vnd.kafka.v2+json'
Copy to Clipboard Toggle word wrap

Kafka 브리지는 초기 요청에 응답하여 요청이 수락되는지 확인합니다. 응답 헤더는 허용되는 원본, 메서드 및 헤더를 반환합니다.

HTTP/1.1 200 OK
Access-Control-Allow-Origin: http://my-web-application.io
Access-Control-Allow-Methods: GET,POST,PUT,DELETE,OPTIONS,PATCH
Access-Control-Allow-Headers: content-type
Copy to Clipboard Toggle word wrap

origin 또는 method가 거부되면 오류 메시지가 반환됩니다.

실제 요청에는 사전 요청에서 확인되었지만 origin 헤더가 필요하므로 Access-Control-Request-Method 헤더가 필요하지 않습니다.

curl -v -X POST HTTP-BRIDGE-ADDRESS/topics/bridge-topic \
-H 'Origin: http://my-web-application.io' \
-H 'content-type: application/vnd.kafka.v2+json'
Copy to Clipboard Toggle word wrap

응답에는 원래 URL이 허용되는 것으로 표시됩니다.

HTTP/1.1 200 OK
Access-Control-Allow-Origin: http://my-web-application.io
Copy to Clipboard Toggle word wrap

1.8. Kafka 브리지의 로거 구성

Kafka Bridge OpenAPI 사양에 정의된 각 작업에 대해 다른 로그 수준을 설정할 수 있습니다.

각 작업에는 브리지가 HTTP 클라이언트에서 요청을 수신하는 해당 API 끝점이 있습니다. 각 끝점에서 로그 수준을 변경하여 들어오고 나가는 HTTP 요청에 대한 보다 세밀한 로깅 정보를 생성할 수 있습니다.

로거는 log4j2.properties 파일에 정의되어 있으며 정상 및 준비된 끝점에 대해 다음과 같은 기본 구성이 있습니다.

logger.healthy.name = http.openapi.operation.healthy
logger.healthy.level = WARN
logger.ready.name = http.openapi.operation.ready
logger.ready.level = WARN
Copy to Clipboard Toggle word wrap

다른 모든 작업의 로그 수준은 기본적으로 INFO 로 설정됩니다. 로거는 다음과 같이 포맷됩니다.

logger.<operation_id>.name = http.openapi.operation.<operation_id>
logger.<operation_id>_level = _<LOG_LEVEL>
Copy to Clipboard Toggle word wrap

여기서 <operation_id >는 특정 작업의 식별자입니다.

OpenAPI 사양으로 정의된 작업 목록

  • createConsumer
  • deleteConsumer
  • 서브스크립션
  • 서브스크립션 취소
  • 폴링
  • 할당
  • commit
  • 전송
  • sendToPartition
  • seekToBeginning
  • seekToEnd
  • 검색
  • 상태
  • Ready
  • openapi

여기서 <LOG_LEVEL >은 log4j2에서 정의한 로깅 수준입니다(예: INFO,DEBUG, …​).

2장. Kafka 브리지 빠른 시작

이 빠른 시작을 사용하여 로컬 개발 환경에서 AMQ Streams Kafka 브리지를 사용해 보십시오.

다음을 수행하는 방법을 알아봅니다.

  • Kafka 클러스터의 주제 및 파티션에 메시지 생성
  • Kafka 브리지 소비자 생성
  • 소비자가 주제에 가입하고 생성한 메시지 검색과 같은 기본 소비자 작업 수행

이 빠른 시작에서는 HTTP 요청이 터미널에 복사하여 붙여넣을 수 있는 curl 명령으로 포맷됩니다.

사전 요구 사항이 있는지 확인한 다음 이 장에 제공된 순서대로 작업을 수행합니다.

이 빠른 시작에서는 JSON 형식으로 메시지를 생성하고 사용합니다.

빠른 시작을 위한 사전 요구 사항

  • Kafka 클러스터가 호스트 시스템에서 실행 중입니다.

2.1. Kafka 브리지 아카이브 다운로드

AMQ Streams Kafka Bridge의 압축 배포는 다운로드할 수 있습니다.

2.2. Kafka 브리지 설치

Kafka 브리지 아카이브와 함께 제공된 스크립트를 사용하여 Kafka 브리지를 설치합니다. 설치 아카이브와 함께 제공된 application.properties 파일은 기본 구성 설정을 제공합니다.

다음 기본 속성 값은 포트 8080에서 요청을 수신 대기하도록 Kafka 브리지를 구성합니다.

기본 구성 속성

http.host=0.0.0.0
http.port=8080
Copy to Clipboard Toggle word wrap

프로세스

  1. 아직 수행하지 않은 경우 Kafka 브리지 설치 아카이브의 압축을 임의의 디렉터리에 풉니다.
  2. 구성 속성을 매개변수로 사용하여 Kafka Bridge 스크립트를 실행합니다.

    예를 들면 다음과 같습니다.

    ./bin/kafka_bridge_run.sh --config-file=<path>/application.properties
    Copy to Clipboard Toggle word wrap
  3. 로그에 설치에 성공했는지 확인합니다.

    HTTP-Kafka Bridge started and listening on port 8080
    HTTP-Kafka Bridge bootstrap servers localhost:9092
    Copy to Clipboard Toggle word wrap

2.3. 주제 및 파티션에 메시지 생성

Kafka 브리지를 사용하여 주제 끝점을 사용하여 JSON 형식의 Kafka 항목에 메시지를 생성합니다.

주제 끝점을 사용하여 JSON 형식의 항목에 메시지를 생성할 수 있습니다. 요청 본문에 메시지에 대한 대상 파티션을 지정할 수 있습니다. 파티션 끝점은 모든 메시지의 단일 대상 파티션을 경로 매개 변수로 지정하는 대체 방법을 제공합니다.

이 절차에서는 bridge-quickstart-topic 이라는 항목에 메시지가 생성됩니다.

사전 요구 사항

  • Kafka 클러스터에는 세 개의 파티션이 있는 주제가 있습니다.

    kafka-topics.sh 유틸리티를 사용하여 주제를 생성할 수 있습니다.

    세 개의 파티션이 있는 주제 생성 예

    bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic bridge-quickstart-topic --partitions 3 --replication-factor 1
    Copy to Clipboard Toggle word wrap

    주제가 생성되었는지 확인

    bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic bridge-quickstart-topic
    Copy to Clipboard Toggle word wrap

참고

OpenShift에 AMQ Streams를 배포하는 경우 KafkaTopic 사용자 정의 리소스를 사용하여 주제를 생성할 수 있습니다.

프로세스

  1. Kafka 브리지를 사용하여 생성한 항목에 세 개의 메시지를 생성합니다.

    curl -X POST \
      http://localhost:8080/topics/bridge-quickstart-topic \
      -H 'content-type: application/vnd.kafka.json.v2+json' \
      -d '{
        "records": [
            {
                "key": "my-key",
                "value": "sales-lead-0001"
            },
            {
                "value": "sales-lead-0002",
                "partition": 2
            },
            {
                "value": "sales-lead-0003"
            }
        ]
    }'
    Copy to Clipboard Toggle word wrap
    • sales-lead-0001 은 키의 해시를 기반으로 파티션으로 전송됩니다.
    • sales-lead-0002 는 파티션 2로 직접 전송됩니다.
    • sales-lead-0003 은 라운드 로빈 방법을 사용하여 bridge-quickstart-topic 주제의 파티션으로 전송됩니다.
  2. 요청이 성공하면 Kafka 브리지는 200 코드 및 application/vnd.kafka.v2+jsoncontent-type 헤더와 함께 오프셋 배열을 반환합니다. 각 메시지에 대해 오프셋 배열은 다음을 설명합니다.

    • 메시지가 전송된 파티션
    • 파티션의 현재 메시지 오프셋

      응답 예

      #...
      {
        "offsets":[
          {
            "partition":0,
            "offset":0
          },
          {
            "partition":2,
            "offset":0
          },
          {
            "partition":0,
            "offset":1
          }
        ]
      }
      Copy to Clipboard Toggle word wrap

추가 주제 요청

기타 curl 요청을 만들어 주제 및 파티션에 대한 정보를 찾습니다.

주제 목록
curl -X GET \
  http://localhost:8080/topics
Copy to Clipboard Toggle word wrap

응답 예

[
  "__strimzi_store_topic",
  "__strimzi-topic-operator-kstreams-topic-store-changelog",
  "bridge-quickstart-topic",
  "my-topic"
]
Copy to Clipboard Toggle word wrap

주제 구성 및 파티션 세부 정보 가져오기
curl -X GET \
  http://localhost:8080/topics/bridge-quickstart-topic
Copy to Clipboard Toggle word wrap

응답 예

{
  "name": "bridge-quickstart-topic",
  "configs": {
    "compression.type": "producer",
    "leader.replication.throttled.replicas": "",
    "min.insync.replicas": "1",
    "message.downconversion.enable": "true",
    "segment.jitter.ms": "0",
    "cleanup.policy": "delete",
    "flush.ms": "9223372036854775807",
    "follower.replication.throttled.replicas": "",
    "segment.bytes": "1073741824",
    "retention.ms": "604800000",
    "flush.messages": "9223372036854775807",
    "message.format.version": "2.8-IV1",
    "max.compaction.lag.ms": "9223372036854775807",
    "file.delete.delay.ms": "60000",
    "max.message.bytes": "1048588",
    "min.compaction.lag.ms": "0",
    "message.timestamp.type": "CreateTime",
    "preallocate": "false",
    "index.interval.bytes": "4096",
    "min.cleanable.dirty.ratio": "0.5",
    "unclean.leader.election.enable": "false",
    "retention.bytes": "-1",
    "delete.retention.ms": "86400000",
    "segment.ms": "604800000",
    "message.timestamp.difference.max.ms": "9223372036854775807",
    "segment.index.bytes": "10485760"
  },
  "partitions": [
    {
      "partition": 0,
      "leader": 0,
      "replicas": [
        {
          "broker": 0,
          "leader": true,
          "in_sync": true
        },
        {
          "broker": 1,
          "leader": false,
          "in_sync": true
        },
        {
          "broker": 2,
          "leader": false,
          "in_sync": true
        }
      ]
    },
    {
      "partition": 1,
      "leader": 2,
      "replicas": [
        {
          "broker": 2,
          "leader": true,
          "in_sync": true
        },
        {
          "broker": 0,
          "leader": false,
          "in_sync": true
        },
        {
          "broker": 1,
          "leader": false,
          "in_sync": true
        }
      ]
    },
    {
      "partition": 2,
      "leader": 1,
      "replicas": [
        {
          "broker": 1,
          "leader": true,
          "in_sync": true
        },
        {
          "broker": 2,
          "leader": false,
          "in_sync": true
        },
        {
          "broker": 0,
          "leader": false,
          "in_sync": true
        }
      ]
    }
  ]
}
Copy to Clipboard Toggle word wrap

특정 주제의 파티션 나열
curl -X GET \
  http://localhost:8080/topics/bridge-quickstart-topic/partitions
Copy to Clipboard Toggle word wrap

응답 예

[
  {
    "partition": 0,
    "leader": 0,
    "replicas": [
      {
        "broker": 0,
        "leader": true,
        "in_sync": true
      },
      {
        "broker": 1,
        "leader": false,
        "in_sync": true
      },
      {
        "broker": 2,
        "leader": false,
        "in_sync": true
      }
    ]
  },
  {
    "partition": 1,
    "leader": 2,
    "replicas": [
      {
        "broker": 2,
        "leader": true,
        "in_sync": true
      },
      {
        "broker": 0,
        "leader": false,
        "in_sync": true
      },
      {
        "broker": 1,
        "leader": false,
        "in_sync": true
      }
    ]
  },
  {
    "partition": 2,
    "leader": 1,
    "replicas": [
      {
        "broker": 1,
        "leader": true,
        "in_sync": true
      },
      {
        "broker": 2,
        "leader": false,
        "in_sync": true
      },
      {
        "broker": 0,
        "leader": false,
        "in_sync": true
      }
    ]
  }
]
Copy to Clipboard Toggle word wrap

특정 주제 파티션의 세부 정보 나열
curl -X GET \
  http://localhost:8080/topics/bridge-quickstart-topic/partitions/0
Copy to Clipboard Toggle word wrap

응답 예

{
  "partition": 0,
  "leader": 0,
  "replicas": [
    {
      "broker": 0,
      "leader": true,
      "in_sync": true
    },
    {
      "broker": 1,
      "leader": false,
      "in_sync": true
    },
    {
      "broker": 2,
      "leader": false,
      "in_sync": true
    }
  ]
}
Copy to Clipboard Toggle word wrap

특정 주제 파티션의 오프셋 나열
curl -X GET \
  http://localhost:8080/topics/bridge-quickstart-topic/partitions/0/offsets
Copy to Clipboard Toggle word wrap

응답 예

{
  "beginning_offset": 0,
  "end_offset": 1
}
Copy to Clipboard Toggle word wrap

다음에 수행할 작업

주제 및 파티션에 메시지를 생성한 후 Kafka Bridge 소비자를 생성합니다.

2.4. Kafka 브리지 소비자 생성

Kafka 클러스터에서 소비자 작업을 수행하려면 먼저 소비자 끝점을 사용하여 소비자를 생성해야 합니다. 소비자를 Kafka 브리지 소비자 라고 합니다.

프로세스

  1. bridge-quickstart-consumer-group 이라는 새 소비자 그룹에 Kafka 브리지 소비자를 생성합니다.

    curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group \
      -H 'content-type: application/vnd.kafka.v2+json' \
      -d '{
        "name": "bridge-quickstart-consumer",
        "auto.offset.reset": "earliest",
        "format": "json",
        "enable.auto.commit": false,
        "fetch.min.bytes": 512,
        "consumer.request.timeout.ms": 30000
      }'
    Copy to Clipboard Toggle word wrap
    • 소비자의 이름은 bridge-quickstart-consumer 로 지정되며 포함된 데이터 형식은 json 으로 설정됩니다.
    • 일부 기본 구성 설정이 정의됩니다.
    • enable.auto.commit 설정은 false 이므로 소비자는 로그에 오프셋을 자동으로 커밋하지 않습니다. 이 빠른 시작의 뒷부분에서 오프셋을 수동으로 커밋합니다.

      요청이 성공하면 Kafka 브리지는 응답 본문의 소비자 ID(instance_id) 및 기본 URL(base_uri)을 200 코드와 함께 반환합니다.

      응답 예

      #...
      {
        "instance_id": "bridge-quickstart-consumer",
        "base_uri":"http://<bridge_id>-bridge-service:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer"
      }
      Copy to Clipboard Toggle word wrap

  2. 이 빠른 시작의 다른 소비자 작업에 사용할 기본 URL(base_uri)을 복사합니다.

다음에 수행할 작업

이제 Kafka 브리지 소비자를 생성했으므로 주제를 구독할 수 있습니다.

2.5. Kafka 브리지 소비자 등록

Kafka Bridge 소비자를 생성한 후 서브스크립션 끝점을 사용하여 하나 이상의 항목에 등록합니다. 구독하면 소비자가 주제로 생성된 모든 메시지를 수신하기 시작합니다.

프로세스

  • 이전에 생성한 bridge-quickstart-topic 항목에 소비자를 구독하면 메시지를 주제 및 파티션에 유도합니다.

    curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/subscription \
      -H 'content-type: application/vnd.kafka.v2+json' \
      -d '{
        "topics": [
            "bridge-quickstart-topic"
        ]
    }'
    Copy to Clipboard Toggle word wrap

    주제 배열에는 단일 주제(여기에 표시된 대로) 또는 여러 주제가 포함될 수 있습니다. 일반 표현식과 일치하는 여러 항목에 소비자를 서브스크립션하려면 주제 배열 대신 topic_pattern 문자열을 사용할 수 있습니다.

    요청이 성공하면 Kafka 브리지는 204 (콘텐츠 없음) 코드만 반환합니다.

Apache Kafka 클라이언트를 사용하는 경우 HTTP 서브스크립션 작업은 로컬 소비자의 서브스크립션에 주제를 추가합니다. 소비자 그룹에 가입하고 파티션 할당을 가져오는 작업은 여러 HTTP 폴링 작업을 실행하여 파티션 리밸런스 및 조인 그룹 프로세스를 시작합니다. 초기 HTTP 폴링 작업에서 레코드를 반환하지 않을 수 있다는 점에 유의해야 합니다.

다음에 수행할 작업

Kafka Bridge 소비자를 항목에 등록한 후 소비자 에서 메시지를 검색할 수 있습니다.

2.6. Kafka 브리지 소비자에서 최신 메시지 검색

레코드 끝점에서 데이터를 요청하여 Kafka Bridge 소비자에서 최신 메시지를 검색합니다. 프로덕션에서 HTTP 클라이언트는 이 끝점을 루프에서 반복적으로 호출할 수 있습니다.

프로세스

  1. 주제 및 파티션에 대한 메시지 전달에 설명된 대로 Kafka Bridge 소비자에 추가 메시지를 생성합니다.
  2. GET 요청을 레코드 끝점에 제출합니다.

    curl -X GET http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/records \
      -H 'accept: application/vnd.kafka.json.v2+json'
    Copy to Clipboard Toggle word wrap

    Kafka Bridge 소비자를 생성하고 구독하면 첫 번째 GET 요청이 비어 있는 응답을 반환합니다. 폴링 작업에서 파티션을 할당하는 재조정 프로세스를 시작하기 때문입니다.

  3. 두 단계를 반복하여 Kafka 브리지 소비자에서 메시지를 검색합니다.

    Kafka Bridge는 200 코드와 함께 응답 본문에 있는 주제 이름, 키, 값, 파티션 및 오프셋 Cryostat-지정을 설명하는 messages의 배열을 반환합니다. 메시지는 기본적으로 최신 오프셋에서 검색됩니다.

    HTTP/1.1 200 OK
    content-type: application/vnd.kafka.json.v2+json
    #...
    [
      {
        "topic":"bridge-quickstart-topic",
        "key":"my-key",
        "value":"sales-lead-0001",
        "partition":0,
        "offset":0
      },
      {
        "topic":"bridge-quickstart-topic",
        "key":null,
        "value":"sales-lead-0003",
        "partition":0,
        "offset":1
      },
    #...
    Copy to Clipboard Toggle word wrap
    참고

    빈 응답이 반환되면 주제 및 파티션에 대한 메시지 전달에 설명된 대로 소비자에 더 많은 레코드를 생성한 다음 메시지를 다시 검색해 보십시오.

다음에 수행할 작업

Kafka 브리지 소비자에서 메시지를 검색한 후 로그에 오프셋을 커밋합니다.

2.7. 로그에 오프셋 커밋

오프셋 끝점을 사용하여 Kafka Bridge 소비자가 수신한 모든 메시지의 로그에 오프셋을 수동으로 커밋합니다. 이전에 생성한 Kafka 브리지 소비자 생성 에서 enable.auto.commit 설정을 false 로 사용하여 구성되었기 때문에 이 작업이 필요합니다.

프로세스

  • bridge-quickstart-consumer:의 로그에 대한 커밋 오프셋

    curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/offsets
    Copy to Clipboard Toggle word wrap

    요청 본문이 제출되지 않으므로 소비자가 수신한 모든 레코드에 대해 오프셋이 커밋됩니다. 또는 요청 본문에 오프셋을 커밋할 주제와 파티션을 지정하는 배열(OffsetCommitSeekList)이 포함될 수 있습니다.

    요청이 성공하면 Kafka 브리지는 204 코드만 반환합니다.

다음에 수행할 작업

로그에 오프셋을 커밋한 후 오프셋을 찾기 위해 끝점을 시도합니다.

2.8. 파티션 오프셋 검색

위치 끝점을 사용하여 특정 오프셋에서 파티션에 대한 메시지를 검색한 다음 최신 오프셋에서 메시지를 검색하도록 Kafka 브리지 소비자를 구성합니다. 이를 Apache Kafka에서 검색 작업이라고 합니다.

프로세스

  1. quickstart-bridge-topic 주제의 파티션 0에 대한 특정 오프셋을 찾습니다.

    curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/positions \
      -H 'content-type: application/vnd.kafka.v2+json' \
      -d '{
        "offsets": [
            {
                "topic": "bridge-quickstart-topic",
                "partition": 0,
                "offset": 2
            }
        ]
    }'
    Copy to Clipboard Toggle word wrap

    요청이 성공하면 Kafka 브리지는 204 코드만 반환합니다.

  2. GET 요청을 레코드 끝점에 제출합니다.

    curl -X GET http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/records \
      -H 'accept: application/vnd.kafka.json.v2+json'
    Copy to Clipboard Toggle word wrap

    Kafka 브리지는 원하는 오프셋에서 메시지를 반환합니다.

  3. 동일한 파티션의 마지막 오프셋을 찾아 기본 메시지 검색 동작을 복원합니다. 이번에는 위치/엔드 엔드포인트를 사용합니다.

    curl -X POST http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/positions/end \
      -H 'content-type: application/vnd.kafka.v2+json' \
      -d '{
        "partitions": [
            {
                "topic": "bridge-quickstart-topic",
                "partition": 0
            }
        ]
    }'
    Copy to Clipboard Toggle word wrap

    요청이 성공하면 Kafka 브리지는 또 다른 204 코드를 반환합니다.

참고

position/beginning 끝점을 사용하여 하나 이상의 파티션에 대한 첫 번째 오프셋을 검색할 수도 있습니다.

다음에 수행할 작업

이 빠른 시작에서는 AMQ Streams Kafka 브리지를 사용하여 Kafka 클러스터에서 몇 가지 일반적인 작업을 수행했습니다. 이제 이전에 생성한 Kafka 브리지 소비자를 삭제할 수 있습니다.

2.9. Kafka 브리지 소비자 삭제

이 빠른 시작 전체에서 사용한 Kafka 브리지 소비자를 삭제합니다.

프로세스

  • 인스턴스 엔드포인트에 DELETE 요청을 전송하여 Kafka 브리지 소비자를 삭제합니다.

    curl -X DELETE http://localhost:8080/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer
    Copy to Clipboard Toggle word wrap

    요청이 성공하면 Kafka 브리지는 204 코드를 반환합니다.

3장. Kafka 브리지 구성

구성 속성을 사용하여 Kafka 브리지 배포를 구성합니다. Kafka를 구성하고 Kafka와 상호 작용하는 데 필요한 HTTP 연결 세부 정보를 지정합니다. 구성 속성을 사용하여 Kafka 브리지에서 분산 추적을 활성화하고 사용할 수도 있습니다. 분산 추적을 사용하면 분산 시스템의 애플리케이션 간 트랜잭션 진행 상황을 추적할 수 있습니다.

참고

3.1. Kafka 브리지 속성 구성

다음 절차에서는 Kafka 브리지에서 사용하는 Kafka 및 HTTP 연결 속성을 구성하는 방법을 설명합니다.

Kafka 관련 속성에 적절한 접두사를 사용하여 Kafka 브리지를 다른 Kafka 클라이언트로 구성합니다.

  • Kafka. 서버 연결 및 보안과 같은 생산자 및 소비자에 적용되는 일반 구성의 경우
  • 소비자별 구성의 경우 Kafka.consumer.는 소비자에게만 전달됩니다.
  • 생산자별 구성의 경우 Kafka.producer.는 생산자에게만 전달됩니다.

Kafka 클러스터에 대한 HTTP 액세스를 활성화할 뿐만 아니라 HTTP 속성은 CORS(Cross-Origin Resource Sharing)를 통해 Kafka 브릿지에 대한 액세스 제어를 활성화하고 정의하는 기능을 제공합니다. CORS는 브라우저에서 두 개 이상의 원본에서 선택한 리소스에 액세스할 수 있는 HTTP 메커니즘입니다. CORS를 구성하려면 허용된 리소스 원본 및 HTTP 메서드 목록을 정의하여 액세스할 수 있습니다. 요청의 추가 HTTP 헤더는 Kafka 클러스터에 액세스할 수 있는 CORS 원본을 설명합니다.

프로세스

  1. Kafka Bridge 설치 아카이브와 함께 제공된 application.properties 파일을 편집합니다.

    속성 파일을 사용하여 Kafka 및 HTTP 관련 속성을 지정합니다.

    1. Kafka 소비자 및 생산자와 관련된 속성을 포함하여 표준 Kafka 관련 속성을 구성합니다.

      다음을 사용하십시오.

      • Kafka 클러스터에 대한 호스트/포트 연결을 정의하는 Kafka .bootstrap.servers
      • Kafka.producer.acks HTTP 클라이언트에 승인을 제공
      • Kafka.consumer.auto.offset.reset 에서 오프셋의 재설정을 관리하는 방법

        Kafka 속성 구성에 대한 자세한 내용은 Apache Kafka 웹 사이트를참조하십시오.

    2. Kafka 클러스터에 대한 HTTP 액세스를 활성화하도록 HTTP 관련 속성을 구성합니다.

      예를 들면 다음과 같습니다.

      bridge.id=my-bridge
      http.host=0.0.0.0
      http.port=8080 
      1
      
      http.cors.enabled=true 
      2
      
      http.cors.allowedOrigins=https://strimzi.io 
      3
      
      http.cors.allowedMethods=GET,POST,PUT,DELETE,OPTIONS,PATCH 
      4
      Copy to Clipboard Toggle word wrap
      1
      포트 8080에서 수신 대기할 Kafka 브리지의 기본 HTTP 구성입니다.
      2
      CORS를 활성화하려면 true 로 설정합니다.
      3
      쉼표로 구분된 허용된 CORS 원본 목록입니다. URL 또는 Java 정규식을 사용할 수 있습니다.
      4
      CORS에 허용되는 HTTP 메서드의 쉼표로 구분된 목록입니다.
  2. 구성 파일을 저장합니다.

3.2. 분산 추적 구성

분산 추적을 활성화하여 Kafka Bridge에서 사용하고 생성한 메시지 및 클라이언트 애플리케이션의 HTTP 요청을 추적할 수 있습니다.

추적을 활성화하는 속성은 application.properties 파일에 있습니다. 분산 추적을 활성화하려면 다음을 수행합니다.

  • 사용할 추적을 활성화하려면 bridge.tracing 속성 값을 설정합니다. 가능한 유일한 값은 opentelemetry 입니다.
  • 추적을 위한 환경 변수를 설정합니다.

기본 구성으로 OpenTelemetry 추적은 OTLP를 내보내기 프로토콜로 사용합니다. OTLP 끝점을 구성하면 Jaeger 백엔드 인스턴스를 사용하여 추적을 가져올 수 있습니다.

참고

Jaeger는 1.35 버전 이후 OTLP 프로토콜을 지원했습니다. 이전 Jaeger 버전은 OTLP 프로토콜을 사용하여 추적을 가져올 수 없습니다.

OpenTelemetry는 추적 데이터를 메트릭 데이터의 범위로 수집하기 위한 API 사양을 정의합니다. 범위는 특정 작업을 나타냅니다. 추적은 하나 이상의 범위 컬렉션입니다.

Kafka 브리지가 다음을 수행할 때 추적이 생성됩니다.

  • Kafka에서 소비자 HTTP 클라이언트로 메시지 전송
  • Kafka에 보낼 생산자 HTTP 클라이언트에서 메시지를 수신

Jaeger는 필요한 API를 구현하고 분석을 위해 사용자 인터페이스에서 추적 데이터의 시각화를 제공합니다.

엔드 투 엔드 추적을 사용하려면 HTTP 클라이언트에서 추적을 구성해야 합니다.

Important

AMQ Streams는 더 이상 OpenTracing을 지원하지 않습니다. 이전에 bridge.tracing=jaeger 옵션과 함께 OpenTracing을 사용하는 경우 대신 OpenTelemetry를 사용하도록 전환하는 것이 좋습니다.

프로세스

  1. Kafka Bridge 설치 아카이브와 함께 제공된 application.properties 파일을 편집합니다.

    bridge.tracing 속성을 사용하여 사용하려는 추적을 활성화합니다.

    OpenTelemetry를 활성화하는 구성 예

    bridge.tracing=opentelemetry 
    1
    Copy to Clipboard Toggle word wrap

    1
    OpenTelemetry를 활성화하는 속성은 줄 시작 시 # 을 제거하여 주석 처리를 해제합니다.

    추적이 활성화된 경우 Kafka Bridge 스크립트를 실행할 때 추적을 초기화합니다.

  2. 구성 파일을 저장합니다.
  3. 추적의 환경 변수를 설정합니다.

    OpenTelemetry의 환경 변수

    OTEL_SERVICE_NAME=my-tracing-service 
    1
    
    OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 
    2
    Copy to Clipboard Toggle word wrap

    1
    OpenTelemetry 추적기 서비스의 이름입니다.
    2
    포트 4317에서 범위를 수신 대기하는 gRPC 기반 OTLP 끝점입니다.
  4. 추적을 위해 활성화된 속성을 사용하여 Kafka Bridge 스크립트를 실행합니다.

    OpenTelemetry를 사용하여 Kafka 브리지 실행

    ./bin/kafka_bridge_run.sh --config-file=<path>/application.properties
    Copy to Clipboard Toggle word wrap

    Kafka 브리지의 내부 소비자 및 생산자가 이제 추적에 사용할 수 있습니다.

3.2.1. OpenTelemetry를 사용하여 추적 시스템 지정

기본 OTLP 추적 시스템 대신 OpenTelemetry에서 지원하는 다른 추적 시스템을 지정할 수 있습니다.

OpenTelemetry와 함께 다른 추적 시스템을 사용하려면 다음을 수행하십시오.

  1. 추적 시스템의 라이브러리를 Kafka 클래스 경로에 추가합니다.
  2. 추적 시스템의 이름을 추가 내보내기 환경 변수로 추가합니다.

    OTLP를 사용하지 않는 경우 추가 환경 변수

    OTEL_SERVICE_NAME=my-tracing-service
    OTEL_TRACES_EXPORTER=zipkin 
    1
    
    OTEL_EXPORTER_ZIPKIN_ENDPOINT=http://localhost:9411/api/v2/spans 
    2
    Copy to Clipboard Toggle word wrap

    1
    추적 시스템의 이름입니다. 이 예에서는 Zipkin이 지정됩니다.
    2
    범위를 수신 대기하는 특정 선택한 내보내기의 끝점입니다. 이 예에서는 Zipkin 끝점이 지정됩니다.

4장. AMQ Streams Kafka Bridge API 참조

4.1. 개요

AMQ Streams Kafka Bridge는 HTTP 기반 클라이언트 애플리케이션을 Kafka 클러스터와 통합하기 위한 REST API를 제공합니다. API를 사용하여 소비자를 생성 및 관리하고 기본 Kafka 프로토콜이 아닌 HTTP를 통해 레코드를 보내고 받을 수 있습니다.

4.1.1. 버전 정보

버전 : 0.1.0

4.1.2. 태그

  • 소비자: Kafka 클러스터에서 소비자를 생성하고 주제 구독, 처리된 레코드 검색, 오프셋 커밋과 같은 일반적인 작업을 수행하는 소비자 작업입니다.
  • producer: 지정된 주제 또는 주제 파티션에 레코드를 보내는 Producer 작업입니다.
  • seek: 소비자가 지정된 오프셋 위치에서 메시지를 수신할 수 있도록 하는 Seek 작업입니다.
  • topics : 지정된 주제 또는 주제 파티션에 메시지를 보내는 주제 작업, 필요에 따라 요청에 메시지 키를 포함합니다. 주제 및 주제 메타데이터를 검색할 수도 있습니다.

4.1.3. 사용

  • application/json

4.1.4. 생성

  • application/json

4.2. 정의

4.2.1. AssignedTopicPartitions

유형 : < string, < integer (int32) > array > map

4.2.2. BridgeInfo

Kafka 브리지 인스턴스에 대한 정보입니다.

Expand
이름스키마

bridge_version
optional

string

4.2.3. 소비자

Expand
이름설명스키마

auto.offset.reset
optional

소비자의 오프셋 위치를 재설정합니다. latest (기본값)로 설정하면 최신 오프셋에서 메시지를 읽습니다. earliest 로 설정하면 첫 번째 오프셋에서 메시지를 읽습니다.

string

consumer.request.timeout.ms
optional

소비자가 요청을 위해 메시지를 대기할 최대 시간(밀리초)을 설정합니다. 응답 없이 제한 시간에 도달하면 오류가 반환됩니다. 기본값은 30000 (30초)입니다.

integer

enable.auto.commit
optional

true (기본값)로 설정하면 소비자에게 메시지 오프셋이 자동으로 커밋됩니다. false 로 설정된 경우 메시지 오프셋을 수동으로 커밋해야 합니다.

boolean

fetch.min.bytes
optional

소비자가 수신할 최소 데이터 양(바이트)을 설정합니다. 브로커는 데이터가 이 양을 초과할 때까지 기다립니다. 기본값은 1 바이트입니다.

integer

형식
선택 사항

소비자에 대해 허용되는 메시지 형식입니다. 바이너리 (기본값) 또는 json 일 수 있습니다. 메시지는 JSON 형식으로 변환됩니다.

string

isolation.level
optional

read_uncommitted (기본값)로 설정하면 모든 트랜잭션 레코드가 트랜잭션 결과에 따라 검색됩니다. read_committed 로 설정하면 커밋된 트랜잭션의 레코드가 검색됩니다.

string

이름
선택 사항

소비자 인스턴스의 고유 이름입니다. 이름은 consumer 그룹의 범위 내에서 고유합니다. 이름은 URL에 사용됩니다. 이름을 지정하지 않으면 임의로 생성된 이름이 할당됩니다.

string

4.2.4. ConsumerRecord

Expand
이름스키마

헤더
선택 사항

KafkaHeaderList

오프셋
선택 사항

정수(int64)

파티션
선택 사항

정수(int32)

주제
선택 사항

string

4.2.5. ConsumerRecordList

유형 : < ConsumerRecord > array

4.2.6. CreatedConsumer

Expand
이름설명스키마

base_uri
optional

이 소비자 인스턴스에 대한 후속 요청에 대한 URI를 구성하는 데 사용되는 기본 URI입니다.

string

instance_id
optional

그룹의 소비자 인스턴스에 대한 고유 ID입니다.

string

4.2.7. 오류

Expand
이름스키마

error_code
optional

정수(int32)

메시지
선택 사항

string

4.2.8. KafkaHeader

Expand
이름설명스키마


필요

 

string


필수

바이너리 형식의 헤더 값인 base64로 인코딩된
패턴 : "^(?:[A-Za-z0-9+/]{4})*(?:[A-Za-z0-9+/]{2}=]{2}=|[A-Za-z0-9+/]{3}=)?$"

문자열(바이트)

4.2.9. KafkaHeaderList

유형 : < KafkaHeader > array

4.2.10. OffsetCommitSeek

Expand
이름스키마


필수 오프셋

정수(int64)

파티션
필요

정수(int32)

주제
필요

string

4.2.11. OffsetCommitSeekList

Expand
이름스키마


오프셋선택 사항

< OffsetCommitSeek > array

4.2.12. OffsetRecordSent

Expand
이름스키마

오프셋
선택 사항

정수(int64)

파티션
선택 사항

정수(int32)

4.2.13. OffsetRecordSentList

Expand
이름스키마


오프셋선택 사항

< OffsetRecordSent > array

4.2.14. OffsetsSummary

Expand
이름스키마

beginning_offset
optional

정수(int64)

end_offset
optional

정수(int64)

4.2.15. 파티션

Expand
이름스키마

파티션
선택 사항

정수(int32)

주제
선택 사항

string

4.2.16. PartitionMetadata

Expand
이름스키마

리더
선택 사항

정수(int32)

파티션
선택 사항

정수(int32)

복제본
선택 사항

< replica > array

4.2.17. 파티션

Expand
이름스키마

파티션
선택 사항

< partition > array

4.2.18. ProducerRecord

Expand
이름스키마

헤더
선택 사항

KafkaHeaderList

파티션
선택 사항

정수(int32)

4.2.19. ProducerRecordList

Expand
이름스키마

레코드
선택 사항

< ProducerRecord > array

4.2.20. ProducerRecordToPartition

Expand
이름스키마

헤더
선택 사항

KafkaHeaderList

4.2.21. ProducerRecordToPartitionList

Expand
이름스키마

레코드
선택 사항

< ProducerRecordTo Cryostat > 배열

4.2.22. replica

Expand
이름스키마

브로커
선택 사항

정수(int32)

in_sync
optional

boolean

리더
선택 사항

boolean

4.2.23. SubscribedTopicList

Expand
이름스키마

파티션
선택 사항

< AssignedTopic Cryostats & gt; array

주제
선택 사항

주제

4.2.24. TopicMetadata

Expand
이름설명스키마

구성
선택 사항

주제별 구성 덮어쓰기

< string, string > map

이름
선택 사항

주제의 이름

string

파티션
선택 사항

 

< PartitionMetadata > 배열

4.2.25. 주제

Expand
이름설명스키마

topic_pattern
선택 사항

여러 주제 일치를 위한 regex 주제 패턴

string

주제
선택 사항

 

< string > array

4.3. 경로

4.3.1. GET /

4.3.1.1. 설명

Kafka Bridge 인스턴스에 대한 정보를 JSON 형식으로 검색합니다.

4.3.1.2. 응답
Expand
HTTP 코드설명스키마

200

Kafka 브리지 인스턴스에 대한 정보입니다.

BridgeInfo

4.3.1.3. 생성
  • application/json
4.3.1.4. HTTP 응답의 예
4.3.1.4.1. 응답 200
{
  "bridge_version" : "0.16.0"
}
Copy to Clipboard Toggle word wrap

4.3.2. POST /consumers/{groupid}

4.3.2.1. 설명

지정된 소비자 그룹에서 소비자 인스턴스를 생성합니다. 필요한 경우 소비자 이름 및 지원되는 구성 옵션을 지정할 수 있습니다. 이 소비자 인스턴스에 대한 후속 요청에 대한 URL을 구성하는 데 사용해야 하는 기본 URI를 반환합니다.

4.3.2.2. 매개 변수
Expand
유형이름설명스키마

경로

groupID
필요

소비자를 생성할 소비자 그룹의 ID입니다.

string

본문

본문
필요

소비자의 이름 및 구성입니다. 이름은 consumer 그룹의 범위 내에서 고유합니다. 이름을 지정하지 않으면 임의로 생성된 이름이 할당됩니다. 모든 매개변수는 선택 사항입니다. 지원되는 구성 옵션은 다음 예에 표시되어 있습니다.

소비자

4.3.2.3. 응답
Expand
HTTP 코드설명스키마

200

소비자가 성공적으로 생성되었습니다.

CreatedConsumer

409

지정된 이름의 소비자 인스턴스가 Kafka 브리지에 이미 존재합니다.

오류

422

하나 이상의 소비자 구성 옵션에는 잘못된 값이 있습니다.

오류

4.3.2.4. 사용
  • application/vnd.kafka.v2+json
4.3.2.5. 생성
  • application/vnd.kafka.v2+json
4.3.2.6. 태그
  • 소비자
4.3.2.7. HTTP 요청의 예
4.3.2.7.1. 요청 본문
{
  "name" : "consumer1",
  "format" : "binary",
  "auto.offset.reset" : "earliest",
  "enable.auto.commit" : false,
  "fetch.min.bytes" : 512,
  "consumer.request.timeout.ms" : 30000,
  "isolation.level" : "read_committed"
}
Copy to Clipboard Toggle word wrap
4.3.2.8. HTTP 응답의 예
4.3.2.8.1. 응답 200
{
  "instance_id" : "consumer1",
  "base_uri" : "http://localhost:8080/consumers/my-group/instances/consumer1"
}
Copy to Clipboard Toggle word wrap
4.3.2.8.2. 응답 409
{
  "error_code" : 409,
  "message" : "A consumer instance with the specified name already exists in the Kafka Bridge."
}
Copy to Clipboard Toggle word wrap
4.3.2.8.3. 응답 422
{
  "error_code" : 422,
  "message" : "One or more consumer configuration options have invalid values."
}
Copy to Clipboard Toggle word wrap

4.3.3. DELETE /consumers/{groupid}/instances/{name}

4.3.3.1. 설명

지정된 소비자 인스턴스를 삭제합니다. 이 작업에 대한 요청은 이 소비자를 생성하는 데 사용된 /consumers/{groupid} 에 대한 POST 요청에서 반환된 기본 URL(호스트 및 포트 포함)을 사용해야 합니다.

4.3.3.2. 매개 변수
Expand
유형이름설명스키마

경로

groupID
필요

소비자가 속한 소비자 그룹의 ID입니다.

string

경로

이름
필요

삭제할 소비자의 이름입니다.

string

4.3.3.3. 응답
Expand
HTTP 코드설명스키마

204

소비자가 성공적으로 제거되었습니다.

콘텐츠 없음

404

지정된 소비자 인스턴스를 찾을 수 없습니다.

오류

4.3.3.4. 사용
  • application/vnd.kafka.v2+json
4.3.3.5. 생성
  • application/vnd.kafka.v2+json
4.3.3.6. 태그
  • 소비자
4.3.3.7. HTTP 응답의 예
4.3.3.7.1. 응답 404
{
  "error_code" : 404,
  "message" : "The specified consumer instance was not found."
}
Copy to Clipboard Toggle word wrap

4.3.4. POST /consumers/{groupid}/instances/{name}/assignments

4.3.4.1. 설명

하나 이상의 주제 파티션을 소비자에 할당합니다.

4.3.4.2. 매개 변수
Expand
유형이름설명스키마

경로

groupID
필요

소비자가 속한 소비자 그룹의 ID입니다.

string

경로

이름
필요

주제 파티션을 할당할 소비자의 이름입니다.

string

본문

본문
필요

소비자에게 할당할 주제 파티션 목록입니다.

파티션

4.3.4.3. 응답
Expand
HTTP 코드설명스키마

204

파티션이 성공적으로 할당되었습니다.

콘텐츠 없음

404

지정된 소비자 인스턴스를 찾을 수 없습니다.

오류

409

주제, 파티션 및 패턴에 대한 서브스크립션은 함께 사용할 수 없습니다.

오류

4.3.4.4. 사용
  • application/vnd.kafka.v2+json
4.3.4.5. 생성
  • application/vnd.kafka.v2+json
4.3.4.6. 태그
  • 소비자
4.3.4.7. HTTP 요청의 예
4.3.4.7.1. 요청 본문
{
  "partitions" : [ {
    "topic" : "topic",
    "partition" : 0
  }, {
    "topic" : "topic",
    "partition" : 1
  } ]
}
Copy to Clipboard Toggle word wrap
4.3.4.8. HTTP 응답의 예
4.3.4.8.1. 응답 404
{
  "error_code" : 404,
  "message" : "The specified consumer instance was not found."
}
Copy to Clipboard Toggle word wrap
4.3.4.8.2. 응답 409
{
  "error_code" : 409,
  "message" : "Subscriptions to topics, partitions, and patterns are mutually exclusive."
}
Copy to Clipboard Toggle word wrap

4.3.5. POST /consumers/{groupid}/instances/{name}/offsets

4.3.5.1. 설명

소비자 오프셋 목록을 커밋합니다. 소비자가 가져온 모든 레코드에 대한 오프셋을 커밋하려면 요청 본문을 비워 둡니다.

4.3.5.2. 매개 변수
Expand
유형이름설명스키마

경로

groupID
필요

소비자가 속한 소비자 그룹의 ID입니다.

string

경로

이름
필요

소비자의 이름입니다.

string

본문

본문
선택 사항

소비자 오프셋 커밋 로그에 커밋할 소비자 오프셋 목록입니다. 오프셋을 커밋할 하나 이상의 주제 파티션을 지정할 수 있습니다.

OffsetCommitSeekList

4.3.5.3. 응답
Expand
HTTP 코드설명스키마

204

성공적으로 커밋되었습니다.

콘텐츠 없음

404

지정된 소비자 인스턴스를 찾을 수 없습니다.

오류

4.3.5.4. 사용
  • application/vnd.kafka.v2+json
4.3.5.5. 생성
  • application/vnd.kafka.v2+json
4.3.5.6. 태그
  • 소비자
4.3.5.7. HTTP 요청의 예
4.3.5.7.1. 요청 본문
{
  "offsets" : [ {
    "topic" : "topic",
    "partition" : 0,
    "offset" : 15
  }, {
    "topic" : "topic",
    "partition" : 1,
    "offset" : 42
  } ]
}
Copy to Clipboard Toggle word wrap
4.3.5.8. HTTP 응답의 예
4.3.5.8.1. 응답 404
{
  "error_code" : 404,
  "message" : "The specified consumer instance was not found."
}
Copy to Clipboard Toggle word wrap

4.3.6. POST /consumers/{groupid}/instances/{name}/positions

4.3.6.1. 설명

다음 번에 지정된 주제 파티션에서 레코드 집합을 가져올 때 서브스크립션된 소비자가 특정 오프셋에서 오프셋을 가져오도록 구성합니다. 이렇게 하면 소비자의 기본 가져오기 동작이 재정의됩니다. 하나 이상의 주제 파티션을 지정할 수 있습니다.

4.3.6.2. 매개 변수
Expand
유형이름설명스키마

경로

groupID
필요

소비자가 속한 소비자 그룹의 ID입니다.

string

경로

이름
필요

구독한 소비자의 이름입니다.

string

본문

본문
필요

서브스크립션 소비자가 다음에 레코드를 가져올 파티션 오프셋 목록입니다.

OffsetCommitSeekList

4.3.6.3. 응답
Expand
HTTP 코드설명스키마

204

검색이 성공적으로 수행되었습니다.

콘텐츠 없음

404

지정된 소비자 인스턴스를 찾을 수 없거나 지정된 소비자 인스턴스에 지정된 파티션 중 하나가 할당되지 않았습니다.

오류

4.3.6.4. 사용
  • application/vnd.kafka.v2+json
4.3.6.5. 생성
  • application/vnd.kafka.v2+json
4.3.6.6. 태그
  • 소비자
  • 검색
4.3.6.7. HTTP 요청의 예
4.3.6.7.1. 요청 본문
{
  "offsets" : [ {
    "topic" : "topic",
    "partition" : 0,
    "offset" : 15
  }, {
    "topic" : "topic",
    "partition" : 1,
    "offset" : 42
  } ]
}
Copy to Clipboard Toggle word wrap
4.3.6.8. HTTP 응답의 예
4.3.6.8.1. 응답 404
{
  "error_code" : 404,
  "message" : "The specified consumer instance was not found."
}
Copy to Clipboard Toggle word wrap

4.3.7. POST /consumers/{groupid}/instances/{name}/positions/beginning

4.3.7.1. 설명

서브스크립션된 소비자를 구성하여 하나 이상의 지정된 주제 파티션에서 첫 번째 오프셋을 검색(따라서 읽음)합니다.

4.3.7.2. 매개 변수
Expand
유형이름설명스키마

경로

groupID
필요

서브스크립션된 소비자가 속한 소비자 그룹의 ID입니다.

string

경로

이름
필요

구독한 소비자의 이름입니다.

string

본문

본문
필요

소비자가 서브스크립션하는 주제 파티션 목록입니다. 소비자는 지정된 파티션에서 첫 번째 오프셋을 검색합니다.

파티션

4.3.7.3. 응답
Expand
HTTP 코드설명스키마

204

처음에는 성공적으로 수행되었습니다.

콘텐츠 없음

404

지정된 소비자 인스턴스를 찾을 수 없거나 지정된 소비자 인스턴스에 지정된 파티션 중 하나가 할당되지 않았습니다.

오류

4.3.7.4. 사용
  • application/vnd.kafka.v2+json
4.3.7.5. 생성
  • application/vnd.kafka.v2+json
4.3.7.6. 태그
  • 소비자
  • 검색
4.3.7.7. HTTP 요청의 예
4.3.7.7.1. 요청 본문
{
  "partitions" : [ {
    "topic" : "topic",
    "partition" : 0
  }, {
    "topic" : "topic",
    "partition" : 1
  } ]
}
Copy to Clipboard Toggle word wrap
4.3.7.8. HTTP 응답의 예
4.3.7.8.1. 응답 404
{
  "error_code" : 404,
  "message" : "The specified consumer instance was not found."
}
Copy to Clipboard Toggle word wrap

4.3.8. POST /consumers/{groupid}/instances/{name}/positions/end

4.3.8.1. 설명

서브스크립션된 소비자를 구성하여 하나 이상의 주제 파티션 끝에 오프셋을 검색(및 이후에 읽음)합니다.

4.3.8.2. 매개 변수
Expand
유형이름설명스키마

경로

groupID
필요

서브스크립션된 소비자가 속한 소비자 그룹의 ID입니다.

string

경로

이름
필요

구독한 소비자의 이름입니다.

string

본문

본문
선택 사항

소비자가 서브스크립션하는 주제 파티션 목록입니다. 소비자는 지정된 파티션에서 마지막 오프셋을 검색합니다.

파티션

4.3.8.3. 응답
Expand
HTTP 코드설명스키마

204

성공적으로 수행 된 끝을 찾습니다.

콘텐츠 없음

404

지정된 소비자 인스턴스를 찾을 수 없거나 지정된 소비자 인스턴스에 지정된 파티션 중 하나가 할당되지 않았습니다.

오류

4.3.8.4. 사용
  • application/vnd.kafka.v2+json
4.3.8.5. 생성
  • application/vnd.kafka.v2+json
4.3.8.6. 태그
  • 소비자
  • 검색
4.3.8.7. HTTP 요청의 예
4.3.8.7.1. 요청 본문
{
  "partitions" : [ {
    "topic" : "topic",
    "partition" : 0
  }, {
    "topic" : "topic",
    "partition" : 1
  } ]
}
Copy to Clipboard Toggle word wrap
4.3.8.8. HTTP 응답의 예
4.3.8.8.1. 응답 404
{
  "error_code" : 404,
  "message" : "The specified consumer instance was not found."
}
Copy to Clipboard Toggle word wrap

4.3.9. GET /consumers/{groupid}/instances/{name}/records

4.3.9.1. 설명

메시지 값, 주제 및 파티션을 포함하여 구독한 소비자에 대한 레코드를 검색합니다. 이 작업에 대한 요청은 이 소비자를 생성하는 데 사용된 /consumers/{groupid} 에 대한 POST 요청에서 반환된 기본 URL(호스트 및 포트 포함)을 사용해야 합니다.

4.3.9.2. 매개 변수
Expand
유형이름설명스키마

경로

groupID
필요

서브스크립션된 소비자가 속한 소비자 그룹의 ID입니다.

string

경로

이름
필요

레코드를 검색할 구독 소비자의 이름입니다.

string

쿼리

max_bytes
optional

응답에 포함될 수 있는 인코딩되지 않은 키와 값의 최대 크기(바이트)입니다. 그렇지 않으면 코드 422에 대한 오류 응답이 반환됩니다.

integer

쿼리

시간 초과
선택 사항

HTTP 브리지가 요청을 타이밍하기 전에 레코드를 검색하는 데 사용하는 최대 시간(밀리초)입니다.

integer

4.3.9.3. 응답
Expand
HTTP 코드설명스키마

200

설문 조사 요청이 성공적으로 실행되었습니다.

ConsumerRecordList

404

지정된 소비자 인스턴스를 찾을 수 없습니다.

오류

406

소비자 생성 요청에 사용된 형식이 이 요청의 Accept 헤더에 포함된 형식과 일치하지 않거나 브릿지는 JSON 인코딩되지 않은 주제에서 메시지를 받았습니다.

오류

422

응답은 소비자가 수신할 수 있는 최대 바이트 수를 초과합니다.

오류

4.3.9.4. 생성
  • application/vnd.kafka.json.v2+json
  • application/vnd.kafka.binary.v2+json
  • application/vnd.kafka.v2+json
4.3.9.5. 태그
  • 소비자
4.3.9.6. HTTP 응답의 예
4.3.9.6.1. 응답 200
[ {
  "topic" : "topic",
  "key" : "key1",
  "value" : {
    "foo" : "bar"
  },
  "partition" : 0,
  "offset" : 2
}, {
  "topic" : "topic",
  "key" : "key2",
  "value" : [ "foo2", "bar2" ],
  "partition" : 1,
  "offset" : 3
} ]
Copy to Clipboard Toggle word wrap
[
  {
    "topic": "test",
    "key": "a2V5",
    "value": "Y29uZmx1ZW50",
    "partition": 1,
    "offset": 100,
  },
  {
    "topic": "test",
    "key": "a2V5",
    "value": "a2Fma2E=",
    "partition": 2,
    "offset": 101,
  }
]
Copy to Clipboard Toggle word wrap
4.3.9.6.2. 응답 404
{
  "error_code" : 404,
  "message" : "The specified consumer instance was not found."
}
Copy to Clipboard Toggle word wrap
4.3.9.6.3. 응답 406
{
  "error_code" : 406,
  "message" : "The `format` used in the consumer creation request does not match the embedded format in the Accept header of this request."
}
Copy to Clipboard Toggle word wrap
4.3.9.6.4. 응답 422
{
  "error_code" : 422,
  "message" : "Response exceeds the maximum number of bytes the consumer can receive"
}
Copy to Clipboard Toggle word wrap

4.3.10. POST /consumers/{groupid}/instances/{name}/subscription

4.3.10.1. 설명

사용자를 하나 이상의 항목에 서브스크립션합니다. 소비자가 목록(주제 유형) 또는 topic_pattern 필드로 서브스크립션할 주제를 설명할 수 있습니다. 각 호출은 구독자에 대한 서브스크립션을 대체합니다.

4.3.10.2. 매개 변수
Expand
유형이름설명스키마

경로

groupID
필요

서브스크립션된 소비자가 속한 소비자 그룹의 ID입니다.

string

경로

이름
필요

주제를 등록할 소비자의 이름입니다.

string

본문

본문
필요

소비자가 서브스크립션할 주제 목록입니다.

주제

4.3.10.3. 응답
Expand
HTTP 코드설명스키마

204

소비자가 성공적으로 구독했습니다.

콘텐츠 없음

404

지정된 소비자 인스턴스를 찾을 수 없습니다.

오류

409

주제, 파티션 및 패턴에 대한 서브스크립션은 함께 사용할 수 없습니다.

오류

422

목록( 주제 유형) 또는 topic_pattern 을 지정해야 합니다.

오류

4.3.10.4. 사용
  • application/vnd.kafka.v2+json
4.3.10.5. 생성
  • application/vnd.kafka.v2+json
4.3.10.6. 태그
  • 소비자
4.3.10.7. HTTP 요청의 예
4.3.10.7.1. 요청 본문
{
  "topics" : [ "topic1", "topic2" ]
}
Copy to Clipboard Toggle word wrap
4.3.10.8. HTTP 응답의 예
4.3.10.8.1. 응답 404
{
  "error_code" : 404,
  "message" : "The specified consumer instance was not found."
}
Copy to Clipboard Toggle word wrap
4.3.10.8.2. 응답 409
{
  "error_code" : 409,
  "message" : "Subscriptions to topics, partitions, and patterns are mutually exclusive."
}
Copy to Clipboard Toggle word wrap
4.3.10.8.3. 응답 422
{
  "error_code" : 422,
  "message" : "A list (of Topics type) or a topic_pattern must be specified."
}
Copy to Clipboard Toggle word wrap

4.3.11. GET /consumers/{groupid}/instances/{name}/subscription

4.3.11.1. 설명

소비자가 서브스크립션하는 주제 목록을 검색합니다.

4.3.11.2. 매개 변수
Expand
유형이름설명스키마

경로

groupID
필요

서브스크립션된 소비자가 속한 소비자 그룹의 ID입니다.

string

경로

이름
필요

구독한 소비자의 이름입니다.

string

4.3.11.3. 응답
Expand
HTTP 코드설명스키마

200

서브스크립션된 주제 및 파티션 목록입니다.

SubscribedTopicList

404

지정된 소비자 인스턴스를 찾을 수 없습니다.

오류

4.3.11.4. 생성
  • application/vnd.kafka.v2+json
4.3.11.5. 태그
  • 소비자
4.3.11.6. HTTP 응답의 예
4.3.11.6.1. 응답 200
{
  "topics" : [ "my-topic1", "my-topic2" ],
  "partitions" : [ {
    "my-topic1" : [ 1, 2, 3 ]
  }, {
    "my-topic2" : [ 1 ]
  } ]
}
Copy to Clipboard Toggle word wrap
4.3.11.6.2. 응답 404
{
  "error_code" : 404,
  "message" : "The specified consumer instance was not found."
}
Copy to Clipboard Toggle word wrap

4.3.12. DELETE /consumers/{groupid}/instances/{name}/subscription

4.3.12.1. 설명

모든 주제에서 소비자를 서브스크립션 해제합니다.

4.3.12.2. 매개 변수
Expand
유형이름설명스키마

경로

groupID
필요

서브스크립션된 소비자가 속한 소비자 그룹의 ID입니다.

string

경로

이름
필요

주제에서 구독 취소할 소비자의 이름입니다.

string

4.3.12.3. 응답
Expand
HTTP 코드설명스키마

204

소비자가 성공적으로 취소되었습니다.

콘텐츠 없음

404

지정된 소비자 인스턴스를 찾을 수 없습니다.

오류

4.3.12.4. 태그
  • 소비자
4.3.12.5. HTTP 응답의 예
4.3.12.5.1. 응답 404
{
  "error_code" : 404,
  "message" : "The specified consumer instance was not found."
}
Copy to Clipboard Toggle word wrap

4.3.13. GET /healthy

4.3.13.1. 설명

브리지가 실행 중인지 확인합니다. 이는 반드시 요청을 수락할 준비가 되어 있음을 의미하지는 않습니다.

4.3.13.2. 응답
Expand
HTTP 코드설명스키마

204

브릿지가 정상입니다.

콘텐츠 없음

500

브릿지는 건강하지 않습니다.

콘텐츠 없음

4.3.14. GET /metrics

4.3.14.1. 설명

Prometheus 형식으로 브리지 지표를 검색합니다.

4.3.14.2. 응답
Expand
HTTP 코드설명스키마

200

Prometheus 형식의 메트릭이 성공적으로 검색되었습니다.

string

4.3.14.3. 생성
  • text/plain

4.3.15. GET /openapi

4.3.15.1. 설명

OpenAPI v2 사양을 JSON 형식으로 검색합니다.

4.3.15.2. 응답
Expand
HTTP 코드설명스키마

204

JSON 형식의 OpenAPI v2 사양이 성공적으로 검색되었습니다.

string

4.3.15.3. 생성
  • application/json

4.3.16. GET /ready

4.3.16.1. 설명

브리지가 준비되었는지 확인하고 요청을 수락할 수 있는지 확인합니다.

4.3.16.2. 응답
Expand
HTTP 코드설명스키마

204

브리지가 준비됨

콘텐츠 없음

500

브리지가 준비되지 않았습니다.

콘텐츠 없음

4.3.17. GET /topics

4.3.17.1. 설명

모든 주제 목록을 검색합니다.

4.3.17.2. 응답
Expand
HTTP 코드설명스키마

200

주제 목록.

< string > array

4.3.17.3. 생성
  • application/vnd.kafka.v2+json
4.3.17.4. 태그
  • 주제
4.3.17.5. HTTP 응답의 예
4.3.17.5.1. 응답 200
[ "topic1", "topic2" ]
Copy to Clipboard Toggle word wrap

4.3.18. POST /topics/{topicname}

4.3.18.1. 설명

하나 이상의 레코드를 지정된 항목에 전송하고, 필요에 따라 파티션, 키 또는 둘 다를 지정합니다.

4.3.18.2. 매개 변수
Expand
유형이름설명스키마

경로

주제 이름
필요

레코드를 보내거나 메타데이터에서 검색할 항목의 이름입니다.

string

쿼리

Async
선택 사항

메타데이터를 기다리는 대신 레코드를 전송할 때 즉시 반환할지 여부입니다. 지정된 경우 오프셋은 반환되지 않습니다. 기본값은 false입니다.

boolean

본문

본문
필요

 

ProducerRecordList

4.3.18.3. 응답
Expand
HTTP 코드설명스키마

200

레코드가 성공적으로 전송되었습니다.

OffsetRecordSentList

404

지정된 주제를 찾을 수 없습니다.

오류

422

레코드 목록이 유효하지 않습니다.

오류

4.3.18.4. 사용
  • application/vnd.kafka.json.v2+json
  • application/vnd.kafka.binary.v2+json
4.3.18.5. 생성
  • application/vnd.kafka.v2+json
4.3.18.6. 태그
  • 생산자
  • 주제
4.3.18.7. HTTP 요청의 예
4.3.18.7.1. 요청 본문
{
  "records" : [ {
    "key" : "key1",
    "value" : "value1"
  }, {
    "value" : "value2",
    "partition" : 1
  }, {
    "value" : "value3"
  } ]
}
Copy to Clipboard Toggle word wrap
4.3.18.8. HTTP 응답의 예
4.3.18.8.1. 응답 200
{
  "offsets" : [ {
    "partition" : 2,
    "offset" : 0
  }, {
    "partition" : 1,
    "offset" : 1
  }, {
    "partition" : 2,
    "offset" : 2
  } ]
}
Copy to Clipboard Toggle word wrap
4.3.18.8.2. 응답 404
{
  "error_code" : 404,
  "message" : "The specified topic was not found."
}
Copy to Clipboard Toggle word wrap
4.3.18.8.3. 응답 422
{
  "error_code" : 422,
  "message" : "The record list contains invalid records."
}
Copy to Clipboard Toggle word wrap

4.3.19. GET /topics/{topicname}

4.3.19.1. 설명

지정된 항목에 대한 메타데이터를 검색합니다.

4.3.19.2. 매개 변수
Expand
유형이름설명스키마

경로

주제 이름
필요

레코드를 보내거나 메타데이터에서 검색할 항목의 이름입니다.

string

4.3.19.3. 응답
Expand
HTTP 코드설명스키마

200

주제 메타데이터

TopicMetadata

4.3.19.4. 생성
  • application/vnd.kafka.v2+json
4.3.19.5. 태그
  • 주제
4.3.19.6. HTTP 응답의 예
4.3.19.6.1. 응답 200
{
  "name" : "topic",
  "offset" : 2,
  "configs" : {
    "cleanup.policy" : "compact"
  },
  "partitions" : [ {
    "partition" : 1,
    "leader" : 1,
    "replicas" : [ {
      "broker" : 1,
      "leader" : true,
      "in_sync" : true
    }, {
      "broker" : 2,
      "leader" : false,
      "in_sync" : true
    } ]
  }, {
    "partition" : 2,
    "leader" : 2,
    "replicas" : [ {
      "broker" : 1,
      "leader" : false,
      "in_sync" : true
    }, {
      "broker" : 2,
      "leader" : true,
      "in_sync" : true
    } ]
  } ]
}
Copy to Clipboard Toggle word wrap

4.3.20. GET /topics/{topicname}/partitions

4.3.20.1. 설명

주제의 파티션 목록을 검색합니다.

4.3.20.2. 매개 변수
Expand
유형이름설명스키마

경로

주제 이름
필요

레코드를 보내거나 메타데이터에서 검색할 항목의 이름입니다.

string

4.3.20.3. 응답
Expand
HTTP 코드설명스키마

200

파티션 목록

< PartitionMetadata > 배열

404

지정된 주제를 찾을 수 없습니다.

오류

4.3.20.4. 생성
  • application/vnd.kafka.v2+json
4.3.20.5. 태그
  • 주제
4.3.20.6. HTTP 응답의 예
4.3.20.6.1. 응답 200
[ {
  "partition" : 1,
  "leader" : 1,
  "replicas" : [ {
    "broker" : 1,
    "leader" : true,
    "in_sync" : true
  }, {
    "broker" : 2,
    "leader" : false,
    "in_sync" : true
  } ]
}, {
  "partition" : 2,
  "leader" : 2,
  "replicas" : [ {
    "broker" : 1,
    "leader" : false,
    "in_sync" : true
  }, {
    "broker" : 2,
    "leader" : true,
    "in_sync" : true
  } ]
} ]
Copy to Clipboard Toggle word wrap
4.3.20.6.2. 응답 404
{
  "error_code" : 404,
  "message" : "The specified topic was not found."
}
Copy to Clipboard Toggle word wrap

4.3.21. POST /topics/{topicname}/partitions/{partitionid}

4.3.21.1. 설명

지정된 주제 파티션에 하나 이상의 레코드를 전송하고 선택적으로 키를 지정합니다.

4.3.21.2. 매개 변수
Expand
유형이름설명스키마

경로

PartitionID
필요

레코드를 보내거나 메타데이터에서 검색할 파티션의 ID입니다.

integer

경로

주제 이름
필요

레코드를 보내거나 메타데이터에서 검색할 항목의 이름입니다.

string

쿼리

Async
선택 사항

메타데이터를 기다리는 대신 레코드를 전송할 때 즉시 반환할지 여부입니다. 지정된 경우 오프셋은 반환되지 않습니다. 기본값은 false입니다.

boolean

본문

본문
필요

값(필수) 및 키(선택 사항)를 포함하여 지정된 주제 파티션에 보낼 레코드 목록입니다.

ProducerRecordToPartitionList

4.3.21.3. 응답
Expand
HTTP 코드설명스키마

200

레코드가 성공적으로 전송되었습니다.

OffsetRecordSentList

404

지정된 주제 파티션을 찾을 수 없습니다.

오류

422

레코드가 유효하지 않습니다.

오류

4.3.21.4. 사용
  • application/vnd.kafka.json.v2+json
  • application/vnd.kafka.binary.v2+json
4.3.21.5. 생성
  • application/vnd.kafka.v2+json
4.3.21.6. 태그
  • 생산자
  • 주제
4.3.21.7. HTTP 요청의 예
4.3.21.7.1. 요청 본문
{
  "records" : [ {
    "key" : "key1",
    "value" : "value1"
  }, {
    "value" : "value2"
  } ]
}
Copy to Clipboard Toggle word wrap
4.3.21.8. HTTP 응답의 예
4.3.21.8.1. 응답 200
{
  "offsets" : [ {
    "partition" : 2,
    "offset" : 0
  }, {
    "partition" : 1,
    "offset" : 1
  }, {
    "partition" : 2,
    "offset" : 2
  } ]
}
Copy to Clipboard Toggle word wrap
4.3.21.8.2. 응답 404
{
  "error_code" : 404,
  "message" : "The specified topic partition was not found."
}
Copy to Clipboard Toggle word wrap
4.3.21.8.3. 응답 422
{
  "error_code" : 422,
  "message" : "The record is not valid."
}
Copy to Clipboard Toggle word wrap

4.3.22. GET /topics/{topicname}/partitions/{partitionid}

4.3.22.1. 설명

주제 파티션의 파티션 메타데이터를 검색합니다.

4.3.22.2. 매개 변수
Expand
유형이름설명스키마

경로

PartitionID
필요

레코드를 보내거나 메타데이터에서 검색할 파티션의 ID입니다.

integer

경로

주제 이름
필요

레코드를 보내거나 메타데이터에서 검색할 항목의 이름입니다.

string

4.3.22.3. 응답
Expand
HTTP 코드설명스키마

200

파티션 메타데이터

PartitionMetadata

404

지정된 주제 파티션을 찾을 수 없습니다.

오류

4.3.22.4. 생성
  • application/vnd.kafka.v2+json
4.3.22.5. 태그
  • 주제
4.3.22.6. HTTP 응답의 예
4.3.22.6.1. 응답 200
{
  "partition" : 1,
  "leader" : 1,
  "replicas" : [ {
    "broker" : 1,
    "leader" : true,
    "in_sync" : true
  }, {
    "broker" : 2,
    "leader" : false,
    "in_sync" : true
  } ]
}
Copy to Clipboard Toggle word wrap
4.3.22.6.2. 응답 404
{
  "error_code" : 404,
  "message" : "The specified topic partition was not found."
}
Copy to Clipboard Toggle word wrap

4.3.23. GET /topics/{topicname}/partitions/{partitionid}/offsets

4.3.23.1. 설명

주제 파티션에 대한 오프셋에 대한 요약을 검색합니다.

4.3.23.2. 매개 변수
Expand
유형이름설명스키마

경로

PartitionID
필요

파티션의 ID입니다.

integer

경로

주제 이름
필요

파티션이 포함된 주제의 이름입니다.

string

4.3.23.3. 응답
Expand
HTTP 코드설명스키마

200

주제 파티션에 대한 오프셋에 대한 요약입니다.

OffsetsSummary

404

지정된 주제 파티션을 찾을 수 없습니다.

오류

4.3.23.4. 생성
  • application/vnd.kafka.v2+json
4.3.23.5. 태그
  • 주제
4.3.23.6. HTTP 응답의 예
4.3.23.6.1. 응답 200
{
  "beginning_offset" : 10,
  "end_offset" : 50
}
Copy to Clipboard Toggle word wrap
4.3.23.6.2. 응답 404
{
  "error_code" : 404,
  "message" : "The specified topic partition was not found."
}
Copy to Clipboard Toggle word wrap

부록 A. 서브스크립션 사용

AMQ Streams는 소프트웨어 서브스크립션을 통해 제공됩니다. 서브스크립션을 관리하려면 Red Hat 고객 포털에서 계정에 액세스하십시오.

계정 액세스

  1. access.redhat.com 으로 이동합니다.
  2. 계정이 없는 경우 계정을 생성합니다.
  3. 계정에 로그인합니다.

서브스크립션 활성화

  1. access.redhat.com 으로 이동합니다.
  2. 내 서브스크립션 으로 이동합니다.
  3. 서브스크립션 활성화로 이동하여 16자리 활성화 번호를 입력합니다.

Zip 및 Tar 파일 다운로드

zip 또는 tar 파일에 액세스하려면 고객 포털을 사용하여 다운로드할 관련 파일을 찾습니다. RPM 패키지를 사용하는 경우 이 단계는 필요하지 않습니다.

  1. 브라우저를 열고 Red Hat Customer Portal 제품 다운로드 페이지에 access.redhat.com/downloads.
  2. INTEGRATION 및 AUTOMATION 카테고리에서 AMQ Streams for Apache Kafka 항목을 찾습니다.
  3. 원하는 AMQ Streams 제품을 선택합니다. 소프트웨어 다운로드 페이지가 열립니다.
  4. 구성 요소에 대한 다운로드 링크를 클릭합니다.

DNF를 사용하여 패키지 설치

패키지 및 모든 패키지 종속 항목을 설치하려면 다음을 사용합니다.

dnf install <package_name>
Copy to Clipboard Toggle word wrap

로컬 디렉터리에서 이전에 다운로드한 패키지를 설치하려면 다음을 사용합니다.

dnf install <path_to_download_package>
Copy to Clipboard Toggle word wrap

2023-12-06에 최종 업데이트된 문서

법적 공지

Copyright © 2023 Red Hat, Inc.
The text of and illustrations in this document are licensed by Red Hat under a Creative Commons Attribution–Share Alike 3.0 Unported license ("CC-BY-SA"). An explanation of CC-BY-SA is available at http://creativecommons.org/licenses/by-sa/3.0/. In accordance with CC-BY-SA, if you distribute this document or an adaptation of it, you must provide the URL for the original version.
Red Hat, as the licensor of this document, waives the right to enforce, and agrees not to assert, Section 4d of CC-BY-SA to the fullest extent permitted by applicable law.
Red Hat, Red Hat Enterprise Linux, the Shadowman logo, the Red Hat logo, JBoss, OpenShift, Fedora, the Infinity logo, and RHCE are trademarks of Red Hat, Inc., registered in the United States and other countries.
Linux® is the registered trademark of Linus Torvalds in the United States and other countries.
Java® is a registered trademark of Oracle and/or its affiliates.
XFS® is a trademark of Silicon Graphics International Corp. or its subsidiaries in the United States and/or other countries.
MySQL® is a registered trademark of MySQL AB in the United States, the European Union and other countries.
Node.js® is an official trademark of Joyent. Red Hat is not formally related to or endorsed by the official Joyent Node.js open source or commercial project.
The OpenStack® Word Mark and OpenStack logo are either registered trademarks/service marks or trademarks/service marks of the OpenStack Foundation, in the United States and other countries and are used with the OpenStack Foundation's permission. We are not affiliated with, endorsed or sponsored by the OpenStack Foundation, or the OpenStack community.
All other trademarks are the property of their respective owners.
맨 위로 이동
Red Hat logoGithubredditYoutubeTwitter

자세한 정보

평가판, 구매 및 판매

커뮤니티

Red Hat 문서 정보

Red Hat을 사용하는 고객은 신뢰할 수 있는 콘텐츠가 포함된 제품과 서비스를 통해 혁신하고 목표를 달성할 수 있습니다. 최신 업데이트를 확인하세요.

보다 포괄적 수용을 위한 오픈 소스 용어 교체

Red Hat은 코드, 문서, 웹 속성에서 문제가 있는 언어를 교체하기 위해 최선을 다하고 있습니다. 자세한 내용은 다음을 참조하세요.Red Hat 블로그.

Red Hat 소개

Red Hat은 기업이 핵심 데이터 센터에서 네트워크 에지에 이르기까지 플랫폼과 환경 전반에서 더 쉽게 작업할 수 있도록 강화된 솔루션을 제공합니다.

Theme

© 2025 Red Hat