dubbo 应用升级
官网:https://dubbo.apache.org/zh/docs/migration/
应用升级
dubbo 3.x 升级依赖
org.apache.dubbo dubbo-spring-boot-starter3.0.5 org.apache.curator curator-recipes5.2.0 org.apache.curator curator-x-discovery5.2.0 com.google.protobuf protobuf-java4.0.0-rc-2
在地址注册与发现环节,3.x(应用粒度、接口粒度)与2.x(接口粒度)是完全兼容的,升级步骤如下:
# 服务端双注册
升级jar包依赖到最新版本,配置双注册开关:dubbo.application.register-mode
register-mode可选值:all(默认)、interface(接口粒度)、
instance(应用粒度)
# 消费端双订阅
升级jar包依赖到最新版本,配置双订阅开关:dubbo.application.service-discovery.migration=APPLICATION_FIRST
service-discovery.migration可选值:APPLICATION_FIRST(应用粒度优先)、
FORCE_INTERFACE(强制使用接口粒度)、
FORCE_APPLICATION(强制使用应用粒度)
# 服务端单注册(注册粒度修改为应用粒度)
服务端在所有消费端切换到应用粒度注册后,变换为单注册:register-mode修改为instance
**************
服务端双注册
相关说明:
# 消费端服务调用影响 服务端升级到3.x后,同时向注册中心注册接口级、应用级的服务; 2.x、3.x的消费端均可从注册中心拉取到服务信息,不影响服务使用; # 注册中心存储空间影响 双注册不可避免带来额外的存储需求,但是应用级别服务相对于接口粒度所需的存储空间很小; 对于一个普通集群,数应用粒度注册数据所需的存储空间大致为接口粒度注册数据1/100~1/1000
**************
消费端双订阅
相关说明:
dubbo3.x 默认支持双订阅,可通过参数dubbo.application.service-discovery.migration设置; # dubbo.application.service-discovery.migration可选值: APPLICATION_FIRST:智能决策接口级、应用级地址(MigrationAddressComparator.shouldMigrate判断),双订阅(默认) FORCE_INTERFACE:强制消费接口级地址,如无地址则报错,单订阅2.x地址 FORCE_APPLICATION:强制消费应用级地址,如无地址则报错,单订阅3.x地址 # APPLICATION_FIRST:双订阅说明 对于双订阅的场景,消费端虽然可同时持有 2.x 地址与 3.x 地址, 但选址过程中两份地址是完全隔离的:要么用2.x 地址,要么用3.x 地址, 不存在两份地址混合调用的情况,这个决策过程是在收到第一次地址通知后就完成了的。
订阅策略实现原理
application.yml
dubbo:
application:
name: dubbo-consumer
service-discovery:
migration: FORCE_INTERFACE #消费端订阅策略设置为接口粒度
registry:
protocol: zookeeper
address: localhost:2181
group: dubbo
#register-mode: instance
protocol:
name: dubbo
#port: 20880
server:
port: 8081
消费端获取订阅策略调用栈
# MigrationRule.getStep:获取订阅策略 at org.apache.dubbo.registry.client.migration.model.MigrationRule.getStep(MigrationRule.java:183) at org.apache.dubbo.registry.client.migration.MigrationRuleHandler.doMigrate(MigrationRuleHandler.java:51) - locked <0x259c> (a org.apache.dubbo.registry.client.migration.MigrationRuleHandler) # refer操作 at org.apache.dubbo.registry.client.migration.MigrationRuleListener.onRefer(MigrationRuleListener.java:241) at org.apache.dubbo.registry.integration.RegistryProtocol.interceptInvoker(RegistryProtocol.java:531) at org.apache.dubbo.registry.integration.RegistryProtocol.doRefer(RegistryProtocol.java:500) at org.apache.dubbo.registry.integration.RegistryProtocol.refer(RegistryProtocol.java:485) at org.apache.dubbo.qos.protocol.QosProtocolWrapper.refer(QosProtocolWrapper.java:83) at org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper.refer(ProtocolListenerWrapper.java:74) at org.apache.dubbo.rpc.cluster.filter.ProtocolFilterWrapper.refer(ProtocolFilterWrapper.java:71) at org.apache.dubbo.rpc.protocol.ProtocolSerializationWrapper.refer(ProtocolSerializationWrapper.java:52) at org.apache.dubbo.rpc.Protocol$Adaptive.refer(Protocol$Adaptive.java:-1) # ReferenceConfig类 at org.apache.dubbo.config.ReferenceConfig.createInvokerForRemote(ReferenceConfig.java:481) at org.apache.dubbo.config.ReferenceConfig.createProxy(ReferenceConfig.java:386) at org.apache.dubbo.config.ReferenceConfig.init(ReferenceConfig.java:275) - locked <0x259d> (a org.apache.dubbo.config.ReferenceConfig) at org.apache.dubbo.config.ReferenceConfig.get(ReferenceConfig.java:216) # SimpleReferenceCache类 at org.apache.dubbo.config.utils.SimpleReferenceCache.get(SimpleReferenceCache.java:110) # DefaultModuleDeployer类 at org.apache.dubbo.config.deploy.DefaultModuleDeployer.lambda$referServices$6(DefaultModuleDeployer.java:384) at org.apache.dubbo.config.deploy.DefaultModuleDeployer$$Lambda$905/0x0000000801116b38.accept(Unknown Source:-1) at java.util.concurrent.ConcurrentHashMap$ValuesView.forEach(ConcurrentHashMap.java:4780) at org.apache.dubbo.config.deploy.DefaultModuleDeployer.referServices(DefaultModuleDeployer.java:364) at org.apache.dubbo.config.deploy.DefaultModuleDeployer.start(DefaultModuleDeployer.java:151) - locked <0x259e> (a org.apache.dubbo.config.deploy.DefaultModuleDeployer) # DubboDeployApplicationListener类 at org.apache.dubbo.config.spring.context.DubboDeployApplicationListener.onContextRefreshedEvent(DubboDeployApplicationListener.java:108) at org.apache.dubbo.config.spring.context.DubboDeployApplicationListener.onApplicationEvent(DubboDeployApplicationListener.java:98) at org.apache.dubbo.config.spring.context.DubboDeployApplicationListener.onApplicationEvent(DubboDeployApplicationListener.java:44) # SimpleApplicationEventMulticaster类 at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:176) at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:169) at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:143) # AbstractApplicationContext类 at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:421) at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:378) at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:938) at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586) - locked <0x259f> (a java.lang.Object) at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:145) # SpringApplication类 at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:730) at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:412) at org.springframework.boot.SpringApplication.run(SpringApplication.java:302) at org.springframework.boot.SpringApplication.run(SpringApplication.java:1301) at org.springframework.boot.SpringApplication.run(SpringApplication.java:1290) # 应用启动入口 at com.example.demo.DemoApplication.main(DemoApplication.java:14)
MigrationRule:获取订阅规则
public class MigrationRule {
public MigrationStep getStep(URL consumerURL) {
if (interfaceRules != null) {
SubMigrationRule rule = interfaceRules.get(consumerURL.getDisplayServiceKey());
if (rule != null) {
if (rule.getStep() != null) {
return rule.getStep();
}
}
}
if (applications != null) {
ServiceNameMapping serviceNameMapping = ServiceNameMapping.getDefaultExtension(consumerURL.getScopeModel());
Set services = serviceNameMapping.getServices(consumerURL);
if (CollectionUtils.isNotEmpty(services)) {
for (String service : services) {
SubMigrationRule rule = applicationRules.get(service);
if (rule.getStep() != null) {
return rule.getStep();
}
}
}
}
if (step == null) { //如果step为null,默认将其设置为APPLICATION_FIRST
// initial step : APPLICATION_FIRST
step = MigrationStep.APPLICATION_FIRST;
step = Enum.valueOf(MigrationStep.class,
consumerURL.getParameter(MIGRATION_STEP_KEY,
ConfigurationUtils.getCachedDynamicProperty(consumerURL.getScopeModel(), DUBBO_SERVICEDISCOVERY_MIGRATION, step.name())));
//从配置中心读取step配置,key为dubbo.application.service-discovery.migration
}
return step;
}
MigrationStep:订阅策略
public enum MigrationStep {
FORCE_INTERFACE,
APPLICATION_FIRST,
FORCE_APPLICATION
}
双订阅选址实现原理
application.yml
dubbo:
application:
name: dubbo-consumer
#service-discovery:
# migration: FORCE_INTERFACE #设置消费端订阅模式,默认APPLICATION_FIRST
registry:
protocol: zookeeper
address: localhost:2181
group: dubbo
#register-mode: instance
protocol:
name: dubbo
#port: 20880
server:
port: 8081
消费端双订阅地址选址决策调用栈
# MigrationInvoker.calcPreferredInvoker:选择使用的invoker at org.apache.dubbo.registry.client.migration.MigrationInvoker.calcPreferredInvoker(MigrationInvoker.java:472) - locked <0x290a> (a org.apache.dubbo.registry.client.migration.MigrationInvoker) at org.apache.dubbo.registry.client.migration.MigrationInvoker.migrateToApplicationFirstInvoker(MigrationInvoker.java:244) # MigrationRuleHandler类 at org.apache.dubbo.registry.client.migration.MigrationRuleHandler.refreshInvoker(MigrationRuleHandler.java:73) at org.apache.dubbo.registry.client.migration.MigrationRuleHandler.doMigrate(MigrationRuleHandler.java:57) - locked <0x2947> (a org.apache.dubbo.registry.client.migration.MigrationRuleHandler) # refer操作 at org.apache.dubbo.registry.client.migration.MigrationRuleListener.onRefer(MigrationRuleListener.java:241) at org.apache.dubbo.registry.integration.RegistryProtocol.interceptInvoker(RegistryProtocol.java:531) at org.apache.dubbo.registry.integration.RegistryProtocol.doRefer(RegistryProtocol.java:500) at org.apache.dubbo.registry.integration.RegistryProtocol.refer(RegistryProtocol.java:485) at org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper.refer(ProtocolListenerWrapper.java:74) at org.apache.dubbo.qos.protocol.QosProtocolWrapper.refer(QosProtocolWrapper.java:83) at org.apache.dubbo.rpc.cluster.filter.ProtocolFilterWrapper.refer(ProtocolFilterWrapper.java:71) at org.apache.dubbo.rpc.protocol.ProtocolSerializationWrapper.refer(ProtocolSerializationWrapper.java:52) at org.apache.dubbo.rpc.Protocol$Adaptive.refer(Protocol$Adaptive.java:-1) # ReferenceConfig类 at org.apache.dubbo.config.ReferenceConfig.createInvokerForRemote(ReferenceConfig.java:481) at org.apache.dubbo.config.ReferenceConfig.createProxy(ReferenceConfig.java:386) at org.apache.dubbo.config.ReferenceConfig.init(ReferenceConfig.java:275) - locked <0x2948> (a org.apache.dubbo.config.ReferenceConfig) at org.apache.dubbo.config.ReferenceConfig.get(ReferenceConfig.java:216) # SimpleReferenceCache类 at org.apache.dubbo.config.utils.SimpleReferenceCache.get(SimpleReferenceCache.java:110) # DefaultModuleDeployer类 at org.apache.dubbo.config.deploy.DefaultModuleDeployer.lambda$referServices$6(DefaultModuleDeployer.java:384) at org.apache.dubbo.config.deploy.DefaultModuleDeployer$$Lambda$905/0x0000000801117538.accept(Unknown Source:-1) at java.util.concurrent.ConcurrentHashMap$ValuesView.forEach(ConcurrentHashMap.java:4780) at org.apache.dubbo.config.deploy.DefaultModuleDeployer.referServices(DefaultModuleDeployer.java:364) at org.apache.dubbo.config.deploy.DefaultModuleDeployer.start(DefaultModuleDeployer.java:151) - locked <0x2949> (a org.apache.dubbo.config.deploy.DefaultModuleDeployer) at org.apache.dubbo.config.spring.context.DubboDeployApplicationListener.onContextRefreshedEvent(DubboDeployApplicationListener.java:108) at org.apache.dubbo.config.spring.context.DubboDeployApplicationListener.onApplicationEvent(DubboDeployApplicationListener.java:98) at org.apache.dubbo.config.spring.context.DubboDeployApplicationListener.onApplicationEvent(DubboDeployApplicationListener.java:44) # SimpleApplicationEventMulticaster类 at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:176) at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:169) at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:143) # AbstractApplicationContext类 at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:421) at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:378) at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:938) at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586) - locked <0x294a> (a java.lang.Object) # ServletWebServerApplicationContext类 at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:145) # SpringApplication类 at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:730) at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:412) at org.springframework.boot.SpringApplication.run(SpringApplication.java:302) at org.springframework.boot.SpringApplication.run(SpringApplication.java:1301) at org.springframework.boot.SpringApplication.run(SpringApplication.java:1290) # 项目启动入口 at com.example.demo.DemoApplication.main(DemoApplication.java:14)
MigrationInvoker:是否使用应用粒度服务发现invoker
public class MigrationInvokerimplements MigrationClusterInvoker { private synchronized void calcPreferredInvoker(MigrationRule migrationRule) { if (serviceDiscoveryInvoker == null || invoker == null) { return; } Set detectors = ScopeModelUtil.getApplicationModel(consumerUrl == null ? null : consumerUrl.getScopeModel()) .getExtensionLoader(MigrationAddressComparator.class).getSupportedExtensionInstances(); if (CollectionUtils.isNotEmpty(detectors)) { // pick preferred invoker // the real invoker choice in invocation will be affected by promotion if (detectors.stream().allMatch(comparator -> comparator.shouldMigrate(serviceDiscoveryInvoker, invoker, migrationRule))) { //是否使用应用粒度服务发现invoker this.currentAvailableInvoker = serviceDiscoveryInvoker; } else { this.currentAvailableInvoker = invoker; } } }
DefaultMigrationAddressComparator:判断是否使用新地址
public class DefaultMigrationAddressComparator implements MigrationAddressComparator {
private static final Logger logger = LoggerFactory.getLogger(DefaultMigrationAddressComparator.class);
private static final String MIGRATION_THRESHOLD = "dubbo.application.migration.threshold";
private static final String DEFAULT_THRESHOLD_STRING = "0.0";
private static final float DEFAULT_THREAD = 0f;
public static final String OLD_ADDRESS_SIZE = "OLD_ADDRESS_SIZE";
public static final String NEW_ADDRESS_SIZE = "NEW_ADDRESS_SIZE";
private Map> serviceMigrationData = new ConcurrentHashMap<>();
@Override
public boolean shouldMigrate(ClusterInvoker newInvoker, ClusterInvoker oldInvoker, MigrationRule rule) {
Map migrationData = serviceMigrationData.computeIfAbsent(oldInvoker.getUrl().getDisplayServiceKey(), _k -> new ConcurrentHashMap<>());
if (!newInvoker.hasProxyInvokers()) {
migrationData.put(OLD_ADDRESS_SIZE, getAddressSize(oldInvoker));
migrationData.put(NEW_ADDRESS_SIZE, -1);
logger.info("No " + getInvokerType(newInvoker) + " address available, stop compare.");
return false;
}
if (!oldInvoker.hasProxyInvokers()) {
migrationData.put(OLD_ADDRESS_SIZE, -1);
migrationData.put(NEW_ADDRESS_SIZE, getAddressSize(newInvoker));
logger.info("No " + getInvokerType(oldInvoker) + " address available, stop compare.");
return true;
}
int newAddressSize = getAddressSize(newInvoker); //新地址大小
int oldAddressSize = getAddressSize(oldInvoker); //旧地址大小
migrationData.put(OLD_ADDRESS_SIZE, oldAddressSize);
migrationData.put(NEW_ADDRESS_SIZE, newAddressSize);
String rawThreshold = null;
Float configedThreshold = rule == null ? null : rule.getThreshold(oldInvoker.getUrl());
if (configedThreshold != null && configedThreshold >= 0) {
rawThreshold = String.valueOf(configedThreshold);
}
rawThreshold = StringUtils.isNotEmpty(rawThreshold) ? rawThreshold : ConfigurationUtils.getCachedDynamicProperty(MIGRATION_THRESHOLD, DEFAULT_THRESHOLD_STRING);
//比较阀值,dubbo.application.migration.threshold设置,默认为0.0
float threshold;
try {
threshold = Float.parseFloat(rawThreshold);
} catch (Exception e) {
logger.error("Invalid migration threshold " + rawThreshold);
threshold = DEFAULT_THREAD;
}
logger.info("serviceKey:" + oldInvoker.getUrl().getServiceKey() + " Instance address size " + newAddressSize + ", interface address size " + oldAddressSize + ", threshold " + threshold);
if (newAddressSize != 0 && oldAddressSize == 0) {
return true; //新地址不等于0,旧地址等于0,表示只有3.x的地址,返回true
}
if (newAddressSize == 0 && oldAddressSize == 0) {
return false; //新地址等于0,旧地址等于0,,返回false
}
if (((float) newAddressSize / (float) oldAddressSize) >= threshold) {
return true; //新地址大小/旧地址大小 >=0,返回true
}
return false; //如果都不满足,返回false,表示使用旧地址,不进行迁移
}
private int getAddressSize(ClusterInvoker invoker) {
if (invoker == null) {
return -1;
}
List> invokers = invoker.getDirectory().getAllInvokers();
return CollectionUtils.isNotEmpty(invokers) ? invokers.size() : 0;
}
public Map getAddressSize(String displayServiceKey) {
return serviceMigrationData.get(displayServiceKey);
}
private String getInvokerType(ClusterInvoker> invoker) {
if (invoker.isServiceDiscovery()) {
return "instance";
}
return "interface";
}
}



