容器——ConcurrentHashMap—1.8

前言

1.8 相比 1.7的话,主要有以下改变。

  • 数组+链表的形式改为数组+链表+红黑树的形式,提高了遍历效率
  • 去除 Segment + HashEntry + Unsafe 的实现,改为 Synchronized + CAS + Node + Unsafe 的实现,其实 Node 和 HashEntry 的内容一样,但是HashEntry是一个内部类。用 Synchronized + CAS 代替 Segment ,这样锁的粒度更小了,并且不是每次都要加锁了,CAS尝试失败了在加锁。
  • put()方法中 初始化数组大小时,1.8不用加锁,因为用了个 sizeCtl 变量,将这个变量置为-1,就表明table正在初始化。

数据结构

ConcurrentHashMap做到了线程安全,其并发性通过CAS+synchronized锁来实现

ConcurrentHashMap底层和Hashmap一样通过数组+链表+红黑树的方式实现。

容器——ConcurrentHashMap—1.8_2020-10-20-10-38-10.png

需知

  1. 初始默认容量为 16

  2. 最大容量 1<<30

  3. 负载因子 0.75f

  4. 链表转为红黑树阈值 8

  5. 红黑树转为链表阈值 6

  6. sizeCtl:

    • 默认为0,用来控制table的初始化和扩容操作
    • -1 代表table正在初始化
    • -N 取-N对应的二进制的低16位数值为M,此时有M-1个线程进行扩容
    • 如果table未初始化,表示table需要初始化的大小。
    • 如果table初始化完成,表示table的容量,默认是table大小的0.75倍
  7. 扩容节点的 hash 值

    1
    2
    3
    4
    static final int MOVED     = -1; // 表示该节点正在处理中
    static final int TREEBIN = -2; // 表示该节点是树的根节点
    static final int RESERVED = -3; // 暂时保留
    static final int HASH_BITS = 0x7fffffff; // 正常节点的hash值可用的位数

Node是ConcurrentHashMap中存放key、value以及key的hash值的数据结构:

1
2
3
4
5
6
7
static class Node implements Map.Entry {
final int hash;
final K key;
volatile V val;
volatile Node next;
//具体内部方法参照源码
}

当链表转化成红黑树时,用TreeNode存储对象:

1
2
3
4
5
6
7
8
static final class TreeNode extends Node {
TreeNode parent; // red-black tree links
TreeNode left;
TreeNode right;
TreeNode prev; // needed to unlink next upon deletion
boolean red;
//具体方法见源码内部
}

在数组中,转变为红黑树后存放的不是TreeNode对象,而是TreeBin对象,TreeBin就是封装TreeNode的容器

1
2
3
4
5
6
7
8
9
10
11
static final class TreeBin extends Node {
TreeNode root;
volatile TreeNode first;
volatile Thread waiter;
volatile int lockState;
// values for lockState
static final int WRITER = 1; // set while holding write lock
static final int WAITER = 2; // set when waiting for write lock
static final int READER = 4; // increment value for setting read lock
//具体方法见源码内部
}

ForwardingNode:在扩容时才会出现的特殊节点,其 key,value,hash 全部为 null。并拥有 nextTable 指针引用新的 table 数组。

1
2
3
4
5
6
7
8
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
.....
}

hash

1
2
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
static final int spread(int h) {return (h ^ (h >>> 16)) & HASH_BITS;}

定位

1
int index = (n - 1) & hash  // n为bucket的个数

tabAt

1
2
3
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

采用 Unsafe.getObjectVolatie() 来获取,而不是直接用table[index]的原因跟ConcurrentHashMap的弱一致性有关。在java内存模型中,我们已经知道每个线程都有一个工作内存,里面存储着table的副本,虽然table是volatile修饰的,但不能保证线程每次都拿到table中的最新元素,Unsafe.getObjectVolatile 可以直接获取指定内存的数据,保证了每次拿到数据都是最新的。

casTabAt

cas在指定的位置设置元素

1
2
3
4
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

setTabAt

该方法用来设置 table 数组中索引为 i 的元素

1
2
3
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}

为了保证线程安全,一般在锁方法内部执行。

初始化

