栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

并发编程JUC(中)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

并发编程JUC(中)

9、读写锁

一个 ReadWriteLock保持一对一联系 locks,一个用于只读操作,一个用于写入,读操作可以多个线程而写操作只能一个线程

代码案例:做一个我们自己的cache缓存。分别有写入操作、读取操作,采用五个线程去写入,使用五个线程去读取,查看结果

代码示例:


public class ReadWriteLockDemo {
    public static void main(String[] args) {
        MyCacheLock myCache = new MyCacheLock();

        //写入
        for (int i = 1; i <= 5; i++) {
            final int temp = i;
            new Thread(() -> {
                myCache.put(temp +"",temp + "");
            },String.valueOf(i)).start();
        }

        //读取
        for (int i = 1; i <= 5; i++) {
            final int temp = i;
            new Thread(() -> {
                myCache.get(temp + "");
            },String.valueOf(i)).start();
        }
    }
}

class MyCacheLock{
    private volatile Map map=new HashMap<>();
    //使用读写锁
    private ReadWriteLock readWriteLock=new ReentrantReadWriteLock();
    //普通锁
    private Lock lock=new ReentrantLock();

    public void put(String key,String value){
        readWriteLock.writeLock().lock();
        try {
            //写入
            System.out.println(Thread.currentThread().getName()+" 线程 开始写入");
            map.put(key, value);
            System.out.println(Thread.currentThread().getName()+" 线程 写入OK");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            readWriteLock.writeLock().unlock();
        }

    }

    public void get(String key) {
        readWriteLock.readLock().lock();
        try {
            //得到
            System.out.println(Thread.currentThread().getName() + " 线程 开始读取");
            String o = map.get(key);
            System.out.println(Thread.currentThread().getName() + " 线程 读取OK");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            readWriteLock.readLock().unlock();
        }
    }

}


class MyCache{
    private volatile Map map=new HashMap<>();
    public void put(String key,String value){
        //写入
        System.out.println(Thread.currentThread().getName()+" 线程 开始写入");
        map.put(key, value);
        System.out.println(Thread.currentThread().getName()+" 线程 写入OK");
    }

    public String get(String key){
        //读取
        System.out.println(Thread.currentThread().getName()+" 线程 开始读取");
        String o = map.get(key);
        System.out.println(Thread.currentThread().getName()+" 线程 读取OK");
        return o;
    }
}
  • 未加锁情况下

  • 加锁情况下

小结

ReadWriteLock

  • 读-读 可以共存
  • 读-写 不能共存
  • 写-写 不能共存

独占锁 ----> 写锁 -----> 一次只能被一个线程占用

共享锁 ----> 读锁 -----> 多个线程可以同时占有

10、阻塞队列 10.1 简述

对于队列与阻塞的理解

阻塞队列:

BlockingQueue不是新的东西,blockingQueue 是Collection的一个子类。

Collection与BlockingQueue的结构图:

什么情况下会用到BlockingQueue

  • 多线程并发处理、线程池
10.2 四组API

四组API

方式抛出异常不会抛出异常,有返回值阻塞 等待超时 等待
添加add()offer()put()offer(timenum,timeUnit)
移除remove()poll()take()poll(timenum,timeUnit)
判断队列首element()peek()--

代码测试

  • 抛出异常
