栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Rabbit mq 消息服务器(分布式中非常重要的服务器)

Rabbit mq 消息服务器(分布式中非常重要的服务器)

目录

Rabbitmq概述

RabbitMQ使用场景

服务解耦

流量削峰

异步调用

使用步骤

VM版本:16+(Rabbitmq要在容器里使用)

安装Docker(先克隆出来一个里面只有docker 的虚拟机进入执行)

搭建Rabbitmq服务

六种工作模式

简单模式(对应只有一个消费者)

工作模式(对应对个消费者)

如何使消息持久化(防止服务器端的消息丢失)

群发模式(发消息就需要交换机)

路由模式

主题模式

如何实现SCC的配置更新


Rabbitmq概述

Rabbitmq就像是一个邮局,投递人(生产者)将消息发送给Rabbitmq,Rabbitmq邮递员(线程)根据设定好的道路送到指定的收信人(消费者),收信人拿到信之后,阅读理解(做处理),最后写个回信(响应)给投递人,还得通过Rabbitmq,如此循环反复

RabbitMQ使用场景

服务解耦

服务解耦这个情况在单体项目的时候,可以不考虑这个问题,服务的解耦是基于两个服务之间进行调用的时候,出现的问题,A产生数据而服务BCD都需要,那么我可以直接在A中调用BCD,从而实现数据的传递,但当在微服务的时候,服务不会像这个例子这么少,成千上百的服务,这样服务间的耦合性太高,维护成本过高,引入Rabbitmq中间商后,A把数据交给Rabbitmq,然后BCD等服务,需要就去rabbitmq拿就可以了

流量削峰

举个栗子,当我们的服务器只有一台的时候,瞬间qps达到了3000那么这个服务器的压力就会剧增,也就是瞬时压力爆棚,那么通过Rabbitmq的消息队列,拉长战线(处理时长),通过rabbitmq的请求慢慢发给服务器进行处理,例如qps从每秒3000通过rabbitmq后变成qps300而其他的在rabbitmq队列中排队等待处理,这样虽说把时间拉长,但可以减轻瞬时压力

异步调用

链路 A(接收请求的)--- rabbitmq ----- B(处理请求的),就是A的线程只负责接收并发给rabbitmq,之后A的线程就会被释放,继续做接收请求,rabbitmq负责生成一个消息队列,B服务只负责进行业务处理,B服务没处理完的可以在消息队列中等待,之后按顺序进行处理。

消息服务

消息队列

消息中间件

常见的服务器:

1.Activemq  2.Rockermq(阿里)  3.Kafka(大数据) 4.tubemq(腾讯万亿级别) 5.Rabbitmq(spring集成)

使用步骤

VM版本:16+(Rabbitmq要在容器里使用)

VM网段:192.168.64.0

知识点:咋改网段 编辑 虚拟网络编辑器选择VMnet8   左下角子网IP 修改岂可

虚拟机:centos-8-2105 centos-7-1908随便选一个

课前资料设置好的东西

2.安装了三个工具:python pip ansible

3.两个脚本文件,用来设置ip地址

           ip-dhcp:自动获取

           ip-static:手动获取

4.用vm打开对应的.vmx文件,加载虚拟机镜像

5.启动 按提示“已复制虚拟机”

6.默认用户密码root

7.设置ip

./ip-dhcp   #执行脚本

ifconfig  看ip
ifconfig ens33

没有网卡咋办(执行下面的两行代码)

nmcli n on 
systemctl restart NetworkManager

如果上面的两条不好使就重置虚拟网络设置,之后先还原,在设置网段为64

安装Docker(先克隆出来一个里面只有docker 的虚拟机进入执行)

1.可以从网上下载Docker离线包

https://download.docker.com/linux/static/stable/x86_64/docker-20.10.6.tgz

2.离线安装工具(简化安装,要下载)

https://github.com/Jrohy/docker-install/

3.安装,通过MBX软件,连接,并把下载好的以下文件一块放到/root/里1.阿里的yum安装源

    - docker-20.10.6.tgz
	- install.sh
	- docker.bash

4.执行安装

# 进入 docker-install 文件夹
cd docker-install

# 为 docker-install 添加执行权限
chmod +x install.sh

# 安装
./install.sh -f docker-20.10.6.tgz

