集合:ConcurrentHashMap

为什么要用ConcurrentHashMap

hashmap虽然线程不安全,但是使用Hashmap进行put操作可能会引起死循环(带环链表),在多线程的情况下不适合使用

hashtable线程安全但是效率较低,无论度还是写都会对整个集合加锁

1、构造方法

(1)无参构造

 (2)有参构造

构造方法:

获取初始容量:

 sizeCtl:

注意:以上这些构造方法中,都涉及到一个变量sizeCtl,这个变量是一个非常重要的变量,而且具有非常丰富的含义,它的值不同,对应的含义也不一样:

  • sizeCtl为0,代表数组未初始化,且数组的初始容量为16
  • sizeCtl为正数,如果数组未初始化,那么其记录的是数组的初始容量,如果数组已经初始化,那么其记录的是数组的扩容阈值
  • sizeCtl为-1,表示数组正在进行初始化
  • sizeCtl小于0,并且不是-1,表示数组正在扩容,-(1+n),表示此时有n个线程正在共同完成数组的扩容操作

 2、添加元素保证线程安全

 final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());//获取hash值
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {//死循环
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)//做数组的初始化
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//取数组相应索引处的元素并判断是否为空,没有元素直接添加元素
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))//成功的话跳出循环,否则还会再次判断
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)//代表正在扩容,此时不允许添加元素(往一个数组中添加第一个元素)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {//加锁保证此桶位的元素插入是安全的,不会影响其他桶位元素的操作,而hashTable锁的是整个数组,这里是只对桶位加锁
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {//循环遍历链表进行equals判断,插入元素
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {//红黑树插入元素
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }

初始化数组:

 private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            if ((sc = sizeCtl) < 0)//初始化
                Thread.yield(); // 有其他线程在做初始化的话就让出CPU的使用权
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//当前值与主存中的值进行比对,相等的话就把主存的值改为-1,
设置失败的话就意味着you其他线程在进行初始化,就会再次执行循环,没有加锁但是能保证安全
try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY;//sc大于0表示已经初始化这是给自己的初始值,否则初始容量为16 @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];//创建存储元素的数组 table = tab = nt; sc = n - (n >>> 2);//n减去四分之一n,也就是乘以0.75,但是这里将乘法改为减法和位移,效率更高 } } finally { sizeCtl = sc;//记录数组的扩容阈值 } break; } } return tab; }

总结:只给插入的桶位加锁,而不是整个数组

获取集合的长度

 private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        if ((as = counterCells) != null ||//如果只有一个线程的话可以直接对basecount添加数值
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;//如果是多个线程可能会有线程对basecount添加操作不能成功,此时就需要一个数组协助
            if (as == null || (m = as.length - 1) < 0 ||//先计算出维护元素个数的数组位置,然后对value加1,如果多次尝试添加都没有成功就需要对数组扩容
//最终的集合的长度是baseCount加上数组中所有value的和
(a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount(); } if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); if (sc < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); s = sumCount(); } } }

3、数组扩容

数据迁移:

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;

将数组分割为不同的块,最少一个线程负责16个元素的迁移,如果计算的数量大于16那么就负责这么多的元素,小于等于16就负责16个元素

 synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        if (fh >= 0) {
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }

迁移的时候加锁其他线程没有办法添加,保证数据不会丢失

ConcurrentHashMap总结:

数据结构:数组+链表+红黑树

jdk1.8开始底结构变为哈希表(数组+链表+红黑树)

并发原理:CAS+synchronized锁

CAS:在数组的初始化的时候,用到了CAS算法,sizeCtl为-1,表示数组正在进行初始化,当一个线程已经把sizeCtl改为-1后,其他的线程就不能再对sizeCtl进行更改了,该线程也就不能进行数组的初始化了

synchronized锁:插入数据的时候只会对桶位加锁,加锁保证此桶位的元素插入是安全的,不会影响其他桶位元素的操作,而hashTable锁的是整个数组,这里是只对桶位加锁

迁移的时候加锁其他线程没有办法添加,保证数据不会丢失

加锁对象:数组每个位置的头节点

如插入数据的时候

每个人都会有一段异常艰难的时光 。 生活的压力 , 工作的失意 , 学业的压力。 爱的惶惶不可终日。 挺过来的 ,人生就会豁然开朗。 挺不过来的 ,时间也会教你 ,怎么与它们握手言和 ,所以不必害怕的。 ——杨绛
原文地址:https://www.cnblogs.com/zhai1997/p/13602526.html