ConcurrentHashMap
是一个线程安全的哈希表,支持高并发的更新和检索操作,提供比Hashtable更高的并发性能。
HashTable
的方法通过加synchronized
关键字实现线程安全。
ConcurrentHashMap
的检索操作(如 get
)通常不需要加锁,可以与其他更新操作(如 put
和 remove
)同时进行。 当哈希表中的冲突过多时,它会自动进行扩容。
主要内部类
Node类
ConcurrentHashMap
的内部静态类 Node
实现了 Map.Entry
接口,它是 ConcurrentHashMap
底层数据结构的基本单元。
成员变量
- hash: 哈希值,用于快速定位
Node
在哈希表中的位置。 - key: 键对象,不能为
null
。 - val: 值对象,可以为
null
,并且是volatile
类型的,以确保对它的写操作对其他线程立即可见。 - next: 指向下一个
Node
的引用,用于处理哈希碰撞(两个不同的键有相同的哈希值)。
Segment
类
Segment
类的主要作用是作为 ConcurrentHashMap
的分段锁机制的一部分。在 ConcurrentHashMap
的早期版本(Java 1.7及之前),整个哈希表被分割成多个段(segment),每个段有自己的锁。这种设计允许不同段上的并发更新操作,从而提高了并发性能。在高版本中为了兼容旧版本仍保留该代码。
成员变量
serialVersionUID: 序列化版本号,用于确保不同版本的 Segment
类在序列化时的兼容性。
loadFactor: 负载因子,用于确定何时对段进行扩容。当段中的元素数量达到其容量与负载因子的乘积时,段会进行扩容。
构造函数
Segment(float lf): 构造一个新的 Segment
实例,并初始化其负载因子。
继承关系
extends ReentrantLock: Segment
类继承自 ReentrantLock
,这意味着它具有可重入锁的所有特性,包括公平锁和非公平锁的实现。这允许 Segment
实例控制对其中包含的哈希表部分的访问。
Segment
类是 ConcurrentHashMap
实现分段锁机制的关键部分,它通过继承 ReentrantLock
来提供线程安全的访问控制,并通过负载因子来管理其内部存储的大小。这种设计允许 ConcurrentHashMap
在多线程环境中提供高效的并发访问。 需要注意的是,从 Java 8 开始,ConcurrentHashMap
的实现已经放弃了分段锁机制,转而使用了一种基于 CAS
操作和 synchronized
块的更细粒度的锁机制,这进一步提高了并发性能。因此,上述关于 Segment
类的描述主要适用于 Java 7 及之前的版本。在 Java 8 及之后的版本中,Segment
类不再作为 ConcurrentHashMap
实现的一部分。
TreeNode
类
TreeNode
是ConcurrentHashMap中的一个内部类,它继承了Node
类,并用于构成红黑树。当哈希表的某个桶(bin)中的节点数量超过了特定的阈值,这个桶中的链表就会被转换成红黑树,以减少搜索时间。
查看代码
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;
TreeNode(int hash, K key, V val, Node<K,V> next,
TreeNode<K,V> parent) {
super(hash, key, val, next);
this.parent = parent;
}
//...
}
- 属性说明:
parent
:指向当前节点的父节点,用于维护红黑树的结构。left
:指向当前节点的左子节点。right
:指向当前节点的右子节点。prev
:指向前一个节点,这个属性在节点删除时用于断开链接。red
:一个布尔值,表示当前节点是否为红色,这是红黑树性质的一部分。
- 构造函数:创建一个新的TreeNode节点,并设置其父节点。
- find方法:这是一个辅助方法,用于在红黑树中查找具有特定哈希值和键的TreeNode。
- findTreeNode方法:这是一个递归方法,用于在红黑树中查找具有特定哈希值和键的TreeNode。如果找到了,则返回该TreeNode;如果没有找到,则返回null。
TreeBin
类
TreeBin
也是ConcurrentHashMap的一个内部类,它扩展了Node
类,但并不直接存储用户键值对。相反,它作为树的头节点,管理着红黑树的根节点以及相关的一些并发控制信息。
查看代码
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
volatile Thread waiter;
volatile int lockState;
// values for lockState
static final int WRITER = 1; // set while holding write lock
static final int WAITER = 2; // set when waiting for write lock
static final int READER = 4; // increment value for setting read lock
// ...
}
root
:这是一个指向红黑树根节点的引用。所有的树操作(如查找、插入、删除)都是从根节点开始的。根节点是树中唯一没有父节点的节点。first
:这是一个指向树中最小节点的引用,也就是最左边的节点。这个属性对于执行树的前序遍历非常有用,因为它提供了一个遍历的起点。waiter
:这是一个指向正在等待写锁的线程的引用。在ConcurrentHashMap中,写操作(如插入、删除节点)需要独占访问树结构,因此如果有线程正在执行写操作,其他线程必须等待。waiter
字段用于跟踪哪个线程正在等待获取写锁。lockState
:这是一个整数值,用于表示当前TreeBin的锁定状态。它用于实现读写锁的机制,允许多个读线程同时访问树,但在写线程操作树时阻止所有其他线程。lockState
的具体值可以是以下几种状态的总和:WRITER
:当写锁被持有时,lockState
的值至少包含WRITER
。WAITER
:当有线程正在等待写锁时,lockState
的值至少包含WAITER
。READER
:每当一个读线程获得读锁时,lockState
的值会增加READER
。
主要属性
transient volatile Node<K,V>[] table;
- 类型:
Node<K,V>[]
,一个节点数组,用于存储哈希表的桶(bins)。 - 可见性:
volatile
关键字确保了table
变量的可见性,即一个线程对table
的修改能够立即被其他线程看到。 - 初始化:它是延迟初始化的,意味着只有在第一次插入操作时才会被初始化。
- 大小:数组的长度始终是2的幂,这是为了优化哈希值的映射过程,使得
(n - 1) & hash
操作能够高效地计算出桶的位置。 - 用途:这是
ConcurrentHashMap
存储键值对的地方,每个桶可能是一个链表或树。
private transient volatile Node<K,V>[] nextTable;
- 类型:
Node<K,V>[]
,在扩容操作期间使用的下一个表。 - 可见性:
volatile
关键字确保了nextTable
变量的可见性。 - 用途:当
table
需要扩容时,nextTable
将用作新表,所有的元素将被重新哈希到nextTable
中,并在扩容完成后替换table
。
private transient volatile long baseCount;
- 类型:
long
,基础计数器值。 - 可见性:
volatile
关键字确保了baseCount
变量的可见性。 - 用途:用于记录
ConcurrentHashMap
中元素的总数。在无竞争的情况下,直接通过CAS操作更新baseCount
。在竞争激烈的情况下,会使用CounterCell
数组来减少竞争。
private transient volatile int sizeCtl;
- 类型:
int
,表初始化和扩容的控制变量。 - 可见性:
volatile
关键字确保了sizeCtl
变量的可见性。 - 用途:
- 当
sizeCtl
为负数时,表示表正在被初始化或扩容。具体值取决于初始化或扩容的状态。 - 当
table
为空时,sizeCtl
持有初始表的大小,或者为0表示使用默认大小。 - 初始化后,
sizeCtl
持有下一次扩容的元素计数值。
- 当
private transient volatile int transferIndex;
- 类型:
int
,下一个在扩容时需要分裂的表索引。 - 可见性:
volatile
关键字确保了transferIndex
变量的可见性。 - 用途:在扩容过程中,
transferIndex
用于记录下一个需要被处理的桶索引。扩容时,多个线程可以并行地处理不同的桶。
private transient volatile int cellsBusy;
- 类型:
int
,一个自旋锁。 - 可见性:
volatile
关键字确保了cellsBusy
变量的可见性。 - 用途:用于在创建
CounterCells
数组或在扩容时保护对counterCells
数组的更新。
private transient volatile CounterCell[] counterCells;
- 类型:
CounterCell[]
,计数器单元的数组。 - 可见性:
volatile
关键字确保了counterCells
变量的可见性。 - 用途:在高并发环境下,
counterCells
用于减少对单个计数器的竞争。每个CounterCell
单元可以独立地更新,从而减少锁的竞争。
private transient KeySetView<K,V> keySet;
- 类型:
KeySetView<K,V>
,键的集合视图。 - 用途:提供对
ConcurrentHashMap
中所有键的视图,允许对其进行迭代等操作。
private transient ValuesView<K,V> values;
- 类型:
ValuesView<K,V>
,值的集合视图。 - 用途:提供对
ConcurrentHashMap
中所有值的视图,允许对其进行迭代等操作。
private transient EntrySetView<K,V> entrySet;
- 类型:
EntrySetView<K,V>
,键值对的集合视图。 - 用途:提供对
ConcurrentHashMap
中所有键值对的视图,允许对其进行迭代等操作。
主要方法
put(K key, V value)
添加元素
这是ConcurrentHashMap
中公开的put
方法,用于将指定的键值对插入到映射表中。它调用putVal
方法来实现实际的插入操作,并传递false
给onlyIfAbsent
参数,表示即使键已存在,也会替换其值。
在调用
putIfAbsent
方法时,第三个参数传递true
,表示如果键已存在,则什么都不做。
public V put(K key, V value) {
return putVal(key, value, false);
}
putVal(K key, V value, boolean onlyIfAbsent)
是ConcurrentHashMap
中用于实现put
操作的核心方法,执行流程如下:
检查键或值是否为空:如果键或值为空,则抛出NullPointerException
。
计算哈希值:使用spread
方法处理键的哈希值,以减少哈希冲突。
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
它将哈希值的高16位和低16位进行异或操作,然后与HASH_BITS
(值为常量0x7fffffff
)进行与操作,以得到最终的哈希值。
初始化表:如果哈希表未初始化,则调用initTable
方法进行初始化。
定位桶位置:通过哈希值找到对应的桶位置,如果该位置为空,则尝试使用CAS操作添加新的节点。
处理正在扩容的表:如果遇到ForwardingNode
,则帮助完成扩容。
检查第一个节点:如果设置了onlyIfAbsent
并且找到相同的键,则直接返回该节点的值。
同步块处理链表或树:锁定桶位置的头节点,并根据是链表还是树结构来插入或更新节点。
处理链表转树:如果链表长度超过阈值,则尝试将链表转换为红黑树。
增加计数:如果插入新节点或更新了节点的值,则增加元素计数。
返回旧值:如果插入了新节点,则返回null
;如果更新了节点的值,则返回旧值。
源码
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 如果键或值为null,抛出空指针异常
if (key == null || value == null) throw new NullPointerException();
// 计算键的哈希值
int hash = spread(key.hashCode());
// 用于记录链表长度
int binCount = 0;
// 无限循环,直到成功插入
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh; K fk; V fv;
// 如果表未初始化或长度为0,初始化表
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 如果当前桶位为空,尝试CAS操作插入新节点
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
break; // 添加到空桶位时不需要加锁
}
// 如果正在扩容,帮助完成扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
// 如果键已存在且onlyIfAbsent为true,直接返回当前值
else if (onlyIfAbsent // 不加锁检查第一个节点
&& fh == hash
&& ((fk = f.key) == key || (fk != null && key.equals(fk)))
&& (fv = f.val) != null)
return fv;
else {
// 用于记录旧值
V oldVal = null;
// 对当前桶位加锁
synchronized (f) {
// 再次检查当前桶位是否被修改
if (tabAt(tab, i) == f) {
// 如果是链表节点
if (fh >= 0) {
binCount = 1;
// 遍历链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 如果找到相同键的节点
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
// 如果允许更新,则更新值
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// 如果链表末尾,添加新节点
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value);
break;
}
}
}
// 如果是红黑树节点
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
// 插入到红黑树中
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
// 如果允许更新,则更新值
if (!onlyIfAbsent)
p.val = value;
}
}
// 如果是保留节点,抛出异常
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
}
}
// 如果链表长度不为0
if (binCount != 0) {
// 如果链表长度超过阈值,转换为红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
// 如果旧值不为空,返回旧值
if (oldVal != null)
return oldVal;
break;
}
}
}
// 增加元素计数
addCount(1L, binCount);
// 如果插入成功,返回null
return null;
}
源码中使用了一个用于获取指定索引位置的桶中的节点的原子操作方法。它使用getReferenceAcquire
来确保线程安全地读取节点。
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getReferenceAcquire(tab, ((long)i << ASHIFT) + ABASE);
}
这是一个使用CAS操作来设置指定索引位置的桶中的节点的原子操作方法。它尝试将桶中的节点从c
更新为v
,如果成功则返回true
。
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSetReference(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
源码中还使用了下面这个方法用于帮助进行扩容操作。如果检测到ForwardingNode
,它会尝试帮助完成当前正在进行的扩容。
查看代码
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length) << RESIZE_STAMP_SHIFT;
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if (sc == rs + MAX_RESIZERS || sc == rs + 1 ||
transferIndex <= 0)
break;
if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
增加计数
添加一个元素后更新总数的操作是通过addCount
方法实现的。
方法签名与参数
private final void addCount(long x, int check)
x
: 要添加到总数中的值,通常为1,表示添加了一个元素。check
: 用于控制是否需要检查扩容的条件。小于0表示不检查扩容,等于1表示仅在无竞争的情况下检查扩容,大于1表示总是检查扩容。
方法逻辑
更新基础计数器
首先尝试对基本计数 baseCount
进行原子性的增加。如果成功,则直接返回。
if ((cs = counterCells) != null ||
!U.compareAndSetLong(this, BASECOUNT, b = baseCount, s = b + x)) {
// ...
}
如果counterCells
数组不为空或者更新baseCount
失败(说明有多个线程同时在修改计数),则会尝试使用 CounterCell
数组来分散这些更新操作。
使用CounterCell进行计数
CounterCell c; long v; int m;
boolean uncontended = true;
if (cs == null || (m = cs.length - 1) < 0 ||
(c = cs[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended = U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
这里使用线程的探针值(probe)来选择一个CounterCell
进行更新。如果CounterCell
不存在或者更新失败,则调用fullAddCount
方法来进行更复杂的计数更新逻辑。
检查是否需要扩容
如果check
参数大于等于0,需要检查是否需要扩容:
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
// ...
}
}
这里的逻辑是:
- 如果当前元素总数
s
大于等于阈值sizeCtl
,并且当前表tab
不为空且未达到最大容量 (MAXIMUM_CAPACITY
),则尝试扩容。 - 扩容操作是通过
transfer
方法完成的,该方法将旧表中的节点重新映射到新表中。
扩容操作
ConcurrentHashMap
在元素数量达到一定阈值时触发扩容,通常这个阈值是当前容量的0.75倍。扩容过程会创建一个新的哈希表,其大小是原哈希表的两倍。
扩容过程涉及以下步骤:
- 初始化新表:如果扩容尚未开始,则创建一个新的哈希表,大小是原表的两倍。
- 迁移元素:将旧表中的所有元素重新哈希并迁移到新表中。这个过程可以在多个线程之间并发进行。
在扩容过程中,锁的粒度是桶级别的,这意味着不同的桶可以由不同的线程同时迁移,从而提高并发性能。
扩容的核心逻辑封装在transfer
方法中,该方法负责将元素从旧表迁移到新表。以下是扩容过程的简化描述:
- 初始化:如果扩容尚未开始,初始化新表并设置相关变量。
- 迁移:遍历旧表的每个桶,将桶中的链表或树结构迁移到新表中。
- 并发迁移:多个线程可以同时迁移不同的桶,每个线程负责迁移一部分桶。
- 完成迁移:当所有桶迁移完成后,将新表设置为当前表,并更新控制变量。
扩容时的读写操作:
- 如果读操作发生在迁移过程中,且目标桶尚未迁移,则可以正常读取。
- 如果目标桶已经迁移,头节点会被设置为一个特殊的
ForwardingNode
,此时读操作会帮助完成剩余的迁移工作。 - 如果写操作的目标桶已经迁移,头节点会被设置为
ForwardingNode
,此时写操作会帮助完成扩容。 - 如果扩容未完成,写操作会被阻塞,直到扩容完成。
size()
size
方法用于获取映射中键值对的总数。在并发环境中,准确计算集合的大小是一个挑战,因为元素可能会在计算过程中被添加或删除。
ConcurrentHashMap
使用 CounterCell
来处理并发计数问题。每个 CounterCell
对象包含一个 value
字段,用于记录部分键值对的数量。
过程
size
方法调用 sumCount
方法来计算总的键值对数量,sumCount
方法中:
- 首先获取
counterCells
引用。 - 从
baseCount
开始累加,baseCount
是在没有并发修改时使用的计数器。 - 如果
counterCells
不为空,说明有并发更新,需要遍历counterCells
数组,并将每个非空计数单元的值加到sum
上。 - 最后返回累加的总数。
查看代码
/**
* Table of counter cells. When non-null, size is a power of 2.
*/
private transient volatile CounterCell[] counterCells;
/**
* Base counter value, used mainly when there is no contention,
* but also as a fallback during table initialization
* races. Updated via CAS.
*/
private transient volatile long baseCount;
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
final long sumCount() {
CounterCell[] cs = counterCells;
long sum = baseCount;
if (cs != null) {
for (CounterCell c : cs)
if (c != null)
sum += c.value;
}
return sum;
}
特点
- 线程安全:通过使用
volatile
关键字和CounterCell
数组,ConcurrentHashMap
能够在不加锁的情况下安全地更新和读取计数。 - 高效:在低争用情况下,可以直接读取
baseCount
,这比遍历整个counterCells
数组要高效得多。 - 准确性:即使在并发更新时,通过累加所有
CounterCell
的值,ConcurrentHashMap
也能提供准确的元素计数。
treeifyBin(Node<K,V>[] tab, int index)
这个方法用于将链表转换为红黑树。如果添加元素时,桶的长度超过阈值会调用该方法,如果哈希表容量不足,小于MIN_TREEIFY_CAPACITY(64)
,先会尝试扩容,否则才会将链表转换为树结构以优化搜索性能。
查看代码
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n;
// 如果表不为空
if (tab != null) {
// 获取表的长度
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
// 如果表长度小于最小树化容量,进行扩容
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
// 对当前桶位加锁
synchronized (b) {
// 再次检查当前桶位是否被修改
if (tabAt(tab, index) == b) {
// 初始化头尾指针
TreeNode<K,V> hd = null, tl = null;
// 遍历链表,将链表节点转换为树节点
for (Node<K,V> e = b; e != null; e = e.next) {
// 创建树节点
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
// 如果是第一个节点,设置为头节点
if ((p.prev = tl) == null)
hd = p;
else
// 否则添加到链表末尾
tl.next = p;
// 更新尾节点指针
tl = p;
}
// 将转换后的树节点设置为桶位
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}