接上篇博文,本篇博文主要介绍nacos-server服务端的处理,包括一下内容:
- nacos-server 如何存储服务信息nacos-server 接收 nacos-client 注册请求的处理,服务实例如何保存nacos-server 接收 nacos-client 心跳请求的处理
存储结构核心代码
// ## ServiceManager public class ServiceManager implements RecordListener服务端处理服务注册 InstanceController.register 接收到的服务注册信息 服务注册{ private final Map > serviceMap = new ConcurrentHashMap<>(); } // ## Service : 服务里面存储 clusterMap public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener { // clusterName -> Cluster private Map clusterMap = new HashMap<>(); } // ## Cluster public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable { // 存储持久节点 private Set persistentInstances = new HashSet<>(); // 存储临时节点 private Set ephemeralInstances = new HashSet<>(); }
// serviceManager.registerInstance(namespaceId, serviceName, instance);
// 注册一个AP模式的服务
// 如果服务或者集群不存在,则创建
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
// 创建Service: namespaceId=public; serviceName=DEFAULT_GROUP@@system
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
Service service = getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
}
// 添加服务实例
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
创建 Service
// createEmptyService(namespaceId, serviceName, instance.isEphemeral());
// namespaceId=public; serviceName=DEFAULT_GROUP@@system; cluster=null
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
throws NacosException {
Service service = getService(namespaceId, serviceName);
if (service == null) { // 第一次注册为空,则创建Service对象
Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
service = new Service();
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(NamingUtils.getGroupName(serviceName));
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
if (cluster != null) {
cluster.setService(service);
service.getClusterMap().put(cluster.getName(), cluster);
}
service.validate();
putServiceAndInit(service);
if (!local) {
addOrReplaceService(service);
}
}
}
// 保存并初始化Service
// putServiceAndInit(service);
private void putServiceAndInit(Service service) throws NacosException {
// 将服务放到serviceMap中,见下
putService(service);
// 服务初始化:启动客户端心跳检测任务
service.init();
// consistencyService 是 DelegateConsistencyServiceImpl
// 分布式一致性服务,注册监听,一个持久的,一个临时的
// 持久监听key: com.alibaba.nacos.naming.iplist.ephemeral.public##DEFAULT_GROUP@@system
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
// 临时监听key: com.alibaba.nacos.naming.iplist.public##DEFAULT_GROUP@@system
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
public void putService(Service service) {
// doubleCheck 方式,将Service放入到 serviceMap 中
if (!serviceMap.containsKey(service.getNamespaceId())) {
synchronized (putServiceLock) {
if (!serviceMap.containsKey(service.getNamespaceId())) {
serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());
}
}
}
// serviceMap: namespaceId -> Map(serviceName -> service )
serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
}
添加服务实例
// 添加服务实例到Service中
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException {
// key: com.alibaba.nacos.naming.iplist.ephemeral.public##DEFAULT_GROUP@@system
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
Service service = getService(namespaceId, serviceName);
synchronized (service) {
// 构造实例列表
List instanceList = addIpAddresses(service, ephemeral, ips);
Instances instances = new Instances();
instances.setInstanceList(instanceList);
// 保存到 consistencyService 中
consistencyService.put(key, instances);
}
}
服务端接收客户端心跳上报 InstanceController.beat
Controller 接收心跳请求
// 这里比较简单了
@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {
ObjectNode result = JacksonUtils.createEmptyJsonNode();
// clientBeatInterval: 5000
result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
// beat = "";
String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
RsInfo clientBeat = null;
if (StringUtils.isNotBlank(beat)) {
clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
}
// DEFAULT
String clusterName = WebUtils
.optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
// 192.168.31.30
String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
// 8081
int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
// false
if (clientBeat != null) {
if (StringUtils.isNotBlank(clientBeat.getCluster())) {
clusterName = clientBeat.getCluster();
} else {
// fix #2533
clientBeat.setCluster(clusterName);
}
ip = clientBeat.getIp();
port = clientBeat.getPort();
}
// public
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
// DEFAULT_GROUP@@system
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
// 获取实例信息
Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
// 如果实例信息为空,则重新注册到注册中心
if (instance == null) {
if (clientBeat == null) {
result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
return result;
}
Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
+ "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
instance = new Instance();
instance.setPort(clientBeat.getPort());
instance.setIp(clientBeat.getIp());
instance.setWeight(clientBeat.getWeight());
instance.setmetadata(clientBeat.getmetadata());
instance.setClusterName(clusterName);
instance.setServiceName(serviceName);
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(clientBeat.isEphemeral());
serviceManager.registerInstance(namespaceId, serviceName, instance);
}
Service service = serviceManager.getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.SERVER_ERROR,
"service not found: " + serviceName + "@" + namespaceId);
}
if (clientBeat == null) {
clientBeat = new RsInfo();
clientBeat.setIp(ip);
clientBeat.setPort(port);
clientBeat.setCluster(clusterName);
}
// ### 正式处理心跳,见下
service.processClientBeat(clientBeat);
// 处理返回信息
result.put(CommonParams.CODE, NamingResponseCode.OK);
if (instance.containsmetadata(PreservedmetadataKeys.HEART_BEAT_INTERVAL)) {
result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
}
result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
return result;
}
心跳处理 service.processClientBeat(clientBeat)
// processClientBeat 构造一个ClientBeatProcessor 来处理心跳请求
public void processClientBeat(final RsInfo rsInfo) {
// 客户端心跳处理器
ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
clientBeatProcessor.setService(this);
clientBeatProcessor.setRsInfo(rsInfo);
// 执行处理器
HealthCheckReactor.scheduleNow(clientBeatProcessor);
}
// ClientBeatProcessor.run
@Override
public void run() {
Service service = this.service;
String ip = rsInfo.getIp(); // 192.168.31.30
String clusterName = rsInfo.getCluster(); // DEFAULT
int port = rsInfo.getPort(); // 8081
// 获取服务下的集群信息
Cluster cluster = service.getClusterMap().get(clusterName);
// 获取集群下的所有实例
List instances = cluster.allIPs(true);
for (Instance instance : instances) { // 遍历实例
// 如果是上报心跳的实例
if (instance.getIp().equals(ip) && instance.getPort() == port) {
// 更新最后心跳上送时间
instance.setLastBeat(System.currentTimeMillis());
if (!instance.isMarked()) {
if (!instance.isHealthy()) {
// 如果实例不健康了,则更新健康状况
instance.setHealthy(true);
// 暴露服务信息变更事件
getPushService().serviceChanged(service);
}
}
}
}
}



