栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

初学RabbitMQ(二)

初学RabbitMQ(二)

RabbitMQ

发布订阅模式之路由模式

​ 路由会根据类型定向发送给某个队列。

将交换机的类型改为direct(定向) 交换机会根据路由key定向发送到队列中

创建生产者

public class Sender {
    public static void main(String[] args) throws IOException, TimeoutException {

        String msg=  "你好,世界";
        //获取连接
        Connection connection = ConnectionUtils.getConnection();
        //创建信道
        Channel channel = connection.createChannel();
        //创建路由(路由名字,路由类型)
        //路由模式为direct(定向发送消息)
        channel.exchangeDeclare("tsyun","direct");
        //发送信息 
        //给这条信息绑定路由key为insert ,只有路由key为insert的队列才会收到消息
        channel.basicPublish("tsyun","insert",null,msg.getBytes());
        channel.close();
        connection.close();
    }
}

创建消费者1

public class Accept2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //获取链接
        Connection connection = ConnectionUtils.getConnection();
        //创建信道
        Channel channel = connection.createChannel();
        //创建队列
        channel.queueDeclare("fensi1",false,false,false,null);
        //绑定交换机,设置queue的路由key为insert
        channel.queueBind("fensi1","tsyun","insert");
        //创建接收消息后的回调类
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        //创建消费者
        channel.basicConsume("fensi1",false,consumer);
    }
}

创建消费者2

public class Accept {
    public static void main(String[] args) throws IOException, TimeoutException {
        //获取链接
        Connection connection = ConnectionUtils.getConnection();
        //创建信道
        Channel channel = connection.createChannel();
        //创建队列
        channel.queueDeclare("fensi2",false,false,false,null);
        //绑定交换机,设置队列的路由key为update
        channel.queueBind("fensi2","tsyun","update");
        //创建接收消息后的回调类
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        //创建消费者
        channel.basicConsume("fensi2",false,consumer);
    }
}

运行顺序:先运行生产者,创建出交换机,然后运行起两个消费者,再次运行生产者,我们发现只有相对应路由Key的队列才收到了消息,不同路由key的没有收到消息。

发布订阅模式之主题模式(通配符模式)

和路由模式90%相同

不同点在于通配符模式可以支持模糊查询

匹配符号

#:匹配0个或者多个单词。*:匹配一个单词(不能多也不能少)

官方案例

​ Q1绑定了路由键 *.orange. *,Q2绑定了路由键 *.*.rabbit 和 lazy.#

下面是生产者发送的的消息绑定的路由key 他们会被发送给哪个队列?

quick.orange.rabbit # Q1 Q2 
lazy.orange.elephant # Q1 Q2 
quick.orange.fox # Q1 
lazy.brown.fox # Q2 
lazy.pink.rabbit # Q2 
quick.brown.fox # 无
orange # 无 
quick.orange.male.rabbit # 无

创建生产者

public class Sender {
    public static void main(String[] args) throws IOException, TimeoutException {

        String msg=  "你好,世界";
        //获取连接
        Connection connection = ConnectionUtils.getConnection();
        //创建信道
        Channel channel = connection.createChannel();
        //创建路由(路由名字,路由类型)
        //路由类型为topic
        channel.exchangeDeclare("wangbb","topic");
        //发送信息
        channel.basicPublish("wangbb","a.update",null,msg.getBytes());
        channel.close();
        connection.close();
    }
}

创建消费者1

public class Accept {
    public static void main(String[] args) throws IOException, TimeoutException {
        //获取链接
        Connection connection = ConnectionUtils.getConnection();
        //创建信道
        Channel channel = connection.createChannel();
        //创建队列
        channel.queueDeclare("fensi2",false,false,false,null);
        //绑定交换机
        //路由key为*.update
        channel.queueBind("fensi2","wangbb","*.update");
        //创建接收消息后的回调类
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        //创建消费者
        channel.basicConsume("fensi2",false,consumer);
    }
}

创建消费者2

