栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Mybatis学习之路(四)Mybatis源码分析三

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Mybatis学习之路(四)Mybatis源码分析三

文章目录
    • 一、缓存机制
      • 1.1 缓存类
        • 1.1.1 PerpetualCache
        • 1.1.2 LruCache
        • 1.1.3 BlockingCache
      • 1.2 CacheKey
      • 1.3 一级缓存
      • 1.4 二级缓存
    • 二、插件机制
      • 2.1 插件机制原理
        • 2.1.1 植入插件逻辑
        • 2.1.2 执行插件逻辑
      • 2.2 实现一个分页插件

本系列文章:
  Mybatis学习之路(一)理论基础和使用介绍
  Mybatis学习之路(二)Mybatis源码分析一
  Mybatis学习之路(三)Mybatis源码分析二
  Mybatis学习之路(四)Mybatis源码分析三

一、缓存机制

  通常我们都会用 Redis 或 memcached 等缓存中间件,拦截大量奔向数据库的请求,以减轻数据库压力。MyBatis自然也在内部提供了相应的支持。通过在框架层面增加缓存功能,可减轻数据库的压力,同时又可以提升查询速度,可谓一举两得。MyBatis 缓存结构由一级缓存和二级缓存构成,这两级缓存均是使用 Cache 接口的实现类。

1.1 缓存类

  在 MyBatis 中,Cache 是缓存接口,定义了一些基本的缓存操作。MyBatis 内部提供了丰富的缓存实现类,比如具有基本缓存功能的PerpetualCache ,具有 LRU 策略的缓存 LruCache ,以及可保证线程安全的缓存SynchronizedCache 和具备阻塞功能的缓存 BlockingCache 等。
  MyBatis 在实现缓存模块的过程中,使用了装饰模式。

1.1.1 PerpetualCache

  PerpetualCache(位于org.apache.ibatis.cache.impl) 是一个具有基本功能的缓存类,内部使用了 HashMap 实现缓存功能。

public class PerpetualCache implements Cache {

  	private final String id;

  	private final Map cache = new HashMap<>();

  	public PerpetualCache(String id) {
    	this.id = id;
  	}

  	@Override
  	public String getId() {
    	return id;
  	}

  	@Override
  	public int getSize() {
    	return cache.size();
  	}

  	@Override
  	public void putObject(Object key, Object value) {
    	// 存储键值对到 HashMap
    	cache.put(key, value);
  	}

  	@Override
  	public Object getObject(Object key) {
    	// 查找缓存项
    	return cache.get(key);
  	}

  	@Override
  	public Object removeObject(Object key) {
    	// 移除缓存项
    	return cache.remove(key);
  	}

  	@Override
  	public void clear() {
    	cache.clear();
  	}

  	@Override
  	public boolean equals(Object o) {
    	if (getId() == null) {
      		throw new CacheException("Cache instances require an ID.");
    	}
    	if (this == o) {
      		return true;
    	}
    	if (!(o instanceof Cache)) {
      		return false;
    	}

    	Cache otherCache = (Cache) o;
    	return getId().equals(otherCache.getId());
  	}

  	@Override
  	public int hashCode() {
    	if (getId() == null) {
      		throw new CacheException("Cache instances require an ID.");
    	}
    	return getId().hashCode();
  	}
}
1.1.2 LruCache

  LruCache(位于org.apache.ibatis.cache.decorators),顾名思义,是一种具有 LRU 策略的缓存实现类。

public class LruCache implements Cache {

  	private final Cache delegate;
  	private Map keyMap;
  	private Object eldestKey;

  	public LruCache(Cache delegate) {
   	 	this.delegate = delegate;
    	setSize(1024);
  	}

  	@Override
  	public String getId() {
    	return delegate.getId();
  	}

  	@Override
  	public int getSize() {
    	return delegate.getSize();
  	}

  	public void setSize(final int size) {
    	// 初始化 keyMap,注意,keyMap 的类型继承自 linkedHashMap,
		// 并覆盖了 removeEldestEntry 方法
    	keyMap = new linkedHashMap(size, .75F, true) {
      		private static final long serialVersionUID = 4267176411845948333L;
      		// 覆盖 linkedHashMap 的 removeEldestEntry 方法
      		@Override
      		protected boolean removeEldestEntry(Map.Entry eldest) {
        		boolean tooBig = size() > size;
        		if (tooBig) {
          			// 获取将要被移除缓存项的键值
          			eldestKey = eldest.getKey();
        		}
        		return tooBig;
      		}
    	};
  	}

  	@Override
  	public void putObject(Object key, Object value) {
    	// 存储缓存项
    	delegate.putObject(key, value);
    	cycleKeyList(key);
  	}

