栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java > SpringBoot

源码分析最新版本Spring Cloud Ribbon 负载均衡之迷(一)

SpringBoot 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

源码分析最新版本Spring Cloud Ribbon 负载均衡之迷(一)

Spring Cloud Ribbon是一个基于HTTP和TCP的客户端负载均衡工具,它基于Netflix Ribbon实现,它可以通过客户端配置的ribbonServerList服务端列表去轮训访问达到负载均衡的作用。当Ribbon跟Eureka联合使用时,ribbon的服务列表ribbonServerList会被DiscoveryEnabledNIWSServerList重写,扩展成从Eureka注册中心获取服务端列表。同时他也会用NIWSDiscoveryPing来取代IPing,它将职责委托给eureka来确定服务端是否启动。

Spring Cloud的封装,可以让我们轻松地将面向服务的REST模板请求自动转换成客户端负载均衡的服务调用。 Spring Cloud Ribbon虽然只是一个工具类框架,它不像服务注册中心、配置中心、API网关那样需要独立部署,但是它几乎存在于每一个Spring Cloud构建的微服务和基础设施中。因为微服务间的调用,API网关的请求转发等内容,实际上都是通过 Ribbon来实现的,包括 Feign,它也是基于 Ribbon 实现的工具。所以,对 Spring Cloud Ribbon的理解和使用,对于我们使用 Spring Cloud来构建微服务非常重要。

首先引入依赖,引入最新版本,方便看源码

    
 
 org.springframework.cloud
 spring-cloud-starter-ribbon
 
    

    
 
     
  org.springframework.cloud
  spring-cloud-dependencies
  Edgware.SR4
  pom
  import
     
 
    

所谓的负载均衡就是分摊到多个操作单元上进行执行,例如Web服务器、FTP服务器、企业关键应用服务器和其它关键任务服务器等,从而共同完成工作任务。例如搬砖,假如一个人一小时可以搬运一车砖(小车,这就不要钻牛角尖了),如果同时来了十车砖,一百车砖,那我们如果想要尽快的把这些砖送到使用地,就需要多找几个人同时来搬运,这时候就需要根据个人的体能,高矮胖瘦,搬运速度来确定分配这些砖头,我们的负载均衡设备同样根据一些算法,来分配任务,比如线性轮询、按权重负载、按流量负载等。

上面介绍虽然复杂,但是通过Spring Cloud Ribbon的封装,我们可以很简单的使用负载均衡:

  • 服务提供者启动多个实例并注册到一个注册中心或者是多个相关联的服务注册中心。

  • 消费者直接通过调用被@LoadBalanced注册修饰过的RestTemplate来实现面向服务的接口调用。

这样的话就可以实现服务高可用以及负载均衡。
例如: 创建一个RestTemplate的spring bean实例,并通过@LoadBalanced注解开启客服端负载均衡。

    @Bean
    @LoadBalanced //开启负载均衡
    RestTemplate initRestTemplate(){
 return new RestTemplate();
    }

通过上面创建的RestTemplate调用服务端服务,需要注意的是我们传入的url并不是具体的地址,而是服务名,这在服务治理框架中是一个很重要的特性。

//调用服务端
    @Autowired
    private RestTemplate restTemplate;

    public void bizProcess(){
 //注意 地址是服务名  而不是具体的ip地址,
 String result = restTemplate.getForEntity("http://test-serive/hello",String.class).getBody();
    }

如此,一个简单的客户端负载均衡就开启了,下面我们一起看下一些类的方法及实现。

首先我们介绍一下RestTemplate

该对象可以让使用ribbon的自动化配置,同时通过配置@LoadBalanced还能够开启负载均衡,下面介绍一下RestTemplate针对不同请求类型和参数类型调用实现。

get请求

首先介绍下 第一种getForEntity,他有三个重载函数,该函数返回ResponseEntity,他是spring对http相应封装的一个对象,继承HttpEntity类,该实体包含了请求状态(400、404、500等)和一些常用的函数,他的父类HttpEntity储存一些http请求头信息HttpHeaders,泛型请求体body等

