栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMq的基本使用

RabbitMq的基本使用

什么是RabbitMq?

RabbitMQ is a message broker: it accepts and forwards messages. You can think about it as a post office: when you put the mail that you want posting in a post box, you can be sure that the letter carrier will eventually deliver the mail to your recipient. In this analogy, RabbitMQ is a post box, a post office, and a letter carrier.

通俗的说RabbitMq就是用来接收和转发消息的,本身不对消息进行任何处理,只是接收和转发。

为什么要用RabbitMq?

1、解耦

这里有两个模块,用户注册完后短信通知用户注册成功,这里用户注册就依赖于短信通知,存在耦合,当短信通知出现异常时,用户注册也会影响使用,下面引入消息队列。

引入消息队列后用户注册于短信通知已经解耦了,此后用户注册完后发送消息队列后就可以不管了,剩下的就交给消息队列了。

2、异步
  还是上面的案例,本来用户注册完后需要等待短信通知调用完后才能给用户回馈,引入消息队列后用户注册和短信通知就可以异步执行,大大提高了响应速度。

3、流量削峰
  举个例子,某个特殊的日子,有个商家搞整点优惠活动,力度还挺大,听到这个消息,大家都跃跃欲试。到了时间点,大家打开活动网站,准备下单,发现网站崩了,原来是到了整点,大量的请求发给了服务器,服务器一时间难以接受,就挂了。怎么解决,MQ的流量削峰。

引入MQ

RabbitMq的组成

  • Producer 发送消息的一方
  • Exchange(交换机)Producer 将消息发送给交换机,交换机来实现消息的分发
  • Queue(队列) 存储消息的地方
  • Consumer 信息的消费者
RabbitMq分发策略 简单队列

生产者的实现:

public class Send {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) {
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("localhost");
        try(Connection connection = factory.newConnection();
            Channel channel = connection.createChannel()) {
            
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            String message="Hello World";
             
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

运行后:

消费者的实现:

public class Recv {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) {
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("localhost");
        try{
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            
            channel.basicConsume(QUEUE_NAME, true, new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery message) throws IOException {
                    String info=new String(message.getBody(),"utf-8");
                    System.out.println(" [x] Received '" + info + "'");
                }
            }, consumerTag -> {
            });
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

运行后:

工作队列


消息生产者:

public class WorkSend {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] args) {
        ConnectionFactory connectionFactory=new ConnectionFactory();
        connectionFactory.setHost("localhost");
        Connection connection=null;
        Channel channel=null;
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            //限制每个消费者一次只能消费一条信息
            channel.basicQos(1);
            channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
            String info="work task info--------";
            //生产十条待消费信息
            for(int i=0;i<10;i++){
                System.out.println(info.getBytes());
                channel.basicPublish("",TASK_QUEUE_NAME,null,(info+i).getBytes());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消息消费者(这里创建了两个消费者,代码都一样):

public class WorkRecv2 {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] args) {
        ConnectionFactory connectionFactory=new ConnectionFactory();
        connectionFactory.setHost("localhost");
        Connection connection=null;
        try {
            connection=connectionFactory.newConnection();
            final Channel channel=connection.createChannel();
            channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
            //这里关闭了自动应答,采用手动应答方式
            channel.basicConsume(TASK_QUEUE_NAME, false, new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery message) throws IOException {
                  String info = new String(message.getBody(), "utf-8");
                    System.out.println(info);
                    //手动应答,第一个参数为消息序列号,第二个参数为是否批量应答
                    channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
                }
            }, new CancelCallback() {
                @Override
                public void handle(String consumerTag) throws IOException {

                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

运行后(两个消费者每人每次消费一条信息,依次消费):

分发队列

生产者产生消息,只要订阅了该队列的都能收到消息通知

消息生产者:

public class FanoutExchange {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection(); 
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            String message =  "info: Hello World!";
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

运行后:

消息消费者:

public class FanoutRecive {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        //默认实现的queue,具有排他性和自动删除
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}

运行后:

Routing

通过路由key绑定队列与交换机,交换机可以定点投放信息

消息生产者:

public class RouteSend {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            String message="发给blue的";
            channel.basicPublish(EXCHANGE_NAME, "blue", null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent "+ message + "'");
        }
    }

}

运行后:

消息消费者(这里创建两个消费者,只需将绑定的路由key改下就行):

public class BlueRecv {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();
        
        channel.queueBind(queueName, EXCHANGE_NAME, "blue");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}

运行后:

Topics

  • * 能匹配一个
  • # 能匹配零个或多个

例如 lazy.# 能匹配 lazy.orange.elephantlazy.brown.fox ,而 lazy.*.fox 只能匹配lazy.brown.fox

消息生产者:

public class TopicSend {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            String message="info to lazy.#-----";
            channel.basicPublish(EXCHANGE_NAME, "lazy.red.bird", null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent  " + message + "'");
        }
    }
}

消费者代码(创建了两个消费者,改一下routingKey就行,改为 lazy.*.bird):

public class TopicRecv {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();

        //这里的routingKey和之前的不同
            channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");

        System.out.println(" [*] Waiting for messages.");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}

运行后(两个都收到了消息):

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/312678.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号