  	@Override
  	public Object getObject(Object key) {
    	// 刷新 key 在 keyMap 中的位置
    	keyMap.get(key); // touch
    	// 从被装饰类中获取相应缓存项
    	return delegate.getObject(key);
  	}

  	@Override
  	public Object removeObject(Object key) {
    	// 从被装饰类中移除相应的缓存项
    	return delegate.removeObject(key);
  	}

  	@Override
  	public void clear() {
    	delegate.clear();
    	keyMap.clear();
  	}

  	private void cycleKeyList(Object key) {
    	// 存储 key 到 keyMap 中
    	keyMap.put(key, key);
    	if (eldestKey != null) {
      		// 从被装饰类中移除相应的缓存项
      		delegate.removeObject(eldestKey);
      		eldestKey = null;
    	}
  	}
}

  LruCache 的 keyMap 属性是实现 LRU 策略的关键,该属性类型继承自linkedHashMap,并覆盖了 removeEldestEntry 方法。linkedHashMap 可保持键值对的插入顺序,当插入一个新的键值对时,linkedHashMap 内部的 tail 节点会指向最新插入的节点。head 节点则指向第一个被插入的键值对,也就是最久未被访问的那个键值对。默认情况下,linkedHashMap 仅维护键值对的插入顺序。若要基于 linkedHashMap 实现 LRU 缓存,还需通过构造方法将 linkedHashMap 的 accessOrder 属性设为 true,此时 linkedHashMap会维护键值对的访问顺序。比如,上面代码中 getObject 方法中执行了这样一句代码keyMap.get(key) ,目的是刷新 key 对应的键值对在 linkedHashMap 的位置。
  linkedHashMap 会将 key 对应的键值对移动到链表的尾部,尾部节点表示最久刚被访问过或者插入的节点。除了需将 accessOrder 设为 true,还需覆盖 removeEldestEntry 方法。linkedHashMap 在插入新的键值对时会调用该方法,以决定是否在插入新的键值对后,移除老的键值对。在上面的代码中,当被装饰类的容量超出了 keyMap 的所规定的容量(由构造方法传入)后,keyMap 会移除最长时间未被访问的键,并将该键保存到 eldestKey 中,然后由 cycleKeyList 方法将 eldestKey 传给被装饰类的 removeObject 方法,移除相应的缓存项目。

1.1.3 BlockingCache

  BlockingCache(位于org.apache.ibatis.cache.decorators) 实现了阻塞特性,该特性是基于 Java 重入锁实现的。同一时刻下,BlockingCache 仅允许一个线程访问指定 key 的缓存项,其他线程将会被阻塞住。

public class BlockingCache implements Cache {

  	private long timeout;
  	private final Cache delegate;
  	private final ConcurrentHashMap locks;

  	public BlockingCache(Cache delegate) {
    	this.delegate = delegate;
    	this.locks = new ConcurrentHashMap<>();
  	}

  	@Override
  	public String getId() {
    	return delegate.getId();
  	}

  	@Override
  	public int getSize() {
    	return delegate.getSize();
  	}	

  	@Override
  	public void putObject(Object key, Object value) {
    	try {
      		// 存储缓存项
      		delegate.putObject(key, value);
    	} finally {
      		// 释放锁
      		releaseLock(key);
    	}
  	}

  	@Override
  	public Object getObject(Object key) {
    	// 请求锁
    	acquireLock(key);
    	Object value = delegate.getObject(key);
    	// 若缓存命中,则释放锁。需要注意的是,未命中则不释放锁
    	if (value != null) {
      		// 释放锁
      		releaseLock(key);
    	}
   	 	return value;
  	}

  	@Override
  	public Object removeObject(Object key) {
    	// 释放锁
    	releaseLock(key);
    	return null;
  	}

  	@Override
  	public void clear() {
    	delegate.clear();
  	}

  	private void acquireLock(Object key) {
    	CountDownLatch newLatch = new CountDownLatch(1);
    	while (true) {
      		CountDownLatch latch = locks.putIfAbsent(key, newLatch);
      		if (latch == null) {
        		break;
      		}
      		try {
        		if (timeout > 0) {
          			boolean acquired = latch.await(timeout, TimeUnit.MILLISECONDS);
          			if (!acquired) {
            			throw new CacheException(
                			"Couldn't get a lock in " + timeout + " for the key " + key + " at the cache " + delegate.getId());
          				}
        			} else {
          				latch.await();
        			}
      		} catch (InterruptedException e) {
        		throw new CacheException("Got interrupted while trying to acquire lock for key " + key, e);
      		}
    	}
  	}

  	private void releaseLock(Object key) {
    	CountDownLatch latch = locks.remove(key);
    	if (latch == null) {
      		throw new IllegalStateException("Detected an attempt at releasing unacquired lock. This should never happen.");
    	}
    	latch.countDown();
  	}

