栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Eureka客户端源码解析

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Eureka客户端源码解析

Eureka客户端源码

1.使用1.重要的几个API

1.InstanceInfo2.Application3.Applications4.jersey框架5.依赖关系 2.源码解析

3.分析EurekaClientAutoConfiguration4.分析EurekaClient的创建过程

4.1 fetchRegistry

4.1.1 getAndStoreFullRegistry(): 4.1.2 getAndUpdateDelta(applications):4.2 register4.3 initScheduledTasks

4.3.1 CacheRefreshThread4.3.2 HeartbeatThread4.3.3 instanceInfoReplicator4.3.3.1 refreshInstanceInfo() 5.客户端原理图
版本:spring-cloud-netflix-eureka-client-2.2.2.RELEASE.jar

1.使用
 
     org.springframework.cloud
     spring-cloud-starter-netflix-eureka-client
 

配置:

eureka:
  client:
    registry-fetch-interval-seconds: 5 # 5秒抓取一次 
    service-url:
      defaultZone: http://localhost:8081/eureka/  # 服务端地址
    register-with-eureka: true # 是否注册到注册中心里
    fetch-registry: true # 是否从注册中心拉取实例信息
    region: us-east-1 # 这是个默认值,分区域的概念,region下面有zone
    prefer-same-zone-eureka: true # 优先选择zone相同的服务,默认就是true
  instance:
    instance-id: web-provider  #修改主机名称  prefer-ip-address: true #访问路径显示ip
    prefer-ip-address: true #访问路径显示ip
    metadata-map:
      zone: z1 #区的概念

一个Region下面可以对应多个Zone,同区中的服务优先调用,减少网络带宽。

1.重要的几个API 1.InstanceInfo

该类用于保存一个微服务的信息,是Eureka注册的单元。

public class InstanceInfo {
    // 当前Client在Server端的状态
    private volatile InstanceStatus status = InstanceStatus.UP;
    private volatile InstanceStatus overriddenStatus = InstanceStatus.UNKNOWN;
    
    // 当前InstanceInfo在Server端被修改的时间戳
    private volatile Long lastUpdatedTimestamp;
    // 当前InstanceInfo在Client端被修改的时间戳
    private volatile Long lastDirtyTimestamp;
    
    // 重写了equals,id相同就相同
    @Override
    public boolean equals(Object obj) {
        InstanceInfo other = (InstanceInfo) obj;
        String id = getId();
        if (id == null) {
            if (other.getId() != null) {
                return false;
            }
        } else if (!id.equals(other.getId())) {
            return false;
        }
        return true;
    }
}
2.Application

一个Application实例中保存一个特定服务的所有实例提供者。

public class Application {
    // 服务名称
    private String name;
    
    //当前name所指定的实例集合
    private final Set instances;

    private final AtomicReference> shuffledInstances;
    // key为instanceId,value为InstanceInfo 
    private final Map instancesMap;
}    
3.Applications

封装了来自于EurekaServer的所有注册信息,客户端注册表,仅仅是客户端的,服务端是一个Map结构。

public class Applications {
  //key为服务名,value为Application 
  private final Map appNameApplicationMap;
}
4.jersey框架

Jersey框架是一个开源的RESTful框架,实现了JAX-RS规范。
该框架的作用与SpringMVC是相同的,其
也是用户提交URI后,在处理器中进行路由匹配,路由到指定的后台业务。

5.依赖关系

Applications —> Application—> InstanceInfo
客户端注册表:

2.源码解析

分析入口:

 
     org.springframework.cloud
     spring-cloud-starter-netflix-eureka-client
 

根据自动配置原理可知,会存在一个自动配置类:
EurekaClientAutoConfiguration
这个自动配置类是整个源码分析的入口类。

3.分析EurekaClientAutoConfiguration

EurekaClientAutoConfiguration存在2个重要的内部类:

