栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

SpringBoot利用WebSocket结合任务调度线程池实现多客户端实时监控同一日志文件

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

SpringBoot利用WebSocket结合任务调度线程池实现多客户端实时监控同一日志文件

1、前言

首先是创建一个SpringBoot工程,我相信读这篇文章的你已经会了,所以直接忽略了;

本文的业务逻辑前提如下:

  • 对于某一次构建任务,他会产生一个构建执行日志文件,而这一个日志文件可能不止一个客户端会去实时监控,所以我以构建任务的Id作为不同的客户端去查看同一个日志文件的标识
2、配置类

注入ServerEndpointExpoter,以便后续使用注解开发

@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}
3、会话池

会话池代码没有啥技术含量,不作其他解释,具体可参考代码中注释

public class MonitorSessionPool {

    
    private static final Map> BUILD_RUN_SESSIONS = new ConcurrentHashMap<>();

    
    private static final Map ALL_BUILD_RUN_SESSIONS = new ConcurrentHashMap<>();

    
    public static Long close(String sessionId) {
        List sessions = ALL_BUILD_RUN_SESSIONS.values().stream().filter(session -> session.getId().equals(sessionId)).collect(Collectors.toList());
        if (CollectionUtils.isEmpty(sessions) || sessions.size() != 1) {
            throw new RuntimeException("获取会话信息失败");
        }
        Session session = sessions.get(0);
        if (session == null) {
            throw new RuntimeException("目标Session未找到");
        }
        Set>> buildRunSessionEntrySet = BUILD_RUN_SESSIONS.entrySet();
        for (Map.Entry> entry : buildRunSessionEntrySet) {
            List ids = entry.getValue();
            Iterator idIterator = ids.iterator();
            while (idIterator.hasNext()) {
                String id = idIterator.next();
                if (id.equals(sessionId)) {
                    idIterator.remove();
                    ALL_BUILD_RUN_SESSIONS.remove(sessionId);
                    return entry.getKey();
                }
            }
        }
        return -1L;
    }

    
    public static void sendMessage(Long buildRunId, String log) {
        List sessionIds = BUILD_RUN_SESSIONS.get(buildRunId);
        if (CollectionUtils.isEmpty(sessionIds)) {
            throw new RuntimeException("当前构建任务下方无实时日志监听客户端");
        }
        sessionIds.forEach(sessionId -> {
            Session session = ALL_BUILD_RUN_SESSIONS.get(sessionId);
            RemoteEndpoint.Async asyncRemote = session.getAsyncRemote();
            asyncRemote.sendText(log);
        });
    }

    
    public static void openSession(Session session, Long buildRunId) {
        List sessionIds = BUILD_RUN_SESSIONS.computeIfAbsent(buildRunId, k -> new ArrayList<>());
        sessionIds.add(session.getId());
        ALL_BUILD_RUN_SESSIONS.put(session.getId(), session);
    }

    
    public static Integer getAliveClientTotalNumber(Long buildRunId) {
        List sessionIds = BUILD_RUN_SESSIONS.get(buildRunId);
        if (CollectionUtils.isEmpty(sessionIds)) {
            return 0;
        }
        return sessionIds.size();
    }
}
4、多客户端读取同一日志文件的辅助类

因为不同的客户端监听的日志文件可能不唯一,所以我创建了一个监控日志输出内容相关信息的辅助类,其中:

  • logFilePath:日志文件路径
  • log:已经输出的日志文件内容(如果存在日志内容过大问题,读者可以思考自行解决,我这里强行认为我的内存够用,一般情况下,某一次构建产生的日志文件不会太大)
  • nowPosition:当前已经输出的日志内容坐标
  • nowLine:因为支持按行输出日志,所以保存当前输出的日志内容行数
  • submit:标识一个监控线程异步执行的结果,会用于销毁线程
@Data
@NoArgsConstructor
@AllArgsConstructor
@Component
public final class FileLocationDescriptor {
    private String logFilePath;
    private List log;
    private Long nowPosition;
    private Long nowLine;
    private Future submit;
}
5、WebSocket服务端

这里使用了线程池去实时监控一个日志文件,另外还使用了任务定时调度线程池,让其每隔一段时间去监控日志文件内容变化,还有一点就是文件路径,项目中应该需要按照一定的逻辑动态获取,这里简单起见直接用了一个桌面的txt日志文件,具体逻辑可以参考代码中的注释,我写的挺详细的

