基于TPL异步编程的同步和互斥的类库

在多线程编程中,我们除了并发外,往往还需要各线程之间进行互斥和同步等操作,.NET Framework在System.Threading中提供了不少库供我们实现这些功能,如:Mutex,ReaderWriterLock,ManualResetEvent,AutoResetEvent,Barrier等。在基于TPL的异步编程中,我们往往也需要这样的类库来实现异步的同步和互斥功能,但.NET Framework没有为我们提供这些类库,往往得自己实现,比较麻烦。今天在MSDN Blog上发现了一个大牛的实现,非常全面,这里强烈推荐一下。

AsyncManualResetEvent

View Code
    //http://blogs.msdn.com/b/pfxteam/archive/2012/02/11/10266920.aspx
    public class AsyncManualResetEvent
    {
        private volatile TaskCompletionSource<bool> m_tcs = new TaskCompletionSource<bool>();

        public Task WaitAsync() { return m_tcs.Task; }

        public void Set() { m_tcs.TrySetResult(true); }

        public void Reset()
        {
            lock (m_tcs)
            {
                if (!m_tcs.Task.IsCompleted)
                {
                    Task.Factory.StartNew(s => ((TaskCompletionSource<bool>)s).TrySetResult(true), m_tcs).Wait();
                }
                m_tcs = new TaskCompletionSource<bool>();
            }
            
        }
    }

AsyncAutoResetEvent

View Code
    //http://blogs.msdn.com/b/pfxteam/archive/2012/02/11/10266923.aspx
    public class AsyncAutoResetEvent
    {
        private readonly static Task s_completed = Task.FromResult(true);
        private readonly Queue<TaskCompletionSource<bool>> m_waits = new Queue<TaskCompletionSource<bool>>();
        private bool m_signaled;

        public Task WaitAsync()
        {
            lock (m_waits)
            {
                if (m_signaled)
                {
                    m_signaled = false;
                    return s_completed;
                }
                else
                {
                    var tcs = new TaskCompletionSource<bool>();
                    m_waits.Enqueue(tcs);
                    return tcs.Task;
                }
            } 
        }

        public void Set()
        {
            TaskCompletionSource<bool> toRelease = null;
            lock (m_waits)
            {
                if (m_waits.Count > 0)
                    toRelease = m_waits.Dequeue();
                else if (!m_signaled)
                    m_signaled = true;
            }
            if (toRelease != null)
                toRelease.SetResult(true); 
        }
    }

AsyncCountdownEvent

