4.5. 在提交偏移时避免数据丢失或重复
Kafka auto-commit 机制 允许使用者自动提交消息偏移。如果启用,使用者将以 5000ms 间隔提交从轮询代理接收的偏移量。
自动提交机制很方便,但会带来数据丢失和重复的风险。如果消费者已获取并转换了很多消息,但如果执行自动提交时,系统会因为消费者缓冲区中处理的消息崩溃,则数据将会丢失。如果在处理消息后系统崩溃,但在执行 auto-commit 前,数据会在重新平衡后在另一使用者实例上重复。
只有在下一次轮询代理或消费者关闭前处理所有消息时,自动提交可以避免数据丢失。
为最大程度降低数据丢失或重复的可能性,您可以将 enable.auto.commit
设置为 false
,并开发客户端应用程序来对提交偏移具有更多的控制。或者,您可以使用 auto.commit.interval.ms
来减少提交之间的间隔。
... ...
# ...
enable.auto.commit=false
# ...
- 1
- 自动提交设置为 false,以提供更多对提交偏移的控制。
通过将 设置为 enable.auto.commit
设置为 false
,您可以在 执行所有 处理后提交偏移,并且消息已被使用。例如,您可以设置应用程序来调用 Kafka commitSync
和 commitAsync
提交 API。
commitSync
API 在从轮询返回的消息批处理中提交偏移量。完成后,您要在处理批处理中的所有消息时调用 API。如果使用 commitSync
API,则应用程序不会轮询新消息,直到批处理中的最后一个偏移提交为止。如果这种负面会影响吞吐量,您可以频繁地提交,也可以使用 commitAsync
API。commitAsync
API 不等待代理响应提交请求,而是在重新平衡时创建更多重复的风险。常见的方法是将这两个提交 API 合并到应用程序中,而 提交Sync
API 仅在关闭消费者或重新平衡之前使用,以确保最终提交成功。
4.5.1. 控制事务信息 复制链接链接已复制到粘贴板!
考虑在生产端使用事务 ID 和启用幂等性(enable.idempotence=true
)来保证交付精确。然后,您可以使用 isolated .level
属性来控制消费者读取事务消息的方式。
isolation.level
属性有两个有效的值:
-
read_committed
-
read_uncommitted
(默认)
使用 read_committed
来确保只有已提交的事务消息被消费者读取。但是,这会导致端到端延迟增加,因为消费者将无法返回消息,直到代理编写记录交易结果(提交 或 中止)的事务标记为止。
... ...
# ...
enable.auto.commit=false
isolation.level=read_committed
# ...
- 1
- 设置
read_committed
,以便只有提交的消息才会被消费者读取。