65.12. Kafka Transaction
您需要添加 transactional.id、enable.idempotence 和 retries in additional-properties,以启用带有制作者的 kafka 事务。
from("direct:transaction")
.to("kafka:my_topic?additional-properties[transactional.id]=1234&additional-properties[enable.idempotence]=true&additional-properties[retries]=5");
from("direct:transaction")
.to("kafka:my_topic?additional-properties[transactional.id]=1234&additional-properties[enable.idempotence]=true&additional-properties[retries]=5");
在交换路由结束时,kafka 生成者将在 Exception 抛出时提交事务或中止事务,或者交换是 RollbackOnly。由于 Kafka 不支持多线程中的事务,因此如果有具有相同 transaction.id 的另一个制作者来发出事务请求,它将抛出 ProducerFencedException。
它使用 transacted () 与 JTA camel-jta 一起工作,如果它涉及支持 XA 的一些资源(SQL 或 JMS),则它们将按 tandem 工作,它们都会在交换路由结束时提交或回滚。在某些情况下,如果 JTA 事务管理器无法提交(在 2PC 处理时),但 kafka 事务已在之前提交,且因为 kafka 事务不支持 JTA/XA spec,因此不会回滚更改。数据一致性仍存在风险。