Netty概述

1. 原生NIO存在的问题

  NIO 的类库和 API 繁杂,使用麻烦:需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer 等。需要具备其他的额外技能:要熟悉 Java 多线程编程,因为 NIO 编程涉及到 Reactor 模式,你必须对多线程和网络编程非常熟悉,才能编写出高质量的 NIO 程序。

  开发工作量和难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等等。

2. netty 介绍

1. 介绍

官网: https://netty.io/

1.  Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.

翻译之后就是:Netty是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器

2.Netty 是由 JBOSS 提供的一个 Java 开源框架。Netty 提供异步的、基于事件驱动的网络应用程序框架,用以快速开发高性能、高可靠性的网络 IO 程序

3.Netty 可以帮助你快速、简单的开发出一个网络应用,相当于简化和流程化了 NIO 的开发过程

4.Netty 是目前最流行的 NIO 框架,Netty 在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用,知名的 Elasticsearch 、Dubbo 框架内部都采用了 Netty。

5. 架构图如下:

 2. 优点

1. Netty 对 JDK 自带的 NIO 的 API 进行了封装,解决了上述NIO编程难的问题。

2. 设计优雅:适用于各种传输类型的统一 API 阻塞和非阻塞 Socket;基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型 - 单线程,一个或多个线程池.

3. 使用方便:详细记录的 Javadoc,用户指南和示例;没有其他依赖项,JDK 5(Netty 3.x)或 6(Netty 4.x)就足够了。(netty版本分为 netty3.x 和 netty4.x、netty5.x(因为Netty5出现重大bug,已经被官网废弃了,目前推荐使用的是Netty4.x的稳定版本))

4. 高性能、吞吐量更高:延迟更低;减少资源消耗;最小化不必要的内存复制。

5. 安全:完整的 SSL/TLS 和 StartTLS 支持。

6. 社区活跃、不断更新:社区活跃,版本迭代周期短,发现的 Bug 可以被及时修复,同时,更多的新功能会被加入

3. Netty高性能架构设计 

1. 线程模型基本介绍

目前存在的线程模型有:传统阻塞 I/O 服务模型 和 Reactor 模式

Reactor 模式根据 Reactor 的数量和处理资源池线程的数量不同,有 3 种典型的实现

(1)单 Reactor 单线程;

(2)单 Reactor 多线程;

(3)主从 Reactor 多线程

Netty 线程模式(Netty 主要基于主从 Reactor 多线程模型做了一定的改进,其中主从 Reactor 多线程模型有多个 Reactor)

2. 传统的IO服务模型

  传统的IO模型就是我们的一请求一线程模型。这样会存在以下问题:

当并发数很大,就会创建大量的线程,占用很大系统资源

连接创建后,如果当前线程暂时没有数据可读,该线程会阻塞在read 操作,造成线程资源浪费

3. Reactor 模式

  I/O 复用结合线程池,就是 Reactor 模式基本设计思想。

  通过一个或多个输入同时传递给服务处理器的模式(基于事件驱动),服务器端程序处理传入的多个请求,并将它们同步分派到相应的处理线程, 因此Reactor模式也叫 Dispatcher模式。Reactor 模式使用IO复用监听事件, 收到事件后,分发给某个线程(进程), 这点就是网络服务器高并发处理关键。

核心组成:

Reactor:Reactor 在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对 IO 事件做出反应。

Handlers:处理程序执行 I/O 事件要完成的实际事件,Reactor 通过调度适当的处理程序来响应 I/O 事件,处理程序执行非阻塞操作。

4.Reactor 模式分类

1. 单 Reactor 单线程

  服务器端用一个线程通过多路复用搞定所有的 IO 操作(包括连接,读、写等),编码简单,清晰明了,但是如果客户端连接数量较多,将无法支撑。简单点说就是一个Reactor处理多个请求,且Handler是以同步阻塞的方式处理请求。

对应结构图如下:

例如:

Server:

package nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;


public class GroupChatServer {

    /**
     * Selector 选择器
     */
    private Selector selector;

    /**
     * ServerSocketChannel
     */
    private ServerSocketChannel serverSocketChannel;

    /**
     * 端口
     */
    private static final int PORT = 6667;

