栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

SpringCloud、Eureka、Zuul、Rabbitmq

SpringCloud、Eureka、Zuul、Rabbitmq

1.eureka注册中心

注册中心服务端主要对外提供了三个功能:
服务注册:
服务提供者启动时,会通过 Eureka Client 向 Eureka Server 注册信息,Eureka Server 会存储该服务的信息,Eureka Server 内部有二层缓存机制来维护整个注册表。
提供注册表:
服务消费者在调用服务时,如果 Eureka Client 没有缓存注册表的话,会从 Eureka Server 获取最新的注册表。
同步状态:
Eureka Client 通过注册、心跳机制和 Eureka Server 同步当前客户端的状态。

2.创建项目 2.1添加eureka依赖

2.2修改pom


    
        springcloud1
        cn.tedu
        0.0.1-SNAPSHOT
    
    4.0.0

    sp05-eureka
    0.0.1-SNAPSHOT
    sp05-eureka
    Demo project for Spring Boot

    
        1.8
        UTF-8
        UTF-8
    

    
        
            org.springframework.cloud
            spring-cloud-starter-netflix-eureka-server
        
    

2.3修改application.yml文件
# 应用名称

spring:
  pplication:
  name: eureka-server
#2001,3001,4001,5001,6001
server:
  port: 2001
eureka:
  server:
    enable-self-preservation: false   #禁用自我保护机制
  instance:
    hostname: eureka1 #主机名
  client:
    register-with-eureka: false   #不向自己注册
    fetch-registry: false   #不从自己拉取
Eureka四条运行机制:

1.注册:客户端会一次一次的反复注册,直到注册成功为止。
2.拉取:客户端每隔30秒,重复的拉取、刷新本地缓存的注册表。
3.心跳:客户端每隔30秒向服务器发送心跳,如果服务器连续3次收不到一个服务的心跳,就会删除该服务的注册信息。
4.自我保护模式:
网络不稳定,或网络中断时,15分钟内85%服务器都出现心跳异常,会进入自动保护模式;
这种特殊情况下,会保护所有注册信息不删除;
等待网络回复正常后,自动退出保护模式;
开发调试期间应该禁用保护模式,避免影响测试。

eureka 集群服务器之间,通过 hostname 来区分。eureka.server.enable-self-preservation:eureka 的自我保护状态:心跳失败的比例,在15分钟内是否超过85%,如果出现了超过的情况,Eureka
Server会将当前的实例注册信息保护起来,同时提示一个警告,一旦进入保护模式,Eureka
Server将会尝试保护其服务注册表中的信息,不再删除服务注册表中的数据。也就是不会注销任何微服务。eureka.client.register-with-eureka=false:不向自身注册。eureka.client.fetch-registry=false:不从自身拉取注册信息。eureka.instance.lease-expiration-duration-in-seconds:最后一次心跳后,间隔多久认定微服务不可用,默认90。

2.4启动类添加@EnableEurekaServer注解 2.5为02、03、04工程添加依赖

    org.springframework.cloud
    spring-cloud-starter-netflix-eureka-client

2.6 02工程配置文件添加
spring:
  application:
    name: user-service


server:
  port: 8101
#用户的demo数据
#[{id:7,username:xx,password:xx},{8..},{9..}]
sp:
  user-service:
    users: "[{"id":7, "username":"abc","password":"123"},
            {"id":8, "username":"def","password":"456"},
            {"id":9, "username":"ghi","password":"789"}]"
#/eureka 子路径是客户端调用的 REST API 路径,浏览器不能访问
        eureka:
          client:
            service-url:
              defaultZone: http://eureka1:2001/eureka

defaultZone:默认地点,也可以从云服务商购买不同地点的eurea服务 2.7修改 hosts 文件,添加 eureka 域名映射

C:WindowsSystem32driversetchosts 管理员打开
添加内容:

127.0.0.1       eureka1
127.0.0.1       eureka2
2.8启动,并访问测试

http://eureka1:2001

3.Eureka 和 “服务提供者”的高可用

3.1item-service 高可用

启动参数 --server.port 可以覆盖yml中的端口配置;
通过启动参数设置启动的端口:

