栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Dubbo服务导入源码分析

Dubbo服务导入源码分析

引入一个Dubbo服务,可以使用@Reference注解或者在xml配置dubbo:reference标签。

ReferenceBean是FactoryBean的子类。实例化ReferenceBean会调用getObject方法生成对象。ReferenceBean#getObject调用父类ReferenceConfig#get获取一个对象。

public Object getObject() {
    return get();
}

ReferenceConfig#get:检查更新参数配置,调用init方法创建一个接口代理,并返回。

public synchronized T get() {
    checkAndUpdateSubConfigs();

    if (destroyed) {
        throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
    }
    if (ref == null) {
        init();
    }
    return ref;
}

ReferenceConfig#init 创建接口代理。

private void init() {
    if (initialized) {
        return;
    }
    //省略:将@Reference或者properties文件配置的参数放入map。
    //创建代理
    ref = createProxy(map);
    initialized = true;
}

ReferenceConfig#createProxy
REF_PROTOCOL、CLUSTER和PROXY_FACTORY都是Adaptive代理类。
ReferenceConfig#createProxy

先加载Registry类型的URL。
(1)如果注册url只有一个,直接调用REF_PROTOCOL#refer创建一个invoker;Cluster包装顺序是:MockClusterWrapper->FailoverCluster(默认)。
(2)如果注册url有多个,创建一个Invoker集合,最后一个注册URL添加参数cluster=registryaware。调用Cluster#join将多个Invoker合并成一个Invoker。Cluster包装顺序:MockClusterWrapper->RegistryAwareCluster。
最后创建代理。

Protocol REF_PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
private static final Cluster CLUSTER = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension();
private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

private T createProxy(Map map) {

	//省略代码。。。。
	List us = loadRegistries(false);
    if (CollectionUtils.isNotEmpty(us)) {
        for (URL u : us) {
            URL monitorUrl = loadMonitor(u);
            if (monitorUrl != null) {
                map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
            }
            urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
        }
    }

	//省略
	if (urls.size() == 1) {
        invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
    } else {
        List> invokers = new ArrayList>();
        URL registryURL = null;
        for (URL url : urls) {
            invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
            if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                registryURL = url; //使用最后一个注册URL
            }
        }
        if (registryURL != null) { //注册URL是可用的
            //添加参数cluster=registryaware
            URL u = registryURL.addParameter(CLUSTER_KEY, RegistryAwareCluster.NAME);
            invoker = CLUSTER.join(new StaticDirectory(u, invokers));
        } else { //不是注册URL,直接调用。
            invoker = CLUSTER.join(new StaticDirectory(invokers));
        }
    }

    //省略

    return (T) PROXY_FACTORY.getProxy(invoker);

}

REF_PROTOCOL#refer调用顺序:
Protocol$Adaptive#refer
–>ProtocolFilterWrapper#refer
–>QosProtocolWrapper#refer
–>ProtocolListenerWrapper#refer
–>RegistryProtocol#refer
–>RegistryProtocol#doRefer

RegistryProtocol#refer获取到真正的Registry(如ZookeeperRegistry),调用doRefer方法创建Invoker。

public  Invoker refer(Class type, URL url) throws RpcException {
    url = URLBuilder.from(url)
            .setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY))
            .removeParameter(REGISTRY_KEY)
            .build();
    Registry registry = registryFactory.getRegistry(url);
    if (RegistryService.class.equals(type)) {
        return proxyFactory.getInvoker((T) registry, type, url);
    }

    // group="a,b" or group="*"
    Map qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
    String group = qs.get(GROUP_KEY);
    if (group != null && group.length() > 0) {
        if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
            return doRefer(getMergeableCluster(), registry, type, url);
        }
    }
    return doRefer(cluster, registry, type, url);
}

RegistryProtocol#doRefer:封装一个RegistryDirectory实例,RegistryDirectory是NotifyListener的子类,也是一个通知器。将消费url注册到注册中心。构造路由链,订阅providers、configurators和routers目录的变化,调用Cluster#join获取一个Invoker。

