栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

12.nacos服务注册源码分析之nacos-server服务注册

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

12.nacos服务注册源码分析之nacos-server服务注册

nacos服务注册源码分析之nacos-server服务注册 主要内容

接上篇博文，本篇博文主要介绍 nacos-server 服务端的处理，包括以下内容：

    - nacos-server 如何存储服务信息
    - nacos-server 接收 nacos-client 注册请求的处理，服务实例如何保存
    - nacos-server 接收 nacos-client 心跳请求的处理
服务信息存储结构


存储结构核心代码

// ## ServiceManager 
public class ServiceManager implements RecordListener {
	
    private final Map> serviceMap = new ConcurrentHashMap<>();
}
// ## Service : 服务里面存储 clusterMap 
/**
 * Excerpt of the server-side Service model showing how a service keeps its clusters.
 */
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {

    /**
     * Clusters belonging to this service, keyed by cluster name.
     */
    private Map<String, Cluster> clusterMap = new HashMap<>();
}
// ## Cluster 
public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {
	// 存储持久节点
    private Set persistentInstances = new HashSet<>();
    // 存储临时节点
    private Set ephemeralInstances = new HashSet<>();
}
服务端处理服务注册
InstanceController.register 接收到的服务注册信息

服务注册
// serviceManager.registerInstance(namespaceId, serviceName, instance);
// 注册一个AP模式的服务
// 如果服务或者集群不存在,则创建
/**
 * Registers an instance under the given service.
 *
 * <p>First asks {@code createEmptyService} to make sure a service entry exists, then verifies the
 * service can be looked up, and finally adds the instance to it.
 *
 * @param namespaceId namespace of the service, e.g. {@code public}
 * @param serviceName grouped service name, e.g. {@code DEFAULT_GROUP@@system}
 * @param instance    instance to register; its ephemeral flag selects the storage mode
 * @throws NacosException with code {@code INVALID_PARAM} when the service cannot be found after the
 *                        creation step, or whatever the called helpers throw
 */
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    // Create the Service entry on first registration.
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());

    // Guard: the service must be resolvable before instances are attached to it.
    if (getService(namespaceId, serviceName) == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }

    // Attach the instance to the service.
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
 
创建 Service
// createEmptyService(namespaceId, serviceName, instance.isEphemeral());
// namespaceId=public; serviceName=DEFAULT_GROUP@@system; cluster=null
/**
 * Creates, validates and stores a new Service when none exists for the given namespace and name.
 *
 * <p>Does nothing when the service is already present.
 *
 * @param namespaceId namespace of the service, e.g. {@code public}
 * @param serviceName grouped service name, e.g. {@code DEFAULT_GROUP@@system}
 * @param local       when {@code false}, the new service is additionally passed to
 *                    {@code addOrReplaceService}
 * @param cluster     optional cluster to attach to the new service; may be {@code null}
 * @throws NacosException propagated from validation or from storing the service
 */
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
        throws NacosException {
    if (getService(namespaceId, serviceName) != null) {
        // Already registered: nothing to create.
        return;
    }

    // First registration: build a fresh Service object.
    Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
    Service created = new Service();
    created.setName(serviceName);
    created.setNamespaceId(namespaceId);
    created.setGroupName(NamingUtils.getGroupName(serviceName));
    created.setLastModifiedMillis(System.currentTimeMillis());
    created.recalculateChecksum();

    if (cluster != null) {
        // Link the cluster and the service in both directions.
        cluster.setService(created);
        created.getClusterMap().put(cluster.getName(), cluster);
    }

    // Validate the service now; an exception is thrown if validation fails.
    created.validate();

    putServiceAndInit(created);
    if (!local) {
        addOrReplaceService(created);
    }
}

// 保存并初始化Service
// putServiceAndInit(service);
/**
 * Stores the service in the registry, initializes it, and subscribes it to instance-list changes.
 *
 * @param service the service to store and initialize
 * @throws NacosException propagated from the consistency service when registering listeners
 */
