栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

rabbitmq异步消息传递的使用

rabbitmq异步消息传递的使用

2021SC@SDUSC

rabbitmq异步消息传递的使用
  • 配置rabbitmq
  • 建立实体类
  • service
  • receiver

配置rabbitmq

通过配置 RabbitMQ 的 config 以及 RabbitTemplate(template),可以实现更自由的定制,例如实现重发机制等

@Configuration
public class RabbitMqConfig {
    //空字符串是默认交换机
//    static final String SOLVED_EXCHANGE_NAME = "";
    public static final String SOLVED_QUEUE = "rabbitmq-to-reply-queue";
    //空字符串是默认交换机
//    static final String NEED_SOLVE_EXCHANGE_NAME = "";
    public static final String NEED_SOLVE_QUEUE = "rabbitmq-to-handle-queue";
    // 同步消息
    public static final String SYNC_SOLVE_QUEUE = "rabbitmq-sync-judge";

    @Bean
    Queue solvedQueue() {
        return new Queue(SOLVED_QUEUE, true);
    }

    @Bean
    Queue needSolveQueue() {
        return new Queue(NEED_SOLVE_QUEUE, true);
    }

    @Bean
    RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.set/confirm/iCallback((correlationData, ack, cause) -> System.out.println(cause));
        rabbitTemplate.setReturnsCallback(System.out::println);
        return rabbitTemplate;
    }

    @Bean
    AsyncRabbitTemplate asyncRabbitTemplate(RabbitTemplate rabbitTemplate){
        return new AsyncRabbitTemplate(rabbitTemplate);
    }
}

建立实体类
/**
 * Message exchanged with the judge over RabbitMQ; it is serialized to and
 * parsed from JSON by the service and receiver code.
 * NOTE(review): accessors and constructors are not shown in this excerpt —
 * presumably generated or defined elsewhere.
 */
public class JudgeMessage {
    // processing type
    private String procedure;
    // request header
    private Header header;
    // request body
    private RequestBody requestBody;
    // response body
    private ResponseBody responseBody;
    // unused
    private String testData;
}

 // Header metadata carried with a judge message.
 // NOTE(review): declared `static` but shown outside any enclosing class in this excerpt —
 // presumably nested inside JudgeMessage in the real source; confirm.
 public static class Header {
        // consumer
        private String custom;
        // server that initiated the event
        private String server;
        // time the message was sent, as Unix time
        private long date;
        // unique identifier, UUID
        private String id;
}
    // Payload of a judge message: request fields followed by result fields.
    // NOTE(review): List and Map are raw types here; the generic arguments were presumably
    // lost when the snippet was published — confirm against the real source.
    public static class Body {
        // unique identifier of the request
        private String requestID;
        // unique identifier of the problem
        private String problemID;
        private List pointList;
        // code entity
        private Code code;
        // limits applied to the request
        private JudgeLimit judgeLimit;

        // returned result
        private String judgeResult;
        // results produced by the judge side
        private Map pointJudgeResponseMap;
    }
    // Limits attached to a judge request.
    // NOTE(review): units of each limit are not shown in this excerpt — confirm with the judge side.
    public static class JudgeLimit {
        private Integer maxCPUTime;
        private Integer maxRealTime;
        private Integer maxMemory;
        private Integer maxStack;
        private Integer maxProcessNumber;
        private Integer maxOutputSize;
    }
    // Time and memory figures for a run.
    // NOTE(review): units are not shown in this excerpt — confirm with the judge side.
    public static class RunningDetail {
        private int time;
        private int memory;
    }

service

判题采用异步机制:因为多个测试点可能花费较长时间,所以采用异步消息传输机制

/**
 * Stores a submission, registers one row per test point and publishes the
 * judge request to the "rabbitmq-to-handle-queue" queue.
 *
 * @param solve the submission to judge
 * @return the id of the stored submission, or {@code null} when the referenced problem does not exist
 * @throws RuntimeException when the submission row cannot be inserted
 * @throws DataIntegrityViolationException as declared; presumably raised by the mappers — confirm
 */
@Transactional
    public Integer trySolveProblem(Solve solve) throws DataIntegrityViolationException {
        // Return immediately when the referenced problem does not exist.
        if (!problemMapper.exists(solve.getProblemId())) {
            return null;
        }
        if (!solveMapper.insertSolve(solve)) {
            throw new RuntimeException("Failed to insert solve record for problem " + solve.getProblemId());
        }
        // NOTE(review): the limit is looked up by solve.getId(), not solve.getProblemId() — confirm this is intended.
        ProblemLimit problemLimit = problemLimitMapper.getProblemLimitById(solve.getId());
        List<TestPoint> testPoints = testPointMapper.getTestPointByProblemId(solve.getProblemId());
        List<String> pointList = new ArrayList<>();
        for (TestPoint testPoint : testPoints) {
            // Keep only the last '/'-separated segment of the input path. Taking the last
            // segment (instead of index 1) also covers paths without any '/'.
            String[] segments = testPoint.getInPath().split("/");
            String lastName = segments[segments.length - 1];
            pointList.add(lastName);
            solveTestPointMapper.insertSolveTestPoint(solve.getId(), lastName);
        }
        String json = JSON.toJSONString(new JudgeMessage(solve, problemLimit, pointList));
        System.out.println(json);
        rabbitTemplate.convertAndSend("rabbitmq-to-handle-queue", json);
        return solve.getId();
    }
