栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

腾讯CMQ消息处理

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

腾讯CMQ消息处理

CMQ延迟消息处理方式

针对延迟消息的时间满足不了需求的情况 , 把用户冻结的过期时间进行处理

用户的冻结时间未到期把消息进行重复投递回队列中 , 一直未过期就一直投递 , 直到用户冻结时间一过期把mysql库中的用户状态改为认证通过的状态

取消redis缓存作为判断冻结时间到期的依据, 自定义的消息体中增加到期时间字段 , 这样在消费的时候就知道其冻结的到期时间是多少, 如果当前消费的时间是在过期时间之前那么重复投递消息, 重复投递又有2个情况, 时间间隔超过1小时,按照1小时的过期时间投递, 如果不超过1小时的时间按照时间差进行投递, 如果是到了过期时间或者过期时间之后, 那么修改数据库的用户冻结状态, 发送解冻的短信给用户

实现方式 :
CMQ 参数配置类

package com.tencent.iov.userservice.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Component;

import java.time.Duration;


@Data
@Component
@ConfigurationProperties(prefix = "cmq")
@RefreshScope
public class CmqConfig {

    
    private HttpConf httpConf;


    
    private FrozenDelayQueueSetting frozenDelayQueueSetting;


    @Data
    public static class HttpConf {

        private String secretId;

        private String secretKey;

        private String endpoint;

        private String path;

        private String method;

    }


    @Data
    public static class FrozenDelayQueueSetting {

        private String queueName;

        private int consumerThreadCount;

        private Duration delayTime;
    }


}

基础功能类(基类)

package com.tencent.iov.userservice.cmq;

import com.ruqimobility.common.dtos.baseResponse;
import com.ruqimobility.user.enummeta.UserResultEnum;
import com.tencent.iov.parent.cmq.Account;
import com.tencent.iov.parent.cmq.Queue;
import com.tencent.iov.parent.cmq.Subscription;
import com.tencent.iov.parent.cmq.Topic;
import com.tencent.iov.userservice.config.CmqConfig;
import com.tencent.iov.userservice.util.ResponseUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.List;


@Slf4j
@DependsOn(value = {"cmqConfig"})
@Service
public class baseMqService {

    @Resource
    private CmqConfig cmqConfig;

    public baseResponse delMsgPerReq(String queueRealName, String receiptHandle) {
        Account account = createAccount();
        Queue queue = account.getQueue(queueRealName);
        if (null == queue) {
            return ResponseUtils.fail(UserResultEnum.SERVICE_INNER_ERROR);
        }
        try {
            queue.deleteMessage(receiptHandle);
            return ResponseUtils.success(Boolean.TRUE);
        } catch (Exception e) {
            e.printStackTrace();
            log.error("del msg fail. queue:" + queueRealName + " | receiptHandle:" + receiptHandle + " | e:{}", e);
            return ResponseUtils.fail(UserResultEnum.SERVICE_INNER_ERROR);
        }
    }

    
    public baseResponse batchDelMsgPerReq(String queueRealName, List receiptHandleList) {
        if (CollectionUtils.isEmpty(receiptHandleList)) {
            return ResponseUtils.fail(UserResultEnum.SERVICE_INNER_ERROR);
        }
        Account account = createAccount();
        Queue queue = account.getQueue(queueRealName);
        if (null == queue) {
            return ResponseUtils.fail(UserResultEnum.SERVICE_INNER_ERROR);
        }
        try {
            queue.batchDeleteMessage(receiptHandleList);
            return ResponseUtils.success(Boolean.TRUE);
        } catch (Exception e) {
            e.printStackTrace();
            log.error("del msg fail. queue:" + queueRealName + " | e:{}", e);
            return ResponseUtils.fail(UserResultEnum.SERVICE_INNER_ERROR);
        }
    }

    protected Account createAccount() {
        return new Account(cmqConfig.getHttpConf().getSecretId(),
                cmqConfig.getHttpConf().getSecretKey(),
                cmqConfig.getHttpConf().getEndpoint(),
                cmqConfig.getHttpConf().getPath(),
                cmqConfig.getHttpConf().getMethod());
    }

    public CmqConfig.FrozenDelayQueueSetting getFrozenDelayQueueSetting() {
        return cmqConfig.getFrozenDelayQueueSetting();
    }

    public void createSubscribe(String topicName, String subscriptionName, String Endpoint, String Protocal) throws Exception {
        createAccount().createSubscribe(topicName, subscriptionName, Endpoint, Protocal);
    }

    public Queue getQueue(String queue) {
        return createAccount().getQueue(queue);
    }

    public Topic getTopic(String topicName) {
        return createAccount().getTopic(topicName);
    }

