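When the Nacos server stores service instances in memory, registration is implemented with an asynchronous worker plus a blocking queue. When a service registers, the server writes the registration data into the blocking queue and returns success right away; a background thread then takes the data from the queue and completes the registration. The flow is implemented as follows:
The Nacos server exposes the RESTful API endpoint /v1/ns/instance. The controller source is as follows: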
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + UtilsAndCommons.NACOS_NAMING_INSTANCE_CONTEXT)
public class InstanceController {

    @CanDistro
    @PostMapping
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public String register(HttpServletRequest request) throws Exception {
        // The namespace is optional and falls back to the default namespace.
        final String namespaceId = WebUtils
                .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);

        // Build the instance from the request parameters.
        final Instance instance = HttpRequestInstanceBuilder.newBuilder()
                .setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();

        getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
        return "ok";
    }
}
@Component
public class InstanceOperatorServiceImpl implements InstanceOperator {

    @Override
    public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
        com.alibaba.nacos.naming.core.Instance coreInstance = parseInstance(instance);
        serviceManager.registerInstance(namespaceId, serviceName, coreInstance);
    }
}
@Component
public class ServiceManager implements RecordListener {

    public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
        // Create the service first if it does not exist yet.
        createEmptyService(namespaceId, serviceName, instance.isEphemeral());
        Service service = getService(namespaceId, serviceName);
        checkServiceIsNull(service, namespaceId, serviceName);
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }

    public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
            throws NacosException {
        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
        Service service = getService(namespaceId, serviceName);
        // Lock per service so that concurrent registrations do not overwrite each other's instance list.
        synchronized (service) {
            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
            Instances instances = new Instances();
            instances.setInstanceList(instanceList);
            consistencyService.put(key, instances);
        }
    }
}
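For efficiency, a service registry usually keeps service instances in memory without persisting them. The EphemeralConsistencyService interface defines this kind of storage. The implementation is as follows: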
@DependsOn("ProtocolManager")
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {

    @Override
    public void put(String key, Record value) throws NacosException {
        onPut(key, value);
        // If upgrade to 2.0.X, do not sync for v1.
        if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) {
            return;
        }
        distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
                DistroConfig.getInstance().getSyncDelayMillis());
    }
}
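The logic that adds the registration to the blocking queue lives in the onPut(String key, Record value) method. Service instances are stored in a ConcurrentHashMap, and the registration tasks are stored in an ArrayBlockingQueue. The source is as follows: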
public void onPut(String key, Record value) {
    if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
        // Store the latest instance list for this key in memory.
        Datum<Instances> datum = new Datum<>();
        datum.value = (Instances) value;
        datum.key = key;
        datum.timestamp.incrementAndGet();
        dataStore.put(key, datum);
    }
    if (!listeners.containsKey(key)) {
        return;
    }
    notifier.addTask(key, DataOperation.CHANGE);
}

public class Notifier implements Runnable {

    public void addTask(String datumKey, DataOperation action) {
        // Skip the task if a CHANGE for the same key is already pending.
        if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
            return;
        }
        if (action == DataOperation.CHANGE) {
            services.put(datumKey, StringUtils.EMPTY);
        }
        tasks.offer(Pair.with(datumKey, action));
    }
}
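A background thread takes tasks from the blocking queue asynchronously and applies the new instance data. The source is as follows: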
public class Notifier implements Runnable {

    @Override
    public void run() {
        Loggers.DISTRO.info("distro notifier started");
        for (; ; ) {
            try {
                // Blocks until a task is available.
                Pair<String, DataOperation> pair = tasks.take();
                handle(pair);
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
            }
        }
    }

    private void handle(Pair<String, DataOperation> pair) {
        try {
            String datumKey = pair.getValue0();
            DataOperation action = pair.getValue1();
            // Clear the pending marker so that later changes to this key can be queued again.
            services.remove(datumKey);
            int count = 0;
            if (!listeners.containsKey(datumKey)) {
                return;
            }
            for (RecordListener listener : listeners.get(datumKey)) {
                count++;
                try {
                    if (action == DataOperation.CHANGE) {
                        listener.onChange(datumKey, dataStore.get(datumKey).value);
                        continue;
                    }
                    if (action == DataOperation.DELETE) {
                        listener.onDelete(datumKey);
                        continue;
                    }
                } catch (Throwable e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
                }
            }
            if (Loggers.DISTRO.isDebugEnabled()) {
                Loggers.DISTRO
                        .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                                datumKey, count, action.name());
            }
        } catch (Throwable e) {
            Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
        }
    }
}
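To make the pattern easier to see in isolation, here is a minimal, self-contained sketch of the same idea: the request thread stores the data and enqueues a key, and a worker thread takes keys from the queue and applies them. This is not Nacos code. The class RegistrySketch, its fields, and the queue capacity are all hypothetical and chosen only for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

/** Hypothetical, simplified stand-in for the store-then-notify pattern; not Nacos code. */
class RegistrySketch implements Runnable {
    // Latest instance list per service key (plays the role of the data store).
    private final Map<String, List<String>> dataStore = new ConcurrentHashMap<>();
    // Keys that already have a queued task, used to skip duplicate notifications.
    private final Map<String, Boolean> pending = new ConcurrentHashMap<>();
    // Bounded queue of keys waiting to be processed (capacity is an arbitrary choice).
    private final BlockingQueue<String> tasks = new ArrayBlockingQueue<>(1024);
    // Applied changes, recorded only so that the effect is observable.
    final List<String> applied = new CopyOnWriteArrayList<>();
    final CountDownLatch firstApplied = new CountDownLatch(1);

    /** Runs on the request thread: store the data, enqueue a task, return at once. */
    boolean register(String key, List<String> instances) {
        dataStore.put(key, instances);
        if (pending.putIfAbsent(key, Boolean.TRUE) != null) {
            return true; // a task for this key is already queued; it will see the new data
        }
        boolean queued = tasks.offer(key); // non-blocking; false if the queue is full
        if (!queued) {
            pending.remove(key); // allow a later retry for this key
        }
        return queued;
    }

    /** Runs on the worker thread: take keys one by one and apply the latest data. */
    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                String key = tasks.take(); // blocks until a task is available
                pending.remove(key);
                applied.add(key + "=" + dataStore.get(key));
                firstApplied.countDown();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // stop quietly when interrupted
        }
    }
}
```

In this sketch, two register calls for the same key made before the worker runs produce a single task, and the worker applies whatever data is latest when it runs. That mirrors why the queue only needs to carry the key and not the instance list itself.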
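How does Nacos remove invalid service instances?
Nacos relies on clients actively reporting their own health status to the server. Health checking uses a TTL (Time To Live) mechanism: if a client does not send a heartbeat to the registry within a certain period, the registry considers the instance unhealthy and then triggers the removal logic.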
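The TTL idea can be sketched roughly as follows. This is an illustration under stated assumptions, not the Nacos implementation: the class HeartbeatTracker, its method names, and the two thresholds are hypothetical, and the current time is passed in explicitly so that the behavior is easy to check.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical TTL-based health tracker; names and thresholds are illustrative, not Nacos code. */
class HeartbeatTracker {
    private final long unhealthyAfterMillis; // silence after which an instance counts as unhealthy
    private final long removeAfterMillis;    // silence after which an instance is removed
    // Timestamp of the last heartbeat received per instance id.
    private final Map<String, Long> lastBeat = new ConcurrentHashMap<>();

    HeartbeatTracker(long unhealthyAfterMillis, long removeAfterMillis) {
        this.unhealthyAfterMillis = unhealthyAfterMillis;
        this.removeAfterMillis = removeAfterMillis;
    }

    /** Called whenever a client reports a heartbeat. */
    void beat(String instanceId, long nowMillis) {
        lastBeat.put(instanceId, nowMillis);
    }

    /** An instance is healthy only if it is known and its last heartbeat is recent enough. */
    boolean isHealthy(String instanceId, long nowMillis) {
        Long last = lastBeat.get(instanceId);
        return last != null && nowMillis - last < unhealthyAfterMillis;
    }

    boolean isRegistered(String instanceId) {
        return lastBeat.containsKey(instanceId);
    }

    /** Periodic sweep: remove instances that have been silent for too long. Returns the number removed. */
    int evictExpired(long nowMillis) {
        int removed = 0;
        for (Map.Entry<String, Long> e : lastBeat.entrySet()) {
            boolean expired = nowMillis - e.getValue() >= removeAfterMillis;
            // remove(key, value) leaves the entry alone if a fresh heartbeat arrived in the meantime.
            if (expired && lastBeat.remove(e.getKey(), e.getValue())) {
                removed++;
            }
        }
        return removed;
    }
}
```

With hypothetical thresholds of 15 seconds and 30 seconds, an instance that last sent a heartbeat at time 0 would be reported as unhealthy from 15 seconds on and would be dropped by a sweep that runs at 30 seconds or later. A real server would run the sweep on a scheduled task.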



