栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Springboot中的异步处理框架@Async

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Springboot中的异步处理框架@Async

在SpringBoot的日常开发中,一般都是同步调用的,但经常有特殊业务需要做异步来处理。比如:注册用户、需要送积分、发短信和邮件、或者下单成功、发送消息等等。

第一个原因:容错问题,如果送积分出现异常,不能因为送积分而导致用户注册失败。第二个原因:提升性能,比如注册用户花了30毫秒,送积分划分50毫秒,如果同步的话一共耗时:70毫秒,用异步的话,无需等待积分,故耗时是:30毫秒就完成了业务。 异步,同步的区别

同步:按顺序执行,前面结束了,后面才开始,是一个串行调用
异步:按顺序执行,但是后面拿到开始不用等待前面结束

举个例子:用户注册,那就送积分和发短信

同步 逻辑

用户注册:50MS 短信发送:100ms 、添加积分:100ms 总时长:250ms

代码
package com.kuangstudy.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;



@RestController
@Slf4j
public class RegController {

    
    @GetMapping("/reg")
    public String reguser(){
        // 1: 注册用户
        log.info("新用户注册");
        //userService.save(user);

        // 2: 发送短信
        log.info("发送短信");
        //messageService.sendMsg(user);

        // 3: 添加积分
        log.info("添加积分");
        //scoreService.addScore(user);

        return "ok";
    }
    
 
}

异步 逻辑


分析:它执行用户注册的执行时长,并不会因为短信发送、添加积分收到影响。执行时间:>50ms

代码

第一步:开启异步支持(在启动类或者线程池配置类上加注解)

package com.kuangstudy;

import com.kuangstudy.config.WeixinPayProperties;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.scheduling.annotation.EnableAsync;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

@SpringBootApplication
@EnableAsync //开启异步执行
public class KuangstudySpringbootProApplication {

    public static void main(String[] args) {
        SpringApplication.run(KuangstudySpringbootProApplication.class, args);
    }

}

第二步:方法上添加@Async
让方法变成异步方法

package com.kuangstudy.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;


@Service
@Slf4j
public class RegService {


    // 发送短信,用异步进行处理和标记
    @Async
    public void sendMsg(){
        // todo :模拟耗时5秒
        try {
            Thread.sleep(5000);
            log.info("---------------发送消息--------");
        }catch (Exception ex){
            ex.printStackTrace();
        }
    }


    // 添加积分,用异步进行处理和标记
    @Async
    public void addScore(){
        // todo :模拟耗时5秒
        try {
            Thread.sleep(5000);
            log.info("---------------处理积分--------");
        }catch (Exception ex){
            ex.printStackTrace();
        }
    }
}

第三步:
调用异步任务

package com.kuangstudy.controller;

import com.kuangstudy.service.RegService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;



@RestController
@Slf4j
public class RegController {


    @Autowired
    private RegService regService;

    
    @GetMapping("/reg")
    public String reguser(){
        // 1: 注册用户 10ms
        log.info("新用户注册");
        //userService.save(user);

        // 2: 发送短信 5s
        log.info("发送短信");
        regService.sendMsg();

        // 3: 添加积分 5s
        log.info("添加积分");
        regService.addScore();

        return "ok";
    }
}

执行的时间很快,因为添加积分和发送短信都是额外的开启了新的线程去执行。所有异步编程的性能是很快的。

异步线程池的优化