receiver

注册 receiver,实现异步消息的接收

/**
 * Listener on the "rabbitmq-to-reply-queue" queue: parses judge replies and
 * stores the per-test-point results and the overall result of a submission.
 */
@Component
@RabbitListener(queues = "rabbitmq-to-reply-queue")
public class SolveReceive {
    @Autowired
    SolveService solveService;
    @Autowired
    SolveTestPointMapper solveTestPointMapper;

    /**
     * Handles one judge reply.
     *
     * <p>The reply status selects the path: "ClientError" and "InternalError"
     * mark every test point of the submission as failed; any other status is
     * treated as a normal reply whose per-test-point results are stored.
     *
     * <p>Any exception is caught and its stack trace printed, so nothing
     * propagates to the listener container.
     * NOTE(review): because the exception is swallowed inside a
     * {@code @Transactional} method, the transaction is presumably not rolled
     * back on failure — confirm whether that is intended.
     *
     * @param message the JSON text of the judge reply
     */
    @RabbitHandler
    @Transactional
    public void receiveMessage(String message) {
        try {
            JudgeMessage judgeMessage = JSON.parseObject(message, JudgeMessage.class);
            int solveId = Integer.parseInt(judgeMessage.getHeader().getId());
            Solve solve = solveService.adminSelectSolveRecord(solveId);
            // Start optimistic; any failing test point or error status clears the flag.
            solve.setPass(true);

            String status = judgeMessage.getResponseBody().getStatus();
            if ("ClientError".equals(status)) {  // malformed request from the client
                failAllTestPoints(solve, solveId, message, "CLIENT_ERROR");
            } else if ("InternalError".equals(status)) { // error on the judge side
                failAllTestPoints(solve, solveId, message, "INTERNAL_ERROR");
            } else { // normal case
                storeJudgedTestPoints(solve, solveId, judgeMessage);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Marks every stored test point of the submission, and the submission itself, with {@code resultCode}. */
    private void failAllTestPoints(Solve solve, int solveId, String message, String resultCode) {
        List<SolveTestPoint> solveTestPoints = solveTestPointMapper.selectSolvePointResultBySolveId(solveId);
        for (SolveTestPoint solveTestPoint : solveTestPoints) {
            // The raw message is kept as the error detail.
            solveTestPoint.setDetailOfError(message);
            solveTestPoint.setResult(resultCode);
            solveTestPoint.setMemory(-1);
            solveTestPoint.setTime(-1);
            solveTestPointMapper.updateRowBySolveIdAndTestPointId(solveTestPoint);
        }
        solve.setPass(false);
        solve.setResult(resultCode);
        saveOrDelete(solve);
    }

    /** Stores each test-point result from a normal reply and derives the overall result. */
    private void storeJudgedTestPoints(Solve solve, int solveId, JudgeMessage judgeMessage) {
        JudgeMessage.Body body = JSON.parseObject(judgeMessage.getResponseBody().getBody(), JudgeMessage.Body.class);
        boolean partlySolved = false;
        // Walk the per-test-point results and save each one.
        // NOTE(review): Map.Entry is a raw type here; its generic arguments were presumably
        // lost when the snippet was published and the value type is not visible — restore them.
        for (Map.Entry res : body.getPointJudgeResponseMap().entrySet()) {
            SolveTestPoint solveTestPoint = new SolveTestPoint();
            solveTestPoint.setSolveId(solveId);
            solveTestPoint.setTestPointId(res.getKey());
            String result = res.getValue().getJudgeResult();
            solveTestPoint.setResult(result);

            if (result.equals("ACCEPT")) {
                JudgeMessage.RunningDetail detail = res.getValue().getRunningDetails();
                solveTestPoint.setMemory(detail.getMemory());
                solveTestPoint.setTime(detail.getTime());
                partlySolved = true;
            } else {
                solveTestPoint.setMemory(-1);
                solveTestPoint.setTime(-1);
                if (res.getValue().getDetailOfError() != null) {
                    solveTestPoint.setDetailOfError(res.getValue().getDetailOfError().toString());
                }
                solve.setPass(false);
            }
            solveTestPointMapper.updateRowBySolveIdAndTestPointId(solveTestPoint);
        }
        if (solve.isPass()) {
            solve.setResult("ACCEPT");
        } else if (partlySolved) {
            solve.setResult("PARTLY_ACCEPT");
        } else {
            solve.setResult("WRONG_ANSWER");
        }
        // Save the final result.
        saveOrDelete(solve);
    }

    /** Saves the submission status; when the update fails, deletes the record to roll the state back. */
    private void saveOrDelete(Solve solve) {
        if (!solveService.adminUpdateSolveRecordStatus(solve)) {
            solveService.adminDeleteSolveRecord(solve.getId());
        }
    }
}

使用异步消息接收机制,减少了服务器的压力,也更便于业务逻辑的实现

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/450078.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号