ConcurrentHashMap源码分析(待续)

ConcurrentHashMap源码分析(待续)

概要

HashTable虽然是线程安全的,但是并发操作时是全表锁,性能非常低下

HashTable线程安全

HashMap操作高效,但是并发操作不能保证线程安全,JDK1.7之前采用头插法扩容时可能会形成环状链表,导致get操作时CUP空转。所以为了解决HashMap线程安全问题,ConcurrentHashMap就诞生了。

ConcurrentHashMap在JDK1.8之前使用分段锁来保证线程安全,如下

JDK1.8前使用分段锁
分段

但是JDK1.8版本实现线程安全的思想完全改变,抛弃了分段锁的概念,使用了全新的利用CAS算法实现。

结构认识

image-20210614104117695
image-20210614104240146

ConcurrentHashMap实现了ConcurrentHashMap接口,但是没有实习Cloneable接口

可以看到ConcurrentHashMap有很多的字段,很多都非常陌生,我们一个个来解释。

源码解释

字段解释

常规常量:(HashMap中的常量不作赘述,需要了解可移步JDK1.8HashMap源码通俗解读)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/* ---------------- Constants -------------- */

private static final int MAXIMUM_CAPACITY = 1 << 30;


private static final int DEFAULT_CAPACITY = 16;


static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;


private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

private static final float LOAD_FACTOR = 0.75f;

static final int TREEIFY_THRESHOLD = 8;

static final int UNTREEIFY_THRESHOLD = 6;

static final int MIN_TREEIFY_CAPACITY = 64;

// 扩容线程每次最少要迁移16个hash桶
private static final int MIN_TRANSFER_STRIDE = 16;

// 用于生成每次扩容都唯一的生成戳的数,最小是6。很奇怪,这个值不是常量,但是也不提供修改方法
private static final int RESIZE_STAMP_BITS = 16;

// 最大的扩容线程的数量,如果上面的 RESIZE_STAMP_BITS = 32,那么此值为 0
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;

// 移位量,把生成戳移位后保存在sizeCtl中当做扩容线程计数的基数,相反方向移位后能够反解出生成戳
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

下面几个是特殊的节点的hash值,正常节点的hash值在hash函数中都处理过了,不会出现负数的情况,特殊节点在各自的实现类中有特殊的遍历方法

1
2
3
4
static final int MOVED     = -1; // hash for forwarding nodes
static final int TREEBIN = -2; // hash for roots of trees
static final int RESERVED = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
  • 普通链表节点:通常是桶中元素小于8个,就是一个单链表,头元素hash > 0

  • 转移节点(MOVED): 表明当前正在扩容中,当前的节点元素已经被转移到新table中,头元素hash =-1

  • 树节点(TREEBIN): 表示当前的桶是一个红黑树桶,头元素hash = -2

  • 占位节点(RESERVED):一般用于当key对应的值缺失需要计算的场景,在计算出新值之前临时占坑位用的,计算出来之后就用普通Node节点替换掉,头元素hash = -3

1
2
// CPU的核心数,用于在扩容时计算一个线程一次要干多少活
static final int NCPU = Runtime.getRuntime().availableProcessors();

字段: 重点认识sizeCtl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
transient volatile Node<K,V>[] table;

// 扩容后的新的table数组,只有在扩容时才有用
private transient volatile Node<K,V>[] nextTable;

// 最基础的计数,比如只有一个线程put操作,只需要通过CAS修改baseCount就可以了。
private transient volatile long baseCount;

/*控制标识符,用来控制table的初始化和扩容的操作,不同的值有不同的含义
*当为负数时:-1代表正在初始化,-N代表有N-1个线程 正在进行扩容
*当为0时:代表当时的table还没有被初始化
*当为正数时:表示初始化或者下一次进行扩容的大小,还后面可以看到,它的值始终是当前ConcurrentHashMap容量的0.75倍,这与loadfactor是对应的。
*/
private transient volatile int sizeCtl;

/**
* The next table index (plus one) to split while resizing.
*/
private transient volatile int transferIndex;

// 自旋锁标识符
private transient volatile int cellsBusy;

