netty之---netty的的原理以及介绍使用

第一章 netty的介绍

官网介绍

The Netty project is an effort to provide an asynchronous event-driven network application framework and tooling for the rapid development of maintainable high-performance and high-scalability protocol servers and clients.
In other words, Netty is an NIO client server framework that enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server development.
'Quick and easy' does not mean that a resulting application will suffer from a maintainability or a performance issue. Netty has been designed carefully with the experiences learned from the implementation of a lot of protocols such as FTP, SMTP, HTTP, and various binary and text-based legacy protocols. As a result, Netty has succeeded to find a way to achieve ease of development, performance, stability, and flexibility without a compromise.

netty是由JBOSS提供的一个java开源框架,是一个异步的,基于事件驱动的网络应用框架,用以快速开发高性能、高可靠性的网络IO程序,netty本质上是一个NIO框架,适用于服务器通讯相关应用场景。从下图可以看出关系(稍微有点丑)Netty是对NIO进一步封装。

第二章 Java BIO、NIO、AIO编程

首选需要搞清楚同步、异步、阻塞与非阻塞的概念,以银行取款为例:

  • 同步 : 自己亲自出马持银行卡到银行取钱(使用同步IO时,Java自己处理IO读写);
  • 异步 : 委托一小弟拿银行卡到银行取钱,替你取钱(使用异步IO时,Java将IO读写委托给OS处理,需要将数据缓冲区地址和大小传给OS(银行卡和密码),OS需要支持异步IO操作API);
  • 阻塞 : ATM排队取款,你只能等待(使用阻塞IO时,Java调用会一直阻塞到读写完成才返回);
  • 非阻塞 : 柜台取款,取个号,然后坐在椅子上做其它事,等号广播会通知你办理,没到号你就不能去,你可以不断问大堂经理排到了没有,大堂经理如果说还没到你就不能去(使用非阻塞IO时,如果不能读写Java调用会马上返回,当IO事件分发器会通知可读写时再继续进行读写,不断循环直到读写完成)

搞清楚netty之前,需要先搞清楚NIO是什么?目前java共支持3种网络变成模型I/O模式 BIO,NIO,AIO,首先对比一下这三种模式的区别

Java对BIO、NIO、AIO的支持:

  • Java BIO : 同步并阻塞,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,当然可以通过线程池机制改善。

  • Java NIO : 同步非阻塞,服务器实现模式为一个请求一个线程,即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求时才启动一个线程进行处理。(对于NIO,有的地方叫no-blocking io有的叫 new I/O,其实都是一样的意思)

  • Java AIO(NIO.2) : 异步非阻塞,服务器实现模式为一个有效请求一个线程,客户端的I/O请求都是由OS先完成了再通知服务器应用去启动线程进行处理

另外,I/O属于底层操作,需要操作系统支持,并发也需要操作系统的支持,所以性能方面不同操作系统差异会比较明显。

在高性能的I/O设计中,有两个比较著名的模式Reactor和Proactor模式,其中Reactor模式用于同步I/O,而Proactor运用于异步I/O操作。

在比较这两个模式之前,我们首先的搞明白几个概念,什么是阻塞和非阻塞,什么是同步和异步,同步和异步是针对应用程序和内核的交互而言的,同步指的是用户进程触发IO操作并等待或者轮询的去查看IO操作是否就绪,而异步是指用户进程触发IO操作以后便开始做自己的事情,而当IO操作已经完成的时候会得到IO完成的通知。而阻塞和非阻塞是针对于进程在访问数据的时候,根据IO操作的就绪状态来采取的不同方式,说白了是一种读取或者写入操作函数的实现方式,阻塞方式下读取或者写入函数将一直等待,而非阻塞方式下,读取或者写入函数会立即返回一个状态值。

 一般来说I/O模型可以分为:同步阻塞,同步非阻塞,异步阻塞,异步非阻塞IO

同步阻塞IO:在此种方式下,用户进程在发起一个IO操作以后,必须等待IO操作的完成,只有当真正完成了IO操作以后,用户进程才能运行。JAVA传统的IO模型属于此种方式!

同步非阻塞IO:在此种方式下,用户进程发起一个IO操作以后边可返回做其它事情,但是用户进程需要时不时的询问IO操作是否就绪,这就要求用户进程不停的去询问,从而引入不必要的CPU资源浪费。其中目前JAVA的NIO就属于同步非阻塞IO。

异步阻塞IO:此种方式下是指应用发起一个IO操作以后,不等待内核IO操作的完成,等内核完成IO操作以后会通知应用程序,这其实就是同步和异步最关键的区别,同步必须等待或者主动的去询问IO是否完成,那么为什么说是阻塞的呢?因为此时是通过select系统调用来完成的,而select函数本身的实现方式是阻塞的,而采用select函数有个好处就是它可以同时监听多个文件句柄,从而提高系统的并发性!

 异步非阻塞IO:在此种模式下,用户进程只需要发起一个IO操作然后立即返回,等IO操作真正的完成以后,应用程序会得到IO操作完成的通知,此时用户进程只需要对数据进行处理就好了,不需要进行实际的IO读写操作,因为真正的IO读取或者写入操作已经由内核完成了。

下面示意图简单描述了BIO到NIO的变迁

BIO、NIO、AIO适用场景分析:

  • BIO方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择,但程序直观简单易理解。

  • NIO方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局限于应用中,编程比较复杂,JDK1.4开始支持。

  • AIO方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用OS参与并发操作,编程比较复杂,JDK7开始支持。

 2.1 Java BIO 编程

BIO提供了一种端到端的通信,相当于对传输层的一种封装,服务端启动,等待客户端的连接,在客户端连接到服务端后,服务端启动一个线程去监听客户端消息,客户端发送消息,并等待服务端返回(客户端一直阻塞),服务端收到消息,将消息返回给客户端,此时一次交互完成。若还需交互,则不释放连接,客户端再次将消息发送给服务端,并等待返回,若不需要交互,则客户端释放连接。

 简单demo:

服务端:

package com.yang.java.main.bio;

import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Description:
 *
 * @author mark
 * Date 2020/9/9
 */
public class BioServer {

    public static void main(String[] args) throws IOException {
        // 创建一个线程池,当有socket链接,就创建一个线程,机型通信
        ExecutorService executorService = Executors.newCachedThreadPool();
        ServerSocket serverSocket = new ServerSocket(9999);
        System.out.println("socket server started.");
        // 循环监听
        do {
            System.out.println("start to listen, current thread is :" + Thread.currentThread().getName());
            // 接受连接
            Socket socket = serverSocket.accept();
            // 创建一个新的线程去处理接受的连接
            executorService.execute(() -> handler(socket));
        } while (true);
    }

