栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Java线程池

Java线程池

Java线程池

线程池的核心类- ThreadPoolExecutor

线程池重要的方法 创建线程池

ThreadPoolExecutor自定义Executor创建 多线程异常处理

多线程异常处理

try-catch Callable接口Executor改写UncaughtExceptionHandler接口 多线程定时任务

java定时器Timer类ScheduledExecutorService运行多任务Spring-Task任务调度工具Quartz大数据任务调度框架

线程池的核心类- ThreadPoolExecutor
ThreadPoolExecutor extends AbstractExecutorService

Executors

ExecutorService

构造方法
ThreadPoolExecutor(int corePoolSize, //  核心线程数
                   int maximumPoolSize, // 最大线程数
                   long keepAliveTime, // 空闲时间
                   TimeUnit unit, // 时间单位
                   BlockingQueue workQueue, // 任务队列
                   ThreadFactory threadFactory, // 线程工厂
                   RejectedExecutionHandler handler // 拒绝策略
                   )
任务队列
BlockingQueue接口的实现类
1. SynchronousQueue类
2. linkedBlockingQueue类
3. ArrayBlockingQueue类
4. linkedTransferQueue类
线程池重要的方法

execute

execute(Runnable command)
1. 充当核心线程运行
2. 放入任务队列,需要double-check
3. 如果放不进任务队列,尝试运行新线程,失败了就拒绝

addWorker

addWorker(Runnable firstTask, boolean core)
根据核心线程数和最大线程数判断是否添加新的线程
第一个参数是execute提交的任务
第二个参数是核心线程或者最大线程

submit

submit(Runnable task)
未来能得到返回值

创建线程池 ThreadPoolExecutor自定义
ThreadPoolExecutor executor = new ThreadPoolExecutor(
            5,
            10,
            10,
            TimeUnit.SECONDS,
            new linkedBlockingQueue<>(10),
            new ThreadPoolExecutor.CallerRunsPolicy());
Executor创建
ExecutorService p1 = Executors.newFixedThreadPool(3);
ScheduledExecutorService p2 = Executors.newSingleThreadScheduledExecutor();
ExecutorService p3 = Executors.newSingleThreadExecutor();
ExecutorService p4 = Executors.newCachedThreadPool();
ScheduledExecutorService p5 = Executors.newScheduledThreadPool(5);
ExecutorService p6 = Executors.newWorkStealingPool(2);
多线程异常处理

例子 – 无法捕获异常

public class MultiThreadsException {
    public static void main(String[] args) {
        Thread thread = new Thread(new ThrowsExceptionRun());
        try {
            thread.start();
        }catch (Exception e){
            System.out.println("捕获到异常。。。");
        }
    }

    static class ThrowsExceptionRun implements Runnable {
        @Override
        public void run() {
            int dividend = 25;
            int divisor = 3;
            while (divisor >=0){
                System.out.println(dividend + " 和 " + divisor + "相除结果是: " + dividend / divisor);
                divisor --;
            }
            System.out.println("结束运算。。。");
        }
    }
}

// 不能捕获异常
原因: 
Thread所创建的线程需要在自己的逻辑中,即run方法中捕获异常
不能抛出异常给外界的线程

底层原因是Thread默认使用uncaughtException()方法空处理,虚拟机会忽略该方法之后的抛出异常
多线程异常处理
1. 在Thread及Runnable的run方法中try-catch
2. 使用Callable接口创建线程,通过get() 方法获取异常
3. 使用线程池Executor设置改写的ExceptionHandler来处理线程异常
try-catch
public class MultiThreadsException {
    public static void main(String[] args) {
        Thread thread = new Thread(new ThrowsExceptionRun());
        try {
            thread.start();
        }catch (Exception e){
            System.out.println("捕获到异常。。。");
        }
    }

    static class ThrowsExceptionRun implements Runnable {
        @Override
        public void run() {
            int dividend = 25;
            int divisor = 3;
            while (divisor >=0){
                try {
                    System.out.println(dividend + " 和 " + divisor + "相除结果是: " + dividend / divisor);
                }catch (Exception e){
                    System.out.println("哟,有异常了");
                    throw e;
                }
                divisor --;
            }
            System.out.println("结束运算。。。");
        }
    }
}
Callable接口
public class MultiThreadsException {
    public static void main(String[] args) {
        Callable getCallThread = new ThrowsExceptionCall();
        FutureTask futureTask = new FutureTask<>(getCallThread);
        Thread thread = new Thread(futureTask);
        thread.start();
        try {
            TimeUnit.SECONDS.sleep(3);
            String s = futureTask.get();
            System.out.println(s);
        } catch (Exception e) {
            System.out.println("异常捕获到了。。。");
            e.printStackTrace();
        }
    }

