/// <summary>
/// Callback invoked by a <see cref="MyQueue"/> worker thread with one batch of queued items.
/// </summary>
/// <param name="monitors">The items of the batch, in the order they were dequeued.</param>
public delegate void Consumer(IList<object> monitors);

/// <summary>
/// Policy applied when the consumer callback throws.
/// MyQueue does not read this setting; it always logs the exception and abandons the batch.
/// </summary>
public enum ConsumeErrorAction
{
    AbandonAndLogException
}

/// <summary>
/// Policy applied when fewer items than one full batch are waiting.
/// MyQueue does not read this setting; it always dispatches whatever is available.
/// </summary>
public enum NotReachBatchCountConsumeAction
{
    ConsumeAllItems
}

/// <summary>
/// Policy applied when the queue grows close to <see cref="MyQueueConfig.MaxItemCount"/>.
/// MyQueue does not read this setting; see the overflow handling in the dispatcher loop.
/// </summary>
public enum ReachMaxItemCountAction
{
    AbandonOldItems
}

/// <summary>
/// Settings for a <see cref="MyQueue"/>: the consumer callback plus sizing and timing values.
/// </summary>
public class MyQueueConfig
{
    // Stored for identification only; nothing in this class or in MyQueue reads it back.
    private string queueName;

    /// <summary>Callback that receives each dispatched batch.</summary>
    public Consumer Consumer { get; private set; }

    /// <param name="name">Name of the queue.</param>
    /// <param name="c">Callback that receives each dispatched batch.</param>
    public MyQueueConfig(string name, Consumer c)
    {
        queueName = name;
        Consumer = c;
    }

    /// <summary>
    /// Upper bound used by the overflow check. The main queue is emptied as soon as its
    /// length exceeds <c>MaxItemCount - 1000</c>.
    /// </summary>
    public int MaxItemCount { get; set; }

    public ConsumeErrorAction ConsumeErrorAction { get; set; }

    /// <summary>Sleep, in milliseconds, between dispatch rounds and between idle worker polls.</summary>
    public int ConsumeIntervalMilliseconds { get; set; }

    /// <summary>Maximum number of items taken from the main queue in one dispatch round.</summary>
    public int ConsumeItemCountInOneBatch { get; set; }

    /// <summary>Number of regular worker threads; one extra spill-over worker is always added.</summary>
    public int ConsumeThreadCount { get; set; }

    public NotReachBatchCountConsumeAction NotReachBatchCountConsumeAction { get; set; }

    public ReachMaxItemCountAction ReachMaxItemCountAction { get; set; }
}

/// <summary>
/// In-memory queue with one dispatcher thread and several worker threads.
/// Producers call <see cref="Enqueue"/>; the dispatcher periodically drains a chunk of the
/// main queue, splits it into batches and hands the batches to per-worker queues; each
/// worker passes its batches to the configured <see cref="Consumer"/> callback.
/// </summary>
public class MyQueue
{
    // Sleep used by the dispatcher when the main queue is empty.
    private const int EmptyQueuePollMilliseconds = 1000;

    // The main queue is cleared once its length exceeds MaxItemCount minus this headroom.
    private const int OverflowHeadroom = 1000;

    private MyQueueConfig c;

    // Number of regular workers, captured at Init so later config edits cannot desynchronise
    // the dispatcher from the set of queues that actually exist.
    private int workerCount;

    // Queue<T> is not safe for concurrent use, so every access to the main queue
    // (producers and the dispatcher) happens under this lock.
    private readonly object queueLock = new object();
    private readonly Queue<object> queue;

    // One queue per worker, keyed 0..workerCount (the last key is the spill-over worker).
    // The dictionary is only written in Init before any thread starts; each inner queue
    // is locked on itself whenever the dispatcher or its worker touches it.
    private readonly Dictionary<int, Queue<List<object>>> queueData;
    private readonly Dictionary<int, Thread> threads;

    public MyQueue()
    {
        queue = new Queue<object>();
        queueData = new Dictionary<int, Queue<List<object>>>();
        threads = new Dictionary<int, Thread>();
    }