/*这是一个数组,里面放着CounterCell对象,这个类里面就一个属性,其使用方法是,在高并发的时候,多个线程都 *要进行计数,每个线程有一个探针hash值,通过这个hash值定位到数组桶的位置,如果这个位置有值就通过CAS修改 *CounterCell的value(如果修改失败,就换一个再试),如果没有,就创建一个CounterCell对象。--用于多线 *程并发时计数,长度必须是2^n
*/
private transient volatile CounterCell[] counterCells;

存储结构解释

1、Node:基本节点/普通节点

此节点就是一个很普通的Entry,在链表形式保存才使用这种节点,它存储实际的数据,基本结构类似于1.8的HashMap.Node,和1.7的Concurrent.HashEntry

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;

Node(int hash, K key, V val) {
this.hash = hash;
this.key = key;
this.val = val;
}

Node(int hash, K key, V val, Node<K,V> next) {
this(hash, key, val);
this.next = next;
}

public final K getKey() { return key; }
public final V getValue() { return val; }
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final String toString() {
return Helpers.mapEntryToString(key, val);
}
public final V setValue(V value) {
throw new UnsupportedOperationException();
}

public final boolean equals(Object o) {
Object k, v, u; Map.Entry<?,?> e;
return ((o instanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null &&
(k == key || k.equals(key)) &&
(v == (u = val) || v.equals(u)));
}

/**
* Virtualized support for map.get(); overridden in subclasses.
*/
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}

2、TreeNode:红黑树节点 在红黑树形式保存时才存在,它也存储有实际的数据,结构和1.8的HashMapTreeNode一样,一些方法的实现代码也基本一样。不过,ConcurrentHashMap对此节点的操作,都会由TreeBin来代理执行。也可以把这里的TreeNode看出是有一半功能的HashMap.TreeNode,另一半功能在ConcurrentHashMap.TreeBin中。 红黑树节点本身保存有普通链表节点Node的所有属性,因此可以使用两种方式进行读操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;

TreeNode(int hash, K key, V val, Node<K,V> next,
TreeNode<K,V> parent) {
super(hash, key, val, next);
this.parent = parent;
}

Node<K,V> find(int h, Object k) {
return findTreeNode(h, k, null);
}

/**
* Returns the TreeNode (or null if not found) for the given key
* starting at given root.
*/
final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
if (k != null) {
TreeNode<K,V> p = this;
do {
int ph, dir; K pk; TreeNode<K,V> q;
TreeNode<K,V> pl = p.left, pr = p.right;
if ((ph = p.hash) > h)
p = pl;
else if (ph < h)
p = pr;
else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
return p;
else if (pl == null)
p = pr;
else if (pr == null)
p = pl;
else if ((kc != null ||
(kc = comparableClassFor(k)) != null) &&
(dir = compareComparables(kc, k, pk)) != 0)
p = (dir < 0) ? pl : pr;
else if ((q = pr.findTreeNode(h, k, kc)) != null)
return q;
else
p = pl;
} while (p != null);
}
return null;
}
}

3、ForwardingNode:转发节点 ForwardingNode是一种临时节点,在扩容进行中才会出现,hash值固定为-1,并且它不存储实际的数据数据。如果旧数组的一个hash桶中全部的节点都迁移到新数组中,旧数组就在这个hash桶中放置一个ForwardingNode。读操作或者迭代读时碰到ForwardingNode时,将操作转发到扩容后的新的table数组上去执行,写操作碰见它时,则尝试帮助扩容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null);
this.nextTable = tab;
}

Node<K,V> find(int h, Object k) {
// loop to avoid arbitrarily deep recursion on forwarding nodes
outer: for (Node<K,V>[] tab = nextTable;;) {
Node<K,V> e; int n;
if (k == null || tab == null || (n = tab.length) == 0 ||
(e = tabAt(tab, (n - 1) & h)) == null)
return null;
for (;;) {
int eh; K ek;
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
if (eh < 0) {
if (e instanceof ForwardingNode) {
tab = ((ForwardingNode<K,V>)e).nextTable;
continue outer;
}
else
return e.find(h, k);
}
if ((e = e.next) == null)
return null;
}
}
}
}

