栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ入门学习

RabbitMQ入门学习

RabbitMQ入门学习
      • 一.RabbitMQ安装
      • 二.SpringAMQP
        • 1.Basic Queue 简单队列模型
        • 2.Work Queue 工作队列模型
        • 3.发布订阅模型
          • 3.1 FanoutExchange(广播交换机)
          • 3.2 DirectExchange(路由交换机)
          • 3.3 TopicExchange(话题交换机)

一.RabbitMQ安装

①拉取RabbitMQ镜像 docker pull rabbitmq

②运行安装命令
15672是图形化界面的端口号
docker run
-e RABBITMQ_DEFAULT_USER=itcast
-e RABBITMQ_DEFAULT_PASS=123321
–name mq
–hostname mq1
-p 15672:15672
-p 5672:5672
-d
rabbitmq
③安装图形化界面的插件

  • docker exec -it mq容器的id /bin/bash
    例如:docker exec -it ddfbc46c13e7 /bin/bash
  • rabbitmq-plugins enable rabbitmq_management

④登录可视化界面 IP:15672

输入账户名itcast 密码123321

二.SpringAMQP 1.Basic Queue 简单队列模型

①创建一个普通的maven项目,作为父工程,父项目引入相关依赖。



    4.0.0

    org.example
    RabbitMQ
    pom
    1.0-SNAPSHOT
    
        publisher
        consumer
    

    
        org.springframework.boot
        spring-boot-starter-parent
        2.4.12
        
    

    
        8
        8
    

    
        
            org.springframework.boot
            spring-boot-starter-amqp
            2.4.12
        
        
        
            org.springframework.boot
            spring-boot-starter-test
        
        
            com.fasterxml.jackson.core
            jackson-databind
        
    


②创建微服务消息发布者publisher

  • 编写启动类,在application.yml进行配置
spring:
  rabbitmq:
    host: 192.168.10.128
    port: 5672
    virtual-host: /
    username: itcast
    password: 123321
  • 编写测试类SpringAmqpTest
