- List
- Set
- Queue
- Hash
- Queue - 主要是为了高并发准备的
具体如下图所示:
JDK1.0 :
- Vector(相当于后来的List)
- HashTable(相当于后来的map) 都加了锁(基本不用)
发展:
- HashTable(方法上加锁)
- HashMap(不加锁,线程不安全)
- 通过工具类加锁Collections.synchronizedHashMap(在方法里面加锁)
- ConcurrentHashMap(并发插入的时候未必比synchronized,hashtable高,但读取的时候效率非常高)
多线程容器,重复先考虑queue在考虑list
Vector -> ArrayList -> ConcurrentlinkedQueue(CAS实现所以效率上高很多)
小结4.经常在多线程下使用的容器 1.ConCurrentMap同步容器类
1:Vector Hashtable :早期使用synchronized实现
2:ArrayList HashSet :未考虑多线程安全(未实现同步)
3:HashSet vs Hashtable StringBuilder vs StringBuffer
4:Collections.synchronized***工厂方法使用的也是synchronized使用早期的同步容器以及Collections.synchronized***方法的不足之处,请阅读:
http://blog.csdn.net/itm_hadf/article/details/7506529使用新的并发容器
http://xuganggogo.iteye.com/blog/321630
-ConcurrentHashMap(CAS操作)
-ConcurrentSkipListMap(高并发并排序)
-HashTable
-HashMap -Collections.SynchronizedXXX
-TreeMap(红黑树-实现CAS太复杂,所以没有juc的类而是使用的跳表来代替Tree的结构)
-ArrayList -Collections.SynchronizedXXX
-Vector
-CopyonWriteArrayList(写时复制,复制一份数组,在尾部加上要添加的元素,然后把新元素之前的位置用原来指向的引用替代)
-offer() (大小为Integer的最大值)
-size()
-poll()
-peek()
该队列是双向阻塞队列,天生的对多线程友好的生产者和消费者的模型
底层:
-new Condition
-await()
方法:
-put() ->生产者 如果满了,就会等待
-take() ->消费者 取出,如果队列中没有值了就会阻塞
-可指定队列长度,其他和上述一样
小结:6.阻塞队列中比较特殊的几个队列
- Vector HashTable
- 自带锁,基本不用
- HashTable -> CHM
- Vector -> Queue
- Queue List
- 对线程友好的API offer peek pool
- BlockingQueue
- Put take -> 阻塞
-
DelayQueue
按时间进行任务调度,内部使用的是PriorityQueue来进行操作
-
SynchronousQueue
本质跟Exchangger一样
- 不能调用add方法,只能使用put方法(阻塞等待消费者消费)且有线程调用take方法的时候才能使用
- 相当于两个线程交换数据
- 线程池中线程取任务的时候经常使用到
-
TransferQueue
- transfer(装完之后等被取走值后才会回去干活)
- 可以理解SynchronousQueue是单人手对手交换,TransferQueue是多人手对手交换
总结:线程池1:对于map/set的选择使用
HashMap
TreeMap
linkedHashMap Hashtable
Collections.sychronizedXXX ConcurrentHashMap
ConcurrentSkipListMap2:队列
ArrayList
linkedList
Collections.synchronizedXXX
CopyOnWriteList
Queue
CocurrentlinkedQueue //concurrentArrayQueue
BlockingQueue
linkedBQ
ArrayBQ
TransferQueue
SynchronusQueue
DelayQueue执行定时任务
前置知识
- Executor
- ExecutorService
- Callable => Runnable + ret
- Future => 存储执行的将来才会产生的结果
. FutureTask => Future + Runnable - CompletableFuture管理多个Future的结果
- ThreadPoolExecutor
ThreadPoolExecutor tpe = new ThreadPoolExecutor(2, 4,60, TimeUnit.SECONDS,
new ArrayBlockingQueue(4),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());
- ForkJoinPool
- 分解汇总的任务
- 用很少的线程可以执行很多任务(子任务) TPE做不到先执行子任务
- CPU密集型
- corePoolSize
- maximumPoolSize
- keepAliveTime
- unit
- workQueue
- threadFactory
- handler
- AbortPolicy(抛异常)
- DiscardPolicy(扔掉,不抛异常)
- DiscardOldestPolicy(扔掉排队时间最久的)
- CallerRunsPolicy(调用者处理任务)
- 自定义拒绝策略
public class MyRejectedHandler {
public static void main(String[] args) {
ExecutorService service = new ThreadPoolExecutor(4, 4,
0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(6),
Executors.defaultThreadFactory(),
new MyHandler());
}
static class MyHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//log("r rejected")
//save r kafka mysql redis
//try 3 times
if (executor.getQueue().size() < 10000) {
//try put again();
}
}
}
}
4.线程池类型
线程池一般是通过Executors来进行创建的,返回的是一个ExecutorService
//返回的ThreadPoolExecutor继承了AbstractExecutorService(实现了ExecutorService)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
1.SingleThreadPool
//创建一个 Executor,它使用单个工作线程在无界队列中运行。 (但是请注意,如果这个单线程在关闭之前的执行过程中由于失败而终止,如果需要执行后续任务,一个新线程将取而代之。)保证任务按顺序执行,并且不会超过一个任务处于活动状态在任何给定的时间
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new linkedBlockingQueue()));
}
2.CachedPool
//创建一个线程池,根据需要创建新线程,但在可用时将重用先前构造的线程。这些池通常会提高执行许多短期异步任务的程序的性能。如果可用,调用 {@code execute} 将重用先前构造的线程。如果没有可用的现有线程,则会创建一个新线程并将其添加到池中。 60 秒内未使用的线程将被终止并从缓存中删除。因此,保持空闲足够长时间的池不会消耗任何资源
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue());
}
3.FixedThreadPool
//创建一个线程池,该线程池重用固定数量的线程在共享的无界队列中运行。 在任何时候,最多有nThreads线程是活动的处理任务。 如果在所有线程都处于活动状态时提交了额外的任务,它们将在队列中等待,直到有线程可用。 如果任何线程在关闭前的执行过程中由于失败而终止,则在需要执行后续任务时,将有一个新线程代替它。 池中的线程将一直存在,直到它被明确shutdown 。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new linkedBlockingQueue());
}
4.ScheduledPool
创建一个线程池,可以安排命令在给定延迟后运行,或定期执行。
//创建一个线程池,可以安排命令在给定延迟后运行,或定期执行(也可以使用更强大的定时框架quartz corn)
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
5.WorkStealingPool
//跟原来的线程池不同,原来是多个访问同个任务队列,现在是每个线程维护自己的队列,如果线程自己的队列不够了可以拿别人的
//使用所有available processors作为其目标并行度级别来创建窃取工作的线程池
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
6.ForkJoinPool
//创建一个ForkJoinPool ,其并行度等于Runtime.availableProcessors ,使用默认线程工厂,没有 UncaughtExceptionHandler 和非异步 LIFO 处理模式
public ForkJoinPool() {
this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
defaultForkJoinWorkerThreadFactory, null, false);
}
//自定义任务类,重写compute进行递归处理
static class AddTaskRet extends RecursiveTask {
private static final long serialVersionUID = 1L;
int start, end;
AddTaskRet(int s, int e) {
start = s;
end = e;
}
@Override
protected Long compute() {
if (end - start <= MAX_NUM) {
long sum = 0L;
for (int i = start; i < end; i++) sum += nums[i];
return sum;
}
int middle = start + (end - start) / 2;
AddTaskRet subTask1 = new AddTaskRet(start, middle);
AddTaskRet subTask2 = new AddTaskRet(middle, end);
subTask1.fork();
subTask2.fork();
return subTask1.join() + subTask2.join();
}
}
public static void main(String[] args) throws IOException {
T12_ForkJoinPool temp = new T12_ForkJoinPool();
ForkJoinPool fjp = new ForkJoinPool();
AddTaskRet task = new AddTaskRet(0, nums.length);
fjp.execute(task);
long result = task.join();
System.out.println(result);
//System.in.read();
}
5.补充
并行(parallellism )和并发(concurrency )的区别
- 并行:同一时刻,有多条指令在多个处理器上同时执行
- 并发:同一时刻只能有一条指令执行,但多个进程指令被快速的轮换执行
记忆:并发是提交任务,并行是同时执行,可以说并发是并行的子集
parallelStream是并行Stream,在单个线程运行时效率更高



