NettyReactor线程模型(NIO实现)

Netty-Reactor线程模型(NIO)

这一期重点介绍Reactor模型。

Netty的整体架构就是基于了这一高性能网络编程模型——Reactor模型。 了解了该模型对后面阅读及学习理解Netty会有非常大的帮助。

而这一期使用的是NIO来编写,主要原因是Netty底层就是使用NIO来编写的,我们通过使用NIO来实现 Reactor模型,在编写的同时要去思考 为什么要这么实现,这样实现相比之前有什么好处。

传统的多线程IO

由于是BIO相关,就不贴代码了。

我们知道在使用BIO 编程时,有个致命的缺点就是 阻塞问题

不管是 ServerSocketaccept 还是 Socketread 方法,其都是阻塞的。

当一个客户端Socket在 read的时候,由于是阻塞的,则无法去创建其它客户端的socket。 因此该种模式下基本没有并发可言,效率那是相当的低。

为了解决这个问题, 在此基础上 使用了多线程 解决了 在单个socket 的 read 上阻塞的问题。 表面上看虽然是解决了并发的问题,但是也极大的消耗了系统的资源,因为每来一个Socket连接就要创建一个线程,这样的线程反复创建和销毁也是需要代价的。 (虽然可以使用线程池来解决,但是本质上还是改变不了 一个Socket连接需要对应着一个线程)。

如果想要彻底解决该问题,那么就需要使用 基于事件驱动设计的 多路复用器。 从本质上解决阻塞的问题,这也是 NIO的由来。

Reactor模型

可参考Doug Lea 大神编写的 Reactor模型文章 http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf。

有三种 Reactor 模型

  1. 单线程 单Reactor模型
  2. 多线程 单Reactor模型
  3. 多线程 多Reactor模型

友情提示: 后面的介绍 如果实在理解起来困难,就可以把 Reactor 想象成 Selector(多路复用器) ,作用就是用来分发事件。

单线程单Reactor模型

由图可知,主要有四个组件

  • Reactor: 事件监听器

  • dispatch: 事件分发器 ,主要就是将 accept事件 和 read,write事件 分开,并分发到对应的处理器上。

  • Acceptor: accept 事件处理器, 主要就是创建客户端连接

  • handler: read/write事件处理器,主要就是处理客户端的消息接收和发送。

代码实现如下:

Reactor

public class Reactor implements Runnable{

    public static void main(String[] args) {
        Reactor reactor = new Reactor();
        reactor.run();
    }

    private Selector selector;
    private ServerSocketChannel ssc;

