C# CountdownEvent实现

关于CountdownEvent网上的介绍比较少,因为它的实现和使用都很简单,先看看网上的一些评论吧:

CountdownEvent调用成员函数Wait()将阻塞,直至成员函数Signal()被调用达到特定的次数,这时CountdownEvent称作就绪态。对于处于就绪态的CountdownEvent,调用Wait()函数将不会再阻塞;只有手动调用Reset()函数后,调用Wait()函数才会再次阻塞。CountdownEvent可以通过TryAddCount()和AddCount()函数来增加函数Signal()需被调用的次数,但只有当CountdownEvent处于未就绪态时才会成功;否则TryAddCount()返回false,AddCount()抛出InvalidOperationException异常。
当有新的需要同步的任务产生时,就调用AddCount增加它的计数;当有任务到达同步点时,就调用Signal函数减小它的计数;当CountdownEvent的计数为零时,就表示所有需要同步的任务已经完成,可以开始下一步任务了。

我们来看看CountdownEvent的实现:

/// <summary>
/// A latch that becomes signaled when its outstanding count reaches zero.
/// Threads calling <see cref="Wait()"/> block on an internal
/// <see cref="ManualResetEventSlim"/> until enough <see cref="Signal()"/> calls
/// have brought the count down to zero.
/// </summary>
/// <remarks>
/// NOTE(review): the exception messages are built with Environment.GetResourceString,
/// which appears to be a BCL-internal helper; confirm it is reachable from wherever
/// this class is compiled before relying on this code outside the framework itself.
/// </remarks>
public class CountdownEvent : IDisposable
{
    private int m_initialCount; // The original # of signals the latch was instantiated with.
    private volatile int m_currentCount;  // The # of outstanding signals before the latch transitions to a signaled state.
    private ManualResetEventSlim m_event;   // An event used to manage blocking and signaling.
    private volatile bool m_disposed; // Whether the latch has been disposed.

    /// <summary>
    /// Creates a latch that requires <paramref name="initialCount"/> signals before it is set.
    /// </summary>
    /// <param name="initialCount">Number of signals required; zero creates an already-set latch.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="initialCount"/> is negative.</exception>
    public CountdownEvent(int initialCount)
    {
        if (initialCount < 0)
        {
            throw new ArgumentOutOfRangeException("initialCount");
        }

        m_initialCount = initialCount;
        m_currentCount = initialCount;

        // Allocate a thin event, which internally defers creation of an actual Win32 event.
        m_event = new ManualResetEventSlim();

        // If the latch was created with a count of 0, then it's already in the signaled state.
        if (initialCount == 0)
        {
            m_event.Set();
        }
    }

    /// <summary>
    /// Gets the number of signals still outstanding. A transiently negative internal
    /// value is reported as zero.
    /// </summary>
    public int CurrentCount
    {
        get 
        {
            // Read the volatile field once so the comparison and the return use the same value.
            int observedCount = m_currentCount;
            return observedCount < 0 ? 0 : observedCount;
        }
    }

    /// <summary>
    /// Gets whether the count has reached zero (or below).
    /// </summary>
    public bool IsSet
    {
        get
        {
            return (m_currentCount <= 0);
        }
    }

    /// <summary>
    /// Releases the underlying event and marks this instance as disposed.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Disposes the internal event when called from <see cref="Dispose()"/>.
    /// </summary>
    /// <param name="disposing">true when invoked from <see cref="Dispose()"/>.</param>
    protected virtual void Dispose(bool disposing)
    {
        if (disposing)
        {
            m_event.Dispose();
            m_disposed = true;
        }
    }
    
