Java NIO

参考文章 http://www.cnblogs.com/geason/p/5774096.html

     http://www.iteye.com/magazines/132-Java-NIO

阻塞I/O通信模型存在以下缺点:
  1. 当客户端多时,会创建大量的处理线程。且每个线程都要占用栈空间和一些CPU时间
  2. 阻塞可能带来频繁的上下文切换,且大部分上下文切换可能是无意义的。

同步非阻塞的NIO模型:
  JDK 1. 4中新加入了 NIO( New Input/ Output) 类, 引入了一种基于通道和缓冲区的 I/O 方式。NIO 是一种同步非阻塞的 IO 模型。同步的核心就是 Selector,Selector 代替了线程本身轮询 IO 事件,避免了阻塞同时减少了不必要的线程消耗;非阻塞的核心就是通道和缓冲区,当 IO 事件就绪时,先将数据写入缓冲区,再由缓冲区交给线程,因此线程无需阻塞地等待IO。

1. 缓存

  缓冲区本质上是一块可以读写数据的内存。这块内存被包装成NIO Buffer对象,使用 Buffer 读写数据一般遵循以下四个步骤:

  1. 写入数据到 Buffer;
  2. 调用 flip() 方法;
  3. 从 Buffer 中读取数据;
  4. 调用 clear() 方法或者 compact() 方法。

  当向buffer写入数据时,buffer会记录下写了多少数据。一旦要读取数据,需要通过flip()方法将Buffer从写模式切换到读模式。在读模式下,可以读取之前写入到buffer的所有数据。一旦读完了所有的数据,就需要清空缓冲区,让它可以再次被写入。有两种方式能清空缓冲区:调用clear()或compact()方法。clear()方法会清空整个缓冲区。compact()方法只会清除已经读过的数据。任何未读的数据都被移到缓冲区的起始处,新写入的数据将放到缓冲区未读数据的后面。 

  NIO中的关键Buffer实现有:ByteBuffer, CharBuffer, DoubleBuffer, FloatBuffer, IntBuffer, LongBuffer, ShortBuffer,分别对应基本数据类型。

Buffer的三个属性(capacity,position,limit)

  position和limit的含义取决于Buffer处在读模式还是写模式。不管Buffer处在什么模式,capacity的含义总是一样的。 

  

(1) capacity 
  capacity表示Buffer的大小值。一旦Buffer满了,需要将其清空(通过读数据或者清除数据)才能继续写数据往里写数据。 
(2) position 
  写模式下position表示当前的位置,初始值为0。当一个byte或long数据写到Buffer后, position会向前移动到下一个可插入数据的Buffer单元,最大可为capacity-1。 
  当将Buffer从写模式切换到读模式,position会被重置为0。当从Buffer的position处读取数据时,position向前移动到下一个可读的位置。 
(3) limit 
  在写模式下,Buffer的limit表示你最多能往Buffer里写多少数据, 写模式下limit等于Buffer的capacity。 
  当切换Buffer到读模式时, limit会被设置成写模式下的position值,表示你最多能读到多少数据。换句话说,你能读到之前写入的所有数据。

 

2. 通道

  通道和流的区别:

  • Channel是双向的,既可以读又可以写,而流是单向的
  • Channel可以进行异步的读写
  • 对Channel的读写必须通过buffer对象

  channel的实现 : 

  • FileChannel:从文件中读写数据。
  • DatagramChannel:能通过UDP读写网络中的数据。
  • SocketChannel:能通过TCP读写网络中的数据。
  • ServerSocketChannel:可以监听新进来的TCP连接,像Web服务器那样。对每一个新进来的连接都会创建一个SocketChannel。

与Selector一起使用时,Channel必须处于非阻塞模式下。这意味着不能将FileChannel与Selector一起使用,因为FileChannel不能切换到非阻塞模式。而套接字通道都可以。

// FileChannel的实现
RandomAccessFile aFile = new RandomAccessFile("data.txt", "rw");  
FileChannel inChannel = aFile.getChannel();  
ByteBuffer buf = ByteBuffer.allocate(1024);  
  
