Netty学习三:NIO

一、NIO基本概念:

    1.NIO全程java non-blocking IO或者new IO,是同步非阻塞的IO。

    2.NIO包含3大核心部分:Channel(通道)、Buffer(缓冲区)、Selector(选择器)

    3.NIO是面向缓冲区,或者面向块编程的

    4.NIO的非阻塞模式,使一个线程从某通道发送请求或读取数据,但他仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不去获取,而不是保持线程的阻塞,所以直道数据变得可读取为止,该线程可以继续做其他的事情。

    5.通俗的理解就是,NIO可以一个线程处理多个操作,假设有10000个请求,根据实际情况可以启动50或者100个线程来处理,不像BIO那样必须启动10000个

二、NIO的buffer基本使用

 1 package com.mytest.nio;
 2 
 3 import java.nio.IntBuffer;
 4 
 5 public class BasicBuffer {
 6 
 7     public static void main(String[] args) {
 8         //buffer的基本使用
 9         //创建一个BUffer,大小为5,可以存放5个int
10         IntBuffer intBuffer = IntBuffer.allocate(5);
11 
12         //向buffer种放入数据
13         for (int i = 0;i < 5 ; i++) {
14             intBuffer.put( i*2 );
15         }
16 
17         //从buffer种读取数据
18         //buffer读写切换
19         intBuffer.flip();
20         while (intBuffer.hasRemaining()) {
21             System.out.println(intBuffer.get());
22         }
23     }
24 }

三、NIO和BIO的比较

    1.BIO以流的方式读取数据,NIO以块的方式读取数据,块的效率比流的效率要高很多

    2.BIO阻塞,NIO非阻塞

    3.BIO根据字节流和字符流进行操作,NIO基于channel(通道)和Buffer(缓冲区)进行操作,数据总是通过通道读取到缓冲区中,或者从缓冲区写入通道中。

       selector(选择器)用于监听多个通道事件(比如请求连接,数据到达等),因此使用单个线程可以监听多个客户端通道。

四、NIO中Selector、Channel和Buffer的关系

    1.每个channel 都会对应一个Buffer

    2.Selector 对应一个线程, 一个线程对应多个channel(连接)

    3.下图反应了有三个channel 注册到 该selector //程序

    4.程序切换到哪个channel 是有事件决定的, Event 就是一个重要的概念

    5.Selector 会根据不同的事件,在各个通道上切换

    6.Buffer 就是一个内存块 , 底层是有一个数组

    7.数据的读取写入是通过Buffer, 这个和BIO , BIO 中要么是输入流,或者是输出流, 不能双向,但是NIO的Buffer 是可以读也可以写, 需要 flip 方法切换

    8.Channel 是双向的, 可以返回底层操作系统的情况, 比如Linux , 底层的操作系统 通道就是双向的

    

四、Buffer

    Buffer本质上是一个可以读写数据的内存块,可以理解为一个容器对象(含数组),该对象提供了一组方法,可以更轻松的使用内存块。缓冲区内置了一些机制,可以记录和跟踪缓冲对象的变化情况。Channel提供了从文件和网络读取数据的渠道,但是读取和写入数据都必须经过buffer。

    Buffer是一个顶级父类,他是一个抽象类,类的层级关系如下。

    

   一共有7种buffer,注意没有boolean类型的buffer,读取对应类型的数据时,使用相应的buffer即可。

   在buffer中,都是通过hb这个数组来进行存储数据的,不同的buffer,hb的类型不同。

           

五、Buffer的四个属性

    Buffer类定义了所有的缓冲区都具有的四个属性来提供关于其所包含的数据元素的信息。

    

属性

描述

Capacity

容量,即可以容纳的最大数据量;在缓冲区创建时被设定并且不能改变

Limit

表示缓冲区的当前终点,不能对缓冲区超过极限的位置进行读写操作。且极限是可以修改的

Position

位置,下一个要被读或写的元素的索引,每次读写缓冲区数据时都会改变改值,为下次读写作准备

Mark 标记

六、Buffer中常用的方法

