一、前言二、源码概览三、数据模型四、服务领域模型
1、Model
(1)Service(2)Cluster(3)Instance 2、核心流程
(1)create service(2)register instance
(a)grpc-NamingGrpcClientProxy
RpcClientcurrentConnection.request() (b)http-NamingHttpClientProxy
一、前言 二、源码概览address模块: 主要查询nacos集群中节点个数以及IP的列表。
api模块: 主要给客户端调用的api接口的抽象。
common模块: 主要是通用的工具包和字符串常量的定义。
client模块: 主要是对依赖api模块和common模块,对api的接口的实现,给nacos的客户端使用。
cmdb模块: 主要是操作的数据的存储在内存中,该模块提供一个查询数据标签的接口。
config模块: 主要是服务配置的管理, 提供api给客户端拉去配置信息,以及提供更新配置
的,客户端通过长轮询的更新配置信息.数据存储是mysql。
naming模块: 主要是作为服务注册中心的实现模块,具备服务的注册和服务发现的功能。
console模块: 主要是实现控制台的功能.具有权限校验、服务状态、健康检查等功能。
core模块: 主要是实现Spring的PropertySource的后置处理器,用于加载nacos的default的配置信息。
distribution模块: 主要是打包nacos-server的操作,使用maven-assembly-plugin进行自定义打包。
核心功能项目:
配置管理 ——nacos-config
服务注册与发现——nacos-naming
nacos服务基本分为这几个级别:Service、Cluster、Instance。如上图所示。
下面就分别介绍下这三种实体类的重要参数,其中,pojo包下是基本定义,core包中的实体做了额外的扩展。
(1)Servicepojo下的Service
namespace+group+name
public class Service implements Serializable {
private static final long serialVersionUID = -990509089519499344L;
private final String namespace;
private final String group;
private final String name;
//暂时的,短暂的
private final boolean ephemeral;
private final AtomicLong revision;
private long lastUpdatedTime;
//如何判断两个service相同
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof Service)) {
return false;
}
Service service = (Service) o;
return namespace.equals(service.namespace) && group.equals(service.group) && name.equals(service.name);
}
}
(2)Cluster
pojo包下
//属于哪个service private String serviceName; //默认端口 private int defaultPort = 80;
core包下
//实例集合 @JsonIgnore private Set(3)InstancepersistentInstances = new HashSet<>(); @JsonIgnore private Set ephemeralInstances = new HashSet<>(); //name相同 public boolean equals(Object obj) { if (!(obj instanceof Cluster)) { return false; } return getName().equals(((Cluster) obj).getName()); } //把check任务交给scheduleExecutor异步执行。 public void init() { if (inited) { return; } checkTask = new HealthCheckTask(this); HealthCheckReactor.scheduleCheck(checkTask); inited = true; }
ephemeral参数用来区分实例时暂时性的还是持久的。实例的唯一id instanceId 生成规则:simple和snowflake。默认为simple。为了方便查询,instance中冗余了serviceName、clusterName等。
private String instanceId; private String ip; private int port; private double weight = 1.0D; private boolean healthy = true; private boolean enabled = true; private boolean ephemeral = true; private String clusterName; private String serviceName; private Mapmetadata = new HashMap (); //生成唯一id,两种方式:simple、snowflake public String generateInstanceId(Set currentInstanceIds) { String instanceIdGenerator = getInstanceIdGenerator(); if (Constants.SNOWFLAKE_INSTANCE_ID_GENERATOR.equalsIgnoreCase(instanceIdGenerator)) { return generateSnowflakeInstanceId(currentInstanceIds); } else { return generateInstanceId(); } }
雪花算法 SnowFlowerIdGenerator
public synchronized long nextId() {
long currentMillis = currentTimeMillis();
if (this.lastTime == currentMillis) {
if (0L == (this.sequence = ++this.sequence & 4095L)) {
currentMillis = this.waitUntilNextTime(currentMillis);
}
} else {
this.sequence = 0L;
}
this.lastTime = currentMillis;
if (logger.isDebugEnabled()) {
logger.debug("{}-{}-{}", (new SimpleDateFormat(DATETIME_PATTERN)).format(new Date(this.lastTime)),
workerId, this.sequence);
}
currentId = currentMillis - EPOCH << 22 | workerId << 12 | this.sequence;
return currentId;
}
这里从几个核心功能开始介绍:
2、核心流程 (1)create serviceNamingFactory提供了两种创建命名服务的方法,根据server list和Properties。
public static NamingService createNamingService(String serverList) throws NacosException {
try {
Class> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
Constructor constructor = driverImplClass.getConstructor(String.class);
return (NamingService) constructor.newInstance(serverList);
} catch (Throwable e) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
}
}
public static NamingService createNamingService(Properties properties) throws NacosException {
try {
Class> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
Constructor constructor = driverImplClass.getConstructor(Properties.class);
return (NamingService) constructor.newInstance(properties);
} catch (Throwable e) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
}
}
主要通过反射,根据NacosNamingService类创建服务。
(2)register instanceNacosNamingService,通过NamingClientProxy实现。
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
clientProxy.registerService(serviceName, groupName, instance);
}
NamingClientProxy:
提供了一系列服务操作的接口。包括服务CRUD、实例CRUD、获取服务列表、订阅/取消订阅、健康检查、更新心跳信息等。
NamingClientProxyDelegate:
提供了代理类NamingHttpClientProxy和NamingGrpcClientProxy分别实现http、grpc两种方式,来实现接口中的方法。
@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
}
//判断实例是否是短暂的,是-走grpc,否则走http。
private NamingClientProxy getExecuteClientProxy(Instance instance) {
return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;
}
(a)grpc-NamingGrpcClientProxy
@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName,
instance);
redoService.cacheInstanceForRedo(serviceName, groupName, instance);
doRegisterService(serviceName, groupName, instance);
}
//缓存一份,key:serviceName+groupName,value:data
public void cacheInstanceForRedo(String serviceName, String groupName, Instance instance) {
String key = NamingUtils.getGroupedName(serviceName, groupName);
InstanceRedoData redoData = InstanceRedoData.build(serviceName, groupName, instance);
synchronized (registeredInstances) {
registeredInstances.put(key, redoData);
}
}
public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
NamingRemoteConstants.REGISTER_INSTANCE, instance);
requestToServer(request, Response.class);
redoService.instanceRegistered(serviceName, groupName);
}
private T requestToServer(AbstractNamingRequest request, Class responseClass)
throws NacosException {
try {
request.putAllHeader(getSecurityHeaders());
request.putAllHeader(getSpasHeaders(
NamingUtils.getGroupedNameOptional(request.getServiceName(), request.getGroupName())));
Response response =
requestTimeout < 0 ? rpcClient.request(request) : rpcClient.request(request, requestTimeout);
if (ResponseCode.SUCCESS.getCode() != response.getResultCode()) {
throw new NacosException(response.getErrorCode(), response.getMessage());
}
if (responseClass.isAssignableFrom(response.getClass())) {
return (T) response;
}
NAMING_LOGGER.error("Server return unexpected response '{}', expected response should be '{}'",
response.getClass().getName(), responseClass.getName());
} catch (Exception e) {
throw new NacosException(NacosException.SERVER_ERROR, "Request nacos server failed: ", e);
}
throw new NacosException(NacosException.SERVER_ERROR, "Server return invalid response");
}
//private final ConcurrentMap registeredInstances = new ConcurrentHashMap<>();
//将实例数据保存到map中
public void instanceRegistered(String serviceName, String groupName) {
String key = NamingUtils.getGroupedName(serviceName, groupName);
synchronized (registeredInstances) {
InstanceRedoData redoData = registeredInstances.get(key);
if (null != redoData) {
redoData.setRegistered(true);
}
}
}
RpcClient
public Response request(Request request, long timeoutMills) throws NacosException {
int retryTimes = 0;
Response response;
Exception exceptionThrow = null;
long start = System.currentTimeMillis();
while (retryTimes < RETRY_TIMES && System.currentTimeMillis() < timeoutMills + start) {
boolean waitReconnect = false;
try {
if (this.currentConnection == null || !isRunning()) {
waitReconnect = true;
throw new NacosException(NacosException.CLIENT_DISCONNECT,
"Client not connected, current status:" + rpcClientStatus.get());
}
response = this.currentConnection.request(request, timeoutMills);
if (response == null) {
throw new NacosException(SERVER_ERROR, "Unknown Exception.");
}
if (response instanceof ErrorResponse) {
if (response.getErrorCode() == NacosException.UN_REGISTER) {
synchronized (this) {
waitReconnect = true;
if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
LoggerUtils.printIfErrorEnabled(LOGGER,
"Connection is unregistered, switch server, connectionId = {}, request = {}",
currentConnection.getConnectionId(), request.getClass().getSimpleName());
switchServerAsync();
}
}
}
throw new NacosException(response.getErrorCode(), response.getMessage());
}
// return response.
lastActiveTimeStamp = System.currentTimeMillis();
return response;
} catch (Exception e) {
if (waitReconnect) {
try {
// wait client to reconnect.
Thread.sleep(Math.min(100, timeoutMills / 3));
} catch (Exception exception) {
// Do nothing.
}
}
LoggerUtils.printIfErrorEnabled(LOGGER, "Send request fail, request = {}, retryTimes = {}, errorMessage = {}",
request, retryTimes, e.getMessage());
exceptionThrow = e;
}
retryTimes++;
}
if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
switchServerAsyncOnRequestFail();
}
if (exceptionThrow != null) {
throw (exceptionThrow instanceof NacosException) ? (NacosException) exceptionThrow
: new NacosException(SERVER_ERROR, exceptionThrow);
} else {
throw new NacosException(SERVER_ERROR, "Request fail, unknown Error");
}
}
currentConnection.request()
分为两种方式的GrpcConnection。
common包下
使用guava的ListenableFuture实现远程调用
public Response request(Request request, long timeouts) throws NacosException {
Payload grpcRequest = GrpcUtils.convert(request);
ListenableFuture requestFuture = grpcFutureServiceStub.request(grpcRequest);
Payload grpcResponse;
try {
grpcResponse = requestFuture.get(timeouts, TimeUnit.MILLISECONDS);
} catch (Exception e) {
throw new NacosException(NacosException.SERVER_ERROR, e);
}
return (Response) GrpcUtils.parse(grpcResponse);
}
b.core包下
使用grpc的StreamObserver
public Response request(Request request, long timeoutMills) throws NacosException {
DefaultRequestFuture pushFuture = sendRequestInner(request, null);
try {
return pushFuture.get(timeoutMills);
} catch (Exception e) {
throw new NacosException(NacosException.SERVER_ERROR, e);
} finally {
RpcAckCallbackSynchronizer.clearFuture(getmetaInfo().getConnectionId(), pushFuture.getRequestId());
}
}
private DefaultRequestFuture sendRequestInner(Request request, RequestCallBack callBack) throws NacosException {
final String requestId = String.valueOf(PushAckIdGenerator.getNextId());
request.setRequestId(requestId);
DefaultRequestFuture defaultPushFuture = new DefaultRequestFuture(getmetaInfo().getConnectionId(), requestId,
callBack, () -> RpcAckCallbackSynchronizer.clearFuture(getmetaInfo().getConnectionId(), requestId));
RpcAckCallbackSynchronizer.syncCallback(getmetaInfo().getConnectionId(), requestId, defaultPushFuture);
sendRequestNoAck(request);
return defaultPushFuture;
}
private void sendRequestNoAck(Request request) throws NacosException {
try {
//StreamObserver#onNext() is not thread-safe,synchronized is required to avoid direct memory leak.
synchronized (streamObserver) {
Payload payload = GrpcUtils.convert(request);
traceIfNecessary(payload);
streamObserver.onNext(payload);
}
} catch (Exception e) {
if (e instanceof StatusRuntimeException) {
throw new ConnectionAlreadyClosedException(e);
}
throw e;
}
}
(b)http-NamingHttpClientProxy
@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
instance);
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
if (instance.isEphemeral()) {
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
final Map params = new HashMap(32);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, groupedServiceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put(IP_PARAM, instance.getIp());
params.put(PORT_PARAM, String.valueOf(instance.getPort()));
params.put(WEIGHT_PARAM, String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put(HEALTHY_PARAM, String.valueOf(instance.isHealthy()));
params.put(EPHEMERAL_PARAM, String.valueOf(instance.isEphemeral()));
params.put(meta_PARAM, JacksonUtils.toJson(instance.getmetadata()));
reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
}
public String reqApi(String api, Map params, String method) throws NacosException {
return reqApi(api, params, Collections.EMPTY_MAP, method);
}
public String reqApi(String api, Map params, Map body, String method)
throws NacosException {
return reqApi(api, params, body, serverListManager.getServerList(), method);
}
//请求api
//判断服务列表是否有域名,有-按最大重试次数进行调用,无-随机请求server,也是请求server size次数
public String reqApi(String api, Map params, Map body, List servers,
String method) throws NacosException {
params.put(CommonParams.NAMESPACE_ID, getNamespaceId());
if (CollectionUtils.isEmpty(servers) && !serverListManager.isDomain()) {
throw new NacosException(NacosException.INVALID_PARAM, "no server available");
}
NacosException exception = new NacosException();
if (serverListManager.isDomain()) {
String nacosDomain = serverListManager.getNacosDomain();
for (int i = 0; i < maxRetry; i++) {
try {
return callServer(api, params, body, nacosDomain, method);
} catch (NacosException e) {
exception = e;
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
}
}
}
} else {
Random random = new Random(System.currentTimeMillis());
int index = random.nextInt(servers.size());
for (int i = 0; i < servers.size(); i++) {
String server = servers.get(index);
try {
return callServer(api, params, body, server, method);
} catch (NacosException e) {
exception = e;
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("request {} failed.", server, e);
}
}
index = (index + 1) % servers.size();
}
}
NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}", api, servers, exception.getErrCode(),
exception.getErrMsg());
throw new NacosException(exception.getErrCode(),
"failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage());
}
//call server 使用NacosRestTemplate
public String callServer(String api, Map params, Map body, String curServer,
String method) throws NacosException {
long start = System.currentTimeMillis();
long end = 0;
params.putAll(getSecurityHeaders());
params.putAll(getSpasHeaders(params.get(SERVICE_NAME_PARAM)));
Header header = NamingHttpUtil.builderHeader();
String url;
if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
url = curServer + api;
} else {
if (!InternetAddressUtil.containsPort(curServer)) {
curServer = curServer + InternetAddressUtil.IP_PORT_SPLITER + serverPort;
}
url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api;
}
try {
HttpRestResult restResult = nacosRestTemplate
.exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);
end = System.currentTimeMillis();
MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode()))
.observe(end - start);
if (restResult.ok()) {
return restResult.getData();
}
if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) {
return StringUtils.EMPTY;
}
throw new NacosException(restResult.getCode(), restResult.getMessage());
} catch (Exception e) {
NAMING_LOGGER.error("[NA] failed to request", e);
throw new NacosException(NacosException.SERVER_ERROR, e);
}
}
NacosRestTemplate
nacos封装的http调用工具类,api类似RestTemplate。
a.DefaultHttpClientRequest 默认使用apache的CloseableHttpClient。
b.JdkHttpClientRequest jdk http client
c.InterceptingHttpClientRequest 拦截器http client请求。



