在Dubbo中,为provider和consumer提供了一种被称为隐式参数传递的策略,可用于在两者之间传递参数。
本文先通过一个示例来展示下其使用过程,后续通过源码来分析下其传递过程。
1.示例分析 1.1 consumer示例public class Application {
// 服务提供者代码有所精简,本质上还是与之前的示例一样
public static void main(String[] args) throws Exception {
ReferenceConfig reference = new ReferenceConfig<>();
reference.setApplication(new ApplicationConfig("dubbo-demo-api-consumer"));
reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
reference.setInterface(DemoService.class);
// 设置参数
RpcContext.getContext().setAttachment("address", "beijing");
RpcContext.getContext().setAttachment("age", "12");
reference.setScope("remote");
DemoService service = reference.get();
String message = service.sayHello("dubbo");
System.out.println(message);
}
}
相比之前的示例中,多了设置参数的代码,这个需要传递到provider。
1.2 provider示例public class ProviderApplication {
public static void main(String[] args) {
// 服务实现(自定义DemoService接口)
DemoService demoService = new DemoServiceImpl();
// 当前应用配置
ApplicationConfig application = new ApplicationConfig();
application.setName("provider");
// 连接注册中心配置
RegistryConfig registry = new RegistryConfig();
// 本地zookeeper作为配置中心
registry.setAddress("zookeeper://localhost:2181");
// 服务提供者协议配置
ProtocolConfig protocol = new ProtocolConfig();
// dubbo协议,并以20881端口暴露
protocol.setName("dubbo");
protocol.setPort(20881);
// 服务提供者暴露服务配置
ServiceConfig service = new ServiceConfig();
service.setApplication(application);
service.setRegistry(registry);
service.setProtocol(protocol);
service.setInterface(DemoService.class);
service.setRef(demoService);
service.setVersion("1.0.0");
// 暴露及注册服务
service.export();
}
}
// 接口
public interface DemoService {
String sayHello(String name);
}
public class DemoServiceImpl implements DemoService {
private static final Logger logger = LoggerFactory.getLogger(org.apache.dubbo.demo.provider.DemoServiceImpl.class);
@Override
public String sayHello(String name) {
// 在实现类中获取consumer传递过来的参数并打印出来
String address = RpcContext.getContext().getAttachment("address");
String age = RpcContext.getContext().getAttachment("age");
System.out.println("address:" + address);
System.out.println("age:" +age);
return "Hi " + name;
}
}
相比较之前的示例而言,多了在DemoService接口的实现类DemoServiceImpl中获取consumer传递过来的参数的代码。
1.3 测试结果address:beijing age=12
所以,从consumer端传递过来的参数,在provider端被接收到。
下面我们就从源码的角度来分析下整个参数传递的过程。
2.源码分析 2.1 RpcContext解析在分析传递过程之前,先来看下RpcContext的作用
public class RpcContext {
// 参数存放位置
protected final Map attachments = new HashMap<>();
// 有关于RpcContext,其是线程安全的,每一个线程独享一个RpcContext
private static final InternalThreadLocal LOCAL = new InternalThreadLocal() {
@Override
protected RpcContext initialValue() {
return new RpcContext();
}
};
// 获取RpcContext对象
public static RpcContext getContext() {
return LOCAL.get();
}
// 设置attachment参数
public RpcContext setAttachment(String key, String value) {
return setObjectAttachment(key, (Object) value);
}
public RpcContext setAttachment(String key, Object value) {
return setObjectAttachment(key, value);
}
@Experimental("Experiment api for supporting Object transmission")
public RpcContext setObjectAttachment(String key, Object value) {
// 实际就是将参数设置到上面的attachments
if (value == null) {
attachments.remove(key);
} else {
attachments.put(key, value);
}
return this;
}
}
通过对RpcContext的分析,我们知道:RpcContext是线程安全的,每一个线程独享一个RpcContext对象,我们在调用setAttachment()方法时将参数设置到该map中。
设置到RpcContext有什么用呢?我们继续分析consumer传递参数的过程
2.1 consumer传递参数通过前面的分析,我们知道consumer方法的调用,会通过ClusterInvoker,默认是FailoverClusterInvoker,调用其invoke()方法。我们就从其invoke()方法分析起。
2.1.1 AbstractClusterInvoker.invoke() 获取consumer设置的参数
public abstract class AbstractClusterInvokerimplements Invoker { public Result invoke(final Invocation invocation) throws RpcException { checkWhetherDestroyed(); // 这里就从当前线程的ThreadLocal中获取RpcContext对象中存放的attachments参数信息 Map contextAttachments = RpcContext.getContext().getObjectAttachments(); if (contextAttachments != null && contextAttachments.size() != 0) { // 并将参数绑定到RpcInvocation.attachments属性中 ((RpcInvocation) invocation).addObjectAttachments(contextAttachments); } List > invokers = list(invocation); LoadBalance loadbalance = initLoadBalance(invokers, invocation); RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); // FailoverClusterInvoker实现 return doInvoke(invocation, invokers, loadbalance); } }
2.1.2 DubboInvoker.doInvoke() 发送数据
public class DubboInvokerextends AbstractInvoker { protected Result doInvoke(final Invocation invocation) throws Throwable { // 获取设置好的Invocation对象,里面有上一步骤获取的参数 RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(PATH_KEY, getUrl().getPath()); inv.setAttachment(VERSION_KEY, version); ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { boolean isoneway = RpcUtils.isoneway(getUrl(), invocation); int timeout = calculateTimeout(invocation, methodName); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); // 直接通过client传递出去 currentClient.send(inv, isSent); return AsyncRpcResult.newDefaultAsyncResult(invocation); } else { ExecutorService executor = getCallbackExecutor(getUrl(), inv); CompletableFuture appResponseFuture = currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj); // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter FutureContext.getContext().setCompatibleFuture(appResponseFuture); AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv); result.setExecutor(executor); return result; } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } } }
传递对象就比较简单了,就是通过Netty channel将invocation对象(包含attachments)传递到服务端
2.2 provider接收参数依旧我们之前对provider接收请求的过程分析,其会先经过一些Filter的处理,最后才交由接口实现类处理。
我们从 看起
2.2.1 ContextFilter 解析attachments
public class ContextFilter implements Filter, Filter.Listener {
public Result invoke(Invoker> invoker, Invocation invocation) throws RpcException {
// 这个invocation就是从消费者端传递过来的那个invocation对象
// 我们从这里获取其attachments
Map attachments = invocation.getObjectAttachments();
if (attachments != null) {
Map newAttach = new HashMap<>(attachments.size());
for (Map.Entry entry : attachments.entrySet()) {
String key = entry.getKey();
if (!UNLOADING_KEYS.contains(key)) {
newAttach.put(key, entry.getValue());
}
}
attachments = newAttach;
}
RpcContext context = RpcContext.getContext();
...
if (attachments != null) {
// 将invocation中获取到的attachments重新放置到当前RpcContext中
if (context.getObjectAttachments() != null) {
context.getObjectAttachments().putAll(attachments);
} else {
context.setObjectAttachments(attachments);
}
}
if (invocation instanceof RpcInvocation) {
((RpcInvocation) invocation).setInvoker(invoker);
}
try {
context.clearAfterEachInvoke(false);
return invoker.invoke(invocation);
} finally {
context.clearAfterEachInvoke(true);
// importANT! For async scenario, we must remove context from current thread, so we always create a new RpcContext for the next invoke for the same thread.
RpcContext.removeContext(true);
RpcContext.removeServerContext();
}
}
}
通过ContextFilter 的分析我们知道:就是在当前Filter中,ContextFilter将从消费者端获取到的attachments,重新添加到provider端的RpcContext.attachments中。
这样,后续在我们的DemoService实现类中,就可以通过RpcContext获取到attachments了。
总结:对隐式参数传递整个分析过程并不算困难,只要我们之前分析consumer、provider的过程足够坚实。
基本上Dubbo的这些扩展功能,都是通过这些Filter来实现的。
所以在分析其源码的时候,先分析主干信息,千万不要陷入无穷无尽的细节中了。
等主干分析清楚之后,再来对细节各个突破即可。



