栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Rabbitmq消息中间件安装以及springboot整合使用

Rabbitmq消息中间件安装以及springboot整合使用

1.Linux下通过docker安装RabbitMQ 1.1准备docker环境

1.查看是否已经安装docker
yum list installed | grep docker
我的已经安装 未安装的请往下走2.安装docker
yum install docker -y
3.启动docker
systemctl start docker
此时docker环境准备完毕 请确保服务器有网的情况下 使用该方式安装

1.2使用docker安装RabbitmQ

1.下载RabbitMQ镜像
docker pull rabbitmq 该方式默认安装最新版本 rabbitmq:7.12.1指定版本

2.查看rabbitmq镜像
docker images

3.启动rabbitmq
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq

rabbitmq是下载软件名称
4.查看rabbitmq是否启动
docker ps

5.进入rabbitmq启动插件否则web端不能访问
docker exec -it rabbit /bin/bash

-it后面参数为上方标红名称 即启动服务时通过- -name指定的名称
6.启动rabbitmq管理插件
rabbitmq-plugins enable rabbitmq_management

出现该界面启动成功 注意别把rabbitmq_management中的下划线写成- 否则会找不到插件
7.退出容器
exit
8.查看是否成功
服务器IP:15672

9.通过默认账号guest登录

账号密码都是guest

注意启动完毕后请确认防火墙已经关闭或者已经放行了5672 和 15672端口 阿里云服务器需要去防火墙出添加新的规则 也就是放行端口才可以
关闭防火墙指令systemctl stop firewalld

10.创建自定账号
进入rabbitmq镜像 docker exec -it rabbit /bin/bash
rabbitmqctl add_user 用户名 密码
rabbitmqctl add_user test test

设置用户的角色 rabbitmqctl set_user_tags 账号 角色
rabbitmqctl set_user_tags test administrator 此处设置为管理员

设置用户对vhost的权限 安装成功后有一个默认的vhost / 可以自己选择是否创建新的vhost
rabbitmqctl set_permissions -p / test ‘.’ '.’ ‘.*’

/是vhost名称 test是用户名 后面需要跟上一个空格 每个’.星’都需要使用空格分隔 否则会报错
为添加vhost
rabbitmqctl add_vhost /demohost

1.3额外补充延迟队列插件
*该插件用于实现延迟队列时使用

1.使用wget下载插件
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.23/rabbitmq_delayed_message_exchange-3.8.23ez

2.文件下载后就在当前用户的文件夹下 通过ls查看 该目录下文件 3.9.7可以根据自己版本自己修改版本号

3.将下载的插件复制到rabbitmq容器中
docker cp 文件路径 docker容器id:容器中的路径
docker cp rabbitmq_delayed_message_exchange-3.8.0.ez 982b190bf852:/plugins
*容器id可以通过docker ps 查看 找到ConTAINER ID这个的值即是容器id

4.进入rabbitmq容器启动该插件
进入容器 docker exec -it rabbit /bin/bash 
启动插件 rabbitmq-plugins enable rabbitmq_delayed_message_exchange

*wget 不可以使用时  使用 yum install -y wget 下载安装即可  请确保联网
2.springboot整个RabbitMQ 2.1maven依赖
	
       org.springframework.boot
       spring-boot-starter-amqp
    
2.2配置文件
spring:
  rabbitmq:
    host: 120.79.15.65#mq安装服务器地址
    port: 5672 #端口 别写成15672 15672是web访问端口 此处连接需要的是5672
    username: test#账号
    password: 123456 #密码
    listener:
      simple:
        acknowledge-mode: manual #手动确认消息
        concurrency: 10#消费者数量
        max-concurrency: 20#最大消费者数量
        prefetch: 1 #消费者每次接收的消息数量
        #以下两个参数 在生产者发送消息到交换机需要确认时使用
    publisher-/confirm/i-type: correlated #
    publisher-returns: true
2.3RabbitMQ配置类
package com.eyu.rpcrabbitdemo.config;

