栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

高并发与多线程学习七(二)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

高并发与多线程学习七(二)

多线程与高并发学习七(二)

线程池参数ThreadPoolExecutorSingleThreadPoolCachedThreadPool(弹性线程池)FixedThreadPool(固定多少个线程的线程池)ScheduledThreadPool(用来执行定时任务的线程池)并发(concurrent)和并行对比(parallel)

线程池参数

线程池分为以下两种:
1、ThreadPoolExecutor
2、ForkJoinPool(分叉完了再汇总)
分解汇总的任务
用很少的线程可以执行很多的任务(子任务)ThreadPoolExecutor做不到先执行子任务
CPU密集型

ThreadPoolExecutor

线程池七个参数:
1、corePoolSize:核心线程数(线程池初始化线程数)
2、maximumPoolSize:最大线程数(可以从操作系统拿的线程数(可拿线程数=最大线程数-核心线程数))
3、KeepAliveTime 存活时间(如果可拿的那些线程超过存活时间了要归还给操作系统)
4、存活时间的单位
5、任务队列(阻塞队列)
6、线程工厂(默认DefaultThreadFactory)
7、拒绝策略 (在线程池达到最大线程数,且任务队列满)
JDK默认提供四种拒绝策略
Abort:抛异常
DisCard:扔掉,不抛异常
DiscardOldest:扔掉排队时间最久的
CallerRuns:调用者处理任务(哪个线程提交的任务哪个线程就去处理它)

说明:
线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式
Executors返回线程池对象的弊端如下:
FixedThreadPool和SingleThreadPool:
允许的请求队列的长度为Integer.Max_Value,可能会堆积大量的请求,从而导致OOM。
CachedThreadPool:
允许的创建线程数量为Integer.Max_Value,可能会创建大量的线程,从而导致OOM

拒绝策略测试代码如下:

 static class Task implements Runnable {
        private int i;

        public Task(int i) {
            this.i = i;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " Task " + i);
            try {
                System.in.read();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Override
        public String toString() {
            return "Task{" +
                    "i=" + i +
                    '}';
        }
    }

    public static void main(String[] args) {

        ThreadPoolExecutor tpe = new ThreadPoolExecutor(2, 4,
                60, TimeUnit.SECONDS,
                new ArrayBlockingQueue(4) ,
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.DiscardOldestPolicy());

        for (int i = 0; i < 8; i++) {
            tpe.execute(new Task(i));
        }

        System.out.println(tpe.getQueue());

        tpe.execute(new Task(100));

        System.out.println(tpe.getQueue());

        tpe.shutdown();
    }

一般业务中自定义拒绝策略:
实现拒绝策略的接口,将消息保存下来(kafka,redis,mq,mysql)并对未处理任务做日志,当我们发现有大量任务未处理时则需要扩展机器了。

SingleThreadPool

Executors为线程池的工厂

SingleThreadPool顾名思义该线程池只有一个线程在运行,可以保证任务是顺序执行的。

   public static void main(String[] args) {
        ExecutorService service = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 5; i++) {
            final int j = i;
            service.execute(() -> {

                System.out.println(j + " " + Thread.currentThread().getName());
            });
        }

    }


为什么要有单线程的线程池?
线程池是有任务队列的
能提供完整的生命周期管理
源码如下:

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new linkedBlockingQueue()));
    }
CachedThreadPool(弹性线程池)
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue());
    }

SynchronousQueue:是一个容量为空的队列,就是来一个任务必须得有一个线程去处理,否则提交任务得线程就阻塞在那里。

public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newCachedThreadPool();
        System.out.println(service);
        for (int i = 0; i < 2; i++) {
            service.execute(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            });
        }
        System.out.println(service);

        TimeUnit.SECONDS.sleep(80);

        System.out.println(service);


    }

FixedThreadPool(固定多少个线程的线程池)

核心线程和最大线程数都是固定的,用的linkedBlockingQueue

什么时候使用Fixed什么时候使用Cached?
任务量忽高忽低,保证任务不堆积Cached
任务来的比较平稳,估算了线程数量则用Fixed
但是阿里都不用,需要自己进行估算

应用代码如下:

public static void main(String[] args) throws InterruptedException, ExecutionException {
        long start = System.currentTimeMillis();
        getPrime(1, 200000);
        long end = System.currentTimeMillis();
        System.out.println(end - start);

        final int cpuCoreNum = 4;

        ExecutorService service = Executors.newFixedThreadPool(cpuCoreNum);

        MyTask t1 = new MyTask(1, 50000); //1-5 5-10 10-15 15-20
        MyTask t2 = new MyTask(50001, 100000);
        MyTask t3 = new MyTask(100001, 150000);
        MyTask t4 = new MyTask(150001, 200000);

        Future> f1 = service.submit(t1);
        Future> f2 = service.submit(t2);
        Future> f3 = service.submit(t3);
        Future> f4 = service.submit(t4);

        start = System.currentTimeMillis();
        f1.get();
        f2.get();
        f3.get();
        f4.get();
        end = System.currentTimeMillis();
        System.out.println(end - start);
    }

    static class MyTask implements Callable> {
        int startPos, endPos;

        MyTask(int s, int e) {
            this.startPos = s;
            this.endPos = e;
        }

        @Override
        public List call() throws Exception {
            List r = getPrime(startPos, endPos);
            return r;
        }

    }

    static boolean isPrime(int num) {
        for (int i = 2; i <= num / 2; i++) {
            if (num % i == 0) return false;
        }
        return true;
    }

    static List getPrime(int start, int end) {
        List results = new ArrayList<>();
        for (int i = start; i <= end; i++) {
            if (isPrime(i)) results.add(i);
        }

        return results;
    }

ScheduledThreadPool(用来执行定时任务的线程池)
 public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue());
    }

DelayedWorkQueue指定隔多长时间后运行

使用方式:

 public static void main(String[] args) {
        ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
        service.scheduleAtFixedRate(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName());
        }, 0, 500, TimeUnit.MILLISECONDS);
        // initialDelay第一个任务隔多少时间执行  period隔多少时间执行一次,
    }

通过scheduleAtFixedRate方法让任务每隔一段时间执行一次。

并发(concurrent)和并行对比(parallel)

并发是指任务提交,并行指任务执行

并行是并发的子集(并发是指多个任务同时涌向CPU,而并行则是多个CPU可以同时进行处理)

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/760209.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号