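- I. Server-side methods that receive the requests
- 1. getContainers: handling full fetches
- 2. getContainerDifferential: handling delta fetches
- 3. Where readOnlyCacheMap and readWriteCacheMap are updated
- 3.1 Scheduled task that refreshes readOnlyCacheMap
- 3.2 readWriteCacheMap
- 3.3 Full fetch: iterate over registry and return the instance list
- 3.4 Delta fetch: iterate over recentlyChangedQueue and return the instance list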
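Tracing the request paths that the client calls leads to the server-side methods that receive the full and delta fetch requests:
1. getContainers: handling full fetches
Key code:
// Build the cache key. Both full and delta fetches read the instance list from the cache; only the cache key differs.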
Key cacheKey = new Key(Key.EntityType.Application,
ResponseCacheImpl.ALL_APPS,
keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
);
Response response;
if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
// Read the instance list from responseCache and return it gzip-compressed
response = Response.ok(responseCache.getGZIP(cacheKey))
.header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
.header(HEADER_CONTENT_TYPE, returnMediaType)
.build();
} else {
response = Response.ok(responseCache.get(cacheKey))
.build();
}
return response;
2. getContainerDifferential: handling delta fetches
// Build the cache key for the delta fetch
Key cacheKey = new Key(Key.EntityType.Application,
ResponseCacheImpl.ALL_APPS_DELTA,
keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
);
if (acceptEncoding != null
&& acceptEncoding.contains(HEADER_GZIP_VALUE)) {
return Response.ok(responseCache.getGZIP(cacheKey))
.header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
.header(HEADER_CONTENT_TYPE, returnMediaType)
.build();
} else {
return Response.ok(responseCache.get(cacheKey))
.build();
}
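As the two methods above show, both the full and the delta service lists are served from responseCache: through getGZIP when the client accepts gzip, otherwise through get. getGZIP looks like this: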
public byte[] getGZIP(Key key) {
Value payload = getValue(key, shouldUseReadOnlyResponseCache);
if (payload == null) {
return null;
}
return payload.getGzipped();
}
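Since getGZIP only returns payload.getGzipped(), the compression presumably happens once, when the cache entry is built, rather than on every request. A minimal sketch of that idea, assuming a hypothetical CachedPayload class (not Eureka's actual Value) that stores the plain payload alongside a gzip copy:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical stand-in for the cached Value: keeps the plain payload and a gzip copy.
final class CachedPayload {
    private final String payload;
    private final byte[] gzipped;

    CachedPayload(String payload) {
        this.payload = payload;
        // Compress once at construction so each request only reads bytes.
        this.gzipped = gzip(payload);
    }

    String getPayload() {
        return payload;
    }

    byte[] getGzipped() {
        return gzipped;
    }

    private static byte[] gzip(String text) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(bos)) {
            out.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The stream is closed here, so the gzip trailer has been written.
        return bos.toByteArray();
    }

    // Helper used only to verify the round trip.
    static String gunzip(byte[] data) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
            byte[] buf = new byte[1024];
            int n;
            while ((n = in.read(buf)) != -1) {
                bos.write(buf, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(bos.toByteArray(), StandardCharsets.UTF_8);
    }
}
```

With this layout, a gzip request returns getGzipped() and a plain request returns getPayload(), and neither does any work beyond the cache lookup.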
getValue:
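// Look in readOnlyCacheMap first; on a miss, load the value from readWriteCacheMap and then store it in readOnlyCacheMap. The Eureka server caches its responses too, so even a direct call to the server does not necessarily return the latest registry state.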
Value getValue(final Key key, boolean useReadOnlyCache) {
Value payload = null;
try {
if (useReadOnlyCache) {
final Value currentPayload = readOnlyCacheMap.get(key);
if (currentPayload != null) {
payload = currentPayload;
} else {
payload = readWriteCacheMap.get(key);
readOnlyCacheMap.put(key, payload);
}
} else {
payload = readWriteCacheMap.get(key);
}
} catch (Throwable t) {
logger.error("Cannot get value for key :" + key, t);
}
return payload;
}
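The staleness this read path allows can be reproduced with a small model. The sketch below is not Eureka's code: TwoLevelCache is a hypothetical class, a plain ConcurrentHashMap plus a loader function stands in for the LoadingCache, and invalidate assumes that a registry change drops the read-write entry.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical two-level cache modelled on the getValue logic above.
final class TwoLevelCache<K, V> {
    private final ConcurrentMap<K, V> readOnly = new ConcurrentHashMap<>();
    private final ConcurrentMap<K, V> readWrite = new ConcurrentHashMap<>();
    private final Function<K, V> loader;

    TwoLevelCache(Function<K, V> loader) {
        this.loader = loader;
    }

    V get(K key, boolean useReadOnly) {
        if (!useReadOnly) {
            // Bypass the read-only level entirely.
            return readWrite.computeIfAbsent(key, loader);
        }
        V cached = readOnly.get(key);
        if (cached != null) {
            return cached; // may be older than the read-write entry
        }
        V loaded = readWrite.computeIfAbsent(key, loader);
        if (loaded != null) {
            readOnly.put(key, loaded);
        }
        return loaded;
    }

    // Assumption: a registry change removes the read-write entry only.
    void invalidate(K key) {
        readWrite.remove(key);
    }
}
```

After invalidate, a read that goes through the read-only level keeps returning the old value until something copies the fresh one across, which is the job of the scheduled task covered in section 3.1.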
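readOnlyCacheMap and readWriteCacheMap are both map structures: the former is a ConcurrentHashMap, the latter a LoadingCache.
private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
private final LoadingCache<Key, Value> readWriteCacheMap;
3. Where readOnlyCacheMap and readWriteCacheMap are updated
3.1 Scheduled task that refreshes readOnlyCacheMap
// Iterate over every key in readOnlyCacheMap and copy the current value from readWriteCacheMap into it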
for (Key key : readOnlyCacheMap.keySet()) {
try {
CurrentRequestVersion.set(key.getVersion());
Value cachevalue = readWriteCacheMap.get(key);
Value currentCachevalue = readOnlyCacheMap.get(key);
if (cachevalue != currentCachevalue) {
readOnlyCacheMap.put(key, cachevalue);
}
} catch (Throwable th) {
logger.error("Error while updating the client cache from response cache", th);
}
}
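The refresh loop above can be isolated as a small Runnable. This is a sketch under assumptions: ReadOnlyRefresher is a hypothetical name, both levels are plain maps, and the null guard is added because a ConcurrentHashMap rejects null values.

```java
import java.util.Map;

// Hypothetical refresher: copies newer read-write values over the read-only ones.
final class ReadOnlyRefresher<K, V> implements Runnable {
    private final Map<K, V> readOnly;
    private final Map<K, V> readWrite;

    ReadOnlyRefresher(Map<K, V> readOnly, Map<K, V> readWrite) {
        this.readOnly = readOnly;
        this.readWrite = readWrite;
    }

    @Override
    public void run() {
        // Only keys already present in the read-only map are refreshed.
        for (K key : readOnly.keySet()) {
            V latest = readWrite.get(key);
            V current = readOnly.get(key);
            // Reference comparison, as in the snippet above.
            if (latest != null && latest != current) {
                readOnly.put(key, latest);
            }
        }
    }
}
```

Note that the loop never adds keys: an entry reaches the read-only map only through a read miss in getValue, and the task then keeps it in step with the read-write level. In a running server such a task would be scheduled periodically, for example with `ScheduledExecutorService.scheduleWithFixedDelay`; the interval is left open here.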
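3.2 readWriteCacheMap
// The cache is configured at initialization to build each entry with: Value value = generatePayload(key);
Key code:
// Full fetch: build the entry from getApplications, or from getApplicationsFromMultipleRegions when remote regions are requested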
if (ALL_APPS.equals(key.getName())) {
if (isRemoteRegionRequested) {
tracer = serializeAllAppsWithRemoteRegionTimer.start();
payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
} else {
tracer = serializeAllAppsTimer.start();
payload = getPayLoad(key, registry.getApplications());
}
// Delta fetch: build the entry from getApplicationDeltas, or from getApplicationDeltasFromMultipleRegions when remote regions are requested
} else if (ALL_APPS_DELTA.equals(key.getName())) {
if (isRemoteRegionRequested) {
tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
versionDeltaWithRegions.incrementAndGet();
versionDeltaWithRegionsLegacy.incrementAndGet();
payload = getPayLoad(key,
registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
} else {
tracer = serializeDeltaAppsTimer.start();
versionDelta.incrementAndGet();
versionDeltaLegacy.incrementAndGet();
payload = getPayLoad(key, registry.getApplicationDeltas());
}
}
3.3 Full fetch: iterate over registry and return the instance list
for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) {
    Application app = null;
    if (entry.getValue() != null) {
        for (Entry<String, Lease<InstanceInfo>> stringLeaseEntry : entry.getValue().entrySet()) {
            Lease<InstanceInfo> lease = stringLeaseEntry.getValue();
            if (app == null) {
                app = new Application(lease.getHolder().getAppName());
            }
            app.addInstance(decorateInstanceInfo(lease));
        }
    }
    if (app != null) {
        apps.addApplication(app);
    }
}
3.4 Delta fetch: iterate over recentlyChangedQueue and return the instance list
Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
while (iter.hasNext()) {
    Lease<InstanceInfo> lease = iter.next().getLeaseInfo();
    InstanceInfo instanceInfo = lease.getHolder();
    Object[] args = {instanceInfo.getId(), instanceInfo.getStatus().name(), instanceInfo.getActionType().name()};
    logger.debug("The instance id %s is found with status %s and actiontype %s", args);
    // Reuse the Application for this app name, creating it on first sight
    Application app = applicationInstancesMap.get(instanceInfo.getAppName());
    if (app == null) {
        app = new Application(instanceInfo.getAppName());
        applicationInstancesMap.put(instanceInfo.getAppName(), app);
        apps.addApplication(app);
    }
    app.addInstance(decorateInstanceInfo(lease));
}
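The delta loop is essentially a group-by on application name over the change queue. A minimal sketch of that grouping, assuming hypothetical ChangedInstance and DeltaBuilder types in place of Eureka's lease and application classes:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical, simplified change record: one entry per changed instance.
final class ChangedInstance {
    final String appName;
    final String instanceId;
    final String actionType; // illustrative values: ADDED, MODIFIED, DELETED

    ChangedInstance(String appName, String instanceId, String actionType) {
        this.appName = appName;
        this.instanceId = instanceId;
        this.actionType = actionType;
    }
}

final class DeltaBuilder {
    // Group the queued changes by application name, preserving queue order.
    static Map<String, List<ChangedInstance>> buildDelta(Queue<ChangedInstance> recentlyChanged) {
        Map<String, List<ChangedInstance>> apps = new LinkedHashMap<>();
        for (ChangedInstance change : recentlyChanged) {
            // Create the per-application list on first sight, then append.
            apps.computeIfAbsent(change.appName, name -> new ArrayList<>()).add(change);
        }
        return apps;
    }
}
```

Unlike the full fetch, the result only contains applications that have at least one recently changed instance, so its size tracks the change rate rather than the registry size.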
recentlyChangedQueue is a ConcurrentLinkedQueue. A scheduled task removes entries older than the retention time, which defaults to 180 s.
private ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue = new ConcurrentLinkedQueue<RecentlyChangedItem>();

private TimerTask getDeltaRetentionTask() {
    return new TimerTask() {
        @Override
        public void run() {
            Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
            while (it.hasNext()) {
                if (it.next().getLastUpdateTime() < System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
                    // The entry has expired, so remove it (getRetentionTimeInMSInDeltaQueue defaults to 180 s)
                    it.remove();
                } else {
                    // The first entry still within the retention window ends the scan
                    break;
                }
            }
        }
    };
}
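The early break in the retention task relies on the queue being ordered by update time: once one entry is still fresh, everything behind it is newer. A sketch of the same eviction with the clock passed in so that it can be checked deterministically; TimedItem and DeltaRetention are hypothetical names, not Eureka classes:

```java
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical queue element carrying only what eviction needs.
final class TimedItem {
    final String id;
    final long lastUpdateTime;

    TimedItem(String id, long lastUpdateTime) {
        this.id = id;
        this.lastUpdateTime = lastUpdateTime;
    }
}

final class DeltaRetention {
    // Remove entries older than retentionMs, stopping at the first fresh one.
    static int evictExpired(ConcurrentLinkedQueue<TimedItem> queue, long nowMs, long retentionMs) {
        int removed = 0;
        Iterator<TimedItem> it = queue.iterator();
        while (it.hasNext()) {
            if (it.next().lastUpdateTime < nowMs - retentionMs) {
                it.remove();
                removed++;
            } else {
                // Entries are appended in time order, so the rest are newer.
                break;
            }
        }
        return removed;
    }
}
```

With a 180 s retention, a client that polls for deltas less often than that can miss changes that were evicted in between, so the retention window effectively bounds how far behind a delta-only client may fall.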