    private static void handler(Socket socket) {
        System.out.println("current thread is :" + Thread.currentThread().getName());
        try {
            InputStream inputStream = socket.getInputStream();
            byte[] bytes = new byte[1024];
            int read;
            do {
                read = inputStream.read(bytes);
                System.out.println("receive the message is: " + new String(bytes));
            } while (read != -1);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            System.out.println("close the socket");
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

客户端:

package com.yang.java.main.bio;

import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.util.Scanner;

/**
 * Description:
 *
 * @author mark
 * Date 2020/9/9
 */
public class BioClient {

    public static void main(String[] args) throws IOException {
        // 绑定服务器
        Socket client = new Socket("127.0.0.1", 9999);
        // 创建输入
        Scanner scanner = new Scanner(System.in);
        System.out.println("please input the content: ");
        for (; ; ) {
            if (scanner.hasNext()) {
                String str = scanner.next();
                System.out.println(str);
                OutputStream outputStream = client.getOutputStream();
                outputStream.write(str.getBytes());
            }
        }
    }
}

起三个客户端,进行查看服务端输出:

socket server started.
start to listen, current thread is :main
start to listen, current thread is :main
current thread is :pool-1-thread-1
receive the message is: client                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          
start to listen, current thread is :main
current thread is :pool-1-thread-2
receive the message is: client                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          
start to listen, current thread is :main
current thread is :pool-1-thread-3
receive the message is: client                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          
start to listen, current thread is :main
current thread is :pool-1-thread-4
receive the message is: client                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          
start to listen, current thread is :main
current thread is :pool-1-thread-5
receive the message is: client                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          

从结果可以看出,每次进来一个就会启动一个新线程。

 因此可以确认BIO的弊端

  1. 当并发量过大的时候,服务端就会创建大量的线程来处理链接,造成资源的浪费,系统资源占用较大。
  2. 当连接建立之后,如果当前线程没有数据可读,那么当前线程就阻塞在read操作上,会造成线程资源浪费。

 2.2 Java NIO 编程

  1. java NIO 也叫 java no-blocking IO 或者 new IO
  2. BIO基于字节流和字符流进行操作,而NIO基于Channel和Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择器)用于监听多个通道的事件(比如:连接打开,数据到达)。因此,单个线程可以监听多个数据通道。
  3. NIO和传统IO(以下简称IO)之间第一个最大的区别是,IO是面向流的,NIO是面向缓冲区的。 Java IO面向流意味着每次从流中读一个或多个字节,直至读取所有字节,它们没有被缓存在任何地方。此外,它不能前后移动流中的数据。如果需要前后移动从流中读取的数据,需要先将它缓存到一个缓冲区。NIO的缓冲导向方法略有不同。数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动。这就增加了处理过程中的灵活性。但是,还需要检查是否该缓冲区中包含所有您需要处理的数据。而且,需确保当更多的数据读入缓冲区时,不要覆盖缓冲区里尚未处理的数据。
  4. IO的各种流是阻塞的。这意味着,当一个线程调用read() 或 write()时,该线程被阻塞,直到有一些数据被读取,或数据完全写入。该线程在此期间不能再干任何事情了。 NIO的非阻塞读模式,使一个线程从某通道发送请求读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取。而不是保持线程阻塞,所以直至数据变得可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此。一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。 线程通常将非阻塞IO的空闲时间用于在其它通道上执行IO操作,所以一个单独的线程现在可以管理多个输入和输出通道(channel)。

因此NIO 的三大核心部分就是buffer,channel, selector

2.2.1 Buffer

Buffer:缓冲区,本质上是一个可以读写数据的内存块,可以理解为一个容器对象(包含数据),这个对象提供了一组方法,可以轻松的使用内存块,缓冲区对象内置了一些机制,可以跟踪和记录缓冲的状态变化

public abstract class Buffer {
    ...
    // Invariants: mark <= position <= limit <= capacity 字段大小
    private int mark = -1;
    private int position = 0;
    private int limit;
    private int capacity;
    ...
}

这几个字段就是Buffer实现管理内存块的保证,也就是保存这里面数组的位置状态,分析一下字段:

索引

说明

capacity

缓冲区数组的总长度

position

下一个要操作的数据元素的位置,不能超过limit

limit

缓冲区数组中不可操作的下一个元素的位置:limit<=capacity

mark

用于记录当前position的前一个位置或者默认是-1,这个字段一般很少用到

Buffer的一些方法

package com.yang.java.main.nio.buffer;


import java.nio.ByteBuffer;
import java.util.Arrays;

/**
 * Description:
 *
 * @author mark
 * Date 2020/9/9
 */
public class buffer {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.wrap(new byte[]{1,2,3,4,5,6});
        System.out.println("ByteBuffer: " + buffer);  // ByteBuffer: java.nio.HeapByteBuffer[pos=0 lim=6 cap=6]
        // 移动position以及limit
        buffer.position(2);
        buffer.limit(4);
        System.out.println("ByteBuffer: " + buffer);  // ByteBuffer: java.nio.HeapByteBuffer[pos=2 lim=4 cap=6]
        System.out.println("isReadOnly: " + buffer.isReadOnly()); // isReadOnly: false
        System.out.println("isDirect: " + buffer.isDirect()); // isDirect: false
        buffer.get();
        System.out.println("ByteBuffer: " + buffer);  // ByteBuffer: java.nio.HeapByteBuffer[pos=3 lim=4 cap=6]
        System.out.println("slice: " + buffer.slice()); // slice: java.nio.HeapByteBuffer[pos=0 lim=1 cap=1]
        ByteBuffer duplicate = buffer.duplicate(); // 源码 new DirectByteBuffer(this, this.markValue(), this.position(), this.limit(), this.capacity(), 0);
        System.out.println("duplicate compareTo: " + duplicate.compareTo(buffer)); // duplicate: true
        System.out.println("duplicate: " + duplicate.equals(buffer)); // duplicate: true
        System.out.println("duplicate: " + duplicate + "  ByteBuffer " + buffer); // duplicate: java.nio.HeapByteBuffer[pos=3 lim=4 cap=6]  ByteBuffer java.nio.HeapByteBuffer[pos=3 lim=4 cap=6]
        System.out.println("hasArray: " + buffer.hasArray()); // hasArray: true
        System.out.println("ByteBuffer: " + buffer);  // ByteBuffer: java.nio.HeapByteBuffer[pos=3 lim=4 cap=6]
        System.out.println("array: " + Arrays.toString(buffer.array())); // array: [1, 2, 3, 4, 5, 6]
        buffer.flip();
        System.out.println("flip ByteBuffer: " + buffer); // flip ByteBuffer: java.nio.HeapByteBuffer[pos=0 lim=3 cap=6]
        System.out.println("flip array: " + Arrays.toString(buffer.array())); // flip array: [1, 2, 3, 4, 5, 6]
        buffer.position(2);
        System.out.println("ByteBuffer: " + buffer); // ByteBuffer: java.nio.HeapByteBuffer[pos=2 lim=3 cap=6]
        buffer.compact();
        System.out.println("compact ByteBuffer: " + buffer); // compact ByteBuffer: java.nio.HeapByteBuffer[pos=1 lim=6 cap=6]
        System.out.println("compact array: " + Arrays.toString(buffer.array())); // compact array: [3, 2, 3, 4, 5, 6]
        buffer.rewind();
        System.out.println("rewind ByteBuffer: " + buffer); // rewind ByteBuffer: java.nio.HeapByteBuffer[pos=0 lim=6 cap=6]
        buffer.clear();
        System.out.println("clear ByteBuffer: " + buffer); // clear ByteBuffer: java.nio.HeapByteBuffer[pos=0 lim=6 cap=6]
        System.out.println("array: " + Arrays.toString(buffer.array())); // array: [3, 2, 3, 4, 5, 6]
    }
}

2.2.2 Channel

 Channel一般称为通道,java NIO 的通道类似于流,但又有一些不同

  • Channel既可以从通道中读取数据,又可以写数据到通道。但流的读写通常是单向的
  • Channel可以异步的读写
  • Channel中的数据总是要先读到一个Buffer中或者总是要从一个Buffer中写入

Channel最重要的通道的实现:

  • FileChannel:从文件中读写数据
  • DatagramChannel:能通过UDP读写网络中的数据
  • SocketChannel:能通过TCP读写网络中的数据
  • ServerSocketChannel:可以监听新进来的TCP连接,像WEB服务器那样。对每一个新进来的链接都会创建一个SocketChannel。

2.2.2.1 FileChannel类

FileChannel主要就是针对本地文件进行IO操作,常见的方法有:

package com.yang.java.main.nio.channel;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

/**
 * Description:
 *
 * @author mark
 * Date 2020/9/10
 */
public class NIOFileChannel01 {