  	public long getTimeout() {
    	return timeout;
  	}

  	public void setTimeout(long timeout) {
    	this.timeout = timeout;
  	}
}

  在查询缓存时,getObject 方法会先获取与 key 对应的锁,并加锁。若缓存命中,getObject 方法会释放锁,否则将一直锁定。getObject 方法若返回 null,表示缓存未命中。此时 MyBatis 会向数据库发起查询请求,并调用 putObject 方法存储查询结果。此时,putObject 方法会将指定 key 对应的锁进行解锁,这样被阻塞的线程即可恢复运行。

1.2 CacheKey

   MyBatis 中,引入缓存的目的是为提高查询效率,降低数据库压力。value 的内容是 SQL 的查询结果,key是一种复合对象,能涵盖可影响查询结果的因子。在 MyBatis 中,这种复合对象就是 CacheKey(位于org.apache.ibatis.cache)。

public class CacheKey implements Cloneable, Serializable {
  	private static final int DEFAULT_MULTIPLIER = 37;
  	private static final int DEFAULT_HASHCODE = 17;
  	// 乘子,默认为 37
  	private final int multiplier;
  	// CacheKey 的 hashCode,综合了各种影响因子
  	private int hashcode;
  	// 校验和
  	private long checksum;
  	// 影响因子个数
  	private int count;
  	// 影响因子集合
  	private List updateList;

  	public CacheKey() {
   	 	this.hashcode = DEFAULT_HASHCODE;
    	this.multiplier = DEFAULT_MULTIPLIER;
    	this.count = 0;
    	this.updateList = new ArrayList<>();
  	}
}
 

  除了 multiplier 是恒定不变的 ,其他变量将在更新操作中被修改。

  	
  	public void update(Object object) {
    	int baseHashCode = object == null ? 1 : ArrayUtil.hashCode(object);
    	// 自增 count
    	count++;
    	// 计算校验和
    	checksum += baseHashCode;
    	// 更新 baseHashCode
    	baseHashCode *= count;
    	// 计算 hashCode
    	hashcode = multiplier * hashcode + baseHashCode;
    	// 保存影响因子
    	updateList.add(object);
  	}

  当不断有新的影响因子参与计算时,hashcode 和 checksum 将会变得愈发复杂和随机。这样可降低冲突率,使 CacheKey 可在缓存中更均匀的分布。CacheKey 最终要作为键存入HashMap,因此它需要覆盖 equals 和 hashCode 方法。下面我们来看一下这两个方法的实现。

  	public boolean equals(Object object) {
    	// 检测是否为同一个对象
    	if (this == object) {
      		return true;
    	}
    	// 检测 object 是否为 CacheKey
    	if (!(object instanceof CacheKey)) {
      		return false;
    	}

    	final CacheKey cacheKey = (CacheKey) object;
    	// 检测 hashCode 是否相等
    	if (hashcode != cacheKey.hashcode) {
      		return false;
    	}
    	// 检测校验和是否相同
    	if (checksum != cacheKey.checksum) {
      		return false;
    	}
    	// 检测 coutn 是否相同
    	if (count != cacheKey.count) {
      		return false;
    	}
    	// 如果上面的检测都通过了,下面分别对每个影响因子进行比较
    	for (int i = 0; i < updateList.size(); i++) {
      		Object thisObject = updateList.get(i);
      		Object thatObject = cacheKey.updateList.get(i);
      		if (!ArrayUtil.equals(thisObject, thatObject)) {
        		return false;
      		}
    	}
    	return true;
  	}	

  	public int hashCode() {
    	// 返回 hashcode 变量
    	return hashcode;
  	}

  equals 方法的检测逻辑比较严格,对 CacheKey 中多个成员变量进行了检测,已保证两者相等。hashCode 方法比较简单,返回 hashcode 变量即可。

1.3 一级缓存

  在进行数据库查询之前,MyBatis 首先会检查以及缓存中是否有相应的记录,若有的话直接返回即可。一级缓存是数据库的最后一道防护,若一级缓存未命中,查询请求将落到数据库上。一级缓存是在 baseExecutor 被初始化的:

public abstract class baseExecutor implements Executor {
	protected PerpetualCache localCache;
	// 省略其他字段

