Java集合—ConcurrentHashMap原理分析

在介绍之前先抛出三个问题:

(1)ConcurrentHashMap的锁分段技术

(2)ConcurrentHashMap的读是否要加锁,为什么?如果没有锁,靠什么来确保数据的一致性;

(3)ConcurrentHashMap的迭代器是强一致性的迭代器还是弱一致性的迭代器

   

  集合是编程中最常用的数据结构。而谈到并发,几乎总是离不开集合这类高级数据结构的支持。比如两个线程需要同时访问一个中间临界区(Queue),比如常会用缓存作为外部文件的副本(HashMap)。这篇文章主要分析jdk1.5的3种并发集合类型(concurrent,copyonright,queue)中的ConcurrentHashMap,让我们从原理上细致的了解它们,能够让我们在深度项目开发中获益非浅。

   通过分析Hashtable就知道,synchronized是针对整张Hash表的,即每次锁住整张表让线程独占,ConcurrentHashMap允许多个修改操作并发进行,其关键在于使用了锁分离技术。它使用了多个锁来控制对hash表的不同部分进行的修改。ConcurrentHashMap内部使用段(Segment)来表示这些不同的部分,每个段其实就是一个小的hash table,它们有自己的锁。只要多个修改操作发生在不同的段上,它们就可以并发进行。
有些方法需要跨段,比如size()和containsValue(),它们可能需要锁定整个表而而不仅仅是某个段,这需要按顺序锁定所有段,操作完毕后,又按顺序释放所有段的锁。这里“按顺序”是很重要的,否则极有可能出现死锁,在ConcurrentHashMap内部,段数组是final的,并且其成员变量实际上也是final的,但是,仅仅是将数组声明为final的并不保证数组成员也是final的,这需要实现上的保证。这可以确保不会出现死锁,因为获得锁的顺序是固定的。

 一、结构解析

   ConcurrentHashMap和Hashtable主要区别就是围绕着锁的粒度以及如何锁,可以简单理解成把一个大的HashTable分解成多个,形成了锁分离。如图:

   concurrenthashmap采用了二次hash的方式,第一次hash将key映射到对应的segment,而第二次hash则是映射到segment的不同桶中。

blob.png


而Hashtable的实现方式是—锁整个hash表

二、应用场景

    当有一个大数组时需要在多个线程共享时就可以考虑是否把它给分层多个节点了,避免大锁。并可以考虑通过hash算法进行一些模块定位。

    其实不止用于线程,当设计数据表的事务时(事务某种意义上也是同步机制的体现),可以把一个表看成一个需要同步的数组,如果操作的表数据太多时就可以考虑事务分离了(这也是为什么要避免大表的出现),比如把数据进行字段拆分,水平分表等.

三、源码解读

 ConcurrentHashMap中主要实体类就是三个:ConcurrentHashMap(整个Hash表),Segment(桶),HashEntry(节点),对应上面的图可以看出之间的关系:

 /** 
* The segments, each of which is a specialized hash table 
*/  final Segment<K,V>[] segments;

   不变(Immutable)和易变(Volatile)
   ConcurrentHashMap完全允许多个读操作并发进行,读操作并不需要加锁。如果使用传统的技术,如HashMap中的实现,如果允许可以在hash链的中间添加或删除元素,读操作不加锁将得到不一致的数据。ConcurrentHashMap实现技术是保证HashEntry几乎是不可变的。HashEntry代表每个hash链中的一个节点,其结构如下所示:

  //HashEntry是一个单向链表    
static final class HashEntry<K,V> {  
    //哈希值    
    final int hash;  
    //存储的key和值value    
    final K key;  
    volatile V value;  
    //指向的下一个HashEntry,即链表的下一个节点    
    volatile HashEntry<K,V> next;  
}

  HashEntry的key,hash采用final,可以避免并发修改问题,HashEntry链的尾部是不能修改的,而next和value采用volatile,可以避免使用同步造成的并发性能灾难,线程可见性。

   所有的节点的修改只能从头部开始。对于put操作,可以一律添加到Hash链的头部。但是对于remove操作,可能需要从中间删除一个节点,这就需要将要删除节点的前面所有节点整个复制一遍,最后一个节点指向要删除结点的下一个结点。这在讲解删除操作时还会详述。为了确保读操作能够看到最新的值,将value设置成volatile,这避免了加锁。
其它
    为了加快定位段以及段中hash槽的速度,每个段hash槽的的个数都是2^n,这使得通过位运算就可以定位段和段中hash槽的位置。当并发级别为默认值16时,也就是段的个数,hash值的高4位决定分配在哪个段中。但是我们也不要忘记《算法导论》给我们的教训:hash槽的的个数不应该是 2^n,这可能导致hash槽分配不均,这需要对hash值重新再hash一次。

 这是定位段的方法:

