RabbitMQ的5种运行模式注意:同一个交换机或队列的声明位置可以在消费者,也可以在生产者,也可以重复声明,如果使用了SpringBoot整合后,就可统一配置到配置类中
- 简单队列模式
- 使用默认交换机1个生产者+1个消费者
- 使用默认交换机1个生产者,多个消费者单个消息只能被1个消费者消费多个消息默认是类似轮询的方式发给多个消费者
- 生产者数量>=1,消费者数量>1每一个消费者都有自己的一个队列生产者发送的消息,经过交换机,到达队列,实现一个消息被多个消费者获取的目的
- 同订阅模式,区别是:不再广播,而是根据routingKey路由到指定的消费者多个队列的routingKey可以是一样的,这样就变成了fanout
- 同路由模式,区别是:routingKey添加了通配符概念*可以代替一个单词,#可以替代零个或多个单词
- 最后一个单词是 rabbit 的 3 个单词 (*.*.rabbit)第一个单词是 lazy 的多个单词 (lazy.#)
参考:RabbitMQ的五种工作模式
简单队列模式worker模式也可以用此代码测试生产者核心代码:
//获取信道
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
消费者核心代码:
Channel channel = connection.createChannel(); channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);发布-订阅模式(fanout)
//生产者核心代码 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); channel.basicPublish(EXCHANGE_NAME, "", null, message); //消费者核心代码 channel.queueBind(queueName, EXCHANGE_NAME, "");//fanout模式,和routingKey无关 channel.basicConsume(queueName, true, deliverCallback, cancelCallback);路由模式(direct)
//生产者核心代码 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message); //消费者核心代码 channel.queueBind(queueName, EXCHANGE_NAME, "error");//指定routingKey=error channel.basicConsume(queueName, true, deliverCallback, cancelCallback);主题模式(topic)
//生产者核心代码 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message); //消费者核心代码 channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*"); channel.basicConsume(queueName, true, deliverCallback, cancelCallback);路由模式——SpringBoot方式整合
配置类
@Bean
public DirectExchange directExchange() {
// 支持持久化
return new DirectExchange(DIRECT_EXCHANGE_NAME, true, false);
}
@Bean
public Queue test1Queue() {
// 支持持久化
return new Queue(QUEUE_TEST1, true);
}
@Bean
public Binding test1Binding(DirectExchange directExchange, Queue test1Queue) {
//这样写spring会自动按名称注入
return BindingBuilder.bind(test1Queue).to(directExchange).with(ROUTING_KEY_TEST1);//指定routingKey
}
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("127.0.0.1", 5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setPublisher/confirm/iType(CachingConnectionFactory./confirm/iType.SIMPLE);//设置发布确认类型
return connectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
生产者rabbitTemplate还有很多send方法,如:
sendAndReceive();convertAndSend();convertSendAndReceive();convertSendAndReceiveAsType();
MessageProperties messageProperties = new MessageProperties();
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);//消息持久化
Message message = new Message("你好,这是发给test1队列的消息".getBytes(), messageProperties);
rabbitTemplate.send(DIRECT_EXCHANGE_NAME, ROUTING_KEY_TEST1, message);
消费者
@RabbitListener(queues = QUEUE_TEST1)
public void consumeMessage1(Message message) {
System.out.println("test1队列收到:======" + new String(message.getBody(), StandardCharsets.UTF_8));
}
完整代码
完整代码:GitHub