1.ResponseEntity getForEntity(String url, Class responseType, Object... uriVariables)

源码如下:

    
    @Override
    public  ResponseEntity getForEntity(String url, Class responseType, Object... uriVariables)
     throws RestClientException {

 RequestCallback requestCallback = acceptHeaderRequestCallback(responseType);
 ResponseExtractor> responseExtractor = responseEntityExtractor(responseType);

 return nonNull(execute(url, HttpMethod.GET, requestCallback, responseExtractor, uriVariables));

    }

使用该函数时,第三个参数可以替换掉url里面的占位符,需要注意的是uriVariables 是一个数组,它的顺序对应着url占位符的顺序,如下:

 RestTemplate restTemplate = new RestTemplate();

 ResponseEntity responseEntity = restTemplate.getForEntity("http://user-serivce/user?name={1}"
  ,String.class,"zhangsan");

 String body = responseEntity.getBody();

 //------------------------ 使用自定义类型 -------------------

 RestTemplate restTemplate = new RestTemplate();

 ResponseEntity responseEntity2 = restTemplate.getForEntity("http://user-serivce/user?name={1}"
  ,User.class,"zhangsan");

 User user = responseEntity2.getBody();

2.public ResponseEntity getForEntity(String url, Class responseType, Map uriVariables)
源码如下:

    @Override

    public  ResponseEntity getForEntity(String url, Class responseType, Map uriVariables)

     throws RestClientException {

 RequestCallback requestCallback = acceptHeaderRequestCallback(responseType);

 ResponseExtractor> responseExtractor = responseEntityExtractor(responseType);

 return nonNull(execute(url, HttpMethod.GET, requestCallback, responseExtractor, uriVariables));

    }

该函数与上个相比只是把入参变成了map,唯一变化的是占位符变成了map中的key,列如:

 RestTemplate restTemplate = new RestTemplate();

 Map param = new HashMap();

 param.put("name","zhangsan");

 // 后面的参数会替换
 ResponseEntity responseEntity = restTemplate.getForEntity("http://user-serivce/user?name={name}"
  ,String.class,param);

 String body = responseEntity.getBody();

3.public ResponseEntity getForEntity(URI url, Class responseType)
源码如下

    @Override

    public  ResponseEntity getForEntity(URI url, Class responseType) throws RestClientException {

 RequestCallback requestCallback = acceptHeaderRequestCallback(responseType);

 ResponseExtractor> responseExtractor = responseEntityExtractor(responseType);

 return nonNull(execute(url, HttpMethod.GET, requestCallback, responseExtractor));

    }

该函数与其他的函数相比,使用URI实体对象,代替了之前的url地址和入参,URI是jdk java.net下面的类,使用如下例:

 RestTemplate restTemplate = new RestTemplate();

 UriComponents uri = UriComponentsBuilder.fromUriString("http://user-serivce/user?name={1}")
  .build()
  .expand("zhangsan")
  .encode();

 ResponseEntity responseEntity2 = restTemplate.getForEntity(uri.toUri(),User.class);
 User user = responseEntity2.getBody();

第二种getForObject,算是对getForEntity一种包装,当不需要关注除了body以外的内容时就比较好用,可以用自己的对象接收返回值,他也有三个重载函数,和getForEntity一一对应。看下面的源码

    @Override

    @Nullable

    public  T getForObject(String url, Class responseType, Object... uriVariables) throws RestClientException {

 RequestCallback requestCallback = acceptHeaderRequestCallback(responseType);

 HttpMessageConverterExtractor responseExtractor =

 new HttpMessageConverterExtractor<>(responseType, getMessageConverters(), logger);

 return execute(url, HttpMethod.GET, requestCallback, responseExtractor, uriVariables);

    }

    @Override

    @Nullable

    public  T getForObject(String url, Class responseType, Map uriVariables) throws   
     RestClientException {

 RequestCallback requestCallback = acceptHeaderRequestCallback(responseType);

 HttpMessageConverterExtractor responseExtractor =

 new HttpMessageConverterExtractor<>(responseType, getMessageConverters(), logger);

 return execute(url, HttpMethod.GET, requestCallback, responseExtractor, uriVariables);

    }

    @Override

    @Nullable

    public  T getForObject(URI url, Class responseType) throws RestClientException {

 RequestCallback requestCallback = acceptHeaderRequestCallback(responseType);

 HttpMessageConverterExtractor responseExtractor =

 new HttpMessageConverterExtractor<>(responseType, getMessageConverters(), logger);

 return execute(url, HttpMethod.GET, requestCallback, responseExtractor);

    }

