191.9. Kafka Headers 传播
从 Camel 2.22 开始提供
当消耗 Kafka 的消息时,标头会自动传播到 camel Exchange 标头。由同一行为支持的生成流 - 特定交换的 camel 标头将传播到 kafka 消息标头。
因为 kafka 标头只允许 byte[] 值,以便 camel exchnage 标头传播其值应该被序列化为 bytes[],否则标头会被跳过。支持以下标头值类型: 字符串,Integer,Long, Double, 布尔值,byte[]。注: 所有标头生成的 从 kafka 到 camel 的交换默认都会包括值 byte[]。要覆盖默认功能 uri 参数,可以为 route 和 kafkaHeaderSerializer 设置为 route: kafkaHeaderDeserializer。 Example:
from("kafka:my_topic?kafkaHeaderDeserializer=#myDeserializer")
...
.to("kafka:my_topic?kafkaHeaderSerializer=#mySerializer")
from("kafka:my_topic?kafkaHeaderDeserializer=#myDeserializer")
...
.to("kafka:my_topic?kafkaHeaderSerializer=#mySerializer")
默认情况下,所有标头都由 KafkaHeaderFilterStrategy 过滤。策略过滤出以 Camel 或 org.apache.camel 前缀开头的标头。默认的策略可以通过在 to 和 from 路由中使用 headerFilterStrategy uri 参数进行覆盖:
from("kafka:my_topic?headerFilterStrategy=#myStrategy")
...
.to("kafka:my_topic?headerFilterStrategy=#myStrategy")
from("kafka:my_topic?headerFilterStrategy=#myStrategy")
...
.to("kafka:my_topic?headerFilterStrategy=#myStrategy")
myStrategy 对象应该是 HeaderFilterStrategy 的子类,且必须手动放置到 Camel registry 中,或者通过注册作为 Spring/Blueprint 中的 bean,因为它是 CamelContext aware。