redis分布式锁
模板:
/**
 * Template for running a callback under a distributed lock.
 *
 * <p>Implementations take care of acquiring and releasing the lock so that callers only
 * supply what should happen when the lock is obtained and what should happen when it is not.
 */
public interface DistributedLockTemplate {

    /**
     * Tries to obtain the lock identified by {@code lockId} and dispatches to {@code callback}.
     *
     * @param lockId   identifier of the lock to acquire
     * @param timeout  how long to wait for the lock; this interface does not fix the unit
     *                 (NOTE(review): the ZkDistributedLockTemplate implementation passes it on
     *                 as milliseconds - confirm other implementations agree)
     * @param callback handler invoked depending on whether the lock was obtained
     * @return the value produced by the implementation, presumably the callback's result;
     *         verify against the concrete implementation in use
     */
    Object execute(String lockId, int timeout, Callback callback);
}
/**
 * Minimal contract for a distributed lock that is acquired with a timeout and released
 * explicitly by the caller.
 */
public interface DistributedReentrantLock {

    /**
     * Attempts to acquire the lock, waiting at most the given amount of time.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return {@code true} if the lock was acquired, {@code false} otherwise
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException;

    /**
     * Releases the lock.
     */
    void unlock();
}
zk锁:
/**
 * {@link DistributedLockTemplate} that guards the callback with a {@link ZkReentrantLock}.
 *
 * <p>NOTE(review): the type name {@code Curatorframework} is kept exactly as written in this
 * file. Apache Curator's client interface is spelled {@code CuratorFramework}; unless the
 * project declares its own {@code Curatorframework}, this is probably a typo - confirm and
 * rename together with {@code ZkReentrantLock}.
 */
public class ZkDistributedLockTemplate implements DistributedLockTemplate {
    private static final org.slf4j.Logger log = LoggerFactory.getLogger(ZkDistributedLockTemplate.class);

    /** Client handed to every {@link ZkReentrantLock} created by this template. */
    private final Curatorframework client;

    /**
     * @param client client used to create the underlying locks
     */
    public ZkDistributedLockTemplate(Curatorframework client) {
        this.client = client;
    }

    /**
     * Tries to acquire the lock for {@code lockId} and runs the matching callback method.
     *
     * <p>Any exception - from creating the lock, from acquiring it, or from the callback
     * itself - is logged and turned into a {@code null} return value; nothing is rethrown.
     * If the thread is interrupted while waiting, its interrupt flag is restored.
     *
     * @param lockId   identifier of the lock; used to build the lock path
     * @param timeout  maximum time to wait for the lock, in milliseconds
     * @param callback {@code onGetLock()} is called when the lock is obtained,
     *                 {@code onTimeout()} when it is not obtained within {@code timeout}
     * @return the value returned by the invoked callback method, or {@code null} if an
     *         exception occurred
     */
    @Override
    public Object execute(String lockId, int timeout, Callback callback) {
        ZkReentrantLock lock = null;
        boolean locked = false;
        try {
            lock = new ZkReentrantLock(client, lockId);
            // Pass the primitive straight through; no boxing via the deprecated Long constructor.
            locked = lock.tryLock(timeout, TimeUnit.MILLISECONDS);
            if (locked) {
                return callback.onGetLock();
            }
            return callback.onTimeout();
        } catch (InterruptedException ex) {
            log.error("Interrupted while waiting for lock {}", lockId, ex);
            // Restore the flag so code further up the stack can see the interruption.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            log.error("Failed to execute callback under lock {}", lockId, e);
        } finally {
            // Only release what was actually acquired; 'locked' implies 'lock' is non-null.
            if (locked) {
                lock.unlock();
            }
        }
        return null;
    }
}
/**
 * {@link DistributedReentrantLock} implemented on top of a Curator {@code InterProcessMutex}
 * located at {@code ROOT_PATH + lockId}.
 *
 * <p>After every {@link #unlock()} a delayed {@link Cleaner} task is scheduled that deletes
 * the lock node if it has no children left.
 *
 * <p>NOTE(review): the type name {@code Curatorframework} is kept exactly as written in this
 * file. Apache Curator's client interface is spelled {@code CuratorFramework}; unless the
 * project declares its own {@code Curatorframework}, this is probably a typo - confirm and
 * rename together with {@code ZkDistributedLockTemplate}.
 */