5.由于国内网络的问题,需要配置加速器来加速

cat下面命令直接生成文件daemon.json

cat < /etc/docker/daemon.json
{
  "registry-mirrors": [
    "https://docker.mirrors.ustc.edu.cn",
    "http://hub-mirror.c.163.com"
  ],
  "max-concurrent-downloads": 10,
  "log-driver": "json-file",
  "log-level": "warn",
  "log-opts": {
    "max-size": "10m",
    "max-file": "3"
    },
  "data-root": "/var/lib/docker"
}
EOF

6.重新加载docker配置 重启docker

7.测试 docker info

搭建Rabbitmq服务

从docker-base再克隆一个虚拟机: rabbitmq设置ip:   运行            ./ip-static将镜像文件复制到/root 下(若本地没有则要去下载镜像)

docker pull rabbitmq:management关闭防火墙

            systemctl stop firewalld        systemctl disable firewall        # 重启 docker 系统服务   systemctl restart docker

导入镜像: docker load -i rabbit-image.gz

        

配置管理员用户名与密码

mkdir /etc/rabbitmq vim /etc/rabbitmq/rabbitmq.conf

# 在文件中添加两行配置: default_user = admin default_pass = admin

   通过docker启动镜像  
docker run -d --name rabbit 
-p 5672:5672 
-p 15672:15672 
-v /etc/rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf 
-e RABBITMQ_CONFIG_FILE=/etc/rabbitmq/rabbitmq.conf 
--restart=always 
rabbitmq:management

        消息服务器不会长久的储存,只要消费者处理完,消息服务器中的消息会被删除

六种工作模式

简单模式(对应只有一个消费者)

第一步:新建一个maven 什么依赖都不用添加

第二步:添加依赖

    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    4.0.0
    com.tedu
    rabbitmq
    0.0.1-SNAPSHOT
    


          
            com.rabbitmq
            amqp-client
            5.4.3
        


        
            org.slf4j
            slf4j-api
            1.8.0-alpha2
        

      

     
            org.slf4j
            slf4j-log4j12
            1.8.0-alpha2
        

    

    
        
            
                org.apache.maven.plugins
                maven-compiler-plugin
                3.8.0
                
                    1.8
                    1.8
                

            

        

    

3.创建生产者类发送消息

第一步:连接

第二步:创建通信的通道(channel对象)

第三步:在服务器上创建队列

第四步:发送消息

第五步:关闭资源

package m1;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //连接
        ConnectionFactory f= new ConnectionFactory();
        f.setHost("192.168.64.140");
        f.setPort(5672);  
        f.setUsername("admin");
        f.setPassword("admin");
        Connection con = f.newConnection();      
        Channel channel = con.createChannel();  

        //在服务器上创建队列  hello world
        
        channel.queueDeclare("helloworld", false, false, false, null);
            
        //发送消息
        channel.basicPublish("", "helloworld", null, "HelloWorld824".getBytes());

        
        //断开连接
        channel.close();
        con.close();



    }
}

4.创建消费者接收处理数据

package m1;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //连接
        ConnectionFactory f = new ConnectionFactory();
        f.setHost("192.168.64.140");
        f.setPort(5672);
        f.setUsername("admin");
        f.setPassword("admin");
        Connection con = f.newConnection();
        Channel channel = con.createChannel();
        //创建队列
        channel.queueDeclare("helloworld", false, false, false, null);
        //创建回调对象
        // DeliverCallback deliverCallback1 = (consumerTag,message)->{};
        DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
            public void handle(String s, Delivery delivery) throws IOException {
                byte[] body = delivery.getBody();
                String s1 = new String(body);
                System.out.println("收到"+s1);
            }
        };

        CancelCallback cancelCallback = consumerTag -> {};  //取消接收消息时执行
        //从hello world队列接收消息,收到的消息会传递到回调对象进行处理
        channel.basicConsume("helloworld", true,deliverCallback,cancelCallback);//helloworld 回传到 message
                                            






    }
}

工作模式(对应对个消费者)

多个消费者从同一个队列进行获取消息,可以并行的处理多条消息,处理速度翻倍

合理分发:

1.设置autoAck = false  

2.如何设置qos:       qos -pre fetch -预抓取   c.basicQos(1);必须在手动确认模式下才会生效)