    /// <summary>
    /// Stores the configuration, creates the per-worker queues and starts the worker and
    /// dispatcher threads. Must be called exactly once.
    /// </summary>
    /// <param name="config">Queue settings; see the exceptions for the required values.</param>
    /// <exception cref="ArgumentNullException"><paramref name="config"/> is null.</exception>
    /// <exception cref="ArgumentException">The configuration has no consumer callback.</exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// ConsumeThreadCount or ConsumeItemCountInOneBatch is not positive, or
    /// ConsumeIntervalMilliseconds is negative.
    /// </exception>
    /// <exception cref="InvalidOperationException">Init was already called.</exception>
    public void Init(MyQueueConfig config)
    {
        if (config == null)
        {
            throw new ArgumentNullException("config");
        }
        if (config.Consumer == null)
        {
            throw new ArgumentException("A consumer callback is required.", "config");
        }
        if (config.ConsumeThreadCount <= 0)
        {
            throw new ArgumentOutOfRangeException("config", "ConsumeThreadCount must be greater than zero.");
        }
        if (config.ConsumeItemCountInOneBatch <= 0)
        {
            throw new ArgumentOutOfRangeException("config", "ConsumeItemCountInOneBatch must be greater than zero.");
        }
        if (config.ConsumeIntervalMilliseconds < 0)
        {
            throw new ArgumentOutOfRangeException("config", "ConsumeIntervalMilliseconds must not be negative.");
        }
        if (c != null)
        {
            throw new InvalidOperationException("Init has already been called on this queue.");
        }

        c = config;
        workerCount = config.ConsumeThreadCount;

        // Create every per-worker queue BEFORE starting any thread: a worker looks up its
        // queue immediately, and the dictionary must not be mutated while threads read it.
        for (int i = 0; i <= workerCount; i++)
        {
            queueData.Add(i, new Queue<List<object>>());
        }

        // Indexes 0..workerCount-1 are the regular workers; index workerCount is the
        // spill-over worker that receives the remainder batch of an uneven split.
        for (int i = 0; i <= workerCount; i++)
        {
            Thread thread = new Thread(consumer);
            threads.Add(i, thread);
            thread.Start(i);
        }

        new Thread(sendConsumer).Start();
    }

    /// <summary>
    /// Dispatcher loop: drains up to ConsumeItemCountInOneBatch items from the main queue,
    /// splits them into batches and distributes the batches to the per-worker queues.
    /// </summary>
    private void sendConsumer()
    {
        while (true)
        {
            // The try/catch sits inside the loop so that a single failure is logged
            // and the dispatcher keeps running instead of dying silently.
            try
            {
                List<object> forConsumer = new List<object>();
                bool queueWasEmpty = false;

                lock (queueLock)
                {
                    int queueCount = queue.Count;
                    if (queueCount == 0)
                    {
                        queueWasEmpty = true;
                    }
                    else if (queueCount > c.MaxItemCount - OverflowHeadroom)
                    {
                        // Queue too large: every pending item is discarded.
                        queue.Clear();
                    }
                    else
                    {
                        int take = Math.Min(queueCount, c.ConsumeItemCountInOneBatch);
                        for (int i = 0; i < take; i++)
                        {
                            forConsumer.Add(queue.Dequeue());
                        }
                    }
                }

                // Sleeping happens outside the lock so producers are never blocked by it.
                if (queueWasEmpty)
                {
                    Thread.Sleep(EmptyQueuePollMilliseconds);
                    continue;
                }
                if (forConsumer.Count == 0)
                {
                    // Overflow branch: nothing to dispatch, re-check the queue right away.
                    continue;
                }

                // Never ask for a zero-sized batch; with fewer items than workers each
                // item becomes its own batch.
                int batchSize = Math.Max(1, forConsumer.Count / workerCount);
                List<object[]> batchs = forConsumer.BatchesOf(batchSize).ToList();

                if (batchs.Count < workerCount)
                {
                    // Too few batches to spread out: the first worker takes them all.
                    foreach (object[] batch in batchs)
                    {
                        EnqueueBatch(0, batch.ToList());
                    }
                }
                else
                {
                    // Integer division can produce more batches than there are queues
                    // (e.g. 5 items for 3 workers gives 5 batches), so wrap around
                    // instead of indexing a queue that does not exist.
                    int slotCount = workerCount + 1;
                    for (int i = 0; i < batchs.Count; i++)
                    {
                        EnqueueBatch(i % slotCount, batchs[i].ToList());
                    }
                }

                Thread.Sleep(c.ConsumeIntervalMilliseconds);
            }
            catch (Exception ex)
            {
                LogError("dispatcher", ex);
            }
        }
    }

    /// <summary>
    /// Worker loop: takes batches from the per-worker queue identified by
    /// <paramref name="index"/> and passes each non-empty batch to the consumer callback.
    /// </summary>
    /// <param name="index">Boxed integer key of this worker's queue in queueData.</param>
    private void consumer(object index)
    {
        int queueIndex = Convert.ToInt32(index);
        Queue<List<object>> source = queueData[queueIndex];

        while (true)
        {
            try
            {
                List<object> forConsumerQueue = null;
                lock (source)
                {
                    if (source.Count > 0)
                    {
                        forConsumerQueue = source.Dequeue();
                    }
                }

                if (forConsumerQueue != null && forConsumerQueue.Count > 0)
                {
                    // Invoked outside the lock so a slow callback never blocks the dispatcher.
                    c.Consumer(forConsumerQueue);
                }
                else
                {
                    Thread.Sleep(c.ConsumeIntervalMilliseconds);
                }
            }
            catch (Exception ex)
            {
                // The failing batch is abandoned; the worker carries on with the next one.
                LogError("consumer", ex);
            }
        }
    }

