栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Java8异步编程

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Java8异步编程

一、引言

参考Java多线程几种实现方式,我们可以很快速的通过new Thread(...).start()开启一个新的线程,但是这样创建线程会有很多坏处:

  • 每次都要新建一个对象,性能差;

  • 建出来的很多个对象是独立的,缺乏统一的管理。如果在代码中无限新建线程会导致这些线程相互竞争,占用过多的系统资源从而导致死机或者 oom ;

  • 缺乏许多功能如定时执行、中断等。

因此Java给我们提供好一个十分好用的工具,那就是线程池

二、Java线程池 1、Java线程池概述

Java提供了一个工厂类来构造我们需要的线程池,这个工厂类就是 Executors 。这里主要讲4个创建线程池的方法,即

  • newCachedThreadPool()

  • newFixedThreadPool(int nThreads)

  • newScheduledThreadPool(int corePoolSize)

  • newSingleThreadExecutor()

2、newCachedThreadPool()

创建缓存线程池。缓存的意思就是这个线程池会根据需要创建新的线程 ,在有新任务的时候会优先使用先前创建出的线程。线程一旦创建了就一直在这个池子里面了,执行完任务后后续还有任务需要会重用这个线程 ,若是线程不够用了再去新建线程 。

ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
    final int index = i;

    // 每次发布任务前根据奇偶不同等待一段时间,如1s,这样就会创建两个线程
    if (i % 2 == 0) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // 执行任务
    cachedThreadPool.execute(() -> System.out.println(Thread.currentThread().getName() + ":" + index));

注意这里的线程池是无限大的 ,并没有规定他的大小

3、newFixedThreadPool(int nThreads)

创建定长线程池,参数是线程池的大小。也就是说,在同一时间执行的线程数量只能是 nThreads 这么多,这个线程池可以有效的控制最大并发数从而防止占用过多资源。超出的线程会放在线程池的一个无界队列里等待 其他线程执行完。

ExecutorService executorService = Executors.newFixedThreadPool(5);
4、newScheduledThreadPool(int corePoolSize)

第3个坏处线程池的坏处就是缺乏定时执行功能,这个Scheduled代表是支持的,这个线程池也是定长的,参数 corePoolSize 就是线程池的大小,即在空闲状态下要保留在池中的线程数量。而要实现调度需要使用这个线程池的 schedule() 方法

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
// 三秒后执行
scheduledExecutorService.schedule(() -> System.out.println(Thread.currentThread().getName() + ": 我会在3秒后执行。"),
                3, TimeUnit.SECONDS);

5、newSingleThreadExecutor()

创建单线程池 ,只使用一个线程 来执行任务。但是它与 newFixedThreadPool(1, threadFactory) 不同,它会保证创建的这个线程池不会被重新配置为使用其他的线程 ,也就是说这个线程池里的线程始终如一。

ExecutorService executorService = Executors.newSingleThreadExecutor();
6、线程池的关闭

线程池启动后需要手动关闭,否则会一直不结束

  • shutdown() : 将线程池状态置成 SHUTDOWN,此时不再接受新的任务 ,等待线程池中已有任务执行完成后结束 ;

  • shutdownNow() : 将线程池状态置成 SHUTDOWN,将线程池中所有线程中断 (调用线程的 interrupt() 操作),清空队列,并返回正在等待执行的任务列表 。

并且它还提供了查看线程池是否关闭和是否终止的方法,分别为 isShutdown() 和 isTerminated() 。

三、几种函数式接口 1、简介

函数式接口可以参考java8常用新特性,这里主要介绍几种函数式接口,Callable、Runnable、Future、CompletableFuture和FutureTask

2、Callable和Runnable异同

两个接口的定义

@FunctionalInterface
public interface Runnable {
    
    public abstract void run();
}

@FunctionalInterface
public interface Callable {
    
    V call() throws Exception;
}

相同点

都是接口,都可以编写多线程程序,都可以通过线程池启动线程

不同点

Runnable没有返回值,Callable可以返回执行结果,是个泛型;Callable接口的call()方法允许抛出异常;而Runnable接口的run()方法的异常只能在内部消化,不能继续上抛;

public class Test1 {

    static class Min implements Callable {
        @Override
        public Integer call() throws Exception {
            Thread.sleep(1000);
            return 4;
        }
    }
    static class Max implements Runnable{
        @SneakyThrows
        @Override
        public void run() {
            Thread.sleep(1000);
        }
    }
    
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.submit(new Min());
        executorService.submit(new Max());
    }