如何使消息持久化(防止服务器端的消息丢失)

不一定所有的数据都要持久化,例如日志是可以丢失的,而订单一定不能丢失

1.队列持久化

当队列被创建后是无法被更改的,要么删除,要么创建一个名字不相同的队列

2.消息持久化

package m2;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //连接
        ConnectionFactory f= new ConnectionFactory();
        f.setHost("192.168.64.140");
        f.setPort(5672);  
        f.setUsername("admin");
        f.setPassword("admin");
        Connection con = f.newConnection();      
        Channel channel = con.createChannel();  

        //创建队列
        channel.queueDeclare("task_queue", true, false, false, null);
        //循环在控制台发送消息
        while (true){
            System.out.println("输入消息:");
            String s = new Scanner(System.in).nextLine();
            channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_BASIC, s.getBytes());
                                                                        
        }
    }
}
package m2;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory f = new ConnectionFactory();
        f.setHost("192.168.64.140");
        f.setPort(5672);
        f.setUsername("admin");
        f.setPassword("admin");
        Connection con = f.newConnection();
        Channel channel = con.createChannel();
        //创建队列
        channel.queueDeclare("task_queue", true, false, false, null);


        //创建回调对象
        DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
            public void handle(String s, Delivery delivery) throws IOException {
                String str = new String(delivery.getBody());
                System.out.println("收到"+str);
                for (int i = 0; i < str.length(); i++) {
                    if (str.charAt(i) == '.'){

                        //模拟耗时消息,遍历字符串,每遇到一个‘.’字符暂停1秒
                        try {
                            Thread.sleep(1000);

                        }catch (Exception e){

                        }
                    }
                }
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                System.out.println("消息处理完成=================================================================消息处理完成");

            }
        };
        CancelCallback cancelCallback = consumerTag -> {};

        //预抓取,只收一条,处理完之前不收下一条
        //只在手动确认模式才有效
        channel.basicQos(1);
        //接收消息
        channel.basicConsume("task_queue", false, deliverCallback, cancelCallback); 

    }
}

群发模式(发消息就需要交换机)

所有消费者得到同一个消息,每个消费者都要有一个自己的队列,队列要求与交换机进行绑定,交换机只会发给与自己绑定的队列

使用的Fanout交换机(扇形交换机)

路由模式

每个消费者都有自己的队列,这个队列是随机命名,direct交换机,通过关键词来进行匹配队列发送

主题模式

使用的Taotal交换机,其次使用的关键词变为xxxx.xxxx.xxx()

如何实现SCC的配置更新

Spring cloud config + Bus组件(写好的代码)

要在对应的服务中添加该组件

1.添加bus依赖

        1.rabbitmq

        2.bus

        3.bus去操作rabbitmq时用到binder-rabbit

        
            org.springframework.boot
            spring-boot-starter-amqp
        
        
            org.springframework.cloud
            spring-cloud-bus
        
        
            org.springframework.cloud
            spring-cloud-stream-binder-rabbit
        

  amqp是消息服务的协议(数据格式)集成了rabbitmq的依赖

2.配置中心还有去添加一个actuator依赖

2.添加rabbit-mq连接配置

        09修改yml

        2.3.4修改config目录的三个文件,再上传到远程仓库

4.配置中心暴露bus-refresh刷新路径   m.e.w.e.i = bus-refresh 

     

1.BUS配置刷新

2.Bus发送刷新指令,其他模块接收指令执行配置刷新操作,Rabbitmq主题模式

链路跟踪

sleuth +zipkin

sleuth 用来产生链路跟踪日志 会产生日志数据

执行添加依赖,0配置就可以产生依赖

zipkin 对链路跟踪日志进行分析处理 最后用图形进行展示

a-b-c-d  默认日志只有10%会发给zipkin

a,asuidhiasd,asuidhiasd,true

b,asuidhiasd,,true

c,asuidhiasd,asuidhiasd,true

d,asuidhiasd,asuidhiasd,true

 

sleuth 通过 RABBITMQ发送到zipkin

修改2,3,4,6

1.添加zipkin client 客户端 依赖

2.在06添加rabbitmq依赖

3.修改06的application

4.修改config目录的是哪个文件添加zipkin发送方式

zipkin需要自己下载,之后用cmd

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/746352.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号