EurekaClientConfiguration

	@Configuration(proxyBeanMethods = false)
	@ConditionalOnMissingRefreshScope
	protected static class EurekaClientConfiguration {
		public EurekaClient eurekaClient(ApplicationInfoManager manager,
					EurekaClientConfig config) {
				return new CloudEurekaClient(manager, config, this.optionalArgs,
						this.context);
			}
	}

这个内部类不会被加载,不会被加载的原因是:@ConditionalOnMissingRefreshScope注解的作用。

RefreshableEurekaClientConfiguration(重点)

  @Configuration(proxyBeanMethods = false)
	@ConditionalOnRefreshScope
	protected static class RefreshableEurekaClientConfiguration {
	
		// 重点是 EurekaClient 类的装载过程
		@Bean(destroyMethod = "shutdown")
		// 从当前上下文中查找是否存在EurekaClient类的信息,不存在才加载
		@ConditionalOnMissingBean(value = EurekaClient.class,search = SearchStrategy.CURRENT)
		// 装配了@RefreshScope,表示这个类在运行过程中是可以被动态刷新的
		@RefreshScope
		@Lazy
		public EurekaClient eurekaClient(ApplicationInfoManager manager,
				EurekaClientConfig config, EurekaInstanceConfig instance,
				@Autowired(required = false) HealthCheckHandler healthCheckHandler) {
			...
			CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager,
					config, this.optionalArgs, this.context);
			cloudEurekaClient.registerHealthCheck(healthCheckHandler);
			return cloudEurekaClient;
		}
	}

由@ConditionalOnRefreshScope注解可知,最终spring容器会加载这个类: RefreshableEurekaClientConfiguration 。
所以下面重点分析:EurekaClient这个类。

4.分析EurekaClient的创建过程

CloudEurekaClient 的继承体系:

这里EurekaClient的创建是直接创建的CloudEurekaClient,所以我们进行源码分析时,直接根据构造器调用过程进行分析即可:

这个地方初始化了父类的构造器,我们直接跟进,在这里我们分析一些主要的代码:
经过一系列的跟进我们最终会来到父类DiscoveryClient的这个构造器里面,:

public DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                    Provider backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
         ...省略n行代码
        // 需要分析的第一处
        // clientConfig.shouldFetchRegistry() 对应配置的 fetch-registry: true
        // fetchRegistry(false) 从注册中心中抓取实例信息
       if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
           fetchRegistryFromBackup();
       }
       ...省略n行代码
       // register-with-eureka: true && should-enforce-registration-at-init: false
       if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
            try {
                // 需要分析的第二处
                if (!register() ) {
                    throw new IllegalStateException("Registration error at startup. Invalid server response.");
                }
            } catch (Throwable th) {
                logger.error("Registration error at startup: {}", th.getMessage());
                throw new IllegalStateException(th);
            }
        }
        // 需要分析的第三处
        initScheduledTasks();
        ...省略n行代码
}
4.1 fetchRegistry

实例抓取流程 ,核心代码:

private boolean fetchRegistry(boolean forceFullRegistryFetch) {
        try {
            Applications applications = getApplications();
            if (clientConfig.shouldDisableDelta())
                    || forceFullRegistryFetch
                    || (applications == null))) 
            {
                // 全量抓取更新
                getAndStoreFullRegistry();
            } else {
            		// 增量抓取更新
                getAndUpdateDelta(applications);
            }
            applications.setAppsHashCode(applications.getReconcileHashCode());
        }
        return true;
    }
4.1.1 getAndStoreFullRegistry():
 private void getAndStoreFullRegistry() throws Throwable {
      long currentUpdateGeneration = fetchRegistryGeneration.get();
      Applications apps = null;
      // 这个地方是直接发送的http请求到server端的
      // 返回的是 Applications 对象
      EurekaHttpResponse httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
              ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
              : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
      if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
          apps = httpResponse.getEntity();
      }
      if (apps == null) {
      } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
      		  // 设置本地region的applications
          localRegionApps.set(this.filterAndShuffle(apps));
      }
  }  
