65.12. Kafka トランザクション
プロデューサーとの Kafka トランザクションを有効にするには、additional-properties に transactional.id、enable.idempotence、retries を追加する必要があります。
from("direct:transaction")
.to("kafka:my_topic?additional-properties[transactional.id]=1234&additional-properties[enable.idempotence]=true&additional-properties[retries]=5");
エクスチェンジルーティングの終了時に、Kafka プロデューサーはトランザクションをコミットするか、例外がスローされた場合、またはエクスチェンジが RollbackOnly の場合はトランザクションを中止します。Kafka はマルチスレッドでのトランザクションをサポートしていないため、トランザクション要求を行うために同じ transaction.id を持つ別のプロデューサーが存在する場合、ProducerFencedException がスローされます。
これは、transacted() を使用して JTA camel-jta と連携して動作します。また、XA をサポートするリソース (SQL または JMS) が関係する場合は、それらが連携して動作し、エクスチェンジルーティングの最後に両方がコミットまたはロールバックします。JTA トランザクションマネージャーがコミットに失敗したが (2PC 処理中)、その前に kafka トランザクションがコミットされており、kafka トランザクションは JTA/XA 仕様をサポートしていないため変更をロールバックできないという場合もあります。データの整合性に関しては依然としてリスクが存在します。