栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 系统运维 > 运维 > Linux

RabbitMQ笔记

Linux 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

RabbitMQ笔记

RabbitMQ 概述

原始操作系统的TCP/IP协议满足不了项目需求

添加中间件 添加头 -> 满足要求

中间件
  • 特点
    • 高可用
    • 可靠性
  • 分布式消息中间件
    • ActiveMQ:老派
    • RabbitMQ:spring同源 支持度高
    • Kafka:开源 性能最高 最接近底层
    • RocketMQ:慎选
    • 场景
      • 消息中间件监控数据
      • 异步数据传输场景
      • 削峰填谷场景
      • 任务调度场景
      • 海量数据同步场景
      • 分布式事务场景
      • 日记管理场景
      • 大数据分析场景
    • 考量
      • AMQP
      • MQTT
      • 持久化设计
      • Kafka协议
      • 消息分发设计
      • 高可用
      • 可靠性
      • 容错
  • 负载均衡中间件
    • Nginx
    • LVS负载均衡软件
    • KeepAlive -> 高可用
    • CDN -> 加速
  • 缓存中间件
    • MemCache
    • Redis
  • 数据库中间件
    • Mycat
    • Shardingjdbc
  • 案例分析
    • 异步数据保存
    • 订单数据的消息分发
    • 分布式事务
    • 消息的容错
    • 分布式锁
    • 分布式会话
    • 分库分表
架构
  • 单体架构
    • 耦合度太高
    • 运维成本过高
    • 不易维护
    • 服务器的成本太高
    • 升级架构的复杂度变高
  • 分布式架构:一个请求由多个系统来处理
    • 学习成本高 技术栈过多
    • 运维成本 服务器成本高
    • 人员成本
    • 项目的复杂度上升
    • 错误和容错率
    • 占用的端口和通讯的选择成本
    • 安全性的考虑被迫得选择RMI/MQ服务端通讯

MQ消息队列:负责消息的接受、存储和传递,它的性能要高于普通的服务和技术

消息队列协议
  • AMQP:Erlang(底层C) 开发

    • 1.分布式事务支持
    • 2.消息的持久化支持
    • 高性能和高可靠性的消息处理优势
  • MQTT:

    • 特点:
      • 轻量
      • 结构简单
      • 传输块、不支持事务
      • 没有持久化设计
    • 应用场景:适用于计算能力有限、低带宽、网络不稳定的场景
  • OpenMessage:

    • 结构简单
    • 解析速度快
    • 支持事务和持久化设计
  • KafKa:

    • 结构简单
    • 解析速度快
    • 无事务支持
    • 有持久化设计
消息队列的持久化

数据不存在内存中 -> 写入磁盘中 持久化保存

消息的分发策略

角色:

  • 1.生产者
  • 2.存储消息
  • 3.消费者
RabbitMQ的角色分类

1:none

  • 不能访问management plugin

2:management:查看自己相关节点信息

  • 列出自己可以通过AMQP登入的虚拟机
  • 查看自己的虚拟机节点 virtual hosts的queues,exchanges和bindings信息
  • 查看和关闭自己的channels和connections
  • 查看有关自己的虚拟机节点virtual hosts的统计信息。包括其他用户在这个节点virtual hosts 中的活动信息。

3:Policymaker

  • 包含management所有权限
  • 查看和创建和删除自己的virtual hosts所属的policies和parameters信息。

4:Monitoring

  • 包含所有management所有权限
  • 罗列出所有的virtual hosts 包括不能登录的virtual hosts
  • 查看其他用户的connections和channel信息
  • 查看节点级别的数据如clustering和memory使用情况
  • 查看所有的virtual hosts的全局统计信息

5:Administrator

  • 最高权限
  • 可以创建和删除virtual hosts
  • 可以查看,创建和删除users
  • 查看创建permissions
  • 关闭所有用户的connections
模式
  • Simple:
    • 1.创建连接工程
    • 2.创建连接Connection
    • 3.通过连接获取通道Channel
    • 4.通过通道创建交换机、声明队列、绑定关系、路由key、发送消息和接受消息
    • 5.准备消息内容
    • 6.发送消息给队列queue
    • 7.关闭连接
    • 8.关闭通道
