MessageReceiver

class MessageReceiver
    {
        private RelayEngine<MessageCollection> _MessageRelayEngine;
        private string _Hostname;
        private int _MessageDispatchServerPort;
        private string _SessionId;
        private TcpClient _Client;
        private bool _Stopped = false;
        private Thread _Worker = null;
        private ulong _LastMessageSequence = 0;

        internal MessageReceiver(RelayEngine<MessageCollection> messageRelayEngine)
        {
            this._MessageRelayEngine = messageRelayEngine;
        }

        internal void Start(string hostname, int messageDispatchServerPort, string sessionId)
        {
            this.Stop();

            this._Stopped = false;

            this._Hostname = hostname;
            this._MessageDispatchServerPort = messageDispatchServerPort;
            this._SessionId = sessionId;

            this._Worker = new Thread(ConnectServerAndReceiveMessage);
            this._Worker.IsBackground = true;
            this._Worker.Start();
        }

        internal void SetSessionId(string newSessionId)
        {
            this._SessionId = newSessionId;
        }

        private void ConnectServerAndReceiveMessage(object state)
        {
            byte[] header = new byte[4];
            int defaultBufferLen = 1024 * 64;
            byte[] defaultBuffer = new byte[defaultBufferLen];

            while (!this._Stopped)
            {
                this._Client = null;
                NetworkStream stream = null;

                try
                {
                    Logger.TraceEvent(TraceEventType.Information, "MessageReceiver.ConnectServerAndReceiveMessage to connect {0}:{1}", _Hostname, _MessageDispatchServerPort);

                    this._Client = new TcpClient();
                    this._Client.Connect(_Hostname, _MessageDispatchServerPort);
                    stream = this._Client.GetStream();

                    byte[] sessionData = ASCIIEncoding.ASCII.GetBytes(this._SessionId);
                    byte[] sessionDataLen = new byte[2] { (byte)(sessionData.Length >> 8), (byte)sessionData.Length };                    
                    stream.Write(sessionDataLen, 0, sessionDataLen.Length);
                    stream.Write(sessionData, 0, sessionData.Length);
                }
                catch (Exception ex)
                {
                    Logger.TraceEvent(TraceEventType.Error, "MessageReceiver.ConnectServerAndReceiveMessage error:
{0}", ex);
                    this.CloseConectionSilently();
                    continue;
                }
                
                while (true)
                {
                    Array.Clear(header, 0, header.Length);
                    if (!this.ReadAll(stream, header))
                    {
                        Logger.TraceEvent(TraceEventType.Information, "MessageReceiver.ConnectServerAndReceiveMessage read header data failed");
                        this.CloseConectionSilently();
                        break;//reconnect
                    }

                    int dataLength = ((int)header[0] << 24) + ((int)header[1] << 16) + ((int)header[2] << 8) + header[3];
                    byte[] buffer = dataLength <= defaultBufferLen ? defaultBuffer : new byte[dataLength];

                    if (!this.ReadAll(stream, buffer, dataLength))
                    {
                        Logger.TraceEvent(TraceEventType.Information, "MessageReceiver.ConnectServerAndReceiveMessage read message data failed");
                        this.CloseConectionSilently();
                        break;//reconnect
                    }

                    try
                    {
                        MessageCollection message = CompressHelper.FromByteArray<MessageCollection>(buffer, dataLength);
                        if (message.Sequence != this._LastMessageSequence)
                        {
                            Logger.TraceEvent(TraceEventType.Warning, "MessageReceiver.ConnectServerAndReceiveMessage got message with worng sequence {0}, excepted sequece is {1}",
                                message.Sequence, this._LastMessageSequence);                            
                        }
                        this._LastMessageSequence++;

                        this._MessageRelayEngine.AddItem(message);
                        ConsoleClient.Instance.RefreshLastMsgTime();
                    }
                    catch (Exception ex)
                    {
                        Logger.TraceEvent(TraceEventType.Error, "MessageReceiver.ConnectServerAndReceiveMessage add message to engine error:
{0}", ex);
                    }
                }
            }
        }

        private void CloseConectionSilently()
        {
            try
            {
                if (this._Client != null)
                {
                    this._Client.Close();
                    this._Client = null;
                }
            }
            catch (Exception ex)
            {
                Logger.TraceEvent(TraceEventType.Error, "MessageReceiver CloseConectionSilently error:
{0}", ex);
            }
        }

        private bool ReadAll(NetworkStream stream, byte[] buffer, int? dataLength = null)
        {
            try
            {
                int offset = 0;
                int len = dataLength.HasValue ? dataLength.Value : buffer.Length;

                while (len > 0)
                {
                    if (!stream.DataAvailable)
                    {
                        Thread.Sleep(100);
                        continue;
                    }
                    int readLength = stream.Read(buffer, offset, len);
                    if (readLength == 0)
                    {
                        return false;
                    }
                    else
                    {
                        offset += readLength;
                        len -= offset;
                    }
                }
                return true;
            }
            catch (Exception ex)
            {
                Logger.TraceEvent(TraceEventType.Warning, "MessageReceiver.ReadAll error:
{0}", ex.ToString());
                return false;
            }
        }

        internal void Stop()
        {
            this._Stopped = true;
            this._LastMessageSequence = 0;
            this.CloseConectionSilently();
            if (this._Worker != null)
            {
                this._Worker.Join(1000);
                this._Worker = null;
            }
        }
    }
原文地址:https://www.cnblogs.com/feige/p/6036708.html