这儿都是满满的干货!
从来不多说废话,直接上代码 ,拿来皆可用的那种
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
public class Threads {
protected final static Logger log = LoggerFactory.getLogger(Threads.class);
public static void shutdownAndAwaitTermination(ExecutorService pool) {
if (pool != null && !pool.isShutdown()) {
pool.shutdown();
try {
if (!pool.awaitTermination(120, TimeUnit.SECONDS)) {
pool.shutdownNow();
if (!pool.awaitTermination(120, TimeUnit.SECONDS)) {
log.info("Pool did not terminate");
}
}
} catch (InterruptedException ie) {
pool.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
@Component
public class ShutdownManager {
protected final static Logger log = LoggerFactory.getLogger(ShutdownManager.class);
@PreDestroy
public void destroy() {
shutdownAsyncManager();
}
private void shutdownAsyncManager() {
try {
log.info("关闭后台任务任务线程池");
AsyncManager.getInstance().shutdown();
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class AsyncManager {
private int corePoolSize = 8;
private final int OPERATE_DELAY_TIME = 10;
private ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(corePoolSize,
new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").daemon(true).build()) {
@Override
protected void afterExecute(Runnable r, Throwable t) {
//线程执行之后
super.afterExecute(r, t);
}
};
// 单例模式
private AsyncManager() {
}
private static AsyncManager asyncManager = new AsyncManager();
public static AsyncManager getInstance() {
return asyncManager;
}
public void executor(Runnable runnable) {
executor.schedule(runnable, OPERATE_DELAY_TIME, TimeUnit.MILLISECONDS);
}
public void shutdown() {
Threads.shutdownAndAwaitTermination(executor);
}
}
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.google.common.collect.Queues;
import com.yw.processplan.core.constant.StompMappingConstant;
import com.yw.processplan.core.utlis.StringUtils;
import com.yw.processplan.core.websocket.StompMessagePusher;
import com.yw.processplan.module.power.dao.MessageMapper;
import com.yw.processplan.module.power.model.Performer;
import com.yw.processplan.module.power.model.entity.SendMessageEntity;
import com.yw.processplan.module.process.model.dto.WebSocketAccountDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.PostConstruct;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;
@Component
public class SendMessageActuator {
protected static final Logger LOGGER = LoggerFactory.getLogger(SendMessageActuator.class);
@Autowired
private TaskScheduler taskScheduler;
private static linkedBlockingQueue ACCOUNT_BLOCK_QUEUE = Queues.newlinkedBlockingQueue(200);
private static ThreadPoolExecutor POOL_EXECUTOR = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MICROSECONDS,
new linkedBlockingQueue<>(200), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy());
@PostConstruct
public void doDispose() {
taskScheduler.scheduleAtFixedRate(() -> {
WebSocketAccountDTO accountDTO = ACCOUNT_BLOCK_QUEUE.poll();
if (accountDTO == null) {
return;
}
CompletableFuture> future = CompletableFuture.runAsync(() -> {
doProcess(accountDTO); // 异步任务具体处理
}, POOL_EXECUTOR);
future.thenAccept((U) -> {
LOGGER.info("success...............");
});
}, 1000);
}
private void doProcess(WebSocketAccountDTO accountDTO) {
}
public static void offer(WebSocketAccountDTO dto) {
if (ACCOUNT_BLOCK_QUEUE.contains(dto)) {
return;
}
ACCOUNT_BLOCK_QUEUE.offer(dto);
}
}



