栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Motan中transport模块客户端心跳管理

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Motan中transport模块客户端心跳管理

客户端心跳
在Motan中客户端会定期发送心跳包到服务端,用以检查服务是否可用。并且修改当前channel连接的状态isAliveState,是否活着。
客户端负载均衡
Motan中使用了客户端的负载均衡,大概实现如下:
每次增加一个server服务注册中心就会通知客户端,客户端会就创建一个Referer(相当于客户端和服务端的连接,有几个服务就会有几个Referer,每一个具体的provider服务都被抽象成一个Referer接口),然后加入服务列表中,客户端请求服务的时候通过指定的算法(轮训,随机等)从服务列表中选择一个进行请求。

当心跳检查到某个服务不可用的时候,同时也会将服务列表中的Referer过滤掉,不会再去请求。
transport模块源码
1-入口类:
NettyEndpointFactory extends AbstractEndpointFactory

2-创建server和client的类
AbstractEndpointFactory.createServer
具体实现在子类NettyEndpointFactory中的方法:
protected Server innerCreateServer(URL url, MessageHandler messageHandler) {
 return new NettyServer(url, messageHandler);
}

客户端也类似
AbstractEndpointFactory.createClient
具体实现在子类NettyEndpointFactory中的方法:
protected Client innerCreateClient(URL url) {
 return new NettyClient(url);
}

3-客户端心跳发送
相比server客户端多了一个心跳的机制,每创建一个客户端连接都会将其加入到心跳管理的HeartbeatClientEndpointManager中:
private Client createClient(URL url, EndpointManager endpointManager) {
 Client client = innerCreateClient(url);
 //将创建的客户端进行统一管理,每创建一个客户端连接,就将其加入到心跳管理
 endpointManager.addEndpoint(client);
 return client;
 }
 HeartbeatClientEndpointManager对心跳进行管理,并且初始化后会定时向服务端发送心跳请求:
   public void init() {
 executorService = Executors.newScheduledThreadPool(1);
 executorService.scheduleWithFixedDelay(new Runnable() {
     @Override
     public void run() {
  for (Map.Entry entry : endpoints.entrySet()) {
      Client endpoint = entry.getKey();
      try {
   // 如果节点是存活状态,那么没必要走心跳
   if (endpoint.isAvailable()) {
continue;
   }
   HeartbeatFactory factory = entry.getValue();
   //1创建心跳request,2-发送心跳请求
   endpoint.heartbeat(factory.createRequest());
      } catch (Exception e) {
   LoggerUtil.error("HeartbeatEndpointManager send heartbeat Error: url=" + endpoint.getUrl().getUri() + ", " + e.getMessage());
      }
  }
     }
 }, MotanConstants.HEARTBEAT_PERIOD, MotanConstants.HEARTBEAT_PERIOD, TimeUnit.MILLISECONDS);
 ShutDownHook.registerShutdownHook(new Closable() {
     @Override
     public void close() {
  if (!executorService.isShutdown()) {
      executorService.shutdown();
  }
     }
 });
    }
 具体心跳请求封装为了HeartbeatRequest
 public static Request getDefaultHeartbeatRequest(long requestId){
 HeartbeatRequest request = new HeartbeatRequest();

 request.setRequestId(requestId);
 request.setInterfaceName(MotanConstants.HEARTBEAT_INTERFACE_NAME);
 request.setMethodName(MotanConstants.HEARTBEAT_METHOD_NAME);
 request.setParamtersDesc(MotanConstants.HHEARTBEAT_PARAM);
 return request;
    }   
    
 实际调用的代码在NettyClient中
 public void heartbeat(Request request) {
 // 如果节点还没有初始化或者节点已经被close掉了,那么heartbeat也不需要进行了
 if (state.isUnInitState() || state.isCloseState()) {
     LoggerUtil.warn("NettyClient heartbeat Error: state={} url={}", state.name(), url.getUri());
     return;
 }
 LoggerUtil.info("NettyClient heartbeat request: url={}", url.getUri());
 try {
     // async request后,如果service is  异步
     // available,那么将会自动把该client设置成可用
     request(request, true);
 } catch (Exception e) {
     LoggerUtil.error("NettyClient heartbeat Error: url={}, {}", url.getUri(), e.getMessage());
 }
    }
 


