Chapter 1. (Preview) Record Validation filter
The Record Validation filter validates records sent by a producer. Only records that pass the validation are sent to the broker. This filter can be used to prevent poison messages—such as those containing corrupted data or invalid formats—from entering the Kafka system, which may otherwise lead to consumer failure.
The filter currently supports two modes of operation:
- Schema Validation ensures the content of the record conforms to a schema stored in an Apicurio Registry.
- JSON Syntax Validation ensures the content of the record contains syntactically valid JSON.
Validation rules can be applied to check the content of the Kafka record key or value.
If validation fails, the produce request is rejected and the producing application receives an error response. The broker does not receive the rejected records.
This filter is currently in incubation and available as a preview. We do not recommend using it in a production environment.
1.1. (Preview) Setting up the Record Validation filter
This procedure describes how to set up the Record Validation filter. Provide the filter configuration and the rules that the filter uses to check Kafka record keys and values.
Prerequisites
- An instance of Streams for Apache Kafka Proxy. For information on deploying Streams for Apache Kafka Proxy, see the samples and examples.
- A config map for Streams for Apache Kafka Proxy that includes the configuration for creating a virtual cluster.
- Apicurio Registry (required only if you want to use Schema Validation).
Procedure
- Configure a RecordValidation type filter.
rules:
- topicNames:
    - <topic name>
  keyRule:
    <rule definition>
  valueRule:
    <rule definition>
defaultRule:
  keyRule:
    <rule definition>
  valueRule:
    <rule definition>
Replace the token <rule definition> in the YAML configuration with either a Schema Validation rule or a JSON Syntax Validation rule depending on your requirements.
Example Schema Validation Rule Definition
The Schema Validation rule validates that the key or value matches a schema identified by its global ID within an Apicurio Registry.
If the key or value does not adhere to the schema, the record will be rejected.
Additionally, if the Kafka producer has embedded a global ID within the record, the filter validates it against the global ID defined by the rule. If they do not match, the record is rejected. See the Apicurio documentation for details on how the global ID can be embedded into the record. The filter supports extracting IDs from either the Apicurio globalId record header or the initial bytes of the serialized content itself.
schemaValidationConfig:
    apicurioGlobalId: 1001
    apicurioRegistryUrl: http://registry.local:8080
allowNulls: true
allowEmpty: true
Schema validation mode can currently enforce only JSON schemas. This is a known limitation that is tracked as an issue.
Example JSON Syntax Validation Rule Definition
The JSON Syntax Validation rule validates that the key or value contains only syntactically correct JSON.
syntacticallyCorrectJson:
    validateObjectKeysUnique: true
allowNulls: true
allowEmpty: true
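As an illustration of how the pieces fit together, the following sketch substitutes the two example rule definitions into the rules template from the procedure. The topic name orders is hypothetical, the schema ID and registry URL are the example values shown earlier, and the indentation assumes that allowNulls and allowEmpty sit at the same level as the rule type inside each keyRule or valueRule. Treat it as a starting point to adapt, not as a verified configuration.

```yaml
# Sketch only. "orders" is a hypothetical topic name; the global ID and
# registry URL are the example values from the rule definitions above.
rules:
- topicNames:
    - orders
  keyRule:
    # Keys on this topic must be syntactically valid JSON.
    syntacticallyCorrectJson:
        validateObjectKeysUnique: true
    allowNulls: true
    allowEmpty: true
  valueRule:
    # Values on this topic must conform to the registered schema.
    schemaValidationConfig:
        apicurioGlobalId: 1001
        apicurioRegistryUrl: http://registry.local:8080
    allowNulls: true
    allowEmpty: true
# Rules applied to the topics that are not named in the list above.
defaultRule:
  keyRule:
    syntacticallyCorrectJson:
        validateObjectKeysUnique: true
    allowNulls: true
    allowEmpty: true
  valueRule:
    syntacticallyCorrectJson:
        validateObjectKeysUnique: true
    allowNulls: true
    allowEmpty: true
```

In this sketch, only the orders topic is checked against the schema in the registry. Every other topic falls back to the lighter JSON syntax check, which does not require an Apicurio Registry.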