3、Future类
//Since:1.5
public interface Future {
    //取消任务的执行,参数指定是否立即中断任务执行,或者等等任务结束
    boolean cancel(boolean mayInterruptIfRunning);
    //任务是否已经取消,任务正常完成前将其取消,则返回 true
    boolean isCancelled();
    //任务是否已经完成。需要注意的是如果任务正常终止、异常或取消,都将返回true
    boolean isDone();
    //等待任务执行结束,然后获得V类型的结果。InterruptedException 线程被中断异常, ExecutionException任务执行异常,如果任务被取消,还会抛出CancellationException
    V get() throws InterruptedException, ExecutionException;
    //同上面的get功能一样,多了设置超时时间。超时会抛出TimeoutException
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

一般情况下,我们会结合Callable和Future一起使用,通过ExecutorService的submit方法执行Callable,并返回Future。

 public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        ExecutorService executor = Executors.newCachedThreadPool();
        //Lambda 是一个 callable, 提交后便立即执行,这里返回的是 FutureTask 实例
        Future future = executor.submit(() -> {
            System.out.println("running task");
            Thread.sleep(10000);
            return "return task";
        });
        future.get(2, TimeUnit.SECONDS);
    }

当然Future模式也有它的缺点,它没有提供通知的机制,我们无法得知Future什么时候完成。如果要在future.get()的地方等待future返回的结果,那只能通过isDone()轮询查询。

4、CompletableFuture类

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html

CompletableFuture能够将回调放到与任务不同的线程中执行,也能将回调作为继续执行的同步函数,在与任务相同的线程中执行。它避免了传统回调最大的问题,那就是能够将控制流分离到不同的事件处理器中。CompletableFuture弥补了Future模式的缺点。在异步的任务完成后,需要用其结果继续操作时,无需等待。可以直接通过thenAccept、thenApply、thenCompose等方式将前面异步处理的结果交给另外一个异步事件处理线程来处理。

CompletableFuture的静态工厂方法,其中runAsync 和 supplyAsync 方法的区别是runAsync返回的CompletableFuture是没有返回值的。

方法名描述
runAsync(Runnable runnable)使用ForkJoinPool.commonPool()作为它的线程池执行异步代码
runAsync(Runnable runnable, Executor executor)使用指定的thread pool执行异步代码
supplyAsync(Supplier supplier)使用ForkJoinPool.commonPool()作为它的线程池执行异步代码,异步操作有返回值
supplyAsync(Supplier supplier, Executor executor)使用指定的thread pool执行异步代码,异步操作有返回值
allOf(CompletableFuture… cfs)等待所有任务完成,构造后CompletableFuture完成
anyOf(CompletableFuture… cfs)只要有一个任务完成,构造后CompletableFuture就完成

对于变量的方法,常用有这几种方法

方法名描述
complete(T t)完成异步执行的话返回执行结果,若不是返回设置的值
completeExceptionally(Throwable ex)完成异步执行的话返回执行结果,若不是返回一个异常
thenApply(Function fn)返回一个新的CompletionStage,当这个阶段正常完成时,它将以这个阶段的结果作为所提供函数的参数执行
thenAccept(Consumer action)返回一个新的CompletionStage,当这个阶段正常完成时,它将以这个阶段的结果作为提供的操作的参数执行。
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello");
        //在这里执行返回值为World
        // future.complete("World");
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //在这里执行结果为Hello
        future.complete("World");

        try {
            //get() 方法会抛出经检查的异常,可被捕获,自定义处理或者直接抛出。join() 会抛出未经检查的异常。
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
//thenApplyhello shawn
// thenAccept shawn
public static void main(String[] args) throws InterruptedException, ExecutionException {
    //thenApply
    CompletableFuture cfuture =
            CompletableFuture.supplyAsync(() -> "shawn").thenApply(data -> "hello "+ data);
    System.out.println("thenApply" + cfuture.get());
    //thenAccept
    CompletableFuture.supplyAsync(() -> "thenAccept shawn").thenAccept(System.out::println);
}
5、FutureTask

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/FutureTask.html

Future是一个接口,是无法生成一个实例的,所以又有了FutureTask。FutureTask实现了RunnableFuture接口,RunnableFuture接口又实现了Runnable接口和Future接口。所以FutureTask既可以被当做Runnable来执行,也可以被当做Future来获取Callable的返回结果。


参考文章

https://juejin.cn/post/6970607007669682212

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Callable.html#call

https://www.jianshu.com/p/dff9063e1ab6

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/602891.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号