参考
项目使用保存白名单配置,并在合适的时候更新配置
public class AggrWhitelist {
static public boolean isAggrDataId(String dataId) {
if (null == dataId) {
throw new IllegalArgumentException();
}
for (Pattern pattern : AGGR_DATAID_WHITELIST.get()) {
if (pattern.matcher(dataId).matches()) {
return true;
}
}
return false;
}
static public void load(String content) {
if (StringUtils.isBlank(content)) {
fatalLog.error("aggr dataId whitelist is blank.");
return;
}
defaultLog.warn("[aggr-dataIds] {}", content);
try {
List lines = IOUtils.readLines(new StringReader(content));
compile(lines);
} catch (Exception ioe) {
defaultLog.error("failed to load aggr whitelist, " + ioe.toString(), ioe);
}
}
static void compile(List whitelist) {
List list = new ArrayList(whitelist.size());
for (String line : whitelist) {
if (!StringUtils.isBlank(line)) {
String regex = RegexParser.regexFormat(line.trim());
list.add(Pattern.compile(regex));
}
}
AGGR_DATAID_WHITELIST.set(list);
}
static public List getWhiteList() {
return AGGR_DATAID_WHITELIST.get();
}
// =======================
static public final String AGGRIDS_METADATA = "com.alibaba.nacos.metadata.aggrIDs";
static final AtomicReference> AGGR_DATAID_WHITELIST = new AtomicReference>(
new ArrayList());
}
关键代码
private volatile V value;
//这个方法是原子操作的基本
public final boolean compareAndSet(V expect, V update) {
return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
}
public final V getAndUpdate(UnaryOperator updateFunction) {
V prev, next;
do {
prev = get();
next = updateFunction.apply(prev);
} while (!compareAndSet(prev, next));
return prev;
}
public final V getAndSet(V newValue) {
return (V)unsafe.getAndSetObject(this, valueOffset, newValue);
}
public final Object getAndSetObject(Object o, long offset, Object newValue) {
Object v;
do {
v = getObjectVolatile(o, offset);
} while (!compareAndSwapObject(o, offset, v, newValue));
return v;
}
用法——volatile+原子更新
对于一般的自增更新或者先获取值,再更新原值的操作,都会出现并发问题。volatile只能解决可见性,但是不能保证跟原值有关的更新操作的原子性。如果更新时,使用compareAndSet等操作,就可以解决这个问题。
发现项目中使用了AtomicBoolean等作为标志,但是没用compareAndSet等方法,是因为它只是为了使用volatile特性,保证可见性。
public class BankCardARTest {
private static AtomicReference bankCardRef = new AtomicReference<>(new BankCard("cxuan",100));
public static void main(String[] args) {
for(int i = 0;i < 10;i++){
new Thread(() -> {
while (true){
// 使用 AtomicReference.get 获取
final BankCard card = bankCardRef.get();
BankCard newCard = new BankCard(card.getAccountName(), card.getMoney() + 100);
// 使用 CAS 乐观锁进行非阻塞更新
if(bankCardRef.compareAndSet(card,newCard)){
System.out.println(newCard);
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}
}
@FunctionalInterface
项目使用
@FunctionalInterface public interface TransactionCallback用法{ @Nullable T doInTransaction(TransactionStatus status); }
由java8引入,被@FunctionalInterface注解的接口称为函数式接口,可以使用lambda表达式。特点:
(1)接口有且仅有一个抽象方法
(2)该注解不是必须的,如果一个接口符合"函数式接口"定义,那么加不加该注解都没有影响。加上该注解能够更好地让编译器进行检查。如果编写的不是函数式接口,但是加上了@FunctionInterface,那么编译器会报错
public static void state(boolean expression, String message) {
if (!expression) {
throw new IllegalStateException(message);
}
}
public static void isTrue(boolean expression, String message) {
if (!expression) {
throw new IllegalArgumentException(message);
}
}
用法
和Assert.isTrue方法比较,只是返回的异常类型不一样
Assert.state(this.transactionManager != null, "No PlatformTransactionManager set");事件发布和监听模型+发布订阅模式 项目使用+关键代码
//事件接口
public interface Event {
}
//业务事件接口
public class ConfigDataChangeEvent implements Event {
final public boolean isBeta;
final public String dataId;
final public String group;
final public String tenant;
final public String tag;
final public long lastModifiedTs;
public ConfigDataChangeEvent(boolean isBeta, String dataId, String group, String tenant, String tag,
long gmtModified) {
if (null == dataId || null == group) {
throw new IllegalArgumentException();
}
this.isBeta = isBeta;
this.dataId = dataId;
this.group = group;
this.tenant = tenant;
this.tag = tag;
this.lastModifiedTs = gmtModified;
}
}
//父类事件监听器,提供onEvent事件监听方法
public abstract class AbstractEventListener {
public AbstractEventListener() {
EventDispatcher.addEventListener(this);
}
abstract public List> interest();
abstract public void onEvent(Event event);
}
//事件监听器的业务处理
public class AsyncNotifyService extends AbstractEventListener {
@Override
public void onEvent(Event event) {
// 并发产生 ConfigDataChangeEvent
// 通过instanceof,处理自己感兴趣的事件
if (event instanceof ConfigDataChangeEvent) {
ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
long dumpTs = evt.lastModifiedTs;
String dataId = evt.dataId;
String group = evt.group;
String tenant = evt.tenant;
String tag = evt.tag;
List> ipList = serverListService.getServerList();
// 其实这里任何类型队列都可以
Queue queue = new LinkedList();
for (int i = 0; i < ipList.size(); i++) {
queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, (String) ipList.get(i), evt.isBeta));
}
EXECUTOR.execute(new AsyncTask(httpclient, queue));
}
}
}
// 事件与监听器绑定模型
static private class Entry {
//能处理的事件类型
final Class extends Event> eventType;
//该类型事件的监听器列表
final CopyOnWriteArrayList listeners;
Entry(Class extends Event> type) {
eventType = type;
listeners = new CopyOnWriteArrayList();
}
}
//事件分发器。简单理解其结构为List
用法
事件发布和监听模型(发布订阅模式),主要是用来做异步、解耦的,使得事件的发布者和订阅者完全解耦。有时如果是异步的,则可通过队列、线程池来做异步。
CloseableHttpAsyncClient订阅模式,是观察者模式的一个别称。但是经过时间的沉淀,已经独立于观察者模式,成为另外一种不同的设计模式。在现在的发布订阅模式中,称为发布者的消息发送者不会将消息直接发送给订阅者,这意味着发布者和订阅者不知道彼此的存在。在发布者和订阅者之间存在第三个组件,称为调度中心或事件通道,它维持着发布者和订阅者之间的联系,过滤所有发布者传入的消息并相应地分发它们给订阅者。
举一个例子,你在微博上关注了A,同时其他很多人也关注了A,那么当A发布动态的时候,微博就会为你们推送这条动态。A就是发布者,你是订阅者,微博就是调度中心,你和A是没有直接的消息往来的,全是通过微博来协调的(你的关注,A的发布动态)。
普通httpclient发出请求之后,连接会等到服务端返回信息;而http异步请求,则多个请求可以共用一个连接,请求放到缓冲区,哪个请求有返回就处理哪个。通过提供回调方法,完成响应的处理。
ThreadFactory通过线程工厂,给线程设置名字,设置线程为守护线程
static class NotifyThreadFactory implements ThreadFactory {
private final AtomicInteger count = new AtomicInteger(1);
private String threadNamePrefix = "com.alibaba.nacos.AsyncNotifyServiceThread";
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName(threadNamePrefix + count.getAndIncrement());
return thread;
}
}
守护线程
根据jdk的注释,当所有正在运行的线程都是用户线程时,那么jvm会自动退出,自然守护线程也会自动关闭。
创建线程池时,有时可以通过把线程设置为守护线程,来隐式的关闭线程池。
//失败任务重试时,时间指数增加,以免断网场景不断重试无效任务,影响正常同步
private static int getDelayTime(NotifySingleTask task) {
int failCount = task.getFailCount();
int delay = MIN_RETRY_INTERVAL + failCount * failCount * INCREASE_STEPS;
if (failCount <= MAX_COUNT) {
task.setFailCount(failCount + 1);
}
return delay;
}
隐式调用构造方法,并通过this完成bean的初始化
项目代码
//父类
public abstract class AbstractEventListener {
public AbstractEventListener() {
EventDispatcher.addEventListener(this);
}
abstract public List> interest();
}
//子类
public class AsyncNotifyService extends AbstractEventListener {
@Override
public List> interest() {
List> types = new ArrayList>();
// 触发配置变更同步通知
types.add(ConfigDataChangeEvent.class);
return types;
}
@Autowired
public AsyncNotifyService(ServerListService serverListService) {
this.serverListService = serverListService;
httpclient.start();
}
}
父类构造方法调用
参考
如果父类没有显式的构造方法,那么它会有一个默认的无参构造方法;
但是一旦有显式的构造方法,不管有参还是无参,则不会有默认的无参构造方法;
调用子类构造方法时,默认会先调用父类的无参构造方法,也可以通过super显式的调用父类构造方法。
父类构造方法被调用时,this参数传递的是子类的对象。
因为bean的生命周期里,是先设置属性(@Autowired注入、@Value注入),后调用初始化后置方法。如果不这样写,通常是用new Service()手工管理,来保证单例,而不通过spring管理,会没那么优雅。
项目代码@Service
public class DumpService {
@Autowired
PersistService persistService;
@PostConstruct
public void init() {
//这里this,会把DumpService这个bean传给DumpProcessor的
DumpProcessor processor = new DumpProcessor(this);
}
//通过dumpService.persistService,调用persistService的方法
class DumpProcessor implements TaskProcessor {
DumpProcessor(DumpService dumpService) {
this.dumpService = dumpService;
}
final DumpService dumpService;
@Override
public boolean process(String taskType, AbstractTask task) {
ConfigInfo4Beta cf = dumpService.persistService.findConfigInfo4Beta(dataId, group, tenant);
...}
}
大量数据分页加载
项目代码
//先查出最大id,因为id是主键,所以很快
long currentMaxId = persistService.findConfigMaxId();
long lastMaxId = 0;
while (lastMaxId < currentMaxId) {
Page page = persistService.findAllConfigInfoFragment(lastMaxId,
PAGE_SIZE);
//ELECT id,data_id,group_id,tenant_id,app_name,content,md5,gmt_modified from config_info where id > ? order by id asc limit ?,?
if (page != null && page.getPageItems() != null) {
for (PersistService.ConfigInfoWrapper cf : page.getPageItems()) {
long id = cf.getId();
//更新lastMaxId
lastMaxId = id > lastMaxId ? id : lastMaxId;
//业务处理
} else {
lastMaxId += PAGE_SIZE;
}
}
架构
配置中心
Nacos配置中心集群中,没有主从之分,各个节点是对等的。通过增加虚拟ip(viper)的方式,实现高可用。
Nacos的数据存储分为两部分:
(1)所有节点共用的mysql。Mysql是一个中心数据仓库,且认为在Mysql中的数据是绝对正确的
(2)每个节点都会在内存和磁盘维护一份数据。当节点启动时,把全量的mysql数据加载到磁盘。
(1)接收到更新配置的http请求后,节点把配置更新到数据库后,直接返回true,更新成功
(2)以namespace+group+dataId作为key,通过事件分发器发布配置变更事件
(3)事件分发器触发该事件对应的所有listener的处理方法
a.获取配置集群的所有节点的ip端口
b.为每个节点生成一个配置变更任务,并放到队列里
c.由一个调度线程池处理队列中的任务:只要队列不为空,就从队列poll任务,发送http异步请求节点的dataChange接口,并注册回调方法。如果请求失败,把失败任务放入一个新队列,使用调度线程池延迟一定时间后再执行这个队列的任务进行重试。延迟时间会根据失败次数指数增加
d.可以看出,节点之间通过http异步的同步,数据只保证最终一致性
(4)其他节点(包括自身)收到dataChange通知后,发布一个dump任务,讲任务放到内存ConcurrentHashMap中
(5)有一个dump线程,通过死循环+sleep 100ms的方式,处理这个任务map集
(6)循环处理map中的任务:
a.从数据库中查出任务的配置
b.讲配置内容的md5和内存中该配置的md5相比较。此处内存使用的是ConcurrentHashMap,如果内存中不存在,md5就用null比较
c.md5相同,则忽略此次更新;否则把数据更新到磁盘文件
d.更新内存的md5,内存这里只保存md5,无需更新内容
e.发布本地数据变更事件。这里应该是一个扩展点,因为没有listener
(7)如果任务处理失败,设置任务的上次处理时间,把任务再加回任务map中,等待下次处理
启动初始化流程这里充分利用了异步。在配置变更那里,通过把任务放入队列,并用调度线程池触发+调度,完成异步。在dataChange那里,把任务放到内存map中,通过一个线程循环处理,来异步处理
容错:通过重试容错;如果节点宕机,那么加入到内存中的任务就没了。但是节点启动时,会把mysql中的所有数据再通过任务的方式更新到磁盘和内存,就解决了这个问题
(1)分页查询配置心虚,循环处理配置信息,跟上面的流程一样,更新md5,再更新磁盘
(2)调度线程池每隔6小时,dump一次全量数据到磁盘,作为保底
(3)加载节点的ip端口信息。如果是单机启动,就只加载本机,否则从cluster.conf文件中读取所有节点的ip端口进行加载。并且用调度线程池每隔5s,更新一次配置
(4)同时每隔5秒,对上面加载的节点都通过异步http调用一次健康检查。回调中,根据检查结果来更新节点的失败次数,并根据失败次数判断是否从健康节点集中剔除
(1)配置中心获取配置,如果是单机模式,从mysql获取,否则从磁盘文件获取。可以看出不保证实时一致性
(2)