public class Accept2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //获取链接
        Connection connection = ConnectionUtils.getConnection();
        //创建信道
        Channel channel = connection.createChannel();
        //创建队列
        channel.queueDeclare("fensi1",false,false,false,null);
        //绑定交换机
        //路由key为insert.#
        channel.queueBind("fensi1","wangbb","insert.#");
        //创建接收消息后的回调类
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        //创建消费者
        channel.basicConsume("fensi1",false,consumer);
    }
}

运行顺序:先运行生产者,创建出交换机,然后运行起两个消费者,再次运行生产者,我们发现只有相对应模糊匹配路由Key成功的队列才收到了消息,不成功的路由key的没有收到消息。

消息的持久化

​ 消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何避免消息丢失?

​ 消费者的ACK确认机制,可以防止消费者丢失消息万一在消费者消费之前,RabbitMQ服务器宕机了,那消息也会丢失

想要将消息持久化,那么 路由和队列都要持久化 才可以

没开启持久化,重启RabbitMQ后创建的exchange和queue都会消失。

重启前

重启后未持久化的队列和交换机都消失了

如何持久化?

代码:

创建生产者

public class Sender {
    public static void main(String[] args) throws IOException, TimeoutException {

        String msg=  "你好,世界";
        //获取连接
        Connection connection = ConnectionUtils.getConnection();
        //创建信道
        Channel channel = connection.createChannel();
        //创建路由(路由名字,路由类型,持久化)
        //开启持久化
        channel.exchangeDeclare("wangbb","topic",true);
        //发送信息
        channel.basicPublish("wangbb","a.update",null,msg.getBytes());
        channel.close();
        connection.close();
    }
}

创建消费者

public class Accept {
    public static void main(String[] args) throws IOException, TimeoutException {
        //获取链接
        Connection connection = ConnectionUtils.getConnection();
        //创建信道
        Channel channel = connection.createChannel();
        //创建队列(队列名,持久化,独占队列,自动删除,属性)
        channel.queueDeclare("fensi2",true,false,false,null);
        //绑定交换机
        channel.queueBind("fensi2","wangbb","*.update");
        //创建接收消息后的回调类
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        //创建消费者
        channel.basicConsume("fensi2",false,consumer);
    }
Spring与RabbitMQ整合

引入依赖


    org.springframework.amqp
    spring-rabbit
    2.0.1.RELEASE



        org.slf4j
        slf4j-log4j12
        1.7.25



        org.apache.commons
        commons-lang3
        3.9

生产端配置文件





    
    
    
    
    
    
    
        
            
            
            
        
    
    
    
    
    


生产端的类:

public class Sender {
    public static void main(String[] args) throws IOException, TimeoutException {
			//获取上下文
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("application.xml");
        //获取RabbitMQ模板类
        RabbitTemplate bean = context.getBean(RabbitTemplate.class);
        HashMap map = new HashMap<>();
        map.put("name","123");
        map.put("email","123@qq.com");
        //发送信息,被转换为Json格式
        bean.convertAndSend("msg.user",map);
        context.close();
    }
}

消费端配置文件:





    
    
    
    
    

    

    
        
         
    



消费端类:

//将类注册进容器中
@Component
//实现MessageListener接口才能监听信息
public class ConsumerListener implements MessageListener {
    public static final ObjectMapper obj = new ObjectMapper();
	//加载上下文配置文件
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");
    }
    //重写MessageListener的onMessage方法在中间做想做的事
    @Override
    public void onMessage(Message message) {
        try {
            JsonNode jsonNode = obj.readTree(message.getBody());
            String name = jsonNode.get("name").asText();
            String email = jsonNode.get("email").asText();
            System.out.println(name+" "+email);
        } catch (IOException e) {
        e.printStackTrace();
        }
    }
}
消息成功确认机制

​ 在实际场景种,有的生产者的消息必须发送到消费者端,如何保证成功投递?

事务机制发布确认 事务机制

AMQP协议提供的一种保证消息成功投递的方式,通过信道开启 transactional 模式 并利用信道 的三个方法来实现以事务方式 发送消息,若发送失败,通过异常处理回滚事务,确保 消息成功投递

channel.txSelect(): 开启事务channel.txCommit() :提交事务channel.txRollback() :回滚事务 Spring已经对上面三个方法进行了封装,所以我们只能使用原始的代码演示

