实现延迟消息队列

  • 交流

    个人博客交流群:580749909 , 顺便推广一下自己和伙伴一起建立wpf交流群:130108655。

  • 简要

 因为在偶然的一次机会下,公司让我着手开发一个数据分发端基于socket通讯的一个中间件。主要用来解决向客户端分发数据的问题,后来多了一个需求就是未付费的用户拿到的数据是有延迟的。

而付费用户则是正常的。这个时候在网上搜了很久没有找到合适的解决方案,其实能解决这个问题的方案有很多比如说用到一些大厂贡献的xxMQ中间件之类的,确实能解决问题。但是目前项目比较小

根本用不上这么重的框架,然后又搜索了半天没有暂时没有发现有人用c#来实现,所以才动手写了这个方案。

附上github源码地址

  • 思路

这个方案是借鉴了另一位博主的开发思路,受到这位博主的启发然后根据自己的理解写了这个方案。附上该博主的链接地址:  1分钟实现“延迟消息”功能

在此我就不多赘述里面的内容了。

  • 代码

首先写一个方案要理清楚自己的项目结构,我做了如下分层。

  • Interfaces , 这层里主要约束延迟消息队列的队列和消息任务行。
 1   public interface IRingQueue<T>
 2     {
 3         /// <summary>
 4         /// Add tasks [add tasks will automatically generate: task Id, task slot location, number of task cycles]
 5         /// </summary>
 6         /// <param name="delayTime">The specified task is executed after N seconds.</param>
 7         /// <param name="action">Definitions of callback</param>
 8         void Add(long delayTime,Action<T> action);
 9 
10         /// <summary>
11         /// Add tasks [add tasks will automatically generate: task Id, task slot location, number of task cycles]
12         /// </summary>
13         /// <param name="delayTime">The specified task is executed after N seconds.</param>
14         /// <param name="action">Definitions of callback.</param>
15         /// <param name="data">Parameters used in the callback function.</param>
16         void Add(long delayTime, Action<T> action, T data);
17 
18         /// <summary>
19         /// Add tasks [add tasks will automatically generate: task Id, task slot location, number of task cycles]
20         /// </summary>
21         /// <param name="delayTime"></param>
22         /// <param name="action">Definitions of callback</param>
23         /// <param name="data">Parameters used in the callback function.</param>
24         /// <param name="id">Task ID, used when deleting tasks.</param>
25         void Add(long delayTime, Action<T> action, T data, long id);
26 
27         /// <summary>
28         /// Remove tasks [need to know: where the task is, which specific task].
29         /// </summary>
30         /// <param name="index">Task slot location</param>
31         /// <param name="id">Task ID, used when deleting tasks.</param>
32         void Remove(long id);
33 
34         /// <summary>
35         /// Launch queue.
36         /// </summary>
37         void Start();
38     }
1 public interface ITask
2     {
3     }
  • Achieves,这层里实现之前定义的接口,这里写成抽象类是为了后面方便扩展。
  1    public abstract class BaseQueue<T> : IRingQueue<T>
  2     {
  3         private long _pointer = 0L;
  4         private ConcurrentBag<BaseTask<T>>[] _arraySlot;
  5         private int ArrayMax;
  6 
  7         /// <summary>
  8         /// Ring queue.
  9         /// </summary>
 10         public ConcurrentBag<BaseTask<T>>[] ArraySlot
 11         {
 12             get { return _arraySlot ?? (_arraySlot = new ConcurrentBag<BaseTask<T>>[ArrayMax]); }
 13         }
 14         
 15         public BaseQueue(int arrayMax)
 16         {
 17             if (arrayMax < 60 && arrayMax % 60 == 0)
 18                 throw new Exception("Ring queue length cannot be less than 60 and is a multiple of 60 .");
 19 
 20             ArrayMax = arrayMax;
 21         }
 22 
 23         public void Add(long delayTime, Action<T> action)
 24         {
 25             Add(delayTime, action, default(T));
 26         }
 27 
 28         public void Add(long delayTime,Action<T> action,T data)
 29         {
 30             Add(delayTime, action, data,0);
 31         }
 32 
 33         public void Add(long delayTime, Action<T> action, T data,long id)
 34         {
 35             NextSlot(delayTime, out long cycle, out long pointer);
 36             ArraySlot[pointer] =  ArraySlot[pointer] ?? (ArraySlot[pointer] = new ConcurrentBag<BaseTask<T>>());
 37             var baseTask = new BaseTask<T>(cycle, action, data,id);
 38             ArraySlot[pointer].Add(baseTask);
 39         }
 40 
 41         /// <summary>
 42         /// Remove tasks based on ID.
 43         /// </summary>
 44         /// <param name="id"></param>
 45         public void Remove(long id)
 46         {
 47             try
 48             {
 49                 Parallel.ForEach(ArraySlot, (ConcurrentBag<BaseTask<T>> collection, ParallelLoopState state) =>
 50                 {
 51                     var resulTask = collection.FirstOrDefault(p => p.Id == id);
 52                     if (resulTask != null)
 53                     {
 54                         collection.TryTake(out resulTask);
 55                         state.Break();
 56                     }
 57                 });
 58             }
 59             catch (Exception e)
 60             {
 61                 Console.WriteLine(e);
 62             }
 63         }
 64         
 65         public void Start()
 66         {
 67             while (true)
 68             {
 69                 RightMovePointer();
 70                 Thread.Sleep(1000);
 71                 Console.WriteLine(DateTime.Now.ToString());
 72             }
 73         }
 74 
 75         /// <summary>
 76         /// Calculate the information of the next slot.
 77         /// </summary>
 78         /// <param name="delayTime">Delayed execution time.</param>
 79         /// <param name="cycle">Number of turns.</param>
 80         /// <param name="index">Task location.</param>
 81         private void NextSlot(long delayTime, out long cycle,out long index)
 82         {
 83             try
 84             {
 85                 var circle = delayTime / ArrayMax;
 86                 var second = delayTime % ArrayMax;
 87                 var current_pointer = GetPointer();
 88                 var queue_index = 0L;
 89 
 90                 if (delayTime - ArrayMax > ArrayMax)
 91                 {
 92                     circle = 1;
 93                 }
 94                 else if (second > ArrayMax)
 95                 {
 96                     circle += 1;
 97                 }
 98 
 99                 if (delayTime - circle * ArrayMax < ArrayMax)
100                 {
101                     second = delayTime - circle * ArrayMax;
102                 }
103 
104                 if (current_pointer + delayTime >= ArrayMax)
105                 {
106                     cycle = (int)((current_pointer + delayTime) / ArrayMax);
107                     if (current_pointer + second - ArrayMax < 0)
108                     {
109                         queue_index = current_pointer + second;
110                     }
111                     else if (current_pointer + second - ArrayMax > 0)
112                     {
113                         queue_index = current_pointer + second - ArrayMax;
114                     }
115                 }
116                 else
117                 {
118                     cycle = 0;
119                     queue_index = current_pointer + second;
120                 }
121                 index = queue_index;
122             }
123             catch (Exception e)
124             {
125                 Console.WriteLine(e);
126                 throw;
127             }
128         }
129 
130         /// <summary>
131         /// Get the current location of the pointer.
132         /// </summary>
133         /// <returns></returns>
134         private long GetPointer()
135         {
136             return Interlocked.Read(ref _pointer);
137         }
138 
139         /// <summary>
140         /// Reset pointer position.
141         /// </summary>
142         private void ReSetPointer()
143         {
144             Interlocked.Exchange(ref _pointer, 0);
145         }
146 
147         /// <summary>
148         /// Pointer moves clockwise.
149         /// </summary>
150         private void RightMovePointer()
151         {
152             try
153             {
154                 if (GetPointer() >= ArrayMax - 1)
155                 {
156                     ReSetPointer();
157                 }
158                 else
159                 {
160                     Interlocked.Increment(ref _pointer);
161                 }
162 
163                 var pointer = GetPointer();
164                 var taskCollection = ArraySlot[pointer];
165                 if (taskCollection == null || taskCollection.Count == 0) return;
166 
167                 Parallel.ForEach(taskCollection, (BaseTask<T> task) =>
168                 {
169                     if (task.Cycle > 0)
170                     {
171                         task.SubCycleNumber();
172                     }
173 
174                     if (task.Cycle <= 0)
175                     {
176                         taskCollection.TryTake(out task);
177                         task.TaskAction(task.Data);
178                     }
179                 });
180             }
181             catch (Exception e)
182             {
183                 Console.WriteLine(e);
184                 throw;
185             }
186         }
187     }
BaseQueue
 1   public class BaseTask<T> : ITask
 2     {
 3         private long _cycle;
 4         private long _id;
 5         private T _data;
 6 
 7         public Action<T> TaskAction { get; set; }
 8 
 9         public long Cycle
10         {
11             get { return Interlocked.Read(ref _cycle); }
12             set { Interlocked.Exchange(ref _cycle, value); }
13         }
14 
15         public long Id
16         {
17             get { return _id; }
18             set { _id = value; }
19         }
20 
21         public T Data
22         {
23             get { return _data; }
24             set { _data = value; }
25         }
26 
27         public BaseTask(long cycle, Action<T> action, T data,long id)
28         {
29             Cycle = cycle;
30             TaskAction = action;
31             Data = data;
32             Id = id;
33         }
34 
35         public BaseTask(long cycle, Action<T> action,T data)
36         {
37             Cycle = cycle;
38             TaskAction = action;
39             Data = data;
40         }
41 
42         public BaseTask(long cycle, Action<T> action)
43         {
44             Cycle = cycle;
45             TaskAction = action;
46         }
47         
48         public void SubCycleNumber()
49         {
50             Interlocked.Decrement(ref _cycle);
51         }
52     }
BaseTask
  • Logic,这层主要实现调用逻辑,调用者最终只需要关心把任务放进队列并指定什么时候执行就行了,根本不需要关心其它的任何信息。
 1  public static void Start()
 2         {
 3             //1.Initialize queues of different granularity.
 4             IRingQueue<NewsModel> minuteRingQueue = new MinuteQueue<NewsModel>();
 5 
 6             //2.Open thread.
 7             var lstTasks = new List<Task>
 8             {
 9                 Task.Factory.StartNew(minuteRingQueue.Start)
10             };
11 
12             //3.Add tasks performed in different periods.
13             minuteRingQueue.Add(5, new Action<NewsModel>((NewsModel newsObj) =>
14             {
15                 Console.WriteLine(newsObj.News);
16             }), new NewsModel() { News = "Trump's visit to China!" });
17 
18             minuteRingQueue.Add(10, new Action<NewsModel>((NewsModel newsObj) =>
19             {
20                 Console.WriteLine(newsObj.News);
21             }), new NewsModel() { News = "Putin Pu's visit to China!" });
22 
23             minuteRingQueue.Add(60, new Action<NewsModel>((NewsModel newsObj) =>
24             {
25                 Console.WriteLine(newsObj.News);
26             }), new NewsModel() { News = "Eisenhower's visit to China!" });
27 
28             minuteRingQueue.Add(120, new Action<NewsModel>((NewsModel newsObj) =>
29             {
30                 Console.WriteLine(newsObj.News);
31             }), new NewsModel() { News = "Xi Jinping's visit to the US!" });
32 
33             //3.Waiting for all tasks to complete is usually not completed. Because there is an infinite loop.
34             //F5 Run the program and see the effect.
35             Task.WaitAll(lstTasks.ToArray());
36             Console.Read();
37         }
  • Models,这层就是用来在延迟任务中带入的数据模型类而已了。自己用的时候换成任意自定义类型都可以。
  • 截图

原文地址:https://www.cnblogs.com/justzhuzhu/p/9692857.html