(1)yum 包更新到最新
> yum update
(2)安装需要的软件包, yum-util 提供yum-config-manager功能,另外两个是devicemapper驱动依赖的
> yum install -y yum-utils device-mapper-persistent-data lvm2
(3)设置yum源为阿里云
> yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
(4)安装docker
> yum install docker-ce -y
(5)安装后查看docker版本
> docker -v
 (6) 安装加速镜像
 sudo mkdir -p /etc/docker
 sudo tee /etc/docker/daemon.json <<-'EOF'
 {
  "registry-mirrors": ["https://0wrdwnn6.mirror.aliyuncs.com"]
 }
 EOF
 sudo systemctl daemon-reload
 sudo systemctl restart docker
 (7) 获取rabbit镜像:
> docker pull rabbitmq:management
 (8)创建并运行容器
> docker run -di --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e           RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
 (9)查看日志
> docker logs -f myrabbit
 (10)查看服务
> docker ps -a
 (11)关闭容器
> docker 08d03ae27334 stop
为什么RabbitMq是基于通道而不是基于连接?
  • Connection表示到消息代理的真实TCP连接,而Channel是其中的虚拟连接(AMQP连接)。这样,您可以在应用程序内部使用任意数量的(虚拟)连接,而不会使TCP连接使代理过载
  • 您可以为所有内容使用一个Channel。但是,如果您有多个线程,建议为每个线程使用不同的Channel
可以存在没有交换机的队列么?
  • 不可以,没有为队列指定交换机时,同一指向默认交换机
RabbitMq核心组件

  • 每一个virtual Host里还有routinkey -> 条件 分发给不同的消费者 (对应非广播场景) -> 过滤
  • RabbitMQ 消息传递模型的核心思想是生产者从不直接向队列发送任何消息 实际上,生产者经常甚至根本不知道消息是否会被传送到任何队列
  • 生产者只能将消息发送到交换。交换是一件非常简单的事情。一方面它接收来自生产者的消息,另一方面将它们推送到队列中。交易所必须确切地知道如何处理它收到的消息。它应该附加到特定队列吗?它应该附加到许多队列中吗?或者它应该被丢弃。其规则由交换类型定义 。
工作模式 简单模式
  • simple
    • 应用场景:手机短信、邮件单发

package com.whlll.rabbitmq.simple;

import com.rabbitmq.client.*;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;


public class SimpleMode {

    private Channel channel;

    @Before
    public void channelInit() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("47.115.207.246");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        Connection connection = factory.newConnection();
        channel = connection.createChannel();
    }

    //声明组件、交换机和队列、简单模板案例、交换机使用默认交换机,队列需要声明
    @Test
    public void myQueueDeclare() throws IOException {
        channel.queueDeclare("simple",
                false,//队列是否持久化
                false,//队列是否专属
                false,//队列是否自动删除,从第一个消费端监听队列开始
                //计算,到最后一个消费端断开连接,队列就会自动删除
                null);//map类型 key值固定一批属性
        System.out.println("队列创建成功");
    }

    //发送消息到队列 生产端 永远不会把消息直接发给队列,发给交换机
    @Test
    public void send() throws IOException {
        String msg = "whlll你好啊";
        byte[] msgByte = msg.getBytes(StandardCharsets.UTF_8);
        //将消息发给(AMQP DEFAULT)交换机 名字""
        channel.basicPublish(
                "",//发送给的交换机的名字,默认为空
                "simple",//设置路由key
                null,//发送消息时携带的参数
                msgByte//消息体
        );
    }

    @Test
    public void consume() throws IOException {
        channel.basicConsume("simple", false,
                new DeliverCallback() {
                    
                    @Override
                    public void handle(String s, Delivery delivery) throws IOException {
                        //从消息对象中拿到信息
                        byte[] body = delivery.getBody();
                        System.out.println(new String(body));
                        //如果autoAck false说明消费玩消息,需要手动确认
                        channel.basicAck(
                                delivery.getEnvelope().getDeliveryTag(),
                                false);
                    }
                }, new CancelCallback() {
                        
                    @Override
                    public void handle(String s) throws IOException {

                    }
                });
        //使用while true 将线程卡死,否则看不到消息消费逻辑
        while (true);
    }
}
Work模式
  • work queues
    • 工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务而不得不等待它完成。相反,我们安排任务稍后完成。我们将一个任务封装 成一条消息并发送到队列中。在后台运行的工作进程将弹出任务并最终执行作业。当您运行许多工人时,任务将在他们之间共享
    • 应用场景:抢红包、资源分配