int bytesRead = inChannel.read(buf);   
while (bytesRead != -1) {  
    System.out.println("Read " + bytesRead);  
    buf.flip();   // 将buffer切换为读模式
    while(buf.hasRemaining()){  
        System.out.print((char) buf.get());  // 读取缓存
    }  
    buf.clear();  // 清空缓存
    bytesRead = inChannel.read(buf);  
}  
aFile.close();  

 

3. Selector

  Selector用于监听多个通道的事件(如连接打开,数据到达),因此单个线程可以监听多个数据通道。Selector允许单线程处理多个 Channel,使得线程无需阻塞地等待IO事件的就绪。如果你的应用打开了多个连接,但每个连接的流量都很低,使用Selector就会很方便:向Selector注册Channel,然后调用它的select()方法,该方法会一直阻塞直到某个注册通道的事件就绪。一旦这个方法返回,线程就可以处理这些事件(如新连接进来,数据接收等)。

  这是在一个单线程中使用一个Selector处理3个Channel的图示: 

Selector中注册的感兴趣事件有:

  • SelectionKey.OP_CONNECT
  • SelectionKey.OP_ACCEPT
  • SelectionKey.OP_READ
  • SelectionKey.OP_WRITE

select()方法

  • select()阻塞到至少有一个通道在你注册的事件上就绪了,返回的int值表示有多少通道已经就绪。
  • select(long timeout)和select()一样,除了最长会阻塞timeout毫秒(参数)。 
  • selectNow()不会阻塞,不管什么通道就绪都立刻返回,此方法执行非阻塞的选择操作。
Selector selector = Selector.open();
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
while(true) {
    int readyChannels = selector.select();
    if(readyChannels == 0) continue;
    Set selectedKeys = selector.selectedKeys();
    Iterator keyIterator = selectedKeys.iterator();
    while(keyIterator.hasNext()) {
        SelectionKey key = keyIterator.next();
        if(key.isAcceptable()) {
            // a connection was accepted by a ServerSocketChannel.
        } else if (key.isConnectable()) {
            // a connection was established with a remote server.
         } else if (key.isReadable()) {
            // a channel is ready for reading
        } else if (key.isWritable()) {
            // a channel is ready for writing
        }
        keyIterator.remove(); // 手动移除SelectionKey实例
    }
}

SocketChannel服务端:

public class NioServer {
    private Selector selector;
    
    public NioServer(int port) throws IOException {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        serverChannel.socket().bind(new InetSocketAddress(port));
        selector = Selector.open();
        serverChannel.register(selector, SelectionKey.OP_ACCEPT); // 将通道管理器与通道绑定,并为该通道注册感兴趣的事件
        System.out.println("服务器端启动成功");
    }
    
    public void process() throws IOException {
        while(true) {
            selector.select();
            Iterator<SelectionKey> ite = selector.selectedKeys().iterator();
            while(ite.hasNext()) {
                SelectionKey key = ite.next();
                if(key.isAcceptable()) {      // 2. 接受客户端连接请求,并发送消息
                    ServerSocketChannel server = (ServerSocketChannel) key.channel(); // 获得客户端连接通道
                    SocketChannel channel = server.accept();
                    channel.configureBlocking(false);
                    channel.write(ByteBuffer.wrap("send message to client".getBytes()));
                    channel.register(selector, SelectionKey.OP_READ);
                    System.out.println("客户端请求连接事件");
                } else if(key.isReadable()) {  // 3. 接收客户端消息
                    SocketChannel channel = (SocketChannel)key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    int read = channel.read(buffer);
                    String message = new String(buffer.array());
                    System.out.println("receive message from client, size:" + buffer.position() + ", msg: " + message);
                }
                ite.remove();
            }
        }
    }
    
    public static void main(String[] args) throws IOException {
        new NioServer(9989).process();
    }
}

  ServerSocketChannel是一个基于通道的socket监听器。它同我们所熟悉的java.net.ServerSocket执行相同的基本任务,不过它增加了通道语义,因此能够在非阻塞模式下运行。当没有传入连接在等待时,ServerSocketChannel.accept( )会立即返回null。以下是ServerSocketChannel的代码实现。