例如:

 RestTemplate restTemplate = new RestTemplate();

 UriComponents uri = UriComponentsBuilder.fromUriString("http://user-serivce/user?name={1}")
  .build()
  .expand("zhangsan")
  .encode();

 User user = restTemplate.getForObject(uri.toUri(),User.class);

post、put、delete请求和get请求使用方法几乎差不多,就不在做过多赘述,下面把源码贴出来

post请求
    @Override
    @Nullable
    public URI postForLocation(String url, @Nullable Object request, Object... uriVariables)
     throws RestClientException {

 RequestCallback requestCallback = httpEntityCallback(request);
 HttpHeaders headers = execute(url, HttpMethod.POST, requestCallback, headersExtractor(), uriVariables);
 return (headers != null ? headers.getLocation() : null);
    }

    @Override
    @Nullable
    public URI postForLocation(String url, @Nullable Object request, Map uriVariables)
     throws RestClientException {

 RequestCallback requestCallback = httpEntityCallback(request);
 HttpHeaders headers = execute(url, HttpMethod.POST, requestCallback, headersExtractor(), uriVariables);
 return (headers != null ? headers.getLocation() : null);
    }

    @Override
    @Nullable
    public URI postForLocation(URI url, @Nullable Object request) throws RestClientException {
 RequestCallback requestCallback = httpEntityCallback(request);
 HttpHeaders headers = execute(url, HttpMethod.POST, requestCallback, headersExtractor());
 return (headers != null ? headers.getLocation() : null);
    }

    @Override
    @Nullable
    public  T postForObject(String url, @Nullable Object request, Class responseType,
     Object... uriVariables) throws RestClientException {

 RequestCallback requestCallback = httpEntityCallback(request, responseType);
 HttpMessageConverterExtractor responseExtractor =
  new HttpMessageConverterExtractor<>(responseType, getMessageConverters(), logger);
 return execute(url, HttpMethod.POST, requestCallback, responseExtractor, uriVariables);
    }

    @Override
    @Nullable
    public  T postForObject(String url, @Nullable Object request, Class responseType,
     Map uriVariables) throws RestClientException {

 RequestCallback requestCallback = httpEntityCallback(request, responseType);
 HttpMessageConverterExtractor responseExtractor =
  new HttpMessageConverterExtractor<>(responseType, getMessageConverters(), logger);
 return execute(url, HttpMethod.POST, requestCallback, responseExtractor, uriVariables);
    }

    @Override
    @Nullable
    public  T postForObject(URI url, @Nullable Object request, Class responseType)
     throws RestClientException {

 RequestCallback requestCallback = httpEntityCallback(request, responseType);
 HttpMessageConverterExtractor responseExtractor =
  new HttpMessageConverterExtractor<>(responseType, getMessageConverters());
 return execute(url, HttpMethod.POST, requestCallback, responseExtractor);
    }

    @Override
    public  ResponseEntity postForEntity(String url, @Nullable Object request,
     Class responseType, Object... uriVariables) throws RestClientException {

 RequestCallback requestCallback = httpEntityCallback(request, responseType);
 ResponseExtractor> responseExtractor = responseEntityExtractor(responseType);
 return nonNull(execute(url, HttpMethod.POST, requestCallback, responseExtractor, uriVariables));
    }

    @Override
    public  ResponseEntity postForEntity(String url, @Nullable Object request,
     Class responseType, Map uriVariables) throws RestClientException {

 RequestCallback requestCallback = httpEntityCallback(request, responseType);
 ResponseExtractor> responseExtractor = responseEntityExtractor(responseType);
 return nonNull(execute(url, HttpMethod.POST, requestCallback, responseExtractor, uriVariables));
    }

    @Override
    public  ResponseEntity postForEntity(URI url, @Nullable Object request, Class responseType)
     throws RestClientException {

 RequestCallback requestCallback = httpEntityCallback(request, responseType);
 ResponseExtractor> responseExtractor = responseEntityExtractor(responseType);
 return nonNull(execute(url, HttpMethod.POST, requestCallback, responseExtractor));
    }
