栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ使用

RabbitMQ使用

目录
    • 三、使用:
    • 生产者:
    • 消费者:
    • 工作队列:
    • 消息应答:
    • 消息自动重新入队:
    • 消息手动应答:
    • RabbitMQ持久化:
      • 持久化概念:
      • 队列持久化:
      • 消息持久化:
      • 不公平分发:
      • 预取值:
    • 发布确认:
      • 单个确认发布:
      • 批量确认发布:
      • 异步确认发布:
      • 三种发布确认的速度比较:
    • 交换机:
      • fanout交换机:
      • direct交换机:
      • topics交换机:
    • 死信队列:
      • 消息过期:
      • 队列达到最大长度:
      • 消息被拒:

三、使用:

创建一个空工程来容纳所有的子模块:

子maven模块名字:rabbitmq-hello

需要的依赖:


        
        
            com.rabbitmq
            amqp-client
            5.8.0
        
        
        
            commons-io
            commons-io
            2.6
        
    
    
        
            
            
                org.apache.maven.plugins
                maven-compiler-plugin
                
                    8
                    8
                
            
        
    

创建类:com.fan.rabbitmq.one.Producer


生产者:
package com.fan.rabbitmq.one;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    //队列名称
    public static final String QUEUE_NAME= "hello";
    //发消息
    public static void main(String[] args) throws Exception {
        //创建一个连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //工厂ip连接rabbitmq的队列
        connectionFactory.setHost("192.168.211.200");
        //用户名
        connectionFactory.setUsername("admin");
        //密码
        connectionFactory.setPassword("123");
        //创建连接
        Connection connection = connectionFactory.newConnection();
        //获取消息通道
        Channel channel = connection.createChannel();
        //生产一个队列:
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //发消息准备
        String msg = "hello world";
        //开始发送消息
        channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
        System.out.println("消息发送完毕");
    }
}

消费者:
package com.fan.rabbitmq.one;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


public class Consumer {
    //队列名称
    public static final String QUEUE_NAME ="hello";
    //接收消息
    public static void main(String[] args) throws Exception {
        //创建mq中转连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //给连接工厂设置地址,用户名,密码等
        connectionFactory.setHost("192.168.211.200");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123");
        //中转连接工厂创建连接(和快递发送方和送达方建立联系)
        Connection connection = connectionFactory.newConnection();
        //连接创建消息通道(创建快递运送通道)
        Channel channel = connection.createChannel();
        channel.basicConsume(QUEUE_NAME,
                true,
                (consumerTag,msg)->{ System.out.println("接收到的消息:"+new String(msg.getBody()) ); }
                ,(consumerTag)->{  System.out.println("消息的消费被中断"); }
          );

    }
}

测试:先开启发送消息,然后运行消费消息:

工作队列:

实例:

抽下工具类RabbitMqUtils(在utils包下):

package com.fan.rabbitmq.utils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMqUtils {
    //工具类的目的:得到一个消息通道channel
    public static Channel getChannel() throws Exception{
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.211.200");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}

工作队列消费者Work01Consumer:

package com.fan.rabbitmq.two;

import com.fan.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;

import java.sql.SQLOutput;

public class Work01Consumer {
    //队列的名称
    public static final String QUEUE_NAME="hello";

    //接收消息
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.basicConsume(QUEUE_NAME,true,
                (consumerTag,msg)->{
                    System.out.println("接收到的消息:"+new String(msg.getBody()));
                },
                (consumerTag)->{
                    System.out.println(consumerTag+"消费者取消接口回调逻辑");
                }
                );

        System.out.println("c2启动,等待接收消息-----");
    }


}

配置启动多个线程:

工作队列生产者Worker01Product:

package com.fan.rabbitmq.two;

import com.fan.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;

import java.util.Scanner;

public class Worker01Product {
    //队列名称
    public static final String QUEUE_NAME ="hello";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //从控制台当中接收消息
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()){
            String msg = scanner.next();
            
            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
            System.out.println("发送消息完成:"+msg);
        }
    }
}

