细水长流Hadoop源码分析(4)RPC服务器运行之Listener线程


目录

4.2 服务器执行流程... 29

4.2.1 Listener线程... 29


4.2 服务器执行流程

4.2.1 Listener线程

从Listener的构造我们知道,在Listener内部使用的选择器selector上已注册了acceptChannel这个ServerSocketChannel,所以下一步就是等待RPC客户端的连接。我们假设有一个客户端向此服务器发出了Socket连接请求(就是Client的Connection负责的连接建立请求),来观察Listener是如何处理的。首先给出Listener线程方法的代码,这段代码只给出了主循环,不包括异常处理,和终止线程时的清理代码:

public void run() {
  ...
      while (running) {
        SelectionKey key = null;
        try {
          selector.select();
          Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
          while (iter.hasNext()) {
            key = iter.next();
            iter.remove();
            try {
              if (key.isValid()) {
                if (key.isAcceptable())
                  doAccept(key);
                else if (key.isReadable())
                  doRead(key);
              }
            } catch (IOException e) {
            }
            key = null;
          }
      ...
    }

如果注册到selector上的通道没有一个操作就绪,selector.select就会阻塞。根据我们上面的假设,此时客户端请求连接RPC服务器,selector得到通知,selector.select方法返回,selector.selectedKeys就会包含acceptChannel注册到selector的SelectionKey的OP_ACCEPT操作。遍历selectedKeys就会得到这个SelectionKey。

SelectionKey是通道和选择器的结合体,其中保存着通道和选择器的引用。在select方法返回之后的selector.selectedKeys集里,很可能经过一些操作之后这个集合中的某个SelectionKey已经被cancel,或者相关通道被关闭,或者selector被关闭从而使得某些已选中的SelectionKey无效,所以在遍历每一个SelectionKey之后,都要对这个key进行有效性检查,方法是调用SelectionKey.isValid方法,只有这个方法返回true,表示SelectionKey继续有效时,才能进一步进行accept、读或写的操作。

Selector对象的SelectionKey存放在三个集合中,分别为已注册键集合、已选择键集合、已取消键集合。所有注册到Selector上的键都会在“已注册键集合”,这个集合不能手动修改。每个SelectionKey内部保存的操作(可能操作:OP_ACCEPT、OP_READ、OP_WRITE、OP_CONNECT)也有两个集合,分别为interest集合和ready集合,前者是注册操作,后者是已经就绪的操作,比如ServerSocketChannel来了一个连接请求。如果某个SelectionKey已经存在于Selector对象的已选择键集合,那么,该SelectionKey就不会自动被Selector移除,即使这个SelectionKey没有任何新的操作就绪,仍旧不会移除,另外,处于已选择键集合内的SelectionKey的ready集合只会被设置新操作(ready其实用位表示,这就表示ready上的某位一旦置为1,就不会再被清除),只有当SelectionKey不在已选择键集合内,如果此SelectionKey有操作就绪,才会将ready清除,再重新置位。所以,这就要求遍历selector.selectedKeys的时候,将已遍历的SelectionKey从selectedKeys中删除。

上述就是为什么代码里每次遍历一个SelectionKey时,都要执行iter.remove方法和判断key.isValid的原因。除此之外,就是跟据此key的ready集合里有哪些操作,具体操作调用具体方法了,在Listener的线程方法里只处理两个操作:

操作

执行方法

说明

OP_ACCEPT(key.isAcceptable = true)

doAccept(key)

SeverSocketChannel收接到来自RPC客户端的连接请求

OP_READ(key.isReadable = true)

doRead(key)

SocketChannel接收到RPC客户端发送的方法调用数据,因此Listener要把它读进来

根据上面的假设,现在在场景是有RPC客户端发出Socket连接,因此执行的是doAccept方法。此key就是acceptChannel与selector注册的SelectionKey,下面,我们看看doAccept的具体操作。

void doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {
      Connection c = null;
      ServerSocketChannel server = (ServerSocketChannel) key.channel();
      // accept up to 10 connections
      for (int i=0; i<10; i++) {
        SocketChannel channel = server.accept();
        if (channel==null) return;

        channel.configureBlocking(false);
        channel.socket().setTcpNoDelay(tcpNoDelay);
        SelectionKey readKey = channel.register(selector, SelectionKey.OP_READ);
        c = new Connection(readKey, channel, System.currentTimeMillis());
        readKey.attach(c);
        synchronized (connectionList) {
          connectionList.add(numConnections, c);
          numConnections++;
        }
        if (LOG.isDebugEnabled())
          LOG.debug("Server connection from " + c.toString() +
              "; # active connections: " + numConnections +
              "; # queued calls: " + callQueue.size());
      }
    }

从key中获取ServerSocketChannel对象server。因为server是非阻塞的,所以调用server.accept方法会立即返回,如果有连接请求,则返回SocketChannel,否则,返回null。在此期间,server也可能接收了不止一次的Socket连接,因此只调用一次accept并不合理,可能会遗漏一些连接请求。doAccept方法的做法是循环accept十次,但十次也并不合理,Hadoop新版中已经改了过来,可以接收无限次的连接请求,直到某一次accept调用返回null为止。

当接收一个RPC客户端的连接请求时,server.accept方法返回一个SocketChannel对象,就是代码里的channel。doAccept方法把channel的OP_READ也注册到Listener的selector选择器上了,这样selector既负责acceptChannel的OP_ACCEPT操作,也负责所有从acceptChannel接收的客户端连接的Socket的数据读操作。

通道注册到选择器之前必须被设置为非阻塞的,所以在调用channel.register之前要调用channel.configureBlocking将channel设置为非阻塞的。然后把channel的OP_READ操作注册到selector上,得到注册的SelectionKey对象readKey。然后,由readKey、channel和系统当前时间构造一个Server.Connection对象c,再将c设置为readKey的附加对象,所以,每个SocketChannel在selector上注册而得到的SelectionKey对象,都附带有为此SocketChannel生成的Connection对象。

Server.connectionList是线程安全的List对象,然而connectionList也会在其它线程被操作(比如运行Server的线程),这里connectionList.add操作又必须与numConnections++形成原子操作,故这里在connectionList上同步,然后将刚刚生成的Connection添加到connectionList,所以,connectionList里的每个Connection,都对应着一条RPC客户端与RPC服务器的Socket连接。

doAccept完成之后,再回到Listener的线程方法,现在就有可能接收方法调用请求数据了。在描述Listener是如何接收方法调用请求数据之前,由于doAccept里有Server.Connection对象的构造,我们先看看Server.Connection中重要的数据成员:

成员

说明

boolean versionRead

RPC客户端发送的协议签名和协议版本是否读取,参见下图

boolean headerRead

RPC客户端发送的ConnectionHeader和其长度是否读取

SocketChannel channel

此连接对应的Socket通道,此通道与RPC客户端连接

ByteBuffer data

一个暂存数据的地方,dataLengthBuffer在每次读取数据之前会有固定分配的长度。此字节缓冲每次读取数据之前都可能要重新分配空间

ByteBuffer dataLengthBuffer

一个暂存数据的地方,用于读取固定长度的数据

LinkedList<Call> responseQueue

此队列不在Connection内部处理而是在Responder类里处理,因此这个队列用于存放执行完之后的方法调用,每个方法都保存了调用返回值,等待Responder线程处理

volatile int rpcCount = 0

未处理的远程过程调用个数

Socket socket;

SocketChannel对应的Socket对象

String hostAddress;

此连接Socket对应的远程RPC客户端的地址

int remotePort;

此连接Socket对应的远程RPC客户端的端口号

ConnectionHeader header

连接头

Class<?> protocol

RPC客户端进行此连接时指定的协议接口类

Subject user

此连接Socket对应的远程RPC客户端的用户名和所属组信息

我们知道RPC客户端向RPC服务器的每条Socket连接发送的数据顺序如下,其中第一部分是当Socket连接第一次建立发送的头信息,而第二部分是每个方法调用发送的数据,后面的数据格式可能会发送多次,数据格式如下所示:

clip_image023

clip_image024

每个Connection对应一个与RPC客户端的Socket连接,此Socket连接是Listener.acceptChannel接听的SocketChannel管理的连接。此Socket连接会注册到Listener线程的selector选择器上,所以接收数据在Listener线程,而处理数据在Connection类里。

构造Connection需要一个SelectionKey,一个SocketChannel,Connection的构造方法代码如下:

public Connection(SelectionKey key, SocketChannel channel, 
                      long lastContact) {
      this.channel = channel;
      this.lastContact = lastContact;
      this.data = null;
      this.dataLengthBuffer = ByteBuffer.allocate(4);
      this.socket = channel.socket();
      InetAddress addr = socket.getInetAddress();
      if (addr == null) {
        this.hostAddress = "*Unknown*";
      } else {
        this.hostAddress = addr.getHostAddress();
      }
      this.remotePort = socket.getPort();
      this.responseQueue = new LinkedList<Call>();
      if (socketSendBufferSize != 0) {
        try {
          socket.setSendBufferSize(socketSendBufferSize);
        } catch (IOException e) {
          LOG.warn("Connection: unable to set socket send buffer size to " +
                   socketSendBufferSize);
        }
      }
    }

这个构造方法初始化Connection的各数据成员。这里唯一要说明的就是dataLengthBuffer初始化。因为传递进来的SocketChannel对象channel是非阻塞的,所以一次从channel读数据可能只读出所需要的部分字节。比如读取RPC客户端与服务顺端建立连接时第一次发送的连接头信息的开头四字节,如果第一次只能读3字节,那么必须等到Listener.selector选择器的select调用再次返回,该channel再次可读,才能继续读剩下的一字节。所以这里的读取数据不是设计为顺序一次读取成功的,而是可能读取多次才能读到要求的字节数。

ByteBuffer类型的缓冲对象保存了其中存放数据、读取位置和写入位置的完整状态。当从channel读取或写入时,ByteBuffer的remaining提示还有多少空间可供读取或写入。不像普通缓存每次向其中保存数据都会从索引0位置开始保存,ByteBuffer缓冲会记录每一次读取或写入后缓存内部的状态。比如,如果缓存dataLengthBuffer容量为4字节,那么从channel读取3字节到channel后,下次再从channel读取往dataLengthBuffer存储时,只能存储1字节,且这1字节是存放在dataLengthBuffer内部存储的最后一个字节上。当使用ReadableByteChannel的方法读取数据时,read方法也会根据ByteBuffer的内部状态决定读取多少数据,而不是通道内有多少读多少,这就是Java NIO的方便之处。

分析完Connecton的数据成员和构造方法后,我们了解了当Listener接收一个RPC客户端发出的连接请求时进行的操作:accept请求获得SocketChannel对象,注册此对象到Listener.selector选择器得到SelectionKey对象,再构造一个Connection对象,并将此Connection对象attach到SelectionKey对象。现在Listener的selector就可以监听数据读取操作了。

如上所述,Listener读取数据要调用doRead(key),这个方法的核心关键操作很少,key里有Connection对象,所以首先把这个对象取出,然后调用Connection的readAndProcess方法,执行真正的远程过程调用数据读取。如果发生异常,则关闭此Connection对象。

RPC发送过来的数据可分为三部分:version、header和call数据。任一部分的第一字段都是4个字节。另外,除了正常数据之外,客户端还可能发送ping数据,这是一个长度为4字节,值为1的整型数据。ping数据来源于RPC客户端的输入流,当输入流读取远程过程调用返回值时,如果出问题,就会向RPC服务器发送ping数据。由于readAndProcess每次读完ConnectionHeader和方法调用数据后都会把data设置为null,所以正常情况下,ping数据肯定在versionRead=true,并且data为null时才会收到,这也就是为什么readAndProcess代码中在是判断完versioneRead和data是否为null的时候才去读Client.PING_CALL_ID并判断之。

这样的话,PING_CALL_ID用于RPC客户端读取返回值异常时测试与服务器的连通性。那么当RPC客户端发送数据异常时,比如发送数据过程中网络突然中断,客户端和服务端是如何检测的呢?在客户端,OutputSteam的write的方法肯定会抛出IOException异常,那么服务器端的ReadableByteChannel也会抛出IOException异常,这个异常最终会被Listener.doRead处理。当doRead检测到某一次虽然有数据可读(selector.select返回说明OP_READ操作就绪),但读取异常(包括IOException异常),就会调用closeConnection(c)将当前涉及的Connection对象关闭。我们看看Server.closeConnection方法:

private void closeConnection(Connection connection) {
    synchronized (connectionList) {
      if (connectionList.remove(connection))
        numConnections--;
    }
    try {
      connection.close();
    } catch (IOException e) {
    }
  }

该方法同时扣只剩connectionList和numConnections,因此要在connectionList上同步,然后调用connection.close方法执行真正的connection关闭,在connecton.close方法里执行如下操作:

private synchronized void close() throws IOException {
      data = null;
      dataLengthBuffer = null;
      if (!channel.isOpen())
        return;
      try {socket.shutdownOutput();} catch(Exception e) {}
      if (channel.isOpen()) {
        try {channel.close();} catch(Exception e) {}
      }
      try {socket.close();} catch(Exception e) {}
    }
  }

将data和dataLengthBuffer设置为null,关闭socket,关闭channel,Server.closeConnection操作再加上此操作,完全清除了RPC客户端与此服务器这一条连接。

继续看readAndProcess的代码,其代码很多:

public int readAndProcess() throws IOException, InterruptedException {
      while (true) {
        /* Read at most one RPC. If the header is not read completely yet
         * then iterate until we read first RPC or until there is no data left.
         */    
        int count = -1;
        if (dataLengthBuffer.remaining() > 0) {
          count = channelRead(channel, dataLengthBuffer);       
          if (count < 0 || dataLengthBuffer.remaining() > 0) 
            return count;
        }
      
        if (!versionRead) {
          //Every connection is expected to send the header.
          ByteBuffer versionBuffer = ByteBuffer.allocate(1);
          count = channelRead(channel, versionBuffer);
          if (count <= 0) {
            return count;
          }
          int version = versionBuffer.get(0);
          
          dataLengthBuffer.flip();          
          if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) {
            return -1;
          }
          dataLengthBuffer.clear();
          versionRead = true;
          continue;
        }
        
        if (data == null) {
          dataLengthBuffer.flip();
          dataLength = dataLengthBuffer.getInt();
       
          if (dataLength == Client.PING_CALL_ID) {
            dataLengthBuffer.clear();
            return 0;  //ping message
          }
          data = ByteBuffer.allocate(dataLength);
          incRpcCount();  // Increment the rpc count
        }
        
        count = channelRead(channel, data);
        
        if (data.remaining() == 0) {
          dataLengthBuffer.clear();
          data.flip();
          if (headerRead) {
            processData();
            data = null;
            return count;
          } else {
            processHeader();
            headerRead = true;
            data = null;
            
            // Authorize the connection
            try {
              authorize(user, header);
              
              if (LOG.isDebugEnabled()) {
                LOG.debug("Successfully authorized " + header);
              }
            } catch (AuthorizationException ae) {
              authFailedCall.connection = this;
              setupResponse(authFailedResponse, authFailedCall, 
                            Status.FATAL, null, 
                            ae.getClass().getName(), ae.getMessage());
              responder.doRespond(authFailedCall);
              
              // Close this connection
              return -1;
            }

            continue;
          }
        } 
        return count;
      }
    }