    /// <summary>
    /// Registers a single signal, decrementing the count by one.
    /// </summary>
    /// <returns>true if the signal caused the count to reach zero and the event was set; otherwise, false.</returns>
    /// <exception cref="ObjectDisposedException">The instance has been disposed.</exception>
    /// <exception cref="InvalidOperationException">The count is already zero or would drop below zero.</exception>
    public bool Signal()
    {
        ThrowIfDisposed();
        Contract.Assert(m_event != null);

        if (m_currentCount <= 0)
        {
            throw new InvalidOperationException(Environment.GetResourceString("CountdownEvent_Decrement_BelowZero"));
        }
        int newCount = Interlocked.Decrement(ref m_currentCount);
        if (newCount == 0)
        {
            m_event.Set();
            return true;
        }
        else if (newCount < 0)
        {
            // Another thread decremented between the check above and our decrement.
            throw new InvalidOperationException(Environment.GetResourceString("CountdownEvent_Decrement_BelowZero"));
        }
        return false;
    }
    
    /// <summary>
    /// Registers <paramref name="signalCount"/> signals at once.
    /// </summary>
    /// <param name="signalCount">Number of signals to register; must be positive.</param>
    /// <returns>true if the signals caused the count to reach zero and the event was set; otherwise, false.</returns>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="signalCount"/> is zero or negative.</exception>
    /// <exception cref="ObjectDisposedException">The instance has been disposed.</exception>
    /// <exception cref="InvalidOperationException"><paramref name="signalCount"/> exceeds the current count.</exception>
    public bool Signal(int signalCount)
    {
        if (signalCount <= 0)
        {
            throw new ArgumentOutOfRangeException("signalCount");
        }

        ThrowIfDisposed();
        Contract.Assert(m_event != null);

        int observedCount;
        SpinWait spin = new SpinWait();
        while (true)
        {
            observedCount = m_currentCount;
            if (observedCount < signalCount)
            {
                throw new InvalidOperationException(Environment.GetResourceString("CountdownEvent_Decrement_BelowZero"));
            }
            // Publish the new count only if nobody changed it since we read it; otherwise retry.
            if (Interlocked.CompareExchange(ref m_currentCount, observedCount - signalCount, observedCount) == observedCount)
            {
                break;
            }
            spin.SpinOnce();
        }
        // The successful exchange took the count from signalCount to exactly zero.
        if (observedCount == signalCount)
        {
            m_event.Set();
            return true;
        }
        Contract.Assert(m_currentCount >= 0, "latch was decremented below zero");
        return false;
    }

    /// <summary>
    /// Increments the count by <paramref name="signalCount"/>, throwing if the latch is already set.
    /// </summary>
    /// <param name="signalCount">Amount to add; must be positive.</param>
    /// <exception cref="InvalidOperationException">The count is already zero, or the addition would overflow.</exception>
    public void AddCount(int signalCount)
    {
        if (!TryAddCount(signalCount))
        {
            throw new InvalidOperationException(Environment.GetResourceString("CountdownEvent_Increment_AlreadyZero"));
        }
    }

    /// <summary>
    /// Attempts to increment the count by <paramref name="signalCount"/>.
    /// </summary>
    /// <param name="signalCount">Amount to add; must be positive.</param>
    /// <returns>true if the increment succeeded; false if the count was already zero or below.</returns>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="signalCount"/> is zero or negative.</exception>
    /// <exception cref="ObjectDisposedException">The instance has been disposed.</exception>
    /// <exception cref="InvalidOperationException">The addition would exceed <see cref="Int32.MaxValue"/>.</exception>
    public bool TryAddCount(int signalCount)
    {
        if (signalCount <= 0)
        {
            throw new ArgumentOutOfRangeException("signalCount");
        }

        ThrowIfDisposed();
        int observedCount;
        SpinWait spin = new SpinWait();
        while (true)
        {
            observedCount = m_currentCount;
            if (observedCount <= 0)
            {
                return false;
            }
            else if (observedCount > (Int32.MaxValue - signalCount))
            {
                // Written as a subtraction so the overflow check itself cannot overflow.
                throw new InvalidOperationException(Environment.GetResourceString("CountdownEvent_Increment_AlreadyMax"));
            }

            // Publish the new count only if nobody changed it since we read it; otherwise retry.
            if (Interlocked.CompareExchange(ref m_currentCount, observedCount + signalCount, observedCount) == observedCount)
            {
                break;
            }
            spin.SpinOnce();
        }
        return true;
    }
    