	protected baseExecutor(Configuration configuration,Transaction transaction) {
		this.localCache = new PerpetualCache("LocalCache");
		// 省略其他字段初始化方法
 	}
}

  一级缓存的类型为 PerpetualCache,没有被其他缓存类装饰过。一级缓存所存储从查询结果会在 MyBatis 执行更新操作(INSERT/UPDATE/DELETE),以及提交和回滚事务时被清空。下面我们来看一下访问一级缓存的逻辑。

  	public  List query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
    	BoundSql boundSql = ms.getBoundSql(parameter);
    	// 创建 CacheKey
    	CacheKey key = createCacheKey(ms, parameter, rowBounds, boundSql);
    	return query(ms, parameter, rowBounds, resultHandler, key, boundSql);
  	}

  	public  List query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
    	ErrorContext.instance().resource(ms.getResource()).activity("executing a query").object(ms.getId());
    	if (closed) {
      		throw new ExecutorException("Executor was closed.");
    	}
    	if (queryStack == 0 && ms.isFlushCacheRequired()) {
      		clearLocalCache();
    	}
    	List list;
    	try {
      		queryStack++;
      		// 查询一级缓存
      		list = resultHandler == null ? (List) localCache.getObject(key) : null;
      		if (list != null) {
        		// 存储过程相关逻辑
        		handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);
      		} else {
        		// 缓存未命中,则从数据库中查询
        		list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);
      		}
    	} finally {
      		queryStack--;
    	}
    	if (queryStack == 0) {
      		for (DeferredLoad deferredLoad : deferredLoads) {
        		deferredLoad.load();
      		}
      		// issue #601
      		deferredLoads.clear();
      		if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) {
        		// issue #482
        		clearLocalCache();
      		}
    	}
    	return list;
  	}

  如上,在访问一级缓存之前,MyBatis 首先会调用 createCacheKey 方法创建 CacheKey:

  	public CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) {
    	if (closed) {
      		throw new ExecutorException("Executor was closed.");
    	}
    	// 创建 CacheKey 对象
    	CacheKey cacheKey = new CacheKey();
    	// 将 MappedStatement 的 id 作为影响因子进行计算
    	cacheKey.update(ms.getId());
    	// RowBounds 用于分页查询,下面将它的两个字段作为影响因子进行计算
    	cacheKey.update(rowBounds.getOffset());
    	cacheKey.update(rowBounds.getLimit());
    	// 获取 sql 语句,并进行计算
    	cacheKey.update(boundSql.getSql());
    	List parameterMappings = boundSql.getParameterMappings();
   	 	TypeHandlerRegistry typeHandlerRegistry = ms.getConfiguration().getTypeHandlerRegistry();
    	// mimic DefaultParameterHandler logic
    	for (ParameterMapping parameterMapping : parameterMappings) {
      		if (parameterMapping.getMode() != ParameterMode.OUT) {
        		Object value;
        		// 当前大段代码用于获取 SQL 中的占位符 #{xxx} 对应的运行时参数
        		String propertyName = parameterMapping.getProperty();
        		if (boundSql.hasAdditionalParameter(propertyName)) {
          			value = boundSql.getAdditionalParameter(propertyName);
        		} else if (parameterObject == null) {
          			value = null;
        		} else if (typeHandlerRegistry.hasTypeHandler(parameterObject.getClass())) {
          			value = parameterObject;
        		} else {
          			metaObject metaObject = configuration.newmetaObject(parameterObject);
          			value = metaObject.getValue(propertyName);
        		}
        		// 让运行时参数参与计算
        		cacheKey.update(value);
      		}
    	}
    	if (configuration.getEnvironment() != null) {
      		// 获取 Environment id 遍历,并让其参与计算
      		cacheKey.update(configuration.getEnvironment().getId());
    	}
    	return cacheKey;
  	}

  在计算 CacheKey 的过程中,有很多影响因子参与了计算。比如 MappedStatement 的id 字段,SQL 语句,分页参数,运行时变量,Environment 的 id 字段等。通过让这些影响因子参与计算,可以很好的区分不同查询请求。所以,我们可以简单的把 CacheKey 看做是一个查询请求的 id。有了 CacheKey,我们就可以使用它读写缓存了。在上面代码中,若一级缓存为命中,baseExecutor 会调用 queryFromDatabase 查询数据库,并将查询结果写入缓存中。

  	private  List queryFromDatabase(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
    	List list;
    	// 向缓存中存储一个占位符
    	localCache.putObject(key, EXECUTION_PLACEHOLDER);
    	try {
      		// 查询数据库
      		list = doQuery(ms, parameter, rowBounds, resultHandler, boundSql);
    	} finally {
      		// 移除占位符
      		localCache.removeObject(key);
    	}
    	// 存储查询结果
    	localCache.putObject(key, list);
    	// 存储过程相关逻辑
    	if (ms.getStatementType() == StatementType.CALLABLE) {
      		localOutputParameterCache.putObject(key, parameter);
    	}
    	return list;
  	}
