docker 安装 rabbitmq
- 下载镜像
docker pull rabbitmq:3.7.7-management
- 启动镜像(用户名和密码设置为 guest guest)
docker run -dit --name rabbitmq3.8.0 -e RABBITMQ_DEFAULT_USER=guest -e RABBITMQ_DEFAULT_PASS=guest -p 15672:15672 -p 5672:5672 rabbitmq:3.8.0-management
-
访问 rabbitmq 管理界面
http://127.0.0.1:15672
guest/guest
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-KvmoLAsy-1652499874763)(…up-9de29c6b93228b96111a601d377264d053e.webp)]
docker 安装 rabbitMQ 延时队列插件(delayed_message_exchange)-
- 查找 Docker 容器中的 RabbitMQ 镜像
docker ps -a
[root@linux ~]# docker ps -a CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES 8efd6f3add3c chenchuxin/dubbo-admin "catalina.sh run" 6 weeks ago Up 5 weeks 0.0.0.0:9090->8080/tcp dubbo-admin 6939b83d0942 zookeeper "/docker-entrypoint.…" 6 weeks ago Up 5 weeks zookeeper01 2aec2548a9f8 525bd2016729 "docker-entrypoint.s…" 6 weeks ago Up 5 weeks 0.0.0.0:27017->27017/tcp docker_mongodb
-
- 上传 /opt/rabbitmq_delayed_message_exchange-3.8.0.ez 插件到 Linux 文件夹中
-
- 拷贝插件文件到 rabbitMQ 的 Docker 容器中
[root@linux ~]# docker cp opt/rabbitmq_delayed_message_exchange-3.8.0.ez rabbitmq3.8.0:/plugins
-
- 进入 rabbitMQ 的 Docker 容器中 docker exec -it rabbitmq3.8.0 bash
[root@linux ~]# docker exec -it rabbitmq3.8.0 bash root@myRabbit:/#
-
- 查看插件列表
rabbitmq-plugins list
-
- 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
-
- 创建交换器
RabbitMQ:我们通常谈到消息队列,就会联想到这其中的三者:生产者、消费者和消息队列,生产者将消息发送到消息队列,消费者从消息队列中获取消息进行处理。对于RabbitMQ,它在此基础上做了一层抽象,引入了交换器exchange的概念,交换器是作用于生产者和消息队列之间的中间桥梁,它起了一种消息路由的作用,也就是说生产者并不和消息队列直接关联,而是先发送给交换器,再由交换器路由到对应的队列,至于它是根据何种规则路由到消息队列的,就是我们下面需要介绍的内容了。这里的生产者并没有直接将消息发送给消息队列,而是通过建立与Exchange(交换器)的Channel(信道),将消息发送给Exchange,Exchange根据路由规则,将消息转发给指定的消息队列。消息队列储存消息,等待消费者取出消息,消费者通过建立与消息队列相连的Channel,从消息队列中获取消息。
AMQP:Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
rabbitmq代码实现 rabbitmq客户端1、交换器构造器的创建
public class DelayExchangeBuilder {
public final static String DEFAULT_DELAY_EXCHANGE = "secp.delayed.exchange";
public final static String DELAY_EXCHANGE = "secp.direct.exchange";
public static CustomExchange buildExchange() {
Map args = new HashMap();
args.put("x-delayed-type", "direct");
return new CustomExchange(DEFAULT_DELAY_EXCHANGE, "x-delayed-message", true, false, args);
}
public static CustomExchange buildDefaultExchange() {
return new CustomExchange(DELAY_EXCHANGE, ExchangeTypes.DIRECT, true, false);
}
}
2、发送默认消息并且创建队列绑定队列
@Slf4j
@Configuration
public class RabbitMqClient {
private static final Logger logger = LoggerFactory.getLogger(RabbitMqClient.class);
private final RabbitAdmin rabbitAdmin;
private final RabbitTemplate rabbitTemplate;
@Resource
private SimpleMessageListenerContainer messageListenerContainer;
@Resource
BusProperties busProperties;
@Resource
private ApplicationEventPublisher publisher;
@Resource
private ApplicationContext applicationContext;
@Bean
public void initQueue() {
Map beansWithRqbbitComponentMap = this.applicationContext.getBeansWithAnnotation(RabbitComponent.class);
Class extends Object> clazz = null;
for (Map.Entry entry : beansWithRqbbitComponentMap.entrySet()) {
log.info("初始化队列............");
//获取到实例对象的class信息
clazz = entry.getValue().getClass();
Method[] methods = clazz.getMethods();
RabbitListener rabbitListener = clazz.getAnnotation(RabbitListener.class);
if (ObjectUtil.isNotEmpty(rabbitListener)) {
createQueue(rabbitListener);
}
for (Method method : methods) {
RabbitListener methodRabbitListener = method.getAnnotation(RabbitListener.class);
if (ObjectUtil.isNotEmpty(methodRabbitListener)) {
createQueue(methodRabbitListener);
}
}
}
private Map sentObj = new HashMap<>();
@Autowired
public RabbitMqClient(RabbitAdmin rabbitAdmin, RabbitTemplate rabbitTemplate) {
this.rabbitAdmin = rabbitAdmin;
this.rabbitTemplate = rabbitTemplate;
}
public void sendMessage(String queueName, Object params) {
log.info("发送消息到mq");
try {
Queue queue = new Queue(queueName);
addQueue(queue);
CustomExchange customExchange = DelayExchangeBuilder.buildDefaultExchange();
rabbitAdmin.declareExchange(customExchange);
Binding binding = BindingBuilder.bind(queue).to(customExchange).with(queueName).noargs();
rabbitAdmin.declareBinding(binding);
messageListenerContainer.setQueueNames(queueName);
rabbitTemplate.convertAndSend(DelayExchangeBuilder.DELAY_EXCHANGE, queueName, params, message -> {
return message;
});
} catch (Exception e) {
e.printStackTrace();
}
}
public RabbitMqClient put(String key, Object value) {
this.sentObj.put(key, value);
return this;
}
public void sendMessage(String queueName, Object params, Integer expiration) {
this.send(queueName, params, expiration);
}
private void send(String queueName, Object params, Integer expiration) {
Queue queue = new Queue(queueName);
addQueue(queue);
CustomExchange customExchange = DelayExchangeBuilder.buildExchange();
rabbitAdmin.declareExchange(customExchange);
Binding binding = BindingBuilder.bind(queue).to(customExchange).with(queueName).noargs();
rabbitAdmin.declareBinding(binding);
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
log.debug("发送时间:" + sf.format(new Date()));
messageListenerContainer.setQueueNames(queueName);
rabbitTemplate.convertAndSend(DelayExchangeBuilder.DEFAULT_DELAY_EXCHANGE, queueName, params, message -> {
if (expiration != null && expiration > 0) {
message.getMessageProperties().setHeader("x-delay", expiration);
}
return message;
});
}
}
3、消费消息
@Slf4j @RabbitListener(queues = SecpDataConstant.SECP_DATA_SYNC) @RabbitComponent(value = "testReceiver2") public class TestReceiver2 extends BaseRabbiMqHandler{ @RabbitHandler public void onMessage(BaseMap baseMap, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { super.onMessage(baseMap, deliveryTag, channel, new MqListener () { @Override public void handler(BaseMap map, Channel channel) { //业务处理 String orderId = map.get(SecpDataConstant.SECP_DATA_USER_INFO).toString(); log.info("MQ Receiver2,orderId : " + orderId); } }); } }