//只需要在发送方开启事务就行了。
public class Sender {
    public static void main(String[] args) throws IOException, TimeoutException {


        String msg = "你好,世界";
        //获取连接
        Connection connection = ConnectionUtils.getConnection();
        //创建信道
        Channel channel = connection.createChannel();
        //开启事务
        channel.txSelect();
        //创建路由(路由名字,路由类型)
        try {
            channel.exchangeDeclare("wangbb", "topic", true);
            //发送信息
            channel.basicPublish("wangbb", "a.update", null, msg.getBytes());
            channel.basicPublish("wangbb", "a.update", null, msg.getBytes());
            //提交事务
            channel.txCommit();
        } catch (Exception e) {
            //出现异常回滚事务
            channel.txRollback();
        } finally {
            //无论出现异常与否,关闭连接。
            channel.close();
            connection.close();
        }
    }
}
/confirm/i发布确认

​ 使用事务机制是非常损耗性能的,官方表述使用事务机制会使性能缺失250倍左右,那我们如何保证消息的成功投递的同时又不损失性能呢?

我们可以使用Confirn发布确认机制。

如果说1000万条消息中最后一条出现了错误,事务机制会直接回滚1000W条数据,太浪费资源了。而/confirm/i发布确认只需要补发最后一条消息,来完成消息的送达。

/confirm/i发布确认只需要在生产端开启

生产端配置文件



     
   
    
    
        
    
        
            
        
    
    
    

    
    
    
    
    

回调类

//实现/confirm/iCallback接口
public class Callback implements RabbitTemplate./confirm/iCallback {
    
    @Override
    public void /confirm/i(CorrelationData correlationData, boolean b, String s) {
    if(b){
        System.out.println("发送成功");
    }else{
        //发送失败后可以采用定时任务补发信息 或者 采用 递归的方式
        //定时任务有:redis+定时任务 、 Quartz定时任务框架
        System.out.println("xxx消息发送失败");
    }
    }
}

当你生产者发送消息时这个Callback回调类会自动调用实现你/confirm/i方法中的业务逻辑。 消费端限流

假如 RabbitMQ 服务器积压了成千上万条未处理的消息,然后随便打开一个消费者客户端,就会 出现这样的情况: 巨量的消息瞬间全部喷涌推送过来,但是单个客户端无法同时处理这么多数据, 就会被压垮导致服务崩溃。

我们限定用户少量使用,或者在某个时间段不使用,是不可能的。这是用户行为,我们无法约束。所以在生产端限流是不科学的。

因此我们可以在消费端进行限流,限制每次进入消费端的消息条数。

RabbitMQ 提供了一种 Qos (Quality of Service,服务质量)服务质量保证功能

即在非自动确认消息的前提下,如果一定数目的消息未被确认前,不再进行消费新的消息

生产端模拟大量消息投递

public class Sender {
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("application.xml");
        RabbitTemplate bean = context.getBean(RabbitTemplate.class);
        for (int i = 0; i < 10; i++) {
            bean.convertAndSend("yangbi","msg","你好,世界");
        }
    }

在消费端配置文件中限制每次进入的消息条数




     

消费端类:

//原本时间的MessageListener接口功能太低级了,只能接受消息,不能处理信道
//改为继承AbstractAdaptableMessageListener可以处理信道
@Component
public class ConsumerListener extends AbstractAdaptableMessageListener {
    //处理序列化消息,进行反序列化。
    public static final ObjectMapper obj = new ObjectMapper();

    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");
    }
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        try {
            JsonNode jsonNode = obj.readTree(message.getBody());
            System.out.println(jsonNode.asText());
            Thread.sleep(3000);
            //(参数1,参数2)1:每条消息的唯一标识ID,此ID为单调递增 。 2:是否确认消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
过期时间TTL

TTL: Time TO Live 存活时间、还能活多久、单位毫秒。

在这个生命周期内,消息可以正常被消费端消费,超过这个时间,就会被扔进死信队列。

RabbitMQ可以对队列和消息使用TTL

​ 队列设置:队列内每条消息都被加了相同存活时间,存活时间一到就会都被扔进死信队列中。​ 消息设置 :对每条消息单独设置TTL,每条消息有不同的TTL。

TTL队列

  

        
			
            
        
    

生产类不做任何更改。

发送5s后整个队列的未被接收的消息会被扔进死信队列。

TTL消息设置

配置文件


​ 消费端类

public class Sender {
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("application.xml");
        RabbitTemplate bean = context.getBean(RabbitTemplate.class);
        //信息配置类
        MessageProperties messageProperties = new MessageProperties();
        //设置消息过期时间为5s
        messageProperties.setExpiration("5000");
        //设置发送的信息,将配置类加入
        Message message = new Message("过期消息".getBytes(),messageProperties);
        //发送信息
        bean.convertAndSend("ttl",message);
    }
}
死信队列

