Hadoop源码 – ipc.Server

1、前言

昨天分析了ipc包下的RPC、Client类,今天来分析下ipc.Server。Server类因为是Hadoop自己使用,所以代码结构以及流程都很清晰,可以清楚的看到实例化、停止、运行等过程。

2、Server类结构

上面是Server的五个内部类,分别介绍一下:

1)Call

用以存储客户端发来的请求,这个请求会放入一个BlockQueue中;

2)Listener

监听类,用以监听客户端发来的请求。同时Listener下面还有一个静态类,Listener.Reader,当监听器监听到用户请求,便用让Reader读取用户请求。

3)Responder

响应RPC请求类,请求处理完毕,由Responder发送给请求客户端。

4)Connection

连接类,真正的客户端请求读取逻辑在这个类中。

5)Handler

请求(blockQueueCall)处理类,会循环阻塞读取callQueue中的call对象,并对其进行操作。

3、Server初始化

第一篇博客说了,Server的初始化入口在RPC.getServer中,getServer其实是调用的RPC.Server静态类中的构造方法,我们看看Namenode创建RPCServer的方法和RPC.Server构造方法代码:

  1. private void initialize(Configuration conf) throws IOException {  
  2.     …  
  3.     this.serviceRpcServer = RPC.getServer(this, dnSocketAddr.getHostName(),   
  4.           dnSocketAddr.getPort(), serviceHandlerCount,  
  5.           false, conf, namesystem.getDelegationTokenSecretManager());  
  6.     this.serviceRpcServer.start(); // 运行服务器  
  7. }  
  1. public Server(Object instance, Configuration conf, String bindAddress,  int port,  
  2.                   int numHandlers, boolean verbose,   
  3.                   SecretManager<? extends TokenIdentifier> secretManager)   
  4.         throws IOException {  
  5.       super(bindAddress, port, Invocation.class, numHandlers, conf,  
  6.           classNameBase(instance.getClass().getName()), secretManager);  
  7.       this.instance = instance;  
  8.       this.verbose = verbose;  
  9.     }  

该方法调用了父类的构造方法,如下:

  1. protected Server(String bindAddress, int port,   
  2.                   Class<? extends Writable> paramClass, inthandlerCount,   
  3.                   Configuration conf, String serverName, SecretManager<? extends TokenIdentifier> secretManager)   
  4.     throws IOException {  
  5.     this.bindAddress = bindAddress;  
  6.     this.conf = conf;  
  7.     this.port = port;  
  8.     this.paramClass = paramClass;  
  9.     this.handlerCount = handlerCount;  
  10.     this.socketSendBufferSize = 0;  
  11.     this.maxQueueSize = handlerCount * conf.getInt(  
  12.                                 IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,  
  13.                                 IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);  
  14.     this.maxRespSize = conf.getInt(IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY,  
  15.                                    IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT);  
  16.     this.readThreads = conf.getInt(  
  17.         IPC_SERVER_RPC_READ_THREADS_KEY,  
  18.         IPC_SERVER_RPC_READ_THREADS_DEFAULT);  
  19.     this.callQueue  = new LinkedBlockingQueue<Call>(maxQueueSize);   
  20.     this.maxIdleTime =2*conf.getInt("ipc.client.connection.maxidletime"1000);  
  21.     this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max"10);  
  22.     this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold"4000);  
  23.     this.secretManager = (SecretManager<TokenIdentifier>) secretManager;  
  24.     this.authorize =   
  25.       conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);  
  26.     this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled();  
  27.       
  28.     // Start the listener here and let it bind to the port  
  29.     listener = new Listener();  
  30.     this.port = listener.getAddress().getPort();      
  31.     this.rpcMetrics = RpcInstrumentation.create(serverName,this.port);  
  32.     this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay",false);  
  33.  
  34.     // Create the responder here  
  35.     responder = new Responder();  
  36.       
  37.     if (isSecurityEnabled) {  
  38.       SaslRpcServer.init(conf);  
  39.     }  
  40.   }  

不难看出,父类的构造方法就初始化了一些配置和变量。

4、Server运行