import com.eyu.rpcrabbitdemo.Constant.RabbitMqConstant;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMqConfig {

    // 通过插件自定义交换机实现自定义延迟队列
    @Bean("pluginsExchange")
    public CustomExchange pluginsExchange(){
        // 设置交换机类型
        Map arguments = new HashMap<>();
        arguments.put("x-delayed-type","direct");

        return new CustomExchange(RabbitMqConstant.PLUGINS_EX_NAME,"x-delayed-message",false,false,arguments);
    }

    // 将自定义交换机与队列绑定
    @Bean("pluginsQueue")
    public Queue pluginsQueue(){
        return QueueBuilder.durable(RabbitMqConstant.PLUGINS_QUEUE_NAME).build();
    }

    @Bean
    public Binding pluginsQueueBindingPluginsEx(@Qualifier("pluginsQueue") Queue pluginsQueue,
                                                @Qualifier("pluginsExchange") CustomExchange pluginsEx){
        return BindingBuilder.bind(pluginsQueue).to(pluginsEx).with(RabbitMqConstant.PLUGINS_ROUTING_KEY).noargs();
    }


    // 注入所有的队列
    // 普通队列同时设置
    @Bean("ordQueue")
    public Queue ordQueue(){
        return QueueBuilder.durable(RabbitMqConstant.ORD_QUEUE_NAME).build();
    }

    // 延迟队列 同时设置与死信对列的绑定
    @Bean("ttlQueue")
    public Queue ttlQueue(){
        Map arguments = new HashMap<>();
        // 设置死信交换机
        arguments.put("x-letter-dead-exchange",RabbitMqConstant.DEAD_EX_NAME);
        // 设置发送的key
        arguments.put("x-letter-dead-routing_key", RabbitMqConstant.DEAD_ROUTING_KEY);
        // 设置队列中消息的有效时间 20秒
        arguments.put("x-message-ttl", 20000);

        return QueueBuilder.durable(RabbitMqConstant.TTL_QUEUE_NAMW).withArguments(arguments).build();
    }

    @Bean("deadQueue")
    public Queue deadQueue(){
        return QueueBuilder.durable(RabbitMqConstant.DEAD_QUEUE_NAME).build();
    }


    // 注入所有的交换机
    @Bean("ordExchange")
    public DirectExchange ordExchange(){
        return ExchangeBuilder.directExchange(RabbitMqConstant.ORD_EX_NAME).build();
    }

    @Bean("ttlExchange")
    public DirectExchange ttlExchange(){
        return ExchangeBuilder.directExchange(RabbitMqConstant.TTL_EXCHANGE_NAMW).build();
    }

    @Bean("deadExchange")
    public DirectExchange deadExchange(){
        return ExchangeBuilder.directExchange(RabbitMqConstant.DEAD_EX_NAME).build();
    }


    // 绑定交换机和队列
    // 普通交换机
    @Bean
    public Binding ordQueueBindingOrdExchange(@Qualifier("ordQueue") Queue ordQueue,
                                              @Qualifier("ordExchange") DirectExchange ordExchange){
        return BindingBuilder.bind(ordQueue).to(ordExchange).with(RabbitMqConstant.ORD_ROUTING_KEY);
    }

    // 延迟交换机
    @Bean
    public Binding ttlQueueBindingTtlExchange(@Qualifier("ttlQueue") Queue ordQueue,
                                              @Qualifier("ttlExchange") DirectExchange ordExchange){
        return BindingBuilder.bind(ordQueue).to(ordExchange).with(RabbitMqConstant.TTL_ROUTING_KEY_NAMW);
    }

    // 死信交换机
    @Bean
    public Binding deadQueueBindingDeadExchange(@Qualifier("deadQueue") Queue ordQueue,
                                              @Qualifier("deadExchange") DirectExchange ordExchange){
        return BindingBuilder.bind(ordQueue).to(ordExchange).with(RabbitMqConstant.DEAD_ROUTING_KEY);
    }


    // 正常消费的交换机以及队列
    @Bean("consumerQueue")
    public Queue consumerQueue(){
        return QueueBuilder.durable(RabbitMqConstant.CONSUMER_QUEUE_NAME).build();
    }

    @Bean("consumerExchange")
    public DirectExchange consumerExchange(){
        return ExchangeBuilder.directExchange(RabbitMqConstant.CONSUMER_EX_NAME).build();
    }

    @Bean
    public Binding consumerQueueBindingConsumerEx(@Qualifier("consumerQueue") Queue consumerQueue,
                                                  @Qualifier("consumerExchange") DirectExchange consumerExchange){
        return BindingBuilder.bind(consumerQueue).to(consumerExchange).with(RabbitMqConstant.CONSUMER_ROUTING_KEY);
    }

}

