栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Java多线程

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Java多线程

Thread Thread类 线程Id

public long getId()

线程名

方法一:public final String getName(),获取线程名称。
方法二:public final void setName(String name),设置线程名称。
方法三:Thread(String threadName),通过此构造方法,给线程设置一个定制化的名字。

线程优先级

public final void setPriority(int priority)
Java 线程的优先级最大值为 10,最小值为 1,默认值为 5。

是否为守护线程

public final void setDaemon(boolean on)

线程装态

public Thread.State getState()

public static enum State {
 NEW, //新建
 RUNNABLE, //就绪、运行
 BLOCKED, //阻塞,经过操作系统调度
 WAITING, //等待
 TIMED_WAITING, //计时等待
 TERMINATED; //结束
}

RUNABLE yeid:让出当前时间片 实现running<–>ready
sync 时blocked
WAITING(自旋等待,盲等待):

线程方法
线程的启动和运行

public void start() 启动线程,调用start方法后JVM才会开启一个新的线程来执行用户定义的线程代码逻辑,再此过程中会为新的线程分配需要的资源

public void run() 调用start方法之后,线程获得时间片,就会执行用户线程代码

获取当前线程

public static Thread currentThread()

创建线程的四种方式 继承Thread,共享变量传递 实现Runable,共享变量传递 实现Callable,配合FutureTask

通过泛型指定一个返回的线程

static class MyCall implements Callable {
        @Override
        public String call() {
            System.out.println("Hello MyCall");
            return "success";
        }
    }
FutureTask task = new FutureTask<>(new MyCall());
        Thread t = new Thread(task);
        t.start();
        System.out.println(task.get());// 阻塞等待
lambda表达式 线程池,配合Feature
 ExecutorService service = Executors.newCachedThreadPool();
        service.execute(() -> {
            System.out.println("Hello ThreadPool");
        });

        Future f = service.submit(new MyCall());
        String s = f.get(); // 阻塞类型
        System.out.println(s);
        service.shutdown();
线程状态测试代码:

ReentrantLook.look() JUC<–由CAS实现-- 盲等待