put请求
    @Override
    public void put(String url, @Nullable Object request, Object... uriVariables)
     throws RestClientException {

 RequestCallback requestCallback = httpEntityCallback(request);
 execute(url, HttpMethod.PUT, requestCallback, null, uriVariables);
    }

    @Override
    public void put(String url, @Nullable Object request, Map uriVariables)
     throws RestClientException {

 RequestCallback requestCallback = httpEntityCallback(request);
 execute(url, HttpMethod.PUT, requestCallback, null, uriVariables);
    }

    @Override
    public void put(URI url, @Nullable Object request) throws RestClientException {
 RequestCallback requestCallback = httpEntityCallback(request);
 execute(url, HttpMethod.PUT, requestCallback, null);
    }
delete请求
    @Override
    public void delete(String url, Object... uriVariables) throws RestClientException {
 execute(url, HttpMethod.DELETE, null, null, uriVariables);
    }

    @Override
    public void delete(String url, Map uriVariables) throws RestClientException {
 execute(url, HttpMethod.DELETE, null, null, uriVariables);
    }

    @Override
    public void delete(URI url) throws RestClientException {
 execute(url, HttpMethod.DELETE, null, null);
    }
现在让我们看下@LoadBalanced

首先进入@LoadBalanced类里面,注释写着一句话:Annotation to mark a RestTemplate bean to be configured to use a LoadBalancerClient,大概意思是这个注解是用来让LoadBalancerClient配置RestTemplate 的标记
LoadBalancerClient是什么呢?我们搜索这个进去看下

public interface LoadBalancerClient extends ServiceInstanceChooser {

    
     T execute(String serviceId, LoadBalancerRequest request) throws IOException;

    
     T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest request) throws IOException;

    
    URI reconstructURI(ServiceInstance instance, URI original);
}
public interface ServiceInstanceChooser {

    
    ServiceInstance choose(String serviceId);
}

LoadBalancerClient和它的父类共有四个方法:

ServiceInstance choose(String serviceId):根据传入的服务名serviceid挑选出一个对象的服务实例

T execute(String serviceId, LoadBalancerRequest request) :使用负载均衡器里面的默认服务实例执行请求,如果serviceid为空则取一个默认的实例

T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest request) :使用传入的服务实例请求。

URI reconstructURI(ServiceInstance instance, URI original):微系统构建一个合适的host:prot形式的url,ServiceInstance 对象是带有host、port的具体服务实例,第二个参数是使用逻辑服务名定义为host的URI,返回的uri是根据具体服务实例拼接出来的host:port形式的请求地址
通过观察当前类(LoadBalancerClient)所在的jar发现,有一个LoadBalancerAutoConfiguration,顾名思义,负载均衡自动化配置。

LoadBalancerAutoConfiguration源码的介绍

@Configuration
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {

    @LoadBalanced
    @Autowired(required = false)
    private List restTemplates = Collections.emptyList();

    @Bean
    public SmartInitializingSingleton loadBalancedRestTemplateInitializer(
     final List customizers) {
 return new SmartInitializingSingleton() {
     @Override
     public void afterSingletonsInstantiated() {
  for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
      for (RestTemplateCustomizer customizer : customizers) {
   customizer.customize(restTemplate);
      }
  }
     }
 };
    }

    @Autowired(required = false)
    private List transformers = Collections.emptyList();

    @Bean
    @ConditionalOnMissingBean
    public LoadBalancerRequestFactory loadBalancerRequestFactory(
     LoadBalancerClient loadBalancerClient) {
 return new LoadBalancerRequestFactory(loadBalancerClient, transformers);
    }

    @Configuration
    @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
    static class LoadBalancerInterceptorConfig {
 @Bean
 public LoadBalancerInterceptor ribbonInterceptor(
  LoadBalancerClient loadBalancerClient,
  LoadBalancerRequestFactory requestFactory) {
     return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
 }

 @Bean
 @ConditionalOnMissingBean
 public RestTemplateCustomizer restTemplateCustomizer(
  final LoadBalancerInterceptor loadBalancerInterceptor) {
     return new RestTemplateCustomizer() {
  @Override
  public void customize(RestTemplate restTemplate) {
      List list = new ArrayList<>(
restTemplate.getInterceptors());
      list.add(loadBalancerInterceptor);
      restTemplate.setInterceptors(list);
  }
     };
 }
    }
