进程: 一个运行的程序
线程: 进程中的一个执行任务
一个进程中至少有一个线程,也可以运行多个线程
并行: 同一时刻多任务交替执行(单核CPU)
并发: 同一时刻多任务同时执行(多核CPU)
二、原始创建线程方式使用**jconsole**命令观察线程状态
1.继承Thread类,重写run()方法class Test01 extends Thread{
int count = 0;
@Override
public void run() {
System.out.println("线程" + Thread.currentThread().getName() + "开始!");
while(true)
{
System.out.println("线程" + Thread.currentThread().getName() + "正在运行 " + (++count));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if(count == 10) {
System.out.println("线程" + Thread.currentThread().getName() + "结束!");
break;
}
}
}
}
public static void main(String[] args) {
System.out.println("线程" + Thread.currentThread().getName() + "开始!");
Test01 test = new Test01();
//test.run();//!!!不能直接调用run() --->没有创建新的线程,由主线程完成,整个程序串行执行
test.start();
for (int i = 0; i < 10; i++) {
System.out.println("线程" + Thread.currentThread().getName() + "正在运行 " + (i + 1));
}
System.out.println("线程" + Thread.currentThread().getName() + "结束!");
}
注意: Java不能真正创建线程,底层由C++创建
2.实现Runnable接口,放入Thread对象中class Test02 implements Runnable{
int count = 0;
@Override
public void run() {
System.out.println("线程" + Thread.currentThread().getName() + "开始!");
while(true)
{
System.out.println("线程" + Thread.currentThread().getName() + "正在运行 " + (++count));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if(count == 10) {
System.out.println("线程" + Thread.currentThread().getName() + "结束!");
break;
}
}
}
}
public static void main(String[] args) {
System.out.println("线程" + Thread.currentThread().getName() + "开始!");
Test02 test = new Test02();
//test.start(); //test类没有start()
//使用代理模式 ---> thread类帮你调用start()创建一个线程
Thread thread = new Thread(test);
thread.start();
for (int i = 0; i < 10; i++) {
System.out.println("线程" + Thread.currentThread().getName() + "正在运行 " + (i + 1));
}
System.out.println("线程" + Thread.currentThread().getName() + "结束!");
}
三、线程操作
1.通知线程
class Test03 implements Runnable{
private int count;
private boolean loop = true;
public void setLoop(boolean loop) {
this.loop = loop;
}
@Override
public void run() {
System.out.println("线程" + Thread.currentThread().getName() + "开始!");
while(loop)
{
System.out.println("线程" + Thread.currentThread().getName() + "正在运行 " + (++count));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
2.中断线程休眠
使用Thread.interrupt()方法中断线程休眠
class Test04 implements Runnable{
private int count;
private int second = 20;
@Override
public void run() {
System.out.println("线程" + Thread.currentThread().getName() + "开始!");
for (int i = 0; i < 5; i++) {
System.out.println("线程" + Thread.currentThread().getName() + " " + (++count));
}
try {
while (second > 0) {
//休眠20s
System.out.println("线程" + Thread.currentThread().getName() + "休眠" + (second--) + "s");
Thread.sleep(1000);
}
} catch (InterruptedException e) {
System.out.println("线程" + Thread.currentThread().getName() + "中断休眠");
}
for (int i = 0; i < 5; i++) {
System.out.println("线程" + Thread.currentThread().getName() + " " + (++count));
}
System.out.println("线程" + Thread.currentThread().getName() + "结束!");
}
}
public static void main(String[] args) throws InterruptedException {
Test04 test04 = new Test04();
Thread thread = new Thread(test04, "4");
thread.start();
//3s后中断线程4休眠
Thread.sleep(3000);
thread.interrupt();
}
3.线程插队
class Test05 implements Runnable{
int count = 0;
@Override
public void run() {
System.out.println("线程" + Thread.currentThread().getName() + "开始!");
while(true)
{
System.out.println("线程" + Thread.currentThread().getName() + "正在运行 " + (++count));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if(count == 10) {
System.out.println("线程" + Thread.currentThread().getName() + "结束!");
break;
}
}
}
}
public static void main(String[] args) throws InterruptedException {
Test05 test05 = new Test05();
Thread A = new Thread(test05, "A");
A.start();
for (int i = 1; i <= 10; i++) {
System.out.println("线程" + Thread.currentThread().getName() + "正在运行 " + i);
//让A线程执行完毕后,再执行Main线程
if(i == 5)
{
//A.join(); //插队 一定成功
Thread.yield(); //礼让A线程执行,不一定成功
}
}
}
4.守护线程
用户线程: 工作线程
守护线程: 为工作线程服务,当所有工作线程结束,守护线程自动结束,例如垃圾回收机制 gc
class Test06 implements Runnable{
private int count;
@Override
public void run() {
System.out.println("线程" + Thread.currentThread().getName() + "开始!");
//线程无限循环执行
while(true)
{
System.out.println("线程" + Thread.currentThread().getName() + "正在运行 " + (++count));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
Test06 test06 = new Test06();
Thread thread = new Thread(test06, "Daemon");
//将Daemon线程设置为守护线程
thread.setDaemon(true);
thread.start();
System.out.println("线程" + Thread.currentThread().getName() + "开始!");
for (int i = 1; i < 10; i++) {
System.out.println("线程" + Thread.currentThread().getName() + "正在运行 " + i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("线程" + Thread.currentThread().getName() + "结束!");
}
四、线程六大状态
public enum State {
//创建
NEW,
//可运行
RUNNABLE,
//阻塞
BLOCKED,
//等待
WAITING,
//超时等待
TIMED_WAITING,
//终止
TERMINATED;
}
五、线程同步
同步: 在同一时刻,对于同一数据最多只用一个线程访问
1.Synchronized1.同步代码块
//对象锁
synchronized(对象) {
//同步代码块
}
2.同步方法
private synchronized void f(){}
执行顺序问题:
1.同一个对象调用两个普通同步方法执行顺序
public static void main(String[] args) {
ThreadTest threadTest = new ThreadTest();
new Thread(()->{
threadTest.f1();
}).start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(()->{
threadTest.f2();
}).start();
}
class ThreadTest{
public synchronized void f1(){
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f1()执行...");
}
public synchronized void f2(){
System.out.println("f2()执行...");
}
}
2.两个不同对象调用两个static同步方法执行顺序
public static void main(String[] args) {
ThreadTest threadTest1 = new ThreadTest();
ThreadTest threadTest2 = new ThreadTest();
new Thread(()->{
threadTest1.f1();
}).start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(()->{
threadTest2.f2();
}).start();
}
}
class ThreadTest{
public static synchronized void f1(){
try {
TimeUnit.SECONDS.sleep(3 );
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f1()执行...");
}
public static synchronized void f2(){
System.out.println("f2()执行...");
}
}
3.同一对象调用一个static同步方法和一个普通同步方法执行顺序
public static void main(String[] args) {
ThreadTest threadTest1 = new ThreadTest();
new Thread(()->{
threadTest1.f1();
}).start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(()->{
threadTest1.f2();
}).start();
}
class ThreadTest{
public static synchronized void f1(){
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f1()执行...");
}
public synchronized void f2(){
System.out.println("f2()执行...");
}
}
卖票:
class Sell implements Runnable{
private static int total = 50;
private Boolean loop = true;
@Override
public void run() {
System.out.println("窗口" + Thread.currentThread().getName() +"开始售票!");
while(loop)
{
sellTicket();
}
}
private synchronized void sellTicket() {
// synchronized(this) { //synchronized加在代码块上,变成同步代码块,同一时刻只能有一个线程运行代码块
if (total <= 0) {
System.out.println("票已售完!!!");
loop = false;
return;
}
System.out.println("窗口" + Thread.currentThread().getName() + "卖掉一张票,剩余票数: " + (--total));
try {
//买完一张票休息一秒
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
// }
}
}
public static void main(String[] args) {
Sell sell = new Sell();
Thread A = new Thread(sell, "A");
Thread B = new Thread(sell, "B");
Thread C = new Thread(sell, "C");
A.start();
B.start();
C.start();
}
2.Lock
卖票:
class Sell1 {
private int total;
public Sell1(int total) {
this.total = total;
}
private Lock lock = new ReentrantLock(true);
public void sellTicket() {
//上锁
lock.lock();
try {
if (total > 0) {
System.out.println("窗口" + Thread.currentThread().getName() + "卖掉一张票,剩余票数: " + (--total));
}
} catch (Exception e) {
e.printStackTrace();
}finally {
//解锁
lock.unlock();
}
}
}
public static void main(String[] args) {
Scanner scanner = new Scanner(System.in);
System.out.println("请输入总票数:");
int count = scanner.nextInt();
Sell1 sell = new Sell1(count);
new Thread(()->{
for (int i = 0; i < count; i++) {
sell.sellTicket();
}
}, "A").start();
new Thread(()->{
for (int i = 0; i < count; i++) {
sell.sellTicket();
}
}, "B").start();
new Thread(()->{
for (int i = 0; i < count; i++) {
sell.sellTicket();
}
}, "C").start();
}
Synchronized和Lock区别:
- Synchronized是Java关键字,Lock是一个Java类
- Synchronized自动释放锁,Lock需要手动释放锁
- Synchronized适合锁少量的同步代码,Lock适合锁大量的同步代码
3.生产者消费者问题:Synchronized实现:
class Resource{
private int count = 0;
public synchronized void product() throws InterruptedException {
if(count > 0)
{
this.wait();
}
count++;
System.out.println("生产者" + Thread.currentThread().getName() + "生产了一个产品,count = " + count);
this.notifyAll();
}
public synchronized void consume() throws InterruptedException {
if(count < 1)
{
this.wait();
}
count--;
System.out.println("消费者" + Thread.currentThread().getName() + "消费了一个产品,count = " + count);
this.notifyAll();
}
}
public static void main(String[] args) {
Resource resource = new Resource();
new Thread(()->{
try {
for (int i = 0; i < 10; i++) {
resource.product();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
},"A").start();
new Thread(()->{
try {
for (int i = 0; i < 10; i++) {
resource.consume();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
},"B").start();
}
多个生产者消费者线程出现虚假唤醒问题:
Resource resource = new Resource();
new Thread(()->{
try {
for (int i = 0; i < 10; i++) {
resource.product();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
},"A").start();
new Thread(()->{
try {
for (int i = 0; i < 10; i++) {
resource.consume();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
},"B").start();
new Thread(()->{
try {
for (int i = 0; i < 10; i++) {
resource.product();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
},"C").start();
new Thread(()->{
try {
for (int i = 0; i < 10; i++) {
resource.consume();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
},"D").start();
}
this.wait()方法应放在while循环中判断
class Resource{
private int count = 0;
public synchronized void product() throws InterruptedException {
while(count > 0)
{
this.wait();
}
count++;
System.out.println("生产者" + Thread.currentThread().getName() + "生产了一个产品,count = " + count);
this.notifyAll();
}
public synchronized void consume() throws InterruptedException {
while(count < 1)
{
this.wait();
}
count--;
System.out.println("消费者" + Thread.currentThread().getName() + "消费了一个产品,count = " + count);
this.notifyAll();
}
}
Lock实现:
class Resource1{
private int count = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void product() throws InterruptedException {
lock.lock();
try {
while(count > 0)
{
condition.await();
}
count++;
System.out.println("生产者" + Thread.currentThread().getName() + "生产了一个产品,count = " + count);
condition.signalAll();
} catch (InterruptedException e) {
System.out.println(e.getMessage());
} finally {
lock.unlock();
}
}
public void consume() throws InterruptedException {
lock.lock();
try {
while(count < 1)
{
condition.await();
}
count--;
System.out.println("消费者" + Thread.currentThread().getName() + "消费了一个产品,count = " + count);
condition.signalAll();
} catch (InterruptedException e) {
System.out.println(e.getMessage());
} finally {
lock.unlock();
}
}
}
public static void main(String[] args) {
Resource1 resource = new Resource1();
new Thread(()->{
try {
for (int i = 0; i < 10; i++) {
resource.product();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
},"A").start();
new Thread(()->{
try {
for (int i = 0; i < 10; i++) {
resource.consume();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
},"B").start();
new Thread(()->{
try {
for (int i = 0; i < 10; i++) {
resource.product();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
},"C").start();
new Thread(()->{
try {
for (int i = 0; i < 10; i++) {
resource.consume();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
},"D").start();
}
六、集合类并发问题
1.List
public static void main(String[] args) {
ArrayList list = new ArrayList<>();
for (int i = 1; i <= 10; i++) {
new Thread(()->{
list.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(list);
}, String.valueOf(i)).start();
}
}
解决方法:
- 使用Collections.synchronizedList(),将List方法上添加Sychronized关键字变成同步方法
Listlist = Collections.synchronizedList(new ArrayList<>());
2.使用CopyOnWriteArrayList,当 List 需要被修改的时候,并不直接修改原有数组对象,而是对原有数据进行一次拷贝,将修改的内容写入副本中。写完之后,再将修改完的副本替换成原来的数组,这样就可以保证写操作不会影响读操作了。
Listlist = new CopyOnWriteArrayList<>();
//源码
public boolean add(E e) {
final ReentrantLock lock = this.lock;
//上锁
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
//拷贝原数组,增加一个元素
Object[] newElements = Arrays.copyOf(elements, len + 1);
//添加新元素
newElements[len] = e;
//将副本替换成原来的数组
setArray(newElements);
return true;
} finally {
//解锁
lock.unlock();
}
}
2.Set
public static void main(String[] args) {
HashSet set = new HashSet<>();
for (int i = 0; i < 20; i++) {
new Thread(()->{
set.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(set);
}, String.valueOf(i)).start();
}
}
解决方法:
1.使用Collections.synchronizedSet(),将Set方法上添加Sychronized关键字变成同步方法
Setset = Collections.synchronizedSet(new HashSet<>());
2.使用CopyOnWriteArraySet
Set3.Mapset = new CopyOnWriteArraySet<>();
public static void main(String[] args) {
Map map = new ConcurrentHashMap<>();
for (int i = 0; i < 20; i++) {
final String key = String.valueOf(i);
new Thread(()->{
map.put(key, UUID.randomUUID().toString().substring(0, 8));
System.out.println(map);
}, String.valueOf(i)).start();
}
}
解决方法:
1.使用Collections.synchronizedMap(),将Map方法上添加Sychronized关键字变成同步方法
Mapmap = Collections.synchronizedMap(new HashMap<>());
2.使用ConcurrentHashMap:
Map七、Callablemap = new ConcurrentHashMap<>();
//Callable接口 // V --> 返回值 @FunctionalInterface public interface Callable{ V call() throws Exception; }
//Runnabel子类 --> RunnableFuture实现类 ---> FutureTask(也是一个Runnable) public FutureTask(Callablecallable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }
//通过Thread启动
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask futureTask = new FutureTask(() -> {
return "1111";
});
new Thread(futureTask).start();
//获取返回值(可能会阻塞)
String str = futureTask.get();
System.out.println(str);
}
Callable和Runnabel区别:
- Callable执行call()方法,Runnable执行run()方法
- Callable执行后有返回值(可通过FutureTask获取),而Runnable没有返回值
- Callable可以抛出异常,Runnable不行
CountDownLatch: 倒计时计数器,允许一个或多个线程等待直到其他线程执行操作完成。
public static void main(String[] args) {
//倒数次数
int count = 5;
CountDownLatch countDownLatch = new CountDownLatch(count);
for (int i = 0; i < count; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName() + "执行...");
//每当一个线程执行完毕,计数器减一
countDownLatch.countDown();
}).start();
}
//当所有线程执行完后输出结束语句,已经执行完的线程等待计数器归零被唤醒
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("结束!");
}
2.CyclicBarrier
CyclicBarrier: 加法计数器,允许一组线程全部等待彼此达到共同屏障。
public static void main(String[] args) {
int count = 5;
//CyclicBarrier(int parties, Runnable barrierAction)
//加法计数器,直到计数器到达count后执行Runnable线程
CyclicBarrier cyclicBarrier = new CyclicBarrier(count, () -> {
System.out.println("结束!");
});
for (int i = 1; i <= count; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "执行...");
//等待计数器到达5
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}, String.valueOf(i)).start();
}
}
3.Semaphore
Semaphore: 信号量,信号量维持一组许可证
public static void main(String[] args) {
//两个许可证
Semaphore semaphore = new Semaphore(2);
//10个线程每次执行2个(2个线程拿到许可证,执行完后释放)
for (int i = 1; i <= 10; i++) {
new Thread(() -> {
try {
//线程被阻塞直到获取许可证
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "执行...");
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//释放许可证
semaphore.release();
}
}, String.valueOf(i)).start();
}
}
4.ReadWriteLock
ReadWriteLock: 读写锁,读读共享,读写互斥,写写互斥
class MyStorage{
//读写锁
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
public void read(){
lock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "正在读取...");
System.out.println(Thread.currentThread().getName() + "读取成功!");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.readLock().unlock();
}
}
public void write(){
lock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "正在写入...");
System.out.println(Thread.currentThread().getName() + "写入成功!");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.writeLock().unlock();
}
}
}
public static void main(String[] args) {
MyStorage myStorage = new MyStorage();
for (int i = 1; i <= 5; i++) {
new Thread(() -> {
myStorage.read();
}, "read" + i).start();
}
for (int i = 1; i <= 5; i++) {
new Thread(() -> {
myStorage.write();
}, "write" + i).start();
}
}
九、线程相关队列
1.BlockingQueue
BlockingQueue: 阻塞队列
| 方式 | 抛出异常 | 返回值 | 阻塞等待 | 超时等待 |
|---|---|---|---|---|
| 添加元素 | add | offer | put | offer(…) |
| 移除元素 | remove | poll | take | poll(…) |
| 查看队首元素 | element | peek | - | - |
public static void main(String[] args) throws InterruptedException {
//队列容量为3
BlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
//add ---> 队列满时抛出异常
//System.out.println(blockingQueue.add("A"));
//System.out.println(blockingQueue.add("B"));
//System.out.println(blockingQueue.add("C"));
//System.out.println(blockingQueue.add("D"));
//查看队首元素 队列为空时抛出异常
//System.out.println(blockingQueue.element());
//remove ---> 队列空时抛出异常
//System.out.println(blockingQueue.remove());
//System.out.println(blockingQueue.remove());
//System.out.println(blockingQueue.remove());
//System.out.println(blockingQueue.remove());
//---------------------------------------
//offer ---> 队列满时返回false,加入元素失败
//System.out.println(blockingQueue.offer("A"));
//System.out.println(blockingQueue.offer("B"));
//System.out.println(blockingQueue.offer("C"));
//System.out.println(blockingQueue.offer("D"));
//查看队首元素, 队列为空时返回null
//System.out.println(blockingQueue.peek());
//poll ---> 队列空时返回null
//System.out.println(blockingQueue.poll());
//System.out.println(blockingQueue.poll());
//System.out.println(blockingQueue.poll());
//System.out.println(blockingQueue.poll());
//---------------------------------------
//blockingQueue.put("A");
//blockingQueue.put("B");
//blockingQueue.put("C");
//队列满时,一直等待
//blockingQueue.put("D");
//System.out.println(blockingQueue.take());
//System.out.println(blockingQueue.take());
//System.out.println(blockingQueue.take());
//队列为空时,一直等待
//System.out.println(blockingQueue.take());
//---------------------------------------
blockingQueue.offer("A", 1, TimeUnit.SECONDS);
blockingQueue.offer("B", 1, TimeUnit.SECONDS);
blockingQueue.offer("C", 1, TimeUnit.SECONDS);
//队列满时,等待1s,超时退出
//blockingQueue.offer("D", 1, TimeUnit.SECONDS);
System.out.println(blockingQueue.poll(1, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(1, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(1, TimeUnit.SECONDS));
//队列空时,等待1s,超时返回null
System.out.println(blockingQueue.poll(1, TimeUnit.SECONDS));
}
2.SynchronousQueue
SynchronousQueue: 同步队列,只能存放一个元素
public static void main(String[] args) {
SynchronousQueue queue = new SynchronousQueue<>();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "添加元素" + 1);
queue.put("1");
System.out.println(Thread.currentThread().getName() + "添加元素" + 2);
queue.put("2");
System.out.println(Thread.currentThread().getName() + "添加元素" + 3);
queue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
String value = null;
try {
TimeUnit.SECONDS.sleep(1);
value = queue.take();
System.out.println(Thread.currentThread().getName() + "取出元素" + value);
value = queue.take();
System.out.println(Thread.currentThread().getName() + "取出元素" + value);
value = queue.take();
System.out.println(Thread.currentThread().getName() + "取出元素" + value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
十、线程池
1.通过Executors创建线程池(不推荐,可能会导致Out Of Memory)
public static void main(String[] args) {
//创建单一线程池
ExecutorService executorService = Executors.newSingleThreadExecutor();
try {
for (int i = 0; i < 10; i++) {
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + "执行...");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
executorService.shutdown();
}
}
public static void main(String[] args) {
//创建固定大小线程池
ExecutorService executorService = Executors.newFixedThreadPool(5);
try {
for (int i = 0; i < 10; i++) {
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + "执行...");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
executorService.shutdown();
}
}
public static void main(String[] args) {
//创建大小可变的线程池(根据CPU情况)
ExecutorService executorService = Executors.newCachedThreadPool();
try {
for (int i = 0; i < 10; i++) {
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + "执行...");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
executorService.shutdown();
}
}
源码:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue());
}
2.使用ThreadPool(推荐):
public ThreadPoolExecutor(int corePoolSize, //核心线程池大小
int maximumPoolSize, //最大线程池大小
long keepAliveTime, //存活时间
TimeUnit unit, //超时
BlockingQueue workQueue, //阻塞队列
ThreadFactory threadFactory, //线程工厂
RejectedExecutionHandler handler) //拒绝策略
RejectedExecutionHandler(四种拒绝策略):
public static void main(String[] args) {
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
//使用默认的线程工厂
Executors.defaultThreadFactory(),
//多余的线程不处理,抛出异常
//new ThreadPoolExecutor.AbortPolicy());
//哪里来的去哪里,交给原来的线程代理
//new ThreadPoolExecutor.CallerRunsPolicy());
//多余的线程不处理,直接丢弃,不会抛出异常
//new ThreadPoolExecutor.DiscardPolicy());
//多余的线程尝试和最先运行的线程竞争,如果原来的线程运行完,则可以运行,不会抛出异常
new ThreadPoolExecutor.DiscardOldestPolicy());
try {
for (int i = 1; i <= 9; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "执行...");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
设置线程池最大线程数
//CPU密集型 //设置线程池最大线程数为计算机CPU核数 Runtime.getRuntime().availableProcessors() //IO密集型 //设置线程池最大线程数大于程序中十分消耗IO的线程个数即可十一、函数式接口(lambda表达式) 1.Function
//T ---> 参数类型 R ---> 返回值 @FunctionalInterface public interface Function{ R apply(T t); ... }
public static void main(String[] args) {
Scanner scanner = new Scanner(System.in);
Function function = (str) -> {
System.out.println("请输入一个字符串:");
return scanner.nextLine();
};
System.out.println("你输入的是:" + function.apply("1"));
}
2.Predicate
// T ---> 参数类型 返回值为boolean @FunctionalInterface public interface Predicate{ boolean test(T t); ... }
public static void main(String[] args) {
Predicate predicate = (str) -> {
return str.isEmpty();
};
System.out.println(predicate.test(""));
}
//true
3.Consumer
//T ---> 参数类型 没有返回值 @FunctionalInterface public interface Consumer{ void accept(T t); ... }
public static void main(String[] args) {
Consumer consumer = (str) -> {
System.out.println("str = " + str);
};
consumer.accept("asdasxax");
}
//str = asdasxax
4.Supplier
//T ---> 返回值类型 没有参数 @FunctionalInterface public interface Supplier{ T get(); }
public static void main(String[] args) {
Supplier supplier = () -> {return "asd";};
System.out.println(supplier.get());
}
//asd
十二、Forkjoin
并行执行任务的框架,把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果
工作窃取:所有空闲线程尝试去执行其他线程未执行的子任务
计算1 + ··· + 1000000000:
//普通方式
public class Normal {
private long start;
private long end;
public Normal(long start, long end) {
this.start = start;
this.end = end;
}
public long getStart() {
return start;
}
public long getEnd() {
return end;
}
public long compute(){
long sum = 0;
for (long i = start; i <= end; i++) {
sum += i;
}
return sum;
}
}
//ForkJoin方式: //1.继承ForkJoinTask的子类: //RecursiveAction: 用于没有返回结果的任务。 //RecursiveTask: 用于有返回结果的任务。 public class ForkJoin_ extends RecursiveTask{ private long start; private long end; private long temp = 10_000L; public ForkJoin_(long start, long end) { this.start = start; this.end = end; } public long getStart() { return start; } public long getEnd() { return end; } @Override protected Long compute() { long sum = 0; if ((end - start) < temp) { for (long i = start; i <= end; i++) { sum += i; } return sum; } else { long middle = (start + end) / 2; //按平均值划分成小任务 ForkJoin_ task1 = new ForkJoin_(start, middle); //计算小任务 task1.fork(); //按平均值划分成小任务 ForkJoin_ task2 = new ForkJoin_(middle + 1, end); //计算小任务 task2.fork(); //返回计算结果 sum = task1.join() + task2.join(); return sum; } } }
//测试
public static void main(String[] args) {
long start = 0L;
long end = 1_000_000_000L;
Normal normal = new Normal(start, end);
long t1 = System.currentTimeMillis();
System.out.println(normal.compute());
long t2 = System.currentTimeMillis();
System.out.println("normal耗时:" + (t2 - t1));
System.out.println("=======================================");
t1 = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask task = new ForkJoin_(start, end);
ForkJoinTask submit = forkJoinPool.submit(task);
try {
System.out.println(submit.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
t2 = System.currentTimeMillis();
System.out.println("forkJoin耗时:" + (t2 - t1));
System.out.println("=======================================");
t1 = System.currentTimeMillis();
System.out.println(LongStream.rangeClosed(start, end).parallel().reduce(0, Long::sum));
t2 = System.currentTimeMillis();
System.out.println("stream耗时:" + (t2 - t1));
}
十三、异步回调
1.CompletableFuture.runAsync(Runnable):
public static void main(String[] args) throws ExecutionException, InterruptedException {
//没有返回值的异步回调(程序不会阻塞而是继续向下执行)
CompletableFuture completableFuture = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
});
System.out.println("--------------");
//阻塞等待异步结果
completableFuture.get();
}
2.CompletableFuture.supplyAsync(Supplier):
public static void main(String[] args) throws ExecutionException, InterruptedException {
//有返回值的异步回调
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
//制造异常异步执行失败
//int i = 1 / 0;
System.out.println(Thread.currentThread().getName());
return "success";
});
//whenComplete(BiConsumer) 执行成功回调
//exceptionally(Function) 执行失败回调
System.out.println(completableFuture.whenComplete((t, u) -> {
//返回值
System.out.println("t = " + t);
//错误信息
System.out.println("u = " + u);
}).exceptionally((e) -> {
System.out.println(e.getMessage());
return "fail";
}).get());
}
十四、JMM
JMM:Java内存模型
规定:
- 线程解锁前,必须把共享变量立刻刷新到主存
- 线程加锁前,必须将主存中的最新值读取到工作内存
- 加锁和解锁是同一把锁
(三组操作成对出现)
public class Volatile_ {
//输出sum = 1 但是线程不会退出 sum不可见
//private static int num = 0;
private volatile static int num = 0; //volatile保证可见性
public static void main(String[] args) {
new Thread(() -> {
while(num == 0)
{
}
}).start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
num = 1;
System.out.println("num = " + num);
}
}
十五、Volatile
Volatile:轻量级同步机制
- 保证可见性
- 不保证原子性
- 禁止指令重排
public class Volatile01_ {
private volatile static int num = 0;
//volatile不保证原子性
//最后输出num可能小于20000,线程同时操作
public static void add(){
num++;
}
public static void main(String[] args) {
for (int i = 0; i < 20; i++) {
new Thread(() -> {
for (int j = 0; j < 1000; j++) {
add();
}
}, String.valueOf(i)).start();
}
while(Thread.activeCount() > 2) //main gc 线程
{
Thread.yield();
}
System.out.println("num = " + num);
}
}
通过反编译发现:
使用原子类AtomicInteger:
public class Volatile01_ {
private volatile static AtomicInteger num = new AtomicInteger();
public static void add(){
num.getAndIncrement(); //num++
}
public static void main(String[] args) {
for (int i = 0; i < 20; i++) {
new Thread(() -> {
for (int j = 0; j < 1000; j++) {
add();
}
}, String.valueOf(i)).start();
}
while(Thread.activeCount() > 2) //main gc 线程
{
Thread.yield();
}
System.out.println("num = " + num);
}
}
//20000
指令重排: 是对指令的优化,程序编写的顺序和实际上执行的顺序不一致。程序有万分之一的概率会产生指令重排。在多线程执行相同代码的前提下,产生指令重排,就会导致多个线程获取到了不同的结果。因此在多线程下要禁止指令重排。
x,y,a,b默认值为0
预期结果: x = 0,y = 0
| 线程1 | 线程2 |
|---|---|
| x = a | y = b |
| b = 1 | a = 2 |
如果发生指令重排,结果变成: x = 2,y = 1
| 线程1 | 线程2 |
|---|---|
| b = 1 | a = 2 |
| x = a | y = b |
使用volatile可以禁止指令重排(通过在操作前后添加内存屏障)
Java屏障类型:
1.LoadLoad Barriers
两个线程同时从主存中加载数据到工作内存(Load)操作互斥
2.StoreStore Barriers
两个线程同时将工作内存中的数据刷新到主存(Store)操作互斥
3.LoadStore Barriers
当一个线程从主存中加载数据到工作内存(Load),同时另一个线程将工作内存中的数据刷新到主存(Store)时,要先保证先Load成功,Store才开始
4.StoreLoad Barriers(全能屏障)
当一个线程将工作内存中的数据刷新到主存(Store),同时另一个线程从主存中加载数据到工作内存(Load)时,要先保证Store已经成功,并且数据所有人可见,Load才开始
十六、CASCAS(compareAndSet): CPU的并发原语
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(1);
System.out.println(atomicInteger.compareAndSet(1, 2));
System.out.println(atomicInteger.get());
System.out.println(atomicInteger.compareAndSet(1, 2));
System.out.println(atomicInteger.get());
}
源码分析:
//源码 如果和预期结果(expect)一样则进行更新(update)
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
//atomicInteger中字段 Unsafe类对象(包含一系列native方法,底层使用C/C++) ---> 对内存进行操作
private static final Unsafe unsafe = Unsafe.getUnsafe();
//atomicInteger中方法 原子性加一操作
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
-----------------------------------------------------------------------------------
//Unsafe中方法
//var1 = this , var2 = valueOffset , var4 = 1
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
//自旋锁 直到内存中该值加一为止
do {
//获取当前对象在内存中的值
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
//Unsafe中方法
public native int getIntVolatile(Object var1, long var2);
//Unsafe中方法
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
CAS无法解决**ABA**问题
**ABA**问题: 一个线程修改共享数据后又将该数据改回原值,另一个线程无法发现共享数据被修改过
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(1);
new Thread(() -> {
System.out.println("A:" + atomicInteger.compareAndSet(1, 2));
System.out.println("A:" + atomicInteger.get());
System.out.println("A:" + atomicInteger.compareAndSet(2, 1));
System.out.println("A:" + atomicInteger.get());
},"A").start();
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
//无法发现线程A修改过数据(ABA问题)
System.out.println("B:" + atomicInteger.compareAndSet(1, 2));
System.out.println("B:" + atomicInteger.get());
},"B").start();
}
解决方法:
使用**AtomicStampedReference**每次修改更新stamp(版本号/时间戳)
public static void main(String[] args) {
AtomicStampedReference reference = new AtomicStampedReference<>(1, 1);
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("A:stamp = " + reference.getStamp());
System.out.println("A:" + reference.compareAndSet(1, 2, reference.getStamp(), reference.getStamp() + 1));
System.out.println("A:stamp = " + reference.getStamp());
System.out.println("A:" + reference.compareAndSet(2, 1, reference.getStamp(), reference.getStamp() + 1));
System.out.println("A:stamp = " + reference.getStamp());
}, "A").start();
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("B:stamp = " + reference.getStamp());
System.out.println("B:" + reference.compareAndSet(1, 3, reference.getStamp(), reference.getStamp() + 1));
System.out.println("B:stamp = " + reference.getStamp());
}, "B").start();
}
十七、锁
1.公平锁、非公平锁
公平锁: 先来后到、不能插队
非公平锁: 可以插队
Synchronized非公平锁、Lock默认非公平锁
Lock lock = new ReentrantLock(); //非公平锁
Lock lock = new ReentrantLock(true); //公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
2.可重入锁
3.自旋锁
使用CAS操作自定义自旋锁:
public class SpinLock {
private AtomicReference atomicReference = new AtomicReference(null);
//加锁
public void lock(){
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName() + " lock");
while(!atomicReference.compareAndSet(null, thread)) {
}
}
//解锁
public void unlock(){
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName() + " unlock");
while(!atomicReference.compareAndSet(thread, null)) {
}
}
}
public static void main(String[] args) {
SpinLock spinLock = new SpinLock();
new Thread(() -> {
spinLock.lock();
try {
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + "执行");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
spinLock.unlock();
}
}, "A").start();
new Thread(() -> {
spinLock.lock();
try {
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + "执行");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
spinLock.unlock();
}
}, "B").start();
}
4.死锁
两个线程试图获取对方的锁,相互等待造成死锁
class MyThread implements Runnable{
private static Object lock1 = new Object();
private static Object lock2 = new Object();
private boolean flag;
public MyThread(boolean flag) {
this.flag = flag;
}
@Override
public void run() {
//互相等待获取对方的锁
if(flag) {
synchronized (lock1) {
System.out.println(Thread.currentThread().getName() + "获取lock1...");
synchronized (lock2) {
System.out.println(Thread.currentThread().getName() + "获取lock2...");
}
}
}
else
{
synchronized (lock2) {
System.out.println(Thread.currentThread().getName() + "获取lock2...");
synchronized (lock1) {
System.out.println(Thread.currentThread().getName() + "获取lock1...");
}
}
}
}
}
public static void main(String[] args) {
new Thread(new MyThread(true), "A").start();
new Thread(new MyThread(false), "B").start();
}
排查方式:
1.在终端中输入jsp -l命令,查看程序对应的进程号
2.使用jstack 进程号查看堆栈信息找到死锁
最后显示死锁信息:



