栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ

RabbitMQ

一、引言

消息队列作用:解耦、异步、削峰

https://www.cnblogs.com/terry-love/p/11492397.html

二、RabbitMQ介绍

市面上比较火爆的几款MQ:

ActiveMQ,RocketMQ,Kafka,RabbitMQ。

  • 语言的支持:ActiveMQ,RocketMQ只支持Java语言,Kafka可以支持多们语言,RabbitMQ支持多种语言。

  • 效率方面:ActiveMQ,RocketMQ,Kafka效率都是毫秒级别,RabbitMQ是微秒级别的。

  • 消息丢失,消息重复问题: RabbitMQ针对消息的持久化,和重复问题都有比较成熟的解决方案。

  • 学习成本:RabbitMQ非常简单。

RabbitMQ是由Rabbit公司去研发和维护的,最终是在Pivotal。

RabbitMQ严格的遵循AMQP协议,高级消息队列协议,帮助我们在进程之间传递异步消息。

三、RabbitMQ安装

1.创建 docker-compose.yaml 文件

[root@master java2109]# mkdir docker-compose-rabbitmq
[root@master java2109]# cd docker-compose-rabbitmq/
[root@master docker-compose-rabbitmq]# ls
[root@master docker-compose-rabbitmq]# vim docker-compose.yaml

version: "3.1"
services:
  rabbitmq:
    image: daocloud.io/library/rabbitmq:management
    restart: always
    container_name: rabbitmq
    ports:
      - 5672:5672
      - 15672:15672
    volumes:
      - ./data:/var/lib/rabbitmq

2.启动rabbitmq

 [root@master docker-compose-rabbitmq]# docker-compose up

 

登陆 (注意Google浏览器有兼容问题,使用IE)

http://虚拟机IP:15672/ 用户名guest 密码guest

 

 

四、RabbitMQ架构【重点】
4.1 官方的简单架构图
  • Publisher - 生产者:发布消息到RabbitMQ中的Exchange

  • Consumer - 消费者:监听RabbitMQ中的Queue中的消息

  • Exchange - 交换机:和生产者建立连接并接收生产者的消息

  • Queue - 队列:Exchange会将消息分发到指定的Queue,Queue和消费者进行交互

  • Routes - 路由:交换机以什么样的策略将消息发布到Queue

简单架构图

4.2 RabbitMQ的完整架构图

完整架构图

完整架构图

五、RabbitMQ的使用【重点】
5.1 RabbitMQ的通讯方式

 

 

5.2 Java连接RabbitMQ 5.2.1 创建maven项目

…………

5.2.2 导入依赖

    
        com.rabbitmq
        amqp-client
        5.6.0
    
​
    
        junit
        junit
        4.12
    
5.2.3 创建工具类连接RabbitMQ
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class RabbitMqUtils {


    
    public  static Connection getConnection() throws IOException, TimeoutException {


        ConnectionFactory connectionFactory = new ConnectionFactory();


        connectionFactory.setHost("192.168.12.145");

        // 客户端连接的端口
        connectionFactory.setPort(5672);

        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        // VirtualHost 相当于文件夹,消息存在那个位置
        connectionFactory.setVirtualHost("/");


       return connectionFactory.newConnection();

    }


}

5.3 Hello-World模式

一个生产者,一个默认的交换机,一个队列,一个消费者

创建测试类进行测试

