KafkaConf
KafkaConf
简介
config
1 | public class ReliableProducer { |
关于消费策略:
- 最多一次(at most once): 消息可能丢失也可能被处理,但最多只会被处理一次。可能丢失 不会重复
- 至少一次(at least once): 消息不会丢失,但可能被处理多次。可能重复 不会丢失
- 确传递一次(exactly once): 消息被处理且只会被处理一次。不丢失 不重复 就一次
1、ProducerConfig.ACKS_CONFIG 设置为0时,实现了at most once。
2、ProducerConfig.ACKS_CONFIG 设置为1时 实现了at least once (有一种情况就是消息成功写入,而这个时候由于网络问题producer没有收到写入成功的响应,producer就会开启重试的操作,直到网络恢复,消息就发送了多次)
3、kafka 0.11.0.0 版本引入了 idempotent producer 机制:enable.idempotent 设置为true。
Kafka 0.11.0.0 版本引入了幂等生产者和事务支持。幂等生产者确保同一消息多次发送时只写入一次。
在多分区场景下,事务确保所有消息要么全部成功写入,要么全部回滚,以保持数据完整性。
消费者端可能需要额外处理来确保每条消息只被精确地消费一次,如手动管理偏移量提交。
关键配置解读:
enable.auto.commit
:是否开启自动提交Offset 默认 true (偏移量是在使用者的轮询方法执行期间提交的)auto.commit.interval.ms
:自动提交Offset的时间间隔 默认 5000ms(仅定义提交之间的最小延迟)
仅提交在以前的轮询调用中返回的记录的偏移量。由于处理发生在轮询调用之间,因此永远不会提交未处理记录的偏移量。这保证了at-least-once
至少一次的交付语义。
处理速度慢的问题
轮询方法调用之间允许的最大延迟由 max.poll.interval.ms
配置定义,默认为 5 分钟。
如果使用者未能在该时间间隔内调用轮询方法,则将其视为死,并触发组重新平衡。
对于每个使用者的线程和每个记录需要很长时间才能处理的用例的默认配置,这种情况经常发生。
使用每个使用者线程模型时,可以通过调整以下配置值来解决此问题:
将 max.poll.records
设置为较小的值
将 max.poll.interval.ms
设置为更高的值
在kafka分区无法改动的情况下使用以下两种方案作为参考
1、使用Confluent Parallel Consumer
2、单线程消费,分配给多线程处理,处理完成后由主线程提交offset 多线程消费