Chapter 7. Using Kafka client serializers/deserializers
Service Registry provides Kafka client serializers/deserializers for Kafka producer and consumer applications. Kafka producer applications use serializers to encode messages that conform to a specific event schema. Kafka consumer applications then use deserializers to validate that the messages have been serialized using the correct schema, based on a specific schema ID. This ensures consistent schema use and helps to prevent data errors at runtime.
This chapter provides instructions on how to use the Kafka client serializer and deserializer for Apache Avro in your AMQ Streams producer and consumer client applications:
- Section 7.1, “Kafka client applications and Service Registry”
- Section 7.4, “Strategies to lookup a schema”
- Section 7.5, “Service Registry constants”
- Section 7.6, “Registering a schema to Service Registry”
- Section 7.7, “Using a Service Registry schema from a consumer client”
- Section 7.8, “Using a Service Registry schema from a producer client”
Prerequisites
- You must have read Section 1.6, “Kafka client serializers/deserializers”.
- You must have installed Service Registry. See Chapter 3, Installing Service Registry on OpenShift.
- You must have created AMQ Streams producer and consumer client applications. See Using AMQ Streams on OpenShift.
7.1. Kafka client applications and Service Registry
Using Service Registry decouples the process of managing schemas from the configuration of client applications. You enable an application to use a schema from the registry by specifying the registry URL in the client code.
For example, you can store the schemas used to serialize and deserialize messages in the registry, and then reference them from the applications that use them. This ensures that the messages the applications send and receive are compatible with those schemas.
Kafka client applications can push their schemas to Service Registry, or pull them from Service Registry, at runtime.
Schemas can evolve, so you can define rules in Service Registry, for example, to ensure that changes to a schema are valid and do not break previous versions used by applications. Service Registry checks for compatibility by comparing a modified schema with previous versions of schemas.
Service Registry provides full schema registry support for Avro schemas, which are used by client applications through Kafka client serializer/deserializer (SerDe) services provided by Service Registry.
7.2. Producer schema configuration
A producer client application uses a serializer to put the messages it sends to a specific broker topic into the correct data format.
To enable a producer to use Service Registry for serialization, you:
- Define and register your schema with Service Registry
- Configure the producer client code with the:
  - URL of Service Registry
  - Service Registry serializer services to use with the messages
  - Strategy to look up the schema used for serialization in Service Registry
After registering your schema, when you start Kafka and Service Registry, you can access the schema to format messages sent to the Kafka broker topic by the producer.
If a schema already exists, you can create a new version through the REST API based on compatibility rules defined in Service Registry. Versions are used for compatibility checking as a schema evolves. Together, an artifact ID and a schema version form a unique tuple that identifies a schema.
7.3. Consumer schema configuration
A consumer client application uses a deserializer to get the messages it consumes from a specific broker topic into the correct data format.
To enable a consumer to use Service Registry for deserialization, you:
- Define and register your schema with Service Registry
- Configure the consumer client code with the:
  - URL of Service Registry
  - Service Registry deserializer service to use with the messages
  - Input data stream for deserialization
The schema is then retrieved by the deserializer using a global ID written into the message being consumed. The message received must, therefore, include a global ID as well as the message data.
For example:
# ... [MAGIC_BYTE] [GLOBAL_ID] [MESSAGE DATA]
Now, when you start Kafka and Service Registry, you can access the schema in order to format messages received from the Kafka broker topic.
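The message layout shown above can be sketched in plain Java. This is an illustration only, not the Service Registry implementation: the class name WireFormatSketch, the magic byte value of 0x0, and the 8-byte encoding of the global ID are assumptions made for the example.

```java
import java.nio.ByteBuffer;

// Hypothetical helper for illustration; not part of Service Registry.
final class WireFormatSketch {
    // Assumed marker value for the first byte of every message.
    static final byte MAGIC_BYTE = 0x0;
    // Header is one magic byte followed by the global ID as an 8-byte long (assumed).
    static final int HEADER_LENGTH = 1 + Long.BYTES;

    // Builds [MAGIC_BYTE] [GLOBAL_ID] [MESSAGE DATA].
    static byte[] frame(long globalId, byte[] messageData) {
        ByteBuffer buffer = ByteBuffer.allocate(HEADER_LENGTH + messageData.length);
        buffer.put(MAGIC_BYTE);
        buffer.putLong(globalId);
        buffer.put(messageData);
        return buffer.array();
    }

    // Reads the global ID that a deserializer would use to fetch the schema.
    static long readGlobalId(byte[] message) {
        ByteBuffer buffer = ByteBuffer.wrap(message);
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buffer.getLong();
    }

    // Returns the message data that follows the header.
    static byte[] readMessageData(byte[] message) {
        byte[] data = new byte[message.length - HEADER_LENGTH];
        System.arraycopy(message, HEADER_LENGTH, data, 0, data.length);
        return data;
    }
}
```

A consumer that receives such a message reads the global ID first, retrieves the matching schema, and only then decodes the remaining bytes.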
7.4. Strategies to lookup a schema
A Service Registry strategy is used by the Kafka client serializer/deserializer to determine the artifact ID or global ID under which the message schema is registered in Service Registry.
For a given topic and message, you can use implementations of the following Java classes:
- ArtifactIdStrategy to return an artifact ID
- GlobalIdStrategy to return a global ID
The artifact ID returned depends on whether the key or value in the message is being serialized.
The classes for each strategy are organized in the io.apicurio.registry.utils.serde.strategy package.
The default strategy is TopicIdStrategy, which looks for Service Registry artifacts with the same name as the Kafka topic receiving messages.
For example:
public String artifactId(String topic, boolean isKey, T schema) {
    return String.format("%s-%s", topic, isKey ? "key" : "value");
}
- The topic parameter is the name of the Kafka topic receiving the message.
- The isKey parameter is true when the message key is being serialized, and false when the message value is being serialized.
- The schema parameter is the schema of the message being serialized/deserialized.
- The artifactId returned is the ID under which the schema is registered in Service Registry.
Which lookup strategy you use depends on how and where you store your schema. For example, you might use a strategy that uses a record ID if you have different Kafka topics with the same Avro message type.
Strategies to return an artifact ID
Strategies to return an artifact ID based on an implementation of ArtifactIdStrategy.
RecordIdStrategy
- Avro-specific strategy that uses the full name of the schema.
TopicRecordIdStrategy
- Avro-specific strategy that uses the topic name and the full name of the schema.
TopicIdStrategy
- (Default) strategy that uses the topic name and key or value suffix.
SimpleTopicIdStrategy
- Simple strategy that only uses the topic name.
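To compare the artifact IDs that these strategies produce, the following sketch reimplements each one as a plain function. It is a simplified stand-in, not the Service Registry classes: the class ArtifactIdSketch is hypothetical, the record name is passed as a string rather than taken from an Avro schema object, and the "-" separator used for the topic and record combination is an assumption.

```java
// Hypothetical stand-ins for the strategy classes, for illustration only.
final class ArtifactIdSketch {
    // TopicIdStrategy (default): topic name plus a "key" or "value" suffix.
    static String topicId(String topic, boolean isKey) {
        return String.format("%s-%s", topic, isKey ? "key" : "value");
    }

    // SimpleTopicIdStrategy: topic name only.
    static String simpleTopicId(String topic) {
        return topic;
    }

    // RecordIdStrategy: full name of the schema (namespace plus record name).
    static String recordId(String recordFullName) {
        return recordFullName;
    }

    // TopicRecordIdStrategy: topic name and full schema name.
    // The "-" separator is an assumption made for this sketch.
    static String topicRecordId(String topic, String recordFullName) {
        return topic + "-" + recordFullName;
    }
}
```

For a topic named prices and a message value, the default strategy in this sketch yields prices-value, so a schema registered under that artifact ID is the one that is looked up.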
Strategies to return a global ID
Strategies to return a global ID based on an implementation of GlobalIdStrategy.
FindLatestIdStrategy
- Strategy that returns the global ID of the latest schema version, based on an artifact ID.
FindBySchemaIdStrategy
- Strategy that matches schema content, based on an artifact ID, to return a global ID.
GetOrCreateIdStrategy
- Strategy that tries to get the latest schema, based on an artifact ID, and if it does not exist, it creates a new schema.
AutoRegisterIdStrategy
- Strategy that updates the schema, and uses the global ID of the updated schema.
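The difference between these strategies is easier to see against a toy registry. The sketch below uses a hypothetical in-memory class, GlobalIdSketch, in place of Service Registry; it does not use the real client API, and details such as how the real AutoRegisterIdStrategy treats unchanged content are not modeled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical in-memory stand-in for Service Registry, for illustration only.
final class GlobalIdSketch {
    // One stored schema version: its global ID and its content.
    private static final class Version {
        final long globalId;
        final String content;

        Version(long globalId, String content) {
            this.globalId = globalId;
            this.content = content;
        }
    }

    private final Map<String, List<Version>> artifacts = new HashMap<>();
    private long nextGlobalId = 1;

    // Stores a new version under the artifact ID and returns its global ID.
    long register(String artifactId, String schema) {
        long globalId = nextGlobalId++;
        artifacts.computeIfAbsent(artifactId, id -> new ArrayList<>())
                 .add(new Version(globalId, schema));
        return globalId;
    }

    // FindLatestIdStrategy: global ID of the latest version of the artifact.
    long findLatest(String artifactId) {
        List<Version> versions = artifacts.get(artifactId);
        if (versions == null) {
            throw new NoSuchElementException("No artifact: " + artifactId);
        }
        return versions.get(versions.size() - 1).globalId;
    }

    // FindBySchemaIdStrategy: global ID of the version whose content matches.
    long findBySchema(String artifactId, String schema) {
        for (Version version : artifacts.getOrDefault(artifactId, new ArrayList<>())) {
            if (version.content.equals(schema)) {
                return version.globalId;
            }
        }
        throw new NoSuchElementException("No matching schema for: " + artifactId);
    }

    // GetOrCreateIdStrategy: latest version if the artifact exists, else create it.
    long getOrCreate(String artifactId, String schema) {
        return artifacts.containsKey(artifactId)
                ? findLatest(artifactId)
                : register(artifactId, schema);
    }

    // AutoRegisterIdStrategy: update the schema and use the new global ID.
    long autoRegister(String artifactId, String schema) {
        return register(artifactId, schema);
    }
}
```

In this model, only the get-or-create and auto-register operations write to the registry; the two find operations fail if the schema has not been registered beforehand.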
7.5. Service Registry constants
You can configure specific client SerDe services and schema lookup strategies directly into a client using the constants outlined here.
Alternatively, you can specify the constants in a properties file or a properties instance.
Constants for serializer/deserializer (SerDe) services
public abstract class AbstractKafkaSerDe<T extends AbstractKafkaSerDe<T>> implements AutoCloseable {
    protected final Logger log = LoggerFactory.getLogger(getClass());

    public static final String REGISTRY_URL_CONFIG_PARAM = "apicurio.registry.url"; // (1)
    public static final String REGISTRY_CACHED_CONFIG_PARAM = "apicurio.registry.cached"; // (2)
    public static final String REGISTRY_ID_HANDLER_CONFIG_PARAM = "apicurio.registry.id-handler"; // (3)
    public static final String REGISTRY_CONFLUENT_ID_HANDLER_CONFIG_PARAM = "apicurio.registry.as-confluent"; // (4)
    // ...
}
- 1
- (Required) The URL of Service Registry.
- 2
- Allows the client to make the request and look up the information from a cache of previous results, to improve processing time. If the cache is empty, the lookup is performed from Service Registry.
- 3
- Extends ID handling to support other ID formats and make them compatible with Service Registry SerDe services. For example, changing the ID format from Long to Integer supports the Confluent ID format.
- 4
- A flag to simplify the handling of Confluent IDs. If set to true, an Integer is used for the global ID lookup.
Constants for lookup strategies
public abstract class AbstractKafkaStrategyAwareSerDe<T, S extends AbstractKafkaStrategyAwareSerDe<T, S>> extends AbstractKafkaSerDe<S> {
    // Artifact ID lookup strategy
    public static final String REGISTRY_ARTIFACT_ID_STRATEGY_CONFIG_PARAM = "apicurio.registry.artifact-id";
    // Global ID lookup strategy
    public static final String REGISTRY_GLOBAL_ID_STRATEGY_CONFIG_PARAM = "apicurio.registry.global-id";
    // ...
}
Constants for converters
public class SchemalessConverter<T> extends AbstractKafkaSerDe<SchemalessConverter<T>> implements Converter {
    // Serializer used by the converter
    public static final String REGISTRY_CONVERTER_SERIALIZER_PARAM = "apicurio.registry.converter.serializer";
    // Deserializer used by the converter
    public static final String REGISTRY_CONVERTER_DESERIALIZER_PARAM = "apicurio.registry.converter.deserializer";
    // ...
}
Constants for Avro data providers
public interface AvroDatumProvider<T> {
    // Avro datum provider to use
    String REGISTRY_AVRO_DATUM_PROVIDER_CONFIG_PARAM = "apicurio.registry.avro-datum-provider";
    // Whether to use the specific Avro reader
    String REGISTRY_USE_SPECIFIC_AVRO_READER_CONFIG_PARAM = "apicurio.registry.use-specific-avro-reader";
    // ...
}
DefaultAvroDatumProvider (io.apicurio.registry.utils.serde.avro)
ReflectAvroDatumProvider (io.apicurio.registry.utils.serde.avro)
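As an illustration of the properties-based approach, the following sketch loads the string values behind some of these constants into a java.util.Properties instance. The class name RegistryPropertiesSketch is hypothetical, the properties text is inlined here instead of being read from a file, and the choice of strategies is only an example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper for illustration; not part of Service Registry.
final class RegistryPropertiesSketch {
    // Contents that would normally live in a .properties file.
    static final String PROPERTIES_TEXT = String.join("\n",
        "apicurio.registry.url=https://my-cluster-service-registry-myproject.example.com/api",
        "apicurio.registry.artifact-id=io.apicurio.registry.utils.serde.strategy.TopicIdStrategy",
        "apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.FindLatestIdStrategy");

    // Parses the text into a Properties instance that can be merged into client configuration.
    static Properties load() {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(PROPERTIES_TEXT));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return properties;
    }
}
```

The keys are the literal values of REGISTRY_URL_CONFIG_PARAM, REGISTRY_ARTIFACT_ID_STRATEGY_CONFIG_PARAM, and REGISTRY_GLOBAL_ID_STRATEGY_CONFIG_PARAM, so the same settings can be expressed either way.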
7.6. Registering a schema to Service Registry
After you have defined a schema in the appropriate format, such as Apache Avro, you can add the schema to Service Registry.
You can add the schema through:
- The Service Registry web console
- A curl command using the Service Registry API
- A Maven plugin supplied with Service Registry
- Schema configuration added to your client code
Client applications cannot use Service Registry until you have registered your schemas.
Service Registry web console
Having installed Service Registry, you connect to the web console from the ui endpoint:
http://MY-REGISTRY-URL/ui
From the console, you can add, view and configure schemas. You can also create the rules that prevent invalid content being added to the registry.
For more information on using the Service Registry web console, see Chapter 3, Installing Service Registry on OpenShift.
Curl example
curl -X POST -H "Content-type: application/json; artifactType=AVRO" \
  -H "X-Registry-ArtifactId: prices-value" \
  --data '{
    "type":"record",
    "name":"price",
    "namespace":"com.redhat",
    "fields":[{"name":"symbol","type":"string"},
      {"name":"price","type":"string"}]
  }' \
  https://my-cluster-service-registry-myproject.example.com/api/artifacts -s
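The same request can also be built from Java. The following is a minimal sketch, assuming Java 11 or later and the standard java.net.http API; the class name RegisterSchemaRequestSketch is hypothetical. It only constructs the request, mirroring the headers, body, and endpoint of the curl command.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper for illustration; not part of Service Registry.
final class RegisterSchemaRequestSketch {
    // The Avro schema from the curl example, as a JSON string.
    static final String SCHEMA =
        "{\"type\":\"record\",\"name\":\"price\",\"namespace\":\"com.redhat\","
        + "\"fields\":[{\"name\":\"symbol\",\"type\":\"string\"},"
        + "{\"name\":\"price\",\"type\":\"string\"}]}";

    // Builds a POST request to the artifacts endpoint under the given API URL.
    static HttpRequest build(String registryApiUrl) {
        return HttpRequest.newBuilder(URI.create(registryApiUrl + "/artifacts"))
            .header("Content-Type", "application/json; artifactType=AVRO")
            .header("X-Registry-ArtifactId", "prices-value")
            .POST(HttpRequest.BodyPublishers.ofString(SCHEMA))
            .build();
    }
}
```

To send the request, pass it to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) and inspect the response status and body.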
Plugin example
<plugin>
  <groupId>io.apicurio</groupId>
  <artifactId>apicurio-registry-maven-plugin</artifactId>
  <version>${registry.version}</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>register</goal>
      </goals>
      <configuration>
        <registryUrl>https://my-cluster-service-registry-myproject.example.com/api</registryUrl>
        <artifactType>AVRO</artifactType>
        <artifacts>
          <schema1>${project.basedir}/schemas/schema1.avsc</schema1>
        </artifacts>
      </configuration>
    </execution>
  </executions>
</plugin>
Configuration through a (producer) client example
String registryUrl_node1 = PropertiesUtil.property(clientProperties, "registry.url.node1",
    "https://my-cluster-service-registry-myproject.example.com/api");
try (RegistryService service = RegistryClient.create(registryUrl_node1)) {
    String artifactId = ApplicationImpl.INPUT_TOPIC + "-value";
    try {
        // Check whether the schema is already registered
        service.getArtifactMetaData(artifactId);
    } catch (WebApplicationException e) {
        // The lookup failed (for example, the artifact does not exist), so register the schema
        CompletionStage<ArtifactMetaData> csa = service.createArtifact(
            ArtifactType.AVRO,
            artifactId,
            new ByteArrayInputStream(LogInput.SCHEMA$.toString().getBytes())
        );
        csa.toCompletableFuture().get();
    }
}
7.7. Using a Service Registry schema from a consumer client
This procedure describes how to configure a Java consumer client to use a schema from Service Registry.
Procedure
Configure the client with the URL of Service Registry.
For example:
String registryUrl_node1 = PropertiesUtil.property(clientProperties, "registry.url.node1",
    "https://my-cluster-service-registry-myproject.example.com/api");
RegistryService service = RegistryClient.cached(registryUrl_node1);
Configure the client with the Service Registry deserializer service.
For example:
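// Deserializer that retrieves schemas through the registry service
Deserializer<LogInput> deserializer = new AvroKafkaDeserializer<>(
    service,
    new DefaultAvroDatumProvider<LogInput>().setUseSpecificAvroReader(true)
);
// Serde that pairs the Service Registry serializer with the deserializer
Serde<LogInput> logSerde = Serdes.serdeFrom(
    new AvroKafkaSerializer<>(service),
    deserializer
);
// Input data stream that uses the Serde for message values
KStream<String, LogInput> input = builder.stream(
    INPUT_TOPIC,
    Consumed.with(Serdes.String(), logSerde)
);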
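For a plain Kafka consumer that is configured through properties instead of Kafka Streams, the equivalent settings might look like the following sketch. It rests on several assumptions: the class ConsumerConfigSketch and the group ID are hypothetical, the fully qualified name io.apicurio.registry.utils.serde.AvroKafkaDeserializer is assumed rather than taken from this chapter, and it is assumed that the deserializer reads the registry settings from the consumer properties.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of Service Registry.
final class ConsumerConfigSketch {
    static Properties build(String bootstrapServers, String registryUrl) {
        Properties properties = new Properties();
        // Standard Kafka consumer settings.
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("group.id", "example-consumer-group"); // hypothetical group ID
        properties.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        // Assumed fully qualified name of the Service Registry Avro deserializer.
        properties.put("value.deserializer",
            "io.apicurio.registry.utils.serde.AvroKafkaDeserializer");
        // Service Registry settings, using the constant values listed in Section 7.5.
        properties.put("apicurio.registry.url", registryUrl);
        properties.put("apicurio.registry.use-specific-avro-reader", "true");
        return properties;
    }
}
```

The resulting properties would be passed to the consumer constructor, for example new KafkaConsumer<>(properties), in an application that has the Kafka client and Service Registry SerDe libraries on its classpath.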
7.8. Using a Service Registry schema from a producer client
This procedure describes how to configure a Java producer client to use a schema from Service Registry.
Procedure
Configure the client with the URL of Service Registry.
For example:
String registryUrl_node1 = PropertiesUtil.property(clientProperties, "registry.url.node1",
    "https://my-cluster-service-registry-myproject.example.com/api");
RegistryService service = RegistryClient.cached(registryUrl_node1);
Configure the client with the serializer services, and the strategy to look up the schema in Service Registry.
For example:
String registryUrl_node1 = PropertiesUtil.property(clientProperties, "registry.url.node1",
    "https://my-cluster-service-registry-myproject.example.com/api");

clientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
    PropertiesUtil.property(clientProperties, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
        "my-cluster-kafka-bootstrap:9092"));
clientProperties.put(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, registryUrl_node1); // (1)
clientProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // (2)
clientProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName()); // (3)
clientProperties.put(AbstractKafkaSerializer.REGISTRY_GLOBAL_ID_STRATEGY_CONFIG_PARAM,
    FindLatestIdStrategy.class.getName()); // (4)
- 1
- The Service Registry URL.
- 2
- The serializer service for the message key. In this example, it is the Kafka StringSerializer.
- 3
- The serializer service for the message value provided by Service Registry.
- 4
- Lookup strategy to find the global ID for the schema. Matches the schema of the message against its global ID (artifact ID and schema version) in Service Registry.