栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

同步容器及线程池

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

同步容器及线程池

同步容器 1.容器体系 Collection
  • List
  • Set
  • Queue
Map
  • Hash
  • Queue - 主要是为了高并发准备的

具体如下图所示:

2.ConcurrentHashMap发展(HashTable-> CHM)

JDK1.0 :

  • Vector(相当于后来的List)
  • HashTable(相当于后来的map) 都加了锁(基本不用)

发展:

  1. HashTable(方法上加锁)
  2. HashMap(不加锁,线程不安全)
  3. 通过工具类加锁Collections.synchronizedHashMap(在方法里面加锁)
  4. ConcurrentHashMap(并发插入的时候未必比synchronized,hashtable高,但读取的时候效率非常高)
3.Vector到Queue发展(Vector->Queue)

多线程容器,重复先考虑queue在考虑list
Vector -> ArrayList -> ConcurrentlinkedQueue(CAS实现所以效率上高很多)

小结

同步容器类

1:Vector Hashtable :早期使用synchronized实现
2:ArrayList HashSet :未考虑多线程安全(未实现同步)
3:HashSet vs Hashtable StringBuilder vs StringBuffer
4:Collections.synchronized***工厂方法使用的也是synchronized

使用早期的同步容器以及Collections.synchronized***方法的不足之处,请阅读:
http://blog.csdn.net/itm_hadf/article/details/7506529

使用新的并发容器
http://xuganggogo.iteye.com/blog/321630

4.经常在多线程下使用的容器 1.ConCurrentMap

​ -ConcurrentHashMap(CAS操作)
-ConcurrentSkipListMap(高并发并排序)
-HashTable
-HashMap -Collections.SynchronizedXXX
-TreeMap(红黑树-实现CAS太复杂,所以没有juc的类而是使用的跳表来代替Tree的结构)

2.CopyonWriteList

​ -ArrayList -Collections.SynchronizedXXX
-Vector
-CopyonWriteArrayList(写时复制,复制一份数组,在尾部加上要添加的元素,然后把新元素之前的位置用原来指向的引用替代)

3.ConcurrentlinkedQueue

​ -offer() (大小为Integer的最大值)
-size()
-poll()
-peek()

4.linkedBlockingQueue

该队列是双向阻塞队列,天生的对多线程友好的生产者和消费者的模型

​ 底层:
-new Condition
-await()
方法:
-put() ->生产者 如果满了,就会等待
-take() ->消费者 取出,如果队列中没有值了就会阻塞

5.ArrayBlockingQueue

​ -可指定队列长度,其他和上述一样

小结:
  1. Vector HashTable
    • 自带锁,基本不用
  2. HashTable -> CHM
  3. Vector -> Queue
  4. Queue List
    • 对线程友好的API offer peek pool
    • BlockingQueue
    • Put take -> 阻塞
6.阻塞队列中比较特殊的几个队列
  1. DelayQueue

    按时间进行任务调度,内部使用的是PriorityQueue来进行操作

  2. SynchronousQueue

    本质跟Exchangger一样

    • 不能调用add方法,只能使用put方法(阻塞等待消费者消费)且有线程调用take方法的时候才能使用
    • 相当于两个线程交换数据
    • 线程池中线程取任务的时候经常使用到
  3. TransferQueue

    • transfer(装完之后等被取走值后才会回去干活)
    • 可以理解SynchronousQueue是单人手对手交换,TransferQueue是多人手对手交换
总结:

1:对于map/set的选择使用
HashMap
TreeMap
linkedHashMap

​ Hashtable
Collections.sychronizedXXX

​ ConcurrentHashMap
ConcurrentSkipListMap

2:队列
ArrayList
linkedList
Collections.synchronizedXXX
CopyOnWriteList
Queue
CocurrentlinkedQueue //concurrentArrayQueue
BlockingQueue
linkedBQ
ArrayBQ
TransferQueue
SynchronusQueue
DelayQueue执行定时任务

线程池

前置知识

  1. Executor
  2. ExecutorService
  3. Callable => Runnable + ret
  4. Future => 存储执行的将来才会产生的结果
    . FutureTask => Future + Runnable
  5. CompletableFuture管理多个Future的结果
1.线程池
  • ThreadPoolExecutor
ThreadPoolExecutor tpe = new ThreadPoolExecutor(2, 4,60, TimeUnit.SECONDS,
                new ArrayBlockingQueue(4),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());
  • ForkJoinPool
    • 分解汇总的任务
    • 用很少的线程可以执行很多任务(子任务) TPE做不到先执行子任务
    • CPU密集型
2.线程池参数
  1. corePoolSize
  2. maximumPoolSize
  3. keepAliveTime
  4. unit
  5. workQueue
  6. threadFactory
  7. handler
