C# Socket 多线程可断点传送大文件3

更新:使用 .NET 4.0 的 Parallel 来代替 new Thread()。

/*********************************************************************************
** File Name    :    FileTransmitor.cs
** Copyright (C) 2010 Snda Network Corporation. All Rights Reserved.
** Creator        :    RockyWong
** Create Date    :    2010-06-02 11:22:45
** Update Date    :    2013-01-11 11:35:26
** Description    :    多线程多管道可断点传输大文件
** Version No    :    
*********************************************************************************/
using System;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Rocky.Net
{
    public sealed class FileTransfer : Disposable
    {
        #region Fields
        // Size in bytes of one Int64; the breakpoint record of each chunk is two of these.
        internal const int PerLongSize = sizeof(long);
        // Extension of the breakpoint file that stores the per-chunk progress values.
        internal const string PointExtension = ".dat";
        // Extension of the temporary file the data is written to until the transfer completes.
        internal const string TempExtension = ".temp";

        // Directory received files are saved to (set by Listen or by the DirectoryPath setter).
        private string _savePath;
        // Listening socket created by Listen; null while not listening and after disposal.
        private Socket _listener;
        #endregion

        #region Properties
        // Raised before a transfer starts; Listen and Send abandon the transfer when a handler sets e.Cancel.
        public event EventHandler<TransferEventArgs> Prepare;
        // Raised about once per second while a transfer is running, and once more when it has finished.
        public event EventHandler<TransferEventArgs> ProgressChanged;
        // Raised after a transfer has finished and its progress tracker has been stopped.
        public event EventHandler<TransferEventArgs> Completed;

        /// <summary>
        /// Gets or sets the directory received files are saved to.
        /// </summary>
        /// <remarks>
        /// The setter appends a "yyyy-MM" sub-folder for the current month plus a trailing
        /// backslash to the assigned value; the getter returns that combined path.
        /// Note that <c>Listen</c> assigns the backing field directly, without this suffix.
        /// </remarks>
        public string DirectoryPath
        {
            get { return _savePath; }
            set
            {
                // Format with the invariant culture: the current culture may use a
                // non-Gregorian calendar, which would silently change the folder name.
                string monthFolder = DateTime.Now.ToString("yyyy-MM", System.Globalization.CultureInfo.InvariantCulture);
                _savePath = value + @"\" + monthFolder + @"\";
            }
        }
        #endregion

        #region Constructors
        /// <summary>
        /// Creates an idle instance; nothing is opened until Listen or Send is called.
        /// </summary>
        public FileTransfer() { }

        /// <summary>
        /// Releases the listening socket and drops every event subscription.
        /// </summary>
        /// <param name="disposing">
        /// When true, the listener is passed to SocketHelper.DisposeListener; the field and
        /// the events are cleared in either case.
        /// </param>
        protected override void DisposeInternal(bool disposing)
        {
            if (disposing)
                SocketHelper.DisposeListener(ref _listener);

            _listener = null;
            // Detach all subscribers in one go.
            Prepare = ProgressChanged = Completed = null;
        }
        #endregion

        #region Methods
        /// <summary>
        /// Raises the <see cref="Prepare"/> event.
        /// </summary>
        /// <param name="e">Event data handed to the subscribers.</param>
        private void OnPrepare(TransferEventArgs e)
        {
            // Copy the delegate before testing it: DisposeInternal sets the event to null,
            // possibly from another thread, so re-reading the field after the null check
            // could throw NullReferenceException.
            EventHandler<TransferEventArgs> handler = this.Prepare;
            if (handler != null)
            {
                handler(this, e);
            }
        }

        /// <summary>
        /// Raises the <see cref="ProgressChanged"/> event.
        /// </summary>
        /// <param name="e">Event data handed to the subscribers.</param>
        private void OnProgressChanged(TransferEventArgs e)
        {
            // Copy the delegate before testing it: DisposeInternal sets the event to null,
            // possibly from another thread, so re-reading the field after the null check
            // could throw NullReferenceException.
            EventHandler<TransferEventArgs> handler = this.ProgressChanged;
            if (handler != null)
            {
                handler(this, e);
            }
        }

        /// <summary>
        /// Raises the <see cref="Completed"/> event.
        /// </summary>
        /// <param name="e">Event data handed to the subscribers.</param>
        private void OnCompleted(TransferEventArgs e)
        {
            // Copy the delegate before testing it: DisposeInternal sets the event to null,
            // possibly from another thread, so re-reading the field after the null check
            // could throw NullReferenceException.
            EventHandler<TransferEventArgs> handler = this.Completed;
            if (handler != null)
            {
                handler(this, e);
            }
        }
        #endregion

        #region Receive
        /// <summary>
        /// Starts listening on <paramref name="port"/> and receives incoming files in the background.
        /// </summary>
        /// <remarks>
        /// Each transfer starts with a control connection on which the sender's TransferConfig
        /// is read. After the <see cref="Prepare"/> handlers have run (and unless one of them
        /// cancels), the next <c>ThreadCount - 1</c> accepted connections are attached to the
        /// same transfer, which is then handed to <c>Receive</c> on its own task.
        /// </remarks>
        /// <param name="savePath">Directory received files are saved to; passed to Runtime.CreateDirectory.</param>
        /// <param name="port">Local port to listen on (bound to IPAddress.Any).</param>
        /// <exception cref="ApplicationException">Listening has already been started.</exception>
        public void Listen(string savePath, ushort port)
        {
            Contract.Requires(!string.IsNullOrEmpty(savePath));
            if (_listener != null)
            {
                throw new ApplicationException("已启动监听");
            }

            Runtime.CreateDirectory(_savePath = savePath);
            var localIpe = new IPEndPoint(IPAddress.Any, port);
            // Supports at most 16 threads.
            _listener = SocketHelper.CreateListener(localIpe, 16);
            TaskHelper.Factory.StartNew(() =>
            {
                while (true)
                {
                    // Snapshot the field: DisposeInternal clears it from another thread, so
                    // re-reading it between the null check and Accept could dereference null.
                    Socket listener = _listener;
                    if (listener == null)
                    {
                        break;
                    }

                    // Sockets accepted for the transfer being set up; closed again if setup fails.
                    var accepted = new List<Socket>();
                    try
                    {
                        Socket controlClient = listener.Accept();
                        accepted.Add(controlClient);
                        Runtime.LogInfo("TunnelTest 双工通讯: {0}.", controlClient.RemoteEndPoint);

                        TransferConfig config;
                        controlClient.Receive(out config);
                        var e = new TransferEventArgs(config);
                        this.OnPrepare(e);
                        // ThreadCount comes from the remote side: a value below 1 would make
                        // the chunkGroup[0] assignment below throw and kill the accept loop.
                        if (e.Cancel || config.ThreadCount < 1)
                        {
                            controlClient.Close();
                            continue;
                        }

                        var chunkGroup = new ReceiveChunkModel[config.ThreadCount];
                        chunkGroup[0] = new ReceiveChunkModel(controlClient);
                        for (int i = 1; i < chunkGroup.Length; i++)
                        {
                            Socket dataClient = listener.Accept();
                            accepted.Add(dataClient);
                            chunkGroup[i] = new ReceiveChunkModel(dataClient);
                        }
                        TaskHelper.Factory.StartNew(Receive, new object[] { e, chunkGroup });
                    }
                    catch (Exception ex)
                    {
                        // Do not leak connections of a transfer that never started.
                        foreach (Socket socket in accepted)
                        {
                            socket.Close();
                        }
                        // A socket error after the listener was disposed is the normal way
                        // for this loop to end; anything else is a real failure.
                        bool stopped = _listener == null && (ex is SocketException || ex is ObjectDisposedException);
                        if (stopped)
                        {
                            break;
                        }
                        throw;
                    }
                }
            });
        }

        /// <summary>
        /// Receives one file over the connections of a chunk group, resuming from a previous
        /// attempt when both its breakpoint file and its temporary file are present.
        /// </summary>
        /// <remarks>
        /// While the transfer runs, a timer stores two Int64 progress values per chunk in the
        /// breakpoint file every four seconds. On success the breakpoint file is deleted and
        /// the temporary file is renamed to its final name (File.Move throws if that name
        /// already exists). On failure both files are left in place.
        /// </remarks>
        /// <param name="state">
        /// An object[] with two elements: the TransferEventArgs of the transfer and the
        /// ReceiveChunkModel[] holding its connections (element 0 is the control connection).
        /// </param>
        private void Receive(object state)
        {
            var args = (object[])state;
            var e = (TransferEventArgs)args[0];
            var chunkGroup = (ReceiveChunkModel[])args[1];
            var controlClient = chunkGroup[0].Client;

            // Resolve the endpoint used in log messages once: the listener may be disposed
            // while this transfer is running, and dereferencing it from the timer callback
            // would then throw on a thread-pool thread.
            EndPoint localEndPoint = null;
            Socket listener = _listener;
            if (listener != null)
            {
                try
                {
                    localEndPoint = listener.LocalEndPoint;
                }
                catch (ObjectDisposedException)
                {
                    // Only used for log text; leave it null.
                }
            }

            e.Progress = new TransferProgress();
            e.Progress.Start(e.Config.FileLength);

            int perPairCount = PerLongSize * 2, count = perPairCount * chunkGroup.Length;
            byte[] bufferInfo = new byte[count];
            // Checksum and FileName come from the remote side: keep only the file-name part
            // so the result cannot leave the save directory.
            string fileName = Path.GetFileName(e.Config.Checksum + Path.GetExtension(e.Config.FileName));
            string filePath = Path.Combine(_savePath, fileName),
                pointFilePath = Path.ChangeExtension(filePath, PointExtension), tempFilePath = Path.ChangeExtension(filePath, TempExtension);

            FileStream pointStream = null;
            Timer timer = null;
            // Guards pointStream so that a late timer callback never touches a disposed stream.
            object pointLock = new object();
            try
            {
                long oddSize, avgSize = Math.DivRem(e.Config.FileLength, (long)chunkGroup.Length, out oddSize);
                if (File.Exists(pointFilePath) && File.Exists(tempFilePath))
                {
                    // Resume: load the stored pairs and send them to the sender.
                    pointStream = new FileStream(pointFilePath, FileMode.Open, FileAccess.ReadWrite, FileShare.None);
                    pointStream.Read(bufferInfo, 0, count);
                    long fValue, tValue;
                    for (int i = 0, j = chunkGroup.Length - 1; i < chunkGroup.Length; i++)
                    {
                        fValue = BitConverter.ToInt64(bufferInfo, i * perPairCount);
                        tValue = BitConverter.ToInt64(bufferInfo, i * perPairCount + PerLongSize);
                        // The last chunk also takes the remainder of the division.
                        chunkGroup[i].Initialize(tempFilePath, e.Config.ChunkLength, i * avgSize, i == j ? avgSize + oddSize : avgSize, fValue, tValue);
                        Runtime.LogDebug("[Multi]Local{0} breakpoint read{1}:{2}/{3}.", localEndPoint, i, fValue, tValue);
                    }
                    controlClient.Send(bufferInfo);
                }
                else
                {
                    // Fresh transfer: create the breakpoint file and pre-size the temporary file.
                    pointStream = new FileStream(pointFilePath, FileMode.CreateNew, FileAccess.ReadWrite, FileShare.None);
                    using (var stream = new FileStream(tempFilePath, FileMode.CreateNew, FileAccess.Write, FileShare.Write))
                    {
                        stream.SetLength(e.Config.FileLength);
                        stream.Flush();
                    }
                    for (int i = 0, j = chunkGroup.Length - 1; i < chunkGroup.Length; i++)
                    {
                        chunkGroup[i].Initialize(tempFilePath, e.Config.ChunkLength, i * avgSize, i == j ? avgSize + oddSize : avgSize);
                    }
                    // A 4-byte reply tells the sender that there is no breakpoint to resume from.
                    controlClient.Send(bufferInfo, 0, 4, SocketFlags.None);
                }

                timer = new Timer(arg =>
                {
                    lock (pointLock)
                    {
                        if (pointStream == null)
                        {
                            // The transfer has ended and the stream is already closed.
                            return;
                        }
                        long fValue, tValue;
                        for (int i = 0; i < chunkGroup.Length; i++)
                        {
                            chunkGroup[i].ReportProgress(out fValue, out tValue);
                            Buffer.BlockCopy(BitConverter.GetBytes(fValue), 0, bufferInfo, i * perPairCount, PerLongSize);
                            Buffer.BlockCopy(BitConverter.GetBytes(tValue), 0, bufferInfo, i * perPairCount + PerLongSize, PerLongSize);
                            Runtime.LogDebug("[Multi]Local{0} breakpoint write{1}:{2}/{3}.", localEndPoint, i, fValue, tValue);
                        }
                        pointStream.Position = 0L;
                        pointStream.Write(bufferInfo, 0, count);
                        pointStream.Flush();
                    }
                }, null, TimeSpan.Zero, TimeSpan.FromSeconds(4));

                Parallel.ForEach(chunkGroup, chunk => chunk.Run());
                long bytesTransferred = 0L;
                do
                {
                    chunkGroup.ReportSpeed(e.Progress, ref bytesTransferred);
                    this.OnProgressChanged(e);
                    Thread.Sleep(1000);
                }
                while (!chunkGroup.IsAllCompleted());
                chunkGroup.ReportSpeed(e.Progress, ref bytesTransferred);
                this.OnProgressChanged(e);
            }
            finally
            {
                // Always release the timer and the exclusively opened breakpoint file, also
                // when the transfer fails, so that a later attempt can open it and resume.
                if (timer != null)
                {
                    timer.Dispose();
                }
                lock (pointLock)
                {
                    if (pointStream != null)
                    {
                        pointStream.Dispose();
                        pointStream = null;
                    }
                }
            }

            // Reached only when the transfer succeeded.
            File.Delete(pointFilePath);
            File.Move(tempFilePath, filePath);

            e.Progress.Stop();
            this.OnCompleted(e);
        }
        #endregion

        #region Send
        /// <summary>
        /// Sends the file described by <paramref name="config"/> to a listening receiver,
        /// using <c>config.ThreadCount</c> connections.
        /// </summary>
        /// <remarks>
        /// The first connection is the control connection: the configuration is sent over it
        /// and the receiver's reply is read from it. A 4-byte reply means a fresh transfer;
        /// otherwise the reply carries two Int64 values per chunk that are passed on to the
        /// chunk's Initialize call so the transfer resumes.
        /// NOTE(review): a partially delivered resume reply of exactly 4 bytes cannot be told
        /// apart from the "fresh transfer" reply with this protocol.
        /// </remarks>
        /// <param name="config">Transfer description (file path, length, chunk length, thread count).</param>
        /// <param name="remoteIpe">Endpoint of the receiver.</param>
        public void Send(TransferConfig config, IPEndPoint remoteIpe)
        {
            Contract.Requires(config != null && remoteIpe != null);

            var controlChunk = new SendChunkModel(remoteIpe);
            SendChunkModel[] chunkGroup = null;
            TransferEventArgs e;
            try
            {
                controlChunk.Client.Send(config);
                e = new TransferEventArgs(config);
                this.OnPrepare(e);
                if (e.Cancel || !controlChunk.Client.Connected)
                {
                    controlChunk.Client.Close();
                    return;
                }

                chunkGroup = new SendChunkModel[config.ThreadCount];
                chunkGroup[0] = controlChunk;
                for (int i = 1; i < chunkGroup.Length; i++)
                {
                    chunkGroup[i] = new SendChunkModel(remoteIpe);
                }

                e.Progress = new TransferProgress();
                e.Progress.Start(config.FileLength);

                int perPairCount = PerLongSize * 2, count = perPairCount * chunkGroup.Length;
                byte[] bufferInfo = new byte[count];
                long oddSize, avgSize = Math.DivRem(config.FileLength, (long)chunkGroup.Length, out oddSize);

                int received = controlChunk.Client.Receive(bufferInfo);
                // TCP is a byte stream: one Receive call may return only part of the resume
                // reply, which is always `count` bytes long. Keep reading until it is complete.
                while (received > 4 && received < count)
                {
                    int read = controlChunk.Client.Receive(bufferInfo, received, count - received, SocketFlags.None);
                    if (read == 0)
                    {
                        // The receiver closed the connection.
                        break;
                    }
                    received += read;
                }

                if (received == 4)
                {
                    // Fresh transfer: every chunk starts at its own offset.
                    for (int i = 0, j = chunkGroup.Length - 1; i < chunkGroup.Length; i++)
                    {
                        // The last chunk also takes the remainder of the division.
                        chunkGroup[i].Initialize(config.FilePath, config.ChunkLength, i * avgSize, i == j ? avgSize + oddSize : avgSize);
                    }
                }
                else
                {
                    // Resume: hand the receiver's stored pair to each chunk.
                    long fValue, tValue;
                    for (int i = 0, j = chunkGroup.Length - 1; i < chunkGroup.Length; i++)
                    {
                        fValue = BitConverter.ToInt64(bufferInfo, i * perPairCount);
                        tValue = BitConverter.ToInt64(bufferInfo, i * perPairCount + PerLongSize);
                        chunkGroup[i].Initialize(config.FilePath, config.ChunkLength, i * avgSize, i == j ? avgSize + oddSize : avgSize, fValue, tValue);
                        Runtime.LogDebug("[Multi]Remote{0} breakpoint{1}:{2}/{3}.", remoteIpe, i, fValue, tValue);
                    }
                }
                Thread.Sleep(200);
            }
            catch
            {
                // Setup failed before any chunk started running: close every connection
                // opened so far instead of leaking it, then let the error propagate.
                controlChunk.Client.Close();
                if (chunkGroup != null)
                {
                    for (int i = 1; i < chunkGroup.Length; i++)
                    {
                        if (chunkGroup[i] != null)
                        {
                            chunkGroup[i].Client.Close();
                        }
                    }
                }
                throw;
            }

            Parallel.ForEach(chunkGroup, chunk => chunk.Run());
            long bytesTransferred = 0L;
            do
            {
                chunkGroup.ReportSpeed(e.Progress, ref bytesTransferred);
                this.OnProgressChanged(e);
                Thread.Sleep(1000);
            }
            while (!chunkGroup.IsAllCompleted());
            chunkGroup.ReportSpeed(e.Progress, ref bytesTransferred);
            this.OnProgressChanged(e);

            e.Progress.Stop();
            this.OnCompleted(e);
        }
        #endregion
    }
}

另外推荐一个网络小工具:

内测安装地址:http://publish.xineworld.com/cloudagent/publish.htm

上篇文章:http://www.cnblogs.com/Googler/archive/2013/01/11/2856219.html

原文地址:https://www.cnblogs.com/Googler/p/3095286.html