栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

八:DubboConsumer三种调用Provider的方式

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

八:DubboConsumer三种调用Provider的方式

Consumer端dubbo:reference的三种配置方式

1、直连提供者

2、本地调用

注: Dubbo 从 2.2.0 每个服务默认都会在本地暴露,无需进行任何配置即可进行本地引用,如果不希望服务进行远程暴露,只需要在 provider 将 protocol 设置成 injvm 即可
3、远程调用

我们先确定入口类


所以我们确定了 ReferenceBean.java是我们研究的入口,该类实现了 四个spring的接口:FactoryBean,
ApplicationContextAware, BeanClassLoaderAware, InitializingBean, DisposableBean,我们根据 FactoryBean的getObject()的方法实现来进一步研究

    @Override
    public Object getObject() {
    	//是否延迟加载
        if (referenceLazyProxy == null) {
        	//进入此方法
            createReferenceLazyProxy();
        }
        return referenceLazyProxy;
    }
    private void createReferenceLazyProxy() {
        this.referenceTargetSource = new DubboReferenceLazyInitTargetSource();

        //set proxy interfaces
        //see also: org.apache.dubbo.rpc.proxy.AbstractProxyFactory.getProxy(org.apache.dubbo.rpc.Invoker, boolean)
        ProxyFactory proxyFactory = new ProxyFactory();
        proxyFactory.setTargetSource(referenceTargetSource);
        // getInterfaceClass() = com.jiangzheng.course.dubbo.api.service.ServiceDemo.class
        proxyFactory.addInterface(getInterfaceClass());
        Class[] internalInterfaces = AbstractProxyFactory.getInternalInterfaces();
        for (Class anInterface : internalInterfaces) {
            proxyFactory.addInterface(anInterface);
        }
        if (ProtocolUtils.isGeneric(generic)){
            //add actual interface
            proxyFactory.addInterface(ReflectUtils.forName(interfaceName));
        }
		//执行到此处
        this.referenceLazyProxy = proxyFactory.getProxy(this.beanClassLoader);
    }
	//org.springframework.aop.framework.ProxyFactory
	public Object getProxy(@Nullable ClassLoader classLoader) {
		return createAopProxy().getProxy(classLoader);
	}

其实调用到此处已经为 spring内部的动态代理实现了,但是 因为涉及 org.springframework.aop.target.AbstractLazyCreationTargetSource 的方法实现,所以我们需要继续深入熟悉 该实现方法,如下代码

	//org.apache.dubbo.config.spring.ReferenceBean.DubboReferenceLazyInitTargetSource
    private class DubboReferenceLazyInitTargetSource extends AbstractLazyCreationTargetSource {

        @Override
        protected Object createObject() throws Exception {
        	//进入此方法
            return getCallProxy();
        }

        @Override
        public synchronized Class getTargetClass() {
            return getInterfaceClass();
        }
    }
    private Object getCallProxy() throws Exception {

        if (referenceConfig == null) {
            throw new IllegalStateException("ReferenceBean is not ready yet, maybe dubbo engine is not started");
        }
        //进入次get方法
        return ReferenceConfigCache.getCache().get(referenceConfig);
    }
	org.apache.dubbo.config.utils.ReferenceConfigCache
    @SuppressWarnings("unchecked")
    public  T get(ReferenceConfigbase referenceConfig) {
        String key = generator.generateKey(referenceConfig);
        Class type = referenceConfig.getInterfaceClass();

        proxies.computeIfAbsent(type, _t -> new ConcurrentHashMap<>());

        ConcurrentMap proxiesOfType = proxies.get(type);
        proxiesOfType.computeIfAbsent(key, _k -> {
        	//进入此get方法
            Object proxy = referenceConfig.get();
            referredReferences.put(key, referenceConfig);
            return proxy;
        });

        return (T) proxiesOfType.get(key);
    }
	//org.apache.dubbo.config.ReferenceConfig
    public synchronized T get() {
        if (destroyed) {
            throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
        }
        if (ref == null) {
        	//进入此init方法
            init();
        }
        return ref;
    }