轮询分发
  • 轮询分发(均匀分给每一个消费者):默认为轮询 可以不设置为手动应答
package com.whlll.rabbitmq.work;

import com.rabbitmq.client.*;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;


public class WorkMode {
    private Channel channel;

    @Before
    public void channelInit() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("47.115.207.246");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        Connection connection = factory.newConnection();
        channel = connection.createChannel();
    }

    @Test
    public void myQueueDeclare() throws IOException {
        channel.queueDeclare(
                "work",
                false,
                false,
                false,
                null);
        System.out.println("队列申明成功");
    }

    @Test
    public void send() throws IOException {
        String msg = "hahah whlll";
        byte[] msgByte = msg.getBytes(StandardCharsets.UTF_8);
        channel.basicPublish(
                "",
                "work",
                null,
                msgByte
        );
    }
    //消费端
    @Test
    public void consume01() throws IOException {
        channel.basicConsume("work", false,
                new DeliverCallback() {
                    @Override
                    public void handle(String s, Delivery delivery) throws IOException {
                        byte[] body = delivery.getBody();
                        System.out.println("消费者01:" + new String(body));
                        //如果autoAck false说明消费完消息,需要手动确认
                        channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
                                false);
                    }
                }, new CancelCallback() {
                    @Override
                    public void handle(String s) throws IOException {
                    }
                });
                while (true);
    }

    @Test
    public void consume02() throws IOException {
        channel.basicConsume("work", false,
                new DeliverCallback() {
                    @Override
                    public void handle(String s, Delivery delivery) throws IOException {
                        byte[] body = delivery.getBody();
                        channel.basicAck(
                                delivery.getEnvelope().getDeliveryTag(),
                                false);
                    }
                }, new CancelCallback() {
                    @Override
                    public void handle(String s) throws IOException {
                    }
                });
        while (true);
    }
}

公平分发
  • 公平分发(谁新能好优先分给谁处理):配置Qos并谁设置为手动应答
  • Qos:每次从队列读取的消息条数(根据具体的设备资源情况而定)
package com.whlll.rabbitmq.work;

import com.rabbitmq.client.*;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;


public class WorkMode {
    private Channel channel;