。。。省略一堆重试机制的配置。。。
}

根据里面的注释发现我们猜的没错,根据该类的注解头@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
发现,自动配置需要当前工程必须有RestTemplate这个类和springBeanFactory里面必须有LoadBalancerClient的实现类的bean,我们还看到最上面有几个动作:

  1. 维护了LoadBalancerRequestTransformer实例列表并初始化,
  2. 根据loadBalancerClient和LoadBalancerRequestTransformer列表创建了LoadBalancerRequestFactory的bean,
    3.维护了一个被LoadBalanced修饰的restTemplate列表,通过自动注入初始化,并让所有的restTemplate被RestTemplateCustomizer定制
    4.创建了静态内部类LoadBalancerInterceptorConfig,看见上面注解@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate"),如果RetryTemplate不存在就加载该拦截器配置,静态内部类LoadBalancerInterceptorConfig创建了一个LoadBalancerInterceptor拦截器和RestTemplateCustomizer的bean,使用了传参构造方法把LoadBalancerClient传入进去,也就是说被@LoadBalanced所标记的RestTemplate调用http请求时都会经过该拦截器,同时传入的还有requestFactory 。

接下来看看LoadBalancerInterceptor拦截器如何实现负载均衡的,看下面源码:

package org.springframework.cloud.client.loadbalancer;

import java.io.IOException;
import java.net.URI;

import org.springframework.http.HttpRequest;
import org.springframework.http.client.ClientHttpRequestExecution;
import org.springframework.http.client.ClientHttpRequestInterceptor;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.util.Assert;


public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {

    private LoadBalancerClient loadBalancer;
    private LoadBalancerRequestFactory requestFactory;

    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
 this.loadBalancer = loadBalancer;
 this.requestFactory = requestFactory;
    }

    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
 // for backwards compatibility
 this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
    }

    @Override
    public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
     final ClientHttpRequestExecution execution) throws IOException {
 final URI originalUri = request.getURI();
 String serviceName = originalUri.getHost();
 Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
 return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
    }
}

我们看intercept函数,当RestTemplate请求被拦截之后,从HttpRequest 拿到URI,并拿到服务名,并通过自己的构造工厂来创建具体的请求实例(请记住这个地方),通过调用execute根据服务名拿到具体的实例发起请求。

下面我们看下LoadBalancerClient的具体实现,通过idea很容易找到他的实现类RibbonLoadBalancerClient,找到拦截器调用的方法:

    @Override
    public  T execute(String serviceId, LoadBalancerRequest request) throws IOException {

 //通过serviceId在SpringClientFactory里面拿到服务实例
 ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
 Server server = getServer(loadBalancer);
 if (server == null) {
     throw new IllegalStateException("No instances available for " + serviceId);
 }
 RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
  serviceId), serverIntrospector(serviceId).getmetadata(server));

 return execute(serviceId, ribbonServer, request);
    }

    protected ILoadBalancer getLoadBalancer(String serviceId) {
 return this.clientFactory.getLoadBalancer(serviceId);
    }

    protected Server getServer(ILoadBalancer loadBalancer) {
 if (loadBalancer == null) {
     return null;
 }
 return loadBalancer.chooseServer("default"); // TODO: better handling of key
    }

看代码发现spring通过serviceId在SpringClientFactory里面拿到服务列表,根据服务列表获取一个默认的具体服务。
通过getServer函数看到,它没有使用自己的选择器,而是使用了Netflix RibbonI的LoadBalancer接口。