测试:

轮询消费消息:

消息应答:



批量应答图解:

消息自动重新入队:

为了保证消息的不丢失:

消息手动应答:

消息生产者:

package com.fan.rabbitmq.three;
import com.fan.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import java.util.Scanner;

public class Task2Product {
    //队列名称
    public static final String TASK_QUEUE_ANME = "ack_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //队列声明
        channel.queueDeclare(TASK_QUEUE_ANME,false,false,false,null);
        //从控制台输入信息
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()){
            String msg = scanner.next();
            channel.basicPublish("",TASK_QUEUE_ANME,null,msg.getBytes("UTF-8"));
            System.out.println("生产者发出消息:"+msg);
        }
    }
}

消息的消费者1:Task2Consumer:

package com.fan.rabbitmq.three;

import com.fan.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

//手动应答,保证消息的不丢失
public class Task2Consumer {
    //消息队列
    public static final String TASK_QUEUE_NAME = "ack_queue";
    //接收消息
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("c1等待接收消息处理时间较短");

        DeliverCallback deliverCallback = (consumerTag,msg)->{
            //沉睡一秒
            try {
                Thread.sleep(1*1000);
                System.out.println("接收到的消息:"+new String(msg.getBody(),"UTF-8"));
                //手动应答
                
                channel.basicAck(msg.getEnvelope().getDeliveryTag(),false);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

        //消费消息,并采用手动应答
        Boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,(ConsumerTag)->{
                    System.out.println(ConsumerTag+"消费者取消消息消费接口回调逻辑");
                });
    }
}

消费者2:Task2Consumer2:

package com.fan.rabbitmq.three;

import com.fan.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

//手动应答,保证消息的不丢失
public class Task2Consumer2 {
    //消息队列
    public static final String TASK_QUEUE_NAME = "ack_queue";
    //接收消息
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
      System.out.println("c2等待接收消息处理时间较长。。。。。。。。。。。。。。。");

        DeliverCallback deliverCallback = (consumerTag,msg)->{
            //沉睡一秒
            try {
                Thread.sleep(30*1000);
                System.out.println("接收到的消息:"+new String(msg.getBody(),"UTF-8"));
                //手动应答
                
                channel.basicAck(msg.getEnvelope().getDeliveryTag(),false);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

        //消费消息,并采用手动应答
        Boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,(ConsumerTag)->{
            System.out.println(ConsumerTag+"消费者取消消息消费接口回调逻辑");
        });
    }
}

测试:
生产者发送消息:

消费者1接收消息;

消费者2接收消息:

然后消息bb没有丢失,而是给消费者1去处理了:

RabbitMQ持久化: 持久化概念:

队列持久化:

删除已存在的非持久化队列:

消息持久化:

持久化代码:
channel.basicPublish("",TASK_QUEUE_ANME, MessageProperties.PERSISTENT_TEXT_PLAIN
,msg.getBytes(“UTF-8”));

不公平分发:

设置成1为不公平分发,0为默认的轮询分发。

修改消费者代码实现分发策略:因为消费方决定消息的消费方式

每个消费者都要设置:

测试:
发送多条消息:


消费者:

预取值:

package com.fan.rabbitmq.three;

import com.fan.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

//手动应答,保证消息的不丢失
public class Task2Consumer2 {
    //消息队列
    public static final String TASK_QUEUE_NAME = "ack_queue";
    //接收消息
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("c2等待接收消息处理时间较长。。。。。。。。。。。。。。。");
        DeliverCallback deliverCallback = (consumerTag,msg)->{
            //沉睡一秒
            try {
                Thread.sleep(30*1000);
                System.out.println("接收到的消息:"+new String(msg.getBody(),"UTF-8"));
                //手动应答策略
                
                channel.basicAck(msg.getEnvelope().getDeliveryTag(),false);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

        //消费消息,并采用手动应答
        Boolean autoAck = false;
        channel.basicQos(5);//1为设置不公平分发.basicQos参数为预取值
        channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,(ConsumerTag)->{
            System.out.println(ConsumerTag+"消费者取消消息消费接口回调逻辑");
        });
    }
}