在上面第一段代码中,还有一句RpcServer.start()的方法,在调用构造函数初始化一些变量之后,Server就可以正式运行起来了:

  1. public synchronized void start() {  
  2.     responder.start();  
  3.     listener.start();  
  4.     handlers = new Handler[handlerCount];  
  5.       
  6.     for (int i = 0; i < handlerCount; i++) {  
  7.       handlers[i] = new Handler(i);  
  8.       handlers[i].start();  
  9.     }  
  10.   }  

responder、listener、handlers三个对象的线程均阻塞了,前两个阻塞在selector.select()方法上,handler阻塞在callQueue.take()方法,都在等待客户端请求。Responder设置了超时时间,为15分钟。而listener还开启了Reader线程,该线程也阻塞了。

4、Server接受请求流程

1)监听到请求

Listener监听到请求,获得所有请求的SelectionKey,执行doAccept(key)方法,该方法将所有的连接对象放入list中,并将connection对象与key绑定,以供reader使用。初始化玩所有的conne对象之后,就可以激活Reader线程了。

  1. void doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {  
  2.       Connection c = null;  
  3.       ServerSocketChannel server = (ServerSocketChannel) key.channel();  
  4.       SocketChannel channel;  
  5.       while ((channel = server.accept()) != null) {  
  6.         channel.configureBlocking(false);  
  7.         channel.socket().setTcpNoDelay(tcpNoDelay);  
  8.         Reader reader = getReader();  
  9.         try {  
  10.           reader.startAdd();  // 激活readSelector,设置adding为true  
  11.           SelectionKey readKey = reader.registerChannel(channel);  
  12.           c = new Connection(readKey, channel, System.currentTimeMillis());  
  13.           readKey.attach(c);  
  14.           synchronized (connectionList) {  
  15.             connectionList.add(numConnections, c);  
  16.             numConnections++;  
  17.           }  
  18.           …           
  19.         } finally {  
  20.           reader.finishAdd(); // add完毕,设置adding为false,Reader开始工作  
  21.         }  
  22.     }  
  23. }  

2)接收请求

Reader的run方法和Listener基本一致,也是获得所有的SelectionKey,再执行doRead(key)方法。该方法获得key中绑定的connection,并执行conection的readAndProcess()方法:

  1. void doRead(SelectionKey key) throws InterruptedException {  
  2.       int count = 0;  
  3.       Connection c = (Connection)key.attachment(); // 获得连接对象  
  4.       if (c == null) {  
  5.         return;    
  6.       }  
  7.       c.setLastContact(System.currentTimeMillis());  
  8.         
  9.       try {  
  10.         count = c.readAndProcess(); // 接受并处理请求  
  11.       } catch (InterruptedException ieo) {  
  12.         …  
  13.       }  
  14.       if (count < 0) {  
  15.         …  
  16.         closeConnection(c);  
  17.         c = null;  
  18.       }  
  19.       else {  
  20.         c.setLastContact(System.currentTimeMillis());  
  21.       }  
  22.     }  
  1. public int readAndProcess() throws IOException, InterruptedException {  
  2. // 一次最多读取一次RPC请求,如果头没读完,继续迭代直到  
  3. // 读完所有请求数据    
  4. while (true) {   
  5.         int count = -1;  
  6.         if (dataLengthBuffer.remaining() > 0) {  
  7.           count = channelRead(channel, dataLengthBuffer);         
  8.           …  
  9.         if (!rpcHeaderRead) {  
  10.           //读取请求头.  
  11.           if (rpcHeaderBuffer == null) {  
  12.             rpcHeaderBuffer = ByteBuffer.allocate(2);  
  13.           }  
  14.           count = channelRead(channel, rpcHeaderBuffer);  
  15.           if (count < 0 || rpcHeaderBuffer.remaining() > 0) {  
  16.             return count;  
  17.           }  
  18.           // 读取请求版本号  
  19.           int version = rpcHeaderBuffer.get(0);  
  20.           byte[] method = new byte[] {rpcHeaderBuffer.get(1)};  
  21.           authMethod = AuthMethod.read(new DataInputStream(  
  22.               new ByteArrayInputStream(method)));  
  23.           dataLengthBuffer.flip();            
  24.           …  
  25.           dataLengthBuffer.clear();  
  26.           …  
  27.             
  28.           rpcHeaderBuffer = null;  
  29.           rpcHeaderRead = true;  
  30.           continue;  
  31.         }   
  32.         …  
  33.           data = ByteBuffer.allocate(dataLength);  
  34.         }  
  35.           
  36.         // 读取请求  
  37.         count = channelRead(channel, data);  
  38.           
  39.         if (data.remaining() == 0) {  
  40.           …  
  41.           if (useSasl) {  
  42.             saslReadAndProcess(data.array());  
  43.           } else {  
  44.             // 执行RPC请求,先解析header请求,下次循环解析param请求  
  45.             processOneRpc(data.array());  
  46.           }  
  47.           …  
  48.         }   
  49.         return count;  
  50.       }  
  51.     }  

