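It is a Thursday in August 2021.
Outside the window the night has just been washed by heavy rain, dotted with a few scattered lights.
I have my headphones on,
sitting at the desk on the living-room balcony in a quiet breeze,
and they happen to be playing a Leslie Cheung (张国荣) song: "夜风凛凛 独回望旧事前尘" (roughly, "in the chill night wind, alone I look back on days gone by").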
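Before I knew it, I had been at the company for two years.
Those two years flew by, which also means it has been exactly four years since I graduated from university.
In these two years I watched the company's microservices migrate all the way from docker -> docker swarm -> kubernetes.
I changed along the way too, from a green mid-level Java developer -> project technical lead -> team leader, and all of it feels so real.
The launch of one new module after another, the dramatic performance gains after each refactoring, the thrill every time we handled more than ten thousand QPS: every one of those days was full of passion.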
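Then one day a little over a month ago, I took a classmate's advice to go out and interview and try my luck at a larger company, and I passed every round.
After getting the offer I hesitated for a long time, and with my classmate's persuasion I finally handed in my resignation.
During the last two weeks of handover, looking at the code I had written and the modules I had refactored, I regretted resigning on impulse, and spent several nights sleepless with my thoughts racing.
Later I found a chance to talk with my manager one-on-one, but it was only when my manager wished me well at the new company that I understood what loyalty in the workplace means. Think twice before resigning!
Since there is no chance to regret and no other way back, I might as well charge ahead (at least the pay is better).
Go and do well at the new company.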
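A night has passed, and it is Friday.
I decided long ago that on my last day I would just sit quietly at my desk and wait for the clock to tick. When the time comes I will stand up, give the colleagues around me a smile, quietly shoulder the backpack holding the office supplies that have kept me company for two years, and stop by my manager to say goodbye.
Then I will turn around, walk to the elevator outside the office door without looking back, and press the button.
I will watch the elevator doors slowly close. That will be the moment of a brief parting from this lovely company, these lovely colleagues, and these lovely managers.
Goodbye, 约苗!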
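From what I understand, the new company's projects do not use the spring cloud stack, so after I join I will also have to say a temporary goodbye to this impressive framework, which I am reluctant to leave.
Since there is no work scheduled on my last day, I am using it to write one last set of notes at the company: my reading of the loadbalancer source code in the spring cloud kubernetes framework.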
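Foreword
Because all of the company's products are deployed on k8s, and to make the microservices fit the k8s environment better, my manager eventually accepted my proposal to adopt spring cloud kubernetes as the service discovery and governance component for spring cloud.
Since the switch to spring cloud kubernetes, the project has run in production for more than a year, has been proven by more than ten million orders and over ten thousand QPS, and so far everything is running well.
Service-to-service calls in the project go through feign, and a feign call uses the loadbalancer. This post reads the source code to follow the core call chain of a feign call under spring cloud kubernetes, and to see how the two load-balancing modes of the spring cloud kubernetes loadbalancer obtain the final URL and how they differ.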
Main call flow of a spring cloud feign invocation
feign.ReflectiveFeign.FeignInvocationHandler#invoke
feign.SynchronousMethodHandler#invoke
Object executeAndDecode(RequestTemplate template, Options options) throws Throwable {
    Request request = targetRequest(template);
    if (logLevel != Logger.Level.NONE) {
        logger.logRequest(metadata.configKey(), logLevel, request);
    }
    Response response;
    long start = System.nanoTime();
    try {
        response = client.execute(request, options);
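The first hop of this chain is a JDK dynamic proxy: the handler looks up a per-method handler and delegates to it, which is how an interface call ends up in SynchronousMethodHandler#invoke. The following is only a minimal sketch of that dispatch idea in plain Java. OrderClient, MethodHandlerSketch and ProxyDispatchSketch are hypothetical names made up for illustration, and the handler returns a string instead of sending a real HTTP request.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a Feign client interface.
interface OrderClient {
    String getOrder(String id);
}

// Hypothetical stand-in for feign's per-method handler.
interface MethodHandlerSketch {
    Object invoke(Object[] args) throws Throwable;
}

class ProxyDispatchSketch {

    // Builds a JDK proxy whose InvocationHandler looks up a handler by Method
    // and delegates to it. Methods without a handler (toString, hashCode, ...)
    // are rejected here to keep the sketch short.
    @SuppressWarnings("unchecked")
    static <T> T newProxy(Class<T> type, Map<Method, MethodHandlerSketch> dispatch) {
        InvocationHandler handler = (proxy, method, methodArgs) -> {
            MethodHandlerSketch target = dispatch.get(method);
            if (target == null) {
                throw new UnsupportedOperationException(method.getName());
            }
            return target.invoke(methodArgs);
        };
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type}, handler);
    }

    // Wires one handler that only describes the request it would send.
    static OrderClient demoClient() {
        try {
            Map<Method, MethodHandlerSketch> dispatch = new HashMap<>();
            Method getOrder = OrderClient.class.getMethod("getOrder", String.class);
            dispatch.put(getOrder, callArgs -> "GET http://order-service/orders/" + callArgs[0]);
            return newProxy(OrderClient.class, dispatch);
        }
        catch (NoSuchMethodException ex) {
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        // prints "GET http://order-service/orders/42"
        System.out.println(demoClient().getOrder("42"));
    }
}
```

Note that the URL in the sketch still carries the logical service name as its host; turning that name into a real address is the job of the load-balancer client further down the chain.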
org.springframework.cloud.openfeign.loadbalancer.RetryableFeignBlockingLoadBalancerClient#execute
@Override
public Response execute(Request request, Request.Options options) throws IOException {
    final URI originalUri = URI.create(request.url());
    String serviceId = originalUri.getHost();
    Assert.state(serviceId != null, "Request URI does not contain a valid hostname: " + originalUri);
    final LoadBalancedRetryPolicy retryPolicy = loadBalancedRetryFactory.createRetryPolicy(serviceId,
            loadBalancerClient);
    RetryTemplate retryTemplate = buildRetryTemplate(serviceId, request, retryPolicy);
    return retryTemplate.execute(context -> {
        Request feignRequest = null;
        ServiceInstance retrievedServiceInstance = null;
        Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator
                .getSupportedLifecycleProcessors(
                        loadBalancerClientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
                        RetryableRequestContext.class, ResponseData.class, ServiceInstance.class);
        String hint = getHint(serviceId);
        DefaultRequest<RetryableRequestContext> lbRequest = new DefaultRequest<>(
                new RetryableRequestContext(null, buildRequestData(request), hint));
        // On retries the policy will choose the server and set it in the context
        // and extract the server and update the request being made
        if (context instanceof LoadBalancedRetryContext) {
            LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context;
            ServiceInstance serviceInstance = lbContext.getServiceInstance();
            if (serviceInstance == null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Service instance retrieved from LoadBalancedRetryContext: was null. "
                            + "Reattempting service instance selection");
                }
                ServiceInstance previousServiceInstance = lbContext.getPreviousServiceInstance();
                lbRequest.getContext().setPreviousServiceInstance(previousServiceInstance);
                supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
                // Ask the load balancer for an instance
                retrievedServiceInstance = loadBalancerClient.choose(serviceId, lbRequest);
                if (LOG.isDebugEnabled()) {
                    LOG.debug(String.format("Selected service instance: %s", retrievedServiceInstance));
                }
                lbContext.setServiceInstance(retrievedServiceInstance);
            }
            if (retrievedServiceInstance == null) {
                if (LOG.isWarnEnabled()) {
                    LOG.warn("Service instance was not resolved, executing the original request");
                }
                org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse = new DefaultResponse(
                        retrievedServiceInstance);
                supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
                        .onComplete(new CompletionContext<ResponseData, ServiceInstance, RetryableRequestContext>(
                                CompletionContext.Status.DISCARD, lbRequest, lbResponse)));
                feignRequest = request;
            }
            else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug(String.format("Using service instance from LoadBalancedRetryContext: %s",
                            retrievedServiceInstance));
                }
                // Swap the logical service name in the URL for the chosen instance's address
                String reconstructedUrl = loadBalancerClient.reconstructURI(retrievedServiceInstance, originalUri)
                        .toString();
                feignRequest = buildRequest(request, reconstructedUrl);
            }
        }
        org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse = new DefaultResponse(
                retrievedServiceInstance);
        Response response = LoadBalancerUtils.executeWithLoadBalancerLifecycleProcessing(delegate, options,
                feignRequest, lbRequest, lbResponse, supportedLifecycleProcessors,
                retrievedServiceInstance != null);
        int responseStatus = response.status();
        if (retryPolicy != null && retryPolicy.retryableStatusCode(responseStatus)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(String.format("Retrying on status code: %d", responseStatus));
            }
            response.close();
            throw new RetryableStatusCodeException(serviceId, responseStatus, response, URI.create(request.url()));
        }
        return response;
    }, new LoadBalancedRecoveryCallback<Response, Response>() {
        @Override
        protected Response createResponse(Response response, URI uri) {
            return response;
        }
    });
}
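Two steps in the method above carry most of the weight: the host of the feign URL is read as the service id, and once an instance has been chosen the URL is rebuilt against that instance. The sketch below illustrates the idea with java.net.URI only. It is not the library's reconstructURI implementation, and ReconstructUriSketch, the sample address 10.244.1.17 and port 8080 are assumptions made up for the example.

```java
import java.net.URI;
import java.net.URISyntaxException;

class ReconstructUriSketch {

    // Replaces the logical host (the service id) with a concrete host and port,
    // keeping scheme, user info, path, query and fragment unchanged.
    static URI reconstruct(URI original, String host, int port) {
        try {
            return new URI(original.getScheme(), original.getUserInfo(), host, port,
                    original.getPath(), original.getQuery(), original.getFragment());
        }
        catch (URISyntaxException ex) {
            throw new IllegalArgumentException(ex);
        }
    }

    public static void main(String[] args) {
        URI original = URI.create("http://order-service/orders/42?verbose=true");
        // The host part is what the client treats as the service id.
        String serviceId = original.getHost();
        // Hypothetical address of the instance picked by the load balancer.
        URI target = reconstruct(original, "10.244.1.17", 8080);
        // prints "order-service -> http://10.244.1.17:8080/orders/42?verbose=true"
        System.out.println(serviceId + " -> " + target);
    }
}
```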
org.springframework.cloud.loadbalancer.blocking.client.BlockingLoadBalancerClient#choose(java.lang.String, org.springframework.cloud.client.loadbalancer.Request)
@Override
public <T> ServiceInstance choose(String serviceId, Request<T> request) {
    ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerClientFactory.getInstance(serviceId);
    if (loadBalancer == null) {
        return null;
    }
    Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block();
    if (loadBalancerResponse == null) {
        return null;
    }
    return loadBalancerResponse.getServer();
}
org.springframework.cloud.loadbalancer.core.RoundRobinLoadBalancer#choose
@SuppressWarnings("rawtypes")
@Override
// see original
// https://github.com/Netflix/ocelli/blob/master/ocelli-core/
// src/main/java/netflix/ocelli/loadbalancer/RoundRobinLoadBalancer.java
public Mono<Response<ServiceInstance>> choose(Request request) {
    ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
            .getIfAvailable(NoopServiceInstanceListSupplier::new);
    return supplier.get(request).next()
            .map(serviceInstances -> processInstanceResponse(supplier, serviceInstances));
}
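The excerpt above only shows where the instance list comes from; the pick itself happens inside processInstanceResponse, which is not shown here. As a rough illustration of round-robin selection in general (not the library's exact code), a counter can be advanced on every call and wrapped around the list size. RoundRobinSketch and the pod names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinSketch {

    private final AtomicInteger position = new AtomicInteger(0);

    // Returns the next instance in rotation, or null when the list is empty.
    String choose(List<String> instances) {
        if (instances.isEmpty()) {
            return null;
        }
        // Masking the sign bit keeps the index non-negative after the counter overflows.
        int pos = position.getAndIncrement() & Integer.MAX_VALUE;
        return instances.get(pos % instances.size());
    }

    public static void main(String[] args) {
        RoundRobinSketch loadBalancer = new RoundRobinSketch();
        List<String> pods = List.of("pod-a", "pod-b", "pod-c");
        for (int i = 0; i < 4; i++) {
            // prints pod-a, pod-b, pod-c, pod-a
            System.out.println(loadBalancer.choose(pods));
        }
    }
}
```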
org.springframework.cloud.context.named.ClientFactoryObjectProvider#getIfAvailable(java.util.function.Supplier)
@Override
public T getIfAvailable(Supplier<T> defaultSupplier) throws BeansException {
    return delegate().getIfAvailable(defaultSupplier);
}
org.springframework.beans.factory.support.DefaultListableBeanFactory#getBeanProvider(org.springframework.core.ResolvableType, boolean)
@Override
public <T> ObjectProvider<T> getBeanProvider(ResolvableType requiredType, boolean allowEagerInit) {
    // requiredType here: org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier
    return new BeanObjectProvider<T>() {
        @Override
        public T getObject() throws BeansException {
            T resolved = resolveBean(requiredType, null, false);
            if (resolved == null) {
                throw new NoSuchBeanDefinitionException(requiredType);
            }
            return resolved;
        }
        @Override
        public T getObject(Object... args) throws BeansException {
            T resolved = resolveBean(requiredType, args, false);
            if (resolved == null) {
                throw new NoSuchBeanDefinitionException(requiredType);
            }
            return resolved;
        }
        @Override
        @Nullable
        public T getIfAvailable() throws BeansException {
            try {
                return resolveBean(requiredType, null, false);
            }
            catch (ScopeNotActiveException ex) {
                // Ignore resolved bean in non-active scope
                return null;
            }
        }
        @Override
        public void ifAvailable(Consumer<T> dependencyConsumer) throws BeansException {
            T dependency = getIfAvailable();
            if (dependency != null) {
                try {
                    dependencyConsumer.accept(dependency);
                }
                catch (ScopeNotActiveException ex) {
                    // Ignore resolved bean in non-active scope, even on scoped proxy invocation
                }
            }
        }
        @Override
        @Nullable
        public T getIfUnique() throws BeansException {
            try {
                return resolveBean(requiredType, null, true);
            }
            catch (ScopeNotActiveException ex) {
                // Ignore resolved bean in non-active scope
                return null;
            }
        }
        @Override
        public void ifUnique(Consumer<T> dependencyConsumer) throws BeansException {
            T dependency = getIfUnique();
            if (dependency != null) {
                try {
                    dependencyConsumer.accept(dependency);
                }
                catch (ScopeNotActiveException ex) {
                    // Ignore resolved bean in non-active scope, even on scoped proxy invocation
                }
            }
        }
        @SuppressWarnings("unchecked")
        @Override
        public Stream<T> stream() {
            return Arrays.stream(getBeanNamesForTypedStream(requiredType, allowEagerInit))
                    .map(name -> (T) getBean(name))
                    .filter(bean -> !(bean instanceof NullBean));
        }
        @SuppressWarnings("unchecked")
        @Override
        public Stream<T> orderedStream() {
            String[] beanNames = getBeanNamesForTypedStream(requiredType, allowEagerInit);
            if (beanNames.length == 0) {
                return Stream.empty();
            }
            Map<String, T> matchingBeans = CollectionUtils.newLinkedHashMap(beanNames.length);
            for (String beanName : beanNames) {
                Object beanInstance = getBean(beanName);
                if (!(beanInstance instanceof NullBean)) {
                    matchingBeans.put(beanName, (T) beanInstance);
                }
            }
            Stream<T> stream = matchingBeans.values().stream();
            return stream.sorted(adaptOrderComparator(matchingBeans));
        }
    };
}
org.springframework.beans.factory.support.DefaultListableBeanFactory.BeanObjectProvider#getIfAvailable calls resolveBean:
org.springframework.beans.factory.support.DefaultListableBeanFactory#resolveBean
@Nullable
private <T> T resolveBean(ResolvableType requiredType, @Nullable Object[] args, boolean nonUniqueAsNull) {
    NamedBeanHolder<T> namedBean = resolveNamedBean(requiredType, args, nonUniqueAsNull);
    if (namedBean != null) {
        return namedBean.getBeanInstance();
    }
    BeanFactory parent = getParentBeanFactory();
    if (parent instanceof DefaultListableBeanFactory) {
        return ((DefaultListableBeanFactory) parent).resolveBean(requiredType, args, nonUniqueAsNull);
    }
    else if (parent != null) {
        ObjectProvider<T> parentProvider = parent.getBeanProvider(requiredType);
        if (args != null) {
            return parentProvider.getObject(args);
        }
        else {
            return (nonUniqueAsNull ? parentProvider.getIfUnique() : parentProvider.getIfAvailable());
        }
    }
    return null;
}
The resolveNamedBean method called above:
org.springframework.beans.factory.support.DefaultListableBeanFactory#resolveNamedBean(org.springframework.core.ResolvableType, java.lang.Object[], boolean)
@Nullable
private <T> NamedBeanHolder<T> resolveNamedBean(
        ResolvableType requiredType, @Nullable Object[] args, boolean nonUniqueAsNull) throws BeansException {
    // requiredType here: org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier
    Assert.notNull(requiredType, "Required type must not be null");
    String[] candidateNames = getBeanNamesForType(requiredType);
    if (candidateNames.length > 1) {
        List<String> autowireCandidates = new ArrayList<>(candidateNames.length);
        for (String beanName : candidateNames) {
            if (!containsBeanDefinition(beanName) || getBeanDefinition(beanName).isAutowireCandidate()) {
                autowireCandidates.add(beanName);
            }
        }
        if (!autowireCandidates.isEmpty()) {
            candidateNames = StringUtils.toStringArray(autowireCandidates);
        }
    }
    if (candidateNames.length == 1) {
        return resolveNamedBean(candidateNames[0], requiredType, args);
    }
    else if (candidateNames.length > 1) {
        Map<String, Object> candidates = CollectionUtils.newLinkedHashMap(candidateNames.length);
        for (String beanName : candidateNames) {
            if (containsSingleton(beanName) && args == null) {
                Object beanInstance = getBean(beanName);
                candidates.put(beanName, (beanInstance instanceof NullBean ? null : beanInstance));
            }
            else {
                candidates.put(beanName, getType(beanName));
            }
        }
        String candidateName = determinePrimaryCandidate(candidates, requiredType.toClass());
        if (candidateName == null) {
            candidateName = determineHighestPriorityCandidate(candidates, requiredType.toClass());
        }
        if (candidateName != null) {
            Object beanInstance = candidates.get(candidateName);
            if (beanInstance == null) {
                return null;
            }
            if (beanInstance instanceof Class) {
                return resolveNamedBean(candidateName, requiredType, args);
            }
            return new NamedBeanHolder<>(candidateName, (T) beanInstance);
        }
        if (!nonUniqueAsNull) {
            throw new NoUniqueBeanDefinitionException(requiredType, candidates.keySet());
        }
    }
    return null;
}
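The selection order in resolveNamedBean is easier to see in a stripped-down form: a single candidate wins outright, otherwise a primary candidate is preferred, then the highest priority, and only then does the method either return null or fail. The sketch below models just that ordering with plain collections. CandidateResolutionSketch and its Candidate class are hypothetical, the tie handling is a simplification, and a lower number is assumed to mean higher priority.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class CandidateResolutionSketch {

    // Hypothetical descriptor: a primary flag plus an optional priority (null = none declared).
    static final class Candidate {
        final boolean primary;
        final Integer priority;

        Candidate(boolean primary, Integer priority) {
            this.primary = primary;
            this.priority = priority;
        }
    }

    static String resolve(Map<String, Candidate> candidates, boolean nonUniqueAsNull) {
        if (candidates.isEmpty()) {
            return null;
        }
        if (candidates.size() == 1) {
            return candidates.keySet().iterator().next();
        }
        // Step 1: a single primary candidate wins.
        String primary = null;
        for (Map.Entry<String, Candidate> entry : candidates.entrySet()) {
            if (entry.getValue().primary) {
                if (primary != null) {
                    throw new IllegalStateException("more than one primary candidate");
                }
                primary = entry.getKey();
            }
        }
        if (primary != null) {
            return primary;
        }
        // Step 2: otherwise the candidate with the lowest priority value wins.
        String best = null;
        Integer bestPriority = null;
        boolean tie = false;
        for (Map.Entry<String, Candidate> entry : candidates.entrySet()) {
            Integer priority = entry.getValue().priority;
            if (priority == null) {
                continue;
            }
            if (bestPriority == null || priority < bestPriority) {
                best = entry.getKey();
                bestPriority = priority;
                tie = false;
            }
            else if (priority.equals(bestPriority)) {
                tie = true;
            }
        }
        if (tie) {
            throw new IllegalStateException("several candidates share the highest priority");
        }
        if (best != null) {
            return best;
        }
        // Step 3: nothing unique was found.
        if (nonUniqueAsNull) {
            return null;
        }
        throw new IllegalStateException("no unique candidate among " + candidates.keySet());
    }

    public static void main(String[] args) {
        Map<String, Candidate> candidates = new LinkedHashMap<>();
        candidates.put("supplierA", new Candidate(false, 2));
        candidates.put("supplierB", new Candidate(true, null));
        // prints "supplierB" because the primary flag is checked before priorities
        System.out.println(resolve(candidates, false));
    }
}
```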
org.springframework.beans.factory.support.DefaultListableBeanFactory#getBeanNamesForType(org.springframework.core.ResolvableType)
@Override
public String[] getBeanNamesForType(ResolvableType type) {
    // type here: org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier
    return getBeanNamesForType(type, true, true);
}
@Override
public String[] getBeanNamesForType(ResolvableType type, boolean includeNonSingletons, boolean allowEagerInit) {
    // type here: org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier
    Class<?> resolved = type.resolve();
    if (resolved != null && !type.hasGenerics()) {
        return getBeanNamesForType(resolved, includeNonSingletons, allowEagerInit);
    }
    else {
        return doGetBeanNamesForType(type, includeNonSingletons, allowEagerInit);
    }
}
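The supplier that is finally resolved:
service mode:
org.springframework.cloud.kubernetes.client.loadbalancer.KubernetesClientServicesListSupplier
pod mode:
org.springframework.cloud.loadbalancer.core.CachingServiceInstanceListSupplier
To be updated; in this mode the cache is not used by default.
How the remote address is obtained in pod mode: to be updated; in this mode the cache is used by default.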
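Until that section is written, the general idea of a caching instance-list supplier can be sketched as a wrapper that reuses the delegate's last result until a time-to-live expires. This is only an illustration in plain Java, not the actual CachingServiceInstanceListSupplier. CachingSupplierSketch, the injected clock and the TTL value in the demo are all assumptions.

```java
import java.util.List;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

class CachingSupplierSketch implements Supplier<List<String>> {

    private final Supplier<List<String>> delegate;
    private final long ttlMillis;
    private final LongSupplier clock; // injected so the expiry can be tested without sleeping
    private List<String> cached;
    private long loadedAt;

    CachingSupplierSketch(Supplier<List<String>> delegate, long ttlMillis, LongSupplier clock) {
        this.delegate = delegate;
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    // Returns the cached list while it is fresh, otherwise reloads it from the delegate.
    @Override
    public synchronized List<String> get() {
        long now = clock.getAsLong();
        if (cached == null || now - loadedAt >= ttlMillis) {
            cached = List.copyOf(delegate.get());
            loadedAt = now;
        }
        return cached;
    }

    public static void main(String[] args) {
        // Hypothetical delegate standing in for a discovery lookup.
        Supplier<List<String>> discovery = () -> {
            System.out.println("querying discovery");
            return List.of("10.244.1.17:8080", "10.244.2.9:8080");
        };
        CachingSupplierSketch supplier =
                new CachingSupplierSketch(discovery, 30_000L, System::currentTimeMillis);
        supplier.get(); // loads from the delegate
        supplier.get(); // served from the cache while the TTL has not expired
    }
}
```

The trade-off is the usual one for such a cache: fewer discovery lookups per request, at the cost of possibly routing to an instance that disappeared within the TTL window.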
config
org.springframework.cloud.loadbalancer.config.LoadBalancerCacheAutoConfiguration
To be updated: the detailed initialization process.