    public Subscription getSubscription(String topicName, String subscriptionName) {
        return createAccount().getSubscription(topicName, subscriptionName);
    }

}

cmq消费类

package com.tencent.iov.userservice.cmq;

import com.alibaba.fastjson.JSON;
import com.ruqimobility.common.dtos.baseResponse;
import com.ruqimobility.user.enummeta.UserResultEnum;
import com.tencent.iov.parent.cmq.Account;
import com.tencent.iov.parent.cmq.CMQServerException;
import com.tencent.iov.parent.cmq.Message;
import com.tencent.iov.parent.cmq.Queue;
import com.tencent.iov.userservice.util.ResponseUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.Collections;
import java.util.List;
import java.util.Optional;


@Slf4j
@Service
public class CmqConsumerService extends baseMqService {

    public Optional consumerMsg(String queueName) {
        Queue queue = super.getQueue(queueName);
        if (null == queue) {
            return Optional.empty();
        }
        Message message = null;
        try {
            message = queue.receiveMessage();
            return Optional.ofNullable(message);
        } catch (CMQServerException e) {
            return Optional.empty();
        } catch (Exception e1) {
            log.error("consume cmq msg fail. queue:" + queueName + " | message:" + JSON.toJSONString(message) + " | e:{}", e1);
            return Optional.empty();
        }
    }

    
    public baseResponse consumerMsgPerReq(String queueRealName) {
        Account account = super.createAccount();
        Queue queue = account.getQueue(queueRealName);
        if (null == queue) {
            return ResponseUtils.fail(UserResultEnum.SERVICE_INNER_ERROR);
        }
        Message message = null;
        try {
            message = queue.receiveMessage();
            return ResponseUtils.success(message);
        } catch (CMQServerException e1) {
            return ResponseUtils.fail(UserResultEnum.SERVICE_INNER_ERROR);
        } catch (Exception e) {
            e.printStackTrace();
            log.error("consume msg fail. queue:" + queueRealName + " | message:" + JSON.toJSONString(message) + " | e:{}", e);
            return ResponseUtils.fail(UserResultEnum.SERVICE_INNER_ERROR);
        }
    }

    
    public baseResponse> batchConsumerMsgPerReq(String queueRealName,
                                                              int numOfBatchMsgPerPolling) {
        if (numOfBatchMsgPerPolling > 16) {
            log.info("error");
        }
        Account account = super.createAccount();
        Queue queue = account.getQueue(queueRealName);
        if (null == queue) {
            return ResponseUtils.fail(UserResultEnum.SERVICE_INNER_ERROR);
        }
        List messageList = Collections.emptyList();
        try {
            messageList = queue.batchReceiveMessage(numOfBatchMsgPerPolling);
        } catch (CMQServerException e1) {
            return ResponseUtils.fail(UserResultEnum.SERVICE_INNER_ERROR);
        } catch (Exception e) {
            log.error("consume msg fail. queue:" + queueRealName + " | message:" + JSON.toJSONString(messageList) + " | e:{}", e);
           return ResponseUtils.fail(UserResultEnum.SERVICE_INNER_ERROR);
        }
        return ResponseUtils.success(messageList);
    }


}

cmq生产者类

package com.tencent.iov.userservice.cmq;

import com.ruqimobility.common.dtos.baseResponse;
import com.ruqimobility.user.enummeta.UserResultEnum;
import com.tencent.iov.parent.cmq.Account;
import com.tencent.iov.parent.cmq.CMQServerException;
import com.tencent.iov.parent.cmq.Queue;
import com.tencent.iov.userservice.util.ResponseUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.List;


@Slf4j
@Service
public class CmqProducerService extends baseMqService {

