HDFS源码分析之DataXceiverServer

DataXceiverServer是Hadoop分布式文件系统HDFS的从节点--数据节点DataNode上的一个后台工作线程,它类似于一个小型的服务器,被用来接收数据读写请求,并为每个请求创建一个工作线程以进行请求的响应。那么,有以下几个问题:

        1、DataXceiverServer是什么?

        2、DataXceiverServer是如何初始化的?

        3、DataXceiverServer是如何工作的?

        带着这些问题,本文将带着你进入DataNode的DataXceiverServer世界。

        一、DataXceiverServer是什么?

        DataXceiverServer是数据节点DataNode上一个用于接收数据读写请求的后台工作线程,为每个数据读写请求创建一个单独的线程去处理。它的成员变量如下:

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. // PeerServer是一个接口,实现了它的TcpPeerServer封装饿了一个ServerSocket,提供了Java Socket服务端的功能  
  2. private final PeerServer peerServer;  
  3.   
  4. // 该DataXceiverServer所属DataNode实例datanode  
  5. private final DataNode datanode;  
  6.   
  7. // Peer所在线程的映射集合peers  
  8. private final HashMap<Peer, Thread> peers = new HashMap<Peer, Thread>();  
  9.   
  10. // Peer与DataXceiver的映射集合peersXceiver  
  11. private final HashMap<Peer, DataXceiver> peersXceiver = new HashMap<Peer, DataXceiver>();  
  12.   
  13. // DataXceiverServer是否已关闭的标志位closed  
  14. private boolean closed = false;  
  15.   
  16. /** 
  17.  * Maximal number of concurrent xceivers per node. 
  18.  * Enforcing the limit is required in order to avoid data-node 
  19.  * running out of memory. 
  20.  *  
  21.  * 每个节点并行的最大DataXceivers数目。 
  22.  * 为了避免dataNode运行内存溢出,执行这个限制是必须的。 
  23.  * 定义是默认值为4096. 
  24.  */  
  25. int maxXceiverCount =  
  26.   DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT;  
  27.   
  28. // 集群数据块平衡节流器balanceThrottler  
  29. final BlockBalanceThrottler balanceThrottler;  
  30.   
  31. /** 
  32.  * We need an estimate for block size to check if the disk partition has 
  33.  * enough space. Newer clients pass the expected block size to the DataNode. 
  34.  * For older clients we just use the server-side default block size. 
  35.  *  
  36.  * 我们需要估计块大小以检测磁盘分区是否有足够的空间。 
  37.  * 新客户端传递预期块大小给DataNode。 
  38.  * 对于旧客户端而言我们仅仅使用服务器端默认的块大小。 
  39.  */  
  40. final long estimateBlockSize;  

        其中,PeerServer类型的peerServer,实际上是DataXceiverServer实现功能最重要的一个类,在DataXceiverServer实例构造时,实际上传入的是实现了PeerServer接口的TcpPeerServer类,该类内部封装了一个ServerSocket,提供了Java Socket服务端的功能,用于监听来自客户端或其他DataNode的数据读写请求。

        DataXceiverServer内部还存在对于其载体DataNode的实例datanode,这样该线程就能随时获得DataNode状态、提供的一些列服务等;

        peers和peersXceiver是DataXceiverServer内部关于peer的两个数据结构,一个是Peer与其所在线程映射集合peers,另一个则是Peer与DataXceiver的映射集合peersXceiver,均是HashMap类型。Peer是什么呢?实际上就是对Socket的封装;

        closed为DataXceiverServer是否已关闭的标志位;

        maxXceiverCount为每个DataNode节点并行的最大DataXceivers数目,为了避免dataNode运行内存溢出,执行这个限制是必须的;

        balanceThrottler是DataXceiverServer内部一个关于集群中数据库平衡的节流器的实现,它实现了对于数据块移动时带宽、数量的控制。

        二、DataXceiverServer是如何初始化的?

        在数据节点DataNode进程启动的startDataNode()方法中,会调用initDataXceiver()方法,完成DataXceiverServer的初始化,代码如下:

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1.  private void initDataXceiver(Configuration conf) throws IOException {  
  2.    // find free port or use privileged port provided  
  3. // 找一个自由端口或使用已提供的特权端口  
  4.     
  5. // 构造TcpPeerServer实例tcpPeerServer,它实现了PeerServer接口,提供了ServerSocket的功能  
  6.    TcpPeerServer tcpPeerServer;  
  7.      
  8.    if (secureResources != null) {// 如果secureResources存在,根据secureResources创建tcpPeerServer  
  9.      tcpPeerServer = new TcpPeerServer(secureResources);  
  10.    } else {// 否则,根据配置信息创建tcpPeerServer  
  11.      tcpPeerServer = new TcpPeerServer(dnConf.socketWriteTimeout,  
  12.          DataNode.getStreamingAddr(conf));  
  13.    }  
  14.      
  15.    // 设置数据接收缓冲区大小,默认为128KB  
  16.    tcpPeerServer.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);  
  17.      
  18.    // 获取Socket地址InetSocketAddress,赋值给DataNode成员变量streamingAddr  
  19.    streamingAddr = tcpPeerServer.getStreamingAddr();  
  20.    LOG.info("Opened streaming server at " + streamingAddr);  
  21.      
  22.    // 构造名字为dataXceiverServer的线程组threadGroup  
  23.    this.threadGroup = new ThreadGroup("dataXceiverServer");  
  24.      
  25.    // 构造DataXceiverServer实例xserver,传入tcpPeerServer  
  26.    xserver = new DataXceiverServer(tcpPeerServer, conf, this);  
  27.      
  28.    // 构造dataXceiverServer守护线程,并将xserver加入线程组threadGroup  
  29.    this.dataXceiverServer = new Daemon(threadGroup, xserver);  
  30.      
  31.    // 将线程组里的所有线程设置为设置为守护线程,方便虚拟机退出时自动销毁  
  32.    this.threadGroup.setDaemon(true); // auto destroy when empty  
  33.   
  34.    // 如果系统配置的参数dfs.client.read.shortcircuit为true(默认为false),  
  35.    // 或者配置的参数dfs.client.domain.socket.data.traffic为true(默认为false),  
  36.    //   
  37.    if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,  
  38.              DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT) ||  
  39.        conf.getBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,  
  40.              DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT)) {  
  41.      DomainPeerServer domainPeerServer =  
  42.                getDomainPeerServer(conf, streamingAddr.getPort());  
  43.      if (domainPeerServer != null) {  
  44.        this.localDataXceiverServer = new Daemon(threadGroup,  
  45.            new DataXceiverServer(domainPeerServer, conf, this));  
  46.        LOG.info("Listening on UNIX domain socket: " +  
  47.            domainPeerServer.getBindPath());  
  48.      }  
  49.    }  
  50.      
  51.    // 构造短路注册实例  
  52.    this.shortCircuitRegistry = new ShortCircuitRegistry(conf);  
  53.  }  

        整个初始化工作很简单:

        1、创建构造DataXceiverServer需要的TcpPeerServer实例tcpPeerServer,它内部封装了ServerSocket,是DataXceiverServer功能实现的最主要依托;

        2、从tcpPeerServer中获取Socket地址InetSocketAddress,赋值给DataNode成员变量streamingAddr

        3、然后构造DataXceiverServer实例xserver,传入tcpPeerServer;

        4、构造dataXceiverServer守护线程,并将xserver加入之前创建的线程组threadGroup;

        5、将线程组里的所有线程设置为设置为守护线程,方便虚拟机退出时自动销毁。

        下面,我们再看下DataXceiverServer的构造方法,代码如下:

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1.  DataXceiverServer(PeerServer peerServer, Configuration conf,  
  2.      DataNode datanode) {  
  3.      
  4. // 根据传入的peerServer设置同名成员变量  
  5.    this.peerServer = peerServer;  
  6.      
  7.    // 设置DataNode实例datanode  
  8.    this.datanode = datanode;  
  9.      
  10.    // 设置DataNode中DataXceiver的最大数目maxXceiverCount  
  11.    // 取参数dfs.datanode.max.transfer.threads,参数未配置的话,默认值为4096  
  12.    this.maxXceiverCount =   
  13.      conf.getInt(DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY,  
  14.                  DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT);  
  15.      
  16.    // 设置估计块大小estimateBlockSize  
  17.    // 取参数dfs.blocksize,参数未配置的话,默认值是128*1024*1024,即128M  
  18.    this.estimateBlockSize = conf.getLongBytes(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,  
  19.        DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);  
  20.      
  21.    //set up parameter for cluster balancing  
  22.    // 设置集群平衡节流器  
  23.    // 带宽取参数dfs.datanode.balance.bandwidthPerSec,参数未配置默认为1024*1024  
  24.    // 最大线程数取参数dfs.datanode.balance.max.concurrent.moves,参数未配置默认为5  
  25.    this.balanceThrottler = new BlockBalanceThrottler(  
  26.        conf.getLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY,  
  27.            DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT),  
  28.        conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,  
  29.            DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT));  
  30.  }  

        在构造DataXceiverServer时,会根据传入的peerServer设置同名成员变量、设置DataNode实例datanode等,并初始化两个重要的指标,第一个是设置DataNode中DataXceiver的最大数目maxXceiverCount,它取参数dfs.datanode.max.transfer.threads,参数未配置的话,默认值为4096;第二个便是设置估计块大小estimateBlockSize,它取参数取参数dfs.blocksize,参数未配置的话,默认值是128*1024*1024,即128M,最后,设置集群平衡节流器,带宽取参数dfs.datanode.balance.bandwidthPerSec,参数未配置默认为1024*1024,最大线程数取参数dfs.datanode.balance.max.concurrent.moves,参数未配置默认为5。

        三、DataXceiverServer是如何工作的?

        既然是一个线程,那么它的工作主要就体现在run()方法内,下面,我们来看下它的run()方法:

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. @Override  
  2. // 核心方法  
  3. public void run() {  
  4.   Peer peer = null;  
  5.     
  6.   while (datanode.shouldRun && !datanode.shutdownForUpgrade) {// 如果标志位shouldRun为true,且没有为升级而执行shutdown  
  7.   
  8.     try {  
  9.         
  10.     // 阻塞,直到接收到客户端或者其他DataNode的连接请求  
  11.       peer = peerServer.accept();  
  12.   
  13.       // Make sure the xceiver count is not exceeded  
  14.       // 确保DataXceiver数目没有超过最大限制  
  15.       /** 
  16.        * DataNode的getXceiverCount方法计算得到,返回线程组的活跃线程数目 
  17.        * threadGroup == null ? 0 : threadGroup.activeCount(); 
  18.        */  
  19.       int curXceiverCount = datanode.getXceiverCount();  
  20.       if (curXceiverCount > maxXceiverCount) {  
  21.         throw new IOException("Xceiver count " + curXceiverCount  
  22.             + " exceeds the limit of concurrent xcievers: "  
  23.             + maxXceiverCount);  
  24.       }  
  25.   
  26.       // 创建一个后台线程,DataXceiver,并加入到线程组datanode.threadGroup  
  27.       new Daemon(datanode.threadGroup,  
  28.           DataXceiver.create(peer, datanode, this))  
  29.           .start();  
  30.     } catch (SocketTimeoutException ignored) {  
  31.       // wake up to see if should continue to run  
  32.     // 等待唤醒看看是否能够继续运行  
  33.     } catch (AsynchronousCloseException ace) {// 异步的关闭异常  
  34.       // another thread closed our listener socket - that's expected during shutdown,  
  35.       // but not in other circumstances  
  36.         
  37.     // 正如我们所预料的,只有在关机的过程中,通过其他线程关闭我们的侦听套接字,其他情况下则不会发生  
  38.         
  39.       
  40.       if (datanode.shouldRun && !datanode.shutdownForUpgrade) {  
  41.         LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ace);  
  42.       }  
  43.     } catch (IOException ie) {  
  44.       IOUtils.cleanup(null, peer);  
  45.       LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ie);  
  46.     } catch (OutOfMemoryError ie) {  
  47.       IOUtils.cleanup(null, peer);  
  48.       // DataNode can run out of memory if there is too many transfers.  
  49.       // Log the event, Sleep for 30 seconds, other transfers may complete by  
  50.       // then.  
  51.         
  52.       // 数据节点可能由于存在太多的数据传输导致内存溢出,记录该事件,并等待30秒,其他的数据传输可能到时就完成了  
  53.       LOG.warn("DataNode is out of memory. Will retry in 30 seconds.", ie);  
  54.       try {  
  55.         Thread.sleep(30 * 1000);  
  56.       } catch (InterruptedException e) {  
  57.         // ignore  
  58.       }  
  59.     } catch (Throwable te) {  
  60.       LOG.error(datanode.getDisplayName()  
  61.           + ":DataXceiverServer: Exiting due to: ", te);  
  62.       datanode.shouldRun = false;  
  63.     }  
  64.   }  
  65.   
  66.   // Close the server to stop reception of more requests.  
  67.   // 关闭服务器停止接收更多请求  
  68.   try {  
  69.     peerServer.close();  
  70.     closed = true;  
  71.   } catch (IOException ie) {  
  72.     LOG.warn(datanode.getDisplayName()  
  73.         + " :DataXceiverServer: close exception", ie);  
  74.   }  
  75.   
  76.   // if in restart prep stage, notify peers before closing them.  
  77.   // 如果在重新启动前准备阶段,在关闭前通知peers  
  78.   if (datanode.shutdownForUpgrade) {  
  79.     restartNotifyPeers();  
  80.     // Each thread needs some time to process it. If a thread needs  
  81.     // to send an OOB message to the client, but blocked on network for  
  82.     // long time, we need to force its termination.  
  83.     LOG.info("Shutting down DataXceiverServer before restart");  
  84.     // Allow roughly up to 2 seconds.  
  85.     for (int i = 0; getNumPeers() > 0 && i < 10; i++) {  
  86.       try {  
  87.         Thread.sleep(200);  
  88.       } catch (InterruptedException e) {  
  89.         // ignore  
  90.       }  
  91.     }  
  92.   }  
  93.   // Close all peers.  
  94.   // 关闭所有的peers  
  95.   closeAllPeers();  
  96. }  

        通过run()方法我们得知,当datanode正常运转的时候,DataXceiverServer线程主要负责在一个while循环中利用TcpPeerServer(也就是ServerSocket)的accept()方法阻塞,直到接收到客户端或者其他DataNode的连接请求,然后:

        1、获得peer,即Socket的封装;

        2、判断当前DataNode上DataXceiver线程数量是否超过阈值,如果超过的话,直接抛出IOException,利用IOUtils的cleanup()方法关闭peer后继续循环,否则继续3;

        3、创建一个后台线程DataXceiver,并将其加入到datanode的线程组threadGroup中,并启动该线程,响应数据读写请求;

        上面主流程还是非常简单的。我们先看下accept()方法在TcpPeerServer中的实现:

[java] view plain copy
 
  1. @Override  
  2. public Peer accept() throws IOException, SocketTimeoutException {  
  3.   Peer peer = peerFromSocket(serverSocket.accept());  
  4.   return peer;  
  5. }  

        它是通过调用peerFromSocket()方法来实现的,方法的入参是一个Socket,通过ServerSocket类型的serverSocket的accept()方法获得。peerFromSocket()方法代码如下:

[java] view plain copy
 
  1. public static Peer peerFromSocket(Socket socket)  
  2.     throws IOException {  
  3.   Peer peer = null;  
  4.   boolean success = false;  
  5.   try {  
  6.     // TCP_NODELAY is crucial here because of bad interactions between  
  7.     // Nagle's Algorithm and Delayed ACKs. With connection keepalive  
  8.     // between the client and DN, the conversation looks like:  
  9.     //   1. Client -> DN: Read block X  
  10.     //   2. DN -> Client: data for block X  
  11.     //   3. Client -> DN: Status OK (successful read)  
  12.     //   4. Client -> DN: Read block Y  
  13.     // The fact that step #3 and #4 are both in the client->DN direction  
  14.     // triggers Nagling. If the DN is using delayed ACKs, this results  
  15.     // in a delay of 40ms or more.  
  16.     //  
  17.     // TCP_NODELAY disables nagling and thus avoids this performance  
  18.     // disaster.  
  19.     
  20.     // 设置TCP无延迟  
  21.     socket.setTcpNoDelay(true);  
  22.       
  23.     // 获得SocketChannel  
  24.     SocketChannel channel = socket.getChannel();  
  25.       
  26.     // 利用socket创建peer,如果通道channel为null,则创建基本的BasicInetPeer,否则创建NioInetPeer  
  27.     if (channel == null) {  
  28.       peer = new BasicInetPeer(socket);  
  29.     } else {  
  30.       peer = new NioInetPeer(socket);  
  31.     }  
  32.       
  33.     // 标志位success设置为true  
  34.     success = true;  
  35.     return peer;  
  36.   } finally {  
  37.     if (!success) {// 如果创建不成功,peer不为空的话,关闭之  
  38.       if (peer != null) peer.close();  
  39.         
  40.       // 关闭socket  
  41.       socket.close();  
  42.     }  
  43.   }  
  44. }  

        其中,Peer是对Socket、输入输出流等的封装,有BasicInetPeer和NioInetPeer两种。BasicInetPeer代表了基本的Peer,封装了Socket、OutputStream、InputStream三者;而NioInetPeer代表了一种我们可通过使用非阻塞IO进行Socket通讯的一种Peer,封装了Socket、SocketInputStream、SocketOutputStream三者,具体代码不再列举,读者可自行查阅。

        紧接着,我们说下对于异常的处理。整个过程中一共出现了5种异常,包括对它们的处理如下所示:

        1、SocketTimeoutException:Socket连接超时异常,忽略直接进入下一个循环即可,继续阻塞侦听请求;

        2、AsynchronousCloseException:异步关闭异常,这里我们需要通过判断DataNode的状态来决定是否继续进行循环,继续阻塞侦听请求;

        3、IOException:此时,对应于上述主流程的第2步,我们需要关闭Socket后进入下一个循环,继续阻塞侦听请求;

        4、OutOfMemoryError:内存溢出错误,这种情况下数据节点可能由于存在太多的数据传输导致内存溢出,记录该事件,并等待30秒,其他的数据传输可能到时就完成了。我们需要做的就是,首先需要利用IOUtils的cleanup()方法关闭peer,记录警告日志信息,然后线程休眠30秒,等待DataNode其他数据读写服务完成后,进入下一个循环,继续阻塞侦听请求;

        5、Throwable:无话可说,DataNode就是为提高数据存储、读写服务而生的,既然出现了Throwable,那么DataNode也应该停止了,记录error日志信息,设置datanode的shouldRun为false,退出循环。

        从上面的异常可以看出,DataNode中DataXceiverServer的设计是很严谨与合理的,DataNode能够提供大吞吐量的数据读写服务与此不无关系。

        当Throwable发生,退出循环后,我们就需要做一些列的关闭操作,关闭peerServer,设置DataXceiverServer标志位closed为true,关闭所有的peers等。其中关闭所有的peers的closeAllPeers()方法如下:

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1.  // Close all peers and clear the map.  
  2.  synchronized void closeAllPeers() {  
  3.      
  4. // 记录info日志信息  
  5. LOG.info("Closing all peers.");  
  6.      
  7.    // 循环关闭所有的peer  
  8.    for (Peer p : peers.keySet()) {  
  9.      IOUtils.cleanup(LOG, p);  
  10.    }  
  11.      
  12.    // 清空peer数据集合  
  13.    peers.clear();  
  14.    peersXceiver.clear();  
  15.  }  

        代码非常简单,不再赘述。

        并且,如果在重新启动前准备阶段,在关闭peers之前,需要先通知它们,通知的方式就是通过调用restartNotifyPeers()方法,获取peers的每个peer所在线程,通过interrupt()方法打断它们,代码如下:

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. // Notify all peers of the shutdown and restart.  
  2. // datanode.shouldRun should still be true and datanode.restarting should  
  3. // be set true before calling this method.  
  4. synchronized void restartNotifyPeers() {  
  5.   assert (datanode.shouldRun == true && datanode.shutdownForUpgrade);  
  6.   for (Peer p : peers.keySet()) {  
  7.     // interrupt each and every DataXceiver thread.  
  8.     // 中断每个DataXceiver线程  
  9.     peers.get(p).interrupt();  
  10.   }  
  11. }  

        并且,每个线程需要一些时间去完成自己。如果一个线程需要发送OOB至客户端,但是在网络上被阻塞的了一段时间,我们需要强迫使其停止。此时,我们需要大约2秒的时间等待它们的完成。

        另外,这里我们需要说下集群数据块平衡节流器balanceThrottler的实现,其成员变量、构造方法代码如下:

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. /** A manager to make sure that cluster balancing does not 
  2.  * take too much resources. 
  3.  *  
  4.  * It limits the number of block moves for balancing and 
  5.  * the total amount of bandwidth they can use. 
  6.  *  
  7.  * 确保集群平衡不占用太多资源的一种手段或管理者。 
  8.  * 它限制了为集群平衡所做的块移动的数量及它们所占用的总宽带,是一种节流器的概念。 
  9.  */  
  10. static class BlockBalanceThrottler extends DataTransferThrottler {  
  11.  private int numThreads;// 表示当前移动数据块的线程数numThreads  
  12.  private int maxThreads;// 表示移动数据块的最大线程数maxThreads  
  13.    
  14.  /**Constructor 
  15.   * 构造方法 
  16.   * @param bandwidth Total amount of bandwidth can be used for balancing  
  17.   */  
  18.  private BlockBalanceThrottler(long bandwidth, int maxThreads) {  
  19.    super(bandwidth);  
  20.    // 设置移动数据块的最大线程数maxThreads  
  21.    this.maxThreads = maxThreads;  
  22.    LOG.info("Balancing bandwith is "+ bandwidth + " bytes/s");  
  23.    LOG.info("Number threads for balancing is "+ maxThreads);  
  24.  }  
  25. }  

        可以看到,它内部有三个非常重要的指标,表示当前移动数据块的线程数的numThreads, 和表示移动数据块的最大线程数maxThreads,还有数据传输的带宽bandwidth。在其构造方法内,会调用父类的构造方法设置带宽bandwidth,并在子类中设置移动数据块的最大线程数maxThreads。那么它是如何实现集群内数据库移动节流控制的呢?答案就在方法acquire()中,代码如下:

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. /** Check if the block move can start.  
  2.  * 检测block移动是否可以开始 
  3.  *  
  4.  * Return true if the thread quota is not exceeded and  
  5.  * the counter is incremented; False otherwise. 
  6.  */  
  7. synchronized boolean acquire() {  
  8.    
  9. / 当前线程数numThreads大于等于最大线程数maxThreads时,返回false,block不可以移动  
  10.   if (numThreads >= maxThreads) {  
  11.     return false;  
  12.   }  
  13.     
  14.   // 否则,当前线程数numThreads累加,并返回true,block可以移动  
  15.   numThreads++;  
  16.   return true;  
  17. }  

        在移动数据块block之前,会调用acquire()方法,确认一个数据块是否可以移动。实际上是当前线程数numThreads大于等于最大线程数maxThreads时,返回false,block不可以移动;否则,当前线程数numThreads累加,并返回true,block可以移动。就是这么简单,而当数据块移动完毕后,则调用release()方法,标志移动已完成,线程计数器减一,代码如下:

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. /** Mark that the move is completed. The thread counter is decremented. */  
  2. // 标志移动已完成,线程计数器减一  
  3. synchronized void release() {  
  4. / 当前线程数numThreads减1  
  5.   numThreads--;  
  6. }  

        另外,DataXceiverServer中还提供了一些功能性方法,比如:

        1、实现了杀死DataXceiverServer线程的kill()方法,代码如下:

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1.  void kill() {  
  2.     
  3. // DataXceiverServer线程被kill时,需要确定datanode的标志位shouldRun为false,或者标志位shutdownForUpgradetrue  
  4. // 也就意味着,当datanode不应该继续运行,或者为了升级而关闭时,DataXceiverServer线程才可以被kill  
  5.    assert (datanode.shouldRun == false || datanode.shutdownForUpgrade) :  
  6.      "shoudRun should be set to false or restarting should be true"  
  7.      + " before killing";  
  8.      
  9.    try {  
  10.      // 关闭peerServer,即关闭ServerSocket  
  11.      this.peerServer.close();  
  12.        
  13.      // 设置标志位closed为true  
  14.      this.closed = true;  
  15.    } catch (IOException ie) {  
  16.      LOG.warn(datanode.getDisplayName() + ":DataXceiverServer.kill(): ", ie);  
  17.    }  
  18.  }  

        2、添加一个Peer的addPeer()方法,代码如下:

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1.  synchronized void addPeer(Peer peer, Thread t, DataXceiver xceiver)  
  2.      throws IOException {  
  3.      
  4. // 首先判断DataXceiverServer线程的标志位closed,为true时,说明服务线程已被关闭,不能再提供Socket通讯服务  
  5. if (closed) {  
  6.      throw new IOException("Server closed.");  
  7.    }  
  8.   
  9. // 将peer与其所在线程t的映射关系加入到peers中  
  10.    peers.put(peer, t);  
  11.      
  12.    // 将peer与其所属DataXceiver xceiver映射关系加入到peersXceiver中  
  13.    peersXceiver.put(peer, xceiver);  
  14.  }  

        3、关闭一个Peer的closePeer()方法,代码如下:

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1.  // 关闭Peer  
  2.  synchronized void closePeer(Peer peer) {  
  3.      
  4. // 从数据结构peers、peersXceiver移除peer对应记录  
  5. peers.remove(peer);  
  6.    peersXceiver.remove(peer);  
  7.      
  8.    // 利用IOUtils的cleanup关闭peer,即关闭socket  
  9.    IOUtils.cleanup(null, peer);  
  10.  }  

        4、关闭所有Peer的closeAllPeers()方法,代码如下:

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1.  // Close all peers and clear the map.  
  2.  synchronized void closeAllPeers() {  
  3.      
  4. // 记录info日志信息  
  5. LOG.info("Closing all peers.");  
  6.      
  7.    // 利用IOUtils的cleanup()方法循环关闭所有的peer,即关闭socket  
  8.    for (Peer p : peers.keySet()) {  
  9.      IOUtils.cleanup(LOG, p);  
  10.    }  
  11.      
  12.    // 清空peer数据集合peers、peersXceiver  
  13.    peers.clear();  
  14.    peersXceiver.clear();  
  15.  }  

        上述代码非常简单,并且注释详细,读者可自行阅读与分析,这里不再做详细介绍。

        部分未叙述细节放至DataXceiver的分析文章中去讲,敬请留意!

        总结

         DataXceiverServer是数据节点DataNode上一个用于接收数据读写请求的后台工作线程,为每个数据读写请求创建一个单独的线程去处理。它提供了一请求一线程的模式,并对线程数目做了控制,对接收数据读写请求时发生的各种异常做了很好的容错处理,特别是针对内存溢出异常,允许等待短暂时间再继续提供服务,避免内存使用高峰期等等。而且,线程组与后台线程的应用,也大大简化可这些线程的管理工作;对数据读写请求处理线程的数目,集群内数据块移动线程数目都做了严格控制,避免资源无节制耗费等。这些设计很好的支撑了HDFS大吞吐量数据的性能要求,可以说,是一个很好的设计方案,值得我们在其他类似需求的系统中借鉴。

原文地址:https://www.cnblogs.com/jirimutu01/p/5556183.html