    static class ThrowsExceptionCall implements Callable {

        @Override
        public String call() throws Exception {
            String fullString = "I'm OK,do not worry about me";
            return fullString.substring(3, 60); // 下标越界异常
        }
    }
Executor改写UncaughtExceptionHandler接口
public class ExecutorCatchException {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newCachedThreadPool(new SuccessCatchThreadExceptionFactory());
        service.execute(new ThrowsExceptionRun());
        service.awaitTermination(3, TimeUnit.SECONDS);
        service.shutdown();
    }
}
// 线程工厂 设置了异常处理器
class SuccessCatchThreadExceptionFactory implements ThreadFactory {
    @Override
    public Thread newThread(Runnable runnable) {
        Thread thread = new Thread(runnable);
        thread.setUncaughtExceptionHandler(new SuccessCatchThreadExceptionHandler());
        return thread;
    }
}
// 异常处理器
class SuccessCatchThreadExceptionHandler implements Thread.UncaughtExceptionHandler{
    @Override
    public void uncaughtException(Thread thread, Throwable e) {
        System.out.println(thread.getName() + "抛出了异常,请看: " + e.getMessage());
    }
}
// 任务
class ThrowsExceptionRun implements Runnable {
    @Override
    public void run() {
        int dividend = 25;
        int divisor = 3;
        while (divisor >= 0) {
            System.out.println(dividend + " 和 " + divisor + "相除结果是: " + dividend / divisor);
            divisor--;
        }
        System.out.println("结束运算。。。");
    }
}
多线程定时任务 java定时器Timer类
package com.chauncy.timers;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;



public class TestMultiTimerTask {
   static volatile  AtomicInteger decreaseInteger = new AtomicInteger(8);
    public static void main(String[] args) {

        Timer timer = new Timer();
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd HH:mm:ss");
        timer.schedule(new TimerTask() {
                    @Override
                    public void run() {
                        System.out.println("我是第一个TimerTask线程,当前时间为: " + dateFormat.format(new Date()));
                    }
                }, 2000, 5000);

        timer.schedule(new TimerTask() {
                    @Override
                    public void run() {
                        System.out.println("我是第二个TimerTask线程,当前时间为: " + dateFormat.format(new Date()));
                        int de = TestMultiTimerTask.decreaseInteger.decrementAndGet();
                        int i = (100 / de);
                    }
                }, 3001, 1000);
    }
}

原理

优先级任务队列 TaskQueue queue = new TaskQueue()
线程    TimerThread thread = new TimerThread(queue);

启动的线程会等待任务 -- queue.wait();
放任务到优先级队列 queue.add(task);
取任务  task = queue.getMin();
执行 task.run();

schedule(TimerTask task, long delay, long period) ->
sched(task, System.currentTimeMillis()+delay, -period) ->
sched(TimerTask task, long time, long period) -> 
task.nextExecutionTime = time
task.period = period;
task.state = TimerTask.SCHEDULED;
queue.add(task); -> queue[++size] = task; fixUp(size);


loop线程
队列为空时等待 queue.wait();
取任务 task = queue.getMin();
是否执行 taskFired = (executionTime<=currentTime)
queue.rescheduleMin(加上period时间) -> queue[1].nextExecutionTime = newTime;
执行任务 task.run();
    
ScheduledExecutorService运行多任务
package com.chauncy.timers;

import org.apache.commons.lang3.concurrent.BasicThreadFactory;

import java.util.TimerTask;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;




public class ScheduleExecutorServiceTest {
    static volatile AtomicInteger decreaseInteger = new AtomicInteger(8);
    public static void main(String[] args) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(4, new BasicThreadFactory.Builder()
                .namingPattern("example-schedule-pool-%d").daemon(true).build());

        //ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);

        executor.scheduleAtFixedRate(() -> System.out.println( Thread.currentThread().getName() + "第一次延迟2s执行,以后每隔5s执行一次"),
                2000,
                5000,
                TimeUnit.MILLISECONDS);

        executor.schedule(() -> {
            System.out.println(Thread.currentThread().getName()+ " 每隔3s执行");
        },3,TimeUnit.SECONDS);

        executor.scheduleWithFixedDelay(() -> System.out.println(Thread.currentThread().getName() + " 延迟1s执行。。。"),1,1,TimeUnit.SECONDS);

        executor.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + " 第一次延迟3s执行,以后每隔1s执行一次");
                int de = ScheduleExecutorServiceTest.decreaseInteger.decrementAndGet();
                try {
                    int i = (100 / de);
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        },3001,1000, TimeUnit.MILLISECONDS);


        while (true){
            // main线程活着
        }
    }
}

Spring-Task任务调度工具 Quartz大数据任务调度框架
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/730264.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号