第9章 Service Registry を使用したスキーマの管理
本章では、AMQ Streams をデプロイし Red Hat Service Registry と統合する方法について解説します。Service Registry は、データストリーミングのサービススキーマの集中型ストアとして使用できます。Kafka では、Service Registry を使用して Apache Avro または JSON スキーマを格納できます。
Service Registry は、REST API および Java REST クライアントを提供し、サーバー側のエンドポイントを介してクライアントアプリケーションからスキーマを登録およびクエリーします。プロデューサークライアントおよびコンシューマークライアントが Service Registry を使用するように設定できます。
Maven プラグインも提供されるので、ビルドの一部としてスキーマをアップロードおよびダウンロードできます。スキーマの更新がクライアントアプリケーションと互換性があることを確認する場合、Maven プラグインはテストおよび検証に役立ちます。
Service Registry はテクノロジープレビューとしてのみ提供されます。テクノロジープレビューの機能は、Red Hat の本番環境のサービスレベルアグリーメント (SLA) ではサポートされず、機能的に完全ではないことがあります。Red Hat は、本番環境でのテクノロジープレビュー機能の実装は推奨しません。テクノロジープレビューの機能は、最新の技術をいち早く提供して、開発段階で機能のテストやフィードバックの収集を可能にするために提供されます。Red Hat のテクノロジープレビュー機能のサポート範囲に関する詳細は、「テクノロジプレビュー機能のサポート範囲」を参照してください。
その他のリソース
- Service Registry のドキュメント
- Service Registry は、GitHub の Apicurio/Apicurio-registry で利用可能な Apicurio Registry オープンソースコミュニティープロジェクトで構築されます。
- また Service Registry のデモは、GitHub の Apicurio/Apicurio-registry-demo から利用できます。
- Apache Avro
9.1. Service Registry を使用する理由
Service Registry を使用すると、クライアントアプリケーションの設定からスキーマ管理のプロセスが分離されます。クライアントコードに URL を指定して、アプリケーションがレジストリーからスキーマを使用できるようにします。
たとえば、メッセージをシリアライズおよびデシリアライズするスキーマをレジストリーに保存できます。保存後、スキーマを使用するアプリケーションから参照され、アプリケーションが送受信するメッセージがこれらのスキーマと互換性を維持するようにします。
Kafka クライアントアプリケーションは実行時にスキーマを Service Registry からプッシュまたはプルできます。
スキーマは進化するので、Service Registry でルールを定義できます。たとえば、スキーマへの変更が有効で、アプリケーションによって使用される以前のバージョンとの互換性を維持するようにします。Service Registry は、変更済みのスキーマと前バージョンのスキーマを比較することで、互換性をチェックします。
Service Registry は Avro スキーマのスキーマレジストリーを完全にサポートします。Avro スキーマは、Service Registry で提供される Kafka クライアントのシリアライザー/デシリアライザー (SerDe) サービスを通じてクライアントアプリケーションによって使用されます。
9.2. プロデューサースキーマの設定
プロデューサークライアントアプリケーションは、シリアライザーを使用して、特定のブローカートピックに送信するメッセージを正しいデータ形式にします。
プロデューサーが Service Registry を使用してシリアライズできるようにするには、以下を行います。
- スキーマを Service Registry に定義、登録します。
- Service Registry の URL
- メッセージで使用する Service Registry シリアライザーサービス
- Service Registry でのシリアライズに使用するスキーマを検索する ストラテジー
スキーマを登録したら、Kafka および Service Registry を開始するときに、スキーマにアクセスして、プロデューサーにより Kafka ブローカートピックに送信されるメッセージをフォーマットできます。
スキーマがすでに存在する場合、Service Registry に定義される互換性ルールに基づいて、REST API により新バージョンのスキーマを作成できます。バージョンは、スキーマの進化にともなう互換性チェックに使用します。アーティファクト ID およびスキーマバージョンは、スキーマを識別する一意のタプルを表します。
9.3. コンシューマースキーマの設定
コンシューマークライアントアプリケーションは、デシリアライザーを使用することで、そのアプリケーションが消費するメッセージを特定のブローカートピックから正しいデータ形式にします。
コンシューマーがデシリアライズに Service Registry を使用できるようにするには、以下を実行します。
- スキーマを Service Registry に定義、登録します。
以下を使用して コンシューマークライアントコードを設定します。
- Service Registry の URL
- メッセージで使用する Service Registry デシリアライザーサービス
- デシリアライズの入力データストリーム
次に、消費されるメッセージに書き込まれたグローバル ID を使用して、デシリアライザーによってスキーマが取得されます。このため、受信されるメッセージにはグローバル ID およびメッセージデータが含まれる必要があります。
以下に例を示します。
# ... [MAGIC_BYTE] [GLOBAL_ID] [MESSAGE DATA]
これで、Kafka および Service Registry を開始するとき、スキーマにアクセスして、Kafka ブローカートピックから受信するメッセージをフォーマットできます。
9.4. スキーマ検索のストラテジー
Service Registry ストラテジー は、Service Registry でメッセージスキーマが登録されるアーティファクト ID またはグローバル ID を判断するために、Kafka クライアントシリアライザー/デシリアライザーによって使用されます。
特定のトピックおよびメッセージで、以下の Java クラスの実装を使用できます。
-
ArtifactIdStrategy
、アーティファクト ID を返します。 -
GlobalIdStrategy
、グローバル ID を返します。
返されるアーティファクト ID は、メッセージの キー または 値 のどちらがシリアライズされるかによって異なります。
各 ストラテジー のクラスは、io.apicurio.registry.utils.serde.strategy
パッケージにまとめられています。
デフォルトのストラテジー、TopicIdStrategy
は、メッセージを受信する Kafka トピックと同じ名前の Service Registry アーティファクトを検索します。
以下に例を示します。
public String artifactId(String topic, boolean isKey, T schema) { return String.format("%s-%s", topic, isKey ? "key" : "value"); }
-
topic
パラメーターは、メッセージを受信する Kafka トピックの名前です。 -
isKey
パラメーター は、メッセージキーがシリアライズされる場合は true、メッセージ値がシリアライズされる場合は false です。 -
schema
パラメーターは、シリアライズ/デシリアライズされるメッセージのスキーマです。 -
返される
artifactID
は、スキーマが Service Registry に登録される ID です。
使用する検索アップストラテジーは、スキーマを保存する方法と場所によって異なります。たとえば、同じ Avro メッセージタイプを持つ Kafka トピックが複数ある場合、レコード ID を使用するストラテジーを使用することがあります。
アーティファクト ID を返すストラテジー
これらのストラテジーは、ArtifactIdStrategy
の実装に基づいてアーティファクト ID を返します。
RecordIdStrategy
- スキーマのフルネームを使用する Avro 固有のストラテジー。
TopicRecordIdStrategy
- トピック名およびスキーマのフルネームを使用する Avro 固有のストラテジー。
TopicIdStrategy
-
(デフォルト) トピック名と、
key
またはvalue
接尾辞を使用するストラテジー。 SimpleTopicIdStrategy
- トピック名のみを使用する単純なストラテジー。
グローバル ID を返すストラテジー
これらのストラテジーは、GlobalIdStrategy
の実装に基づいてグローバル ID を返します。
FindLatestIdStrategy
- アーティファクト ID に基づいて最新のスキーマバージョンのグローバル ID を返すストラテジー。
FindBySchemaIdStrategy
- アーティファクト ID に基づいてスキーマコンテンツと一致する、グローバル ID を返すストラテジー。
GetOrCreateIdStrategy
- アーティファクト ID に基づいて最新スキーマの取得を試み、スキーマが存在しなければ新規スキーマを作成するストラテジー。
AutoRegisterIdStrategy
- スキーマを更新し、更新されたスキーマのグローバル ID を使用するストラテジー。
9.5. Service Registry の定数
このセクションで概説する定数を使用して、特定のクライアントの SerDe サービスおよびスキーマ検索ストラテジーを直接クライアントに設定できます。
または、プロパティーファイルまたはプロパティーインスタンスで定数を指定することもできます。
シリアライザー/デシリアライザー (SerDe) サービスの定数
public abstract class AbstractKafkaSerDe<T extends AbstractKafkaSerDe<T>> implements AutoCloseable { protected final Logger log = LoggerFactory.getLogger(getClass()); public static final String REGISTRY_URL_CONFIG_PARAM = "apicurio.registry.url"; 1 public static final String REGISTRY_CACHED_CONFIG_PARAM = "apicurio.registry.cached"; 2 public static final String REGISTRY_ID_HANDLER_CONFIG_PARAM = "apicurio.registry.id-handler"; 3 public static final String REGISTRY_CONFLUENT_ID_HANDLER_CONFIG_PARAM = "apicurio.registry.as-confluent"; 4
- 1
- (必須) Service Registry の URL。
- 2
- クライアントがリクエストを実行し、以前の結果のキャッシュから情報を検索して処理時間を短縮できるようにします。キャッシュが空の場合、検索は Service Registry から実行されます。
- 3
- ID 処理を拡張することで、他の ID 形式をサポートし、その形式に Service Registry SerDe サービスとの互換性を持たせます。たとえば、ID 形式を
Long
からInteger
に変更すると Confluent ID 形式がサポートされます。 - 4
- Confluent ID の処理を簡素化するフラグ。
true
に設定すると、Integer
がグローバル ID の検索に使用されます。
検索ストラテジーの定数
public abstract class AbstractKafkaStrategyAwareSerDe<T, S extends AbstractKafkaStrategyAwareSerDe<T, S>> extends AbstractKafkaSerDe<S> { public static final String REGISTRY_ARTIFACT_ID_STRATEGY_CONFIG_PARAM = "apicurio.registry.artifact-id"; 1 public static final String REGISTRY_GLOBAL_ID_STRATEGY_CONFIG_PARAM = "apicurio.registry.global-id"; 2
コンバーターの定数
public class SchemalessConverter<T> extends AbstractKafkaSerDe<SchemalessConverter<T>> implements Converter { public static final String REGISTRY_CONVERTER_SERIALIZER_PARAM = "apicurio.registry.converter.serializer"; 1 public static final String REGISTRY_CONVERTER_DESERIALIZER_PARAM = "apicurio.registry.converter.deserializer"; 2
Avro データプロバイダーの定数
public interface AvroDatumProvider<T> { String REGISTRY_AVRO_DATUM_PROVIDER_CONFIG_PARAM = "apicurio.registry.avro-datum-provider"; 1 String REGISTRY_USE_SPECIFIC_AVRO_READER_CONFIG_PARAM = "apicurio.registry.use-specific-avro-reader"; 2
DefaultAvroDatumProvider (io.apicurio.registry.utils.serde.avro) 1 ReflectAvroDatumProvider (io.apicurio.registry.utils.serde.avro) 2
9.6. Service Registry のインストール
AMQ Streams ストレージで Service Registry をインストールする手順は、Service Registry のドキュメントを参照してください。
クラスターの設定に応じて、複数の Service Registry インスタンスをインストールできます。インスタンス数は、使用するストレージタイプと、処理する必要のあるスキーマの数によって異なります。
9.7. スキーマの Service Registry への登録
スキーマを Apache Avro などの適切な形式で定義したら、スキーマを Service Registry に追加できます。
スキーマは以下を使用して追加できます。
- Service Registry API を使用する curl コマンド
- Service Registry に付属する Maven プラグイン
- クライアントコードに加えられたスキーマ設定
スキーマを登録するまでは、クライアントアプリケーションは Service Registry を使用できません。
curl の例
curl -X POST -H "Content-type: application/json; artifactType=AVRO" \ -H "X-Registry-ArtifactId: prices-value" \ --data '{ 1 "type":"record", "name":"price", "namespace":"com.redhat", "fields":[{"name":"symbol","type":"string"}, {"name":"price","type":"string"}] }' my-cluster-service-registry-myproject.example.com/artifacts -s 2
プラグインの例
<plugin> <groupId>io.apicurio</groupId> <artifactId>apicurio-registry-maven-plugin</artifactId> <version>${registry.version}</version> <executions> <execution> <phase>generate-sources</phase> <goals> <goal>register</goal> </goals> <configuration> <registryUrl>https//my-cluster-service-registry-myproject.example.com</registryUrl> <artifactType>AVRO</artifactType> <artifacts> <schema1>${project.basedir}/schemas/schema1.avsc</schema1> </artifacts> </configuration> </execution> </executions> </plugin>
(プロデューサー) クライアントによる設定例
String registryUrl_node1 = PropertiesUtil.property(clientProperties, "registry.url.node1", 1 "https//my-cluster-service-registry-myproject.example.com"); try (RegistryService service = RegistryClient.create(registryUrl_node1)) { String artifactId = ApplicationImpl.INPUT_TOPIC + "-value"; try { service.getArtifactMetaData(artifactId); 2 } catch (WebApplicationException e) { CompletionStage <ArtifactMetaData> csa = service.createArtifact( ArtifactType.AVRO, artifactId, new ByteArrayInputStream(LogInput.SCHEMA$.toString().getBytes()) ); csa.toCompletableFuture().get(); } }
9.8. プロデューサークライアントからの Service Registry スキーマの使用
この手順では、Service Registry からのスキーマを使用するように Java プロデューサークライアントを設定する方法について説明します。
手順
Service Registry の URL でクライアントを設定します。
以下に例を示します。
String registryUrl_node1 = PropertiesUtil.property(clientProperties, "registry.url.node1", "https//my-cluster-service-registry-myproject.example.com"); RegistryService service = RegistryClient.cached(registryUrl);
クライアントをシリアライザーサービスで設定し、Service Registry でスキーマを検索するようにストラテジーを設定します。
以下に例を示します。
String registryUrl_node1 = PropertiesUtil.property(clientProperties, "registry.url.node1", "https//my-cluster-service-registry-myproject.example.com"); clientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, property(clientProperties, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "my-cluster-kafka-bootstrap:9092")); clientProperties.put(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, registryUrl_node1); 1 clientProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 2 clientProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName()); 3 clientProperties.put(AbstractKafkaSerializer.REGISTRY_GLOBAL_ID_STRATEGY_CONFIG_PARAM, FindLatestIdStrategy.class.getName()); 4
9.9. コンシューマークライアントからの Service Registry スキーマの使用
この手順では、Service Registry からのスキーマを使用するように Java コンシューマークライアントを設定する方法について説明します。
手順
Service Registry の URL でクライアントを設定します。
以下に例を示します。
String registryUrl_node1 = PropertiesUtil.property(clientProperties, "registry.url.node1", "https//my-cluster-service-registry-myproject.example.com"); RegistryService service = RegistryClient.cached(registryUrl);
Service Registry デシリアライザーサービスでクライアントを設定します。
以下に例を示します。
Deserializer<LogInput> deserializer = new AvroKafkaDeserializer <> ( 1 service, new DefaultAvroDatumProvider<LogInput>().setUseSpecificAvroReader(true) ); Serde<LogInput> logSerde = Serdes.serdeFrom( 2 new AvroKafkaSerializer<>(service), deserializer ); KStream<String, LogInput> input = builder.stream( 3 INPUT_TOPIC, Consumed.with(Serdes.String(), logSerde) );