4、TreeBin:代理操作TreeNode的节点 TreeBin的hash值固定为-2,它是ConcurrentHashMap中用于代理操作TreeNode的特殊节点,持有存储实际数据的红黑树的根节点。因为红黑树进行写入操作,整个树的结构可能会有很大的变化,这个对读线程有很大的影响,所以TreeBin还要维护一个简单读写锁,这是相对HashMap,这个类新引入这种特殊节点的重要原因。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 红黑树节点TreeNode实际上还保存有链表的指针,因此也可以用链表的方式进行遍历读取操作
// 自身维护一个简单的读写锁,不用考虑写-写竞争的情况
// 不是全部的写操作都要加写锁,只有部分的put/remove需要加写锁
// 很多方法的实现和jdk1.8的ConcurrentHashMap.TreeNode里面的方法基本一样,可以互相参考
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root; // 红黑树根节点
volatile TreeNode<K,V> first; //链表结构头结点
volatile Thread waiter; // 最近一个设置WAITER标志位的线程
volatile int lockState; // 整体的锁标志位
// values for lockState
static final int WRITER = 1; // 红黑树写锁001
static final int WAITER = 2; // 等待获取写锁010
static final int READER = 4; // 读锁状态100
...
}

5、ReservationNode:保留节点 或者叫空节点,computeIfAbsentcompute这两个函数式api中才会使用。它的hash值固定为-3,就是个占位符,不会保存实际的数据,正常情况是不会出现的,在jdk1.8新的函数式有关的两个方法computeIfAbsentcompute中才会出现。 为什么需要这个节点,因为正常的写操作,都会想对hash桶的第一个节点进行加锁,但是null是不能加锁,所以就要new一个占位符出来,放在这个空hash桶中成为第一个节点,把占位符当锁的对象,这样就能对整个hash桶加锁了。put/remove不使用ReservationNode是因为它们都特殊处理了,并且这种特殊情况实际上还更简单,put直接使用cas操作,remove直接不操作,都不用加锁。但是computeIfAbsentcompute这个两个方法在碰见这种特殊情况时稍微复杂些,代码多一些,不加锁不好处理,所以需要ReservationNode来帮助完成对hash桶的加锁操作。

1
2
3
4
5
6
7
8
9
static final class ReservationNode<K,V> extends Node<K,V> {
ReservationNode() {
super(RESERVED, null, null);
}

Node<K,V> find(int h, Object k) {
return null;
}
}

构造函数解释

ConcurrentHashMap中一共有5个构造函数

image-20210614112920296
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public ConcurrentHashMap() {
}


public ConcurrentHashMap(int initialCapacity) {
this(initialCapacity, LOAD_FACTOR, 1);
}

public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}


public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}

public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}

JDK1.7中concurrencyLevel表示有多少个Segment,有多少个Segment就表示支持多少个线程可以并发操作,initialCapacity表示每个Segment中有多少个Entry对象。

并发级别不会随扩容变化。

image-20210614114657389

可以看到JDK1.8中虽然保留了Segment,但是构造函数直接将concurrencyLevel赋值给了initialCapacity,没有使用JDK1.7的思想。

构造函数不会进行初始化,只是进行参数的设定,真正初始化在initTable()。在put方法中有调用此方法,即第一次put才进行初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 使用保存在sizeCtl中的数据作为初始化容量
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
// Thread.yeild() 和 CAS 都不是100%和预期一致的方法,所以用循环,其他代码中也有很多这样的场景
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
// 真正的初始化是要禁止并发的,保证tables数组只被初始化一次,但是又不能切换线程,所以用yeild()暂时让出CPU
Thread.yield(); // lost initialization race; just spin
// CAS更新sizeCtl标识为 "初始化" 状态
else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
try {
// 检查table数组是否已经被初始化,没初始化就真正初始化
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// // sc = threshold,n - (n >>> 2) = n - n/4 = 0.75n,loadFactor没用了,这里看出,统一用0.75f了
sc = n - (n >>> 2);
}
} finally {
// 设置threshold
sizeCtl = sc;
}
break;
}
}
return tab;
}
初始化调用时机

基本方法解释