import com.qfedu.utils.RabbitMqUtils;
import com.rabbitmq.client.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class HelloWorldTest {

    private Connection connection;


    @Before // 在@Test 之前调用初始化数据
    public void init() throws IOException, TimeoutException {

        connection = RabbitMqUtils.getConnection();
    }


    
    @Test // 进行单元测试
    public void consumerTest() throws IOException {

        // channel 管道 连接 消费者和队列
        Channel channel = connection.createChannel();

        //参数1:queue - 指定队列的名称
        //参数2:durable - 当前队列是否需要持久化(true)
        //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
        //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
        //参数5:arguments - 指定当前队列的其他信息

        // chanel 和 队列绑定
        channel.queueDeclare("hello-queue",true,true,false,null);

        // 抱着每次消费者 消费一条数据
        channel.basicQos(1);


        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                // 从队列中获取消息
                System.out.println("接受到消息:"+new String(body,"utf-8") );

            }
        };

        // chanel 和 消费者绑定
        // 参数1  队列名称
        // 参数2  是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送
        // 参数3  消费者
        channel.basicConsume("hello-queue", true,consumer);


        // 让程序一致卡在这里 消费者 可以一致消费消息
        System.in.read();// 等待客户端命令行 的输入

    }

    
    @Test
    public void publishTest() throws IOException, TimeoutException {

        Channel channel = connection.createChannel();



        //参数1: 交换机名称 没有就是默认 ""
        // 参数2 :队列名称  或者是 消息类型信息 真的会路由规则进行匹配
        // 参数3: 指定传递的消息所携带的properties,使用null。
        // 参数4:消息

        // 向队列 发送消息  " hello-queue"
        channel.basicPublish("", "hello-queue",null,"hello-world".getBytes());


        channel.close();

    }



    @After// 在@Test 之后进行 ,是数据销毁
    public void destroy() throws IOException {

        connection.close();
    }

}

注意:一定要先启动消费者,在启动生产者

 5.4 手动ack 机制

手动ack 机制:保证消息对应的业务 已经真正的处理了,而不是仅仅接收到该消息

 

package com.qfedu.test;

import com.qfedu.utils.RabbitMqUtils;
import com.rabbitmq.client.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class HelloWorldTestAck {

    private Connection connection;


    @Before // 在@Test 之前调用初始化数据
    public void init() throws IOException, TimeoutException {

        connection = RabbitMqUtils.getConnection();
    }


    
    @Test // 进行单元测试
    public void consumerTest() throws IOException {

        // channel 管道 连接 消费者和队列
        final Channel channel = connection.createChannel();

        //参数1:queue - 指定队列的名称
        //参数2:durable - 当前队列是否需要持久化(true)
        //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
        //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
        //参数5:arguments - 指定当前队列的其他信息

        // chanel 和 队列绑定
        channel.queueDeclare("hello-queue",true,true,false,null);

        // 抱着每次消费者 消费一条数据
        channel.basicQos(1);


        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                // 从队列中获取消息  并处理器
                System.out.println("接受到消息:"+new String(body,"utf-8") );


//                int a = 1/0;

                // 所有的业务都完成之后 可以手动的ack
                // envelope.getDeliveryTag() // 消息标记 0  1  2
                // false ack 之后 不删除
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        // chanel 和 消费者绑定
        // 参数1  队列名称
        // 参数2  是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送
//                  false 需要手动ack
        // 参数3  消费者
        channel.basicConsume("hello-queue", false,consumer);


        // 让程序一致卡在这里 消费者 可以一致消费消息
        System.in.read();// 等待客户端命令行 的输入

    }

    
    @Test
    public void publishTest() throws IOException, TimeoutException {

        Channel channel = connection.createChannel();



        //参数1: 交换机名称 没有就是默认 ""
        // 参数2 :队列名称  或者是 消息类型信息 真的会路由规则进行匹配
        // 参数3: 指定传递的消息所携带的properties,使用null。
        // 参数4:消息

        // 向队列 发送消息  " hello-queue"
        channel.basicPublish("", "hello-queue",null,"hello-world".getBytes());


        channel.close();

    }



    @After// 在@Test 之后进行 ,是数据销毁
    public void destroy() throws IOException {

        connection.close();
    }

}
5.5 worker-queue 模式
package com.qfedu.test;

