flea-cache使用之Redis集群模式接入 源代码
jedis-3.0.1.jar
redis.clients jedis 3.0.1
spring-context-4.3.18.RELEASE.jar
org.springframework spring-context 4.3.18.RELEASE
spring-context-support-4.3.18.RELEASE.jar
3. 基础接入 3.1 定义Flea缓存接口 — IFleaCacheorg.springframework spring-context-support 4.3.18.RELEASE
可参考笔者的这篇博文 Memcached接入,不再赘述。
3.2 定义抽象Flea缓存类 — AbstractFleaCache可参考笔者的这篇博文 Memcached接入,不再赘述。
3.3 定义Redis客户端接口类 — RedisClient
public interface RedisClient {
String set(final String key, final Object value);
String set(final byte[] key, final byte[] value);
String set(final String key, final Object value, final int expiry);
String set(final byte[] key, final byte[] value, final int expiry);
String set(final String key, final Object value, final long expiry);
String set(final byte[] key, final byte[] value, final long expiry);
String set(final String key, final Object value, final SetParams params);
String set(final byte[] key, final byte[] value, final SetParams params);
Object get(final String key);
byte[] get(final byte[] key);
Long del(final String key);
String getLocation(final String key);
String getLocation(final byte[] key);
String getHost(final String key);
String getHost(final byte[] key);
Integer getPort(final String key);
Integer getPort(final byte[] key);
Client getClient(final String key);
Client getClient(final byte[] key);
String getPoolName();
}
3.4 定义集群模式Redis客户端实现类 — FleaRedisClusterClient
集群模式 Redis 客户端 主要使用 JedisCluster 来操作 Redis 数据。
public class FleaRedisClusterClient extends FleaRedisClient {
private JedisCluster jedisCluster;
private FleaRedisClusterClient() {
this(CommonConstants.FleaPoolConstants.DEFAULT_POOL_NAME);
}
private FleaRedisClusterClient(String poolName) {
super(poolName);
init();
}
private void init() {
if (CommonConstants.FleaPoolConstants.DEFAULT_POOL_NAME.equals(getPoolName())) {
jedisCluster = RedisClusterPool.getInstance().getJedisCluster();
} else {
jedisCluster = RedisClusterPool.getInstance(getPoolName()).getJedisCluster();
}
}
@Override
public String set(String key, Object value) {
if (value instanceof String)
return jedisCluster.set(key, (String) value);
else
return jedisCluster.set(SafeEncoder.encode(key), ObjectUtils.serialize(value));
}
// 省略。。。。。。
public static class Builder {
private String poolName; // 连接池名
public Builder() {
}
public Builder(String poolName) {
this.poolName = poolName;
}
public RedisClient build() {
if (StringUtils.isBlank(poolName)) {
return new FleaRedisClusterClient();
} else {
return new FleaRedisClusterClient(poolName);
}
}
}
}
该类的构造函数初始化逻辑,可以看出我们使用了 RedisClusterPool, 下面来介绍一下。
3.5 定义Redis集群连接池 — RedisClusterPool我们使用 RedisClusterPool 用于Redis集群相关配置信息的初始化,其中重点是获取Jedis集群实例对象 JedisCluster ,该类其中一个构造方法如下:
public JedisCluster(SetjedisClusterNode, int connectionTimeout, int soTimeout, int maxAttempts, String password, String clientName, final GenericObjectPoolConfig poolConfig) { super(jedisClusterNode, connectionTimeout, soTimeout, maxAttempts, password, clientName, poolConfig); }
public class RedisClusterPool {
private static final ConcurrentMap redisClusterPools = new ConcurrentHashMap<>();
private String poolName; // 连接池名
private JedisCluster jedisCluster; // Jedis集群实例
private RedisClusterPool(String poolName) {
this.poolName = poolName;
}
public static RedisClusterPool getInstance() {
return getInstance(CommonConstants.FleaPoolConstants.DEFAULT_POOL_NAME);
}
public static RedisClusterPool getInstance(String poolName) {
if (!redisClusterPools.containsKey(poolName)) {
synchronized (redisClusterPools) {
if (!redisClusterPools.containsKey(poolName)) {
RedisClusterPool redisClusterPool = new RedisClusterPool(poolName);
redisClusterPools.putIfAbsent(poolName, redisClusterPool);
}
}
}
return redisClusterPools.get(poolName);
}
public void initialize() {
// 省略。。。。。。
}
public void initialize(List cacheServerList) {
// 省略。。。。。。
}
// 省略。。。。。。
public JedisCluster getJedisCluster() {
if (ObjectUtils.isEmpty(jedisCluster)) {
throw new FleaCacheConfigException("获取Jedis集群实例对象失败:请先调用initialize初始化");
}
return jedisCluster;
}
}
3.6 定义Redis集群配置文件
flea-cache读取redis.cluster.properties(Redis集群配置文件),用作初始化 RedisClusterPool
# Redis集群配置 # Redis缓存所属系统名 redis.systemName=Fleaframe # Redis集群服务节点地址 redis.cluster.server=127.0.0.1:20011,127.0.0.1:20012,127.0.0.1:20021,127.0.0.1:20022,127.0.0.1:20031,127.0.0.1:20032 # Redis集群服务节点登录密码(集群各节点配置同一个) redis.cluster.password=huazie123 # Redis集群客户端socket连接超时时间(单位:ms) redis.cluster.connectionTimeout=2000 # Redis集群客户端socket读写超时时间(单位:ms) redis.cluster.soTimeout=2000 # Redis集群客户端连接池配置 # Jedis连接池最大连接数 redis.pool.maxTotal=100 # Jedis连接池最大空闲连接数 redis.pool.maxIdle=10 # Jedis连接池最小空闲连接数 redis.pool.minIdle=0 # Jedis连接池获取连接时的最大等待时间(单位:ms) redis.pool.maxWaitMillis=2000 # Redis客户端操作最大尝试次数【包含第一次操作】 redis.maxAttempts=5 # 空缓存数据有效期(单位:s) redis.nullCacheExpiry=103.7 定义Redis Flea缓存类 — RedisFleaCache
可参考笔者的这篇博文 Redis分片模式接入,不再赘述。
3.8 定义抽象Flea缓存管理类 — AbstractFleaCacheManager可参考笔者的这篇博文 Memcached接入,不再赘述。
3.9 定义Redis集群模式Flea缓存管理类 — RedisClusterFleaCacheManager该类继承抽象Flea缓存管理类 AbstractFleaCacheManager,构造方法使用了 RedisClientFactory 获取集群模式下默认连接池的Redis客户端 RedisClient,可在 3.10 查看。newCache 方法返回的是 RedisFleaCache 的实例对象,每一类 Redis 缓存数据都对应了一个 RedisFleaCache 的实例对象。
public class RedisClusterFleaCacheManager extends AbstractFleaCacheManager {
private RedisClient redisClient; // Redis客户端
public RedisClusterFleaCacheManager() {
// 初始化默认连接池
RedisClusterPool.getInstance().initialize();
// 获取集群模式下默认连接池的Redis客户端
redisClient = RedisClientFactory.getInstance(CacheModeEnum.CLUSTER);
}
@Override
protected AbstractFleaCache newCache(String name, int expiry) {
int nullCacheExpiry = RedisClusterConfig.getConfig().getNullCacheExpiry();
return new RedisFleaCache(name, expiry, nullCacheExpiry, CacheModeEnum.CLUSTER, redisClient);
}
}
3.10 定义Redis客户端工厂类 — RedisClientFactory
Redis 客户端工厂类,有四种方式获取 Redis 客户端:
- 一是获取分片模式下默认连接池的 Redis 客户端,应用在单个缓存接入场景;
- 二是获取指定模式下默认连接池的 Redis 客户端,应用在单个缓存接入场景【3.9 采用】;
- 三是获取分片模式下指定连接池的 Redis 客户端,应用在整合缓存接入场景;
- 四是获取指定模式下指定连接池的 Redis 客户端,应用在整合缓存接入场景。
public class RedisClientFactory {
private static final ConcurrentMap redisClients = new ConcurrentHashMap<>();
private RedisClientFactory() {
}
public static RedisClient getInstance() {
return getInstance(CommonConstants.FleaPoolConstants.DEFAULT_POOL_NAME);
}
public static RedisClient getInstance(CacheModeEnum mode) {
return getInstance(CommonConstants.FleaPoolConstants.DEFAULT_POOL_NAME, mode);
}
public static RedisClient getInstance(String poolName) {
return getInstance(poolName, CacheModeEnum.SHARDED);
}
public static RedisClient getInstance(String poolName, CacheModeEnum mode) {
String key = StringUtils.strCat(poolName, CommonConstants.SymbolConstants.UNDERLINE, StringUtils.valueOf(mode.getMode()));
if (!redisClients.containsKey(key)) {
synchronized (redisClients) {
if (!redisClients.containsKey(key)) {
RedisClientStrategyContext context = new RedisClientStrategyContext(poolName);
redisClients.putIfAbsent(key, FleaStrategyFacade.invoke(mode.name(), context));
}
}
}
return redisClients.get(key);
}
}
在上面 的 getInstance(String poolName, CacheModeEnum mode) 方法中,使用了 RedisClientStrategyContext ,用于定义 Redis 客户端策略上下文。根据不同的缓存模式,就可以找到对应的 Redis 客户端策略。
3.11 定义 Redis 客户端策略上下文 — RedisClientStrategyContext可参考笔者的这篇博文 Redis分片模式接入,不再赘述。
3.12 定义集群模式 Redis 客户端策略 — RedisClusterClientStrategypublic class RedisClusterClientStrategy implements IFleaStrategy{ @Override public RedisClient execute(String poolName) throws FleaStrategyException { RedisClient originRedisClient; // 新建一个Flea Redis集群客户端类实例 if (CommonConstants.FleaPoolConstants.DEFAULT_POOL_NAME.equals(poolName)) { originRedisClient = new FleaRedisClusterClient.Builder().build(); } else { originRedisClient = new FleaRedisClusterClient.Builder(poolName).build(); } return originRedisClient; } }
好了,到这里我们可以来测试 Redis 集群模式。
3.13 Redis集群模式接入自测 — FleaCacheTest首先,这里需要按照 Redis集群配置文件 中的地址部署相应的 Redis集群 服务,后续有机会我再出一篇简单的Redis主从集群搭建博文。
@Test
public void testRedisClusterFleaCache() {
try {
// 集群模式下Flea缓存管理类
AbstractFleaCacheManager manager = FleaCacheManagerFactory.getFleaCacheManager(CacheEnum.RedisCluster.getName());
AbstractFleaCache cache = manager.getCache("fleamenufavorites");
LOGGER.debug("Cache={}", cache);
//#### 1. 简单字符串
// cache.put("menu1", "huazie");
// cache.put("menu2", null);
// cache.get("menu1");
// cache.get("menu2");
// cache.delete("menu1");
// cache.delete("menu2");
cache.clear();
cache.getCacheKey();
LOGGER.debug(cache.getCacheName() + ">>>" + cache.getCacheDesc());
} catch (Exception e) {
LOGGER.error("Exception:", e);
}
}
4. 进阶接入
4.1 定义抽象Spring缓存 — AbstractSpringCache
可参考笔者的这篇博文 Memcached接入,不再赘述。
4.2 定义Redis Spring缓存类 — RedisSpringCache可参考笔者的这篇博文 Redis分片模式接入,不再赘述。
4.3 定义抽象Spring缓存管理类 — AbstractSpringCacheManager可参考笔者的这篇博文 Memcached接入,不再赘述。
4.4 定义Redis集群模式Spring缓存管理类 — RedisClusterSpringCacheManager该类继承抽象 Spring 缓存管理类 AbstractSpringCacheManager,用于对接Spring; 基本实现同 RedisClusterSpringCacheManager,唯一不同在于 newCache 的实现。
public class RedisClusterSpringCacheManager extends AbstractSpringCacheManager {
private RedisClient redisClient; // Redis客户端
public RedisClusterSpringCacheManager() {
// 初始化默认连接池
RedisClusterPool.getInstance().initialize();
// 获取集群模式下默认连接池的Redis客户端
redisClient = RedisClientFactory.getInstance(CacheModeEnum.CLUSTER);
}
@Override
protected AbstractSpringCache newCache(String name, int expiry) {
int nullCacheExpiry = RedisClusterConfig.getConfig().getNullCacheExpiry();
return new RedisSpringCache(name, expiry, nullCacheExpiry, CacheModeEnum.CLUSTER, redisClient);
}
}
4.5 spring 配置
4.6 缓存自测
private ApplicationContext applicationContext;
@Before
public void init() {
applicationContext = new ClassPathXmlApplicationContext("applicationContext.xml");
LOGGER.debug("ApplicationContext={}", applicationContext);
}
@Test
public void testRedisClusterSpringCache() {
try {
// 集群模式下Spring缓存管理类
AbstractSpringCacheManager manager = (RedisClusterSpringCacheManager) applicationContext.getBean("redisClusterSpringCacheManager");
AbstractSpringCache cache = manager.getCache("fleamenufavorites");
LOGGER.debug("Cache = {}", cache);
//#### 1. 简单字符串
// cache.put("menu1", "huazie");
// cache.get("menu1");
// cache.get("menu1", String.class);
//#### 2. 简单对象(要是可以序列化的对象)
// String user = new String("huazie");
// cache.put("user", user);
// LOGGER.debug(cache.get("user", String.class));
// cache.get("FLEA_RES_STATE");
// cache.clear();
//#### 3. List塞对象
// List userList = new ArrayList<>();
// userList.add("huazie");
// userList.add("lgh");
// cache.put("user_list", userList);
// LOGGER.debug(cache.get("user_list",userList.getClass()).toString());
} catch (Exception e) {
LOGGER.error("Exception:", e);
}
}
结语
哇哇哇,Redis 集群模式接入终于搞定。到目前为止,不论是Memcached的接入还是 Redis分片模式接入亦或是本篇,都是单一的缓存接入,笔者的 下一篇博文 将介绍如何 整合Memcached和Redis接入,以应对日益复杂的业务需求。 敬请期待!!!



