- 导入maven依赖
org.springframework.amqp spring-rabbit
- 在application.yml中添加上rabbitmq的配置
spring:
rabbitmq:
virtual-host: /
host: localhost
username: guest
password: guest
port: 5672
# 消息确认配置项
# 确认消息发送到队列
publisher-returns: true
# 确认消息发送到交换机
publisher-/confirm/i-type: correlated
#调整监听使用手动
listener:
direct:
acknowledge-mode: manual
二,配置消息队列和交换机
根据场景选择下面配置模式
1.配置直连(一对一消费)
@Configuration
public class DirectRabbitConfig {
//返回一个队列
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
@Bean
public Queue TestDirectRabbit(){
return new Queue("operationLog",true,false,false,null);
}
//交换机起名
@Bean
DirectExchange TestDirectExchange(){
return new DirectExchange("TestDirectExchange",true,false);
}
//交换机和队列绑定,提供路由
@Bean
Binding BindingDirect(){
return BindingBuilder.bind(TestDirectRabbit()).to(TestDirectExchange()).with("TestDirectRouting");
}
}
这种方式是,发送一个消息到消息队列,只会一个交换机消费一次
- TOPIC型
Topic型,当多个队列绑定到同一个交换机上时,根据routingkey来确定,与Direct不同的是,rountingkey有统配规则,例如topic#,表示所有前面是topic后面无论是什么,都会被捕获到,然后消费,达到了一个可以选择性直连或者广播的效果
@Configuration
public class TopicRabbitConfig {
final static String man="topic.man";
final static String woman="topic.woman";
//队列1
@Bean
public Queue getQueue1(){
return new Queue("TopicQueue1",true,false,false,null);
}
//队列2
@Bean
public Queue getQueue2(){
return new Queue("TopicQueue2",true,false,false,null);
}
//交换机
@Bean
public TopicExchange getTopicExchange(){
return new TopicExchange("TopicExchange");
}
//绑定
@Bean
public Binding getBinding1(){
return BindingBuilder.bind(getQueue1()).to(getTopicExchange()).with(man);
}
@Bean
public Binding getBinding2(){
return BindingBuilder.bind(getQueue2()).to(getTopicExchange()).with("topic.#");
}
}
- 广播模式
一个消息,可以被大家都消费一遍
@Configuration
public class FanoutRabbitConfig {
//队列一
@Bean
public Queue getFanoutQueue1(){
return new Queue("FanoutQueue1",true,false,false);
}
//队列二
@Bean
public Queue getFanoutQueue2(){
return new Queue("FanoutQueue2",true,false,false);
}
//队列三
@Bean
public Queue getFanoutQueue3(){
return new Queue("FanoutQueue3",true,false,false);
}
//交换机
@Bean
public FanoutExchange getFanoutExchange(){
return new FanoutExchange("FanoutExchange");
}
//绑定交换机与队列
@Bean
public Binding getFanoutBinding1(){
return BindingBuilder.bind(getFanoutQueue1()).to(getFanoutExchange());
}
//绑定交换机与队列
@Bean
public Binding getFanoutBinding2(){
return BindingBuilder.bind(getFanoutQueue2()).to(getFanoutExchange());
}
//绑定交换机与队列
@Bean
public Binding getFanoutBinding3(){
return BindingBuilder.bind(getFanoutQueue3()).to(getFanoutExchange());
}
}
三,监听消费
- 编写一个手动监听的功能类
@Component
public class MyListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.println("deliveryTag=" + deliveryTag);
//一次接收一条
channel.basicAck(1, false);
byte[] body = message.getBody();
String s = new String(body);
System.out.println("消费信息:" + s);
}
}
- 创建一个监听容器,把上面对象存入
@Configuration
public class SimpleMessageListenerConfig {
@Autowired
CachingConnectionFactory connectionFactory;
@Autowired
MyListener myListener;
@Bean
public SimpleMessageListenerContainer createSimpleMessageListenerContainer(){
SimpleMessageListenerContainer container=new SimpleMessageListenerContainer(connectionFactory);
//监听为手动处理消费
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//设置需要监听的队列名
container.setQueueNames("operationLog");
//将监听处理类加入容器
container.setMessageListener(myListener);
return container;
}
}
四,编写controller,发送消息给rabbitmq
@RestController
@RequestMapping("/rabbit")
public class RabbitController {
//注入rabbitTemplate来发送消息
@Autowired
RabbitTemplate rabbitTemplate;
@RequestMapping("/log2")
public void TopicRabbit1(){
Map map = new HashMap<>();
map.put("lmh", "测试666");
//将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
rabbitTemplate.convertAndSend("TopicExchange", "topic.man",map);
System.out.println("上传日志===");
}
测试一下就好。



