C# Socket IPackets

using System;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Linq;
using System.Text;
using System.IO;

namespace Rocky.Net
{
    [ContractClass(typeof(IPacketsContract))]
    public interface IPackets
    {
        long ContentLength { get; }
        int? BufferSize { get; set; }
        bool Buffer { get; set; }
        Stream InputStream { get; }
    }
    [ContractClassFor(typeof(IPackets))]
    internal abstract class IPacketsContract : IPackets
    {
        long IPackets.ContentLength
        {
            get
            {
                Contract.Ensures(Contract.Result<long>() >= SocketHelper.Special);
                return default(long);
            }
        }

        int? IPackets.BufferSize
        {
            get
            {
                return default(int?);
            }
            set
            {

            }
        }

        bool IPackets.Buffer
        {
            get
            {
                return default(bool);
            }
            set
            {

            }
        }

        Stream IPackets.InputStream
        {
            get
            {
                Contract.Ensures(Contract.Result<Stream>() != null);
                return default(Stream);
            }
        }
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.IO;

namespace Rocky.Net
{
    public interface IPacketsFactory
    {
        IPackets Create(object graph);
        IPackets Create(Stream inputStream, long contentLength);
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics.Contracts;
using System.IO;

namespace Rocky.Net
{
    internal class Packets : IPackets
    {
        #region Fields
        private Stream _stream;
        #endregion

        #region Properties
        public long ContentLength { get; private set; }
        public int? BufferSize { get; set; }
        public bool Buffer { get; set; }
        public Stream InputStream
        {
            get { return _stream; }
        }
        #endregion

        #region Constructors
        public Packets(byte[] binary)
            : this(new MemoryStream(binary, 0, binary.Length, true, false), binary.Length)
        {

        }
        public Packets(Stream inputStream, long contentLength)
        {
            Contract.Requires(inputStream != null);

            _stream = inputStream;
            this.ContentLength = contentLength;
        }
        #endregion
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.IO;

namespace Rocky.Net
{
    internal sealed class PacketsFactory : IPacketsFactory
    {
        public IPackets Create(object graph)
        {
            byte[] binary = Serializer.Serialize(graph);
            return new Packets(binary);
        }

        public IPackets Create(Stream inputStream, long contentLength)
        {
            return new Packets(inputStream, contentLength);
        }
    }
}
#region Packets
        /// <summary>
        /// 同步发送数据包
        /// </summary>
        /// <param name="instance"></param>
        /// <param name="pack">数据包</param>
        /// <param name="progress">进度器</param>
        /// <returns>
        /// -1  远程连接关闭
        /// >=0 已发送的字节数
        /// </returns>
        public static long SendPackets(this Socket instance, IPackets pack, bool acceptLength = true, TransferProgress progress = null)
        {
            Contract.Requires(instance != null);
            Contract.Requires(pack != null);

            var netStream = new NetworkStream(instance, FileAccess.Write, false);
            Stream src = netStream, dest = pack.InputStream;
            if (pack.Buffer)
            {
                src = new BufferedStream(netStream, pack.BufferSize.GetValueOrDefault(4096));
            }
            long totalTransferred = 0L, contentLength = pack.ContentLength;
            if (contentLength == SocketHelper.Special)
            {
                try
                {
                    contentLength = pack.InputStream.Length;
                }
                catch (NotSupportedException)
                {

                }
            }
            ArraySegment<byte> seg;
            BufferSegment.MemoryBuffer.Take(out seg);
            bool doProg = false,
                unknownLength = contentLength == SocketHelper.Special;
            try
            {
                if (acceptLength)
                {
                    instance.Send(BitConverter.GetBytes(contentLength));
                }
                if (progress != null)
                {
                    doProg = true;
                    progress.Start(contentLength);
                }

                int read;
                while ((unknownLength || totalTransferred < contentLength) && (read = dest.Read(seg)) != 0)
                {
                    src.Write(seg, read);
                    src.Flush();
                    totalTransferred += read;
                    if (doProg)
                    {
                        progress.OnProgressChanged(read, totalTransferred);
                    }
#if Sleep
                    Thread.Sleep(10);
#endif
                }
                if (!unknownLength && totalTransferred != contentLength)
                {
                    throw new InvalidOperationException("输入流长度和数据包长度必须相同");
                }
            }
            catch (IOException ex)
            {
                var sockEx = ex.InnerException as SocketException;
                if (sockEx != null && sockEx.ErrorCode == 10054)
                {
                    return SocketHelper.Special;
                }
                throw;
            }
            finally
            {
                if (doProg)
                {
                    progress.Stop();
                }
                BufferSegment.MemoryBuffer.Return(ref seg);
                src.Dispose();
            }
            return totalTransferred;
        }