    public baseResponse sendToQueue(String queueName, String msgBody, int delay) {
        try {
            Queue queue = super.getQueue(queueName);
            if (null == queue) {
                return ResponseUtils.fail(UserResultEnum.SERVICE_INNER_ERROR);
            }
            if (delay > 0) {
                queue.sendMessage(msgBody, delay);
            } else {
                queue.sendMessage(msgBody);
            }
            return ResponseUtils.success(Boolean.TRUE);
        } catch (CMQServerException e1) {
            log.error("CMQServerException. error:{}", e1);
            return ResponseUtils.fail(UserResultEnum.SERVICE_INNER_ERROR);
        } catch (Exception e) {
            e.printStackTrace();
            log.error("send msg fail. queue:" + queueName + " | msgBody:+" + msgBody + " | e:{}", e);
            return ResponseUtils.fail(UserResultEnum.SERVICE_INNER_ERROR);
        }
    }

    
    public baseResponse sendToQueuePerReq(String queueRealName, String msgBody, int delay) {
        try {
            Account account = super.createAccount();
            Queue queue = account.getQueue(queueRealName);
            if (null == queue) {
                return ResponseUtils.fail(UserResultEnum.SERVICE_INNER_ERROR);
            }
            if (delay > 0) {
                queue.sendMessage(msgBody, delay);
            } else {
                queue.sendMessage(msgBody);
            }
            return ResponseUtils.success(Boolean.TRUE);
        } catch (CMQServerException e1) {
            log.error("CMQServerException. error:{}", e1);
            return ResponseUtils.fail(UserResultEnum.SERVICE_INNER_ERROR);
        } catch (Exception e) {
            e.printStackTrace();
            log.error("send msg fail. queue:" + queueRealName + " | msgBody:+" + msgBody + " | e:{}", e);
            return ResponseUtils.fail(UserResultEnum.SERVICE_INNER_ERROR);
        }
    }

    
    public baseResponse batchSendToQueuePerReq(String queueRealName, List msgBody, int delay) {
        try {
            Account account = super.createAccount();
            Queue queue = account.getQueue(queueRealName);
            if (null == queue) {
                return ResponseUtils.fail(UserResultEnum.SERVICE_INNER_ERROR);
            }
            if (delay > 0) {
                queue.batchSendMessage(msgBody, delay);
            } else {
                queue.batchSendMessage(msgBody);
            }
            return ResponseUtils.success(Boolean.TRUE);
        } catch (CMQServerException e1) {
            log.error("CMQServerException. error:{}", e1);
            return ResponseUtils.fail(UserResultEnum.SERVICE_INNER_ERROR);
        } catch (Exception e) {
            e.printStackTrace();
            log.error("send msg fail. queue:" + queueRealName + " | msgBody:+" + msgBody + " | e:{}", e);
            return ResponseUtils.fail(UserResultEnum.SERVICE_INNER_ERROR);
        }
    }

}

冻结消费service

package com.tencent.iov.userservice.cmq;

import com.alibaba.fastjson.JSON;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ruqimobility.common.dtos.baseResponse;
import com.ruqimobility.user.enummeta.SrAuthEnum;
import com.ruqimobility.user.enummeta.UserRoleEnum;
import com.tencent.iov.parent.cmq.Message;
import com.tencent.iov.userservice.config.CmqConfig;
import com.tencent.iov.userservice.dao.UserMapper;
import com.tencent.iov.userservice.dto.message.UserRoleFrozenMsg;
import com.tencent.iov.userservice.model.User;
import com.tencent.iov.userservice.service.UserRoleFrozenService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.io.IOException;
import java.sql.Timestamp;
import java.util.Optional;
import java.util.concurrent.Executor;


@Slf4j
@Component
public class FrozenConsumerService extends CmqConsumerService implements ApplicationRunner {

    private static boolean CONSUMER_RUNNING = true;

    @Resource(name = "taskAsyncPool")
    private Executor taskExecutor;

    @Resource
    private UserRoleFrozenService userRoleFrozenService;

    @Resource
    private FrozenCmqProducerService frozenCmqProducerService;

    @Resource
    private UserMapper userMapper;

