ConcurrentDictionary

ConcurrentDictionary

             ConcurrentDictionary一大特点是线程安全,在没有ConcurrentDictionary前

在多线程下用Dictionary,不管读写都要加个锁,不但麻烦,性能上也不是很好

微软得出的结果是默认的锁的数量是CPU核的个数,这个线程池默认的线程数量一样。随着Dictionary的扩容,锁的个数也可以跟着增加,这个可以在构造函数中自己指定。

private sealed class Tables

{

    internal readonly Node[] _buckets; // bucket成了这样,也就是ConcurrentDictionary可以认为是一个bucket数组,每个Bucket里又由next来形成链表

    internal readonly object[] _locks; // 这个就是锁的数组了

    internal volatile int[] _countPerLock; // 这个是每个锁罩的元素个数

    internal Tables(Node[] buckets, object[] locks, int[] countPerLock)

    {

        _buckets = buckets;

        _locks = locks;

        _countPerLock = countPerLock;

    }

}

//由Dictionary里的Entry改成Node,并且把next放到Node里

private sealed class Node

{

    internal readonly TKey _key;

    internal TValue _value;

    internal volatile Node _next; //next由volatile修饰,确保不被优化且读写原子性

    internal readonly int _hashcode;

    internal Node(TKey key, TValue value, int hashcode, Node next)

    {

        _key = key;

        _value = value;

        _next = next;

        _hashcode = hashcode;

    }

}

里面的变量:       

private volatile Tables _tables; // 这不同于Dictionary的bucket 数组,而是整个封装起来,而且用volatile来保证读写时的原子性

private readonly bool _growLockArray; // 是否在Dictionary扩容时也增加锁的数量

private int _budget; // 单个锁罩的元素的最大个数

private const int DefaultCapacity = 31;  //ConcurrentDictionary默认大小,和List,Dictionary不一样

private const int MaxLockNumber = 1024;  //最大锁的个数,不过也可以在构造函数中弄个更大的,不般没必要

public bool TryGetValue(TKey key, out TValue value)

{

    if (key == null) ThrowKeyNullException();

    return TryGetValueInternal(key, _comparer.GetHashCode(key), out value);

}

private bool TryGetValueInternal(TKey key, int hashcode, out TValue value)

{

    Debug.Assert(_comparer.GetHashCode(key) == hashcode);

            

    //先用本地变量存一下,免得在另外一个线程扩容时变了

    Tables tables = _tables;

    //又是hashcode取余哈,不多说

    //int bucketNo = (hashcode & 0x7fffffff) % bucketCount;

    int bucketNo = GetBucket(hashcode, tables._buckets.Length);

    //这个用Valatile确保读了最新的Node,一个定义为volatile的变量是说这变量可能会被意想不到地改变,这样,编译器就不会去假设这个变量的值了。精确地说就是,优化器在用到这个变量时必须每次都小心地重新读取这个变量的值,而不是使用保存在寄存器里的备份。简单地说就是防止编译器对代码进行优化。比如如下程序:

1

2

3

4

XBYTE[2]=0x55;

XBYTE[2]=0x56;

XBYTE[2]=0x57;

XBYTE[2]=0x58;

对外部硬件而言,上述四条语句分别表示不同的操作,会产生四种不同的动作,但是编译器却会对上述四条语句进行优化,认为只有XBYTE[2]=0x58(即忽略前三条语句,只产生一条机器代码)。如果键入volatile,则编译器会逐一地进行编译并产生相应的机器代码(产生四条代码)。

  • 原子性:对任意单个volatile变量的读/写具有原子性,但类似于volatile++这种复合操作不具有原子性。

    Node n = Volatile.Read<Node>(ref tables._buckets[bucketNo]);

    //遍历bucket,真怀疑这些代码是几个人写的,风格都不一样

    while (n != null)

    {

        //找到了

        if (hashcode == n._hashcode && _comparer.Equals(n._key, key))

        {

            //返回true和value

            value = n._value;

            return true;

        }

        n = n._next;

    }

    value = default(TValue);

    return false;

}

public TValue GetOrAdd(TKey key, Func<TKey, TValue> valueFactory)

{

    if (key == null) ThrowKeyNullException();

    if (valueFactory == null) throw new ArgumentNullException(nameof(valueFactory));

    int hashcode = _comparer.GetHashCode(key);

    TValue resultingValue;

    //先TryGet,没有的再TryAdd

    if (!TryGetValueInternal(key, hashcode, out resultingValue))

    {

        TryAddInternal(key, hashcode, valueFactory(key), false, true, out resultingValue);

    }

    return resultingValue;

}

