首先要先部署好RabbitMQ
在spring中使用RabbitMQ使用的时AMQP统一规范,需要引入依赖
org.springframework.boot
spring-boot-starter-amqp
一、基本使用
推送端yml配置,virtual-host是配置rabbitmq的虚拟主机,类似于不同的服务器,这样当rabbitmq部署多个通道队列的时候,可以有效的进行隔离
spring:
rabbitmq:
host: 121.***.***.***
port: 5672
virtual-host: /
username: test
password: 123456
消费端yml配置,注意这里配置listener.simple.prefetch=1,意味着消费端每次只预取一个消息,消费过之后才能再次拉取消息
spring:
rabbitmq:
host: 121.***.***.***
port: 5672
virtual-host: /
username: test
password: 123456
listener:
simple:
prefetch: 1
(一)、消息队列方式一
1.推送端
推送端主要使用的是RabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/sendMsg")
public String sendMsg(){
String queueName = "simple.queue";
String message = "hello world";
rabbitTemplate.convertAndSend(queueName,message);
return "send success";
}
2.消费端
@Component
public class MyRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listeneQueue1(String msg) throws InterruptedException {
System.out.println("收到msg:"+msg);
}
}
*(二)、消息队列方式二
1.推送端
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/sendMsg")
public String sendMsg(){
String queueName = "simple.queue";
String message = "hello world";
rabbitTemplate.convertAndSend(queueName,message);
return "send success";
}
2.消费端
@Component
public class MyRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listeneQueue1(String msg) throws InterruptedException {
System.out.println("Listener111收到msg:"+msg);
Thread.sleep(1000);
}
@RabbitListener(queues = "simple.queue")
public void listeneQueue21(String msg) throws InterruptedException {
System.out.println("Listener222收到msg:"+msg);
Thread.sleep(20);
}
}
*(三)、消息队列方式三
1.推送端
首先需要配置交换机并绑定队列
@Configuration
public class FanoutConfig {
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("itcast.fanout");
}
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
@Bean
public Binding fanoutBinding(Queue fanoutQueue1,FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
//这里是绑定第二个队列,和上一个重复
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
@Bean
public Binding fanoutBinding2(Queue fanoutQueue2,FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
之后再编写消息推送代码
@RequestMapping("/sendMsg2")
public String sendMsg2(){
String fanoutExchange = "itcast.fanout";
String message = "fanout test";
rabbitTemplate.convertAndSend(fanoutExchange,"aaa",message);
return "send success";
}
2消费端
@Component
public class MyRabbitListener {
@RabbitListener(queues = "fanout.queue1")
public void listeneQueue3(String msg) throws InterruptedException {
System.out.println("Listener333收到msg:"+msg);
}
@RabbitListener(queues = "fanout.queue2")
public void listeneQueue4(String msg) throws InterruptedException {
System.out.println("Listener444收到msg:"+msg);
}
}
*(四)、消息队列方式四
*(五)、消息队列方式五
*(六)、消息队列方式六