ConcurrentHashMap的构造方法都没有实际对table进行初始化,对table的初始化会放在put时。防止不必要的扩容,影响效率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private final Node[] initTable() {
Node[] tab; int sc;
while ((tab = table) == null || tab.length == 0) { //如果表为空则执行初始化操作
if ((sc = sizeCtl) < 0) //如果sizeCtl小于0,说明此时有其他线程在初始化或扩展表
Thread.yield(); // 使当前线程由执行状态,变成为就绪状态,让出cpu时间
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { //通过cas操作去竞争初始化表的操作,设定为-1表示要初始化了
try {
if ((tab = table) == null || tab.length == 0) {//如果指定了大小就创建指定大小的数组,否则创建默认的大小
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node[] nt = (Node[])new Node[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc; //sizeCtl长度为数组长度的3/4
}
break;
}
}
return tab;
}

扩容

在扩容过程中,依然支持并发更新操作;也支持并发插入。

  1. 遍历整个table,当前节点为空,则采用CAS的方式在当前位置放入fwd(特殊的扩容节点,表示已扩容)
  2. 当前节点已经为fwd(with hash field “MOVED”),则已经有有线程处理完了了,直接跳过 ,这里是控制并发扩容的核心
  3. 当前节点为链表节点或红黑树,重新计算链表节点的hash值,移动到nextTable相应的位置(构建了一个反序链表和顺序链表,分别放置在i和i+n的位置上)。移动完成后,用Unsafe.putObjectVolatile在tab的原位置赋为为fwd, 表示当前节点已经完成扩容。

addCount

  1. 当插入结束的时候,会对 size 进行加一。也会进行是否须要扩容的判断。
  2. 优先使用计数盒子(若是不是空,说明并发了),若是计数盒子是空,使用 baseCount 变量。对其加 X。
  3. 若是修改 baseCount 失败,使用计数盒子。若是这次修改失败,在另外一个方法死循环插入。
  4. 检查是否须要扩容。
  5. 若是 size 大于等于 sizeCtl 阈值,且长度小于 1 << 30,能够扩容成 1 << 30,但不能扩容成 1 << 31。
  6. 若是已经在扩容,帮助其扩容
  7. 若是没有在扩容,自行开启扩容,更新 sizeCtl 变量为负数,赋值为标识符高 16 位 + 2。

put

  1. 根据key定位到table的具体位置。
  2. 如果当前的数组为空,说明这是第一插入数据,则会对table进行初始化;
  3. 插入数据,这里分为3中情况:
    • 插入位置为空,直接将数据放入table的第一个位置中(CAS);
    • 插入位置不为空,并且是一个ForwardingNode(f.hash=MOVED)节点,说明该位置上的链表或红黑树正在进行扩容,然后让当前线程加进去并发扩容,提高效率;
    • 插入位置不为空,也不是ForwardingNode节点,若为链表则从第一节点开始组个往下遍历,如果有key的hashCode相等并且值也相等,那么就将该节点的数据替换掉,否则将数据加入到链表末段;若为红黑树,则按红黑树的规则放进相应的位置;(对于链表和红黑树的操作需加SYNC锁)
  4. 数据插入成功后,判断当前位置上的节点的数量,如果节点数据大于转换红黑树阈值(默认为8),则将链表转换成红黑树,提高get操作的速度;
  5. 数据量+1,addCount统计数据总量,并判断当前table是否需要扩容;

get

  1. 计算hash值,定位到该table索引位置,如果是首节点符合就返回

  2. 如果遇到扩容的时候,会调用标志正在扩容节点ForwardingNode的find方法,查找该节点,匹配就返回

  3. 以上都不符合的话,就往下遍历节点,匹配就返回,否则最后就返回null

size

在JDK1.8版本中,对于size的计算,在扩容和addCount()方法就已经有处理了,可以注意一下Put函数,里面就有addCount()函数,早就计算好的,然后你size的时候直接给你。

synchronized 锁升级

针对 synchronized 获取锁的方式,JVM 使用了锁升级的优化方式,就是先使用偏向锁优先同一线程然后再次获取锁,如果失败,就升级为 CAS 轻量级锁,如果失败就会短暂自旋,防止线程被系统挂起。最后如果以上都失败就升级为重量级锁。

总结

主要设计上的变化有以下几点:

  1. 不采用 segment 而采用 node,锁住 node 来实现减小锁粒度。
  2. 设计了 MOVED 状态 当 resize 的中过程中 线程 2 还在 put 数据,线程 2 会帮助 resize(扩容并发)。
  3. 使用 3 个 CAS 操作来确保 node 的一些操作的原子性,这种方式代替了锁。
  4. sizeCtl 的不同值来代表不同含义,起到了控制的作用。
  5. 采用 synchronized 而不是 ReentrantLock
文章目录
  1. 1. 前言
  2. 2. 数据结构
  3. 3. 需知
  4. 4. hash
  5. 5. 定位
  6. 6. tabAt
  7. 7. casTabAt
  8. 8. setTabAt
  9. 9. 初始化
  10. 10. 扩容
  11. 11. addCount
  12. 12. put
  13. 13. get
  14. 14. size
  15. 15. synchronized 锁升级
  16. 16. 总结
|