        /// <summary>
        /// 同步接收数据包
        /// </summary>
        /// <param name="instance"></param>
        /// <param name="pack">数据包</param>
        /// <param name="progress">进度器</param>
        /// <returns>
        /// -1  远程连接关闭
        /// >=0 已接受的字节数
        /// </returns>
        public static long ReceivePackets(this Socket instance, IPackets pack, bool acceptLength = true, TransferProgress progress = null)
        {
            Contract.Requires(instance != null);
            Contract.Requires(pack != null);

            var netStream = new NetworkStream(instance, FileAccess.Read, false);
            Stream src = netStream, dest = pack.InputStream;
            if (pack.Buffer)
            {
                src = new BufferedStream(netStream, pack.BufferSize.GetValueOrDefault(4096));
            }
            long totalTransferred = 0L, contentLength = pack.ContentLength;
            ArraySegment<byte> seg;
            BufferSegment.MemoryBuffer.Take(out seg);
            bool doProg = false,
                unknownLength = contentLength == SocketHelper.Special;
            try
            {
                if (acceptLength)
                {
                    byte[] buffer = new byte[SocketHelper.PerLongSize];
                    instance.Receive(buffer);
                    contentLength = buffer.ToInt64(0);
                }
                if (progress != null)
                {
                    doProg = true;
                    progress.Start(contentLength);
                }

                int read;
                while ((unknownLength || totalTransferred < contentLength) && (read = src.Read(seg)) != 0)
                {
                    dest.Write(seg, read);
                    dest.Flush();
                    totalTransferred += read;
                    if (doProg)
                    {
                        progress.OnProgressChanged(read, totalTransferred);
                    }
#if Sleep
                    Thread.Sleep(10);
#endif
                }
                // recv == 0, 远程主动关闭连接
                if (totalTransferred < contentLength)
                {
                    return SocketHelper.Special;
                }
            }
            catch (IOException ex)
            {
                var sockEx = ex.InnerException as SocketException;
                // 远程强制关闭连接
                if (sockEx != null && sockEx.ErrorCode == 10054)
                {
                    return SocketHelper.Special;
                }
                throw;
            }
            finally
            {
                if (doProg)
                {
                    progress.Stop();
                }
                BufferSegment.MemoryBuffer.Return(ref seg);
                src.Dispose();
            }
            return totalTransferred;
        }
        #endregion
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.IO;

namespace Rocky.Net
{
    public class SocketSyncArgs : EventArgs
    {
        #region Fields
        private IPacketsFactory _packFac;
        private IPackets _header, _content;
        #endregion

        #region Properties
        public bool Buffer { get; set; }
        public bool TransferHeader { get; set; }
        public object HeaderEntity { get; set; }
        public TransferProgress Progress { get; set; }
        public bool IsShutdown { get; internal set; }
        #endregion

        #region Constructors
        public SocketSyncArgs()
        {
            _packFac = SocketHelper.CreatePacketsFactory();
        }
        #endregion

        #region Methods
        [DebuggerStepThrough]
        internal IPackets[] GetPackets()
        {
            if (_content == null)
            {
                throw new InvalidOperationException("未设置内容包");
            }
            if (this.HeaderEntity != null)
            {
                _header = _packFac.Create(this.HeaderEntity);
            }
            return new IPackets[] { _header, _content };
        }

        public void SetPackets(Stream inputStream, long contentLength)
        {
            Contract.Requires(inputStream != null);

            _content = _packFac.Create(inputStream, contentLength);
            this.IsShutdown = false;
        }
        #endregion
    }
}
/// <summary>
        /// 同步发送SocketSyncArgs
        /// </summary>
        /// <param name="sock"></param>
        /// <param name="e"></param>
        /// <returns>已发送字节数(不包含HeaderPackets)</returns>
        public static long SendSync(this Socket sock, SocketSyncArgs e, bool acceptLength = true)
        {
            var packs = e.GetPackets();
            long sent;
            if (e.TransferHeader)
            {
                if (packs[0] == null)
                {
                    throw new InvalidOperationException("头对象为空");
                }
                sent = sock.SendPackets(packs[0]);
                if (sent == SocketHelper.Special)
                {
                    e.IsShutdown = true;
                    goto done;
                }
            }
            packs[1].Buffer = e.Buffer;
            sent = sock.SendPackets(packs[1], acceptLength, e.Progress);
            if (sent == SocketHelper.Special)
            {
                e.IsShutdown = true;
            }
        done:
            return sent;
        }

        /// <summary>
        /// 同步接收SocketSyncArgs
        /// </summary>
        /// <param name="sock"></param>
        /// <param name="e"></param>
        /// <param name="transferLength"></param>
        /// <returns>已接收字节数(不包含HeaderPackets)</returns>
        public static long ReceiveSync(this Socket sock, SocketSyncArgs e, bool acceptLength = true)
        {
            var packs = e.GetPackets();
            long recv;
            if (e.TransferHeader)
            {
                if (packs[0] == null)
                {
                    throw new InvalidOperationException("头对象为空");
                }
                recv = sock.ReceivePackets(packs[0]);
                if (recv == SocketHelper.Special)
                {
                    e.IsShutdown = true;
                    goto done;
                }
            }
            packs[1].Buffer = e.Buffer;
            recv = sock.ReceivePackets(packs[1], acceptLength, e.Progress);
            if (recv == SocketHelper.Special)
            {
                e.IsShutdown = true;
            }
        done:
            return recv;
        }
原文地址:https://www.cnblogs.com/Googler/p/3055300.html