3)获得call请求

在Connection中解析param请求中,解析了请求数据,并构造Call对象,将其加入callQueue。

  1. private void processData(byte[] buf) throws  IOException, InterruptedException {  
  2.       DataInputStream dis =  
  3.         new DataInputStream(new ByteArrayInputStream(buf));  
  4.       int id = dis.readInt();         // 读取请求id  
  5.         …  
  6.  
  7.       Writable param = ReflectionUtils.newInstance(paramClass, conf);// 获取参数,paramClass是参数的实体类,在构造Server对象的时候传入 
  8.       param.readFields(dis);          
  9.           
  10.       Call call = new Call(id, param, this);  
  11.       callQueue.put(call);              // 添加进阻塞队列,不过队列有max限制,有可能也会阻塞  
  12.       incRpcCount();   
  13.     }  

4)处理call对象

Connection给callQueue添加了call对象,阻塞的Handler可以继续运行了,拿出一个call对象,并调用RPC.Call方法

  1. // 关键代码  
  2. while (running) {  
  3. final Call call = callQueue.take(); // 弹出call对象  
  4. CurCall.set(call);  
  5.      value = call(call.connection.protocol, call.param,   
  6.                            call.timestamp); // 调用RPC.Server中的call  
  7.      CurCall.set(null);  
  8.  
  9. synchronized (call.connection.responseQueue) {  
  10.          setupResponse(buf, call,   
  11.                         (error == null) ? Status.SUCCESS : Status.ERROR,   
  12.                         value, errorClass, error);  
  13.          …  
  14.          responder.doRespond(call);  
  15.      }  
  16. }  

5)响应请求

上面代码中的setupResponse将call的id和状态发送回去,再设置了call中的response:ByteBuffer,之后就开始responder.doRespond(call)了,processResponse以及Responder.run()没太弄明白,就先不说了。

  1. void doRespond(Call call) throws IOException {  
  2.       synchronized (call.connection.responseQueue) {  
  3.         // 这行没懂  
  4.         call.connection.responseQueue.addLast(call);  
  5.         if (call.connection.responseQueue.size() == 1) {  
  6.           // 返回响应结果,并激活writeSelector  
  7.           processResponse(call.connection.responseQueue,true);  
  8.         }  
  9.       }  
  10.     }  

6、总结

Server用的标准的Java TCP/IP NIO通信,同时请求的超时使用基于BlockingQueue以及wait/notify机制实现。使用的模式是reactor模式,关于nio和reactor可以参考这个博客

对于服务器端接收多个连接请求的需求,Server采用Listener来监听连接的事件,并用Listener.Reader来监听网络流读以及Responder监听写的事件,当有实际的网络流读写时间发生之后,解析了请求Call之后,添加进阻塞队列,并交由多个Handlers来处理请求。

这个方法比TCP/IP BIO好处就是可接受很多的连接,而这些连接只在真实的请求时才会创建线程处理,称之为一请求一处理。但是,连接上的请求发送非常频繁时,TCP/IP NIO的方法并不会带来太大的优势。

但是Hadoop实际场景中,通常是服务器端支持大量的连接数(Namenode连上几千个Datanode),但是连接发送的请求并不会太多(heartbeat、blockreport都有较长间隔)。这样就造成了Hadoop不适合实时的、多请求的运算,带来的代价是模型、实现简单,但是这也为以后的扩展埋下了祸根。

P.S.: 以上分析基于稳定版0.20.203.0rc1。

原文地址:https://www.cnblogs.com/java20130722/p/3206956.html