package com.netflix.loadbalancer;

import java.util.List;


public interface ILoadBalancer {

    
    public void addServers(List newServers);

    
    public Server chooseServer(Object key);

    
    public void markServerDown(Server server);

    
    @Deprecated
    public List getServerList(boolean availableOnly);

    
    public List getReachableServers();

    
    public List getAllServers();
}

里面有六个函数,看名字就容易理解:
1.addServers对负载均衡器维护的服务实例列表增加实例
2.chooseServer 通过某种策略筛选服务
3.markServerDown通知负载均衡器某个服务停止运行
4.getServerList 获取服务列表 已过期
5.getReachableServers 获取正常运行的服务实例列表
6.getAllServers 获取所有服务实例

我们查看下该接口的实现类

baseLoadBalancer实现了基础的负载均衡,DynamicServerListLoadBalancer和ZoneAwareLoadBalancer做了一些功能扩展,那我们到底是用了哪个实现呢?我们回过头看下上面的ILoadBalancer loadBalancer = getLoadBalancer(serviceId)函数,点进去往下走,发现进入SpringClientFactory类的getInstance函数:

    @Override
    public  C getInstance(String name, Class type) {
 C instance = super.getInstance(name, type);
 if (instance != null) {
     return instance;
 }
 IClientConfig config = getInstance(name, IClientConfig.class);
 return instantiateWithConfig(getContext(name), type, config);
    }

我们发现使用了IClientConfig接口,他有两个实现类其中一个是spring的FeignOptionsClientConfig类,通过上文我们知道我们用的负载均衡器是ribbon再带的,所有看自带的实现类DefaultClientConfigImpl:

public static final String DEFAULT_NFLOADBALANCER_CLASSNAME = "com.netflix.loadbalancer.ZoneAwareLoadBalancer";

当看到这个的时候大家可能就恍然大悟,噢,原来是它,大家可以通过工具查看调用链就明白怎么调用的了,因为太长就不在展示。我们暂且记着具体的负载均衡器还是由Ribbon实现的,我们这章暂时不对负载均衡器实现方法多做叙述,放到下章单独讲解。

下面接着LoadBalancerClient的实现RibbonLoadBalancerClient的execute函数,当拿到service时包装成RibbonServer,还使用该对象回调了拦截器中的LoadBalancerRequest的apply方法,向一个具体的服务发起请求,实现前面介绍的以服务名为host的URI请求到host:port形式的实际请求地址的转换。

 if(serviceInstance instanceof RibbonServer) {
     server = ((RibbonServer)serviceInstance).getServer();
 }
 if (server == null) {
     throw new IllegalStateException("No instances available for " + serviceId);
 }

 RibbonLoadBalancerContext context = this.clientFactory
  .getLoadBalancerContext(serviceId);
 RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);

 try {
     T returnVal = request.apply(serviceInstance);
     statsRecorder.recordStats(returnVal);
     return returnVal;
 }

T apply(ServiceInstance instance)函数中,ServiceInstance 对象是对实例服务的抽象定义,在接口中暴露一些服务治理中每个实例服务都需要提供的基本信息:

public interface ServiceInstance {

    
    String getServiceId();

    
    String getHost();

    
    int getPort();

    
    boolean isSecure();

    
    URI getUri();

    
    Map getmetadata();
}

上面包装server的RibbonServer类就是对ServiceInstance 的一种实现,它除了包含server对象之外,还包含了是否使用https标识和一个map类型的元数据集合。我们看下RibbonLoadBalancerClient的静态内部类RibbonServer

   
    public static class RibbonServer implements ServiceInstance {
 private final String serviceId;
 private final Server server;
 private final boolean secure;
 private Map metadata;

 public RibbonServer(String serviceId, Server server) {
     this(serviceId, server, false, Collections. emptyMap());
 }

 public RibbonServer(String serviceId, Server server, boolean secure,
  Map metadata) {
     this.serviceId = serviceId;
     this.server = server;
     this.secure = secure;
     this.metadata = metadata;
 }
。。。省略一堆get、set方法。。。
    }

