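To reference a Dubbo service, either use the @Reference annotation or configure a dubbo:reference tag in XML.
ReferenceBean implements FactoryBean, so when Spring instantiates a ReferenceBean it calls getObject to produce the object. ReferenceBean#getObject delegates to ReferenceConfig#get in its parent class to obtain that object.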
public Object getObject() {
    return get();
}
ReferenceConfig#get: checks and updates the configuration, calls init to create a proxy for the interface, and returns it.
public synchronized T get() {
    checkAndUpdateSubConfigs();
    if (destroyed) {
        throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
    }
    if (ref == null) {
        init();
    }
    return ref;
}
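The create-once behaviour of get() can be illustrated with a minimal sketch. LazyRef, its Supplier-based factory and the initCount field are hypothetical names introduced only for illustration, not Dubbo classes; the sketch mirrors just the destroyed check and the lazy, synchronized creation.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for ReferenceConfig: creates its reference once, on the first get().
class LazyRef<T> {
    private final Supplier<T> factory; // assumption: stands in for init()/createProxy
    private T ref;
    private boolean destroyed;
    int initCount; // exposed only so the demo can show how often the factory ran

    LazyRef(Supplier<T> factory) {
        this.factory = factory;
    }

    public synchronized T get() {
        if (destroyed) {
            throw new IllegalStateException("reference has already been destroyed");
        }
        if (ref == null) {
            ref = factory.get();
            initCount++;
        }
        return ref;
    }

    public synchronized void destroy() {
        destroyed = true;
        ref = null;
    }
}

class LazyRefDemo {
    public static void main(String[] args) {
        LazyRef<StringBuilder> holder = new LazyRef<>(() -> new StringBuilder("proxy"));
        // Both calls return the same instance; the factory runs only once.
        System.out.println(holder.get() == holder.get());
        System.out.println(holder.initCount);
    }
}
```

Because get() is synchronized, concurrent callers cannot trigger the factory twice, which matters when creating the reference is expensive.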
ReferenceConfig#init creates the interface proxy.
private void init() {
    if (initialized) {
        return;
    }
    // omitted: put the parameters configured via @Reference or the properties file into map
    // create the proxy
    ref = createProxy(map);
    initialized = true;
}
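ReferenceConfig#createProxy
REF_PROTOCOL, CLUSTER and PROXY_FACTORY are all Adaptive extension proxies.
The method first loads the registry URLs.
(1) If there is only one registry URL, it calls REF_PROTOCOL#refer directly to create an Invoker. The Cluster wrapping order is MockClusterWrapper -> FailoverCluster (the default).
(2) If there are several registry URLs, it creates one Invoker per URL, adds the parameter cluster=registryaware to the last registry URL, and calls Cluster#join to merge the Invokers into a single Invoker. The Cluster wrapping order is MockClusterWrapper -> RegistryAwareCluster.
Finally, it creates the proxy.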
private static final Protocol REF_PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
private static final Cluster CLUSTER = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension();
private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

private T createProxy(Map<String, String> map) {
    // code omitted ...
    List<URL> us = loadRegistries(false);
    if (CollectionUtils.isNotEmpty(us)) {
        for (URL u : us) {
            URL monitorUrl = loadMonitor(u);
            if (monitorUrl != null) {
                map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
            }
            urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
        }
    }
    // omitted
    if (urls.size() == 1) {
        invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
    } else {
        List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
        URL registryURL = null;
        for (URL url : urls) {
            invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
            if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                registryURL = url; // use the last registry URL
            }
        }
        if (registryURL != null) { // a registry URL is available
            // add the parameter cluster=registryaware
            URL u = registryURL.addParameter(CLUSTER_KEY, RegistryAwareCluster.NAME);
            invoker = CLUSTER.join(new StaticDirectory(u, invokers));
        } else { // not a registry URL, so this is a direct invocation
            invoker = CLUSTER.join(new StaticDirectory(invokers));
        }
    }
    // omitted
    return (T) PROXY_FACTORY.getProxy(invoker);
}
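The last step, turning an Invoker into an object that implements the service interface, can be sketched with a JDK dynamic proxy. SimpleInvoker, GreetingService and ProxySketch are hypothetical names; the real Dubbo Invoker works with Invocation and Result objects, and the ProxyFactory actually used is chosen through the Adaptive extension, so this only shows the general idea.

```java
import java.lang.reflect.Proxy;

// Hypothetical, simplified invoker: receives the method name and arguments of each call.
interface SimpleInvoker {
    Object invoke(String methodName, Object[] args);
}

// Hypothetical service interface used only for this demo.
interface GreetingService {
    String sayHello(String name);
}

class ProxySketch {
    @SuppressWarnings("unchecked")
    static <T> T getProxy(Class<T> iface, SimpleInvoker invoker) {
        return (T) Proxy.newProxyInstance(
                iface.getClassLoader(),
                new Class<?>[] {iface},
                (proxy, method, args) -> {
                    // toString/hashCode/equals are answered locally instead of being sent to the invoker.
                    if (method.getDeclaringClass() == Object.class) {
                        switch (method.getName()) {
                            case "toString":
                                return "proxy of " + iface.getSimpleName();
                            case "hashCode":
                                return System.identityHashCode(proxy);
                            default: // equals
                                return proxy == args[0];
                        }
                    }
                    // Every interface method is forwarded to the invoker.
                    return invoker.invoke(method.getName(), args);
                });
    }

    public static void main(String[] args) {
        SimpleInvoker remote = (name, a) -> "Hello " + a[0] + " via " + name;
        GreetingService service = getProxy(GreetingService.class, remote);
        System.out.println(service.sayHello("dubbo")); // prints "Hello dubbo via sayHello"
    }
}
```

The caller only ever sees GreetingService; whether the invoker behind it talks to one registry or several is hidden behind the proxy.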
Call order of REF_PROTOCOL#refer:
Protocol$Adaptive#refer
->ProtocolFilterWrapper#refer
->QosProtocolWrapper#refer
->ProtocolListenerWrapper#refer
->RegistryProtocol#refer
->RegistryProtocol#doRefer
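The call order above follows from how wrapper classes nest: each wrapper holds the protocol it wraps and delegates to it. A minimal sketch, assuming wrappers are composed through their constructors; TracingProtocol, CoreProtocol and NamedWrapper are hypothetical types, and only the wrapper names are taken from the list above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified protocol: refer() returns the trace of classes it passed through.
interface TracingProtocol {
    List<String> refer(List<String> trace);
}

// Innermost implementation, standing in for RegistryProtocol.
class CoreProtocol implements TracingProtocol {
    public List<String> refer(List<String> trace) {
        trace.add("RegistryProtocol");
        return trace;
    }
}

// Generic wrapper: records its own name, then delegates to the protocol it wraps.
class NamedWrapper implements TracingProtocol {
    private final String name;
    private final TracingProtocol delegate;

    NamedWrapper(String name, TracingProtocol delegate) {
        this.name = name;
        this.delegate = delegate;
    }

    public List<String> refer(List<String> trace) {
        trace.add(name);
        return delegate.refer(trace);
    }
}

class WrapperChainDemo {
    static TracingProtocol build() {
        // The outermost wrapper is invoked first, so it is constructed last.
        TracingProtocol p = new CoreProtocol();
        p = new NamedWrapper("ProtocolListenerWrapper", p);
        p = new NamedWrapper("QosProtocolWrapper", p);
        p = new NamedWrapper("ProtocolFilterWrapper", p);
        return p;
    }

    public static void main(String[] args) {
        // prints [ProtocolFilterWrapper, QosProtocolWrapper, ProtocolListenerWrapper, RegistryProtocol]
        System.out.println(build().refer(new ArrayList<>()));
    }
}
```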
RegistryProtocol#refer obtains the concrete Registry (for example ZookeeperRegistry) and calls doRefer to create the Invoker.
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    url = URLBuilder.from(url)
            .setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY))
            .removeParameter(REGISTRY_KEY)
            .build();
    Registry registry = registryFactory.getRegistry(url);
    if (RegistryService.class.equals(type)) {
        return proxyFactory.getInvoker((T) registry, type, url);
    }
    // group="a,b" or group="*"
    Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
    String group = qs.get(GROUP_KEY);
    if (group != null && group.length() > 0) {
        if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
            return doRefer(getMergeableCluster(), registry, type, url);
        }
    }
    return doRefer(cluster, registry, type, url);
}
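The group check in refer decides whether the mergeable cluster is used. The sketch below isolates that decision. GroupClusterChoice and needsMergeableCluster are hypothetical names, and the regular expression is an assumption about what COMMA_SPLIT_PATTERN looks like (commas with optional surrounding whitespace).

```java
import java.util.regex.Pattern;

class GroupClusterChoice {
    // Assumption: approximates Dubbo's COMMA_SPLIT_PATTERN.
    private static final Pattern COMMA_SPLIT_PATTERN = Pattern.compile("\\s*[,]+\\s*");

    // Returns true when the group value names several groups ("a,b") or all groups ("*").
    static boolean needsMergeableCluster(String group) {
        if (group == null || group.isEmpty()) {
            return false;
        }
        return COMMA_SPLIT_PATTERN.split(group).length > 1 || "*".equals(group);
    }

    public static void main(String[] args) {
        for (String g : new String[] {"a", "a,b", "a , b", "*"}) {
            System.out.println(g + " -> " + needsMergeableCluster(g));
        }
    }
}
```

A single group such as "a" keeps the configured cluster; only a multi-group or wildcard reference needs results from several groups to be merged.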
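RegistryProtocol#doRefer: wraps the reference in a RegistryDirectory instance. RegistryDirectory implements NotifyListener, so it also acts as the listener that receives registry notifications. The method registers the consumer URL with the registry, builds the router chain, subscribes to changes under the providers, configurators and routers categories, and calls Cluster#join to obtain an Invoker.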
subscribeUrl:
consumer://192.168.1.107/gdut.DemoService?application=dubbbo-consumer-demo&dubbo=2.0.2&interface=gdut.DemoService&lazy=false&methods=sayHello,sayHelloAsync&pid=2912&qos.enable=false&release=2.7.3&revision=default&side=consumer&sticky=false&timestamp=1636885284173&version=default
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    // all attributes of REFER_KEY
    Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
    URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
    if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
        directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
        registry.register(directory.getRegisteredConsumerUrl());
    }
    directory.buildRouterChain(subscribeUrl);
    directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
            PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
    Invoker invoker = cluster.join(directory);
    ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
    return invoker;
}
Building the router chain
public static <T> RouterChain<T> buildChain(URL url) {
    return new RouterChain<>(url);
}
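Four extensionFactories are obtained: MockRouterFactory (mock routing), TagRouterFactory (tag routing), AppRouterFactory (application-level condition routing) and ServiceRouterFactory (service-level condition routing).
The resulting routers are MockInvokersSelector, TagRouter, AppRouter and ServiceRouter.
RouterChain#initWithRouters assigns the routers obtained above to the RouterChain fields and sorts them.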
private RouterChain(URL url) {
    List<RouterFactory> extensionFactories = ExtensionLoader.getExtensionLoader(RouterFactory.class)
            .getActivateExtension(url, (String[]) null);
    List<Router> routers = extensionFactories.stream()
            .map(factory -> factory.getRouter(url))
            .collect(Collectors.toList());
    initWithRouters(routers);
}

public void initWithRouters(List<Router> builtinRouters) {
    this.builtinRouters = builtinRouters;
    this.routers = new ArrayList<>(builtinRouters);
    this.sort();
}
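The copy-then-sort step in initWithRouters can be sketched as follows. SketchRouter and SketchRouterChain are hypothetical types, the router names and priority values are made up, and ordering by ascending priority is an assumption; the real order depends on how each Router implements its comparison.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical router model: only a name and a priority; real routers also filter invokers.
class SketchRouter {
    final String name;
    final int priority;

    SketchRouter(String name, int priority) {
        this.name = name;
        this.priority = priority;
    }
}

class SketchRouterChain {
    private final List<SketchRouter> builtinRouters; // kept as passed in
    private final List<SketchRouter> routers;        // working copy that gets sorted

    SketchRouterChain(List<SketchRouter> builtinRouters) {
        this.builtinRouters = builtinRouters;
        this.routers = new ArrayList<>(builtinRouters);
        sort();
    }

    // Assumption: a lower priority value means the router runs earlier.
    private void sort() {
        routers.sort(Comparator.comparingInt((SketchRouter r) -> r.priority));
    }

    List<String> names() {
        List<String> names = new ArrayList<>();
        for (SketchRouter r : routers) {
            names.add(r.name);
        }
        return names;
    }

    public static void main(String[] args) {
        SketchRouterChain chain = new SketchRouterChain(Arrays.asList(
                new SketchRouter("routerC", 30),
                new SketchRouter("routerA", 10),
                new SketchRouter("routerB", 20)));
        System.out.println(chain.names()); // prints [routerA, routerB, routerC]
    }
}
```

Keeping builtinRouters separate from the sorted working list means the built-in set stays available if the working list is later rebuilt.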
RegistryDirectory#subscribe adds the current RegistryDirectory instance to the set of listeners.
registry#subscribe eventually calls the doSubscribe method of the concrete registry implementation (for example ZookeeperRegistry).
public void subscribe(URL url) {
    setConsumerUrl(url);
    CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
    serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
    registry.subscribe(url, this);
}
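The relationship between the directory and the registry is a plain listener pattern: the directory hands itself to the registry and is called back whenever the provider list changes. A minimal in-memory sketch, with SketchNotifyListener, SketchRegistry and SketchDirectory as hypothetical stand-ins and provider URLs reduced to strings:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical listener contract: called with the current provider URLs.
interface SketchNotifyListener {
    void notify(List<String> urls);
}

// Hypothetical in-memory registry keeping listeners per service key.
class SketchRegistry {
    private final Map<String, List<SketchNotifyListener>> subscribers = new HashMap<>();

    void subscribe(String serviceKey, SketchNotifyListener listener) {
        subscribers.computeIfAbsent(serviceKey, k -> new ArrayList<>()).add(listener);
    }

    // Simulates the registry pushing a changed provider list to every subscriber.
    void publish(String serviceKey, List<String> providerUrls) {
        for (SketchNotifyListener listener : subscribers.getOrDefault(serviceKey, new ArrayList<>())) {
            listener.notify(providerUrls);
        }
    }
}

// Hypothetical directory: subscribes itself and replaces its provider list on each notification.
class SketchDirectory implements SketchNotifyListener {
    private volatile List<String> providers = new ArrayList<>();

    void subscribe(SketchRegistry registry, String serviceKey) {
        registry.subscribe(serviceKey, this);
    }

    public void notify(List<String> urls) {
        providers = new ArrayList<>(urls);
    }

    List<String> list() {
        return providers;
    }

    public static void main(String[] args) {
        SketchRegistry registry = new SketchRegistry();
        SketchDirectory directory = new SketchDirectory();
        directory.subscribe(registry, "gdut.DemoService");
        registry.publish("gdut.DemoService", Arrays.asList("dubbo://10.0.0.1:20880", "dubbo://10.0.0.2:20880"));
        System.out.println(directory.list().size()); // prints 2
    }
}
```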
cluster.join(directory) returns a MockClusterInvoker.
MockClusterWrapper#join takes the Invoker produced by the cluster it wraps and wraps it in a MockClusterInvoker.
public class MockClusterWrapper implements Cluster {
    private Cluster cluster;

    public MockClusterWrapper(Cluster cluster) {
        this.cluster = cluster;
    }

    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new MockClusterInvoker<T>(directory,
                this.cluster.join(directory));
    }
}
RegistryAwareCluster#join builds a RegistryAwareClusterInvoker from the directory passed in.
public class RegistryAwareCluster implements Cluster {
    public final static String NAME = "registryaware";

    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new RegistryAwareClusterInvoker<T>(directory);
    }
}
FailoverCluster#join builds a FailoverClusterInvoker from the directory passed in.
public class FailoverCluster implements Cluster {
    public final static String NAME = "failover";

    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new FailoverClusterInvoker<T>(directory);
    }
}
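The idea behind a failover invoker is to retry a failed call on another provider. The sketch below is a deliberately simplified illustration: FailoverSketch is a hypothetical class, providers are modelled as Supplier instances, and they are tried in list order, whereas the real FailoverClusterInvoker picks providers through load balancing.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

class FailoverSketch {
    // Makes up to retries + 1 attempts, moving to the next invoker after each failure.
    static <R> R invoke(List<Supplier<R>> invokers, int retries) {
        if (invokers.isEmpty()) {
            throw new IllegalStateException("no invoker available");
        }
        RuntimeException last = null;
        for (int attempt = 0; attempt <= retries; attempt++) {
            Supplier<R> invoker = invokers.get(attempt % invokers.size());
            try {
                return invoker.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and try the next invoker
            }
        }
        throw new IllegalStateException("failed after " + (retries + 1) + " attempts", last);
    }

    public static void main(String[] args) {
        Supplier<String> broken = () -> { throw new RuntimeException("timeout"); };
        Supplier<String> healthy = () -> "ok";
        System.out.println(invoke(Arrays.asList(broken, healthy), 2)); // prints "ok"
    }
}
```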
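MockClusterInvoker: implements the mock feature; it is created by MockClusterWrapper.
FailoverClusterInvoker: implements cluster fault tolerance; this is the default.
RegistryAwareClusterInvoker: when several registries are configured, it invokes through the default registry (the one with registry.default=true). If no default is specified, it iterates over the registries and invokes through the first one that has the service, skipping any registry that does not.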
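The selection rule described for RegistryAwareClusterInvoker can be sketched as two passes over the per-registry invokers. RegistryInvoker and RegistryAwareSketch are hypothetical types; isDefault models registry.default=true and available models whether that registry currently has providers for the service.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the invoker obtained from one registry.
class RegistryInvoker {
    final String registry;
    final boolean isDefault; // models registry.default=true
    final boolean available; // whether this registry has providers for the service

    RegistryInvoker(String registry, boolean isDefault, boolean available) {
        this.registry = registry;
        this.isDefault = isDefault;
        this.available = available;
    }
}

class RegistryAwareSketch {
    // First pass: an available default registry wins. Second pass: the first available registry.
    static RegistryInvoker select(List<RegistryInvoker> invokers) {
        for (RegistryInvoker invoker : invokers) {
            if (invoker.available && invoker.isDefault) {
                return invoker;
            }
        }
        for (RegistryInvoker invoker : invokers) {
            if (invoker.available) {
                return invoker;
            }
        }
        throw new IllegalStateException("no provider available in any registry");
    }

    public static void main(String[] args) {
        List<RegistryInvoker> invokers = Arrays.asList(
                new RegistryInvoker("zk-a", false, true),
                new RegistryInvoker("zk-b", true, true));
        System.out.println(select(invokers).registry); // prints "zk-b"
    }
}
```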