4.1.2 getAndUpdateDelta(applications):
    private void getAndUpdateDelta(Applications applications) throws Throwable {
        Applications delta = null;
        // 发送http请求获取增量信息
        EurekaHttpResponse httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            delta = httpResponse.getEntity();
        }
        // 没获取到直接全量一次
        if (delta == null) {
            getAndStoreFullRegistry();
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            String reconcileHashCode = "";
            if (fetchRegistryUpdateLock.tryLock()) {
                try {
                    // 增量更新,,对应本地applications缓存的增删改查
                    updateDelta(delta);
                    reconcileHashCode = getReconcileHashCode(applications);
                } finally {
                    fetchRegistryUpdateLock.unlock();
                }
            }
            if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
                reconcileAndLogDifference(delta, reconcileHashCode);
            }
        } 
    }

以上就是初始化时实例抓取的大致代码流程。

4.2 register

真正的注册时机,并不是在此处,但是调用的是这个方法,所以我们分析此方法即可:

    boolean register() throws Throwable {
        logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
        EurekaHttpResponse httpResponse;
        try {
        		 // 直接发送http请求到server,把InstanceInfo信息发送过去
        		 // InstanceInfo 里面存有IP,PORT等等信息
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
    }

注册实例的代码很简单,就是把实例信息封装成一个对象,给Server端处理。

4.3 initScheduledTasks

初始化任务流程:
这个任务初始化会存在三个线程:

CacheRefreshThread: 刷新本地客户端注册表的HeartbeatThread:心跳线程InstanceInfoReplicator: 实例状态改变时主动上报到server

private void initScheduledTasks() {
  if (clientConfig.shouldFetchRegistry()) 
          int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
          int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
          cacheRefreshTask = new TimedSupervisorTask(
                  "cacheRefresh",
                  scheduler,
                  cacheRefreshExecutor,
                  registryFetchIntervalSeconds,
                  TimeUnit.SECONDS,
                  expBackOffBound,
                  new CacheRefreshThread()
          );
          // 第一个线程
          scheduler.schedule(
                  cacheRefreshTask,
                  registryFetchIntervalSeconds, TimeUnit.SECONDS);
      }

      if (clientConfig.shouldRegisterWithEureka()) {
          int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
          int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();

          heartbeatTask = new TimedSupervisorTask(
                  "heartbeat",
                  scheduler,
                  heartbeatExecutor,
                  renewalIntervalInSecs,
                  TimeUnit.SECONDS,
                  expBackOffBound,
                  new HeartbeatThread()
          );
          // 第二个线程
          scheduler.schedule(
                  heartbeatTask,
                  renewalIntervalInSecs, TimeUnit.SECONDS);

          // 这里是第三个线程
          instanceInfoReplicator = new InstanceInfoReplicator(
                  this,
                  instanceInfo,
                  clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                  2); // burstSize

				 // 这个地方是注册一个监听器
          statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
              @Override
              public String getId() {
                  return "statusChangeListener";
              }
              @Override
              public void notify(StatusChangeEvent statusChangeEvent) {
                  if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                          InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                      logger.warn("Saw local status change event {}", statusChangeEvent);
                  } else {
                      logger.info("Saw local status change event {}", statusChangeEvent);
                  }
                  // 客户端实例信息发送变化时,主动更新上报到server
                  instanceInfoReplicator.onDemandUpdate();
              }
          };
          if (clientConfig.shouldOnDemandUpdateStatusChange()) {
              applicationInfoManager.registerStatusChangeListener(statusChangeListener);
          }
          // 第三个线程的启动
          instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
      }
  }
