191.9. Kafka Headers propagation
从 Camel 2.22 开始提供
当消耗来自 Kafka 的消息时,标头会自动传播到 camel 的交换标头。由同一行为支持的生成流 - 特定交换的 camel 标头将传播到 kafka 消息标头。
因为 kafka 标头只允许 byte[]
值,因此如果 camel exchnage 标头被传播,其值应该被序列化为 bytes[]
,否则会跳过标头。支持以下标头值类型: String
,Integer
,Long
, Double,
Boolean
,byte[]
。注: 所有标头生成的 从 kafka 到 camel 的交换默认都会包括值 byte[]
。要覆盖默认功能 uri 参数,可以为 从
route 和 kafkaHeaderSerializer
为 route 设置 kafkaHeaderDeserializer
参数。例如:
from("kafka:my_topic?kafkaHeaderDeserializer=#myDeserializer") ... .to("kafka:my_topic?kafkaHeaderSerializer=#mySerializer")
默认情况下,所有标头都由 KafkaHeaderFilterStrategy
过滤。策略过滤掉以 Camel
或 org.apache.camel
前缀开头的标头。默认的策略可以通过在 to
和 from
路由中使用 headerFilterStrategy
uri 参数进行覆盖:
from("kafka:my_topic?headerFilterStrategy=#myStrategy") ... .to("kafka:my_topic?headerFilterStrategy=#myStrategy")
myStrategy
对象应该是 HeaderFilterStrategy
的子类,必须手动或注册为 Spring/Blueprint 中的 bean,因为它是 CamelContext
感知。