栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ带你入坑,不能再简单了

RabbitMQ带你入坑,不能再简单了

文章目录
  • 1.RabbitMQ入门玩耍
    • 1.安装
    • 2.RabbitMQ的几大模式
    • 3.动手兄弟们,冲
      • 准备工作:
      • 1.Hello World模式
        • 生产者代码:
        • 消费者代码:
      • 2.Work Queues模式
        • 工具类代码如下:
        • 两个消费者代码
        • 生产者代码
        • 能者多劳的消费者:
      • 3.Publish/Subscrible(发布订阅)模式
        • 生产者:
        • 消费者:
      • 4.Routing模式
        • 生产者:
        • 消费者:
      • 5.Topics模式
        • 生产者:
        • 消费者:
  • 2.Spring 如何使用RabbitMQ
      • 1.创建项目
      • 2.依赖(完全可以创建的时候勾选上)
      • 3.配置
      • 4.代码
        • HelloWorld模型:
        • WorkQueues模式:
        • Publish/Subscrible模式
        • Routing模式:
        • Topics模式:
  • 3.RabbitMQ集群

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang(面向并发、函数式编程)语言编写的

1.RabbitMQ入门玩耍

官网下载:https://rabbitmq.com/download.html

官网7种模式介绍:https://www.rabbitmq.com/getstarted.html

1.安装

直接Docker,(后面会写linux服务器安装RabbitMQ,时间嘛,就说不准了 )洛

官网也写了docker安装运行的命令,复制,cmd命令行运行即可(前提安装了docker)

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9-management

图中可以看到RabbitMQ的http端口是15672,所以浏览器访问一下:

账号和密码都是:guest,它拥有管理员的权限

暂时就知道这些

2.RabbitMQ的几大模式

咱们下面一个一个来

3.动手兄弟们,冲 准备工作:

新建一个项目,不多说

引入依赖:


    com.rabbitmq
    amqp-client
    5.13.1

在开始代码之前,我们需要去创建一个虚拟主机:

(虚拟主机相当于数据库,不同的项目应有自己的数据库,虚拟主机就可以区分同一台RabbitMQ主机的不同项目)

如果你要创建用户(了解):

如果创建的虚拟主机权限赋给该用户(了解):

下面开始:

1.Hello World模式

可以看到:这种模式就是一个生产者生产消息放到队列,消费者直接从队列里消费消息,并没有涉及到交换机(其实使用了默认交换机)。其缺点就是容易产生消息堆积

生产者代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Provider {
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();
		
        //设置RabbitMQ的主机,ip地址(cmd中ipconfig命令最后一块ipv4就是)
        connectionFactory.setHost("192.168.1.3");
        //设置端口,5672不改
        connectionFactory.setPort(5672);
        //设置连接哪个虚拟主机
        connectionFactory.setVirtualHost("/study");
        //用户名
        connectionFactory.setUsername("guest");
        //密码
        connectionFactory.setPassword("guest");


        //获取连接对象
        Connection connection = connectionFactory.newConnection();
        //创建连接通道
        Channel channel = connection.createChannel();
        //通道绑定消息队列
        channel.queueDeclare("hello",false,false,false,null);

        //发布消息
        channel.basicPublish("","hello",null,"HelloWorld".getBytes());
    }
}

参数说明:

如果队列要持久化:durable设置为true,不然rabbitmq重启队列就消失

如果队列的消息也要持久化:发布消息时需要将props写为如下内容,就是持久化消息

运行:查看web管理界面的队列:

可以看到运行了两次,页面中确实存在了hello队列,并且有两条未被消费的消息

消费者代码:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Constumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();

        //设置RabbitMQ的主机
        connectionFactory.setHost("192.168.1.3");
        //设置端口,5672
        connectionFactory.setPort(5672);
        //设置连接哪个虚拟主机
        connectionFactory.setVirtualHost("/study");
        //用户名
        connectionFactory.setUsername("guest");
        //密码
        connectionFactory.setPassword("guest");

        //获取连接对象
        Connection connection = connectionFactory.newConnection();
        //创建连接通道
        Channel channel = connection.createChannel();
        //通道绑定消息队列
        channel.queueDeclare("hello",false,false,false,null);

        //消费消息
        channel.basicConsume("hello",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
            {
                System.out.println(new String(body));
            }
        });
    }
}

