dubbo标签具体释义可以查看https://dubbo.apache.org/zh/docsv2.7/user/configuration/xml/
或
https://dubbo.apache.org/zh/docs/references/xml/
/**
 * Spring namespace handler for the dubbo XML namespace: it associates every supported
 * XML element name with the bean definition parser that handles it.
 *
 * Element reference: https://dubbo.apache.org/zh/docsv2.7/user/configuration/xml/
 */
public class DubboNamespaceHandler extends NamespaceHandlerSupport implements ConfigurableSourceBeanMetadataElement {

    static {
        // Runs once when this class is initialized.
        Version.checkDuplicate(DubboNamespaceHandler.class);
    }

    /**
     * Registers one parser per dubbo XML element. All elements except "annotation" use a
     * DubboBeanDefinitionParser bound to the config/bean class that the element produces.
     */
    @Override
    public void init() {
        registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class));
        registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class));
        registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class));
        registerBeanDefinitionParser("config-center", new DubboBeanDefinitionParser(ConfigCenterBean.class));
        registerBeanDefinitionParser("metadata-report", new DubboBeanDefinitionParser(MetadataReportConfig.class));
        registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class));
        registerBeanDefinitionParser("metrics", new DubboBeanDefinitionParser(MetricsConfig.class));
        registerBeanDefinitionParser("ssl", new DubboBeanDefinitionParser(SslConfig.class));
        registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class));
        registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class));
        registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class));

        // From here on, the registered elements involve business-flow processing.
        registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class));
        registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class));
        registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
    }
}
ServiceBean解析
public class ServiceBean针对3.0.0.preview版本调用中使用DubboBootstrapApplicationListenerextends ServiceConfig implements InitializingBean, DisposableBean, ApplicationContextAware, BeanNameAware, ApplicationEventPublisherAware { private transient ApplicationContext applicationContext; private ApplicationEventPublisher applicationEventPublisher; //ApplicationContextAware方法实现,添加应用上下文 @Override public void setApplicationContext(ApplicationContext applicationContext) { this.applicationContext = applicationContext; } //InitializingBean方法实现 @Override public void afterPropertiesSet() throws Exception { if (StringUtils.isEmpty(getPath())) { if (StringUtils.isNotEmpty(getInterface())) { setPath(getInterface());//将 com.jiangzheng.course.dubbo.api.service.ServiceDemo 设置为path } } //register service bean ModuleModel moduleModel = DubboBeanUtils.getModuleModel(applicationContext); moduleModel.getConfigManager().addService(this); moduleModel.getDeployer().setPending(); //注: 不同版本此处会有不同, 有些版本的 该方法将 下面将会提到的 DubboBootstrap进行实例化 //同时有些版本的 ServiceBean 实现了ApplicationListener 逻辑也大致同DubboBootstrapApplicationListener类似 } //ApplicationEventPublisherAware方法实现 @Override public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { this.applicationEventPublisher = applicationEventPublisher; } }
/**
 * Application-context listener that starts the DubboBootstrap when the context is refreshed
 * and stops it when the context is closed.
 */
public class DubboBootstrapApplicationListener extends OneTimeExecutionApplicationContextEventListener
        implements Ordered {

    public static final String BEAN_NAME = "dubboBootstrapApplicationListener";

    private final DubboBootstrap dubboBootstrap;

    /**
     * Obtains the DubboBootstrap instance via DubboBootstrap.getInstance().
     */
    public DubboBootstrapApplicationListener() {
        this.dubboBootstrap = DubboBootstrap.getInstance();
    }

    /**
     * Dispatches refresh and close events to their handlers; any other event type is ignored.
     */
    @Override
    public void onApplicationContextEvent(ApplicationContextEvent event) {
        if (event instanceof ContextRefreshedEvent) {
            // context refreshed
            onContextRefreshedEvent((ContextRefreshedEvent) event);
            return;
        }
        if (event instanceof ContextClosedEvent) {
            // context closed
            onContextClosedEvent((ContextClosedEvent) event);
        }
    }

    /** Starts the bootstrap. */
    private void onContextRefreshedEvent(ContextRefreshedEvent refreshed) {
        dubboBootstrap.start();
    }

    /** Stops the bootstrap. */
    private void onContextClosedEvent(ContextClosedEvent closed) {
        dubboBootstrap.stop();
    }

    @Override
    public int getOrder() {
        return LOWEST_PRECEDENCE;
    }
}
// Implements ApplicationListener to receive Spring application events (excerpt: the
// onApplicationContextEvent hook and the ApplicationContextAware setter are not shown here).
abstract class OneTimeExecutionApplicationContextEventListener implements ApplicationListener, ApplicationContextAware {