4-NettyServer创建
public NettyServer(URL url, MessageHandler messageHandler) {
 //初始化 Codec  DefaultRpcCodec
 super(url);
 this.messageHandler = messageHandler;
    }
主要代码如下 
//server 处理任务的线程池创建和预先启动核心线程
standardThreadExecutor = (standardThreadExecutor != null && !standardThreadExecutor.isShutdown()) ? standardThreadExecutor
  : new StandardThreadExecutor(minWorkerThread, maxWorkerThread, workerQueueSize, new DefaultThreadFactory("NettyServer-" + url.getServerPortStr(), true));
 standardThreadExecutor.prestartAllCoreThreads();
设置当前server最大的连接数
 //默认当前server最大的连接数100000  all clients conn  server支持的最大连接数TCP
     channelManage = new NettyServerChannelManage(maxServerConnection);
    
     serverBootstrap.group(bossGroup, workerGroup)
  .channel(NioServerSocketChannel.class)
  .childHandler(new ChannelInitializer() {
      @Override
      protected void initChannel(SocketChannel ch) throws Exception {
   ChannelPipeline pipeline = ch.pipeline();
   pipeline.addLast("channel_manage", channelManage);
   pipeline.addLast("decoder", new NettyDecoder(codec, NettyServer.this, maxContentLength));
   pipeline.addLast("encoder", new NettyEncoder());
   pipeline.addLast("handler", new NettyChannelHandler(NettyServer.this, messageHandler, standardThreadExecutor));
      }
  });  
      
5-NettyServer处理消息
   @Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
 if (msg instanceof NettyMessage) {
     if (threadPoolExecutor != null) {
  try {
      threadPoolExecutor.execute(new Runnable() {
   @Override
   public void run() {
processMessage(ctx, ((NettyMessage) msg));
   }
      });
  } catch (RejectedExecutionException rejectException) {
      if (((NettyMessage) msg).isRequest()) {
   rejectMessage(ctx, (NettyMessage) msg);
      } else {
   LoggerUtil.warn("process thread pool is full, run in io thread, active={} poolSize={} corePoolSize={} maxPoolSize={} taskCount={} requestId={}",
    threadPoolExecutor.getActiveCount(), threadPoolExecutor.getPoolSize(), threadPoolExecutor.getCorePoolSize(),
    threadPoolExecutor.getMaximumPoolSize(), threadPoolExecutor.getTaskCount(), ((NettyMessage) msg).getRequestId());
   processMessage(ctx, (NettyMessage) msg);
      }
  }
     } else {
  processMessage(ctx, (NettyMessage) msg);
     }
 } else {
     LoggerUtil.error("NettyChannelHandler messageReceived type not support: class=" + msg.getClass());
     throw new MotanframeworkException("NettyChannelHandler messageReceived type not support: class=" + msg.getClass());
 }
    }  
     
6-那服务端是如何处理心跳的呢?
  重点在AbstractEndpointFactory.createServer中。
  AbstractEndpointFactory.createServer创建server的时候做了什么呢?直接看关键代码:
  messageHandler = getHeartbeatFactory(url).wrapMessageHandler(messageHandler);
  它把我们的消息处理器messageHandler 进行了包装
  最终传递给我们NettyServer的messageHandler (***pipeline.addLast("handler", new 
 NettyChannelHandler(NettyServer.this,messageHandler,standardThreadExecutor));
 ***) 是HeartMessageHandleWrapper:
    private class HeartMessageHandleWrapper implements MessageHandler {
 private MessageHandler messageHandler;

 public HeartMessageHandleWrapper(MessageHandler messageHandler) {
     this.messageHandler = messageHandler;
 }

 @Override
 public Object handle(Channel channel, Object message) {
     //判断是否是HeartbeatRequest,如果是则直接从这里进行返回
     if (isHeartbeatRequest(message)) {
  return getDefaultHeartbeatResponse(((Request)message).getRequestId());
     }
     //如果不是则直接处理业务的请求
     return messageHandler.handle(channel, message);
 }
    }      

可以看到如果request 是HeartbeatRequest 则会直接返回HeartbeatResponse。    
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/239654.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号