View Code
    //http://blogs.msdn.com/b/pfxteam/archive/2012/02/11/10266930.aspx
    public class AsyncCountdownEvent
    {
        private readonly AsyncManualResetEvent m_amre = new AsyncManualResetEvent();
        private int m_count;

        public AsyncCountdownEvent(int initialCount)
        {
            if (initialCount <= 0throw new ArgumentOutOfRangeException("initialCount");
            m_count = initialCount; 
        }
        
        public Task WaitAsync() { return m_amre.WaitAsync(); }
        
        public void Signal()
        {
            if (m_count <= 0)
                throw new InvalidOperationException();

            int newCount = Interlocked.Decrement(ref m_count);
            if (newCount == 0)
                m_amre.Set();
            else if (newCount < 0)
                throw new InvalidOperationException();
        }

        public Task SignalAndWait()
        {
            Signal();
            return WaitAsync();
        }
    }

AsyncBarrier

View Code
    //http://blogs.msdn.com/b/pfxteam/archive/2012/02/11/10266932.aspx
    public class AsyncBarrier
    {
        private readonly int m_participantCount;
        private int m_remainingParticipants;
        private ConcurrentStack<TaskCompletionSource<bool>> m_waiters;

        public AsyncBarrier(int participantCount)
        {
            if (participantCount <= 0throw new ArgumentOutOfRangeException("participantCount");
            m_remainingParticipants = m_participantCount = participantCount;
            m_waiters = new ConcurrentStack<TaskCompletionSource<bool>>();
        }

        public Task SignalAndWait()
        {
            var tcs = new TaskCompletionSource<bool>();
            m_waiters.Push(tcs);
            if (Interlocked.Decrement(ref m_remainingParticipants) == 0)
            {
                m_remainingParticipants = m_participantCount;
                var waiters = m_waiters;
                m_waiters = new ConcurrentStack<TaskCompletionSource<bool>>();
                Parallel.ForEach(waiters, w => w.SetResult(true));
            }
            return tcs.Task;
        }
    }

 AsyncSemaphore 

View Code
    //http://blogs.msdn.com/b/pfxteam/archive/2012/02/12/10266983.aspx
    public class AsyncSemaphore
    {
        private readonly static Task s_completed = Task.FromResult(true);
        private readonly Queue<TaskCompletionSource<bool>> m_waiters = new Queue<TaskCompletionSource<bool>>();
        private int m_currentCount;

        public AsyncSemaphore(int initialCount)
        {
            if (initialCount < 0throw new ArgumentOutOfRangeException("initialCount");
            m_currentCount = initialCount;
        } 

        public Task WaitAsync()
        {
            lock (m_waiters)
            {
                if (m_currentCount > 0)
                {
                    --m_currentCount;
                    return s_completed;
                }
                else
                {
                    var waiter = new TaskCompletionSource<bool>();
                    m_waiters.Enqueue(waiter);
                    return waiter.Task;
                }
            }
        }

        public void Release()
        {
            TaskCompletionSource<bool> toRelease = null;
            lock (m_waiters)
            {
                if (m_waiters.Count > 0)
                    toRelease = m_waiters.Dequeue();
                else
                    ++m_currentCount;
            }
            if (toRelease != null)
                toRelease.SetResult(true);
        }
    }

  AsyncLock

View Code
    //http://blogs.msdn.com/b/pfxteam/archive/2012/02/12/10266988.aspx
    public class AsyncLock
    {
        private readonly AsyncSemaphore m_semaphore;
        private readonly Task<Releaser> m_releaser;

        public AsyncLock()
        {
            m_semaphore = new AsyncSemaphore(1);
            m_releaser = Task.FromResult(new Releaser(this));
        }

        public Task<Releaser> LockAsync()
        {
            var wait = m_semaphore.WaitAsync();
            return wait.IsCompleted ?
                m_releaser :
                wait.ContinueWith((_, state) => new Releaser((AsyncLock)state),
                    this, CancellationToken.None,
                    TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
        }

        public struct Releaser : IDisposable
        {
            private readonly AsyncLock m_toRelease;

            internal Releaser(AsyncLock toRelease) { m_toRelease = toRelease; }

            public void Dispose()
            {
                if (m_toRelease != null)
                    m_toRelease.m_semaphore.Release();
            }
        }
    }

 AsyncReaderWriterLock

View Code
    //http://blogs.msdn.com/b/pfxteam/archive/2012/02/12/10267069.aspx
    public class AsyncReaderWriterLock
    {
        private readonly Task<Releaser> m_readerReleaser;
        private readonly Task<Releaser> m_writerReleaser;

        private readonly Queue<TaskCompletionSource<Releaser>> m_waitingWriters = new Queue<TaskCompletionSource<Releaser>>();
        private TaskCompletionSource<Releaser> m_waitingReader = new TaskCompletionSource<Releaser>();
        private int m_readersWaiting;
        private int m_status;

        public AsyncReaderWriterLock()
        {
            m_readerReleaser = Task.FromResult(new Releaser(thisfalse));
            m_writerReleaser = Task.FromResult(new Releaser(thistrue));
        }

        public Task<Releaser> ReaderLockAsync()
        {
            lock (m_waitingWriters)
            {
                if (m_status >= 0 && m_waitingWriters.Count == 0)
                {
                    ++m_status;
                    return m_readerReleaser;
                }
                else
                {
                    ++m_readersWaiting;
                    return m_waitingReader.Task;
                }
            }
        }


        public Task<Releaser> WriterLockAsync()
        {
            lock (m_waitingWriters)
            {
                if (m_status == 0)
                {
                    m_status = -1;
                    return m_writerReleaser;
                }
                else
                {
                    var waiter = new TaskCompletionSource<Releaser>();
                    m_waitingWriters.Enqueue(waiter);
                    return waiter.Task;
                }
            }
        }

        private void ReaderRelease()
        {
            TaskCompletionSource<Releaser> toWake = null;

            lock (m_waitingWriters)
            {
                --m_status;
                if (m_status == 0 && m_waitingWriters.Count > 0)
                {
                    m_status = -1;
                    toWake = m_waitingWriters.Dequeue();
                }
            }

            if (toWake != null)
                toWake.SetResult(new Releaser(thistrue));
        }

        private void WriterRelease()
        {
            TaskCompletionSource<Releaser> toWake = null;
            bool toWakeIsWriter = false;

            lock (m_waitingWriters)
            {
                if (m_waitingWriters.Count > 0)
                {
                    toWake = m_waitingWriters.Dequeue();
                    toWakeIsWriter = true;
                }
                else if (m_readersWaiting > 0)
                {
                    toWake = m_waitingReader;
                    m_status = m_readersWaiting;
                    m_readersWaiting = 0;
                    m_waitingReader = new TaskCompletionSource<Releaser>();
                }
                else m_status = 0;
            }

            if (toWake != null)
                toWake.SetResult(new Releaser(this, toWakeIsWriter));
        }

        public struct Releaser : IDisposable
        {
            private readonly AsyncReaderWriterLock m_toRelease;
            private readonly bool m_writer;

            internal Releaser(AsyncReaderWriterLock toRelease, bool writer)
            {
                m_toRelease = toRelease;
                m_writer = writer;
            }

            public void Dispose()
            {
                if (m_toRelease != null)
                {
                    if (m_writer) m_toRelease.WriterRelease();
                    else m_toRelease.ReaderRelease();
                }
            }
        }
    }
原文地址:https://www.cnblogs.com/TianFang/p/2559972.html