栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

【RabbitMQ】消息传递模型

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

【RabbitMQ】消息传递模型

引言

生产者是发送消息的用户程序。队列是存储消息的缓冲区。使用者是接收消息的用户应用程序。RabbitMQ消息传递模型的核心思想是生产者从不直接向队列发送任何消息。实际上,很多时候生产者甚至根本不知道消息是否被传递到任何队列。
相反,生产者只能向交换器发送消息。交换是一件很简单的事情。它一边接收来自生产者的消息,另一边将消息推送到队列。交换器必须确切的知道如何处理它接收到的消息,比如是附加到一个特定的队列还是多个队列或者被丢弃。它的规则由交换类型定义。
交换类型:direct 直连,topic 主题, headers 头部信息,fanout 广播。

一、fanout 广播模式

广播模式类似发布订阅。它的消息可以被多个队列同时接收。
广播模式的特点:

  • 不会处理路由键。只是简单的将队列绑定到交换机上,所以它的处理速度是最快的。
  • 发送到交换机上的消息会被发送到多个队列上。
1.1 广播模式测试

准备几个队列和交换机,然后将队列绑定到交换机上。然后把消息发送到交换机上。消息接收者也要进行修改,监听两个队列。

@Configuration
public class RabbitMQConfig {

    private static final String QUEUE01 = "queue_fanout01";
    private static final String QUEUE02 = "queue_fanout02";
    private static final String EXCHANGE = "fanoutExchange";

    @Bean
    public Queue queue(){
        return new Queue("queue", true);
    }

    @Bean
    public Queue queue01(){
        return new Queue(QUEUE01);
    }

    @Bean
    public Queue queue02(){
        return new Queue(QUEUE02);
    }

    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange(EXCHANGE);
    }

    @Bean
    public Binding binding01(){
        return BindingBuilder.bind(queue01()).to(fanoutExchange());
    }

    @Bean
    public Binding binding02(){
        return BindingBuilder.bind(queue02()).to(fanoutExchange());
    }
}


@Service
@Slf4j
public class MQSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(Object msg){
        log.info("发送消息:" + msg);
        rabbitTemplate.convertAndSend("fanoutExchange",  "", msg);

    }
}


@Service
@Slf4j
public class MQReceiver {

    @RabbitListener(queues = "queue")
    public void receive(Object msg){
        log.info("接收消息:" + msg);
    }

    @RabbitListener(queues = "queue_fanout01")
    public void receive01(Object msg){
        log.info("QUEUE01接收消息:" + msg);
    }

    @RabbitListener(queues = "queue_fanout02")
    public void receive02(Object msg){
        log.info("QUEUE02接收消息:" + msg);
    }
}
1.2 测试结果

登录RabbitMQ远程界面,看到已经出现Exchange和两个queue。


发送消息后,两个队列均收到了数据。

二、direct 直连模式

所有发送到交换机上的消息,都会被转发到路由键中指定的队列。这个模式可以使用RabbitMQ自带的交换机。

2.1 直连模式测试

修改rabbitMQ配置类

//direct
private static final String QUEUE01 = "queue_direct01";
private static final String QUEUE02 = "queue_direct02";
private static final String EXCHANGE = "directExchange";
private static final String ROUTINGKEY01 = "queue.red";
private static final String ROUTINGKEY02 = "queue.green";

@Bean
public Queue queue01(){
    return new Queue(QUEUE01);
}

@Bean
public Queue queue02(){
    return new Queue(QUEUE02);
}
@Bean
public DirectExchange directExchange(){
    return new DirectExchange(EXCHANGE);
}

@Bean
public Binding binding01(){
    //return BindingBuilder.bind(queue01()).to(fanoutExchange());
    return BindingBuilder.bind(queue01()).to(directExchange()).with(ROUTINGKEY01);
}

@Bean
public Binding binding02(){
    //return BindingBuilder.bind(queue02()).to(fanoutExchange());
    return BindingBuilder.bind(queue02()).to(directExchange()).with(ROUTINGKEY02);
}

修改消息发送者和接收者

public void send01(Object msg){
    log.info("发送red消息:" + msg);
    rabbitTemplate.convertAndSend("directExchange", "queue.red", msg);

}

public void send02(Object msg){
    log.info("发送green消息:" + msg);
    rabbitTemplate.convertAndSend("directExchange", "queue.green", msg);

}
@RabbitListener(queues = "queue_direct01")
public void receive03(Object msg){
    log.info("QUEUE01接收消息:" + msg);
}

@RabbitListener(queues = "queue_direct02")
public void receive04(Object msg){
    log.info("QUEUE02接收消息:" + msg);
}

控制层发送逻辑

@RequestMapping("/mq/direct01")
@ResponseBody
public void mq02(){
    mqSender.send01("hello,red");
}


@RequestMapping("/mq/direct02")
@ResponseBody
public void mq03(){
    mqSender.send02("hello,green");
}
2.2 测试结果

管控台已经多了交换机以及绑定的队列和路由键


发送消息给red队列。



发送给green队列也一样。

三、topic 主题模式

topic模式的路由键有通配符,较为方便。

* 匹配的精确的一个值.#匹配的是0个或多个。
路由键一个都匹配不上的话,会丢弃这个消息。