    public GroupChatServer() {
        try {
            selector = Selector.open();
            serverSocketChannel = ServerSocketChannel.open();
            // 绑定端口
            serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
            // 设置非阻塞模式
            serverSocketChannel.configureBlocking(false);
            // 注册连接请求事件
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 监听
     */
    public void listen() {
        System.out.println("监听线程: " + Thread.currentThread().getName());

        try {
            while (true) {
                int count = selector.select();
                // 有事件处理
                if (count > 0) {
                    // 处理监听到的事件
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    SelectionKey key = null;
                    while (iterator.hasNext()) {
                        key = iterator.next();
                        // 连接请求事件
                        if (key.isAcceptable()) {
                            SocketChannel socketChannel = serverSocketChannel.accept();
                            // 设置非阻塞
                            socketChannel.configureBlocking(false);
                            // 注册读取事件
                            socketChannel.register(selector, SelectionKey.OP_READ);
                            // 提示
                            System.out.println(socketChannel.getRemoteAddress() + " 上线 ");
                        }

                        // 数据读取事件
                        if (key.isReadable()) {
                            new GroupServerHandler(selector).readData(key);
                        }

                        // 删除当前的key,防止重复处理
                        iterator.remove();
                    }
                }
            }
        } catch (Throwable e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        //创建服务器对象
        GroupChatServer groupChatServer = new GroupChatServer();
        groupChatServer.listen();
    }
}

Client

package nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;

public class GroupChatClient {

    /**
     * 服务器地址
     */
    private final String HOST = "127.0.0.1";
    /**
     * 服务器端口
     */
    private final int PORT = 6667;

    private Selector selector;
    private SocketChannel socketChannel;
    private String username;

    public GroupChatClient() {
        try {
            selector = Selector.open();
            // 连接服务器
            socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT));
            // 设置非阻塞
            socketChannel.configureBlocking(false);
            // 将channel 注册到selector, 注册读取事件
            socketChannel.register(selector, SelectionKey.OP_READ);
            // 得到username
            username = socketChannel.getLocalAddress().toString().substring(1);
            System.out.println(username + " is ok!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void sendMsg(String msg) {
        msg = username + " 说:" + msg;
        try {
            socketChannel.write(ByteBuffer.wrap(msg.getBytes()));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void readMsg() {
        try {
            int count = selector.select();
            // 有可用的通道
            if (count > 0) {
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    if (key.isReadable()) {
                        SocketChannel channel = (SocketChannel) key.channel();
                        // 开启一个buffer
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        channel.read(buffer);
                        //把读到的缓冲区的数据转成字符串
                        String msg = new String(buffer.array());
                        System.out.println(msg.trim());
                    }
                }

                // 删除当前的selectionKey, 防止重复操作
                iterator.remove();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        GroupChatClient chatClient = new GroupChatClient();
        // 启动一个线程, 每3秒,读取从服务器发送数据
        new Thread() {
            public void run() {
                while (true) {
                    chatClient.readMsg();
                    try {
                        Thread.currentThread().sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }.start();

        //发送数据给服务器端
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNextLine()) {
            String s = scanner.nextLine();
            chatClient.sendMsg(s);
        }
    }
}

Handler

package nio;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class GroupServerHandler {

    /**
     * Selector 选择器
     */
    private Selector selector;

    public GroupServerHandler(Selector selector) {
        this.selector = selector;
    }

    /**
     * 读取数据
     *
     * @param key
     */
    public void readData(SelectionKey key) {
        SocketChannel channel = null;
        try {
            channel = (SocketChannel) key.channel();
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            int read = channel.read(byteBuffer);
            if (read > 0) {
                //把缓存区的数据转成字符串
                String msg = new String(byteBuffer.array());
                System.out.println("form 客户端: " + msg);

                //向其它的客户端转发消息(去掉自己), 专门写一个方法来处理
                sendInfoToOtherClients(msg, channel);
            }
        } catch (Exception e) {
            e.printStackTrace();

            try {
                System.out.println(channel.getRemoteAddress() + " 离线了..");
                //取消注册
                key.cancel();
                //关闭通道
                channel.close();
            } catch (IOException e2) {
                e2.printStackTrace();
            }
        }
    }

    private void sendInfoToOtherClients(String msg, SocketChannel channel) throws IOException {
        System.out.println("服务器转发消息中...");
        System.out.println("当前线程: " + Thread.currentThread().getName());

        try {
            Thread.sleep(5 * 1000);
        } catch (InterruptedException e) {
            // ignore
        }

        for (SelectionKey key : selector.keys()) {
            // 通过 key  取出对应的 SocketChannel
            Channel targetChannel = key.channel();
            // 排除掉自己
            if (targetChannel instanceof SocketChannel && targetChannel != channel) {
                // 转型
                SocketChannel dest = (SocketChannel) targetChannel;
                // 将msg 存储到buffer
                ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
                // 将buffer 的数据写入 通道
                dest.write(buffer);
                System.out.println("转发给: " + dest.getRemoteAddress());
            }
        }
    }
}

方案分析:

优点:模型简单,没有多线程、进程通信、竞争的问题,全部都在一个线程中完成

缺点:

(1) 性能问题,只有一个线程,无法完全发挥多核 CPU 的性能。Handler 在处理某个连接上的业务时,整个进程无法处理其他连接事件,很容易导致性能瓶颈

(2) 可靠性问题,线程意外终止,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障

使用场景:客户端的数量有限,业务处理非常快速,比如 Redis在业务处理的时间复杂度 O(1) 的情况

2. 单 Reactor 多线程

  说白了就是一个Reactor,后面Handler处理的时候用多线程处理。handler 只负责响应事件,不做具体的业务处理, 通过read 读取数据后,会分发给后面的worker线程池的某个线程处理业务。

 改造上面代码:

(1) 增加线程工具类

package nio;


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadUtils {

    private static final ExecutorService executorService = Executors.newFixedThreadPool(3);

    public static void execute(Runnable run) {
        executorService.execute(run);
    }
}

(2) 修改Handler

package nio;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class GroupServerHandler {

    /**
     * Selector 选择器
     */
    private Selector selector;

    public GroupServerHandler(Selector selector) {
        this.selector = selector;
    }

    /**
     * 读取数据
     *
     * @param key
     */
    public void readData(SelectionKey key) {
        ThreadUtils.execute(new Runnable() {
            @Override
            public void run() {
                SocketChannel channel = null;
                try {
                    channel = (SocketChannel) key.channel();
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    int read = channel.read(byteBuffer);
                    if (read > 0) {
                        //把缓存区的数据转成字符串
                        String msg = new String(byteBuffer.array());
                        System.out.println("form 客户端: " + msg);

                        //向其它的客户端转发消息(去掉自己), 专门写一个方法来处理
                        sendInfoToOtherClients(msg, channel);
                    }
                } catch (Exception e) {
                    e.printStackTrace();

                    try {
                        System.out.println(channel.getRemoteAddress() + " 离线了..");
                        //取消注册
                        key.cancel();
                        //关闭通道
                        channel.close();
                    } catch (IOException e2) {
                        e2.printStackTrace();
                    }
                }
            }
        });
    }

    private void sendInfoToOtherClients(String msg, SocketChannel channel) throws IOException {
        System.out.println("服务器转发消息中...");
        System.out.println("当前线程: " + Thread.currentThread().getName());

        try {
            Thread.sleep(5 * 1000);
        } catch (InterruptedException e) {
            // ignore
        }

        for (SelectionKey key : selector.keys()) {
            // 通过 key  取出对应的 SocketChannel
            Channel targetChannel = key.channel();
            // 排除掉自己
            if (targetChannel instanceof SocketChannel && targetChannel != channel) {
                // 转型
                SocketChannel dest = (SocketChannel) targetChannel;
                // 将msg 存储到buffer
                ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
                // 将buffer 的数据写入 通道
                dest.write(buffer);
                System.out.println("转发给: " + dest.getRemoteAddress());
            }
        }
    }
}

方案分析:

优点:可以充分的利用多核cpu 的处理能力

缺点:多线程数据共享和访问比较复杂, reactor 处理所有的事件的监听和响应,在单线程运行, 在高并发场景容易出现性能瓶颈.

3. 主从 Reactor 多线程

  简单理解就是Reactor也加入多线程。Reactor主线程 MainReactor 对象通过select 监听连接事件, 收到事件后,通过Acceptor 处理连接事件。当 Acceptor 处理连接事件后,MainReactor 将连接分配给SubReactor。

代码改造:

1. MainReactor 和 SubReactor:

package nio;

import org.apache.commons.collections.CollectionUtils;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;


public class MainReactor {

    /**
     * Selector 选择器
     */
    private Selector selector;

    /**
     * ServerSocketChannel
     */
    private ServerSocketChannel serverSocketChannel;

    /**
     * 端口
     */
    private static final int PORT = 6667;

    public MainReactor() {
        try {
            selector = Selector.open();
            serverSocketChannel = ServerSocketChannel.open();
            // 绑定端口
            serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
            // 设置非阻塞模式
            serverSocketChannel.configureBlocking(false);
            // 注册连接请求事件
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 启动
     */
    public void start() {
        System.out.println("监听线程: " + Thread.currentThread().getName());

        // 启动SubReactor 用于处理事件
        initSubReactor();

        try {
            int index = -1;
            while (true) {
                int count = selector.select();
                // 有事件处理
                if (count > 0) {
                    // 处理监听到的事件
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    SelectionKey key = null;
                    while (iterator.hasNext()) {
                        key = iterator.next();
                        // 连接请求事件。只监听OP_ACCEPT 事件, 监听到之后转交给SubReactor
                        if (key.isAcceptable()) {
                            SocketChannel socketChannel = serverSocketChannel.accept();
                            if (socketChannel != null) {
                                // 设置非阻塞
                                socketChannel.configureBlocking(false);
                                // 交给子线程
                                System.out.println(socketChannel.getRemoteAddress() + " 上线 ");
                                subReactors.get((++index) % 2).registerChannel(socketChannel);
                            }
                        }

                        iterator.remove();
                    }
                }
            }
        } catch (Throwable e) {
            e.printStackTrace();
        }
    }

    private List<SubReactor> subReactors = new ArrayList<>();

    /**
     * 初始化subReactor, 实际上两个够用了。 Tomcat 8 默认也是用的两个子线程。
     */
    public void initSubReactor() {
        if (CollectionUtils.isNotEmpty(subReactors)) {
            return;
        }

        subReactors.add(new SubReactor());
        subReactors.add(new SubReactor());
        for (SubReactor subReactor : subReactors) {
            new Thread(subReactor).start();
        }
    }

    /**
     * SubReactor 负责处理事件
     * 这里需要注意: 参考https://blog.csdn.net/wanger61/article/details/104951149/
     * 1. SubReactor线程在执行selector.select()时由于有已就绪的,所以不会阻塞而继续往下执行。
     * 2. Reactor线程在第一次执行selector.select()时,必须要有已就绪的事件,否则后面即使有了已就绪的事件,还是会阻塞在selector.select()上。
     */
    public static class SubReactor implements Runnable {

        private Selector selector;

        public SubReactor() {
            // 每个SubReactor 一个selector
            try {
                this.selector = SelectorProvider.provider().openSelector();
            } catch (Exception exception) {
                exception.printStackTrace();
            }
        }

        public void registerChannel(SocketChannel sc) throws Exception {
            System.out.println("接收到注册sc: " + sc.getRemoteAddress());
            sc.register(selector, SelectionKey.OP_READ);
//            selector.wakeup();
        }

        @Override
        public void run() {
            System.out.println("SubReactor线程: " + Thread.currentThread().getName());

            try {
                while (true) {
                    // 注意这里不能用selector.select(), 用这个方法会一直阻塞。 select(1) 会造成CPU的浪费。
//                    int count = selector.select();
                    int count = selector.select(1);
                    // 有事件处理
                    if (count > 0) {
                        // 处理监听到的事件
                        Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                        SelectionKey key = null;
                        while (iterator.hasNext()) {
                            key = iterator.next();
                            // 连接请求事件
                            if (key.isReadable()) {
                                new GroupServerHandler(selector).readData(key);
                            } else {
                                System.out.println("xxxxxxxxxxxxxxxxxxx");
                            }

                            iterator.remove();
                        }
                    }
                }
            } catch (Throwable e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        //创建服务器对象
        MainReactor mainReactor = new MainReactor();
        mainReactor.start();
    }
}

ServerHandler:

package nio;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class GroupServerHandler {

    /**
     * Selector 选择器
     */
    private Selector selector;

    public GroupServerHandler(Selector selector) {
        this.selector = selector;
    }

    /**
     * 读取数据
     *
     * @param key
     */
    public void readData(SelectionKey key) {
        ThreadUtils.execute(new Runnable() {
            @Override
            public void run() {
                SocketChannel channel = null;
                try {
                    channel = (SocketChannel) key.channel();
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    int read = channel.read(byteBuffer);
                    if (read > 0) {
                        //把缓存区的数据转成字符串
                        String msg = new String(byteBuffer.array());
                        System.out.println("form 客户端: " + msg);

                        //向其它的客户端转发消息(去掉自己), 专门写一个方法来处理
                        sendInfoToOtherClients(msg, channel);
                    }
                } catch (Exception e) {
                    e.printStackTrace();

                    try {
                        System.out.println(channel.getRemoteAddress() + " 离线了..");
                        //取消注册
                        key.cancel();
                        //关闭通道
                        channel.close();
                    } catch (IOException e2) {
                        e2.printStackTrace();
                    }
                }
            }
        });
    }

    private void sendInfoToOtherClients(String msg, SocketChannel channel) throws IOException {
        System.out.println("服务器转发消息中...");
        System.out.println("当前线程: " + Thread.currentThread().getName());

        try {
            Thread.sleep(5 * 1000);
        } catch (InterruptedException e) {
            // ignore
        }

        for (SelectionKey key : selector.keys()) {
            // 通过 key  取出对应的 SocketChannel
            Channel targetChannel = key.channel();
            // 排除掉自己
            if (targetChannel instanceof SocketChannel && targetChannel != channel) {
                // 转型
                SocketChannel dest = (SocketChannel) targetChannel;
                // 将msg 存储到buffer
                ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
                // 将buffer 的数据写入 通道
                dest.write(buffer);
                System.out.println("转发给: " + dest.getRemoteAddress());
            }
        }
    }
}

客户端代码同上面。

  这里需要提别注意,在SUbReactor 子线程的run 方法中调用selector.select() 会一直阻塞。可以用好

 方案优缺点:

优点:

(1) 父线程与子线程的数据交互简单职责明确,父线程只需要接收新连接,子线程完成后续的业务处理。

(2) 父线程与子线程的数据交互简单,Reactor 主线程只需要把新连接传给子线程,子线程无需返回数据。

缺点:编程复杂度较高

结合实例:这种模型在许多项目中广泛使用,包括 Nginx 主从 Reactor 多进程模型,Memcached 主从多线程,Netty 主从多线程模型的支持

 4. netty 模型

  Netty 主要基于主从 Reactors 多线程模型做了一定的改进,其中主从 Reactor 多线程模型有多个 Reactor。

1. 简单理解

(1) BossGroup 线程维护Selector , 只关注Accecpt

(2) 当接收到Accept事件,获取到对应的SocketChannel, 封装成 NIOScoketChannel并注册到Worker 线程(事件循环), 并进行维护

(3) 当Worker线程监听到selector 中通道发生自己感兴趣的事件后,就进行处理(就由handler), 此时handler 已经加入到通道

2. 模型理解

(1)  Netty抽象出两组线程池,BossGroup专门负责接收客户端的连接,WorkerGroup专门负责网络的读写

(2) BossGroup和WorkerGroup类型的本质都是NioEventLoopGroup类型。

(3) NioEventLoopGroup相当于一个事件循环组,它下面维护很多个NioEventLoop(事件循环)。

(4) NioEventLoop表示一个不断循环的执行处理任务的线程。每个NioEventLoop都包含一个Selector,用于监听绑定在它上面的socket通讯。

(5) 每个Boss NioEventLoop循环执行的步骤有3步:

1》轮询accept事件

2》处理accept事件,与client建立连接,生成NioSocketChannel,并将其注册到某个Worker NioEventLoop的selector上

3》处理任务队列到任务,即runAllTasks

(6) 每个Worker NioEventLoop循环执行的步骤:

1》轮询read,write事件

2》处理I/O事件,即read,write事件,在对应的NioSocketChannel中进行处理

3》处理任务队列的任务,即runAllTasks

(7) 每个 Worker NioEventLoop处理业务时,会使用pipeline(管道),pipeline中包含了channel 通道,即同pipeline可以获取到channel通道。同时管道中维护了很多处理器。

【当你用心写完每一篇博客之后,你会发现它比你用代码实现功能更有成就感!】
原文地址:https://www.cnblogs.com/qlqwjy/p/14465508.html