栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Dubbo学习记录(十一)--服务导出之启动运行容器

Dubbo学习记录(十一)--服务导出之启动运行容器

服务导出之启动运行容器与服务注册
    protected synchronized void doExport() {
        if (unexported) {
            throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
        }
        // 已经导出了,就不再导出了
        if (exported) {
            return;
        }
        exported = true;

        if (StringUtils.isEmpty(path)) {
            path = interfaceName;
        }
        doExportUrls();
    }

层层调用,最终调用doExportUrlsFor1Protocol方法

doExportUrlsFor1Protocol

目的:拼接服务的URL, 启动容器, 注册到注册中心;
工作:

    获取协议名称;创建服务URL的参数map获取服务的角色(SIDE_KEY),放入map获取dubbo版本信息放入map;获取监控中心配置的参数,放入map获取应用配置的参数,放入map;获取模块配置的参数,放入map;获取provider配置的参数, 放入map;获取协议配置的参数,放入map;获取服务本身的参数,放入map;获取Method配置的参数 ,放入map;通过接口对应的Wrapper,拿到接口中所有的方法名字,放入map (没深入用暂时不知道)获取@Service的token配置, 放入map;获取主机号host;获取端口号port创建服务的URL实例, 参数为 协议名称为(1), 主机号(14) , 端口号(15),应用名称(可配置) + / + 接口名 为服务名, 参数map;获取SCOPE_KEY的值;如果SCOPE_KEY值为本地local, 则进行本地注册;如果SCOPE_key值为远程remote, 则遍历注册中心,远程注册到每一个注册中心;将服务的元数据注册到元数据中心;
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List registryURLs) {
        // protocolConfig表示某个协议,registryURLs表示所有的注册中心

        // 如果配置的某个协议,没有配置name,那么默认为dubbo
        String name = protocolConfig.getName();
        if (StringUtils.isEmpty(name)) {
            name = DUBBO;
        }

        // 这个map表示服务url的参数
        Map map = new HashMap();
        map.put(SIDE_KEY, PROVIDER_SIDE);

        appendRuntimeParameters(map);

        // 监控中心参数
        appendParameters(map, metrics);
        // 应用相关参数
        appendParameters(map, application);
        // 模块相关参数
        appendParameters(map, module);

        // 提供者相关参数
        appendParameters(map, provider);

        // 协议相关参数
        appendParameters(map, protocolConfig);

        // 服务本身相关参数
        appendParameters(map, this);

        // 服务中某些方法参数 暂时没用到,不知这个有什么用;
        if (CollectionUtils.isNotEmpty(methods)) {
           
        }

        if (ProtocolUtils.isGeneric(generic)) {
            map.put(GENERIC_KEY, generic);
            map.put(METHODS_KEY, ANY_VALUE);
        } else {
            String revision = Version.getVersion(interfaceClass, version);
            if (revision != null && revision.length() > 0) {
                map.put(REVISION_KEY, revision);
            }

            // 通过接口对应的Wrapper,拿到接口中所有的方法名字
            String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
            if (methods.length == 0) {
                logger.warn("No method found in service interface " + interfaceClass.getName());
                map.put(METHODS_KEY, ANY_VALUE);
            } else {
                map.put(METHODS_KEY, StringUtils.join(new HashSet(Arrays.asList(methods)), ","));
            }
        }

        // Token是为了防止服务被消费者直接调用(伪造http请求)
        if (!ConfigUtils.isEmpty(token)) {
            if (ConfigUtils.isDefault(token)) {
                map.put(TOKEN_KEY, UUID.randomUUID().toString());
            } else {
                map.put(TOKEN_KEY, token);
            }
        }

        // export service
        // 通过该host和port访问该服务
        String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
        Integer port = this.findConfigedPorts(protocolConfig, name, map);
        // 服务url
        URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);

        // 可以通过ConfiguratorFactory,在服务导出时候进行统一配置
        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .hasExtension(url.getProtocol())) {
            url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
        }

        String scope = url.getParameter(SCOPE_KEY); // scope可能为null,remote, local,none
        // don't export when none is configured
        if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
            // 如果scope为none,则不会进行任何的服务导出,既不会远程,也不会本地

            // export to local if the config is not remote (export to remote only when config is remote)
            if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
                // 如果scope不是remote,则会进行本地导出,会把当前url的protocol改为injvm,然后进行导出
                exportLocal(url);
            }
            // export to remote if the config is not local (export to local only when config is local)
            if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
                // 如果scope不是local,则会进行远程导出

                if (CollectionUtils.isNotEmpty(registryURLs)) {
                    // 如果有注册中心,则将服务注册到注册中心
                    for (URL registryURL : registryURLs) {

                        //if protocol is only injvm ,not register
                        // 如果是injvm,则不需要进行注册中心注册
                        if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                            continue;
                        }

                        // 该服务是否是动态,对应zookeeper上表示是否是临时节点,对应dubbo中的功能就是静态服务
                        url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));

                        // 基于注册中心地址的到监控中心地址,为什么是基于注册中心地址?
                        URL monitorUrl = loadMonitor(registryURL);

                        // 把监控中心地址添加到服务url中
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
                        }

                        // 服务的register参数,如果为true,则表示要注册到注册中心
                        if (logger.isInfoEnabled()) {
             
                        }

                        // For providers, this is used to enable custom proxy to generate invoker
                        // 服务使用的动态代理机制,如果为空则使用javassit
                        String proxy = url.getParameter(PROXY_KEY);
                        if (StringUtils.isNotEmpty(proxy)) {
                            registryURL = registryURL.addParameter(PROXY_KEY, proxy);
                        }


                        // 此invoker表示一个可执行的服务,调用invoker的invoke()方法即可执行服务,同时此invoker也可用来导出
                        Invoker invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));

                        // DelegateProvidermetaDataInvoker也表示服务提供者,包括了Invoker和服务的配置
                        DelegateProvidermetaDataInvoker wrapperInvoker = new DelegateProvidermetaDataInvoker(invoker, this);

                        // 使用特定的协议来对服务进行导出,这里的协议为RegistryProtocol,导出成功后得到一个Exporter
                        // 1. 先使用RegistryProtocol进行服务注册
                        // 2. 注册完了之后,使用DubboProtocol进行导出
                        Exporter exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } else {
                    // 没有配置注册中心时,也会导出服务

                    if (logger.isInfoEnabled()) {
                        logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                    }


                    Invoker invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProvidermetaDataInvoker wrapperInvoker = new DelegateProvidermetaDataInvoker(invoker, this);

                    Exporter exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }


                
                // 根据服务url,讲服务的元信息存入元数据中心
                metadataReportService metadataReportService = null;
                if ((metadataReportService = getmetadataReportService()) != null) {
                    metadataReportService.publishProvider(url);
                }
            }
        }
        this.urls.add(url);
    }
