栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

springboot生产项目中线程池的使用

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

springboot生产项目中线程池的使用

文章目录
      • 背景
      • 分析
      • 技术点
      • 解决

背景

淘宝代打代发项目订单处理流程是这样子的: 通过订单列表接口获取订单号,通过订单详情接口获取商品信息
一个线程中轮训订单详情接口的效率是非常低的,所以需要在多线程中请求订单详情。

项目使用springboot开发

分析

实际项目中线程池的控制是需要统一控制的(如果每个团队成员可以随意的创建配置线程池,很快服务器的资源就会被耗尽了),然后按照模块设置线程池。
因为要使用各个订单的商品信息,所以需要在主线程拿到子线程的运行结果,基于方便的考虑 所以多线程通过CompletableFuture.supplyAsync实现。然后多个List sub转成一个CompletableFuture sum,sum.get()获取各个子单的商品信息。

技术点
  1. supplyAsyn用法
    CompletableFuture 一段异步执行的代码创建CompletableFuture对象,supplyAsync入参是Supplier, 所以可以拿到返回值
public static CompletableFuture 	runAsync(Runnable runnable)
public static CompletableFuture 	runAsync(Runnable runnable, Executor executor)
public static  CompletableFuture 	supplyAsync(Supplier supplier)
public static  CompletableFuture 	supplyAsync(Supplier supplier, Executor executor)

那么Supplier和Consumer,Runnable都有什么区别呢?
如无参数,请使用Supplier(Use Supplier if it takes nothing)
如无返回,请使用Consumer(Use Consumer if it returns nothing)
如两者都无,请使用Runnable(Use Runnable if it does neither)
如两者都有,请使用Function(Use Function if it does both)
如返回布尔值,请使用Predicate(Use Predicate if it returns a boolean)
如以上皆不可以,请使用自定义@FunctionalInteface(Use @FunctionalInteface if none of above works)

  1. allOf实现: 当所有的CompletableFuture都执行完后执行计算
    主线程等待子线程全部完成对订单详情接口的请求,接着将订单数据入库, 这一步需要实现一个CompletableFuture List转成一个CompletableFuture, 通过get方法实现线程等待
    public static  CompletableFuture> sequence(List> futures) {
        CompletableFuture allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        return allDoneFuture.thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
    }
  1. thenApply 多个任务串联执行,下一个任务的执行依赖上一个任务的结果,每个任务都有输入和输出
public  CompletableFuture     thenApply(Function fn)
public  CompletableFuture     thenApplyAsync(Function fn)
public  CompletableFuture     thenApplyAsync(Function fn, Executor executor)

3例子

        CompletableFuture futureA = CompletableFuture.supplyAsync(() -> "hello");

        CompletableFuture futureB = futureA.thenApply(s->s + " world");

        CompletableFuture future3 = futureB.thenApply(String::toUpperCase);

		// 实际打印HELLO WORLD
        System.out.println(future3.join());
解决


核心代码

    @GetMapping("/poolSupplyAsync")
    @ApiOperation(value = "公用线程池CompletableFuture测试", httpMethod = "GET")
    public String poolSupplyAsync() throws InterruptedException {

        List> orders = new ArrayList<>();

        ThreadPoolTaskExecutor tradeExecutor= (ThreadPoolTaskExecutor) SpringUtils.getBean(ThreadPoolConfig.TRADE_EXECUTOR);
        
        // 通过订单号获取订单详情场景
        for (int i = 0; i < 10; i++) {
            int finalI = i;
            CompletableFuture result = CompletableFuture.supplyAsync(()-> treadTasks.justReturn(finalI), tradeExecutor);
            orders.add(result);
        }
		// CompletableFuture列表转成CompletableFuture
        CompletableFuture> response =  sequence(orders);
        try {
            // 等待子进程全部执行完成
            List ageList= response.get();
            System.out.println("response:" + ageList);

            return "完成了" + ageList.stream().map(Object::toString)
                    .collect(Collectors.joining(", "));
        } catch (Exception e) {
            logger.error("多线程遇到异常{} exception:{}",e.getMessage(), e);
        }

        logger.info("结果 {}",response);
        return "执行完成";
    }
    
    public static  CompletableFuture> sequence(List> futures) {
        CompletableFuture allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        return allDoneFuture.thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
    }

@Component
public class TreadTasks {
    Logger logger = LoggerFactory.getLogger(TreadTasks.class);

    @Async
    public void startMyThreadTask(int i) throws InterruptedException {
        Thread.sleep(10000);

        if (i >8){
            throw new RuntimeException("i超过限制了");
        }

        logger.info("{} 真正执行的方法 startMyThreadTask: {}", Thread.currentThread().getName(),i);
    }