3.默认拒绝策略场景

  • AbortPolicy(抛异常)
  • DiscardPolicy(扔掉,不抛异常)
  • DiscardOldestPolicy(扔掉排队时间最久的)
  • CallerRunsPolicy(调用者处理任务)
  • 自定义拒绝策略
public class MyRejectedHandler {
    public static void main(String[] args) {
        ExecutorService service = new ThreadPoolExecutor(4, 4,
                0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(6),
                Executors.defaultThreadFactory(),
                new MyHandler());
    }

    static class MyHandler implements RejectedExecutionHandler {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            //log("r rejected")
            //save r kafka mysql redis
            //try 3 times
            if (executor.getQueue().size() < 10000) {
                //try put again();
            }
        }
    }
}
4.线程池类型

线程池一般是通过Executors来进行创建的,返回的是一个ExecutorService

//返回的ThreadPoolExecutor继承了AbstractExecutorService(实现了ExecutorService)
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
1.SingleThreadPool
//创建一个 Executor,它使用单个工作线程在无界队列中运行。 (但是请注意,如果这个单线程在关闭之前的执行过程中由于失败而终止,如果需要执行后续任务,一个新线程将取而代之。)保证任务按顺序执行,并且不会超过一个任务处于活动状态在任何给定的时间
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new linkedBlockingQueue()));
}
2.CachedPool
//创建一个线程池,根据需要创建新线程,但在可用时将重用先前构造的线程。这些池通常会提高执行许多短期异步任务的程序的性能。如果可用,调用 {@code execute} 将重用先前构造的线程。如果没有可用的现有线程,则会创建一个新线程并将其添加到池中。 60 秒内未使用的线程将被终止并从缓存中删除。因此,保持空闲足够长时间的池不会消耗任何资源
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue());
}
3.FixedThreadPool
//创建一个线程池,该线程池重用固定数量的线程在共享的无界队列中运行。 在任何时候,最多有nThreads线程是活动的处理任务。 如果在所有线程都处于活动状态时提交了额外的任务,它们将在队列中等待,直到有线程可用。 如果任何线程在关闭前的执行过程中由于失败而终止,则在需要执行后续任务时,将有一个新线程代替它。 池中的线程将一直存在,直到它被明确shutdown 。
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new linkedBlockingQueue());
}
4.ScheduledPool

创建一个线程池,可以安排命令在给定延迟后运行,或定期执行。

//创建一个线程池,可以安排命令在给定延迟后运行,或定期执行(也可以使用更强大的定时框架quartz corn)
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
5.WorkStealingPool

//跟原来的线程池不同,原来是多个访问同个任务队列,现在是每个线程维护自己的队列,如果线程自己的队列不够了可以拿别人的
//使用所有available processors作为其目标并行度级别来创建窃取工作的线程池
public static ExecutorService newWorkStealingPool() {
    return new ForkJoinPool
        (Runtime.getRuntime().availableProcessors(),
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
}
6.ForkJoinPool
//创建一个ForkJoinPool ,其并行度等于Runtime.availableProcessors ,使用默认线程工厂,没有 UncaughtExceptionHandler 和非异步 LIFO 处理模式
public ForkJoinPool() {
    this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
         defaultForkJoinWorkerThreadFactory, null, false);
}

//自定义任务类,重写compute进行递归处理
static class AddTaskRet extends RecursiveTask {

    private static final long serialVersionUID = 1L;
    int start, end;

    AddTaskRet(int s, int e) {
        start = s;
        end = e;
    }

    @Override
    protected Long compute() {

        if (end - start <= MAX_NUM) {
            long sum = 0L;
            for (int i = start; i < end; i++) sum += nums[i];
            return sum;
        }

        int middle = start + (end - start) / 2;

        AddTaskRet subTask1 = new AddTaskRet(start, middle);
        AddTaskRet subTask2 = new AddTaskRet(middle, end);
        subTask1.fork();
        subTask2.fork();

        return subTask1.join() + subTask2.join();
    }

}

public static void main(String[] args) throws IOException {
    

    T12_ForkJoinPool temp = new T12_ForkJoinPool();

    ForkJoinPool fjp = new ForkJoinPool();
    AddTaskRet task = new AddTaskRet(0, nums.length);
    fjp.execute(task);
    long result = task.join();
    System.out.println(result);

    //System.in.read();
}
5.补充

并行(parallellism )和并发(concurrency )的区别

  1. 并行:同一时刻,有多条指令在多个处理器上同时执行
  2. 并发:同一时刻只能有一条指令执行,但多个进程指令被快速的轮换执行

记忆:并发是提交任务,并行是同时执行,可以说并发是并行的子集

parallelStream是并行Stream,在单个线程运行时效率更高

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/531503.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号