
Nacos Source Code (1): Overview & Naming Service


Contents

I. Preface
II. Source Code Overview
III. Data Model
IV. Service Domain Model
    1. Model
        (1) Service
        (2) Cluster
        (3) Instance
    2. Core Flows
        (1) create service
        (2) register instance
            (a) grpc-NamingGrpcClientProxy
                RpcClient
                currentConnection.request()
            (b) http-NamingHttpClientProxy

I. Preface

II. Source Code Overview

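address module: queries the number of nodes in the Nacos cluster and their IP list.
api module: abstractions of the API interfaces that clients call.
common module: shared utilities and string constant definitions.
client module: depends on the api and common modules and implements the api interfaces for use by Nacos clients.
cmdb module: keeps the data it operates on in memory and exposes an interface for querying data labels.
config module: configuration management. It provides APIs for clients to pull configuration and to update it; clients pick up changes through long polling. The data is stored in MySQL.
naming module: the service registry implementation, covering service registration and service discovery.
console module: the console, including permission checks, service status, and health checks.
core module: mainly implements a post-processor for Spring's PropertySource, used to load Nacos's default configuration.
distribution module: packages nacos-server, using maven-assembly-plugin for custom packaging.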

Core feature projects:

Configuration management: nacos-config
Service registration and discovery: nacos-naming

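III. Data Model

IV. Service Domain Model

Nacos services are organized into three levels: Service, Cluster, and Instance, as shown in the figure in the original post.

1. Model

The key fields of these three entity classes are described below. The classes in the pojo package hold the basic definitions, while the entities in the core package add extensions on top of them.

(1) Service

Service in the pojo package

A service is identified by namespace + group + name.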
public class Service implements Serializable {
    
    private static final long serialVersionUID = -990509089519499344L;
    
    private final String namespace;
    
    private final String group;
    
    private final String name;
    
    // Whether the service is ephemeral (temporary) rather than persistent
    private final boolean ephemeral;
    
    private final AtomicLong revision;
    
    private long lastUpdatedTime;

    // Two services are considered equal when namespace, group and name all match
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof Service)) {
            return false;
        }
        Service service = (Service) o;
        return namespace.equals(service.namespace) && group.equals(service.group) && name.equals(service.name);
    }
}
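
To make the identity rule concrete, here is a minimal sketch of a class that compares on namespace, group and name only. `ServiceKey` and `distinctCount` are hypothetical names used for illustration and are not Nacos classes. The sketch also overrides `hashCode` with the same three fields, which any class needs if it is going to be used as a map key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified stand-in for the identity rule above; not the Nacos Service class.
final class ServiceKey {
    private final String namespace;
    private final String group;
    private final String name;
    private final boolean ephemeral; // deliberately left out of equals/hashCode

    ServiceKey(String namespace, String group, String name, boolean ephemeral) {
        this.namespace = Objects.requireNonNull(namespace);
        this.group = Objects.requireNonNull(group);
        this.name = Objects.requireNonNull(name);
        this.ephemeral = ephemeral;
    }

    boolean isEphemeral() {
        return ephemeral;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof ServiceKey)) {
            return false;
        }
        ServiceKey other = (ServiceKey) o;
        return namespace.equals(other.namespace) && group.equals(other.group) && name.equals(other.name);
    }

    @Override
    public int hashCode() {
        // Must use the same fields as equals so equal keys land in the same bucket.
        return Objects.hash(namespace, group, name);
    }

    // Counts how many distinct services the given keys describe.
    static int distinctCount(ServiceKey... keys) {
        Map<ServiceKey, Boolean> seen = new HashMap<>();
        for (ServiceKey key : keys) {
            seen.put(key, Boolean.TRUE);
        }
        return seen.size();
    }
}
```

Two keys that differ only in the ephemeral flag collapse into one entry, while a different group yields a separate service.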
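(2) Cluster

In the pojo package

// Name of the service this cluster belongs to
private String serviceName;

// Default port
private int defaultPort = 80;

In the core package

// Persistent instances
@JsonIgnore
private Set<Instance> persistentInstances = new HashSet<>();

// Ephemeral instances
@JsonIgnore
private Set<Instance> ephemeralInstances = new HashSet<>();

// Clusters are equal when their names match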
public boolean equals(Object obj) {
    if (!(obj instanceof Cluster)) {
        return false;
    }
    
    return getName().equals(((Cluster) obj).getName());
}

// Hand the health-check task to a scheduled executor so that it runs asynchronously.
public void init() {
    if (inited) {
        return;
    }
    checkTask = new HealthCheckTask(this);
    
    HealthCheckReactor.scheduleCheck(checkTask);
    inited = true;
}
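(3) Instance

The ephemeral flag distinguishes temporary instances from persistent ones. The unique instance ID, instanceId, can be generated in two ways, simple and snowflake, with simple as the default. To make queries easier, Instance redundantly stores serviceName, clusterName, and similar fields.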

private String instanceId;

private String ip;

private int port;

private double weight = 1.0D;

private boolean healthy = true;

private boolean enabled = true;

private boolean ephemeral = true;

private String clusterName;

private String serviceName;

private Map<String, String> metadata = new HashMap<String, String>();

// Generate the unique ID in one of two ways: simple or snowflake
public String generateInstanceId(Set<String> currentInstanceIds) {
    String instanceIdGenerator = getInstanceIdGenerator();
    if (Constants.SNOWFLAKE_INSTANCE_ID_GENERATOR.equalsIgnoreCase(instanceIdGenerator)) {
        return generateSnowflakeInstanceId(currentInstanceIds);
    } else {
        return generateInstanceId();
    }
}

Snowflake algorithm: SnowFlowerIdGenerator

public synchronized long nextId() {
    long currentMillis = currentTimeMillis();
    if (this.lastTime == currentMillis) {
        if (0L == (this.sequence = ++this.sequence & 4095L)) {
            currentMillis = this.waitUntilNextTime(currentMillis);
        }
    } else {
        this.sequence = 0L;
    }
    
    this.lastTime = currentMillis;
    
    if (logger.isDebugEnabled()) {
        logger.debug("{}-{}-{}", (new SimpleDateFormat(DATETIME_PATTERN)).format(new Date(this.lastTime)),
                workerId, this.sequence);
    }
    
    // Layout: timestamp offset in the high bits, then workerId, then the 12-bit sequence.
    currentId = ((currentMillis - EPOCH) << 22) | (workerId << 12) | this.sequence;
    return currentId;
}
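
The bit layout in the last step can be sketched in isolation. `SnowflakeLayout` below is a hypothetical helper, not a Nacos class. The 12-bit sequence width follows from the `4095L` mask above; the 10-bit worker width is an inference from the two shift amounts (22 - 12) and should be read as an assumption.

```java
// Hypothetical helper that packs and unpacks the three ID fields; not part of Nacos.
final class SnowflakeLayout {
    static final int SEQUENCE_BITS = 12; // matches the 4095L mask in nextId()
    static final int WORKER_BITS = 10;   // assumed: 22 - 12, so the fields do not overlap
    static final long SEQUENCE_MASK = (1L << SEQUENCE_BITS) - 1;
    static final long WORKER_MASK = (1L << WORKER_BITS) - 1;

    private SnowflakeLayout() {
    }

    // Packs the fields the same way as: (millis - EPOCH) << 22 | workerId << 12 | sequence
    static long compose(long millisSinceEpoch, long workerId, long sequence) {
        return (millisSinceEpoch << (WORKER_BITS + SEQUENCE_BITS))
                | ((workerId & WORKER_MASK) << SEQUENCE_BITS)
                | (sequence & SEQUENCE_MASK);
    }

    static long timestampOf(long id) {
        return id >>> (WORKER_BITS + SEQUENCE_BITS);
    }

    static long workerOf(long id) {
        return (id >>> SEQUENCE_BITS) & WORKER_MASK;
    }

    static long sequenceOf(long id) {
        return id & SEQUENCE_MASK;
    }

    // Mirrors "++sequence & 4095L": returns 0 when the sequence wraps within one millisecond.
    static long nextSequence(long current) {
        return (current + 1) & SEQUENCE_MASK;
    }
}
```

Because the timestamp occupies the high bits, IDs produced later compare greater than earlier ones, and a sequence that wraps to 0 is the signal to wait for the next millisecond.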

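The following sections walk through a few core functions.

2. Core Flows

(1) create service

NamingFactory provides two methods for creating a naming service: one takes a server list, the other takes a Properties object.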

public static NamingService createNamingService(String serverList) throws NacosException {
    try {
        // Load the implementation by name so the api module has no compile-time dependency on the client.
        Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
        Constructor<?> constructor = driverImplClass.getConstructor(String.class);
        return (NamingService) constructor.newInstance(serverList);
    } catch (Throwable e) {
        throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
    }
}

public static NamingService createNamingService(Properties properties) throws NacosException {
    try {
        Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
        Constructor<?> constructor = driverImplClass.getConstructor(Properties.class);
        return (NamingService) constructor.newInstance(properties);
    } catch (Throwable e) {
        throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
    }
}

Both methods use reflection to instantiate the NacosNamingService class.
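
The same reflective pattern can be tried without Nacos on the classpath. In the sketch below, `Greeter`, `ServerListGreeter` and `ReflectiveFactory` are hypothetical stand-ins for NamingService, NacosNamingService and NamingFactory. Wrapping failures in an unchecked exception is a simplification of the NacosException wrapping shown above.

```java
import java.lang.reflect.Constructor;

// Hypothetical interface standing in for NamingService.
interface Greeter {
    String greet();
}

// Hypothetical implementation standing in for NacosNamingService.
class ServerListGreeter implements Greeter {
    private final String serverList;

    public ServerListGreeter(String serverList) {
        this.serverList = serverList;
    }

    @Override
    public String greet() {
        return "connected to " + serverList;
    }
}

// Hypothetical factory: looks the implementation up by class name at runtime.
final class ReflectiveFactory {
    private ReflectiveFactory() {
    }

    static Greeter create(String implClassName, String serverList) {
        try {
            Class<?> implClass = Class.forName(implClassName);
            // Select the single-String constructor, as createNamingService(String) does.
            Constructor<?> constructor = implClass.getConstructor(String.class);
            return (Greeter) constructor.newInstance(serverList);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot create " + implClassName, e);
        }
    }
}
```

A call such as `ReflectiveFactory.create(ServerListGreeter.class.getName(), "127.0.0.1:8848")` returns a working `Greeter`, while an unknown class name fails at runtime rather than at compile time, which is the trade-off of this approach.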

(2) register instance

NacosNamingService delegates registration to NamingClientProxy.

public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    NamingUtils.checkInstanceIsLegal(instance);
    clientProxy.registerService(serviceName, groupName, instance);
}

NamingClientProxy:

Defines the interface for a set of service operations, including service CRUD, instance CRUD, listing services, subscribe/unsubscribe, health checks, and heartbeat updates.

NamingClientProxyDelegate:

Delegates to the proxy classes NamingHttpClientProxy and NamingGrpcClientProxy, which implement the interface methods over HTTP and gRPC respectively.

@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
}

// Ephemeral instances go through gRPC; persistent instances go through HTTP.
private NamingClientProxy getExecuteClientProxy(Instance instance) {
    return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;
}
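
The routing decision is small enough to isolate. The sketch below uses hypothetical types (`RegisterProxy`, `RoutingDelegate`) with a single method. The real NamingClientProxy interface has many more operations, so this only illustrates the delegate pattern.

```java
// Hypothetical single-method proxy; the real NamingClientProxy interface is much larger.
interface RegisterProxy {
    String register(String serviceName, boolean ephemeral);
}

// Hypothetical delegate that picks a transport per call, based on the ephemeral flag.
final class RoutingDelegate implements RegisterProxy {
    private final RegisterProxy grpcProxy;
    private final RegisterProxy httpProxy;

    RoutingDelegate(RegisterProxy grpcProxy, RegisterProxy httpProxy) {
        this.grpcProxy = grpcProxy;
        this.httpProxy = httpProxy;
    }

    @Override
    public String register(String serviceName, boolean ephemeral) {
        // Same rule as getExecuteClientProxy: ephemeral -> gRPC, persistent -> HTTP.
        RegisterProxy target = ephemeral ? grpcProxy : httpProxy;
        return target.register(serviceName, ephemeral);
    }
}
```

Callers only ever see the delegate, so the transport choice stays in one place and can change without touching call sites.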
(a) grpc-NamingGrpcClientProxy
@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName,
            instance);
    redoService.cacheInstanceForRedo(serviceName, groupName, instance);
    doRegisterService(serviceName, groupName, instance);
}

// Cache a copy for redo. Key: grouped name built from serviceName and groupName; value: redo data.
public void cacheInstanceForRedo(String serviceName, String groupName, Instance instance) {
    String key = NamingUtils.getGroupedName(serviceName, groupName);
    InstanceRedoData redoData = InstanceRedoData.build(serviceName, groupName, instance);
    synchronized (registeredInstances) {
        registeredInstances.put(key, redoData);
    }
}


public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
    InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
            NamingRemoteConstants.REGISTER_INSTANCE, instance);
    requestToServer(request, Response.class);
    redoService.instanceRegistered(serviceName, groupName);
}


private <T extends Response> T requestToServer(AbstractNamingRequest request, Class<T> responseClass)
        throws NacosException {
    try {
        request.putAllHeader(getSecurityHeaders());
        request.putAllHeader(getSpasHeaders(
                NamingUtils.getGroupedNameOptional(request.getServiceName(), request.getGroupName())));
        Response response =
                requestTimeout < 0 ? rpcClient.request(request) : rpcClient.request(request, requestTimeout);
        if (ResponseCode.SUCCESS.getCode() != response.getResultCode()) {
            throw new NacosException(response.getErrorCode(), response.getMessage());
        }
        if (responseClass.isAssignableFrom(response.getClass())) {
            return (T) response;
        }
        NAMING_LOGGER.error("Server return unexpected response '{}', expected response should be '{}'",
                response.getClass().getName(), responseClass.getName());
    } catch (Exception e) {
        throw new NacosException(NacosException.SERVER_ERROR, "Request nacos server failed: ", e);
    }
    throw new NacosException(NacosException.SERVER_ERROR, "Server return invalid response");
}

// private final ConcurrentMap<String, InstanceRedoData> registeredInstances = new ConcurrentHashMap<>();

// Mark the cached redo data as registered once the server has accepted the request
public void instanceRegistered(String serviceName, String groupName) {
    String key = NamingUtils.getGroupedName(serviceName, groupName);
    synchronized (registeredInstances) {
        InstanceRedoData redoData = registeredInstances.get(key);
        if (null != redoData) {
            redoData.setRegistered(true);
        }
    }
}
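
The two redo calls form a simple "record first, confirm later" bookkeeping scheme. The sketch below is a hypothetical simplification: `RedoCache`, its `Entry` type and `pendingRedo` are illustrative names, and the `group@@service` key format is an assumption about what getGroupedName produces.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical simplification of the redo bookkeeping; not the Nacos redo service.
final class RedoCache {
    static final class Entry {
        final String instance;
        volatile boolean registered;

        Entry(String instance) {
            this.instance = instance;
        }
    }

    private final ConcurrentMap<String, Entry> entries = new ConcurrentHashMap<>();

    // Assumed key format; the real key comes from NamingUtils.getGroupedName.
    static String key(String serviceName, String groupName) {
        return groupName + "@@" + serviceName;
    }

    // Step 1: remember the instance before the request is sent.
    void cacheForRedo(String serviceName, String groupName, String instance) {
        entries.put(key(serviceName, groupName), new Entry(instance));
    }

    // Step 2: flip the flag once the server has accepted the registration.
    void markRegistered(String serviceName, String groupName) {
        Entry entry = entries.get(key(serviceName, groupName));
        if (entry != null) {
            entry.registered = true;
        }
    }

    // Instances that were cached but never confirmed, i.e. candidates for a retry.
    List<String> pendingRedo() {
        List<String> pending = new ArrayList<>();
        for (Entry entry : entries.values()) {
            if (!entry.registered) {
                pending.add(entry.instance);
            }
        }
        return pending;
    }
}
```

If the request fails between the two steps, the entry stays unconfirmed, which is what lets a later reconnect know the registration still has to be replayed.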
RpcClient
public Response request(Request request, long timeoutMills) throws NacosException {
    int retryTimes = 0;
    Response response;
    Exception exceptionThrow = null;
    long start = System.currentTimeMillis();
    while (retryTimes < RETRY_TIMES && System.currentTimeMillis() < timeoutMills + start) {
        boolean waitReconnect = false;
        try {
            if (this.currentConnection == null || !isRunning()) {
                waitReconnect = true;
                throw new NacosException(NacosException.CLIENT_DISCONNECT,
                        "Client not connected, current status:" + rpcClientStatus.get());
            }
            response = this.currentConnection.request(request, timeoutMills);
            if (response == null) {
                throw new NacosException(SERVER_ERROR, "Unknown Exception.");
            }
            if (response instanceof ErrorResponse) {
                if (response.getErrorCode() == NacosException.UN_REGISTER) {
                    synchronized (this) {
                        waitReconnect = true;
                        if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
                            LoggerUtils.printIfErrorEnabled(LOGGER,
                                    "Connection is unregistered, switch server, connectionId = {}, request = {}",
                                    currentConnection.getConnectionId(), request.getClass().getSimpleName());
                            switchServerAsync();
                        }
                    }
                    
                }
                throw new NacosException(response.getErrorCode(), response.getMessage());
            }
            // return response.
            lastActiveTimeStamp = System.currentTimeMillis();
            return response;
            
        } catch (Exception e) {
            if (waitReconnect) {
                try {
                    // wait client to reconnect.
                    Thread.sleep(Math.min(100, timeoutMills / 3));
                } catch (Exception exception) {
                    // Do nothing.
                }
            }
            
            LoggerUtils.printIfErrorEnabled(LOGGER, "Send request fail, request = {}, retryTimes = {}, errorMessage = {}",
                    request, retryTimes, e.getMessage());
            
            exceptionThrow = e;
            
        }
        retryTimes++;
        
    }
    
    if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
        switchServerAsyncOnRequestFail();
    }
    
    if (exceptionThrow != null) {
        throw (exceptionThrow instanceof NacosException) ? (NacosException) exceptionThrow
                : new NacosException(SERVER_ERROR, exceptionThrow);
    } else {
        throw new NacosException(SERVER_ERROR, "Request fail, unknown Error");
    }
}
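
Stripped of the reconnect and server-switch logic, the loop above is a retry bounded by both an attempt count and a deadline. A minimal sketch of just that control flow, with the hypothetical name `BoundedRetry` and unchecked exceptions in place of NacosException:

```java
import java.util.function.Supplier;

// Hypothetical helper showing only the "attempts AND deadline" loop; not the Nacos RpcClient.
final class BoundedRetry {
    private BoundedRetry() {
    }

    static <T> T call(Supplier<T> action, int maxAttempts, long timeoutMillis) {
        long start = System.currentTimeMillis();
        RuntimeException last = null;
        int attempts = 0;
        // Stop when either budget is exhausted, whichever comes first.
        while (attempts < maxAttempts && System.currentTimeMillis() < start + timeoutMillis) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e; // remember the most recent failure for the final throw
            }
            attempts++;
        }
        if (last != null) {
            throw last;
        }
        throw new IllegalStateException("request failed: no attempt was made");
    }
}
```

As in the original method, the exception from the last failed attempt is the one that reaches the caller, and a zero budget produces a generic failure.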
currentConnection.request()

GrpcConnection comes in two variants.

a. In the common package

Uses Guava's ListenableFuture to make the remote call.

public Response request(Request request, long timeouts) throws NacosException {
    Payload grpcRequest = GrpcUtils.convert(request);
    ListenableFuture<Payload> requestFuture = grpcFutureServiceStub.request(grpcRequest);
    Payload grpcResponse;
    try {
        grpcResponse = requestFuture.get(timeouts, TimeUnit.MILLISECONDS);
    } catch (Exception e) {
        throw new NacosException(NacosException.SERVER_ERROR, e);
    }


    return (Response) GrpcUtils.parse(grpcResponse);
}

b. In the core package

Uses gRPC's StreamObserver.

public Response request(Request request, long timeoutMills) throws NacosException {
    DefaultRequestFuture pushFuture = sendRequestInner(request, null);
    try {
        return pushFuture.get(timeoutMills);
    } catch (Exception e) {
        throw new NacosException(NacosException.SERVER_ERROR, e);
    } finally {
        RpcAckCallbackSynchronizer.clearFuture(getMetaInfo().getConnectionId(), pushFuture.getRequestId());
    }
}


private DefaultRequestFuture sendRequestInner(Request request, RequestCallBack callBack) throws NacosException {
    final String requestId = String.valueOf(PushAckIdGenerator.getNextId());
    request.setRequestId(requestId);
    
    DefaultRequestFuture defaultPushFuture = new DefaultRequestFuture(getMetaInfo().getConnectionId(), requestId,
            callBack, () -> RpcAckCallbackSynchronizer.clearFuture(getMetaInfo().getConnectionId(), requestId));
    
    RpcAckCallbackSynchronizer.syncCallback(getMetaInfo().getConnectionId(), requestId, defaultPushFuture);
    sendRequestNoAck(request);
    return defaultPushFuture;
}


private void sendRequestNoAck(Request request) throws NacosException {
    try {
        //StreamObserver#onNext() is not thread-safe,synchronized is required to avoid direct memory leak.
        synchronized (streamObserver) {
            
            Payload payload = GrpcUtils.convert(request);
            traceIfNecessary(payload);
            streamObserver.onNext(payload);
        }
    } catch (Exception e) {
        if (e instanceof StatusRuntimeException) {
            throw new ConnectionAlreadyClosedException(e);
        }
        throw e;
    }
}
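
The stream-based variant turns a one-way send into a request/response call by correlating on a request ID. The sketch below shows that idea with `CompletableFuture`. `AckCorrelator` and its `sender` callback are hypothetical, and the real code uses DefaultRequestFuture and RpcAckCallbackSynchronizer instead.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

// Hypothetical sketch of request-ID correlation over a one-way channel; not Nacos code.
final class AckCorrelator {
    private final AtomicLong ids = new AtomicLong();
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // sender is an assumed one-way transport taking (requestId, payload).
    CompletableFuture<String> send(String payload, BiConsumer<String, String> sender) {
        String requestId = String.valueOf(ids.incrementAndGet());
        CompletableFuture<String> future = new CompletableFuture<>();
        // Register the future before sending so a very fast ack cannot be missed.
        pending.put(requestId, future);
        sender.accept(requestId, payload);
        return future;
    }

    // Called when the peer acknowledges a request; completes and removes the matching future.
    void onAck(String requestId, String response) {
        CompletableFuture<String> future = pending.remove(requestId);
        if (future != null) {
            future.complete(response);
        }
    }

    int pendingCount() {
        return pending.size();
    }
}
```

A caller that wants blocking behaviour can wait on the returned future with a timeout, which corresponds to `pushFuture.get(timeoutMills)` above. Removing the entry on completion plays the role of the `clearFuture` call in the `finally` block.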
(b) http-NamingHttpClientProxy
@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    
    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
            instance);
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    if (instance.isEphemeral()) {
        BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
        beatReactor.addBeatInfo(groupedServiceName, beatInfo);
    }
    final Map<String, String> params = new HashMap<String, String>(32);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, groupedServiceName);
    params.put(CommonParams.GROUP_NAME, groupName);
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put(IP_PARAM, instance.getIp());
    params.put(PORT_PARAM, String.valueOf(instance.getPort()));
    params.put(WEIGHT_PARAM, String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put(HEALTHY_PARAM, String.valueOf(instance.isHealthy()));
    params.put(EPHEMERAL_PARAM, String.valueOf(instance.isEphemeral()));
    params.put(META_PARAM, JacksonUtils.toJson(instance.getMetadata()));
    
    reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
    
}


public String reqApi(String api, Map<String, String> params, String method) throws NacosException {
    return reqApi(api, params, Collections.EMPTY_MAP, method);
}

public String reqApi(String api, Map<String, String> params, Map<String, String> body, String method)
        throws NacosException {
    return reqApi(api, params, body, serverListManager.getServerList(), method);
}

// Call the API.
// If the server list is a domain name, call that domain up to maxRetry times.
// Otherwise start from a randomly chosen server and try each server in turn,
// for at most servers.size() attempts.
public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers,
        String method) throws NacosException {
    
    params.put(CommonParams.NAMESPACE_ID, getNamespaceId());
    
    if (CollectionUtils.isEmpty(servers) && !serverListManager.isDomain()) {
        throw new NacosException(NacosException.INVALID_PARAM, "no server available");
    }
    
    NacosException exception = new NacosException();
    
    if (serverListManager.isDomain()) {
        String nacosDomain = serverListManager.getNacosDomain();
        for (int i = 0; i < maxRetry; i++) {
            try {
                return callServer(api, params, body, nacosDomain, method);
            } catch (NacosException e) {
                exception = e;
                if (NAMING_LOGGER.isDebugEnabled()) {
                    NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
                }
            }
        }
    } else {
        Random random = new Random(System.currentTimeMillis());
        int index = random.nextInt(servers.size());
        
        for (int i = 0; i < servers.size(); i++) {
            String server = servers.get(index);
            try {
                return callServer(api, params, body, server, method);
            } catch (NacosException e) {
                exception = e;
                if (NAMING_LOGGER.isDebugEnabled()) {
                    NAMING_LOGGER.debug("request {} failed.", server, e);
                }
            }
            index = (index + 1) % servers.size();
        }
    }
    
    NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}", api, servers, exception.getErrCode(),
            exception.getErrMsg());
    
    throw new NacosException(exception.getErrCode(),
            "failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage());
    
}
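
The non-domain branch is a round-robin failover that starts at a random position. A minimal sketch of that loop, with hypothetical names (`ServerFailover`, `callAny`). The start index is passed in rather than drawn from `Random` so the behaviour is deterministic and easy to check.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of "start somewhere, then try every server once"; not Nacos code.
final class ServerFailover {
    private ServerFailover() {
    }

    static String callAny(List<String> servers, int startIndex, Function<String, String> call) {
        if (servers.isEmpty()) {
            throw new IllegalArgumentException("no server available");
        }
        RuntimeException last = null;
        int index = Math.floorMod(startIndex, servers.size());
        for (int i = 0; i < servers.size(); i++) {
            try {
                return call.apply(servers.get(index));
            } catch (RuntimeException e) {
                last = e; // keep the latest failure and move on to the next server
            }
            index = (index + 1) % servers.size(); // wrap around to the front of the list
        }
        throw new IllegalStateException("all servers failed: " + servers, last);
    }
}
```

In real use the start index would come from something like `new Random().nextInt(servers.size())`, which spreads the first attempt across servers while still guaranteeing that each one is tried at most once per call.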


// Call a single server using NacosRestTemplate
public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer,
        String method) throws NacosException {
    long start = System.currentTimeMillis();
    long end = 0;
    params.putAll(getSecurityHeaders());
    params.putAll(getSpasHeaders(params.get(SERVICE_NAME_PARAM)));
    Header header = NamingHttpUtil.builderHeader();
    
    String url;
    if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
        url = curServer + api;
    } else {
        if (!InternetAddressUtil.containsPort(curServer)) {
            curServer = curServer + InternetAddressUtil.IP_PORT_SPLITER + serverPort;
        }
        url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api;
    }
    
    try {
        HttpRestResult<String> restResult = nacosRestTemplate
                .exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);
        end = System.currentTimeMillis();
        
        MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode()))
                .observe(end - start);
        
        if (restResult.ok()) {
            return restResult.getData();
        }
        if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) {
            return StringUtils.EMPTY;
        }
        throw new NacosException(restResult.getCode(), restResult.getMessage());
    } catch (Exception e) {
        NAMING_LOGGER.error("[NA] failed to request", e);
        throw new NacosException(NacosException.SERVER_ERROR, e);
    }
}
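
The URL assembly at the top of callServer can be sketched on its own. `ServerUrl.build` is a hypothetical helper. The port check is a plain `contains(":")`, which is an assumption that only suits hostnames and IPv4 addresses, whereas the original relies on InternetAddressUtil. The prefix, port and API path used in the usage note are illustrative values.

```java
// Hypothetical helper mirroring the URL-building branch of callServer; not Nacos code.
final class ServerUrl {
    private ServerUrl() {
    }

    static String build(String server, String api, String defaultPrefix, int defaultPort) {
        // A server that already carries a scheme is used as-is.
        if (server.startsWith("https://") || server.startsWith("http://")) {
            return server + api;
        }
        // Simplified port detection: assumes hostnames or IPv4 addresses only.
        String hostPort = server.contains(":") ? server : server + ":" + defaultPort;
        return defaultPrefix + hostPort + api;
    }
}
```

For example, `ServerUrl.build("127.0.0.1", "/nacos/v1/ns/instance", "http://", 8848)` yields `http://127.0.0.1:8848/nacos/v1/ns/instance`, while a server given as `https://nacos.example.com` keeps its own scheme and gets no port appended.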

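NacosRestTemplate

Nacos's own HTTP client wrapper, with an API similar to Spring's RestTemplate.

a. DefaultHttpClientRequest: uses Apache's CloseableHttpClient by default.
b. JdkHttpClientRequest: uses the JDK's built-in HTTP client.
c. InterceptingHttpClientRequest: an HTTP client request with interceptor support.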
