CLR 混合线程同步构造-续

本篇继上一篇讨论一下多线程并发的处理情况,以及如何编写异步的同步构造代码避免线程阻塞。CLR 到此篇就结束了,如果想看Jeffrey 原著的请留言,写下邮箱地址。

 

著名的双检索技术

CLR 很好的支持双检索技术,这应该归功于CLR 的内存模型以及 volatile 字段访问,以下代码演示了如何使用 C# 双检索技术。

public sealed class Singleton {
   private static Object s_lock = new Object();
   private static Singletion s_value = null;

   private Singleton()
  {
       //初始化单实例对象的代码放在这里 ...
  }

   public static Singleton GetSingleton()
  {
       if(s_value != null) return s_value;

       Monitor.Enter(s_lock);//还没有创建让一个线程创建它
       if(s_value == null)
      {
           Singleton temp = new Singleton();
           Volatile.Write(ref s_value, temp);
      }
       Monitor.Exit(s_lock);
       return s_value;
  }
}

双检索技术背后的思路在于,对 GetSingletion 方法的一个调用可以快速地检查 s_value 字段,判断对象是否创建。

在CLR 中,对任何锁方法的调用都构成了一个完整的内存栅栏在栅栏之前写入的任何变量都必须在栅栏之前完成在栅栏之后的任何变量读取都必须在栅栏之后开始。对于GetSingleton 方法,这意味着 s_value 字段的值必须在调用了 Monitor.Enter 之后重新读取。调用前缓存到寄存器中的东西作不了数。

 

假如第二个 if 语句中包含的是下面这行代码:

s_value = new Singleton(); //你极有可能这样写

你以为编译器会生成代码为一个 Singleton 分配内存,调用构造器来初始化字段,再将引用赋给 s_value 字段。使一个指对其他线程可见成为 发布但这只是你一厢情愿的想法。编译器可能会这样做:为Singleton 分配内存,将引用发布到 s_value,再调用构造器。

从单线程的角度看,像这样改变顺序是无关紧要的。但再将引用发布给 s_value 之后,并在调用构造器之前,如果另一个线程调用了 GetSingleton 方法,那会发生什么?这个线程会发现 s_value 不为 null,所以会开始使用 Singleton 对象,但对象的构造器还没有结束执行呢!

大多数时候,这个技术会损害效率。下面没有使用双检索技术,但行为和上一个版本相同。还更简洁了。

internal sealed class Singleton{
   private static Singleton s_value = new Singleton();

   //私有构造器防止 这个类外部的任何代码创建一个实例
   private Singleton(){
       //初始化单实例对象的代码放在这里
  }
   public static Singleton GetSingleton() { return s_value; }
}

由于代码首次访问类的成员时,CLR 会自动调用类型的类构造器,所以首次有一个线程查询 Singleton 的 GetSingleton 方法时,CLR 就会自动调用类构造器,从而创建一个对象实例。

这种方式的缺点在于,首次访问类的 任何成员 都会调用类型构造器。所以,如果Singleton 类型定义了其他静态成员,就会在访问其他任何静态成员时创建 Singleton 对象。 有人通过定义嵌套类型来解决这个问题。

internal sealed class Singleton{
   private static Singleton s_value = null;
   //私有构造器阻止这个类外部的任何代码创建实例
   private Singleton()
  {
       
  }

   //以下公共静态方法返回单实例对象
   public static Singleton GetSingleton()
  {
       if(s_value != null) return s_value;
       //创建一个新的单实例对象,并把它固定下来(如果另一个线程还没有固定它的话)
       Singleton temp = new Singleton();        
       Interlocked.CompareExchange(ref s_value, temp , null);

       //如果这个线程竞争失败,新建的第二个单实例对象会被垃圾回收
       return s_value;
  }
}

由于大多数应用都很少发生多个线程同时调用 GetSingleton 的情况,所以不太可能同时创建多个Singleton 对象。

上述代码有很多方面的优势,1 它的速度非常快。2 它永不阻塞任何线程。