    private ApplicationContext applicationContext;

    /**
     * Forwards the event to onApplicationContextEvent only when it originates from the
     * expected source and is an ApplicationContextEvent.
     */
    public final void onApplicationEvent(ApplicationEvent event) {
        if (!isOriginalEventSource(event) || !(event instanceof ApplicationContextEvent)) {
            return;
        }
        // Invokes the subclass hook, e.g. DubboBootstrapApplicationListener#onApplicationContextEvent.
        onApplicationContextEvent((ApplicationContextEvent) event);
    }

    /**
     * Returns true when no application context is held, or when the held context equals
     * the event's source.
     */
    private boolean isOriginalEventSource(ApplicationEvent event) {
        if (applicationContext == null) {
            // Current ApplicationListener is not a Spring Bean, just was added
            // into Spring's ConfigurableApplicationContext
            return true;
        }
        return Objects.equals(applicationContext, event.getSource());
    }
}
//org.apache.dubbo.config.bootstrap.DubboBootstrap#start
/**
 * Starts the bootstrap at most once (guarded by the {@code started} flag): initializes,
 * exports services, optionally exports the metadata service and registers the service
 * instance, then refers services. {@code startup} is set to true once exporting is complete;
 * when asynchronous exports are pending this happens on a separate thread.
 *
 * @return this bootstrap
 */
public DubboBootstrap start() {
    if (started.compareAndSet(false, true)) {
        startup.set(false);
        // initialize the environment
        initialize();
        if (logger.isInfoEnabled()) {
            logger.info(NAME + " is starting...");
        }
        // 1. provider-side export/registration flow
        exportServices();
        // Not only provider register
        if (!isOnlyRegisterProvider() || hasExportedServices()) {
            // 2. export MetadataService
            exportMetadataService();
            //3. Register the local ServiceInstance if required
            registerServiceInstance();
        }
        referServices();
        if (asyncExportingFutures.size() > 0) {
            new Thread(() -> {
                try {
                    this.awaitFinish();
                } catch (Exception e) {
                    // Keep the cause in the log instead of silently dropping it.
                    logger.warn(NAME + " exportAsync occurred an exception.", e);
                }
                startup.set(true);
                if (logger.isInfoEnabled()) {
                    logger.info(NAME + " is ready.");
                }
            }).start();
        } else {
            startup.set(true);
            if (logger.isInfoEnabled()) {
                logger.info(NAME + " is ready.");
            }
        }
        if (logger.isInfoEnabled()) {
            logger.info(NAME + " has started.");
        }
    }
    return this;
}
/**
 * Runs the one-time initialization sequence; later calls return immediately because the
 * {@code initialized} flag has already been flipped.
 */
public void initialize() {
    if (!initialized.compareAndSet(false, true)) {
        return;
    }
    ApplicationModel.initFrameworkExts();
    startConfigCenter();
    loadRemoteConfigs();
    checkGlobalConfigs();
    startMetadataCenter();
    initMetadataService();
    initEventListener();
    if (logger.isInfoEnabled()) {
        logger.info(NAME + " has been initialized!");
    }
}
/**
 * Exports every service held by the config manager. When {@code exportAsync} is set, each
 * export is submitted to the service-exporter executor and its future is recorded;
 * otherwise the export runs inline.
 */
private void exportServices() {
    configManager.getServices().forEach(sc -> {
        // TODO, compatible with ServiceConfig.export()
        ServiceConfig serviceConfig = (ServiceConfig) sc;
        serviceConfig.setBootstrap(this);
        // asynchronous export?
        if (exportAsync) {
            // yes: export on the executor and remember the future
            ExecutorService executor = executorRepository.getServiceExporterExecutor();
            Future<?> future = executor.submit(() -> {
                sc.export();
                exportedServices.add(sc);
            });
            asyncExportingFutures.add(future);
        } else {
            // no: export inline, see org.apache.dubbo.config.ServiceConfig#export
            sc.export();
            exportedServices.add(sc);
        }
    });
}
//org.apache.dubbo.config.ServiceConfig#export
/**
 * Exports this service unless export is disabled or it is already exported. Lazily binds
 * the bootstrap when none was set, refreshes sub-configs, fills the service metadata, and
 * then exports either immediately or after the configured delay.
 */
public synchronized void export() {
    if (!shouldExport() || exported) {
        return;
    }
    if (bootstrap == null) {
        bootstrap = DubboBootstrap.getInstance();
        bootstrap.initialize();
        bootstrap.service(this);
    }
    checkAndUpdateSubConfigs();
    // init serviceMetadata
    serviceMetadata.setVersion(getVersion());
    serviceMetadata.setGroup(getGroup());
    serviceMetadata.setDefaultGroup(getGroup());
    serviceMetadata.setServiceType(getInterfaceClass());
    serviceMetadata.setServiceInterfaceName(getInterface());
    serviceMetadata.setTarget(getRef());
    if (!shouldExport()) {
        return;
    }
    // delayed export?
    if (shouldDelay()) {
        // yes: schedule doExport after getDelay() milliseconds
        DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
    } else {
        // no: export right away
        doExport();
    }
}
/**
 * Performs the export once: rejects a service that was already unexported, does nothing
 * when already exported, otherwise marks the service exported, defaults the path to the
 * interface name and exports the URLs.
 *
 * @throws IllegalStateException if the service has already been unexported
 */
protected synchronized void doExport() {
    if (unexported) {
        throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
    }
    if (!exported) {
        exported = true;
        if (StringUtils.isEmpty(path)) {
            path = interfaceName;
        }
        // the flow continues here
        doExportUrls();
        exported();
    }
}
/**
 * Registers the service and its provider in the service repository, loads the registry
 * URLs, and exports the service once per configured protocol.
 */
private void doExportUrls() {
    ServiceRepository repository = ApplicationModel.getServiceRepository();
    // descriptor of the service interface to be exposed
    ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
    repository.registerProvider(
            getUniqueServiceName(),
            ref,
            serviceDescriptor,
            this,
            serviceMetadata
    );
    // Registry URL list; values observed in the author's sample run:
    //service-discovery-registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?REGISTRY_CLUSTER=org.apache.dubbo.config.RegistryConfig&application=provider&client=curator&dubbo=2.0.2&pid=99194&registry=zookeeper&release=3.0.0.preview&timestamp=1640844333536
    //registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?REGISTRY_CLUSTER=org.apache.dubbo.config.RegistryConfig&application=provider&client=curator&dubbo=2.0.2&pid=99194&registry=zookeeper&release=3.0.0.preview&timestamp=1640844333536
    List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);
    // one export per configured protocol
    for (ProtocolConfig protocolConfig : protocols) {
        // pathKey in the sample run: com.jiangzheng.course.dubbo.api.service.ServiceDemo
        String pathKey = URL.buildKey(getContextPath(protocolConfig)
                .map(p -> p + "/" + path)
                .orElse(path), group, version);
        // In case user specified path, register service one more time to map it to path.
        repository.registerService(pathKey, interfaceClass);
        // TODO, uncomment this line once service key is unified
        serviceMetadata.setServiceKey(pathKey);
        // the flow continues in this method
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}
// Partial code of the method (excerpt).
/**
 * Exports the service URL according to its scope: locally unless the scope is "remote",
 * and remotely unless the scope is "local". With registry URLs present, one exporter is
 * created per registry; without them the service URL is exported directly.
 *
 * NOTE: {@code url} is not declared in this excerpt; its construction is in the omitted
 * part of the method.
 */
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    String scope = url.getParameter(SCOPE_KEY);
    // don't export when none is configured
    if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
        // export to local if the config is not remote (export to remote only when config is remote)
        // scope in the sample run: null
        if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
            // local export
            exportLocal(url);
        }
        // export to remote if the config is not local (export to local only when config is local)
        // scope in the sample run: null
        if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
            // remote export
            if (CollectionUtils.isNotEmpty(registryURLs)) {
                for (URL registryURL : registryURLs) {
                    // if protocol is only injvm, do not register
                    if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                        continue;
                    }
                    url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
                    URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
                    if (monitorUrl != null) {
                        url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
                    }
                    if (logger.isInfoEnabled()) {
                        if (url.getParameter(REGISTER_KEY, true)) {
                            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url.getServiceKey() + " to registry " + registryURL.getAddress());
                        } else {
                            logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url.getServiceKey());
                        }
                    }
                    // For providers, this is used to enable custom proxy to generate invoker
                    String proxy = url.getParameter(PROXY_KEY);
                    if (StringUtils.isNotEmpty(proxy)) {
                        registryURL = registryURL.addParameter(PROXY_KEY, proxy);
                    }
                    Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.putAttribute(EXPORT_KEY, url));
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                    // Per the author's trace this reaches
                    // org.apache.dubbo.registry.integration.RegistryProtocol#export (service registration);
                    // it runs once per registry URL, i.e. twice in the sample run.
                    Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            } else {
                if (logger.isInfoEnabled()) {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
                if (MetadataService.class.getName().equals(url.getServiceInterface())) {
                    MetadataUtils.saveMetadataURL(url);
                }
                Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
                DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                // Per the author's trace this reaches
                // org.apache.dubbo.registry.integration.RegistryProtocol#export (service registration).
                Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
                exporters.add(exporter);
            }
            MetadataUtils.publishServiceDefinition(url);
        }
    }
    this.urls.add(url);
}
针对3.0.4版本调用中使用DubboDeployApplicationListener(与3.0.0类似)
//org.apache.dubbo.config.spring.context.DubboDeployApplicationListener#onApplicationEvent
/**
 * Routes context-refreshed and context-closed events to their handlers; other event
 * types are ignored.
 */