1.4 二级缓存

  二级缓存构建在一级缓存之上,在收到查询请求时,MyBatis 首先会查询二级缓存。若二级缓存未命中,再去查询一级缓存。与一级缓存不同,二级缓存和具体的命名空间绑定,一级缓存则是和 SqlSession 绑定。
  在按照 MyBatis 规范使用 SqlSession 的情况下,一级缓存不存在并发问题。二级缓存则不然,二级缓存可在多个命名空间间共享。这种情况下,会存在并发问题,因此需要针对性的去处理。除了并发问题,二级缓存还存在事务问题,相关问题将在接下来进行分析。下面先来看一下CachingExecutor(位于org.apache.ibatis.executor)中的访问二级缓存的逻辑。

  	public  List query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
    	BoundSql boundSql = ms.getBoundSql(parameterObject);
    	// 创建 CacheKey
    	CacheKey key = createCacheKey(ms, parameterObject, rowBounds, boundSql);
    	return query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
  	}

  	public  List query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql)
      throws SQLException {
    	// 从 MappedStatement 中获取 Cache,注意这里的 Cache
		// 并非是在 CachingExecutor 中创建的
    	Cache cache = ms.getCache();
    	// 如果配置文件中没有配置 ,则 cache 为空
    	if (cache != null) {
      		flushCacheIfRequired(ms);
      		if (ms.isUseCache() && resultHandler == null) {
        		ensureNoOutParams(ms, boundSql);
        		// 访问二级缓存
        		@SuppressWarnings("unchecked")
        		List list = (List) tcm.getObject(cache, key);
        		// 缓存未命中
        		if (list == null) {
          			// 向一级缓存或者数据库进行查询
          			list = delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
          			// 缓存查询结果
          			tcm.putObject(cache, key, list); // issue #578 and #116
        		}
        		return list;
      		}
    	}
    	return delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
  	}

  注意二级缓存是从 MappedStatement 中获取的,而非由 CachingExecutor 创建。由于 MappedStatement 存在于全局配置中,可以被多个 CachingExecutor 获取到,这样就会出现线程安全问题。除此之外,若不加以控制,多个事务共用一个缓存实例,会导致脏读问题。线程安全问题可以通过 SynchronizedCache 装饰类解决,该装饰类会在 Cache 实例构造期间被添加上。至于脏读问题,需要借助其他类来处理,也就是上面代码中 tcm 变量对应的类型,即TransactionalCacheManager(位于org.apache.ibatis.cache)。

public class TransactionalCacheManager {
  	// Cache 与 TransactionalCache 的映射关系表
  	private final Map transactionalCaches = new HashMap<>();

  	public void clear(Cache cache) {
    	// 获取 TransactionalCache 对象,并调用该对象的 clear 方法
    	getTransactionalCache(cache).clear();
  	}

  	public Object getObject(Cache cache, CacheKey key) {
    	return getTransactionalCache(cache).getObject(key);
  	}

 	public void putObject(Cache cache, CacheKey key, Object value) {
    	getTransactionalCache(cache).putObject(key, value);
  	}

  	public void commit() {
    	for (TransactionalCache txCache : transactionalCaches.values()) {
      		txCache.commit();
    	}
  	}

  	public void rollback() {
    	for (TransactionalCache txCache : transactionalCaches.values()) {
      		txCache.rollback();
    	}
  	}

  	private TransactionalCache getTransactionalCache(Cache cache) {
    	// 从映射表中获取 TransactionalCache
    	// TransactionalCache 也是一种装饰类,为 Cache 增加事务功能
    	return transactionalCaches.computeIfAbsent(cache, TransactionalCache::new);
  	}
}

  TransactionalCacheManager 内部维护了 Cache 实例与 TransactionalCache 实例间的映射关系,该类也仅负责维护两者的映射关系,真正做事的还是 TransactionalCache(位于org.apache.ibatis.cache.decorators)。TransactionalCache 是一种缓存装饰器,可以为 Cache 实例增加事务功能。之前提到的脏读问题正是由该类进行处理的。

public class TransactionalCache implements Cache {

  	private static final Log log = LogFactory.getLog(TransactionalCache.class);

  	private final Cache delegate;
  	private boolean clearOnCommit;
  	// 在事务被ᨀ交前,所有从数据库中查询的结果将缓存在此集合中
  	private final Map entriesToAddOnCommit;
  	// 在事务被ᨀ交前,当缓存未命中时,CacheKey 将会被存储在此集合中
  	private final Set entriesMissedInCache;

  	public TransactionalCache(Cache delegate) {
    	this.delegate = delegate;
    	this.clearonCommit = false;
    	this.entriesToAddonCommit = new HashMap<>();
    	this.entriesMissedInCache = new HashSet<>();
  	}

  	@Override
  	public String getId() {
    	return delegate.getId();
  	}

  	@Override
  	public int getSize() {
    	return delegate.getSize();
  	}