subscribeUrl:
consumer://192.168.1.107/gdut.DemoService?application=dubbbo-consumer-demo
&dubbo=2.0.2&interface=gdut.DemoService&lazy=false&methods=sayHello,sayHelloAsync&pid=2912&qos.enable=false&release=2.7.3&revision=default&side=consumer&sticky=false×tamp=1636885284173&version=default

private  Invoker doRefer(Cluster cluster, Registry registry, Class type, URL url) {
    RegistryDirectory directory = new RegistryDirectory(type, url);
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    // all attributes of REFER_KEY
    Map parameters = new HashMap(directory.getUrl().getParameters());
    URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
    if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
        directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
        registry.register(directory.getRegisteredConsumerUrl());
    }
    directory.buildRouterChain(subscribeUrl);
    directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
            PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));

    Invoker invoker = cluster.join(directory);
    ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
    return invoker;
}

构造路由链

public static  RouterChain buildChain(URL url) {
    return new RouterChain<>(url);
}

获取到的extensionFactories一共有4种:MockRouterFactory(Mock路由),TagRouterFactory(标签路由),AppRouterFactory(应用条件路由),ServiceRouterFactory(服务条件路由)

获取到的routers:MockInvokersSelector,TagRouter,AppRouter,ServiceRouter

RouterChain#initWithRouters 将上面获取的路由链赋值给RouterChain的变量并排序。

private RouterChain(URL url) {
    List extensionFactories = ExtensionLoader.getExtensionLoader(RouterFactory.class)
            .getActivateExtension(url, (String[]) null);

    List routers = extensionFactories.stream()
            .map(factory -> factory.getRouter(url))
            .collect(Collectors.toList());

    initWithRouters(routers);
}

public void initWithRouters(List builtinRouters) {
    this.builtinRouters = builtinRouters;
    this.routers = new ArrayList<>(builtinRouters);
    this.sort();
}

RegistryDirectory#subscribe 将当前这个RegistryDirectory实例添加到监听器集合。
registry#subscribe最终会调用具体的registry实现类(如ZookeeperRegistry)的doSubscribe方法。

public void subscribe(URL url) {
    setConsumerUrl(url);
    CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
    serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
    registry.subscribe(url, this);
}

cluster.join(directory)获得的是一个MockClusterInvoker。
MockClusterWrapper#join会将自己包装的cluster得到的Invoker包装成一个MockClusterInvoker。

public class MockClusterWrapper implements Cluster {

    private Cluster cluster;

    public MockClusterWrapper(Cluster cluster) {
        this.cluster = cluster;
    }

    @Override
    public  Invoker join(Directory directory) throws RpcException {
        return new MockClusterInvoker(directory,
                this.cluster.join(directory));
    }

}

RegistryAwareCluster #join利用传入的目录资源构造一个RegistryAwareClusterInvoker对象。

public class RegistryAwareCluster implements Cluster {

    public final static String NAME = "registryaware";

    @Override
    public  Invoker join(Directory directory) throws RpcException {
        return new RegistryAwareClusterInvoker(directory);
    }
}

FailoverCluster#join利用传入的目录资源构造一个FailoverClusterInvoker对象。

public class FailoverCluster implements Cluster {

    public final static String NAME = "failover";

    @Override
    public  Invoker join(Directory directory) throws RpcException {
        return new FailoverClusterInvoker(directory);
    }

}

MockClusterInvoker: 完成Mock功能,由MockClusterWrapper⽣成。
FailoverClusterInvoker:完成集群容错功能。默认。
RegistryAwareClusterInvoker:如果指定了多个注册中心,RegistryAwareClusterInvoker会选择默认的注册中心(设置registry.default=true)进行调用,如果没有指定默认的,则会遍历注册中心进行调用,如果该注册中心没有对应的服务则跳过。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/582423.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号