ConcurrentHashMap采用的数据结构和JDK1.8HashMap一样,也是数组+链表+红黑树来存储元素的,不过和HashMap相比,因为使用了自旋锁 + CAS + synchronized,所以是线程安全的
ConcurrentHashMap添加元素的整体流程如下:
(1)桶数组如果还没初始化,就先初始化
(2)根据元素的key计算hash值,然后用hash值去定义桶位
(3)桶位如果为空,就直接插入元素
(4)桶位的元素如果被迁移了,说明当前桶处于扩容过程,当前线程应该要先帮助完成扩容
(5)桶位有元素,且没有被迁移,就锁住这个桶位
(6)如果桶位元素是链表节点,就遍历链表添加元素,更新或新增元素
(7)如果桶位元素是红黑树节点,就往红黑树添加元素,更新或新增元素
(8)如果元素存在,返回旧值
(9)如果元素不存在,桶的元素个数+1,,然后判断添加元素后的桶是否要扩容
添加元素使用了自旋锁 + CAS + synchronized来保证线程安全的
//桶数组的最大容量 private static final int MAXIMUM_CAPACITY = 1 << 30; //桶数组的默认容量 private static final int DEFAULT_CAPACITY = 16; //最大数组大小 不是2的n次方 static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; //默认并发级别JDK1.7用的 private static final int DEFAULT_CONCURRENCY_LEVEL = 16; //默认加载因子,不能修改 和HashMap不一样 HashMap的可以通过构造方法修改 private static final float LOAD_FACTOR = 0.75f; //链表树化的阈值 大于等于这个值就树化 static final int TREEIFY_THRESHOLD = 8 //树退化成链表的阈值 小于等于这个值就退化成链表 static final int UNTREEIFY_THRESHOLD = 6 //链表树化桶数组的阈值 桶数组的大小要大于等于这个值链表才允许树化成红黑树 否则就直接扩容 static final int MIN_TREEIFY_CAPACITY = 64 //迁移数据时一个线程所能处理的最多桶位 private static final int MIN_TRANSFER_STRIDE = 16 //用来计算扩容标识戳 每次扩容的标识戳都不一样 private static int RESIZE_STAMP_BITS = 16 //最多能有多少个线程参与并发扩容 //1左移16位后-1 相当于2的16次方-1 = 65535 最多能有65535个线程参与并发扩容 private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1 //用来算扩容标识戳的 sizeCtl高16表示扩容标识戳 所以要右移16位来判断当前线程是否是当前参与扩容的线程 private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS //节点hash值的特殊含义 static final int MOVED = -1; //表示当前节点已经被迁移了 即处于扩容状态 static final int TREEBIN = -2 //表示当前节点是红黑树节点 static final int RESERVED = -3 //表示翻转节点 static final int HASH_BITS = 0x7fffffff //作用是把hash值变为正数 //cpu数 static final int NCPU = Runtime.getRuntime().availableProcessors();2.2关键字段
//存储元素的桶数组 transient volatile Node2.3、重要的内部类[] table //新表的引用 扩容时,旧表的元素会迁移到这个新表,扩容完成之后会被置空 private transient volatile Node [] nextTable //LongAdder的baseCount 没有发生线程竞争时,增量都存储在这里 private transient volatile long baseCount; //用来表示table的状态,可正可负 //负数时有两种情况:1、-1时表示table正在进行初始化,其他线程要自旋等待进行初始化的线程 // 2、-(1 + nThread),当前有n个线程进行并发扩容 //= 0时,用默认容量16初始化table //> 0 时:如果table还没初始化,就表示始化大小 如果table已经初始化,就表示下次扩容的阈值 private transient volatile int sizeCtl //表示扩容时,已经处理到哪个桶位了 //例如桶位大小为32,transferIndex的初始化时是桶位大小即32 //第一个线程被分配到处理第17个到第32个桶位,然后transferIndex变为16 //第二个线程根据transferIndex被分配到处理第1个到16个桶位 private transient volatile int transferIndex; //0表示LongAdder对象处于无锁 1表示加锁 private transient volatile int cellsBusy //LongAdder的cell数组 baseCount发生竞争后会创建这个数组,然后线程通过计算的hash值去取对应的cell,然后把增量加到这个cell里 //桶数组的元素个数是baseCount的数加上每个cell的数 private transient volatile CounterCell[] counterCells
2.3.1、单链表节点
static class Nodeimplements Entry { final int hash; final K key; volatile V val; volatile Node next; //下一个节点 }
2.3.2、红黑树代理节点
static final class TreeBinextends Node { TreeNode root; //根节点 volatile TreeNode first; //链表头结点 volatile Thread waiter; // 等待者线程(当前lockState是读锁状态) volatile int lockState; //锁的状态 写锁是独占状态 写的时候其他线程不能读或写 读锁是共享状态 其他线程可以读不能写 等待者状态 写线程要等其他线程读完才能写 把lockState最后两位设为2 // values for lockState static final int WRITER = 1; // set while holding write lock static final int WAITER = 2; // set when waiting for write lock static final int READER = 4; }
2.3.3、红黑树节点
static final class TreeNodeextends Node { TreeNode parent; // red-black tree links TreeNode left; TreeNode right; TreeNode prev; // needed to unlink next upon deletion boolean red; }
2.3.4、FWD节点,该节点的作用是标识当前节点已经被迁移了,并表明当前table处于扩容状态
static final class ForwardingNode2.4、偏移地址属性extends Node { final Node [] nextTable; ForwardingNode(Node [] tab) { super(MOVED, null, null, null); this.nextTable = tab; } }
private static final sun.misc.Unsafe U; //根据偏移地址进行操作的类
private static final long SIZECTL; //sizeCtl在ConcurrentHashMap内存的偏移地址
private static final long TRANSFERINDEX; //transferIndex在ConcurrentHashMap内存的偏移地址
private static final long baseCOUNT; //baseCount在ConcurrentHashMap内存的偏移地址
private static final long CELLSBUSY; //cellsBusy在ConcurrentHashMap内存的偏移地址
private static final long CELLVALUE; //cellValue在ConcurrentHashMap内存的偏移地址
private static final long Abase; //数组第一个元素的地址
private static final int ASHIFT; //偏移量 Abase + ASHIFT能定义到具体的位置
//这段代码是用来给上面的偏移地址赋值的
static {
try {
U = sun.misc.Unsafe.getUnsafe();
Class> k = ConcurrentHashMap.class;
SIZECTL = U.objectFieldOffset
(k.getDeclaredField("sizeCtl"));
TRANSFERINDEX = U.objectFieldOffset
(k.getDeclaredField("transferIndex"));
baseCOUNT = U.objectFieldOffset
(k.getDeclaredField("baseCount"));
CELLSBUSY = U.objectFieldOffset
(k.getDeclaredField("cellsBusy"));
Class> ck = CounterCell.class;
CELLVALUE = U.objectFieldOffset
(ck.getDeclaredField("value"));
Class> ak = Node[].class;
Abase = U.arraybaseOffset(ak);// 拿到数组第一个元素的偏移地址
int scale = U.arrayIndexScale(ak);// 表示数组中每一个单元所占用的空间大小,即scale表示Node[]数组中每一个单元所占用的空间
if ((scale & (scale - 1)) != 0)//scale要是2的n次方 不是2的2次方就不合法
throw new Error("data type scale not a power of two");
//numberOfLeadingZeros(scale)统计scale从高位有多少个连续的0
//例如numberOfLeadingZeros(4)= 29 ASHIFT = 31 - 29 = 2
//ASHIFT的作用是用来算某个下标元素的偏移地址 例如下标为5的元素偏移地址为 Abase + (5 << ASHIFT) = Abase + (5 << 2) = Abase + (5 * scale) = Abase + (5 * 4) =
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);//获取偏移地址
} catch (Exception e) {
throw new Error(e);
}
}
2.5、几个关键的通用方法和构造方法
//根据table和下标i的偏移地址得到下标i的桶位元素 static finalNode tabAt(Node [] tab, int i) { return (Node )U.getObjectVolatile(tab, ((long)i << ASHIFT) + Abase);// ((long)i << ASHIFT) + Abase能得到当前Node[] 数组下标为i的节点对象的偏移地址 } //通过CAS把table下标i的桶位元素的值更新成v 更新成功返回true 更新失败返回false c是期望值 v是更新后的值 static final boolean casTabAt(Node [] tab, int i, Node c, Node v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + Abase, c, v); } //把table下标i的桶位元素的值更新成v 没有用到cas static final void setTabAt(Node [] tab, int i, Node v) { U.putObjectVolatile(tab, ((long)i << ASHIFT) + Abase, v); } static final int spread(int h) { //hashCode和hashCode的高16位进行异或运算使得hashCode变得更为分散 return (h ^ (h >>> 16)) & HASH_BITS; //hashmap的hash方法加上与HASH_BITS进行与运算使得hash值永远是正数 } //算出大于c的最小2次方 例如c是29 就会算出32 private static final int tableSizeFor(int c) { int n = c - 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1; } //算出扩容标识戳 线程参与并发扩容时通过sizeCtl算出的扩容标识戳等于这个才能参与并发扩容 //例如n是16,表示table从16扩容到32 static final int resizeStamp(int n) { // 27 | (1 << 15) = 27 | 32768 = 0000 0000 0001 1011 | 1000 0000 0000 0000 = 1000 0000 0001 1011 // 线程通过sizeCtl算出的扩容标识戳要等于1000 0000 0001 1011才能参与到16扩容到32的这个过程 return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)); }
构造方法
//指定初始容量
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap; //目前table还没初始化时,sizeCtl表示初始化容量
}
//通过一个map初始化ConcurrentHashMap
public ConcurrentHashMap(Map extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // 当指定的初始化容量initialCapacity小于并发级别concurrencyLevel时
initialCapacity = concurrencyLevel; // 初始化容量值设置为并发级别的值
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap; //sizeCtl > 0 table还没初始化,sizeCtl表示初始容量
}