//JDK1.4时,引入的api
public final int capacity( )//返回此缓冲区的容量
public final int position( )//返回此缓冲区的位置
public final Buffer position (int newPositio)//设置此缓冲区的位置
public final int limit( )//返回此缓冲区的限制
public final Buffer limit (int newLimit)//设置此缓冲区的限制
public final Buffer mark( )//在此缓冲区的位置设置标记
public final Buffer reset( )//将此缓冲区的位置重置为以前标记的位置
public final Buffer clear( )//清除此缓冲区, 即将各个标记恢复到初始状态,但是数据并没有真正擦除, 后面操作会覆盖
public final Buffer flip( )//反转此缓冲区
public final Buffer rewind( )//重绕此缓冲区
public final int remaining( )//返回当前位置与限制之间的元素数
public final boolean hasRemaining( )//告知在当前位置和限制之间是否有元素
public abstract boolean isReadOnly( );//告知此缓冲区是否为只读缓冲区

//JDK1.6时引入的api
public abstract boolean hasArray();//告知此缓冲区是否具有可访问的底层实现数组
public abstract Object array();//返回此缓冲区的底层实现数组
public abstract int arrayOffset();//返回此缓冲区的底层实现数组中第一个缓冲区元素的偏移量
public abstract boolean isDirect();//告知此缓冲区是否为直接缓冲区

buffer中最常用的是bytebuffer,因为网络传输主要使用二进制,bytebuffer中的主要方法如下

//缓冲区创建相关api
public static ByteBuffer allocateDirect(int capacity)//创建直接缓冲区
public static ByteBuffer allocate(int capacity)//设置缓冲区的初始容量
public static ByteBuffer wrap(byte[] array)//把一个数组放到缓冲区中使用
//构造初始化位置offset和上界length的缓冲区
public static ByteBuffer wrap(byte[] array,int offset, int length)
//缓存区存取相关API
public abstract byte get( );//从当前位置position上get,get之后,position会自动+1
public abstract byte get (int index);//从绝对位置get
public abstract ByteBuffer put (byte b);//从当前位置上添加,put之后,position会自动+1
public abstract ByteBuffer put (int index, byte b);//从绝对位置上put

七、Channel的基本介绍

  1.NIO的通道类似于流,但是有如下区别:

    通道可以同时进行读写,而流只能读或者只能写

    通道可以实现异步写数据

    通道可以从缓冲读数据,也可以写数据到缓冲

  2.BIO 中的 stream 是单向的,例如 FileInputStream 对象只能进行读取数据的操作,而 NIO 中的通道(Channel)是双向的,可以读操作,也可以写操作。

  3.Channel在NIO中是一个接口 public interface Channel extends Closeable{}

  4.常用的 Channel 类有:FileChannel、DatagramChannel、ServerSocketChannel 和 SocketChannel。【ServerSocketChanne 类似 ServerSocket , SocketChannel 类似 Socket】

  5.FileChannel 用于文件的数据读写,DatagramChannel 用于 UDP 的数据读写,ServerSocketChannel 和 SocketChannel 用于 TCP 的数据读写。

  FileChannel类

    FileChannel主要用来对本地文件进行 IO 操作,常见的方法有

    1.public int read(ByteBuffer dst) ,从通道读取数据并放到缓冲区中
    2.public int write(ByteBuffer src) ,把缓冲区的数据写到通道中
    3.public long transferFrom(ReadableByteChannel src, long position, long count),从目标通道中复制数据到当前通道
    4.public long transferTo(long position, long count, WritableByteChannel target),把数据从当前通道复制给目标通道

八、Channel的使用案例

  1.写文件

package com.mytest.netty.nio;

import java.io.FileOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class NioFileChannel01 {
    public static void main(String[] args) throws Exception {
        String str = "hello my world";
        //创建一个输出流 -> channel
        FileOutputStream fileOutputStream = new FileOutputStream("D:\work\myproject\test.txt");

        //通过fileOutputStream获取对应的FileChannel
        //真是类型实际是FileChannelImpl
        FileChannel fileChannel = fileOutputStream.getChannel();

        //创建一个缓冲区 ByteBuffer
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

        //将str放入buffer
        byteBuffer.put(str.getBytes());

        //对byteBuffer进行反转
        byteBuffer.flip();

        //将buffer的数据写入channel
        fileChannel.write(byteBuffer);

        //关闭流
        fileOutputStream.close();

    }
}

  2.读文件

