NIO 包及工作原理收藏

NIO 包及工作原理


针对传统I/O 工作模式的不足,NIO 工具包提出了基于Buffer(缓冲区)、Channel(通
道)、Selector(选择器)的新模式;Selector(选择器)、可选择的Channel(通道)和
SelectionKey(选择键)配合起来使用,可以实现并发的非阻塞型I/O 能力。

NIO 工具包的成员


 Buffer(缓冲器)


Buffer 类是一个抽象类,它有7 个子类分别对应于七种基本的数据类型:ByteBuffer、
CharBuffer、DoubleBuffer、FloatBuffer、IntBuffer、LongBuffer 和ShortBuffer。每一个Buffer
对象相当于一个数据容器,可以把它看作内存中的一个大的数组,用来存储和提取所有基本
类型(boolean 型除外)的数据。Buffer 类的核心是一块内存区,可以直接对其执行与内存有关
的操作,利用操作系统特性和能力提高和改善Java 传统I/O 的性能。


 Channel(通道)


Channel 被认为是NIO 工具包的一大创新点,是(Buffer)缓冲器和I/O 服务之间的通道,
具有双向性,既可以读入也可以写出,可以更高效的传递数据。我们这里主要讨论
ServerSocketChannel 和SocketChannel,它们都继承了SelectableChannel,是可选择的通道,
分别可以工作在同步和异步两种方式下(这里的可选择不是指可以选择两种工作方式,而是
指可以有选择的注册自己感兴趣的事件)。当通道工作在同步方式时,它的功能和编程方法
与传统的ServerSocket、Socket 对象相似;当通道工作在异步工作方式时,进行输入输出处
理不必等到输入输出完毕才返回,并且可以将其感兴趣的(如:接受操作、连接操作、读出
操作、写入操作)事件注册到Selector 对象上,与Selector 对象协同工作可以更有效率的支

持和管理并发的网络套接字连接。


Selector(选择器)和SelectionKey(选择键)


各类 Buffer 是数据的容器对象;各类Channel 实现在各类Buffer 与各类I/O 服务间传输
数据。Selector 是实现并发型非阻塞I/O 的核心,各种可选择的通道将其感兴趣的事件注册
到Selector 对象上,Selector 在一个循环中不断轮循监视这各些注册在其上的Socket 通道。
SelectionKey 类则封装了SelectableChannel 对象在Selector 中的注册信息。当Selector 监测
到在某个注册的SelectableChannel 上发生了感兴趣的事件时,自动激活产生一个SelectionKey
对象,在这个对象中记录了哪一个SelectableChannel 上发生了哪种事件,通过对被激活的
SelectionKey 的分析,外界可以知道每个SelectableChannel 发生的具体事件类型,进行相应的

处理。

NIO 工作原理


通过上面的讨论,我们可以看出在并发型服务器程序中使用NIO,实际上是通过网络事
件驱动模型实现的。我们应用Select 机制,不用为每一个客户端连接新启线程处理,而是将
其注册到特定的Selector 对象上,这就可以在单线程中利用Selector 对象管理大量并发的网
络连接,更好的利用了系统资源;采用非阻塞I/O 的通信方式,不要求阻塞等待I/O 操作完
成即可返回,从而减少了管理I/O 连接导致的系统开销,大幅度提高了系统性能。

当有读或写等任何注册的事件发生时,可以从Selector 中获得相应的
SelectionKey , 从SelectionKey 中可以找到发生的事件和该事件所发生的具体的
SelectableChannel,以获得客户端发送过来的数据。由于在非阻塞网络I/O 中采用了事件触
发机制,处理程序可以得到系统的主动通知,从而可以实现底层网络I/O 无阻塞、流畅地读
写,而不像在原来的阻塞模式下处理程序需要不断循环等待。使用NIO,可以编写出性能更
好、更易扩展的并发型服务器程序。

服务器端程序:

public class HelloWorldServer {    

  

    static int BLOCK = 1024;    

    static String name = "";    

    protected Selector selector;    

    protected ByteBuffer clientBuffer = ByteBuffer.allocate(BLOCK);    

    protected CharsetDecoder decoder;    

    static CharsetEncoder encoder = Charset.forName("GB2312").newEncoder();    

  

    public HelloWorldServer(int port) throws IOException {    

        selector = this.getSelector(port);    

        Charset charset = Charset.forName("GB2312");    

        decoder = charset.newDecoder();    

    }    

  

    // 获取Selector    

    protected Selector getSelector(int port) throws IOException {    

        ServerSocketChannel server = ServerSocketChannel.open();    

        Selector sel = Selector.open();    

        server.socket().bind(new InetSocketAddress(port));    

        server.configureBlocking(false);    

        server.register(sel, SelectionKey.OP_ACCEPT);    

        return sel;    

    }    

  

