超实用文件监控多线程FTP上传工具

这是自己很久以前写的一个多线程FTP 上传工具,支持多账户,自定义线程数,自定义文件监控目录,可用做文件发布使用,非常实用,今天有小伙伴问起,现分享出来:

 1 using System;
 2 using System.Collections.Generic;
 3 using System.Linq;
 4 using System.IO;
 5 using System.Text;
 6 using System.Threading.Tasks;
 7 
 8 namespace NuFTPCmmndv5
 9 {
10     public class TaskFile
11     {
12         public TaskFile()
13         {
14             GUID = Guid.NewGuid();
15         }
16         private long _fileSize { get; set; }
17 
18         public Guid GUID { get; set; }
19         public string HOST { get; set; }
20         public string DIR { get; set; }
21         public string LCD { get; set; }
22         public string Priority { get; set; }
23         public string Filename { get; set; }
24         public long Size
25         {
26             get
27             {
28                 if (File.Exists(this.LCD))
29                 {
30                     _fileSize = new FileInfo(this.LCD).Length;
31                 };
32                 return _fileSize;
33             }
34             set { _fileSize = value; }
35         }
36 
37     }
38 }
View Code
  1 using System;
  2 using System.IO;
  3 using System.Collections.Generic;
  4 using System.ComponentModel;
  5 using System.Data;
  6 using System.Drawing;
  7 using System.Linq;
  8 using System.Text;
  9 using System.Threading;
 10 using System.Threading.Tasks;
 11 using System.Windows.Forms;
 12 
 13 using FluentScheduler;
 14 
 15 namespace NuFTPCmmndv5
 16 {
 17     public partial class FormMain : Form
 18     {
 19 
 20         private Setting setting;
 21         private static Semaphore sema = new Semaphore(1, 1);
 22         private object syncRoot = new object();
 23         public FormMain()
 24         {
 25             InitializeComponent();
 26             setting = new Setting();
 27         }
 28 
 29         private void FormMain_Load(object sender, EventArgs e)
 30         {
 31             setting = setting.loadSetting();
 32 
 33             if (Directory.Exists(setting.FolderToMonitor + "err\"))
 34                 Directory.CreateDirectory(setting.FolderToMonitor + "err\");
 35 
 36             if (Directory.Exists(setting.FolderToMonitor + "pending\"))
 37                 Directory.CreateDirectory(setting.FolderToMonitor + "pending\");
 38 
 39             foreach (var f in new DirectoryInfo(setting.FolderToMonitor + "pending\").GetFiles())
 40             {
 41                 f.CopyTo(setting.FolderToMonitor + "err\" + f.Name, true);
 42                 f.Delete();
 43             }
 44 
 45             SetStatus(setting.FolderToMonitor);
 46 
 47             //开启上传任务
 48             StartRunUploadTask();
 49 
 50             //开始监控任务
 51             StartRunMonitorTask(1);
 52         }
 53 
 54 
 55         /// <summary>
 56         /// 启动监控任务
 57         /// </summary>
 58         /// <param name="timer"></param>
 59         public void StartRunMonitorTask(int timer)
 60         {
 61             JobManager.AddJob(() =>
 62             {
 63                 sema.WaitOne();
 64                 SetToolStripStatusStatus(DateTime.Now + " (" + DataGridFiles.Rows.Count + ")");
 65                 RunTask();
 66                 sema.Release();
 67             },
 68             t =>
 69             {
 70                 t.WithName("StartRunMonitorTask").ToRunNow().AndEvery(timer).Seconds();
 71             });
 72         }
 73 
 74         /// <summary>
 75         /// 运行监控任务
 76         /// </summary>
 77         public void RunTask()
 78         {
 79             //lock (syncRoot)
 80             //{
 81                 #region
 82                 try
 83                 {
 84                     //每5分钟读取出错文件并追加到待上传
 85                     if (DateTime.Now.Second == 0 && DateTime.Now.Minute % 5 == 0)
 86                     {
 87                         foreach (var f in new DirectoryInfo(setting.FolderToMonitor + "err\").GetFiles())
 88                         {
 89                             f.CopyTo(setting.FolderToMonitor + f.Name, true);
 90                             f.Delete();
 91                             LogText("Retrying: " + f.Name);
 92                         }
 93                     }
 94                     //写监控
 95                     WriteMon(DateTime.Now.ToString());
 96 
 97                     //待上传文件大于200 报警
 98                     if (DataGridFiles.Rows.Count > 200)
 99                     {
100                         WriteMon("=ERROR=NuFTPCmmndv4 pending upload: " + DataGridFiles.Rows.Count);
101                     }
102 
103                     //删除超过30天日志
104                     foreach (var f in new DirectoryInfo(AppDomain.CurrentDomain.BaseDirectory + "logs\").GetFiles())
105                     {
106                         if (f.LastWriteTime < DateTime.Now.AddDays(-30))
107                             f.Delete();
108                     }
109                 }
110                 catch (Exception ex)
111                 {
112                 }
113                 #endregion
114 
115                 //读取dat文件
116                 foreach (var f in new DirectoryInfo(setting.FolderToMonitor).GetFiles("*.dat"))
117                 {
118                     try
119                     {
120                         var task = new TaskFile();
121 
122                         #region
123                         //按行读取
124                         int curLine = 1;
125                         using (var fileStream = new FileStream(f.FullName, FileMode.Open, FileAccess.Read))
126                         {
127                             fileStream.Seek(0, SeekOrigin.Begin);
128                             string[] lines = System.IO.File.ReadAllLines(f.FullName);
129                             using (var streamReader = new StreamReader(fileStream, Encoding.Default))
130                             {
131                                 string content = streamReader.ReadLine();
132                                 while (!string.IsNullOrEmpty(content))
133                                 {
134                                     if (content.Substring(0, 5) == "HOST=")
135                                     {
136                                         //FTP
137                                         task.HOST = content.Substring(content.IndexOf("=") + 1);
138                                     }
139                                     else if (content.Substring(0, 4) == "LCD=")
140                                     {
141                                         //本地目录文件
142                                         task.LCD = content.Substring(content.IndexOf("=") + 1);
143                                     }
144                                     else if (content.Substring(0, 4) == "DIR=")
145                                     {
146                                         //远程对应目录文件
147                                         task.DIR = content.Substring(content.IndexOf("=") + 1);
148                                         task.DIR = task.DIR.Replace("\", "/");
149                                     }
150                                     else if (content.Substring(0, 9) == "priority=")
151                                     {
152                                         //优先级
153                                         task.Priority = content.Substring(content.IndexOf("=") + 1);
154                                     }
155 
156                                     content = streamReader.ReadLine();
157                                     curLine++;
158                                 }
159                             }
160                         }
161 
162                         #endregion
163 
164                         //上传文件名
165                         task.Filename = f.Name;
166 
167                         //拷贝到待上传目录
168                         f.CopyTo(setting.FolderToMonitor + "pending\" + f.Name, true);
169                         f.Delete();
170 
171                         //遍历账户配置
172                         var account = setting.FTPAccounts.Select(a => a).Where(a => a.FTPName == task.HOST).FirstOrDefault();
173                         if (account != null)
174                         {
175                             //是否已经存在
176                             if (!account.Contains(task))
177                             {
178                                 //添加到待传队列
179                                 account.AddQueue(task);
180                                 //刷新GridView
181                                 InvokeAddGridView(this.DataGridFiles, task);
182                             }
183                             else
184                             {
185                                 //存在则移除文件
186                                 LogText(task.HOST + " The file already exists in the Queue:" + task.HOST + "/" + task.DIR);
187                             }
188                         }
189                     }
190                     catch (Exception ex)
191                     {
192                         LogText(ex.Message + ";" + ex.StackTrace);
193                     }
194                 }
195             //}
196         }
197         /// <summary>
198         /// 开启上传任务
199         /// </summary>
200         public void StartRunUploadTask()
201         {
202             foreach (var account in setting.FTPAccounts)
203             {
204                 account.setting = setting;
205                 //注册上传完成事件
206                 account.Completed += account_Completed;
207                 //注册上传进度事件
208                 account.ProcessProgress += account_ProcessProgress;
209                 //注册删除上传事件
210                 account.Deleted += account_Deleted;
211                 //终止上传事件
212                 account.Aborted += account_Aborted;
213                 //注册上传日志事件
214                 account.ProcessLog += account_ProcessLog;
215                 //开始上传队列
216                 account.Start();
217             }
218         }
219 
220 
221         private void account_ProcessProgress(TaskFile arg1, FTPAccount.CompetedEventArgs arg2)
222         {
223             InvokeUpdateGridView(this.DataGridFiles, arg1, arg2);
224         }
225         private void account_Completed(TaskFile arg1, FTPAccount.CompetedEventArgs arg2)
226         {
227             InvokeRemoveGridView(this.DataGridFiles, arg1);
228         }
229         private void account_Aborted(TaskFile arg1, FTPAccount.CompetedEventArgs arg2)
230         {
231             foreach (FileInfo f in new DirectoryInfo(setting.FolderToMonitor + "pending\").GetFiles())
232             {
233                 if (arg1.Filename == f.Name)
234                 {
235                     f.CopyTo(setting.FolderToMonitor + "err\" + f.Name, true);
236                     f.Delete();
237                     break;
238                 }
239             }
240         }
241         private void account_Deleted(TaskFile arg1, FTPAccount.CompetedEventArgs arg2)
242         {
243             //删除行
244             InvokeRemoveGridView(this.DataGridFiles, arg1);
245             //删除文件
246             try
247             {
248                 System.IO.File.Delete(setting.FolderToMonitor + "err\" + arg1.Filename);
249             }
250             catch (Exception ex)
251             {
252                 LogText(ex.Message + ";" + ex.StackTrace);
253             }
254         }
255         private void account_ProcessLog(string obj)
256         {
257             LogText(obj);
258         }
259 
260 
261 
262         public void InvokeAddGridView(DataGridView dataGridView, TaskFile task)
263         {
264             if (dataGridView.InvokeRequired)
265             {
266                 this.Invoke(new Action(() =>
267                 {
268                     AddGridView(dataGridView, task);
269                 }));
270                 return;
271             }
272             AddGridView(dataGridView, task);
273         }
274         public void AddGridView(DataGridView dataGridView, TaskFile task)
275         {
276             try
277             {
278                 int index = dataGridView.Rows.Add();
279                 dataGridView.Rows[index].Cells[0].Value = task.LCD;
280                 dataGridView.Rows[index].Cells[1].Value = FormatFileSize(task.Size);
281                 dataGridView.Rows[index].Cells[2].Value = "Pending";
282                 dataGridView.Rows[index].Cells[3].Value = task.Filename;
283                 dataGridView.Rows[index].Cells[4].Value = task.GUID.ToString();
284             }
285             catch (Exception ex)
286             {
287                 LogText(ex.Message + ";" + ex.StackTrace);
288             }
289         }
290 
291 
292         public String FormatFileSize(Int64 fileSize)
293         {
294             if (fileSize < 0)
295                 return "0";
296             else if (fileSize >= 1024 * 1024 * 1024)
297                 return string.Format("{0:########0.00} GB", ((Double)fileSize) / (1024 * 1024 * 1024));
298             else if (fileSize >= 1024 * 1024)
299                 return string.Format("{0:####0.00} MB", ((Double)fileSize) / (1024 * 1024));
300             else if (fileSize >= 1024)
301                 return string.Format("{0:####0.00} KB", ((Double)fileSize) / 1024);
302             else
303                 return string.Format("{0} bytes", fileSize);
304         }
305 
306 
307         public void InvokeUpdateGridView(DataGridView dataGridView, TaskFile task, FTPAccount.CompetedEventArgs arg)
308         {
309             if (dataGridView.InvokeRequired)
310             {
311                 this.Invoke(new Action(() =>
312                 {
313                     UpdateGridView(dataGridView, task, arg);
314                 }));
315                 return;
316             }
317             UpdateGridView(dataGridView, task, arg);
318         }
319         public void UpdateGridView(DataGridView dataGridView, TaskFile task, FTPAccount.CompetedEventArgs arg)
320         {
321             try
322             {
323                 foreach (DataGridViewRow r in dataGridView.Rows)
324                 {
325                     if (!r.IsNewRow && (r.Cells["Guid"].Value.ToString() == task.GUID.ToString()))
326                     {
327                         if (arg.uploadStatus == FTPAccount.UploadStatus.Failed)
328                         {
329                             r.Cells["Completed"].Value = "Failed";
330                             r.Cells["Completed"].Style.BackColor = Color.LightPink;
331                         }
332                         else if (arg.uploadStatus == FTPAccount.UploadStatus.Timeout)
333                         {
334                             r.Cells["Completed"].Value = "Timeout";
335                             r.Cells["Completed"].Style.BackColor = Color.LightPink;
336                         }
337                         else if (arg.uploadStatus == FTPAccount.UploadStatus.Cancel)
338                         {
339                             r.Cells["Completed"].Value = "Cancel";
340                             r.Cells["Completed"].Style.BackColor = Color.LightPink;
341                         }
342                         else if (arg.uploadStatus == FTPAccount.UploadStatus.Uploading && arg.CompetedPrecent != 0)
343                         {
344                             r.Cells["Completed"].Value = arg.CompetedPrecent + "%";
345                             r.Cells["Completed"].Style.BackColor = Color.White;
346                         }
347                         else if (arg.uploadStatus == FTPAccount.UploadStatus.Uploading && arg.CompetedPrecent == 0)
348                         {
349                             r.Cells["Completed"].Value = "Uploading";
350                             r.Cells["Completed"].Style.BackColor = Color.White;
351                         }
352                     }
353                 }
354                 dataGridView.Sort(dataGridView.Columns["Completed"], ListSortDirection.Ascending); 
355             }
356             catch (Exception ex)
357             {
358                 LogText(ex.Message + ";" + ex.StackTrace);
359             }
360         }
361 
362 
363         public void InvokeRemoveGridView(DataGridView dataGridView, TaskFile task)
364         {
365             if (dataGridView.InvokeRequired)
366             {
367                 this.Invoke(new Action(() =>
368                 {
369                     RemoveGridView(dataGridView, task);
370                 }));
371                 return;
372             }
373             RemoveGridView(dataGridView, task);
374         }
375         public void RemoveGridView(DataGridView dataGridView, TaskFile task)
376         {
377             try
378             {
379                 foreach (DataGridViewRow r in dataGridView.Rows)
380                 {
381                     if (!r.IsNewRow && (r.Cells["Guid"].Value.ToString() == task.GUID.ToString()))
382                     {
383                         dataGridView.Rows.Remove(r);
384                     }
385                 }
386             }
387             catch (Exception ex)
388             {
389                 LogText(ex.Message + ";" + ex.StackTrace);
390             }
391         }
392 
393 
394         public void WriteMon(string txt)
395         {
396 
397         }
398 
399         public void LogText(string msg)
400         {
401             var dir = AppDomain.CurrentDomain.BaseDirectory + "logs\";
402             if (!Directory.Exists(dir))
403                 Directory.CreateDirectory(dir);
404 
405             using (var w = new StreamWriter(dir + DateTime.Now.ToString("yyyy-MM-dd") + ".log", true))
406             {
407                 w.WriteLine(DateTime.Now.ToString() + ": " + msg);
408                 w.Close();
409             }
410 
411             if (TextBoxLog.InvokeRequired)
412             {
413                 this.Invoke(new Action(() =>
414                 {
415                     TextBoxLog.AppendText(msg + "
");
416                     if (this.TextBoxLog.Lines.Count() > setting.MAX_LOG_LINES)
417                     {
418                         this.TextBoxLog.Text = "";
419                     }
420                     Application.DoEvents();
421                 }));
422                 return;
423             }
424 
425             TextBoxLog.AppendText(msg + "
");
426             if (this.TextBoxLog.Lines.Count() > setting.MAX_LOG_LINES)
427             {
428                 this.TextBoxLog.Text = "";
429             }
430             Application.DoEvents();
431         }
432 
433         public void SetStatus(string msg)
434         {
435             if (this.InvokeRequired)
436             {
437                 this.Invoke(new Action(() =>
438                 {
439                     this.LabelStatus.Text = msg;
440                     Application.DoEvents();
441                 }));
442                 return;
443             }
444             this.LabelStatus.Text = msg;
445             Application.DoEvents();
446         }
447 
448 
449         public void SetToolStripStatusStatus(string msg)
450         {
451             if (this.InvokeRequired)
452             {
453                 this.Invoke(new Action(() =>
454                 {
455                     this.toolStripStatusLabel2.Text = msg;
456                     Application.DoEvents();
457                 }));
458                 return;
459             }
460             this.toolStripStatusLabel2.Text = msg;
461             Application.DoEvents();
462         }
463 
464 
465         public void WriteLog(TextBox textBox, string msg)
466         {
467             if (textBox.InvokeRequired)
468             {
469                 textBox.Invoke(new Action(() =>
470                 {
471                     textBox.AppendText(msg + "
");
472                 }));
473                 return;
474             }
475             textBox.AppendText(msg + "
");
476         }
477 
478         private void button2_Click(object sender, EventArgs e)
479         {
480 
481             if (ButtonPauseResume.Text == "Pause")
482             {
483                 JobManager.Stop();
484                 ButtonPauseResume.Text = "Resume";
485             }
486             else
487             {
488                 JobManager.Start();
489                 ButtonPauseResume.Text = "Pause";
490             }
491         }
492 
493         /// <summary>
494         /// 取消上传
495         /// </summary>
496         /// <param name="sender"></param>
497         /// <param name="e"></param>
498         private void cancelUploadToolStripMenuItem_Click(object sender, EventArgs e)
499         {
500             DataGridViewRow row;
501             var task = new TaskFile();
502             row = DataGridFiles.SelectedRows[0];
503             var fileName = row.Cells["FileName"].Value.ToString();
504 
505             #region
506 
507             if (System.IO.File.Exists(setting.FolderToMonitor + "pending\" + fileName))
508             {
509                 var f = new FileInfo(setting.FolderToMonitor + "pending\" + fileName);
510                 //按行读取
511                 int curLine = 1;
512                 using (var fileStream = new FileStream(f.FullName, FileMode.Open, FileAccess.Read))
513                 {
514                     fileStream.Seek(0, SeekOrigin.Begin);
515                     string[] lines = System.IO.File.ReadAllLines(f.FullName);
516                     using (var streamReader = new StreamReader(fileStream, Encoding.Default))
517                     {
518                         string content = streamReader.ReadLine();
519                         while (!string.IsNullOrEmpty(content))
520                         {
521                             if (content.Substring(0, 5) == "HOST=")
522                             {
523                                 //FTP
524                                 task.HOST = content.Substring(content.IndexOf("=") + 1);
525                             }
526                             else if (content.Substring(0, 4) == "LCD=")
527                             {
528                                 //本地目录文件
529                                 task.LCD = content.Substring(content.IndexOf("=") + 1);
530                             }
531                             else if (content.Substring(0, 4) == "DIR=")
532                             {
533                                 //远程对应目录文件
534                                 task.DIR = content.Substring(content.IndexOf("=") + 1);
535                                 task.DIR = task.DIR.Replace("\", "/");
536                             }
537                             else if (content.Substring(0, 9) == "priority=")
538                             {
539                                 //优先级
540                                 task.Priority = content.Substring(content.IndexOf("=") + 1);
541                             }
542 
543                             content = streamReader.ReadLine();
544                             curLine++;
545                         }
546                     }
547                 }
548 
549                 foreach (var account in setting.FTPAccounts)
550                 {
551                     if (account.FTPName == task.HOST)
552                     {
553                         account.cancelTask = task;
554                         break;
555                     }
556                 }
557 
558                 //--------------Copyto file to err
559                 f.CopyTo(setting.FolderToMonitor + "err\" + f.Name, true);
560                 System.IO.File.Delete(setting.FolderToMonitor + "pending\" + f.Name);
561                 //--------------Copyto file to err
562             }
563 
564             #endregion
565         }
566 
567         /// <summary>
568         /// 删除上传
569         /// </summary>
570         /// <param name="sender"></param>
571         /// <param name="e"></param>
572         private void deleteUploadToolStripMenuItem_Click(object sender, EventArgs e)
573         {
574             DataGridViewRow row;
575             var task = new TaskFile();
576             row = DataGridFiles.SelectedRows[0];
577             var fileName = row.Cells["FileName"].Value.ToString();
578             var fileGuid = row.Cells["Guid"].Value.ToString();
579             #region
580 
581             if (System.IO.File.Exists(setting.FolderToMonitor + "pending\" + fileName))
582             {
583                 var f = new FileInfo(setting.FolderToMonitor + "pending\" + fileName);
584                 //按行读取
585                 int curLine = 1;
586                 using (var fileStream = new FileStream(f.FullName, FileMode.Open, FileAccess.Read))
587                 {
588                     fileStream.Seek(0, SeekOrigin.Begin);
589                     string[] lines = System.IO.File.ReadAllLines(f.FullName);
590                     using (var streamReader = new StreamReader(fileStream, Encoding.Default))
591                     {
592                         string content = streamReader.ReadLine();
593                         while (!string.IsNullOrEmpty(content))
594                         {
595                             if (content.Substring(0, 5) == "HOST=")
596                             {
597                                 //FTP
598                                 task.HOST = content.Substring(content.IndexOf("=") + 1);
599                             }
600                             else if (content.Substring(0, 4) == "LCD=")
601                             {
602                                 //本地目录文件
603                                 task.LCD = content.Substring(content.IndexOf("=") + 1);
604                             }
605                             else if (content.Substring(0, 4) == "DIR=")
606                             {
607                                 //远程对应目录文件
608                                 task.DIR = content.Substring(content.IndexOf("=") + 1);
609                                 task.DIR = task.DIR.Replace("\", "/");
610                             }
611                             else if (content.Substring(0, 9) == "priority=")
612                             {
613                                 //优先级
614                                 task.Priority = content.Substring(content.IndexOf("=") + 1);
615                             }
616 
617                             content = streamReader.ReadLine();
618                             curLine++;
619                         }
620                     }
621                 }
622 
623                 if (!string.IsNullOrEmpty(task.HOST))
624                 {
625                     foreach (var account in setting.FTPAccounts)
626                     {
627                         if (account.FTPName == task.HOST)
628                         {
629                             account.delTask = task;
630                             break;
631                         }
632                     }
633                 }
634                 
635                 LogText("Delete File:" + "pending\" + f.Name);
636                 System.IO.File.Delete(setting.FolderToMonitor + "pending\" + f.Name);
637             }
638 
639             if (!string.IsNullOrEmpty(task.HOST))
640                 RemoveGridView(this.DataGridFiles, task);
641             else
642             {
643                 task.GUID = System.Guid.Parse(fileGuid);
644                 RemoveGridView(this.DataGridFiles, task);
645             }
646 
647             #endregion
648         }
649 
650         private void DataGridFiles_CellMouseDown(object sender, DataGridViewCellMouseEventArgs e)
651         {
652             if (e.Button == MouseButtons.Right)
653             {
654                 if (e.RowIndex >= 0)
655                 {
656                     //若行已是选中状态就不再进行设置
657                     if (DataGridFiles.Rows[e.RowIndex].Selected == false)
658                     {
659                         DataGridFiles.ClearSelection();
660                         DataGridFiles.Rows[e.RowIndex].Selected = true;
661                     }
662                     //只选中一行时设置活动单元格
663                     if (DataGridFiles.SelectedRows.Count == 1)
664                     {
665                         DataGridFiles.CurrentCell = DataGridFiles.Rows[e.RowIndex].Cells[e.ColumnIndex];
666                     }
667                     //弹出操作菜单
668                     contextMenuStrip1.Show(MousePosition.X, MousePosition.Y);
669                 }
670             }
671         }
672 
673         private void FormMain_FormClosing(object sender, FormClosingEventArgs e)
674         {
675             if (JobManager.GetSchedule("StartRunMonitorTask") != null)
676             {
677                 JobManager.Stop();
678                 JobManager.RemoveJob("StartRunMonitorTask");
679             }
680             System.Environment.Exit(0);
681             Application.Exit();
682         }
683 
684         private void button1_Click(object sender, EventArgs e)
685         {
686             var fa = new FormAccounts(setting);
687             fa.Owner = this;
688             fa.ShowDialog();
689         }
690     }
691 }
View Code
  1 using System;
  2 using System.IO;
  3 using System.Net;
  4 using System.Collections.Generic;
  5 using System.Linq;
  6 using System.Text;
  7 using System.Threading.Tasks;
  8 using System.Xml;
  9 using System.Xml.Linq;
 10 using System.Xml.Serialization;
 11 using System.Threading;
 12 using System.Security.Cryptography;
 13 
 14 namespace NuFTPCmmndv5
 15 {
 16 
 17     public class FTPAccount
 18     {
 19         [XmlIgnore]
 20         private ProcessQueue<TaskFile> processQueue;
 21 
 22         [XmlIgnore]
 23         public Setting setting;
 24 
 25         public FTPAccount()
 26         {
 27             //processQueue = new ProcessQueue<TaskFile>(UpLoadFileTest);
 28             processQueue = new ProcessQueue<TaskFile>(UpLoadFile);
 29             cancelTask = new TaskFile();
 30             delTask = new TaskFile();
 31         }
 32 
 33         [XmlElement]
 34         /// <summary>
 35         /// FTP名
 36         /// </summary>
 37         public string FTPName { get; set; }
 38         [XmlElement]
 39         /// <summary>
 40         /// FTP对应IP地址
 41         /// </summary>
 42         public string IP { get; set; }
 43         [XmlElement]
 44         /// <summary>
 45         /// 端口号
 46         /// </summary>
 47         public int Port { get; set; }
 48         [XmlElement]
 49         /// <summary>
 50         /// 账户名
 51         /// </summary>
 52         public string Username { get; set; }
 53         [XmlElement]
 54         /// <summary>
 55         /// 密码
 56         /// </summary>
 57         public string Password { get; set; }
 58         [XmlElement]
 59         public int MaxThreadNum { get; set; }
 60 
 61 
 62         [XmlIgnore]
 63         public TaskFile cancelTask { get; set; }
 64         [XmlIgnore]
 65         public TaskFile delTask { get; set; }
 66 
 67 
 68         /// <summary>
 69         /// 开始处理上传队列
 70         /// </summary>
 71         public void Start()
 72         {
 73             processQueue.Start();
 74         }
 75 
 76         /// <summary>
 77         /// 添加到队列
 78         /// </summary>
 79         /// <param name="task"></param>
 80         public void AddQueue(TaskFile task)
 81         {
 82             processQueue.Enqueue(task);
 83         }
 84 
 85         /// <summary>
 86         /// 是否已经包含在队列
 87         /// </summary>
 88         /// <param name="task"></param>
 89         /// <returns></returns>
 90         public bool Contains(TaskFile task)
 91         {
 92             return processQueue.Contains(task);
 93         }
 94 
 95 
 96         /// <summary>
 97         /// 上传进度
 98         /// </summary>
 99         public event Action<TaskFile, CompetedEventArgs> ProcessProgress;
100         /// <summary>
101         /// 上传完成
102         /// </summary>
103         public event Action<TaskFile, CompetedEventArgs> Completed;
104         /// <summary>
105         /// 终止上传
106         /// </summary>
107         public event Action<TaskFile, CompetedEventArgs> Aborted;
108         /// <summary>
109         /// 删除上传
110         /// </summary>
111         public event Action<TaskFile, CompetedEventArgs> Deleted;
112         /// <summary>
113         /// 上传日志
114         /// </summary>
115         public event Action<string> ProcessLog;
116 
117 
118         private void OnProcessProgress(TaskFile pendingValue, CompetedEventArgs args)
119         {
120             if (ProcessProgress != null)
121             {
122                 try
123                 {
124                     ProcessProgress(pendingValue, args);
125                 }
126                 catch { }
127             }
128         }
129 
130         private void OnCompleted(TaskFile pendingValue, CompetedEventArgs args)
131         {
132             if (Completed != null)
133             {
134                 try
135                 {
136                     Completed(pendingValue, args);
137                 }
138                 catch { }
139             }
140         }
141         private void OnAborted(TaskFile pendingValue, CompetedEventArgs args)
142         {
143             if (Aborted != null)
144             {
145                 try
146                 {
147                     Aborted(pendingValue, args);
148                 }
149                 catch { }
150             }
151         }
152         private void OnDeleted(TaskFile pendingValue, CompetedEventArgs args)
153         {
154             if (Deleted != null)
155             {
156                 try
157                 {
158                     Deleted(pendingValue, args);
159                 }
160                 catch { }
161             }
162         }
163         private void OnProcessLog(string log)
164         {
165             if (ProcessLog != null)
166             {
167                 try
168                 {
169                     ProcessLog(log);
170                 }
171                 catch { }
172             }
173         }
174 
175 
176         public void UpLoadFileTest(TaskFile task)
177         {
178             OnProcessLog("Thread:" + Thread.CurrentThread.ManagedThreadId);
179             OnProcessLog("Uploading: " + FTPName + "://" + IP + "/" + task.DIR);
180             OnProcessProgress(task, new CompetedEventArgs() { CompetedPrecent = 0, uploadStatus = UploadStatus.Uploading });
181             for (int i = 1; i <= 100; i++)
182             {
183                 OnProcessProgress(task, new CompetedEventArgs() { CompetedPrecent = i, uploadStatus = UploadStatus.Uploading });
184                 Thread.Sleep(GenerateRandomInteger(100,200));
185             }
186             OnProcessLog("File Uploaded Successfully: " + FTPName + "://" + IP + "/" + task.DIR);
187             OnCompleted(task, new CompetedEventArgs() { CompetedPrecent = 0, uploadStatus = UploadStatus.Completed });
188             //移除文件
189             File.Delete(setting.FolderToMonitor + "pending\" + task.Filename);
190         }
191 
192         public int GenerateRandomInteger(int min = 0, int max = 2147483647)
193         {
194             var randomNumberBuffer = new byte[10];
195             new RNGCryptoServiceProvider().GetBytes(randomNumberBuffer);
196             return new Random(BitConverter.ToInt32(randomNumberBuffer, 0)).Next(min, max);
197         }
198 
199         /// <summary>
200         /// 上传文件
201         /// </summary>
202         /// <param name="task"></param>
203         public void UpLoadFile(TaskFile task)
204         {
205             FileInfo _FileInfo = null;
206             FtpWebRequest _FtpWebRequest;
207             Stream ftpStream = null;
208             FileStream _FileStream = null;
209             int buffLength = 0;
210             byte[] buffer;
211 
212             int contentLen = 0;
213             long uploaded = 0;
214 
215             string _directory = "";
216             string dirStr = "";
217             FtpWebResponse response;
218 
219 
220             var uploadDatetime = DateTime.Now;
221 
222             try
223             {
224                 _FileInfo = new FileInfo(task.LCD);
225                 OnProcessProgress(task, new CompetedEventArgs() { CompetedPrecent = 0, uploadStatus = UploadStatus.Uploading });
226 
227                 //创建目录
228                 if (task.DIR.IndexOf("/") >= 0)
229                 {
230                     //"/data/test/1.XML"
231                     _directory = task.DIR.Substring(1, task.DIR.LastIndexOf("/") - 1);
232                     OnProcessLog("Creating DIR: " + FTPName + "://" + IP + "/" + _directory);
233                     dirStr = "";
234 
235                     #region
236                     foreach (var dir in _directory.Split(new char[] { '/' }, StringSplitOptions.RemoveEmptyEntries))
237                     {
238                         dirStr += dir + "/";
239                         try
240                         {
241                             _FtpWebRequest = (FtpWebRequest)FtpWebRequest.Create(new Uri("ftp://" + IP + "/" + dirStr));
242                             _FtpWebRequest.Method = WebRequestMethods.Ftp.MakeDirectory;
243                             _FtpWebRequest.UseBinary = true;
244                             _FtpWebRequest.Credentials = new NetworkCredential(Username, Password);
245 
246                             response = (FtpWebResponse)_FtpWebRequest.GetResponse();
247                             ftpStream = response.GetResponseStream();
248                             ftpStream.Close();
249                             response.Close();
250                         }
251                         catch (Exception ex)
252                         {
253                             if (ftpStream != null)
254                             {
255                                 ftpStream.Close();
256                                 ftpStream.Dispose();
257                             }
258                         }
259                     }
260                     #endregion
261                 }
262 
263                 OnProcessLog("Uploading: " + FTPName + "://" + IP + "/" + task.DIR);
264 
265                 _FtpWebRequest = (FtpWebRequest)FtpWebRequest.Create(new Uri("ftp://" + IP + "/" + task.DIR));
266                 _FtpWebRequest.Credentials = new NetworkCredential(Username, Password);
267                 _FtpWebRequest.KeepAlive = false;
268                 _FtpWebRequest.Timeout = 20000;
269                 _FtpWebRequest.Method = WebRequestMethods.Ftp.UploadFile;
270                 _FtpWebRequest.UseBinary = true;
271                 _FtpWebRequest.ContentLength = _FileInfo.Length;
272 
273                 buffLength = 1024;
274                 buffer = new byte[buffLength];
275 
276                 _FileStream = _FileInfo.OpenRead();
277                 ftpStream = _FtpWebRequest.GetRequestStream();
278                 contentLen = _FileStream.Read(buffer, 0, buffLength);
279                 uploaded = contentLen;
280 
281                 var cancel = false;
282                 var delete = false;
283                 var timeOut = false;
284 
285                 while (contentLen > 0)
286                 {
287                     if (cancelTask.Filename == task.Filename)
288                         cancel = true;
289 
290                     if (delTask.Filename == task.Filename)
291                         delete = true;
292 
293                     if (DateTime.Now.Subtract(uploadDatetime).Seconds > 600)
294                         timeOut = true;
295 
296                     if (cancel)
297                     {
298                         OnProcessLog("Thread Cancel: " + FTPName + "://" + IP + "/" + task.DIR);
299                         throw new Exception("Cancel");
300                         cancel = false;
301                     }
302                     else if (delete)
303                     {
304                         OnProcessLog("Thread Delete: " + FTPName + "://" + IP + "/" + task.DIR);
305                         throw new Exception("Delete");
306                         delete = false;
307                     }
308                     else if (timeOut)
309                     {
310                         OnProcessLog("Thread Timeout: " + FTPName + "://" + IP + "/" + task.DIR);
311                         throw new Exception("Timeout");
312                         timeOut = false;
313                     }
314 
315                     ftpStream.Write(buffer, 0, contentLen);
316                     contentLen = _FileStream.Read(buffer, 0, buffLength);
317                     uploaded += contentLen;
318                     //上传进度
319                     OnProcessProgress(task, new CompetedEventArgs() { CompetedPrecent = (int)(uploaded / _FileInfo.Length) * 100, uploadStatus = UploadStatus.Uploading });
320                 }
321 
322                 ftpStream.Close();
323                 ftpStream.Dispose();
324                 _FileStream.Close();
325                 _FileStream.Dispose();
326 
327                 //上传完成
328                 OnProcessLog("File Uploaded Successfully: " + FTPName + "://" + IP + "/" + task.DIR);
329                 OnCompleted(task, new CompetedEventArgs() { CompetedPrecent = 100, uploadStatus = UploadStatus.Completed });
330 
331                 //移除文件
332                 File.Delete(setting.FolderToMonitor + "pending\" + task.Filename);
333             }
334             catch (Exception ex)
335             {
336                 //上传失败
337                 OnProcessLog("Failed to upload: " + FTPName + "://" + IP + "/" + task.DIR);
338 
339                 try
340                 {
341                     File.Move(setting.FolderToMonitor + "pending\" + task.Filename, setting.FolderToMonitor + "err\" + task.Filename);
342                 }
343                 catch (Exception ex1)
344                 {
345                     OnProcessLog("Moving Files Err:" + task.HOST + ": " + task.DIR + ex1.Message);
346                     try
347                     {
348                         File.Copy(setting.FolderToMonitor + "pending\" + task.Filename, setting.FolderToMonitor + "err\" + task.Filename);
349                         File.Delete(setting.FolderToMonitor + "pending\" + task.Filename);
350                     }
351                     catch (Exception ex2)
352                     {
353 
354                         OnProcessLog("Uploading Err:" + task.HOST + "/" + task.DIR + ex2.Message);
355                         OnProcessProgress(task, new CompetedEventArgs() { CompetedPrecent = 0, uploadStatus = UploadStatus.Failed });
356                     }
357                 }
358                 finally
359                 {
360                     OnProcessLog("Uploading Err:" + task.HOST + "/" + task.DIR + ex.Message);
361 
362                     if (ex.Message == "TimeOut")
363                         OnProcessProgress(task, new CompetedEventArgs() { CompetedPrecent = 0, uploadStatus = UploadStatus.Timeout });
364                     else if (ex.Message == "Cancel")
365                         OnProcessProgress(task, new CompetedEventArgs() { CompetedPrecent = 0, uploadStatus = UploadStatus.Cancel });
366                     else if (ex.Message == "Delete")
367                         OnDeleted(task, new CompetedEventArgs() { CompetedPrecent = 0, uploadStatus = UploadStatus.Delete });
368                     else
369                         OnProcessProgress(task, new CompetedEventArgs() { CompetedPrecent = 0, uploadStatus = UploadStatus.Failed });
370                 }
371 
372             }
373             finally
374             {
375                 if (ftpStream != null)
376                 {
377                     ftpStream.Close();
378                     ftpStream.Dispose();
379                 }
380                 if (_FileStream != null)
381                 {
382                     _FileStream.Close();
383                     _FileStream.Dispose();
384                 }
385             }
386         }
387 
388 
389         /// <summary>
390         /// 完成事件数据
391         /// </summary>
392         public class CompetedEventArgs : EventArgs
393         {
394             public CompetedEventArgs()
395             {
396             }
397             public UploadStatus uploadStatus { get; set; }
398             /// <summary>
399             /// 完成百分率
400             /// </summary>
401             public int CompetedPrecent { get; set; }
402             /// <summary>
403             /// 异常信息
404             /// </summary>
405             public Exception InnerException { get; set; }
406         }
407 
408         public enum DoWorkResult
409         {
410             /// <summary>
411             /// 继续运行,默认
412             /// </summary>
413             ContinueThread = 0,
414             /// <summary>
415             /// 终止当前线程
416             /// </summary>
417             AbortCurrentThread = 1,
418             /// <summary>
419             /// 终止全部线程
420             /// </summary>
421             AbortAllThread = 2
422         }
423 
424         public enum UploadStatus
425         {
426             Completed = 0,
427             Failed = 1,
428             Timeout = 3,
429             Cancel = 4,
430             Uploading = 5,
431             Delete = 6,
432             Abort = 7,
433         }
434     }
435 }
View Code
  1 using System;
  2 using System.Collections.Generic;
  3 using System.Linq;
  4 using System.Text;
  5 using System.Collections;
  6 using System.Threading;
  7 
  8 namespace NuFTPCmmndv5
  9 {
 10     /// <summary>
 11     /// 表示一个线程(同步)安全的通用泛型处理队列
 12     /// </summary>
 13     /// <typeparam name="T">ProcessQueue中包含的数据类型</typeparam>
 14     public class ProcessQueue<T> : IEnumerable<T>, ICollection, IEnumerable, IDisposable
 15     {
 16         #region Instance Variables
 17 
 18         private object syncRoot = new object();
 19         private Queue<T> queue;
 20         private List<WorkerThread> threads = new List<WorkerThread>();
 21         private Action<T> operation;
 22         bool isDisposed;
 23         bool isRunning;
 24 
 25         private int _maxThreadCount;
 26 
 27         public int MaxThreadCount
 28         {
 29             get { return _maxThreadCount == 0 ? 5 : _maxThreadCount; }
 30             set { value = _maxThreadCount; }
 31         }
 32 
 33         #endregion
 34 
 35         #region Constructors
 36         /// <summary>
 37         /// 初始一个新的ProcessQueue 并指定具有特定容量的工作线程
 38         /// </summary>
 39         public ProcessQueue(Action<T> action) : this(action, 5) { }
 40         /// <summary>
 41         /// 初始一个新的ProcessQueue
 42         /// </summary>
 43         public ProcessQueue(int capacity, Action<T> action) : this(capacity, action, 5) { }
 44         /// <summary>
 45         /// 初始一个新的ProcessQueue
 46         /// </summary>
 47         public ProcessQueue(IEnumerable<T> collection, Action<T> action) : this(collection, action, 5) { }
 48 
 49         /// <summary>
 50         /// 初始一个新的ProcessQueue
 51         /// </summary>
 52         public ProcessQueue(Action<T> action, int threadCount)
 53         {
 54             queue = new Queue<T>();
 55             operation = action;
 56 
 57             SetThreadCount(MaxThreadCount);
 58         }
 59 
 60         /// <summary>
 61         /// 初始一个新的ProcessQueue
 62         /// </summary>
 63         /// <param name="capacity">初始容量</param>
 64         public ProcessQueue(int capacity, Action<T> action, int threadCount)
 65         {
 66             queue = new Queue<T>(capacity);
 67             operation = action;
 68 
 69             SetThreadCount(MaxThreadCount);
 70         }
 71 
 72         /// <summary>
 73         /// 初始一个新的ProcessQueue
 74         /// </summary>
 75         /// <param name="collection">将数据复制到ProcessQueue.</param>
 76         public ProcessQueue(IEnumerable<T> collection, Action<T> action, int threadCount)
 77         {
 78             queue = new Queue<T>(collection);
 79             operation = action;
 80 
 81             SetThreadCount(MaxThreadCount);
 82         }
 83         #endregion
 84 
 85         #region Processing Control
 86 
 87         /// <summary>
 88         /// 停止 (挂起) 
 89         /// </summary>
 90         public void Stop()
 91         {
 92             lock (syncRoot)
 93             {
 94                 foreach (WorkerThread thread in threads)
 95                 {
 96                     thread.Pause();
 97                 }
 98 
 99                 isRunning = false;
100             }
101         }
102 
103         /// <summary>
104         /// 开始运行
105         /// </summary>
106         public void Start()
107         {
108             lock (syncRoot)
109             {
110                 //清空队列集合重新创建新的线程
111                 RegenerateIfDisposed();
112                 //如果新进的项目少于当前线程集合总的线程数,则创建当前新进的项目数
113                 //如果新进 的项目多余当前线程集合的线程数,则创建同样多的数程集合的线程数
114                 for (int i = 0; i < Math.Min(threads.Count, queue.Count); i++)
115                 {
116                     //设置信号让其运行
117                     threads[i].Signal();
118                 }
119                 isRunning = true;
120             }
121         }
122 
123         /// <summary>
124         /// 获取此ProcessQueue使用的工作线程数。 使用SetThreadCount更改此值。
125         /// </summary>
126         public int ThreadCount { get { return threads.Count; } }
127 
128         /// <summary>
129         /// 设置此ProcessQueue使用的工作线程数,并根据需要分配或释放线程。
130         /// </summary>
131         /// <param name="threadCount">线程数</param>
132         public void SetThreadCount(int threadCount)
133         {
134             //至少要有一个线程
135             if (threadCount < 1) throw new ArgumentOutOfRangeException("threadCount", "The ProcessQueue class requires at least one worker thread.");
136             //同步线程
137             lock (syncRoot)
138             {
139                 // 等待队列
140                 int pending = queue.Count;
141                 // 创建一个指定最大工作线程数的线程集合,每个线程用来处理排队的项目
142                 for (int i = threads.Count; i < threadCount; i++) 
143                 {
144                     //注意:在实例化工作线程WorkerThread 时,已经创建了一个ThreadProc 无限循环方法,改方法检测 signalEvent, abortEvent 信号
145                     //在收到 abortEvent 时终止并退出,收到 Signal时 循环调用 ProcessItems() 用来处理队列里排队的项目
146                     WorkerThread thread = new ProcessQueue<T>.WorkerThread(this);
147                     //添加到列表
148                     threads.Add(thread);
149                     //线程启动
150                     thread.Start();
151                     //如果队列总有待排项目时
152                     if (pending > 1)
153                     {
154                         //设置信号,让当前工作线程运行(不等待)
155                         thread.Signal();
156                     }
157                     //待排队数减一
158                     pending--;
159                 }
160 
161                 //如果其它线程调用了SetThreadCount,或者多次调用了 SetThreadCount,从而导致当前实际的线程集合有可能远远大于最大线程数
162                 //在这种情况下,需要移除多余的线程,从而保证当前threadCount有效
163                 //移除的线程数 = 当前创建的工作线程集合总数 - 设置的最大线程数
164                 int toRemove = threads.Count - threadCount;
165                 if (toRemove > 0)
166                 {
167                     //IsSignaled 如果当前实例收到信号,则为 true;否则为 false
168                     //从线程集合里取出正在等待的线程
169                     foreach (WorkerThread thread in threads.Where(t => !t.IsSignaled).ToList())
170                     {
171                         //设置信号使得该线程终止
172                         thread.Abort();
173                         //从集合中移除改项
174                         threads.Remove(thread);
175                         //移除数减一
176                         toRemove--;
177                     }
178                     //如果待移除的线程正在运行中
179                     //则强制移除该线程直到移除完为止
180                     while (toRemove > 0)
181                     {
182                         WorkerThread thread = threads[threads.Count - 1];
183                         thread.Abort();
184                         threads.Remove(thread);
185                         toRemove--;
186                     }
187                 }
188             }
189         }
190 
191         /// <summary>
192         /// 处理队列项
193         /// </summary>
194         /// <param name="item"></param>
195         private void ProcessItem(T item)
196         {
197             operation(item);
198         }
199 
200         /// <summary>
201         /// 释放时重置线程
202         /// </summary>
203         private void RegenerateIfDisposed()
204         {
205             if (isDisposed)
206             {
207                 int threadCount = threads.Count;
208 
209                 threads.Clear();
210 
211                 SetThreadCount(threadCount);
212             }
213 
214             isDisposed = false;
215         }
216         #endregion
217 
218         /// <summary>
219         /// 从ProcessQueue清除所有未处理的项目
220         /// </summary>
221         public void Clear()
222         {
223             lock (syncRoot)
224             {
225                 queue.Clear();
226             }
227         }
228 
229         /// <summary>
230         /// 尝试从ProcessQueue中检索获取下一个项目(如果存在)。 如果不存在,则将值设置为其默认值。
231         /// </summary>
232         /// <param name="value">如果不存在,则该变量将被设置为默认值.</param>
233         /// <returns>如果ProcessQueue包含一个项,则为真,如果没有则为False</returns>
234         public bool TryDequeue(out T value)
235         {
236             lock (syncRoot)
237             {
238                 if (queue.Count > 0)
239                 {
240                     value = queue.Dequeue();
241 
242                     return true;
243                 }
244                 else
245                 {
246                     value = default(T);
247 
248                     return false;
249                 }
250             }
251         }
252 
253         /// <summary>
254         /// 确定队列是否包含指定项
255         /// </summary>
256         /// <param name="item">当前项</param>
257         /// <returns>存在则 True, 不存在则 False</returns>
258         public bool Contains(T item)
259         {
260             lock (syncRoot)
261             {
262                 return queue.Contains(item);
263             }
264         }
265 
266         /// <summary>
267         /// 将ProcessQueue的内容复制到外部数组,而不影响ProcessQueue的内容
268         /// </summary>
269         /// <param name="array">The array to copy the items into</param>
270         /// <param name="arrayIndex">The starting index in the array</param>
271         public void CopyTo(T[] array, int arrayIndex)
272         {
273             lock (syncRoot)
274             {
275                 queue.CopyTo(array, arrayIndex);
276             }
277         }
278 
279         /// <summary>
280         /// 从ProcessQueue中检索下一个未处理的项目并将其删除
281         /// </summary>
282         /// <returns>The next unprocessed item in the ProcessQueue</returns>
283         public T Dequeue()
284         {
285             lock (syncRoot)
286             {
287                 return queue.Dequeue();
288             }
289         }
290 
291         /// <summary>
292         /// 将一个项目添加到处理队列的末尾
293         /// </summary>
294         /// <param name="item">添加项</param>
295         public void Enqueue(T item)
296         {
297             lock (syncRoot)
298             {
299                 //新进队列项
300                 queue.Enqueue(item);
301                 //当前处理队列正在运行时
302                 if (isRunning)
303                 {
304                     //清空队列集合重新创建新的线程
305                     RegenerateIfDisposed();
306                     //取出一个等待的线程
307                     WorkerThread firstThread = threads.Where(t => !t.IsSignaled).FirstOrDefault();
308                     //存在则运行它
309                     if (firstThread != null) firstThread.Signal();
310                 }
311             }
312         }
313 
314         /// <summary>
315         /// 从ProcessQueue中检索下一个未处理的项目,而不删除它
316         /// </summary>
317         /// <returns>The next unprocessed item in the ProcessQueue</returns>
318         public T Peek()
319         {
320             lock (syncRoot)
321             {
322                 return queue.Peek();
323             }
324         }
325 
326         /// <summary>
327         /// 返回一个包含ProcessQueue中所有未处理项目的数组
328         /// </summary>
329         /// <returns></returns>
330         public T[] ToArray()
331         {
332             lock (syncRoot)
333             {
334                 return queue.ToArray();
335             }
336         }
337 
338         /// <summary>
339         /// 将ProcessQueue的容量设置为其包含的项目的实际数量,除非该数量超过当前容量的90%。
340         /// </summary>
341         public void TrimExcess()
342         {
343             lock (syncRoot)
344             {
345                 queue.TrimExcess();
346             }
347         }
348 
349         #region IEnumerable<T> Members
350 
351         public IEnumerator<T> GetEnumerator()
352         {
353             return queue.GetEnumerator();
354         }
355 
356         #endregion
357 
358         #region IEnumerable Members
359 
360         IEnumerator IEnumerable.GetEnumerator()
361         {
362             return queue.GetEnumerator();
363         }
364 
365         #endregion
366 
367         #region ICollection Members
368 
369         void ICollection.CopyTo(Array array, int index)
370         {
371             lock (syncRoot)
372             {
373                 ((ICollection)queue).CopyTo(array, index);
374             }
375         }
376 
377         public int Count
378         {
379             get
380             {
381                 lock (syncRoot)
382                 {
383                     return queue.Count;
384                 }
385             }
386         }
387 
388         bool ICollection.IsSynchronized
389         {
390             get { return true; }
391         }
392 
393         object ICollection.SyncRoot
394         {
395             get { return syncRoot; }
396         }
397 
398         #endregion
399 
400         #region IDisposable Members
401 
402         public void Dispose()
403         {
404             Dispose(true);
405         }
406 
407         private void Dispose(bool disposing)
408         {
409             if (disposing)
410             {
411                 foreach (WorkerThread thread in threads) thread.Abort();
412             }
413 
414             isDisposed = true;
415         }
416 
417         #endregion
418 
419         /// <summary>
420         /// 封装.NET Thread对象并管理与控制其行为相关联的WaitHandles
421         /// </summary>
422         private class WorkerThread
423         {
424             private ManualResetEvent abortEvent;
425             private ManualResetEvent signalEvent;
426             private ProcessQueue<T> queue;
427 
428             private Thread thread;
429 
430             public WorkerThread(ProcessQueue<T> queue)
431             {
432                 abortEvent = new ManualResetEvent(false);
433                 signalEvent = new ManualResetEvent(false);
434                 this.queue = queue;
435 
436                 thread = new Thread(ThreadProc);
437                 thread.Name = "ProcessQueue Worker ID " + thread.ManagedThreadId;
438             }
439 
440             /// <summary>
441             /// 运行当前线程
442             /// </summary>
443             public void Start()
444             {
445                 thread.Start();
446             }
447 
448             /// <summary>
449             /// 终止当前线程
450             /// </summary>
451             public void Abort()
452             {
453                 abortEvent.Set();
454 
455                 thread.Join();
456             }
457 
458             /// <summary>
459             /// 清除信号WaitHandle,导致线程完成当前的迭代后暂停
460             /// </summary>
461             public void Pause()
462             {
463                 signalEvent.Reset();
464             }
465 
466             /// <summary>
467             /// 设置信号WaitHandle,等待线程使其恢复运行(如果暂停)
468             /// </summary>
469             public void Signal()
470             {
471                 signalEvent.Set();
472             }
473 
474             public bool IsSignaled
475             {
476                 get { return signalEvent.WaitOne(0); }
477             }
478 
479             /// <summary>
480             /// ThreadProc 总线程方法由一个无限循环组成,在发出中止事件时退出
481             /// </summary>
482             private void ThreadProc()
483             {
484                 WaitHandle[] handles = new WaitHandle[] { signalEvent, abortEvent };
485 
486                 while (true)
487                 {
488                     //等待指定数组中的任一元素收到信号
489                     switch (WaitHandle.WaitAny(handles))
490                     {
491                         case 0: // signal
492                             {
493                                 ProcessItems();
494                             }
495                             break;
496                         case 1: // abort
497                             {
498                                 return;
499                             }
500                     }
501                 }
502             }
503 
504             /// <summary>
505             /// 处理项目
506             /// </summary>
507             private void ProcessItems()
508             {
509                 T item;
510                 //从队列中取出一项,这是一个同步的过程
511                 while (queue.TryDequeue(out item))
512                 {
513                     //处理队列项
514                     queue.ProcessItem(item);
515                     //如果当前实例收到信号,则为 true;否则为 false。
516                     //等待当前队列完成 在 调用 signalEvent.Set() 或者 abortEvent.Set() 时 
517                     if (!signalEvent.WaitOne(0) || abortEvent.WaitOne(0)) return;
518                 }
519                 //线程状态设置为非终止状态
520                 signalEvent.Reset();
521             }
522         }
523     }
524 }
View Code
原文地址:https://www.cnblogs.com/mschen/p/11602178.html