栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

四、Dubbo服务启动流程

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

四、Dubbo服务启动流程

启动类定义

DubboNamespaceHandler解析

dubbo标签具体释义可以查看https://dubbo.apache.org/zh/docsv2.7/user/configuration/xml/

https://dubbo.apache.org/zh/docs/references/xml/

public class DubboNamespaceHandler extends NamespaceHandlerSupport implements ConfigurableSourceBeanmetadataElement {

    static {
        Version.checkDuplicate(DubboNamespaceHandler.class);
    }

    @Override
    //具体配置查看 https://dubbo.apache.org/zh/docsv2.7/user/configuration/xml/
    public void init() {
    	//
        registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class));
        //
        registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class));
        //
        registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class));
        //
        registerBeanDefinitionParser("config-center", new DubboBeanDefinitionParser(ConfigCenterBean.class));
        //
        registerBeanDefinitionParser("metadata-report", new DubboBeanDefinitionParser(metadataReportConfig.class));
        //
        registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class));
        //
        registerBeanDefinitionParser("metrics", new DubboBeanDefinitionParser(MetricsConfig.class));
        //
        registerBeanDefinitionParser("ssl", new DubboBeanDefinitionParser(SslConfig.class));
        //
        registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class));
        //
        registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class));
        //
        registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class));
        //
        //从此处开始 有了业务流程的处理
        registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class));
        //
        registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class));
        //
        registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
    }
}

ServiceBean解析

public class ServiceBean extends ServiceConfig implements InitializingBean, DisposableBean,
        ApplicationContextAware, BeanNameAware, ApplicationEventPublisherAware {


    private transient ApplicationContext applicationContext;

    private ApplicationEventPublisher applicationEventPublisher;

    //ApplicationContextAware方法实现,添加应用上下文
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    //InitializingBean方法实现
    @Override
    public void afterPropertiesSet() throws Exception {
        if (StringUtils.isEmpty(getPath())) {
            if (StringUtils.isNotEmpty(getInterface())) {
                setPath(getInterface());//将 com.jiangzheng.course.dubbo.api.service.ServiceDemo 设置为path
            }
        }
        //register service bean
        ModuleModel moduleModel = DubboBeanUtils.getModuleModel(applicationContext);
        moduleModel.getConfigManager().addService(this);
        moduleModel.getDeployer().setPending();
        //注: 不同版本此处会有不同, 有些版本的 该方法将 下面将会提到的 DubboBootstrap进行实例化
        //同时有些版本的 ServiceBean 实现了ApplicationListener 逻辑也大致同DubboBootstrapApplicationListener类似
    }
    
    //ApplicationEventPublisherAware方法实现
    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }
}
针对3.0.0.preview版本调用中使用DubboBootstrapApplicationListener

public class DubboBootstrapApplicationListener extends OneTimeExecutionApplicationContextEventListener
        implements Ordered {

    public static final String BEAN_NAME = "dubboBootstrapApplicationListener";

    private final DubboBootstrap dubboBootstrap;

    public DubboBootstrapApplicationListener() {
    	//获取dubboBootstrap单例实例
        this.dubboBootstrap = DubboBootstrap.getInstance();
    }

    @Override
    public void onApplicationContextEvent(ApplicationContextEvent event) {
    	//如果刷新事件
        if (event instanceof ContextRefreshedEvent) {
            onContextRefreshedEvent((ContextRefreshedEvent) event);
        } 
        //如果关闭事件
		else if (event instanceof ContextClosedEvent) {
            onContextClosedEvent((ContextClosedEvent) event);
        }
    }

    private void onContextRefreshedEvent(ContextRefreshedEvent event) {
    	//dubboBootstrap启动
        dubboBootstrap.start();
    }

    private void onContextClosedEvent(ContextClosedEvent event) {
    	//dubboBootstrap停止
        dubboBootstrap.stop();
    }

    @Override
    public int getOrder() {
        return LOWEST_PRECEDENCE;
    }
}
//实现ApplicationListener事件监听
abstract class OneTimeExecutionApplicationContextEventListener implements ApplicationListener, ApplicationContextAware {

    private ApplicationContext applicationContext;

