栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Kafka重复消费,消息丢失,消息积压,消息顺序消费解决方案

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Kafka重复消费,消息丢失,消息积压,消息顺序消费解决方案

一  读取 指定路径文件日志 生产信息
  @Test
    public void testReadLog() throws Exception {
        // 读取日志文件
        String path="D:\idea-work\2021-12-28";
        System.out.println("开始读取");
        this.getFile(new File(path));

        // 测试类 读取文件消耗时间
        Thread.sleep(100000);
        System.out.println("结束读取");
    }

    private void getFile(File file) throws Exception {
        // 列出 该目录下所有文件
        File[] files = file.listFiles();
        if (files!=null && files.length>0){
            for (File file1 : files) {
                if (file1.isFile()){
                    // 如果是文件 就读去
                    System.err.println("文件名称=====>" + file1.getName());
                    BufferedReader reader = new BufferedReader(new FileReader(file1));
                    // 唯一值  防止 重复消费
                    UUID uuid = UUID.randomUUID();
                    String str="";
                    while ((str=reader.readLine()) != null){
                        kafkaTemplate.sendDefault("import",uuid+str);
                        // 发送消息
                    }
                } else {
                    // 如果是文件夹 递归 读取
                    this.getFile(file1);
                }
            }
        }
    }
二  kafka 消费端消费消息 使用手动提交解决防止丢失          使用线程队列 顺序
package com.hao.listener;

import com.hao.dao.LogRep;
import com.hao.domain.until.LogInfo;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.support.Acknowledgment;

import java.util.concurrent.linkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;


public class MyAKMessage implements AcknowledgingMessageListener {


    @Autowired
    LogRep logRep;

    @Autowired
    RedisTemplate redisTemplate;

    
    linkedBlockingQueue> queue = new linkedBlockingQueue<>();
    // 公平锁
    
    ReentrantLock lock = new ReentrantLock(true);
    
    @Override
    public void onMessage(ConsumerRecord data, Acknowledgment acknowledgment) {
        
        // 开启一个线程用来 把消息放入队列
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    // 存放信息
                    queue.put(data);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        // 开启一个线程 消费信息
        new Thread(new Runnable() {
            @Override
            public void run() {
                // 取出信息
                ConsumerRecord data = queue.poll();
                if (null!=data){
                    // 消费信息
                    myWork(data,acknowledgment);
                }
            }

        }).start();
    }
    int i=0;
    private void myWork(ConsumerRecord data, Acknowledgment acknowledgment) {
        // 上锁 
        lock.lock();
        // 获取键
        String key = data.key();
        // 获取值
        String value = data.value();
        System.out.println("消费者接收数据+key:"+key+"value:"+value);
        if (key.equals("import")){
            // 构建实体 存入 es
            LogInfo logInfo = new LogInfo();
            logInfo.setId(i++);
            logInfo.setValue(value);
            String id = value.substring(0, 35);
            Boolean b = redisTemplate.opsForValue().setIfAbsent(id, value, 7, TimeUnit.DAYS);
            if (b){
                // 防止 重复小费
                logRep.save(logInfo);
            }
        }
        // 手动提交
        acknowledgment.acknowledge();
        lock.unlock();//解锁
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/684467.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号