package com.mytest.netty.nio;

import java.io.File;
import java.io.FileInputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class NioFileChannel02 {
    public static void main(String[] args) throws Exception {
        //创建文件输入流
        File file = new File("D:\work\myproject\test.txt");
        FileInputStream fileInputStream = new FileInputStream(file);

        //通过文件输入流 获取对应的channel
        FileChannel fileChannel = fileInputStream.getChannel();

        //创建缓冲区
        ByteBuffer byteBuffer = ByteBuffer.allocate((int) file.length());

        //将channel的数据读取到buffer中
        fileChannel.read(byteBuffer);

        //将fileBuffer中的字节数据转换为字符串
        System.out.println(new String(byteBuffer.array()));

        //关闭流
        fileInputStream.close();
    }
}

  3.复制文件

package com.mytest.netty.nio;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class NioFileChannel03 {
    public static void main(String[] args) throws Exception {

        FileInputStream fileInputStream = new FileInputStream("D:\work\myproject\test.txt");

        FileChannel fileChannel01 = fileInputStream.getChannel();

        FileOutputStream fileOutputStream = new FileOutputStream("D:\work\myproject\test1.txt");

        FileChannel fileChannel02 = fileOutputStream.getChannel();

        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

        while(true) {//循环读取
            //一定要情况buffer
            byteBuffer.clear();
            int read = fileChannel01.read(byteBuffer);
            //如果read为-1 则表示读取完毕
            if (read == -1) {
                break;
            }
            byteBuffer.flip();
            fileChannel02.write(byteBuffer);
        }

        fileInputStream.close();
        fileOutputStream.close();
    }
}

  4.使用transferFrom复制文件

package com.mytest.netty.nio;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.nio.channels.FileChannel;

public class NioFileChannel04 {
    public static void main(String[] args) throws Exception {
        FileInputStream fileInputStream = new FileInputStream("D:\work\myproject\test.txt");
        FileOutputStream fileOutputStream = new FileOutputStream("D:\work\myproject\test2.txt");

        FileChannel sourcech = fileInputStream.getChannel();
        FileChannel destch = fileOutputStream.getChannel();

        destch.transferFrom(sourcech, 0, sourcech.size());

        fileInputStream.close();
        fileOutputStream.close();
    }
}

九、buffer和channel的注意事项

  1、ByteBuffer 支持类型化的put 和 get, put 放入的是什么数据类型,get就应该使用相应的数据类型来取出,否则可能有 BufferUnderflowException 异常。
  2、可以将一个普通Buffer 转成只读Buffer
  3、NIO 还提供了 MappedByteBuffer, 可以让文件直接在内存(堆外的内存)中进行修改, 而如何同步到文件由NIO 来完成.
  4、NIO 还支持 通过多个Buffer (即 Buffer 数组) 完成读写操作,即 Scttering 和 Gathering

  Scttering:分散  Gathering:聚集

  代码应用如下:

 1 package com.mytest.nio;
 2 
 3 import java.net.InetSocketAddress;
 4 import java.nio.ByteBuffer;
 5 import java.nio.channels.ServerSocketChannel;
 6 import java.nio.channels.SocketChannel;
 7 import java.util.Arrays;
 8 
 9 /**
10  * Scttering:将数据写入到buffer中时,可以采用buffer数组,依次写入(一个buffer写满了,可以写到下一个buffer中)
11  * Gathering:从buffer中读取数据时,也可以采用buffer数组,一次读
12  */
13 public class SctteringAndGatheringTest {
14     public static void main(String[] args) throws Exception {
15 
16         //使用ServerSocketChannel 和SocketChannel 网络
17         ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
18         InetSocketAddress inetSocketAddress = new InetSocketAddress(7000);
19 
20         //绑定端口到Socket,并启动
21         serverSocketChannel.socket().bind(inetSocketAddress);
22 
23         //创建buffer数组
24         ByteBuffer[] byteBuffers = new ByteBuffer[2];
25         byteBuffers[0] = ByteBuffer.allocate(5);
26         byteBuffers[1] = ByteBuffer.allocate(3);
27 
28         //等待客户端连接
29         SocketChannel socketChannel = serverSocketChannel.accept();
30         int messageLength = 8;  //假定从客户端接受8个字节
31 
32         //循环读取
33         while (true) {
34             int byteRead = 0;
35 
36             while (byteRead < messageLength) {
37                 long l = socketChannel.read(byteBuffers);
38                 byteRead += l;
39                 System.out.println("byteRead = " + byteRead);
40                 //使用流打印,打印处buffer的position和limit
41                 Arrays.asList(byteBuffers).stream().map(buffer -> "position="
42                         + buffer.position() +",limit=" + buffer.limit()).forEach(System.out::println);
43             }
44 
45             //将每一个buffer都进行反转
46             Arrays.asList(byteBuffers).stream().map(buffer -> buffer.flip());
47 
48             //将数据读出显示到客户端
49             long byteWrite = 0;
50             while (byteWrite < messageLength) {
51                 long l = socketChannel.write(byteBuffers);
52                 byteWrite += l;
53             }
54 
55             //将所有的buffer进行clear
56             Arrays.asList(byteBuffers).stream().map(buffer -> buffer.clear());
57 
58             System.out.println("byteRead=" + byteRead + ", byteWrite=" + byteWrite + ", messageLength=" + messageLength);
59         }
60 
61     }
62 }

  MappedByteBuffer的使用:可以让文件在内存中(堆外内存)直接修改,操作系统不需要进行拷贝。

