栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

ThreadPoolTaskExecutor同时自定义线程拒绝策略,防止线程太多造成线程池将任务丢弃

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

ThreadPoolTaskExecutor同时自定义线程拒绝策略,防止线程太多造成线程池将任务丢弃

@Bean("lcAsyncServiceExecutor")
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
    ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
    threadPoolTaskExecutor.setCorePoolSize(COUR_SIZE);
    threadPoolTaskExecutor.setMaxPoolSize(MAX_COUR_SIZE);
    //根据业务场景设置队列长度
    threadPoolTaskExecutor.setQueueCapacity(400);
    //允许线程的空闲时间60秒:当超过了核心线程数之外的线程在空闲时间到达之后会被销毁
    threadPoolTaskExecutor.setKeepAliveSeconds(60);
    threadPoolTaskExecutor.setThreadNamePrefix("BPExecutor-");
    threadPoolTaskExecutor.setRejectedExecutionHandler(new BPConfig.BuriedPointRunsPolicy());
    return threadPoolTaskExecutor;
}


public static class BPRunsPolicy implements RejectedExecutionHandler {
    public BPRunsPolicy() { }
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if(r instanceof  BPTaskManager.BPThread){
            BPTaskManager.BPThread bPThread = (BPTaskManager.BPThread)r;
            //先做存储再更新状态
            bPThread.markBPReport();
            bPThread.failBPReport();
BlockingQueue queue = bPTaskManager.getThreadPoolTaskExecutor().getThreadPoolExecutor().getQueue();
queue.stream().forEach(qu->{BPTaskManager.BPThread thread=(BPTaskManager.BPThread)qu;
    if("69d".equals(thread.bPLogEntityBody.getBPInfo().buriId)){
        log.info("我是true======{}",thread.bPLogEntityBody.getBPInfo().buriedId);
    }else{
        log.info("我是false====={}",thread.bPLogEntityBody.getBPInfo().buriedId);
    }      
});
        }
    }
}

1.TransmittableThreadLocal中TtlRunnable使用spring框架中的ThreadPoolTaskExecutor线程池,如果自定义线程池不加名字时会和TtlRunnable使用同一个spring的ThreadPoolTaskExecutor线程池,并会将任务交给ThreadPoolTaskExecutor线程池处理任务。

2.请求任务太多时,连阻塞队列都放不下时,线程池会直接做丢弃,此时应

判断超过阻塞队列内的任务数1半时,将请求线程休眠1s钟,伪代码如下:

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.size(200);
QueryBuilder matchQueryBuilder = QueryBuilders.rangeQuery("body.bPInfo.reqInfo.reqTime").gte(begin).lte(end);
searchSourceBuilder.query(matchQueryBuilder);
searchSourceBuilder.sort("body.bPInfo.reqInfo.reqTime", SortOrder.ASC);
List bPLogPage = new ArrayList<>(1000);//list在外面建,不用每次查询再建,指定初始值
ScrollDto scrollDto = new ScrollDto();//持有scrollId
do{
bPLogPage.clear();
while (bPTaskManager.getThreadPoolTaskExecutor().getThreadPoolExecutor().getQueue().size() > 200){
//休眠一秒
    try{
        Thread.sleep(1000);
    }catch (Exception e){}
}
//从es中查询出一批数据处理一批数据,防止OOM内存溢出logService.getDocByDate(bPLogEntity,searchSourceBuilder,bPLogPage,scrollDto);
if(CollectionUtils.isNotEmpty(bPLogPage)){
    bPLogPage.stream().forEach(bP -> {
        if(null!=bP.getBody() && null != bP.getBody().getBPInfo()){
            //将查询的数据推送给线程执行器
            bPTaskManager.pushTask(bP.getBody());
        }
    });
}
}while(null !=bPLogPage && bPLogPage.size() > 0)

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/643796.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号