public static void main(String[] args) throws Exception {
        //===================================================
        Thread t1 = new Thread(() -> {
            System.out.println("2: " + Thread.currentThread().getState());
            for (int i = 0; i < 3; i++) {
                SleepHelper.sleepSeconds(1);
                System.out.print(i + " ");
            }
            System.out.println();
        });
        System.out.println("1: " + t1.getState());
        t1.start();
        t1.join();
        System.out.println("3: " + t1.getState());

        //===================================================
        Thread t2 = new Thread(() -> {
            try {
                LockSupport.park();
                System.out.println("t2 go on!");
                TimeUnit.SECONDS.sleep(5);

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        t2.start();
        TimeUnit.SECONDS.sleep(1);
        System.out.println("4: " + t2.getState());

        LockSupport.unpark(t2);
        TimeUnit.SECONDS.sleep(1);
        System.out.println("5: " + t2.getState());

        //===================================================
        final Object o = new Object();
        Thread t3 = new Thread(() -> {
            synchronized (o) {
                System.out.println("t3 得到了锁 o");
            }
        });

        new Thread(() -> {
            synchronized (o) {
                SleepHelper.sleepSeconds(5);
            }
        }).start();

        SleepHelper.sleepSeconds(1);

        t3.start();
        SleepHelper.sleepSeconds(1);
        System.out.println("6: " + t3.getState());

        //===================================================
        final Lock lock = new ReentrantLock();
        Thread t4 = new Thread(() -> {
            lock.lock(); //省略try finally
            System.out.println("t4 得到了锁 o");
            lock.unlock();
        });

        new Thread(() -> {
            lock.lock();
            SleepHelper.sleepSeconds(5);
            lock.unlock();
        }).start();

        SleepHelper.sleepSeconds(1);

        t4.start();
        SleepHelper.sleepSeconds(1);
        System.out.println("7: " + t4.getState());

        //===================================================
        Thread t5 = new Thread(() -> {
            LockSupport.park();
        });

        t5.start();

        SleepHelper.sleepSeconds(1);

        System.out.println("8: " + t5.getState());
        LockSupport.unpark(t5);

    }

线程的打断

interrupt() :打断线程(并设置标志)
isInterrupted():实例方法(查询标志,是否被打断)【稍微优雅的结束线程】
interrupted():静态方法【静态方法,当前调用线程】,查询是否被打断过,并重置打断标志

interrupt配合sleep() wait() join()须try…catch{}

sleep()方法在睡眠的时候,不到时间是没有办法叫醒的,这个时候可以用interrupt设置标志位,然后呢必须得catch InterruptedException来进行处理,决定继续睡或者是别的逻辑,(tip:自动进行中断标志复位)

interrupt()不能打断正在竞争锁的线程synchronized()

sleep不释放锁 interrupt不打断锁 打断锁

使用ReentrantLock,并使用.lockInterruptibly()方法,有设标志位,会抛异常

public class Interrupt_and_lockInterruptibly {

    private static ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) {
        Thread t1 = new Thread(()-> {
            lock.lock();
            try {
                SleepHelper.sleepSeconds(10);
            } finally {
                lock.unlock();
            }
            System.out.println("t1 end!");
        });

        t1.start();

        SleepHelper.sleepSeconds(1);


        Thread t2 = new Thread(()-> {
            System.out.println("t2 start!");
            try {
                lock.lockInterruptibly();
            } catch (InterruptedException e) {
                // TODO
            } finally {
                lock.unlock();
            }
            System.out.println("t2 end!");
        });

        t2.start();

        SleepHelper.sleepSeconds(1);
        
        t2.interrupt();

    }
}
结束线程的方法
1. 自然结束(能自然结束就尽量自然结束)
2. stop() suspend() resume()
3. volatile标志
   1. 不适合某些场景(比如还没有同步的时候,线程做了阻塞操作,没有办法循环回去)
   2. 打断时间也不是特别精确,比如一个阻塞容器,容量为5的时候结束生产者,
      但是,由于volatile同步线程标志位的时间控制不是很精确,有可能生产者还继续生产一段儿时间
4. interrupt() and isInterrupted(比较优雅)

stop() suspend() resume()会释放所有锁,不做善后工作,会产生数据不一致问题,产生死锁

volatile
public class T03_VolatileFlag {

    private static volatile boolean running = true;

    public static void main(String[] args) {
        Thread t = new Thread(() -> {
            long i = 0L;
            while (running) {
                //wait recv accept
                i++;
            }

            System.out.println("end and i = " + i); //4168806262 4163032200
        });

        t.start();

        SleepHelper.sleepSeconds(1);

        running = false;
    }
}
interrupt() and isInterrupted(比较优雅,简单使用不能实现紧缺控制)
    public static void main(String[] args) {
        Thread t = new Thread(() -> {
            while (!Thread.interrupted()) {
                //sleep wait
            }

            System.out.println("t1 end!");
        });

        t.start();

        SleepHelper.sleepSeconds(1);

        t.interrupt();// 不能实现精确控制

    }
interrupt 和 LockSupport.park()
    public static void main(String[] args) {
        Thread t = new Thread(() -> {
            System.out.println("1");
            LockSupport.park();
            System.out.println("2");
        });
        t.start();
        SleepHelper.sleepSeconds(1);
        t.interrupt();
    }
并发可见性

volatile,锁代码保证可见性
缓存行
缓存一致性协议

用Volatile保证可见性

volatile 关键字,使一个变量在多个线程间可见
A B线程都用到一个变量,java默认是A线程中保留一份copy,这样如果B线程修改了该变量,则A线程未必知道
使用volatile关键字,会让所有线程都会读到变量的修改值
volatile修饰的内存,任何的修改,马上刷新主内存,其他线程立马可见,
e.g:r的读取发生在当前线程本地的缓存与主内存的数据进行了刷新同步

某些语句除法内存缓存同步刷新
    private static void m() {
        System.out.println("m start");
        while (running) {
            System.out.println("hello"); //
        }
        System.out.println("m end!");
    }

    public static void main(String[] args) throws IOException {

        new Thread(T01_HelloVolatile::m, "t1").start();

        SleepHelper.sleepSeconds(1);

        running = false;

        System.in.read();
    }

m start

hello
hello
m end!

sync保证了可见性

public void println(String x) {
        synchronized (this) {
            print(x);
            newline();
        }
    }
volatile 引用类型(包括数组)只能保证引用本身的可见性,不能保证内部字段的可见性

三级缓存和可见性

缓存行与Volatile无关

按块读取(缓存行,64字节)
程序的局部性原理,可以提高效率,充分发挥总线CPU针脚等一次性读取更多数据的能力
空局部性,时间局部性(指令)

import java.util.concurrent.CountDownLatch;

public class T01_CacheLinePadding {
    public static long COUNT = 10_0000_0000L;

    private static class T {
        //private long p1, p2, p3, p4, p5, p6, p7;
        public long x = 0L; //8bytes
        //private long p9, p10, p11, p12, p13, p14, p15;
        // 放开注释提升效率
        // p1, p2, p3, p4, p5, p6, p7,x 占有8*8字节,为一个缓存行
    }

    public static T[] arr = new T[2];

    static {
        arr[0] = new T();
        arr[1] = new T();
        // 因为: p1, p2, p3, p4, p5, p6, p7,x 占有8*8字节,为一个缓存行
        // 所以[0]、[1]在不同的缓存行,不存同通信,不走缓存一致性协议
    }

    public static void main(String[] args) throws Exception {
        CountDownLatch latch = new CountDownLatch(2);

        Thread t1 = new Thread(() -> {
            for (long i = 0; i < COUNT; i++) {
                arr[0].x = i;
            }

            latch.countDown();
        });

        Thread t2 = new Thread(() -> {
            for (long i = 0; i < COUNT; i++) {
                arr[1].x = i;
            }

            latch.countDown();
        });

        final long start = System.nanoTime();
        t1.start();
        t2.start();
        latch.await();
        System.out.println((System.nanoTime() - start) / 100_0000);
    }
}

为什么注释放开提升效率

缓存一致性协议,当多线程修改的东西位于同一行的时候,会通知其他线程
中间存在相互干扰,时间

JDK1.7的linkedBlockQueue就是这么做的
disruptor也有这么玩

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;

import java.nio.ByteBuffer;

public class Main {

    public static void handleEvent(LongEvent event, long sequence, boolean endOfBatch) {
        System.out.println(event);
    }

    public static void translate(LongEvent event, long sequence, ByteBuffer buffer) {
        event.setValue(buffer.getLong(0));
    }

    public static void main(String[] args) throws Exception {
        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor
        Disruptor disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

        // Connect the handler
        disruptor.handleEventsWith(Main::handleEvent);

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer ringBuffer = disruptor.getRingBuffer();

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++) {
            bb.putLong(0, l);
            ringBuffer.publishEvent(Main::translate, bb);
            Thread.sleep(1000);
        }
    }
}

@Contentended标注的变量,独占一缓存行

//只有1.8起作用 , 保证x位于单独一行中
需要加参数:-XX:-RestrictContended

缓存一致性总结

缓存一致协议有多种
MESI 四种状态:修改,独享,共享,失效
共享:主动监听


并发有序性

Java程序并发按顺序执行
线程的执行先后顺序时不可预知的

线程的执行先后不可预知
public class T01_Disorder {
    private static int x = 0, y = 0;
    private static int a = 0, b = 0;

    public static void main(String[] args) throws InterruptedException {

        for (long i = 0; i < Long.MAX_VALUE; i++) {
            x = 0;
            y = 0;
            a = 0;
            b = 0;
            CountDownLatch latch = new CountDownLatch(2);

            Thread one = new Thread(new Runnable() {
                public void run() {
                    a = 1;
                    x = b;

                    latch.countDown();
                }

            });

            Thread other = new Thread(new Runnable() {
                public void run() {
                    b = 1;
                    y = a;

                    latch.countDown();
                }
            });
            one.start();
            other.start();
            latch.await();
            String result = "第" + i + "次 (" + x + "," + y + ")";
            if (x == 0 && y == 0) {
                System.err.println(result);
                break;
            }
        }
    }

}

第16939次 (0,0)

当两条语句没有依赖的时候,乱序时为了提高效率

乱序存在的条件:

as-if-serial:好像是序列化
不影响单线程的最终一致性

经典的乱序
public class T02_NoVisibility {
    private static boolean ready = false;
    private static int number;

    private static class ReaderThread extends Thread {
        @Override
        public void run() {
            while (!ready) {
                Thread.yield();
            }
            System.out.println(number);
        }
    }

    public static void main(String[] args) throws Exception {
        Thread t = new ReaderThread();
        t.start();
        number = 42;
        ready = true; //不会马上停止
        // number = 42; ready = true;没有依赖关系,所以sout的结果有可能是0
        t.join();
    }
}
error:没有保证可见性
error:
为什么又是可以立即停止

MESI的主动性, Thread.yield();的同步刷新

解决方案
private static volatile boolean ready = false;//解决可见性
Java对象的创建过程
class T{
int m = 8;
}
T t = new T();

 0 new #2 
 3 dup
 4 invokespecial #3 >
 8 astore_1
15 return

new:申请内存,[安全性:成员变量(默认值0),调用构造方法的时候m才为8]
invokevirtua调用构造方法,设置初始值
astore_1关联

this对象溢出

Java并发编程
由于this溢出
所以不要在构造方法中new线程直接启动

使用内存屏障阻止乱序执行

内存屏障是特殊指令:看到这种指令,前面的必须执行完,后面的才能执行
intel : lfence sfence mfence(CPU特有指令)

jvm中的内存屏障

LoadLoad屏障(LL)

对于ld1,LL,ld2
在ld2及后续读取操作要读取的数据被访问前,必须保证ld1中要读取的数据被读取完

StoreStore

对于s1,SS,s2
在s2及后续的写入操作执行前,保证s1中的写入操作对其他处理器可见

LoadStore

对于l1,LS,s2
在s2及后续写入操作执行前,必须保证ld1中要读取的数据被读取完

StoreLoad

对于s1,SL,l2
在ld2及后续读取操作要读取的数据被访问前,保证s1中的写入操作对其他处理器可见

volatile修饰的内存,不可以重排序,对volatile修饰变量的读写访问,都不可以换顺序(JVM)

–SS–
对volatile变量进行 写
–SL–

对volatile变量进行 读
–LL–
–LS–

这几条指令解决了this对象溢出

volatile的底层实现

volatile修饰的内存,不可以重排序,对volatile修饰变量的读写访问,都不可以换顺序
1: volatile i
2: ACC_VOLATILE
3: JVM的内存屏障
​ 屏障两边的指令不可以重排!保障有序!
​ happends-before
​ as - if - serial
4:hotspot实现

bytecodeinterpreter.cpp

int field_offset = cache->f2_as_index();
          if (cache->is_volatile()) {
            if (support_IRIW_for_not_multiple_copy_atomic_cpu) {
              OrderAccess::fence();
            }

orderaccess_linux_x86.inline.hpp

inline void OrderAccess::fence() {
  if (os::is_MP()) {
    // always use locked addl since mfence is sometimes expensive
#ifdef AMD64
    __asm__ volatile ("lock; addl $0,0(%%rsp)" : : : "cc", "memory");
#else
    __asm__ volatile ("lock; addl $0,0(%%esp)" : : : "cc", "memory");
#endif
  }
}

LOCK 用于在多处理器中执行指令时对共享内存的独占使用。
它的作用是能够将当前处理器对应缓存的内容刷新到内存,并使其他处理器对应的缓存失效。
另外还提供了有序的指令无法越过这个内存屏障的作用。

并发原子性

race condition => 竞争条件 , 指的是多个线程访问共享数据的时候产生竞争
数据的不一致(unconsistency),并发访问之下产生的不期望出现的结果
如何保障数据一致呢?–> 线程同步(线程执行的顺序安排好),
monitor (管程) —> 锁 —>本质是将并发操作变成了序列化操作
critical section -> 临界区
如果临界区执行时间长,语句多,叫做 锁的粒度比较粗,反之,就是锁的粒度比较细

多线程访问共享数据,产生竞争,产生数据不一致

解决:保证原子性

public class T00_00_IPlusPlus {
    private static long n = 0L;
    public static void main(String[] args) throws Exception {
        //Lock lock = new ReentrantLock();
        Thread[] threads = new Thread[100];
        CountDownLatch latch = new CountDownLatch(threads.length);
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 10000; j++) {
//                    synchronized (T00_00_IPlusPlus.class) {
                        //lock.lock();
                        n++;
                        //lock.unlock();
//                    }
                }
                latch.countDown();
            });
        }
                latch.countDown();
            });
        }
        for (Thread t : threads) {
            t.start();
        }
        latch.await();
        System.out.println(n);
    }
}
原子性和底层原理 Java中的8大原子操作

