目录
Tomcat线程池
这里线程分了三个部分:
Tomcat线程池与ThreadPoolExecutor的区别:
一些注意参数:
Fork/Join线程池
ForkJoinPool
发送信息
例子:1-5求和
fork方法和join方法
Tomcat线程池
首先说一下线程池阿里规范:
在JDK原生线程池中,只要线程池线程数量>核心线程数,就会先将任务加入到任务队列中,只有任务队列加入失败,才会重新创建线程,也就是说线程池队列没有满时候,只有核心线程;
这里线程分了三个部分:
Acceptor:负责接收Socket连接;
Poller:监听Socket连接是否有IO读写操作——>采用了多路复用的思路,所以线程数只需要1就可以检测哪些连接具有IO操作;
线程池中的Worker:对任务进行执行,读写数据;
Tomcat线程池与ThreadPoolExecutor的区别:
如果总线程数达到了maximumPoolSize——>不会抛出异常;
它会再次将任务放入队列,如果还是失败,就会抛出RejectedExecutionException异常;
意思就是他会再次尝试一次;
一些注意参数:
daemon:将线程设置为守护线程,会随着主线程的结束而结束;
我们一般线程池中的线程就算主线程结束,它也会再运行;
注意:提交任务跟最大线程比,如果最大线程不够,就会创建救急线程;
Fork/Join线程池
概念:
核心思想:工作窃取算法,指线程从其他任务队列中窃取任务进行执行,但是获取的任务是队列的尾端,因为要避免任务冲突;
子任务执行的结果放在同一队列中;
ForkJoinPool
ForkJoinPool线程池的思想:将大任务分为若干个小任务,然后将其整合返回;
发送信息
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;
public class SendMessageTask extends RecursiveAction{
// 每个“小任务”只最多只给10名用户发送短信
private static final int THRESHOLD = 10;
private int start;
private int end;
List list = null;
// 从start到end的任务
public SendMessageTask(int start, int end,List list)
{
this.start = start;
this.end = end;
this.list = list;
}
@Override
protected void compute() {
// TODO Auto-generated method stub
if(end - start < THRESHOLD){
String mobileno="";
for (int i = start ; i < end ; i++ )
{
//此处做手机号码累加,用于发送给短信运营商
mobileno+=list.get(i)+","
}
System.out.println("给手机号码=="+mobileno+"的用户发送手机短信");
}else{
// 如果当end与start之间的差大于THRESHOLD时,即要发送的数超过10个
// 将大任务分解成两个小任务。
int middle = (start + end) /2;
SendMessageTask left = new SendMessageTask(start, middle,list);
SendMessageTask right = new SendMessageTask(middle, end,list);
// 并行执行两个“小任务”
left.fork();
right.fork();
}
}
public static void main(String[] args) throws InterruptedException {
List list = new ArrayList();
for(int i=1;i<=380;i++){
list.add("i------"+i);//假设此处为手机号码--项目中从数据库中获取
}
ForkJoinPool pool = new ForkJoinPool();
// 提交可分解的PrintTask任务
pool.submit(new SendMessageTask(0 , list.size(),list));
//线程阻塞,等待所有任务完成
pool.awaitTermination(10, TimeUnit.SECONDS);
// 关闭线程池
pool.shutdown();
}
}
Fork:把大任务分割成子任务,如果分割的子任务还是很大,可以继续分割,直到子任务足够下;
执行结果并合并结果Join:分割的子任务被存储在双端队列中,然后启动线程分别从双端队列获取任务执行。子任务执行完的结果都统一存储在一个队列中,启动一个线程从队列中拿数据,然后合并这些数据
例子:1-5求和
package com.example.juc.ForkJoin;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
@Slf4j(topic = "c.Test01")
public class Test01 {
public static void main(String[] args) {
ForkJoinPool pool=new ForkJoinPool(4);
System.out.println(pool.invoke(new MyTask(5)));
//new MyTask(5):任务拆分=5+MyTask(4) 4+MyTask(3)
}
}
@Slf4j(topic = "c.MyTask")
class MyTask extends RecursiveTask{
private int n;
public MyTask(int n) {
this.n = n;
}
@Override
protected Integer compute() {
if(n==1){
log.debug("join{}",n);
return 1;
}
//对任务进行拆分成子任务:减少线程间的竞争
MyTask t1=new MyTask(n-1);
t1.fork();//让一个线程执行该任务
log.debug("fork(){}+{}",n,t1);
Integer result = (Integer) t1.join();
Integer res=n+result;
log.debug("join(){}+{}={}",n,t1,res);
return res;
}
}
fork方法和join方法
fork:用于将新创建的子任务加入到任务队列中(相当于提交任务),每一个ForkJoinWorkerThread线程都有一个独立的任务等待队列;
join:让当前线程阻塞,直到该任务完成并返回执行结果;
任务优化
之前我们任务执行时同步的执行,一个任务依赖于另外一个任务,必须等待另一个任务执行完,我们这个任务才能走完;
解决:我们利用分治的思想,根据条件将任务岔开;
package com.example.juc.ForkJoin;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
@Slf4j(topic = "c.Test03")
public class Test03 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinPool pool = new ForkJoinPool();
MyImportTask task = new MyImportTask(1L, 10_0000_000L);
long startTimes = System.currentTimeMillis();
//线程池执行任务task,并获取任务返回值
ForkJoinTask result = pool.submit(task);
System.out.println(result.get());
long endTimes = System.currentTimeMillis();
System.out.println("运行时间:"+(endTimes-startTimes)+"ms");
}
}
class MyImportTask extends RecursiveTask{
Long start;
Long end;
Long threshold= 10000L;
public MyImportTask(Long start, Long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if((end-start)



