2.2. (预览)记录验证过滤器
Record Validation 过滤器验证由制作者发送的记录。只有通过验证的记录才会发送到代理。此过滤器可用于防止 poison 信息(如包含损坏的数据或无效格式的用户)从输入 Kafka 系统,否则可能会导致消费者失败。
过滤器目前支持两种操作模式:
- 模式 Validation 确保记录的内容符合 Apicurio Registry 中存储的模式。
- JSON 语法验证可确保记录的内容包含语法有效的 JSON。
可以应用验证规则来检查 Kafka 记录键或值的内容。
如果验证失败,产品请求将被拒绝,生成的应用程序会收到错误响应。代理不会接收被拒绝的记录。
这个过滤器目前为 incubation,作为技术预览提供。我们不建议在生产环境中使用它。
2.2.1. (预览)设置记录验证过滤器 复制链接链接已复制到粘贴板!
这个步骤描述了如何设置 Record Validation 过滤器。提供过滤器配置和规则,用于针对 Kafka 记录键和值检查。
先决条件
- Apache Kafka 代理的 Streams 实例。有关为 Apache Kafka 代理部署流的详情,请参考 示例和示例。
- Streams for Apache Kafka Proxy 的配置映射,其中包含用于创建虚拟集群的配置。
- Apicurio Registry (如果希望使用 Schema 验证)。
流程
-
配置
RecordValidation类型过滤器。
filters:
- type: RecordValidation
config:
rules:
- topicNames:
- <topic name>
keyRule:
<rule definition>
valueRule:
<rule definition>
defaultRule:
keyRule:
<rule definition>
valueRule:
<rule definition>
forwardPartialRequests: false
根据您的要求,将 YAML 配置中的 token <rule definition> 替换为 Schema Validation 规则或 JSON 语法验证规则。
架构验证规则定义示例
Schema Validation 规则验证规则验证,键或值是否与 Apicurio Schema Registry 中的全局 ID 标识的模式匹配。
如果 key 或 value 不遵循 schema,则记录将被拒绝。
另外,如果 kafka producer 在记录中嵌入了一个全局 ID,它将针对规则定义的全局 ID 验证。如果不匹配,则记录将被拒绝。如需了解有关如何将全局 ID 嵌入到记录中的详细信息,请参阅 Apicurio 文档。过滤器支持从 Apicurio globalId 记录标头或序列化内容本身的初始字节提取 ID。
schemaValidationConfig:
apicurioGlobalId: 1001
apicurioRegistryUrl: http://registry.local:8080
allowNulls: true
allowEmpty: true
模式验证模式目前只能强制使用 JSON 模式(问题)
JSON 语法验证规则定义示例
JSON 语法验证规则验证键或值仅包含正确的 JSON。
syntacticallyCorrectJson:
validateObjectKeysUnique: true
allowNulls: true
allowEmpty: true