8.3. 다른 클라이언트 직렬화기/deserializer 유형을 구성하는 방법


Kafka 클라이언트 애플리케이션에서 스키마를 사용하는 경우 사용 사례에 따라 사용할 특정 스키마 유형을 선택해야 합니다. Service Registry는 Apache Avro, JSON Schema 및 Google Protobuf에 대해 SerDe Java 클래스를 제공합니다. 다음 섹션에서는 각 유형을 사용하도록 Kafka 애플리케이션을 구성하는 방법을 설명합니다.

Kafka를 사용하여 사용자 정의 serializer 및 deserializer 클래스를 구현하고 Service Registry REST Java 클라이언트를 사용하여 서비스 레지스트리 기능을 활용할 수도 있습니다.

serializers/deserializers에 대한 Kafka 애플리케이션 구성

Kafka 애플리케이션에서 서비스 레지스트리에서 제공하는 SerDe 클래스를 사용하려면 올바른 구성 속성을 설정해야 합니다. 다음의 간단한 Avro 예제에서는 Kafka 생산자 애플리케이션에서 serializer를 구성하는 방법과 Kafka 소비자 애플리케이션에서 deserialize자를 구성하는 방법을 보여줍니다.

Kafka 프로듀서의 직렬화기 구성 예

// Create the Kafka producer
private static Producer<Object, Object> createKafkaProducer() {
    Properties props = new Properties();

    // Configure standard Kafka settings
    props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
    props.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "Producer-" + TOPIC_NAME);
    props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all");

    // Use Service Registry-provided Kafka serializer for Avro
    props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName());

    // Configure the Service Registry location
    props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);

    // Register the schema artifact if not found in the registry.
    props.putIfAbsent(SerdeConfig.AUTO_REGISTER_ARTIFACT, Boolean.TRUE);

    // Create the Kafka producer
    Producer<Object, Object> producer = new KafkaProducer<>(props);
    return producer;
}

Kafka 소비자의 deserializer 구성 예

// Create the Kafka consumer
private static KafkaConsumer<Long, GenericRecord> createKafkaConsumer() {
    Properties props = new Properties();

    // Configure standard Kafka settings
    props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
    props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-" + TOPIC_NAME);
    props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    // Use Service Registry-provided Kafka deserializer for Avro
    props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroKafkaDeserializer.class.getName());

    // Configure the Service Registry location
    props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);

    // No other configuration needed because the schema globalId the deserializer uses is sent
    // in the payload. The deserializer extracts the globalId and uses it to look up the schema
    // from the registry.

    // Create the Kafka consumer
    KafkaConsumer<Long, GenericRecord> consumer = new KafkaConsumer<>(props);
    return consumer;
}

추가 리소스

8.3.1. 서비스 레지스트리를 사용하여 Avro SerDes 구성

이 항목에서는 Apache Avro에 Kafka 클라이언트 직렬화기 및 역직렬러(SerDes) 클래스를 사용하는 방법을 설명합니다.

Service Registry에서는 Avro에 대해 다음과 같은 Kafka 클라이언트 SerDes 클래스를 제공합니다.

  • io.apicurio.registry.serde.avro.AvroKafkaSerializer
  • io.apicurio.registry.serde.avro.AvroKafkaDeserializer

Avro serializer 구성

다음을 사용하여 Avro serializer 클래스를 구성할 수 있습니다.

  • 서비스 레지스트리 URL
  • 아티팩트 해석기 전략
  • ID 위치
  • ID 인코딩
  • Avro datum 공급자
  • Avro 인코딩

ID 위치

직렬화기에서 스키마의 고유 ID를 Kafka 메시지의 일부로 전달하여 소비자가 deserialization에 올바른 스키마를 사용할 수 있습니다. ID는 메시지 페이로드 또는 메시지 헤더에 있을 수 있습니다. 기본 위치는 메시지 페이로드입니다. 메시지 헤더에 ID를 보내려면 다음 구성 속성을 설정합니다.

props.putIfAbsent(SerdeConfig.ENABLE_HEADERS, "true")

속성 이름은 apicurio.registry.headers.enabled 입니다.

ID 인코딩

Kafka 메시지 본문에 전달할 때 스키마 ID를 인코딩하는 방법을 사용자 지정할 수 있습니다. apicurio.registry.id-handler 구성 속성을 io.apicurio.registry.serde.IdHandler 인터페이스를 구현하는 클래스로 설정합니다. Service Registry에서는 다음과 같은 구현을 제공합니다.

  • io.apicurio.registry.serde.DefaultIdHandler: ID를 8바이트 길이로 저장
  • io.apicurio.registry.serde.Legacy4ByteIdHandler: ID를 4바이트 정수로 저장

