栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

SpringBoot整合使用RabbitMQ四种模式

SpringBoot整合使用RabbitMQ四种模式

SpringBoot整合使用RabbitMQ四种模式

目录

SpringBoot整合使用RabbitMQ四种模式

一、前期准备

1.所需依赖2.配置文件 二、四种模式演示

1.fanout(广播)模式

1.1 介绍1.2 使用方法 2.direct模式

2.1 介绍2.2 使用方法 3.topic模式

3.1 介绍3.2 使用方法 4.headers模式

4.1 介绍4.2 使用方法

一、前期准备 1.所需依赖

    org.springframework.boot
    spring-boot-starter-amqp

2.配置文件

​ 由于不同的模式,使用的配置文件有所不同,下面会根据不同的模式来编写不同的配文件

二、四种模式演示 1.fanout(广播)模式 1.1 介绍

​ fanout模式是一种不需要路由的模式,顾名思义,广播是一方发送,所有人都可以听到的。

​ 发送者将消息发送给交换机之后,交换机将消息发送给所有的消息队列,然后消费者可以从所有的消息队列中取到这个消息。

1.2 使用方法

配置文件

//注意,下面所有需要导入的包均是org.springframework.amqp下的



@Configuration
public class RabbitMQConfigFanout {

    //两个消息队列名称
    private static final String QUEUE01="queue_fanout01";
    private static final String QUEUE02="queue_fanout02";
    //交换机名称,也可不定义交换机,会使用默认的交换机
    private static final String EXCHANGE="exchange_fanout";

    //定义消息队列01
    @Bean
    public Queue queue01(){
        return new Queue(QUEUE01);
    }
	//定义消息队列01
    @Bean
    public Queue queue02(){
        return new Queue(QUEUE02);
    }

    //定义fanout交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange(EXCHANGE);
    }
	
    //绑定交换机与消息队列queue01
    @Bean
    public Binding binding01(){
        return BindingBuilder.bind(queue01()).to(fanoutExchange());
    }
	
    //绑定交换机与消息队列queue01
    @Bean
    public Binding binding02(){
        return BindingBuilder.bind(queue02()).to(fanoutExchange());
    }

}

发送者

@Slf4j
public class MQSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(Object msg){
        log.info("发送消息"+msg);
        //参数依次是:交换机,路由,消息
        rabbitTemplate.convertAndSend("fanoutExchange","",msg);
    }
}

接收者

@Slf4j
public class MQReceiver {

    @RabbitListener(queues = "queue")
    public void receive(Object msg){
        log.info("接收消息:"+msg);
    }
}
2.direct模式 2.1 介绍

​ direct模式是直接模式,这个是需要固定路由的。

​ 发送者将消息发送给交换机,交换机根据绑定的消息队列和路由发送给对应的队列,然后消费者在消息队列中获取消息。

2.2 使用方法

配置文件

//注意,下面所有需要导入的包均是org.springframework.amqp下的



@Configuration
public class RabbitMQConfigDirect {

    //两个消息队列
    private static final String queue_direct01="queue_direct01";
    private static final String queue_direct02="queue_direct02";
    //交换机
    private static final String exchange_direct="exchange_direct";
    //设置两个队列的路由
    private static final String ROUTINGKEY01="queue.red";
    private static final String ROUTINGKEY02="queue.green";


    @Bean
    public Queue direct_queue01(){
        return new Queue(queue_direct01);
    }

    @Bean
    public Queue direct_queue02(){
        return new Queue(queue_direct02);
    }

    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(exchange_direct);
    }

    //注意:需要绑定路由
    @Bean
    public Binding direct_binding01(){
        return BindingBuilder.bind(direct_queue01()).to(directExchange()).with(ROUTINGKEY01);
    }

    @Bean
    public Binding direct_binding02(){
        return BindingBuilder.bind(direct_queue02()).to(directExchange()).with(ROUTINGKEY02);
    }

}

发送者

@Slf4j
public class MQReceiver {

   public void sendRed(Object msg){
        log.info("发送红色的消息:"+msg);
       //参数依次是:交换机,路由,消息
        rabbitTemplate.convertAndSend("exchange_direct","queue.red",msg);
    }

    public void sendGreen(Object msg){
        log.info("发送绿色消息:"+msg);
        rabbitTemplate.convertAndSend("exchange_direct","queue.green",msg);
    }
}

接收者

@Slf4j
public class MQReceiver {

   @RabbitListener(queues = "queue_direct01")
    public void receive03(Object msg){
        log.info("queue01接收消息"+msg);
    }

    @RabbitListener(queues = "queue_direct02")
    public void receive04(Object msg){
        log.info("queue02接收消息"+msg);
    }
}
3.topic模式 3.1 介绍

​ topic模式和direct模式类型,但topic模式的路由可以使用通配符,相遇于sql中的模糊查询,以下是两种通配符的介绍:

#:代表多个字符,或者代表没有字符*:只代表一个字符