    public final void onApplicationEvent(ApplicationEvent event) {
        if (isOriginalEventSource(event) && event instanceof ApplicationContextEvent) {
            //调用DubboBootstrapApplicationListener的onApplicationContextEvent方法
            onApplicationContextEvent((ApplicationContextEvent) event);
        }
    }
    private boolean isOriginalEventSource(ApplicationEvent event) {
        return (applicationContext == null) // Current ApplicationListener is not a Spring Bean, just was added
                // into Spring's ConfigurableApplicationContext
                || Objects.equals(applicationContext, event.getSource());
    }
}
//org.apache.dubbo.config.bootstrap.DubboBootstrap#start
public DubboBootstrap start() {
        if (started.compareAndSet(false, true)) {
            startup.set(false);
            //初始化环境
            initialize();
            if (logger.isInfoEnabled()) {
                logger.info(NAME + " is starting...");
            }
            //对应的provider的注册流程,如下流程
            exportServices();

            // Not only provider register
            if (!isOnlyRegisterProvider() || hasExportedServices()) {
                // 2. export metadataService
                exportmetadataService();
                //3. Register the local ServiceInstance if required
                registerServiceInstance();
            }

            referServices();
            if (asyncExportingFutures.size() > 0) {
                new Thread(() -> {
                    try {
                        this.awaitFinish();
                    } catch (Exception e) {
                        logger.warn(NAME + " exportAsync occurred an exception.");
                    }
                    startup.set(true);
                    if (logger.isInfoEnabled()) {
                        logger.info(NAME + " is ready.");
                    }
                }).start();
            } else {
                startup.set(true);
                if (logger.isInfoEnabled()) {
                    logger.info(NAME + " is ready.");
                }
            }
            if (logger.isInfoEnabled()) {
                logger.info(NAME + " has started.");
            }
        }
        return this;
    }

	public void initialize() {
        if (!initialized.compareAndSet(false, true)) {
            return;
        }

        ApplicationModel.initframeworkExts();
        
        startConfigCenter();

        loadRemoteConfigs();

        checkGlobalConfigs();

        startmetadataCenter();
		
        initmetadataService();

        initEventListener();

        if (logger.isInfoEnabled()) {
            logger.info(NAME + " has been initialized!");
        }
    }
    
    private void exportServices() {
        configManager.getServices().forEach(sc -> {
            // TODO, compatible with ServiceConfig.export()
            ServiceConfig serviceConfig = (ServiceConfig) sc;
            serviceConfig.setBootstrap(this);
			//是否异步初始化
            if (exportAsync) {//是
                ExecutorService executor = executorRepository.getServiceExporterExecutor();
                Future future = executor.submit(() -> {
                    sc.export();
                    exportedServices.add(sc);
                });
                asyncExportingFutures.add(future);
            } else {//否
            	//org.apache.dubbo.config.ServiceConfig#export
                sc.export();
                exportedServices.add(sc);
            }
        });
    }
	//org.apache.dubbo.config.ServiceConfig#export
    public synchronized void export() {
        if (!shouldExport() || exported) {
            return;
        }

        if (bootstrap == null) {
            bootstrap = DubboBootstrap.getInstance();
            bootstrap.initialize();
            bootstrap.service(this);
        }

        checkAndUpdateSubConfigs();

        //init servicemetadata
        servicemetadata.setVersion(getVersion());
        servicemetadata.setGroup(getGroup());
        servicemetadata.setDefaultGroup(getGroup());
        servicemetadata.setServiceType(getInterfaceClass());
        servicemetadata.setServiceInterfaceName(getInterface());
        servicemetadata.setTarget(getRef());

        if (!shouldExport()) {
            return;
        }
		//是否延迟启动
        if (shouldDelay()) {//是
            DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
        } else {//否
            doExport();
        }
    }
    
    protected synchronized void doExport() {
        if (unexported) {
            throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
        }
        if (exported) {
            return;
        }
        exported = true;

        if (StringUtils.isEmpty(path)) {
            path = interfaceName;
        }
        //执行到这里
        doExportUrls();
        exported();
    }

    private void doExportUrls() {
        ServiceRepository repository = ApplicationModel.getServiceRepository();
        //获得要暴露的服务接口
        ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
        repository.registerProvider(
                getUniqueServiceName(),
                ref,
                serviceDescriptor,
                this,
                servicemetadata
        );
        //获得list
		//service-discovery-registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?REGISTRY_CLUSTER=org.apache.dubbo.config.RegistryConfig&application=provider&client=curator&dubbo=2.0.2&pid=99194®istry=zookeeper&release=3.0.0.preview×tamp=1640844333536
		//registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?REGISTRY_CLUSTER=org.apache.dubbo.config.RegistryConfig&application=provider&client=curator&dubbo=2.0.2&pid=99194®istry=zookeeper&release=3.0.0.preview×tamp=1640844333536
        List registryURLs = ConfigValidationUtils.loadRegistries(this, true);
		//protocols:
        for (ProtocolConfig protocolConfig : protocols) {
        	//pathKey: com.jiangzheng.course.dubbo.api.service.ServiceDemo
            String pathKey = URL.buildKey(getContextPath(protocolConfig)
                    .map(p -> p + "/" + path)
                    .orElse(path), group, version);
            // In case user specified path, register service one more time to map it to path.
            repository.registerService(pathKey, interfaceClass);
            // TODO, uncomment this line once service key is unified
            servicemetadata.setServiceKey(pathKey);
            //执行到此方法
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }
    
    //方法部分代码
	private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List registryURLs) {
		String scope = url.getParameter(SCOPE_KEY);
        // don't export when none is configured
        if (!SCOPE_NONE.equalsIgnoreCase(scope)) {

            // 如果配置不是远程的,则导出到本地(仅当配置是远程的时才导出到远程)
            //scope: null
            if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
                exportLocal(url);//本地暴露
            }
            // 如果配置不是本地的,则导出到远程(仅当配置为本地时才导出到本地)
            //scope: null
            if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {//远端暴露
                if (CollectionUtils.isNotEmpty(registryURLs)) {
                    for (URL registryURL : registryURLs) {
                        //若协议仅为injvm,则不注册
                        if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                            continue;
                        }
                        url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
                        URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            if (url.getParameter(REGISTER_KEY, true)) {
                                logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url.getServiceKey() + " to registry " + registryURL.getAddress());
                            } else {
                                logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url.getServiceKey());
                            }
                        }

                        // 对于提供程序,这用于启用自定义代理以生成调用程序
                        String proxy = url.getParameter(PROXY_KEY);
                        if (StringUtils.isNotEmpty(proxy)) {
                            registryURL = registryURL.addParameter(PROXY_KEY, proxy);
                        }

                        Invoker invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.putAttribute(EXPORT_KEY, url));
                        DelegateProvidermetaDataInvoker wrapperInvoker = new DelegateProvidermetaDataInvoker(invoker, this);
						//此处调用 org.apache.dubbo.registry.integration.RegistryProtocol#export 服务注册,(因为 是循环所以会调用两次)
                        Exporter exporter = PROTOCOL.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } else {
                    if (logger.isInfoEnabled()) {
                        logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                    }
                    if (metadataService.class.getName().equals(url.getServiceInterface())) {
                        metadataUtils.savemetadataURL(url);
                    }
                    Invoker invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProvidermetaDataInvoker wrapperInvoker = new DelegateProvidermetaDataInvoker(invoker, this);
					//此处调用 org.apache.dubbo.registry.integration.RegistryProtocol#export 服务注册
                    Exporter exporter = PROTOCOL.export(wrapperInvoker);
                    exporters.add(exporter);
                }

                metadataUtils.publishServiceDefinition(url);
            }
        }
        this.urls.add(url);
	}