    @Before
    public void channelInit() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("47.115.207.246");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        Connection connection = factory.newConnection();
        channel = connection.createChannel();
    }

    @Test
    public void myQueueDeclare() throws IOException {
        channel.queueDeclare(
                "work",
                false,
                false,
                false,
                null);
        System.out.println("队列申明成功");
    }

    @Test
    public void send() throws IOException, InterruptedException {
        for (int i =0;i<20;i++){
        String msg = "hahah whlll"+i;
        byte[] msgByte = msg.getBytes(StandardCharsets.UTF_8);
        channel.basicPublish(
                "",
                "work",
                null,
                msgByte
        );
        Thread.sleep(1000);
        }
    }
    //消费端
    @Test
    public void consume01() throws IOException{
        channel.basicQos(1);//每次从队列读取的消息条数
        channel.basicConsume("work", false,
                new DeliverCallback() {
                    @Override
                    public void handle(String s, Delivery delivery) throws IOException {
                        byte[] body = delivery.getBody();
                        System.out.println("消费者01:" + new String(body));
                        //如果autoAck false说明消费完消息,需要手动确认
                        //公平分发
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
                                false);
                    }
                }, new CancelCallback() {
                    @Override
                    public void handle(String s) throws IOException {
                    }
                });
                while (true);
    }

    @Test
    public void consume02() throws IOException {
        channel.basicQos(1);//每次从队列读取的消息条数
        channel.basicConsume("work", false,
                new DeliverCallback() {
                    @Override
                    public void handle(String s, Delivery delivery) throws IOException {
                        byte[] body = delivery.getBody();
                        System.out.println("消费者02:" + new String(body));
                        //公平分发
                        try {
                            Thread.sleep(2000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        channel.basicAck(
                                delivery.getEnvelope().getDeliveryTag(),
                                false);
                    }
                }, new CancelCallback() {
                    @Override
                    public void handle(String s) throws IOException {
                    }
                });
        while (true);
    }
}

发布/订阅模式
  • publish/sbscribe(Fanout)
    • 工作队列背后的假设是每个任务都被交付给一个工人。在这一部分,我们将做一些完全不同的事情——我们将向多个消费者传递一条消息。这种模式被称为“发布/订阅”
    • 应用场景:邮件的群发、广告的群发

package com.whlll.rabbitmq.fanout;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class FanoutMode {
    //初试化连接
    private Channel channel;

    @Before
    public void channelInit() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("47.115.207.246");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        Connection connection = factory.newConnection();
        channel = connection.createChannel();
    }
    //准备交换机,队列的名称属性
    private static final String TYPE="fanout";
    private static final String EX_NAME=TYPE+"_ex";//fanout_ex
    private static final String QUEUE01=TYPE+"_Q1";
    private static final String QUEUE02=TYPE+"_Q2";
    @Test
    public void declare() throws IOException {
        //声明队列
        channel.queueDeclare(QUEUE01,false,false,false,null);
        channel.queueDeclare(QUEUE02,false,false,false,null);
        //只会使用自己的名字,绑定默认交换机,暂时和我们自定义交换机没有关系
        //声明交换机
        channel.exchangeDeclare(EX_NAME,TYPE);//声明了一个名为 fanout_ex 类型为fanout的交换机
        //绑定交换机和队列的关系,由于发布订阅,绑定时需要提供自定义的路由key,随意
        channel.queueBind(QUEUE01,EX_NAME,"");
        channel.queueBind(QUEUE02,EX_NAME,"");
    }
    @Test
    public void send() throws IOException {
        String msg="你好,发布订阅模式";
        byte[] bytes = msg.getBytes();
        channel.basicPublish(EX_NAME,"北京",null,bytes);
    }
}
路由模式
  • routing(direct)
    • 应用场景:处理一些特殊的消息逻辑,可以经过路由的筛选

package com.whlll.rabbitmq.direct;

import com.rabbitmq.client.*;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;


public class DirectMode {
    private Channel channel;

    @Before
    public void channelInit() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("47.115.207.246");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        Connection connection = factory.newConnection();
        channel = connection.createChannel();
    }

    //准备交换机,队列名称属性
    private static final String TYPE = "direct";
    private static final String EX_NAME=TYPE+"_ex";//fanout_ex
    private static final String QUEUE01 = TYPE + "_Q1";
    private static final String QUEUE02 = TYPE + "_Q2";

    @Test
    public void declare() throws IOException {
        //声明队列
        channel.queueDeclare(QUEUE01, false, false, false, null);
        channel.queueDeclare(QUEUE02, false, false, false, null);
        //只会使用自己的名字,绑定默认交换机,暂时和我们自定义交换机没有关系
        //声明交换机
        channel.exchangeDeclare(EX_NAME, TYPE);//声明一个名为fanout_ex类型为fanout的交换机
        //绑定交换机和队列的关系。由于发布订阅,绑定时需要提供自定义的路由key,随意
        channel.queueBind(QUEUE01, EX_NAME, "北京");
        channel.queueBind(QUEUE01, EX_NAME, "广州");
        channel.queueBind(QUEUE02, EX_NAME, "上海");
    }

    @Test
    public void send() throws IOException {
        String msg = "你好,路由模式交换机";
        byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);
        channel.basicPublish(EX_NAME,"上海",null,bytes);
    }

}
主题模式
  • topics
    • 虽然使用直接交换改进了我们的系统,但它仍然有局限性——它不能基于多个标准进行路由。
    • *(星号)可以正好代替一个词。
    • # (hash) 可以代替零个或多个单词。

