MQ消息可靠性
队列持久化
默认生成的交换机,队列和消息都是持久化的
消费者消息确认
使用auto 目前的配置可能导致消息一直重试
失败重试机制
死信交换机
DelayExchange插件
惰性队列
MQ集群
nacos分级存储模型
nacos注册
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
// 尝试获取namespaceId
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
// 尝试获取serviceName,其格式为 group_name@@service_name
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
// 解析出实例信息,封装为Instance对象
final Instance instance = parseInstance(request);
// 注册实例
serviceManager.registerInstance(namespaceId, serviceName, instance);
return "ok";
}
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
// 创建一个空的service(如果是第一次来注册实例,要先创建一个空service出来,放入注册表)
// 此时不包含实例信息
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
==>
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
throws NacosException {
//尝试从注册表获取服务
Service service = getService(namespaceId, serviceName);
if (service == null) {
Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
service = new Service();
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(NamingUtils.getGroupName(serviceName));
// now validate the service. if failed, exception will be thrown
// 记录服务最后一次更新时间
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
if (cluster != null) {
cluster.setService(service);
service.getClusterMap().put(cluster.getName(), cluster);
}
service.validate();
// 初始化空服务
putServiceAndInit(service);
==》
private void putServiceAndInit(Service service) throws NacosException {
// 把服务放入注册表
putService(service);
service = getService(service.getNamespaceId(), service.getName());
// 初始化,健康检测
service.init();
// 服务状态变更监听
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
==》
if (!local) {
addOrReplaceService(service);
}
}
==>
// 拿到创建好的service
Service service = getService(namespaceId, serviceName);
// 拿不到则抛异常
if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
}
// 添加要注册的实例到service中
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
== 》
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException {
// 监听服务列表用到的key,服务唯一标识,例如:com.alibaba.nacos.naming.iplist.ephemeral.public##DEFAULT_GROUP@@order-service
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
// 获取服务
Service service = getService(namespaceId, serviceName);
// 同步锁,避免并发修改的安全问题,同一个服务可能有多个实例,多个实例只能串行执行
synchronized (service) {
// 1)获取要更新的实例列表==》 拷贝注册表中旧的实例列表+新注册的实例 得到最终实例列表
List instanceList = addIpAddresses(service, ephemeral, ips);
// 2)封装实例列表到Instances对象
Instances instances = new Instances();
instances.setInstanceList(instanceList);
// 3)完成 注册表更新 以及 Nacos集群的数据同步
consistencyService.put(key, instances);
// 这里会判断是临时实例还是永久实例,默认是临时的,永久节点使用强一致性
==》
@Override
public void put(String key, Record value) throws NacosException {
// 更新本地注册表
onPut(key, value);
==>
public void onPut(String key, Record value) {
// 判断是不是临时实例
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
// 封装实例列表到Datum
Datum datum = new Datum<>();
datum.value = (Instances) value;
// 唯一标识
datum.key = key;
datum.timestamp.incrementAndGet();
//serviceId为Key,datum为值 缓存起来
dataStore.put(key, datum);
}
if (!listeners.containsKey(key)) {
return;
}
// 把serviceId和当前操作类型存入notifer
notifier.addTask(key, DataOperation.CHANGE);
==>
public void addTask(String datumKey, DataOperation action) {
if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
return;
}
if (action == DataOperation.CHANGE) {
services.put(datumKey, StringUtils.EMPTY);
}
// private BlockingQueue> tasks = new ArrayBlockingQueue<>(1024 * 1024);
//把serviceId和事件 放入阻塞队列
tasks.offer(Pair.with(datumKey, action));
==》
@Override
public void run() {
Loggers.DISTRO.info("distro notifier started");
for (; ; ) {
try {
//队列放一个任务,我才更新列表
Pair pair = tasks.take();
handle(pair);
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
==》
}
==>
}
==>
// If upgrade to 2.0.X, do not sync for v1.
if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) {
return;
}
// 异步,同步给nacos其他节点
distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
DistroConfig.getInstance().getSyncDelayMillis());
==》
// 通知其他节点
for (Member each : memberManager.allMembersWithoutSelf()) {
DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
targetServer);
DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);
}
}
==>
}
==》
}
}



