栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

异步任务模板

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

异步任务模板

这儿都是满满的干货!

从来不多说废话,直接上代码 ,拿来皆可用的那种 

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;


public class Threads {

    protected final static Logger log = LoggerFactory.getLogger(Threads.class);

    
    public static void shutdownAndAwaitTermination(ExecutorService pool) {
        if (pool != null && !pool.isShutdown()) {
            pool.shutdown();
            try {
                if (!pool.awaitTermination(120, TimeUnit.SECONDS)) {
                    pool.shutdownNow();
                    if (!pool.awaitTermination(120, TimeUnit.SECONDS)) {
                        log.info("Pool did not terminate");
                    }
                }
            } catch (InterruptedException ie) {
                pool.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
    }
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.annotation.PreDestroy;


@Component
public class ShutdownManager {
    protected final static Logger log = LoggerFactory.getLogger(ShutdownManager.class);

    @PreDestroy
    public void destroy() {
        shutdownAsyncManager();
    }

    
    private void shutdownAsyncManager() {
        try {
            log.info("关闭后台任务任务线程池");
            AsyncManager.getInstance().shutdown();
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }
}

import org.apache.commons.lang3.concurrent.BasicThreadFactory;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


public class AsyncManager {
    
    private int corePoolSize = 8;

    
    private final int OPERATE_DELAY_TIME = 10;

    
    private ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(corePoolSize,
            new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").daemon(true).build()) {
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            //线程执行之后
            super.afterExecute(r, t);
        }
    };

    // 单例模式
    private AsyncManager() {

    }

    private static AsyncManager asyncManager = new AsyncManager();

    public static AsyncManager getInstance() {
        return asyncManager;
    }

    
    public void executor(Runnable runnable) {
        executor.schedule(runnable, OPERATE_DELAY_TIME, TimeUnit.MILLISECONDS);
    }

    
    public void shutdown() {
        Threads.shutdownAndAwaitTermination(executor);
    }
}

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.google.common.collect.Queues;
import com.yw.processplan.core.constant.StompMappingConstant;
import com.yw.processplan.core.utlis.StringUtils;
import com.yw.processplan.core.websocket.StompMessagePusher;
import com.yw.processplan.module.power.dao.MessageMapper;
import com.yw.processplan.module.power.model.Performer;
import com.yw.processplan.module.power.model.entity.SendMessageEntity;
import com.yw.processplan.module.process.model.dto.WebSocketAccountDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import javax.annotation.PostConstruct;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;


@Component
public class SendMessageActuator {
    protected static final Logger LOGGER = LoggerFactory.getLogger(SendMessageActuator.class);

    @Autowired
    private TaskScheduler taskScheduler;


    
    private static linkedBlockingQueue ACCOUNT_BLOCK_QUEUE = Queues.newlinkedBlockingQueue(200);

    
    private static ThreadPoolExecutor POOL_EXECUTOR = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MICROSECONDS,
            new linkedBlockingQueue<>(200), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy());


    @PostConstruct
    public void doDispose() {
        taskScheduler.scheduleAtFixedRate(() -> {
            WebSocketAccountDTO accountDTO = ACCOUNT_BLOCK_QUEUE.poll();
            if (accountDTO == null) {
                return;
            }
            CompletableFuture future = CompletableFuture.runAsync(() -> {
                doProcess(accountDTO); // 异步任务具体处理
            }, POOL_EXECUTOR);
            future.thenAccept((U) -> {
                LOGGER.info("success...............");
            });
        }, 1000);
    }

    private void doProcess(WebSocketAccountDTO accountDTO) {
        
    }

    public static void offer(WebSocketAccountDTO dto) {
        if (ACCOUNT_BLOCK_QUEUE.contains(dto)) {
            return;
        }
        ACCOUNT_BLOCK_QUEUE.offer(dto);
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/777746.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号