/** 
 * 查找Segment对象,这里Unsafe的主要作用是提供原子操作。 
 */  
@SuppressWarnings("unchecked")  
private Segment<K,V> segmentForHash(int h) {  
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;  
    return (Segment<K,V>) UNSAFE.getObjectVolatile(segments, u);  
}

 数据结构

   关于Hash表的基础数据结构,这里不想做过多的探讨。Hash表的一个很重要方面就是如何解决hash冲突,ConcurrentHashMap 和HashMap使用相同的方式,都是将hash值相同的节点放在一个hash链中。与HashMap不同的是,ConcurrentHashMap使用多个子Hash表,也就是段(Segment)。下面是ConcurrentHashMap的数据成员:

public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>  
         implements ConcurrentMap<K, V>, Serializable {  
    
  /**
     * Mask value for indexing into segments. The upper bits of a
     * key's hash code are used to choose the segment.
     */
    final int segmentMask;

    /**
     * Shift value for indexing within segments.
     */
    final int segmentShift;

    /**
     * The segments, each of which is a specialized hash table.
     */
    final Segment<K,V>[] segments;

    transient Set<K> keySet;
    transient Set<Map.Entry<K,V>> entrySet;
    transient Collection<V> values;
 }

所有的成员都是final的,其中segmentMask和segmentShift主要是为了定位段,参见上面的segmentForHash方法。
每个Segment相当于一个子Hash表,它的数据成员如下:

//Segment内部维护了一个链表数组  
static final class Segment<K,V> extends ReentrantLock implements Serializable {  
  
    //链表数组,数组中的每一个元素代表了一个链表的头部  
    transient volatile HashEntry<K,V>[] table;  
  
    //Segment中元素的数量  
    transient int count;  
  
    //对table的大小造成影响的操作的数量(比如put或者remove操作)  
    transient int modCount;  
  
    //阈值,Segment里面元素的数量超过这个值会对Segment进行扩容,扩容后大小=old*2*负载因子  
    transient int threshold;  
  
    //负载因子,用于确定threshold  
    final float loadFactor;  
}

   count用来统计该段数据的个数,它是volatile(volatile 变量使用指南),它用来协调修改和读取操作,以保证读取操作能够读取到几乎最新的修改。协调方式是这样的,每次修改操作做了结构上的改变,如增加/删除节点(修改节点的值不算结构上的改变),都要写count值,每次读取操作开始都要读取count的值。这利用了 Java 5中对volatile语义的增强,对同一个volatile变量的写和读存在happens-before关系。modCount统计段结构改变的次数,主要是为了检测对多个段进行遍历过程中某个段是否发生改变,在讲述跨段操作时会还会详述。threashold用来表示需要进行rehash的界限值。table数组存储段中节点,每个数组元素是个hash链,用HashEntry表示。table也是volatile,这使得能够读取到最新的 table值而不需要同步。loadFactor表示负载因子。

先来看下删除操作remove(key):

 public V remove(Object key) {
        int hash = hash(key);
        Segment<K,V> s = segmentForHash(hash);
        return s == null ? null : s.remove(key, hash, null);
    }


final V remove(Object key, int hash, Object value) {
            if (!tryLock())
                scanAndLock(key, hash);
            V oldValue = null;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                HashEntry<K,V> e = entryAt(tab, index);
                HashEntry<K,V> pred = null;
                while (e != null) {
                    K k;
                    HashEntry<K,V> next = e.next;
                    if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                        V v = e.value;
                        if (value == null || value == v || value.equals(v)) {
                            if (pred == null)
                                setEntryAt(tab, index, next);
                            else
                                pred.setNext(next);
                            ++modCount;
                            --count;
                            oldValue = v;
                        }
                        break;
                    }
                    pred = e;
                    e = next;
                }
            } finally {
                unlock();
            }
            return oldValue;
        }

   在JDK 1.6版本中,remove操作比较直观,它先找到key对应的节点链的链头(数组中的某个项),然后遍历该节点链,如果在节点链中找到key相等的节点,则为该节点之前的所有节点重新创建节点并组成一条新链,

       将该新链的链尾指向找到节点的下一个节点。这样如前面rehash提到的,同时有两条链存在,即使有另一个线程正在该链上遍历也不会出问题。然而Doug Lea又挖掘到了新的优化点,为了减少新链的创建同时利用CPU缓存的特性,

        在1.7中,他不再重新创建一条新的链,而是只在当前缓存中将链中找到的节点移除,而另一个遍历线程的缓存中继续存在原来的链。当移除的是链头则更新数组项的值,否则更新找到节点的前一个节点的next指针。

      这也是HashEntry中next指针没有设置成final的原因。当然remove操作如果第一次尝试获得锁失败也会如put操作一样先进入自旋状态,这里的scanAndLock和scanAndLockForPut类似,只是它不做预创建节点的步骤,不再细说:

blob.png

添加元素:

