Chapter 10. Managing schemas with Service Registry
This chapter outlines how to deploy and integrate AMQ Streams with Red Hat Service Registry. You can use Service Registry as a centralized store of service schemas for data streaming.
Service Registry supports the storage and management of many standard artifact types. For example, for Kafka you can use schema definitions based on Avro or JSON.
Service Registry provides a REST API and a Java REST client to register and query the schemas from client applications through server-side endpoints. You can also use the Service Registry web console to browse and update schemas directly. You can configure producer and consumer clients to use Service Registry.
A Maven plugin is also provided so that you can upload and download schemas as part of your build. The Maven plugin is useful for testing and validation, when checking that your schema updates are compatible with client applications.
Additional resources
- Service Registry documentation
- Service Registry is built on the Apicurio Registry open source community project available from GitHub: Apicurio/apicurio-registry
- A demo of Service Registry is also available from GitHub: Apicurio/apicurio-registry-demo
- Apache Avro
10.1. Why use Service Registry?
Using Service Registry decouples the process of managing schemas from the configuration of client applications. You enable an application to use a schema from the registry by specifying its URL in the client code.
For example, the schemas used to serialize and deserialize messages can be stored in the registry and referenced from the applications that use them. This ensures that the messages those applications send and receive are compatible with the schemas.
Kafka client applications can push or pull their schemas from Service Registry at runtime.
Schemas can evolve, so you can define rules in Service Registry, for example, to ensure that changes to a schema are valid and do not break previous versions used by applications. Service Registry checks for compatibility by comparing a modified schema with previous versions of schemas.
Service Registry provides full schema registry support for Avro schemas, which are used by client applications through Kafka client serializer/deserializer (SerDe) services provided by Service Registry.
10.2. Producer schema configuration
A producer client application uses a serializer to put the messages it sends to a specific broker topic into the correct data format.
To enable a producer to use Service Registry for serialization, you:
- Define and register your schema with Service Registry
- Configure the producer client code with the:
  - URL of Service Registry
  - Service Registry serializer services to use with the messages
  - Strategy to look up the schema used for serialization in Service Registry
 
After registering your schema, when you start Kafka and Service Registry, you can access the schema to format messages sent to the Kafka broker topic by the producer.
If a schema already exists, you can create a new version through the REST API based on compatibility rules defined in Service Registry. Versions are used for compatibility checking as a schema evolves. Together, an artifact ID and a schema version form a unique tuple that identifies a schema.
10.3. Consumer schema configuration
A consumer client application uses a deserializer to get the messages it consumes from a specific broker topic into the correct data format.
To enable a consumer to use Service Registry for deserialization, you:
- Define and register your schema with Service Registry
- Configure the consumer client code with the:
  - URL of Service Registry
  - Service Registry deserializer service to use with the messages
  - Input data stream for deserialization
 
The schema is then retrieved by the deserializer using a global ID written into the message being consumed. The message received must, therefore, include a global ID as well as the message data.
For example:
# ...
[MAGIC_BYTE]
[GLOBAL_ID]
[MESSAGE DATA]
Now, when you start Kafka and Service Registry, you can access the schema in order to format messages received from the Kafka broker topic.
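The message layout described above can be sketched in plain Java. This is a minimal illustration and not the Service Registry SerDe implementation. It assumes a single zero-valued magic byte and an 8-byte big-endian global ID, and the class and method names (WireFormatSketch, encode, readGlobalId, readPayload) are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class WireFormatSketch {
    // Assumption: a single zero byte marks a registry-framed message.
    static final byte MAGIC_BYTE = 0x0;

    /** Frames a payload as [MAGIC_BYTE][GLOBAL_ID][MESSAGE DATA]. */
    static byte[] encode(long globalId, byte[] payload) {
        ByteBuffer buffer = ByteBuffer.allocate(1 + Long.BYTES + payload.length);
        buffer.put(MAGIC_BYTE);
        buffer.putLong(globalId); // assumption: 8-byte ID, big-endian (ByteBuffer default)
        buffer.put(payload);
        return buffer.array();
    }

    /** Reads the global ID that a deserializer would use to fetch the schema. */
    static long readGlobalId(byte[] message) {
        ByteBuffer buffer = ByteBuffer.wrap(message);
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buffer.getLong();
    }

    /** Returns the message data that follows the magic byte and the global ID. */
    static byte[] readPayload(byte[] message) {
        ByteBuffer buffer = ByteBuffer.wrap(message);
        buffer.position(1 + Long.BYTES);
        byte[] payload = new byte[buffer.remaining()];
        buffer.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        byte[] framed = encode(42L, "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println("global ID: " + readGlobalId(framed));
        System.out.println("payload: " + new String(readPayload(framed), StandardCharsets.UTF_8));
    }
}
```

The point of the sketch is that a consumer can recover the global ID before it touches the message data, which is why a message without the ID prefix cannot be matched to a schema.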
10.4. Strategies to look up a schema
A Service Registry strategy is used by the Kafka client serializer/deserializer to determine the artifact ID or global ID under which the message schema is registered in Service Registry.
For a given topic and message, you can use implementations of the following Java classes:
- ArtifactIdStrategy to return an artifact ID
- GlobalIdStrategy to return a global ID
The artifact ID returned depends on whether the key or value in the message is being serialized.
The classes for each strategy are organized in the io.apicurio.registry.utils.serde.strategy package.
The default strategy is TopicIdStrategy, which looks for Service Registry artifacts with the same name as the Kafka topic receiving messages.
For example:
public String artifactId(String topic, boolean isKey, T schema) {
    return String.format("%s-%s", topic, isKey ? "key" : "value");
}
- The topic parameter is the name of the Kafka topic receiving the message.
- The isKey parameter is true when the message key is being serialized, and false when the message value is being serialized.
- The schema parameter is the schema of the message being serialized/deserialized.
- The artifactID returned is the ID under which the schema is registered in Service Registry.
Which lookup strategy you use depends on how and where you store your schema. For example, you might use a strategy that uses a record ID if you have different Kafka topics with the same Avro message type.
Strategies to return an artifact ID
Strategies to return an artifact ID based on an implementation of ArtifactIdStrategy.
- RecordIdStrategy: Avro-specific strategy that uses the full name of the schema.
- TopicRecordIdStrategy: Avro-specific strategy that uses the topic name and the full name of the schema.
- TopicIdStrategy: (Default) strategy that uses the topic name and key or value suffix.
- SimpleTopicIdStrategy: Simple strategy that only uses the topic name.
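To compare the four artifact ID strategies side by side, the following sketch reimplements their naming rules with plain strings. It is an illustration only: the Strategy interface is a hypothetical stand-in for ArtifactIdStrategy, the schema is reduced to its full name, and the "-" separator used for the topic and record name combination is an assumption.

```java
final class ArtifactIdSketch {
    /** Hypothetical stand-in for ArtifactIdStrategy; the schema is reduced to its full name. */
    interface Strategy {
        String artifactId(String topic, boolean isKey, String schemaFullName);
    }

    // Uses only the full name of the schema.
    static final Strategy RECORD_ID = (topic, isKey, name) -> name;

    // Uses the topic name and the full name of the schema (separator is an assumption).
    static final Strategy TOPIC_RECORD_ID = (topic, isKey, name) -> topic + "-" + name;

    // Uses the topic name and a key or value suffix, as in the default strategy.
    static final Strategy TOPIC_ID =
        (topic, isKey, name) -> String.format("%s-%s", topic, isKey ? "key" : "value");

    // Uses only the topic name.
    static final Strategy SIMPLE_TOPIC_ID = (topic, isKey, name) -> topic;

    public static void main(String[] args) {
        String topic = "orders";                 // hypothetical topic name
        String schemaName = "com.example.Order"; // hypothetical Avro full name
        System.out.println(RECORD_ID.artifactId(topic, false, schemaName));
        System.out.println(TOPIC_RECORD_ID.artifactId(topic, false, schemaName));
        System.out.println(TOPIC_ID.artifactId(topic, false, schemaName));
        System.out.println(SIMPLE_TOPIC_ID.artifactId(topic, false, schemaName));
    }
}
```

With a record-based ID, two topics that carry the same Avro type resolve to the same artifact, whereas the topic-based IDs keep one artifact per topic.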
Strategies to return a global ID
Strategies to return a global ID based on an implementation of GlobalIdStrategy.
- FindLatestIdStrategy: Strategy that returns the global ID of the latest schema version, based on an artifact ID.
- FindBySchemaIdStrategy: Strategy that matches schema content, based on an artifact ID, to return a global ID.
- GetOrCreateIdStrategy: Strategy that tries to get the latest schema, based on an artifact ID, and creates a new schema if it does not exist.
- AutoRegisterIdStrategy: Strategy that updates the schema, and uses the global ID of the updated schema.
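The difference between the global ID strategies is easier to see against a toy registry. The sketch below uses an in-memory map in place of Service Registry. It does not call the real client API, the class and method names are hypothetical, and the auto-register behavior is simplified to "always store a new version".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class GlobalIdSketch {
    /** One stored schema version: its registry-wide ID and its content. */
    static final class Version {
        final long globalId;
        final String content;
        Version(long globalId, String content) {
            this.globalId = globalId;
            this.content = content;
        }
    }

    private final Map<String, List<Version>> artifacts = new HashMap<>();
    private long nextGlobalId = 1;

    /** Stores content as a new version of the artifact and returns its new global ID. */
    long register(String artifactId, String content) {
        Version version = new Version(nextGlobalId++, content);
        artifacts.computeIfAbsent(artifactId, key -> new ArrayList<>()).add(version);
        return version.globalId;
    }

    /** Like FindLatestIdStrategy: global ID of the newest version, which must already exist. */
    long findLatest(String artifactId) {
        List<Version> versions = artifacts.get(artifactId);
        if (versions == null) {
            throw new IllegalStateException("No such artifact: " + artifactId);
        }
        return versions.get(versions.size() - 1).globalId;
    }

    /** Like FindBySchemaIdStrategy: global ID of the version whose content matches. */
    long findBySchema(String artifactId, String content) {
        for (Version version : artifacts.getOrDefault(artifactId, List.of())) {
            if (version.content.equals(content)) {
                return version.globalId;
            }
        }
        throw new IllegalStateException("No matching schema for: " + artifactId);
    }

    /** Like GetOrCreateIdStrategy: latest version if the artifact exists, otherwise create it. */
    long getOrCreate(String artifactId, String content) {
        return artifacts.containsKey(artifactId) ? findLatest(artifactId) : register(artifactId, content);
    }

    /** Like AutoRegisterIdStrategy (simplified): store the schema and use the resulting ID. */
    long autoRegister(String artifactId, String content) {
        return register(artifactId, content);
    }

    public static void main(String[] args) {
        GlobalIdSketch registry = new GlobalIdSketch();
        System.out.println(registry.getOrCreate("orders-value", "schema-v1"));
        System.out.println(registry.autoRegister("orders-value", "schema-v2"));
        System.out.println(registry.findLatest("orders-value"));
    }
}
```

The lookup-only strategies fail when nothing is registered, which matches the requirement that schemas are registered before clients use them. The create and auto-register variants trade that safety for convenience.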
10.5. Service Registry constants
You can configure specific client SerDe services and schema lookup strategies directly into a client using the constants outlined here.
Alternatively, you can specify the constants in a properties file, or a properties instance.
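As a rough sketch of the properties-based approach, the following example loads two lookup strategy settings into a java.util.Properties instance. The keys are the values of the lookup strategy constants listed in this section. The choice of TopicIdStrategy and FindLatestIdStrategy, the fully qualified class names built from the strategy package, and the StrategyPropertiesSketch class itself are assumptions for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class StrategyPropertiesSketch {
    // Text as it might appear in a properties file; the values are illustrative choices.
    static final String EXAMPLE = String.join("\n",
        "apicurio.registry.artifact-id=io.apicurio.registry.utils.serde.strategy.TopicIdStrategy",
        "apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.FindLatestIdStrategy");

    /** Parses properties text; a real client would typically read it from a file. */
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = load(EXAMPLE);
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The resulting Properties instance would then be passed to the Kafka client alongside its other configuration.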
Constants for serializer/deserializer (SerDe) services
- 1: (Required) The URL of Service Registry.
- 2: Allows the client to make the request and look up the information from a cache of previous results, to improve processing time. If the cache is empty, the lookup is performed from Service Registry.
- 3: Extends ID handling to support other ID formats and make them compatible with Service Registry SerDe services. For example, changing the ID format from Long to Integer supports the Confluent ID format.
- 4: A flag to simplify the handling of Confluent IDs. If set to true, an Integer is used for the global ID lookup.
Constants for lookup strategies
public abstract class AbstractKafkaStrategyAwareSerDe<T, S extends AbstractKafkaStrategyAwareSerDe<T, S>> extends AbstractKafkaSerDe<S> {
    public static final String REGISTRY_ARTIFACT_ID_STRATEGY_CONFIG_PARAM = "apicurio.registry.artifact-id";
    public static final String REGISTRY_GLOBAL_ID_STRATEGY_CONFIG_PARAM = "apicurio.registry.global-id";
    // ...
}
Constants for converters
public class SchemalessConverter<T> extends AbstractKafkaSerDe<SchemalessConverter<T>> implements Converter {
    public static final String REGISTRY_CONVERTER_SERIALIZER_PARAM = "apicurio.registry.converter.serializer";
    public static final String REGISTRY_CONVERTER_DESERIALIZER_PARAM = "apicurio.registry.converter.deserializer";
    // ...
}
Constants for Avro data providers
public interface AvroDatumProvider<T> {
    String REGISTRY_AVRO_DATUM_PROVIDER_CONFIG_PARAM = "apicurio.registry.avro-datum-provider";
    String REGISTRY_USE_SPECIFIC_AVRO_READER_CONFIG_PARAM = "apicurio.registry.use-specific-avro-reader";
    // ...
}
DefaultAvroDatumProvider (io.apicurio.registry.utils.serde.avro)
ReflectAvroDatumProvider (io.apicurio.registry.utils.serde.avro)
10.6. Installing Service Registry
The instructions to install Service Registry with AMQ Streams storage are described in the Service Registry documentation.
You can install more than one instance of Service Registry depending on your cluster configuration. The number of instances depends on the storage type you use and how many schemas you need to handle.
10.7. Registering a schema to Service Registry
After you have defined a schema in the appropriate format, such as Apache Avro, you can add the schema to Service Registry.
You can add the schema through:
- The Service Registry web console
- A curl command using the Service Registry API
- A Maven plugin supplied with Service Registry
- Schema configuration added to your client code
Client applications cannot use Service Registry until you have registered your schemas.
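For the API route, a registration request can be sketched with the JDK HTTP client. This example only builds the request and does not send it. The /artifacts path and the X-Registry-ArtifactId and X-Registry-ArtifactType headers are assumptions about the registry REST API and should be checked against the Service Registry documentation for your version. The localhost URL, the share-price artifact ID, and the schema text are hypothetical.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class RegisterSchemaSketch {
    /** Builds (but does not send) a POST request that would register an Avro schema. */
    static HttpRequest buildRequest(String registryApiUrl, String artifactId, String avroSchemaJson) {
        return HttpRequest.newBuilder(URI.create(registryApiUrl + "/artifacts")) // assumed path
            .header("Content-Type", "application/json")
            .header("X-Registry-ArtifactId", artifactId) // assumed header name
            .header("X-Registry-ArtifactType", "AVRO")   // assumed header name and value
            .POST(HttpRequest.BodyPublishers.ofString(avroSchemaJson))
            .build();
    }

    public static void main(String[] args) {
        // Hypothetical schema with a single field
        String schema = "{\"type\":\"record\",\"name\":\"price\",\"namespace\":\"com.example\","
            + "\"fields\":[{\"name\":\"symbol\",\"type\":\"string\"}]}";
        HttpRequest request = buildRequest("http://localhost:8080/api", "share-price", schema);
        System.out.println(request.method() + " " + request.uri());
    }
}
```

To actually register the schema, the request would be sent with java.net.http.HttpClient and the response status checked.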
Service Registry web console
Having installed Service Registry, you connect to the web console from the ui endpoint:
http://MY-REGISTRY-URL/ui
From the console, you can add, view and configure schemas. You can also create the rules that prevent invalid content from being added to the registry.
For more information on using the Service Registry web console, see the Service Registry documentation.
Curl example
Plugin example
Configuration through a (producer) client example
This procedure describes how to configure a Java producer client to use a schema from Service Registry.
Procedure
- Configure the client with the URL of Service Registry. For example:
  String registryUrl_node1 = PropertiesUtil.property(clientProperties, "registry.url.node1", "https://my-cluster-service-registry-myproject.example.com/api");
  RegistryService service = RegistryClient.cached(registryUrl_node1);
- Configure the client with the serializer services, and the strategy to look up the schema in Service Registry. The configuration specifies the following:
  - 1: The Service Registry URL.
  - 2: The serializer service for the message key provided by Service Registry.
  - 3: The serializer service for the message value provided by Service Registry.
  - 4: Lookup strategy to find the global ID for the schema. Matches the schema of the message against its global ID (artifact ID and schema version) in Service Registry.
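The four settings described in this step might be assembled as follows. This is a sketch under stated assumptions, not the documented configuration: the apicurio.registry.url key and the AvroKafkaSerializer class name are assumptions, FindBySchemaIdStrategy is chosen because it matches schema content to a global ID, and the bootstrap address is hypothetical. The Kafka keys (bootstrap.servers, key.serializer, value.serializer) are standard producer settings.

```java
import java.util.Properties;

final class ProducerConfigSketch {
    /** Builds producer properties that point the serializers at Service Registry. */
    static Properties producerProperties(String bootstrapServers, String registryUrl) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // 1: Service Registry URL (key name is an assumption)
        props.put("apicurio.registry.url", registryUrl);
        // 2: serializer for the message key (class name is an assumption)
        props.put("key.serializer", "io.apicurio.registry.utils.serde.AvroKafkaSerializer");
        // 3: serializer for the message value (class name is an assumption)
        props.put("value.serializer", "io.apicurio.registry.utils.serde.AvroKafkaSerializer");
        // 4: lookup strategy that resolves the global ID for the schema
        props.put("apicurio.registry.global-id",
            "io.apicurio.registry.utils.serde.strategy.FindBySchemaIdStrategy");
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProperties(
            "my-cluster-kafka-bootstrap:9092", // hypothetical bootstrap address
            "https://my-cluster-service-registry-myproject.example.com/api");
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

In a real client, these properties would be passed to the KafkaProducer constructor.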
 
This procedure describes how to configure a Java consumer client to use a schema from Service Registry.
Procedure
- Configure the client with the URL of Service Registry. For example:
  String registryUrl_node1 = PropertiesUtil.property(clientProperties, "registry.url.node1", "https://my-cluster-service-registry-myproject.example.com/api");
  RegistryService service = RegistryClient.cached(registryUrl_node1);
- Configure the client with the Service Registry deserializer service.
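A consumer configuration along these lines might look like the following sketch. As with the producer side, the apicurio.registry.url key and the AvroKafkaDeserializer class name are assumptions, and the bootstrap address and group ID are hypothetical. The Kafka keys (bootstrap.servers, group.id, key.deserializer, value.deserializer) are standard consumer settings.

```java
import java.util.Properties;

final class ConsumerConfigSketch {
    /** Builds consumer properties that point the deserializers at Service Registry. */
    static Properties consumerProperties(String bootstrapServers, String groupId, String registryUrl) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Service Registry URL (key name is an assumption)
        props.put("apicurio.registry.url", registryUrl);
        // Deserializers for the message key and value (class name is an assumption)
        props.put("key.deserializer", "io.apicurio.registry.utils.serde.AvroKafkaDeserializer");
        props.put("value.deserializer", "io.apicurio.registry.utils.serde.AvroKafkaDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProperties(
            "my-cluster-kafka-bootstrap:9092", // hypothetical bootstrap address
            "my-group",                        // hypothetical consumer group
            "https://my-cluster-service-registry-myproject.example.com/api");
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

No lookup strategy appears here because the deserializer retrieves the schema from the global ID carried in each message.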