Dubbo作为一个Rpc框架,服务端必须得将自己暴露出去,以便客户端的调用,所以我们来看一下dubbo是如何将服务进行暴露的。
首先我们知道,启动dubbo得进行一些配置,如下图所示的一些dubbo标签(关于spring为什么能识别dubbo标签,可以搜索一下spring的schema机制,这里不做阐述,因为不是重点)
然后我们可以在下图的文件中找到两个命名空间处理器(因为dubbo是由阿里巴巴捐赠给apache),点进apache的文件里面
进去之后可以看到就是将配置文件中的信息解析到bean中存放于ioc,那因为我们要暴露service,所以我们看下ServiceBean
public class DubbonamespaceHandler extends NamespaceHandlerSupport {
static {
Version.checkDuplicate(DubboNamespaceHandler.class);
}
@Override
public void init() {
registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
registerBeanDefinitionParser("config-center", new DubboBeanDefinitionParser(ConfigCenterBean.class, true));
registerBeanDefinitionParser("metadata-report", new DubboBeanDefinitionParser(metadataReportConfig.class, true));
registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
registerBeanDefinitionParser("metrics", new DubboBeanDefinitionParser(MetricsConfig.class, true));
registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
}
}
在ServiceBean这个类中就可以看到服务暴露的方法,我们一路向里点
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (!isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
export();
}
}
一直看到ServiceConfig中的doExportUrl(),上卖弄这个list就是加载我们配置文件中需要注册到的注册中心
private void doExportUrls() {
//加载配置文件中的所有注册中心配置,并且封装为dubbo内部的URL对象列表
List registryURLs = loadRegistries(true);
//循环所有协议配置,根据不同的协议,向注册中心中发起注册 dubbo provider可能提供多种协议服务,默认dubbo协议,还有其他的比如Redis,Thrift等
for (ProtocolConfig protocolConfig : protocols) {
String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
ApplicationModel.initProviderModel(pathKey, providerModel);
//核心
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
在方法doExportUrlsFor1Protocol()中可以看到将信息组装成Url的形式进行传输,这url其实就是我们注册中心的信息以及服务信息
// 组装 URL
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
接下来就是为该服务生成一个代理,在进行调用时,先经过代理在调用服务
首先是PROXY_FACTORY进行的一个自适应,可以看到getInvoker()上有个@Adaptive,然后默认选择的是javassist,也就是生成的一个代理
// 为服务提供类(ref)生成 Invoker
Invoker> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
@SPI("javassist")
public interface ProxyFactory {
@Adaptive({PROXY_KEY})
T getProxy(Invoker invoker) throws RpcException;
@Adaptive({PROXY_KEY})
T getProxy(Invoker invoker, boolean generic) throws RpcException;
@Adaptive({PROXY_KEY})
Invoker getInvoker(T proxy, Class type, URL url) throws RpcException;
}
于是我们选择JavassistProxyFactory中的getInvoker(),可以看到最终执行实例的调用就是用invokeMethod()
@Override
public Invoker getInvoker(T proxy, Class type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
// 为目标类创建 Wrapper
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
// 创建匿名 Invoker 类对象,并实现 doInvoke 方法。
return new AbstractProxyInvoker(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName, Class>[] parameterTypes, Object[] arguments) throws Throwable {
// 调用 Wrapper 的 invokeMethod 方法,invokeMethod 最终会调用目标方法
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
然后我们继续回到ServiceConfig中,可以看到用协议进行了导出,其实导出分两个步骤,一个是启动本地服务,然后是像注册中心中去注册该服务,那这里的代码就是根据协议会选择RegistryProtocol中的export
Exporter> exporter = protocol.export(wrapperInvoker);
接着就能看到核心的代码,一个是启动服务,一个是注册,首先先是将url进行了解析然后配置了监听,随后执行了服务的导出,我们点进去看
接着可以看到根据url的信息找到DubboProtocol,并返回一个Exporter
privateExporterChangeableWrapper doLocalExport(final Invoker originInvoker, URL providerUrl) { // dubbo://192.168.200.10:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bean.name=org.apache.dubbo.demo.DemoService&bind.ip=192.168.200.10&bind.port=20880&deprecated=false&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=15804&qos.port=22222®ister=true&release=&side=provider×tamp=1622539267498 String key = getCacheKey(originInvoker);//访问缓存 return (ExporterChangeableWrapper ) bounds.computeIfAbsent(key, s -> { //创建 Invoker 委托类对象 Delegate Invoker> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl); //----****重点跟 protocol.export(invokerDelegate) 方法,此处protocol根据SPI机制,根据URL中的参数找 DubboProtocol 实现 即根据协议导出 return new ExporterChangeableWrapper<>((Exporter ) protocol.export(invokerDelegate), originInvoker); }); }
随后我们就可以在其中到看到先是拿到url,然后创建了相对应的exporter并存放于缓存之中,随后就是根据url开启了一个服务,可以看到默认采取的是netty启动的
接着就是一个dcl,判断了一下是否服务,是否启动,最后创建并能放入缓存之中
private void openServer(URL url) {
// find server. key=192.168.200.10:20880
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
ExchangeServer server = serverMap.get(key); // serverMap根据ip:port缓存Server对象 因为服务端可能在本机不同端口暴露
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url)); // createServer是核心----****重点去关注******
}
}
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
随后就是在url中添加一系列的参数,如果心跳检测的、编解码参数等,接着就是启动服务
能看到的是获取了一个exchanger并绑定了我们的url以及处理器,exchanger这里也是一个拓展点,所以,默认采取的是HeaderExchanger
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
//向url中添加codec参数 dubbo://192.168.200.10:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bean.name=org.apache.dubbo.demo.DemoService&bind.ip=192.168.200.10&bind.port=20880&channel.readonly.sent=true&codec=dubbo&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&heartbeat=60000&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=6052&qos.port=22222®ister=true&release=&side=provider×tamp=1622539986773
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
//getExchanger(url)根据SPI返回的是 HeaderExchanger
return getExchanger(url).bind(url, handler);
}
在这个bin的过程中
- 创建了HeaderExchangeHandler用于负载处理请求协会结果对请求和响应结果做了编解码操作最后就是绑定了地址
public class HeaderExchanger implements Exchanger {
public static final String NAME = "header";
@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
// 创建 HeaderExchangeServer 实例,该方法包含了多个逻辑,分别如下:
// 1. new HeaderExchangeHandler(handler) 负载处理请求回写结果
// 2. new DecodeHandler(new HeaderExchangeHandler(handler)) 对请求数据和响应结果进行解码操作,如何交由后续流程继续处理
// 3. Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))) 重点看这个bind方法,
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
}
来到netty的传输层中,看到创建了netty的服务,我们能接着往下点
最后在doOpen()方法中看到熟悉的netty的代码,跟我们写的几乎一样,在这里呢就是服务的暴露流程
接着我们回到之前,服务暴露之后我们需要将服务注册到注册中心去
在这里呢,就是将将服务进行注册,registryFactory不用多说,典型的拓展点,会根据spi机制,先找到FailbackRegistry,在其中的doRegister()找到我们配置的实现--zookeeper的实现类
public void register(URL registryUrl, URL registeredProviderUrl) {
// zookeeper://192.168.200.129:2181/org.apache.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F192.168.200.10%3A20880%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26bean.name%3Dorg.apache.dubbo.demo.DemoService%26bind.ip%3D192.168.200.10%26bind.port%3D20880%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D12416%26qos.port%3D22222%26register%3Dtrue%26release%3D%26side%3Dprovider%26timestamp%3D1622540622168&pid=12416&qos.port=22222×tamp=1622540622168
Registry registry = registryFactory.getRegistry(registryUrl);
registry.register(registeredProviderUrl);
}
那这里呢,就是做了一些节点的创建,将我们的服务注册到注册中心去
以上呢,就是dubbo的服务端暴露服务的内容。