import com.qfedu.utils.RabbitMqUtils;
import com.rabbitmq.client.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class WorkerQueueTest {

    private Connection connection;


    @Before // 在@Test 之前调用初始化数据
    public void init() throws IOException, TimeoutException {

        connection = RabbitMqUtils.getConnection();
    }


    
    @Test // 进行单元测试
    public void consumer1Test() throws IOException {

        // channel 管道 连接 消费者和队列
        final Channel channel = connection.createChannel();

        //参数1:queue - 指定队列的名称
        //参数2:durable - 当前队列是否需要持久化(true)
        //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
        //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
        //参数5:arguments - 指定当前队列的其他信息

        // chanel 和 队列绑定
        channel.queueDeclare("worker-queue",true,false,false,null);

        // 抱着每次消费者 消费一条数据
        channel.basicQos(1);


        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                // 从队列中获取消息  并处理器
                System.out.println("消费者1  接受到消息:"+new String(body,"utf-8") );



                // 所有的业务都完成之后 可以手动的ack
                // envelope.getDeliveryTag() // 消息标记 0  1  2
                // false ack 之后 不删除
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        // chanel 和 消费者绑定
        // 参数1  队列名称
        // 参数2  是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送
//                  false 需要手动ack
        // 参数3  消费者
        channel.basicConsume("worker-queue", false,consumer);


        // 让程序一致卡在这里 消费者 可以一致消费消息
        System.in.read();// 等待客户端命令行 的输入

    }

    @Test // 进行单元测试
    public void consumer2Test() throws IOException {

        // channel 管道 连接 消费者和队列
        final Channel channel = connection.createChannel();

        //参数1:queue - 指定队列的名称
        //参数2:durable - 当前队列是否需要持久化(true)
        //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
        //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
        //参数5:arguments - 指定当前队列的其他信息

        // chanel 和 队列绑定
        channel.queueDeclare("worker-queue",true,false,false,null);

        // 抱着每次消费者 消费一条数据
        channel.basicQos(1);


        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                // 从队列中获取消息  并处理器
                System.out.println("消费者2  接受到消息:"+new String(body,"utf-8") );



                // 所有的业务都完成之后 可以手动的ack
                // envelope.getDeliveryTag() // 消息标记 0  1  2
                // false ack 之后 不删除
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        // chanel 和 消费者绑定
        // 参数1  队列名称
        // 参数2  是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送
//                  false 需要手动ack
        // 参数3  消费者
        channel.basicConsume("worker-queue", false,consumer);


        // 让程序一致卡在这里 消费者 可以一致消费消息
        System.in.read();// 等待客户端命令行 的输入

    }

    
    @Test
    public void publishTest() throws IOException, TimeoutException {

        Channel channel = connection.createChannel();



        //参数1: 交换机名称 没有就是默认 ""
        // 参数2 :队列名称  或者是 消息类型信息 真的会路由规则进行匹配
        // 参数3: 指定传递的消息所携带的properties,使用null。
        // 参数4:消息

        // 向队列 发送消息  " worker-queue"

        // 发送10条数据 每个消费者得到5条数据
        for (int i = 0; i < 10; i++) {

            channel.basicPublish("", "worker-queue",null,("worker-queue--i:" +i).getBytes());

        }


        channel.close();

    }



    @After// 在@Test 之后进行 ,是数据销毁
    public void destroy() throws IOException {

        connection.close();
    }

}

 

5.6 发布订阅模式

 

 

package com.qfedu.test;