blob.png

    步骤一:进入Segment的put操作时先进行加锁保护。如果加锁没有成功,调用scanAndLockForPut方法(详细步骤见下面scanAndLockForPut()源码分析)进入自旋状态,该方法持续查找key对应的节点链中是已存在该机节点,如果没有找到,则预创建一个新节点,并且尝试n次,直到尝试次数操作限制,才真正进入加锁等待状态,自旋结束并返回节点(如果返回了一个非空节点,则表示在链表中没有找到相应的节点)。对最大尝试次数,目前的实现单核次数为1,多核为64。
     步骤二:使用(tab.length – 1) & hash计算第一个节点位置,再通过entryAt()方法去查找第一个节点。如果节点存在,遍历链表找到key值所在的节点,如果找到了这个节点则直接更新旧value,结束循环。其中value使用了volatile,它更新后的值立马对其它线程可见。如果节点不存在,将步骤一预创建的新节点(如果没有则重新创建)添加到链表中,添加前先检查添加后节点数量是否超过容器大小,如果超过了,则rehash操作。没有的话调用setNext或setEntryAt方法添加新节点;
要注意的是在更新链表时使用了Unsafe.putOrderedObject()方法,这个方法能够实现非堵塞的写入,这些写入不会被
Java的JIT重新排序指令(instruction reordering),使得它能更加快速的存储。
     问题:为什么还要调用entryAt()方法获取数组项的值而不是通过tab[index]方式直接获取?
    虽然在开始时volatile table将引用赋值给了变量tab,但是多线程下table里的值可能发生改变,使用tab[index]并不能获得最新的值。。为了保证接下来的put操作能够读取到上一次的更新结果,需要使用volatile的语法去读取节点链的链头.

public V put(K key, V value) {  
    Segment<K,V> s;  
    if (value == null)  
        throw new NullPointerException();  
    int hash = hash(key);  
    //计算Segment的位置,在初始化的时候对segmentShift和segmentMask做了解释  
    int j = (hash >>> segmentShift) & segmentMask;  
    //从Segment数组中获取segment元素的位置  
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck  
            (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment  
        s = ensureSegment(j);  
    //  
    return s.put(key, hash, value, false);  
}  
  
//往Segment的HashEntry中添加元素,使用了分锁机制  
final V put(K key, int hash, V value, boolean onlyIfAbsent) {  
    //tryLock 仅在调用时锁为空闲状态才获取该锁。如果锁可用,则获取锁,并立即返回值 true。否则是false  
    //scanAndLockForPut 下面单独说scanAndLockForPut  
    HashEntry<K,V> node = tryLock() ? null :scanAndLockForPut(key, hash, value);  
    V oldValue;  
    try {  
        HashEntry<K,V>[] tab = table;  
        int index = (tab.length - 1) & hash;  
        HashEntry<K,V> first = entryAt(tab, index);  
        for (HashEntry<K,V> e = first;;) {  
            if (e != null) {  
                K k;  
                if ((k = e.key) == key ||  
                        (e.hash == hash && key.equals(k))) {  
                    oldValue = e.value;  
                    if (!onlyIfAbsent) {  
                        e.value = value;  
                        ++modCount;  
                    }  
                    break;  
                }  
                e = e.next;  
            }  
            else {  
                if (node != null)  
                    node.setNext(first);  
                else  
                    node = new HashEntry<K,V>(hash, key, value, first);  
                int c = count + 1;  
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)  
                    rehash(node);  
                else  
                    setEntryAt(tab, index, node);  
                ++modCount;  
                count = c;  
                oldValue = null;  
                break;  
            }  
        }  
    } finally {  
        unlock();  
    }  
    return oldValue;  
}

获取操作:

public V get(Object key) {  
       Segment<K,V> s; // manually integrate access methods to reduce overhead  
       HashEntry<K,V>[] tab;  
       int h = hash(key);  
       long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;  
       if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&  
           (tab = s.table) != null) {  
           for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile  
                    (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);  
                e != null; e = e.next) {  
               K k;  
               if ((k = e.key) == key || (e.hash == h && key.equals(k)))  
                   return e.value;  
           }  
       }  
       return null;  
   }
   从代码可以看出get方法并没有调用锁,它使用了volatile的可见性来实现线程安全的。
    另一个操作是containsKey,这个实现就要简单得多了,因为它不需要读取值:
public boolean containsKey(Object key) {
        Segment<K,V> s; // same as get() except no need for volatile value read
        HashEntry<K,V>[] tab;
        int h = hash(key);
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return true;
            }
        }
        return false;
    }

参考链接:

http://www.cnblogs.com/ITtangtang/p/3948786.html

http://www.blogjava.net/DLevin/archive/2013/10/18/405030.html

http://www.importnew.com/22007.html

http://www.cnblogs.com/zuolun2017/p/5691551.html


发表评论