回到org.apache.dubbo.config.deploy.DefaultModuleDeployer#start 方法,在该方法中有两个重要的方法:1、exportServices(); — privoder的注册 2、referServices(); —consumer的订阅。
在之前的章节 有说过 exportServices的一个执行流程,现在咱们在说下 referServices方法的执行流程。
//org.apache.dubbo.config.deploy.DefaultModuleDeployer#referServices
private void referServices() {
//configManager.getReferences()主要是拿到了我们定义的 信息
//对references 进行循环
configManager.getReferences().forEach(rc -> {
try {
//转为ReferenceConfig,(ReferenceConfig类似于provider的ServiceConfig)
ReferenceConfig> referenceConfig = (ReferenceConfig>) rc;
if (!referenceConfig.isRefreshed()) {
referenceConfig.refresh();
}
//判断是否需要进行初始化(启动的时候是需要进行初始化的)
if (rc.shouldInit()) {
//是否开启了异步初始化
if (referAsync || rc.shouldReferAsync()) {
ExecutorService executor = executorRepository.getServiceReferExecutor();
CompletableFuture future = CompletableFuture.runAsync(() -> {
try {
referenceCache.get(rc);
} catch (Throwable t) {
logger.error(getIdentifier() + " refer async catch error : " + t.getMessage(), t);
}
}, executor);
asyncReferringFutures.add(future);
} else {
//执行到这里
referenceCache.get(rc);
}
}
} catch (Throwable t) {
logger.error(getIdentifier() + " refer catch error", t);
referenceCache.destroy(rc);
}
});
}
//org.apache.dubbo.config.utils.SimpleReferenceCache#get(org.apache.dubbo.config.ReferenceConfigbase) @Override @SuppressWarnings("unchecked") public T get(ReferenceConfigbase rc) { String key = generator.generateKey(rc); Class> type = rc.getInterfaceClass(); //执行到这里 Object proxy = rc.get(); references.computeIfAbsent(rc, _rc -> { List > referencesOfType = referenceTypeMap.computeIfAbsent(type, _t -> Collections.synchronizedList(new ArrayList<>())); referencesOfType.add(rc); List > referenceConfigList = referenceKeyMap.computeIfAbsent(key, _k -> Collections.synchronizedList(new ArrayList<>())); referenceConfigList.add(rc); return proxy; }); return (T) proxy; }
@Override
//org.apache.dubbo.config.ReferenceConfig#get
public synchronized T get() {
if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
}
if (ref == null) {
//执行到这里
init();
}
return ref;
}
protected synchronized void init() {
//省略其他初始化和配置代码
//执行到这里 (此处referenceParameters是一个map,里面存放了很多基础信息)
ref = createProxy(referenceParameters);
}
private T createProxy(Map referenceParameters) {
if (shouldJvmRefer(referenceParameters)) {
//创建一个本地引用,创建一个本地调用程序
createInvokerForLocal(referenceParameters);
} else {
urls.clear();
if (url != null && url.length() > 0) {
// user specified URL, could be peer-to-peer address, or register center's address.
parseUrl(referenceParameters);
} else {
// if protocols not in jvm checkRegistry
if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
//创建远程引用,创建远程引用调用程序
//执行到这里
aggregateUrlFromRegistry(referenceParameters);
}
}
createInvokerForRemote();
}
if (logger.isInfoEnabled()) {
logger.info("Referred dubbo service " + interfaceClass.getName());
}
URL consumerUrl = new ServiceConfigURL(CONSUMER_PROTOCOL, referenceParameters.get(REGISTER_IP_KEY), 0,
referenceParameters.get(INTERFACE_KEY), referenceParameters);
consumerUrl = consumerUrl.setScopeModel(getScopeModel());
consumerUrl = consumerUrl.setServiceModel(consumerModel);
metadataUtils.publishServiceDefinition(consumerUrl);
// create service proxy
return (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}
private void createInvokerForRemote() {
if (urls.size() == 1) {
URL curUrl = urls.get(0);
//执行到此方法 (会执行RegistryProtocol的refer方法)
invoker = protocolSPI.refer(interfaceClass,curUrl);
if (!UrlUtils.isRegistry(curUrl)){
List> invokers = new ArrayList<>();
invokers.add(invoker);
invoker = Cluster.getCluster(scopeModel, Cluster.DEFAULT).join(new StaticDirectory(curUrl, invokers), true);
}
} else {
List> invokers = new ArrayList<>();
URL registryUrl = null;
for (URL url : urls) {
// For multi-registry scenarios, it is not checked whether each referInvoker is available.
// Because this invoker may become available later.
invokers.add(protocolSPI.refer(interfaceClass, url));
if (UrlUtils.isRegistry(url)) {
// use last registry url
registryUrl = url;
}
}
if (registryUrl != null) {
// registry url is available
// for multi-subscription scenario, use 'zone-aware' policy by default
String cluster = registryUrl.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);
// The invoker wrap sequence would be: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker
// (RegistryDirectory, routing happens here) -> Invoker
invoker = Cluster.getCluster(registryUrl.getScopeModel(), cluster, false).join(new StaticDirectory(registryUrl, invokers), false);
} else {
// not a registry url, must be direct invoke.
if (CollectionUtils.isEmpty(invokers)) {
throw new IllegalArgumentException("invokers == null");
}
URL curUrl = invokers.get(0).getUrl();
String cluster = curUrl.getParameter(CLUSTER_KEY, Cluster.DEFAULT);
invoker = Cluster.getCluster(scopeModel, cluster).join(new StaticDirectory(curUrl, invokers), true);
}
}
}
//org.apache.dubbo.registry.integration.RegistryProtocol#refer
public Invoker refer(Class type, URL url) throws RpcException {
//因为有组的概念 所以这块是进行组的配置
url = getRegistryUrl(url);
Registry registry = getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// group="a,b" or group="*"
Map qs = (Map) url.getAttribute(REFER_KEY);
String group = qs.get(GROUP_KEY);
if (group != null && group.length() > 0) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
return doRefer(Cluster.getCluster(url.getScopeModel(), MergeableCluster.NAME), registry, type, url, qs);
}
}
Cluster cluster = Cluster.getCluster(url.getScopeModel(), qs.get(CLUSTER_KEY));
//执行到这里
return doRefer(cluster, registry, type, url, qs);
}
protected Invoker doRefer(Cluster cluster, Registry registry, Class type, URL url, Map parameters) {
//组织URL的参数
Map consumerAttribute = new HashMap<>(url.getAttributes());
consumerAttribute.remove(REFER_KEY);
URL consumerUrl = new ServiceConfigURL(parameters.get(PROTOCOL_KEY) == null ? DUBBO : parameters.get(PROTOCOL_KEY),
null,
null,
parameters.get(REGISTER_IP_KEY),
0, getPath(parameters, type),
parameters,
consumerAttribute);
url = url.putAttribute(CONSUMER_URL_KEY, consumerUrl);
ClusterInvoker migrationInvoker = getMigrationInvoker(this, cluster, registry, type, url, consumerUrl);
//执行到这里
return interceptInvoker(migrationInvoker, url, consumerUrl, url);
}
protected Invoker interceptInvoker(ClusterInvoker invoker, URL url, URL consumerUrl, URL registryURL) {
//拿到所有的监听器
List listeners = findRegistryProtocolListeners(url);
if (CollectionUtils.isEmpty(listeners)) {
return invoker;
}
//依次调用监听器的onRefer方法
for (RegistryProtocolListener listener : listeners) {
//执行到这里
listener.onRefer(this, invoker, consumerUrl, registryURL);
}
return invoker;
}
//org.apache.dubbo.registry.client.migration.MigrationRuleListener#onRefer
@Override
public void onRefer(RegistryProtocol registryProtocol, ClusterInvoker> invoker, URL consumerUrl, URL registryURL) {
//获取到MigrationRuleHandler
MigrationRuleHandler> migrationRuleHandler = handlers.computeIfAbsent((MigrationInvoker>) invoker, _key -> {
((MigrationInvoker>) invoker).setMigrationRuleListener(this);
return new MigrationRuleHandler<>((MigrationInvoker>) invoker, consumerUrl);
});
//执行到这里
migrationRuleHandler.doMigrate(rule);
}
接下来的方法中我们看到 上一章配置的 dubbo.application.service-discovery.migration 的内容
//org.apache.dubbo.registry.client.migration.MigrationRuleHandler#doMigrate
public synchronized void doMigrate(MigrationRule rule) {
if (migrationInvoker instanceof ServiceDiscoveryMigrationInvoker) {
refreshInvoker(MigrationStep.FORCE_APPLICATION, 1.0f, rule);
return;
}
// 默认是APPLICATION_FIRST
MigrationStep step = MigrationStep.APPLICATION_FIRST;
float threshold = -1f;
try {
//通过consumerURL再次获取
step = rule.getStep(consumerURL);
threshold = rule.getThreshold(consumerURL);
} catch (Exception e) {
logger.error("Failed to get step and threshold info from rule: " + rule, e);
}
//执行到refreshInvoker方法
if (refreshInvoker(step, threshold, rule)) {
// refresh success, update rule
setMigrationRule(rule);
}
}
private boolean refreshInvoker(MigrationStep step, Float threshold, MigrationRule newRule) {
if (step == null || threshold == null) {
throw new IllegalStateException("Step or threshold of migration rule cannot be null");
}
MigrationStep originStep = currentStep;
if ((currentStep == null || currentStep != step) || !currentThreshold.equals(threshold)) {
boolean success = true;
//此处为三种订阅模式的核心处理
switch (step) {
case APPLICATION_FIRST:
//APPLICATION_FIRST模式下 其实默认实现了 FORCE_APPLICATION 与 FORCE_INTERFACE 的相关逻辑
//所以我们直接看该方法
migrationInvoker.migrateToApplicationFirstInvoker(newRule);
break;
case FORCE_APPLICATION:
success = migrationInvoker.migrateToForceApplicationInvoker(newRule);
break;
case FORCE_INTERFACE:
default:
success = migrationInvoker.migrateToForceInterfaceInvoker(newRule);
}
if (success) {
setCurrentStepAndThreshold(step, threshold);
logger.info("Succeed Migrated to " + step + " mode. Service Name: " + consumerURL.getDisplayServiceKey());
report(step, originStep, "true");
} else {
// migrate failed, do not save new step and rule
logger.warn("Migrate to " + step + " mode failed. Probably not satisfy the threshold you set "
+ threshold + ". Please try re-publish configuration if you still after check.");
report(step, originStep, "false");
}
return success;
}
// ignore if step is same with previous, will continue override rule for MigrationInvoker
return true;
}
//org.apache.dubbo.registry.client.migration.MigrationInvoker#migrateToApplicationFirstInvoker
@Override
public void migrateToApplicationFirstInvoker(MigrationRule newRule) {
CountDownLatch latch = new CountDownLatch(0);
//接口级订阅 ---- 先看这个
refreshInterfaceInvoker(latch);
//应用级订阅 ---- 再看这个
refreshServiceDiscoveryInvoker(latch);
// directly calculate preferred invoker, will not wait until address notify
// calculation will re-occurred when address notify later
//进行计算
calcPreferredInvoker(newRule);
}
refreshInterfaceInvoker – 先看接口级订阅
//org.apache.dubbo.registry.client.migration.MigrationInvoker#refreshInterfaceInvoker
protected void refreshInterfaceInvoker(CountDownLatch latch) {
//先将自己的监听移除掉
clearListener(invoker);
if (needRefresh(invoker)) {
if (logger.isDebugEnabled()) {
logger.debug("Re-subscribing interface addresses for interface " + type.getName());
}
if (invoker != null) {
invoker.destroy();
}
//执行到这里
invoker = registryProtocol.getInvoker(cluster, registry, type, url);
}
setListener(invoker, () -> {
latch.countDown();
//TODO frameworkStatusReporter
// frameworkStatusReporter.reportConsumptionStatus(
// createConsumptionReport(consumerUrl.getServiceInterface(), consumerUrl.getVersion(), consumerUrl.getGroup(), "interface")
// );
if (step == APPLICATION_FIRST) {
calcPreferredInvoker(rule);
}
});
}
//org.apache.dubbo.registry.integration.RegistryProtocol#getInvoker
public ClusterInvoker getInvoker(Cluster cluster, Registry registry, Class type, URL url) {
// FIXME, this method is currently not used, create the right registry before enable.
DynamicDirectory directory = new RegistryDirectory<>(type, url);
//调用doCreateInvoker方法
return doCreateInvoker(directory, cluster, registry, type);
}
protected ClusterInvoker doCreateInvoker(DynamicDirectory directory, Cluster cluster, Registry registry, Class type) {
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map parameters = new HashMap<>(directory.getConsumerUrl().getParameters());
URL urlToRegistry = new ServiceConfigURL(
parameters.get(PROTOCOL_KEY) == null ? DUBBO : parameters.get(PROTOCOL_KEY),
parameters.remove(REGISTER_IP_KEY), 0, getPath(parameters, type), parameters);
urlToRegistry = urlToRegistry.setScopeModel(directory.getConsumerUrl().getScopeModel());
urlToRegistry = urlToRegistry.setServiceModel(directory.getConsumerUrl().getServiceModel());
if (directory.isShouldRegister()) {
directory.setRegisteredConsumerUrl(urlToRegistry);
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(urlToRegistry);
//此处为真正进入订阅流程的地方
directory.subscribe(toSubscribeUrl(urlToRegistry));
return (ClusterInvoker) cluster.join(directory, true);
}
//org.apache.dubbo.registry.integration.RegistryDirectory#subscribe
@Override
public void subscribe(URL url) {
setSubscribeUrl(url);
consumerConfigurationListener.addNotifyListener(this);
referenceConfigurationListener = new ReferenceConfigurationListener(url.getOrDefaultModuleModel(), this, url);
registry.subscribe(url, this);
}
//org.apache.dubbo.registry.support.FailbackRegistry#subscribe
@Override
public void subscribe(URL url, NotifyListener listener) {
//org.apache.dubbo.registry.support.AbstractRegistry#subscribe
super.subscribe(url, listener);
//移除失败的订阅(对于失败的订阅dubbo会启动一个TimerTask来重试执行,此处先移除,后面会添加)
removeFailedSubscribed(url, listener);
try {
// 发送订阅服务请求,此处有具体的注册中心实现来实现,例如zk
doSubscribe(url, listener);
} catch (Exception e) {
Throwable t = e;
List urls = getCacheUrls(url);
if (CollectionUtils.isNotEmpty(urls)) {
notify(url, listener, urls);
logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
} else {
// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true);
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
}
// Record a failed registration request to a failed list, retry regularly
addFailedSubscribed(url, listener);
}
}
//org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doSubscribe
public void doSubscribe(final URL url, final NotifyListener listener) {
try {
checkDestroyed();
//ANY_VALUE:* , --- 订阅所有接口
if (ANY_VALUE.equals(url.getServiceInterface())) {
// root: /dubbo --- 得到根节点
String root = toRootPath();
boolean check = url.getParameter(CHECK_KEY, false);
ConcurrentMap listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> {
for (String child : currentChilds) {
child = URL.decode(child);
if (!anyServices.contains(child)) {
anyServices.add(child);
//此处是一个回调
subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(check)), k);
}
}
});
zkClient.create(root, false);
List services = zkClient.addChildListener(root, zkListener);
if (CollectionUtils.isNotEmpty(services)) {
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
//此处是一个回调
subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(check)), listener);
}
}
} else {
CountDownLatch latch = new CountDownLatch(1);
try {
List urls = new ArrayList<>();
//toCategoriesPath(url)获取节点应该要创建的子节点
for (String path : toCategoriesPath(url)) {
ConcurrentMap listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
ChildListener zkListener = listeners.computeIfAbsent(listener, k -> new RegistryChildListenerImpl(url, path, k, latch));
if (zkListener instanceof RegistryChildListenerImpl) {
((RegistryChildListenerImpl) zkListener).setLatch(latch);
}
//创建子节点
zkClient.create(path, false);
//创建节点监听
List children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
//继续看这里(下面会进行配置信息的文件保存)
notify(url, listener, urls);
} finally {
// tells the listener to run only after the sync notification of main thread finishes.
latch.countDown();
}
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
//org.apache.dubbo.registry.support.FailbackRegistry#notify
@Override
protected void notify(URL url, NotifyListener listener, List urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
try {
//执行到这里
doNotify(url, listener, urls);
} catch (Exception t) {
// Record a failed registration request to a failed list
logger.error("Failed to notify addresses for subscribe " + url + ", cause: " + t.getMessage(), t);
}
}
protected void doNotify(URL url, NotifyListener listener, List urls) {
super.notify(url, listener, urls);
}
//org.apache.dubbo.registry.support.AbstractRegistry#notify(org.apache.dubbo.common.URL, org.apache.dubbo.registry.NotifyListener, java.util.List) protected void notify(URL url, NotifyListener listener, List urls) { if (url == null) { throw new IllegalArgumentException("notify url == null"); } if (listener == null) { throw new IllegalArgumentException("notify listener == null"); } if ((CollectionUtils.isEmpty(urls)) && !ANY_VALUE.equals(url.getServiceInterface())) { logger.warn("Ignore empty notify urls for subscribe url " + url); return; } if (logger.isInfoEnabled()) { logger.info("Notify urls for subscribe url " + url + ", url size: " + urls.size()); } // keep every provider's category. Map > result = new HashMap<>(); for (URL u : urls) { if (UrlUtils.isMatch(url, u)) { String category = u.getCategory(DEFAULT_CATEGORY); List categoryList = result.computeIfAbsent(category, k -> new ArrayList<>()); categoryList.add(u); } } if (result.size() == 0) { return; } Map > categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>()); for (Map.Entry > entry : result.entrySet()) { String category = entry.getKey(); List categoryList = entry.getValue(); categoryNotified.put(category, categoryList); listener.notify(categoryList); // 上面都是些参数校验 无需关注,主要看这里 if (localCacheEnabled) { //配置保存 saveProperties(url); } } } private void saveProperties(URL url) { if (file == null) { return; } try { StringBuilder buf = new StringBuilder(); Map > categoryNotified = notified.get(url); if (categoryNotified != null) { for (List us : categoryNotified.values()) { for (URL u : us) { if (buf.length() > 0) { buf.append(URL_SEPARATOR); } buf.append(u.toFullString()); } } } properties.setProperty(url.getServiceKey(), buf.toString()); long version = lastCacheChanged.incrementAndGet(); if (syncSaveFile) { //然后执行到这里 doSaveProperties(version); } else { registryCacheExecutor.execute(new SaveProperties(version)); } } catch (Throwable t) { logger.warn(t.getMessage(), t); } }
//org.apache.dubbo.registry.support.AbstractRegistry#doSaveProperties
public void doSaveProperties(long version) {
if (version < lastCacheChanged.get()) {
return;
}
if (file == null) {
return;
}
// Save
try {
//file: /Users.dubbo/dubbo-registry-consumer-127.0.0.1-2181.cache 缓存文件
//lockfile: /Users.dubbo/dubbo-registry-consumer-127.0.0.1-2181.cache.lock 文件锁
File lockfile = new File(file.getAbsolutePath() + ".lock");
if (!lockfile.exists()) {
lockfile.createNewFile();
}
try (RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
FileChannel channel = raf.getChannel()) {
FileLock lock = channel.tryLock();
if (lock == null) {
throw new IOException("Can not lock the registry cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.registry.file=xxx.properties");
}
// Save
try {
if (!file.exists()) {
file.createNewFile();
}
try (FileOutputStream outputFile = new FileOutputStream(file)) {
properties.store(outputFile, "Dubbo Registry Cache");
}
} finally {
lock.release();
}
}
} catch (Throwable e) {
savePropertiesRetryTimes.incrementAndGet();
if (savePropertiesRetryTimes.get() >= MAX_RETRY_TIMES_SAVE_PROPERTIES) {
logger.warn("Failed to save registry cache file after retrying " + MAX_RETRY_TIMES_SAVE_PROPERTIES + " times, cause: " + e.getMessage(), e);
savePropertiesRetryTimes.set(0);
return;
}
if (version < lastCacheChanged.get()) {
savePropertiesRetryTimes.set(0);
return;
} else {
registryCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet()));
}
logger.warn("Failed to save registry cache file, will retry, cause: " + e.getMessage(), e);
}
}
refreshInterfaceInvoker – 先看应用级订阅zs@zs .dubbo % cat /Users/zs/.dubbo/dubbo-registry-consumer-127.0.0.1-2181.cache
#Dubbo Registry Cache
#Fri Dec 31 14:51:41 CST 2021
com.jiangzheng.course.dubbo.api.service.ServiceDemo=empty://30.96.216.168/com.jiangzheng.course.dubbo.api.service.ServiceDemo?application=consumer&background=false&category=routers&dubbo=2.0.2&interface=com.jiangzheng.course.dubbo.api.service.ServiceDemo&metadata-type=remote&methods=getName,getSelf&pid=18635&qos.accept.foreign.ip=false&qos.enable=true&qos.port=33333&release=3.0.4&side=consumer&sticky=false×tamp=1640933317398 empty://30.96.216.168/com.jiangzheng.course.dubbo.api.service.ServiceDemo?application=consumer&background=false&category=configurators&dubbo=2.0.2&interface=com.jiangzheng.course.dubbo.api.service.ServiceDemo&metadata-type=remote&methods=getName,getSelf&pid=18635&qos.accept.foreign.ip=false&qos.enable=true&qos.port=33333&release=3.0.4&side=consumer&sticky=false×tamp=1640933317398 empty://30.96.216.168/com.jiangzheng.course.dubbo.api.service.ServiceDemo?application=consumer&background=false&category=providers&dubbo=2.0.2&interface=com.jiangzheng.course.dubbo.api.service.ServiceDemo&metadata-type=remote&methods=getName,getSelf&pid=18635&qos.accept.foreign.ip=false&qos.enable=true&qos.port=33333&release=3.0.4&side=consumer&sticky=false×tamp=1640933317398
zs@zs .dubbo %
//org.apache.dubbo.registry.client.migration.MigrationInvoker#refreshServiceDiscoveryInvoker
protected void refreshServiceDiscoveryInvoker(CountDownLatch latch) {
clearListener(serviceDiscoveryInvoker);
if (needRefresh(serviceDiscoveryInvoker)) {
if (logger.isDebugEnabled()) {
logger.debug("Re-subscribing instance addresses, current interface " + type.getName());
}
if (serviceDiscoveryInvoker != null) {
serviceDiscoveryInvoker.destroy();
}
serviceDiscoveryInvoker = registryProtocol.getServiceDiscoveryInvoker(cluster, registry, type, url);
}
setListener(serviceDiscoveryInvoker, () -> {
latch.countDown();
//TODO frameworkStatusReporter
// frameworkStatusReporter.reportConsumptionStatus(
// createConsumptionReport(consumerUrl.getServiceInterface(), consumerUrl.getVersion(), consumerUrl.getGroup(), "app")
// );
if (step == APPLICATION_FIRST) {
calcPreferredInvoker(rule);
}
});
}
//org.apache.dubbo.registry.integration.RegistryProtocol#getServiceDiscoveryInvoker
public ClusterInvoker getServiceDiscoveryInvoker(Cluster cluster, Registry registry, Class type, URL url) {
DynamicDirectory directory = new ServiceDiscoveryRegistryDirectory<>(type, url);
return doCreateInvoker(directory, cluster, registry, type);
}
//注意此处,此处为 接口级订阅 refreshInterfaceInvoker调用的方法
public ClusterInvoker getInvoker(Cluster cluster, Registry registry, Class type, URL url) {
// FIXME, this method is currently not used, create the right registry before enable.
DynamicDirectory directory = new RegistryDirectory<>(type, url);
return doCreateInvoker(directory, cluster, registry, type);
}
观察两个方法,接口级订阅使用的RegistryDirectory,应用级订阅使用的ServiceDiscoveryRegistryDirectory,二者 使用的实现类不同。
//此处没什么可说
//org.apache.dubbo.registry.integration.RegistryProtocol#doCreateInvoker
protected ClusterInvoker doCreateInvoker(DynamicDirectory directory, Cluster cluster, Registry registry, Class type) {
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map parameters = new HashMap<>(directory.getConsumerUrl().getParameters());
URL urlToRegistry = new ServiceConfigURL(
parameters.get(PROTOCOL_KEY) == null ? DUBBO : parameters.get(PROTOCOL_KEY),
parameters.remove(REGISTER_IP_KEY), 0, getPath(parameters, type), parameters);
urlToRegistry = urlToRegistry.setScopeModel(directory.getConsumerUrl().getScopeModel());
urlToRegistry = urlToRegistry.setServiceModel(directory.getConsumerUrl().getServiceModel());
if (directory.isShouldRegister()) {
directory.setRegisteredConsumerUrl(urlToRegistry);
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(urlToRegistry);
//执行到这里,此处因为实现类的不同所以 此处调用的是ServiceDiscoveryRegistryDirectory的实现
directory.subscribe(toSubscribeUrl(urlToRegistry));
return (ClusterInvoker) cluster.join(directory, true);
}
//org.apache.dubbo.registry.client.ServiceDiscoveryRegistryDirectory#subscribe
@Override
public void subscribe(URL url) {
if (moduleModel.getModelEnvironment().getConfiguration().convert(Boolean.class, Constants.ENABLE_CONFIGURATION_LISTEN, true)) {
enableConfigurationListen = true;
getConsumerConfigurationListener(moduleModel).addNotifyListener(this);
referenceConfigurationListener = new ReferenceConfigurationListener(this.moduleModel, this, url);
} else {
enableConfigurationListen = false;
}
//执行到此处
super.subscribe(url);
}
//org.apache.dubbo.registry.integration.DynamicDirectory#subscribe
public void subscribe(URL url) {
setSubscribeUrl(url);
registry.subscribe(url, this);
}
//org.apache.dubbo.registry.client.ServiceDiscoveryRegistry#subscribe
@Override
public final void subscribe(URL url, NotifyListener listener) {
if (!shouldSubscribe(url)) { // Should Not Subscribe
return;
}
url = addRegistryClusterKey(url);
//执行到此处
doSubscribe(url, listener);
}
@Override
public void doSubscribe(URL url, NotifyListener listener) {
writablemetadataService.subscribeURL(url);
boolean check = url.getParameter(CHECK_KEY, false);
Set subscribedServices = Collections.emptySet();
try {
ServiceNameMapping serviceNameMapping = ServiceNameMapping.getDefaultExtension(this.getUrl().getScopeModel());
subscribedServices = serviceNameMapping.getAndListenServices(this.getUrl(), url, new DefaultMappingListener(url, subscribedServices, listener));
} catch (Exception e) {
logger.warn("Cannot find app mapping for service " + url.getServiceInterface() + ", will not migrate.", e);
}
if (CollectionUtils.isEmpty(subscribedServices)) {
if (check) {
throw new IllegalStateException("Should has at least one way to know which services this interface belongs to, subscription url: " + url);
}
return;
}
//上面进行的是数组的组织,接着执行到这里
subscribeURLs(url, listener, subscribedServices);
}
//org.apache.dubbo.registry.client.ServiceDiscoveryRegistry#subscribeURLs
protected void subscribeURLs(URL url, NotifyListener listener, Set serviceNames) {
//此处获取应用的名称key
serviceNames = new TreeSet<>(serviceNames);
//此处获取到了groupUrl的key
String serviceNamesKey = toStringKeys(serviceNames);
String protocolServiceKey = url.getServiceKey() + GROUP_CHAR_SEPARATOR + url.getParameter(PROTOCOL_KEY, DUBBO);
// 此处为判断是否已经注册过了监听器
boolean serviceListenerRegistered = true;
ServiceInstancesChangedListener serviceInstancesChangedListener;
synchronized (this) {
serviceInstancesChangedListener = serviceListeners.get(serviceNamesKey);
if (serviceInstancesChangedListener == null) {
//创建listener,调用MultipleServiceDiscovery类
serviceInstancesChangedListener = serviceDiscovery.createListener(serviceNames);
serviceInstancesChangedListener.setUrl(url);
//此处为应用级别的循环
for (String serviceName : serviceNames) {
//获取应用的对象信息,ServiceInstance中包含有hostportaddress等信息
List serviceInstances = serviceDiscovery.getInstances(serviceName);
if (CollectionUtils.isNotEmpty(serviceInstances)) {
//创建event事件,此处调用MultiServiceInstancesChangedListener类
serviceInstancesChangedListener.onEvent(new ServiceInstancesChangedEvent(serviceName, serviceInstances));
}
}
serviceListenerRegistered = false;
//将Listener监听添加到集合中
serviceListeners.put(serviceNamesKey, serviceInstancesChangedListener);
}
}
serviceInstancesChangedListener.setUrl(url);
listener.addServiceListener(serviceInstancesChangedListener);
serviceInstancesChangedListener.addListenerAndNotify(protocolServiceKey, listener);
if (!serviceListenerRegistered) {
//接下来执行到这里,调用ZookeeperServiceDiscovery类
serviceDiscovery.addServiceInstancesChangedListener(serviceInstancesChangedListener);
}
}
//org.apache.dubbo.registry.multiple.MultipleServiceDiscovery#createListener
@Override
public ServiceInstancesChangedListener createListener(Set serviceNames) {
return new MultiServiceInstancesChangedListener(serviceNames, this);
}
//org.apache.dubbo.registry.multiple.MultipleServiceDiscovery.MultiServiceInstancesChangedListener#onEvent
@Override
public void onEvent(ServiceInstancesChangedEvent event) {
List serviceInstances = new ArrayList<>();
for (SingleServiceInstancesChangedListener singleListener : singleListenerMap.values()) {
if (null != singleListener.event && null != singleListener.event.getServiceInstances()) {
for (ServiceInstance serviceInstance : singleListener.event.getServiceInstances()) {
if (!serviceInstances.contains(serviceInstance)) {
serviceInstances.add(serviceInstance);
}
}
}
}
super.onEvent(new ServiceInstancesChangedEvent(event.getServiceName(), serviceInstances));
}
//org.apache.dubbo.registry.zookeeper.ZookeeperServiceDiscovery#addServiceInstancesChangedListener
@Override
public void addServiceInstancesChangedListener(ServiceInstancesChangedListener listener)
throws NullPointerException, IllegalArgumentException {
//此处根据应用名称的不同,分别注册 zk的watch,接下来看下registerServiceWatcher
listener.getServiceNames().forEach(serviceName -> registerServiceWatcher(serviceName, listener));
}
//org.apache.dubbo.registry.zookeeper.ZookeeperServiceDiscovery#registerServiceWatcher
protected void registerServiceWatcher(String serviceName, ServiceInstancesChangedListener listener) {
String path = buildServicePath(serviceName);
try {
//通过zk客户端 判断节点是否存在
curatorframework.create().creatingParentsIfNeeded().forPath(path);
} catch (KeeperException.NodeExistsException e) {
// ignored
if (logger.isDebugEnabled()) {
logger.debug(e);
}
} catch (Exception e) {
throw new IllegalStateException("registerServiceWatcher create path=" + path + " fail.", e);
}
CountDownLatch latch = new CountDownLatch(1);
//创建watcher
ZookeeperServiceDiscoveryChangeWatcher watcher = watcherCaches.computeIfAbsent(path, key -> {
//watcher的具体触发实现逻辑
ZookeeperServiceDiscoveryChangeWatcher tmpWatcher = new ZookeeperServiceDiscoveryChangeWatcher(this, serviceName, path, latch);
try {
curatorframework.getChildren().usingWatcher(tmpWatcher).forPath(path);
} catch (KeeperException.NoNodeException e) {
// ignored
if (logger.isErrorEnabled()) {
logger.error(e.getMessage());
}
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
return tmpWatcher;
});
watcher.addListener(listener);
//给listener加一个event 进行事件触发
listener.onEvent(new ServiceInstancesChangedEvent(serviceName, this.getInstances(serviceName)));
latch.countDown();
}
需要注意的是 在migrateToApplicationFirstInvoker方法中 对于文件的缓存只做了一遍,即refreshInterfaceInvoker中的实现,并不代表 应用级不会
进行缓存,我们可以看下 FORCE_APPLICATION下的migrateToForceApplicationInvoker方法
//org.apache.dubbo.registry.client.migration.MigrationInvoker#migrateToForceApplicationInvoker
@Override
public boolean migrateToForceApplicationInvoker(MigrationRule newRule) {
CountDownLatch latch = new CountDownLatch(1);
//此处为zk注册,注册完之后接下来会进行文件的写入
refreshServiceDiscoveryInvoker(latch);
if (invoker == null) {
// invoker is absent, ignore threshold check
this.currentAvailableInvoker = serviceDiscoveryInvoker;
return true;
}
// wait and compare threshold
waitAddressNotify(newRule, latch);
if (newRule.getForce(consumerUrl)) {
// force migrate, ignore threshold check
this.currentAvailableInvoker = serviceDiscoveryInvoker;
this.destroyInterfaceInvoker();
return true;
}
Set detectors = ScopeModelUtil.getApplicationModel(consumerUrl == null ? null : consumerUrl.getScopeModel())
.getExtensionLoader(MigrationAddressComparator.class).getSupportedExtensionInstances();
if (CollectionUtils.isNotEmpty(detectors)) {
if (detectors.stream().allMatch(comparator -> comparator.shouldMigrate(serviceDiscoveryInvoker, invoker, newRule))) {
this.currentAvailableInvoker = serviceDiscoveryInvoker;
this.destroyInterfaceInvoker();
return true;
}
}
// compare failed, will not change state
if (step == MigrationStep.FORCE_INTERFACE) {
destroyServiceDiscoveryInvoker();
}
return false;
}