@Test
public void test01(){
    //需要初始化队列的大小
    ArrayBlockingQueue queue = new ArrayBlockingQueue<>(3);
    System.out.println(queue.add("cvzhanshi1"));
    System.out.println(queue.add("cvzhanshi2"));
    System.out.println(queue.add("cvzhanshi3"));
    //抛出异常:java.lang.IllegalStateException: Queue full
    //System.out.println(queue.add("a"));
    System.out.println(queue.remove());
    System.out.println(queue.remove());
    System.out.println(queue.remove());
    //如果多移除一个
    //这也会造成 java.util.NoSuchElementException 抛出异常
    System.out.println(queue.remove());
}

  • 不会抛出异常,有返回值

    @Test
    public void test02(){
        //需要初始化队列的大小
        ArrayBlockingQueue queue = new ArrayBlockingQueue<>(3);
        System.out.println(queue.offer("cvzhanshi1"));
        System.out.println(queue.offer("cvzhanshi2"));
        System.out.println(queue.offer("cvzhanshi3"));
        //添加 一个不能添加的元素 使用offer只会返回false 不会抛出异常
        System.out.println(queue.offer("cvzhanshi4"));
        System.out.println(queue.poll());
        System.out.println(queue.poll());
        System.out.println(queue.poll());
        //弹出 如果没有元素 只会返回null 不会抛出异常
        System.out.println(queue.poll());
    }
    

  • 阻塞 等待

    @Test
    public void test03() throws InterruptedException {
        //需要初始化队列的大小
        ArrayBlockingQueue queue = new ArrayBlockingQueue<>(3);
    
        //一直阻塞 不会返回
        queue.put("cvzhanshi1");
        queue.put("cvzhanshi2");
        queue.put("cvzhanshi3");
    
        //如果队列已经满了, 再进去一个元素  这种情况会一直等待这个队列 什么时候有了位置再进去,程序不会停止
        queue.put("a");
    
        System.out.println(queue.take());
        System.out.println(queue.take());
        System.out.println(queue.take());
        //如果我们再来一个  这种情况也会等待,程序会一直运行 阻塞
        System.out.println(queue.take());
    }
    
  • 超时 等待

    @Test
    public void test4() throws InterruptedException {
        ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
        blockingQueue.offer("a");
        blockingQueue.offer("b");
        blockingQueue.offer("c");
        System.out.println("开始等待");
        blockingQueue.offer("d",2, TimeUnit.SECONDS);  //超时时间2s 等待如果超过2s就结束等待
        System.out.println("结束等待");
        System.out.println("===========取值==================");
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println("开始等待");
        blockingQueue.poll(2,TimeUnit.SECONDS); //超过两秒 我们就不要等待了
        System.out.println("结束等待");
    }
    

10.3 SynchronousQueue同步队列

没有容量,进去一个元素,必须等待取出来之后,才能再往里面放一个元素

代码示例:

public class SynchronousQueueDemo {
    public static void main(String[] args) {
        BlockingQueue queue = new SynchronousQueue<>();  //同步队列

        //使用两个进程
        // 一个进程 放进去
        // 一个进程 拿出来
        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName()+" Put 1");
                queue.put("1");
                System.out.println(Thread.currentThread().getName()+" Put 2");
                queue.put("2");
                System.out.println(Thread.currentThread().getName()+" Put 3");
                queue.put("3");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }


        },"A").start();
        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName()+" Take "+queue.take());
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName()+" Take "+queue.take());
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName()+" Take "+queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"B").start();
    }
}

11、线程池

线程池:三大方法、七大参数、四种拒绝策略

下面的示例业务图:

池化技术

程序的运行的本质就是占用系统的资源,为了优化资源的使用 ==> 池化技术(线程池、连接池、对象池、内存池)

池化技术:事先准备好一些资源,有人要用,就到池里拿,用完之后还给池

线程池的好处:

  • 降低资源的消耗
  • 提高响应的速度
  • 方便管理

线程复用、可以控制最大并发数、方便管理线程

线程池:三大方法

  • ExecutorService threadPool = Executors.newSingleThreadExecutor(); //单个线程
  • ExecutorService threadPool = Executors.newFixedThreadPool(5); //创建一个固定的线程池的大小
  • ExecutorService threadPool = Executors.newCachedThreadPool(); //可伸缩的,遇强则强,遇弱则弱
public class Demo01 {
    public static void main(String[] args) {
//        ExecutorService threadPool = Executors.newSingleThreadExecutor(); 	//单个线程
//        ExecutorService threadPool = Executors.newFixedThreadPool(5);         //创建一个固定的线程池的大小
        ExecutorService threadPool = Executors.newCachedThreadPool();      //可伸缩的,遇强则强,遇弱则弱
        try {
            for (int i = 0; i < 10; i++) {
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName()+ " ok");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }
    }
}

线程池:七大参数

