线程:
创建线程的方法: 继承Thread类,实现Runable接口,重写run()方法,
线程的5种状态:新建、就绪、运行、阻塞、死亡状态。
wait()/ notify()/ notifayAll()三者的区别:
wait():线程处于等待状态,
notify():唤醒当前等待的线程
notify All():唤醒所有处于等待的线程。
线程同步:
线程同步保证数据的原子性, 使数据不受其他线程的干扰( 一次只允许一个线程执行, 其他线程处于等待, 等待该线程执行完其他线程才可继续执行)
Synchronized与lock的区别:
两者都是属于同步锁,Synchronized代码执行完成以后会自动释放锁,lock需要手动开启同步,同时也需要手动释放锁。
synchronized同步锁的执行原理:
首先有一个线程已经拿到了锁,其他线程已经有cup执行权,一直排队,等待释放锁。
当代码执行完毕或者程序抛出异常都会被释放掉。
锁已经被释放掉的话,其他线程开始进行抢锁(资源竞争),谁抢到谁进入同步中去,其他线程继续等待。
synchronized自动释放锁:当代码执行结束、抛出异常、遇到wait方法会自动释放锁。
同步锁:synchronized 单机锁, 在分布式环境下会导致锁失效,
死锁:
synchronized在内存不足的情况下、交叉锁和遇到数据库行级锁(比如某个线程执行for update 语句退出了事物,其它线程访问该数据库时都将陷入死锁)
等情况都会产生死锁。
交叉锁示例:
synchronized(true){
synchronized(true){
/
@Bean(initMethod = "start")
public Curatorframework curatorframework(ZookeeperProperties zookeeperProperties) {
return CuratorframeworkFactory.newClient(
zookeeperProperties.getAddress(),
zookeeperProperties.getSessionTimeoutMs(),
zookeeperProperties.getConnectionTimeoutMs(),
new RetryNTimes(zookeeperProperties.getRetryCount(),
zookeeperProperties.getElapsedTimeMs()));
}
创建分布式锁的工具类
@Slf4j
@Component
public class LockUtil {
@Autowired
Curatorframework curatorframework;
public static final String NODE_PATH = "/lock-space/%s";
public InterProcessMutex tryLock(String key, int expireTime, TimeUnit timeUnit) {
try {
InterProcessMutex mutex = new InterProcessMutex(curatorframework, String.format(NODE_PATH, key));
boolean locked = mutex.acquire(expireTime, timeUnit);
if (locked) {
log.info("申请锁(" + key + ")成功");
return mutex;
}
} catch (Exception e) {
log.error("申请锁(" + key + ")失败,错误:{}", e);
}
log.warn("申请锁(" + key + ")失败");
return null;
}
public void unLock(String key, InterProcessMutex lockInstance) {
try {
lockInstance.release();
log.info("解锁(" + key + ")成功");
} catch (Exception e) {
log.error("解锁(" + key + ")失败!");
}
}
创建 Controller 类并使用分布式锁
@Slf4j
@RestController
public class LockController {
@Autowired
private LockUtil lockUtil;
ExecutorService executor = Executors.newFixedThreadPool(10);//创建线程池
@GetMapping("/lock")
public void lockTest() {
for (int i = 0; i < 1000; i++) {
executor.submit(() -> { //启动线程
try {
String key = "test";
// 获取锁
InterProcessMutex lock = lockUtil.tryLock(key, 10, TimeUnit.SECONDS);
if (lock != null) {
// 如果获取锁成功,则执行对应逻辑
*******
// 释放锁
lockUtil.unLock(key, lock);
}
} catch (Exception e){
log.error("", e);
}
});
}
}
线程池:
线程池相当于一个池子, 容纳多个线程,重复利用以及创建好的线程。
线程优势:
降低资源消耗,减少线程频繁创建和销毁。
提高响应效率、不能等待创建线程的时间可立即执行
防止内存溢出、CPU耗尽
java里面的线程池的接口是Executor,Executor并不是一个线程池,而只是一个执行线程的工具,而真正的线程池是ExecutorService。
newCachedThreadPool创建一个可缓存线程池
newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数。
newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务。
线程池处理过程:
线程池判断核心线程池中是否有任务或者线程池中的线程是否都在执行任务,如果是则创建一个新的线程,
如线程池已满会进入阻塞队列等待执行
当阻塞队列满了且没有多余的空闲线程来执行,如果继续提交该任务,必须选择一种处理策略。线程池提供4种处理策略
处理线程的4种策略:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。注:默认策略!!!!!!
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
使用线程池的风险:
例如同步错误导致死锁使线程迟迟不释放导致资源不足和线程泄露。
资源不足:如果线程池太大, 那么被消耗的线程会影响系统性能,在线程之中切换会浪费时间
线程泄露:如线程池中有一个线程去执行任务的过程中发生死锁导致线程迟迟没有回归池中, 会导致线程泄露
请求过载:同步的请求数量太多, 导致排在队列中等待执行的线程过多, 导致队列已满,线程池提供4种处理策略(见上述处理线程的4种策略)
配置线程池:
写一个线程配置类, 使用@Configuration注解,在spring启动时会扫描到这个配置累类,
类里面写一个处理方法,使用@bean对象标注,在spring启动时会初始化这个bean对象, 方法里面配置线程池的参数
线程池所用的参数:
线程池最小线程数、最大线程数、 线程池拒绝的处理策略(见上述处理线程的4种策略),空闲时间。
使用线程使用 @Autowired 注入线程对象, 即可使用调用submit方法或许excuter方法。
线程池的执行原理:
创建线程池:
@Configuration
public class ThreadPoolConfig {
@Value("${asyncPool.corePoolSize}")
private transient int corePoolSize; // 线程池维护线程的最少数量
@Value("${asyncPool.maxPoolSize}")
private transient int maxPoolSize; // 线程池维护线程的最大数量
@Value("${asyncPool.queueCapacity}")
private transient int queueCapacity; // 线程池所使用的缓冲队列
@Value("${asyncPool.keepAliveSeconds}")
private transient int keepAliveSeconds; // 线程池维护线程所允许的空闲时间
@Bean(name = "asyncTaskExecutor")
public TaskExecutor threadPoolTaskExecutor() {
final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setKeepAliveSeconds(keepAliveSeconds);
// 线程池名的前缀
executor.setThreadNamePrefix("asyncExecutor-");
//线程池对拒绝任务(无线程可用)的处理策略
executor.setRejectedExecutionHandler(new CallerRunsPolicy());
//调度器shutdown被调用时等待当前被调度的任务完成
executor.setWaitForTasksToCompleteOnShutdown(true);
//等待调度的任务完成的时长,单位秒
executor.setAwaitTerminationSeconds(300);
//线程池执行线程时,将Log MDC传递给子线程,以保证子线程日志能正常打印MDC中存储的内容
executor.setTaskDecorator(runnable -> {
Map context = MDC.getCopyOfContextMap();
return () -> {
//在执行前执行MDC.setContextMap 传递给子线程
if (context != null) {
MDC.setContextMap(context);
}
try {
runnable.run();
} finally {
//需要清理
MDC.clear();
}
};
});
return executor;
}
}
使用线程池:
@Autowired
@Qualifier("asyncTaskExecutor")
private TaskExecutor taskExecutor;
taskExecutor.execute(() -> threadHandler(finalFileName, finalLocalFile, sxrsOrgInfo, transNo));