import com.qfedu.utils.RabbitMqUtils;
import com.rabbitmq.client.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class PublishSubTest {

    private Connection connection;


    @Before // 在@Test 之前调用初始化数据
    public void init() throws IOException, TimeoutException {

        connection = RabbitMqUtils.getConnection();
    }


    
    @Test // 进行单元测试
    public void consumer1Test() throws IOException {

        // channel 管道 连接 消费者和队列
        final Channel channel = connection.createChannel();

        //参数1:queue - 指定队列的名称
        //参数2:durable - 当前队列是否需要持久化(true)
        //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
        //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
        //参数5:arguments - 指定当前队列的其他信息

        // chanel 和 队列绑定
        channel.queueDeclare("pubsub-queue1",true,false,false,null);

        // 抱着每次消费者 消费一条数据
        channel.basicQos(1);


        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                // 从队列中获取消息  并处理器
                System.out.println("消费者1  接受到消息:"+new String(body,"utf-8") );



                // 所有的业务都完成之后 可以手动的ack
                // envelope.getDeliveryTag() // 消息标记 0  1  2
                // false ack 之后 不删除
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        // chanel 和 消费者绑定
        // 参数1  队列名称
        // 参数2  是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送
//                  false 需要手动ack
        // 参数3  消费者
        channel.basicConsume("pubsub-queue1", false,consumer);


        // 让程序一致卡在这里 消费者 可以一致消费消息
        System.in.read();// 等待客户端命令行 的输入

    }

    @Test // 进行单元测试
    public void consumer2Test() throws IOException {

        // channel 管道 连接 消费者和队列
        final Channel channel = connection.createChannel();

        //参数1:queue - 指定队列的名称
        //参数2:durable - 当前队列是否需要持久化(true)
        //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
        //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
        //参数5:arguments - 指定当前队列的其他信息

        // chanel 和 队列绑定
        channel.queueDeclare("pubsub-queue2",true,false,false,null);

        // 抱着每次消费者 消费一条数据
        channel.basicQos(1);


        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                // 从队列中获取消息  并处理器
                System.out.println("消费者2  接受到消息:"+new String(body,"utf-8") );



                // 所有的业务都完成之后 可以手动的ack
                // envelope.getDeliveryTag() // 消息标记 0  1  2
                // false ack 之后 不删除
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        // chanel 和 消费者绑定
        // 参数1  队列名称
        // 参数2  是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送
//                  false 需要手动ack
        // 参数3  消费者
        channel.basicConsume("pubsub-queue2", false,consumer);


        // 让程序一致卡在这里 消费者 可以一致消费消息
        System.in.read();// 等待客户端命令行 的输入

    }

    
    @Test
    public void publishTest() throws IOException, TimeoutException {

        Channel channel = connection.createChannel();


        //将 chanel 和 自定义的交换机 绑定 "pubsub-exchange"
        //参数1: exchange的名称
        //参数2: 指定exchange的类型  FANOUT - pubsub ,   DIRECT - Routing , TOPIC - Topics
        //FANOUT - pubsub 交换机 会将消息发送到 所有的队列中
        channel.exchangeDeclare("pubsub-exchange", BuiltinExchangeType.FANOUT);

        //参数1 队列名
        //参数2 交换机名
        //参数3  路由规则
        channel.queueBind("pubsub-queue1", "pubsub-exchange", "");
        channel.queueBind("pubsub-queue2", "pubsub-exchange", "");

        //参数1: 交换机名称 没有就是默认 ""
        // 参数2 :队列名称  或者是 消息类型信息 真的会路由规则进行匹配
        // 参数3: 指定传递的消息所携带的properties,使用null。
        // 参数4:消息

        // 向队列 发送消息

        // 发送10条数据 每个消费者得到5条数据
        for (int i = 0; i < 10; i++) {

            // 消息没有发送到默认的交换机 ,而是发送到 自定义交换机pubsub-exchange
            // "pubsub-exchange" 交换机名称
            // ""  路由规则
            channel.basicPublish("pubsub-exchange", "",null,("pubsub--i:" +i).getBytes());

        }


        channel.close();

    }



    @After// 在@Test 之后进行 ,是数据销毁
    public void destroy() throws IOException {

        connection.close();
    }

}

 

5.7 routing 路由模式