    public static void write() throws IOException {
        String str = "hell, mark";
        // 创建一个输出流
        FileOutputStream fileOutputStream = new FileOutputStream("d:\file01.txt");
        // 获取通道
        FileChannel channel = fileOutputStream.getChannel();
        // 创建一个缓冲区,并将str放入缓冲区
        ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());
        System.out.println("buffer: " + buffer); // buffer: java.nio.HeapByteBuffer[pos=0 lim=10 cap=10]
        // 如果使用allocate创建之后flip
        ByteBuffer allocate = ByteBuffer.allocate(512);
        allocate.put(str.getBytes());
        System.out.println("allocate: " + allocate); // allocate: java.nio.HeapByteBuffer[pos=10 lim=512 cap=512]
        allocate.flip();
        System.out.println("allocate flip: " + allocate);  // allocate flip: java.nio.HeapByteBuffer[pos=0 lim=10 cap=512]
        channel.write(buffer);
        fileOutputStream.close();
        // 进行flip的必要
        /**
         * static int write(FileDescriptor fd, ByteBuffer src, long position,
         *                      boolean directIO, int alignment, NativeDispatcher nd)
         *         throws IOException
         *     {
         *         if (src instanceof DirectBuffer) {
         *             return writeFromNativeBuffer(fd, src, position, directIO, alignment, nd);
         *         }
         *
         *         // Substitute a native buffer
         *         int pos = src.position();
         *         int lim = src.limit();
         *         assert (pos <= lim);
         *         int rem = (pos <= lim ? lim - pos : 0);  // 主要看这里
         *         ByteBuffer bb;
         *         if (directIO) {
         *             Util.checkRemainingBufferSizeAligned(rem, alignment);
         *             bb = Util.getTemporaryAlignedDirectBuffer(rem, alignment);
         *         } else {
         *             bb = Util.getTemporaryDirectBuffer(rem);
         *         }
         *         try {
         *             bb.put(src);
         *             bb.flip();  // 看这里
         *             // Do not update src until we see how many bytes were written
         *             src.position(pos);
         *
         *             int n = writeFromNativeBuffer(fd, bb, position, directIO, alignment, nd);
         *             if (n > 0) {
         *                 // now update src
         *                 src.position(pos + n);
         *             }
         *             return n;
         *         } finally {
         *             Util.offerFirstTemporaryDirectBuffer(bb);
         *         }
         *     }
         */
    }

    public static void read() throws IOException {
        File file = new File("d:\file01.txt");
        // 创建输入流
        FileInputStream fileInputStream = new FileInputStream(file);
        // 获取Channel
        FileChannel channel = fileInputStream.getChannel();
        // 创建缓冲区
        ByteBuffer buffer = ByteBuffer.allocate((int) file.length());
        // 将通道内数据读入缓冲区,因为设定的缓冲区大小等于文件大小,因此一次是可以读完的,否则需要循环读取,返回值是读取的长度,如果等于-1,则说明无信息可读取
        int read = channel.read(buffer);
        System.out.println("read " + read);  // read 10
        System.out.println("read message: " + new String(buffer.array())); // read message: hell, mark
    }

    public static void transForm() throws IOException {
        FileInputStream fileInputStream = new FileInputStream("d:\package\sogou_pinyin_98a.exe");
        FileOutputStream fileOutputStream = new FileOutputStream("d:\sogou_pinyin_98a.exe");
        // 获取各个流的channel
        FileChannel fileInputChannel = fileInputStream.getChannel();
        FileChannel fileOutputChannel = fileOutputStream.getChannel();
        System.out.println("fileInputChannel isOpen: " + fileInputChannel.isOpen()); // fileInputChannel isOpen: true
        System.out.println("fileOutputChannel isOpen: " + fileOutputChannel.isOpen()); // fileOutputChannel isOpen: true
        // 使用transform进行copy一个可读的channel
        fileOutputChannel.transferFrom(fileInputChannel, 0, fileInputChannel.size());  // copy完成的文件与源文件一致
        // 关闭通道以及流
        fileInputChannel.close();
        fileOutputChannel.close();
        fileInputStream.close();
        fileOutputStream.close();
    }

    public static void position() throws IOException {
        FileInputStream fileInputStream = new FileInputStream("d:\file01.txt");
        FileChannel fileInputStreamChannel = fileInputStream.getChannel();
        System.out.println("fileInputStreamChannel size: " + fileInputStreamChannel.size());  // fileInputStreamChannel size: 10
        System.out.println("fileInputStreamChannel position: " + fileInputStreamChannel.position()); // fileInputStreamChannel position: 0
        fileInputStreamChannel.position(2);
        System.out.println("fileInputStreamChannel position: " + fileInputStreamChannel.position()); // fileInputStreamChannel position: 2
        ByteBuffer allocate = ByteBuffer.allocate((int) fileInputStreamChannel.size());
        // 从文件的position开始读取,一直读到结尾
        fileInputStreamChannel.read(allocate);
        System.out.println("byteBuffer: " + allocate);  // byteBuffer: java.nio.HeapByteBuffer[pos=8 lim=10 cap=10]
        System.out.println("message: " + new String(allocate.array()));  // message: ll, mark
        fileInputStreamChannel.close();
        fileInputStream.close();
    }

    public static void transferTo() throws IOException {
        FileInputStream fileInputStream = new FileInputStream("d:\file01.txt"); // 源文件内容:hello, mark
        FileOutputStream fileOutputStream = new FileOutputStream("d:\file02.txt");
        // 获取各个流的channel
        FileChannel fileInputChannel = fileInputStream.getChannel();
        FileChannel fileOutputChannel = fileOutputStream.getChannel();
        System.out.println("fileInputChannel isOpen: " + fileInputChannel.isOpen());  // fileInputChannel isOpen: true
        System.out.println("fileOutputChannel isOpen: " + fileOutputChannel.isOpen());  // fileOutputChannel isOpen: true
        // 发送数据给一个可写的channel,发送位置为2(从0开始计数),发送的字节数量为3
        fileInputChannel.transferTo(2, 3, fileOutputChannel); // 发送给的文件内容:llo
        // 关闭通道以及流
        fileInputChannel.close();
        fileOutputChannel.close();
        fileInputStream.close();
        fileOutputStream.close();
    }

    public static void main(String[] args) throws IOException {
        write();
        read();
        transForm();
        position();
        transferTo();
    }
}
MappedByteBuffer,可以让文件直接在内存中进行修改,操作系统不需要拷贝一次,因为涉及文件所以放在这里面,在这里面使用RandomAcessFIle进行操作。、
RandomAcessFIle是一个可以对文件随机访问,包括读写,该读写是基于指针的操作。可以把它理解为一个文本编辑器。
看构造器源码:
private RandomAccessFile(File file, String mode, boolean openAndDelete) throws FileNotFoundException {
        String name = (file != null ? file.getPath() : null);
        int imode = -1;
        if (mode.equals("r"))
            imode = O_RDONLY;
        else if (mode.startsWith("rw")) {
            imode = O_RDWR;
            rw = true;
            if (mode.length() > 2) {
                if (mode.equals("rws"))
                    imode |= O_SYNC;
                else if (mode.equals("rwd"))
                    imode |= O_DSYNC;
                else
                    imode = -1;
            }
        }
        ...      
}
该操作符定义了四种模式:
r 以只读的方式打开文本,也就意味着不能用write来操作文件
rw 读操作和写操作都是允许的
rws 每当进行写操作,同步的刷新到磁盘,刷新内容和元数据
rwd 每当进行写操作,同步的刷新到磁盘,刷新内容

 接下来看一下简单的demo

public class MappedByteBufferDemo {
    public static void main(String[] args) throws IOException {
        RandomAccessFile randomAccessFile = new RandomAccessFile("d:\file01.txt", "rw");
        FileChannel channel = randomAccessFile.getChannel();
        /**
         * 参数1:制定该channel的模式:
         *      public static final MapMode READ_ONLY = new MapMode("READ_ONLY");   只读模式
         *      public static final MapMode READ_WRITE =new MapMode("READ_WRITE");  读写模式
         *      public static final MapMode PRIVATE =new MapMode("PRIVATE");        写时复制
         * 参数2:可以直接修改的起始位置
         * 参数3:是映射到内存的大小,即当前文件有多少个字节可以映射到内存中,,一共有size大小的字节映射到内存
         */
        MappedByteBuffer map = channel.map(FileChannel.MapMode.READ_WRITE, 0, 12);
        map.put(0, (byte) 'A');
        map.put(1, (byte) 'B');
        randomAccessFile.close();  // 修改值之后文件内容(有两个空格):‘ABll, mark  ’
    }
}