package com.whlll.rabbitmq.topics;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class TopicMode {
    private Channel channel;

    @Before
    public void ChannelInit() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("47.115.207.246");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        Connection connection = factory.newConnection();
        channel = connection.createChannel();
    }
    //准备交换机,队列的名称属性
    private static final String TYPE="topic";
    private static final String EX_NAME=TYPE+"_ex";//topic_ex
    private static final String QUEUE01=TYPE+"_Q1";
    private static final String QUEUE02=TYPE+"_Q2";

    @Test
    public void declare() throws IOException {
        channel.queueDeclare(QUEUE01, false, false, false, null);
        channel.queueDeclare(QUEUE02, false, false, false, null);

        channel.exchangeDeclare(EX_NAME, TYPE);

        channel.queueBind(QUEUE01, EX_NAME, "中国.北京.#");
        channel.queueBind(QUEUE01, EX_NAME, "中国.*.*.*.*");
        channel.queueBind(QUEUE02, EX_NAME, "*.上海.#");
    }
    @Test
    public void send() throws IOException {
        String msg="你好,路由模式交换机";
        byte[] bytes = msg.getBytes();
        channel.basicPublish(EX_NAME,"中国.上海.大兴.亦庄.你爹",null,bytes);
    }
}
Header模式
  • RPC(header)
    • 如果我们需要在远程计算机上运行一个函数并等待结果呢?嗯,这是一个不同的故事。这种模式通常称为远程过程调用RPC

  • 第三个参数 -> 附加参数
channel.basicPublish("",queueName,null,message.getBytes());
整合SpringBoot-Fanout模式 生产者 application.yml
# 服务端口
server:
  port: 8081

spring:
  rabbitmq:
    username: admin
    password: admin
    host: 47.115.207.246
    port: 5672
service
package com.whlll.service;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

import java.util.UUID;


@Configuration
public class OrderService {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    public void makeOrder(String userid, String productid, int num) {
        String orderid = UUID.randomUUID().toString();

        String exchangeName = "fanout_order_exchange";
        String routingKey = "";
        rabbitTemplate.convertAndSend(exchangeName,routingKey,orderid);
    }

}
config
package com.whlll.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class RabbitMqConfiguration {

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout_order_exchange", true, false);
    }

    @Bean
    public Queue smsQueue() {
        return new Queue("sms.fanout.queue", true);
    }

    @Bean
    public Queue duanxinQueue() {
        return new Queue("duanxin.fanout.queue", true);
    }

    @Bean
    public Queue emailQueue() {
        return new Queue("email.fanout.queue", true);
    }

    @Bean
    public Binding smsBinding() {
        return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
    }

    @Bean
    public Binding duanxinBinding() {
        return BindingBuilder.bind(duanxinQueue()).to(fanoutExchange());
    }

    @Bean
    public Binding emailBinding() {
        return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
    }

}
Test
package com.whlll;

import com.whlll.service.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class SpringbootOrderRabbitmqApplicationTests {

    @Autowired
    private OrderService orderService;

    @Test
    void contextLoads() {
        orderService.makeOrder("1","1",12);
    }

}
消费者 service.fanout
  • FanoutDuanxinComsumer
package com.whlll.service.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;


@RabbitListener(queues = {"duanxin.fanout.queue"})
@Service
public class FanoutDuanxinComsumer {

    @RabbitHandler
    public void recieveMessage(String message) {
        System.out.println("Duanxin Fanout 接收到的订单信息:->"+message);
    }

}
  • FanoutEmailComsumer
package com.whlll.service.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;


@RabbitListener(queues = {"email.fanout.queue"})
@Service
public class FanoutEmailComsumer {

    @RabbitHandler
    public void recieveMessage(String message) {
        System.out.println("Email Fanout 接收到的订单信息:->"+message);
    }

}

  • FanoutSMSComsumer
package com.whlll.service.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;


@RabbitListener(queues = {"sms.fanout.queue"})
@Service
public class FanoutSMSComsumer {

    @RabbitHandler
    public void recieveMessage(String message) {
        System.out.println("SMS Fanout 接收到的订单信息:->"+message);
    }

}

整合SpringBoot-Direct模式 生产者 service
package com.whlll.service;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

import java.util.UUID;