​ 比如:red.green.queue.msg就可以被#.queue.#路由的消息队列接收;red.queue可以被*.queue接收

3.2 使用方法

配置文件

//注意,下面所有需要导入的包均是org.springframework.amqp下的



@Configuration
public class RabbitMQConfigTopic {

    private static final String queue_topic01="queue_topic01";
    private static final String queue_topic02="queue_topic02";
    private static final String queue_exchange="exchange_topic";
    //注意路由通配符的使用
    private static final String ROUTINGKEY01="#.queue.#";
    private static final String ROUTINGKEY02="*.queue.#";

    @Bean
    public Queue queue_topic01(){
        return new Queue(queue_topic01);
    }

    @Bean
    public Queue queue_topic02(){
        return new Queue(queue_topic02);
    }

    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange(queue_exchange);
    }

    @Bean
    public Binding binding_topic01(){
        return BindingBuilder.bind(queue_topic01()).to(topicExchange()).with(ROUTINGKEY01);
    }

    @Bean
    public Binding binding_topic02(){
        return BindingBuilder.bind(queue_topic02()).to(topicExchange()).with(ROUTINGKEY02);
    }

}

发送者

@Slf4j
public class MQReceiver {

   public void send01(Object msg){
        log.info("发送消息(queue01接收):"+msg);
        rabbitTemplate.convertAndSend("exchange_topic","queue.red.message",msg);
    }

    public void send02(Object msg){
        log.info("发送消息(被两个queue接收):"+msg);
        rabbitTemplate.convertAndSend("exchange_topic","message.queue.green",msg);
    }
}

接收者

@Slf4j
public class MQReceiver {

   @RabbitListener(queues = "queue_topic01")
    public void receive05(Object msg){
        log.info("queue01接收消息:"+msg);
    }

    @RabbitListener(queues = "queue_topic02")
    public void receive06(Object msg){
        log.info("queue02接收消息:"+msg);
    }
}
4.headers模式 4.1 介绍

​ 当相对信息进行两种路由限制的时候,发现前面的模式不好去做,那么就有了headers模式,headers模式没有路由的概念,而是有messageProperties,可以将一些限制放入properties键值对中。

4.2 使用方法

配置文件

//注意,下面所有需要导入的包均是org.springframework.amqp下的


@Configuration
public class RabbitMQConfigHeader {

    //注意:这块已经没有路由了
    private static final String queue_header01="queue_headers01";
    private static final String queue_header02="queue_headers02";
    private static final String exchange_header="exchange_headers";
    @Bean
    public Queue queue_header01(){
        return new Queue(queue_header01);
    }

    @Bean
    public Queue queue_header02(){
        return new Queue(queue_header02);
    }

    @Bean
    public HeadersExchange headersExchange(){
        return new HeadersExchange(exchange_header);
    }

    //注意:在绑定的时候,需要使用where...来进行限制绑定,
    @Bean
    public Binding binding01(){
        HashMap map = new HashMap<>();
        map.put("color","red");
        map.put("speed","slow");
        return 
            //whereAny是map中的任一个条件成立即可
            BindingBuilder.bind(queue_header01()).to(headersExchange()).whereAny(map).match();
    }

    @Bean
    public Binding binding02(){
        HashMap map = new HashMap<>();
        map.put("color","red");
        map.put("speed","fast");
        return 
            //whereAll是map中的所有条件都成立才可以
            BindingBuilder.bind(queue_header02()).to(headersExchange()).whereAll(map).match();
    }

}

发送者

@Slf4j
public class MQSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

     public void send03(String msg){
        log.info("发送消息(被两个queue接收):"+msg);
        //headers模式需要使用MessageProperties添加属性
        MessageProperties properties = new MessageProperties();
        properties.setHeader("color","red");
        properties.setHeader("speed","fast");
        //发送message消息
        Message message = new Message(msg.getBytes(), properties);
        //将message加入,路由为空
        rabbitTemplate.convertAndSend("exchange_headers","",message);
    }

    public void send04(String msg){
        log.info("发送消息(被qeueue1接收):"+msg);
        //配置headers属性
        MessageProperties properties = new MessageProperties();
        properties.setHeader("color","red");
        properties.setHeader("speed","normal");
        //还要发送message消息
        Message message = new Message(msg.getBytes(), properties);
        //将message加入,路由为空
        rabbitTemplate.convertAndSend("exchange_headers","",message);
    }
}

接收者

@Slf4j
public class MQReceiver {

  @RabbitListener(queues = "queue_headers01")
    //注意,在这块接收就不是接收Object了,而是Message
    public void receive07(Message msg){
        log.info("queue01接收Message对象:"+msg);
        log.info("queue01接收消息:"+new String(msg.getBody()));
    }

    @RabbitListener(queues = "queue_headers02")
    public void receive08(Message msg){
        log.info("queue02接收Meaasge对象:"+msg);
        log.info("queue02接收的信息:"+new String(msg.getBody()));
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/722786.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号