遍历注册中心URL集合
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List registryURLs) {
		//....省略部分代码
        // 服务url
        URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);

        String scope = url.getParameter(SCOPE_KEY); // scope可能为null,remote, local,none
        // don't export when none is configured
        if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
            // 如果scope为none,则不会进行任何的服务导出,既不会远程,也不会本地

            // export to local if the config is not remote (export to remote only when config is remote)
            if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
                // 如果scope不是remote,则会进行本地导出,会把当前url的protocol改为injvm,然后进行导出
                exportLocal(url);
            }
            // export to remote if the config is not local (export to local only when config is local)
            if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
                // 如果scope不是local,则会进行远程导出

                if (CollectionUtils.isNotEmpty(registryURLs)) {
                    // 如果有注册中心,则将服务注册到注册中心
                    for (URL registryURL : registryURLs) {

                        //if protocol is only injvm ,not register
                        // 如果本地协议, 即是injvm,则不需要进行注册中心注册
                        if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                            continue;
                        }
						//从服务URL中获取dynamic参数的值, 如果URL中没有,则从注册中心URL中获取,并放入服务URL的参数
                        // 该服务是否是动态,对应zookeeper上表示是否是临时节点,对应dubbo中的功能就是静态服务
                        url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));

                        // 基于注册中心地址的到监控中心地址,为什么是基于注册中心地址?
                        //这个有点疑问,可能是监控中心需要知道服务URL的信息,进而监控服务的请求调用情况等;
                        URL monitorUrl = loadMonitor(registryURL);

                        // 把监控中心地址添加到服务url中
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
                        }

                        // 服务的register参数,如果为true,则表示要注册到注册中心
                        if (logger.isInfoEnabled()) {
                            if (url.getParameter(REGISTER_KEY, true)) {
                                logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                            } else {
                                logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                            }
                        }
						//获取服务的代理机制, Dubbo使用javvassist和jdk动态代理;默认不配置的情况下,使用的是Javaassist;
                        // 服务使用的动态代理机制,如果为空则使用javassit, 并放入注册中心URL的参数中;
                        String proxy = url.getParameter(PROXY_KEY);
                        if (StringUtils.isNotEmpty(proxy)) {
                            registryURL = registryURL.addParameter(PROXY_KEY, proxy);
                        }

                        Invoker invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));

            
                        DelegateProvidermetaDataInvoker wrapperInvoker = new DelegateProvidermetaDataInvoker(invoker, this);


                        Exporter exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } else {
                    // 没有配置注册中心时,也会导出服务

                    if (logger.isInfoEnabled()) {
                        logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                    }
                    Invoker invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProvidermetaDataInvoker wrapperInvoker = new DelegateProvidermetaDataInvoker(invoker, this);

                    Exporter exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }

			//省略部分代码;
            }
        }
        this.urls.add(url);
    }