@Configuration
public class OrderService {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    public void makeOrderFanout(String userid, String productid, int num) {
        String orderid = UUID.randomUUID().toString();
        System.out.println("订单产生成功:->"+orderid);

        String exchangeName = "fanout_order_exchange";
        String routingKey = "";
        rabbitTemplate.convertAndSend(exchangeName,routingKey,orderid);
    }

    public void makeOrderDirect(String userid, String productid, int num) {
        String orderid = UUID.randomUUID().toString();
        System.out.println("订单产生成功:->"+orderid);

        String exchangeName = "direct_order_exchange";
        rabbitTemplate.convertAndSend(exchangeName,"email",orderid);
        rabbitTemplate.convertAndSend(exchangeName,"duanxin",orderid);
    }


}
config
package com.whlll.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class RabbitMqConfiguration {

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("direct_order_exchange", true, false);
    }

    @Bean
    public Queue smsQueue() {
        return new Queue("sms.direct.queue", true);
    }

    @Bean
    public Queue duanxinQueue() {
        return new Queue("duanxin.direct.queue", true);
    }

    @Bean
    public Queue emailQueue() {
        return new Queue("email.direct.queue", true);
    }

    @Bean
    public Binding smsBinding() {
        return BindingBuilder.bind(smsQueue()).to(directExchange()).with("sms");
    }

    @Bean
    public Binding duanxinBinding() {
        return BindingBuilder.bind(duanxinQueue()).to(directExchange()).with("duanxin");
    }

    @Bean
    public Binding emailBinding() {
        return BindingBuilder.bind(emailQueue()).to(directExchange()).with("email");
    }

}

Test
package com.whlll;

import com.whlll.service.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class SpringbootOrderRabbitmqApplicationTests {

    @Autowired
    private OrderService orderService;

    @Test
    void contextLoads_Fanout() {
        orderService.makeOrderFanout("1","1",12);
    }

    @Test
    void contextLoads_Direct() {
        orderService.makeOrderDirect("1","1",12);
    }

}
消费者 service.direct
  • DirectDuanxinComsumer
package com.whlll.service.Direct;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;


@RabbitListener(queues = {"duanxin.direct.queue"})
@Service
public class DirectDuanxinComsumer {

    @RabbitHandler
    public void recieveMessage(String message) {
        System.out.println("Duanxin Direct 接收到的订单信息:->"+message);
    }

}

  • DirectEmailComsumer
package com.whlll.service.Direct;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;


@RabbitListener(queues = {"email.direct.queue"})
@Service
public class DirectEmailComsumer {

    @RabbitHandler
    public void recieveMessage(String message) {
        System.out.println("Email Direct 接收到的订单信息:->"+message);
    }

}
  • DirectSMSComsumer
package com.whlll.service.Direct;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;


@RabbitListener(queues = {"sms.direct.queue"})
@Service
public class DirectSMSComsumer {

    @RabbitHandler
    public void recieveMessage(String message) {
        System.out.println("SMS Direct 接收到的订单信息:->"+message);
    }

}
懒加载

配置类应该先在消费者配置启动,等待消息即可

整合SpringBoot-Topic模式(注解方式) 生产者 service
package com.whlll.service;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

import java.util.UUID;


@Configuration
public class OrderService {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    public void makeOrderFanout(String userid, String productid, int num) {
        String orderid = UUID.randomUUID().toString();
        System.out.println("订单产生成功:->"+orderid);

        String exchangeName = "fanout_order_exchange";
        String routingKey = "";
        rabbitTemplate.convertAndSend(exchangeName,routingKey,orderid);
    }

    public void makeOrderDirect(String userid, String productid, int num) {
        String orderid = UUID.randomUUID().toString();
        System.out.println("订单产生成功:->"+orderid);

        String exchangeName = "direct_order_exchange";
        rabbitTemplate.convertAndSend(exchangeName,"email",orderid);
        rabbitTemplate.convertAndSend(exchangeName,"duanxin",orderid);
    }

    public void makeOrderTopic(String userid, String productid, int num) {
        String orderid = UUID.randomUUID().toString();
        System.out.println("订单产生成功:->"+orderid);

        String exchangeName = "topic_order_exchange";
        String routingKey = "com.duanxin.";
        rabbitTemplate.convertAndSend(exchangeName,routingKey,orderid);
    }

}