public synchronized void init() {
        if (initialized) {
            return;
        }

        if (bootstrap == null) {
            bootstrap = DubboBootstrap.getInstance();
            bootstrap.initialize();
            bootstrap.reference(this);
        }

        checkAndUpdateSubConfigs();

        checkStubAndLocal(interfaceClass);
        ConfigValidationUtils.checkMock(interfaceClass, this);

        Map map = new HashMap();
        map.put(SIDE_KEY, CONSUMER_SIDE);

        ReferenceConfigbase.appendRuntimeParameters(map);
        if (!ProtocolUtils.isGeneric(generic)) {
            String revision = Version.getVersion(interfaceClass, version);
            if (revision != null && revision.length() > 0) {
                map.put(REVISION_KEY, revision);
            }

            String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
            if (methods.length == 0) {
                logger.warn("No method found in service interface " + interfaceClass.getName());
                map.put(METHODS_KEY, ANY_VALUE);
            } else {
                map.put(METHODS_KEY, StringUtils.join(new HashSet(Arrays.asList(methods)), COMMA_SEPARATOR));
            }
        }
        map.put(INTERFACE_KEY, interfaceName);
        AbstractConfig.appendParameters(map, getMetrics());
        AbstractConfig.appendParameters(map, getApplication());
        AbstractConfig.appendParameters(map, getModule());
        // remove 'default.' prefix for configs from ConsumerConfig
        // appendParameters(map, consumer, Constants.DEFAULT_KEY);
        AbstractConfig.appendParameters(map, consumer);
        AbstractConfig.appendParameters(map, this);
        metadataReportConfig metadataReportConfig = getmetadataReportConfig();
        if (metadataReportConfig != null && metadataReportConfig.isValid()) {
            map.putIfAbsent(metaDATA_KEY, REMOTE_metaDATA_STORAGE_TYPE);
        }
        Map attributes = null;
        if (CollectionUtils.isNotEmpty(getMethods())) {
            attributes = new HashMap<>();
            for (MethodConfig methodConfig : getMethods()) {
                AbstractConfig.appendParameters(map, methodConfig, methodConfig.getName());
                String retryKey = methodConfig.getName() + ".retry";
                if (map.containsKey(retryKey)) {
                    String retryValue = map.remove(retryKey);
                    if ("false".equals(retryValue)) {
                        map.put(methodConfig.getName() + ".retries", "0");
                    }
                }
                AsyncMethodInfo asyncMethodInfo = AbstractConfig.convertMethodConfig2AsyncInfo(methodConfig);
                if (asyncMethodInfo != null) {
//                    consumerModel.getMethodModel(methodConfig.getName()).addAttribute(ASYNC_KEY, asyncMethodInfo);
                    attributes.put(methodConfig.getName(), asyncMethodInfo);
                }
            }
        }

        String hostToRegistry = ConfigUtils.getSystemProperty(DUBBO_IP_TO_REGISTRY);
        if (StringUtils.isEmpty(hostToRegistry)) {
            hostToRegistry = NetUtils.getLocalHost();
        } else if (isInvalidLocalHost(hostToRegistry)) {
            throw new IllegalArgumentException("Specified invalid registry ip from property:" + DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
        }
        map.put(REGISTER_IP_KEY, hostToRegistry);

        servicemetadata.getAttachments().putAll(map);
		
		//进入此方法
        ref = createProxy(map);

        servicemetadata.setTarget(ref);
        servicemetadata.addAttribute(PROXY_CLASS_REF, ref);
        ConsumerModel consumerModel = repository.lookupReferredService(servicemetadata.getServiceKey());
        consumerModel.setProxyObject(ref);
        consumerModel.init(attributes);

        initialized = true;

        checkInvokerAvailable();

        // dispatch a ReferenceConfigInitializedEvent since 2.7.4
        dispatch(new ReferenceConfigInitializedEvent(this, invoker));
    }

//我们重点关注以下方法的实现

private T createProxy(Map map) {
		//是否是本地调用
        if (shouldJvmRefer(map)) {
        	//LOCAL_PROTOCOL = "injvm" 
        	//LOCALHOST_VALUE = "127.0.0.1"
            URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
            invoker = REF_PROTOCOL.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service " + interfaceClass.getName());
            }
        } else {
        	//先清理所有URL
            urls.clear();
            
            if (url != null && url.length() > 0) { //如果有配置url的话
            	//SEMIcolon_SPLIT_PATTERN = Pattern.compile("\s*[;]+\s*")
            	//如果是服务直连的话 多个地址会用分好进行分割,所以此处通过分好进行分割
                String[] us = SEMIcolon_SPLIT_PATTERN.split(url);
                if (us != null && us.length > 0) {
                    for (String u : us) {
                        URL url = URL.valueOf(u);
                        //配置URL的path为interfaceName
                        if (StringUtils.isEmpty(url.getPath())) {
                            url = url.setPath(interfaceName);
                        }
                        //每一个url做一个缓存
                        if (UrlUtils.isRegistry(url)) {
                            urls.add(url.putAttribute(REFER_KEY, map));
                        } else {
                            URL peerURL = ClusterUtils.mergeUrl(url, map);
                            peerURL = peerURL.putAttribute(PEER_KEY, true);
                            urls.add(peerURL);
                        }
                    }
                }
            } else { //否则的话 此处为除了injvm和直连提供者外
                if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
                    checkRegistry();
                    List us = ConfigValidationUtils.loadRegistries(this, false);
                    if (CollectionUtils.isNotEmpty(us)) {
                        for (URL u : us) {
                            URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
                            if (monitorUrl != null) {
                                map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                            }
                            urls.add(u.putAttribute(REFER_KEY, map));
                        }
                    }
                    if (urls.isEmpty()) {
                        throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config  to your spring config.");
                    }
                }
            }
			//接下来是 服务订阅 和 调用远程代理
			//判断有多少个远程接口调用配置,我们重点关注 REF_PROTOCOL.refer 的方法
            if (urls.size() == 1) {
                invoker = **REF_PROTOCOL.refer**(interfaceClass, urls.get(0));
            } else {
                List> invokers = new ArrayList>();
                URL registryURL = null;
                for (URL url : urls) {
                    invokers.add(**REF_PROTOCOL.refer**(interfaceClass, url));
                    if (UrlUtils.isRegistry(url)) {
                        registryURL = url; // use last registry url
                    }
                }
                if (registryURL != null) { // registry url is available
                    // for multi-subscription scenario, use 'zone-aware' policy by default
                    String cluster = registryURL.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);
                    // The invoker wrap sequence would be: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
                    invoker = Cluster.getCluster(cluster, false).join(new StaticDirectory(registryURL, invokers));
                } else { // not a registry url, must be direct invoke.
                    String cluster = CollectionUtils.isNotEmpty(invokers)
                            ? (invokers.get(0).getUrl() != null ? invokers.get(0).getUrl().getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME) : Cluster.DEFAULT)
                            : Cluster.DEFAULT;
                    invoker = Cluster.getCluster(cluster).join(new StaticDirectory(invokers));
                }
            }
        }

        if (logger.isInfoEnabled()) {
            logger.info("Referred dubbo service " + interfaceClass.getName());
        }

        URL consumerURL = new URL(CONSUMER_PROTOCOL, map.get(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
        //加入到集群中
        metadataUtils.publishServiceDefinition(consumerURL);

        //执行到这里,generic主要用于泛化调用
        return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
    }
	//org.apache.dubbo.rpc.proxy.AbstractProxyFactory
	@Override
    public  T getProxy(Invoker invoker, boolean generic) throws RpcException {
        Set> interfaces = new HashSet<>();

        String config = invoker.getUrl().getParameter(INTERFACES);
        if (config != null && config.length() > 0) {
            String[] types = COMMA_SPLIT_PATTERN.split(config);
            for (String type : types) {
                // TODO can we load successfully for a different classloader?.
                interfaces.add(ReflectUtils.forName(type));
            }
        }

        if (generic) {
            if (!GenericService.class.isAssignableFrom(invoker.getInterface())) {
                interfaces.add(com.alibaba.dubbo.rpc.service.GenericService.class);
            }

            try {
                // find the real interface from url
                String realInterface = invoker.getUrl().getParameter(Constants.INTERFACE);
                interfaces.add(ReflectUtils.forName(realInterface));
            } catch (Throwable e) {
                // ignore
            }
        }

        interfaces.add(invoker.getInterface());
        interfaces.addAll(Arrays.asList(INTERNAL_INTERFACES));
		//执行到这里
        return getProxy(invoker, interfaces.toArray(new Class[0]));
    }

	//到这里后,执行实现类的方法
	public abstract  T getProxy(Invoker invoker, Class[] types);
	//org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory
    public  T getProxy(Invoker invoker, Class[] interfaces) {
    	//执行到此处,返回一个InvokerInvocationHandler,而InvokerInvocationHandler实现了InvocationHandler接口,
    	//所以我们明白了,此处用的JDK动态代理
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/771351.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号