首先我们知道,如果readAndProcess发生异常或者返回值count小于0,这个Connection就会被清除,我们首先看一下哪些情况下count会小于0:

l 某一次的channelRead操作返回值小于0,这一般发生在channelRead没读到数据时;

l 当读取的服务器头(“hrpc”)不等于Server.HEADER时;

l 当读取的协议版本(1字节)不等于Server.version时;

l 当authorize(user,header)失败时。

readAndProcess每次开始执行时都要将dataLengthBuffer读满,这次读不满下次也要读满,这是很巧妙的,参看上述说明为什么这样做。然后进行版本验证、服务器头信息验证,当这些都做完之后,开始读ConnectionHeader和方法调用数据,主要是dataLengthBuffer和data两个ByteBuffer的结合使用进行读取。这里主要有两部分,一部分是ConnectionHeader的读取和处理,一部分是远程过程调用数据的读取和处理。一次只读取一个远程过程调用。

我们首先看ConnectionHeader的读取,ConnectionHeader读取完后,首先调用processHeader进行处理。processHeader从读取的数据data恢复出ConnectionHeader对象Connection.header(没错,就是Connection的数据成员),然后从header里取出协议类Connection.protocol和客户端ugi,客户端ugi被转换成了Subject对象Connection.user。也即在processHeader里初始了三个Connection成员:header、protocol、user。

