首先是创建一个SpringBoot工程,我相信读这篇文章的你已经会了,所以直接忽略了;
本文的业务逻辑前提如下:
- 对于某一次构建任务,他会产生一个构建执行日志文件,而这一个日志文件可能不止一个客户端会去实时监控,所以我以构建任务的Id作为不同的客户端去查看同一个日志文件的标识
注入ServerEndpointExpoter,以便后续使用注解开发
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
3、会话池
会话池代码没有啥技术含量,不作其他解释,具体可参考代码中注释
public class MonitorSessionPool {
private static final Map> BUILD_RUN_SESSIONS = new ConcurrentHashMap<>();
private static final Map ALL_BUILD_RUN_SESSIONS = new ConcurrentHashMap<>();
public static Long close(String sessionId) {
List sessions = ALL_BUILD_RUN_SESSIONS.values().stream().filter(session -> session.getId().equals(sessionId)).collect(Collectors.toList());
if (CollectionUtils.isEmpty(sessions) || sessions.size() != 1) {
throw new RuntimeException("获取会话信息失败");
}
Session session = sessions.get(0);
if (session == null) {
throw new RuntimeException("目标Session未找到");
}
Set>> buildRunSessionEntrySet = BUILD_RUN_SESSIONS.entrySet();
for (Map.Entry> entry : buildRunSessionEntrySet) {
List ids = entry.getValue();
Iterator idIterator = ids.iterator();
while (idIterator.hasNext()) {
String id = idIterator.next();
if (id.equals(sessionId)) {
idIterator.remove();
ALL_BUILD_RUN_SESSIONS.remove(sessionId);
return entry.getKey();
}
}
}
return -1L;
}
public static void sendMessage(Long buildRunId, String log) {
List sessionIds = BUILD_RUN_SESSIONS.get(buildRunId);
if (CollectionUtils.isEmpty(sessionIds)) {
throw new RuntimeException("当前构建任务下方无实时日志监听客户端");
}
sessionIds.forEach(sessionId -> {
Session session = ALL_BUILD_RUN_SESSIONS.get(sessionId);
RemoteEndpoint.Async asyncRemote = session.getAsyncRemote();
asyncRemote.sendText(log);
});
}
public static void openSession(Session session, Long buildRunId) {
List sessionIds = BUILD_RUN_SESSIONS.computeIfAbsent(buildRunId, k -> new ArrayList<>());
sessionIds.add(session.getId());
ALL_BUILD_RUN_SESSIONS.put(session.getId(), session);
}
public static Integer getAliveClientTotalNumber(Long buildRunId) {
List sessionIds = BUILD_RUN_SESSIONS.get(buildRunId);
if (CollectionUtils.isEmpty(sessionIds)) {
return 0;
}
return sessionIds.size();
}
}
4、多客户端读取同一日志文件的辅助类
因为不同的客户端监听的日志文件可能不唯一,所以我创建了一个监控日志输出内容相关信息的辅助类,其中:
- logFilePath:日志文件路径
- log:已经输出的日志文件内容(如果存在日志内容过大问题,读者可以思考自行解决,我这里强行认为我的内存够用,一般情况下,某一次构建产生的日志文件不会太大)
- nowPosition:当前已经输出的日志内容坐标
- nowLine:因为支持按行输出日志,所以保存当前输出的日志内容行数
- submit:标识一个监控线程异步执行的结果,会用于销毁线程
@Data
@NoArgsConstructor
@AllArgsConstructor
@Component
public final class FileLocationDescriptor {
private String logFilePath;
private List log;
private Long nowPosition;
private Long nowLine;
private Future> submit;
}
5、WebSocket服务端
这里使用了线程池去实时监控一个日志文件,另外还使用了任务定时调度线程池,让其每隔一段时间去监控日志文件内容变化,还有一点就是文件路径,项目中应该需要按照一定的逻辑动态获取,这里简单起见直接用了一个桌面的txt日志文件,具体逻辑可以参考代码中的注释,我写的挺详细的
@ServerEndpoint(value = "/buildRunLog/{buildRunId}")
@Component
@Slf4j
public class WebSocketEndpoint {
private static final Map FILE_LOCATION_DEscriptORS = new HashMap<>();
private static final ScheduledExecutorService SCHEDULED_EXECUTOR_POOL = Executors.newScheduledThreadPool(5);
@OnOpen
public void onOpen(final Session session, @PathParam("buildRunId") Long buildRunId) throws IOException, EncodeException {
//先将当前会话放入会话池
MonitorSessionPool.openSession(session, buildRunId);
//获取输出构建日志文件信息辅助类
FileLocationDescriptor fileLocationDescriptor = FILE_LOCATION_DEscriptORS.get(buildRunId);
String filePath = "C:\Users\Administrator\Desktop\log.txt";
if (fileLocationDescriptor == null) {
//初始化输出构建日志文件信息辅助类
fileLocationDescriptor = new FileLocationDescriptor(filePath, new ArrayList<>(), 0L, 0L, null);
//放入缓存Map中
FILE_LOCATION_DEscriptORS.put(buildRunId, fileLocationDescriptor);
//lambda匿名类表达式中引用外部变量,需要加final
final FileLocationDescriptor finalFileLocationDescriptor = fileLocationDescriptor;
//将当前监控线程异步执行的结果获取出来
Future> submit = startMonitoring(buildRunId, filePath, finalFileLocationDescriptor);
//将监控线程异步执行结果存入信息类中
fileLocationDescriptor.setSubmit(submit);
} else {
JSONObject jsonObject = new JSONObject();
jsonObject.put("log", fileLocationDescriptor.getLog());
session.getBasicRemote().sendText(jsonObject.toJSONString());
}
}
@OnClose
public void onClose(Session session) {
//先将当前会话移除会话池
Long buildRunId = MonitorSessionPool.close(session.getId());
if (buildRunId == -1) {
throw new RuntimeException("session未找到");
}
//获取当前构建执行Id下方还有多少客户端连接
Integer aliveClientTotalNumber = MonitorSessionPool.getAliveClientTotalNumber(buildRunId);
//如果所有客户端都断开连接,则将当前监控线程销毁
if (aliveClientTotalNumber == 0) {
//获取到目标构建执行监控线程异步执行的结果
Future> submit = FILE_LOCATION_DEscriptORS.get(buildRunId).getSubmit();
//直接强制取消,忽略当前是否正在执行
submit.cancel(true);
//从缓存中移除
FILE_LOCATION_DEscriptORS.remove(buildRunId);
}
}
private Future> startMonitoring(Long buildRunId, String filePath, FileLocationDescriptor finalFileLocationDescriptor) {
//创建日志文件对象
File logFile = new File(filePath);
if (!logFile.exists()) {
throw new RuntimeException("目标日志文件不存在");
}
return SCHEDULED_EXECUTOR_POOL.scheduleAtFixedRate(() -> {
try {
//获取当前日志文件行数
long nowLine = Files.lines(Paths.get(filePath)).count();
RandomAccessFile randomAccessFile = new RandomAccessFile(logFile, "r");
//只有当有新内容添加才会开始读取
Long oldPosition = finalFileLocationDescriptor.getNowPosition();
if (finalFileLocationDescriptor.getNowLine() < nowLine) {
//跳转到未读数据的位置
randomAccessFile.seek(oldPosition);
//新数据集合
List newLog = new ArrayList<>();
String lineLog;
while ((lineLog = randomAccessFile.readLine()) != null) {
//从上一行末尾换行符前一个位置读取下一行,第一行必然是一个换行,所以需要转换
if ("".equals(lineLog)) {
lineLog = "n";
}
newLog.add(lineLog);
}
//获取当前读到的坐标,存入信息类中
long nowPosition = randomAccessFile.getFilePointer();
finalFileLocationDescriptor.setNowPosition(nowPosition);
finalFileLocationDescriptor.setNowLine(nowLine);
//将新的日志内容输出到客户端
JSONObject jsonObject = new JSONObject();
jsonObject.put("newLog", newLog);
MonitorSessionPool.sendMessage(buildRunId, jsonObject.toJSONString());
finalFileLocationDescriptor.getLog().addAll(newLog);
} else {
//如果行数没有新增,判断一下有没有在末尾新增日志
long nowPosition = randomAccessFile.length();
//如果有新增
if (oldPosition < nowPosition) {
//设置当前读取开始位置
randomAccessFile.seek(oldPosition);
//内容长度为新旧坐标之差
int newLogByteLength = Integer.parseInt((nowPosition - oldPosition) + "");
//创建对应的读取字节数组
byte[] newLogByte = new byte[newLogByteLength];
//将剩余内容读取到字节数组中
randomAccessFile.read(newLogByte);
//转换日志内容为字符串
String newLog = new String(newLogByte, StandardCharsets.UTF_8);
JSONObject jsonObject = new JSONObject();
jsonObject.put("newLog", newLog);
MonitorSessionPool.sendMessage(buildRunId, jsonObject.toJSONString());
//发送到客户端
finalFileLocationDescriptor.setNowPosition(nowPosition);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}, 0, 1, TimeUnit.SECONDS);
}
}
6、效果
这里我没有编写前端页面测试,而是使用了Postman去测试,将就看一下