public class ZkReentrantLock implements DistributedReentrantLock {
    private static final org.slf4j.Logger log = LoggerFactory.getLogger(ZkReentrantLock.class);

    /**
     * Shared pool that runs the delayed {@link Cleaner} tasks. The pool is never shut down,
     * so its workers are daemon threads; otherwise the first scheduled task would leave a
     * non-daemon thread alive and keep the JVM from exiting.
     */
    private static final ScheduledExecutorService executorService =
            Executors.newScheduledThreadPool(10, new java.util.concurrent.ThreadFactory() {
                @Override
                public Thread newThread(Runnable task) {
                    Thread worker = new Thread(task, "zk-lock-cleaner");
                    worker.setDaemon(true);
                    return worker;
                }
            });

    /** Parent path under which every lock node is created. */
    public static final String ROOT_PATH = "/ROOT_LOCK/";

    /** Delay, in milliseconds, between {@link #unlock()} and the cleanup attempt. */
    private long delayTimeForClean = 1000;
    private InterProcessMutex interProcessMutex = null;
    private String path;
    private Curatorframework client;

    /**
     * Creates a lock for {@code lockId}; delegates to {@link #init(Curatorframework, String)}.
     *
     * @param client client used for locking and for the later cleanup
     * @param lockId identifier of the lock; must not be null or empty
     * @throws IllegalArgumentException if {@code lockId} is null or empty
     */
    public ZkReentrantLock(Curatorframework client, String lockId) {
        init(client, lockId);
    }

    /**
     * Binds this instance to {@code client} and to the lock node {@code ROOT_PATH + lockId}.
     *
     * @param client client used for locking and for the later cleanup
     * @param lockId identifier of the lock; must not be null or empty
     * @throws IllegalArgumentException if {@code lockId} is null or empty
     */
    public void init(Curatorframework client, String lockId) {
        // Without this check a null id is concatenated into ".../null" (one lock silently
        // shared by all such callers) and an empty id yields a path ending in '/'.
        if (lockId == null || lockId.isEmpty()) {
            throw new IllegalArgumentException("lockId must not be null or empty");
        }
        this.client = client;
        this.path = ROOT_PATH + lockId;
        interProcessMutex = new InterProcessMutex(client, this.path);
    }

    /**
     * Tries to acquire the underlying mutex within the given time.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return the result of {@code InterProcessMutex.acquire(timeout, unit)}
     * @throws InterruptedException if the underlying acquire is interrupted
     * @throws RuntimeException     wrapping any other exception thrown by the acquire
     */
    @Override
    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        try {
            return interProcessMutex.acquire(timeout, unit);
        } catch (InterruptedException e) {
            // Let interruption reach the caller unchanged.
            throw e;
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    /**
     * Releases the underlying mutex and schedules a delayed cleanup of the lock node.
     *
     * <p>A failure to release is logged, not rethrown; the cleanup is scheduled either way.
     */
    @Override
    public void unlock() {
        try {
            interProcessMutex.release();
        } catch (Exception e) {
            // Best effort: report the failure but do not let it escape to the caller.
            log.error(e.getMessage(), e);
        } finally {
            executorService.schedule(new Cleaner(client, path), delayTimeForClean, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Deletes the lock node at {@code path} when it has no children.
     */
    static class Cleaner implements Runnable {
        final Curatorframework client;
        final String path;

        /**
         * @param client client used to inspect and delete the node
         * @param path   path of the lock node to clean up
         */
        public Cleaner(Curatorframework client, String path) {
            this.client = client;
            this.path = path;
        }

        @Override
        public void run() {
            try {
                List<String> children = client.getChildren().forPath(path);
                if (children == null || children.isEmpty()) {
                    client.delete().forPath(path);
                }
            } catch (KeeperException.NoNodeException ignored) {
                // Node is already gone; nothing left to clean up.
            } catch (KeeperException.NotEmptyException ignored) {
                // Node gained a child between the check and the delete; leave it in place.
            } catch (Exception e) {
                // E.g. another thread was creating the lock just as the delete was attempted.
                log.error(e.getMessage(), e);
            }
        }
    }
}



