随着业务的不断发展,单量越来越大,对数据库的访问压力也越来越大,为了保证系统的稳定、减轻数据库的查询压力,针对 订单信息表、订单扩展表、 订单产品信息表和产品服务信息表做分表方案,由于oms系统在业务操作时订单查询是多种维度查询,比如通过运单号查询、客户单号查询以及平台订单号查询,故采用常规的分库分表方案无法满足现有的业务变化,考虑采用冷热数据分离方案来解决目前的现状。
方案概述:1、四张表分表,分表需要考虑后期的查询问题,以时间维度进行冷热分离(冷热数据分界线:以新增的冷热库时间(hot_date)6个月为分界点)。
2、热数据只占全部数据的一部分,因此每次优先查询热库,以下情况才查询冷库
- 当查询条件未命中(结果集为空)时,查询冷库。 - 当查询条件部分命中时,查询冷库。
3、为了区分部分命中和全部命中,可以新建一张R表存放每次查询冷库的查询条件和查询结果数量和查询结果的主键,每次查询热库时,对比相同查询条件的查询结果数量是否一致。一致,则本次查询结束。不一致,则需要到冷库中进行查询。
4、举例说明:100条中80条还是热数据 20条变成了冷数据,其实应该只是对冷数据库发起这20条数据的请求。此时需要将R表数据拿出来比对,只查一部分冷数据。
5、热库时间=>冷库时间 : 查询和使用热数据时,将一段时间不再使用的热数据移到冷库(时间)。
6、冷库时间=>热库时间 :查询冷库时,将本次查询的结果移到热库,附上最新查询日期,并删除冷库数据。
7、数据同步(每次查询进行或达到一定量级进行)
冷热数据迁移流程说明
1.全量数据如何迁移?
2.数据冷热切换如何解决重复问题?
3.冷库数据是否需做归档?若需要,如何归档。
准备:1:上述三张表每张表需新建对应冷热表
2:两个定时任务
1:在数据超期时后的一段时间移除热表数据到冷库
2:在数据移出后的(时间?)删除热库数据
3:mybatis插件,在插件中对所有三张表的查询做结果拦截,查询热库如结果集为空,查询冷库,如冷库查询到数据移除冷库数据,并插入到热库更新热库查询时间,
上代码demo:(joor 方便反射 实测这方案性能还可以)
package xxxxx.hera.mybatisIntercept;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.imile.hera.mapper.client.OmsOrderInfoMapper;
import com.imile.hera.util.SpringUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.ibatis.executor.parameter.ParameterHandler;
import org.apache.ibatis.executor.resultset.DefaultResultSetHandler;
import org.apache.ibatis.executor.resultset.ResultSetHandler;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.plugin.Interceptor;
import org.apache.ibatis.plugin.Intercepts;
import org.apache.ibatis.plugin.Invocation;
import org.apache.ibatis.plugin.Plugin;
import org.apache.ibatis.plugin.Signature;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.joor.Reflect;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import java.sql.Statement;
import java.util.*;
import java.util.concurrent.TimeUnit;
@Intercepts({
@Signature(type = ResultSetHandler.class, method = "handleResultSets", args = {Statement.class})
})
@Component
@Lazy(value = true)
public class OrderInfoPlugin implements Interceptor {
public static final String mapperClassName = "OmsOrderInfoMapper";
public static final Class orderInfoMapperClass = OmsOrderInfoMapper.class;
static Cache<String, Object> cache = Caffeine.newBuilder()
.expireAfterWrite(5, TimeUnit.DAYS)
.maximumSize(10_000)
.build();
//方法拦截
@Override
public Object intercept(Invocation invocation) throws Throwable {
//取出查询的结果
Object resultObject = invocation.proceed();
//若热库查询为空查询冷库
if (ObjectUtils.isEmpty(resultObject)) {
DefaultResultSetHandler df = (DefaultResultSetHandler) invocation.getTarget();
Reflect reflect = Reflect.on(df);
MappedStatement mappedStatement = reflect.field("mappedStatement").get();
ParameterHandler parameterHandler = reflect.field("parameterHandler").get();
Map<String, String> pathMap = filterMethodById(mappedStatement.getId());
Map<Object, Object> map = (Map<Object, Object>) parameterHandler.getParameterObject();
List<Object> list = new ArrayList<>();
//查询参数组装
if (!map.isEmpty()) {
Map<Object, Object> treeMap = new TreeMap<Object, Object>(
new Comparator<Object>() {
public int compare(Object obj1, Object obj2) {
return obj1.toString().compareTo(obj2.toString());
}
});
Iterator<Map.Entry<Object, Object>> it = map.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<Object, Object> entry = it.next();
if (entry.getKey().toString().contains("param")) {
treeMap.put(entry.getKey(), entry.getValue());
}
}
Iterator<Map.Entry<Object, Object>> its = treeMap.entrySet().iterator();
while (its.hasNext()) {
Map.Entry<Object, Object> entry = its.next();
list.add(entry.getValue());
}
}
Object[] params = list.toArray(new Object[0]);
if (mapperClassName.equalsIgnoreCase(pathMap.get("mapper"))) {
String key = "orderInfoMapperClassProxy";
GetProxy getProxy = (GetProxy) cache.get(key, k -> getInstance(orderInfoMapperClass));
String methodPath = pathMap.get("method");
resultObject = Reflect.on(getProxy.getProxy()).call(methodPath, params).get();
// todo 冷库查询到数据迁移到热库并删除冷库数据
//测插入删除同理
// OmsOrderInfoDO omsOrderInfoDO = new OmsOrderInfoDO();
// omsOrderInfoDO.setId(IdWorkerUtil.getId());
// omsOrderInfoDO.setRecordVersion("2");
// omsOrderInfoDO.setClientOrgId(101L);
// SpringUtils.getBean("omsOrderInfoMapper", OmsOrderInfoMapper.class).insert(omsOrderInfoDO);
// SpringUtils.getBean("omsOrderInfoMapper", OmsOrderInfoMapper.class).deleteById(12121);
}
}
return resultObject;
}
private GetProxy getInstance(Class orderInfoMapperClass) {
SqlSession sqlSession = SpringUtils.getBean("sqlSessionFactory", SqlSessionFactory.class).openSession();
return GetProxy.getInstance(orderInfoMapperClass, sqlSession);
}
private Map filterMethodById(String id) {
Map<String, String> resultObject = new HashMap<>();
String[] names = id.split("\.");
resultObject.put("mapper", names[names.length - 2]);
resultObject.put("method", names[names.length - 1]);
return resultObject;
}
//获取到拦截的对象,底层也是通过代理实现的,实际上是拿到一个目标代理对象
@Override
public Object plugin(Object target) {
return Plugin.wrap(target, this);
}
//获取设置的阈值等参数
@Override
public void setProperties(Properties properties) {
}
}