Springboot的tomcat的线程默认数量:200个,如果异步线程线程过多,有请求线程、异步处理的线程这个时候,这么线程都在争抢CPU的执行时间。这样很耗费资源 ;而且@ Async注解默认情况下用的是SimpleAsyncTaskExecutor线程池.[。该线程池不是真正意义上的线程

不是真正意义上的线程,因为线程不重用,每次调用都会新建一个新的线程。

通过上面的日志分析获得结论:【task-1】,【task-2】,【task-3】….递增。

这样每次都创建一个新的线程,太小太消耗资源了,所有我们要把@ Async使用的线程池变成真正意义下的线程池(不会每次都创建)。

@Async(这是我的另一文章)注解异步框架提供多种线程机制:

SimpleAsyncTaskExecutor:简单的线程池,这个类不重用线程,每次调用都会创建一个新的线程。SyncTaskExecutor:这个类没实现异步调用,只是一个同步操作,只适合用于不需要多线程的地方。ConcurrentTaskExecutor:Executor的适配类,不推荐使用.。ThreadPoolTaskScheduler:可以和cron表达式使用。ThreadPoolTaskExecutor:最常用,推荐,其本质就是:java.util.concurrent.ThreadPoolExecutor的包装

从上可以看出,我们推荐使用最后面的哪个线程池ThreadPoolTaskExecutor

优化

优化的就是定义一个ThreadPoolTaskExecutor线程池的配置类,把默认的 SimpleAsyncTaskExecutor线程池覆盖掉。从而让他不会每次都创建新的线程。

其实就是定义一个线程池,spring机制就会自动覆盖掉异步以前默认端线程池。

package com.kuangstudy.config;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;

@Configuration
public class SyncThreadPoolConfiguration {

    

    @Bean(name="threadPoolTaskExecutor")
    public ThreadPoolTaskExecutor getThreadPoolTaskExecutor(){
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        // 1: 创建核心线程数 cpu核数 -- 50  
        threadPoolTaskExecutor.setCorePoolSize(10);
        // 2:线程池维护线程的最大数量,只有在缓存队列满了之后才会申请超过核心线程数的线程
        threadPoolTaskExecutor.setMaxPoolSize(100);
        // 3:缓存队列 可以写大一点无非就浪费一点内存空间
        threadPoolTaskExecutor.setQueueCapacity(200); 
        // 4:线程的空闲事件,当超过了核心线程数之外的线程在达到指定的空闲时间会被销毁 200ms
        threadPoolTaskExecutor.setKeepAliveSeconds(200);
        // 5:异步方法内部线的名称
        threadPoolTaskExecutor.setThreadNamePrefix("ksdsysn-thread-");
        // 6:缓存队列的策略  多线程 JUC并发
        
        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }
}

思考

用户注册的核心点:就是把用户注册到数据库中。至于你短信有发送成功,有影响。其实不影响?

问:短信发布不成功,那不是注册失败吗?

我们用短信的目的其实就是为了:校验手机的合法性以及激活仅此而已。如果收不到其实不应该去影响注册用户。大不了。我在发一次,或者改天在来发送。但是至少不会让你重新在去填写。

在处理与第三方系统交互的时候,容易造成响应迟缓的情况,之前大部分都是使用多线程来完成此类任务,其实,在spring 3.x之后,就已经内置了@Async来完美解决这个问题,本文将完成介绍@Async的用法。

区分同步和异步调用的区别

同步:按顺序执行,前面结束了,后面才开始,是一个串行调用
异步:按顺序执行,但是后面拿到开始不用等待前面结束

常规的异步调用方法

在Java中,一般在处理类似的场景之时,都是基于创建独立的线程去完成相应的异步调用逻辑,通过主线程和不同的线程之间的执行流程,从而在启动独立的线程之后,主线程继续执行而不会产生停滞等待的情况。

@Async介绍

在Spring中,基于@Async标注的方法,称之为异步方法;这些方法将在执行的时候,会在独立的线程中被执行,调用者无需等待它的完成,即可继续其他的操作。
@EnableAsync就可以使用多线程。使用@Async就可以定义一个线程任务

如何使用@Async 无返回值的异步任务

第一步:在定义了线程池的属性类上加@EnableAsync,开启对异步的支持,也就是多线程的使用

@Configuration
@EnableAsync
public class ThreadPoolTaskConfig {
	private static final int corePoolSize = 10;     // 核心线程数(默认线程数)
	private static final int maxPoolSize = 100; // 最大线程数
	private static final int keepAliveTime = 10;// 允许线程空闲时间(单位:默认为秒)
	private static final int queueCapacity = 200;// 缓冲队列数
	private static final String threadNamePrefix = "Async-Service-"; // 线程池名前缀
	@Bean("taskExecutor") // bean的名称,默认为首字母小写的方法名
	public ThreadPoolTaskExecutor getAsyncExecutor(){
		ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
		executor.setCorePoolSize(corePoolSize);   
		executor.setMaxPoolSize(maxPoolSize);
		executor.setQueueCapacity(queueCapacity);
		executor.setKeepAliveSeconds(keepAliveTime);
		executor.setThreadNamePrefix(threadNamePrefix);
		
		// 线程池对拒绝任务的处理策略
		executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
		// 初始化
		executor.initialize();
		return executor;
	}
}

第二步:需要异步的方法(任务)上面加上@Async

@Service
public class testAsyncService {
	Logger log = LoggerFactory.getLogger(testAsyncService.class);
 
	// 发送提醒短信 1
	@Async("taskExecutor") // 给线程池命名了就得加上注解里面的属性,没有就不需要
	public void service1() throws InterruptedException {
		log.info("--------start-service1------------");
		Thread.sleep(5000); // 模拟耗时
	    log.info("--------end-service1------------");
	}
	
	// 发送提醒短信 2
	@Async("taskExecutor")
	public void service2() throws InterruptedException {
		log.info("--------start-service2------------");
		Thread.sleep(2000); // 模拟耗时
	    log.info("--------end-service2------------");
	}
}

@Async注解来声明一个或多个异步任务,可以加在方法或者类上,加在类上表示这整个类都是使用这个自定义线程池进行操作
--------start-service1------------
--------start-service2------------
--------end-service2------------
--------end-service1------------
可以说明我们的异步运行成功了

以上是无返回值的异步任务,使用起来很简单,那有返回值的异步认为u如何处理呢?

有返回值的异步任务

对于有返回值的异步任务,我们需要注意以下两点:

方法的返回值需要被Future包起来,比如public Future aaa(){}方法体内部的的return 结果,需要创建一个AsyncResult对象,比如 return new AsyncResult(“hello world !!!”);

代码:

// 异步方法
 @Async
 public Future asyncMethodWithReturnType() {
     System.out.println("Execute method asynchronously - "
       + Thread.currentThread().getName());
     try {
         Thread.sleep(5000);
         return new AsyncResult("hello world !!!!"); // 返回
     } catch (InterruptedException e) {
	  	 return null;
     }
 }
 public void testAsyncAnnotationForMethodsWithReturnType()
    throws InterruptedException, ExecutionException {
     System.out.println("Invoking an asynchronous method. "
       + Thread.currentThread().getName());
     Future future = asyncAnnotationExample.asyncMethodWithReturnType();
     while (true) {  ///这里使用了循环判断,等待获取结果信息
         if (future.isDone()) {  //判断是否执行完毕
             System.out.println("Result" + future.get());
             break;
         }
         System.out.println("Continue doing something else. ");
         Thread.sleep(1000);
     }
 }

这些获取异步方法的结果信息,是通过不停的检查Future的状态来获取当前的异步方法是否执行完毕来实现的

会让@Async注解失效的情况

异步方法使用static修饰

异步类没有使用@Component注解(或其他注解)导致spring无法扫描到异步类(因为@Async是spring的注解)

类中需要使用@Autowired或@Resource等注解自动注入,不能自己手动new对象(就以上例来说,得注入service,而不能new)

如果使用SpringBoot框架必须在启动类中/或者线程池固定属性类中,增加@EnableAsync注

在Async 方法上标注@Transactional是没用的。 在Async 方法调用的方法上标注@Transactional 有效。

调用被@Async标记的方法的调用者不能和被调用的方法在同一类中不然不会起作用!!!!!!!

使用@Async时要求是不能有直接的返回值(void或者futre)的不然会报错的 因为异步要求是不关心结果的

如何解决事务和异步之间的矛盾

在@Async标注的方法,同时也适用了@Transactional进行了标注;在其调用数据库操作之时,将无法产生事务管理的控制,原因就在于其是基于异步处理的操作。
那该如何给这些操作添加事务管理呢?可以将需要事务管理操作的方法放置到异步方法内部,在内部被调用的方法上添加@Transactional.
例如:
方法A,使用了@Async/@Transactional来标注,但是无法产生事务控制的目的。
方法B,使用了@Async来标注, B中调用了C、D,C/D分别使用@Transactional做了标注,则可实现事务控制的目的。

拓展:线程池的另外一种配置

下面关于线程池的配置还有一种方式,就是直接实现AsyncConfigurer接口,重写getAsyncExecutor方法即可,代码如下

@Configuration
@EnableAsync
public class AppConfig implements AsyncConfigurer {

     @Override
     public Executor getAsyncExecutor() {
         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
         executor.setCorePoolSize(7);
         executor.setMaxPoolSize(42);
         executor.setQueueCapacity(11);
         executor.setThreadNamePrefix("MyExecutor-");
         executor.initialize();
         return executor;
     }

     @Override
     public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
         return new MyAsyncUncaughtExceptionHandler();
     }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/703598.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号