Service Registry는 스키마 ID를 긴 것으로 나타내지만, 기존의 이유로 또는 다른 레지스트리 또는 SerDe 클래스와의 호환성을 위해 ID를 보낼 때 4바이트를 사용할 수 있습니다.

Avro datum 공급자

Avro는 다양한 datum 작성자와 리더를 제공하여 데이터를 작성하고 읽을 수 있습니다. Service Registry에서는 다음 세 가지 유형을 지원합니다.

  • 일반
  • 특정
  • Reflect

Service Registry AvroDatumProvider 는 기본적으로 DefaultAvroDatumProvider 를 사용하는 유형을 추상화하는 것입니다.

다음 설정 옵션을 설정할 수 있습니다.

  • apicurio.registry.avro-datum-provider: AvroDatumProvider 구현의 정규화된 Java 클래스 이름을 지정합니다(예: io.apicurio.registry.serde.avlectAvroDatumProvider).
  • apicurio.registry.use-specific-avro-reader: DefaultAvroDatumProvider를 사용할 때 특정 유형을 사용하려면 true 로 설정

Avro 인코딩

Avro를 사용하여 데이터를 직렬화할 때 Avro 바이너리 인코딩 형식을 사용하여 데이터가 가능한 한 효율적으로 인코딩되도록 할 수 있습니다. 또한 Avro는 JSON으로 데이터 인코딩을 지원하므로 각 메시지의 페이로드를 더 쉽게 검사할 수 있습니다(예: 로깅 또는 디버깅).

JSON 또는 BINARY 값으로 apicurio.registry.avro.encoding 속성을 구성하여 Avro 인코딩을 설정할 수 있습니다. 기본값은 BINARY 입니다.

Avro deserializer 구성

serializer의 다음 구성 설정과 일치하도록 Avro deserializer 클래스를 구성해야 합니다.

  • 서비스 레지스트리 URL
  • ID 인코딩
  • Avro datum 공급자
  • Avro 인코딩

이러한 구성 옵션은 직렬화기 섹션을 참조하십시오. 속성 이름과 값은 동일합니다.

참고

deserialize자를 구성할 때는 다음 옵션이 필요하지 않습니다.

  • 아티팩트 해석기 전략
  • ID 위치

deserializer 클래스는 메시지에서 이러한 옵션에 대한 값을 결정할 수 있습니다. 직렬화기에서 메시지의 일부로 ID를 전송해야 하므로 전략이 필요하지 않습니다.

ID 위치는 메시지 페이로드의 시작 부분에 있는 매직 바이트를 확인하여 결정됩니다. 해당 바이트가 발견되면 구성된 처리기를 사용하여 메시지 페이로드에서 ID를 읽습니다. 매직 바이트를 찾을 수 없으면 메시지 헤더에서 ID를 읽습니다.

Avro SerDes 및 아티팩트 참조

Avro 메시지 및 중첩 레코드가 포함된 스키마로 작업할 때 중첩 레코드별로 새 아티팩트가 등록됩니다. 예를 들어, 다음 involves Key 스키마에는 중첩된 Exchange 스키마가 포함되어 있습니다.

TRADEKEY 스키마 중첩 Exchange 스키마

{
  "namespace": "com.kubetrade.schema.trade",
  "type": "record",
  "name": "TradeKey",
  "fields": [
    {
      "name": "exchange",
      "type": "com.kubetrade.schema.common.Exchange"
    },
    {
      "name": "key",
      "type": "string"
    }
  ]
}

교환 스키마

{
  "namespace": "com.kubetrade.schema.common",
  "type": "enum",
  "name": "Exchange",
  "symbols" : ["GEMINI"]
}

Avro SerDes와 함께 이러한 스키마를 사용하면 두 개의 아티팩트가 Service Registry에서 생성되며 하나는 tradeKey 스키마용이고 하나는 Exchange 스키마용입니다. tradeKey 스키마를 사용하는 메시지가 직렬화되거나 역직렬화될 때마다 두 스키마가 모두 검색되므로 정의를 다른 파일로 분할할 수 있습니다.

추가 리소스

8.3.2. 서비스 레지스트리를 사용하여 JSON Schema SerDes 구성

이 항목에서는 JSON 스키마에 Kafka 클라이언트 직렬화기 및 역직렬러(SerDes) 클래스를 사용하는 방법을 설명합니다.