3.1 主题模式测试

修改RabbitMQ配置类

//topic
private static final String QUEUE01 = "queue_topic01";
private static final String QUEUE02 = "queue_topic02";
private static final String EXCHANGE = "topicExchange";
private static final String ROUTINGKEY01 = "#.queue.#";
private static final String ROUTINGKEY02 = "*.queue.#";

//topic
@Bean
public TopicExchange topicExchange(){
    return new TopicExchange(EXCHANGE);
}

@Bean
public Binding binding01(){
    //return BindingBuilder.bind(queue01()).to(fanoutExchange());
    //return BindingBuilder.bind(queue01()).to(directExchange()).with(ROUTINGKEY01);
    return BindingBuilder.bind(queue01()).to(topicExchange()).with(ROUTINGKEY01);
}

@Bean
public Binding binding02(){
    //return BindingBuilder.bind(queue02()).to(fanoutExchange());
    //return BindingBuilder.bind(queue02()).to(directExchange()).with(ROUTINGKEY02);
    return BindingBuilder.bind(queue02()).to(topicExchange()).with(ROUTINGKEY02);
    }

修改发送者、接收者

public void send03(Object msg){
    log.info("发送消息(QUEUE1):" + msg);
    rabbitTemplate.convertAndSend("topicExchange", "queue.red.message", msg);
}

public void send04(Object msg){
    log.info("发送消息(QUEUE1,QUEUE2):" + msg);
    rabbitTemplate.convertAndSend("topicExchange", "msg.queue.green", msg);
}
@RabbitListener(queues = "queue_topic01")
public void receive05(Object msg){
    log.info("QUEUE01接收消息:" + msg);
}

@RabbitListener(queues = "queue_topic02")
public void receive06(Object msg){
    log.info("QUEUE01,QUEUE02接收消息:" + msg);
}

修改控制层逻辑


@RequestMapping("/mq/topic01")
@ResponseBody
public void mq04(){
    mqSender.send03("hello,topic01");
}

@RequestMapping("/mq/topic02")
@ResponseBody
public void mq05(){
    mqSender.send04("hello,topic02");
}
3.2 测试结果

控制台信息:


发送消息:


四、Headers 头部信息

这种模式写起来代码比较麻烦,且效率不是很高,现在不怎么用。
项目里一般都用topic模式。

4.1 头部信息测试

修改RabbitMQ配置类

//headers
private static final String QUEUE01 = "queue_header01";
private static final String QUEUE02 = "queue_header02";
private static final String EXCHANGE = "headersExchange";

@Bean
public HeadersExchange headersExchange(){
    return new HeadersExchange(EXCHANGE);
}

@Bean
public Binding binding01(){
    //return BindingBuilder.bind(queue01()).to(fanoutExchange());
    //return BindingBuilder.bind(queue01()).to(directExchange()).with(ROUTINGKEY01);
    //return BindingBuilder.bind(queue01()).to(topicExchange()).with(ROUTINGKEY01);
    Map map = new HashMap<>();
    map.put("color", "red");
    map.put("speed", "slow");
    return BindingBuilder.bind(queue01()).to(headersExchange()).whereAny(map).match();
}

@Bean
public Binding binding02(){
    //return BindingBuilder.bind(queue02()).to(fanoutExchange());
    //return BindingBuilder.bind(queue02()).to(directExchange()).with(ROUTINGKEY02);
    //return BindingBuilder.bind(queue02()).to(topicExchange()).with(ROUTINGKEY02);
    Map map = new HashMap<>();
    map.put("color", "red");
    map.put("speed", "fast");
    return BindingBuilder.bind(queue02()).to(headersExchange()).whereAll(map).match();
}

修改发送者、接收者

public void send05(String msg){
    log.info("发送消息(QUEUE1,QUEUE2):" + msg);
    MessageProperties properties = new MessageProperties();
    properties.setHeader("color", "red");
    properties.setHeader("speed", "fast");
    Message message = new Message(msg.getBytes(), properties);
    rabbitTemplate.convertAndSend("headersExchange", "", message);
}

public void send06(String msg){
    log.info("发送消息(QUEUE1):" + msg);
    MessageProperties properties = new MessageProperties();
    properties.setHeader("color", "red");
    properties.setHeader("speed", "normal");
    Message message = new Message(msg.getBytes(), properties);
    rabbitTemplate.convertAndSend("headersExchange", "", message);
}
@RabbitListener(queues = "queue_header01")
public void receive07(Message msg){
    log.info("QUEUE01,QUEUE02接收Message对象:" + msg);
    log.info("QUEUE01,QUEUE02接收消息:" + new String(msg.getBody()));
}

@RabbitListener(queues = "queue_header02")
public void receive08(Message msg){
    log.info("QUEUE01接收Message对象:" + msg);
    log.info("QUEUE01接收消息:" + new String(msg.getBody()));
}

修改控制层逻辑

@RequestMapping("/mq/header0102")
@ResponseBody
public void mq06(){
    mqSender.send05("hello,header01,header02");
}


@RequestMapping("/mq/header01")
@ResponseBody
public void mq07(){
    mqSender.send06("hello,header01");
}
4.2 测试结果

RabbitMQ控制台信息

发送消息:

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/881441.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号