2.2.2.2 DatagramChannel类

使用与ServerSocketChannel类似,一个是UDP,一个是TCP

2.2.2.3 SocketChannel 与ServerSocketChannel类

详见Selector。 

2.2.3 Selector

Java NIO的选择器允许一个单独的线程来监视多个输入通道,你可以注册多个通道使用一个选择器,然后使用一个单独的线程来“选择”通道:这些通道里已经有可以处理的输入,或者选择已准备写入的通道。这种选择机制,使得一个单独的线程很容易来管理多个通道。

 

仅用单个线程来处理多个Channels的好处是,只需要更少的线程来处理通道。事实上,可以只用一个线程处理所有的通道。对于操作系统来说,线程之间上下文切换的开销很大,而且每个线程都要占用系统的一些资源(如内存)。但是随着现代计算机的增强,对于多CPU需要考虑实际情况进行选择线程数。

Selector的特点:

  • Netty的IO线程NIOEventLoop聚合了Selector(选择器,或者多路复用器),可以同时并发处理成百上千个客户端的链接
  • 当线程从客户端的Socket通道进行读写数据时,如没有数据可用,该线程可以执行其他任务
  • 线程通常将非阻塞IO的空闲时间用于在其他通道上执行IO操作,所以单独的线程可以管理多个输入与输出通道
  • 由于读写操作都是非阻塞的,这就可以充分提升IO线程的运行效率,避免由于频繁I/O阻塞导致的线程挂起
  • 一个I/O线程可以并发处理N个客户端链接和读写操作,这从根本上解决了传统同步阻塞I/O,一连接一线程模型,架构的性能、弹性伸缩能力以及可靠性都得到了提升

selector的一些方法:

public abstract class Selector implements Closeable {

    public static java.nio.channels.Selector open() throws IOException // 得到一个选择对象

    public abstract boolean isOpen();

    public abstract SelectorProvider provider();

    /**
     * // The set of keys registered with this Selector
     *     private final Set<SelectionKey> keys;
     *
     *     // The set of keys with data ready for an operation
     *     private final Set<SelectionKey> selectedKeys;
     *
     *     // Public views of the key sets
     *     private final Set<SelectionKey> publicKeys;             // Immutable
     *     private final Set<SelectionKey> publicSelectedKeys;     // Removal allowed, but not addition
     *
     *     // used to check for reentrancy
     *     private boolean inSelect;
     *
     *     protected SelectorImpl(SelectorProvider sp) {
     *         super(sp);
     *         keys = ConcurrentHashMap.newKeySet();
     *         selectedKeys = new HashSet<>();
     *         publicKeys = Collections.unmodifiableSet(keys);
     *         publicSelectedKeys = Util.ungrowableSet(selectedKeys);
     *     }
     * @return
     */
    public abstract Set<SelectionKey> keys();  // 从内部集合获取所有的selectionKey,实际上就是:publicKeys,也就是:keys

    public abstract Set<SelectionKey> selectedKeys();  // 从内部集合获取所有的selectionKey,实际上就是:publicSelectedKeys,也就是:selectedKeys

    public abstract int select() throws IOException; // 监控所有的注册通道,当其中有IO操作可以进行,将对应的selectionKey加入到内部稽核并返回,参数用来设置超时时间。会进行阻塞

    public abstract int selectNow() throws IOException;  // 不阻塞,立马返回

    public abstract java.nio.channels.Selector wakeup();  // 唤醒selector

    public abstract void close() throws IOException;    // 关闭
}

NIO,我们首先需要注册当这几个事件到来的时候所对应的处理器。然后在合适的时机告诉事件选择器:我对这个事件感兴趣。例如对于读操作,就是完成连接和系统没有办法承载新读入的数据的时;对于accept,一般是服务器刚启动的时候;而对于connect,一般是connect失败需要重连或者直接异步调用connect的时候。

其次,用一个死循环选择就绪的事件,会执行系统调用(Linux 2.6之前是select、poll,2.6之后是epoll,Windows是IOCP),还会阻塞的等待新事件的到来。新事件到来的时候,会在selector上注册标记位,标示可读、可写或者有连接到来。

注意,select是阻塞的,无论是通过操作系统的通知(epoll)还是不停的轮询(select,poll),这个函数是阻塞的。所以在一个while(true)里面调用这个函数而不用担心CPU空转。接下来看一张图:

 一个NIO的连接过程

  1. 当客户端连接时,会通过ServerSocketChannel得到SocketChannel
  2. Selector进行监听select方法,返回有事件发生的通道个数
  3. 将SocketChannel注册到Selector上,及调用register(Selector sel, int ops),一个selector可以注册多个SocketChannel
  4. 注册后返回一个SelectionKey,会和该Selector关联
  5. 进一步得到各个SelectionKey(有事件发生时)
  6. 可以通过得到的Channel,完成业务处理

一个简单的 NIO demo

package com.yang.java.main.nio.selector;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;

/**
 * Description:
 *
 * @author mark
 * Date 2020/9/10
 */
public class NIOServer {

    public static void main(String[] args) throws IOException {
        // 创建ServerSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // 绑定端口,进行服务监听
        serverSocketChannel.socket().bind(new InetSocketAddress("127.0.0.1", 9999));
        // 设置为非阻塞
        serverSocketChannel.configureBlocking(false);
        // 得到Selector对象
        Selector selector = Selector.open();
        // 将serverSocketChannel注册到selector中,并设置关心事件为accept
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        // 循环等待客户端的连接
        while (true){
            if(selector.select(2000) == 0){
                System.out.println("server wait for 2 seconds, but no connect");
                continue;
            }
            // 如果返回的大于0,就说明已经获取到关注的事件,然后获取关注事件的集合
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            // 使用迭代器
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()){
                SelectionKey selectionKey = iterator.next();
                //判断发生的事件
                if(selectionKey.isAcceptable()){
                    // 有客户端连接进来了,生成一个Channel
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    System.out.println("a new client is connect");
                    // 将socketChannel设置为非阻塞的
                    socketChannel.configureBlocking(false);
                    // 注册事件,同时可以为该channel关联一个ByteBuffer
                    socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
                }
                if(selectionKey.isReadable()){
                    // 通过selectionKey反向获取通道
                    SocketChannel channel = (SocketChannel) selectionKey.channel();
                    // 获取关联的buffer
                    ByteBuffer attachment = (ByteBuffer) selectionKey.attachment();
                    channel.read(attachment);
                    System.out.println("receive message: " + new String(attachment.array()));
                    channel.close();
                }
                iterator.remove();
            }
        }
    }
}
package com.yang.java.main.nio.selector;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

/**
 * Description:
 *
 * @author mark
 * Date 2020/9/10
 */
public class NIOClient {

    public static void main(String[] args) throws IOException {
        // 获取一个socketChannel
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        // 连接服务器
        socketChannel.connect(new InetSocketAddress("127.0.0.1", 9999));
        while (!socketChannel.finishConnect()){
            System.out.println("not finish connect, do other");
        }
        String str = "hello, service";
        ByteBuffer byteBuffer = ByteBuffer.wrap(str.getBytes());
        socketChannel.write(byteBuffer);
        socketChannel.close();
    }
}

 2.3 Java AIO 编程

AIO 即 NIO2.0,叫做异步不阻塞的 IO。AIO 引入异步通道的概念,采用了 Proactor 模式,简化了程序编写,有效的请求才启动线程,它的特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数较多且连接时间较长的应用

Java 异步 IO 提供了两种使用方式,分别是返回 Future 实例和使用回调函数。但是AIO编程代码量实在是太多了,就是各种异步,回调,到现在也感觉不是很流行,netty出来了用的就更少了。

