Hbase C# Thrift 连接 , 提示 因为队列满或者系统缺乏足够的缓冲空间

原有方法:

public class HbaseClient : IDisposable
   {
       public Apache.Hadoop.Hbase.Hbase.Client Client { get; private set; }

       protected TBufferedTransport Transport { get; private set; }

       private TSocket socket;
       /// <summary>
       ///
       /// </summary>
       /// <param name="host"></param>
       /// <param name="prot"></param>
       public HbaseClient(string host, int prot)
       {
           socket = new TSocket(host, prot);

           this.Transport = new TBufferedTransport(socket);
           this.Transport.Open();


           var protocol = new TBinaryProtocol(Transport);
          

           Client = new Apache.Hadoop.Hbase.Hbase.Client(protocol);
       
       }

       /// <summary>
       ///
       /// </summary>
       public HbaseClient()
           : this(System.Configuration.ConfigurationManager.AppSettings["hbasehost"],
           int.Parse(System.Configuration.ConfigurationManager.AppSettings["hbaseport"])
           )
       {

       }

       /// <summary>
       ///
       /// </summary>
       public void Dispose()
       {
           this.socket.Close();
           this.Transport.Close();

       }
   }

public static void AddLog(string userid, string ip, DateTime logintime, bool result)
      {
          using (var hclient = new HbaseClient())
          {
              string strtime = H.Comm.TimeUtility.DescTimeStamp(logintime).ToString();
              string row = H.Comm.StringUtility.FixedLenString(userid, 20) + strtime;

              hclient.Client.mutateRow(tableName.ToBytes(), row.ToBytes(),
                  new List<Apache.Hadoop.Hbase.Mutation> {
              new Apache.Hadoop.Hbase.Mutation{ Column= "u:uid".ToBytes(), Value = userid.ToBytes() },
              new Apache.Hadoop.Hbase.Mutation{ Column= "u:ip".ToBytes(), Value = ip.ToBytes() },
              new Apache.Hadoop.Hbase.Mutation{ Column= "u:lt".ToBytes(), Value = logintime.ToString().ToBytes() },
               new Apache.Hadoop.Hbase.Mutation{ Column= "u:r".ToBytes(), Value = Convert.ToInt16(result).ToString().ToBytes() }
              });
          }


      }

//不断的进行插入操作,看插入到什么情况下会出现问题
       static void InsertRunOn()
       {
           int uid = 1000000;

           for (var i = 0; true; i++)
           {
               string struid = (uid + i).ToString();
               string ip = "192.168.1." + (int)(i % 255);
               System.DateTime dt = System.DateTime.Now;

               if (i % 100 == 0)
                   Console.WriteLine(i);

               H.BLL.LoginLog.AddLog(struid, ip, dt, true);
           }
       }

运行一会以后就出现错误了。提示 因为队列满或者系统缺乏足够的缓冲空间 之类,

netstat –an 发现本地端口已经用到 65535  , 一直在 timewait 了。


 

修改(添加)注册表,并重新启动系统:

HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Services 下面:

TcpNumConnections
Key: Tcpip\Parameters
取值类型:REG_DWORD - Number
取值范围:0 - 0xfffffe
缺省值:0xfffffe
描述:本参数限制可以同时打开的TCP连接的数量

MaxUserPort
key: Tcpip\Parameters
取值类型:REG_DWORD - Number
取值范围:5000-65534 (十进制)
缺省值:0x1388 (5000 十进制)
描述:控制一个应用程序可以打开的最多端口数量。通常,短命的端口在1024-5000之间分配。
当试图发起5000以上端口的连接,系统将出现WSAENOBUFS(10055)错误:因为队列满或者系统
缺乏足够的缓冲空间。

如何修改TIME_WAIT超时时间以增加连接数。本设置定义了关闭连接前保持TIME_WAIT状态的时长。默认值为240,在一个繁忙的服务器上会将最大连接数限制在大约200/秒。减少该值可以增加最大连接数的限制。

具体操作步骤如下:

1.打开注册表编辑器,找到如下表所示的项。

2.按照下表设置,新建或者修改已有的一个名称为“TcpTimedWaitDelay”的DWORD值。
3.退出注册表编辑器,重新启动或者注销Windows以使改动生效。
设置:

项(系统): [HKEY_LOCAL_MACHINE/System/CurrectControlSet/Services/Tcpip/Parameters]