Test
    @Test
    void contextLoads_Topic() {
        orderService.makeOrderTopic("1","1",12);
    }
消费者 service.Topic
  • TopicDuanxinComsumer
package com.whlll.service.Topic;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Service;


@Service
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "duanxin.topic.queue",durable = "true",autoDelete = "false"),
        exchange = @Exchange(value = "topic_order_exchange",type = ExchangeTypes.TOPIC),
        key = "#.duanxin.#"
))
public class TopicDuanxinComsumer {

    @RabbitHandler
    public void recieveMessage(String message) {
        System.out.println("Duanxin Topic 接收到的订单信息:->"+message);
    }

}
  • TopicEmailComsumer
package com.whlll.service.Topic;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Service;


@Service
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "email.topic.queue",durable = "true",autoDelete = "false"),
        exchange = @Exchange(value = "topic_order_exchange",type = ExchangeTypes.TOPIC),
        key = "#.email.#"
))
public class TopicEmailComsumer {

    @RabbitHandler
    public void recieveMessage(String message) {
        System.out.println("Email Topic 接收到的订单信息:->"+message);
    }

}
  • TopicSMSComsumer
package com.whlll.service.Topic;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Service;


@Service
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "sms.topic.queue",durable = "true",autoDelete = "false"),
        exchange = @Exchange(value = "topic_order_exchange",type = ExchangeTypes.TOPIC),
        key = "#.sms.#"
))
public class TopicSMSComsumer {

    @RabbitHandler
    public void recieveMessage(String message) {
        System.out.println("SMS Topic 接收到的订单信息:->"+message);
    }

}

TTL过期时间

过期时间TTL表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。RabbitMQ可以对消息和队列设置TTL。目前有两种方法可以设置。

  • 第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。
  • 第二种方法是对消息进行单独设置,每条消息TTL可以不同。

如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就称为dead message被投递到死信队列, 消费者将无法再收到该消息。

实现设置队列过期时间

  • 配置类
@Configuration
public class ttlRabbitmqConfig {
    @Bean
    public Queue queue1(){
        Map args = new HashMap<>();
        args.put("x-message-ttl",5000);
        return new Queue("ttl.queue",true,false,false,args);
    }
    @Bean
    public DirectExchange ttlExchange() {
        return new DirectExchange("ttl_order_exchange", true, false);
    }
    @Bean
    public Binding bindingExchange() {
        return BindingBuilder.bind(queue1()).to(ttlExchange()).with("ttl");
    }
}
  • 业务层
public void makeOrderTtl(String userId, String productId, int num) {
        String orderNumer = UUID.randomUUID().toString();
        System.out.println("user:"+orderNumer);
        String routeKey = "ttl";
        // 发送订单信息给RabbitMQ fanout
        rabbitTemplate.convertAndSend(ttlExchangeName, routeKey, orderNumer);
    }
  • 测试类
@Testvoid contextLoads2Ttl() throws InterruptedException {    orderService.makeOrderTtl("1","1",12);}
  • 消费者监视类
@RabbitListener(queues = "ttl.queue")
@Component
public class ttlController {
    @RabbitHandler
    public void  ttlRevice(String msg){
        System.out.println("ttl -->>>>>>>邮件发送消息:"+msg);
    }
}

实现设置消息过期机制

  • producer代码
    @Bean
    public Queue messageQueue() {
        return new Queue("message.queue",true);
    }
    @Bean
    public DirectExchange messageOrderExchange() {
        //  return new DirectExchange("TestDirectExchange",true,true);
        return new DirectExchange("message_order_exchange", true, false);
    }
     @Bean
    public Binding bindingMessage() {
        return BindingBuilder.bind(messageQueue()).to(messageOrderExchange()).with("message");
    }
  • producer发送消息代码
public void ttlOrder(String userId, String productId, int num) {
        String exchangeName = "message_order_exchange";
        String routeKey = "message";
        String orderNumer = UUID.randomUUID().toString();
        System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("5000");
                message.getMessageProperties().setContentEncoding("utf-8");
                return message;
            }
        };
        rabbitTemplate.convertAndSend(exchangeName, routeKey,userId, messagePostProcessor);
    }
  • consumer消费者监听方法