DLX(Dead Letter Exchange)死信交换机(死信邮箱),当消息在队列中由于某些原因没有被及时 消费而变成死信(dead message)后,这些消息就会被分发到DLX交换机中,DLX中的队列叫做死信队列

消息没有被及时消费的原因

消息被拒绝

消息超时

消息队列长队超过最大值

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MD5Rbb3S-1643379813183)(C:Users19637AppDataRoamingTyporatypora-user-imagesimage-20220128213610882.png)]

将上图进行分析。

对于消息过期,消息长度超标,消息不被接收,都是生产端发生的,所以在生产段配置就行了。

配置文件



    
   
    

    

    
        

            
            
        
    
    
    
        
             
            
            

            
        
    
    
    
        
            
    
            
        
    
    
    
        
            
            
        
    
    

生产端类:

public class Sender {
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("application.xml");
        RabbitTemplate bean = context.getBean(RabbitTemplate.class);
        //TTL队列发送一条消息
        bean.convertAndSend("dlx_ttl","过期队列");
        //长队为2的队列发送三条消息
        bean.convertAndSend("dlx_max","长队队列1");
        bean.convertAndSend("dlx_max","长队队列2");
        bean.convertAndSend("dlx_max","长队队列3");
    }
}

结果:

课外小知识:超过消息长度被扔进死信队列的那个消息是先进入的消息。

延迟队列

延迟队列就是TTL+死信队列。(发送消息到TTL队列,等TTL队列过期以后,消息会发送到死信队列,消费端监听死信队列,实现延迟队列。)死信队列中的消息只是被人抛弃的,但不是不可以消费,死信队列中的消息也是可以被消费的。在电商开发部分中,都会涉及到延时关闭订单,此时延迟队列正好可以解决这个问题又或者在某个平台注册30分钟后会有短信发送案例。

配置文件



    
   
    
    

    
        

            

            
        
    

    

    
        

            
        
    

    
        

            
        
    
    

生产端类:

public class Sender {
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("application.xml");
        RabbitTemplate bean = context.getBean(RabbitTemplate.class);
//        往TTL队列发送消息
        bean.convertAndSend("dlx_ttl","过期队列");
    }
}

消费端配置文件





    

    
    
    
    
    

    

         
    



消费端类:

@Component
public class ConsumerListener extends AbstractAdaptableMessageListener {
    //处理序列化消息,进行反序列化。
    public static final ObjectMapper obj = new ObjectMapper();

    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");
    }
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        try {
            JsonNode jsonNode = obj.readTree(message.getBody());
            System.out.println(jsonNode.asText());
            System.out.println("监听到了死信队列");//这里可以写延迟队列业务逻辑。
             //(参数1,参数2)1:每条消息的唯一标识ID,此ID为单调递增 。 2:是否确认消息
              channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
      id="connectionFactory" />








     

```

消费端类:

@Component
public class ConsumerListener extends AbstractAdaptableMessageListener {
    //处理序列化消息,进行反序列化。
    public static final ObjectMapper obj = new ObjectMapper();

    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");
    }
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        try {
            JsonNode jsonNode = obj.readTree(message.getBody());
            System.out.println(jsonNode.asText());
            System.out.println("监听到了死信队列");//这里可以写延迟队列业务逻辑。
             //(参数1,参数2)1:每条消息的唯一标识ID,此ID为单调递增 。 2:是否确认消息
              channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/719784.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号