Netty与NIO

初识Netty

Netty是由JBoss提供的一个Java的开源框架,是GitHub上的独立项目。

Netty是一个异步的,基于事件驱动的网络应用框架,用于快速开发高性能、高可靠的网络IO程序。

Netty主要针对于TCP协议下,面向客户端的高并发应用,或者是Peer-to-Peer场景下的大量数据次序传输的应用。

Netty本质上是一个NIO的框架,适用于服务器通讯相关的多种应用场景。

底层是NIO,NIO底层是Java IO和网络IO,再往下是TCP/IP协议。

Netty的应用场景

1、经典的Hadoop的高性能通信和序列化组件AVRO(实现数据文件的共享),他的Netty Service是基于Netty的二次封装。

2、在分布式系统中,各个节点之间需要远程服务调用例如RPC框架dubbo。

3、无论是手游服务端还是大型网络游戏,登录服务器都是用Netty作为高性能基础通信组件。

4、地图服务器之间可以方便的通过Netty进行高性能的通信。

IO模型

IO模型很大程度的决定了程序通信的性能。

Java共支持3种IO模型:BIO,NIO,AIO。

BIO:同步阻塞IO,也就是传统阻塞型的IO,服务器实现模式是一个连接对应一个线程。客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个链接不做任何事情会造成不必要的线程开销。

NIO:同步非阻塞IO,服务器实现模式是一个线程处理多个请求,客户端发送的链接请求都会注册到多路复用器上,多路复用器轮询到链接有IO请求就进行处理。

AIO:异步非阻塞,AIO引入了异步通道的概念,采用了Proactor模式,简化了程序编写,有效的请求才启动线程,他的特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数较多且链接时间较长的应用。

BIO的编程流程

1、服务端启动一个ServerSocket

2、客户端启动Socket对服务器进行通信,默认情况下对每个客户端建立一个线程。

3、客户端发送请求后,先咨询服务器是否有线程响应,如果没有则会等待,或者被拒绝。

4、如果有响应,客户端线程会等待请求结束后,才会继续执行。(阻塞,同步)

public class BIOServer {

    public static void main(String[] args) throws IOException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        //创建服务器端socket
        final ServerSocket serverSocket = new ServerSocket(6666);
        while (true){
            System.out.println("等待连接...");
            final Socket socket = serverSocket.accept();
            //连接一个客户端

            System.out.println("连接一个客户端");
            executorService.execute(new Runnable() {
                public void run() {
                    handler(socket);
                }
            });
        }

    }

    //编写一个handle方法用来处理客户端通讯
    public static void handler(Socket socket) {
        byte[] bytes = new byte[1024];
        try {
            InputStream inputStream = socket.getInputStream();
            //获取输入流,读取客户端发来的数据
            int i;
            System.out.println("线程id: "+Thread.currentThread().getId()+" 线程名称 "+Thread.currentThread().getName());
            System.out.println("等待读入信息");

            while ((i = inputStream.read(bytes)) != -1) {
                System.out.println("客户端数据: "+new String(bytes, 0, i));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.out.println("关闭与客户端的连接....");
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }


    }
}

启动main方法,打开cmd命令窗口。

输入telnet 127.0.0.1 6666 ;连接服务端,这相当于建立了一个客户端连接,然后Ctrl+],从客户端向服务器发送信息。

image-20201013202550724

send hello

image-20201013202625238

然后再开启一个连接。向服务器发送send success

image-20201013202758242

当关闭命令行窗口后客户端与服务器的链接就断开了。从上面可知BIO编程模型,每次建立一个连接,服务端就会创建一个线程。然后每次进行读取的时候,如果客户端不发送数据,服务端线程就一直阻塞在那,直到数据读取成功。

BIO 问题分析

1、每个请求都需要创建独立的线程,与对应的客户端进行数据读入,业务处理,数据写入。

2、当并发数较大时,需要创建大量的线程来处理连接,系统资源占用较大。

3、连接建立后,如果当前线程暂时没有数据可读,则线程就阻塞在Read上,造成不必要的资源浪费。

Java NIO

  • Java NIO全称是java non-blocking IO,是指JDK提供的新API。从JDK1.4开始,Java提供了一系列改进的输入输出的新特性,被通称为NIO,是同步非阻塞的IO模型。
  • NIO相关类放在java.nio包以及子包下。
  • NIO有三大核心部分:Channel(通道),Selector(选择器),Buffer(缓冲区)。
  • NIO是面向缓冲区,或者面向块编程的。数据读取到一个稍后处理的缓冲区,需要的时候可以在缓冲区前后移动,这就增加了处理过程的灵活性,使用它可以提供非阻塞的高伸缩网络。
  • Java NIO的非阻塞模式,是一个线程从某个通道发送请求或者读取数据,但是它仅仅能得到目前可用的数据,如果当前没有任务可做,他也不会阻塞等待,它可以去完成其他的事情。
  • NIO可以做到一个线程来处理多个操作,假设有10000个请求过来,根据实际情况,可以分配50到100个线程来处理,而不是必须要创建10000个线程。
  • HTTP2.0使用了多路复用技术,做到同一个连接并发处理多个请求,而且并发请求的数量比HTTP1.1打了好几个数量级。