接下来看下当apply函数传入具体的实现的时候是怎么处理的,我们找到apply的实现方法,是在LoadBalancerRequestFactory类的createRequest函数

public LoadBalancerRequest createRequest(final HttpRequest request,
     final byte[] body, final ClientHttpRequestExecution execution) {
 return new LoadBalancerRequest() {

     @Override
     public ClientHttpResponse apply(final ServiceInstance instance)
      throws Exception {
  HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, loadBalancer);
  if (transformers != null) {
      for (LoadBalancerRequestTransformer transformer : transformers) {
   serviceRequest = transformer.transformRequest(serviceRequest, instance);
      }
  }
  return execution.execute(serviceRequest, body);
     }

 };
    }

等等,好像有点眼熟,这不是正式被拦截器拦截的时候调用的函数么(忘记的同学可以看上面的源码)?是的,当@LoadBalanced修饰的RestTemplate的http请求被拦截时,就把apply实现封装进去了,而且它还生成了一个ServiceRequestWrapper对象并使用该对象,该对象继承了HttpRequestWrapper并重写了getURI函数,重写之后的getURI调用自己内部的LoadBalancerClient的reconstructURI函数创建URI。

public class ServiceRequestWrapper extends HttpRequestWrapper {
    private final ServiceInstance instance;
    private final LoadBalancerClient loadBalancer;

    public ServiceRequestWrapper(HttpRequest request, ServiceInstance instance,
     LoadBalancerClient loadBalancer) {
 super(request);
 this.instance = instance;
 this.loadBalancer = loadBalancer;
    }

    @Override
    public URI getURI() {
 URI uri = this.loadBalancer.reconstructURI(
  this.instance, getRequest().getURI());
 return uri;
    }
}

接着看上面的apply方法,调用了execution.execute(serviceRequest, body)函数,ClientHttpRequestExecution 是一个接口,查看他的实现类,是在InterceptingClientHttpRequest里面的一个内部类InterceptingRequestExecution

private class InterceptingRequestExecution implements ClientHttpRequestExecution {

 private final Iterator iterator;

 public InterceptingRequestExecution() {
     this.iterator = interceptors.iterator();
 }

 @Override
 public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
     if (this.iterator.hasNext()) {
  ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
  return nextInterceptor.intercept(request, body, this);
     }
     else {
  HttpMethod method = request.getMethod();
  Assert.state(method != null, "No standard HTTP method");
  ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
  request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
  if (body.length > 0) {
      if (delegate instanceof StreamingHttpOutputMessage) {
   StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
   streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
      }
      else {
   StreamUtils.copy(body, delegate.getBody());
      }
  }
  return delegate.execute();
     }
 }
    }

