栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

rabbitmq的几种模式(springboot整合rabbitmq)

rabbitmq的几种模式(springboot整合rabbitmq)

注: 代码demo 一、简单模式(单生产者单消费者):将消息直接发送到队列中,供消费者消费

1.创建队列

2.代码实现

1)添加依赖


   org.springframework.boot
   spring-boot-starter-amqp

2)消息生产者

    @Autowired
    private RabbitTemplate rabbitTemplate;

    private final String queue = "testqueue";
    // 发送字符串
    public void sendMessage() {
        rabbitTemplate.convertAndSend(queue, "this is a test message");
    }
    // 发送对象(将user转为json串的形式发送)
    @Override
    public void sendMessage(UserTest user) {
        rabbitTemplate.convertAndSend(queue, JSONObject.toJSONString(user));
    }

3)消息消费者

@Component
public class TestQueueListener {

    // 消费字符串
    @RabbitListener(queues = "testqueue")
    public void listen(String message) {
        System.err.println("this is provider1"+ message);
    }

    // 消费对象
    @RabbitListener(queues = "testqueue")
    public void listen(Message msg) {
        String s = new String(msg.getBody());
        UserTest userTest = JSONObject.parseObject(s, UserTest.class);
        System.err.println("this is listener2" + userTest);
    }
}

二、工作模式(单个生产者,多个消费者):多个消费者争抢资源,谁先抢到谁处理消息。

1.在上一个“简单模式”的基础上,再添加一个监听同一个队列的方法

@Component
public class Provider2Listener {
    
    @RabbitListener(queues = "testqueue")
    private void listener2(Message msg) {
        String s = new String(msg.getBody());
        UserTest userTest = JSONObject.parseObject(s, UserTest.class);
        System.err.println("this is listener 2 接收到的消息为:" + userTest);
    }
}

结果

1)消息发送者

2)消息消费者1

3)消息消费者2

三、发布、订阅模式(Publish/Subscribe)(消息发送到交换机,再由交换机发送到消息队列):生产者端发送消息,多个消费者同时接收所有的消息。

1.创建交换机

2.创建队列
1)创建testqueuqe1、testqueuqe2两个队列
2)将交换机与队列进行绑定(下图是创建testqueuqe1绑定过程、testqueuqe2绑定也是如此,不再演示)

2.代码实现

2.1)消息发送者(将消息发送到交换机)

    private final String exchange = "testexchange";
    
    public void sendExchangeMessage(UserTest user) {
        rabbitTemplate.convertAndSend(exchange, "", JSONObject.toJSONString(user));
    }

2.2)消息消费者

@RabbitListener(queues = "testqueuqe2")
public void testqueue2Listen(Message msg) {
    String s = new String(msg.getBody());
    UserTest userTest = JSONObject.parseObject(s, UserTest.class);
    System.err.println("this is listener 1 接收到的消息为" + userTest);
}

@RabbitListener(queues = "testqueuqe1")
private void testqueue1Listener2(Message msg) {
    String s = new String(msg.getBody());
    UserTest userTest = JSONObject.parseObject(s, UserTest.class);
    System.err.println("this is listener2" + userTest);
}


四、路由模式(消息发送到交换机,再由交换机发送到消息队列):生产者根据key发送消息,不同的消费者按不同的key接收消息

1.创建交换机

1).创建队列testqueuqe3、testqueuqe4
2).将队列和交换机进行绑定

2.代码实现

// 消息生产者
    private static final String exchange1 = "testexchange1";

    public void sendExchangeMessageByKey(UserTest user) {                 
        // exchange1代表交换机,a代表key
        rabbitTemplate.convertAndSend(exchange1, "a", JSONObject.toJSONString(user));
        }

交换机将消息发送到了testqueuqe3中


    // 消息消费者1
    @RabbitListener(queues = "testqueuqe3")
    public void testqueue2ListenByKey(Message msg) {
        String s = new String(msg.getBody());
        UserTest userTest = JSONObject.parseObject(s, UserTest.class);
        System.err.println("this is listener1" + userTest);
    }
    
    // 消息消费者2
    @RabbitListener(queues = "testqueuqe4")
    private void testqueue1Listener2ByKey(Message msg) {
        String s = new String(msg.getBody());
        UserTest userTest = JSONObject.parseObject(s, UserTest.class);
        System.err.println("this is listener2" + userTest);
    }

key为a,所以监听到testqueuqe3消息队列的消费者接收到消息,testqueuqe4未消费消息


五、通配符模式(消息发送到交换机,再由交换机发送到消息队列):生产者根据字符串匹配发送消息,不同的消费者按字符串匹配接收消息

  1. 创建队列

1).创建交换机

2).创建队列testqueuqe5、testqueuqe6

3).将交换机和队列绑定

    // 定义 * 号路由,仅能匹配一个单词
    private String logStart = "log.register";
    // 定义 # 号路由,能匹配0个或多个单词
    private String logHash = "log.register.error";

2.代码实现

    // 消息发送者
    // 定义 * 号路由,仅能匹配一个单词
    private String logStart = "log.register";

    // 定义 # 号路由,能匹配0个或多个单词
    private String logHash = "log.register.error";

    
    @Override
    public void sendExchangeMessageByTopic(UserTest user) {

        try {
            rabbitTemplate.convertAndSend(exchange2, logHash, JSONObject.toJSONString(user));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // 消费者5
    @RabbitListener(queues = "testqueuqe5")
    public void testqueue2ListenByTopic(Message msg) {
        String s = new String(msg.getBody());
        UserTest userTest = JSONObject.parseObject(s, UserTest.class);
        System.err.println("this is listener1" + userTest);
    }

    // 消费者6
    @RabbitListener(queues = "testqueuqe6")
    private void testqueue1Listener2ByTopic(Message msg) {
        String s = new String(msg.getBody());
        UserTest userTest = JSONObject.parseObject(s, UserTest.class);
        System.err.println("this is listener2" + userTest);
    }


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/487081.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号