BIO和NIO的比较

1、BIO以流的方式处理数据,而NIO以块的方式处理数据,块IO的效率比流IO高很多

2、BIO是阻塞的,NIO是非阻塞的

3、NIO是基于字节流和字符流进行操作,而NIO是基于Channel和Buffer进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。selector用于监听多个通道的事件,比如连接请求,数据到达,因此使用单个线程就可以监听多个客户端通道。

NIO中Selector、Channel、Buffer的关系

  1. 每个channel都会对应一个buffer。
  2. 一个selector对应一个线程,一个线程对应多个channel。
  3. 程序切换到哪个channel是由Event(事件)决定的。
  4. selector会根据不同的事件,在各个通道上进行切换。
  5. buffer是一个内存块,底层有一个数组。
  6. 数据的读取和写入是通过buffer,buffer可以切换读写,通过flip方法,但是BIO是单向输出,要么是输入流,要么是输出流。
  7. channel是双向的,可以返回底层操作系统的情况,比如linux,底层的操作系统通道就是双向的。

Buffer

缓冲区:缓冲区本质上是一个可以读写数据的内存块,可以理解成是一个容器对象(含数组),该对象提供了一组方法,可以更轻松的使用内存块;缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化。Channel提供从文件,网络读取数据的渠道,但是读取或写入的数据都必须经过Buffer。

Buffer是一个抽象类,类关系如下:

image-20201014213859848

属性表示的含义

  • capacity:缓冲区容量大小,缓冲区一旦初始化不能改变
  • limit:表示缓冲区当前终点,不能对缓冲区超过极限位置进行读写,limit可以修改。
  • position:位置,每次读取缓冲区,都会改变。
  • mark:标记 -1

ByteBuffer主要的方法如下:

   //创建初始缓冲区
   public static ByteBuffer allocateDirect(int capacity) {
        return new DirectByteBuffer(capacity);
   }
	//设置缓冲区的初始容量
    public static ByteBuffer allocate(int capacity) {
        if (capacity < 0)
            throw new IllegalArgumentException();
        return new HeapByteBuffer(capacity, capacity);
    }

   //构造初始化位置offset和上届length的缓冲区
    public static ByteBuffer wrap(byte[] array,
                                    int offset, int length)
    {
        try {
            return new HeapByteBuffer(array, offset, length);
        } catch (IllegalArgumentException x) {
            throw new IndexOutOfBoundsException();
        }
    }

   //把数组放到缓冲区中使用
    public static ByteBuffer wrap(byte[] array) {
        return wrap(array, 0, array.length);
    }

Channel

NIO的通道类似于流,但是区别如下。

  • 通道可以同时进行读写,而流只能读或者只能写
  • 通道可以实现异步读写数据
  • 通道可以从缓冲区读数据,也可以写数据到缓冲

BIO中的stream是单向的,例如FileInputStream对象只能进行读取数据的操作,NIO的Channel是双向的,可以读操作,也可以写操作。

Channel在NIO中是一个接口

image-20201015092328123

常用的Channel类有:FileChannel,DatagramChannel、ServerSocketChannel和SocketChannel。

image-20201015093128469

FileChannel用户文件的数据读写,DatagramChannel用于UDP的数据读写,ServerSocketChannel和SocketChannel用于TCP的数据读写。

FileChannel

FileChannel主要用来对文件进行IO操作

//从通道中读取数据放入缓冲区
public abstract int read(ByteBuffer dst) throws IOException;
//将缓冲区的数据写入通道
public abstract int write(ByteBuffer src) throws IOException;
//从目标通道中复制数据到当前通道
public abstract long transferTo(long position, long count,
                                    WritableByteChannel target)
        throws IOException;
//把数据从当前通道复制到目标通道
public abstract long transferFrom(ReadableByteChannel src,
                                      long position, long count)
        throws IOException;

实例:将数据写入到本地文件

文件不存在就创建

/**
 * 创建file_1.txt文件,向文件中写入“前研工作室”,通过管道写入
 */
public class NIOFileChannel {
    public static void main(String[] args) throws IOException {

        //创建文件输出流
        FileOutputStream fileOutputStream = new FileOutputStream("D:\file_1.txt");
        //写入的数据
        String message = "前研工作室";
        //获取一个管道,类型其实是FileChannelImpl
        FileChannel channel = fileOutputStream.getChannel();
        //创建一个缓冲区
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        //将数据放入缓冲区
        byteBuffer.put(message.getBytes());
        //缓冲区转变成读状态
        byteBuffer.flip();
        //将byteBuffer数据写入fileChannel
        channel.write(byteBuffer);
        //关闭输入流
        fileOutputStream.close();

    }
}

执行结果

image-20201015145844952

使用前面学到的ByteBuffer和FileChannel将之前创建的文件file_1.txt中的数据读取到控制台上

代码实现