十、Selector(选择器)的基本介绍

  1、Java 的 NIO,用非阻塞的 IO 方式。可以用一个线程,处理多个的客户端连接,就会使用到Selector(选择器)。

  2、Selector 能够检测多个注册的通道上是否有事件发生(注意:多个Channel以事件的方式可以注册到同一个Selector),如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求。

  3、只有在 连接/通道 真正有读写事件发生时,才会进行读写,就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程,避免了多线程之间的上下文切换导致的开销。

  Selector的基本方法:

  public static Selector open();//得到一个选择器对象 

  public int select(long timeout);//监控所有注册的通道,当其中有 IO 操作可以进行时,将对应的 SelectionKey 加入到内部集合中并返回,参数用来设置超时时间

  public Set<SelectionKey> selectedKeys();//从内部集合中得到所有的 SelectionKey

   

  Selector、SelectionKey、ServerSocketChannel和SocketChannel的关系:

  

  对上图的解释:

  1、当客户端连接时,会通过ServerSocketChannel 得到 SocketChannel

  2、Selector 进行监听 select 方法, 返回有事件发生的通道的个数.

  3、将socketChannel注册到Selector上, register(Selector sel, int ops), 一个selector上可以注册多个SocketChannel

  4、注册后返回一个 SelectionKey, 会和该Selector 关联(集合)

  5、进一步得到各个 SelectionKey (有事件发生),再通过 SelectionKey反向获取 SocketChannel, 方法 channel(),可以通过得到的channel, 完成业务处理

 