个人认为,确定不了就上锁

sync

并发–>序列化
保证可见性,原子性,不保证有序性

管程monitor

临界区

锁执行的代码
执行时间长,代码多,锁粗化,反之锁细化

悲观锁

悲观的认为这个操作会被别的线程打断(悲观锁)synchronized
通过加锁避免被打断

乐观锁

乐观的认为这个做不会被别的线程打断(乐观锁 自旋锁 无锁)cas操作
CAS = Compare And Set/Swap/Exchange

CAS

ABA问题

当时引用类型的时候需要解决ABA问题
解决,加版本[时间戳,布尔类型],因为修改是业务程序,所以版本自然可以自己加

在CAS过程中,如果被其他线程修改为不同值

解决:必须保证CAS本身时原子性

CAS的原子性

原子类
e.g:AI

汇编:多核加lock,单核直接
#define LOCK_IF_MP(mp) "cmp $0, " #mp "; je 1f; lock; 1: "

public class T01_AtomicInteger {
	 //int count1 = 0;

	AtomicInteger count = new AtomicInteger(0);

	void m() {
		for (int i = 0; i < 10000; i++)
			//if count1.get() < 1000
			count.incrementAndGet(); //count1++
	}

	public static void main(String[] args) {
		T01_AtomicInteger t = new T01_AtomicInteger();

		List threads = new ArrayList();

		for (int i = 0; i < 100; i++) {
			threads.add(new Thread(t::m, "thread-" + i));
		}

		threads.forEach((o) -> o.start());

		threads.forEach((o) -> {
			try {
				o.join();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		});

		System.out.println(t.count);

	}

}

悲观锁和等待队列

在队列中等待的线程,不会消耗CPU资源
blocking waiting packing 阻塞

乐观锁

compare and swap–>时一个循环操作,会让操作系统进行调度,会占用资源
不阻塞

长过程有阻塞用悲观,快操作,用乐观 个人体验sync好用

sync保证可见性
解锁后,会将所有的内存状态跟本地缓存做一次刷新
sync本身有一条lock,lock的内存屏障保证了可见性

Sync提炼 用户态,内核态

早期sync叫做重量级锁,因为申请锁资源必须通过kernel,系统调用

因为当时需要向OS申请,发生了用户态<—>内核态的转变

对象的内存布局

markword(8个字节,标记字)+classpoint(内存指针[可以找到T.class知道是哪个类],默认开启压缩,压缩后4字节)+instanceDate(成员变量[int,byte…])

hostpot:8字节对齐

对象初始化的时候必内存大小必定是8的整数倍,不足就对齐补上