@ServerEndpoint(value = "/buildRunLog/{buildRunId}")
@Component
@Slf4j
public class WebSocketEndpoint {

    
    private static final Map FILE_LOCATION_DEscriptORS = new HashMap<>();

    
    private static final ScheduledExecutorService SCHEDULED_EXECUTOR_POOL = Executors.newScheduledThreadPool(5);

    
    @OnOpen
    public void onOpen(final Session session, @PathParam("buildRunId") Long buildRunId) throws IOException, EncodeException {
        //先将当前会话放入会话池
        MonitorSessionPool.openSession(session, buildRunId);
        //获取输出构建日志文件信息辅助类
        FileLocationDescriptor fileLocationDescriptor = FILE_LOCATION_DEscriptORS.get(buildRunId);
        String filePath = "C:\Users\Administrator\Desktop\log.txt";
        
        if (fileLocationDescriptor == null) {
            //初始化输出构建日志文件信息辅助类
            fileLocationDescriptor = new FileLocationDescriptor(filePath, new ArrayList<>(), 0L, 0L, null);
            //放入缓存Map中
            FILE_LOCATION_DEscriptORS.put(buildRunId, fileLocationDescriptor);
            //lambda匿名类表达式中引用外部变量,需要加final
            final FileLocationDescriptor finalFileLocationDescriptor = fileLocationDescriptor;
            //将当前监控线程异步执行的结果获取出来
            Future submit = startMonitoring(buildRunId, filePath, finalFileLocationDescriptor);
            //将监控线程异步执行结果存入信息类中
            fileLocationDescriptor.setSubmit(submit);
        } else {
            
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("log", fileLocationDescriptor.getLog());
            session.getBasicRemote().sendText(jsonObject.toJSONString());
        }
    }

    
    @OnClose
    public void onClose(Session session) {
        //先将当前会话移除会话池
        Long buildRunId = MonitorSessionPool.close(session.getId());
        if (buildRunId == -1) {
            throw new RuntimeException("session未找到");
        }
        //获取当前构建执行Id下方还有多少客户端连接
        Integer aliveClientTotalNumber = MonitorSessionPool.getAliveClientTotalNumber(buildRunId);
        //如果所有客户端都断开连接,则将当前监控线程销毁
        if (aliveClientTotalNumber == 0) {
            //获取到目标构建执行监控线程异步执行的结果
            Future submit = FILE_LOCATION_DEscriptORS.get(buildRunId).getSubmit();
            //直接强制取消,忽略当前是否正在执行
            submit.cancel(true);
            //从缓存中移除
            FILE_LOCATION_DEscriptORS.remove(buildRunId);
        }
    }

    
    private Future startMonitoring(Long buildRunId, String filePath, FileLocationDescriptor finalFileLocationDescriptor) {
        //创建日志文件对象
        File logFile = new File(filePath);
        if (!logFile.exists()) {
            throw new RuntimeException("目标日志文件不存在");
        }
        return SCHEDULED_EXECUTOR_POOL.scheduleAtFixedRate(() -> {
            try {
                //获取当前日志文件行数
                long nowLine = Files.lines(Paths.get(filePath)).count();
                RandomAccessFile randomAccessFile = new RandomAccessFile(logFile, "r");
                //只有当有新内容添加才会开始读取
                Long oldPosition = finalFileLocationDescriptor.getNowPosition();
                if (finalFileLocationDescriptor.getNowLine() < nowLine) {
                    //跳转到未读数据的位置
                    randomAccessFile.seek(oldPosition);
                    //新数据集合
                    List newLog = new ArrayList<>();
                    String lineLog;
                    while ((lineLog = randomAccessFile.readLine()) != null) {
                        //从上一行末尾换行符前一个位置读取下一行,第一行必然是一个换行,所以需要转换
                        if ("".equals(lineLog)) {
                            lineLog = "n";
                        }
                        newLog.add(lineLog);
                    }
                    //获取当前读到的坐标,存入信息类中
                    long nowPosition = randomAccessFile.getFilePointer();
                    finalFileLocationDescriptor.setNowPosition(nowPosition);
                    finalFileLocationDescriptor.setNowLine(nowLine);
                    //将新的日志内容输出到客户端
                    JSONObject jsonObject = new JSONObject();
                    jsonObject.put("newLog", newLog);
                    MonitorSessionPool.sendMessage(buildRunId, jsonObject.toJSONString());
                    finalFileLocationDescriptor.getLog().addAll(newLog);
                } else {
                    //如果行数没有新增,判断一下有没有在末尾新增日志
                    long nowPosition = randomAccessFile.length();
                    //如果有新增
                    if (oldPosition < nowPosition) {
                        //设置当前读取开始位置
                        randomAccessFile.seek(oldPosition);
                        //内容长度为新旧坐标之差
                        int newLogByteLength = Integer.parseInt((nowPosition - oldPosition) + "");
                        //创建对应的读取字节数组
                        byte[] newLogByte = new byte[newLogByteLength];
                        //将剩余内容读取到字节数组中
                        randomAccessFile.read(newLogByte);
                        //转换日志内容为字符串
                        String newLog = new String(newLogByte, StandardCharsets.UTF_8);
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("newLog", newLog);
                        MonitorSessionPool.sendMessage(buildRunId, jsonObject.toJSONString());
                        //发送到客户端
                        finalFileLocationDescriptor.setNowPosition(nowPosition);
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }, 0, 1, TimeUnit.SECONDS);
    }
}
6、效果

这里我没有编写前端页面测试,而是使用了Postman去测试,将就看一下

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/667052.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号