栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Spring Cloud 请求重试机制核心代码分析

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Spring Cloud 请求重试机制核心代码分析

场景

发布微服务的操作一般都是打完新代码的包,kill掉在跑的应用,替换新的包,启动。

spring cloud 中使用eureka为注册中心,它是允许服务列表数据的延迟性的,就是说即使应用已经不在服务列表了,客户端在一段时间内依然会请求这个地址。那么就会出现请求正在发布的地址,而导致失败。

我们会优化服务列表的刷新时间,以提高服务列表信息的时效性。但是无论怎样,都无法避免有那么一段时间是数据不一致的。

所以我们想到一个办法就是重试机制,当a机子在重启时,同个集群的b是可以正常提供服务的,如果有重试机制就可以在上面这个场景里进行重试到b而不影响正确响应。

操作

需要进行如下的操作:

ribbon:
 ReadTimeout: 10000
 ConnectTimeout: 10000
 MaxAutoRetries: 0
 MaxAutoRetriesNextServer: 1
 OkToRetryOnAllOperations: false

引入spring-retry包


  org.springframework.retry
  spring-retry
 

以zuul为例子还需要配置开启重试:

zuul.retryable=true

遇到了问题

然而万事总没那么一帆风顺,通过测试重试机制生效了,但是并没有我想象的去请求另一台健康的机子,于是被迫去吧开源码看一看,最终发现是源码的bug,不过已经修复,升级版本即可。

代码分析

使用的版本是

spring-cloud-netflix-core:1.3.6.RELEASE

spring-retry:1.2.1.RELEASE

spring cloud 依赖版本:


    
      
 org.springframework.cloud
 spring-cloud-dependencies
 ${spring-cloud.version}
 pom
 import
      
    
  

因为启用了重试,所以请求应用时会执行RetryableRibbonLoadBalancingHttpClient.execute方法:

public RibbonApacheHttpResponse execute(final RibbonApacheHttpRequest request, final IClientConfig configOverride) throws Exception {
    final RequestConfig.Builder builder = RequestConfig.custom();
    IClientConfig config = configOverride != null ? configOverride : this.config;
    builder.setConnectTimeout(config.get(
 CommonClientConfigKey.ConnectTimeout, this.connectTimeout));
    builder.setSocketTimeout(config.get(
 CommonClientConfigKey.ReadTimeout, this.readTimeout));
    builder.setRedirectsEnabled(config.get(
 CommonClientConfigKey.FollowRedirects, this.followRedirects));

    final RequestConfig requestConfig = builder.build();
    final LoadBalancedRetryPolicy retryPolicy = loadBalancedRetryPolicyFactory.create(this.getClientName(), this);
    RetryCallback retryCallback = new RetryCallback() {
      @Override
      public RibbonApacheHttpResponse doWithRetry(RetryContext context) throws Exception {
 //on retries the policy will choose the server and set it in the context
 //extract the server and update the request being made
 RibbonApacheHttpRequest newRequest = request;
 if(context instanceof LoadBalancedRetryContext) {
   ServiceInstance service = ((LoadBalancedRetryContext)context).getServiceInstance();
   if(service != null) {
     //Reconstruct the request URI using the host and port set in the retry context
     newRequest = newRequest.withNewUri(new URI(service.getUri().getScheme(),
  newRequest.getURI().getUserInfo(), service.getHost(), service.getPort(),
  newRequest.getURI().getPath(), newRequest.getURI().getQuery(),
  newRequest.getURI().getFragment()));
   }
 }
 newRequest = getSecureRequest(request, configOverride);
 HttpUriRequest httpUriRequest = newRequest.toRequest(requestConfig);
 final HttpResponse httpResponse = RetryableRibbonLoadBalancingHttpClient.this.delegate.execute(httpUriRequest);
 if(retryPolicy.retryableStatusCode(httpResponse.getStatusLine().getStatusCode())) {
   if(CloseableHttpResponse.class.isInstance(httpResponse)) {
     ((CloseableHttpResponse)httpResponse).close();
   }
   throw new RetryableStatusCodeException(RetryableRibbonLoadBalancingHttpClient.this.clientName,
httpResponse.getStatusLine().getStatusCode());
 }
 return new RibbonApacheHttpResponse(httpResponse, httpUriRequest.getURI());
      }
    };
    return this.executeWithRetry(request, retryPolicy, retryCallback);
  }

我们发现先new 一个RetryCallback,然后执行this.executeWithRetry(request, retryPolicy, retryCallback);

而这个RetryCallback.doWithRetry的代码我们清楚看到是实际请求的代码,也就是说this.executeWithRetry方法最终还是会调用RetryCallback.doWithRetry