发布确认:

单个确认发布:

代码演示:

package com.fan.rabbitmq.four;

import com.fan.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;

import java.util.UUID;


public class ConfimMsg {
    //消息总量MSG_COUNT
    public static final int MSG_COUNT = 1000;
    public static void main(String[] args) throws Exception {
        //1.单个确认
        publishMsgIndividually();//发布1000条单独确认消息,耗时694ms
        //2.批量确认

    }

    //1.单个确认
    public static void publishMsgIndividually()throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        //队列的声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,true,false,false,null);
        //开启发布确认
        channel.confirmSelect();
        //开始时间
        long begin = System.currentTimeMillis();
        //批量发消息
        for (int i = 0; i < MSG_COUNT; i++) {
            String msg = i + "";
            channel.basicPublish("",queueName,null,msg.getBytes());
            //单个消息就马上进行发布确认
            boolean flag = channel.waitForConfirms();
            if(flag){
                System.out.println("消息发送成功");
            }
        }
        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布"+MSG_COUNT+"条单独确认消息,耗时"+(end-begin)+"ms");
    }
}
批量确认发布:

package com.fan.rabbitmq.four;

import com.fan.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;

import java.util.UUID;


public class ConfimMsg {
    //消息总量MSG_COUNT
    public static final int MSG_COUNT = 1000;
    public static void main(String[] args) throws Exception {
        //1.单个确认
        //publishMsgIndividually();//发布1000条单独确认消息,耗时694ms
        //2.批量确认
        BatchpublishMsg();//发布1000条,批量确认消息,耗时194ms

    }

    //2.批量确认
    public static void BatchpublishMsg()throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        //队列的声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,true,false,false,null);
        //开启发布确认
        channel.confirmSelect();
        //开始时间
        long begin = System.currentTimeMillis();
        //每次批量确认消息的容量大小
        int batchSize = 100;
        //批量发消息
        for (int i = 0; i < MSG_COUNT; i++) {
            String msg = i + "";
            channel.basicPublish("",queueName,null,msg.getBytes());
            //这里进行批量确认操作:判断达到100个消息的时候,批量确认一次
            if(i % batchSize == 0){
                //批量发布确认
                channel.waitForConfirms();
            }
        }
        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布"+MSG_COUNT+"条,批量确认消息,耗时"+(end-begin)+"ms");
    }
}
异步确认发布:


代码演示:

package com.fan.rabbitmq.four;
import com.fan.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client./confirm/iCallback;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;


public class ConfimMsg {
    //消息总量MSG_COUNT
    public static final int MSG_COUNT = 1000;
    public static void main(String[] args) throws Exception {
        //1.单个确认
        //publishMsgIndividually();//发布1000条单独确认消息,耗时694ms
        //2.批量确认
        //BatchpublishMsg();//发布1000条,批量确认消息,耗时194ms
        //3.异步确认
        publishMsgAsync();//发布1000条,批量确认消息,耗时66ms,第一次比较耗时

    }

    //异步发布确认
    public static void publishMsgAsync()throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        //队列的声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,true,false,false,null);
        //开启发布确认
        channel.confirmSelect();
        
        ConcurrentSkipListMap outstandingConfirm = new ConcurrentSkipListMap<>();
        
        //消息确认成功,回调函数
        ConfirmCallback ackCallback = (deliverTag, multiple) -> {
            if(multiple){//批量确认
                //②.确定成功后,删除已经确认的消息,剩下的就是未确认的消息
                ConcurrentNavigableMap confirmed =
                        outstanding/confirm/i.headMap(deliverTag);
                /confirm/ied.clear();
            }else{//单个确认
                outstanding/confirm/i.remove(deliverTag);
            }
            System.out.println("确认的消息:"+deliverTag);
        };
        