import com.qfedu.utils.RabbitMqUtils;
import com.rabbitmq.client.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class RoutingTest {

    private Connection connection;


    @Before // 在@Test 之前调用初始化数据
    public void init() throws IOException, TimeoutException {

        connection = RabbitMqUtils.getConnection();
    }


    
    @Test // 进行单元测试
    public void consumer1Test() throws IOException {

        // channel 管道 连接 消费者和队列
        final Channel channel = connection.createChannel();

        //参数1:queue - 指定队列的名称
        //参数2:durable - 当前队列是否需要持久化(true)
        //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
        //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
        //参数5:arguments - 指定当前队列的其他信息

        // chanel 和 队列绑定
        channel.queueDeclare("routing-info-queue",true,false,false,null);

        // 抱着每次消费者 消费一条数据
        channel.basicQos(1);


        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                // 从队列中获取消息  并处理器
                System.out.println("消费者1  接受到消息:"+new String(body,"utf-8") );



                // 所有的业务都完成之后 可以手动的ack
                // envelope.getDeliveryTag() // 消息标记 0  1  2
                // false ack 之后 不删除
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        // chanel 和 消费者绑定
        // 参数1  队列名称
        // 参数2  是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送
//                  false 需要手动ack
        // 参数3  消费者
        channel.basicConsume("routing-info-queue", false,consumer);


        // 让程序一致卡在这里 消费者 可以一致消费消息
        System.in.read();// 等待客户端命令行 的输入

    }

    @Test // 进行单元测试
    public void consumer2Test() throws IOException {

        // channel 管道 连接 消费者和队列
        final Channel channel = connection.createChannel();

        //参数1:queue - 指定队列的名称
        //参数2:durable - 当前队列是否需要持久化(true)
        //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
        //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
        //参数5:arguments - 指定当前队列的其他信息

        // chanel 和 队列绑定
        channel.queueDeclare("routing-error-queue",true,false,false,null);

        // 抱着每次消费者 消费一条数据
        channel.basicQos(1);


        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                // 从队列中获取消息  并处理器
                System.out.println("消费者2  接受到消息:"+new String(body,"utf-8") );



                // 所有的业务都完成之后 可以手动的ack
                // envelope.getDeliveryTag() // 消息标记 0  1  2
                // false ack 之后 不删除
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        // chanel 和 消费者绑定
        // 参数1  队列名称
        // 参数2  是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送
//                  false 需要手动ack
        // 参数3  消费者
        channel.basicConsume("routing-error-queue", false,consumer);


        // 让程序一致卡在这里 消费者 可以一致消费消息
        System.in.read();// 等待客户端命令行 的输入

    }

    
    @Test
    public void publishTest() throws IOException, TimeoutException {

        Channel channel = connection.createChannel();


        //将 chanel 和 自定义的交换机 绑定 "pubsub-exchange"
        //参数1: exchange的名称
        //参数2: 指定exchange的类型  FANOUT - pubsub ,   DIRECT - Routing , TOPIC - Topics
        // DIRECT - Routing 交换机 会将消息发送到 所有的队列中
        channel.exchangeDeclare("Routing-exchange", BuiltinExchangeType.DIRECT);

        //参数1 队列名
        //参数2 交换机名
        //参数3  路由规则
        // 所有消息为 info 的消息都会 由Routing-exchange发送到routing-info-queue 队列中
        channel.queueBind("routing-info-queue", "Routing-exchange", "info");
        channel.queueBind("routing-error-queue", "Routing-exchange", "error");

        //参数1: 交换机名称 没有就是默认 ""
        // 参数2 :队列名称  或者是 消息类型信息 真的会路由规则进行匹配
        // 参数3: 指定传递的消息所携带的properties,使用null。
        // 参数4:消息

        // 向队列 发送消息
        // 发送10条数据 每个消费者得到5条数据
        for (int i = 0; i < 10; i++) {

            // 消息没有发送到默认的交换机 ,而是发送到 自定义交换机pubsub-exchange
            //参数1: "pubsub-exchange" 交换机名称
            //参数2: ""  路由规则

            if (i%2==1){// 奇数 info
                channel.basicPublish("Routing-exchange", "info",null,("routing--i:" +i).getBytes());

            }else{ // 偶数 error
                channel.basicPublish("Routing-exchange", "error",null,("routing--i:" +i).getBytes());

            }

        }


        channel.close();

    }



    @After// 在@Test 之后进行 ,是数据销毁
    public void destroy() throws IOException {

        connection.close();
    }

}

 

5.8 Topic 主题模式

package com.qfedu.test;