3 NIO与零拷贝

首先说明概念:磁盘到内核空间属于DMA(direct memory access)拷贝(DMA即直接内存存取,原理是外部设备不通过CPU而直接与系统内存交换数据)。而内核空间到用户空间则需要CPU的参与进行拷贝,既然需要CPU参与,也就涉及到了内核态和用户态的相互切换。

传统IO操作(作图数据拷贝,右图内核态与用户态切换):

  1. 数据需要从磁盘拷贝到内核空间,再从内核空间拷到用户空间(JVM)。
  2. 程序可能进行数据修改等操作
  3. 再将数据拷贝到内核空间,内核空间再拷贝到网卡内存,通过网络发送出去(或拷贝到磁盘)。

 目前NIO(linux 2.4版本之后)(作图数据拷贝,右图内核态与用户态切换)

改进的地方:

  • 将上下文切换次数从4次减少到了2次;
  • 将数据拷贝次数从4次减少到了2次(DMA直接存取)。

 NIO的零拷贝由transferTo()方法实现,在fileChannel中已经使用。transferTo()方法将数据从FileChannel对象传送到可写的字节通道(如Socket Channel等)。在内部实现中,由native方法transferTo()来实现,它依赖底层操作系统的支持。在UNIX和Linux系统中,调用这个方法将会引起sendfile()系统调用。

注意:零拷贝是建立在不需要进行数据文件操作的情况下使用的

零拷贝是指没有系统CPU参与,但是这里其实是有一次CPU拷贝的,也就是从内核态拷贝到发送通道,但是拷贝的信息比较少,比如length,offset,消耗很低,可以忽略

零拷贝的理解

  • 零拷贝是从操作系统角度来讲,也就是说内核缓冲区之间,没有数据是重复的只有kernel buffer有一份数据
  • 零拷贝不仅仅带来更少的数据复制,还带来性能优势,例如更少的上下文切换,更少的CPU缓存。伪共享以及无CPU校验和计算

demo查看区别,首先起一个服务端,不需要保存文件,主要看客户端

public class Service {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress("127.0.0.1", 9999));
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        while (true) {
            SocketChannel socketChannel = serverSocketChannel.accept();
            int read;
            do {
                read = socketChannel.read(buffer);
                buffer.clear();
            } while (read != -1);
            System.out.println("finished read");
            socketChannel.close();
        }
    }
}

客户端

package com.yang.java.main.nio.zeroCopy;

import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;

/**
 * Description:
 *
 * @author mark
 * Date 2020/9/11
 */
public class Client {

    public static void main(String[] args) throws IOException {
        zeroCopy();
        tradition();
    }

    public static void zeroCopy() throws IOException {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress("127.0.0.1", 9999));
        FileInputStream inputStream = new FileInputStream("D:\package\netty-4.1.zip");
        FileChannel channel = inputStream.getChannel();
        System.out.println("file size: " + channel.size()); // file size: 5375946
        long startTime = System.currentTimeMillis();
        long l = channel.transferTo(0, channel.size(), socketChannel);
        System.out.println("send the total size: " + l + "用时:=" + (System.currentTimeMillis() - startTime)); // send the total size: 5375946用时:=5
        channel.close();
        inputStream.close();
        socketChannel.close();
    }

    public static void tradition() throws IOException {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress("127.0.0.1", 9999));
        FileInputStream inputStream = new FileInputStream("D:\package\netty-4.1.zip");
        FileChannel channel = inputStream.getChannel();
        System.out.println("file size: " + channel.size());  // file size: 5375946
        ByteBuffer buffer = ByteBuffer.allocate(4096);
        int read;
        long startTime = System.currentTimeMillis();
        do {
            read = channel.read(buffer);
            socketChannel.write(buffer);
            buffer.rewind();
        } while (read != -1);
        System.out.println("用时:=" + (System.currentTimeMillis() - startTime)); // 用时:=12
        channel.close();
        inputStream.close();
        socketChannel.close();
    }
}

从结果可以看出,零拷贝模式比传统拷贝速度快很多。

但是大部分时候我们是需要对数据进行操作,但又想快速的,这个时候就可以使用NIO的直接内存,一个典型的例子就是Buffer中的例子MappedByteBuffer

直接内存则介于传统拷贝以及零拷贝之间,且可操作文件数据。直接内存(mmap技术)将文件直接映射到内核空间的内存,返回一个操作地址(address),它解决了文件数据需要拷贝到JVM才能进行操作的窘境。而是直接在内核空间直接进行操作,省去了内核空间拷贝到用户空间这一步操作。

NIO的直接内存(mmap技术)是由MappedByteBuffer实现的。核心即是map()方法,该方法把文件映射到内存中,获得内存地址addr,然后通过这个addr构造MappedByteBuffer类,以暴露各种文件操作API。mmap适合小数据量读写,他需要4次上下文切换,3次数据拷贝

4 Netty

来一张netty的官网照片,显示其厉害之处。

  1.  netty的设计比较优雅,适用于各种传输类型的统一API阻塞和非阻塞Socket;基于灵活且可扩展的事件模型,可以清晰地分离关注点,高度可定制的线程模型 - 单线程,一个或多个线程池。
  2. netty使用方便,对比NIO编程较多的API,netty基本做好了封装,用户基本只需要关注业务代码的实现,并且netty还有比较详细的javadoc
  3. netty对比NIO性能能更好,吞吐量更高,减少资源消耗,最小化不必要的内存复制
  4. 完全的SSLTLS以及StartTLS支持

4.1 同步和异步、阻塞与非阻塞概念

同步和异步的概念描述的是用户线程与内核的交互方式:同步是指用户线程发起IO请求后需要等待或者轮询内核IO操作完成后才能继续执行;而异步是指用户线程发起IO请求后仍继续执行,当内核IO操作完成后会通知用户线程,或者调用用户线程注册的回调函数。

阻塞和非阻塞的概念描述的是用户线程调用内核IO操作的方式:阻塞是指IO操作需要彻底完成后才返回到用户空间;而非阻塞是指IO操作被调用后立即返回给用户一个状态值,无需等到IO操作彻底完成。

4.2 Reactor

4.2.1 概述

Reactor对应的叫法:反应器模式、分发者模式、通知者模式

Reactor模式由一个或多个输入同时传递给服务器的模式(基于事件驱动),服务端程序处理传入的多个请求,并将它们同步分派到相应的处理线程。

Reactor核心组成

  1. Reactor在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对IO作出反应(也就是分派到相应的处理线程)
  2. Handler,处理程序执行I/O事件要完成的实际事件。Reactor通过调度适当的处理程序来响应I/O事件,处理程序执行非阻塞操作。

4.2.2 模式分类

根据Reactor的数量以及处理资源线程池数量不同,有3种实现

  • 单Reactor 单线程
  • 单Reactor 多线程
  • 主从 Reactor 多线程

4.2.2.1 单Reactor单线程

流程:

  1. Reactor通过Select监控客户端请求事件,收到事件后通过Dispatch进行分发
  2. 如果是建立连接请求事件,则由Acceptor通过Accept处理连接请求,然后创建一个handler完成后续业务处理
  3. 如果不是连接事件,则会分发给连接对应的Handler进行处理业务
  4. Handler进行进行 read -> 业务处理 -> send 流程

优缺点:

优点:模型简单,没有多线程。进程通信、竞争的问题,全部都在一个线程完成

缺点:

  • 性能问题,只有一个线程,无法完全发挥多核CPU的性能。Handler在处理某个连接业务时,整个线程无法再次处理其他连接事件,较容易造成性能瓶颈
  • 可靠性问题,线程意外终止,或进入死循环,会导致整个通讯模块不可用

使用场景:客户端的数量有限,业务处理非常迅速,比如Redis在业务处理的事件复杂度O(1)情况

4.2.2.2 单Reactor多线程