    @Async
    public void executeAsyncTask2(Integer i){
        logger.info("{} 执行异步任务2 executeAsyncTask2: {}", Thread.currentThread().getName(),i);
    }


    public Integer justReturn(Integer id){
        logger.info("ThreadName:{}  justReturn Id:{}", Thread.currentThread().getName(), id);
        return id;
    }
}

定义线程池

package com.carsonlius.config;


import com.carsonlius.handler.SimpleAsyncUncaughtExceptionHandler;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;


@Configuration
@EnableAsync
public class ThreadPoolConfig implements AsyncConfigurer {
    public static final String TRADE_EXECUTOR = "tradeAsyncExecutor";

    private Logger logger = LoggerFactory.getLogger(ThreadPoolConfig.class);

    
    @Value("${thread.trade-pool-size:8}")
    private Integer tradePoolSize;

    
    @Value("${thread.max-trade-pool-size:16}")
    private Integer maxTradePoolSize;
    
    
    @Value("${thread.trade-thread-name-prefix:trade-thread-}")
    private String tradeThreadNamePrefix;

    @Value("${thread.core-pool-size:8}")
    private Integer corePoolSize;

    @Value("${thread.max-pool-size}")
    private Integer maxPoolSize;

    @Value("${thread.thread-queue-capacity}")
    private Integer queueCapacity;

    @Value("${thread.keep-alive-seconds}")
    private Integer keepAliveSeconds;

    @Value("${thread.thread-name-prefix:本地线程池}")
    private String threadNamePrefix;
    
    @Override
    public Executor getAsyncExecutor() {

        int num = Runtime.getRuntime().availableProcessors();
        logger.info("--------->  可用线程数{}", num);
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();

        // 队列大小
        taskExecutor.setQueueCapacity(queueCapacity);

        // 线程最大空闲时间
        taskExecutor.setKeepAliveSeconds(keepAliveSeconds);

        // 设置拒绝策略
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());

        // 使用线程工厂
        taskExecutor.setThreadFactory(initThreadFactory(threadNamePrefix));

        return taskExecutor;
    }

    
    @Bean(TRADE_EXECUTOR)
    public Executor initTradeAsyncExecutor() {
        ThreadPoolTaskExecutor tradeExecutor = (ThreadPoolTaskExecutor) getAsyncExecutor();
        // 重新设置订单的核心数,最大线程数
        tradeExecutor.setCorePoolSize(tradePoolSize);
        tradeExecutor.setMaxPoolSize(maxTradePoolSize);
        tradeExecutor.setThreadFactory(initThreadFactory(tradeThreadNamePrefix));
        tradeExecutor.initialize();
        return tradeExecutor;
    }



    // 多线程派生线程未捕获异常处理
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return  new SimpleAsyncUncaughtExceptionHandler();
    }

    private ThreadFactory initThreadFactory(String prefixName) {
        return new ThreadFactoryBuilder()
                .setNameFormat(prefixName.concat("%d"))
                .setUncaughtExceptionHandler((thread, e) -> {
                    logger.error("ThreadPool Exception {} 发生异常", thread, e);
                }).build();
    }
}

实现ApplicationContextAware 快速的获取订单线程池(bean)

package com.carsonlius.utils;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;

import java.util.Map;

@Component
public class SpringUtils implements ApplicationContextAware {
    private static ApplicationContext applicationContext;
    private static ApplicationContext parentApplicationContext;

    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        Assert.notNull(applicationContext, "SpringUtil injection ApplicationContext is null");
        SpringUtils.applicationContext = applicationContext;
        parentApplicationContext = applicationContext.getParent();
    }

    public static Object getBean(String name) {
        Assert.hasText(name, "SpringUtil name is null or empty");
        try {
            return applicationContext.getBean(name);
        } catch (Exception e) {
            return parentApplicationContext.getBean(name);
        }
    }

    public static  T getBean(String name, Class type) {
        Assert.hasText(name, "SpringUtil name is null or empty");
        Assert.notNull(type, "SpringUtil type is null");
        try {
            return applicationContext.getBean(name, type);
        } catch (Exception e) {
            return parentApplicationContext.getBean(name, type);
        }
    }

    public static  T getBean(Class type) {
        Assert.notNull(type, "SpringUtil type is null");
        try {
            return applicationContext.getBean(type);
        } catch (Exception e) {
            return parentApplicationContext.getBean(type);
        }
    }

    public static  Map getBeansOfType(Class type) {
        Assert.notNull(type, "SpringUtil type is null");
        try {
            return applicationContext.getBeansOfType(type);
        } catch (Exception e) {
            return parentApplicationContext.getBeansOfType(type);
        }
    }
}

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号