hash扰动函数,跟1.8的HashMap的基本一样,& HASH_BITS用于把hash值转化为正数,负数hash是有特别作用

1
2
3
4
5
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash

static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
  • comparableClassFor()用于获取Comparable接口中的泛型类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static Class<?> comparableClassFor(Object x) {
if (x instanceof Comparable) {
Class<?> c; Type[] ts, as; ParameterizedType p;
if ((c = x.getClass()) == String.class) // bypass checks
return c;
if ((ts = c.getGenericInterfaces()) != null) {
for (Type t : ts) {
if ((t instanceof ParameterizedType) &&
((p = (ParameterizedType)t).getRawType() ==
Comparable.class) &&
(as = p.getActualTypeArguments()) != null &&
as.length == 1 && as[0] == c) // type arg is c
return c;
}
}
}
return null;
}
  • compareComparables()同1.8的HashMap,当类型相同且实现Comparable时,调用compareTo比较大小
1
2
3
4
@SuppressWarnings({"rawtypes","unchecked"}) // for cast to Comparable
static int compareComparables(Class<?> kc, Object k, Object x) {
return (x == null || x.getClass() != kc ? 0 : ((Comparable)k).compareTo(x));
}

下面几个用于读写table数组,使用Unsafe提供的更强的功能(数组元素的volatile读写,CAS 更新)代替普通的读写,调用者预先进行参数控制

1
2
3
4
5
6
7
8
9
10
11
12
13
// volatile读取table[i]
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getReferenceAcquire(tab, ((long)i << ASHIFT) + ABASE);
}
// CAS更新table[i],也就是Node链表的头节点,或者TreeBin节点(它持有红黑树的根节点)
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSetReference(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
// volatile写入table[i]
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putReferenceRelease(tab, ((long)i << ASHIFT) + ABASE, v);
}

计数操作

由于ConcurrentHashMap是一个高并发的集合,集合中增删就比较频繁,那计数就变成了一个问题,如果使用像AtomicInteger这样类型的变量来计数,虽然可以保证原子性,但是太多线程去竞争CAS,自旋也挺浪费时间的,所以ConcurrentHashMap使用了一种类似LongAddr的数据结构去计数,其实LongAddr是继承Striped64,有关于这个类的原理大家可以参考这篇文章:并发之STRIPED64(累加器)和 LONGADDER,大家了解了这个类的原理,理解ConcurrentHashMap计数就没有一点压力了,因为两者在代码实现上基本一样。

实现计数原理的字段如下:

1
2
3
4
5
6
private transient volatile long baseCount;
private transient volatile CounterCell[] counterCells;
@jdk.internal.vm.annotation.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
  • baseCount:基础计数,线程如果进行put等操作,使用CAS修改baseCount即可
  • counterCells:分段计数的数组,根据hash值让线程在数组上不同索引处计数,类似JDK1.7的Segment分段锁,给线程分配一个Segment,这里给线程分配一个索引进行独自计数。

根据STRIPED64LONGADDR的思想,高并发环境下计数先会尝试使用CAS无锁技术baseCount进行更新,但是如果自旋过久就判定竞争很激烈,此时才会使用counterCells进行计数。最后使用一定的方法将counterCells的计数值和baseCount总和起来。

总和的方法就是sumCount()

1
2
3
4
5
6
7
8
9
10
11
12
final long sumCount() {
CounterCell[] cs = counterCells;
// 当前的baseCount作为基准值
long sum = baseCount;
if (cs != null) {
for (CounterCell c : cs)
if (c != null)
// 对每个counterCell进行累加
sum += c.value;
}
return sum;
}

高并发下的计数流程大致了解了,我们深入一下。

刚刚谈到竞争激烈情况下才会使用counterCells进行计数,那么线程怎么直到我要把计数放在哪里呢?这和探针hash有关。这个值比较有趣,分析代码的时候我们再说。