    /// <summary>Adds one item to the main queue. Safe to call from any thread.</summary>
    /// <param name="obj">The item to queue.</param>
    public void Enqueue(object obj)
    {
        lock (queueLock)
        {
            queue.Enqueue(obj);
        }
    }

    // Hands one batch to the per-worker queue with the given key.
    private void EnqueueBatch(int queueIndex, List<object> batch)
    {
        Queue<List<object>> target = queueData[queueIndex];
        lock (target)
        {
            target.Enqueue(batch);
        }
    }

    // Reports a failure of one of the background loops through the trace listeners.
    private static void LogError(string stage, Exception ex)
    {
        System.Diagnostics.Trace.TraceError("MyQueue {0} failed: {1}", stage, ex);
    }
}
消费过程如下:
1.启动时构建主队列和主队列线程,以及各个待消费队列和对应的消费线程
2.主队列线程负责拉取指定数量的数据到待消费队列
3.每个待消费队列各自单独启动一个线程进行消费。
整个过程中每个待消费队列只由一个线程消费,我认为这样既能保证多线程消费,又不需要加锁。
其中用到了一个扩展方法 BatchesOf:
public static class EnumerableExtensions
{
    /// <summary>
    /// Lazily splits <paramref name="sequence"/> into consecutive arrays of
    /// <paramref name="batchSize"/> elements; the last array holds whatever is left over.
    /// </summary>
    /// <typeparam name="T">Element type of the sequence.</typeparam>
    /// <param name="sequence">The elements to split, consumed in order.</param>
    /// <param name="batchSize">
    /// Number of elements per array. It is also used as the initial capacity of the internal
    /// buffer, so a negative value fails when enumeration starts. A value of zero yields
    /// every element as its own one-element array.
    /// </param>
    /// <returns>The batches, produced one at a time as the result is enumerated.</returns>
    public static IEnumerable<T[]> BatchesOf<T>(this IEnumerable<T> sequence, int batchSize)
    {
        List<T> pending = new List<T>(batchSize);

        using (IEnumerator<T> cursor = sequence.GetEnumerator())
        {
            while (cursor.MoveNext())
            {
                pending.Add(cursor.Current);
                if (pending.Count < batchSize)
                {
                    continue;
                }

                // Buffer is full: emit a copy, then reuse the buffer for the next batch.
                yield return pending.ToArray();
                pending.Clear();
            }
        }

        // Emit the final, shorter batch if the input did not divide evenly.
        if (pending.Count != 0)
        {
            yield return pending.ToArray();
            pending.Clear();
        }
    }
}
调用如下:
// Queue shared by every caller of this class; it is configured and started by the
// private constructor the first time Instance is read.
public static MyQueue queueService = new MyQueue();

private static bool _enable;
private static int _threadCount = 2;
private static int _mSecond = 1000;
private static int _oneBatch = 500;
private static object _root = new object();

/// <summary>
/// The single shared instance, created on first access while holding the _root lock.
/// </summary>
// NOTE(review): the _instance field is not declared in the visible snippet; it is
// presumably a private static QueueAsycAction field declared elsewhere in the class.
public static QueueAsycAction Instance
{
    get
    {
        if (_instance != null)
        {
            return _instance;
        }

        lock (_root)
        {
            if (_instance == null)
            {
                _instance = new QueueAsycAction();
            }
        }

        return _instance;
    }
}

private QueueAsycAction()
{
    InitMemoryQueueService();
}

/// <summary>
/// Initializes the queue.
/// </summary>
private void InitMemoryQueueService()
{
    try
    {
        MyQueueConfig settings = new MyQueueConfig("Member_AsycAction", Consumer);
        settings.ConsumeIntervalMilliseconds = _mSecond;
        settings.ConsumeItemCountInOneBatch = _oneBatch;
        settings.ConsumeThreadCount = _threadCount;
        settings.MaxItemCount = 100000;
        settings.NotReachBatchCountConsumeAction = NotReachBatchCountConsumeAction.ConsumeAllItems;
        settings.ReachMaxItemCountAction = ReachMaxItemCountAction.AbandonOldItems;

        queueService.Init(settings);
    }
    catch (Exception)
    {
        // Exception handling goes here.
    }
}

/// <summary>
/// Consumer callback handed to the queue.
/// </summary>
/// <param name="monitors">The batch of items to process.</param>
private void Consumer(IList<object> monitors)
{
    try
    {
        foreach (object item in monitors)
        {
            // Per-item processing goes here.
        }
    }
    catch (Exception)
    {
        // Exception handling goes here.
    }
}

/// <summary>
/// Adds an item to the queue.
/// </summary>
/// <param name="action">The item to enqueue.</param>
public void AddItem(object action)
{
    try
    {
        queueService.Enqueue(action);
    }
    catch (Exception)
    {
        // Exception handling goes here.
    }
}
采用单例模式调用,可以实例化多条队列做不同的操作。
望指正