流程:

  1. Reactor会通过select进行监听客户端的请求事件,收到事件之后会通过dispatch进行分发
  2. 如果是建立连接的请求,会发给Acceptor通过accept接受事件,并创建一个Handler对象处理完成连接之后的事件
  3. 不是连接请求,则分发给连接对应的handler处理
  4. handler只负责相应事件,不做具体业务处理,通过read读取数据之后,分发给后面worker线程池处理业务
  5. worker线程池会分配线程完成真正的业务,并将结果返回给handler
  6. handler收到响应之后,通过send将结果返回给客户端

优点:可以充分利用多核能力

缺点:多线程数据共享以及访问比较复杂,Reactor处理所有的监听以及响应事件,在单线程运行,高并发场景容易出现瓶颈

4.2.2.3 主从Reactor多线程

流程

  1. Reactor主线程MainReactor对象通过select监听连接事件,收到事件之后分发给Acceptor处理连接事件
  2. Acceptor处理完连接事件之后,MainReactor将链接分配给SubReactor
  3. Subreactor将加入到链接队列进行监听,并创建handler进行各种业务处理
  4. 当有新事件发生时,SubReactor会调用对应的handler进行处理
  5. handler通过read读取数据,分发给后面的worker线程池处理
  6. worker线程池分配独立的worker线程进行业务处理,并返回结果给handler
  7. handler接收响应,在通过send将结果返回给clieng

注意:Reactor主线程可以有多个Reactor子线程,即一个MainReactor可以关联多个SunReactor

 

 优点:

  • 父线程与子线程数据交互简单职责明确,父线程只需要接受新连接,子线程完成业务处理
  • 父线程与子线程的数据交互简单,Reactor主线程只需要把新连接传给子线程,子线程无需返回数据

缺点:编码复杂性高

4.2.3 Reactor工作原理

 EventHandler抽象类表示IO事件处理器,它拥有IO文件句柄Handle(通过get_handle获取),以及对Handle的操作handle_event(读/写等)。继承于EventHandler的子类可以对事件处理器的行为进行定制。Reactor类用于管理EventHandler(注册、删除等),并使用handle_events实现事件循环,不断调用同步事件多路分离器(一般是内核)的多路分离函数select,只要某个文件句柄被激活(可读/写等),select就返回(阻塞),handle_events就会调用与文件句柄关联的事件处理器的handle_event进行相关操作。

通过Reactor的方式,可以将用户线程轮询IO操作状态的工作统一交给handle_events事件循环进行处理。用户线程注册事件处理器之后可以继续执行做其他的工作(异步),而Reactor线程负责调用内核的select函数检查socket状态。当有socket被激活时,则通知相应的用户线程(或执行用户线程的回调函数),执行handle_event进行数据读取、处理的工作。由于select函数是阻塞的,因此多路IO复用模型也被称为异步阻塞IO模型。注意,这里的所说的阻塞是指select函数执行时线程被阻塞,而不是指socket。一般在使用IO多路复用模型时,socket都是设置为NONBLOCK的,不过这并不会产生影响,因为用户发起IO请求时,数据已经到达了,用户线程一定不会被阻塞。

事件循环不断地调用select获取被激活的socket,然后根据获取socket对应的EventHandler,执行器handle_event函数即可。

IO多路复用是最常使用的IO模型,但是其异步程度还不够“彻底”,因为它使用了会阻塞线程的select系统调用。因此IO多路复用只能称为异步阻塞IO,而非真正的异步IO。

4.3 Netty模型

Netty就是基于上述的Reactor主从多线程模型实现的

介绍:

  • Netty抽象出两个线程池,BossGroup 专门负责处理接受客户端的链接,WorkerGroup专门负责网络的读写
  • BossGroup和WorkerGroup类型都是NioEventLoopGroup
  • NioEventLoopGroup相当于一个事件循环组,这个组中含有多个事件循环,每一个事件循环是NioEventLoop
  • NioEventLoop表示一个不断循环的处理执任务的线程,每一个NioEventLoop都有一个selector,用于监听绑定在其上的socket通信
  • NioEventLoopGroup可以有多个线程,即可以含有多个NioEventLoop
  • 每个BossGroup中NioEventLoop循环执行的步骤:
    • 轮询accept事件
    • 处理accept事件,与client建立连接之后,生成NioSocketChannel,并将其注册到WorkerGroup的NioEventLoop上面的selector上面
    • 处理任务队列中的任务,runAllTasks
  • 每个WorkerGroup的NioEventLoop循环执行的步骤:
    • 轮询read,write事件
    • 处理I/O事件,即read,write事件,在对应NioSocketChannel处理
    • 处理任务队列的任务,runAllTasks
  • 每个WorkGroup中NioEventLoop处理业务时,会使用pipeline(管道),pipeline中包含了channel,即通过pipeline可以获取对应管道,管道中维护了很多的handler

 一个Demo:

 服务端:

package com.yang.java.main.netty.demo;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.CharsetUtil;

/**
 * Description:
 *
 * @author mark
 * Date 2020/9/11
 */
public class NettyServer {
    public static void main(String[] args) {
        // 创建两个线程组,默认的子线程NioEventLoop个数为cpu核数*2,也可以自己自定数量
        // DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
        /* 最终会调用这个方法
        protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
            super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
        }         */
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workGroup = new NioEventLoopGroup();
        // 创建服务器端的启动对象
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workGroup)  // 设置两个线程组
                .channel(NioServerSocketChannel.class) // 设置服务器通道实现的类
                .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到线程连接的个数
                .handler(new LoggingHandler(LogLevel.DEBUG)) // handler是设置bossGroup的,这是一个日志级别handler
                .childHandler(new ChannelInitializer<SocketChannel>() {  // childHandler是设置workerGroup的
                    // 微通道为通道设置处理器
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        //addLast也就是在通道中标所有的handler字后面加(除了tail handler,这个是netty跪地必须要通道尾部)
                        ch.pipeline().addLast(new NettyServerHandler());  // 这里面添加的类必须实现ChannelHandler
                    }
                });
        System.out.println("the server is ready ");
        // 绑定端口生成future对象
        ChannelFuture channelFuture = serverBootstrap.bind(9999);
        // 对关闭通道进行监听
        try {
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 最终关闭两个线程组
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }

    }
}

class NettyServerHandler extends ChannelInboundHandlerAdapter {

    // 有连接上来
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelActive current thread is: " + Thread.currentThread().getName());
    }

    /**
     * 读取数据
     *
     * @param ctx 上下文含有pipeline,channel等信息
     * @param msg 客户端发送的数据
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("channelRead current thread is: " + Thread.currentThread().getName());
        ByteBuf buffer = (ByteBuf) msg;
        System.out.println("receive message is: " + buffer.toString(CharsetUtil.UTF_8));
        System.out.println("client address is: " + ctx.channel().remoteAddress());
     }
// 数据读取完毕 @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("channelReadComplete current thread is: " + Thread.currentThread().getName()); ctx.writeAndFlush(Unpooled.copiedBuffer("server received the message", CharsetUtil.UTF_8)); } // 异常 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // 关闭 ctx.close(); } }

客户端:

package com.yang.java.main.netty.demo;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;

import java.net.InetSocketAddress;

/**
 * Description:
 *
 * @author mark
 * Date 2020/9/11
 */
public class NettyClient {
    public static void main(String[] args) {
        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        // 创建客户端启动对象
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup)
                .channel(NioSocketChannel.class)  // 设置客户端通道的实现类
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // 加入处理器
                        ch.pipeline().addLast(new NettyClientHandler());
                    }
                });
        System.out.println("client ready");
        ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress("127.0.0.1", 9999));
        try {
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}

