Java内置锁,使用时不需要对同步对象的监视器(Monitor)通过Java代码显示的进行抢占和释放,因为这些工作由JVM层面来完成。所以,Java的内置锁使用起来很方便。但是,Java内置锁的功能相对单一,不具备一些比较高级的锁功能:
限时抢锁:在抢锁时设置超时时长,超时则放弃。
中断抢锁:在抢锁时,外部线程给抢锁线程发出一个中断信号,就能唤起等待锁的线程,并且终止抢占过程。
多个等待队列:为锁维持多个等待队列,以便提高锁的效率。比如在生产者消费者模式实现中,生产者和消费者共用一把锁,该锁上维持两个等待队列,一个生产者队列,一个消费者队列。
Java内置锁在竞争稍微激烈的情况下,Java内置锁会膨胀为重量级锁。重量级锁, 它的线程间的调度和状态变更由操作系统负责;
它基于操作系统的临界区机制(每个进程中访问临界资源的那段程序称为临界区(临界资源是一次仅允许一个进程使用的共享资源)。每次只准许一个进程进入临界区,进入后不允许其他进程进入(加锁)。)阻塞锁机制(当有一个线程获取锁后,其他所有等待获取该锁的线程会进入阻塞状态);
上下文切换(CPU通过分配时间片来执行任务,当一个任务的时间片用完,就会切换到另一个任务。在切换之前会保存上一个任务的状态,当下次再切换到该任务,就会加载这个状态。任务从保存到再加载的过程);上下文切换只能发生在内核态,所以还会触发用户态与内核态切换,这个切换的过程会带来很大的性能开销,非常影响性能。
Java显示锁接口Loke就是为了解决这些Java对象的功能问题、性能问题。
synchronized是非公平锁,ReentrantLock 可以是公平锁也可以是非公平锁。
ReentrantLock支持非阻塞的方式获取锁,而synchronized不行。synchronized不可中断,ReentrantLock可中断。
synchronized是独占锁,Lock可以是独占锁(ReentrantLock,ReentrantReadWriteLock的写锁)也可以是共享(ReentrantReadWriteLock的读锁)。
都是可重入锁。
Lock必须手动获取和释放锁,而synchronized不需要。
synchronized在发生异常的时候,会自动释放线程占有的锁,而ReentrantLock在发生异常时,如果没有通过unlock去释放锁,很有可能造成死锁,因此需要在finally块中释放锁。
Lock存在获得多个锁的方法(ReentrantReadWriteLock中的读锁获取 reentrantReadWriteLock.readLock().lock();)。
synchronized: 使用Object对象本身的wait 、notify、notifyAll调度机制
Lock: 可以使用Condition中的 signal(); await()等, 进行线程之间的调度。
lock():调用该方法,当前线程获取锁,获取到锁后返回;
lockInterruptibly():可中断地获取锁,即在锁的获取中可以中断当前线程;
tryLock():尝试非阻塞获取锁并立刻返回,如果锁没有被其他线程获取到,则返回true并获取到锁,否则返回false;
tryLock(long time, TimeUnit unit):设置过期时间尝试非阻塞获取锁,可以在获取锁的时间内被中断,过期时间内获取到锁返回true,否则返回false;
unlock():释放锁;
newCondition():当前线程获取到锁的前提下,可以将一个condition实例绑定到lock实例上,然后可以通过condition中的方法,对获得该锁线程进行线程调度。
独占锁,可重入锁,公平/非公平锁,可中断锁。
简单使用案例:public class ReentrantLockTest1 implements Runnable{
Lock lock = new ReentrantLock();
@Override
public void run() {
get();
}
private void get() {
try {
lock.lock();
System.out.println("get 方法");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
ReentrantLockTest1 lockTest1 = new ReentrantLockTest1();
new Thread(lockTest1).start();
}
}
源码分析( AQS):
ReentrantLock核心方法说明:
//构造器
//默认构造器创建非公平锁
public ReentrantLock();
//传入一个Boolean类型的参数,true创建一个公平锁,false非公平
public ReentrantLock(boolean fair);
public void lock() {
sync.lock();
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public void unlock() {
sync.release(1);
}
public Condition newCondition() {
return sync.newCondition();
}
public int getHoldCount() {
return sync.getHoldCount();
}
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}
public boolean isLocked() {
return sync.isLocked();
}
public final boolean isFair() {
return sync instanceof FairSync;
}
protected Thread getOwner() {
return sync.getOwner();
}
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
public final boolean hasQueuedThread(Thread thread) {
return sync.isQueued(thread);
}
public final int getQueueLength() {
return sync.getQueueLength();
}
protected Collection getQueuedThreads() {
return sync.getQueuedThreads();
}
public boolean hasWaiters(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
}
public int getWaitQueueLength(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition);
}
protected Collection getWaitingThreads(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition);
}
源码分析:
构造方法:
public ReentrantLock(boolean fair);构造方法返回的是,实现了AQS(AbstractQueuedSynchronizer)模板类部分方法的抽象类Sync的实现类。
sync = fair ? new FairSync() : new NonfairSync(); //static final class FairSync extends Sync //abstract static class Sync extends AbstractQueuedSynchronizer
FairSync 和 NonfairSync都实现了 Sync抽象类lock()抽象方法,重写了AQS中的tryAcquire;两个对象实现的两个方法大致相同 。NonfairSync 使用lock()方法获取锁之前,不管同步队列的先判断当前锁是否有人占用,如果没有就直接获取。FairSync 在使用tryAcquire()方法判断是否有可用资源时,保证公平性先判段阻塞队列的情况。
lock()方法,底层是调用的是AQS中的模板方法acquire(int arg);if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();
tryAcquire(arg) :调用子类中重写的tryAcquire判断能否获得资源
FairSync类公平方式的实现类中重写的tryAcquire方法
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState(); //得到当前锁的状态
if (c == 0) { //锁的状态为0时,表示现在锁没有被占用
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) { //如果当前线程是等待时长最长获得资源的线程,CAS修改state同步状态的值
setExclusiveOwnerThread(current); //给当前现在设置独占访问权(拿到锁了)。
return true;
}
}
else if (current == getExclusiveOwnerThread()) { //如果当前线程等于持有锁的线程(锁的重入)
int nextc = c + acquires; //计数加一
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc); //修改同步状态的值
return true;
}
return false; //到这里说明没有拿到锁,返回false
}
如当前环境中有多个线程竞争同步资源失败,这些线程都会被封装为node,形成一个同步队列
当tryAcquire(arg)方法不能拿到锁后,addWaiter(Node.EXCLUSIVE)为当前线程创建并招募节点。
addWaiter()代码:
Node node = new Node(Thread.currentThread(), mode); //将当前线程封装为一个节点
Node pred = tail; //tail同步队列的尾节点
if (pred != null) { // 尾结点本身就是空,说明此时同步队列为空
//将尾部节点更新为这个新节点 双端队列
node.prev = pred;
if (compareAndSetTail(pred, node)) { //CAS成功说明节点入队成功,设置尾结点成功
pred.next = node;
return node;
}
}
//如果直接入队失败,就采用自旋方式 enq(node);加入结点直至入队成功,返回该结点.
enq(node);//将节点插入队列,必要时初始化。
return node;
final boolean acquireQueued(final Node node, int arg);
当线程包装成node进入同步队列后,会在同步队列中等待同步资源的释放。
当线程争取到同步资源 的使用权后,线程中同步队列中出队。
final boolean acquireQueued(final Node node, int arg) {
//在这个方法中,node已经加入了同步队列,它在这里尝试在同步队列中自旋以获取同步资源.
boolean failed = true; //标记是否成功获取到资源
try {
boolean interrupted = false; //标记线程是否被 中断
for (;;) { //自旋
final Node p = node.predecessor(); //获得传入节点的前驱节点
if (p == head && tryAcquire(arg)) { //如果当前节点的前驱节点节点等于头节点,且可以获得资源
setHead(node); //将当前节点设置为头节点
p.next = null; //设置原头结点的后继节点为null,帮助jvm回收垃圾.
failed = false; //标记已经成功获取同步资源
return interrupted;
}
//当前节点不是头节或者获取资源失败,
//(shouldParkAfterFailedAcquire(p, node)线程获取资源失败后,判断是否阻塞线程.
//false 再次进入自旋查找前驱并获取锁的过程.
//true 执行parkAndCheckInterrupt() 阻塞当前线程,当线程被其他线程唤醒时,返回线程的中断状态,并清除中断标识
//为true ,并将interrupted中断标记设置为true,回到acquire()方法中,
//就会执行selfInterrupt()方法。 完成当前线程的中断信号发出.
//为false 再次进入自旋查找前驱并获取锁
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
// -----shouldParkAfterFailedAcquire(Node pred, Node node) 线程获取资源失败后,判断是否阻塞线程.
// -----parkAndCheckInterrupt() 阻塞当前线程,并判断线程的中断状态
}
} finally {
//如failed为true,说明线程不能成功获取资源。
//如果node的前驱结点 ( final Node p = node.predecessor(); )为空,代码抛出异常
//这说明node为头结点,node无前驱结点。
if (failed)
cancelAcquire(node); //取消对资源的获取.
// ------------cancelAcquire()取消对资源的获取.
}
}
lockInterruptibly(); 方法底层调用的是AQS中的acquireInterruptibly(1)方法;
if (Thread.interrupted()) //判断线程中断状态
throw new InterruptedException();
if (!tryAcquire(arg))
//以独占可中断模式获取。大致与lock()方法调用相同。
doAcquireInterruptibly(arg);
unlock();底层调用的是AQS的release(1);模板方法
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head; //获得头节点
if (h != null && h.waitStatus != 0) //头节点不为null且状态不为0即初始状态
unparkSuccessor(h); //唤醒同步队列的后继节点
return true;
}
return false;
}
子类重写的tryRelease()方法
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
//如果当前线程不是已经获得资源的线程则报错
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
//标识释放状态
boolean free = false;
//如果同步状态减少releases后是初始状态则说明资源释放成功
if (c == 0) {
free = true;
//将以获得资源的线程赋为null
setExclusiveOwnerThread(null);
}
//设置同步状态
setState(c);
return free;
}
原理分析:
ReentrantLock方法实际调用的大多数都是AQS中的模板方法。AQS通过模板方法模式固定了程序的执行顺序,子类通过重写AQS中的
isHeldExclusively() //该线程是否正在独占资源。只有用到condition才需要去实现它;
tryAcquire(int) //独占方式。尝试获取资源,功返回true,失败则返回false;
tryRelease(int) /独占方式。尝试释放资源,功贝则返回true,失败则返回falseo;
tryAcquireShared(int) //共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源;
tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败返回false;
等方法来确定资源的获取的逻辑等。
AQS实际是通过维系一个CLH队列(FIFO的双向队列)来实现线程的等待,线程唤醒和资源的分配机制。
每有一个线程请求共享资源时,判段共享资源是否空闲,如果空闲就将当前请求线程设置为工作线程,并将共享资源的状态(State)设置为锁定(通过CAS设置state值)(公平/非公平)。
如果共享资源被占用,就将这个线程包装成一个节点加入到CLH队列中,在acquireQueued方法中不断去尝试得到资源,并在方法中判断当前线程是否中断和将已经取消的线程移出队列。
当工作线程运行结束后,工作线程会释放共享资源并将同步状态的赋为0,并唤醒同步队列中的第一个待唤醒的线程。
读写锁,一般在读多写少情况下使用。有两把锁一把读锁,一把写锁,写锁是独占式的,读锁是共享式的,读写锁对于同步状态的实现是在一个整形变量上通过“按位切割使用”:将变量切割成两部分,高16位表示读状态,也就是获取到读锁的次数,低16位表示获取到写线程的可重入次数。在有线程持有读锁的情况下没有线程可以获得写锁,在有线程持有写锁的情况下只有持有写锁的那个线程可以获取读锁。
原理分析参考:https://cloud.tencent.com/developer/article/1469555
简单案例public class ReentrantReadWriteLockeTest {
private static ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
//让线程同步去争抢锁
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue());
private static int i = 100;
public static void main(String[] args) {
//利用线程池和CyclicBarrier 保证线程是去同步争抢锁的
executor.execute(() -> {
write(Thread.currentThread());
});
executor.execute(() -> {
read(Thread.currentThread());
});
executor.execute(() -> {
read(Thread.currentThread());
});
executor.shutdown();
}
private static void write(Thread thread) {
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
//获取写锁
reentrantReadWriteLock.writeLock().lock();
//获取读锁,说明获得写锁后同线程还可以获得读锁
reentrantReadWriteLock.readLock().lock();
try {
System.out.println("写线程开始运行" + thread.getName() + " : i=" + i);
Thread.sleep(100);
System.out.println(thread.getName() + "运行完");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//释放写锁
reentrantReadWriteLock.writeLock().unlock();
}
}
private static void read(Thread thread) {
try {
//线程阻塞, 让线程同步争抢锁
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
//获得读锁
reentrantReadWriteLock.readLock().lock();
//获取写锁, 不注释,线程获得读锁后阻塞
//reentrantReadWriteLock.writeLock().lock();
try {
System.out.println("读线程开始运行" + thread.getName() + " : i=" + i);
Thread.sleep(100);
System.out.println(thread.getName() + "运行完");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//释放锁
reentrantReadWriteLock.readLock().unlock();
}
}
}
结果,说明获得写锁和获得读锁的线程不能同时运行,获得读锁的线程可以同时运行
写线程开始运行pool-1-thread-1 : i=100 pool-1-thread-1运行完 : i=1 读线程开始运行pool-1-thread-2 : i=1 读线程开始运行pool-1-thread-3 : i=1 pool-1-thread-2运行完 pool-1-thread-3运行完Condition接口
用Lock和Condition可以实现类似于,synchronized和wait()、wait(long timeout)、notify()以及notifyAll()方法配合实现的等待通知模式。
以前的方式只能有一个等待队列,在实际应用时可能需要多个,比如读和写。为了这个灵活性,lock将同步互斥控制和等待队列分离开来,互斥保证在某个时刻只有一个线程访问临界区(lock自己完成),等待队列负责保存被阻塞的线程(condition完成)。
学习文章:https://www.jianshu.com/p/c7af7f3fa135
public class producerConsumer {
public static void main(String[] args) {
AppleBox appleBox = new AppleBox();
Producer producer = new Producer(appleBox);
Consumer consumer = new Consumer(appleBox);
Consumer consumer2 = new Consumer(appleBox);
Thread p = new Thread(producer, "生产者");
Thread c1 = new Thread(consumer, "消费者1");
Thread c2 = new Thread(consumer2, "消费者2");
p.start();
c1.start();
c2.start();
}
}
class Apple {
int id;
public Apple(int id) {
this.id = id;
}
}
class AppleBox {
AtomicInteger index = new AtomicInteger(0);
AtomicReferenceArray apples = new AtomicReferenceArray<>(new Apple[5]);
// AtomicReference apples = new AtomicReference<>();
//创建锁对象 ,替换synchronized
private final ReentrantLock lock = new ReentrantLock();
//生产条件
private final Condition produceCondition = lock.newCondition();
//消费条件
private final Condition consumeCondition = lock.newCondition();
// public synchronized boolean deposite(Apple apple) {
public boolean deposite(Apple apple) {
lock.lock();
try {
while (index.get() == apples.length()) {
//生产线程进入等待
produceCondition.await();
return false;
}
apples.set( index.getAndIncrement(),apple);
//this.apples[index.incrementAndGet()] = apple;
System.out.println(Thread.currentThread().getName() + "生产了: " + index.get());
//唤醒消费线程
consumeCondition.signal();
return true;
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return false;
}
public Apple withdraw() throws Exception {
lock.lock();
try {
while (index.get() == 0) {
System.out.println(Thread.currentThread().getName() + "消费缺货" );
consumeCondition.await();
}
Apple a = apples.get(index.decrementAndGet());
System.out.println(Thread.currentThread().getName() + "消费了: " + a.id);
return a;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return null;
}
}
class Producer implements Runnable {
AppleBox appleBox;
public Producer(AppleBox appleBox) {
this.appleBox = appleBox;
}
@Override
public void run() {
while (true) {
appleBox.deposite(new Apple(appleBox.index.get() + 1));
try {
Thread.sleep((int) (Math.random() * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Consumer implements Runnable {
AppleBox appleBox;
public Consumer(AppleBox appleBox) {
this.appleBox = appleBox;
}
@Override
public void run() {
while (true) {
try {
appleBox.withdraw();
} catch (Exception e) {
e.printStackTrace();
}
try {
Thread.sleep((int) (Math.random() * 3000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
自定义实现
CountdownLatch的自定义实现(共享方式获资源)
重写tryAcquireShared(),tryReleaseShared()
public class MyCountDownLatch {
//实现AQS
private static final class MySync extends AbstractQueuedSynchronizer {
public MySync(int count) {
//初始化设置AQS中的state状态值
setState(count);
}
public int getCount(){
//获得AQS中的state状态值
return getState();
}
@Override
protected int tryAcquireShared(int arg) {
return (getState() == 0)? 1 : -1;
}
@Override
protected boolean tryReleaseShared(int arg) {
while (true){
int c = getState();
if (c == 0){ //说明锁已经释放
return false;
}
int nextc = c - 1;
//调用CAS设置state值
if (compareAndSetState(c,nextc)){
//为0说明释放锁成功
return nextc == 0;
}
}
}
}
private final MySync sync;
public MyCountDownLatch(int count) {
this.sync = new MySync(count);
}
public void await(){
//模板方法,组装流程,调用 子类中实现的tryAcquireShared
sync.acquireShared(1);
}
public void countDown(){
//模板方法,组装流程,调用 子类中实现的tryReleaseShared
sync.releaseShared(1);
}
public int getCount(){
return sync.getCount();
}
}
自定义ReentrantLocak ( 独占锁,非可重入)
重写isHeldExclusively(),tryAcquire(),tryRelease()
public class MyReentrantLock implements Lock {
private final Sync sync = new Sync();
private static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
@Override
protected boolean tryAcquire(int arg) {
//state状态值不是0就是1,所以是非重入锁 CAS操作
if (compareAndSetState(0, 1)) {
//设置当前线程为执行线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
if (getState() == 0) { //没有线程执行有锁
throw new UnsupportedOperationException();
}
//将执行线程置为空
setExclusiveOwnerThread(null);
//同步状态设置为01
setState(0);
return true;
}
Condition newCondition() {
return new ConditionObject();
}
}
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1); //响应中断式直接获得锁
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}