针对3.0.4版本调用中使用DubboDeployApplicationListener(与3.0.0类似)
	//org.apache.dubbo.config.spring.context.DubboDeployApplicationListener#onApplicationEvent
    @Override
    public void onApplicationEvent(ApplicationContextEvent event) {
        if (event instanceof ContextRefreshedEvent) {
            onContextRefreshedEvent((ContextRefreshedEvent) event);
        } else if (event instanceof ContextClosedEvent) {
            onContextClosedEvent((ContextClosedEvent) event);
        }
    }
    private void onContextRefreshedEvent(ContextRefreshedEvent event) {
        ModuleDeployer deployer = moduleModel.getDeployer();
        Assert.notNull(deployer, "Module deployer is null");
        // start module
        Future future = deployer.start();

        // if the module does not start in background, await finish
        if (!deployer.isBackground()) {
            try {
                future.get();
            } catch (InterruptedException e) {
                logger.warn("Interrupted while waiting for dubbo module start: " + e.getMessage());
            } catch (Exception e) {
                logger.warn("An error occurred while waiting for dubbo module start: " + e.getMessage(), e);
            }
        }
    }
	//org.apache.dubbo.config.deploy.DefaultModuleDeployer#start
    @Override
    public synchronized Future start() throws IllegalStateException {
        if (isStarting() || isStarted()) {
            return startFuture;
        }

        onModuleStarting();
        startFuture = new CompletableFuture();

        applicationDeployer.initialize();

        // initialize
        initialize();

        // export services
        exportServices();

        // prepare application instance
        if (hasExportedServices()) {
            applicationDeployer.prepareApplicationInstance();
        }

        // refer services
        referServices();

        executorRepository.getSharedExecutor().submit(() -> {

            // wait for export finish
            waitExportFinish();

            // wait for refer finish
            waitReferFinish();

            onModuleStarted(startFuture);
        });

        return startFuture;
    }

    private void exportServices() {
        for (ServiceConfigbase sc : configManager.getServices()) {
            exportServiceInternal(sc);
        }
    }

    private void exportServiceInternal(ServiceConfigbase sc) {
        ServiceConfig serviceConfig = (ServiceConfig) sc;
        if (!serviceConfig.isRefreshed()) {
            serviceConfig.refresh();
        }
        if (sc.isExported()) {
            return;
        }
        if (exportAsync || sc.shouldExportAsync()) {
            ExecutorService executor = executorRepository.getServiceExportExecutor();
            CompletableFuture future = CompletableFuture.runAsync(() -> {
                try {
                    if (!sc.isExported()) {
                        sc.exportOnly();
                        exportedServices.add(sc);
                    }
                } catch (Throwable t) {
                    logger.error(getIdentifier() + " export async catch error : " + t.getMessage(), t);
                }
            }, executor);

            asyncExportingFutures.add(future);
        } else {
            if (!sc.isExported()) {
                sc.exportOnly();
                exportedServices.add(sc);
            }
        }
    }

	//org.apache.dubbo.config.ServiceConfig#exportOnly
    @Override
    public synchronized void exportOnly() {
        if (this.exported) {
            return;
        }
        if (!this.isRefreshed()) {
            this.refresh();
        }
        if (this.shouldExport()) {
            this.init();

            if (shouldDelay()) {
                doDelayExport();
            } else {
                doExport();
            }
        }
    }
    protected synchronized void doExport() {
        if (unexported) {
            throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
        }
        if (exported) {
            return;
        }

        if (StringUtils.isEmpty(path)) {
            path = interfaceName;
        }
        doExportUrls();
        exported();
    }
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void doExportUrls() {
        ModuleServiceRepository repository = getScopeModel().getServiceRepository();
        ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
        providerModel = new ProviderModel(getUniqueServiceName(),
            ref,
            serviceDescriptor,
            this,
            getScopeModel(),
            servicemetadata);

        repository.registerProvider(providerModel);

        List registryURLs = ConfigValidationUtils.loadRegistries(this, true);

        for (ProtocolConfig protocolConfig : protocols) {
            String pathKey = URL.buildKey(getContextPath(protocolConfig)
                    .map(p -> p + "/" + path)
                    .orElse(path), group, version);
            // In case user specified path, register service one more time to map it to path.
            repository.registerService(pathKey, interfaceClass);
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }
    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List registryURLs) {
        Map map = buildAttributes(protocolConfig);

        //init servicemetadata attachments
        servicemetadata.getAttachments().putAll(map);

        URL url = buildUrl(protocolConfig, registryURLs, map);

        exportUrl(url, registryURLs);
    }

    private void exportUrl(URL url, List registryURLs) {
        String scope = url.getParameter(SCOPE_KEY);
        // don't export when none is configured
        if (!SCOPE_NONE.equalsIgnoreCase(scope)) {

            // export to local if the config is not remote (export to remote only when config is remote)
            if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
                exportLocal(url);
            }

            // export to remote if the config is not local (export to local only when config is local)
            if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
                url = exportRemote(url, registryURLs);
                metadataUtils.publishServiceDefinition(url);
            }

        }
        this.urls.add(url);
    }

    private URL exportRemote(URL url, List registryURLs) {
        if (CollectionUtils.isNotEmpty(registryURLs)) {
            for (URL registryURL : registryURLs) {
                if (SERVICE_REGISTRY_PROTOCOL.equals(registryURL.getProtocol())) {
                    url = url.addParameterIfAbsent(SERVICE_NAME_MAPPING_KEY, "true");
                }

                //if protocol is only injvm ,not register
                if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                    continue;
                }

                url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
                URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
                if (monitorUrl != null) {
                    url = url.putAttribute(MONITOR_KEY, monitorUrl);
                }

                // For providers, this is used to enable custom proxy to generate invoker
                String proxy = url.getParameter(PROXY_KEY);
                if (StringUtils.isNotEmpty(proxy)) {
                    registryURL = registryURL.addParameter(PROXY_KEY, proxy);
                }

                if (logger.isInfoEnabled()) {
                    if (url.getParameter(REGISTER_KEY, true)) {
                        logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url.getServiceKey() + " to registry " + registryURL.getAddress());
                    } else {
                        logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url.getServiceKey());
                    }
                }

                doExportUrl(registryURL.putAttribute(EXPORT_KEY, url), true);
            }

        } else {

            if (metadataService.class.getName().equals(url.getServiceInterface())) {
                localmetadataService.setmetadataServiceURL(url);
            }

            if (logger.isInfoEnabled()) {
                logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
            }

            doExportUrl(url, true);
        }


        return url;
    }
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void doExportUrl(URL url, boolean withmetaData) {
        Invoker invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
        if (withmetaData) {
            invoker = new DelegateProvidermetaDataInvoker(invoker, this);
        }
        //org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export
        Exporter exporter = protocolSPI.export(invoker);
        exporters.add(exporter);
    }
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/686716.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号