通过动态代理生成服务的Invoker对象, 通过调用Invoker#invoke方法执行服务的方法;会以export为key, 服务URL实例为value, 放入注册中心的URL参数中传入服务实现类,服务接口, 以及注册中心的URL, 所以这个invoker只是针对当前服务的;再对Invoker代理对象实例进行包装,传入当前实例this, 生成一个DelegateProvidermetaDataInvoker包装对象;调用协议为RegistryProtocol的export方法,将包装对象进行注册,导出成功后得到一个Exporter调用Registryprotocol#export方法过程中,会调用DubboProtocol/HttpProtocol 启动Netty/Tomcat运行容器,启动结束后,会将服务invoker注册到注册中心中;

                        Invoker invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));

                        // DelegateProvidermetaDataInvoker也表示服务提供者,包括了Invoker和服务的配置
                        DelegateProvidermetaDataInvoker wrapperInvoker = new DelegateProvidermetaDataInvoker(invoker, this);
                       Exporter exporter = protocol.export(wrapperInvoker);
RegistryProtocol#export(final Invoker originInvoker)
 @Override
    public  Exporter export(final Invoker originInvoker) throws RpcException {

		//originInvoker的协议名称为registry, 而我们使用的zookeeper注册中心, 所以需要吧
		//registry协议名称替换为 zookeepe,再把registry=zookeeper参数去掉;
        // 将registry://xxx?xx=xx®istry=zookeeper 转为---> zookeeper://xxx?xx=xx
        URL registryUrl = getRegistryUrl(originInvoker); 
        // 得到服务提供者url
        URL providerUrl = getProviderUrl(originInvoker); //
        // overrideSubscribeUrl是老版本的动态配置监听url,表示了需要监听的服务以及监听的类型(configurators, 这是老版本上的动态配置)
        // 在服务提供者url的基础上,生成一个overrideSubscribeUrl,协议为provider:
        //增加参数category=configurators&check=false;
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);

        // 一个overrideSubscribeUrl对应一个OverrideListener,用来监听变化事件,监听到overrideSubscribeUrl的变化后,
        // OverrideListener就会根据变化进行相应处理,具体处理逻辑看OverrideListener的实现
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener)

        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);

        // export invoker
        // 根据动态配置重写了providerUrl之后,就会调用DubboProtocol或HttpProtocol去进行导出服务了,
        //会启动容器, DubboProtocol则是netty,HttpProtocol则是tomcat; 
        final ExporterChangeableWrapper exporter = doLocalExport(originInvoker, providerUrl);

        // 得到注册中心-ZookeeperRegistry
        final Registry registry = getRegistry(originInvoker);

        // 得到存入到注册中心去的providerUrl,会对服务提供者url中的参数进行简化
        //providerUrl太长了,有些是无关紧要的key-value, 
        final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);

        // 将当前服务提供者Invoker,以及该服务对应的注册中心地址,以及简化后的服务url存入ProviderConsumerRegTable
        ProviderInvokerWrapper providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
                registryUrl, registeredProviderUrl);


        //to judge if we need to delay publish
        //是否需要注册到注册中心, 没有配置,默认就是true
        boolean register = providerUrl.getParameter(REGISTER_KEY, true);
        if (register) {
            // 注册服务,把简化后的服务提供者url注册到registryUrl中去
            register(registryUrl, registeredProviderUrl);
            providerInvokerWrapper.setReg(true);
        }

      	//以下是监听的内容;
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);


        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);
        return new DestroyableExporter<>(exporter);
    }

doLocalExport(originInvoker, providerUrl)
    private  ExporterChangeableWrapper doLocalExport(final Invoker originInvoker, URL providerUrl) {
        String key = getCacheKey(originInvoker);
        return (ExporterChangeableWrapper) bounds.computeIfAbsent(key, s -> {
            Invoker invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
            // protocol属性的值是哪来的,是在SPI中注入进来的,是一个代理类
            // 这里实际利用的就是DubboProtocol或HttpProtocol去export  NettyServer
            return new ExporterChangeableWrapper<>((Exporter) protocol.export(invokerDelegate), originInvoker);
        });
    }
