Spring Cloud Ribbon 可以实现客户端负载均衡,本文仅仅以Ribbon单独使用时对源码进行分析和理解,没有集成Eureka。
Ribbon会针对我们在配置文件中配置的服务地址进行负载均衡的计算,得到目标地址后,进行服务的调用。
接下来会针对两方面进行分析:
1、为什么我们使用@LoadBalanced注解作用于RestTemplate上就可以实现负载均衡了呢?
2、如何对服务地址进行解析的呢?
3 源码解析 3.1 @LoadBalancer注解的作用org.springframework.cloud spring-cloud-starter-netflix-ribbon 2.2.9.RELEASE
在使用RestTemplate的时候,我们加了一个@LoadBalance注解,就能让这个RestTemplate在请求时,就拥有客户端负载均衡的能力。
@Bean
@LoadBalanced
RestTemplate restTemplate() {
return new RestTemplate();
}
然后,我们打开@LoadBalanced这个注解,可以发现该注解仅仅是声明了一个@Qualifier 注解。
@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
}
@Qualifier注解相信大家在spring中都接触过,起作用就是当您创建多个相同类型的 bean 并希望仅使用属性装配其中一个 bean 时,您可以使用@Qualifier 注解和 @Autowired 通过指定应该装配哪个确切的 bean来消除歧义。
下面我们进行一个案例演示:
现在我们有一个TestClass类,想把这个类的实例对象注入到Spring容器中
public class TestClass {
private String name;
}
@Configuration
public class TestConfig {
@Bean("testClass1")
TestClass testClass(){
return new TestClass("testClass1");
}
@Bean("testClass2")
TestClass testClass2(){
return new TestClass("testClass2");
}
}
@RestController
public class TestController {
@Autowired(required = false)
List testClasses= Collections.emptyList();
@GetMapping("/test")
public Object test(){
return testClasses;
}
}
通过浏览器访问http://localhost:8080/test
[
{
name: "testClass1"
},
{
name: "testClass2"
}
]
此时我们修改修改TestConfig类
@Configuration
public class TestConfig {
@Bean("testClass1")
@Qualifier
TestClass testClass(){
return new TestClass("testClass1");
}
@Bean("testClass2")
TestClass testClass2(){
return new TestClass("testClass2");
}
}
再次进行访问可以发现:
[
{
name: "testClass1"
}
]
通过这个案例我们是不是可以发现@LoadBalance注解的作用了,肯定会有个地方对加上了@LoadBanlance注解的RestTemplate进行筛选。答案肯定是没错的,确实有筛选的地方,下面我们就分析是哪里进行保存的呢?
我们找到spring-cloud-commons-2.2.9.RELEASE.jar这个包
找到META-INF目录。相信大家并不陌生,这里肯定使用的是spi的机制把一些类加载到IOC容器中的,我们打开spring.factories文件:
# AutoConfiguration org.springframework.boot.autoconfigure.EnableAutoConfiguration= org.springframework.cloud.client.CommonsClientAutoConfiguration, org.springframework.cloud.client.ReactiveCommonsClientAutoConfiguration, org.springframework.cloud.client.discovery.composite.CompositeDiscoveryClientAutoConfiguration, org.springframework.cloud.client.discovery.composite.reactive.ReactiveCompositeDiscoveryClientAutoConfiguration, org.springframework.cloud.client.discovery.noop.NoopDiscoveryClientAutoConfiguration, org.springframework.cloud.client.discovery.simple.SimpleDiscoveryClientAutoConfiguration, org.springframework.cloud.client.discovery.simple.reactive.SimpleReactiveDiscoveryClientAutoConfiguration, org.springframework.cloud.client.hypermedia.CloudHypermediaAutoConfiguration, org.springframework.cloud.client.loadbalancer.AsyncLoadBalancerAutoConfiguration, org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration, org.springframework.cloud.client.loadbalancer.reactive.LoadBalancerBeanPostProcessorAutoConfiguration, org.springframework.cloud.client.loadbalancer.reactive.ReactorLoadBalancerClientAutoConfiguration, org.springframework.cloud.client.loadbalancer.reactive.ReactiveLoadBalancerAutoConfiguration, org.springframework.cloud.client.serviceregistry.ServiceRegistryAutoConfiguration, org.springframework.cloud.commons.httpclient.HttpClientConfiguration, org.springframework.cloud.commons.util.UtilAutoConfiguration, org.springframework.cloud.configuration.CompatibilityVerifierAutoConfiguration, org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration # Environment Post Processors org.springframework.boot.env.EnvironmentPostProcessor= org.springframework.cloud.client.HostInfoEnvironmentPostProcessor # Failure Analyzers org.springframework.boot.diagnostics.FailureAnalyzer= org.springframework.cloud.configuration.CompatibilityNotMetFailureAnalyzer
我们发现有一个.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration类。
该类中会做两件事情:
- 把配置了@LoadBalanced 注解的RestTemplate 注入到restTemplates 集合中
- 拿到了RestTemplate 之后,在LoadBalancerInterceptorConfig配置类中,会针对这些RestTemplate 进行拦截,当我们调用目标方法的时候会通过拦截器进行拦截
具体的源码如下:
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {
//收集配置了@LoadBalanced 注解的RestTemplate
@LoadBalanced
@Autowired(required = false)
private List restTemplates = Collections.emptyList();
...省略
//拿到了RestTemplate 之后,在LoadBalancerInterceptorConfig配置类中,会针对这些RestTemplate 进行拦截,当我们调用目标方法的时候会通过拦截器进行拦截
@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
static class LoadBalancerInterceptorConfig {
//装载一个LoadBalancerInterceptor的实例到IOC容器。
@Bean
public LoadBalancerInterceptor loadBalancerInterceptor(
LoadBalancerClient loadBalancerClient,
LoadBalancerRequestFactory requestFactory) {
return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}
//遍历所有加了@LoadBalanced注解的RestTemplate,在原有的拦截器之上,再增加了一个LoadBalancerInterceptor
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(
final LoadBalancerInterceptor loadBalancerInterceptor) {
return restTemplate -> {
List list = new ArrayList<>(
restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
}
}
...
3.2 RestTemplate调用过程
接下来我们通过RestTemplate调用过程分析整个流程,在调用的过程中肯定会进入到我们上面注入的拦截器,进行拦截。
我们在程序中,使用下面的代码发起远程请求时restTemplate.getForObject(url,String.class);
整个调用过程如下:(我们重点分析拦截器部分,restTemplate源码不做重点)
RestTemplate.getForObject -----> AbstractClientHttpRequest.execute() -----> AbstractBufferingClientHttpRequest.executeInternal() -----> InterceptingClientHttpRequest.executeInternal() -----> InterceptingClientHttpRequest.execute()3.3 LoadBalancerInterceptor拦截器
LoadBalancerInterceptor是一个拦截器,当一个被@Loadbalanced 注解修饰的RestTemplate 对象发起HTTP请求时,会被LoadBalancerInterceptor 的intercept 方法拦截,在这个方法中直接通过getHost 方法就可以获取到服务名(因为我们在使用RestTemplate调用服务的时候,使用的是服务名而不是域名,所以这里可以通过getHost直接拿到服务名然后去调用execute方法发起请求)
- InterceptingClientHttpRequest.execute()
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
//遍历所有的拦截器,通过拦截器进行逐个处理。
if(this.iterator.hasNext()) {
ClientHttpRequestInterceptor nextInterceptor = (ClientHttpRequestInterceptor)this.iterator.next();
//调用intercept方法进行拦截 重点分析
return nextInterceptor.intercept(request, body, this);
} else {
HttpMethod method = request.getMethod();
Assert.state(method != null, "No standard HTTP method");
ClientHttpRequest delegate = InterceptingClientHttpRequest.this.requestFactory.createRequest(request.getURI(), method);
request.getHeaders().forEach((key, value) -> {
delegate.getHeaders().addAll(key, value);
});
if(body.length > 0) {
if(delegate instanceof StreamingHttpOutputMessage) {
StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage)delegate;
streamingOutputMessage.setBody((outputStream) -> {
StreamUtils.copy(body, outputStream);
});
} else {
StreamUtils.copy(body, delegate.getBody());
}
}
return delegate.execute();
}
}
- InterceptingClientHttpRequest.intercept()
@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
//获取请求的uri
final URI originalUri = request.getURI();
//获取服务名称,因为我们配置文件中配置的就是服务名
String serviceName = originalUri.getHost();
Assert.state(serviceName != null,
"Request URI does not contain a valid hostname: " + originalUri);
//private LoadBalancerClient loadBalancer; LoadBalancerClient其实是一个接口,我们看一下它的类图,它有一个唯一的实现类:RibbonLoadBalancerClient 。
return this.loadBalancer.execute(serviceName,
this.requestFactory.createRequest(request, body, execution));
}
- RibbonLoadBalancerClient.execute()
publicT execute(String serviceId, LoadBalancerRequest request, Object hint) throws IOException { //根据serviceId获得一个ILoadBalancer,实例为:ZoneAwareLoadBalancer ILoadBalancer loadBalancer = getLoadBalancer(serviceId); //调用getServer方法去获取一个服务实例 Server server = getServer(loadBalancer, hint); //判断Server的值是否为空。这里的Server实际上就是传统的一个服务节点,这个对象存储了服务节点的一些元数据,比如host、port、schema等 if (server == null) { throw new IllegalStateException("No instances available for " + serviceId); } //封装成RibbonServer对象 RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server, serviceId), serverIntrospector(serviceId).getMetadata(server)); return execute(serviceId, ribbonServer, request); }
。。。 未完待续