private void putServiceAndInit(Service service) throws NacosException {
    // Put the service into serviceMap.
    putService(service);

    // Initialize the service; per the article this starts the client heartbeat check task.
    service.init();

    // The service listens on two instance-list keys of the consistency service
    // (the article notes the implementation is DelegateConsistencyServiceImpl).

    // Ephemeral key, e.g. com.alibaba.nacos.naming.iplist.ephemeral.public##DEFAULT_GROUP@@system
    String ephemeralKey = KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true);
    consistencyService.listen(ephemeralKey, service);

    // Persistent key, e.g. com.alibaba.nacos.naming.iplist.public##DEFAULT_GROUP@@system
    String persistentKey = KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false);
    consistencyService.listen(persistentKey, service);

    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}


/**
 * Stores the service in {@code serviceMap} under its namespace and name.
 *
 * <p>The per-namespace map is created on first use: the presence check is repeated inside a block
 * synchronized on {@code putServiceLock} so the inner map is only put once by threads using this method.
 *
 * @param service the service to store
 */
public void putService(Service service) {
    final String namespaceId = service.getNamespaceId();

    boolean namespaceMissing = !serviceMap.containsKey(namespaceId);
    if (namespaceMissing) {
        synchronized (putServiceLock) {
            // Re-check under the lock: another thread may have created the map in the meantime.
            if (!serviceMap.containsKey(namespaceId)) {
                serviceMap.put(namespaceId, new ConcurrentSkipListMap<>());
            }
        }
    }

    // serviceMap layout: namespaceId -> (serviceName -> service)
    serviceMap.get(namespaceId).put(service.getName(), service);
}
添加服务实例
// 添加服务实例到Service中
 /**
  * Adds one or more instances to a service and publishes the resulting instance list.
  *
  * @param namespaceId namespace of the service, e.g. {@code public}
  * @param serviceName grouped service name, e.g. {@code DEFAULT_GROUP@@system}
  * @param ephemeral   whether the instances are ephemeral; selects the instance-list key
  * @param ips         instances to add
  * @throws NacosException with code {@code INVALID_PARAM} when the service does not exist, or
  *                        whatever the called helpers and the consistency service throw
  */
 public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
         throws NacosException {
     // Example key: com.alibaba.nacos.naming.iplist.ephemeral.public##DEFAULT_GROUP@@system
     String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

     Service service = getService(namespaceId, serviceName);
     if (service == null) {
         // Fail with the same error registerInstance uses rather than an NPE on synchronized (null).
         throw new NacosException(NacosException.INVALID_PARAM,
                 "service not found, namespace: " + namespaceId + ", service: " + serviceName);
     }

     // Serialize updates per service so the list is built and published as one step.
     synchronized (service) {
         // Build the new instance list for the service.
         List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

         Instances instances = new Instances();
         instances.setInstanceList(instanceList);

         // Store the list through the consistency service under the computed key.
         consistencyService.put(key, instances);
     }
 }
服务端接收客户端心跳上报
InstanceController.beat Controller 接收心跳请求
// 这里比较简单了
/**
 * Handles a heartbeat request sent by a client for one of its instances.
 *
 * <p>Resolves the target instance from the request parameters (or from the JSON {@code beat} payload
 * when present). If the instance is unknown and a payload was sent, the instance is registered again
 * from the payload. The heartbeat is then handed to the service for processing.
 *
 * @param request HTTP request carrying {@code beat}, cluster name, {@code ip}, {@code port}, namespace
 *                id and service name parameters
 * @return JSON node with the response code, the client beat interval and the light-beat flag
 * @throws Exception a {@code NacosException} with code {@code SERVER_ERROR} when the service cannot be
 *                   found, or any exception raised by parameter parsing and the called helpers
 */