class NettyClientHandler extends ChannelInboundHandlerAdapter {
    // 通道就绪会触发该方法
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("client channel is ready, start send message");
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server", CharsetUtil.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        System.out.println("channelReadComplete");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf buffer = (ByteBuf) msg;
        System.out.println("receive from server: " + buffer.toString(CharsetUtil.UTF_8));
        System.out.println("server address is: " + ctx.channel().remoteAddress());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

服务端输出结果:

the server is ready 
channelActive current thread is: nioEventLoopGroup-3-1
channelRead current thread is: nioEventLoopGroup-3-1
receive message is: hello, server
client address is: /127.0.0.1:59018
channelReadComplete current thread is: nioEventLoopGroup-3-1

使用的简单实例,任务队列典型使用场景

  • 用户自定义的普通任务
  • 用户自定义的定时任务
class NewNettyServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 1 用户自定义的普通任务
        ctx.channel().eventLoop().execute(() -> lateSend(ctx));

        // 2 用户自定义的定时场景
        ctx.channel().eventLoop().schedule(() ->lateSend(ctx), 5, TimeUnit.SECONDS);
    }

    private void lateSend(ChannelHandlerContext ctx) {
        try {
            System.out.println("current thread: " + Thread.currentThread().getName());
            Thread.sleep(5 * 1000);
            ctx.writeAndFlush(Unpooled.copiedBuffer("wait 5s", CharsetUtil.UTF_8));
            System.out.println("current channel" + ctx.channel().hashCode());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
结果
execute start time: 2020-09-11T16:38:26.929054500
schedule start time: 2020-09-11T16:38:26.932026300
execute lateSend time: 2020-09-11T16:38:26.933025600
execute current thread: nioEventLoopGroup-3-1
execute current channel-1557798060
schedule lateSend time: 2020-09-11T16:38:31.972724600
schedule current thread: nioEventLoopGroup-3-1
schedule current channel-1557798060

4.4 Netty的核心模块组件

4.4.1 Bootstrap、ServerBootstrap

这两个类主要作用就是配置Netty程序,串联各个组件,Netty中Bootstrap类是客户端程序的启动引导类,ServerBootstrap是服务端的启动引导类

4.4.2 Future、ChannelFuture

Netty 中所有的 IO 操作都是异步的,不能立刻得知消息是否被正确处理。但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过 Future 和 ChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件常见的方法有:
  • Channel channel(),返回当前正在进行 IO 操作的通道
  • ChannelFuture sync(),等待异步操作执行完毕
  • ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener); 添加监听器
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
        checkNotNull(listener, "listener");
        synchronized (this) {
            addListener0(listener);
        }
        // 收到结果,进行通知
        if (isDone()) {
            notifyListeners();
        }
        return this;
}

4.4.3 Channel

  1. Netty 网络通信的组件,能够用于执行网络 I/O 操作。
  2. 通过 Channel 可获得当前网络连接的通道的状态
  3. 通过 Channel 可获得 网络连接的配置参数 (例如接收缓冲区大小)
  4. Channel 提供异步的网络 I/O 操作(如建立连接,读写,绑定端口),异步调用意味着任何 I/O 调用都将立即返回,并且不保证在调用结束时所请求的 I/O 操作已完成
  5. 调用立即返回一个 ChannelFuture 实例,通过注册监听器到 ChannelFuture 上,可以 I/O 操作成功、失败或取消时回调通知调用方
  6. 支持关联 I/O 操作与对应的处理程序
  7. 不同协议、不同的阻塞类型的连接都有不同的 Channel 类型与之对应

4.4.4 Selector

Netty基于Selcctor对象实现I/O多路复用,通过Selector一个线程可以监听多个连接的Channel事件

当向一个Selector中注册Channel后,Selector内部的机制就可以自动不断的查询(select)这些注册的Channel是否有已就绪的I/O事件(可读、可写、网络连接等),这样程序就可以使用一个线程的管理多个Channel。

4.4.5 ChannelHandler

ChannelHandler是一个接口,处理I/O事件或者拦截I/O操作,并将其转发到对应的ChannelPipeline中的下一个处理程序

ChannelHandler本身并没有提供很多方法。但是通常情况下我们必须实现他的子类:

  • ChannelInboundHandler处理进栈的I/O事件
类型描述
channelRegistered Invoked when a Channel is registered to its EventLoop and is able to handle I/O.
channelUnregistered Invoked when a Channel is deregistered from its EventLoop and cannot handle any I/O.
channelActive Invoked when a Channel is active; the Channel is connected/bound and ready.
channelInactive Invoked when a Channel leaves active state and is no longer connected to its remote peer.
channelReadComplete Invoked when a read operation on the Channel has completed.
channelRead Invoked if data are read from the Channel.
channelWritabilityChanged Invoked when the writability state of the Channel changes. The user can ensure writes are not done too fast (with risk of an OutOfMemoryError) or can resume writes when the Channel becomes writable again.Channel.isWritable() can be used to detect the actual writability of the channel. The threshold for writability can be set via Channel.config().setWriteHighWaterMark() and Channel.config().setWriteLowWaterMark().
userEventTriggered(...) Invoked when a user calls Channel.fireUserEventTriggered(...) to pass a pojo through the ChannelPipeline. This can be used to pass user specific events through the ChannelPipeline and so allow handling those events.
  • ChannelOutboundHandler处理出栈的I/O事件
类型描述
bind Invoked on request to bind the Channel to a local address
connect Invoked on request to connect the Channel to the remote peer
disconnect Invoked on request to disconnect the Channel from the remote peer
close Invoked on request to close the Channel
deregister Invoked on request to deregister the Channel from its EventLoop
read Invoked on request to read more data from the Channel
flush Invoked on request to flush queued data to the remote peer through the Channel
write Invoked on request to write data through the Channel to the remote peer

几乎所有的方法都将 ChannelPromise 作为参数,一旦请求结束要通过 ChannelPipeline 转发的时候,必须通知此参数。

ChannelPromise vs. ChannelFuture

ChannelPromise 是 特殊的 ChannelFuture,允许你的 ChannelPromise 及其 操作 成功或失败。所以任何时候调用例如 Channel.write(...) 一个新的 ChannelPromise将会创建并且通过 ChannelPipeline传递。这次写操作本身将会返回 ChannelFuture, 这样只允许你得到一次操作完成的通知。Netty 本身使用 ChannelPromise 作为返回的 ChannelFuture 的通知,事实上在大多数时候就是 ChannelPromise 自身(ChannelPromise 扩展了 ChannelFuture)

ChannelDuplexHandler可以处理进站以及出站事件,但是通常不推荐使用

4.4.6 ChannelPipeline

ChannelPipeline是一个Handler的集合,他负责处理和拦截inbound或者outbound的事件和操作,相当于一个贯穿Netty的链(ChannelPipe是保存ChannelHandler的链表,由于处理和拦截Channel的入栈和出栈事件操作)

ChannelPipeline实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及Channel中各个ChannelHandler的交互

在Netty中每个Channel都有且仅有一个ChannelPipeline与之对应

一个Channel包含了一个ChannelPipeline,二ChannelPipeline 中维护了一个有ChannelHandlerContext组成的双向链表,并且每一个ChannelHandlercontext中又关联着一个ChannelHandler

入站事件和出站事件在同一个双向链表中,入站事件会从链表head(默认的一个ChannelHandler,永远在第一位)依次向后传递一直到最后一个handler,

出站事件会从链表tail(默认的handler,永远在结尾,创建pipeline创建,属于defaultChannelHandler)往前传递会传递到最前一个出站的handler,两种类型的handler互不干扰,下图描述了入站以及出站的路程

源码中有标识入站以及出站事件 ,socket read一般就是入站,socket write一般就是出站

4.4.7 ChannelHandlerContext

保存Channel相关所有的上下文信息,同时关联一个ChannelHandler对象

channelHandlerContext中包含一个具体的事件处理器ChannelHandler,同时ChannelHandlerContext中也绑定了对应的pipeline和Channel的信息,方便对ChannelHandler进行调用