    // 监听端口    

    public void listen() {    

        try {    

            for (;;) {    

                selector.select();    

                Iterator iter = selector.selectedKeys().iterator();    

                while (iter.hasNext()) {    

                    SelectionKey key = (SelectionKey) iter.next();    

                    iter.remove();    

                    process(key);    

                }    

            }    

        } catch (IOException e) {    

            e.printStackTrace();    

        }    

    }    

  

    // 处理事件    

    protected void process(SelectionKey key) throws IOException {    

        if (key.isAcceptable()) { // 接收请求    

            ServerSocketChannel server = (ServerSocketChannel) key.channel();    

            SocketChannel channel = server.accept();    

            //设置非阻塞模式    

            channel.configureBlocking(false);    

            channel.register(selector, SelectionKey.OP_READ);    

        } else if (key.isReadable()) { // 读信息    

            SocketChannel channel = (SocketChannel) key.channel();    

            int count = channel.read(clientBuffer);    

            if (count > 0) {    

                clientBuffer.flip();    

                CharBuffer charBuffer = decoder.decode(clientBuffer);    

                name = charBuffer.toString();    

                // System.out.println(name);    

                SelectionKey sKey = channel.register(selector,    

                        SelectionKey.OP_WRITE);    

                sKey.attach(name);    

            } else {    

                channel.close();    

            }    

  

            clientBuffer.clear();    

        } else if (key.isWritable()) { // 写事件    

            SocketChannel channel = (SocketChannel) key.channel();    

            String name = (String) key.attachment();    

                

            ByteBuffer block = encoder.encode(CharBuffer    

                    .wrap("Hello !" + name));    

                

  

            channel.write(block);    

  

            //channel.close();    

  

        }    

    }    

  

    public static void main(String[] args) {    

        int port = 8888;    

        try {    

            HelloWorldServer server = new HelloWorldServer(port);    

            System.out.println("listening on " + port);    

                

            server.listen();    

                

        } catch (IOException e) {    

            e.printStackTrace();    

        }    

    }    

}   

客户端程序:

public class HelloWorldClient {    

  

    static int SIZE = 10;    

    static InetSocketAddress ip = new InetSocketAddress("localhost", 8888);    

    static CharsetEncoder encoder = Charset.forName("GB2312").newEncoder();    

  

    static class Message implements Runnable {    

        protected String name;    

        String msg = "";    

  

        public Message(String index) {    

            this.name = index;    

        }    

  

        public void run() {    

            try {    

                long start = System.currentTimeMillis();    

                //打开Socket通道    

                SocketChannel client = SocketChannel.open();    

                //设置为非阻塞模式    

                client.configureBlocking(false);    

                //打开选择器    

                Selector selector = Selector.open();    

                //注册连接服务端socket动作    

                client.register(selector, SelectionKey.OP_CONNECT);    

                //连接    

                client.connect(ip);    

                //分配内存    

                ByteBuffer buffer = ByteBuffer.allocate(8 * 1024);    

                int total = 0;    

  

                _FOR: for (;;) {    

                    selector.select();    

                    Iterator iter = selector.selectedKeys().iterator();    

  

                    while (iter.hasNext()) {    

                        SelectionKey key = (SelectionKey) iter.next();    

                        iter.remove();    

                        if (key.isConnectable()) {    

                            SocketChannel channel = (SocketChannel) key    

                                    .channel();    

                            if (channel.isConnectionPending())    

                                channel.finishConnect();    

                            channel    

                                    .write(encoder    

                                            .encode(CharBuffer.wrap(name)));    

  

                            channel.register(selector, SelectionKey.OP_READ);    

                        } else if (key.isReadable()) {    

                            SocketChannel channel = (SocketChannel) key    

                                    .channel();    

                            int count = channel.read(buffer);    

                            if (count > 0) {    

                                total += count;    

                                buffer.flip();    

  

                                while (buffer.remaining() > 0) {    

                                    byte b = buffer.get();    

                                    msg += (char) b;    

                                        

                                }    

  

                                buffer.clear();    

                            } else {    

                                client.close();    

                                break _FOR;    

                            }    

                        }    

                    }    

                }    

                double last = (System.currentTimeMillis() - start) * 1.0 / 1000;    

                System.out.println(msg + "used time :" + last + "s.");    

                msg = "";    

            } catch (IOException e) {    

                e.printStackTrace();    

            }    

        }    

    }    

  

    public static void main(String[] args) throws IOException {    

        

        String names[] = new String[SIZE];    

  

        for (int index = 0; index < SIZE; index++) {    

            names[index] = "jeff[" + index + "]";    

            new Thread(new Message(names[index])).start();    

        }    

        

    }    

}   

原文地址:https://www.cnblogs.com/xinzhuangzi/p/4100469.html