private bool TryAddInternal(TKey key, int hashcode, TValue value, bool updateIfExists, bool acquireLock, out TValue resultingValue)

{

    Debug.Assert(_comparer.GetHashCode(key) == hashcode);

    while (true)

    {

        int bucketNo, lockNo;

        Tables tables = _tables;

        //GetBucketAndLockNo函数里面就是下面两句

        //bucketNo = (hashcode & 0x7fffffff) % bucketCount; 取余得bucket No.,和Dictionary一样

        //lockNo = bucketNo % lockCount; 也是取余得锁No. 也就是一个锁也是可能给多个Bucket用的

        GetBucketAndLockNo(hashcode, out bucketNo, out lockNo, tables._buckets.Length, tables._locks.Length);

        bool resizeDesired = false;

        bool lockTaken = false;

        try

        {

            if (acquireLock) //参数指定需要锁的话就锁上这个bucket的锁,也就在构造函数初始化时不需要锁

                Monitor.Enter(tables._locks[lockNo], ref lockTaken);

            //这里是做个校验,判断tables是否在这边取完锁后其他线程把元素给扩容了,扩容会生成一个新的tables,tables变了的话上面的锁就没意义了,需要重来,所以这整个是在while(true)里面

            if (tables != _tables)

            {

                continue;

            }

            Node prev = null;

            //这里就遍历bucket里的链表了,和Dictionary差不多

            for (Node node = tables._buckets[bucketNo]; node != null; node = node._next)

            {

                Debug.Assert((prev == null && node == tables._buckets[bucketNo]) || prev._next == node);

                if (hashcode == node._hashcode && _comparer.Equals(node._key, key))//看是否找到

                {

                    //看是否需要更新node

                    if (updateIfExists)

                    {

                        if (s_isValueWriteAtomic) //这个是判断是否是支持原子操作的值类型,比如32位上byte,int,byte,short都是原子的,而long,double就不是了,支持原子操作的直接赋值就可以了,得注意是值类型,引用类型可不能这么搞

                        {

                            node._value = value;

                        }

                        else //不是原子操作的值类型就new一个node

                        {

                            Node newNode = new Node(node._key, value, hashcode, node._next);

                            if (prev == null)

                            {

                                tables._buckets[bucketNo] = newNode;

                            }

                            else

                            {

                                prev._next = newNode;

                            }

                        }

                        resultingValue = value;

                    }

                    else//不更新就直接取值

                    {

                        resultingValue = node._value;

                    }

                    return false;  //找到了返回false,表示不用Add就Get了

                }

                prev = node;

            }

            // 找了一圈没找着,就Add吧,new一个node用Volatile的写操作写到bucket里

            Volatile.Write<Node>(ref tables._buckets[bucketNo], new Node(key, value, hashcode, tables._buckets[bucketNo]));

            checked//这里如果超出int大小,抛overflow exception, 能进这里表示一个锁罩int.MaxValue大小的Node,真成扛把子了,极端情况下只有一个锁而且Node的大小已经是Int.MaxValue才可能会出现(还要看budget同不同意)

            {

                tables._countPerLock[lockNo]++;

            }

            //如果锁罩的Node个数大于budget就表示差不多需要扩容了,黑社会表示地盘不够用了

            if (tables._countPerLock[lockNo] > _budget)

            {

                resizeDesired = true;

            }

        }

        finally

        {

            if (lockTaken) //出现异常要把锁释放掉

                Monitor.Exit(tables._locks[lockNo]);

        }

        if (resizeDesired)

        {

            GrowTable(tables); //扩容

        }

        resultingValue = value; //result值

        return true;

    }

}

扩容:

private void GrowTable(Tables tables)