  	@Override
  	public Object getObject(Object key) {
    	// 查询 delegate 所代表的缓存
    	Object object = delegate.getObject(key);
    	if (object == null) {
      		// 缓存未命中,则将 key 存入到 entriesMissedInCache 中
      		entriesMissedInCache.add(key);
    	}
    	// issue #146
    	if (clearOnCommit) {
      		return null;
    	} else {
      		return object;
    	}
  	}

  	@Override
  	public void putObject(Object key, Object object) {
    	// 将键值对存入到 entriesToAddonCommit 中,而非 delegate 缓存中
    	entriesToAddOnCommit.put(key, object);
  	}

  	@Override
  	public Object removeObject(Object key) {
    	return null;
  	}

  	@Override
  	public void clear() {
    	clearonCommit = true;
    	// 清空 entriesToAddOnCommit,但不清空 delegate 缓存
    	entriesToAddOnCommit.clear();
  	}

  	public void commit() {
    	// 根据 clearonCommit 的值决定是否清空 delegate
    	if (clearOnCommit) {
      		delegate.clear();
    	}
    	// 刷新未缓存的结果到 delegate 缓存中
    	flushPendingEntries();
    	// 重置 entriesToAddonCommit 和 entriesMissedInCache
    	reset();
  	}

  	public void rollback() {
    	unlockMissedEntries();
    	reset();
  	}

  	private void reset() {
    	clearonCommit = false;
    	// 清空集合
    	entriesToAddOnCommit.clear();
    	entriesMissedInCache.clear();
  	}

  	private void flushPendingEntries() {
    	for (Map.Entry entry : entriesToAddOnCommit.entrySet()) {
      	// 将 entriesToAddonCommit 中的内容转存到 delegate 中
      	delegate.putObject(entry.getKey(), entry.getValue());
    	}
    	for (Object entry : entriesMissedInCache) {
      		if (!entriesToAddOnCommit.containsKey(entry)) {
        		// 存入空值
        		delegate.putObject(entry, null);
      		}
    	}
  	}

  	private void unlockMissedEntries() {
    	for (Object entry : entriesMissedInCache) {
      		try {
        		// 调用 removeObject 进行解锁
        		delegate.removeObject(entry);
      		} catch (Exception e) {
        		log.warn("Unexpected exception while notifiying a rollback to the cache adapter. "
            		+ "Consider upgrading your cache adapter to the latest version. Cause: " + e);
      		}
    	}
  	}
}
 

  在 TransactionalCache 的代码中,我们要重点关注 entriesToAddonCommit 集合,TransactionalCache 中的很多方法都会与这个集合打交道。该集合用于存储从查询的结果,那为什么要将结果保存在该集合中,而非 delegate 所表示的缓存中呢?主要是因为直接存到 delegate 会导致脏数据问题。
  我们再来看一下 entriesMissedInCache 集合,这个集合是用于存储未命中缓存的查询请求所对应的 CacheKey。单独分析与 entriesMissedInCache 相关的逻辑没什么意义,要搞清 entriesMissedInCache 的实际用途,需要把它和 BlockingCache 的逻辑结合起来进行分析。在 BlockingCache,同一时刻仅允许一个线程通过 getObject 方法查询指定 key 对应的缓存项。如果缓存未命中,getObject 方法不会释放锁,导致其他线程被阻塞住。其他线程要想恢复运行,必须进行解锁,解锁逻辑由 BlockingCache 的 putObject 和 removeObject 方法执行。其中 putObject 会在TransactionalCache 的flushPendingEntries方法中被调用,removeObject方法则由 TransactionalCache 的 unlockMissedEntries 方法调用。flushPendingEntries 和unlockMissedEntries 最终都会遍历 entriesMissedInCache 集合,并将集合元素传给BlockingCache 的相关方法。这样可以解开指定 key 对应的锁,让阻塞线程恢复运行。

二、插件机制

  一般情况下,开源框架都会提供插件或其他形式的拓展点,供开发者自行拓展。这样的好处是显而易见的,一是增加了框架的灵活性。二是开发者可以结合实际需求,对框架进行拓展,使其能够更好的工作。以 MyBatis 为例,我们可基于 MyBatis 插件机制实现分页、分表,监控等功能。

2.1 插件机制原理

  在编写插件时,除了需要让插件类实现 Interceptor 接口外,还需要通过注解标注该插件的拦截点。所谓拦截点指的是插件所能拦截的方法,MyBatis 所允许拦截的方法如下:

  1. Executor::update, query, flushStatements, commit, rollback,
    getTransaction, close, isClosed
  2. ParameterHandler: getParameterObject, setParameters
  3. ResultSetHandler::handleResultSets, handleOutputParameters
  4. StatementHandler::prepare, parameterize, batch, update, query

  如果想要拦截 Executor 的 query 方法,那么可以这样定义插件:

@Intercepts({
	@Signature(
		type = Executor.class,
		method = "query",
		args ={MappedStatement.class, Object.class, RowBounds.class,
			ResultHandler.class}
	 )
})
public class ExamplePlugin implements Interceptor {
	// 省略逻辑
}

  除此之外,我们还需将插件配置到相关文件中。这样 MyBatis 在启动时可以加载插件,并保存插件实例到相关对象(InterceptorChain,拦截器链)中。待准备工作做完后,MyBatis处于就绪状态。我们在执行 SQL 时,需要先通过 DefaultSqlSessionFactory 创 建SqlSession 。Executor 实例会在创建 SqlSession 的过程中被创建,Executor 实例创建完毕后,MyBatis 会通过 JDK 动态代理为实例生成代理类。这样,插件逻辑即可在 Executor 相关方法被调用前执行。以上就是 MyBatis 插件机制的基本原理。

2.1.1 植入插件逻辑

  此处以 Executor 为例,分析 MyBatis 是如何为 Executor 实例植入插件逻辑的。Executor 实例是在开启 SqlSession 时被创建的,因此,下面从源头进行分析。先来看一下 SqlSession 开启的过程。先看DefaultSqlSessionFactory:

  	public SqlSession openSession() {
    	return openSessionFromDataSource(configuration.getDefaultExecutorType(), null, false);
  	}

	private SqlSession openSessionFromDataSource(ExecutorType execType,
  		TransactionIsolationLevel level, boolean autoCommit) {
		Transaction tx = null;
		try {
			// 省略部分逻辑
			// 创建 Executor
			final Executor executor = configuration.newExecutor(tx, execType);
			return new DefaultSqlSession(configuration, executor, autoCommit);
	 	}	
		catch (Exception e) {...}
		finally {...}
	}

  Executor 的创建过程封装在 Configuration 中:

  	public Executor newExecutor(Transaction transaction, ExecutorType executorType) {
    	executorType = executorType == null ? defaultExecutorType : executorType;
    	executorType = executorType == null ? ExecutorType.SIMPLE : executorType;
    	Executor executor;
    	// 根据 executorType 创建相应的 Executor 实例
    	if (ExecutorType.BATCH == executorType) {
      		executor = new BatchExecutor(this, transaction);
    	} else if (ExecutorType.REUSE == executorType) {
      		executor = new ReuseExecutor(this, transaction);
    	} else {
      		executor = new SimpleExecutor(this, transaction);
    	}
    	if (cacheEnabled) {
      		executor = new CachingExecutor(executor);
    	}
    	// 植入插件
    	executor = (Executor) interceptorChain.pluginAll(executor);
    	return executor;
  	}

  newExecutor 方法在创建好 Executor 实例后,紧接着通过拦截器链 interceptorChain为 Executor 实例植入代理逻辑。接下来看下InterceptorChain(位于org.apache.ibatis.plugin):

public class InterceptorChain {

  	private final List interceptors = new ArrayList<>();

  	public Object pluginAll(Object target) {
    	// 遍历拦截器集合
    	for (Interceptor interceptor : interceptors) {
      		// 调用拦截器的 plugin 方法植入相应的插件逻辑
      		target = interceptor.plugin(target);
    	}
    	return target;
  	}
  	
  	public void addInterceptor(Interceptor interceptor) {
    	interceptors.add(interceptor);
  	}
  	
  	public List getInterceptors() {
    	return Collections.unmodifiableList(interceptors);
  	}
}

  pluginAll 方法会调用具体插件的plugin 方法植入相应的插件逻辑。如果有多个插件,则会多次调用 plugin 方法,最终生成一个层层嵌套的代理类。形如下面:

  当 Executor 的某个方法被调用的时候,插件逻辑会先行执行。执行顺序由外而内,比如上图的执行顺序为 plugin3 → plugin2 → Plugin1 → Executor。
  plugin 方法是由具体的插件类实现,不过该方法代码一般比较固定,所以下面找个示例分析一下,例如ExamplePlugin:

	public Object plugin(Object target) {
		return Plugin.wrap(target, this);
	}

  继续看Plugin(位于org.apache.ibatis.plugin):

  	public static Object wrap(Object target, Interceptor interceptor) {
    	// 获取插件类 @Signature 注解内容,并生成相应的映射结构。形如下面:
    	// {
		// Executor.class : [query, update, commit],
		// ParameterHandler.class : [getParameterObject, setParameters]
		// }
    	Map, Set> signatureMap = getSignatureMap(interceptor);
    	Class type = target.getClass();
    	// 获取目标类实现的接口
    	Class[] interfaces = getAllInterfaces(type, signatureMap);
    	if (interfaces.length > 0) {
      		// 通过 JDK 动态代理为目标类生成代理类
      		return Proxy.newProxyInstance(
          		type.getClassLoader(),
          		interfaces,
          		new Plugin(target, interceptor, signatureMap));
    	}
    	return target;
  	}

  plugin 方法在内部调用了 Plugin 类的 wrap 方法,用于为目标对象生成代理。Plugin类实现了InvocationHandler接口,因此它可以作为参数传给Proxy的newProxyInstance方法。
  关于插件植入的逻辑就分析完了。接下来,我们来看看插件逻辑是怎样执行的。

