物理结构:只分数组和链表
逻辑结构:如下
Deque:双端队列
Vector和HashTable:自带锁,现在基本不用
从HashTable发展到HashMap、SynchronizedHashMap、ConcurrentHashMap
package character06;
public class Constants {
public static final int COUNT = 1000000;
public static final int THREAD_COUNT = 100;
}
package character06;
import java.util.*;
public class T01_TestHashTable {
static Hashtable m=new Hashtable<>();
static int count=Constants.COUNT;
static UUID[] keys=new UUID[count];
static UUID[] values=new UUID[count];
static final int THREAD_COUNT=Constants.THREAD_COUNT;
static {
for(int i=0;i{
for(int j=0;j<1000000;j++){
m.get(keys[10]);//读取keys[10]100000次
}
});
}
for(Thread t:threads){
t.start();
}
for(Thread t:threads){
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
end=System.currentTimeMillis();
System.out.println(end-start);
}
}
测试SynchronizedHashMap
package character06;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
public class T03_TestSynchronizedHashMap {
static Map m= Collections.synchronizedMap(new HashMap());
static int count=Constants.COUNT;
static UUID[] keys=new UUID[count];
static UUID[] values=new UUID[count];
static final int THREAD_COUNT=Constants.THREAD_COUNT;
static {
for(int i=0;i{
for(int j=0;j<1000000;j++){
m.get(keys[10]);//读取keys[10]100000次
}
});
}
for(Thread t:threads){
t.start();
}
for(Thread t:threads){
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
end=System.currentTimeMillis();
System.out.println(end-start);
}
}
测试ConcurrentHashMap
package character06;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
public class T04_TestConcurrentHashMap {
static Map m=new ConcurrentHashMap<>();
static int count=Constants.COUNT;
static UUID[] keys=new UUID[count];
static UUID[] values=new UUID[count];
static final int THREAD_COUNT=Constants.THREAD_COUNT;
static {
for(int i=0;i{
for(int j=0;j<1000000;j++){
m.get(keys[10]);//读取keys[10]100000次
}
});
}
for(Thread t:threads){
t.start();
}
for(Thread t:threads){
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
end=System.currentTimeMillis();
System.out.println(end-start);
}
}
结论:ConcurrentHashMap读效率比HashTable和HashMap要高,写效率反而低
Vector、List、Queue 测试例子package character06;
import java.util.ArrayList;
import java.util.List;
public class TicketSeller1 {
static List tickets = new ArrayList<>();
static {
for(int i=0; i<10000; i++){
tickets.add("票编号:" + i);
}
}
public static void main(String[] args) {
for(int i=0; i<10; i++) {
new Thread(()->{
while(tickets.size() > 0) {
System.out.println("销售了--" + tickets.remove(0));
}
}).start();
}
}
}
使用Vector进行对比
package character06;
import java.util.Vector;
import java.util.concurrent.TimeUnit;
public class TicketSeller2 {
//Vector是同步容器
static Vector tickets = new Vector<>();
static {
for(int i=0; i<1000; i++) {
tickets.add("票 编号:" + i);
}
}
public static void main(String[] args) {
for(int i=0; i<10; i++) {
new Thread(()->{
//tickets.size()与tickets.remove()都是原子性操作,但是tickets.size()与tickets.remove()之间的操作不是原子性的
while(tickets.size() > 0) {
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("销售了--" + tickets.remove(0));
}
}).start();
}
}
}
上述方法还会有bug,为此我们可以使用list并进行加锁
package character06;
import java.util.linkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class TicketSeller3 {
static List tickets = new linkedList<>();
static {
for(int i=0; i<1000; i++) {
tickets.add("票 编号:" + i);
}
}
public static void main(String[] args) {
for(int i=0; i<10; i++) {
new Thread(()->{
while(true) {
synchronized(tickets) {
//判断与销售加到一个原子操作里,所以不会出问题,但是效率不高
if(tickets.size() <= 0) {
break;
}
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("销售了--" + tickets.remove(0));
}
}
}).start();
}
}
}
使用队列优化
package character06;
import java.util.Queue;
import java.util.concurrent.ConcurrentlinkedQueue;
import java.util.concurrent.TimeUnit;
public class TicketSeller4 {
static Queue tickets = new ConcurrentlinkedQueue<>();
//ConcurrentlinkedQueue是并发容器 这个程序效率比3高
static {
for(int i=0; i<1000; i++) {
tickets.add("票 编号:" + i);
}
}
public static void main(String[] args) {
for(int i=0; i<10; i++) {
new Thread(()->{
while(true) {
String s = tickets.poll();
//拿到头位置的那个票,为空说明没拿到,poll()是有原子性的
// 判断后没有修改队列操作,所以不需要加锁
if(s == null){
break;
}
else
{
System.out.println("销售了--" + s);
}
}
}).start();
}
}
}
ConcurrentSkipListMap
为什么要有这个类?
java中hashmap是无序的,treemap是有序的,但是treemap实现高并发比较困难,因此诞生了折中方案——ConcurrentSkipListMap
特点:
高并发且排序
底层实现是跳表
什么是跳表?
写时复制
当一组数据读的操作特别多,写的操作特别少,可以使用它来提高效率
为什么提高效率?
读的时候不加锁,写入的时候在原数组上copy一份并内存空间加一给线程,操作完之后,把原数组的引用指向新的数组
读的时候是共享锁,写的时候是排它锁
QueueQueue和List的区别?
Queue添加了很多对线程友好的api:offer、peek、poll
put、take会线程阻塞
为线程池做准备
作用:使线程实现自动阻塞
基本操作:offer(添加数据),peek(看第一个元素),pool(取出第一个元素)
链表实现,无界队列
阻塞方法:put、take
阻塞使用Condition类的await实现的,底层使用unsafe的park()
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.linkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class linkedBlockingQueue {
static BlockingQueue strs = new linkedBlockingQueue<>();
static Random r = new Random();
public static void main(String[] args) {
new Thread(() -> {
for (int i = 0; i < 100; i++) {
try {
strs.put("a" + i); //如果满了,就会等待
TimeUnit.MILLISECONDS.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "p1").start();
for (int i = 0; i < 5; i++) {
new Thread(() -> {
for (; ; ) {
try {
System.out.println(Thread.currentThread().getName() + " take -" + strs.take()); //如果空了,就会等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "c" + i).start();
}
}
}
ArrayBlockingQueue
有界队列
put()满了就等待,线程阻塞
add()满了会抛异常
offer()有返回值,满了就会返回false
需要实现compareTo方法
需要指定等待时间
用来按时间进行任务调度
本质是一个PriorityQueue
import java.util.Calendar;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class T07_DelayQueue {
static BlockingQueue tasks = new DelayQueue<>();
static Random r = new Random();
static class MyTask implements Delayed {
String name;
long runningTime;
MyTask(String name, long rt) {
this.name = name;
this.runningTime = rt;
}
@Override
public int compareTo(Delayed o) {
if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))
return -1;
else if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS))
return 1;
else
return 0;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public String toString() {
return name + " " + runningTime;
}
}
public static void main(String[] args) throws InterruptedException {
long now = System.currentTimeMillis();
MyTask t1 = new MyTask("t1", now + 1000);
MyTask t2 = new MyTask("t2", now + 2000);
MyTask t3 = new MyTask("t3", now + 1500);
MyTask t4 = new MyTask("t4", now + 2500);
MyTask t5 = new MyTask("t5", now + 500);
tasks.put(t1);
tasks.put(t2);
tasks.put(t3);
tasks.put(t4);
tasks.put(t5);
System.out.println(tasks);
for (int i = 0; i < 5; i++) {
System.out.println(tasks.take());
}
}
}
PriorityQueue
内部进行了排序,底层是一个二叉树(小顶堆)的结构
package com.mashibing.juc.c_025;
import java.util.PriorityQueue;
public class T07_01_PriorityQueque {
public static void main(String[] args) {
PriorityQueue q = new PriorityQueue<>();
q.add("c");
q.add("e");
q.add("a");
q.add("d");
q.add("z");
for (int i = 0; i < 5; i++) {
System.out.println(q.poll());
}
}
}
输出:
a
c
d
e
z
SynchronusQueue
容量为0,只是单纯用来给其他线程传递任务
本质和Exchanger比较相似,也是需要两个线程同步对接,否则都会阻塞着
在线程池里面,线程之间进行任务调度的时候,经常会用到
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
public class T08_SynchronusQueue { //容量为0
public static void main(String[] args) throws InterruptedException {
BlockingQueue strs = new SynchronousQueue<>();
new Thread(() -> {
try {
System.out.println(strs.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
strs.put("aaa"); //阻塞等待消费者消费
//strs.put("bbb");
//strs.add("aaa");
System.out.println(strs.size());
}
}
TransferQueue
装完,阻塞等着,有线程把它取走,再离开
要先开启消费者线程,再往里面transfer,要不然就阻塞了~
场景1:要求某件任务有一个结果(比如一个订单等付款完成之后,确认有线程去处理它了,再给客户反馈)
场景2:确认收钱完成之后,才能把商品取走,比如面对面付款
package com.mashibing.juc.c_025;
import java.util.concurrent.linkedTransferQueue;
public class T09_TransferQueue {
public static void main(String[] args) throws InterruptedException {
linkedTransferQueue strs = new linkedTransferQueue<>();
new Thread(() -> {
try {
System.out.println(strs.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
strs.transfer("aaa");
//strs.put("aaa");
}
}
transfer():装完数据就堵塞等待其他线程取走数据
经典的交替打印面试题可以用 TransferQueue 实现import java.util.concurrent.linkedTransferQueue;
import java.util.concurrent.TransferQueue;
public class T13_TransferQueue {
public static void main(String[] args) {
char[] aI = "1234567".toCharArray();
char[] aC = "ABCDEFG".toCharArray();
TransferQueue queue = new linkedTransferQueue();
new Thread(() -> {
try {
for (char c : aI) {
System.out.print(queue.take());
queue.transfer(c);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1").start();
new Thread(() -> {
try {
for (char c : aC) {
queue.transfer(c);
System.out.print(queue.take());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t2").start();
}
}