@RabbitListener(queues = "message.queue")
@Component
public class messageController {
    @RabbitHandler
    public void messageRevice(String msg) {
        System.out.println("message->>>>消费消息");
    }
}
  • 生产者测试类发送消息
@Test
    void contextLoads1() {
        orderService.ttlOrder("1"," 1",12);
    }
死信队列
  • 概念

  • 当一条消息在队列中出现以下三种情况的时候,该消息就会变成一条死信。

    • 消息被拒绝(basic.reject / basic.nack),并且requeue = false
    • 消息TTL过期
    • 队列达到最大长度
  • 当消息在一个队列中变成一个死信之后,如果配置了死信队列,它将被重新publish到死信交换机,死信交换机将死信投递到一个队列上,这个队列就是死信队列

  • 生产者配置类

@Configuration
public class DeadRabbitMqConfiguration {

    @Bean
    public DirectExchange deadExchange() {
        return new DirectExchange("dead_direct_exchange",true,false);
    }

    @Bean
    public Queue deadqueue() {
        return new Queue("dead.direct.queue", true);
    }

    @Bean
    public Binding deadbinds() {
        return BindingBuilder.bind(deadqueue()).to(deadExchange()).with("dead");
    }

}
  • 修改原TTL队列属性
  @Bean
    public Queue queue1(){
        Map args = new HashMap<>();
        args.put("x-message-ttl",5000);
        args.put("x-dead-letter-exchange", "dead_direct_exchange");
        args.put("x-dead-letter-routing-key", "dead");//fanout不需要配置
        return new Queue("ttl.queue",true,false,false,args);
    }

error:队列已存在 不会覆盖 (线上环境 -> 新建一个属性一样的队列绑定死信队列来替换 危险 -> 直接删除线上队列)

RabbitMq内存磁盘的监控

  • 内存
vm_memory_high_watermark.relative = (0.4~0.7)
vm_memory_high_watermark.absolute = 2GB

eg.

rabbitmqctl set_vm_memory_high_watermark.absolute = 50MB
  • 磁盘
disk_free_limit.relative = 3.0disk_free_limit.absolute = 2GB

eg.

rabbitmqctl set_disk_free_limit.absolute = 100GB
Rabbitmq分布式事务

在不同的系统之间(JVM)如何保持数据的一致性 -> 分布式事务

  • 本地消息表(异步确保)

执行步骤如下:

  1. MQ发送方发送远程事务消息到MQ Server;
  2. MQ Server给予响应, 表明事务消息已成功到达MQ Server.
  3. MQ发送方Commit本地事务.
  4. 若本地事务Commit成功, 则通知MQ Server允许对应事务消息被消费; 若本地事务失败, 则通知MQ Server对应事务消息应被丢弃.
  5. 若MQ发送方超时未对MQ Server作出本地事务执行状态的反馈, 那么需要MQ Servfer向MQ发送方主动回查事务状态, 以决定事务消息是否能被消费.
  6. 当得知本地事务执行成功时, MQ Server允许MQ订阅方消费本条事务消息

需要额外说明的一点, 就是事务消息投递到MQ订阅方后, 并不一定能够成功执行. 需要MQ订阅方主动给予消费反馈(ack)

  • 如果MQ订阅方执行远程事务成功, 则给予消费成功的ack, 那么MQ Server可以安全将事务消息移除;
  • 如果执行失败, MQ Server需要对消息重新投递, 直至消费成功.
注意事项
  • 消息中间件在系统中扮演一个重要的角色, 所有的事务消息都需要通过它来传达, 所以消息中间件也需要支持 HAC 来确保事务消息不丢失.
  • 根据业务逻辑的具体实现不同,还可能需要对消息中间件增加消息不重复, 不乱序等其它要求.
适用场景
  • 执行周期较长
  • 实时性要求不高

例如:

  • 跨行转账/汇款业务(两个服务分别在不同的银行中)
  • 退货/退款业务
  • 财务, 账单统计业务(先发送到消息中间件, 然后进行批量记账)
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/320533.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号