@Override
public void onApplicationEvent(ApplicationContextEvent event) {
    if (event instanceof ContextRefreshedEvent) {
        onContextRefreshedEvent((ContextRefreshedEvent) event);
        return;
    }
    if (event instanceof ContextClosedEvent) {
        onContextClosedEvent((ContextClosedEvent) event);
    }
}
/**
 * Starts the module deployer when the context is refreshed and, unless the deployer runs
 * in background, blocks until the start future completes. Failures while waiting are
 * logged rather than propagated.
 */
private void onContextRefreshedEvent(ContextRefreshedEvent event) {
    ModuleDeployer deployer = moduleModel.getDeployer();
    Assert.notNull(deployer, "Module deployer is null");
    // start module
    Future future = deployer.start();
    // if the module does not start in background, await finish
    if (!deployer.isBackground()) {
        try {
            future.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            logger.warn("Interrupted while waiting for dubbo module start: " + e.getMessage());
        } catch (Exception e) {
            logger.warn("An error occurred while waiting for dubbo module start: " + e.getMessage(), e);
        }
    }
}
//org.apache.dubbo.config.deploy.DefaultModuleDeployer#start
/**
 * Starts the module unless it is already starting or started, and returns the start
 * future. Export and refer are kicked off synchronously; completion is awaited on the
 * shared executor, which then calls onModuleStarted with the future.
 */