@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {

    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    // Default beat interval from the switch domain (5000 in the traced example).
    result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());

    // Optional JSON payload; empty in the traced example.
    String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
    RsInfo clientBeat = null;
    if (StringUtils.isNotBlank(beat)) {
        clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
    }
    // e.g. DEFAULT
    String clusterName = WebUtils
            .optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
    // e.g. 192.168.31.30
    String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
    // e.g. 8081
    int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
    // When a payload is present its values take precedence over the request parameters
    // (in the traced example the payload was empty, so this branch was skipped).
    if (clientBeat != null) {
        if (StringUtils.isNotBlank(clientBeat.getCluster())) {
            clusterName = clientBeat.getCluster();
        } else {
            // fix #2533
            clientBeat.setCluster(clusterName);
        }
        ip = clientBeat.getIp();
        port = clientBeat.getPort();
    }
    // e.g. public
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    // e.g. DEFAULT_GROUP@@system
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
    // Look up the instance the heartbeat refers to.
    Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
    // Unknown instance: register it again from the payload, if there is one.
    if (instance == null) {
        if (clientBeat == null) {
            // Nothing to rebuild the instance from; tell the client the resource is missing.
            result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
            return result;
        }
        Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
                + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
        instance = new Instance();
        instance.setPort(clientBeat.getPort());
        instance.setIp(clientBeat.getIp());
        instance.setWeight(clientBeat.getWeight());
        instance.setMetadata(clientBeat.getMetadata());
        instance.setClusterName(clusterName);
        instance.setServiceName(serviceName);
        // NOTE(review): assigns the instance's own id back to itself; kept as in the original.
        instance.setInstanceId(instance.getInstanceId());
        instance.setEphemeral(clientBeat.isEphemeral());

        serviceManager.registerInstance(namespaceId, serviceName, instance);
    }

    Service service = serviceManager.getService(namespaceId, serviceName);

    if (service == null) {
        throw new NacosException(NacosException.SERVER_ERROR,
                "service not found: " + serviceName + "@" + namespaceId);
    }
    if (clientBeat == null) {
        // No payload was sent: build a minimal beat from the request parameters.
        clientBeat = new RsInfo();
        clientBeat.setIp(ip);
        clientBeat.setPort(port);
        clientBeat.setCluster(clusterName);
    }
    // Hand the heartbeat to the service for actual processing.
    service.processClientBeat(clientBeat);
    // Assemble the response.
    result.put(CommonParams.CODE, NamingResponseCode.OK);
    if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
        // An instance-specific interval overrides the default written above.
        result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
    }
    result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
    return result;
}
心跳处理 service.processClientBeat(clientBeat)
//  processClientBeat 构造一个ClientBeatProcessor 来处理心跳请求
/**
 * Wraps a client heartbeat in a {@code ClientBeatProcessor} bound to this service and submits it
 * to {@code HealthCheckReactor.scheduleNow}.
 *
 * @param rsInfo heartbeat information reported by the client
 */
public void processClientBeat(final RsInfo rsInfo) {
    // Build the processor that will handle this heartbeat for the current service.
    final ClientBeatProcessor beatTask = new ClientBeatProcessor();
    beatTask.setService(this);
    beatTask.setRsInfo(rsInfo);

    // Submit the processor for execution.
    HealthCheckReactor.scheduleNow(beatTask);
}

// ClientBeatProcessor.run

/**
 * Applies the heartbeat to every instance of the reported cluster whose ip and port match.
 *
 * <p>For each match the last-beat timestamp is refreshed. If the instance is not marked and is
 * currently unhealthy, it is set healthy and a service-changed notification is pushed.
 */
@Override
public void run() {
    Service service = this.service;

    String ip = rsInfo.getIp(); // e.g. 192.168.31.30
    String clusterName = rsInfo.getCluster(); // e.g. DEFAULT
    int port = rsInfo.getPort(); // e.g. 8081

    // Cluster of this service named in the heartbeat.
    Cluster cluster = service.getClusterMap().get(clusterName);
    // Instances of that cluster (presumably `true` selects the ephemeral ones; confirm in Cluster.allIPs).
    List<Instance> instances = cluster.allIPs(true);

    for (Instance instance : instances) {
        // Only the instance that sent the heartbeat is updated.
        if (instance.getIp().equals(ip) && instance.getPort() == port) {
            // Record when the last heartbeat arrived.
            instance.setLastBeat(System.currentTimeMillis());
            if (!instance.isMarked() && !instance.isHealthy()) {
                // The instance was unhealthy: restore it and notify about the change.
                instance.setHealthy(true);
                getPushService().serviceChanged(service);
            }
        }
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/731427.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号