서비스 레지스트리는 JSON 스키마에 대해 다음과 같은 Kafka 클라이언트 SerDes 클래스를 제공합니다.

  • io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer
  • io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer

Apache Avro와 달리 JSON Schema는 직렬화 기술이 아니며 대신 검증 기술입니다. 따라서 JSON 스키마에 대한 구성 옵션은 매우 다릅니다. 예를 들어, 데이터는 항상 JSON으로 인코딩되므로 인코딩 옵션이 없습니다.

JSON Schema serializer 구성

다음과 같이 JSON Schema serializer 클래스를 구성할 수 있습니다.

  • 서비스 레지스트리 URL
  • 아티팩트 해석기 전략
  • 스키마 검증

비표준 구성 속성은 기본적으로 활성화되어 있는 JSON 스키마 검증입니다. apicurio.registry.serde.validation-enabled"false" 로 설정하여 이를 비활성화할 수 있습니다. 예를 들면 다음과 같습니다.

props.putIfAbsent(SerdeConfig.VALIDATION_ENABLED, Boolean.FALSE)

JSON Schema deserializer 구성

다음과 같이 JSON Schema deserializer 클래스를 구성할 수 있습니다.

  • 서비스 레지스트리 URL
  • 스키마 검증
  • 데이터 역직렬화의 클래스

스키마를 로드할 수 있도록 서비스 레지스트리의 위치를 제공해야 합니다. 다른 구성은 선택 사항입니다.

참고

Deserializer 유효성 검사는 serializer가 Kafka 메시지에서 글로벌 ID를 통과하는 경우에만 작동합니다.이 ID는 serializer에서 유효성 검사를 사용할 수 있는 경우에만 발생합니다.Deserializer validation works only if the serializer passes the global ID in the Kafka message, which will only happen when validation is enabled in the serializer.

JSON Schema SerDes 및 아티팩트 참조

JSON Schema SerDes는 메시지 페이로드에서 스키마를 검색할 수 없으므로 스키마 아티팩트를 미리 등록해야 하며 아티팩트 참조도 적용합니다.

스키마의 콘텐츠에 따라 $ref 값이 URL인 경우 SerDes는 참조된 스키마를 사용하여 해당 URL을 사용하여 확인한 다음 유효성 검사는 일반적으로 작동하고, 기본 스키마에 대해 데이터를 검증하고 중첩 스키마에 대해 중첩된 값을 검증합니다. 서비스 레지스트리의 아티팩트 참조 지원도 구현되었습니다.

예를 들어 다음 growth .json 스키마는 city.json 스키마를 참조합니다.

city.json 스키마에 대한 참조가 포함된 growing.json 스키마

{
 "$id": "https://example.com/citizen.schema.json",
 "$schema": "http://json-schema.org/draft-07/schema#",
 "title": "Citizen",
 "type": "object",
 "properties": {
   "firstName": {
     "type": "string",
     "description": "The citizen's first name."
   },
   "lastName": {
     "type": "string",
     "description": "The citizen's last name."
   },
   "age": {
     "description": "Age in years which must be equal to or greater than zero.",
     "type": "integer",
     "minimum": 0
   },
   "city": {
     "$ref": "city.json"
   }
 }
}

city.json schema

{
 "$id": "https://example.com/city.schema.json",
 "$schema": "http://json-schema.org/draft-07/schema#",
 "title": "City",
 "type": "object",
 "properties": {
   "name": {
     "type": "string",
     "description": "The city's name."
   },
   "zipCode": {
     "type": "integer",
     "description": "The zip code.",
     "minimum": 0
   }
 }
}

이 예에서, 주어진 영주권자는 도시가 있습니다. Service Registry에서 city.json 이라는 이름을 사용하여 도시 아티팩트에 대한 참조가 있는 영주적인 아티팩트가 생성됩니다. SerDes에서, 광주 스키마를 가져올 때, 도시 스키마는 성도 스키마에서 참조되기 때문에 가져옵니다. 데이터를 직렬화/감사할 때 참조 이름은 중첩된 스키마를 확인하는 데 사용되며, 이를 통해 기존 스키마 및 중첩된 도시 스키마에 대한 유효성을 검증할 수 있습니다.

추가 리소스

8.3.3. 서비스 레지스트리를 사용하여 Protobuf SerDes 구성

이 항목에서는 Google Protobuf에 Kafka 클라이언트 직렬화기 및 역직렬러(SerDes) 클래스를 사용하는 방법을 설명합니다.