@Override
public synchronized Future start() throws IllegalStateException {
    if (!(isStarting() || isStarted())) {
        onModuleStarting();
        startFuture = new CompletableFuture();

        // initialize the application first, then this module
        applicationDeployer.initialize();
        initialize();

        // export services
        exportServices();

        // prepare application instance
        if (hasExportedServices()) {
            applicationDeployer.prepareApplicationInstance();
        }

        // refer services
        referServices();

        executorRepository.getSharedExecutor().submit(() -> {
            // wait for export finish
            waitExportFinish();
            // wait for refer finish
            waitReferFinish();
            onModuleStarted(startFuture);
        });
    }
    return startFuture;
}
/**
 * Exports every service currently held by the config manager.
 */
private void exportServices() {
    for (ServiceConfigBase sc : configManager.getServices()) {
        exportServiceInternal(sc);
    }
}
private void exportServiceInternal(ServiceConfigbase sc) {
ServiceConfig> serviceConfig = (ServiceConfig>) sc;
if (!serviceConfig.isRefreshed()) {
serviceConfig.refresh();
}
if (sc.isExported()) {
return;
}
if (exportAsync || sc.shouldExportAsync()) {
ExecutorService executor = executorRepository.getServiceExportExecutor();
CompletableFuture future = CompletableFuture.runAsync(() -> {
try {
if (!sc.isExported()) {
sc.exportOnly();
exportedServices.add(sc);
}
} catch (Throwable t) {
logger.error(getIdentifier() + " export async catch error : " + t.getMessage(), t);
}
}, executor);
asyncExportingFutures.add(future);
} else {
if (!sc.isExported()) {
sc.exportOnly();
exportedServices.add(sc);
}
}
}
//org.apache.dubbo.config.ServiceConfig#exportOnly
/**
 * Exports this service if it has not been exported yet: refreshes the config when needed
 * and, when export is enabled, initializes and then exports with or without delay.
 */
@Override
public synchronized void exportOnly() {
    if (this.exported) {
        return;
    }
    if (!this.isRefreshed()) {
        this.refresh();
    }
    if (!this.shouldExport()) {
        return;
    }
    this.init();
    if (shouldDelay()) {
        doDelayExport();
        return;
    }
    doExport();
}
/**
 * Exports the service URLs unless the service is already exported; the path defaults to
 * the interface name when empty.
 *
 * @throws IllegalStateException if the service has already been unexported
 */
protected synchronized void doExport() {
    if (unexported) {
        throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
    }
    if (!exported) {
        if (StringUtils.isEmpty(path)) {
            path = interfaceName;
        }
        doExportUrls();
        exported();
    }
}
/**
 * Registers the service and a new ProviderModel in the module's service repository, loads
 * the registry URLs, and exports the service once per configured protocol.
 */