    public void startUp() {
        CmqConfig.FrozenDelayQueueSetting setting = super.getFrozenDelayQueueSetting();
        // 队列名称
        String queueName = setting.getQueueName();
        // 消费线程数
        int queueNum = setting.getConsumerThreadCount();

        for (int i = 0; i < queueNum; i++) {
            // 创建任务
            UnFrozenUserTask unFrozenUserTask = new UnFrozenUserTask(queueName, this, userRoleFrozenService);
            // executor执行
            taskExecutor.execute(unFrozenUserTask);
        }
        //注册钩子
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            log.info("Order DispatchOrder The hook running...");
            //第三步:调用停机处理
            stop();
        }));
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("=========  consumer frozen queue message start ========= ");
        startUp();
    }

    @PreDestroy
    public void stop() {
        log.info("stop UnFrozenUser task consumer");
        CONSUMER_RUNNING = false;
    }


    public class UnFrozenUserTask implements Runnable {
        private String queueName;
        private FrozenConsumerService consumerService;
        private UserRoleFrozenService userRoleFrozenService;

        public UnFrozenUserTask(String queueName,
                                FrozenConsumerService consumerService,
                                UserRoleFrozenService userRoleFrozenService) {
            this.queueName = queueName;
            this.consumerService = consumerService;
            this.userRoleFrozenService = userRoleFrozenService;
        }

        
        @Override
        public void run() {
            // 消费解冻逻辑
            while (CONSUMER_RUNNING) {
                // 消费消息
                Optional response = consumerService.consumerMsg(queueName);
                if (!response.isPresent()) {
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        log.error("consumer order status cmq thread sleep ex: {}", e);
                    }
                    // 消息为空continue 重复消费
                    continue;
                }
                //获取消息体
                String msgBody = response.get().msgBody;
                log.info("get the userRoleFrozenMsg message body from cmq: {}", msgBody);
                try {
                    UserRoleFrozenMsg userRoleFrozenMsg = new ObjectMapper().readValue(msgBody, UserRoleFrozenMsg.class);
                    User user = userMapper.getById(userRoleFrozenMsg.getUserId());
                    // 处于冻结状态时,判断是否冻结时间是否已过
                    if (user.getSrAuth() == SrAuthEnum.FROZEN.code) {
                        Timestamp now = new Timestamp(System.currentTimeMillis());
                        Timestamp endTime = userRoleFrozenMsg.getFrozenEndTime();
                        // 在结束时间之前继续投递
                        if (now.before(endTime)) {
                            long interval = (endTime.getTime() - now.getTime()) / 1000;
                            // 间隔超过1小时时 , 按1小时投递延迟消息
                            if (interval > FrozenCmqProducerService.MAX_DELAY_SECOND) {
                                baseResponse resp = frozenCmqProducerService.sendMessage(JSON.toJSONString(userRoleFrozenMsg));
                                log.info("Deliver messages more than 1 hour interval");
                            } else {
                                // 1小时内 , 按照时间差投递
                                baseResponse baseResponse = frozenCmqProducerService.sendMessage(JSON.toJSONString(userRoleFrozenMsg), (int) interval);
                                log.info("Deliver duplicate delivery messages within 1 hour");
                            }
                        } else if (now.after(endTime)) {
                            // 在结束时间之后, 修改用户实名认证状态
                            userRoleFrozenService.autoUnFrozenUserRole(user.getUserId(), UserRoleEnum.SFC_USER);
                        }
                    }
                    // 删除此次消费的消息
                    baseResponse delMsgPerReq = consumerService.delMsgPerReq(queueName, response.get().receiptHandle);
                    log.info("sr_auth equals 1 message delete userRoleFrozenMsg from cmq msg: {} response: {}", msgBody, delMsgPerReq);
                } catch (IOException e) {
                    log.error("userRoleFrozenMsg message:{} body parsing error:{}", msgBody, e);
                }
            }
        }
    }


}


冻结生产service

package com.tencent.iov.userservice.cmq;

import com.ruqimobility.common.dtos.baseResponse;
import com.tencent.iov.userservice.config.CmqConfig;
import com.tencent.iov.userservice.util.ResponseUtils;
import org.springframework.stereotype.Service;

import java.util.List;


@Service
public class FrozenCmqProducerService extends CmqProducerService {

    
    public static Integer MAX_DELAY_SECOND = 3600;


    
    public baseResponse sendMessage(String msgBody) {
        CmqConfig.FrozenDelayQueueSetting setting = super.getFrozenDelayQueueSetting();
        Long seconds = setting.getDelayTime().getSeconds();
        return super.sendToQueue(setting.getQueueName(), msgBody, seconds.intValue());
    }

    
    public baseResponse batchSendMessage(List msgBodyList) {
        CmqConfig.FrozenDelayQueueSetting setting = super.getFrozenDelayQueueSetting();
        Long seconds = setting.getDelayTime().getSeconds();
        return super.batchSendToQueuePerReq(setting.getQueueName(), msgBodyList, seconds.intValue());
    }

    
    public baseResponse sendMessage(String msgBody, int delay) {
        CmqConfig.FrozenDelayQueueSetting setting = super.getFrozenDelayQueueSetting();
        if (delay > MAX_DELAY_SECOND) {
            return ResponseUtils.success(Boolean.FALSE);
        }
        return super.sendToQueue(setting.getQueueName(), msgBody, delay);
    }

    
    public baseResponse batchSendMessage(List msgBodyList, int delay) {
        CmqConfig.FrozenDelayQueueSetting setting = super.getFrozenDelayQueueSetting();
        if (delay > MAX_DELAY_SECOND) {
            return ResponseUtils.success(Boolean.FALSE);
        }
        return super.batchSendToQueuePerReq(setting.getQueueName(), msgBodyList, delay);
    }
}

踩坑记录
会有重复消费的问题, cmq的消息体消费之后不会被删除,需要使用者自己删除, 所以每次拉取的消费信息必须进行删除

// 删除此次消费的消息
                    baseResponse delMsgPerReq = consumerService.delMsgPerReq(queueName, response.get().receiptHandle);
                    log.info("sr_auth equals 1 message delete userRoleFrozenMsg from cmq msg: {} response: {}", msgBody, delMsgPerReq);

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/305770.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号