我们看看计数+1的具体代码addCount()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private final void addCount(long x, int check) {
CounterCell[] cs; long b, s;
// 如果counterCells还未创建,就直接对baseCount进行CAS操作
if ((cs = counterCells) != null ||
!U.compareAndSetLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell c; long v; int m;
// 设置没有冲突的标志位为true
boolean uncontended = true;
if (cs == null || (m = cs.length - 1) < 0 ||
// c = cs[ThreadLocalRandom.getProbe() & m]) == null 说明数组是创建了,但是通过探针hash定位的桶中没有对象
// 如果桶中有对象,进行CAS修改counterCell,如果也失败了就要进入下面的方法
(c = cs[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n) << RESIZE_STAMP_SHIFT;
if (sc < 0) {
if (sc == rs + MAX_RESIZERS || sc == rs + 1 ||
(nt = nextTable) == null || transferIndex <= 0)
break;
if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSetInt(this, SIZECTL, sc, rs + 2))
transfer(tab, null);
s = sumCount();
}
}
}

注意到一句代码c = cs[ThreadLocalRandom.getProbe() & m]) == null,根据方法名可以猜到是获取一个随机值生成探针,为什么要使用ThreadLocalRandom,而不直接使用Random,因为每个线程都是隔离的,只需要知道自己应该在哪个位置计数就可以了,所以使用ThreadLocal线程内部变量即可。还有一个关键就是随机数Random的生成需要一个种子,默认是当前时机毫秒值。如果初始化Random,种子(seed1)为当前时间戳,另外一个线程需要计算随机数,需要使用CAS更新当前种子生成seed2,然后再生成随机数。