        ConfirmCallback nackCallback = (deliverTag, multiple) -> {
            //③.打印一下未确认消息的都有哪些
            String msg = outstanding/confirm/i.get(deliverTag);
            System.out.println("未确认的消息是:"+msg+"未确认的消息tag:"+deliverTag);
        };
        //准备消息的监听器,监听哪些消息成功了,哪些消息失败了
        channel.addConfirmListener(ackCallback,nackCallback);//异步通知

        //开始时间
        long begin = System.currentTimeMillis();
        //批量发消息
        for (int i = 0; i < MSG_COUNT; i++) {
            String msg = "消息"+i;
            channel.basicPublish("",queueName,null,msg.getBytes());
            //①记录下所有要发送的消息到map,消息的总和
            outstanding/confirm/i.put(channel.getNextPublishSeqNo(),msg);
        }
        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布"+MSG_COUNT+"条,批量确认消息,耗时"+(end-begin)+"ms");
    }
}
三种发布确认的速度比较:

交换机:

绑定:

交换机绑定队列:

fanout交换机:

fanout交换机即发布订阅模式:


一个发送两个接收:
ReceiveLogs01将接收到的消息打印在控制台:

package com.fan.rabbitmq.five;

import com.fan.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogs01 {
    public static final String EXCHANGE_NAME = "logs";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //声明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        //声明一个临时队列,当消费者断开与队列的连接的时候,队列就自动删除
        String queueName = channel.queueDeclare().getQueue();
        //绑定交换机和队列
        channel.queueBind(queueName,EXCHANGE_NAME,"");
        System.out.println("等待接收消息,把剩余的消息打印到屏幕上。。。。。。");
        DeliverCallback deliverCallback = (consumerTag, msg) -> {
            System.out.println("ReceiveLogs01控制台答应接收到的消息:" + new String(msg.getBody(), "utf-8"));
        };
        //消息的消费,包含取消消息是的回调接口
        channel.basicConsume(queueName,true,deliverCallback,(consumerTag)->{});
    }
}

消息的生产者:

package com.fan.rabbitmq.five;
import com.fan.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import java.util.Scanner;

public class EmitLog {
    //交换机名称
    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()){
            String msg = scanner.next();
            channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes("UTF-8"));
            System.out.println("生产者发出消息:"+msg);
        }
    }
}

测试:开启mq的linux服务,
然后启动三个main程序:

两个消费者都能接收到一份消息:

direct交换机:

direct交换机即漏由模式:

实战:


消费者ReceiveLogsDirect01:

package com.fan.rabbitmq.six;

import com.fan.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogsDirect01 {
    //声明交换机名字
    public static final String EXCHANGE_NAME="direct_logs";

    public static void main(String[] args)throws Exception {
        //1.获取信道
        Channel channel = RabbitMqUtils.getChannel();
        //2.交换机的声明创建(参数一时交换机的名字,参数二是交换机的类型)
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //3.队列的声明创建
        channel.queueDeclare("console",false,false,false,null);
        //4.队列的绑定(队列名,交换机名,漏由key),可以绑定多个
        channel.queueBind("console",EXCHANGE_NAME,"info");
        channel.queueBind("console",EXCHANGE_NAME,"warning");
        //接收消息回调函数
         DeliverCallback  deliverCallback = (consumerTag, msg) -> {
            System.out.println("ReceiveLogsDirect01控制台打印接收到的消息:"+
                    new String(msg.getBody(),"UTF-8"));
        };
         //消息消费(消费者取消消息时回调接口)
        channel.basicConsume("console",true,deliverCallback,(consumerTag)->{});

    }
}

消费者ReceiveLogsDirect02 :

package com.fan.rabbitmq.six;

