- 项目结构
- 调用流程
- 缓存实现分析
项目结构 调用流程https://blog.csdn.net/oe1019/article/details/105982128
由于原作者未提供源码,未解析代码逻辑实现,因此整理一下代码实现逻辑
1. 创建fabric用户信息
public static User getUser() {
User appuser = null;
File sampleStoreFile = new File(System.getProperty("user.home") + "/test.properties");
if (sampleStoreFile.exists()) { //For testing start fresh
sampleStoreFile.delete();
}
final SampleStore sampleStore = new SampleStore(sampleStoreFile);
try {
appuser = sampleStore.getMember("peer1", "Org1", "Org1MSP",
new File(String.valueOf(findFileSk(Paths.get(configUserPath).toFile()))),
new File("./src/main/resources/crypto-config/peerOrganizations/org1.example.com/users/Admin@org1.example.com/msp/signcerts/Admin@org1.example.com-cert.pem"));
} catch (Exception e) {
e.printStackTrace();
}
return appuser;
}
2. 创建Fabric连接池
private static ObjectPoolfabricJavaPool = FabricConnectionPoolFactory.getPool(getUser(), "mychannel");
public class FabricConnectionPoolFactory {
private FabricConnectionPoolFactory() {
}
public static GenericObjectPool getPool(User appUser, String channel) {
return new FabricJavaPool(appUser, channel);
}
}
import org.apache.commons.pool2.impl.GenericObjectPool; import com.github.samyuan1990.FabricJavaPool.api.FabricConnection; public class FabricJavaPool extends GenericObjectPool{ public FabricJavaPool(User appUser, String channel) { super(new ConnectionPoolFactory(appUser, channel)); } private static class ConnectionPoolFactory extends basePooledObjectFactory { private FabricJavaPoolConfig config = new FabricJavaPoolConfig(); private String config_network_path = ""; private User appUser; private String channel = ""; ConnectionPoolFactory(User appUser, String channel) { this.config_network_path = config.getConfigNetworkPath(); this.appUser = appUser; this.channel = channel; } //创建fabric客户端连接的方法 @Override public FabricConnection create() throws Exception { FabricConnectionImpl myConnection; CryptoSuite cryptoSuite = CryptoSuite.Factory.getCryptoSuite(); HFClient hfclient = HFClient.createNewInstance(); hfclient.setCryptoSuite(cryptoSuite); NetworkConfig networkConfig = NetworkConfig.fromJsonFile(new File(config_network_path)); hfclient.setUserContext(appUser); hfclient.loadChannelFromConfig(channel, networkConfig); Channel myChannel = hfclient.getChannel(channel); myChannel.initialize(); //这一步,创建连接池中连接 myConnection = new FabricConnectionImpl(hfclient, myChannel, appUser); //这一步很关键,是否使用缓存 if (config.isUseCache()) { FabricConnectionImplCacheProxy proxy = new FabricConnectionImplCacheProxy(myConnection, config.getCacheURL(), appUser.getName(), channel, config.getCacheTimeout()); return (FabricConnection) Proxy.newProxyInstance(FabricConnectionImpl.class.getClassLoader(), new Class[]{FabricConnection.class}, proxy); } else { return myConnection; } } @Override public PooledObject wrap(FabricConnection obj) { return new DefaultPooledObject<>(obj); } } }
3. 获取Fabric连接
FabricConnection myConnection = fabricJavaPool.borrowObject();
6. 获取Fabric连接
ExecuteResult result = myConnection.query("mycc", "query","a");
rs = result.getResult();
5. 回收Fabric连接
fabricJavaPool.returnObject(myConnection);缓存实现分析
- 缓存实现关键代码
FabricConnectionImplCacheProxy proxy = new FabricConnectionImplCacheProxy(myConnection, config.getCacheURL(), appUser.getName(), channel, config.getCacheTimeout());
return (FabricConnection) Proxy.newProxyInstance(FabricConnectionImpl.class.getClassLoader(), new Class[]{FabricConnection.class}, proxy);
- FabricConnectionImplCacheProxy 类
public class FabricConnectionImplCacheProxy extends FabricContractConnectImplCacheProxy implements InvocationHandler {
public FabricConnectionImplCacheProxy(Object obj, String cacheURL, String userName, String channelName, int timeout) {
super(obj, cacheURL, userName, channelName, timeout);
}
//父类构造方法
public FabricContractConnectImplCacheProxy(Object obj, String cacheURL, String userName, String channelName, int timeout) {
this.timeout = timeout;
this.channelName = channelName;
this.userName = userName;
this.cacheURL = cacheURL;
//使用memcache对查询结果进行缓存
MemcachedClientBuilder memcachedClientBuilder = new XMemcachedClientBuilder(AddrUtil.getAddresses(this.cacheURL));
try {
memcachedClient = memcachedClientBuilder.build();
} catch (IOException e) {
e.printStackTrace();
}
this.obj = obj;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Object result = null;
if (method.getName().equals("query")) {
String key = genericKey(userName, channelName, args);
result = memcachedClient.get(key);
if (result != null) {
//System.out.println("hit");
return result;
}
result = method.invoke(obj, args);
ExecuteResult executeResult = (ExecuteResult) result;
if (executeResult.getPropResp() == null) {
return result;
}
for (ProposalResponse p : executeResult.getPropResp()) {
TxReadWriteSetInfo txReadWriteSetInfo = p.getChaincodeActionResponseReadWriteSetInfo();
for (TxReadWriteSetInfo.NsRwsetInfo nsRwsetInfo : txReadWriteSetInfo.getNsRwsetInfos()) {
KvRwset.KVRWSet rws = nsRwsetInfo.getRwset();
for (KvRwset.KVRead readList : rws.getReadsList()) {
String blockKey = readList.getKey();
memcachedClient.set(blockKey, timeout, key);
}
}
}
memcachedClient.set(key, timeout, result);
return result;
}
if (method.getName().equals("invoke")) {
result = method.invoke(obj, args);
ExecuteResult executeResult = (ExecuteResult) result;
if (executeResult.getPropResp() == null) {
return result;
}
for (ProposalResponse p : executeResult.getPropResp()) {
TxReadWriteSetInfo txReadWriteSetInfo = p.getChaincodeActionResponseReadWriteSetInfo();
for (TxReadWriteSetInfo.NsRwsetInfo nsRwsetInfo : txReadWriteSetInfo.getNsRwsetInfos()) {
KvRwset.KVRWSet rws = nsRwsetInfo.getRwset();
for (KvRwset.KVRead readList : rws.getReadsList()) {
String blockKey = readList.getKey();
String blockCache = memcachedClient.get(blockKey);
if (!blockCache.equals(null)) {
memcachedClient.delete(blockCache);
memcachedClient.delete(blockKey);
}
}
}
}
return result;
}
result = method.invoke(obj, args);
return result;
}
}
简单总结:
- 如果使用缓存,那么连接池中的连接对象将是动态生成的代理连接对象;
- 在查询前会先查询缓存,如果存在就直接返回结果,如果不存在,再去查询;
- 将查询后的结果存到缓存中



