Hadoop源码分析11: IPC流程(6)volatile

1.Server 的 running

public abstract class Server {

   volatile boolean running =true; // true while server runs

    publicsynchronized void join() throwsInterruptedException {
       while (running){
       wait();
     }
  }

    public synchronized voidstop() {
    running = false;
       if (handlers != null) {
           for (int i= 0; i < handlerCount; i++) {
              if (handlers[i] != null){
             handlers[i].interrupt();
           }
       }
    }
   listener.interrupt();
    listener.doStop();
   responder.interrupt();
    notifyAll();
  } 
}

public class ServerHandler extends Thread{
 
@Override
public void run() {
   Server.SERVER.set(server);
    ByteArrayOutputStreambuf = new ByteArrayOutputStream(
   Server.INITIAL_RESP_BUF_SIZE);
    while(server.running) {
    try {
       final ServerCall call = server.callQueue.take(); 
       ............................
    } catch(InterruptedException e) {
       if (server.running) { // unexpected -- logit
          .............
       }
    } catch (Exception e){
   }
   }
}
}

public class ServerListener extends Thread{

    public void run(){
   Server.SERVER.set(server);
    while(server.running) {
       SelectionKey key = null;
t       try {
     。。。。。。
             } 
         }
     }
}


public class ServerListenerReader implements Runnable{

 public void run() {
      synchronized (this) {
       while (serverListener.server.running){
         SelectionKey key =null;
    。。。。。。。。。。。。。。

    
      }
 。。。。。
      }catch (InterruptedException e) {
           if(serverListener.server.running) {                  // unexpected -- log it
           }
         } catch (IOException ex){
          }
   }
}

public class ServerResponder extendsThread{

  public void run() {
    Server.SERVER.set(server);
     longlastPurgeTime = 0;  
      while (server.running) {
       try {
         waitPending();    // If a channel is beingregistered, wait.
        writeSelector.select(PURGE_INTERVAL);
         Iterator<SelectionKey> iter =writeSelector.selectedKeys().iterator();
      }     
    }
  } 
}

2.ServerConnection 的 rpcCount

public class ServerConnection {
       volatile intrpcCount = 0;

   private voidprocessData(byte[] buf) throws IOException,
    InterruptedException{
       DataInputStream dis = new DataInputStream(newByteArrayInputStream(buf));
       int id = dis.readInt(); // try to read anid

       Writable param =ReflectionUtils.newInstance(server.paramClass,
       server.conf);// read param
       param.readFields(dis);

       ServerCall call = new ServerCall(id, param,this);
       server.callQueue.put(call); // queue the call;maybe blocked here
       rpcCount++; // Increment the rpccount
    }
   }

public class ServerListener extends Thread{

    private voidcleanupConnections(boolean force) {
       ...............
         if ( c.rpcCount == 0&& currentTime - c.lastContact > server.maxIdleTime ){
             server.closeConnection(c);
         numNuked++;
         end--;
           c =null; 
      ...............
    }
}

public class ServerResponder extends Thread{

    private booleanprocessResponse(LinkedList<ServerCall> responseQueue,
                               booleaninHandler) throws IOException {
         。。。。。
         if(!call.response.hasRemaining()) {
          call.connection.rpcCount--;
           if(numElements == 1) {    // lastcall fully processes.
            done = true;           // no more data for thischannel.
           } else{
            done = false;          // more calls pending to besent.
           }
          
         } 
         。。。。。
    }
}

3. ServerListenerReader的 adding

public class ServerListenerReader implements Runnable{

    private volatileboolean adding = false;

    public void run(){
      synchronized (this) {
       while (serverListener.server.running) {
         SelectionKey key =null;
         try {
          readSelector.select();
           while(adding) {
            this.wait(1000);
           }  
        .......................     
      }
   }    

    public voidstartAdd() {
     adding = true;
     readSelector.wakeup();
    }


    public synchronized voidfinishAdd() {
     adding = false;
     this.notify();       
    }
}

public class ServerListener extends Thread{

    voiddoAccept(SelectionKey key) throws IOException,OutOfMemoryError {
            ServerConnection c = null;
            ServerSocketChannelserverSocketChannel = (ServerSocketChannel) key.channel();
            SocketChannel channel;
            while ((channel =serverSocketChannel.accept()) != null) {
                channel.configureBlocking(false);
                channel.socket().setTcpNoDelay(this.server.tcpNoDelay);
                ServerListenerReader reader =readers[(currentReader + 1) % readers.length];
                try {
                 reader.startAdd();
                  SelectionKey readKey =reader.registerChannel(channel);
                  c = newServerConnection(readKey,channel,System.currentTimeMillis(),this.server);
                  readKey.attach(c);
                  synchronized(this.server.connectionList) {
                     this.server.connectionList.add(this.server.numConnections,c);
                      this.server.numConnections++;
                  }
                } finally{
               reader.finishAdd();
                }
        }
    }
}



原文地址:https://www.cnblogs.com/leeeee/p/7276525.html