public static String readFileByChannel() throws IOException {
        //创建文件输入流
        File file = new File("D:\file_1.txt");
        FileInputStream fileInputStream = new FileInputStream(file);
        //创建缓冲区
        ByteBuffer byteBuffer = ByteBuffer.allocate((int) file.length());
        //获取通道
        FileChannel channel = fileInputStream.getChannel();
        //读取到通道中    
        channel.read(byteBuffer);
        return new String(byteBuffer.array());

    }

读取结果

 public static void main(String[] args) throws IOException {
        String messages = readFileByChannel();
        System.out.println("file_01: "+messages);
     	//file_01: 前研工作室
    }

再来一个实例,使用Buffer完成文件的复制

要求

  • 使用FileChannel和read,write完成文件的拷贝
  • 拷贝文本文件file_1.txt,放在当前目录下

代码实现:

public static void copyFileByChannelAndBuffer(File file) throws IOException {
        //从指定文件中读取数据复制到file_2.txt
        FileInputStream fileInputStream = new FileInputStream(file);
        FileChannel inputStreamChannel = fileInputStream.getChannel();
        //写入到file_2.txt文件中
        FileOutputStream fileOutputStream = new FileOutputStream("file_2.txt");
        FileChannel outputStreamChannel = fileOutputStream.getChannel();

        ByteBuffer byteBuffer = ByteBuffer.allocate((int) file.length());

        while (true) {
            //每次读取之前要清空缓冲区,否则会写入
            byteBuffer.clear();
            int read = inputStreamChannel.read(byteBuffer);
            //文件读取完毕退出循环
            if (read == -1) {
                break;
            }
            byteBuffer.flip();
            outputStreamChannel.write(byteBuffer);

        }
        //关闭相关流的操作
        fileInputStream.close();
        fileOutputStream.close();
    }

最终执行结果

image-20201015170225069

实例:拷贝文件通过transferFrom方法

使用FileChannel和方法transferFrom,完成文件的拷贝

需求:将D盘下的pic_02.jpg复制到当前目录下

第一步创建相关的输入输出流,第二步是获取对应流的通道,第三步是使用transferFrom去完成拷贝,最后关闭相关通道和流。

public class NiOFileChannel2 {
    public static void main(String[] args) throws IOException {
        //创建相关流
        FileInputStream fileInputStream = new FileInputStream("D:\pic_02.jpg");
        FileOutputStream fileOutputStream = new FileOutputStream("D:\pic_03.jpg");
        //获取对应的fileChannel
        FileChannel inputStreamChannel = fileInputStream.getChannel();
        FileChannel outputStreamChannel = fileOutputStream.getChannel();
        //使用transferFrom去完成拷贝
        outputStreamChannel.transferFrom(inputStreamChannel,0,inputStreamChannel.size());
        //关闭相关通道和流
        inputStreamChannel.close();
        outputStreamChannel.close();
        fileInputStream.close();
        fileOutputStream.close();

    }

}

关于Buffer和Channel的注意事项和细节

ByteBuffer支持类型化的put和get,put放入的是什么数据类型,get就应该使用相应的数据类型来取出.当遇到java.nio.BufferOverflowException异常时,可能是你所创建的缓冲区带下已经不能容纳你所加入的数据。

public class NIOByteBufferGetPut {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(10);
        long val = 12345;
        buffer.putInt(100);
        //因为缓冲区一共分配了10个字节,int占用4个字节,long占8个字节,两者加起来已经大于10个字节,所以会抛出java.nio.BufferOverflowException
        buffer.putLong(val);
        buffer.putChar('特');
        buffer.putShort((short) 14);
        buffer.flip();
        System.out.println(buffer.getInt());
        System.out.println(buffer.getLong());
        System.out.println(buffer.getChar());
        System.out.println(buffer.getShort());

    }
}

image-20201015173703602

可以将一个普通的Buffer转成只读的Buffer。

public static void main(String[] args) {
    IntBuffer intBuffer = IntBuffer.allocate(10);
    for (int i = 0; i < intBuffer.capacity(); i++) {
        intBuffer.put(i*2);
    }
    IntBuffer intBuffer1 = intBuffer.asReadOnlyBuffer();
    System.out.println(intBuffer1.getClass().toString());

    while (intBuffer1.hasRemaining()) {
        System.out.println(intBuffer1.get());
    }
    intBuffer1.put(1);// 会抛出ReadOnlyBufferException异常
}

image-20201015175125599

asReadOnlyBuffer()方法返回的是IntBuffer

image-20201015175223809

duplicate方法中是创建了一个HeapIntBufferR实例,biang通过构造函数将readOnly属性设置成了true。HeapIntBufferR是IntBuffer的子类。

protected HeapIntBufferR(int[] buf,
                               int mark, int pos, int lim, int cap,
                               int off)
{
    super(buf, mark, pos, lim, cap, off);
    this.isReadOnly = true;
}
public IntBuffer duplicate() {
        return new HeapIntBufferR(hb,
                                        this.markValue(),
                                        this.position(),
                                        this.limit(),
                                        this.capacity(),
                                        offset);
    }

未完待续,以上总结的可能有错误,欢迎指出!!

原文地址:https://www.cnblogs.com/dataoblogs/p/14121847.html