Chapter 7. Validating Kafka messages using serializers/deserializers in Java clients


Service Registry provides client serializers/deserializers (SerDes) for Kafka producer and consumer applications written in Java. Kafka producer applications use serializers to encode messages that conform to a specific event schema. Kafka consumer applications use deserializers to validate that messages have been serialized using the correct schema, based on a specific schema ID. This ensures consistent schema use and helps to prevent data errors at runtime.

This chapter explains how to use Kafka client SerDes in your producer and consumer client applications.

Prerequisites

7.1. Kafka client applications and Service Registry

Service Registry decouples schema management from client application configuration. You can enable a Java client application to use a schema from Service Registry by specifying its URL in your client code.

You can store the schemas used to serialize and deserialize messages in Service Registry, and reference them from your client applications to ensure that the messages they send and receive are compatible with those schemas. Kafka client applications can push or pull their schemas from Service Registry at runtime.

Schemas can evolve, so you can define rules in Service Registry, for example, to ensure that schema changes are valid and do not break previous versions used by applications. Service Registry checks for compatibility by comparing a modified schema with previous schema versions.

Service Registry schema technologies

Service Registry provides schema registry support for schema technologies such as:

  • Avro
  • Protobuf
  • JSON Schema

These schema technologies can be used by client applications through the Kafka client serializer/deserializer (SerDes) services provided by Service Registry. The maturity and usage of the SerDes classes provided by Service Registry might vary. The sections that follow provide more details about each schema type.

Producer schema configuration

A producer client application uses a serializer to put the messages that it sends to a specific broker topic into the correct data format.

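To enable a producer to use Service Registry for serialization, register your schema with Service Registry and configure the producer client code with the URL of Service Registry, the Service Registry serializer to use with the messages, and the strategy to look up the schema.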

After registering your schema, when you start Kafka and Service Registry, you can access the schema to format messages sent to the Kafka broker topic by the producer. Alternatively, depending on configuration, the producer can automatically register the schema on first use.

If a schema already exists, you can create a new version using the registry REST API based on compatibility rules defined in Service Registry. Versions are used for compatibility checking as a schema evolves. A group ID, artifact ID, and version together form a unique tuple that identifies a schema.

Consumer schema configuration

A consumer client application uses a deserializer to get the messages that it consumes from a specific broker topic into the correct data format.

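To enable a consumer to use Service Registry for deserialization, register your schema with Service Registry and configure the consumer client code with the URL of Service Registry and the Service Registry deserializer to use with the messages.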

Retrieve schemas using a global ID

By default, the schema is retrieved from Service Registry by the deserializer using a global ID, which is specified in the message being consumed. The schema global ID can be located in the message headers or in the message payload, depending on the configuration of the producer application.

When the global ID is located in the message payload, the data begins with a magic byte, which acts as a signal to consumers, followed by the global ID and then the message data as normal. For example:

# ...
[MAGIC_BYTE]
[GLOBAL_ID]
[MESSAGE DATA]

Then when you start Kafka and Service Registry, you can access the schema to format messages received from the Kafka broker topic.
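
The payload layout described above can be sketched in plain Java. This is a minimal illustration, not the Service Registry implementation: it assumes a magic byte of 0x0 and a global ID written as an 8-byte long, and the class and method names are hypothetical. The actual ID size depends on how your SerDes are configured.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Sketch of the payload layout: [MAGIC_BYTE][GLOBAL_ID][MESSAGE DATA].
// Assumptions for illustration only: magic byte 0x0 and an 8-byte global ID.
class PayloadFrameSketch {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_LENGTH = 1 + Long.BYTES;

    // Prepends the magic byte and the global ID to the serialized message data.
    static byte[] frame(long globalId, byte[] messageData) {
        ByteBuffer buffer = ByteBuffer.allocate(HEADER_LENGTH + messageData.length);
        buffer.put(MAGIC_BYTE);
        buffer.putLong(globalId);
        buffer.put(messageData);
        return buffer.array();
    }

    // Reads the global ID back, rejecting payloads that do not start with the magic byte.
    static long readGlobalId(byte[] payload) {
        ByteBuffer buffer = ByteBuffer.wrap(payload);
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buffer.getLong();
    }

    // Returns the message data that follows the magic byte and the global ID.
    static byte[] readMessageData(byte[] payload) {
        byte[] data = new byte[payload.length - HEADER_LENGTH];
        System.arraycopy(payload, HEADER_LENGTH, data, 0, data.length);
        return data;
    }

    public static void main(String[] args) {
        byte[] payload = frame(42L, "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(readGlobalId(payload)); // prints 42
        System.out.println(new String(readMessageData(payload), StandardCharsets.UTF_8)); // prints hello
    }
}
```

A consumer-side deserializer would use the global ID read from the header to fetch the schema, and then decode the remaining bytes with that schema.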

Retrieve schemas using a content ID

Alternatively, you can configure the deserializer to retrieve schemas from Service Registry based on the content ID, which is the unique ID of the artifact content. In contrast, the global ID is the unique ID of an artifact version.

The content ID does not uniquely identify a version; it uniquely identifies the version content only. If multiple versions share the exact same content, they have different global IDs but the same content ID. Confluent Schema Registry uses content ID by default.
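
The difference between the two IDs can be modeled with a small in-memory sketch. This is a toy model under stated assumptions, not how Service Registry stores artifacts: the class name, the sequential ID assignment, and the use of the raw content string as a lookup key are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model: every registered version gets a new global ID,
// while identical content always maps to the same content ID.
class IdModelSketch {
    private final Map<String, Long> contentIdsByContent = new HashMap<>();
    private long nextGlobalId = 1;
    private long nextContentId = 1;

    // Registers a new version and returns {globalId, contentId}.
    long[] registerVersion(String content) {
        long globalId = nextGlobalId++;
        Long contentId = contentIdsByContent.get(content);
        if (contentId == null) {
            // First time this exact content is seen: assign a new content ID.
            contentId = nextContentId++;
            contentIdsByContent.put(content, contentId);
        }
        return new long[] {globalId, contentId};
    }

    public static void main(String[] args) {
        IdModelSketch registry = new IdModelSketch();
        long[] v1 = registry.registerVersion("{\"type\":\"string\"}");
        long[] v2 = registry.registerVersion("{\"type\":\"string\"}");
        System.out.println(v1[0] != v2[0]); // prints true: different global IDs
        System.out.println(v1[1] == v2[1]); // prints true: same content ID
    }
}
```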

7.2. Strategies to look up a schema in Service Registry

The Kafka client serializer uses lookup strategies to determine the artifact ID and global ID under which the message schema is registered in Service Registry. For a given topic and message, you can use different implementations of the ArtifactReferenceResolverStrategy Java interface to return a reference to an artifact in the registry.

The classes for each strategy are in the io.apicurio.registry.serde.strategy package. Specific strategy classes for Avro SerDes are in the io.apicurio.registry.serde.avro.strategy package. The default strategy is the TopicIdStrategy, which looks for Service Registry artifacts with the same name as the Kafka topic receiving messages.

Example

public ArtifactReference artifactReference(String topic, boolean isKey, T schema) {
    return ArtifactReference.builder()
            .groupId(null)
            .artifactId(String.format("%s-%s", topic, isKey ? "key" : "value"))
            .build();
}

  • The topic parameter is the name of the Kafka topic receiving the message.
  • The isKey parameter is true when the message key is serialized, and false when the message value is serialized.
  • The schema parameter is the schema of the message serialized or deserialized.
  • The ArtifactReference returned contains the artifact ID under which the schema is registered.

Which lookup strategy you use depends on how and where you store your schema. For example, you might use a strategy that uses a record ID if you have different Kafka topics with the same Avro message type.

Artifact resolver strategy

The artifact resolver strategy provides a way to map the Kafka topic and message information to an artifact in Service Registry. The common convention for the mapping is to combine the Kafka topic name with the key or value, depending on whether the serializer is used for the Kafka message key or value.

However, you can use alternative conventions for the mapping by using a strategy provided by Service Registry, or by creating a custom Java class that implements io.apicurio.registry.serde.strategy.ArtifactReferenceResolverStrategy.

Strategies to return a reference to an artifact

Service Registry provides the following strategies to return a reference to an artifact based on an implementation of ArtifactReferenceResolverStrategy:

RecordIdStrategy
Avro-specific strategy that uses the full name of the schema.
TopicRecordIdStrategy
Avro-specific strategy that uses the topic name and the full name of the schema.
TopicIdStrategy
Default strategy that uses the topic name and key or value suffix.
SimpleTopicIdStrategy
Simple strategy that only uses the topic name.
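
To compare the naming conventions side by side, the following sketch computes an artifact ID for each strategy using plain strings. It only approximates the descriptions above: the method names are hypothetical, the separator used for the topic and record combination is an assumption, and the real strategy classes might also set a group ID.

```java
// Approximation of the artifact ID produced by each lookup strategy.
// Not the Service Registry classes: names and separators are illustrative.
class ArtifactIdSketch {
    // TopicIdStrategy: topic name plus a key or value suffix.
    static String topicId(String topic, boolean isKey) {
        return topic + "-" + (isKey ? "key" : "value");
    }

    // SimpleTopicIdStrategy: topic name only.
    static String simpleTopicId(String topic) {
        return topic;
    }

    // RecordIdStrategy: full name of the Avro schema.
    static String recordId(String schemaFullName) {
        return schemaFullName;
    }

    // TopicRecordIdStrategy: topic name combined with the full schema name.
    static String topicRecordId(String topic, String schemaFullName) {
        return topic + "-" + schemaFullName;
    }

    public static void main(String[] args) {
        String topic = "prices";
        String fullName = "com.example.price";
        System.out.println(topicId(topic, false));          // prints prices-value
        System.out.println(simpleTopicId(topic));           // prints prices
        System.out.println(recordId(fullName));             // prints com.example.price
        System.out.println(topicRecordId(topic, fullName)); // prints prices-com.example.price
    }
}
```

With a record-based strategy, two topics that carry the same Avro message type resolve to the same artifact, whereas the topic-based strategies produce one artifact per topic.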

DefaultSchemaResolver class

The default schema resolver locates and identifies the specific version of the schema registered under the artifact reference provided by the artifact resolver strategy. Every version of every artifact has a single globally unique identifier that can be used to retrieve the content of that artifact. This global ID is included in every Kafka message so that a deserializer can properly fetch the schema from Service Registry.

The default schema resolver can look up an existing artifact version, or it can register one if not found, depending on which strategy is used. You can also provide your own strategy by creating a custom Java class that implements io.apicurio.registry.resolver.SchemaResolver. However, it is recommended to use the DefaultSchemaResolver and specify configuration properties instead.
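
The look up or register behavior can be pictured with a small in-memory sketch. It is a simplified model, assuming a single boolean flag that plays the role of an auto-register setting; the class, its fields, and the sequential global IDs are hypothetical and do not reflect the DefaultSchemaResolver internals.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of "look up an existing artifact, or register one if not found".
class LookupOrRegisterSketch {
    private final Map<String, Long> globalIdsByArtifactId = new HashMap<>();
    private final boolean autoRegister;
    private long nextGlobalId = 1;

    LookupOrRegisterSketch(boolean autoRegister) {
        this.autoRegister = autoRegister;
    }

    // Returns the global ID for the artifact, registering it first when allowed.
    long resolve(String artifactId) {
        Long existing = globalIdsByArtifactId.get(artifactId);
        if (existing != null) {
            return existing;
        }
        if (!autoRegister) {
            throw new IllegalStateException("No artifact registered under " + artifactId);
        }
        long globalId = nextGlobalId++;
        globalIdsByArtifactId.put(artifactId, globalId);
        return globalId;
    }

    public static void main(String[] args) {
        LookupOrRegisterSketch resolver = new LookupOrRegisterSketch(true);
        long first = resolver.resolve("prices-value");
        long second = resolver.resolve("prices-value");
        System.out.println(first == second); // prints true: the second call finds the existing artifact
    }
}
```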

Configuration for registry lookup options

When using the DefaultSchemaResolver, you can configure its behavior using application properties. The following table shows some commonly used examples:

Table 7.1. Service Registry lookup configuration options

| Property | Type | Description | Default |
|---|---|---|---|
| apicurio.registry.find-latest | boolean | Specify whether the serializer tries to find the latest artifact in the registry for the corresponding group ID and artifact ID. | false |
| apicurio.registry.use-id | String | Instructs the serializer to write the specified ID to Kafka and instructs the deserializer to use this ID to find the schema. | None |
| apicurio.registry.auto-register | boolean | Specify whether the serializer tries to create an artifact in the registry. The JSON Schema serializer does not support this. | false |
| apicurio.registry.check-period-ms | String | Specify how long to cache the global ID in milliseconds. If not configured, the global ID is fetched every time. | None |
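
As a rough illustration, the properties in the table can be set by name on a standard java.util.Properties object before it is passed to the Kafka client. The values shown are examples only, the class and method names are hypothetical, and the sketch assumes that the SerDes accept these settings as strings.

```java
import java.util.Properties;

// Sketch: lookup options from Table 7.1 set by property name.
// The chosen values are illustrative, not recommendations.
class LookupConfigSketch {
    static Properties lookupProperties() {
        Properties props = new Properties();
        // Use the latest artifact version for the group ID and artifact ID.
        props.setProperty("apicurio.registry.find-latest", "true");
        // Do not create the artifact automatically if it is missing.
        props.setProperty("apicurio.registry.auto-register", "false");
        // Cache the global ID for 30 seconds instead of fetching it every time.
        props.setProperty("apicurio.registry.check-period-ms", "30000");
        return props;
    }

    public static void main(String[] args) {
        lookupProperties().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```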

7.3. Registering a schema in Service Registry

After you have defined a schema in the appropriate format, such as Apache Avro, you can add the schema to Service Registry.

You can add the schema using the following approaches:

  • Service Registry web console
  • curl command using the Service Registry REST API
  • Maven plug-in supplied with Service Registry
  • Schema configuration added to your client code

Client applications cannot use Service Registry until you have registered your schemas.

Service Registry web console

When Service Registry is installed, you can connect to the web console from the ui endpoint:

http://MY-REGISTRY-URL/ui

From the console, you can add, view and configure schemas. You can also create the rules that prevent invalid content being added to the registry.

Curl command example

curl -X POST -H "Content-type: application/json; artifactType=AVRO" \
  -H "X-Registry-ArtifactId: share-price" \
  --data '{
    "type":"record",
    "name":"price",
    "namespace":"com.example",
    "fields":[{"name":"symbol","type":"string"},
    {"name":"price","type":"string"}]}' \
  https://my-cluster-my-registry-my-project.example.com/apis/registry/v2/groups/my-group/artifacts -s

The request creates a simple Avro schema artifact with the artifact ID share-price. The host name in the URL is the OpenShift route name that exposes Service Registry.
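
If you prefer to issue the same REST call from Java, the request can be assembled with the JDK HTTP client (Java 11 or later). This is a sketch that mirrors the curl example: the class and method names are hypothetical, the request is only built and printed here, and sending it is left as a comment because it needs a reachable registry.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Builds the same POST request as the curl example, using only the JDK.
class RegisterSchemaRequestSketch {
    static final String SCHEMA =
        "{\"type\":\"record\",\"name\":\"price\",\"namespace\":\"com.example\","
        + "\"fields\":[{\"name\":\"symbol\",\"type\":\"string\"},"
        + "{\"name\":\"price\",\"type\":\"string\"}]}";

    // registryBaseUrl is expected to end with /apis/registry/v2.
    static HttpRequest buildRequest(String registryBaseUrl, String groupId, String artifactId) {
        return HttpRequest.newBuilder()
            .uri(URI.create(registryBaseUrl + "/groups/" + groupId + "/artifacts"))
            .header("Content-Type", "application/json; artifactType=AVRO")
            .header("X-Registry-ArtifactId", artifactId)
            .POST(HttpRequest.BodyPublishers.ofString(SCHEMA))
            .build();
    }

    public static void main(String[] args) {
        HttpRequest request = buildRequest(
            "https://my-cluster-my-registry-my-project.example.com/apis/registry/v2",
            "my-group",
            "share-price");
        System.out.println(request.method() + " " + request.uri());
        // To send the request against a running registry:
        // HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    }
}
```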

Maven plug-in example

<plugin>
  <groupId>io.apicurio</groupId>
  <artifactId>apicurio-registry-maven-plugin</artifactId>
  <version>${apicurio.version}</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>register</goal> <!-- (1) -->
      </goals>
      <configuration>
        <registryUrl>http://REGISTRY-URL/apis/registry/v2</registryUrl> <!-- (2) -->
        <artifacts>
          <artifact>
            <groupId>TestGroup</groupId> <!-- (3) -->
            <artifactId>FullNameRecord</artifactId>
            <file>${project.basedir}/src/main/resources/schemas/record.avsc</file>
            <ifExists>FAIL</ifExists>
          </artifact>
          <artifact>
            <groupId>TestGroup</groupId>
            <artifactId>ExampleAPI</artifactId> <!-- (4) -->
            <type>GRAPHQL</type>
            <file>${project.basedir}/src/main/resources/apis/example.graphql</file>
            <ifExists>RETURN_OR_UPDATE</ifExists>
            <canonicalize>true</canonicalize>
          </artifact>
        </artifacts>
      </configuration>
    </execution>
  </executions>
</plugin>

(1) Specify register as the execution goal to upload the schema artifact to the registry.
(2) Specify the Service Registry URL with the ../apis/registry/v2 endpoint.
(3) Specify the Service Registry artifact group ID.
(4) You can upload multiple artifacts using the specified group ID, artifact ID, and location.

Configuration using a producer client example

String registryUrl_node1 = PropertiesUtil.property(clientProperties, "registry.url.node1",
    "https://my-cluster-service-registry-myproject.example.com/apis/registry/v2"); // (1)
try (RegistryService service = RegistryClient.create(registryUrl_node1)) {
    String artifactId = ApplicationImpl.INPUT_TOPIC + "-value";
    try {
        service.getArtifactMetaData(artifactId); // (2)
    } catch (WebApplicationException e) {
        // The lookup failed (for example, the artifact does not exist),
        // so register the Avro schema under this artifact ID.
        CompletionStage<ArtifactMetaData> csa = service.createArtifact(
            "AVRO",
            artifactId,
            new ByteArrayInputStream(LogInput.SCHEMA$.toString().getBytes())
        );
        csa.toCompletableFuture().get();
    }
}

(1) You can register properties against more than one URL node.
(2) Check to see if the schema already exists based on the artifact ID.

7.4. Using a schema from a Kafka consumer client

This procedure describes how to configure a Kafka consumer client written in Java to use a schema from Service Registry.

Prerequisites

  • Service Registry is installed
  • The schema is registered with Service Registry

Procedure

  1. Configure the client with the URL of Service Registry. For example:

    String registryUrl = "https://registry.example.com/apis/registry/v2";
    Properties props = new Properties();
    props.putIfAbsent(SerdeConfig.REGISTRY_URL, registryUrl);

  2. Configure the client with the Service Registry deserializer. For example:

    // Configure Kafka settings
    props.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
    props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-" + TOPIC_NAME);
    props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // Configure deserializer settings
    props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        AvroKafkaDeserializer.class.getName()); // (1)
    props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        AvroKafkaDeserializer.class.getName()); // (2)

    (1) The deserializer provided by Service Registry.
    (2) The deserialization is in Apache Avro JSON format.

7.5. Using a schema from a Kafka producer client

This procedure describes how to configure a Kafka producer client written in Java to use a schema from Service Registry.

Prerequisites

  • Service Registry is installed
  • The schema is registered with Service Registry

Procedure

  1. Configure the client with the URL of Service Registry. For example:

    String registryUrl = "https://registry.example.com/apis/registry/v2";
    Properties props = new Properties();
    props.putIfAbsent(SerdeConfig.REGISTRY_URL, registryUrl);

  2. Configure the client with the serializer, and the strategy to look up the schema in Service Registry. For example:

    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "my-cluster-kafka-bootstrap:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName()); // (1)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName()); // (2)
    props.put(SerdeConfig.FIND_LATEST_ARTIFACT, Boolean.TRUE); // (3)

    (1) The serializer for the message key provided by Service Registry.
    (2) The serializer for the message value provided by Service Registry.
    (3) The lookup strategy to find the global ID for the schema.

7.6. Using a schema from a Kafka Streams application

This procedure describes how to configure a Kafka Streams client written in Java to use an Apache Avro schema from Service Registry.

Prerequisites

  • Service Registry is installed
  • The schema is registered with Service Registry

Procedure

  1. Create and configure a Java client with the Service Registry URL:

    String registryUrl = "https://registry.example.com/apis/registry/v2";
    
    RegistryService client = RegistryClient.cached(registryUrl);

  2. Configure the serializer and deserializer:

    Serializer<LogInput> serializer = new AvroKafkaSerializer<LogInput>(); // (1)

    Deserializer<LogInput> deserializer = new AvroKafkaDeserializer<LogInput>(); // (2)

    Serde<LogInput> logSerde = Serdes.serdeFrom(
        serializer,
        deserializer
    );

    Map<String, Object> config = new HashMap<>();
    config.put(SerdeConfig.REGISTRY_URL, registryUrl);
    config.put(AvroKafkaSerdeConfig.USE_SPECIFIC_AVRO_READER, true);
    logSerde.configure(config, false); // (3)

    (1) The Avro serializer provided by Service Registry.
    (2) The Avro deserializer provided by Service Registry.
    (3) Configures the Service Registry URL and the Avro reader for deserialization in Avro format.

  3. Create the Kafka Streams client:

    KStream<String, LogInput> input = builder.stream(
        INPUT_TOPIC,
        Consumed.with(Serdes.String(), logSerde)
    );

© 2024 Red Hat, Inc.