栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ实现埋点录入

RabbitMQ实现埋点录入

1-导入依赖
 

    org.springframework.boot
    spring-boot-starter-amqp


    com.ruoyi
    ruoyi-system
2-配置RabbitMQ
spring:
     # rabbitmq配置
  rabbitmq:
    host: 127.0.0.1 
    port: 5672
    username: guest //默认
    password: guest //默认
    listener:
      type: direct
      direct:
        prefetch: 10 # 每次拉取的数量
        acknowledge-mode: manual # 手动ack模式
3-windows本地启动RabbitMQ安装方式


安装步骤:在这里

安装包:

 链接:https://pan.baidu.com/s/17U0kxqXGbl5tvLl6QBuH-Q 提取码:kzl0

注意:

  • RabbitMQ和erLang的版本关系要对应好  详情
  • RabbitMQ不要安装在空格多的路径下 我安装在D:Program FilesRabbitMQ Server
4-RabbitMQ工具类编写
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    
    public static final String RABBITMQ_TOPIC="rabbitmq.topic";


    @Bean
    public Queue HkQueue() {
        return new Queue(RABBITMQ_TOPIC , true);
    }
}
5-RabbitMQService编写
@Service
public class RabbitMQServiceImpl implements IRabbitMQService {

    @Resource
    private RabbitTemplate rabbitTemplate;

    


    @Override
    public String sendMsg(String msg, object obj) throws Exception {
        try {
              //调用了自己的写的工具类,
            String msgId = IdUtils.fastUUID().replace("-", "").substring(0, 32);
            String sendTime = DateUtils.getTime();
            Map map = new HashMap();
            map.put("msgId", msgId);
            map.put("sendTime", sendTime);
            map.put("msg", msg);
            map.put("obj",obj);
            System.out.println("map"+map);
            
            rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_TOPIC, map);
            return "ok";
        } catch (Exception e) {
            e.printStackTrace();
            return "error";
        }
    }
}
6-Controller编写
@PostMapping("/insert")
public AjaxResult insert( ){
    try {
        //调用自己的方法
        byte[] userId=Md5Utils.md5(IpUtils.getHostIp());
        String id="";
        for(byte b:userId){
            id+=b;
        }
       
        String result = rabbitMQService.sendMsg("埋点录入",object);//object  传入的对象
        if(result.equals("ok")){
            return AjaxResult.success(result);
        }else{
            return AjaxResult.error("埋点录入rabbitMQ处理异常!");
        }
    } catch (Exception e) {
        e.printStackTrace();
        return AjaxResult.error("rabbitMQ处理异常!");
    }
}
7-消费者类编写
    @RabbitHandler
    @RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.RABBITMQ_HK_TOPIC))
    public void process(Map msg, Message message, Channel channel) {
        long tag = message.getMessageProperties().getDeliveryTag();
        Action action = Action.SUCCESS;

        try {
            //Object 要转换的对象
            int result = service.insert((Object) msg.get("obj"));
            if(result>0){
                System.out.println("消费者RabbitMqConsumer从RabbitMQ服务端消费消息:" + msg);
            }
        } catch (IllegalArgumentException e1) {
            e1.printStackTrace();
            //根据异常的类型判断,设置action是可重试的,还是无需重试的
            action = Action.RETRY;
        } catch (Exception e2) {
            //打印异常
            e2.printStackTrace();
            //根据异常的类型判断,设置action是可重试的,还是无需重试的
            action = Action.REJECT;
        } finally {
            try {
                if (action == Action.SUCCESS) {
                    //multiple 表示是否批量处理。true表示批量ack处理小于tag的所有消息。false则处理当前消息
                    channel.basicAck(tag, false);
                } else if (action == Action.RETRY) {
                    //Nack,拒绝策略,消息重回队列
                    channel.basicNack(tag, false, true);
                } else {
                    //Nack,拒绝策略,并且从队列中删除
                    channel.basicNack(tag, false, false);
                }
                channel.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/282936.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号