@SuppressWarnings({"unchecked", "rawtypes"})
private void doExportUrls() {
    ModuleServiceRepository repository = getScopeModel().getServiceRepository();
    ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
    providerModel = new ProviderModel(getUniqueServiceName(),
            ref,
            serviceDescriptor,
            this,
            getScopeModel(),
            serviceMetadata);
    repository.registerProvider(providerModel);
    List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);
    for (ProtocolConfig protocolConfig : protocols) {
        String pathKey = URL.buildKey(getContextPath(protocolConfig)
                .map(p -> p + "/" + path)
                .orElse(path), group, version);
        // In case user specified path, register service one more time to map it to path.
        repository.registerService(pathKey, interfaceClass);
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}
/**
 * Builds the attribute map and the service URL for one protocol, copies the attributes
 * into the service metadata attachments, and exports the URL.
 *
 * @param protocolConfig the protocol to export under
 * @param registryURLs   the registry URLs passed on to the URL build and export steps
 */
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    Map<String, String> map = buildAttributes(protocolConfig);
    // init serviceMetadata attachments
    serviceMetadata.getAttachments().putAll(map);
    URL url = buildUrl(protocolConfig, registryURLs, map);
    exportUrl(url, registryURLs);
}
/**
 * Exports the URL according to its scope parameter and records the final URL: locally
 * unless the scope is "remote", remotely (followed by publishing the service definition)
 * unless the scope is "local"; nothing is exported when the scope is "none".
 *
 * @param url          the service URL to export
 * @param registryURLs the registry URLs used for the remote export
 */
private void exportUrl(URL url, List<URL> registryURLs) {
    String scope = url.getParameter(SCOPE_KEY);
    // don't export when none is configured
    if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
        // export to local if the config is not remote (export to remote only when config is remote)
        if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
            exportLocal(url);
        }
        // export to remote if the config is not local (export to local only when config is local)
        if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
            url = exportRemote(url, registryURLs);
            MetadataUtils.publishServiceDefinition(url);
        }
    }
    this.urls.add(url);
}
/**
 * Exports the URL remotely. With registry URLs present, the service URL is attached to
 * each registry URL (under EXPORT_KEY) and exported once per registry; without them the
 * service URL itself is exported.
 *
 * @param url          the service URL to export
 * @param registryURLs the registry URLs; may be empty
 * @return the service URL, including any parameters/attributes added during export
 */
private URL exportRemote(URL url, List<URL> registryURLs) {
    if (CollectionUtils.isNotEmpty(registryURLs)) {
        for (URL registryURL : registryURLs) {
            if (SERVICE_REGISTRY_PROTOCOL.equals(registryURL.getProtocol())) {
                url = url.addParameterIfAbsent(SERVICE_NAME_MAPPING_KEY, "true");
            }
            //if protocol is only injvm ,not register
            if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                continue;
            }
            url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
            URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
            if (monitorUrl != null) {
                url = url.putAttribute(MONITOR_KEY, monitorUrl);
            }
            // For providers, this is used to enable custom proxy to generate invoker
            String proxy = url.getParameter(PROXY_KEY);
            if (StringUtils.isNotEmpty(proxy)) {
                registryURL = registryURL.addParameter(PROXY_KEY, proxy);
            }
            if (logger.isInfoEnabled()) {
                if (url.getParameter(REGISTER_KEY, true)) {
                    logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url.getServiceKey() + " to registry " + registryURL.getAddress());
                } else {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url.getServiceKey());
                }
            }
            doExportUrl(registryURL.putAttribute(EXPORT_KEY, url), true);
        }
    } else {
        if (MetadataService.class.getName().equals(url.getServiceInterface())) {
            localMetadataService.setMetadataServiceURL(url);
        }
        if (logger.isInfoEnabled()) {
            logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
        }
        doExportUrl(url, true);
    }
    return url;
}
/**
 * Creates an invoker for the service reference, optionally wraps it so it carries this
 * config as metadata, exports it through the protocol SPI, and records the exporter.
 *
 * @param url          the URL to export under
 * @param withMetaData whether to wrap the invoker in a DelegateProviderMetaDataInvoker
 */
@SuppressWarnings({"unchecked", "rawtypes"})
private void doExportUrl(URL url, boolean withMetaData) {
    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
    if (withMetaData) {
        invoker = new DelegateProviderMetaDataInvoker(invoker, this);
    }
    // Per the author's trace, this eventually reaches
    // org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export
    Exporter<?> exporter = protocolSPI.export(invoker);
    exporters.add(exporter);
}