现在我的hello队列里有一条消息:

启动Consumer:

消息被消费,控制台也打印出了消息内容

2.Work Queues模式

通过上面这张图片我们可以看出:它和helloworld模式区别不大,只是消费者不再是一个。

为了方便,我创建了工具类,因为代码大部分都是重复的。

工具类代码如下:
public class ConnUtil {
    public static Connection getConn() throws IOException, TimeoutException {
        //创建连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();

        //设置RabbitMQ的主机
        connectionFactory.setHost("192.168.1.3");
        //设置端口,5672
        connectionFactory.setPort(5672);
        //设置连接哪个虚拟主机
        connectionFactory.setVirtualHost("/study");
        //用户名
        connectionFactory.setUsername("guest");
        //密码
        connectionFactory.setPassword("guest");

        //获取连接对象
        Connection connection = connectionFactory.newConnection();

        return connection;
    }
    public static void close(Channel channel, Connection connection) throws IOException, TimeoutException {
        channel.close();
        connection.close();
    }
}

两个消费者代码

消费者1:

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //获取通道
        Channel channel = ConnUtil.getChannel();
        //绑定队列
        channel.queueDeclare("workqueue",false,false,true,null);
        //消费消息
        channel.basicConsume("workqueue",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
            {
                System.out.println(new String(body));
            }
        });

    }
}

消费者2:代码一样

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //获取通道
        Channel channel = ConnUtil.getChannel();
        //绑定队列
        channel.queueDeclare("workqueue",false,false,true,null);
        //消费消息
        channel.basicConsume("workqueue",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
            {
                System.out.println(new String(body));
            }
        });

    }
}

先将两个消费者跑起来

生产者代码
public class Provider {
    public static void main(String[] args) throws IOException, TimeoutException {
        //获取连接
        Connection conn = ConnUtil.getConn();
        //获取通道
        Channel channel = conn.createChannel();
        //绑定队列
        channel.queueDeclare("workqueue",false,false,true,null);
        //发送50条消息
        for (int i = 1; i <=20; i++) {
            channel.basicPublish("","workqueue",null,("这是消息"+i).getBytes());
        }
        
        //释放资源
        ConnUtil.close(channel,conn);

    }
}

生产者跑起来:

可以看到,消费者一和消费者二是平均分配消息的,一人一半。

这种平均分配是会出现问题的,当其中一个消费者处理速度很慢,那么会造成总的处理速度下降,我另一个消费者早就消费完了,而你才消费几个,为什么处理快的和处理慢的要平均分呢?。所以能者多劳才是最好的分配方式。

在说如何实现能者多劳之前,我们需要知道的是RabbitMQ的消息确认机制,

就是上面这个参数,这里我们是开启了自动确认,所谓自动确认是指消费者拿到消息后就会向队列确认,不管消息有没有被消费掉。而且上面的代码中我们并没有设置消费者一次从队列中拿几条消息,没有设置RabbitMQ就会一人一半分好,然后一次性全部拿给消费者。可以想象,一次性拿给消费者,自动确认的情况下,如果消费者执行了一半挂了,后面的消息都没有被消费,而你在拿到所有消息的时候就自动的向队列确认了-----“所有的消息都执行了”,这就出大问题!

所以要实现能者多劳,我们需要对上面代码做如下修改:

  1. 消费者一次从队列拿一条消息
  2. 关闭自动确认
  3. 在消费者的回调中,手动消息确认
能者多劳的消费者:

消费者1(消费快):

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //获取通道
        Channel channel = ConnUtil.getChannel();
        //绑定队列
        channel.queueDeclare("workqueue",false,false,true,null);
        //一次拿一条消息
        channel.basicQos(1);
        //消费消息
        channel.basicConsume("workqueue",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
            {
                System.out.println(new String(body));
                //消费完成手动确认消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });

    }
}

消费者2(消费慢):

public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //获取通道
        Channel channel = ConnUtil.getChannel();
        //绑定队列
        channel.queueDeclare("workqueue",false,false,true,null);
        //一次拿一条消息
        channel.basicQos(1);
        //消费消息
        channel.basicConsume("workqueue",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
            {
                //模拟该消费者处理慢,睡1s
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(new String(body));
				//消费完成手动确认消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });

    }
}

这里为了模拟消费速度慢,让它每次睡一秒

重新运行:先跑两个消费者,再跑生产者

可以看到因为消费者2要睡一秒,所以消费者1会处理更多的消息,这就是能者多劳模式

3.Publish/Subscrible(发布订阅)模式

x代表交换机,该模式需要明确指定使用哪个交换机,我们之前的交换机都是写的"",这其实是使用了默认交换机,该模式的大致流程:

  1. 生成者将消息发送到交换机
  2. 交换机可绑定多个临时队列
  3. 每个队列一个消费者

该模式正如其名,一个消息可以发布给很多消费者,只要消费者绑定的队列与该交换机绑定。

生产者:
public class Provider {
    public static void main(String[] args) throws IOException, TimeoutException {
        //获取连接对象
        Connection conn = ConnUtil.getConn();
        //创建通道
        Channel channel = conn.createChannel();
        //通道绑定交换机,没有会自己创建
        channel.exchangeDeclare("publish","fanout");
        //发送消息
        channel.basicPublish("publish","",null,"这是发布订阅模式".getBytes());
    }
}

在运行之前,我们先打开管理界面看看RabbitMQ默认有哪些交换机,一共有7个交换机,分别对应7中模式,而且可以看出,每个虚拟主机都有自己的7个交换机。但是这些自带的好像不能用,需要自己创建

这里的fanout交换机类型就是发布订阅需要使用的交换机类型

运行一下:

消费者:
public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接对象
        Connection conn = ConnUtil.getConn();
        //创建信道
        Channel channel = conn.createChannel();
        //信道绑定交换机
        channel.exchangeDeclare("publish","fanout");
        //创建临时队列,这种方式能创建临时的队列
        String queue = channel.queueDeclare().getQueue();
        //临时队列与交换机绑定
        channel.queueBind(queue,"publish","");
		//一次拿一条消息
        channel.basicQos(1);
        //消费消息
        channel.basicConsume(queue,false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
            {
                System.out.println(new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

将此消费者复制两个即可,代码一样

为了更直观的看到结果,我将生产者停止运行,管理界面删掉了我创建的publish的交换机,

先启动两个消费者,在启动生产者:

可以看到,两个消费者都拿到了同一条消息

看看队列管理界面

所谓临时队列,就是消费者断开连接后,队列就自动销毁,你可以试试现在关闭一个消费者,看看临时队列的变化。

4.Routing模式

Routing模式可以理解为细化或者加强版的发布订阅模式,我们知道发布订阅模式是生产者发送一个消息,所有绑定的消费者都能接收到该消息,而Routing模式,加上了路由筛选,临时队列与交换机绑定时,需要指定哪些路由能发送到该队列,生产者发送消息时,也会指定该消息要发送的路由。只有绑定了这个路由的队列才会接收到消息

使用Routing模式,我们需要将交换机的类型设置为direct

生产者:

指定接收路由为vip

public class Provider {
    public static void main(String[] args) throws IOException, TimeoutException {
        //获取连接对象
        Connection conn = ConnUtil.getConn();
        //获取信道
        Channel channel = conn.createChannel();
        //信道绑定交换机
        channel.exchangeDeclare("routing","direct");
        //发送消息,,routingkey必须写
        channel.basicPublish("routing","vip",null,"这条消息vip才能看到".getBytes());
    }
}
消费者:

消费者1:路由设置为vip

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //获取连接对象
        Connection conn = ConnUtil.getConn();
        //获取信道
        Channel channel = conn.createChannel();
        //信道绑定交换机
        channel.exchangeDeclare("routing","direct");
        //创建临时队列
        String queue = channel.queueDeclare().getQueue();
        //对列与交换机绑定
        channel.queueBind(queue,"routing","vip");
        //一次拿一条消息
        channel.basicQos(1);
        //消费
        channel.basicConsume(queue,false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
            {
                System.out.println(new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

消费者2:路由设置为notvip

public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //获取连接对象
        Connection conn = ConnUtil.getConn();
        //获取信道
        Channel channel = conn.createChannel();
        //信道绑定交换机
        channel.exchangeDeclare("routing","direct");
        //创建临时队列
        String queue = channel.queueDeclare().getQueue();
        //队列与交换机绑定
        channel.queueBind(queue,"routing","notvip");
        //一次拿一条消息
        channel.basicQos(1);
        //消费
        channel.basicConsume(queue,false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
            {
                System.out.println(new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

先运行消费者,再运行生产者:

的确只有设置了路由为vip的消费者1拿到了消息。

现在我们改一下消费者2,给它也加上vip的路由:

如下即可:

重跑消费者2和生产者,效果:

其实就是在发布订阅的模式上,生产者发送消息必须指定路由,消费者所在队列绑定交换机时必须指明接收的路由。很简单

5.Topics模式

Topics这个模型更简单,它是基于Routing模式,相当于动态路由,交换机类型必须是topic,它可以使用#和*进行统配:

#:普配多个

*:匹配一个

Topics的路由要求我们使用点点点的写法:test.a.b,这样。

下面我的生产者的发送路由为:test.c.b

消费者队列的路由一个设置为test.*.b ,另一个为test.#

生产者:
public class Provider {
    public static void main(String[] args) throws IOException, TimeoutException {
        //获取连接对象
        Connection conn = ConnUtil.getConn();
        //获取信道
        Channel channel = conn.createChannel();
        //信道绑定交换机
        channel.exchangeDeclare("topic","topic");
        //发送消息,,routingkey必须写
        channel.basicPublish("topic","test.c.b",null,"Topics模式".getBytes());
    }
}
消费者:

消费者1:

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //获取连接对象
        Connection conn = ConnUtil.getConn();
        //获取信道
        Channel channel = conn.createChannel();
        //信道绑定交换机
        channel.exchangeDeclare("topic","topic");
        //创建临时队列
        String queue = channel.queueDeclare().getQueue();
        //信道绑定队列
        channel.queueBind(queue,"topic","test.*.b");
        //一次拿一条消息
        channel.basicQos(1);
        //消费
        channel.basicConsume(queue,false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
            {
                System.out.println(new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

消费者2:

public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //获取连接对象
        Connection conn = ConnUtil.getConn();
        //获取信道
        Channel channel = conn.createChannel();
        //信道绑定交换机
        channel.exchangeDeclare("topic","topic");
        //创建临时队列
        String queue = channel.queueDeclare().getQueue();
        //信道绑定队列
        channel.queueBind(queue,"topic","test.#");
        //一次拿一条消息
        channel.basicQos(1);
        //消费
        channel.basicConsume(queue,false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
            {
                System.out.println(new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

结果,

这样发送,两个消费者都能匹配到该消息:

下面,我们改一下,生产者的路由为:test.c

思考一下,哪个消费者能收到消息?

消费者2.

的确如此。

以上就是常用的模式了。

2.Spring 如何使用RabbitMQ 1.创建项目 2.依赖(完全可以创建的时候勾选上)

    org.springframework.boot
    spring-boot-starter-amqp


    org.springframework.boot
    spring-boot-starter-web

3.配置
spring:
  rabbitmq:
    host: localhost
    username: guest
    password: guest
    virtual-host: /study
    port: 5672
4.代码

Spring为我们提供了一个RabbitMQ的模板类,同redis那些一样,再配上一些注解就可以了

HelloWorld模型:

生产者测试方法:(因为必须启动Spring,所以只能写在测试类中)

@SpringBootTest
public class SpringRabbitMQTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void helloWorld(){
        //发送消息就这么简单
        rabbitTemplate.convertAndSend("spring","Spring+RabbitMQ");
    }
}

消费者:(创建一个类即可)

@Component
@RabbitListener(queuesToDeclare = @Queue(value = "spring",durable = "false",exclusive = "false"))
public class Consumer {

    @RabbitHandler
    public void handler(String message){
        System.out.println(message);
    }
}

运行生产者测试类:

可以看到,消费者确实拿到了消息

队列也创建了

WorkQueues模式:

生产者测试方法:

@Test
public void workQueues(){
//WorkQueues模式
for (int i = 0; i < 10; i++) {
     rabbitTemplate.convertAndSend("work","这是Spring,WorkQueues模式");
     }
 }

消费者:@RabbitListener也能作用于方法,同样表明这是一个消费者,此时@RabbitHandler不写了

@Component
public class Consumer {
    //消费者一,没错@RabbitListener也能作用于方法,表明这是一个消费者
    @RabbitListener(queuesToDeclare = @Queue(value = "work"))
    public void consumer1(String message){
        System.out.println("consumer1:"+message);
    }

    //消费者二
    @RabbitListener(queuesToDeclare = @Queue(value = "work"))
    public void consumer2(String message){
        System.out.println("consumer2:"+message);
    }
}

运行生产者测试方法:

能者多劳的WorkQueues需要一些额外的配置

Publish/Subscrible模式

生产者测试方法:

//发布订阅模式
@Test
public void Publish(){
rabbitTemplate.convertAndSend("SpringPublish","publish","这是SpringRabbitMQ的发布订阅模式");
}

消费者类:

@Component
public class PublishConsumer {
    
    //消费者1
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue,exchange = @Exchange(value = "SpringPublish",type = "fanout"))
    })
    public void consumer1(String message){
        System.out.println("消费者1:"+message);
    }

    
    //消费者2
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue,exchange = @Exchange(value = "SpringPublish",type = "fanout"))
    })
    public void consumer2(String message){
        System.out.println("消费者2:"+message);
    }
}

运行生产者测试方法:

Routing模式:

生产者测试方法:

//路由模式
@Test
public void routing(){
rabbitTemplate.convertAndSend("SpringRouting","svip9","这是SpringRabbitMQ的Routing模式,svip9可见");
}

消费者类:

@Component
public class RoutingConsumer {
    //消费者1
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue,exchange = @Exchange(value = "SpringRouting",type = "direct"),
                    key = {"svip1","svip2","svip3"})
    })
    public void consumer1(String message){
        System.out.println("消费者1:"+message);
    }


    //消费者2
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue,exchange = @Exchange(value = "SpringRouting",type = "direct"),
                    key = {"svip7","svip8","svip9"})
    })
    public void consumer2(String message){
        System.out.println("消费者2:"+message);
    }
}

运行:

Topics模式:

生产者测试类:

//Topics模式
@Test
public void topics(){
rabbitTemplate.convertAndSend("SpringTopic","hao.fan","这是SpringRabbitMQ的Topics模式");
}

消费者类:

@Component
public class TopicsConsumer {

    //消费者1
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue,exchange = @Exchange(value = "SpringTopic",type = "topic"),
                    key = {"woc.*","hao.#"})
    })
    public void consumer1(String message){
        System.out.println("消费者1:"+message);
    }


    //消费者2
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue,exchange = @Exchange(value = "SpringTopic",type = "topic"),
                    key = {"A.B.#","C.*.A"})
    })
    public void consumer2(String message){
        System.out.println("消费者2:"+message);
    }
}

运行一下测试类:

3.RabbitMQ集群

待更新。。。。。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/354908.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号