// 常量类
package com.eyu.rpcrabbitdemo.Constant;

import org.springframework.stereotype.Component;


@Component
public class RabbitMqConstant {

    // 普通系列
    public static final String ORD_EX_NAME = "ord_exchange";
    public static final String ORD_QUEUE_NAME = "ord_queue";
    public static final String ORD_ROUTING_KEY = "ord_key";

    // 延迟队列
    public static final String TTL_EXCHANGE_NAMW = "ttl_exchange";
    public static final String TTL_QUEUE_NAMW = "ttl_queue";
    public static final String TTL_ROUTING_KEY_NAMW = "ttl_key";

    // 死信队列
    public static final String DEAD_EX_NAME = "dead_ex";
    public static final String DEAD_QUEUE_NAME ="dead_queue";
    public static final String DEAD_ROUTING_KEY = "dead_key";

    //自定义时间延迟队列
    public static final String PLUGINS_EX_NAME ="plugins_ex";
    public static final String PLUGINS_QUEUE_NAME = "plugins_queue";
    public static final String PLUGINS_ROUTING_KEY = "plugins_key";

    // 正常消费系列
    public static final String CONSUMER_EX_NAME = "consumer_ex";
    public static final String CONSUMER_QUEUE_NAME = "consumer_queue";
    public static final String CONSUMER_ROUTING_KEY = "consumer_key";
    
}

// 监听队列 消费消息
@Component
@Slf4j
public class DemoConsumer {

    private static final int time = 10;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 监听正常队列 正常消费
    @RabbitListener(queues = RabbitMqConstant.CONSUMER_QUEUE_NAME)
    public void recevieConsumerQueueMessage(Message message, Channel channel){

        String mesStr = new String(message.getBody());

        log.info("当前时间:{}消费了消息:{}",new Date().toString(),mesStr);

    }

    // 监听死信队列 queues 的值为队列名称
    @RabbitListener(queues = RabbitMqConstant.DEAD_QUEUE_NAME)
    public void recevieDeadQueueMessage(Message message, Channel channel){

        String messStr = new String(message.getBody());

        log.info("消费死信队列中的消息:{}",message);
    }
}
// 发送消息到mq
package com.eyu.rpcrabbitdemo.controller;

import com.eyu.rpcrabbitdemo.Constant.RabbitMqConstant;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping(value = "/demo")
public class DemoController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping(value = "/message")
    public String sendMessage(@PathVariable String message){

        CorrelationData correlationData = new CorrelationData();

        rabbitTemplate.convertAndSend(
                RabbitMqConstant.CONSUMER_EX_NAME,
                RabbitMqConstant.CONSUMER_ROUTING_KEY,
                message,
                correlationData
        );


        return "ok";
    }


    // 发送消息到延迟交换机
    @GetMapping(value = "/ttlMessage/{message}/{ttlTime}")
    public String sengTtlMessage(@PathVariable String message,@PathVariable String ttlTime){

        CorrelationData correlationData = new CorrelationData();

// 交换机名称  路由key  消息 消息确认参数 额外参数
        rabbitTemplate.convertAndSend(
                RabbitMqConstant.TTL_EXCHANGE_NAMW,
                RabbitMqConstant.TTL_ROUTING_KEY_NAMW,
                message,
                mes -> {
                    // 发现消息时 设置消息的有效时间
                    mes.getMessageProperties().setExpiration(ttlTime);
                    return mes;
                }
        );


        return "ok";
    }
}

学习笔记不喜勿喷

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/283085.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号