看上面代码看到,当创建请求的时候,requestFactory.createRequest(request.getURI(), method),(request.getURI()就会调用被ServiceRequestWrapper 重写之后的getURI,然后他就调用ServiceRequestWrapper 自身对象LoadBalancerClient的实现类的reconstructURI函数,组装具体的请求地址,该函数的实现位于之前介绍的RibbonLoadBalancerClient类里面

    @Override
    public URI reconstructURI(ServiceInstance instance, URI original) {
 Assert.notNull(instance, "instance can not be null");
 String serviceId = instance.getServiceId();
 RibbonLoadBalancerContext context = this.clientFactory
  .getLoadBalancerContext(serviceId);
 Server server = new Server(instance.getHost(), instance.getPort());
 IClientConfig clientConfig = clientFactory.getClientConfig(serviceId);
 ServerIntrospector serverIntrospector = serverIntrospector(serviceId);
 URI uri = RibbonUtils.updateToHttpsIfNeeded(original, clientConfig,
  serverIntrospector, server);
 return context.reconstructURIWithServer(server, uri);
    }

通过该函数可以看到,拿到serviceId,然后从客服端工厂里面拿到对象的负载均衡器上下文RibbonLoadBalancerContext ,根据服务实例ServiceInstance信息创建服务server,拿到客户端的配置信息,获取服务的源信息及安全性校验,然后通过参数及服务信息和服务配置信息,安全校验信息组装URI,看下面源码及注释

    
    public static URI updateToHttpsIfNeeded(URI uri, IClientConfig config, ServerIntrospector serverIntrospector,
     Server server) {
 String scheme = uri.getScheme();
 if (!"".equals(uri.toString()) && !"https".equals(scheme) && isSecure(config, serverIntrospector, server)) {
     UriComponentsBuilder uriComponentsBuilder = UriComponentsBuilder.fromUri(uri).scheme("https");
     if (uri.getRawQuery() != null) {
  // When building the URI, UriComponentsBuilder verify the allowed characters and does not 
  // support the '+' so we replace it for its equivalent '%20'.
  // See issue https://jira.spring.io/browse/SPR-10172
  uriComponentsBuilder.replaceQuery(uri.getRawQuery().replace("+", "%20"));
     }
     return uriComponentsBuilder.build(true).toUri();
 }
 return uri;
    }

上面多次用到了SpringClientFactory,他是究竟是什么呢?SpringClientFactory其实是一个用来创建客户端负载均衡的工厂类,该工厂类会为每个不同名的Ribbon客户端生成不同的上下文

reconstructURI函数结尾调用了RibbonLoadBalancerContextreconstructURIWithServer函数,RibbonLoadBalancerContext继承于LoadBalancerContext,RibbonLoadBalancerContext类用于储存一些被负载均衡器使用的上下文内容和api操作(它的父类LoadBalancerContext里面的reconstructURIWithServer函数就是其中一个),让我们看下reconstructURIWithServer函数源码

public URI reconstructURIWithServer(Server server, URI original) {
 String host = server.getHost();
 int port = server.getPort();
 String scheme = server.getScheme();

 if (host.equals(original.getHost()) 
  && port == original.getPort()
  && scheme == original.getScheme()) {
     return original;
 }
 if (scheme == null) {
     scheme = original.getScheme();
 }
 if (scheme == null) {
     scheme = deriveSchemeAndPortFromPartialUri(original).first();
 }

 try {
     StringBuilder sb = new StringBuilder();
     sb.append(scheme).append("://");
     if (!Strings.isNullOrEmpty(original.getRawUserInfo())) {
  sb.append(original.getRawUserInfo()).append("@");
     }
     sb.append(host);
     if (port >= 0) {
  sb.append(":").append(port);
     }
     sb.append(original.getRawPath());
     if (!Strings.isNullOrEmpty(original.getRawQuery())) {
  sb.append("?").append(original.getRawQuery());
     }
     if (!Strings.isNullOrEmpty(original.getRawFragment())) {
  sb.append("#").append(original.getRawFragment());
     }
     URI newURI = new URI(sb.toString());
     return newURI;     
 } catch (URISyntaxException e) {
     throw new RuntimeException(e);
 }
    }

我们看到reconstructURIWithServer函数源码其实跟上面的reconstructURI函数实现类似,只是reconstructURI使用的参数实例是spring cloud的ServiceInstance,而reconstructURIWithServer函数用的是Netflix Ribbon用的是自己的Server,所以RibbonLoadBalancerClient实现reconstructURI方法时做了一个转换,所以使用ServiceInstance的信息创建了server,并使用RibbonLoadBalancerContext对象的reconstructURIWithServer函数构建服务实例的URI,忘记的同学去返回上面看下源码。

分析到这已经可以大致理清spring cloud ribbon实现客服端负载均衡的脉络了,如何去使用LoadBalancerInterceptor拦截RestTemplate请求进行拦截,并且利用spring cloud的负载均衡器LoadBalancerClient将逻辑服务名的url转换为具体的请求地址,同时分析了LoadBalancerClient的ribbon实现类RibbonLoadBalancerClient,还可以知道使用ribbon实现负载均衡的时候,还是使用了ribbon本身自带的接口ILoadBalancer,并默认使用他的ZoneAwareLoadBalancer类来实现负载均衡。

喜欢的话点点关注,点点关注不迷路,您的推荐是我创作的动力。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/235328.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号