UDP

/// <summary>
/// UDP server built on <see cref="SocketAsyncEventArgs"/>. Received datagrams are
/// validated, handed to an <see cref="IUdpRequestHandler"/> and published through
/// <see cref="ReceiveData"/>; outgoing datagrams are queued and sent by a dedicated
/// background worker.
/// </summary>
internal class IocpUdpServer
    {
        // Bound UDP socket; null while the server is stopped. Volatile because it is
        // read from the send worker and from I/O completion callbacks.
        private volatile Socket _listen;

        // True between a successful Start() and the next Close().
        private volatile bool _isRun;

        // Size in bytes of the receive buffer.
        private readonly int _bufferLength;

        // SAE pool. NOTE(review): presumably a pool of reusable SocketAsyncEventArgs;
        // the current send path does not draw from it, it only backs SeaCount.
        private readonly SaeQueuePool _saeQueue = new SaeQueuePool(256);

        // Outgoing datagrams waiting to be sent by SendPackageProcess.
        private readonly BlockingCollection<SocketAsyncEventArgs> _sendBlocking = new BlockingCollection<SocketAsyncEventArgs>();

        // Queue that processes received datagrams.
        private readonly IUdpRequestHandler _handlerQueue;

        /// <summary>
        /// Gets the number of items currently available in the SAE pool.
        /// </summary>
        public int SeaCount => _saeQueue.Count;

        /// <summary>
        /// Gets or sets the UDP port. Set it before calling <see cref="Start"/>; after a
        /// successful <see cref="Start"/> it holds the port the socket is actually bound
        /// to (the system-assigned port when 0 was requested).
        /// </summary>
        public int Port { get; set; }

        /// <summary>
        /// Raised after a well-formed datagram has been handed to the handler queue.
        /// </summary>
        public event EventHandler<ReceiveDataEventArgs> ReceiveData;

        /// <summary>
        /// Creates the server and starts the background send worker. Listening does not
        /// begin until <see cref="Start"/> is called.
        /// </summary>
        /// <param name="handlerQueue">Queue that processes received datagrams.</param>
        /// <param name="port">Port to listen on.</param>
        /// <param name="bufferLength">Receive buffer size in bytes.</param>
        public IocpUdpServer(IUdpRequestHandler handlerQueue, int port, int bufferLength = 1024)
        {
            _handlerQueue = handlerQueue;
            _bufferLength = bufferLength;
            Port = port;

            Task.Factory.StartNew(SendPackageProcess, TaskCreationOptions.LongRunning);
        }

        /// <summary>
        /// Send worker: drains the outgoing queue and submits each datagram to the socket.
        /// </summary>
        private void SendPackageProcess()
        {
            foreach (var sendArgs in _sendBlocking.GetConsumingEnumerable())
            {
                try
                {
                    // Snapshot the socket: Close() may null the field at any time.
                    var socket = _listen;
                    if (!_isRun || socket == null)
                    {
                        // Server was stopped after the datagram was queued; drop it.
                        sendArgs.Dispose();
                        continue;
                    }

                    // SendToAsync returns false when the operation completed synchronously,
                    // in which case the Completed event is NOT raised. That is not a failure,
                    // so the datagram must not be queued again; finish it here instead.
                    if (!socket.SendToAsync(sendArgs))
                    {
                        SendAsync_Completed(socket, sendArgs);
                    }
                }
                catch (Exception ex)
                {
                    LogHelper.WriteErrLog(ex.Message, ex);
                    sendArgs.Dispose();
                }
            }
        }

        /// <summary>
        /// Finishes a send operation: logs a failed send and releases the event args.
        /// </summary>
        private void SendAsync_Completed(object sender, SocketAsyncEventArgs e)
        {
            try
            {
                // OperationAborted is the expected result of closing the socket.
                if (e.SocketError != SocketError.Success && e.SocketError != SocketError.OperationAborted)
                {
                    var error = new SocketException((int)e.SocketError);
                    LogHelper.WriteErrLog(error.Message, error);
                }
            }
            finally
            {
                e.Dispose();
            }
        }

        /// <summary>
        /// Binds the UDP socket and starts receiving datagrams. Does nothing when the
        /// server is already running.
        /// </summary>
        public void Start()
        {
            if (_isRun)
            {
                return;
            }

            var local = new IPEndPoint(IPAddress.Any, Port);
            var socket = new Socket(local.AddressFamily, SocketType.Dgram, ProtocolType.Udp);
            try
            {
                socket.Bind(local);
            }
            catch
            {
                // Do not leak the socket handle when the port cannot be bound.
                socket.Close();
                throw;
            }

            // Publish the port actually bound (differs from the request when it was 0).
            Port = ((IPEndPoint)socket.LocalEndPoint).Port;

            var socketAsync = new SocketAsyncEventArgs();
            socketAsync.SetBuffer(new byte[_bufferLength], 0, _bufferLength);
            socketAsync.Completed += SocketAsync_Completed;
            // ReceiveFromAsync requires an endpoint of the socket's address family.
            socketAsync.RemoteEndPoint = local;
            // Tie the receive loop to this socket so that a loop belonging to a closed
            // socket can never re-arm itself on a socket created by a later Start().
            socketAsync.UserToken = socket;

            _listen = socket;
            _isRun = true;

            StartReceive(socketAsync);
        }

        /// <summary>
        /// Stops the server. Equivalent to <see cref="Close"/>.
        /// </summary>
        public void Stop()
        {
            Close();
        }

        /// <summary>
        /// Posts receive operations on the socket stored in the args' UserToken until one
        /// is left pending. Once that socket is no longer the active one, the args are
        /// disposed and the loop ends.
        /// </summary>
        private void StartReceive(SocketAsyncEventArgs socketAsync)
        {
            var socket = socketAsync.UserToken as Socket;

            // A loop (rather than mutual recursion with the completion handler) keeps the
            // stack flat when many receives complete synchronously in a row.
            while (_isRun && socket != null && ReferenceEquals(socket, _listen))
            {
                bool pending;
                try
                {
                    pending = socket.ReceiveFromAsync(socketAsync);
                }
                catch (ObjectDisposedException)
                {
                    // Close() disposed the socket between the check above and the call.
                    break;
                }

                if (pending)
                {
                    // SocketAsync_Completed will continue the loop.
                    return;
                }

                // Completed synchronously: the Completed event is not raised.
                ProcessReceive(socketAsync);
            }

            socketAsync.Dispose();
        }

        /// <summary>
        /// Completion callback for receive operations: handles the finished receive and
        /// posts the next one.
        /// </summary>
        private void SocketAsync_Completed(object sender, SocketAsyncEventArgs e)
        {
            ProcessReceive(e);
            StartReceive(e);
        }

        /// <summary>
        /// Copies a successfully received datagram and its sender out of the event args
        /// and dispatches it. Failed or empty receives are ignored.
        /// </summary>
        private void ProcessReceive(SocketAsyncEventArgs e)
        {
            if (e.LastOperation != SocketAsyncOperation.ReceiveFrom
                || e.SocketError != SocketError.Success
                || e.BytesTransferred <= 0)
            {
                return;
            }

            // Capture payload and sender BEFORE the args are reused for the next receive,
            // which overwrites both the buffer and RemoteEndPoint.
            var data = new byte[e.BytesTransferred];
            Buffer.BlockCopy(e.Buffer, e.Offset, data, 0, data.Length);
            var remote = (IPEndPoint)e.RemoteEndPoint;

            try
            {
                OnReceive(remote, data);
            }
            catch (Exception ex)
            {
                // A faulty handler or subscriber must not stop the receive loop or escape
                // on an I/O completion thread.
                LogHelper.WriteErrLog(ex.Message, ex);
            }
        }

        /// <summary>
        /// Validates a received datagram and forwards it to the handler queue and to
        /// <see cref="ReceiveData"/> subscribers.
        /// </summary>
        /// <param name="iPEndPoint">Sender of the datagram.</param>
        /// <param name="data">Datagram payload.</param>
        private void OnReceive(IPEndPoint iPEndPoint, byte[] data)
        {
            if (!_isRun || data.Length == 0)
            {
                return;
            }

            // NOTE: a minimum-length check (discard frames shorter than 8 + 4 bytes) was
            // sketched here at some point but is not enabled.

            // UDP does not guarantee arrival order, which may cause problems.
            // TODO: split frames when one datagram carries more than one.
            var prefix = data[0];
            var subfix = data[data.Length - 1];
            if (prefix != G.FrameHeader || subfix != G.FrameTail)
            {
                LogHelper.WriteInfoLog("收尾错误,抛弃:" + data.ToHexString());
                return;
            }

            _handlerQueue.Enqueue(new UdpRequestWrapper(data, iPEndPoint));

            // Raise the data-received event.
            ReceiveData?.Invoke(this, new ReceiveDataEventArgs
            {
                Data = data
            });
        }

        /// <summary>
        /// Queues a datagram for sending to a remote endpoint. Returns immediately; the
        /// datagram is silently dropped when the server is not running.
        /// </summary>
        /// <param name="endPoint">Destination endpoint.</param>
        /// <param name="bytes">Payload; the array is used as the send buffer without copying.</param>
        public void SendAsync(EndPoint endPoint, byte[] bytes)
        {
            if (!_isRun || _listen == null)
            {
                return;
            }

            // A fresh SocketAsyncEventArgs is used per datagram (the SAE pool is not used
            // here); it is disposed once the send has completed.
            var socketSendAsync = new SocketAsyncEventArgs();
            socketSendAsync.SetBuffer(bytes, 0, bytes.Length);
            socketSendAsync.RemoteEndPoint = endPoint;
            socketSendAsync.Completed += SendAsync_Completed;
            _sendBlocking.Add(socketSendAsync);
        }

        /// <summary>
        /// Stops receiving and closes the socket. The server can be started again with
        /// <see cref="Start"/>.
        /// </summary>
        public void Close()
        {
            _isRun = false;

            // Clear the field first so other threads stop picking up the dying socket.
            var socket = _listen;
            _listen = null;
            socket?.Close();
        }
    }
原文地址:https://www.cnblogs.com/robertyao/p/12713586.html