Hadoop源码分析14: IPC流程(9) SelectionKey





      publicServerListener(Serverserver) throwsIOException{



             // Create anew server socket and set to non blocking mode

             acceptChannel= ServerSocketChannel.open();



             // Bind theserver socket to the local host and port

             Server.bind(acceptChannel.socket(),address, backlogLength);


             // create aselector;

             selector =Selector.open();



             for(inti = 0;i server.readThreads;i++) {

                    Selector readSelector = Selector.open();

                    ServerListenerReader reader = newServerListenerReader(

                                 readSelector, this);

                    readers[i]= reader;





             this.setName("IPCServer listener on " +server.port);




2. SelectionKey.OP_READ


public classServerListenerextends Thread{


     voiddoAccept(SelectionKeykey) throwsIOException,OutOfMemoryError {

             ServerConnection c = null;

             ServerSocketChannel serverSocketChannel=(ServerSocketChannel)key.channel();

             SocketChannel channel;

             while((channel=serverSocketChannel.accept())!= null){



                    ServerListenerReader reader = readers[(currentReader+ 1) %readers.length];



                          SelectionKey readKey = channel.register(reader.readSelector,SelectionKey.OP_READ);

                          c = newServerConnection(readKey,channel,

                                        System.currentTimeMillis(), this.server);








                    } finally{







3. SelectionKey.OP_WRITE




     booleanerror= true;

     booleandone =false;      

     intnumElements =0;

     Call call = null;




         numElements = responseQueue.size();

         if(numElements== 0) {

           error = false;

           returntrue;             // no moredata for this channel.



         call = responseQueue.removeFirst();

         SocketChannelchannel = call.connection.channel;


         intnumBytes =channelWrite(channel, call.response);






           if(numElements== 1){   // last callfully processes.

             done = true;            // no moredata for this channel.

           } else{

             done = false;           // morecalls pending to be sent.



         } else{





             // set theserve time when the response has to be sent later






               channel.register(writeSelector, SelectionKey.OP_WRITE,call);

             } catch(ClosedChannelExceptione) {

               //Its ok.channel might be closed else where.

               done = true;

             } finally{





         error = false;             //everything went off well


     } finally{

       if(error&& call != null){ 

         done = true;              // error. nomore data for this channel.