问题就出在CAS,高并发情况下都维护一个seed会非常影响性能,所以使用ThreadLocalRandom可以保证每个线程维护自己内部私有的种子即可,不存在竞争修改种子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
//上面我贴出来了介绍ThreadLocalRandom的文章,这里如果是首次获取,其实就是0
if ((h = ThreadLocalRandom.getProbe()) == 0) {
//如果为0,就初始化,这里其实就是把种子和随机数设置到(Thread)线程中
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
//死循环,保证计数一定成功
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
//说明数组已经初始化,在后面有判断数组没有初始化的情况
if ((as = counterCells) != null && (n = as.length) > 0) {
//这里是不是和ConcurrentHashMap定位桶的位置很像,其实是一摸一样的
//说明数组中这个位置没有元素
if ((a = as[(n - 1) & h]) == null) {
//这个字段保证数组新增节点,扩容只有一个线程在进行,防止多线程并发
//这里限制一个线程处理只是在数组新增节点和扩容的时候,修改对象的值并不需要限制这个变量
if (cellsBusy == 0) { // Try to attach new Cell
CounterCell r = new CounterCell(x); // Optimistic create

//如果为0表示没有别的线程在修改数组,通过CAS修改为1,表示当前线程在修改数组
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try { // Recheck under lock
CounterCell[] rs; int m, j;
//再次校验,确保数组没有变化
//rs[j = (m - 1) & h] == null,再次确认该位置是否为null,防止别的线程插入了
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
//插入数组
rs[j] = r;
created = true;
}
} finally {
//释放CAS锁
cellsBusy = 0;
}
if (created)
//如果新节点插入成功,表示计数已经成功,这里直接break了
break;
//如果失败会一直重试
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash

//定位到桶中有值,然后通过CAS修改其值
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
//下面的两个elseif其实是为了防止数组一直扩容使用的,数组的最大容量就是CPU的核数
//因为核数就是并发数,数组太大没有意义,没有那么多线程可以同时操作
//就是说上面的新建节点或者CAS修改值事变了,就会到这里,然后拦截住,不让执行扩容
else if (counterCells != as || n >= NCPU)
collide = false; // At max size or stale
else if (!collide)
collide = true;
//先竞争到CAS锁,然后执行扩容
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
if (counterCells == as) {// Expand table unless stale

//每次扩容成原来的两倍
CounterCell[] rs = new CounterCell[n << 1];
//复制元素,看过ConcurrentHashMap的扩容,再看这个,简直就跟一个大学生看小学数学题一样
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
//这里是重新生成一个随机数,换个位置试试,比如上面新增节点失败了,换个位置试试,或者通过CAS修改值失败,也换个位置再试试
h = ThreadLocalRandom.advanceProbe(h);
}
//这里就是判断数组没有初始化的情况,搞不明白没啥放在这里,不放在开头
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try { // Initialize table
if (counterCells == as) {
//初始化的数组大小是2,非常小
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
//如果以上CAS修改,创建新节点都失败了,这里还有一道防线,通过CAS修改baseCount
//这也是再addCount中,当判断数组不为空,不先修改下baseCount试试,而是直接跳到这个方法中,因为在这个方法中也会修改baseCount
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
}
}

方法流程梳理如下:

  1. 判断数组是否为空,为空的话初始化数组
  2. 如果数组存在,通过探针hash定位桶中的位置,如果桶中为空,新建节点,通过CAS锁插入数组,如果成功,结束,如果失败转到第5步
  3. 如果定位到桶中有值,通过CAS修改,如果成功,结束,如果失败向下走
  4. 如果数组大小小于CPU核数,扩容数组
  5. 重新计算探针hash

扩容

JDK1.8的扩容可以多线程一起完成,因此变得复杂了,但是效率提升了。

transfer任务

由字面意思可以知道transfer是转移的意思,这里指代扩容时多线程中每个线程分配的迁移原结点到新结点处的任务

对于一个大任务拆分成多个小任务供多线程执行,一般都要求这些小任务具有相似性,流程一致,并且很重要的一点,任务之间的相互影响尽量少。那么在扩容之中,是怎么划分这个任务的呢?

对于一般扩容,大致分为两步:

  • 新建一个2倍大小的数组,这个过程要求单线程完成,多线程没有意义,反而容易出错。
  • 迁移原数组结点到新数组,即rehash过程,我们知道HashMap中使用了一个技巧避免重新计算hash值,即使用(e.hash & oldCap) == 0判断新索引处于高位还是低位。

这一点对多线程扩容非常有利。根据这一点,可以知道,每个hash桶的迁移都可以作为一个线程在扩容时的一个transfer任务。

另外,每个线程要任务都不应该规模太小,因为扩容并不是IO型操作,节点迁移的执行速度本身很快,太多的线程来执行节点迁移,线程调度开销占比变大,反而降低了吞吐量。ConcurrentHashMap这里,会根据CPU的核心数目(前面提到的NCPU),来算出一个transfer任务包含的hash桶的数量。

transfer()中会计算每个transfer任务中要迁移多少个hash桶,一个transfer任务完成后,可以再次申请,这个方法代码非常多,先省略后面的部分。

1
2
3
4
5
6
7
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
// stride 就是要进行迁移的量
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
···
}

MIN_TRANSFER_STRIDE我们知道代表一次transfer中最小迁移的hash桶数量,默认为16.即一次迁移任务中最少应该迁移16个hash桶。

申请transfer任务

1
private transient volatile int transferIndex;

还记得这个变量吗?它的意思就是用于标记整体的transfer进行到了哪里,申请transfer任务跟它关联非常大。

经典生产者消费者中,消费者会从任务队列里去获取可执行任务,因此MQ、Redis应对高并发的思想也是通过队列来进行控制。ConcurrentHashMap中不存在队列,因为底层的数组就可以实现队列,通过transferIndex来控制任务的调度。

每次申请任务,transferIndex就会-1。由于迭代操作是从小到大,为了减少和扩容时transfer的迭代发生冲突,transferIndex采取反向迭代,即下标从大到小。二者相遇后,要transfer的hash桶都已经被遍历过了,要遍历的的hash桶都已经tranfer完成到新数组了,这样减少了冲突。

下标在[transferIndex - stride(>= 0), transferIndex - 1]内的hash桶,就是每个transfer的任务区间。transferIndex <= 0 时,代表没有任务可以申请,此时无法帮助扩容。注意,NCPU不一定是2^n,因此最后一个任务中的hash桶的数量可能不足stride个,此时只执行余下的数量。 为了保证每个任务只被领取一次,transferIndex递减是用CAS操作完成的。

特殊情况下,会出现多线程扩容重叠,此时某个transfer任务虽然被领取了,但是却不能被执行,会被作废。这是根据transfer方法的代码理解得到的,因为transfer方法的代码中有考虑任务作废的情况。 简而言之就是:代码中有处理扩容作废,但是实际不会发生。

transfer任务申请流程图(源自网络)

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!