十一、NIO简单的DEMO

  服务端代码

 1 package com.mytest.nio;
 2 
 3 import java.net.InetSocketAddress;
 4 import java.nio.ByteBuffer;
 5 import java.nio.channels.*;
 6 import java.util.Iterator;
 7 import java.util.Set;
 8 
 9 public class NioServer {
10     public static void main(String[] args) throws Exception {
11 
12         //创建一个ServerSocketChannel -> ServerSocket
13         ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
14 
15         //得到一个Selector对象
16         Selector selector = Selector.open();
17 
18         //绑定一个端口,并在服务器端进行监听
19         serverSocketChannel.socket().bind(new InetSocketAddress(6666));
20 
21         //设置为非阻塞
22         serverSocketChannel.configureBlocking(false);
23 
24         //将serverSocketChannel注册到selector 关心事件为op_accept
25         serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
26 
27         //循环客户端连接
28         while(true) {
29 
30             if (selector.select(1000) == 0) { //没有事件发生
31                 System.out.println("服务器等待了1秒,无连接");
32                 continue;
33             }
34 
35             //如果返回的>0,就获取到相应的SelectionKey集合
36             //1.如果返回的>0,就表示已经获取到管住的事件
37             //2.selector.selectedKeys获取到关注事件的集合
38             //  通过selectionKeys
39             Set<SelectionKey> selectionKeys = selector.selectedKeys();
40 
41             //遍历selectionKeys,使用迭代器
42             Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
43 
44             while (keyIterator.hasNext()) {
45                 //获取到SelectionKey
46                 SelectionKey key = keyIterator.next();
47 
48                 //根据key,对应的通道发生的事件做出相应的处理
49                 if (key.isAcceptable()) {   //如果是OP_ACCEPT,则是有新客户端连接
50                     //该客户端生成一个SocketChannel
51                     SocketChannel socketChannel = serverSocketChannel.accept();
52                     socketChannel.configureBlocking(false);
53                     //将SocketChannel注册到selector,关注事件为OP_READ,同时给socketChannel关联一个Buffer
54                     socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
55 
56                 }
57                 if (key.isReadable()) {
58                     //通过key反向过去channel
59                     SocketChannel channel = (SocketChannel) key.channel();
60                     //获取到该channel关联的buffer
61                     ByteBuffer buffer = (ByteBuffer) key.attachment();
62                     channel.read(buffer);
63                     System.out.println("from 客户端" + new String(buffer.array()));
64                 }
65                 //手动从集合中移除当前selectionKey,防止重复操作
66                 keyIterator.remove();
67             }
68         }
69     }
70 }

  客户端代码

 1 package com.mytest.nio;
 2 
 3 import java.net.InetSocketAddress;
 4 import java.nio.ByteBuffer;
 5 import java.nio.channels.SocketChannel;
 6 
 7 public class NioClient {
 8     public static void main(String[] args) throws Exception {
 9 
10         //得到一个网络通道
11         SocketChannel socketChannel = SocketChannel.open();
12 
13         //设置非阻塞
14         socketChannel.configureBlocking(false);
15 
16         //提供服务端的IP和端口
17         InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
18 
19         //连接服务器
20         if (!socketChannel.connect(inetSocketAddress)) {
21             while (!socketChannel.finishConnect()) {
22                 System.out.println("因为客户端的连接需要时间,客户端不会阻塞,可以做其他工作");
23             }
24         }
25 
26         //如果连接成功,就发送数据
27         String str = "hello";
28 
29         ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());
30 
31         //发送数据,将Buffer写入Channel
32         socketChannel.write(buffer);
33         System.in.read();
34     }
35 }

  selectionKey的常用方法: 

    1、public abstract Selector selector();//得到与之关联的 Selector 对象

    2、public abstract SelectableChannel channel();//得到与之关联的通道

    3、public final Object attachment();//得到与之关联的共享数据

    4、public abstract SelectionKey interestOps(int ops);//设置或改变监听事件

    5、public final boolean isAcceptable();//是否可以 accept

    6、public final boolean isReadable();//是否可以读

    7、public final boolean isWritable();//是否可以写

  ServerSocketChannel:

    在服务端监听新的客户端Socket连接。

    常用方法:

    1、public static ServerSocketChannel open(),得到一个 ServerSocketChannel 通道

    2、public final ServerSocketChannel bind(SocketAddress local),设置服务器端端口号

    3、public final SelectableChannel configureBlocking(boolean block),设置阻塞或非阻塞模式,取值 false 表示采用非阻塞模式

    4、public SocketChannel accept(),接受一个连接,返回代表这个连接的通道对象

    5、public final SelectionKey register(Selector sel, int ops),注册一个选择器并设置监听事件

   SocketChannel:

    网络IO通道,具体负责进行读写操作。NIO把缓冲区的数据写入通道,或者把通道里的数据读到缓冲区。

    常用方法: 

    1、public static SocketChannel open();//得到一个 SocketChannel 通道

    2、public final SelectableChannel configureBlocking(boolean block);//设置阻塞或非阻塞模式,取值 false 表示采用非阻塞模式

    3、public boolean connect(SocketAddress remote);//连接服务器

    4、public boolean finishConnect();//如果上面的方法连接失败,接下来就要通过该方法完成连接操作

    5、public int write(ByteBuffer src);//往通道里写数据

    6、public int read(ByteBuffer dst);//从通道里读数据

    7、public final SelectionKey register(Selector sel, int ops, Object att);//注册一个选择器并设置监听事件,最后一个参数可以设置共享数据

    8、public final void close();//关闭通道

原文地址:https://www.cnblogs.com/the-zym/p/14648749.html