26.11. Kafka Headers propagation
当消耗来自 Kafka 的消息时,标头将自动传播到 camel Exchange 标头。呈现由同一机制支持的流 - 特定交换的 camel 标头将传播到 kafka 消息标头。
由于 kafka 标头仅允许 字节[]
值,因此为了 camel Exchange 标头,则传播其值应被序列化为 bytes[]
,否则将跳过标头。支持以下标头值类型: String
、Integer
、Long
、Duble、布尔值
、byte[]
。注: 所有 从 kafka 传播的标头都默认包含
byte[]
值。要覆盖默认功能 uri 参数,可从
route 和 headerSerializer
为 的路由设置 headerDeserializer
。例如:
from("kafka:my_topic?headerDeserializer=#myDeserializer") ... .to("kafka:my_topic?headerSerializer=#mySerializer")
默认情况下,所有标头都被 KafkaHeaderFilterStrategy
进行过滤。策略过滤以 Camel
或 org.apache.camel
前缀开头的标头。通过使用 route 中 headerFilterStrategy
uri 参数和 from
routes ,
可覆盖默认策略:
from("kafka:my_topic?headerFilterStrategy=#myStrategy") ... .to("kafka:my_topic?headerFilterStrategy=#myStrategy")
myStrategy
对象应该是 HeaderFilterStrategy
的子类,且必须手动放在 Camel registry 中,或者作为 bean 在 Spring/Blueprint 中被注册,因为它是 CamelContext
aware。