import com.qfedu.utils.RabbitMqUtils;
import com.rabbitmq.client.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class TopicTest {

    private Connection connection;


    @Before // 在@Test 之前调用初始化数据
    public void init() throws IOException, TimeoutException {

        connection = RabbitMqUtils.getConnection();
    }


    
    @Test // 进行单元测试
    public void consumer1Test() throws IOException {

        // channel 管道 连接 消费者和队列
        final Channel channel = connection.createChannel();

        //参数1:queue - 指定队列的名称
        //参数2:durable - 当前队列是否需要持久化(true)
        //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
        //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
        //参数5:arguments - 指定当前队列的其他信息

        // chanel 和 队列绑定
        channel.queueDeclare("topic-queue-1",true,false,false,null);

        // 抱着每次消费者 消费一条数据
        channel.basicQos(1);


        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                // 从队列中获取消息  并处理器
                System.out.println("消费者1  接受到消息:"+new String(body,"utf-8") );



                // 所有的业务都完成之后 可以手动的ack
                // envelope.getDeliveryTag() // 消息标记 0  1  2
                // false ack 之后 不删除
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        // chanel 和 消费者绑定
        // 参数1  队列名称
        // 参数2  是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送
//                  false 需要手动ack
        // 参数3  消费者
        channel.basicConsume("topic-queue-1", false,consumer);


        // 让程序一致卡在这里 消费者 可以一致消费消息
        System.in.read();// 等待客户端命令行 的输入

    }

    @Test // 进行单元测试
    public void consumer2Test() throws IOException {

        // channel 管道 连接 消费者和队列
        final Channel channel = connection.createChannel();

        //参数1:queue - 指定队列的名称
        //参数2:durable - 当前队列是否需要持久化(true)
        //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
        //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
        //参数5:arguments - 指定当前队列的其他信息

        // chanel 和 队列绑定
        channel.queueDeclare("topic-queue-2",true,false,false,null);

        // 抱着每次消费者 消费一条数据
        channel.basicQos(1);


        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                // 从队列中获取消息  并处理器
                System.out.println("消费者2  接受到消息:"+new String(body,"utf-8") );



                // 所有的业务都完成之后 可以手动的ack
                // envelope.getDeliveryTag() // 消息标记 0  1  2
                // false ack 之后 不删除
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        // chanel 和 消费者绑定
        // 参数1  队列名称
        // 参数2  是否 自动ack 就是消费者给队列发送确认消息,否则队列会再次发送
//                  false 需要手动ack
        // 参数3  消费者
        channel.basicConsume("topic-queue-2", false,consumer);


        // 让程序一致卡在这里 消费者 可以一致消费消息
        System.in.read();// 等待客户端命令行 的输入

    }

    
    @Test
    public void publishTest() throws IOException, TimeoutException {

        Channel channel = connection.createChannel();


        //将 chanel 和 自定义的交换机 绑定 "pubsub-exchange"
        //参数1: exchange的名称
        //参数2: 指定exchange的类型  FANOUT - pubsub ,   DIRECT - Routing , TOPIC - Topics
        // DIRECT - Routing 交换机 会将消息发送到 所有的队列中
        channel.exchangeDeclare("topic-exchange", BuiltinExchangeType.TOPIC);

        //参数1 队列名
        //参数2 交换机名
        //参数3  路由规则
        // 所有消息为 info 的消息都会 由Routing-exchange发送到topic-queue-1 队列中
        channel.queueBind("topic-queue-1", "topic-exchange", "*.orange.*");
        channel.queueBind("topic-queue-2", "topic-exchange", "big.*.*");

        //参数1: 交换机名称 没有就是默认 ""
        // 参数2 :队列名称  或者是 消息类型信息 真的会路由规则进行匹配
        // 参数3: 指定传递的消息所携带的properties,使用null。
        // 参数4:消息

        // 向队列 发送消息
        // 发送10条数据 每个消费者得到5条数据
        for (int i = 0; i < 10; i++) {

            // 消息没有发送到默认的交换机 ,而是发送到 自定义交换机pubsub-exchange
            //参数1: "pubsub-exchange" 交换机名称
            //参数2: ""  路由规则

            if (i%2==1){// 奇数 orange
                channel.basicPublish("topic-exchange", "xxxasdasd.orange.xfsdf",null,("topic--i:" +i).getBytes());

            }else{ // 偶数 error
                channel.basicPublish("topic-exchange", "big.xxxx.uii",null,("routing--i:" +i).getBytes());

            }

        }


        channel.close();

    }



    @After// 在@Test 之后进行 ,是数据销毁
    public void destroy() throws IOException {

        connection.close();
    }

}

