Kafka发送消息分为同步(sync)、异步(async)在与spring集成中通过配置文件修改
配置文件放在文末
acks = 0;表示producer不需要等待broker确认收到消息
acks = 1;表示producer至少需要等待leader已经成功写入本地log,但是follower如果没有成功备份同时leader挂掉,就造成消息丢失
acks = all 或者-1;表示需要等待 min.insync.replicas(默认为1,推荐配置大于2,如果配置为2,此时就需要leader和一个follower同步完后,才会返回ack)
Kafka生产者发送消息防止丢失
java提供了一个注解:@PostConstruct用来修饰非静态的void()方法,作用就是:当我们的项目启动时候就会进行预热,首先执行这个方法
使用kafkaTemplate在这个预热方法中使用setProducerListener();方法来为消息生产者创建一个监听器监听消息发送是否成功
@PostConstruct
public void initKafka(){
kafkaTemplate.setProducerListener(new ProducerListener(){
public void onSuccess(String topic, Integer partition, String key, String value, Recordmetadata recordmetadata) {
System.out.println("kafka发送消息成功:topic:" + topic + "key:" + key + "value:" + value);
}
public void onError(String topic, Integer partition, String key, String value, Exception exception) {
// 消息发送失败 可以执行一系列操作....
System.err.println("kafka发送消息失败:topic:" + topic + "key:" + key + "value:" + value);
}
});
}
2.Kafka consumer接收消息
2.1自动提交
假设我们生产者向服务器成功发送7条消息,消费者批量拿取,假设第一次拿取1,2两条消息,当消费者拿到消息之后会周期性的提交offset,也就是说,我拿到消息之后我就反馈给broker:我下次要从3开始,如果宕机,重启后还是从3开始就会造成消息丢失
解决方案: 在kafka producer发送之前,存入redis缓存中,当消费者正常消费时候在删除,同时,在编写一个定时任务,定时获取redis中消费失败的消息重新发送
// 防止消息丢失 (存入redis中)
redisTemplate.opsForHash().put("order",out_trade_no,order);
// kafka 发送消息
kafkaTemplate.send("test","order", JSON.toJSONString(order));
定时任务 cron表达式在线生成
@Scheduled(cron = "0 * * * * ?")
public void reSendMsg(){
List orderList = redisTemplate.opsForHash().values("order");
if(orderList == null){
return;
}
// 重新发送给消费者完成消费
for (Order order : orderList) {
kafkaTemplate.send("hgShop","order", JSON.toJSONString(order));
}
}
2.2手动提交
当我们使用的手动提交时候,我们拿走1,2数据,但是在没有消费成功的情况下不反馈给 broker,比如:我消息1 消费完成,这时候消费者挂掉,重启之后,消费者重新获取消息,还是从2开始,因为消息1消费完成之后反馈给broker,但是消息2丢失了,没有反馈,broker就认为没有消费,这样也就能较好的避免大部分消息丢失。
要注意的是
在消费者端一定是实现 AcknowledgingMessageListener 接口 实现 重写两个参数的方法 然后 手动提交!!!
配置文件 自动提交–消费者自动提交–生产者
手动提交–消费者
手动提交–生产者