    /// <summary>
    /// Resets the count to the value this latch was last initialized or reset with.
    /// </summary>
    public void Reset()
    {
        Reset(m_initialCount);
    }

    /// <summary>
    /// Resets both the current and the initial count to <paramref name="count"/>.
    /// The count is stored with a plain assignment, not a compare-exchange.
    /// </summary>
    /// <param name="count">New count; zero leaves the latch set, any positive value un-sets it.</param>
    /// <exception cref="ObjectDisposedException">The instance has been disposed.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is negative.</exception>
    public void Reset(int count)
    {
        ThrowIfDisposed();
        if (count < 0)
        {
            throw new ArgumentOutOfRangeException("count");
        }
        m_currentCount = count;
        m_initialCount = count;

        if (count == 0)
        {
            m_event.Set();
        }
        else
        {
            m_event.Reset();
        }
    }

    /// <summary>
    /// Blocks the current thread until the latch is set.
    /// </summary>
    public void Wait()
    {
        Wait(Timeout.Infinite, new CancellationToken());
    }

    /// <summary>
    /// Blocks until the latch is set, the timeout elapses, or the token is canceled.
    /// </summary>
    /// <param name="millisecondsTimeout">Milliseconds to wait, or -1 to wait indefinitely.</param>
    /// <param name="cancellationToken">Token observed before and during the wait.</param>
    /// <returns>true if the latch was set; false if the wait on the event timed out.</returns>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is less than -1.</exception>
    /// <exception cref="ObjectDisposedException">The instance has been disposed.</exception>
    /// <exception cref="OperationCanceledException"><paramref name="cancellationToken"/> was canceled.</exception>
    public bool Wait(int millisecondsTimeout, CancellationToken cancellationToken)
    {
        if (millisecondsTimeout < -1)
        {
            throw new ArgumentOutOfRangeException("millisecondsTimeout");
        }

        ThrowIfDisposed();
        cancellationToken.ThrowIfCancellationRequested();
        bool returnValue = IsSet;
        // If not completed yet, wait on the event.
        if (!returnValue)
        {
            // ** the actual wait
            returnValue = m_event.Wait(millisecondsTimeout, cancellationToken);
            //the Wait will throw OCE itself if the token is canceled.
        }
        return returnValue;
    }   

    /// <summary>
    /// Throws <see cref="ObjectDisposedException"/> if <see cref="Dispose()"/> has been called.
    /// </summary>
    private void ThrowIfDisposed()
    {
        if (m_disposed)
        {
            throw new ObjectDisposedException("CountdownEvent");
        }
    }
}

代码非常简单,CountdownEvent内部的实现依赖于ManualResetEventSlim实例:构造函数的int initialCount参数为0时就调用ManualResetEventSlim的Set方法,这样调用Wait的线程就不会被阻塞了。IsSet是一个主要属性【return (m_currentCount <= 0)】。Signal()和Signal(int signalCount)方法用于减少m_currentCount,主要采用原子操作【Interlocked.Decrement(ref m_currentCount)】和【Interlocked.CompareExchange(ref m_currentCount, observedCount - signalCount, observedCount) == observedCount】,如果减少后m_currentCount==0就调用Set方法,为Wait的线程放行;注意这里面有使用SpinWait的自旋。AddCount、TryAddCount和Signal方法相反,主要是增加m_currentCount,实现方式一样,采用原子操作+自旋。Reset还原为初始值,Reset(int count)还原为指定的值。Wait方法主要看IsSet是否为true【此时m_currentCount<=0】:是则直接返回,否则就调用ManualResetEventSlim的Wait方法。

原文地址:https://www.cnblogs.com/majiang/p/7897091.html