重要的是要注意,虽然在 Channel 或者 ChannelPipeline 上调用write() 都会把事件在整个管道传播,但是在 ChannelHandler 级别上,从一个处理程序转到下一个却要通过在 ChannelHandlerContext 调用方法实现。

  1. 事件传递给 ChannelPipeline 的第一个 ChannelHandler
  2. ChannelHandler 通过关联的 ChannelHandlerContext 传递事件给 ChannelPipeline 中的 下一个
  3. ChannelHandler 通过关联的 ChannelHandlerContext 传递事件给 ChannelPipeline 中的 下一个

4.4.8 EventLoopGroup

EventLoopGroup是一组EventLoop的抽象,Netty为了更好的利用多核CPU资源,一般会有多个EventLoop同时工作,没哟呀EventLoop维护着一个Selector实例

EventLoopGroup提供next接口,可以从组里面按照一定的规则获取其中一个EventLoop来处理任务。

EventLoop就是一个事件循环,它运行在一个循环中,知道它停止。网络框架需要在一个循环中为一个特定的连接运行时间

    for (;;) {
            Runnable task = takeTask();
            if (task != null) {
                task.run();
                updateLastExecutionTime();
            }

            if (confirmShutdown()) {
                break;
            }
        }

EventLoop是从EventExecutor和ScheduledExecutorService扩展而来,所以可以将任务直接交给EventLoop执行,另外关于事件和任务的执行顺序是FIFO

如果任务与EventLoop的Thread是相同的,那么代码块就会被执行,如果线程不同,会安排一个任务并在一个内部队列后执行,先涂就是一个EventLoop中调度任务的执行逻辑

 

  •  应在EventLoop中执行的任务
  • 任务传递到执行方法时候,自行检查调用线程是否与分配给EventLoop是一样的
  • 线程一样,则可以直接执行任务
  • 线程不一样,则会家务任务队列,当EventLoop事件执行时,队列中的任务就会执行

对于长时间的任务或阻塞的I/O请使用EventExector,之前演示了这个机制,现在过一下步骤

在指定的延时时间后调度任务

  • 任务被插入到EventLoop的Schedul-Tasks-Queue(调度任务队列)
  • 如果任务需要马上执行,EventLoop检查并运行
  • 如果有一个任务要执行,EventLoop将立即执行,并从队列中删除
  • EventLoop等待下一次运行,然乎从上一步开始重复

对于EventLoop,EventLoopGroup以及Channel的关系

  • 所有的EventLoop由EventLoopGroup分配
  • EventLoop处理所有分配给他的管道的事件和任务。每个EventLoop绑定到一个Thread
  • 管道绑定到EventLoop,所以所有的操作总是倍同一个线程在Channel的生命周期执行,一个管道只属于一个连接 

4.4.9 Unpooled

首先了解一个Netty提供的数据缓冲容器ByteBuf

ByteBuf中有两个索引,一个用来写,一个用来读,这个是与ByteBuffer最大的不同,写入数据到ByteBuf后,writeindex会增加写入的字节数。读取字节后,readerIndex会增加读出去的字节数。可以读取字节知道与写入索引相同,再读抛出异常

 Unpooled类就是一个专门用来操作缓冲区的工具类

5 编解码器以及handler的调用机制

5.1 解码器

解码器就是负责将入站数据从一种格式转化到另外一种格式,目前主要分为两类:

  • 解码字节到消息(ByteToMessageDevoder和ReplayingDecoder)
  • 解码消息到消息(MessageToMessageDecoder)

Netty的解码器是一种ChannelInboundHandler的抽象实现。就是将入站数据转化格式之后传递到ChannelPipeline中的下一个ChannelInboundHandler进行处理。

主要看一下ByteToMessageDecoder这个类,将字节码转化为消息,下面是两个比较重要的方法

  /**
     * 从字节码解析到另外一种数据格式,这个方法会在没有任何数据输入的时候调用
     *
     * @param ctx           上下文
     * @param in            入站信息
     * @param out           传递给下一个ChannelInboundHandler数据
     * @throws Exception    异常
     */
    protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;

  /**
     * 这个方法将会在channelInActive之后调用,也是这个推出的时候,这个方法其实也是在调用decode方法*/
    protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.isReadable()) {
            // Only call decode() if there is something left in the buffer to decode.
            decodeRemovalReentryProtection(ctx, in, out);
        }
    }

demo(我们解码器的demo都在客户端发送1,2,3数字)

class ToIntegerDecoder extends ByteToMessageDecoder{

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if(in.readableBytes() >= 4){
            out.add(in.readInt());
        }
    }
}
receive message is: 1
receive message is: 2
receive message is: 3

从输出结果也可以看出,每次从入站的ByteBuf读取四个字节,解码成整形,所包含的内容会被发送到下一个ChannelInboundHandler

ByteToMessageDecoder进行decode时,每次都需要我们进行校验ByteBuf中是否还有足够的字节进行转码,Netty还提供ReplayingDecoder的抽象基类,继承自ByteMessageByte,使用这个类就无需自己检查,如果ByteBuf中有足够的字节会正常读取,如果没有,则会停止解码,但是同时也会带来一些局限性:

  • 并非所有的标准的ByteBuf操作都支持
  • 速会慢一些 

当然还有一些其他解码器,不过大同小异。

Netty是一部框架,需要在内存中缓冲字节,知道能够急嘛,因此,我们不能让解码器缓存太多数据以免耗尽可用内存。为了避免这个问题,可以在解码器中设置最大字节阈值,后续可以进行处理。

class SafeByteToMessageDecoder extends ByteToMessageDecoder{

    private static final int MAX_FRAME_SIZE = 2;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int readableBytes = in.readableBytes();
        if(readableBytes > MAX_FRAME_SIZE){
            in.skipBytes(readableBytes);
            // 可以选择抛出异常,会被下一个handler的异常捕捉接收
//            throw new TooLongFrameException("frame to big");
            out.add(10);
        }
        if(in.readableBytes() >= 4){
            out.add(in.readInt());
        }
    }
}
receive message is: 10

从结果可以看出,因为接受到的自己超出我们设定的阈值,因此会被放弃,也就是不在读取,可以看下属代码,将ByteBuf的读取下标向后移动放弃的数量

  @Override
    public ByteBuf skipBytes(int length) {
        checkReadableBytes(length);
        readerIndex += length;
        return this;
    }

5.2 编码器

有解码器当然就有编码器,就是用来将出战数据从一种格式转化为另外一种格式,因为是出战,因此他继承了ChannelOutboundHandler,这个也有两种类型

  • 编码从消息到字节(MessageToByteEncoder)
  • 编码从消息到 消息(MessageToMessageEncoder)

 demo(从服务端传递1,2,3)

class IntegerToByteEncoder extends MessageToByteEncoder<Integer>{

    @Override
    protected void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception {
        out.writeInt(msg);
    }
}

客户端接受

@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf buffer = (ByteBuf) msg;
        byte[] bytes = new byte[buffer.readableBytes()];
        buffer.readBytes(bytes);
        System.out.println("receive from server: " + Arrays.toString(bytes));  // receive from server: [0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 3] 客户端也可以定义解码器,到时候就跟上述解码器输出结果一样
        System.out.println("server address is: " + ctx.channel().remoteAddress());
    }

MessageToMessage类似

5.3 Codec

当然还有codec,就是结合编码器与解码器,不再赘述

5.4 调用机制

 ChannelHandler充当了处理入站和出站的数据的应用程序逻辑的容器(只需要自定义的类实现ChannelInboundHandler或者ChannelOutboundHandler),从而可以处理入站事件和数据,如果需要发送响应,直接调用ChannelContext的write进行写入,flush进行刷新
这里面入站和出战的区分:入站主要是从socket的管道向内做就是入站,反之就是出站

另外在调用过程中,发现不论编码器还是解码器,接收消息的类型必须和待处理的消息类型一直,否则这个handler就不会执行。

原文地址:https://www.cnblogs.com/yangshixiong/p/13640035.html