java -jar item.jar --server.port=8001
java -jar item.jar --server.port=8002


启动测试:

3.2Erueka高可用

添加两个服务器的 profile 配置文件:
application-eureka1.yml:

#2001,3001,4001,5001,6001
server:
  port: 2001
eureka:
  instance:
    hostname: eureka1 #主机名
  client:
    register-with-eureka: true   #false不向自己注册,profile的配置会覆盖公用配置
    fetch-registry: true   #false不从自己拉取,profile的配置会覆盖公用配置
    service-url:
      defaultZone: http://eureka2:2002/eureka  #eureka1启动时向eureka2注册

application-eureka2.yml:

#2001,3001,4001,5001,6001
server:
  port: 2002
eureka:
  instance:
    hostname: eureka2 #主机名
  client:
    register-with-eureka: true   #false不向自己注册,profile的配置会覆盖公用配置
    fetch-registry: true   #false不从自己拉取,profile的配置会覆盖公用配置
    service-url:
      defaultZone: http://eureka1:2001/eureka  #eureka1启动时向eureka2注册


访问 eureka 服务器,查看注册信息:
http://eureka1:2001/

http://eureka2:2002/

3.3eureka客户端注册时,向两个服务器注册

修改以下微服务:
sp02-itemservice
sp03-userservice
sp04-orderservice

  #/eureka 子路径是客户端调用的 REST API 路径,浏览器不能访问
eureka:
  client:
    service-url:
      defaultZone: http://eureka1:2001/eureka,http://eureka2:2002/eureka
4. 02orderservice远程调用02和03 4.1添加feign依赖

    org.springframework.cloud
    spring-cloud-starter-openfeign

4.2启动类添加@EbableFeignClients注解 4.3添加两个远程调用接口
package cn.tedu.sp04.feign;

import cn.tedu.sp01.pojo.Item;
import cn.tedu.sp01.web.util.JsonResult;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;

import java.util.List;



@FeignClient(name="item-service",contextId = "itemClient")
public interface ItemClient {
    //获取订单的商品列表
    @GetMapping("/{orderId}")
    JsonResult> getItems(@PathVariable("orderId") String orderId);

    //减少商品库存
    @PostMapping("/decreaseNumber")
    JsonResult decreaseNumber(@RequestBody List items);
}
package cn.tedu.sp04.feign;

import cn.tedu.sp01.pojo.User;
import cn.tedu.sp01.web.util.JsonResult;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestParam;

@FeignClient(name = "user-service",contextId = "userClient")
public interface UserClient {
    //获取用户
    @GetMapping("/{userId}")
    JsonResult getUser(@PathVariable("userId") Integer userId);
    //增加用户积分
    @GetMapping("/{userId}/score") //?score=1000
    JsonResult addScore(@PathVariable("userId") Integer userId,
                           @RequestParam Integer score);
}
4.4在订单实现类远程调用两个接口

5.Feign集成Ribbon

Ribbon提供负载均衡和重试的功能
Ribbon重试:调用后台服务失败(异常、超时、宕机),可以自动发起重试调用。
随机向一台服务器请求,请求失败(重试次数默认为0)则会更换服务器(更换服务器次数默认为1)。
单台服务器的重试次数MaxAutoRetries(默认0)(x),更换服务器的次数 MaxAutoRetriesNextServer(默认1)(y),最大请求次数(x+1)*(y+1)
在itemController模拟阻塞运算:

//模拟阻塞运算
if (Math.random()<0.9){//90%概率执行阻塞代码
    //阻塞时长随机0-5秒
    int t = new Random().nextInt(5000);
    log.info("阻塞:"+t);
    Thread.sleep(t);
}

ribbon.MaxAutoRetries:重试次数,默认为0ribbon.MaxAutoRetriesNextServer:更换服务器次数,默认为1ribbon.ReadTimeout:超时时间,默认1000ribbon.ConnectTimeout:与后台服务器建立连接的超时时间,默认1000ribbon.OkToRetryOnAllOperations:是否对所有类型请求都重试,默认只对GET请求重试 6.Zuul API网关

统一的入口

统一的权限校验

集成Ribbon

集成Hystrix

统一的入口: 1.新建spring模块:sp06-zuul

