29.11. Kafka Headers propagation
当使用 Kafka 的消息时,标头将自动传播到 camel Exchange 标头。由相同行为支持的生成流 - 特定交换的 camel 标头将传播到 kafka 消息标头。
由于 kafka 标头只允许 byte[] 值,因此,为了传播它的值,应序列化为 bytes[],否则将跳过标头。支持以下标头值类型: String,Integer,Long, Double, 布尔值,byte[]。注: 所有标头 从 kafka 传播到 camel Exchange 将默认包含 byte[] 值。若要覆盖默认功能 uri 参数,可以设置: headerDeserializer for from route 和 headerSerializer 用于路由。例如:
from("kafka:my_topic?headerDeserializer=#myDeserializer")
...
.to("kafka:my_topic?headerSerializer=#mySerializer")
from("kafka:my_topic?headerDeserializer=#myDeserializer")
...
.to("kafka:my_topic?headerSerializer=#mySerializer")
默认情况下,所有标头都会被 KafkaHeaderFilterStrategy 过滤。策略过滤掉以 Camel 或 org.apache.camel 前缀开头的标头。可以使用 中 到 路由中的 headerFilterStrategy uri 参数和路由覆盖默认策略:
from("kafka:my_topic?headerFilterStrategy=#myStrategy")
...
.to("kafka:my_topic?headerFilterStrategy=#myStrategy")
from("kafka:my_topic?headerFilterStrategy=#myStrategy")
...
.to("kafka:my_topic?headerFilterStrategy=#myStrategy")
myStrategy 对象应该是 HeaderFilterStrategy 的子类,且必须放置到 Camel registry 中,也可以作为 Bean 在 Spring/Blueprint 中注册,因为它是 CamelContext aware。