34.11. Kafka ヘッダーの伝播
Kafka からメッセージを消費する場合、ヘッダーは camel exchange ヘッダーに自動的に伝播されます。同じ動作に裏打ちされたフローの生成 - 特定の交換のキャメルヘッダーは、kafka メッセージヘッダーに伝達されます。
kafka ヘッダーは byte[] 値のみを許可するため、camel exchange ヘッダーを伝播するには、その値を bytes[] にシリアル化する必要があります。そうしないと、ヘッダーはスキップされます。次のヘッダー値タイプがサポートされています: String、Integer、Long、Double、Boolean、byte[]。注: kafka から camel exchange に 伝播されるすべてのヘッダーには、デフォルトで byte[] 値が含まれます。デフォルトの機能をオーバーライドするために、URI パラメーターを設定できます。ルート から の headerDeserializer と ルートへの headerSerializer です。以下に例を示します。
from("kafka:my_topic?headerDeserializer=#myDeserializer")
...
.to("kafka:my_topic?headerSerializer=#mySerializer")
from("kafka:my_topic?headerDeserializer=#myDeserializer")
...
.to("kafka:my_topic?headerSerializer=#mySerializer")
デフォルトでは、すべてのヘッダーが KafkaHeaderFilterStrategy によってフィルタリングされています。Strategy は、Camel または org.apache.camel 接頭辞で始まるヘッダーを除外します。デフォルトの戦略は、to ルートと from ルートの両方で headerFilterStrategy uri パラメーターを使用してオーバーライドできます。
from("kafka:my_topic?headerFilterStrategy=#myStrategy")
...
.to("kafka:my_topic?headerFilterStrategy=#myStrategy")
from("kafka:my_topic?headerFilterStrategy=#myStrategy")
...
.to("kafka:my_topic?headerFilterStrategy=#myStrategy")
myStrategy オブジェクトは HeaderFilterStrategy のサブクラスである必要があり、CamelContext に対応しているため、手動で、または Spring/Blueprint で Bean として登録することにより、Camel レジストリーに配置する必要があります。