六、RabbitMQ整合SpringBoot【重点】

1、创建springboot 工程并导入依赖

    

    
        org.springframework.boot
        spring-boot-starter-parent
        2.2.6.RELEASE
         
    

    
        
            org.springframework.boot
            spring-boot-starter-amqp
        

        
            org.springframework.boot
            spring-boot-starter-web
        

        
            org.springframework.boot
            spring-boot-starter-test
            test
            
                
                    org.junit.vintage
                    junit-vintage-engine
                
            
        

        
            junit
            junit
            4.12
            
        


    

2、 编写配置文件

spring.rabbitmq.host=192.168.12.130
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/

3.启动类

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitApplication {


    public static void main(String[] args) {

        SpringApplication.run(RabbitApplication.class,args);
    }



}

4.装配 exchange、queue

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class RabbitConfig {


    @Bean
    public TopicExchange topicExchange(){

        //String name,  交换机名字
        // boolean durable,  是否持久化
        // boolean autoDelete 是否自动删除
        TopicExchange topicExchange = new TopicExchange("springboot-topic-exchange", true,false);

        return  topicExchange;
    }

    @Bean
    public Queue queue(){
        //String name, 队列 名字
        // boolean durable,   是否持久化
        // boolean exclusive,  是否排外
        // boolean autoDelete,  是否自动删除
        // @Nullable Map arguments 其他属性
        Queue queue = new Queue("springboot-queue", true,false,false,null);

        return queue;

    }

    
    @Bean
    public Binding binding(TopicExchange topicExchange,Queue queue){

        // 将交换接和 队列绑定 并匹配 *.java2109.*
        Binding binding = BindingBuilder.bind(queue).to(topicExchange).with("*.java2109.*");


        return binding;
    }



}

5、配置消费者

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
public class Consumer {



    // 该方法作为消费者 接收 中的消息
    @RabbitListener(queues = "springboot-queue")
    public void consumer1(String msg, Channel channel, Message message){

        System.out.println("消费者 得到   msg = " + msg);
        System.out.println("msg = " + message);

    }


}

6、单元测试

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;


@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMqTest {

    // rabbitTemplate 就是连接 rabbit 的客户端
    @Autowired
    private RabbitTemplate rabbitTemplate;


    @Test
    public void publishTest(){


//        String var1, 交换机
//        String var2, 路由规则
//        Object var3 发送的数据
        rabbitTemplate.convertAndSend("springboot-topic-exchange", "xxxx.java2109.sdfsd","下课么?");
        System.out.println("发送消息 ");
    }

}
 

 

七、 消息的高可靠 7.1 手动ack

手动ack 保证消费者正确消费消息,执行相关的业务

1.在配置文件开启手动ack

spring.rabbitmq.listener.simple.acknowledge-mode=manual

2.消费者手动ack

    // 该方法作为消费者 接收 中的消息
    @RabbitListener(queues = "springboot-queue")
    public void consumer1(String msg, Channel channel, Message message) throws IOException {

        System.out.println("消费者 得到   msg = " + msg);
        System.out.println("msg = " + message);

        // 此时发生异常 不可以运行到手动 ack,  此时消息 会发送多次
        int i = 1/0;

        // 手动ack
        // 将该消息的序号 ack message.getMessageProperties().getDeliveryTag()
        // 多条消息是否一起ack   false
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

    }

 

7.2 confirm 机制 & return 机制

保证生产者 发送的消息,确保可以到达queue 磁盘存储

confirm 机制 :发生在 生产者 和交换机之间

return 机制:发生在交换机 和 队列之间

1.配置开启confirm return 机制

spring.rabbitmq.publisher-/confirm/i-type=simple
spring.rabbitmq.publisher-returns=true

