客户端及服务端已经完成启动并注册成功
Feign远程调用由于使用了feign, 所以向Spring容器中注入了FeignClientFactoryBean, 通过getObject方法, 获取到feign对应的代理对象, 对应的InvocationHandler为HystrixInvocationHandler. 当调用feign接口中方法时, 会执行HystrixInvocationHandler中的invoke方法.
public Object invoke(Object proxy, final Method method, final Object[] args) throws Throwable {
if (!"equals".equals(method.getName())) {
if ("hashCode".equals(method.getName())) {
return this.hashCode();
} else if ("toString".equals(method.getName())) {
return this.toString();
} else {
HystrixCommand
SynchronousMethodHandler#invoke
public Object invoke(Object[] argv) throws Throwable {
// 构造请求
RequestTemplate template = buildTemplateFromArgs.create(argv);
Request.Options options = findOptions(argv);
Retryer retryer = this.retryer.clone();
while (true) {
try {
// 执行请求
return executeAndDecode(template, options);
} catch (RetryableException e) {
try {
retryer.continueOrPropagate(e);
} catch (RetryableException th) {
Throwable cause = th.getCause();
if (propagationPolicy == UNWRAP && cause != null) {
throw cause;
} else {
throw th;
}
}
if (logLevel != Logger.Level.NONE) {
logger.logRetry(metadata.configKey(), logLevel);
}
continue;
}
}
}
Object executeAndDecode(RequestTemplate template, Request.Options options) throws Throwable {
Request request = targetRequest(template);
if (logLevel != Logger.Level.NONE) {
logger.logRequest(metadata.configKey(), logLevel, request);
}
Response response;
long start = System.nanoTime();
try {
// 执行 重点
response = client.execute(request, options);
// ensure the request is set. TODO: remove in Feign 12
response = response.toBuilder()
.request(request)
.requestTemplate(template)
.build();
} catch (IOException e) {
if (logLevel != Logger.Level.NONE) {
logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
}
throw errorExecuting(request, e);
}
long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
if (decoder != null)
return decoder.decode(response, metadata.returnType());
CompletableFuture resultFuture = new CompletableFuture<>();
// 处理结果
asyncResponseHandler.handleResponse(resultFuture, metadata.configKey(), response,
metadata.returnType(),
elapsedTime);
try {
if (!resultFuture.isDone())
throw new IllegalStateException("Response handling not done");
return resultFuture.join();
} catch (CompletionException e) {
Throwable cause = e.getCause();
if (cause != null)
throw cause;
throw e;
}
}
LoadBalancerFeignClient#execute
public Response execute(Request request, Request.Options options) throws IOException {
try {
// 字符串转对象
URI asUri = URI.create(request.url());
String clientName = asUri.getHost();
URI uriWithoutHost = cleanUrl(request.url(), clientName);
FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
this.delegate, request, uriWithoutHost);
// ***** 此处重点 首次调用会更新服务列表 *****
IClientConfig requestConfig = getClientConfig(options, clientName);
// lbClient方法利用缓存创建并获取FeignLoadBalancer
return lbClient(clientName)
// 通过lb执行
.executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse();
}
catch (ClientException e) {
IOException io = findIOException(e);
if (io != null) {
throw io;
}
throw new RuntimeException(e);
}
}
AbstractLoadBalancerAwareClient#executeWithLoadBalancer
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
// 对象封装请求
LoadBalancerCommand command = this.buildLoadBalancerCommand(request, requestConfig);
try {
// submit中用来选取一个server然后调用call方法执行请求
return (IResponse)command.submit(new ServerOperation() {
public Observable call(Server server) {
URI finalUri = AbstractLoadBalancerAwareClient.this.reconstructURIWithServer(server, request.getUri());
ClientRequest requestForServer = request.replaceUri(finalUri);
try {
// execute调用FeignLoadBalancer#execute方法 然后执行feign.Client.Default#execute 执行Http请求并构建相应
return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
} catch (Exception var5) {
return Observable.error(var5);
}
}
}).toBlocking().single();
} catch (Exception var6) {
Throwable t = var6.getCause();
if (t instanceof ClientException) {
throw (ClientException)t;
} else {
throw new ClientException(var6);
}
}
}
服务发现
LoadBalancerFeignClient#getClientConfig
会调用NamedContextFactory#getContext根据Feign的name从Map中获取并缓存对应的context
protected AnnotationConfigApplicationContext getContext(String name) {
if (!this.contexts.containsKey(name)) {
synchronized (this.contexts) {
if (!this.contexts.containsKey(name)) {
// 只在首次调用会触发createContext
this.contexts.put(name, createContext(name));
}
}
}
return this.contexts.get(name);
}
NamedContextFactory#createContext
为当前各项配置封装成Spring容器, 并refresh容器, 每次创建一个新的context, 但是parentContext始终是同一个
protected AnnotationConfigApplicationContext createContext(String name) {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
if (this.configurations.containsKey(name)) {
for (Class> configuration : this.configurations.get(name)
.getConfiguration()) {
context.register(configuration);
}
}
for (Map.Entry entry : this.configurations.entrySet()) {
if (entry.getKey().startsWith("default.")) {
for (Class> configuration : entry.getValue().getConfiguration()) {
context.register(configuration);
}
}
}
// defaultConfigType为org.springframework.cloud.netflix.ribbon.RibbonClientConfiguration为配置类
context.register(PropertyPlaceholderAutoConfiguration.class,
this.defaultConfigType);
context.getEnvironment().getPropertySources().addFirst(new MapPropertySource(
this.propertySourceName,
Collections.singletonMap(this.propertyName, name)));
// 设置父parent, 保证其他非feign的对象全局唯一
if (this.parent != null) {
// Uses Environment from parent as well as beans
context.setParent(this.parent);
// jdk11 issue
// https://github.com/spring-cloud/spring-cloud-netflix/issues/3101
context.setClassLoader(this.parent.getClassLoader());
}
context.setDisplayName(generateDisplayName(name));
context.refresh();
return context;
}
Spring加载配置类RibbonClientConfiguration后会初始化内部对象ribbonLoadBalancer, 调用父类DynamicServerListLoadBalancer的构造方法
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping, ServerListserverList, ServerListFilter filter, ServerListUpdater serverListUpdater) { super(clientConfig, rule, ping); this.isSecure = false; this.useTunnel = false; this.serverListUpdateInProgress = new AtomicBoolean(false); this.updateAction = new NamelessClass_1(); this.serverListImpl = serverList; this.filter = filter; this.serverListUpdater = serverListUpdater; if (filter instanceof AbstractServerListFilter) { ((AbstractServerListFilter)filter).setLoadBalancerStats(this.getLoadBalancerStats()); } // 初始化 this.restOfInit(clientConfig); } void restOfInit(IClientConfig clientConfig) { boolean primeConnection = this.isEnablePrimingConnections(); this.setEnablePrimingConnections(false); this.enableAndInitLearnNewServersFeature(); // 更新服务list this.updateListOfServers(); if (primeConnection && this.getPrimeConnections() != null) { this.getPrimeConnections().primeConnections(this.getReachableServers()); } this.setEnablePrimingConnections(primeConnection); LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString()); } public void updateListOfServers() { List servers = new ArrayList(); if (this.serverListImpl != null) { // 从远端获取对应的服务列表 会调用HostReactor#getServiceInfo方法 servers = this.serverListImpl.getUpdatedListOfServers(); if (this.filter != null) { servers = this.filter.getFilteredListOfServers((List)servers); } } // 将server通过zone分开存放成map this.updateAllServerList((List)servers); } public void setServersList(List lsrv) { super.setServersList(lsrv); List serverList = (List ) lsrv; Map > serversInZones = new HashMap >(); for (Server server : serverList) { // make sure ServerStats is created to avoid creating them on hot // path getLoadBalancerStats().getSingleServerStat(server); String zone = server.getZone(); // 默认UNKNOWN // 分类存放 if (zone != null) { zone = zone.toLowerCase(); List servers = serversInZones.get(zone); if (servers == null) { servers = new ArrayList (); serversInZones.put(zone, servers); } servers.add(server); } } setServerListForZones(serversInZones); }
HostReactor#getServiceInfo
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
String key = ServiceInfo.getKey(serviceName, clusters);
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key);
}
// 从serviceInfoMap缓存中获取 key: name + "@@" + clusters, value: ServiceInfo包含服务列表
ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
if (null == serviceObj) {
serviceObj = new ServiceInfo(serviceName, clusters);
serviceInfoMap.put(serviceObj.getKey(), serviceObj);
updatingMap.put(serviceName, new Object());
// 直接获取服务列表
updateServiceNow(serviceName, clusters);
updatingMap.remove(serviceName);
} else if (updatingMap.containsKey(serviceName)) { // 表示正在获取服务列表
if (UPDATE_HOLD_INTERVAL > 0) { // 休息5s
// hold a moment waiting for update finish
synchronized (serviceObj) {
try {
serviceObj.wait(UPDATE_HOLD_INTERVAL);
} catch (InterruptedException e) {
NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
}
}
}
}
// 与心跳类似, 间隔10s更新服务列表 调用addTask添加定时
scheduleUpdateIfAbsent(serviceName, clusters);
return serviceInfoMap.get(serviceObj.getKey());
}
public void updateServiceNow(String serviceName, String clusters) {
ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
try {
// 发起远程调用
String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);
if (StringUtils.isNotEmpty(result)) {
// 结果转换, 并存放到map缓存中
processServiceJSON(result);
}
} catch (Exception e) {
NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
} finally {
if (oldService != null) {
synchronized (oldService) {
oldService.notifyAll();
}
}
}
}
最终调用NamingProxy#queryList
public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
throws NacosException {
final Map params = new HashMap(8);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put("clusters", clusters);
params.put("udpPort", String.valueOf(udpPort));
params.put("clientIP", NetUtils.localIP());
params.put("healthyOnly", String.valueOf(healthyOnly));
return reqAPI(UtilAndComs.NACOS_URL_base + "/instance/list", params, HttpMethod.GET);
}
UpdateTask实现了Runnable接口, 执行run方法
public void run() {
try {
ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
if (serviceObj == null) {
updateServiceNow(serviceName, clusters);
executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
return;
}
if (serviceObj.getLastRefTime() <= lastRefTime) {
updateServiceNow(serviceName, clusters);
serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
} else {
// if serviceName already updated by push, we should not override it
// since the push data may be different from pull through force push
refreshOnly(serviceName, clusters);
}
lastRefTime = serviceObj.getLastRefTime();
if (!eventDispatcher.isSubscribed(serviceName, clusters) &&
!futureMap.containsKey(ServiceInfo.getKey(serviceName, clusters))) {
// abort the update task:
NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
return;
}
// 再次注册定时任务 循环调用
executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS);
} catch (Throwable e) {
NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
}
}
服务列表结果示例
{
"name": "DEFAULT_GROUP@@order-server",
"groupName": "DEFAULT_GROUP",
"clusters": "",
"cacheMillis": 10000,
"hosts": [
{
"instanceId": "10.247.36.102#8081#DEFAULT#DEFAULT_GROUP@@order-server",
"ip": "10.247.36.102",
"port": 8081,
"weight": 1,
"healthy": true,
"enabled": true,
"ephemeral": true,
"clusterName": "DEFAULT",
"serviceName": "DEFAULT_GROUP@@order-server",
"metadata": {
"preserved.register.source": "SPRING_CLOUD"
},
"instanceHeartBeatTimeOut": 15000,
"ipDeleteTimeout": 30000,
"instanceHeartBeatInterval": 5000
}
],
"lastRefTime": 1636438205772,
"checksum": "",
"allIPs": false,
"reachProtectionThreshold": false,
"valid": true
}