Service Registry에서는 Protobuf에 대한 다음과 같은 Kafka 클라이언트 SerDes 클래스를 제공합니다.

  • io.apicurio.registry.serde.protobuf.ProtobufKafkaSerializer
  • io.apicurio.registry.serde.protobuf.ProtobufKafkaDeserializer

Protobuf serializer 구성

다음과 같이 Protobuf serializer 클래스를 구성할 수 있습니다.

  • 서비스 레지스트리 URL
  • 아티팩트 해석기 전략
  • ID 위치
  • ID 인코딩
  • 스키마 검증

이러한 구성 옵션에 대한 자세한 내용은 다음 섹션을 참조하십시오.

Protobuf deserializer 구성

serializer의 다음 구성 설정과 일치하도록 Protobuf deserializer 클래스를 구성해야 합니다.

  • 서비스 레지스트리 URL
  • ID 인코딩

구성 속성 이름과 값은 serializer의 경우와 동일합니다.

참고

deserialize자를 구성할 때는 다음 옵션이 필요하지 않습니다.

  • 아티팩트 해석기 전략
  • ID 위치

deserializer 클래스는 메시지에서 이러한 옵션에 대한 값을 결정할 수 있습니다. 직렬화기에서 메시지의 일부로 ID를 전송해야 하므로 전략이 필요하지 않습니다.

ID 위치는 메시지 페이로드의 시작 부분에 있는 매직 바이트를 확인하여 결정됩니다. 해당 바이트가 발견되면 구성된 처리기를 사용하여 메시지 페이로드에서 ID를 읽습니다. 매직 바이트를 찾을 수 없으면 메시지 헤더에서 ID를 읽습니다.

참고

Protobuf deserializer는 정확한 Protobuf Message 구현으로 역직렬하지 않고 DynamicMessage 인스턴스입니다. 다른 작업을 수행하는 적절한 API는 없습니다.

protobuf SerDes 및 아티팩트 참조

가져오기 문과 함께 복잡한 Protobuf 메시지를 사용하면 가져온 Protobuf 메시지가 서비스 레지스트리에 별도의 아티팩트로 저장됩니다. 그러면 서비스 레지스트리에서 Protobuf 메시지를 확인하기 위해 기본 스키마를 가져오면 참조된 체계도 검색되므로 전체 메시지 스키마를 확인하고 직렬화할 수 있습니다.

예를 들어 다음 table_info.proto 스키마 파일에는 가져온 mode.proto 스키마 파일이 포함되어 있습니다.

가져온 .mode.proto 파일인 table_info.proto 파일

syntax = "proto3";
package sample;
option java_package = "io.api.sample";
option java_multiple_files = true;

import "sample/mode.proto";

message TableInfo {

 int32 winIndex = 1;
 Mode mode = 2;
 int32 min = 3;
 int32 max = 4;
 string id = 5;
 string dataAdapter = 6;
 string schema = 7;
 string selector = 8;
 string subscription_id = 9;
}

mode.proto 파일

syntax = "proto3";
package sample;
option java_package = "io.api.sample";
option java_multiple_files = true;

enum Mode {

MODE_UNKNOWN = 0;
RAW = 1;
MERGE = 2;
DISTINCT = 3;
COMMAND = 4;
}

이 예에서는 두 개의 Protobuf 아티팩트가 TableInfo 에 저장되고 다른 하나는 Mode 용으로 Service Registry에 저장됩니다. 그러나 Mode 는 TableInfo의 일부이기 때문에 TableInfo 를 가져올 때마다 TableInfo 가 SerDes의 메시지를 검사하도록 할 때마다 ModeTableInfo 에서 참조하는 아티팩트로 반환됩니다.

추가 리소스

Red Hat logoGithubRedditYoutubeTwitter

자세한 정보

평가판, 구매 및 판매

커뮤니티

Red Hat 문서 정보

Red Hat을 사용하는 고객은 신뢰할 수 있는 콘텐츠가 포함된 제품과 서비스를 통해 혁신하고 목표를 달성할 수 있습니다. 최신 업데이트를 확인하세요.

보다 포괄적 수용을 위한 오픈 소스 용어 교체

Red Hat은 코드, 문서, 웹 속성에서 문제가 있는 언어를 교체하기 위해 최선을 다하고 있습니다. 자세한 내용은 다음을 참조하세요.Red Hat 블로그.

Red Hat 소개

Red Hat은 기업이 핵심 데이터 센터에서 네트워크 에지에 이르기까지 플랫폼과 환경 전반에서 더 쉽게 작업할 수 있도록 강화된 솔루션을 제공합니다.

© 2024 Red Hat, Inc.