package com.gzhu;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSimpleQueue() {
        String queueName = "simple.queue";
        String message = "hello, spring amqp!";
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

在rabbitmq的可视化界面手动创建一个消息队列

  • 启动测试类,可以看到消息已进入队列
    ③消费者consumer
  • 编写启动类,在application.yml配置
spring:
  rabbitmq:
    host: 192.168.10.128
    port: 5672
    virtual-host: /
    username: itcast
    password: 123321
  • 编写消费者
package com.gzhu.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SpringRabbitListener {
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException {
        System.out.println("spring 消费者接收到消息 :【" + msg + "】"); }
}
  • 启动,可以发现控制台接收到了消息,而且simple.queue队列中的消息也没了

2.Work Queue 工作队列模型

  • 测试类,模拟消息发布者在1秒内发送了50条消息
@Test
public void testWorkQueue() throws InterruptedException {
    String queueName = "work.queue";
    String message = "hello, I am _";
    for (int i = 0; i < 50; i++) {
        rabbitTemplate.convertAndSend(queueName, message+ i);
        Thread.sleep(20);
    }
}
  • 设置两个消费者,两者处理能力不同
@RabbitListener(queues = "work.queue")
public void listenWorkQueueMessage(String msg) throws InterruptedException {
    System.out.println("spring 消费者1接收到消息 :【" + msg + "】" + LocalTime.now());
    Thread.sleep(20);
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueueMessage2(String msg) throws InterruptedException {
    System.err.println("spring 消费者2接收到消息 :【" + msg + "】"+ LocalTime.now());
    Thread.sleep(200);
}

在application.yml中配置如下信息,因为两者的处理能力不同,所以设置消费者预取消息数量为1,只有处理完当前的消息,才能获取下一条消息,否则AMQP默认情况两个消费者会平均读取消息才处理

spring:
  rabbitmq:
    host: 192.168.10.128
    port: 5672
    virtual-host: /
    username: itcast
    password: 123321
    listener:
      simple:
        prefetch: 1 # 只能预先取得1条消息,处理完成才可以接下一条
  • 先启动消费者,再启动测试类,消息在1秒左右的时间被处理完成了
3.发布订阅模型

交换机的作用

  • 接收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • 不能缓存消息,路由失败,消息丢失

声明队列、交换机、绑定关系的Bean

  • Queue
  • FanoutExchange
  • Binding

3.1 FanoutExchange(广播交换机)

FanoutExchange的会将消息路由到每个绑定的队列

1.在consumer服务中,编写一个配置类,利用代码声明队列、交换机,并将两者绑定

package com.gzhu.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {
    //声明交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("gzhu.fanout");
    }
    //队列1
    @Bean
    public Queue fanouQueue1(){
        return new Queue("fanout.queue1");
    }
    //绑定关系1
        @Bean
        public Binding fanoutBinding1(Queue fanouQueue1, FanoutExchange fanoutExchange){
            return BindingBuilder
                    .bind(fanouQueue1)
                    .to(fanoutExchange);
    }
    //队列2
    @Bean
    public Queue fanouQueue2(){
        return new Queue("fanout.queue2");
    }
    //绑定关系2
    @Bean
    public Binding fanoutBinding2(Queue fanouQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder
                .bind(fanouQueue2)
                .to(fanoutExchange);
    }
}

-去控制台查看交换机信息,但在这遇到了500的错误码,解决方法:
链接: https://blog.csdn.net/shentian885/article/details/120905570?spm=1001.2101.3001.6661.1&utm_medium=distribute.pc_relevant_t0.none-task-blog-2%7Edefault%7ECTRLIST%7Edefault-1.no_search_link&depth_1-utm_source=distribute.pc_relevant_t0.none-task-blog-2%7Edefault%7ECTRLIST%7Edefault-1.no_search_link

解决问题后,可以看到交换机和队列绑定在了一起

2.在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

@RabbitListener(queues = "fanout.queue1")
public void listenWorkQueueMessage(String msg) throws InterruptedException {
    System.out.println("fanout.queue1接收到消息 :【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenWorkQueueMessage2(String msg) throws InterruptedException {
    System.err.println("fanout.queue2接收到消息 :【" + msg + "】");
}

3.在publisher中编写测试方法,向gzhu.fanout交换机发送消息

@Test
public void testSendFanoutExchange(){
    //交换机名称写死
    String exchangeName = "gzhu.fanout";
    //消息
    String message = "hello, baby!!";
    //发送消息
    rabbitTemplate.convertAndSend(exchangeName,"",message);
}

4.启动消费者,启动测试类,可以看到每一个消费者都接收到了消息

3.2 DirectExchange(路由交换机)

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes)

1.使用注解的方式在consumer消费者服务中,将消费者与队列、路由机绑定

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue1"),
        exchange = @Exchange(name = "gzhu.direct", type = ExchangeTypes.DIRECT),
        key = {"red", "blue"}))
        public void listenDirectQueue1(String msg){
        System.out.println("消费者1接收到Direct消息:【"+msg+"】");}
    
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue2"),
        exchange = @Exchange(name = "gzhu.direct", type = ExchangeTypes.DIRECT),
        key = {"red", "yellow"}))public void listenDirectQueue2(String msg){
        System.out.println("消费者2接收到Direct消息:【"+msg+"】 ");
        }

2.消息发布者测试类发布消息

@Test
    public void testDirectExchange() {
        // 队列名称
        String exchangeName = "gzhu.direct";
        // 消息
        String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
        // 发送消息,参数依次为:交换机名称,RoutingKey,消息
        rabbitTemplate.convertAndSend(exchangeName, "red", message);
    }

3.启动消费者,再启动发布者测试类

3.3 TopicExchange(话题交换机)

Topic交换机接收的消息RoutingKey必须是多个单词,以 . 分割
Topic交换机与队列绑定时的bindingKey可以指定通配符
#:代表0个或多个词
*:代表1个词

1.编写消费者

@RabbitListener(bindings = @QueueBinding(
       value = @Queue(name = "topic.queue1"),
       exchange = @Exchange(name = "gzhu.topic", type = ExchangeTypes.TOPIC),
       key = "china.#"
))
public void listenTopicQueue1(String msg){
   System.out.println("消费者1接收到Topic消息:【"+msg+"】");}

@RabbitListener(bindings = @QueueBinding(
       value = @Queue(name = "topic.queue2"),
       exchange = @Exchange(name = "gzhu.topic", type = ExchangeTypes.TOPIC),
       key = "japan.#"
))
public void listenTopicQueue2(String msg){
   System.out.println("消费者2接收到Topic消息:【"+msg+"】");}

2.发布者测试类

@Test
    public void testTopicExchange() {
        // 队列名称
        String exchangeName = "gzhu.topic";
        // 消息
        String message = "祖国统一,收复台湾 !";
        // 发送消息,参数依次为:交换机名称,RoutingKey,消息
        rabbitTemplate.convertAndSend(exchangeName, "china.song", message);
    }

3.启动消费者,再运行测试类

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/443636.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号