SocketChannel客户端:

public class NioClient {
    private Selector selector;
    
    public NioClient(String ip, int port) throws IOException {
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);
        channel.connect(new InetSocketAddress(ip, port));
        selector = Selector.open();
        channel.register(selector, SelectionKey.OP_CONNECT);
    }
    
    public void process() throws IOException {
        while(true) {
            selector.select();
            Iterator<SelectionKey> ite = selector.selectedKeys().iterator();
            while(ite.hasNext()) {
                SelectionKey key = ite.next();
                if(key.isConnectable()) {    // 1. 连接服务端,并发送消息
                    SocketChannel channel = (SocketChannel) key.channel();
                    if(channel.isConnectionPending())
                        channel.finishConnect();
                    channel.configureBlocking(false);
                    channel.write(ByteBuffer.wrap("send message to server".getBytes()));
                    channel.register(selector, SelectionKey.OP_READ);
                } else if(key.isReadable()) {  // 4. 接收服务端的消息
                    SocketChannel channel = (SocketChannel)key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    int read = channel.read(buffer);
                    String message = new String(buffer.array());
                    System.out.println("receive message from server, size:" + buffer.position() + ", msg: " + message);
                }
                ite.remove();
            }
            
        }
    }
    
    public static void main(String[] args) throws IOException {
        new NioClient("127.0.0.1", 9989).process();
    }
}

输出结果: 

  服务器端启动成功
  客户端请求连接事件
  receive message from client, size:22, msg: send message to server
  receive message from server, size:22, msg: send message to client

Java NIO提供了与标准IO不同的工作方式: 

面向流与面向缓冲

  Java IO是面向流的,每次从流中读一个或多个字节,直至读取所有字节,它们没有被缓存在任何地方。

  Java NIO是基于通道和缓冲区进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。这增加了处理过程中的灵活性。但是需要检查该缓冲区中是否包含所有数据,还需确保不要覆盖缓冲区里尚未处理的数据。

阻塞与非阻塞IO

  Java IO的各种流是阻塞的。这意味着,当一个线程调用read() 或 write()时,该线程被阻塞,直到有一些数据被读取,或数据完全写入。该线程在此期间不能再干任何事情了。

  Java NIO是非阻塞的,当一个线程从某通道发送请求读取数据,它仅能得到目前可用的数据(读),如果目前没有数据可用时,就什么都不会获取。而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此。一个线程请求写入一些数据到某通道,但不需要等待它完全写入(写),这个线程同时可以去做别的事情。 线程通常将非阻塞IO的空闲时间用于在其它通道上执行IO操作,所以一个单独的线程现在可以管理多个输入和输出通道(channel)。

选择器(Selector)

  Java NIO的选择器允许一个单独的线程来监视多个输入通道,你可以注册多个通道使用一个选择器,然后使用一个单独的线程来“选择”通道:这些通道里已经有可以处理的输入,或者选择已准备写入的通道。这种选择机制,使得一个单独的线程很容易来管理多个通道。

  以服务端为例,如果服务端的selector上注册了读事件,阻塞I/O这时会调用read()方法阻塞地读取数据,而NIO的服务端的处理线程会轮询地访问selector,如果访问selector时发现有感兴趣的事件到达,则处理这些事件,如果没有感兴趣的事件到达,则处理线程会一直阻塞直到感兴趣的事件到达为止。

 

分散(Scatter)/ 聚集(Gather)

分散:将Channel中的数据读到多个buffer中

聚集:是指数据从多个buffer写入到同一个channel

ByteBuffer header = ByteBuffer.allocate(128);  
ByteBuffer body   = ByteBuffer.allocate(1024);  
ByteBuffer[] bufferArray = { header, body };  
channel.read(bufferArray);   // read()方法按照buffer在数组中的顺序将从channel中读取的数据写入到buffer,当一个buffer被写满后,再写入另一个buffer中
channel.write(bufferArray);  // write()方法会按照buffer在数组中的顺序,将数据写入到channel
原文地址:https://www.cnblogs.com/anxiao/p/7269856.html