1.2使用docker安装RabbitmQ1.查看是否已经安装docker
yum list installed | grep docker
我的已经安装 未安装的请往下走2.安装docker
yum install docker -y
3.启动docker
systemctl start docker
此时docker环境准备完毕 请确保服务器有网的情况下 使用该方式安装
1.下载RabbitMQ镜像
docker pull rabbitmq 该方式默认安装最新版本 rabbitmq:7.12.1指定版本
2.查看rabbitmq镜像
docker images
3.启动rabbitmq
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq
rabbitmq是下载软件名称
4.查看rabbitmq是否启动
docker ps
5.进入rabbitmq启动插件否则web端不能访问
docker exec -it rabbit /bin/bash
-it后面参数为上方标红名称 即启动服务时通过- -name指定的名称
6.启动rabbitmq管理插件
rabbitmq-plugins enable rabbitmq_management
出现该界面启动成功 注意别把rabbitmq_management中的下划线写成- 否则会找不到插件
7.退出容器
exit
8.查看是否成功
服务器IP:15672
9.通过默认账号guest登录
账号密码都是guest
注意启动完毕后请确认防火墙已经关闭或者已经放行了5672 和 15672端口 阿里云服务器需要去防火墙出添加新的规则 也就是放行端口才可以
关闭防火墙指令systemctl stop firewalld
10.创建自定账号
进入rabbitmq镜像 docker exec -it rabbit /bin/bash
rabbitmqctl add_user 用户名 密码
rabbitmqctl add_user test test
设置用户的角色 rabbitmqctl set_user_tags 账号 角色
rabbitmqctl set_user_tags test administrator 此处设置为管理员
设置用户对vhost的权限 安装成功后有一个默认的vhost / 可以自己选择是否创建新的vhost
rabbitmqctl set_permissions -p / test ‘.’ '.’ ‘.*’
/是vhost名称 test是用户名 后面需要跟上一个空格 每个’.星’都需要使用空格分隔 否则会报错
为添加vhost
rabbitmqctl add_vhost /demohost
*该插件用于实现延迟队列时使用 1.使用wget下载插件 wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.23/rabbitmq_delayed_message_exchange-3.8.23ez 2.文件下载后就在当前用户的文件夹下 通过ls查看 该目录下文件 3.9.7可以根据自己版本自己修改版本号 3.将下载的插件复制到rabbitmq容器中 docker cp 文件路径 docker容器id:容器中的路径 docker cp rabbitmq_delayed_message_exchange-3.8.0.ez 982b190bf852:/plugins *容器id可以通过docker ps 查看 找到ConTAINER ID这个的值即是容器id 4.进入rabbitmq容器启动该插件 进入容器 docker exec -it rabbit /bin/bash 启动插件 rabbitmq-plugins enable rabbitmq_delayed_message_exchange *wget 不可以使用时 使用 yum install -y wget 下载安装即可 请确保联网2.springboot整个RabbitMQ 2.1maven依赖
2.2配置文件org.springframework.boot spring-boot-starter-amqp
spring:
rabbitmq:
host: 120.79.15.65#mq安装服务器地址
port: 5672 #端口 别写成15672 15672是web访问端口 此处连接需要的是5672
username: test#账号
password: 123456 #密码
listener:
simple:
acknowledge-mode: manual #手动确认消息
concurrency: 10#消费者数量
max-concurrency: 20#最大消费者数量
prefetch: 1 #消费者每次接收的消息数量
#以下两个参数 在生产者发送消息到交换机需要确认时使用
publisher-/confirm/i-type: correlated #
publisher-returns: true
2.3RabbitMQ配置类
package com.eyu.rpcrabbitdemo.config;
import com.eyu.rpcrabbitdemo.Constant.RabbitMqConstant;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMqConfig {
// 通过插件自定义交换机实现自定义延迟队列
@Bean("pluginsExchange")
public CustomExchange pluginsExchange(){
// 设置交换机类型
Map arguments = new HashMap<>();
arguments.put("x-delayed-type","direct");
return new CustomExchange(RabbitMqConstant.PLUGINS_EX_NAME,"x-delayed-message",false,false,arguments);
}
// 将自定义交换机与队列绑定
@Bean("pluginsQueue")
public Queue pluginsQueue(){
return QueueBuilder.durable(RabbitMqConstant.PLUGINS_QUEUE_NAME).build();
}
@Bean
public Binding pluginsQueueBindingPluginsEx(@Qualifier("pluginsQueue") Queue pluginsQueue,
@Qualifier("pluginsExchange") CustomExchange pluginsEx){
return BindingBuilder.bind(pluginsQueue).to(pluginsEx).with(RabbitMqConstant.PLUGINS_ROUTING_KEY).noargs();
}
// 注入所有的队列
// 普通队列同时设置
@Bean("ordQueue")
public Queue ordQueue(){
return QueueBuilder.durable(RabbitMqConstant.ORD_QUEUE_NAME).build();
}
// 延迟队列 同时设置与死信对列的绑定
@Bean("ttlQueue")
public Queue ttlQueue(){
Map arguments = new HashMap<>();
// 设置死信交换机
arguments.put("x-letter-dead-exchange",RabbitMqConstant.DEAD_EX_NAME);
// 设置发送的key
arguments.put("x-letter-dead-routing_key", RabbitMqConstant.DEAD_ROUTING_KEY);
// 设置队列中消息的有效时间 20秒
arguments.put("x-message-ttl", 20000);
return QueueBuilder.durable(RabbitMqConstant.TTL_QUEUE_NAMW).withArguments(arguments).build();
}
@Bean("deadQueue")
public Queue deadQueue(){
return QueueBuilder.durable(RabbitMqConstant.DEAD_QUEUE_NAME).build();
}
// 注入所有的交换机
@Bean("ordExchange")
public DirectExchange ordExchange(){
return ExchangeBuilder.directExchange(RabbitMqConstant.ORD_EX_NAME).build();
}
@Bean("ttlExchange")
public DirectExchange ttlExchange(){
return ExchangeBuilder.directExchange(RabbitMqConstant.TTL_EXCHANGE_NAMW).build();
}
@Bean("deadExchange")
public DirectExchange deadExchange(){
return ExchangeBuilder.directExchange(RabbitMqConstant.DEAD_EX_NAME).build();
}
// 绑定交换机和队列
// 普通交换机
@Bean
public Binding ordQueueBindingOrdExchange(@Qualifier("ordQueue") Queue ordQueue,
@Qualifier("ordExchange") DirectExchange ordExchange){
return BindingBuilder.bind(ordQueue).to(ordExchange).with(RabbitMqConstant.ORD_ROUTING_KEY);
}
// 延迟交换机
@Bean
public Binding ttlQueueBindingTtlExchange(@Qualifier("ttlQueue") Queue ordQueue,
@Qualifier("ttlExchange") DirectExchange ordExchange){
return BindingBuilder.bind(ordQueue).to(ordExchange).with(RabbitMqConstant.TTL_ROUTING_KEY_NAMW);
}
// 死信交换机
@Bean
public Binding deadQueueBindingDeadExchange(@Qualifier("deadQueue") Queue ordQueue,
@Qualifier("deadExchange") DirectExchange ordExchange){
return BindingBuilder.bind(ordQueue).to(ordExchange).with(RabbitMqConstant.DEAD_ROUTING_KEY);
}
// 正常消费的交换机以及队列
@Bean("consumerQueue")
public Queue consumerQueue(){
return QueueBuilder.durable(RabbitMqConstant.CONSUMER_QUEUE_NAME).build();
}
@Bean("consumerExchange")
public DirectExchange consumerExchange(){
return ExchangeBuilder.directExchange(RabbitMqConstant.CONSUMER_EX_NAME).build();
}
@Bean
public Binding consumerQueueBindingConsumerEx(@Qualifier("consumerQueue") Queue consumerQueue,
@Qualifier("consumerExchange") DirectExchange consumerExchange){
return BindingBuilder.bind(consumerQueue).to(consumerExchange).with(RabbitMqConstant.CONSUMER_ROUTING_KEY);
}
}
// 常量类
package com.eyu.rpcrabbitdemo.Constant;
import org.springframework.stereotype.Component;
@Component
public class RabbitMqConstant {
// 普通系列
public static final String ORD_EX_NAME = "ord_exchange";
public static final String ORD_QUEUE_NAME = "ord_queue";
public static final String ORD_ROUTING_KEY = "ord_key";
// 延迟队列
public static final String TTL_EXCHANGE_NAMW = "ttl_exchange";
public static final String TTL_QUEUE_NAMW = "ttl_queue";
public static final String TTL_ROUTING_KEY_NAMW = "ttl_key";
// 死信队列
public static final String DEAD_EX_NAME = "dead_ex";
public static final String DEAD_QUEUE_NAME ="dead_queue";
public static final String DEAD_ROUTING_KEY = "dead_key";
//自定义时间延迟队列
public static final String PLUGINS_EX_NAME ="plugins_ex";
public static final String PLUGINS_QUEUE_NAME = "plugins_queue";
public static final String PLUGINS_ROUTING_KEY = "plugins_key";
// 正常消费系列
public static final String CONSUMER_EX_NAME = "consumer_ex";
public static final String CONSUMER_QUEUE_NAME = "consumer_queue";
public static final String CONSUMER_ROUTING_KEY = "consumer_key";
}
// 监听队列 消费消息
@Component
@Slf4j
public class DemoConsumer {
private static final int time = 10;
@Autowired
private RabbitTemplate rabbitTemplate;
// 监听正常队列 正常消费
@RabbitListener(queues = RabbitMqConstant.CONSUMER_QUEUE_NAME)
public void recevieConsumerQueueMessage(Message message, Channel channel){
String mesStr = new String(message.getBody());
log.info("当前时间:{}消费了消息:{}",new Date().toString(),mesStr);
}
// 监听死信队列 queues 的值为队列名称
@RabbitListener(queues = RabbitMqConstant.DEAD_QUEUE_NAME)
public void recevieDeadQueueMessage(Message message, Channel channel){
String messStr = new String(message.getBody());
log.info("消费死信队列中的消息:{}",message);
}
}
// 发送消息到mq
package com.eyu.rpcrabbitdemo.controller;
import com.eyu.rpcrabbitdemo.Constant.RabbitMqConstant;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping(value = "/demo")
public class DemoController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping(value = "/message")
public String sendMessage(@PathVariable String message){
CorrelationData correlationData = new CorrelationData();
rabbitTemplate.convertAndSend(
RabbitMqConstant.CONSUMER_EX_NAME,
RabbitMqConstant.CONSUMER_ROUTING_KEY,
message,
correlationData
);
return "ok";
}
// 发送消息到延迟交换机
@GetMapping(value = "/ttlMessage/{message}/{ttlTime}")
public String sengTtlMessage(@PathVariable String message,@PathVariable String ttlTime){
CorrelationData correlationData = new CorrelationData();
// 交换机名称 路由key 消息 消息确认参数 额外参数
rabbitTemplate.convertAndSend(
RabbitMqConstant.TTL_EXCHANGE_NAMW,
RabbitMqConstant.TTL_ROUTING_KEY_NAMW,
message,
mes -> {
// 发现消息时 设置消息的有效时间
mes.getMessageProperties().setExpiration(ttlTime);
return mes;
}
);
return "ok";
}
}
学习笔记不喜勿喷