DubboProtocol#export

目的:启动tomcat, 启动相关的服务;
工作:

    获取服务URL;获取服务的ServiceKey,构造一个DubboExporter, 传入执行invoker, 服务serviceKey, exporterMap;放入导出器的map;启动运行容器;
    @Override
    public  Exporter export(Invoker invoker) throws RpcException {
        URL url = invoker.getUrl();

        // export service.
        String key = serviceKey(url);
        // 构造一个Exporter
        DubboExporter exporter = new DubboExporter(invoker, key, exporterMap);
        // exporter的Map,  以服务serviceKey为键, exporter为值的map; 查找服务比较方便;
        exporterMap.put(key, exporter);
		//...省略部分代码;

        // 开启NettyServer
        openServer(url);

        optimizeSerialization(url);

        return exporter;
    }

DubboProtocol#openServer(URL url)

目的:启动运行容器, 启动本地服务;
工作:

    获取服务URL的ip:port值’获取服务URL的isserver值,默认为true;通过servermap获取容器容器为空, 创建并设置服务;容器不为空,设置服务;
    private void openServer(URL url) {
        // find server.
         // 获得ip地址和port, 192.168.40.17:20880
        String key = url.getAddress();
        // NettyClient, NettyServer
        //client can export a service which's only for server to invoke
        boolean isServer = url.getParameter(IS_SERVER_KEY, true);
        if (isServer) {
            // 缓存Server对象, 运行容器对象, key为ip:port;
            ExchangeServer server = serverMap.get(key);

            // DCL,Double Check LocK
            //容器对象为空,那就创建容器, DCL保证线程安全,避免重复创建容器;
            //容器创建完后,放入serverMap, 下一个服务导出就使用reset方法;
            if (server == null) {
                synchronized (this) {
                    server = serverMap.get(key);
                    if (server == null) {
                        // 创建Server,并进行缓存
                        serverMap.put(key, createServer(url));
                    }
                }
            } else {
                // server supports reset, use together with override
                // 服务重新导出时,就会走这里
                server.reset(url);
            }
        }
    }

DubboProtocol# createServer(URL url)

目的:创建容器
工作:

    生成服务URL, 增加channel.readonly.sent=TRUE, heartbeat=60000, codec=dubbo参数;
    ===>心跳为1分钟;获取协议的服务器端实现类型;
private ExchangeServer createServer(URL url) {
        url = URLBuilder.from(url)
                // send readonly event when server closes, it's enabled by default
                .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
                // enable heartbeat by default
                .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
                .addParameter(CODEC_KEY, DubboCodec.NAME)
                .build();

        // 协议的服务器端实现类型,比如:dubbo协议的mina,netty等,http协议的jetty,servlet等,默认为netty
        String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);

        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
        }

        // 通过url绑定端口,和对应的请求处理器
        ExchangeServer server;
        try {
            // requestHandler是请求处理器,类型为ExchangeHandler
            // 表示从url的端口接收到请求后,requestHandler来进行处理
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }

        // 协议的客户端实现类型,比如:dubbo协议的mina,netty等
        str = url.getParameter(CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }

        return server;
    }
Exchangers.bind(url, requestHandler)

·工作:

    通过SPI机制,去扫描Exchanger文件, 类型为header, 最终扫描出来的是HeaderExchanger;调用HeaderExchanger#bind方法;HeaderExchanger#bind又调用了Transporter#bind方法;
    public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        // codec表示协议编码方式
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        // 通过url得到HeaderExchanger, 利用HeaderExchanger进行bind,将得到一个HeaderExchangeServer
        return getExchanger(url).bind(url, handler);
    }
    //getExchanger(url):是通过SPI去获取一个Exchanger的, 这里的实现类是HeaderExchanger;
    public static Exchanger getExchanger(URL url) {
        String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
        return getExchanger(type);
    }
    //通过SPI获取Exchanger文件, 类型为header; 返回一个HeaderExchanger
    public static Exchanger getExchanger(String type) {
        return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
    }
	public class HeaderExchanger implements Exchanger {
	
	    public static final String NAME = "header";

	    @Override
	    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
	
	        // 下面会去启动Netty
	        // 对handler包装了两层,表示当处理一个请求时,每层Handler负责不同的处理逻辑
	        // 为什么在connect和bind时都是DecodeHandler,解码,解的是把InputStream解析成RpcInvocation对象
	        // 客户端调用服务时,会将服务的信息例如:接口路径名,参数, 版本号, ip端口等与服务香瓜你的信息;然后服务端在更根据这些信息,找到服务实现类,进而执行;
	        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
	    }
	
	}