如果,一个线程池线程在一个 Monitor 或者 其他任何内核模式的线程同步构造上阻塞,线程池线程就会创建另一个线程来保持 CPU 的“饱和”。因此会初始化更多的内存,而其所有 DLL 都会收到一个线程连接通知。

 

FCL 有两个类型封装了本节描述的模式。下面是泛型System.Lazy 类。

public class Lazy<T> {
   public Lazy(Func<T> valueFactory, LazyThreadSafetyMode mode);
   public Boolean IsValueCreated { get; }
   public T Value { get; }
}

下面代码演示它如何工作的:

public static void Main() {
   // Create a lazy-initialization wrapper around getting the DateTime
   Lazy<String> s = new Lazy<String>(() => DateTime.Now.ToLongTimeString(), true);
   
   Console.WriteLine(s.IsValueCreated); // Returns false because Value not queried yet
   Console.WriteLine(s.Value); // The delegate is invoked now
   Console.WriteLine(s.IsValueCreated); // Returns true because Value was queried
   Thread.Sleep(10000); // Wait 10 seconds and display the time again
   Console.WriteLine(s.Value); // The delegate is NOT invoked now; same result
}

当我运行这段代码后,结果如下:

False
2:40:42 PM
True
2:40:42 PM ß Notice that the time did not change 10 seconds later

上述代码构造Lazy 类的实例,并向它传递某个 LazyThreadSafetyMode 标志。

public enum LazyThreadSafetyMode {
   None, // 完全没有线程安全支持,适合GUI 应用程序
   ExecutionAndPublication // Uses the double-check locking technique
   PublicationOnly, // Uses the Interlocked.CompareExchange technique
}

 

内存有限时可能不想创建Lazy 类的实例。这时可调用 System.Threading.LazyInitializer 类的静态方法。下面展示了这个类:

public static class LazyInitializer {
   // These two methods use Interlocked.CompareExchange internally:
   public static T EnsureInitialized<T>(ref T target) where T: class;
   public static T EnsureInitialized<T>(ref T target, Func<T> valueFactory) where T: class;
   // These two methods pass the syncLock to Monitor's Enter and Exit methods internally
   public static T EnsureInitialized<T>(ref T target, ref Boolean initialized, ref Object syncLock);
   public static T EnsureInitialized<T>(ref T target, ref Boolean initialized,ref Object syncLock, Func<T> valueFactory);
}

另外,为EnsureInitialized 方法的 syncLock 参数显式指定同步对象,可以用同一个锁保护多个初始化函数和字段。下面展示了如何使用这个类的方法:

public static void Main() {
   String name = null;
   // Because name is null, the delegate runs and initializes name
   LazyInitializer.EnsureInitialized(ref name, () => "Jeffrey");
   Console.WriteLine(name); // Displays "Jeffrey"
   
   // Because name is not null, the delegate does not run; name doesn’t change
   LazyInitializer.EnsureInitialized(ref name, () => "Richter");
   Console.WriteLine(name); // Also displays "Jeffrey"
}

 

条件变量模式

假定一个线程希望在一个复合条件为true 时执行一些代码。一个选项是让线程连续“自旋”,反复测试条件,但这会浪费 CPU时间,也不可能对构成复合条件的多个变量进行原子性的测试。

幸好有这样一个模式 允许 线程根据一个复合条件来同步它们的操作,而且不会浪费资源。这个模式称为 条件变量模式。

internal sealed class ConditionVariablePattern {
   private readonly Object m_lock = new Object();
   private Boolean m_condition = false;
   public void Thread1() {
       Monitor.Enter(m_lock); // Acquire a mutual-exclusive lock
       // While under the lock, test the complex condition "atomically"
       while (!m_condition) {
       // If condition is not met, wait for another thread to change the condition
      Monitor.Wait(m_lock); // Temporarily release lock so other threads can get it
  }
       // The condition was met, process the data...
       Monitor.Exit(m_lock); // Permanently release lock
  }
   public void Thread2() {
       Monitor.Enter(m_lock); // Acquire a mutual-exclusive lock
       // Process data and modify the condition...
       m_condition = true;
       // Monitor.Pulse(m_lock); // Wakes one waiter AFTER lock is released
       Monitor.PulseAll(m_lock); // Wakes all waiters AFTER lock is released
       Monitor.Exit(m_lock); // Release lock
  }
}