4.3.1 CacheRefreshThread
void refreshRegistry() {
     try {
         boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();
         boolean remoteRegionsModified = false;
         // 这个地方是从远程Region里面抓取实例信息,通常并不需要配置,这个是针对异地多活场景
         // 对应配置属性: fetch-remote-regions-registry: us-east-1
         String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
         if (null != latestRemoteRegions) {
             String currentRemoteRegions = remoteRegionsToFetch.get();
             if (!latestRemoteRegions.equals(currentRemoteRegions)) {
                 synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                     if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
                         String[] remoteRegions = latestRemoteRegions.split(",");
                         remoteRegionsRef.set(remoteRegions);
                         instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
                         remoteRegionsModified = true;
                     }
                 }
             } else {
                 instanceRegionChecker.getAzToRegionMapper().refreshMapping();
             }
         }
         // 重新抓取实例信息,更新本地缓存
         boolean success = fetchRegistry(remoteRegionsModified);
         if (success) {
             registrySize = localRegionApps.get().size();
             lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
         }
     } catch (Throwable e) {
         logger.error("Cannot fetch registry from server", e);
     }
 }
4.3.2 HeartbeatThread
boolean renew() {
    EurekaHttpResponse httpResponse;
	// 直接发送心跳包到server
    httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
     // 状态要是 NOT_FOUND
    if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
        long timestamp = instanceInfo.setIsDirtyWithTime();
        // 重新注册
        boolean success = register();
        if (success) {
            instanceInfo.unsetIsDirty(timestamp);
        }
        return success;
    }
    return httpResponse.getStatusCode() == Status.OK.getStatusCode();
 }
4.3.3 instanceInfoReplicator
 public void run() {
      try {
      		 // 刷新实例信息
          discoveryClient.refreshInstanceInfo();
          // 获取 lastDirtyTimestamp,不为null就代表实例状态已经发生变化了,需要通知服务器
          Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
          if (dirtyTimestamp != null) {
              discoveryClient.register();
              instanceInfo.unsetIsDirty(dirtyTimestamp);
          }
      } finally {
          Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
          scheduledPeriodicRef.set(next);
      }
  }
4.3.3.1 refreshInstanceInfo()
 void refreshInstanceInfo() {
     applicationInfoManager.refreshDataCenterInfoIfRequired();
     // 刷新续约信息,LeaseInfo是InstanceInfo里面的属性
     applicationInfoManager.refreshLeaseInfoIfRequired();
     InstanceStatus status;
     try {
         status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
     } catch (Exception e) {
         status = InstanceStatus.DOWN;
     }
     if (null != status) {
         applicationInfoManager.setInstanceStatus(status);
     }
 }

 public void refreshLeaseInfoIfRequired() {
     LeaseInfo leaseInfo = instanceInfo.getLeaseInfo();
     if (leaseInfo == null) {
         return;
     }
     // lease-expiration-duration-in-seconds: 90 # 续约持续时间,超过就代表当前服务挂了
    //  lease-renewal-interval-in-seconds: 30 # 续约间隔时间
     int currentLeaseDuration = config.getLeaseExpirationDurationInSeconds();
     int currentLeaseRenewal = config.getLeaseRenewalIntervalInSeconds();
     if (leaseInfo.getDurationInSecs() != currentLeaseDuration || leaseInfo.getRenewalIntervalInSecs() != currentLeaseRenewal) {
     	// LeaseInfo 续约信息类
         LeaseInfo newLeaseInfo = LeaseInfo.Builder.newBuilder()
                 .setRenewalIntervalInSecs(currentLeaseRenewal)
                 .setDurationInSecs(currentLeaseDuration)
                 .build();
         // 迭代变换
         instanceInfo.setLeaseInfo(newLeaseInfo);
         // 设置 isInstanceInfoDirty = true,标记为实例信息已经发生过改变
         // lastDirtyTimestamp = System.currentTimeMillis();
         instanceInfo.setIsDirty();
     }
 }

总结:
Client提交 register()请求的情况有三种 :

在应用启动时就可以直接进行register(),需要提前在配置文件中提前配置。在renew时,如果server端返回的是NOT_FOUND,则提交register()。当Client的配置信息发生了变更,则Client提交register()。 5.客户端原理图

总结:

实例注册本地注册表定时更新心跳,本地信息变更上报

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/727844.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号