    public Reactor() {
        try {
            ssc = ServerSocketChannel.open();
            selector = Selector.open();

            ssc.configureBlocking(false);
            ssc.bind(new InetSocketAddress("127.0.0.1",8080));
            SelectionKey selectionKey = ssc.register(selector, SelectionKey.OP_ACCEPT);
            // 绑定 相应的事件处理器
            selectionKey.attach(new Acceptor(ssc,selector));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @SneakyThrows
    @Override
    public void run() {
        while (true){
            selector.select(); //事件监听器监听
            Set<SelectionKey> keys = selector.selectedKeys();
			
            for (Iterator<SelectionKey> it = keys.iterator(); it.hasNext();){
                SelectionKey key = it.next();
                it.remove();
                dispatch(key); //调用事件分发器
            }
        }
    }

    // 事件分发器
    public void dispatch(SelectionKey key){
        Runnable r = (Runnable) key.attachment();  // 取出相应的事件处理器
        if (r != null){
            r.run();
        }
    }

}

Acceptor

/**
 *  连接事件处理器:
 *    1. 创建SocketChannel
 *    2. 将SocketChannel设置到 selector上
 *    3. 绑定相应的事件处理器
 */
public class Acceptor implements Runnable{

    private ServerSocketChannel ssc;
    private Selector selector;

    public Acceptor(ServerSocketChannel ssc,Selector selector) {
        this.ssc = ssc;
        this.selector = selector;
    }


    @SneakyThrows
    @Override
    public void run() {
        SocketChannel sc = ssc.accept();
        new Handler(sc,selector);
    }
}

Handler

/**
 *  read /write 事件处理器
 */
public class Handler implements Runnable{

    private SocketChannel sc;
    private Selector selector;
    private SelectionKey sk;
    static final int READING = 0, WRITING = 1;
    private ByteBuffer inputBuf = ByteBuffer.allocate(1024);
    private ByteBuffer outputBuf = ByteBuffer.allocate(1024);

    private int state = READING;  // initial state


    public Handler(SocketChannel sc, Selector selector) throws IOException {
        this.sc = sc;
        this.selector = selector;
        this.sc.configureBlocking(false);
        this.sk = this.sc.register(selector, SelectionKey.OP_READ);
        this.sk.attach(this);
        this.selector.wakeup();
    }


    @Override
    public void run() {
        if (state == READING) read();
        else if (state == WRITING) send();
    }

    private void read(){
        try {
            sc.read(inputBuf);
            process(); // 处理业务
            state = WRITING;
            sk.interestOps(SelectionKey.OP_WRITE); // 监听写事件
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void send() {
        try {
            sc.write(outputBuf);
            sk.interestOps(SelectionKey.OP_READ); //监听读事件
            state = READING;
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    // 处理业务的方法
    private void process(){}
}

该模型虽然是 单线程,但是由于非阻塞的优势存在,其并发处理能力还是挺好的。

但是在实际业务中, IO操作 与 业务操作(非IO操作) 同样都存在着 耗时问题, 不管是哪种操作若占用了大量的时间,则会影响到其它连接的处理。 因此我们需要进一步的优化: 增加 Worker线程,专门用来处理非IO操作。

多线程单Reactor模型

多线程单Reactor模型的特点:

  1. 通过卸载非IO操作来提升Reactor线程的处理性能
  2. 可以利用线程池的方式对线程进行控制和调优。
  3. 非IO操作 同样可设计为时间驱动的模式。

下面是多线程单Reactor模型的示意图:

修改了Handler

/**
 *  read /write 事件处理器
 */
public class Handler implements Runnable{

    private SocketChannel sc;
    private Selector selector;
    private SelectionKey sk;
    static final int READING = 0, WRITING = 1;
    private ByteBuffer inputBuf = ByteBuffer.allocate(1024);
    private ByteBuffer outputBuf = ByteBuffer.allocate(1024);

    private ExecutorService PROCESS_THREAD_POOL = Executors.newFixedThreadPool(10);



    private int state = READING;  // initial state


    public Handler(SocketChannel sc, Selector selector) throws IOException {
        this.sc = sc;
        this.selector = selector;
        this.sc.configureBlocking(false);
        this.sk = this.sc.register(selector, SelectionKey.OP_READ);
        this.sk.attach(this);
        this.selector.wakeup();
    }


    @Override
    public void run() {
        if (state == READING) read();
        else if (state == WRITING) send();
    }

    private void read(){
        try {
            sc.read(inputBuf);
            PROCESS_THREAD_POOL.execute(new Process()); // 异步提交非IO操作任务
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void send() {
        try {
            sc.write(outputBuf);
            sk.interestOps(SelectionKey.OP_READ); //监听读事件
            state = READING;
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 处理业务(非IO)操作任务类
     */
    class Process implements Runnable{

        @Override
        public void run() {
            process(); // 处理业务逻辑方法
            state = WRITING;
            sk.interestOps(SelectionKey.OP_WRITE); // 监听写事件
        }
        private void process(){};
    }
}

多线程多Reactor模型(Netty线程模型)

上面多线程单Reactor模型解决了 业务操作(非IO操作) 上耗时过多的问题,但是IO操作仍存在问题。

而多线程多Reactor模型 再其基础上 减轻了IO操作耗时问题。

拆分并增加反应器Reactor线程,一方面在压力较大时可以饱和处理IO操作,提高处理能力;另一方面维持多个Reactor线程也可以做负载均衡使用;线程的数量可以根据程序本身是CPU密集型还是IO密集型操作来进行合理的分配。

多线程多Reactor模型示意图如下:

其中每个selector 对应一个Reactor 线程,并将不同的处理程序绑定到不同的IO事件

具体实现代码详见 https://gitee.com/zhou_ze_ping/Netty-Reactor.git

万般皆下品,唯有读书高!
原文地址:https://www.cnblogs.com/s686zhou/p/15727054.html