import com.fan.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogsDirect02 {
    //声明交换机名字
    public static final String EXCHANGE_NAME="direct_logs";

    public static void main(String[] args)throws Exception {
        //1.获取信道
        Channel channel = RabbitMqUtils.getChannel();
        //2.交换机的声明创建(参数一时交换机的名字,参数二是交换机的类型)
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //3.队列的声明创建
        channel.queueDeclare("disk",false,false,false,null);
        //4.队列的绑定(队列名,交换机名,漏由key),可以绑定多个
        channel.queueBind("disk",EXCHANGE_NAME,"error");
        //接收消息回调函数
         DeliverCallback  deliverCallback = (consumerTag, msg) -> {
            System.out.println("ReceiveLogsDirect02控制台打印接收到的消息:"+
                    new String(msg.getBody(),"UTF-8"));
        };
         //消息消费(消费者取消消息时回调接口)
        channel.basicConsume("disk",true,deliverCallback,(consumerTag)->{});

    }
}

生产者:

package com.fan.rabbitmq.six;

import com.fan.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;

import java.util.Scanner;

public class DirectLogs {
    //声明交换机名字
    public static final String EXCHANGE_NAME="direct_logs";

    public static void main(String[] args) throws Exception{
        //1.获取信道
        Channel channel = RabbitMqUtils.getChannel();
        //2.循环发送消息
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()){
            String msg = scanner.next();
            channel.basicPublish(EXCHANGE_NAME,"error",
                    null,msg.getBytes("UTF-8"));
            System.out.println("生产者发出消息:"+msg);
        }
    }
}

测试:发送给带有漏由口令error的消息,则只能被队列2接收到(即ReceiveLogsDirect02类中绑定了队列disk,口令是error)。

topics交换机:

注意:

消费者:ReceiveLogsTopic01 :

package com.fan.rabbitmq.seven;
import com.fan.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogsTopic01 {
    //交换机名称
    public static final String EXCHANGE_NAME = "topic_logs";
    //接收消息
    public static void main(String[] args)throws Exception {
        //1.获取信道
        Channel channel = RabbitMqUtils.getChannel();
        //2.交换机的声明创建
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        //3.队列的声明创建
        String queueName = "Q1";
        channel.queueDeclare(queueName,false,false,false,null);
        //4.队列的绑定
        channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");
        System.out.println("等待接收消息。。。。。");

        DeliverCallback deliverCallback = (consumerTag, msg) -> {
            System.out.println(new String(msg.getBody(),"UTF-8") );
            System.out.println("接收队列:"+queueName+",绑定键:"+
                    msg.getEnvelope().getRoutingKey());
        };
        //接收消息
        channel.basicConsume(queueName,true,deliverCallback,(consumerTag)->{});
    }
}

消费者:ReceiveLogsTopic02 :

package com.fan.rabbitmq.seven;
import com.fan.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogsTopic02 {
    //交换机名称
    public static final String EXCHANGE_NAME = "topic_logs";
    //接收消息
    public static void main(String[] args)throws Exception {
        //1.获取信道
        Channel channel = RabbitMqUtils.getChannel();
        //2.交换机的声明创建
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        //3.队列的声明创建
        String queueName = "Q2";
        channel.queueDeclare(queueName,false,false,false,null);
        //4.队列的绑定
        channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit");
        channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");
        System.out.println("等待接收消息。。。。。");

        DeliverCallback deliverCallback = (consumerTag, msg) -> {
            System.out.println(new String(msg.getBody(),"UTF-8") );
            System.out.println("接收队列:"+queueName+",绑定键:"+
                    msg.getEnvelope().getRoutingKey());
        };
        //接收消息
        channel.basicConsume(queueName,true,deliverCallback,(consumerTag)->{});
    }
}

生产者:EmitLogTopic :

package com.fan.rabbitmq.seven;

import com.fan.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import java.util.HashMap;
import java.util.Map;

public class EmitLogTopic {
    //交换机名称
    public static final String EXCHANGE_NAME="topic_logs";