{

    const int MaxArrayLength = 0X7FEFFFFF;

    int locksAcquired = 0;

    try

    {

        // 先把第一个锁锁住,免得其他线程也要扩容走进来

        AcquireLocks(0, 1, ref locksAcquired);

        //如果table已经变了,也就是那些等着上面锁的线程进来发现已经扩容完了直接返回就好了

        if (tables != _tables)

        {

            return;

        }

        // 计算每个锁罩的元素的个数总和,也就是当前元素的个数

        long approxCount = 0;

        for (int i = 0; i < tables._countPerLock.Length; i++)

        {

            approxCount += tables._countPerLock[i];

        }

        //如果元素总和不到Bucket大小的1/4,说明扩容扩得不是时候,归根结底是budget小了

        if (approxCount < tables._buckets.Length / 4)

        {

            _budget = 2 * _budget;//2倍增加budget

            if (_budget < 0) //小于0说明overflow了,看看,前面用check,这里又用小于0。。

            {

                _budget = int.MaxValue; //直接最大值吧

            }

            return;

        }

        int newLength = 0;

        bool maximizeTableSize = false;

        try

        {

            checked

            {

                //2倍+1取得一个奇数作了新的容量

                newLength = tables._buckets.Length * 2 + 1;

                //看是否能整除3/5/7,能就+2,直到不能整除为止,也挺奇怪这算法,List是2倍,Dictionary是比2倍大的一个质数,这里又是另外一种,只能说各人有各人的算法

                while (newLength % 3 == 0 || newLength % 5 == 0 || newLength % 7 == 0)

                {

                    newLength += 2;

                }

                Debug.Assert(newLength % 2 != 0);

                if (newLength > MaxArrayLength)

                {

                    maximizeTableSize = true;

                }

            }

        }

        catch (OverflowException)

        {

            maximizeTableSize = true;

        }

        if (maximizeTableSize)//进这里表示溢出了

        {

            newLength = MaxArrayLength; //直接给最大值

            _budget = int.MaxValue; //budget也给最大值,因为没法再扩容了,给小了进来也没意义

        }

        //扩容之后又是熟悉的重新分配元素,和Dictionary基本一致,这里要先把所有锁锁住,前面已经锁了第一个,这里锁其他的

        AcquireLocks(1, tables._locks.Length, ref locksAcquired);

        object[] newLocks = tables._locks;

        //如果允许增加锁并则锁的个数还不到1024,就增加锁

        if (_growLockArray && tables._locks.Length < MaxLockNumber)

        {

            newLocks = new object[tables._locks.Length * 2]; //也是2倍增加

            Array.Copy(tables._locks, 0, newLocks, 0, tables._locks.Length); //旧锁复制到新数组里

            for (int i = tables._locks.Length; i < newLocks.Length; i++) //再初始化增的锁

            {

                newLocks[i] = new object();

            }

        }

        //新的Node数组

        Node[] newBuckets = new Node[newLength];

        int[] newCountPerLock = new int[newLocks.Length];

        //遍历bucket

        for (int i = 0; i < tables._buckets.Length; i++)

        {

            Node current = tables._buckets[i];//当前node

            while (current != null)

            {

                Node next = current._next;

                int newBucketNo, newLockNo;

                //算新的bucket No.和lock No.

                GetBucketAndLockNo(current._hashcode, out newBucketNo, out newLockNo, newBuckets.Length, newLocks.Length);

                //重建个新的node,注意next指到了上一个node,和Dictionary里一样

                newBuckets[newBucketNo] = new Node(current._key, current._value, current._hashcode, newBuckets[newBucketNo]);

                checked

                {

                    newCountPerLock[newLockNo]++; //这个锁又罩了一个小弟,加一个

                }

                current = next;

            }

        }

        //调整下budget

        _budget = Math.Max(1, newBuckets.Length / newLocks.Length);

        //得到新的table

        _tables = new Tables(newBuckets, newLocks, newCountPerLock);

    }

    finally

    {

        // 释放锁

        ReleaseLocks(0, locksAcquired);

    }

}

说完了,总结下,ConcurrentDictionary可以说是为了避免一个大锁锁住整个Dictionary带来的性能损失而出来的,当然也是采用空间换时间,不过这空间换得还是很值得的,一些object而已。
原理在于Dictionary本质是是一个链表数组,只有在多线程同时操作到数组里同一个链表时才需要锁,所以就用到一个锁数组,每个锁罩着几个小弟(bucket及bucket内的链表元素),这样多线程读写不同锁罩的区域的时候可以同时进行而不会等待,进而提高多线程性能。
不过也凡事无绝对,不同业务场景的需求不一样,可能Dictionary配合ReaderWriterLockSlim在某些场景(比如读的机会远大于写的)可能会有更好的表现。

引自:https://www.cnblogs.com/brookshi/p/5583892.html

原文地址:https://www.cnblogs.com/mcyushao/p/10648320.html