流程图示源码分析
Client.request触发channel调用HeaderExchangeChannel.request[核心]
DefaultFuture.newFuture实现真正异步逻辑 NettyClient.sendNettyChannel.send完成netty消息发送 总结
扩展一 NettyChannel扩展一 NettyClientHandler与NettyServerHandler扩展一 NettyClient
流程图示HeaderExchangeChannel完成请求响应对象的映射HeaderExchangeChannel激活Future取消消费者调用线程的阻塞将invocation加入Request通过codec编码层编码Request发送到socket网卡[write+flush]
源码分析
Client.request触发channel调用
Client调用的实现交给dubbo-channel调用,
final class ReferenceCountExchangeClient implements ExchangeClient {
触发HeaderExchangeClient调用
public CompletableFuture
HeaderExchangeChannel.request[核心]
实现invocation到Request的转变通过DefaultFuture实现真正的异步化调用nettyClient.send
完成了invoke->request->send三层方法模型的转变
public CompletableFutureDefaultFuture.newFuture实现真正异步逻辑request(Object request, int timeout) throws RemotingException { ...... 删除其他代码 构建协议对象 Request req = new Request(); req.setVersion(Version.getProtocolVersion()); req.setTwoWay(true); req.setData(request); 构建异步阻塞机制对象 DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout); 调用nettyclient 完成消息发送 ...... 删除其他代码 channel.send(req); return future; }
request.id 作为DefaultFuture的检索机制Request到达提供者后,response.id = request.idresponse 返回时,依据response.id 找到DefaultFuture并唤醒阻塞线程
public class DefaultFuture extends CompletableFutureNettyClient.send{ future池 将来响应对象通过这里找到请求对象的DefaultFuture,从而赋值结果以及唤醒请求阻塞线程 private static final Map FUTURES = new ConcurrentHashMap<>(); private DefaultFuture(Channel channel, Request request, int timeout) { this.channel = channel; this.request = request; this.id = request.getId(); this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT); 加入FUTURES ,id为request.getId() [Request.id同response.id] FUTURES.put(id, this); CHANNELS.put(id, channel); } public static DefaultFuture newFuture(Channel channel, Request request, int timeout) { 构建DefaultFuture并加入 static FUTURES池 final DefaultFuture future = new DefaultFuture(channel, request, timeout); 超时检测 timeoutCheck(future); return future; } }
没有直接调用NettyClient.channel【netty框架nio channel】,通过dubbo框架的NettyChannel进行发送通过getchannel获取nettyChannel
public void send(Object message, boolean sent) throws RemotingException {
if (needReconnect && !isConnected()) {
connect();
}
获取dubbo-channel对象NettyChannel
Channel channel = getChannel();
if (channel == null || !channel.isConnected()) {
throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
}
通过dubbo-channel NettyChannel实现消息发送
channel.send(message, sent);
}
NettyChannel.send完成netty消息发送
( nettyClientHandler -> NettyClient -> MultiMessageHandler->HeatBeatHandler->AllChannelHandler->DecodeHandler->HeaderExchangeHandler->ExchangeHandlerAdapter)基本不做功能
Request.mData就是RPCInvocation对象
| ExchangeCodec | DubboCodec |
|---|---|
| 完成Request编码 | 完成Request.mData编码 |
public void send(Object message, boolean sent) throws RemotingException {
...... 删除其他逻辑 channel 为netty框架的nioSocketChannel
调用调用nettyClient 配置的handler链路[涉及netty Pipeline以及出入站]
调用结果为 nettyClientHandler -> NettyClient -> MultiMessageHandler->HeatBeatHandler->AllChannelHandler->DecodeHandler->HeaderExchangeHandler->ExchangeHandlerAdapter
调用结果为DubboCountCodec->ExchangeCodec->DubboCodec
ChannelFuture future = channel.writeAndFlush(message);
}
总结
简述消息发送的模型转换 从Invoke->request->senddubbo handler 链在消息发送时基本不起作用消息发送的核心是通过编码器完成编码通过HeaderExchangeChannel完成异步化以及请求响应映射 扩展一 NettyChannel
NettyChannel持有NettyClient与NioSocketChannel的映射关系NettyClient不仅仅作为client对象,同时作为dubbo-handler对象,持有dubbo的所有handler链这样当NioSocketChannel事件发生时候,就可以根据这个映射关系找到对应的NettyClient,确保链路的映射关系 扩展一 NettyClientHandler与NettyServerHandler
适配器模型 适配dubbo的NettyClient与netty的ChannelHandler当事件发生时,先触发NettyClientHandler的相关netty框架事件,在通过NettyChannel的映射关系调用 NettyClient 扩展一 NettyClient
其一作用是作为客户端 打开netty连接其二作用是作为dubbo的handler总入口对象,封装(MultiMessageHandler->HeatBeatHandler->AllChannelHandler->DecodeHandler->HeaderExchangeHandler->ExchangeHandlerAdapter)这条handler链



