Java-NIO-Selector

扩展阅读:

Java NIO类库Selector机制解析(上)

Java NIO类库Selector机制解析(下)

 Java NIO的选择器

三个重要的类:

1,Selector 选择器,完成主要的选择功能。select(), 并保存有注册到他上面的通道集合。 
2,SelectableChannel 可被注册到Selector上的通道。 
3,SelectionKey 描述一个Selector和SelectableChannel的关系。并保存有通道所关心的操作。 

通用的流程:
1、创建选择器
2、注册通道
3、选择就绪通道,
4、处理已就绪通道数据

package socket;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class NIOServer2 {

    private void startServer() throws IOException {
        Selector selector = Selector.open();
        
        {
            ServerSocketChannel ssc = ServerSocketChannel.open();
            ssc.configureBlocking(false);
            ServerSocket ss = ssc.socket();
            InetSocketAddress address = new InetSocketAddress(9000);
            ss.bind(address);
            
            System.out.println("ssc 0 : " + ssc);
            System.out.println("ss 0 : " + ss);
            
            SelectionKey acceptKey = ssc.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("acceptKey: " + acceptKey);
            printKeyInfo(acceptKey);
            System.out.println("Going to listen on 9000");
        }
        
        while (true) {
            System.out.println("===================================
start select...");
            int num = selector.select();
            System.out.println("NIOServer: Number of keys after select operation: " + num);
            
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> it = selectionKeys.iterator();
            
            while (it.hasNext()) {
                SelectionKey key = it.next();
                System.out.println("key: " + key);
                printKeyInfo(key);
                
                it.remove();
                
                
                if ((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {
                    System.out.println("select ACCEPT");
                    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    
                    System.out.println("ssc 1 : " + ssc);
                    System.out.println("sc 1 : " + sc);
                    
                    SelectionKey newKey = sc.register(selector, SelectionKey.OP_READ);
                    System.out.println("new key:" + newKey);
                    printKeyInfo(newKey);
                }
                else if ((key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
//                    System.out.println("select READ");
//                    System.out.print("before cancel:");printKeyInfo(key);
//                    key.cancel();    //这是取消注册,取消之后就select不到了
//                    System.out.println("after cancel:");printKeyInfo(key);
                    SocketChannel sc = (SocketChannel) key.channel();
                    System.out.println("sc 2 : " + sc);
                    
                    //echo data
                    int nbytes = 0;
                    ByteBuffer echoBuffer = ByteBuffer.allocate(1024);
            //必须读取channel中的数据,否则selector中会一直有channel数据到达,不停地在这个else if里面执行
                    while (true) {
                        echoBuffer.clear();
                        int r = sc.read(echoBuffer);
                        if (r <= 0) break;
                        echoBuffer.flip();
                        sc.write(echoBuffer);
                        nbytes += r;
                    }
                    System.out.println("echoed " + nbytes + " from " + sc);
                }// if ... else if
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }//while
        }//while
    }
    
    private static void printKeyInfo(SelectionKey sk) {
        String s = new String();

        s = "Att: " + (sk.attachment() == null ? "no" : "yes");
        s += ", Read: " + sk.isReadable();
        s += ", Acpt: " + sk.isAcceptable();
        s += ", Cnct: " + sk.isConnectable();
        s += ", Wrt: " + sk.isWritable();
        s += ", Valid: " + sk.isValid();
        s += ", interestOps: " + sk.interestOps();
        s += ", readyOps: " + sk.readyOps();
        System.out.println(s);
    }
    
    public static void main(String[] args) {
        try {
            new NIOServer2().startServer();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

从代码看,我们做完select以后,就开始遍历selectedKey,找到符合要求的key,进行读数据操作。这里还要注意的是,使用完key以后,需要从selectedKey集合中删除。 

一、SelectionKey:表示channel和Selector的注册等关系

SelectionKey 包含了两个集合:

1、 注册的感兴趣的操作集合

2、已经准备好的集合(就绪集合)

第一个集合基本上是注册就确定的,或者通过interestOps(int)来改变。select是不会改变interest集合的。但是select改变的是 ready集合。也就是准备好的感兴趣的操作的集合,这样说,也说明,ready集合实际上是interest集合的子集。 

SelectionKey的cancel操作:

对于SelectionKey, 还可以执行cancel操作,一个被cancel掉的SelectionKey,实际上只是被放到了Selector的cancel键集合里,键马上失效,但是通道依然是注册状态,要等到下一个select时才真正取消注册。 

二、Selector:阻塞select()就绪的channel,返回channel对应的SelectionKey

现在,我们再来看看选择器做了什么。选择器Selector是就绪选择的核心,它包含了注册到它上面的通道与操作关系的Key

(一)Selector维护了三个集合。 

1、已经注册的键集合 调用, keys() 
2、已经选择的键集合 调用, selectedKeys() 
3、已经取消的键集合 私有。 

(二)select调用流程

选择器虽然封装了select,poll等底层的系统调用,但是她有自己的一套来管理这些键。 

每当select被调用时,她做如下检查: 

1、检查已经取消的键的集合。如果非空,从其他两个集合中移除已经取消的键,注销相关通道,清空已经取消的键的集合。 
2、已注册的键的集合中的键的interest集合被检查。例如有新的interest的操作注册。但是这一步不会影响后面的操作。这是延时到下一次select调用时才会影响的。 
就绪条件确认后,底层系统进行查询。依赖于select方法的参数,如果没有通道准备好,根select带的参数超时设置,可能会阻塞线程。 
系统调用完成后,可以对操作系统指示的已经准备好的interest集合中的一种操作的通道,执行以下操作: 
  a: 如果通道的键还没有在已经选择的键的集合(selectedKeys)中,那么键的ready集合将被清空。然后表示操作系统发现的当前通道已经准备好的操作的比特掩码将被设置。 
  b: 否则,一旦通道的键被放入已经选择的键的集合中时,ready集合不会被清除,而是累积。这就是说,如果之前的状态是ready的操作,本次已经不是ready了,但是他的bit位依然表示是ready,不会被清除。 
3、步骤2可能会有很长一段时间的休眠。所以在步骤2完成以后,步骤1继续执行以确保被取消的键正确处理。 
4、返回值,select的返回值说明的是从上一次调用到本次调用,就绪选择的个数。如果上一次就已经是就绪的,那么本次不统计。这是是为何返回为0时,我们continue的原因。 

这里使用的延迟注销方法,正是为了解决注销键的问题。如果线程在取消键的同时进行通道注销,那么很可能阻塞并与正在进行的选择操作发生冲突。 

(三)有3种select方法可以选择:

1、select()   :会阻塞线程直到又一个通道就绪。
2、select(long timeout)   :会在特定时间内阻塞,或者至少有一个通道就绪。
3、selectNow()  :如果没有发现就绪,就直接返回。

 

(四)中断select()方法

select()方法会阻塞住,等待有channel就绪才返回。有时候,希望停止阻塞,中断select方法,让线程继续。

有三种方法。 
1, wakeup()这是一种优雅的方法,同时也是延时的。如果当前没有正进行的选择操作,也就是要等到下一个select才起作用。 
2, close()选择器的close被调用,则所有在选择操作中阻塞的线程被唤醒,相关通道被注销,键也被取消。 
3, interrupt() 实际上interrupt并不会中断线程。而是设置线程中断标志。 
然后依然是调用wakeup()。这是因为 Selector 捕获了interruptedException,然后在异常处理中调用了 wakeup() 

(五)selector、channel、SelectionKey处理细节

这里需要记住的是,ready集合中的比特位,是累积的。根据步骤2,如果一个键是在选择集合中,那么这个键的ready集合是不会被清除的。而如果这个键不在选择集合中,那么就要首先清空这个键的ready集合,然后把就绪信息更新到这个ready集合上,最后,就是把这个键加入到已选择的集合中。 

也就是说,如果一个selectionKey已经在Selector的seletedKeys()返回的集合里面,那么这个selectionKey的ready集合是不会被清除的。否则,先清空这个selectionKey的ready集合,然后把Selector的就绪几个更新到这个ready集合上。最后,把这个selectionKey添加到selector的selectedKeys()要返回的集合里面。

例如:一个channel中的数据没读完(或有数据而不处理),那么,这个channel一直处于就绪状态中,所以每次selector的selectedKeys()方法总能返回与这个channel关联的Selectionkey,然后就会不停地循环select(),除非读完channel中的数据,或者把这个SelectionKey给cancel掉。

Reference:

Java-NIO-Selector

 Java NIO系列教程(六) Selector

原文地址:https://www.cnblogs.com/549294286/p/3758199.html