2.配置confirm return 接口

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
public class ConfirmReturnCallBack implements RabbitTemplate./confirm/iCallback,RabbitTemplate.ReturnCallback {


    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 在容器中 加入该bean 会调用 该方法( init 方法 , @PostConstruct 标记方法)
    @PostConstruct // 相当 bean.xml 中 
    public void init(){
        // 配置 rabbitTemplate 客户端的回调信息
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    // confirm 机制
    @Override
    public void confirm(CorrelationData correlationData, boolean /confirm/i, String s) {

        System.out.println("correlationData = " + correlationData);
        System.out.println("s = " + s);


        if (/confirm/i){
            System.out.println( "该消息到达 交换机");
        }


    }

    // return 机制  ,一般情况下 不会回调,只有在交换机的消息不能写入队列才会调用
    @Override
    public void returnedMessage(Message message, int i, String s, String s1, String s2) {

        // 如果失败 得到message 消息,否则 不调用
        System.out.println("message = " + message);

    }
}

3.发送消息 并测试

八、消息不重复

消息不重复:就是在某些情况下,消费者发生异常(没有手动ack成功),造成消息的重复发送,此时消费者 会再次处理该消息,就会造成 重复消费

重复消费场景:重复扣款,重复订单 都成为不满足幂等

幂等:请求只处理一次

 redis 中 setnx如果没有key 则设置成功,并返回true

如果有key,则不做任何操作,返回false

我们处理消息不重复的思路:

1.我们为每一个消息标记一个唯一id 在生产发送消息时

2.在消费者 处理消息时,首先判断该消息id 是否已经被 标记,

如果标记说明已经被执行过,就不需要再次执行

如果没有标记说明没有被执行过,执行业务

1.引入redis


    org.springframework.boot
    spring-boot-starter-data-redis

2.配置redis

#配置redis 
spring.redis.host=47.94.137.100
spring.redis.port=6379

3.在发送消息时配置唯一 的 id

 @Test
    public void publishTest(){

        // 创建一个唯一id
        CorrelationData correlationData =  new CorrelationData(UUID.randomUUID().toString())
;
//        String var1, 交换机
//        String var2, 路由规则
//        Object var3 发送的数据
        rabbitTemplate.convertAndSend("springboot-topic-exchange", "xxxx.java2109.sdfsd","下课么?"
                // 发送该消息 并配置唯一id
                + System.currentTimeMillis(),correlationData);
        System.out.println("发送消息 ");
    }

4.消费者处理消息

package com.qfedu.config;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.concurrent.TimeUnit;


@Component
public class Consumer {

    @Autowired
    private StringRedisTemplate redisTemplate;

    // 该方法作为消费者 接收 中的消息
    @RabbitListener(queues = "springboot-queue")
    public void consumer1(String msg, Channel channel, Message message) throws IOException {

        System.out.println("消费者 得到   msg = " + msg);
        System.out.println("msg = " + message);

        // 得到处理消息的唯一id
        String id = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");

        System.out.println("id = " + id);


        // 如果能设置成功说明 该消息没有被处理过
        // 该 id 值为0 代表 正在处理       1代表处理完成
        if (redisTemplate.opsForValue().setIfAbsent(id,"0", 10, TimeUnit.SECONDS)){

            System.out.println("消费者 处理该消息业务   msg = " + msg);

            //处理完之后: 1.redis 修改处理完成状态  2.手动ack
//            1.redis 修改处理完成状态
            redisTemplate.opsForValue().set(id,"1", 10, TimeUnit.SECONDS);


            // 手动ack
            // 将该消息的序号 ack message.getMessageProperties().getDeliveryTag()
            // 多条消息是否一起ack   false
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);


        }else {
            // 如果不能设置key   说明 已经有消费者在处理

            if (redisTemplate.opsForValue().get(id).equals("1")){// 如果状态是 1说明业务已经完成,只是没有ack 而已

                // 手动ack
                // 将该消息的序号 ack message.getMessageProperties().getDeliveryTag()
                // 多条消息是否一起ack   false
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            }

        }





    }


}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/601226.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号