源码分析

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new linkedBlockingQueue()));
}

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new linkedBlockingQueue());
}

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue());
}
// 本质:调用ThreadPoolExecutor()
// 分析源码
public ThreadPoolExecutor(int corePoolSize,      // 核心线程池大小
                          int maximumPoolSize,   // 最大核心线程池大小
                          long keepAliveTime,	 // 超时了没有人用就会释放
                          TimeUnit unit,		 // 超时单位
                          BlockingQueue workQueue, // 阻塞队列
                          ThreadFactory threadFactory,		 // 线程工厂,创建线程的,一般不用动
                          RejectedExecutionHandler handler   // 拒绝策略) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

对于Integer.MAX_VALUE初始值较大,所以一般情况我们要使用底层的ThreadPoolExecutor来创建线程池

自定义线程池



public class Demo02 {
    public static void main(String[] args) {
        ExecutorService threadPool = new ThreadPoolExecutor(
                2,
                5,
                3,
                TimeUnit.SECONDS,
                new linkedBlockingQueue<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
        );
        try {
            for (int i = 0; i < 10; i++) {
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName()+ " ok");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }
    }
}

线程池:四种拒绝策略

  • new ThreadPoolExecutor.AbortPolicy(): => 该拒绝策略为:银行满了,还有人进来,不处理这个人的,并抛出异常,超出最大承载,就会抛出异常:队列容量大小+maxPoolSize
  • new ThreadPoolExecutor.CallerRunsPolicy():=> 该拒绝策略为:哪来的去哪里 main线程进行处理
  • new ThreadPoolExecutor.DiscardPolicy(): => 该拒绝策略为:队列满了,丢掉异常,不会抛出异常。
  • new ThreadPoolExecutor.DiscardOldestPolicy():=> 该拒绝策略为:队列满了,尝试去和最早的进程竞争,不会抛出异常

小结与拓展

如何去设置线程池的最大大小如何去设置?

CPU密集型和IO密集型!

  • CPU密集型:电脑的核数是几核就选择几;选择maximunPoolSize的大小
  • **I/O密集型:**在程序中有15个大型任务,io十分占用资源;I/O密集型就是判断我们程序中十分耗I/O的线程数量,线程池大小大约是最大I/O数的一倍到两倍之间。
Runtime.getRuntime().availableProcessors()  // 获取CPU的核数
12、四大函数式接口

函数式接口:有且只有一个方法的接口

// Runnable
@FunctionalInterface
public interface Runnable {
    public abstract void run();
}
//超级多的@FunctionalInterface
//简化编程模型,在新版本的框架底层大量应用
//foreach()的参数也是一个函数式接口,消费者类的函数式接口

函数型接口可以使用lambda表达式

代码测试:

Function函数式型接口:有一个输入参数,有一个输出参数


public class Demo01 {
    public static void main(String[] args) {
//        Function function = new Function() {
//            @Override
//            public String apply(String str) {
//                return str;
//            }
//        };
        Function function = (str) -> {return str;};
        System.out.println(function.apply("cvzhanshi"));
    }
}

Predicate断定型接口:只有一个输入参数,返回值只能是 布尔值



public class Demo02 {
    public static void main(String[] args) {
        // 判断字符串是否为空
//        Predicate predicate = new Predicate(){
//            @Override
//            public boolean test(String str) {
//                return str.isEmpty();
//            }
//        };
        Predicate predicate = (str) -> {return str.isEmpty();};
        System.out.println(predicate.test("cvzhanshi"));
    }
}

Consumer 消费型接口:只有输入,没有返回值

public class Demo03 {
    public static void main(String[] args) {
//        Consumer consumer = new Consumer(){
//            @Override
//            public void accept(String str) {
//                System.out.println(str);
//            }
//        };

        Consumer consumer = (str) -> { System.out.println(str);};
        consumer.accept("cvzhanshi");
    }
}

Supplier 供给型接口 没有参数,只有返回值



public class Demo04 {
    public static void main(String[] args) {
//        Supplier supplier = new Supplier() {
//            @Override
//            public Integer get() {
//                return 1024;
//            }
//        };

        Supplier supplier = () -> { return 1024; };
        System.out.println(supplier.get());
    }
}
13、Stream流式计算

什么是Stream流式计算?

大数据:存储 + 计算

集合、MySQL的本质就是存储

计算都应该交给流来操作

代码示例:

题目要求:一分钟内完成此题,只能用一行代码实现!

现在有6个用户!筛选

  • ID必须是偶数
  • 年龄必须大于23
  • 用户名转为大写
  • 用户名字母倒序排序
  • 只输出一个用户
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {
    private Integer id;
    private String name;
    private Integer age;
}




public class Test {
    public static void main(String[] args) {
        User user1 = new User(1,"a",21);
        User user2 = new User(2,"b",22);
        User user3 = new User(3,"c",23);
        User user4 = new User(4,"d",24);
        User user5 = new User(5,"e",25);
        User user6 = new User(6,"f",26);

        // 集合就是存储
        List list = Arrays.asList(user1, user2, user3, user4, user5, user6);

        // 计算交给Stream流
        list.stream()
                .filter(user -> {return user.getId() % 2 == 0;})
                .filter(user -> {return user.getAge() > 23;})
                .map(user -> {return new User(user.getId(),user.getName().toUpperCase(),user.getAge());})
                .sorted((u1,u2) -> {return u2.getName().compareTo(u1.getName());})
                .limit(1)
                .forEach(System.out::println);
    }
}

14、ForkJoin

什么是ForkJoin

ForkJoin主要用于并发执行任务,提高效率,大数据量。

大任务分成小任务,最后把小结果合并成大结果

ForkJoin 特点:工作窃取

ForkJoin 维护的是双端队列

A、B线程执行任务,B更快执行完,他会去A那窃取任务过来执行,可以提高效率

ForkJoin 操作

查看ForkJoinPool

查看ForkJoinTask

查看直接子类RecursiveTask或RecursiveAction,可以看到使用方式

如何使用forkjoin

  • ForkJoinPool 通过它来执行
  • 计算任务 ForkJoin .execute(ForkJoinTask task)
  • 计算类要继承ForkJoinTask

代码测试:

计算类


public class ForkJoinDemo extends RecursiveTask {
    private Long start;
    private Long end;

    private Long temp = 100000L;// 切割任务的临界值

    public ForkJoinDemo(Long start, Long end) {
        this.start = start;
        this.end = end;
    }
    //计算方法
    @Override
    protected Long compute() {
        if((end-start) < temp){
            Long sum = 0L;
            for (Long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        }else {
            //使用forkJoin 分而治之 计算
            //计算平均值
            long middle = (start + end) / 2;
            ForkJoinDemo leftTask = new ForkJoinDemo(start, middle);
            leftTask.fork();  //拆分任务,把线程任务压入线程队列
            ForkJoinDemo rightTask = new ForkJoinDemo(middle, end);
            rightTask.fork();  //拆分任务,把线程任务压入线程队列
            long taskSum = leftTask.join() + rightTask.join();
            return taskSum;
        }
    }
}

测试类

public class Test {

    // 普通的计算
    @org.junit.Test
    public void test01(){
        Long sum = 0L;
        long start = System.currentTimeMillis();
        for (Long i = 1L; i <= 10_0000_0000; i++) {
            sum += i;
        }
        long end = System.currentTimeMillis();
        System.out.println("sum = " + sum + " 时间:" + (end - start)); // 8543ms
    }

    // ForkJoin的计算
    @org.junit.Test
    public void test02() throws ExecutionException, InterruptedException {
        long start = System.currentTimeMillis();
        ForkJoinDemo task = new ForkJoinDemo(0L, 10_0000_0000L);
        ForkJoinPool joinPool = new ForkJoinPool();
        ForkJoinTask submit = joinPool.submit(task);
        Long sum = submit.get();
        long end = System.currentTimeMillis();
        System.out.println("sum = " + sum + " 时间:" + (end - start)); // 5623ms
    }


    // Stream并行流计算
    @org.junit.Test
    public void test03() {
        long start = System.currentTimeMillis();
        long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum);
        long end = System.currentTimeMillis();
        System.out.println("sum = " + sum + " 时间:" + (end - start)); // 910ms
    }
}
15、异步回调

Future 设计的初衷:对将来的某个事件的结果进行建模

类似于是前端 --> 发送ajax异步请求给后端

一般都使用CompletableFuture

没有返回值的runAsync异步回调

public class Demo01 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 发起一个请求
        CompletableFuture completableFuture =CompletableFuture.runAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "runAsync=>Void");
        });
        System.out.println("111111111111111111111111");
        completableFuture.get(); // 获取阻塞执行结果
    }
}

有返回值的异步回调supplyAsync

public class Demo01 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 发起一个请求
        CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "supplyAsync=>Integer");
            int i = 10 / 0;
            return 1024;
        });
        // 得到返回值
        Integer result = completableFuture.whenComplete((t, u) -> {
            System.out.println("t => " + t);  // 正常的返回结果
            System.out.println("u => " + u);  // 错误信息:java.util.concurrent.CompletionException:java.lang.ArithmeticException: / by zero
        }).exceptionally((e) -> {
            System.out.println(e.getMessage());
            return 5000;
        }).get();

        System.out.println(result);

    }
}
  • 正常的返回结果

  • 出现异常,返回错误信息

whenComplete: 有两个参数,一个是t 一个是u

T:是代表的 正常返回的结果

U:是代表的 抛出异常的错误信息

如果发生了异常,get可以获取到exceptionally返回的值

并发编程JUC(下)

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/270549.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号