1-导入依赖
org.springframework.boot spring-boot-starter-amqpcom.ruoyi ruoyi-system
2-配置RabbitMQ
spring:
# rabbitmq配置
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest //默认
password: guest //默认
listener:
type: direct
direct:
prefetch: 10 # 每次拉取的数量
acknowledge-mode: manual # 手动ack模式
3-windows本地启动RabbitMQ安装方式
安装步骤:在这里
安装包:
链接:https://pan.baidu.com/s/17U0kxqXGbl5tvLl6QBuH-Q 提取码:kzl0
注意:
- RabbitMQ和erLang的版本关系要对应好 详情
- RabbitMQ不要安装在空格多的路径下 我安装在D:Program FilesRabbitMQ Server
4-RabbitMQ工具类编写
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
public static final String RABBITMQ_TOPIC="rabbitmq.topic";
@Bean
public Queue HkQueue() {
return new Queue(RABBITMQ_TOPIC , true);
}
}
5-RabbitMQService编写
@Service
public class RabbitMQServiceImpl implements IRabbitMQService {
@Resource
private RabbitTemplate rabbitTemplate;
@Override
public String sendMsg(String msg, object obj) throws Exception {
try {
//调用了自己的写的工具类,
String msgId = IdUtils.fastUUID().replace("-", "").substring(0, 32);
String sendTime = DateUtils.getTime();
Map map = new HashMap();
map.put("msgId", msgId);
map.put("sendTime", sendTime);
map.put("msg", msg);
map.put("obj",obj);
System.out.println("map"+map);
rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_TOPIC, map);
return "ok";
} catch (Exception e) {
e.printStackTrace();
return "error";
}
}
}
6-Controller编写
@PostMapping("/insert")
public AjaxResult insert( ){
try {
//调用自己的方法
byte[] userId=Md5Utils.md5(IpUtils.getHostIp());
String id="";
for(byte b:userId){
id+=b;
}
String result = rabbitMQService.sendMsg("埋点录入",object);//object 传入的对象
if(result.equals("ok")){
return AjaxResult.success(result);
}else{
return AjaxResult.error("埋点录入rabbitMQ处理异常!");
}
} catch (Exception e) {
e.printStackTrace();
return AjaxResult.error("rabbitMQ处理异常!");
}
}
7-消费者类编写
@RabbitHandler
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.RABBITMQ_HK_TOPIC))
public void process(Map msg, Message message, Channel channel) {
long tag = message.getMessageProperties().getDeliveryTag();
Action action = Action.SUCCESS;
try {
//Object 要转换的对象
int result = service.insert((Object) msg.get("obj"));
if(result>0){
System.out.println("消费者RabbitMqConsumer从RabbitMQ服务端消费消息:" + msg);
}
} catch (IllegalArgumentException e1) {
e1.printStackTrace();
//根据异常的类型判断,设置action是可重试的,还是无需重试的
action = Action.RETRY;
} catch (Exception e2) {
//打印异常
e2.printStackTrace();
//根据异常的类型判断,设置action是可重试的,还是无需重试的
action = Action.REJECT;
} finally {
try {
if (action == Action.SUCCESS) {
//multiple 表示是否批量处理。true表示批量ack处理小于tag的所有消息。false则处理当前消息
channel.basicAck(tag, false);
} else if (action == Action.RETRY) {
//Nack,拒绝策略,消息重回队列
channel.basicNack(tag, false, true);
} else {
//Nack,拒绝策略,并且从队列中删除
channel.basicNack(tag, false, false);
}
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}