名称: TcpTimedWaitDelay
类型: REG_DWORD (DWORD 值)

值: 30-300 秒 (十进制)

上面的方法收效甚微,改变方法,做一个 连接池,这样会更有效些。

下面是改进后的代码。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Thrift.Transport;
using Thrift.Protocol;
using System.Threading;

namespace H.HbaseProvider
{
    public class HClient : IDisposable
    {
        private static string host = System.Configuration.ConfigurationManager.AppSettings["hbasehost"];
        private static int port = int.Parse(System.Configuration.ConfigurationManager.AppSettings["hbaseport"]);

        public bool IsFree;
        public Apache.Hadoop.Hbase.Hbase.Client Client;

        private TSocket socket;
        private TProtocol protocol;
        private TTransport Transport;
        public HClient()
        {
            socket = new TSocket(host, port);
            this.Transport = new TBufferedTransport(socket);
            //Transport.Open();
            this.protocol = new TBinaryProtocol(Transport);
            this.Client = new Apache.Hadoop.Hbase.Hbase.Client(protocol);

            this.IsFree = true;
        }

        public bool IsOpen
        {
            get
            {
                return this.Transport.IsOpen;
            }
        }

        public void Open()
        {
            this.Transport.Open();
        }

        public void Close()
        {
            this.Transport.Close();
        }

        public void Dispose()
        {
            IsFree = true;
        }
    }

    /// <summary>
    /// Hbase 连接池
    /// added by zbw911
    /// </summary>
    public class HBaseClientPool
    {


        private static int DEFAULT_COUNT = 10;
        private static int MAX_CLIENTCOUNT = 20;
        private static List<HClient> clientlist = null;


        private static Mutex m_mutex = new Mutex();


        static HBaseClientPool()
        {
            clientlist = new List<HClient>(DEFAULT_COUNT);

            for (var i = 0; i < DEFAULT_COUNT; i++)
                clientlist.Add(new HClient());

        }


        public static HClient GetHclient()
        {
            m_mutex.WaitOne(); //先阻塞
            for (var i = 0; i < clientlist.Count; i++)
            {
                if (clientlist[i].IsFree)
                {
                    if (!clientlist[i].IsOpen)
                    {
                        clientlist[i].Open();
                    }
                    clientlist[i].IsFree = false;
                    m_mutex.ReleaseMutex();//释放资源
                    return clientlist[i];
                }
            }

            if (clientlist.Count > MAX_CLIENTCOUNT) throw new Exception("超出最大HClinet最大个数");

            var item = new HClient();

            item.Open();
            item.IsFree = false;
            m_mutex.ReleaseMutex();//释放资源
            return item;
        }
    }
}

使用方法

public static void PoolAddLog(string userid, string ip, DateTime logintime, bool result)
      {
          using (var hclient = HBaseClientPool.GetHclient())
          {
              string strtime = H.Comm.TimeUtility.DescTimeStamp(logintime).ToString();
              string row = H.Comm.StringUtility.FixedLenString(userid, 20) + strtime;

              hclient.Client.mutateRow(tableName.ToBytes(), row.ToBytes(),
                  new List<Apache.Hadoop.Hbase.Mutation> {
              new Apache.Hadoop.Hbase.Mutation{ Column= "u:uid".ToBytes(), Value = userid.ToBytes() },
              new Apache.Hadoop.Hbase.Mutation{ Column= "u:ip".ToBytes(), Value = ip.ToBytes() },
              new Apache.Hadoop.Hbase.Mutation{ Column= "u:lt".ToBytes(), Value = logintime.ToString().ToBytes() },
               new Apache.Hadoop.Hbase.Mutation{ Column= "u:r".ToBytes(), Value = Convert.ToInt16(result).ToString().ToBytes() }
              });
          }


      }

// 从这里进行变态的插入操作吧

static void PoolInsertRunOn()
       {
           int uid = 1000000;

           for (var i = 0; true; i++)
           {
               string struid = (uid + i).ToString();
               string ip = "192.168.1." + (int)(i % 255);
               System.DateTime dt = System.DateTime.Now;

               if (i % 100 == 0)
                   Console.WriteLine(i);

               H.BLL.LoginLog.PoolAddLog(struid, ip, dt, true);
           }
       }

使用上面的方法以后,好多了。

MAX_CLIENTCOUNT 再根据实际环境进行调整吧。

原文地址:https://www.cnblogs.com/zbw911/p/2279269.html