Hadoop源码分析15:IPC流程(10)ClientConnection 复用

两个连接请求发出后,生成ClientConnectionId对象,只要两个ClientConnectionId对象的hashcode相同,就视为一个同一个连接,从而复用ClientConnection 



   publicRPCInvoker(Class《?extendsVersionedProtocol》protocol,

       InetSocketAddress address, UserGroupInformation ticket,

       Configuration conf, SocketFactory factory,

       intrpcTimeout)throwsIOException{

     this.remoteId=ClientConnectionId.getConnectionId(address,protocol,

         ticket, rpcTimeout, conf);

     this.client= RPC.CLIENTS.getClient(conf,factory);

 

   }


  privateClientConnectiongetConnection(ClientConnectionId remoteId,

                                  ClientCall call)

                                  throwsIOException,InterruptedException {

   if (!running.get()){

    thrownewIOException("Theclient is stopped");

   }

   ClientConnection connection;

   do{

     synchronized(connections){

       connection =connections.get(remoteId);

       if(connection== null){

         connection = newClientConnection(remoteId,this);

         connections.put(remoteId,connection);

       }

     }

   } while(!connection.addCall(call));  

   connection.setupIOstreams();

   returnconnection;

 

 }


publicclassClientConnectionId{

 

   InetSocketAddress address;

   UserGroupInformation ticket;

   Class? protocol;

    staticfinalintPRIME=16777619;

    intrpcTimeout;

    String serverPrincipal;

    intmaxIdleTime;//connectionswill be culled if it was idle for

   //maxIdleTimemsecs

    intmaxRetries;//the max.no. of retries for socket connections

    booleantcpNoDelay;// if T thendisable Nagle's Algorithm

    intpingInterval;// how oftensends ping to the server in msecs

   

   ClientConnectionId(InetSocketAddress address,Class? protocol,

                UserGroupInformation ticket, intrpcTimeout,

                String serverPrincipal, intmaxIdleTime,

                intmaxRetries,booleantcpNoDelay,

                intpingInterval){

     this.protocol=protocol;

     this.address=address;

     this.ticket=ticket;

     this.rpcTimeout=rpcTimeout;

     this.serverPrincipal=serverPrincipal;

     this.maxIdleTime=maxIdleTime;

     this.maxRetries=maxRetries;

     this.tcpNoDelay=tcpNoDelay;

     this.pingInterval=pingInterval;

   }

 

   

 

   staticClientConnectionIdgetConnectionId(InetSocketAddress addr,

       Class? protocol,UserGroupInformation ticket, intrpcTimeout,

       Configuration conf) throwsIOException{

      returnnewClientConnectionId(addr,protocol, ticket,

         rpcTimeout, null,

         conf.getInt("ipc.client.connection.maxidletime",10000), //10s

         conf.getInt("ipc.client.connect.max.retries",10),

         conf.getBoolean("ipc.client.tcpnodelay",false),

         Client.getPingInterval(conf));

   }

   

   staticbooleanisEqual(Object a, Objectb) {

     returna ==null? b ==null:a.equals(b);

   }

 

   @Override

   publicbooleanequals(Objectobj) {

     if(obj== this){

       returntrue;

     }

     if(objinstanceofClientConnectionId){

       ClientConnectionId that = (ClientConnectionId) obj;

       returnisEqual(this.address,that.address)

           && this.maxIdleTime==that.maxIdleTime

           && this.maxRetries==that.maxRetries

           && this.pingInterval==that.pingInterval

           && isEqual(this.protocol,that.protocol)

           && this.rpcTimeout==that.rpcTimeout

           && isEqual(this.serverPrincipal,that.serverPrincipal)

           && this.tcpNoDelay==that.tcpNoDelay

           &&isEqual(this.ticket,that.ticket);

     }

     returnfalse;

   }

   

   @Override

   publicinthashCode() {

     intresult =1;

     result = PRIME* result +((address==null)? 0 : address.hashCode());

     result = PRIME* result+ maxIdleTime;

     result = PRIME* result+ maxRetries;

     result = PRIME* result+ pingInterval;

     result = PRIME* result +((protocol==null)? 0 : protocol.hashCode());

     result = PRIME*rpcTimeout;

     result = PRIME*result

         + ((serverPrincipal==null)? 0 : serverPrincipal.hashCode());

     result = PRIME* result +(tcpNoDelay? 1231 :1237);

     result = PRIME* result +((ticket==null)? 0 : ticket.hashCode());

     returnresult;

   }

 

}

原文地址:https://www.cnblogs.com/leeeee/p/7276518.html