然后,调用Server.authorize(user,header)对此用户进行调用权限验证,此方法的实现在RPC.Server类里,RPC.Server.authorize方法主要调用ServiceAuthorizationManager.authorize(user, protocol),对此方法的调用请参考这一期研究的“认证&授权&安全”部分的文档的2节。

Connection.processHeader方法一个很奇怪的地方在于,它没有验证RPC客户端请求的协议接口此RPC服务器是否支持。org.apache.hadoop.ipc.Server类是一个抽象类,这个类并没有保存实现协议的实例对象,而是由此类的子类RPC.Server保存,RPC.Server有数据成员instance用于保存协议接口的实现对象,所以如果要验证实现对象是否实现了RPC客户端要求的协议接口,必须在RPC.Server里验证,但RPC.Server也没有明显的验证。所以,这种验证一定发生在RPC.Server的call方法里。如果instance不支持RPC客户端请求的协议接口方法,那么当call方法以instance执行RPC客户端请求的方法时肯定会抛出异常,此异常会以IOException的形式再次被抛出,这个异常不会被Connection.processHeader处理,也不会被Connection.readAndProcess处理,最终会被Listener的doRead捕获,doRead捕获到异常之后,关闭这个连接,这也算是正确的解决办法。

对ConnectionHeader的处于就到这里,下面是对方法调用数据的处理。方法调用数据的处理在Connection.processData方法里,下面是processData方法的代码:

private void processData() throws  IOException, InterruptedException {
      DataInputStream dis =
        new DataInputStream(new ByteArrayInputStream(data.array()));
      int id = dis.readInt();  // try to read an id

      Writable param = ReflectionUtils.newInstance(paramClass, conf);
      param.readFields(dis);
        
      Call call = new Call(id, param, this);
      callQueue.put(call);  // queue the call; maybe blocked here
    }

远程过程调用数据可分为两部分:int类型的调用id和Invocation类型的调用参数。processData方法首先读出这个id,然后再读出param,用id、call和当前的Connection对象this构造一个Server.Call对象。Server.Call对象很简单,仅仅是对数据的封装,如下图所示:

clip_image025

这里保存的id最初由RPC客户端生成,param由构造方法的同名参数初始化,每个Call里保存了其所属的Connection对象,response由如下两个方法,表示设置方法调用返回值:

clip_image027

readAndProcess设置Call.response是因为权限验证失败,此远程过程调用不能执行,设置其返回值为异常。Handler线程的run方法执行完方法调用后,设置response为方法调用的返回值。

Call构造完之后,processData方法将其加入callQueue,这是一个BlockingQueue,相比于BlockingQueue,此数据结构不仅是线程安全的,而且支持两种附加操作:当读取遇到队列为空时会等待队列有元素,当写入遇到队列满时会等待队列有剩余空间,所以注释上说此操作可能会阻塞。根据如下图所示的调用栈,这样的阻塞实际上会阻塞整个Listener线程:

clip_image029

按上述流程,到现在为止,已经打开了服务器监听(Listener.acceptChannel),建立了同RPC客户端的Socket连接,接收了RPC客户端发送过来的远程过程调用数据,并将远程过程调用加入到了Server.callQueue队列中,现在应该是处理这些远程过程调用请求的时候了,这个处理就在Handler线程中。

原文地址:https://www.cnblogs.com/ahhuiyang/p/3858940.html