选择eureka client依赖

2.添加依赖:zuul、eureka client、sp01


    
        springcloud1
        cn.tedu
        0.0.1-SNAPSHOT
    
    4.0.0

    sp06-zuul
    0.0.1-SNAPSHOT
    sp06-zuul
    Demo project for Spring Boot

    
        1.8
        UTF-8
        UTF-8
        2.3.7.RELEASE
        Hoxton.SR9
    
    
        
            org.springframework.cloud
            spring-cloud-starter-netflix-eureka-client
        
        
            org.springframework.cloud
            spring-cloud-starter-netflix-zuul
        
    

3.配置yml:
# 应用名称
spring:
  application:
    name: zuul
# 2001,3001,4001,5001,6001

server:
  port: 3001
eureka:
  client:
    service-url:
      defaultZone: http://eureks1:2001/eureka, http://eureks2:2002/eureka,
#路由转发规则
#最好手动配置
zuul:
  routes:
    item-service: /item-service/**  # **包含深层子路径,*只含当前一层路径
    user-service: /user-service/**
    order-service: /order-service/**
4.启动类加@EnableZuulProxy注解

启用zuul网关
先启动两个eureka服务,启动02、03、04,最后启动zuul

7.统一权限校验 模拟登录检查:

http://localhost:3001/order-service/order123456 没有登录,不允许访问
http://localhost:3001/order-service/order123456?token=fdshnkj 已经登录可以访问

1.新建过滤器,继承ZuulFilter
2.按照规则实现ZuulFilter
3.添加注解@Component

zuul的自动配置类会自动配置过滤器 添加过滤类:

@Component
public class AccessFilter extends ZuulFilter {
    //过滤器类型,pre前置,routing 运行 post后置,error错误
    @Override
    public String filterType() {
        //return "pre";
        return FilterConstants.PRE_TYPE;//前置过滤器
    }

    //过滤器顺序号
    @Override
    public int filterOrder() {
        return 6;//放在第6位
    }

    //针对当前请求,是否执行过滤代码
    @Override
    public boolean shouldFilter() {
        //调用item-service需要判断权限

        //否则,不判断权限,直接跳过过滤代码

        //获取请求上下文对象
        RequestContext ctx = RequestContext.getCurrentContext();
        //从上下文对象获得调用的服务id
        String serviceId = (String) ctx.get(FilterConstants.SERVICE_ID_KEY);//("serviceId)
        //如果服务id是 item-service,返回true,否则返回false
        return "item-service".equalsIgnoreCase(serviceId);//IgnoreCase忽略大小写

    }

    //过滤代码
    @Override
    public Object run() throws ZuulException {
        // http://localhost:3001/order-service/order123456?token=fdshnkj
        //获取请求上下文对象
        RequestContext ctx = RequestContext.getCurrentContext();
        //从上下文获得request对象
        HttpServletRequest request = ctx.getRequest();
        //从request取出token参数
        String token = request.getParameter("token");
        //如果没有token,null,""
        if (StringUtils.isBlank(token)) {
            //阻止继续调用
            ctx.setSendZuulResponse(false);
            //直接向客户端返回响应
            String json = JsonResult.build().code(400).msg("未登录").toString();
            ctx.addZuulResponseHeader("Content-Type", "application/json;charset=UTF-8");
            ctx.setResponseBody(json);
        }
        return null;
    }
}
Zuul集成Ribbon

默认启用了负载均衡默认不启用重试,一般不在网关添加重试功能,否则可能造成后台服务器压力过大,出现大面积故障,重试功能应该尽量靠后添加 Zuul启用重试:

1.添加依赖:


    org.springframework.retry
    spring-retry

2.修改配置文件:

Zuul集成Hystrix

向后台服务转发调用,使用Hystrix进行容错和限流
Zuul默认已经启用了Hystrix。不做任何配置

8.Hystrix

容错(降级)和限流(熔断)工具

Zuul集成Hystrix添加降级

1.新建降级类,实现FallBackProvider接口
2.按接口规则实现
3.添加注解:@Component
zuul的自动配置类可以完成自动配置

Hystrix熔断:

流量过大,出现故障,可以熔断,断开链路,减轻后台服务的压力

熔断的触发条件: 10秒20次请求 ,50%请求出错断路器打开后一段时间,会进入“半开状态”:半开状态下会尝试发送一次客户端调用,调用成功,关闭断路器恢复正常;调用失败,继续保持打开状态

使用Actuator暴露Hystrix监控日志

Hystrix利用Actuator来暴露自己的监控日志
添加Actuator:
Actuator时Springboot提供的一个项目指标工具

健康状态spring容器 中所有对象springmvc映射的所有路径java虚拟机堆内存镜像

Actuator依赖:

暴露监控日志:

Hystrix数据监控-Hystrix Dashboard

1.新建spring模块:sp07-hystrix-dashboard
2.添加依赖



    
        springcloud1
        cn.tedu
        0.0.1-SNAPSHOT
    
    4.0.0
    sp07-hystrix-bashboard
    0.0.1-SNAPSHOT
    sp07-hystrix-bashboard
    Demo project for Spring Boot
    
        1.8
        UTF-8
        UTF-8
        2.3.7.RELEASE
    
    
        
            org.springframework.boot
            spring-boot-starter
        
        
            org.springframework.cloud
            spring-cloud-starter-netflix-hystrix-dashboard
        
    

3.配置文件

server:
  port: 4001
hystrix:
  dashboard:
    proxy-stream-allow-list: localhost

4.启动类加注解@EnableHystrixDashboard
5.访问: http://localhost:4001/hystrix
然后在里边输入http://localhost:3001/actuator/hystrix.stream测试

不断刷新此网址http://localhost:3002/item-service/156?token=asfdassfad,观察检测信息

06网关高可用

9.创建Turbine

从多台服务器抓取日志,进行聚合。
Hystrix Dashboard可以从Turbine抓取聚合后的日志数据。
1.新建sp08-turbine

2.添加依赖:turbine、eureka client



    
        springcloud1
        cn.tedu
        0.0.1-SNAPSHOT
    
    4.0.0
    sp08-turbine
    0.0.1-SNAPSHOT
    sp08-turbine
    Demo project for Spring Boot

    
        1.8
    
    
        
            org.springframework.cloud
            spring-cloud-starter-netflix-eureka-client
        
        
            org.springframework.cloud
            spring-cloud-starter-netflix-turbine
        
    

3.配置yml文件
抓取的访问列表: zuul,a,b,c
为聚合后的数据命名: new String(“default”)

	# 应用名称
spring:
  application:
    name: turbine
# eureka2001,zuul3001,dashboard4001,turbine5001
server:
  port: 5001
eureka:
  client:
    service-url:
      defaultZone: http://eureka1:2001/eureka,http://eureka2:2002/eureka
turbine:
  app-config: zuul
  cluster-name-expression: new String("default")

4.启动类注解:@EnableTurbine
5.访问聚合日志:http://localhist:5001/turbine.stream
在此处进入命令窗口,输入zb -n 2000 -c 50 http://localhost:3002/item-service/156?token=asfdassfad

Spring cloud config 配置中心

集中的管理,
在springcloud1下创建config文件夹,并创建配置文件:

item-service-dev.ymluser-service-dev.ymlorder-service-dev.yml 搭建配置中心:

1.新建sp09-config

2.添加依赖



    
        springcloud1
        cn.tedu
        0.0.1-SNAPSHOT
    
    4.0.0
    sp09-config
    0.0.1-SNAPSHOT
    sp09-config
    Demo project for Spring Boot
    
        1.8
    
    
        
            org.springframework.cloud
            spring-cloud-config-server
        
        
            org.springframework.cloud
            spring-cloud-starter-netflix-eureka-client
        
    

3.配置文件

# 应用名称
spring:
  application:
    name: config-service
  cloud:
    config:
      server:
        git:
          uri: https://gitee.com/jiahaobz/springcloud.git
server:
  port: 6001
eureka:
  client:
    service-url:
      defaultZone: http://eureka1:2001/eureka,http://eureka2:2002/eureka

4.启动类加注解:
@EnableConfigServer
5.访问测试:
http://localhost:6001/item-service/dev
http://localhost:6001/user-service/dev
http://localhost:6001/order-service/dev

配置中心的客户端:
1.把2,3,4的application.yml注释掉

2.添加依赖:
03添加依赖:


    org.springframework.cloud
    spring-cloud-starter-config

3.添加新的配置文件:bootstrap.yml
bootstrap.yml,引导配置,应用启动之前先执行
application.yml应用期待着之后执行

eureka:
  client:
    service-url:
      defaultZone: http://eureka1:2001/eureka,http://eureka2:2002/eureka
spring:
  cloud:
    config:
      discovery:
        enabled: true
        service-id: config-service
      #下载配置文件
      name: user-service
      profile: dev

检查确认:
1.按顺序启动项目:
05-eureka,等待完全启动
09-config,等待完全启动
03、04、06…
2.检查配置中心
注册表中有config-server
http://localhost:6001/user-service/dev
http://localhost:6001/item-service/dev
http://localhost:6001/order-service/dev
3.03-user的控制台日志,有没有连接6001服务器

10.RabbitMQ

消息队列、消息服务、消息中间件Broker
常见服务器:Rabbitmq、Activemq、Rocketmq、Kafka、Tubemq
搭建Rabbitmq服务器:
设置vm网段:

ls
./ip-dhcp
ifconfig

ip设置失败:
nmcli n on
systemcli restart NetworkManager

1.克隆一份虚拟机,docker-base,传文件

1.1从docker-base克隆一份虚拟机:rabbirmq
2.设置固定ip

./ip-static
ip:192.168.64.140
ifconfig

3.上传文件到/root/

4.导入镜像

systemctl restart docker
docker load -i rabbit-image.gz
docker images

5.安装Rabbitmq:
Docker 启动Rabbitmq
关闭防火墙:

systemctl stop firewalld
systemctl disable firewalld

# 重启 docker 系统服务
systemctl restart docker

配置管理员用户名和密码:

mkdir /etc/rabbitmq
vim /etc/rabbitmq/rabbitmq.conf

# 添加两行配置:
default_user = admin
default_pass = admin

启动Rabbitmq:

docker run -d --name rabbit 
-p 5672:5672 
-p 15672:15672 
-v /etc/rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf 
-e RABBITMQ_CONFIG_FILE=/etc/rabbitmq/rabbitmq.conf 
--restart=always 
rabbitmq:management

访问管理控制台 http://192.168.64.140:15672
用户名密码是 admin

11.Rabbitmq创建

创建一个空的project,然后船舰一个maven工程:
添加依赖:



    4.0.0

    cn.tedu
    rabbitmq-api
    1.0-SNAPSHOT
    
        
            com.rabbitmq
            amqp-client
            5.4.3
        
    
    
        
            
                org.apache.maven.plugins
                maven-compiler-plugin
                3.8.0
                
                    1.8
                    1.8
                
            
        
    

创建服务者(发送消息):

package m1;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //连接服务器
        ConnectionFactory f = new ConnectionFactory();
        f.setHost("192.168.64.140");
        f.setPort(5672); //5672用来收发消息,15672时管理控制台端口
        f.setUsername("admin");
        f.setPassword("admin");
        Connection con = f.newConnection();
        Channel c = con.createChannel(); //通信通道

        //在服务器上创建一个队列:helloworld
        //如果队列在服务器上存在,不会重复创建
        //第二个参数:是否是持久队列;
        //第三个参数:是否是排他队列、独占队列
        //第四个参数:是否自动删除
        //第五个参数:队列的其他属性
        c.queueDeclare("helloworld",false,false,false,null);

        //向helloworld队列发送消息
        c.basicPublish("", "helloworld", null, "Hello World".getBytes());

        //不执行消息,关闭连接,关闭通信通道
        c.close();
        con.close();
    }
}

创建消费者(接收消息):

package m1;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //连接
        ConnectionFactory f = new ConnectionFactory();
        f.setHost("192.168.64.140");
        f.setPort(5672); //5672用来收发消息,15672时管理控制台端口
        f.setUsername("admin");
        f.setPassword("admin");
        Connection con = f.newConnection();
        Channel c = con.createChannel(); //通信通道
        //创建队列
        c.queueDeclare("helloworld",false,false,false,null);
        System.out.println("等待接收数据");
        //创建回调对象
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            byte[] a = message.getBody();
            String s = new String(a);
            System.out.println("收到:"+s);
        };
        CancelCallback cancelCallback = consumerTag ->{};
        //开始接收消息,把消息传递给一个回调对象进行处理
        //第二个参数:是否自动确认(true),autoAck
        c.basicConsume("helloworld", true,deliverCallback,cancelCallback);

    }
}

合理分发:
1.让服务器可以知道消费者有没有处理完消息,手动确认
2. QPS=1,与抓取的消息数量,每次只接收一条消息,处理完之前不收下一条,手动确认模式才有效
3. 我们将任务封装为消息并将其发送到队列。后台运行的工作进程将获取任务并最终执行任务。当运行多个消费者时,任务将在它们之间分发。
创建服务者:

package m2;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;


public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //连接服务器
        ConnectionFactory f = new ConnectionFactory();
        f.setHost("192.168.64.140");
        f.setPort(5672); //5672用来收发消息,15672时管理控制台端口
        f.setUsername("admin");
        f.setPassword("admin");
        Connection con = f.newConnection();
        Channel c = con.createChannel(); //通信通道

        //在服务器上创建一个队列:helloworld
        //如果队列在服务器上存在,不会重复创建
        //第二个参数:是否是持久队列;
        //第三个参数:是否是排他队列、独占队列
        //第四个参数:是否自动删除
        //第五个参数:队列的其他属性
        c.queueDeclare("helloworld", false, false, false, null);

        //向helloworld队列发送消息
        while (true) {
            System.out.println("输入消息:");
            String s = new Scanner(System.in).nextLine();
            c.basicPublish("", "helloworld", null, s.getBytes());
        }

        //不执行消息,关闭连接,关闭通信通道
    }
}

创建消费者:

package m2;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //连接
        ConnectionFactory f = new ConnectionFactory();
        f.setHost("192.168.64.140");
        f.setPort(5672); //5672用来收发消息,15672时管理控制台端口
        f.setUsername("admin");
        f.setPassword("admin");
        Connection con = f.newConnection();
        Channel c = con.createChannel(); //通信通道
        //创建队列
        c.queueDeclare("helloworld",false,false,false,null);
        System.out.println("等待接收数据");
        //创建回调对象
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String s = new String(message.getBody());
            System.out.println("收到:"+s);
            //遍历访问每一个字符,每遇到一个‘.’,暂停一秒
            for (int i = 0; i < s.length(); i++) {
                if (s.charAt(i)=='.'){
                    try {
                        Thread.sleep(1000);

                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }
            }
            //发送回执
            //1:回执,2:是否同时确认之前收到过的多条消息
            c.basicAck(message.getEnvelope().getDeliveryTag(),false);
            System.out.println("...........消息处理完成");
        };
        CancelCallback cancelCallback = consumerTag ->{};
        //QPS=1,每次受一条,处理完之前不收下一条,手动ack模式才有效
        c.basicQos(1);
        //开始接收消息,把消息传递给一个回调对象进行处理
        //第二个参数:是否自动确认(true),autoAck
        c.basicConsume("helloworld", false,deliverCallback,cancelCallback);
    }
}

消息持久化:
1.队列持久化


添加交换机:
创建生产者:

package m3;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //连接服务器
        ConnectionFactory f = new ConnectionFactory();
        f.setHost("192.168.64.140");
        f.setPort(5672); //5672用来收发消息,15672时管理控制台端口
        f.setUsername("admin");
        f.setPassword("admin");
        Connection con = f.newConnection();
        Channel c = con.createChannel(); //通信通道
        //创建fanout交换机:logs
        //c.exchangeDeclare("logs", "fanout");
        c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);//默认是非持久,FANOUT表示持久
        //向交换机发送消息
        while (true){
            System.out.println("输入消息:");
            String s = new Scanner(System.in).nextLine();
            //对交换机,第二个参数无效
            c.basicPublish("logs", "", null, s.getBytes());
        }
    }
}

创建消费者:

package m3;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //连接
        ConnectionFactory f = new ConnectionFactory();
        f.setHost("192.168.64.140");
        f.setPort(5672); //5672用来收发消息,15672时管理控制台端口
        f.setUsername("admin");
        f.setPassword("admin");
        Connection con = f.newConnection();
        Channel c = con.createChannel(); //通信通道
        //1.创建队列
        String queue = UUID.randomUUID().toString();
        c.queueDeclare(queue,false,true,true,null);
        //2.创建交换机
        c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
        //3.绑定
        c.queueBind(queue,"logs", "");
    }
}
12.BUS配置刷新,Rabbitmq整合到微服务中

在2、3、4、9添加BUS、Rabbitmq
1.在2、3、4、9添加依赖:rabbitmq、bus、binder-rabbit


    org.springframework.boot
    spring-boot-starter-amqp


    org.springframework.cloud
    spring-cloud-bus


    org.springframework.cloud
    spring-cloud-stream-binder-rabbit

09再单独加一个依赖:


     org.springframework.boot
     spring-boot-starter-actuator

2.修改09的application.yml,添加Rabbitmq连接配置

spring:
  rabbitmq:
    host: 192.168.64.140
    port: 5672
    username: admin
    password: admin
management:
  endpoints:
    web:
      exposure:
        include: bus-refresh    

3.修改config目录中的三个配置文件,添加Rabbitmq连接配置
在spring下一层添加

  rabbitmq:
    host: 192.168.64.140
    port: 5672
    username: admin
    password: admin

4.提交推送到gitee仓库
5.启动:
先等待05启动完成,等待09启动完成,
然后访问测试:http://localhost:6001/order-service/dev,http://localhost:6001/item-service/dev,http://localhost:6001/user-service/dev
02 03 04 06启动后,查看控制台,要有连接6001服务器,
再访问:http://localhost:6001/actuator ,查看有没有bus-refresh,
有就用post请求:http://localhost:6001/actuator/bus-refresh

消息服务案例

1.bus配置刷新
向rabbitmq发送刷新指令,其他模块接受指令并执行
2.sleuth+zipkin链路跟踪
产生的链路跟踪日志发送到rabbitmq,zipkin从rabbitmq接收日志,
简单模式
3.

13. sleuth+zipkin链路跟踪 sleuth

用来产生链路监控日志,
在2、3、4、6添加sleuth依赖,sleuth是0配置


    org.springframework.cloud
    spring-cloud-starter-sleuth

zipkin

通过消息服务转发,解耦,流浪削峰,
2、3、4、6添加zipkin客户端,向rabbitmq发送日志:
1.在2、3、4、6添加zipkin客户端依赖


    org.springframework.cloud
    spring-cloud-starter-zipkin

2.在6中添加rabbitmq依赖


    org.springframework.boot
    spring-boot-starter-amqp

3.在6中添加rabbitmq连接配置

  rabbitmq:
    host: 192.168.64.140
    port: 5672
    username: admin
    password: admin

4.在2、3、4、6添加发送方式配置:rabbitmq

  zipkin:
    sender:
      type: rabbit

下载zipkin服务器,开启服务器,连接rabbitmq:
java -jar zipkin-server-2.23.16-exec.jar --zipkin.collector.rabbitmq.uri=amqp://admin:admin@192.168.64.140:5672

测试:
http://localhost:9411/zipkin

14.eureka客户端选择正确网卡,注册ip地址

选择正确网卡,eureka客户端会自动选择网卡,可能会选择错误网卡进行注册,
手动指定注册网卡:
修改bootstrap.yml文件

spring:
  cloud:
    inetutils:
      ignored-interfaces: # 忽略的网卡
        - VM.*			  #.任意字符 *0到多个
      preferred-networks: # 要是用的网卡的网段
        - 192.168.0..+   # +是一到多个
15.拼多商城项目

2.springboot版本改成2.3.2RELEASE
3.如果数据库文件导入失败,执行下面sql语句,增大mysql缓存大小:

set global max_allowed_packet=100000000;
set global net_buffer_length=100000;
set global interactive_timeout=28800000;
set global wait_timeout=28800000;

4.删除测试数据

delete from pd_user
delete from pd_order
delete from pd_order_item

5.选择SDK1.8
6.启动项目
修改RunPdApp的启动配置,设置working directory工作目录:

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/712773.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号