下面展示了一个线程安全的队列,它允许多个线程在其中对数据项 进行入队和出对操作。注意,除了有一个可供处理的数据项,否则试图出队一个数据项会一直阻塞。

internal sealed class SynchronizedQueue<T> {
   private readonly Object m_lock = new Object();
   private readonly Queue<T> m_queue = new Queue<T>();
   public void Enqueue(T item) {
       Monitor.Enter(m_lock);
       // After enqueuing an item, wake up any/all waiters
       m_queue.Enqueue(item);
       Monitor.PulseAll(m_lock);
       Monitor.Exit(m_lock);
  }
   
   public T Dequeue() {
       Monitor.Enter(m_lock);
       // Loop while the queue is empty (the condition)
       while (m_queue.Count == 0)
           Monitor.Wait(m_lock);
           // Dequeue an item from the queue and return it for processing
       T item = m_queue.Dequeue();
       Monitor.Exit(m_lock);
       return item;
  }
}

 

异步的同步构造

假定客户端向网站发出请求。客户端请求到达时,一个线程池线程开始处理客户端请求。假定这个客户端想以线程安全的方式修改数据,所以它请求一个 reader-writer 锁来进行写入。假定这个锁被长时间占有。在锁占有期间,另一个客户端请求到达了,所以线程池为这个请求创建新线程。然后,线程阻塞,尝试获取 reader-writer 锁来进行读取。事实上,随着越来越多的客户端请求到达,线程池线程会创建越来越多的线程,所以这些线程都要傻傻地在锁上阻塞。服务器把它的所有时间都花在创建线程上面,而目的仅仅是让它们停止运行!这样的服务器完全没有伸缩性可言。

更糟的是,当Writer 线程释放锁时,所有reader线程都同时解除阻塞开始执行。现在又变成了大量线程试图在相对数量很少的CPU 上运行。所以 , Windows 开始在线程之间不停地进行上下文切换。由于上下文切换产生了大量开销,所以真正的工作反而没有得到很好的处理。 这些构造想要解决的许多问题其实最好就是用 第 27 章讨论的Task 类来完成。

拿 Barrier 类来说:可以生成几个 Task 对象来处理一个阶段。然后,当所有这些任务完成后,可以用另外一个或多个 Task 对象继续。和本章展示的大量构造相比,任务具有下述许多优势。

  • 任务使用的内存比线程少得多,创建和销毁所需的时间也少得多。

  • 线程池根据可用CPU 数量自动伸缩任务规模。

  • 每个任务完成一个阶段后,运行任务的线程回到线程池,在那里能接受新任务。

  • 线程池是站在整个进程的高度观察任务,所有,它能更好地调度这些任务,减少进程中的线程数,并减少上下文切换。

 

重点来了:如果代码能通过异步的同步构造指出它想要一个锁,那么会非常有用。在这种情况下,如果线程得不到锁,可直接返回并执行其他工作,而不必在那里傻傻地阻塞。以后当锁可用时,代码可恢复执行并访问锁保护的资源。

SemaphoreSlim 类通过 WaitAsync 方法实现了这个思路,下面是该方法的最复杂重载版本的签名。

public Task<Boolean> WaitAsync(Int32 millisecondsTimeout, CancellationToken cancellationToken);

可用它异步地同步对一个资源的访问(不阻塞任何线程)。

private static async Task AccessResourceViaAsyncSynchronization(SemaphoreSlim asyncLock) {
   // TODO: Execute whatever code you want here...
   
   await asyncLock.WaitAsync(); // Request exclusive access to a resource via its lock
   // When we get here, we know that no other thread is accessing the resource
   // TODO: Access the resource (exclusively)...
   
   // When done accessing resource, relinquish lock so other code can access the resource
   asyncLock.Release();
   // TODO: Execute whatever code you want here...
}

