一个 ReadWriteLock保持一对一联系 locks,一个用于只读操作,一个用于写入,读操作可以多个线程而写操作只能一个线程
代码案例:做一个我们自己的cache缓存。分别有写入操作、读取操作,采用五个线程去写入,使用五个线程去读取,查看结果
代码示例:
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCacheLock myCache = new MyCacheLock();
//写入
for (int i = 1; i <= 5; i++) {
final int temp = i;
new Thread(() -> {
myCache.put(temp +"",temp + "");
},String.valueOf(i)).start();
}
//读取
for (int i = 1; i <= 5; i++) {
final int temp = i;
new Thread(() -> {
myCache.get(temp + "");
},String.valueOf(i)).start();
}
}
}
class MyCacheLock{
private volatile Map map=new HashMap<>();
//使用读写锁
private ReadWriteLock readWriteLock=new ReentrantReadWriteLock();
//普通锁
private Lock lock=new ReentrantLock();
public void put(String key,String value){
readWriteLock.writeLock().lock();
try {
//写入
System.out.println(Thread.currentThread().getName()+" 线程 开始写入");
map.put(key, value);
System.out.println(Thread.currentThread().getName()+" 线程 写入OK");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.writeLock().unlock();
}
}
public void get(String key) {
readWriteLock.readLock().lock();
try {
//得到
System.out.println(Thread.currentThread().getName() + " 线程 开始读取");
String o = map.get(key);
System.out.println(Thread.currentThread().getName() + " 线程 读取OK");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.readLock().unlock();
}
}
}
class MyCache{
private volatile Map map=new HashMap<>();
public void put(String key,String value){
//写入
System.out.println(Thread.currentThread().getName()+" 线程 开始写入");
map.put(key, value);
System.out.println(Thread.currentThread().getName()+" 线程 写入OK");
}
public String get(String key){
//读取
System.out.println(Thread.currentThread().getName()+" 线程 开始读取");
String o = map.get(key);
System.out.println(Thread.currentThread().getName()+" 线程 读取OK");
return o;
}
}
-
未加锁情况下
-
加锁情况下
小结
ReadWriteLock
- 读-读 可以共存
- 读-写 不能共存
- 写-写 不能共存
独占锁 ----> 写锁 -----> 一次只能被一个线程占用
共享锁 ----> 读锁 -----> 多个线程可以同时占有
10、阻塞队列 10.1 简述对于队列与阻塞的理解
阻塞队列:
BlockingQueue不是新的东西,blockingQueue 是Collection的一个子类。
Collection与BlockingQueue的结构图:
什么情况下会用到BlockingQueue
- 多线程并发处理、线程池
四组API
| 方式 | 抛出异常 | 不会抛出异常,有返回值 | 阻塞 等待 | 超时 等待 |
|---|---|---|---|---|
| 添加 | add() | offer() | put() | offer(timenum,timeUnit) |
| 移除 | remove() | poll() | take() | poll(timenum,timeUnit) |
| 判断队列首 | element() | peek() | - | - |
代码测试
- 抛出异常
@Test
public void test01(){
//需要初始化队列的大小
ArrayBlockingQueue queue = new ArrayBlockingQueue<>(3);
System.out.println(queue.add("cvzhanshi1"));
System.out.println(queue.add("cvzhanshi2"));
System.out.println(queue.add("cvzhanshi3"));
//抛出异常:java.lang.IllegalStateException: Queue full
//System.out.println(queue.add("a"));
System.out.println(queue.remove());
System.out.println(queue.remove());
System.out.println(queue.remove());
//如果多移除一个
//这也会造成 java.util.NoSuchElementException 抛出异常
System.out.println(queue.remove());
}
-
不会抛出异常,有返回值
@Test public void test02(){ //需要初始化队列的大小 ArrayBlockingQueue queue = new ArrayBlockingQueue<>(3); System.out.println(queue.offer("cvzhanshi1")); System.out.println(queue.offer("cvzhanshi2")); System.out.println(queue.offer("cvzhanshi3")); //添加 一个不能添加的元素 使用offer只会返回false 不会抛出异常 System.out.println(queue.offer("cvzhanshi4")); System.out.println(queue.poll()); System.out.println(queue.poll()); System.out.println(queue.poll()); //弹出 如果没有元素 只会返回null 不会抛出异常 System.out.println(queue.poll()); } -
阻塞 等待
@Test public void test03() throws InterruptedException { //需要初始化队列的大小 ArrayBlockingQueue queue = new ArrayBlockingQueue<>(3); //一直阻塞 不会返回 queue.put("cvzhanshi1"); queue.put("cvzhanshi2"); queue.put("cvzhanshi3"); //如果队列已经满了, 再进去一个元素 这种情况会一直等待这个队列 什么时候有了位置再进去,程序不会停止 queue.put("a"); System.out.println(queue.take()); System.out.println(queue.take()); System.out.println(queue.take()); //如果我们再来一个 这种情况也会等待,程序会一直运行 阻塞 System.out.println(queue.take()); } -
超时 等待
@Test public void test4() throws InterruptedException { ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3); blockingQueue.offer("a"); blockingQueue.offer("b"); blockingQueue.offer("c"); System.out.println("开始等待"); blockingQueue.offer("d",2, TimeUnit.SECONDS); //超时时间2s 等待如果超过2s就结束等待 System.out.println("结束等待"); System.out.println("===========取值=================="); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println("开始等待"); blockingQueue.poll(2,TimeUnit.SECONDS); //超过两秒 我们就不要等待了 System.out.println("结束等待"); }
没有容量,进去一个元素,必须等待取出来之后,才能再往里面放一个元素
代码示例:
public class SynchronousQueueDemo {
public static void main(String[] args) {
BlockingQueue queue = new SynchronousQueue<>(); //同步队列
//使用两个进程
// 一个进程 放进去
// 一个进程 拿出来
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName()+" Put 1");
queue.put("1");
System.out.println(Thread.currentThread().getName()+" Put 2");
queue.put("2");
System.out.println(Thread.currentThread().getName()+" Put 3");
queue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"A").start();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName()+" Take "+queue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+" Take "+queue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+" Take "+queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
},"B").start();
}
}
11、线程池
线程池:三大方法、七大参数、四种拒绝策略
下面的示例业务图:
池化技术
程序的运行的本质就是占用系统的资源,为了优化资源的使用 ==> 池化技术(线程池、连接池、对象池、内存池)
池化技术:事先准备好一些资源,有人要用,就到池里拿,用完之后还给池
线程池的好处:
- 降低资源的消耗
- 提高响应的速度
- 方便管理
线程复用、可以控制最大并发数、方便管理线程
线程池:三大方法
- ExecutorService threadPool = Executors.newSingleThreadExecutor(); //单个线程
- ExecutorService threadPool = Executors.newFixedThreadPool(5); //创建一个固定的线程池的大小
- ExecutorService threadPool = Executors.newCachedThreadPool(); //可伸缩的,遇强则强,遇弱则弱
public class Demo01 {
public static void main(String[] args) {
// ExecutorService threadPool = Executors.newSingleThreadExecutor(); //单个线程
// ExecutorService threadPool = Executors.newFixedThreadPool(5); //创建一个固定的线程池的大小
ExecutorService threadPool = Executors.newCachedThreadPool(); //可伸缩的,遇强则强,遇弱则弱
try {
for (int i = 0; i < 10; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName()+ " ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}
线程池:七大参数
源码分析
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new linkedBlockingQueue()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new linkedBlockingQueue());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue());
}
// 本质:调用ThreadPoolExecutor()
// 分析源码
public ThreadPoolExecutor(int corePoolSize, // 核心线程池大小
int maximumPoolSize, // 最大核心线程池大小
long keepAliveTime, // 超时了没有人用就会释放
TimeUnit unit, // 超时单位
BlockingQueue workQueue, // 阻塞队列
ThreadFactory threadFactory, // 线程工厂,创建线程的,一般不用动
RejectedExecutionHandler handler // 拒绝策略) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
对于Integer.MAX_VALUE初始值较大,所以一般情况我们要使用底层的ThreadPoolExecutor来创建线程池
自定义线程池
public class Demo02 {
public static void main(String[] args) {
ExecutorService threadPool = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new linkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
try {
for (int i = 0; i < 10; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName()+ " ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}
线程池:四种拒绝策略
- new ThreadPoolExecutor.AbortPolicy(): => 该拒绝策略为:银行满了,还有人进来,不处理这个人的,并抛出异常,超出最大承载,就会抛出异常:队列容量大小+maxPoolSize
- new ThreadPoolExecutor.CallerRunsPolicy():=> 该拒绝策略为:哪来的去哪里 main线程进行处理
- new ThreadPoolExecutor.DiscardPolicy(): => 该拒绝策略为:队列满了,丢掉异常,不会抛出异常。
- new ThreadPoolExecutor.DiscardOldestPolicy():=> 该拒绝策略为:队列满了,尝试去和最早的进程竞争,不会抛出异常
小结与拓展
如何去设置线程池的最大大小如何去设置?
CPU密集型和IO密集型!
- CPU密集型:电脑的核数是几核就选择几;选择maximunPoolSize的大小
- **I/O密集型:**在程序中有15个大型任务,io十分占用资源;I/O密集型就是判断我们程序中十分耗I/O的线程数量,线程池大小大约是最大I/O数的一倍到两倍之间。
Runtime.getRuntime().availableProcessors() // 获取CPU的核数12、四大函数式接口
函数式接口:有且只有一个方法的接口
// Runnable
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
//超级多的@FunctionalInterface
//简化编程模型,在新版本的框架底层大量应用
//foreach()的参数也是一个函数式接口,消费者类的函数式接口
函数型接口可以使用lambda表达式
代码测试:
Function函数式型接口:有一个输入参数,有一个输出参数
public class Demo01 {
public static void main(String[] args) {
// Function function = new Function() {
// @Override
// public String apply(String str) {
// return str;
// }
// };
Function function = (str) -> {return str;};
System.out.println(function.apply("cvzhanshi"));
}
}
Predicate断定型接口:只有一个输入参数,返回值只能是 布尔值
public class Demo02 {
public static void main(String[] args) {
// 判断字符串是否为空
// Predicate predicate = new Predicate(){
// @Override
// public boolean test(String str) {
// return str.isEmpty();
// }
// };
Predicate predicate = (str) -> {return str.isEmpty();};
System.out.println(predicate.test("cvzhanshi"));
}
}
Consumer 消费型接口:只有输入,没有返回值
public class Demo03 {
public static void main(String[] args) {
// Consumer consumer = new Consumer(){
// @Override
// public void accept(String str) {
// System.out.println(str);
// }
// };
Consumer consumer = (str) -> { System.out.println(str);};
consumer.accept("cvzhanshi");
}
}
Supplier 供给型接口 没有参数,只有返回值
public class Demo04 {
public static void main(String[] args) {
// Supplier supplier = new Supplier() {
// @Override
// public Integer get() {
// return 1024;
// }
// };
Supplier supplier = () -> { return 1024; };
System.out.println(supplier.get());
}
}
13、Stream流式计算
什么是Stream流式计算?
大数据:存储 + 计算
集合、MySQL的本质就是存储
计算都应该交给流来操作
代码示例:
题目要求:一分钟内完成此题,只能用一行代码实现!
现在有6个用户!筛选
- ID必须是偶数
- 年龄必须大于23
- 用户名转为大写
- 用户名字母倒序排序
- 只输出一个用户
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {
private Integer id;
private String name;
private Integer age;
}
public class Test {
public static void main(String[] args) {
User user1 = new User(1,"a",21);
User user2 = new User(2,"b",22);
User user3 = new User(3,"c",23);
User user4 = new User(4,"d",24);
User user5 = new User(5,"e",25);
User user6 = new User(6,"f",26);
// 集合就是存储
List list = Arrays.asList(user1, user2, user3, user4, user5, user6);
// 计算交给Stream流
list.stream()
.filter(user -> {return user.getId() % 2 == 0;})
.filter(user -> {return user.getAge() > 23;})
.map(user -> {return new User(user.getId(),user.getName().toUpperCase(),user.getAge());})
.sorted((u1,u2) -> {return u2.getName().compareTo(u1.getName());})
.limit(1)
.forEach(System.out::println);
}
}
14、ForkJoin
什么是ForkJoin
ForkJoin主要用于并发执行任务,提高效率,大数据量。
大任务分成小任务,最后把小结果合并成大结果
ForkJoin 特点:工作窃取
ForkJoin 维护的是双端队列
A、B线程执行任务,B更快执行完,他会去A那窃取任务过来执行,可以提高效率
ForkJoin 操作
查看ForkJoinPool
查看ForkJoinTask
查看直接子类RecursiveTask或RecursiveAction,可以看到使用方式
如何使用forkjoin
- ForkJoinPool 通过它来执行
- 计算任务 ForkJoin .execute(ForkJoinTask task)
- 计算类要继承ForkJoinTask
代码测试:
计算类
public class ForkJoinDemo extends RecursiveTask{ private Long start; private Long end; private Long temp = 100000L;// 切割任务的临界值 public ForkJoinDemo(Long start, Long end) { this.start = start; this.end = end; } //计算方法 @Override protected Long compute() { if((end-start) < temp){ Long sum = 0L; for (Long i = start; i <= end; i++) { sum += i; } return sum; }else { //使用forkJoin 分而治之 计算 //计算平均值 long middle = (start + end) / 2; ForkJoinDemo leftTask = new ForkJoinDemo(start, middle); leftTask.fork(); //拆分任务,把线程任务压入线程队列 ForkJoinDemo rightTask = new ForkJoinDemo(middle, end); rightTask.fork(); //拆分任务,把线程任务压入线程队列 long taskSum = leftTask.join() + rightTask.join(); return taskSum; } } }
测试类
public class Test {
// 普通的计算
@org.junit.Test
public void test01(){
Long sum = 0L;
long start = System.currentTimeMillis();
for (Long i = 1L; i <= 10_0000_0000; i++) {
sum += i;
}
long end = System.currentTimeMillis();
System.out.println("sum = " + sum + " 时间:" + (end - start)); // 8543ms
}
// ForkJoin的计算
@org.junit.Test
public void test02() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
ForkJoinDemo task = new ForkJoinDemo(0L, 10_0000_0000L);
ForkJoinPool joinPool = new ForkJoinPool();
ForkJoinTask submit = joinPool.submit(task);
Long sum = submit.get();
long end = System.currentTimeMillis();
System.out.println("sum = " + sum + " 时间:" + (end - start)); // 5623ms
}
// Stream并行流计算
@org.junit.Test
public void test03() {
long start = System.currentTimeMillis();
long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum);
long end = System.currentTimeMillis();
System.out.println("sum = " + sum + " 时间:" + (end - start)); // 910ms
}
}
15、异步回调
Future 设计的初衷:对将来的某个事件的结果进行建模
类似于是前端 --> 发送ajax异步请求给后端
一般都使用CompletableFuture
没有返回值的runAsync异步回调
public class Demo01 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 发起一个请求
CompletableFuture completableFuture =CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "runAsync=>Void");
});
System.out.println("111111111111111111111111");
completableFuture.get(); // 获取阻塞执行结果
}
}
有返回值的异步回调supplyAsync
public class Demo01 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 发起一个请求
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "supplyAsync=>Integer");
int i = 10 / 0;
return 1024;
});
// 得到返回值
Integer result = completableFuture.whenComplete((t, u) -> {
System.out.println("t => " + t); // 正常的返回结果
System.out.println("u => " + u); // 错误信息:java.util.concurrent.CompletionException:java.lang.ArithmeticException: / by zero
}).exceptionally((e) -> {
System.out.println(e.getMessage());
return 5000;
}).get();
System.out.println(result);
}
}
-
正常的返回结果
-
出现异常,返回错误信息
whenComplete: 有两个参数,一个是t 一个是u
T:是代表的 正常返回的结果
U:是代表的 抛出异常的错误信息
如果发生了异常,get可以获取到exceptionally返回的值
并发编程JUC(下)