protected  T doExecute(RetryCallback retryCallback,
      RecoveryCallback recoveryCallback, RetryState state)
      throws E, ExhaustedRetryException {

    RetryPolicy retryPolicy = this.retryPolicy;
    BackOffPolicy backOffPolicy = this.backOffPolicy;

    // Allow the retry policy to initialise itself...
    RetryContext context = open(retryPolicy, state);
    if (this.logger.isTraceEnabled()) {
      this.logger.trace("RetryContext retrieved: " + context);
    }

    // Make sure the context is available globally for clients who need
    // it...
    RetrySynchronizationManager.register(context);

    Throwable lastException = null;

    boolean exhausted = false;
    try {

      // Give clients a chance to enhance the context...
      boolean running = doOpenInterceptors(retryCallback, context);

      if (!running) {
 throw new TerminatedRetryException(
     "Retry terminated abnormally by interceptor before first attempt");
      }

      // Get or Start the backoff context...
      BackOffContext backOffContext = null;
      Object resource = context.getAttribute("backOffContext");

      if (resource instanceof BackOffContext) {
 backOffContext = (BackOffContext) resource;
      }

      if (backOffContext == null) {
 backOffContext = backOffPolicy.start(context);
 if (backOffContext != null) {
   context.setAttribute("backOffContext", backOffContext);
 }
      }

      
      while (canRetry(retryPolicy, context) && !context.isExhaustedonly()) {

 try {
   if (this.logger.isDebugEnabled()) {
     this.logger.debug("Retry: count=" + context.getRetryCount());
   }
   // Reset the last exception, so if we are successful
   // the close interceptors will not think we failed...
   lastException = null;
   return retryCallback.doWithRetry(context);
 }
 catch (Throwable e) {

   lastException = e;

   try {
     registerThrowable(retryPolicy, state, context, e);
   }
   catch (Exception ex) {
     throw new TerminatedRetryException("Could not register throwable",
  ex);
   }
   finally {
     doonErrorInterceptors(retryCallback, context, e);
   }

   if (canRetry(retryPolicy, context) && !context.isExhaustedonly()) {
     try {
backOffPolicy.backOff(backOffContext);
     }
     catch (BackOffInterruptedException ex) {
lastException = e;
// back off was prevented by another thread - fail the retry
if (this.logger.isDebugEnabled()) {
  this.logger
      .debug("Abort retry because interrupted: count="
   + context.getRetryCount());
}
throw ex;
     }
   }

   if (this.logger.isDebugEnabled()) {
     this.logger.debug(
  "Checking for rethrow: count=" + context.getRetryCount());
   }

   if (shouldRethrow(retryPolicy, context, state)) {
     if (this.logger.isDebugEnabled()) {
this.logger.debug("Rethrow in retry for policy: count="
    + context.getRetryCount());
     }
     throw RetryTemplate.wrapIfNecessary(e);
   }

 }

 
 if (state != null && context.hasAttribute(GLOBAL_STATE)) {
   break;
 }
      }

      if (state == null && this.logger.isDebugEnabled()) {
 this.logger.debug(
     "Retry failed last attempt: count=" + context.getRetryCount());
      }

      exhausted = true;
      return handleRetryExhausted(recoveryCallback, context, state);

    }
    catch (Throwable e) {
      throw RetryTemplate.wrapIfNecessary(e);
    }
    finally {
      close(retryPolicy, context, state, lastException == null || exhausted);
      doCloseInterceptors(retryCallback, context, lastException);
      RetrySynchronizationManager.clear();
    }
  }

在一个while循环里实现重试机制,当执行retryCallback.doWithRetry(context)出现异常的时候,就会catch异常,然后用 retryPolicy判断是否进行重试,特别注意registerThrowable(retryPolicy, state, context, e);方法,不但判断了是否重试,在重试情况下会新选出一个机子放入context,然后再去执行retryCallback.doWithRetry(context)时带入,如此就实现了换机子重试了。

但是我的配置怎么会没有换机子呢?调试代码发现registerThrowable(retryPolicy, state, context, e);选出来的机子没问题,就是新的健康的机子,但是在执行retryCallback.doWithRetry(context)代码的时候依然请求的是那台挂掉的机子。

所以我们再仔细看一下retryCallback.doWithRetry(context)的代码:

我们发现了这行代码:

newRequest = getSecureRequest(request, configOverride);
protected RibbonApacheHttpRequest getSecureRequest(RibbonApacheHttpRequest request, IClientConfig configOverride) {
    if (isSecure(configOverride)) {
      final URI secureUri = UriComponentsBuilder.fromUri(request.getUri())
   .scheme("https").build(true).toUri();
      return request.withNewUri(secureUri);
    }
    return request;
  }

newRequest在前面已经使用context构建完毕,request是上一次请求的数据,只要执行这个代码就会发现newRequest永远都会被request覆盖。看到这里我们才发现原来是一个源码bug。

issue地址:https://github.com/spring-cloud/spring-cloud-netflix/issues/2667

总结

这是一次很普通的查问题过程,在这个过程中当我发现配置没有达到我的预期时,我先查看了配置的含义,尝试多次无果,于是进行断点调试发现异常中断点后,因为场景需要一台机子健康一台机子下线,我模拟了数百次,最终才定位到了这行代码。开源项目即使是优秀的项目必然也会有bug存在,不迷信,不盲目。另一方面,阅读源码能力也是一个解决问题的重要能力,像我在找源码入口,定位代码时耗费了很多的时间。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持考高分网。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/140734.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号