    public static void main(String[] args) {
       Object o = new Object();
       System.out.println(ClassLayout.parseInstance(o).toPrintable());
       synchronized (o) {
            System.out.println(ClassLayout.parseInstance(o).toPrintable());
        }
    }
sync锁升级

hotspot实现
有所态是的hashcode在线程的线程栈中
锁升级初步
用户空间锁&重量级锁

偏向、自旋锁、都是在用户空完成
重量级锁都是需要向内核申请

>一加sync就由01–>00成了轻量级锁

这是因为偏向锁未启动
new - 偏向锁 - {偏向锁撤销} - 轻量级锁 (无锁 自旋 自适应) - 重量级锁

锁竞争(每个线程的线程栈都有一个锁记录LockRoecord)

自旋争抢将自己的LR放在pointClass,锁的pointClass指向那个线程的LR,则代表那个线程抢到锁,没抢到的锁继续自旋
自旋锁转圈十次之后,升级为重量级锁,重量级锁就是去操作系统那里去申请资源
两中exit是因为自然结束和异常结束

锁重入

锁重入,统一线程,多次上锁是被允许的

sync是可重入的,重入次数是会被记录的[因为要解锁],记录在
重入一次记录一次LR,解锁的时候弹出LR —偏向锁和轻量级锁大概是一样的
重量及锁是在ObjectMonitor上由体现

看不懂

自旋消耗CPU资源,如果自旋线程多,CPU会被大量消耗
重量级锁由等待队列,处于等待队列的线程不消耗CPU资源

偏向锁不一定比自旋锁效率高

不一定,若明确知道一开始就会出现多线程竞争的情况下,偏向锁会肯定会走向自旋,此时由锁撤销,
JVM启动时,会由多线程金正,锁以默认不开启偏向锁,过一段时间再打开

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/781671.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号