2.1.2 执行插件逻辑

  Plugin 实现了 InvocationHandler 接口,因此它的 invoke 方法会拦截所有的方法调用。invoke 方法会对所拦截的方法进行检测,以决定是否执行插件逻辑。

  	public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    	try {
      		// 获取被拦截方法列表,比如:signatureMap.get(Executor.class),
	  		// 可能返回 [query, update, commit]
      		Set methods = signatureMap.get(method.getDeclaringClass());
      		// 检测方法列表是否包含被拦截的方法
      		if (methods != null && methods.contains(method)) {
        		// 执行插件逻辑
        		return interceptor.intercept(new Invocation(target, method, args));
      		}
      		// 执行被拦截的方法
      		return method.invoke(target, args);
    	} catch (Exception e) {
      		throw ExceptionUtil.unwrapThrowable(e);
    	}
  	}

  invoke 方法会检测被拦截方法是否配置在插件的 @Signature 注解中,若是,则执行插件逻辑,否则执行被拦截方法。插件逻辑封装在 intercept 中,该方法的参数类型为 Invocation。Invocation 主要用于存储目标类,方法以及方法参数列表。下面简单看一下Invocation(位于org.apache.ibatis.plugin):

public class Invocation {

  	private final Object target;
  	private final Method method;
  	private final Object[] args;

  	public Invocation(Object target, Method method, Object[] args) {
    	this.target = target;
    	this.method = method;
    	this.args = args;
  	}

  	public Object getTarget() {
    	return target;
  	}

  	public Method getMethod() {
    	return method;
  	}

  	public Object[] getArgs() {
    	return args;
  	}

  	public Object proceed() throws InvocationTargetException, IllegalAccessException {
    	// 调用被拦截的方法
	   return method.invoke(target, args);
  	}
}
2.2 实现一个分页插件

  本节将实现一个 MySQL 数据库分页插件。相关代码如下:

@Intercepts({
	@Signature(
		type = Executor.class, // 目标类
		method = "query", // 目标方法
		args ={MappedStatement.class,
			Object.class, RowBounds.class, ResultHandler.class}
	 )
})

public class MySqlPagingPlugin implements Interceptor {
	private static final Integer MAPPED_STATEMENT_INDEX = 0;
	private static final Integer PARAMETER_INDEX = 1;
	private static final Integer ROW_BOUNDS_INDEX = 2;
	
	@Override
	public Object intercept(Invocation invocation) throws Throwable {
	  	Object[] args = invocation.getArgs();
		RowBounds rb = (RowBounds) args[ROW_BOUNDS_INDEX];
		// 无需分页
		if (rb == RowBounds.DEFAULT) {
			return invocation.proceed();
		}
		// 将原 RowBounds 参数设为 RowBounds.DEFAULT,关闭 MyBatis 内置的分页机制
		args[ROW_BOUNDS_INDEX] = RowBounds.DEFAULT;
		MappedStatement ms = (MappedStatement) args[MAPPED_STATEMENT_INDEX];
		BoundSql boundSql = ms.getBoundSql(args[PARAMETER_INDEX]);
		// 获取 SQL 语句,拼接 limit 语句
		String sql = boundSql.getSql();
		String limit = String.format("LIMIT %d,%d", rb.getOffset(), rb.getLimit());
		sql = sql + " " + limit;
		// 创建一个 StaticSqlSource,并将拼接好的 sql 传入
		SqlSource sqlSource = new StaticSqlSource(
			ms.getConfiguration(), sql, boundSql.getParameterMappings());
		// 通过反射获取并设置 MappedStatement 的 sqlSource 字段
		Field field = MappedStatement.class.getDeclaredField("sqlSource");
		field.setAccessible(true);
		field.set(ms, sqlSource);
		// 执行被拦截方法
		return invocation.proceed();
	}

	@Override
	public Object plugin(Object target) {
		return Plugin.wrap(target, this);
	}
	
	@Override
	public void setProperties(Properties properties) {
 	}
}

  上面的分页插件通过 RowBounds 参数获取分页信息,并生成相应的 limit 语句。之后拼接 sql,并使用该 sql 作为参数创建 StaticSqlSource。最后通过反射替换 MappedStatement 对象中的 sqlSource 字段。

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号