Transporters# bind(URL url, ChannelHandler… handlers)

目的:启动容器,加入ChannelHander处理类;
工作:

    handlers的处理, 如果有多个,则创建一个 ChannelHandlerDispatcher实例;调用getTransporter() 获取Transporter实例, 通过SPI机制, 查找Transporter配置文件, 由于Transporter接口使用了注解@SPI(“netty”), 默认使用的时NettryTransporter实现类;调用NettyTransporter#bind方法 创建NettryServer实例;
    public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
		//....省略无关代码

        // 如果bind了多个handler,那么当有一个连接过来时,会循环每个handler去处理连接
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }

        // 调用NettyTransporter去绑定,Transporter表示网络传输层
        return getTransporter().bind(url, handler);
    }
	//通过SPI机制, 查找Transporter配置文件, 由于Transporter接口使用了注解@SPI("netty"), 默认使用的时NettryTransporter实现类;
    public static Transporter getTransporter() {
        return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
    }
    @SPI("netty")
	public interface Transporter {....}
    @Override
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }

NettyServer
    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        // 设置线程名,wrap方法会返回一个MultiMessageHandler,这个Handler会被设置到AbstractPeer的handler属性上
        // 而当netty接收到数据时,会调用AbstractPeer的handler属性的received方法
        // 所以MultiMessageHandler就是负责处理请求
        // 而MultiMessageHandler
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }
    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
        localAddress = getUrl().toInetSocketAddress();

        String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
        int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
        if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
            bindIp = ANYHOST_VALUE;
        }
        bindAddress = new InetSocketAddress(bindIp, bindPort);
        this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
        this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
        try {
            doOpen();

        } catch (Throwable t) {
        }
        //fixme replace this with better method
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
    }
NettryServer# doOpen()

目的:创建容器;
工作:

    创建netty服务器创建一个服务接收线程组;创建一个服务执行线程组;创建NettyServerHandler ,用来接收连接,心跳请求等;服务器设置一些参数, 如NettyServerHandler,线程组等服务器绑定地址;
    @Override
    protected void doOpen() throws Throwable {
    	//创建服务器
        bootstrap = new ServerBootstrap();
		//创建一个服务接收线程组;
        bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
        //创建一个服务执行线程组;
        workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                new DefaultThreadFactory("NettyServerWorker", true));

	
        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();

        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        // FIXME: should we use getTimeout()?
                        int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                                .addLast("decoder", adapter.getDecoder())
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // bind
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();

    }

运行容器

在服务URL中指定了协议,比如Http协议、Dubbo协议。根据不同的协议启动对应的Server。
比如Http协议就启动Tomcat、Jetty。
比如Dubbo协议就启动Netty。

不能只启动Server,还需要绑定一个RequestHandler,用来处理请求。
比如,Http协议对应的就是InternalHandler。Dubbo协议对应的就是ExchangeHandler。

    调用DubboProtocol的openServer(URL url)方法开启启动Server调用DubboProtocol的createServer(url)方法,在createServer()方法中调用Exchangers.bind(url, requestHandler)得到一个ExchangeServer; 其中requestHandler表示请求处理器,用来处理请求在Exchangers.bind(url, requestHandler)中,先会根据URL得到一个Exchanger,默认为HeaderExchanger;HeaderExchanger中包括HeaderExchangeClient、HeaderExchangeServer.; HeaderExchangeClient负责发送心跳,HeaderExchangeServer负责接收心跳,如果超时则会关闭channel在构造HeaderExchangeServer之前,会通过调用Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))方法的到一个Server默认会使用getTransporter去bind(URL url, ChannelHandler listener)从而得到一个Servlet,此时的listener就是外部传进来的DecodeHandler在NettyTransporter的bind方法中会去new NettyServer(url, listener),所以上面返回的Server默认就是NettyServer在构造NettyServer时,会调用ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))再构造一个ChannelHandler。wrap中的handler就是上面的listener;在wrap方法中会调用new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url)));构造一个ChannelHandler。构造完ChannelHandler后,就是真正的去开启Server了,会调用AbstractServer抽象类的doOpen方法。在NettyServer中,会实现doOpen方法,会调用new NettyServerHandler(getUrl(), this)构造一个NettyServerHandler,并bind地址至此,DubboProtocol协议的启动Server流程就结束
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/773738.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号