    public static void main(String[] args)throws Exception {
        //1.获取信道
        Channel channel = RabbitMqUtils.getChannel();
        HashMap map = new HashMap<>();
        map.put("lazy.orange.elephant","被队列Q1Q2接收到");
        map.put("quick.orange.fox","被队列Q1接收到");
        map.put("lazy.brown.fox","被队列Q2接收到");
        map.put("lazy.pink.ribbit","满足两个绑定,但是只能被队列Q2接收到");
        map.put("quick.brown.fox","不满足任何绑定,不会被任何队列接收到,会被丢弃");
        map.put("quick.orange.male.rabbit","四个单词不匹配任何的绑定队列,会被丢弃");
        map.put("lazy.orange.male.rabbit","是四个单词,但匹配Q2");
        for (Map.Entry bindingKeyEntry : map.entrySet()) {
            String routingKey = bindingKeyEntry.getKey();
            String msg = bindingKeyEntry.getValue();
            channel.basicPublish(EXCHANGE_NAME,routingKey,null,
                    msg.getBytes("UTF-8"));
            System.out.println("生产者发出消息:"+msg);
        }
    }
}

然后启动测试即可;

死信队列:

消息过期:

消费者Consumer1:

package com.fan.rabbitmq.eight;
import com.fan.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.util.HashMap;

public class Consumer1 {
    //普通交换机的名字
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机的名字
    public static final String DEAD_EXCHANGE = "dead_exchange";
    //普通队列的名称
    public static final String NORMAL_QUEUE = "normal_queue";
    //死信队列的名称
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws  Exception{
        //1.获取信道
        Channel channel = RabbitMqUtils.getChannel();
        //2.交换机的声明创建
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        //3.队列的声明创建
        //#########################声明普通队列########################################
        HashMap arguments = new HashMap<>();
        //设置ttl过期时间,可以在这里设置,但是不建议,因为不灵活10s=10 000ms
        //arguments.put("x-message-ttl",10000);
        //正常队列设置死信交换机,发生异常的时候连接死信交换机
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //设置死信的routing-key
        arguments.put("x-dead-letter-routing-key","lisi");
        //正常队列有异常消息时进入死信队列
        channel.queueDeclare(NORMAL_QUEUE,true,false,false,arguments);
        //#########################声明死信队列########################################
        channel.queueDeclare(DEAD_QUEUE,true,false,false,null);
        //绑定普通的交换机与普通的队列
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
        //绑定死信的交换机与死信的队列
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
        System.out.println("等待接收消息--------");
        //消费消息
        DeliverCallback deliverCallback = (consumerTag, msg) -> {
            System.out.println("Consumer1接收的消息是:"+
                    new String(msg.getBody(),"UTF-8"));
        };
        channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,(consumerTag)->{});
    }
}

消费者2:Consumer2:

package com.fan.rabbitmq.eight;
import com.fan.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class Consumer2 {
    //死信队列的名称
    public static final String DEAD_QUEUE = "dead_queue";
    public static void main(String[] args) throws  Exception{
        //1.获取信道
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("等待接收消息-------");
        //消息发送
        DeliverCallback deliverCallback = (consumerTag, msg) -> {
            System.out.println("Consumer2接收的消息是:"+
                    new String(msg.getBody(),"UTF-8"));
        };
        channel.basicConsume(DEAD_QUEUE,true,deliverCallback,(consumerTag)->{});
    }
}

生产者Producer:

package com.fan.rabbitmq.eight;
import com.fan.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;

public class Producer {
    //声明普通交换机的名称
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args)throws Exception {
        //1.获取信道
        Channel channel = RabbitMqUtils.getChannel();
        //死信消息,设置ttl(time to live:存活时间)时间,单位是ms
        AMQP.BasicProperties properties = new AMQP.BasicProperties()
                .builder().expiration("10000").build();
        for (int i = 1; i <= 10 ; i++) {
            String msg = "info" + i;
            //发消息
            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,
                    msg.getBytes("UTF-8"));
        }
    }
}

测试:

先启动Consumer1消费者,然后立马停掉(模拟死掉),然后启动生产者来发送10条消息。看MQ网页控制台:

然后启动消费者Consumer2,看控制台10条消息被C2消费掉:

队列达到最大长度:


修改代码:

测试:

同样,启动消费者01后直接关闭,然后启动生产者,看效果:

消息被拒:

修改代码:

启动消费者01,然后启动生产者,看控制台:

然后启动消费者02看控制台:

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/612402.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号