Condition 是一个线程通信工具 跟锁没有关系
锁 条件控制
代码示例 public class ConditionDemoWait implements Runnable{
private Lock lock;
private Condition condition;
public ConditionDemoWait(Lock lock, Condition condition){
this.lock = lock;
this.condition = condition;
}
@Override
public void run() {
System.out.println("begin----------");
lock.lock();
try{
condition.await();//让当前线程阻塞
System.out.println("end ------------------");
}catch(Exception e) {
e.printStackTrace();
}finally{
lock.unlock();
}
}
}
public class ConditionDemoNotify implements Runnable{
private Lock lock;
private Condition condition;
public ConditionDemoNotify (Lock lock, Condition condition){
this.lock = lock;
this.condition = condition;
}
@Override
public void run() {
System.out.println("begin----------");
lock.lock();
try{
condition.signal();//唤醒处于等待状态下的线程
System.out.println("end ------------------");
}catch(Exception e) {
e.printStackTrace();
}finally{
lock.unlock();
}
}
}
public class ConditionExample{
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
ConditionDemoWait wait = new ConditionDemoWait(lock, condition);
ConditionDemoNotify notify = new ConditionDemoNotify(lock, condition);
wait.start();
notify.start();
} |
类比 Object.wait notify
作用:基于某一个条件实现线程的阻塞和唤醒
前提:必须先获取锁
await signal/signalAll
加锁:AQS 队列
释放锁:释放锁的线程去了哪里 什么数据结构存储 等待队列
唤醒被阻塞的线程 从哪里唤醒 ? 什么数据结构存储 同步队列
await
可以阻塞N个线程
会释放持有的锁
signal
如何让线程等待
用等待队列来存储等待中的线程
唤醒等待的线程
AQS中的同步队列和Condition等待队列中线程的转移
伪代码
public class ConditionDemo {
public static void main(String[] args){
Lock lock = new ReentrantLock();
lock.lock();
try{
lock.newCondition.await();//threadA 调用await()方法释放锁
}catch(InterruptedException e){
e.printStackTrace();
}
lock.unlock();
//threadB 调用signal抢占到锁
lock.signal();
}
} |
原理图
线程锁抢占失败 ----链表
调用Object.wait() ---等待队列
调用Object.notify() ---等待队列--同步队列
调用Condition.awati() ----等待队列
调用Condition.signal()--- 等待队列---aqs同步队列
以上猜想设计如何用代码来实现?
lock.newCondition.await()1,释放锁
2,释放锁的线程会被阻塞 并将其存储在一个数据结构中
3,被唤醒以后重新竞争锁
4,要能够处理Interrupt() 的响应
假如当前线程A 获得了锁 ,则执行await()方法的线程是threadA 将其构建成一个Node加入Condition队列,实质是一个单向链表
threadA执行await()方法 释放锁
threadA释放锁以后肯定不在AQS队列里面了,只有确定了不在AQS队列里面就去阻塞该线程
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//添加到等待队列
Node node = addConditionWaiter();
//释放锁--考虑到重入问题
int savedState = fullyRelease(node); //保存重入次数,保证后面threadA被唤醒后抢占到锁的时候重入次数跟之前是相等的
int interruptMode = 0;
//threadA释放锁以后肯定不在AQS队列里面了,只有确定了不在AQS队列里面就去阻塞该线程
while (!isonSyncQueue(node)) {
//上下文切换浪费CPU资源-性能损耗:因为上下文切换会保存程序计数器 寄存器信息,做为切换后恢复 这个过程会有CPU的参与 用户态--内核态 用户指令--内核指令
LockSupport.park(this);//阻塞当前线程 (当其他线程调用signal的时候,该线程会从这个位置接着往下执行)
//判断当前被阻塞的线程是否是因为interrupt()唤醒 interrupt()操作会唤醒处于等待状态下的线程
//线程唤醒不一定是通过signal方法唤醒 也有可能是被interupt()方法唤醒
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//重新竞争锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
//是否是被中断唤醒的 被中断唤醒的要报异常
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0; //0 属于正常唤醒
}
//判断方法执行 是在signal 前 还是后??
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { //调用signal 后节点状态 不等于ConDITION
enq(node);
return true;
}
//判断是否已经在aqs队列中了,如果不在 让出CPU 给其他线程 直到存在
while (!isonSyncQueue(node))
Thread.yield();
return false;
}
|
当前的操作是在锁的持有下进行的,所以以下操作均安全 看伪代码示例
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
} |
唤醒处于等待队列中的线程
唤醒以后做什么事情?
1,先唤醒
2,condition队列中的线程转移到aqs队列中,竞争到锁,再回到阻塞处 接着执行代码
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter; //得到当前的condition队列,拿到了头部节点 那么整条链路都可以拿到
if (first != null)
doSignal(first);
}
//唤醒等待队列中的线程 1,队列转移 ,2,唤醒
private void doSignal(Node first) {
//完成队列转移 并将转移之后的节点 置为空
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread); //唤醒threadA 回到await()方法被阻塞的地方 竞争锁
return true;
}
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isonSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//竞争锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
} |
源码分析总结:使用的知识点不一样
jdk层面 系统层面 :位运算 数据结构 算法
框架、中间件源码:基础组件如 线程、集合
Condition的实际使用场景实现阻塞队列(业务组件)
线程池—异步场景
生产者消费者
流量缓冲
阻塞队列
有界队列 无界队列(其实也是有界的 INTEGER.MAX_VALUE)
先进先出-- 队列 数组链表均可实现队列
后进先出-- 栈
阻塞队列: 实质--生产者消费者模型
阻塞插入--队列满了,阻塞插入,唤醒移除的线程
阻塞移除-队列空了,阻塞移除, 唤醒插入的线程
使用Condition来实现一个阻塞队列
public class ConditionBlockedQueued{
//表示阻塞队列中的容器
private List |
引出下面J.U.C 中阻塞队列的实现
public class ArrayBlockingQueue |
我只是一个布道者 不开发新的技术
阻塞队列中的方法 添加元素针对队列满了以后的不同的策略
add (): 队列满了,抛异常
offer(): 添加成功返回true 否则false
offer(long timeout): 添加一个元素,如果队列满了会阻塞timeout时间,超过timeout时长 返回false
put(): 如果队列满了 则阻塞
移除元素element(): 队列空了,抛异常
peek() : 移除成功返回true 否则false
take() :如果队列空了 则阻塞
poll(long timeout):移除一个元素,如果队列空了会阻塞timeout时间,超过timeout时长 返回false
dequeue :支持LIFO FIFO 的队列
以下只贴出部分方法的源码供参考
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
//公共逻辑方法的抽象
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items; //引发的思考:很多源代码都会针对成员变量 赋值给新的局部变量来操作 性能差异?
items[putIndex] = x;
if (++putIndex == items.length) //当索引位置=数组长度的时候 索引归零 重新开始++
putIndex = 0;
count++;
notEmpty.signal();
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
} |
不同场景的使用不同的阻塞队列
ArrayBlockedQueue 基于数组结构
linkedBlockedQueue 基于链表结构,单向链表
PriorityBlockedQueue 优先级排列 添加元素的时候设置优先级 取出的是按照优先级取出 底层使用Comparator 添加的元素要实现该接口
SynchronousQueue 没有任何存储结构的队列 线程池里面有用到 newCachedThreadPool
newCachedThreadPool 可缓存的线程池 可以处理非常大的请求任务 1000个任务过来,线程池需要分配1000个线程来执行
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue |
DelauQueue 允许延时执行的队列 设置里面的元素延时多久执行 订单延时15分钟没有支付 关闭(定时任务 mq延迟消息队列 时间轮)
可以用来做订单的延时支付 任务的超时处理
//所有的任务都必须实现Delayed接口
public class DelayQueuedTask implements Delayed{
private String orderId;
private long start = System.currentTimeMillis();
private long time;//延时的时间
public DelayQueuedTask(String orderId , long time) {
this.orderId = orderId;
this.time = time;
}
//和当前时间相比 延迟时间的差额 获得下一次执行的时间
@Override
public long getDelay(TimeUnit unit) {
return unit.convert((start+time)-System.currentTimeMillis, TimeUnit.MILLISECONDS);
}
//对任务作比较 根据时间的先后作比较 比较这个任务是不是优先级最高的 最早的先执行
@Override
public int compareTo(Delayed o){
return int(this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS))
}
}
public class DelayQueueMain{
private static DelayQueue |



