栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

dubbo源码分析第十三篇一dubbo远程调用第三小节一消费端remote层发送消息核心源码解析

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

dubbo源码分析第十三篇一dubbo远程调用第三小节一消费端remote层发送消息核心源码解析

文章目录

流程图示源码分析

Client.request触发channel调用HeaderExchangeChannel.request[核心]

DefaultFuture.newFuture实现真正异步逻辑 NettyClient.sendNettyChannel.send完成netty消息发送 总结

扩展一 NettyChannel扩展一 NettyClientHandler与NettyServerHandler扩展一 NettyClient

流程图示

HeaderExchangeChannel完成请求响应对象的映射HeaderExchangeChannel激活Future取消消费者调用线程的阻塞将invocation加入Request通过codec编码层编码Request发送到socket网卡[write+flush]
源码分析 Client.request触发channel调用

Client调用的实现交给dubbo-channel调用,

final class ReferenceCountExchangeClient implements ExchangeClient {
	触发HeaderExchangeClient调用
    public CompletableFuture request(Object request, int timeout) throws RemotingException {
        return client.request(request, timeout);
    }
}
public class HeaderExchangeClient implements ExchangeClient {
	触发dubbo-channel调用
    public CompletableFuture request(Object request) throws RemotingException {
        return channel.request(request);
    }
}
 
HeaderExchangeChannel.request[核心] 

实现invocation到Request的转变通过DefaultFuture实现真正的异步化调用nettyClient.send

完成了invoke->request->send三层方法模型的转变

public CompletableFuture request(Object request, int timeout) throws RemotingException {
    ...... 删除其他代码
    构建协议对象
    Request req = new Request();
    req.setVersion(Version.getProtocolVersion());
    req.setTwoWay(true);
    req.setData(request);
    构建异步阻塞机制对象
    DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
    调用nettyclient 完成消息发送
    ...... 删除其他代码
    channel.send(req);
    return future;
}
 
DefaultFuture.newFuture实现真正异步逻辑 

request.id 作为DefaultFuture的检索机制Request到达提供者后,response.id = request.idresponse 返回时,依据response.id 找到DefaultFuture并唤醒阻塞线程

public class DefaultFuture extends CompletableFuture {
    future池 将来响应对象通过这里找到请求对象的DefaultFuture,从而赋值结果以及唤醒请求阻塞线程
    private static final Map FUTURES = new ConcurrentHashMap<>();

    private DefaultFuture(Channel channel, Request request, int timeout) {
        this.channel = channel;
        this.request = request;
        this.id = request.getId();
        this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
        加入FUTURES ,id为request.getId() [Request.id同response.id]
        FUTURES.put(id, this);
        CHANNELS.put(id, channel);
    }

    public static DefaultFuture newFuture(Channel channel, Request request, int timeout) {
        构建DefaultFuture并加入 static FUTURES池
        final DefaultFuture future = new DefaultFuture(channel, request, timeout);
        超时检测
        timeoutCheck(future);
        return future;
    }
}
 
NettyClient.send 

没有直接调用NettyClient.channel【netty框架nio channel】,通过dubbo框架的NettyChannel进行发送通过getchannel获取nettyChannel

public void send(Object message, boolean sent) throws RemotingException {
    if (needReconnect && !isConnected()) {
        connect();
    }
    获取dubbo-channel对象NettyChannel
    Channel channel = getChannel();
    if (channel == null || !channel.isConnected()) {
        throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
    }
    通过dubbo-channel NettyChannel实现消息发送
    channel.send(message, sent);
}
NettyChannel.send完成netty消息发送

( nettyClientHandler -> NettyClient -> MultiMessageHandler->HeatBeatHandler->AllChannelHandler->DecodeHandler->HeaderExchangeHandler->ExchangeHandlerAdapter)基本不做功能

Request.mData就是RPCInvocation对象

ExchangeCodecDubboCodec
完成Request编码完成Request.mData编码
public void send(Object message, boolean sent) throws RemotingException {
    ...... 删除其他逻辑 channel 为netty框架的nioSocketChannel
    调用调用nettyClient 配置的handler链路[涉及netty Pipeline以及出入站]
    调用结果为 nettyClientHandler -> NettyClient -> MultiMessageHandler->HeatBeatHandler->AllChannelHandler->DecodeHandler->HeaderExchangeHandler->ExchangeHandlerAdapter
    调用结果为DubboCountCodec->ExchangeCodec->DubboCodec
    ChannelFuture future = channel.writeAndFlush(message);
      
}
总结

简述消息发送的模型转换 从Invoke->request->senddubbo handler 链在消息发送时基本不起作用消息发送的核心是通过编码器完成编码通过HeaderExchangeChannel完成异步化以及请求响应映射 扩展一 NettyChannel

NettyChannel持有NettyClient与NioSocketChannel的映射关系NettyClient不仅仅作为client对象,同时作为dubbo-handler对象,持有dubbo的所有handler链这样当NioSocketChannel事件发生时候,就可以根据这个映射关系找到对应的NettyClient,确保链路的映射关系 扩展一 NettyClientHandler与NettyServerHandler

适配器模型 适配dubbo的NettyClient与netty的ChannelHandler当事件发生时,先触发NettyClientHandler的相关netty框架事件,在通过NettyChannel的映射关系调用 NettyClient 扩展一 NettyClient

其一作用是作为客户端 打开netty连接其二作用是作为dubbo的handler总入口对象,封装(MultiMessageHandler->HeatBeatHandler->AllChannelHandler->DecodeHandler->HeaderExchangeHandler->ExchangeHandlerAdapter)这条handler链

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号