一般创建最大技术为1 的 SemaphoreSlim,从而对 SemaphoreSlim 保护的资源进行互斥访问。所以这和使用 Monitor 时的行为相似,只是 SemaphoreSlim 不支持所有权和递归语义。

 

对于 reader-writer 语义, .Net Framework 提供了:ConcurrentExclusiveSchedulerPair 类。

public class ConcurrentExclusiveSchedulerPair {
   public ConcurrentExclusiveSchedulerPair();
   public TaskScheduler ExclusiveScheduler { get; }
   public TaskScheduler ConcurrentScheduler { get; }
   // Other methods not shown...
}

这个类的两个 TaskScheduler 对象,它们在调度任务时负责 提供 reader/writer 语义。只要当前没有运行使用 ConcurrentScheduler 调度的任务,使用 ExclusiveScheduler 调度的任何任务将独占式地运行。另外,只要当前没有运行使用 ExclusiveScheduler 调度的任务,使用 ConcurrentScheduler 调度的任务就可同时运行。

private static void ConcurrentExclusiveSchedulerDemo() {
   var cesp = new ConcurrentExclusiveSchedulerPair();
   var tfExclusive = new TaskFactory(cesp.ExclusiveScheduler);
   var tfConcurrent = new TaskFactory(cesp.ConcurrentScheduler);
   
   for (Int32 operation = 0; operation < 5; operation++) {
       var exclusive = operation < 2; // For demo, I make 2 exclusive & 3 concurrent
       
      (exclusive ? tfExclusive : tfConcurrent).StartNew(() => {
           Console.WriteLine("{0} access", exclusive ? "exclusive" : "concurrent");
           // TODO: Do exclusive write or concurrent read computation here...
      });
  }
}

遗憾的是 .Net Framework 没有提供具有 reader/writer 语义的异步锁。但作者构建了一个这样的类, AsyncOneManyLock。用法和SemaphoreSlim 一样。

 

作者的AsyncOneManyLock 类 内部没有使用任何内核构造。只使用了一个SpinLock,它在内部使用了用户模式的构造。 WaitAsync 和 Realse 方法 用锁保护的只是一些整数计算和比较,以及构造一个 TaskCompletionSource ,并把它添加/删除 从队列中。这花不了多少时间,能保证锁只是短时间被占有。

private static async Task AccessResourceViaAsyncSynchronization(AsyncOneManyLock asyncLock) {
   // TODO: Execute whatever code you want here...
   
   // Pass OneManyMode.Exclusive or OneManyMode.Shared for wanted concurrent access
   await asyncLock.AcquireAsync(OneManyMode.Shared); // Request shared access
   // When we get here, no threads are writing to the resource; other threads may be reading
   // TODO: Read from the resource...
   
   // When done accessing resource, relinquish lock so other code can access the resource
   asyncLock.Release();
   // TODO: Execute whatever code you want here...
}

 

并发集合类

FCL 自带4个 线程安全的集合类,在 System.Collections.Concurrent 命名空间中定义。它们是:ConcurrentQueue,ConcurrentStack,ConcurrentDictionary 和 ConcurrentBag。

所有这些集合都是“非阻塞”的。换言之,如果一个线程试图提取一个不存在的元素(数据项),线程会立即返回;线程不会阻塞在那里,等着一个元素的出现。

一个集合“非阻塞”,并不意味着它就不需要锁了。ConcurrentDictionary 类在内部使用了 Monitor。ConcurrentQueue 和 ConcurrentStack 确实不需要锁;它们在内部都使用 Interlocked 的方法来操纵集合。一个 ConcurrentBag 对象由大量迷你集合对象构成,每个线程一个。

ConcurrentStack,ConcurrentQueue 和 ConcurrentBag 这三个并发集合类都实现了 IProducerConsumerCollection 接口。实现了这个接口的任何类 都能转变成一个阻塞集合。要将非阻塞的集合转变为阻塞集合,需要构造一个System.Collections.Concurrent.BlockingColllection 类,向它的构造器传递对非阻塞集合的引用。

 

原文地址:https://www.cnblogs.com/mingjie-c/p/11767759.html